Compare commits

...

7 Commits

Author SHA1 Message Date
7c78a7f17a Updating README.md. 2021-02-26 12:02:31 +01:00
a526f388df Adding a dry-run mode on local-clean & a retry policy. Minor fixes to handle invalid metric record. 2021-02-26 11:45:30 +01:00
3d316921e1 Implementing "local_clean" command. 2021-02-25 23:28:09 +01:00
640954ab5a Module refactoring. 2021-02-25 21:25:00 +01:00
b8a670bbce Set cassandra servers with args or env var. 2021-02-24 23:57:41 +01:00
9dbf1542b2 Updating documentation. 2021-02-24 21:43:31 +01:00
9fb6975188 Implementing "clean" command. 2021-02-24 21:41:26 +01:00
13 changed files with 894 additions and 316 deletions

README.md

@@ -11,22 +11,35 @@ Don't forget to download & install [cassandra-cpp](https://downloads.datastax.co
## Run

```sh
$ cargo build
    Finished dev [unoptimized + debuginfo] target(s) in 0.04s
$ cargo run -- --help
bgutil-rs

USAGE:
    bgutil-rs [FLAGS] [OPTIONS] <SUBCOMMAND>

FLAGS:
        --dry-run    Do not write in database (local-clean only)
    -h, --help       Prints help information
    -V, --version    Prints version information

OPTIONS:
        --contact-metadata <contact-metadata>
            [env: CASSANDRA_CONTACT_METADATA=localhost]
        --contact-points <contact-points>
            [env: CASSANDRA_CONTACT_POINTS=localhost]

SUBCOMMANDS:
    clean          Clean outdated metrics & empty directories
    delete         Delete metric(s)
    help           Prints this message or the help of the given subcommand(s)
    info           Information about a metric
    list           List metrics with given pattern
    local-clean    Clean a directory of outdated metrics & empty sub-directories
    read           Read a metric contents
    stats          Stats
    write          Write a metric and its value
```

### Info
@@ -150,6 +163,46 @@ ARGS:
    <metric>
```
### Clean
```sh
$ cargo run -- clean --help
bgutil-rs-clean
Stats
USAGE:
    bgutil-rs clean [FLAGS] [OPTIONS]

FLAGS:
        --clean-directories
        --clean-metrics

OPTIONS:
        --end-key <end-key>
        --start-key <start-key>
```
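The `--start-key` and `--end-key` values are raw Cassandra `token(name)` bounds; when they are omitted, the command scans the whole token space (`i64::MIN` to `i64::MAX`, as `src/main.rs` below shows). As an illustration only, assuming the default Murmur3 partitioner, the hypothetical `token_shards` helper below splits that space into equal ranges so several `clean` workers could each be handed one slice; it is not part of this change.

```rust
// Illustration only: split the Murmur3 token space into `n` equal shards and
// print one `clean` invocation per shard. `token_shards` is hypothetical and
// not part of bgutil-rs.
fn token_shards(n: u64) -> Vec<(i64, i64)> {
    let width = (u64::MAX / n) as i128; // shard width, in tokens
    (0..n as i128)
        .map(|i| {
            let start = (i64::MIN as i128 + i * width) as i64;
            let end = if i == n as i128 - 1 {
                i64::MAX
            } else {
                (i64::MIN as i128 + (i + 1) * width) as i64
            };
            (start, end)
        })
        .collect()
}

fn main() {
    for (start, end) in token_shards(4) {
        println!("bgutil-rs clean --start-key {} --end-key {} --clean-metrics", start, end);
    }
}
```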
### Local-clean
Clean outdated metrics in a given directory.
```sh
$ cargo run -- local-clean --help
bgutil-rs-local-clean
Clean a directory of outdated metrics & empty sub-directories
USAGE:
    bgutil-rs local-clean <directory>

FLAGS:
    -h, --help       Prints help information
    -V, --version    Prints version information

ARGS:
    <directory>
```
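To make `--dry-run` concrete: `local-clean` walks the directory, reports every metric whose `updated_on` is older than 14 days, and only issues the deletes when the flag is absent (see `src/cmd/local_clean.rs` below). The following standalone sketch mirrors that gate; `Session`, `outdated_cutoff_ms` and `maybe_delete` are stand-ins for illustration, not the tool's API.

```rust
// Standalone sketch of the local-clean decision, not code from this change set.
use std::time::{SystemTime, UNIX_EPOCH};

struct Session {
    dry_run: bool,
}

// Metrics whose updated_on (in milliseconds) is older than 14 days count as outdated.
fn outdated_cutoff_ms() -> u64 {
    let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
    (now - 14 * 86_400) * 1000
}

fn maybe_delete(session: &Session, name: &str, updated_on_ms: u64) {
    if updated_on_ms > outdated_cutoff_ms() {
        return; // still fresh, keep it
    }
    println!("Deleting metric {}", name);
    if session.dry_run {
        return; // report only, leave the database untouched
    }
    // The real tool issues DELETEs against biggraphite_metadata.* here.
}

fn main() {
    let session = Session { dry_run: true };
    maybe_delete(&session, "foo.bar.requests", 0);
}
```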
## Todo
* command: read
@@ -160,3 +213,11 @@ ARGS:
  - Arguments handling
* command: delete
  - with recursive
* command: clean
  - progress bar
* ...
## Dedication
This piece of software was written during the mourning of Jean-Yves Moyart, aka Maître Mô, 21/10/1967-20/02/2021. My thoughts were with him, his family, his friends and all of us, who really appreciated him. Rest in Peace.

src/cassandra.rs

@@ -3,9 +3,9 @@
*
* Author: Patrick MARIE <pm@mkz.me>
*/
use std::str::FromStr;
use std::fmt;
use std::error;
use crate::metric::Metric;
use crate::session::Session;
@@ -14,31 +14,165 @@ use crate::timerange::TimeRange;
use cassandra_cpp::Session as CassSession;
use cassandra_cpp::Uuid as CassUuid;
use cassandra_cpp::{Batch,BatchType,BindRustType,CassCollection,Cluster,Error,LogLevel,Map,RetryPolicy,Statement};
use cassandra_cpp::{set_level,stmt};
use chrono::Duration;
use uuid::Uuid;
#[derive(Debug, Clone)]
struct NoRecord;
impl fmt::Display for NoRecord {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "no record suitable")
}
}
impl error::Error for NoRecord {}
pub fn connect(contact_points: &str) -> Result<CassSession, Error> {
set_level(LogLevel::DISABLED);
let mut cluster = Cluster::default();
cluster.set_contact_points(contact_points).unwrap();
cluster.set_load_balance_round_robin();
cluster.set_retry_policy(RetryPolicy::downgrading_consistency_new());
cluster.set_request_timeout(Duration::seconds(30));
cluster.set_protocol_version(4)?;
cluster.connect()
}
pub fn prepare_component_query(table_name: &str, arguments: &Vec<&str>) -> Result<Statement, Error> {
let mut q = format!("SELECT parent, name FROM biggraphite_metadata.{} WHERE ", table_name);
let mut component_number = 0;
let mut components = vec![];
for (id, component) in arguments.iter().enumerate() {
let mut operator = "=";
if *component == "*" {
component_number += 1;
continue;
}
if component_number != 0 {
q.push_str("AND ");
}
if component.ends_with("*") {
operator = "LIKE";
}
q.push_str(format!("component_{} {} ? ", id, operator).as_str());
component_number += 1;
components.push(component.replace("*", "%"));
}
if component_number != 0 {
q.push_str("AND ");
}
// Adding last component for __END__.
q.push_str(format!("component_{} = ? ALLOW FILTERING;", component_number).as_str());
components.push("__END__".to_string());
let mut query = stmt!(q.as_str());
for (id, arg) in components.iter().enumerate() {
query.bind(id, arg.as_str())?;
}
Ok(query)
}
pub fn prepare_component_query_globstar(table_name: &str, arguments: &Vec<&str>) -> Result<Vec<Statement>, Error> {
let _q = format!("SELECT parent, name FROM biggraphite_metadata.{} WHERE ", table_name);
let _component_number = 0;
let mut out = vec![];
let pos_globstar = arguments.iter().enumerate().filter(|(_, &x)| x == "**").map(|(id, _)| id).collect::<Vec<usize>>();
if pos_globstar.len() != 1 {
// XXX return error
return Ok(vec![prepare_component_query(table_name, arguments)?]);
}
let pos_globstar = pos_globstar[0];
let mut queries = vec![];
let mut init_args = vec![];
let mut end_args = arguments[pos_globstar+1..].to_vec();
end_args.push("__END__");
for (id, el) in arguments[0..pos_globstar].iter().enumerate() {
if *el == "*" {
continue;
}
if el.ends_with("*") {
init_args.push((id, "LIKE", el.replace("*", "%")));
} else {
init_args.push((id, "=", el.to_string()));
}
}
let components = 16;
for id in init_args.len()..(components-end_args.len()+1) {
let mut current_query = init_args.to_vec();
for (sub_id, el) in end_args.iter().enumerate() {
if *el == "*" {
continue;
}
if el.ends_with("*") {
current_query.push((sub_id + id, "LIKE", el.replace("*", "%")));
} else {
current_query.push((sub_id + id, "=", el.to_string()));
}
}
queries.push(current_query);
}
for query in &queries {
let mut current_query = _q.to_string();
for el in query {
if el.0 != 0 {
current_query.push_str("AND ");
}
current_query.push_str(&format!("component_{} {} ? ", el.0, el.1));
}
current_query.push_str(&String::from("ALLOW FILTERING;"));
let mut statement = stmt!(&current_query);
for (id, el) in query.iter().enumerate() {
statement.bind(id, el.2.as_str())?;
}
out.push(statement);
}
Ok(out)
}
pub fn fetch_metric(session: &Session, metric_name: &str) -> Result<Metric, Box<dyn error::Error>> {
let mut query = stmt!("SELECT * FROM biggraphite_metadata.metrics_metadata WHERE name = ?");
query.set_consistency(session.read_consistency())?;
query.bind(0, metric_name)?;
let result = session.metadata_session().execute(&query).wait()?;
if result.row_count() == 0 {
return Err(NoRecord.into());
}
Ok(result.first_row().unwrap().into())
}
@@ -195,62 +329,33 @@ pub fn create_metric(session: &Session, metric: &str) -> Result<(), Error> {
Ok(())
}
pub fn delete_metric(session: &Session, name: &str) -> Result<(), Error> {
let namespace = "biggraphite_metadata";
let delete_metric_query = format!("DELETE FROM {}.metrics WHERE name = ?;", namespace);
let delete_metadata_query = format!("DELETE FROM {}.metrics_metadata WHERE name = ?;", namespace);
let mut delete_metric_query = stmt!(delete_metric_query.as_str());
delete_metric_query.set_consistency(session.write_consistency())?;
delete_metric_query.bind(0, name)?;
session.metadata_session().execute(&delete_metric_query).wait()?;
let mut delete_metadata_query = stmt!(delete_metadata_query.as_str());
delete_metric_query.set_consistency(session.write_consistency())?;
delete_metadata_query.bind(0, name)?;
session.metadata_session().execute(&delete_metric_query).wait()?;
Ok(())
}
pub fn delete_directory(session: &Session, name: &str) -> Result<(), Error> {
let namespace = "biggraphite_metadata";
let delete_directory_query = format!("DELETE FROM {}.directories WHERE name = ?;", namespace);
let mut delete_directory_query = stmt!(delete_directory_query.as_str());
delete_directory_query.set_consistency(session.write_consistency())?;
delete_directory_query.bind(0, name)?;
session.metadata_session().execute(&delete_directory_query).wait()?;
Ok(())
}
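For context on the queries above: BigGraphite stores each level of a dotted metric name in its own `component_N` column, so `prepare_component_query` turns a glob into per-component predicates (`*` adds none, `prefix*` becomes a `LIKE`, and a final `__END__` component pins the name length). The sketch below is illustrative only, has no `cassandra_cpp` dependency, and is not the repository's own code; it builds such a query string for a sample glob to show the shape of the generated CQL.

```rust
// Illustration only: build a component-based CQL query for a dotted glob,
// in the spirit of prepare_component_query above (simplified, no driver types).
fn component_query(table: &str, glob: &str) -> (String, Vec<String>) {
    let mut q = format!("SELECT parent, name FROM biggraphite_metadata.{} WHERE ", table);
    let mut bound = Vec::new();
    let mut written = 0;
    let components: Vec<&str> = glob.split('.').collect();

    for (id, component) in components.iter().enumerate() {
        if *component == "*" {
            continue; // matches anything at this level, no predicate needed
        }
        if written != 0 {
            q.push_str("AND ");
        }
        let operator = if component.ends_with('*') { "LIKE" } else { "=" };
        q.push_str(&format!("component_{} {} ? ", id, operator));
        bound.push(component.replace('*', "%"));
        written += 1;
    }

    // One extra component marks the end of the metric name.
    if written != 0 {
        q.push_str("AND ");
    }
    q.push_str(&format!("component_{} = ? ALLOW FILTERING;", components.len()));
    bound.push("__END__".to_string());
    (q, bound)
}

fn main() {
    let (query, args) = component_query("metrics", "observability.*.cpu*");
    println!("{}", query);
    println!("{:?}", args);
}
```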

src/cmd.rs (new file, 12 lines)

@@ -0,0 +1,12 @@
/*
* bgutil-rs
*
* Author: Patrick MARIE <pm@mkz.me>
*/
pub mod clean;
pub mod delete;
pub mod info;
pub mod list;
pub mod local_clean;
pub mod stats;
pub mod write;

src/cmd/clean.rs (new file, 148 lines)

@@ -0,0 +1,148 @@
/*
* bgutil-rs
*
* Author: Patrick MARIE <pm@mkz.me>
*/
use crate::Session;
use cassandra_cpp::{BindRustType,Error};
use cassandra_cpp::stmt;
use chrono::Utc;
pub fn metrics_clean(session: &Session, start_key: i64, end_key: i64, clean_metrics: bool, clean_directories: bool) -> Result<(), Error> {
let mut current_token = start_key;
let cutoff : u64 = (Utc::now().timestamp() as u64 - 86400 * 14) * 1000;
let namespace = "biggraphite_metadata";
let batch_limit = 1000;
let query = format!("SELECT name, token(name) FROM {}.metrics_metadata \
WHERE updated_on <= maxTimeuuid({}) and token(name) > ? and token(name) < ? LIMIT {};",
namespace, cutoff, batch_limit);
let delete_metric_query = format!("DELETE FROM {}.metrics WHERE name = ?;", namespace);
let delete_metadata_query = format!("DELETE FROM {}.metrics_metadata WHERE name = ?;", namespace);
let mut deleted_metrics_count = 0;
let mut scanned_metrics_count = 0;
let mut scanned_directories_count = 0;
let mut deleted_directories_count = 0;
// clean metrics
loop {
if !clean_metrics || current_token >= end_key {
// println!("Stopping: {} >= {}", current_token, end_key);
break;
}
let mut outdated_metrics_query = stmt!(query.as_str());
outdated_metrics_query.set_consistency(session.read_consistency())?;
outdated_metrics_query.bind(0, current_token)?;
outdated_metrics_query.bind(1, end_key)?;
let result = session.metadata_session().execute(&outdated_metrics_query).wait()?;
if result.row_count() == 0 {
break;
}
let mut queries = vec![];
for row in result.iter() {
let name = row.get_column_by_name("name".to_string())?.to_string();
scanned_metrics_count += 1;
let mut delete_metric_query = stmt!(delete_metric_query.as_str());
delete_metric_query.set_consistency(session.write_consistency())?;
delete_metric_query.bind(0, name.as_str())?;
queries.push(session.metadata_session().execute(&delete_metric_query));
let mut delete_metadata_query = stmt!(delete_metadata_query.as_str());
delete_metric_query.set_consistency(session.write_consistency())?;
delete_metadata_query.bind(0, name.as_str())?;
queries.push(session.metadata_session().execute(&delete_metadata_query));
deleted_metrics_count += 1;
current_token = row.get_column(1)?.get_i64()?;
}
if result.row_count() != batch_limit {
// println!("Stopping because count == 0");
break;
}
for query in queries {
if let Err(err) = query.wait() {
eprintln!("Failed: {:?}", err);
}
}
}
let list_directories_query = format!("SELECT name, token(name) FROM {}.directories WHERE token(name) > ? AND token(name) < ? LIMIT {};",
namespace, batch_limit);
let metric_query = format!("SELECT name FROM {}.metrics WHERE parent LIKE ? LIMIT 1", namespace);
let delete_directory_query = format!("DELETE FROM {}.directories WHERE name = ?;", namespace);
current_token = start_key;
// clean directories
loop {
if !clean_directories || current_token >= end_key {
break;
}
let mut list_directories_query = stmt!(list_directories_query.as_str());
list_directories_query.set_consistency(session.read_consistency())?;
list_directories_query.bind(0, current_token)?;
list_directories_query.bind(1, end_key)?;
let list_result = session.metadata_session().execute(&list_directories_query).wait()?;
if list_result.row_count() == 0 {
break;
}
let mut queries = vec![];
for row in list_result.iter() {
let mut name = row.get_column_by_name("name".to_string())?.to_string();
let orig_name = name.clone();
name.push_str(".%");
current_token = row.get_column(1)?.get_i64()?;
let mut metric_query = stmt!(metric_query.as_str());
metric_query.set_consistency(session.read_consistency())?;
metric_query.bind(0, name.as_str())?;
let query = session.metadata_session().execute(&metric_query);
queries.push((orig_name, query));
}
let mut to_delete_queries = vec![];
for el in queries {
let result = el.1.wait()?;
scanned_directories_count += 1;
if result.row_count() != 0 {
continue;
}
let mut delete_directory_query = stmt!(delete_directory_query.as_str());
delete_directory_query.set_consistency(session.write_consistency())?;
delete_directory_query.bind(0, el.0.as_str())?;
to_delete_queries.push(session.metadata_session().execute(&delete_directory_query));
deleted_directories_count += 1;
}
for to_delete in to_delete_queries {
to_delete.wait()?;
}
if list_result.row_count() != batch_limit {
break;
}
}
println!("Deleted {} metrics, {} directories.", deleted_metrics_count, deleted_directories_count);
println!("Scanned {} metrics, {} directories", scanned_metrics_count, scanned_directories_count);
Ok(())
}

src/cmd/delete.rs (new file, 42 lines)

@@ -0,0 +1,42 @@
/*
* bgutil-rs
*
* Author: Patrick MARIE <pm@mkz.me>
*/
use std::error;
use cassandra_cpp::stmt;
use cassandra_cpp::BindRustType;
use crate::fetch_metric;
use crate::Session;
pub fn metric_delete(session: &Session, metric_name: &str) -> Result<(), Box<dyn error::Error>> {
let mut query = stmt!("SELECT * FROM biggraphite_metadata.metrics_metadata WHERE name = ?");
query.bind(0, metric_name)?;
let result = session.metadata_session().execute(&query).wait()?;
if result.row_count() == 0 {
println!("Metric is not existing");
return Ok(());
}
let _metric = fetch_metric(session, metric_name)?;
let mut query = stmt!("DELETE FROM biggraphite_metadata.metrics_metadata WHERE name = ?;");
query.bind(0, metric_name)?;
query.set_consistency(session.write_consistency())?;
session.metadata_session().execute(&query).wait()?;
let mut query = stmt!("DELETE FROM biggraphite_metadata.metrics_metadata WHERE name = ?;");
query.bind(0, metric_name)?;
query.set_consistency(session.write_consistency())?;
session.metadata_session().execute(&query).wait()?;
let mut query = stmt!("DELETE FROM biggraphite_metadata.directories WHERE name = ?;");
query.bind(0, metric_name)?;
query.set_consistency(session.write_consistency())?;
session.metadata_session().execute(&query).wait()?;
Ok(())
}

src/cmd/info.rs (new file, 17 lines)

@@ -0,0 +1,17 @@
/*
* bgutil-rs
*
* Author: Patrick MARIE <pm@mkz.me>
*/
use std::error;
use crate::Session;
use crate::fetch_metric;
pub fn metric_info(session: &Session, metric_name: &str) -> Result<(), Box<dyn error::Error>> {
let metric = fetch_metric(session, metric_name)?;
println!("{}", metric);
Ok(())
}

src/cmd/list.rs (new file, 62 lines)

@@ -0,0 +1,62 @@
/*
* bgutil-rs
*
* Author: Patrick MARIE <pm@mkz.me>
*/
use crate::prepare_component_query_globstar;
use crate::fetch_metrics;
use crate::Session;
use cassandra_cpp::{Error};
pub fn metric_list(session: &Session, glob: &str) -> Result<(), Error> {
let components = glob.split(".").collect::<Vec<&str>>();
let query_directories = prepare_component_query_globstar("directories", &components)?;
let mut results = vec![];
for mut q in query_directories {
q.set_consistency(session.read_consistency())?;
results.push(session.metadata_session().execute(&q));
}
for result in results {
let rows = result.wait()?;
for row in rows.iter() {
let name = row.get_column_by_name("name".to_string()).unwrap().to_string();
println!("d {}", name);
}
}
let query_metrics = prepare_component_query_globstar("metrics", &components)?;
let mut results = vec![];
for mut q in query_metrics {
q.set_consistency(session.read_consistency())?;
results.push(session.metadata_session().execute(&q));
}
for result in results {
let res = result.wait();
if let Err(err) = res {
eprintln!("Query failed: {}", err);
continue;
}
let rows = res.unwrap();
let names = rows
.iter()
.map(|x| {
x.get_column_by_name("name".to_string()).unwrap().to_string()
})
.collect::<Vec<String>>();
let metrics = fetch_metrics(session, &names)?;
for metric in metrics {
println!("m {}", metric);
}
}
Ok(())
}

src/cmd/local_clean.rs (new file, 135 lines)

@@ -0,0 +1,135 @@
/*
* bgutil-rs
*
* Author: Patrick MARIE <pm@mkz.me>
*/
use std::error;
use crate::Session;
use crate::delete_directory;
use crate::delete_metric;
use crate::fetch_metric;
use crate::prepare_component_query_globstar;
use cassandra_cpp::stmt;
use cassandra_cpp::BindRustType;
use chrono::Utc;
fn clean_metrics_in_directory(session: &Session, directory: &str) -> Result<(), Box<dyn error::Error>> {
// println!("Cleaning metrics in directory: '{}'", directory);
let mut directory = String::from(directory);
directory.push_str(".**");
let components = directory.split(".").collect::<Vec<&str>>();
let mut results = vec![];
let query_metrics = prepare_component_query_globstar("metrics", &components)?;
let outdated_ts : u64 = (Utc::now().timestamp() as u64 - 14 * 86400) * 1000;
for mut q in query_metrics {
q.set_consistency(session.read_consistency())?;
results.push(session.metadata_session().execute(&q));
}
for result in results {
let rows = result.wait()?;
for row in rows.iter() {
let name = row.get_column_by_name("name".to_string()).unwrap().to_string();
let metric = fetch_metric(session, &name);
if let Err(e) = metric {
eprintln!("Error while retrieving metric: {}", e);
continue;
}
let metric = metric.unwrap();
if metric.updated_on() > outdated_ts {
continue;
}
println!("Deleting metric {}", metric.name());
if session.is_dry_run() {
continue;
}
delete_metric(session, metric.name())?;
}
}
Ok(())
}
fn directory_has_metrics(session: &Session, directory: &str) -> Result<bool, Box<dyn error::Error>> {
let query = format!("SELECT name FROM biggraphite_metadata.metrics WHERE parent LIKE ? LIMIT 1;");
let mut query = stmt!(query.as_str());
let mut directory = String::from(directory);
directory.push_str(".%");
query.bind(0, directory.as_str())?;
let result = session.metadata_session().execute(&query).wait()?;
Ok(result.row_count() != 0)
}
fn clean_empty_directories_in_directory(session: &Session, directory: &str) -> Result<(), Box<dyn error::Error>> {
// println!("Cleaning empty directories in directory '{}'", directory);
let mut directory = String::from(directory);
directory.push_str(".**");
let components = directory.split(".").collect::<Vec<&str>>();
let mut results = vec![];
let query_directories = prepare_component_query_globstar("directories", &components)?;
for mut q in query_directories {
q.set_consistency(session.read_consistency())?;
results.push(session.metadata_session().execute(&q));
}
for result in results {
let rows = result.wait()?;
for row in rows.iter() {
let name = row.get_column_by_name("name".to_string()).unwrap().to_string();
if directory_has_metrics(session, &name)? {
continue;
}
println!("Deleting directory {}", name);
if session.is_dry_run() {
continue;
}
delete_directory(session, &name)?;
}
}
Ok(())
}
pub fn metrics_local_clean(session: &Session, directory: &str) -> Result<(), Box<dyn error::Error>> {
let components = directory.split(".").collect::<Vec<&str>>();
let query_directories = prepare_component_query_globstar("directories", &components)?;
let mut results = vec![];
for mut q in query_directories {
q.set_consistency(session.read_consistency())?;
results.push(session.metadata_session().execute(&q));
}
for result in results {
let rows = result.wait()?;
for row in rows.iter() {
let name = row.get_column_by_name("name".to_string()).unwrap().to_string();
clean_metrics_in_directory(session, &name)?;
clean_empty_directories_in_directory(session, &name)?;
}
}
Ok(())
}

src/cmd/stats.rs (new file, 70 lines)

@@ -0,0 +1,70 @@
/*
* bgutil-rs
*
* Author: Patrick MARIE <pm@mkz.me>
*/
use cassandra_cpp::{BindRustType,Error};
use cassandra_cpp::stmt;
use std::collections::HashMap;
use crate::Metric;
use crate::Session;
pub fn metric_stats(session: &Session, start_key: i64, end_key: i64) -> Result<(), Error> {
let q =
"SELECT id, name, token(name), config, created_on, updated_on, read_on \
FROM biggraphite_metadata.metrics_metadata WHERE token(name) > ? LIMIT 1000";
let mut current_token = start_key;
let mut n = 0;
let mut points : u64 = 0;
let mut stats : HashMap<String, usize> = HashMap::new();
while current_token < end_key {
let mut query = stmt!(q);
query.bind(0, current_token)?;
let results = session.metadata_session().execute(&query).wait()?;
for row in results.iter() {
current_token = row.get_column(2)?.get_i64()?;
let metric : Metric = row.into();
let stages = match metric.stages() {
Ok(stages) => stages,
Err(_) => continue,
};
for stage in stages {
points += stage.points() as u64;
}
let parts = metric.name().split(".").collect::<Vec<&str>>();
*stats.entry(String::from(parts[0])).or_insert(0) += 1;
n += 1;
}
}
let p : f64 = ((current_token - start_key) / std::i64::MAX) as f64;
println!("Range: {} -> {} ({:.4}%)", start_key, current_token, 100. * p);
println!("{} metrics", n);
println!("{} points", points);
println!("-----");
let mut vec : Vec<(&String, &usize)> = stats.iter().collect();
vec.sort_by(|a, b| b.1.cmp(a.1));
for (id, v) in vec.iter().enumerate() {
println!("{} {}", v.0, v.1);
if id == 10 {
break;
}
}
Ok(())
}

src/cmd/write.rs (new file, 49 lines)

@@ -0,0 +1,49 @@
/*
* bgutil-rs
*
* Author: Patrick MARIE <pm@mkz.me>
*/
use std::error;
use std::str::FromStr;
use crate::Session;
use crate::Stage;
use crate::create_metric;
use crate::fetch_metric;
use cassandra_cpp::stmt;
use cassandra_cpp::BindRustType;
use cassandra_cpp::Uuid as CassUuid;
use std::convert::TryFrom;
pub fn metric_write(session: &Session, metric_name: &str, value: f64, retention: &str, timestamp: i64) -> Result<(), Box<dyn error::Error>> {
let mut query = stmt!("SELECT * FROM biggraphite_metadata.metrics_metadata WHERE name = ?");
query.bind(0, metric_name)?;
let result = session.metadata_session().execute(&query).wait()?;
if result.row_count() == 0 {
create_metric(session, metric_name)?;
}
let stage = Stage::try_from(retention)?;
let metric = fetch_metric(session, metric_name)?;
let (time_start_ms, offset) = stage.time_offset_ms(timestamp);
let query = format!(
"INSERT INTO biggraphite.{} (metric, time_start_ms, offset, value) VALUES (?, ?, ?, ?);",
stage.table_name()
);
let mut query = stmt!(&query);
query.bind(0, CassUuid::from_str(metric.id().as_str())?)?;
query.bind(1, time_start_ms)?;
query.bind(2, offset as i16)?;
query.bind(3, value)?;
session.points_session().execute(&query).wait()?;
Ok(())
}

src/main.rs

@@ -3,13 +3,10 @@
*
* Author: Patrick MARIE <pm@mkz.me>
*/
use std::convert::TryFrom;
use std::error;
use cassandra_cpp::CassResult;
use chrono::Utc;
use clap::{App,AppSettings,Arg,SubCommand};
@@ -18,12 +15,20 @@ mod metric;
mod session;
mod stage;
mod timerange;
mod cmd;
use crate::cassandra::*;
use crate::session::Session;
use crate::stage::Stage;
use crate::metric::Metric;
use crate::cmd::clean::*;
use crate::cmd::delete::*;
use crate::cmd::info::*;
use crate::cmd::list::*;
use crate::cmd::local_clean::*;
use crate::cmd::stats::*;
use crate::cmd::write::*;
#[allow(dead_code)]
fn describe_result(result: &CassResult) {
@@ -38,181 +43,20 @@ fn describe_result(result: &CassResult) {
}
}
pub fn metric_info(session: &Session, metric_name: &str) -> Result<(), Error> {
let metric = fetch_metric(session, metric_name)?;
println!("{}", metric);
Ok(())
}
fn prepare_component_query(table_name: &str, arguments: &Vec<&str>) -> Result<Statement, Error> {
let mut q = format!("SELECT parent, name FROM biggraphite_metadata.{} WHERE ", table_name);
let mut component_number = 0;
let mut components = vec![];
for (id, component) in arguments.iter().enumerate() {
let mut operator = "=";
if *component == "*" {
component_number += 1;
continue;
}
if component_number != 0 {
q.push_str("AND ");
}
if component.ends_with("*") {
operator = "LIKE";
}
q.push_str(format!("component_{} {} ? ", id, operator).as_str());
component_number += 1;
components.push(component.replace("*", "%"));
}
if component_number != 0 {
q.push_str("AND ");
}
// Adding last component for __END__.
q.push_str(format!("component_{} = ? ALLOW FILTERING;", component_number).as_str());
components.push("__END__".to_string());
let mut query = stmt!(q.as_str());
for (id, arg) in components.iter().enumerate() {
query.bind(id, arg.as_str())?;
}
Ok(query)
}
fn prepare_component_query_globstar(table_name: &str, arguments: &Vec<&str>) -> Result<Vec<Statement>, Error> {
let _q = format!("SELECT parent, name FROM biggraphite_metadata.{} WHERE ", table_name);
let _component_number = 0;
let mut out = vec![];
let pos_globstar = arguments.iter().enumerate().filter(|(_, &x)| x == "**").map(|(id, _)| id).collect::<Vec<usize>>();
if pos_globstar.len() != 1 {
// XXX return error
return Ok(vec![prepare_component_query(table_name, arguments)?]);
}
let pos_globstar = pos_globstar[0];
let mut queries = vec![];
let mut init_args = vec![];
let mut end_args = arguments[pos_globstar+1..].to_vec();
end_args.push("__END__");
for (id, el) in arguments[0..pos_globstar].iter().enumerate() {
if *el == "*" {
continue;
}
if el.ends_with("*") {
init_args.push((id, "LIKE", el.replace("*", "%")));
} else {
init_args.push((id, "=", el.to_string()));
}
}
let components = 16;
for id in init_args.len()..(components-end_args.len()+1) {
let mut current_query = init_args.to_vec();
for (sub_id, el) in end_args.iter().enumerate() {
if *el == "*" {
continue;
}
if el.ends_with("*") {
current_query.push((sub_id + id, "LIKE", el.replace("*", "%")));
} else {
current_query.push((sub_id + id, "=", el.to_string()));
}
}
queries.push(current_query);
}
for query in &queries {
let mut current_query = _q.to_string();
for el in query {
if el.0 != 0 {
current_query.push_str("AND ");
}
current_query.push_str(&format!("component_{} {} ? ", el.0, el.1));
}
current_query.push_str(&String::from("ALLOW FILTERING;"));
let mut statement = stmt!(&current_query);
for (id, el) in query.iter().enumerate() {
statement.bind(id, el.2.as_str())?;
}
out.push(statement);
}
Ok(out)
}
fn metric_list(session: &Session, glob: &str) -> Result<(), Error> {
let components = glob.split(".").collect::<Vec<&str>>();
let query_directories = prepare_component_query_globstar("directories", &components)?;
let mut results = vec![];
for mut q in query_directories {
q.set_consistency(session.read_consistency())?;
results.push(session.metadata_session().execute(&q));
}
for result in results {
let rows = result.wait()?;
for row in rows.iter() {
let name = row.get_column_by_name("name".to_string()).unwrap().to_string();
println!("d {}", name);
}
}
let query_metrics = prepare_component_query_globstar("metrics", &components)?;
let mut results = vec![];
for mut q in query_metrics {
q.set_consistency(session.read_consistency())?;
results.push(session.metadata_session().execute(&q));
}
for result in results {
let rows = result.wait()?;
let names = rows
.iter()
.map(|x| {
x.get_column_by_name("name".to_string()).unwrap().to_string()
})
.collect::<Vec<String>>();
let metrics = fetch_metrics(session, &names)?;
for metric in metrics {
println!("m {}", metric);
}
}
Ok(())
}
fn main() -> Result<(), Box<dyn error::Error>> {
let matches = App::new("bgutil-rs")
.setting(AppSettings::SubcommandRequired)
.arg(Arg::with_name("contact-metadata")
.long("contact-metadata")
.env("CASSANDRA_CONTACT_METADATA")
.takes_value(true))
.arg(Arg::with_name("contact-points")
.long("contact-points")
.env("CASSANDRA_CONTACT_POINTS")
.takes_value(true))
.arg(Arg::with_name("dry-run")
.help("Do not write in database (local-clean only)")
.long("dry-run"))
.subcommand(SubCommand::with_name("info")
.about("Information about a metric")
.arg(Arg::with_name("metric")
@@ -269,12 +113,39 @@ fn main() -> Result<(), Box<dyn error::Error>> {
.arg(Arg::with_name("end-key")
.long("end-key")
.takes_value(true)))
.subcommand(SubCommand::with_name("clean")
.about("Clean outdated metrics & empty directories")
.arg(Arg::with_name("start-key")
.long("start-key")
.takes_value(true))
.arg(Arg::with_name("end-key")
.long("end-key")
.takes_value(true))
.arg(Arg::with_name("clean-metrics")
.long("clean-metrics"))
.arg(Arg::with_name("clean-directories")
.long("clean-directories")))
.subcommand(SubCommand::with_name("local-clean")
.about("Clean a directory of outdated metrics & empty sub-directories")
.arg(Arg::with_name("directory")
.index(1)
.required(true)))
.get_matches();
let contact_points_metadata = "tag--cstars07--cassandra-cstars07.query.consul.preprod.crto.in";
let contact_points_data = "tag--cstars04--cassandra-cstars04.query.consul.preprod.crto.in";
let session = Session::new(&contact_points_metadata, &contact_points_data)?;
let mut contact_points_metadata = "localhost";
if matches.is_present("contact-metadata") {
contact_points_metadata = matches.value_of("contact-metadata").unwrap();
}
let mut contact_points_data = "localhost";
if matches.is_present("contact-points") {
contact_points_data = matches.value_of("contact-points").unwrap();
}
let dry_run = matches.is_present("dry-run");
let mut session = Session::new(&contact_points_metadata, &contact_points_data)?;
session.set_dry_run(dry_run);
match matches.subcommand_name() {
Some("info") => {
@@ -359,8 +230,8 @@ fn main() -> Result<(), Box<dyn error::Error>> {
},
Some("stats") => {
let matches = matches.subcommand_matches("stats").unwrap();
let start_key = matches.value_of("start-key");
let end_key = matches.value_of("end-key");
let start_key = match start_key {
None => 0,
@@ -386,6 +257,45 @@ fn main() -> Result<(), Box<dyn error::Error>> {
metric_stats(&session, start_key, end_key)?;
},
Some("clean") => {
let matches = matches.subcommand_matches("clean").unwrap();
let start_key = matches.value_of("start-key");
let end_key = matches.value_of("end-key");
let start_key = match start_key {
None => std::i64::MIN,
Some(s) => match s.parse::<i64>() {
Ok(n) => n,
Err(_) => {
eprintln!("Could not parse {}", s);
return Ok(())
}
}
};
let end_key = match end_key {
None => std::i64::MAX,
Some(s) => match s.parse::<i64>() {
Ok(n) => n,
Err(_) => {
eprintln!("Could not parse {}", s);
return Ok(())
}
}
};
let clean_metrics = matches.is_present("clean-metrics");
let clean_directories = matches.is_present("clean-directories");
metrics_clean(&session, start_key, end_key, clean_metrics, clean_directories)?;
},
Some("local-clean") => {
let matches = matches.subcommand_matches("local-clean").unwrap();
let directory = matches.value_of("directory").unwrap();
metrics_local_clean(&session, directory)?;
}
None => {
eprintln!("No command was used.");
return Ok(());
@@ -397,62 +307,3 @@ fn main() -> Result<(), Box<dyn error::Error>> {
}
fn metric_stats(session: &Session, start_key: i64, end_key: i64) -> Result<(), Error> {
let q =
"SELECT id, name, token(name), config, created_on, updated_on, read_on \
FROM biggraphite_metadata.metrics_metadata WHERE token(name) > ? LIMIT 1000";
let mut current_token = start_key;
let mut n = 0;
let mut points : u64 = 0;
let mut stats : HashMap<String, usize> = HashMap::new();
while current_token < end_key {
let mut query = stmt!(q);
query.bind(0, current_token)?;
let results = session.metadata_session().execute(&query).wait()?;
for row in results.iter() {
current_token = row.get_column(2)?.get_i64()?;
let metric : Metric = row.into();
let stages = match metric.stages() {
Ok(stages) => stages,
Err(_) => continue,
};
for stage in stages {
points += stage.points() as u64;
}
let parts = metric.name().split(".").collect::<Vec<&str>>();
*stats.entry(String::from(parts[0])).or_insert(0) += 1;
// println!("{}: {}", current_token, metric.name());
n += 1;
}
}
let p : f64 = ((current_token - start_key) / std::i64::MAX) as f64;
println!("Range: {} -> {} ({:.4}%)", start_key, current_token, 100. * p);
println!("{} metrics", n);
println!("{} points", points);
println!("-----");
let mut vec : Vec<(&String, &usize)> = stats.iter().collect();
vec.sort_by(|a, b| b.1.cmp(a.1));
for (id, v) in vec.iter().enumerate() {
println!("{} {}", v.0, v.1);
if id == 10 {
break;
}
}
Ok(())
}

src/metric.rs

@@ -10,6 +10,7 @@ use std::fmt;
use std::convert::TryFrom;
use cassandra_cpp::Row;
use chrono::Utc;
#[derive(Debug)]
pub struct Metric {
@@ -29,6 +30,10 @@ impl Metric {
&self.name
}
pub fn updated_on(self: &Self) -> u64 {
self.updated_on
}
pub fn config(self: &Self, name: String) -> Result<String, String> {
let res = self.config.get(&name);
if let Some(v) = res {
@@ -91,14 +96,25 @@ impl From<Row> for Metric {
Err(_) => {},
};
let created_on = row.get_column_by_name("created_on".to_string());
let created_on_timestamp = if let Ok(creation_time) = created_on {
match creation_time.get_uuid() {
Err(_) => 0,
Ok(v) => v.timestamp(),
}
} else {
Utc::now().timestamp() as u64
};
let updated_on = row.get_column_by_name("updated_on".to_string());
let updated_on_timestamp = if let Ok(updated_time) = updated_on {
match updated_time.get_uuid() {
Err(_) => 0,
Ok(v) => v.timestamp(),
}
} else {
Utc::now().timestamp() as u64
};
let uuid = match row.get_column_by_name("id".to_string()).unwrap().get_uuid() {
Ok(v) => v.to_string(),

src/session.rs

@@ -13,6 +13,7 @@ use crate::cassandra::*;
pub struct Session {
metadata: CassSession,
points: CassSession,
dry_run: bool,
}
impl Session {
@@ -23,11 +24,16 @@ impl Session {
let session = Self {
metadata: metadata,
points: points,
dry_run: false,
};
Ok(session)
}
pub fn set_dry_run(&mut self, dry_run: bool) {
self.dry_run = dry_run
}
pub fn metadata_session(&self) -> &CassSession {
&self.metadata
}
@@ -45,4 +51,8 @@ impl Session {
pub fn write_consistency(&self) -> Consistency {
Consistency::LOCAL_QUORUM
}
pub fn is_dry_run(&self) -> bool {
self.dry_run
}
}