diff --git a/src/cassandra.rs b/src/cassandra.rs index bb99d8f..f2b1df7 100644 --- a/src/cassandra.rs +++ b/src/cassandra.rs @@ -4,6 +4,8 @@ * Author: Patrick MARIE */ use std::str::FromStr; +use std::fmt; +use std::error; use crate::metric::Metric; use crate::session::Session; @@ -17,6 +19,17 @@ use cassandra_cpp::{set_level,stmt}; use uuid::Uuid; +#[derive(Debug, Clone)] +struct NoRecord; + +impl fmt::Display for NoRecord { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "no record suitable") + } +} + +impl error::Error for NoRecord {} + pub fn connect(contact_points: &str) -> Result { set_level(LogLevel::DISABLED); @@ -147,14 +160,17 @@ pub fn prepare_component_query_globstar(table_name: &str, arguments: &Vec<&str>) Ok(out) } -pub fn fetch_metric(session: &Session, metric_name: &str) -> Result { +pub fn fetch_metric(session: &Session, metric_name: &str) -> Result> { let mut query = stmt!("SELECT * FROM biggraphite_metadata.metrics_metadata WHERE name = ?"); + query.set_consistency(session.read_consistency())?; query.bind(0, metric_name)?; - // XXX set consistency - // query.set_consistency(session.read_consistency()); + let result = session.metadata_session().execute(&query).wait()?; + + if result.row_count() == 0 { + return Err(NoRecord.into()); + } - let result = session.metadata_session().execute(&query).wait()?; Ok(result.first_row().unwrap().into()) } @@ -311,3 +327,33 @@ pub fn create_metric(session: &Session, metric: &str) -> Result<(), Error> { Ok(()) } +pub fn delete_metric(session: &Session, name: &str) -> Result<(), Error> { + let namespace = "biggraphite_metadata"; + let delete_metric_query = format!("DELETE FROM {}.metrics WHERE name = ?;", namespace); + let delete_metadata_query = format!("DELETE FROM {}.metrics_metadata WHERE name = ?;", namespace); + + let mut delete_metric_query = stmt!(delete_metric_query.as_str()); + delete_metric_query.set_consistency(session.write_consistency())?; + delete_metric_query.bind(0, name)?; + session.metadata_session().execute(&delete_metric_query).wait()?; + + let mut delete_metadata_query = stmt!(delete_metadata_query.as_str()); + delete_metric_query.set_consistency(session.write_consistency())?; + delete_metadata_query.bind(0, name)?; + session.metadata_session().execute(&delete_metric_query).wait()?; + + Ok(()) +} + + +pub fn delete_directory(session: &Session, name: &str) -> Result<(), Error> { + let namespace = "biggraphite_metadata"; + let delete_directory_query = format!("DELETE FROM {}.directories WHERE name = ?;", namespace); + + let mut delete_directory_query = stmt!(delete_directory_query.as_str()); + delete_directory_query.set_consistency(session.write_consistency())?; + delete_directory_query.bind(0, name)?; + session.metadata_session().execute(&delete_directory_query).wait()?; + + Ok(()) +} \ No newline at end of file diff --git a/src/cmd.rs b/src/cmd.rs index 1ae306b..94c86bc 100644 --- a/src/cmd.rs +++ b/src/cmd.rs @@ -7,5 +7,6 @@ pub mod clean; pub mod delete; pub mod info; pub mod list; +pub mod local_clean; pub mod stats; pub mod write; diff --git a/src/cmd/delete.rs b/src/cmd/delete.rs index 95a8769..8c8791e 100644 --- a/src/cmd/delete.rs +++ b/src/cmd/delete.rs @@ -3,14 +3,15 @@ * * Author: Patrick MARIE */ +use std::error; use cassandra_cpp::stmt; -use cassandra_cpp::{BindRustType,Error}; +use cassandra_cpp::BindRustType; use crate::fetch_metric; use crate::Session; -pub fn metric_delete(session: &Session, metric_name: &str) -> Result<(), Error> { +pub fn metric_delete(session: &Session, metric_name: &str) -> Result<(), Box> { let mut query = stmt!("SELECT * FROM biggraphite_metadata.metrics_metadata WHERE name = ?"); query.bind(0, metric_name)?; diff --git a/src/cmd/info.rs b/src/cmd/info.rs index ab62ad4..23a7ddd 100644 --- a/src/cmd/info.rs +++ b/src/cmd/info.rs @@ -3,12 +3,12 @@ * * Author: Patrick MARIE */ +use std::error; + use crate::Session; - use crate::fetch_metric; -use cassandra_cpp::Error; -pub fn metric_info(session: &Session, metric_name: &str) -> Result<(), Error> { +pub fn metric_info(session: &Session, metric_name: &str) -> Result<(), Box> { let metric = fetch_metric(session, metric_name)?; println!("{}", metric); diff --git a/src/cmd/list.rs b/src/cmd/list.rs index 35c5cf5..71c7ee9 100644 --- a/src/cmd/list.rs +++ b/src/cmd/list.rs @@ -38,7 +38,13 @@ pub fn metric_list(session: &Session, glob: &str) -> Result<(), Error> { } for result in results { - let rows = result.wait()?; + let res = result.wait(); + if let Err(err) = res { + eprintln!("Query failed: {}", err); + continue; + } + + let rows = res.unwrap(); let names = rows .iter() .map(|x| { diff --git a/src/cmd/local_clean.rs b/src/cmd/local_clean.rs new file mode 100644 index 0000000..fed7663 --- /dev/null +++ b/src/cmd/local_clean.rs @@ -0,0 +1,129 @@ +/* + * bgutil-rs + * + * Author: Patrick MARIE + */ +use std::error; + +use crate::Session; + +use crate::delete_directory; +use crate::delete_metric; +use crate::fetch_metric; +use crate::prepare_component_query_globstar; + +use cassandra_cpp::stmt; +use cassandra_cpp::BindRustType; + +use chrono::Utc; + +fn clean_metrics_in_directory(session: &Session, directory: &str) -> Result<(), Box> { + // println!("Cleaning metrics in directory: '{}'", directory); + + let mut directory = String::from(directory); + directory.push_str(".**"); + + let components = directory.split(".").collect::>(); + let mut results = vec![]; + + let query_metrics = prepare_component_query_globstar("metrics", &components)?; + let outdated_ts : u64 = (Utc::now().timestamp() as u64 - 14 * 86400) * 1000; + + for mut q in query_metrics { + q.set_consistency(session.read_consistency())?; + results.push(session.metadata_session().execute(&q)); + } + + for result in results { + let rows = result.wait()?; + for row in rows.iter() { + let name = row.get_column_by_name("name".to_string()).unwrap().to_string(); + let metric = fetch_metric(session, &name); + + if let Err(e) = metric { + eprintln!("Error while retrieving metric: {}", e); + continue; + } + + let metric = metric.unwrap(); + + if metric.updated_on() > outdated_ts { + continue; + } + + println!("Deleting metric {}", metric.name()); + delete_metric(session, metric.name())?; + } + } + + Ok(()) +} + +fn directory_has_metrics(session: &Session, directory: &str) -> Result> { + let query = format!("SELECT name FROM biggraphite_metadata.metrics WHERE parent LIKE ? LIMIT 1;"); + let mut query = stmt!(query.as_str()); + let mut directory = String::from(directory); + directory.push_str(".%"); + query.bind(0, directory.as_str())?; + + let result = session.metadata_session().execute(&query).wait()?; + + Ok(result.row_count() != 0) +} + +fn clean_empty_directories_in_directory(session: &Session, directory: &str) -> Result<(), Box> { + // println!("Cleaning empty directories in directory '{}'", directory); + + let mut directory = String::from(directory); + directory.push_str(".**"); + + let components = directory.split(".").collect::>(); + let mut results = vec![]; + + let query_directories = prepare_component_query_globstar("directories", &components)?; + + for mut q in query_directories { + q.set_consistency(session.read_consistency())?; + results.push(session.metadata_session().execute(&q)); + } + + for result in results { + let rows = result.wait()?; + for row in rows.iter() { + let name = row.get_column_by_name("name".to_string()).unwrap().to_string(); + + if directory_has_metrics(session, &name)? { + continue; + } + + println!("Deleting directory {}", name); + delete_directory(session, &name)?; + } + } + + Ok(()) +} + +pub fn metrics_local_clean(session: &Session, directory: &str) -> Result<(), Box> { + let components = directory.split(".").collect::>(); + + let query_directories = prepare_component_query_globstar("directories", &components)?; + let mut results = vec![]; + + for mut q in query_directories { + q.set_consistency(session.read_consistency())?; + results.push(session.metadata_session().execute(&q)); + } + + for result in results { + let rows = result.wait()?; + for row in rows.iter() { + let name = row.get_column_by_name("name".to_string()).unwrap().to_string(); + + clean_metrics_in_directory(session, &name)?; + clean_empty_directories_in_directory(session, &name)?; + } + } + + Ok(()) +} \ No newline at end of file diff --git a/src/cmd/write.rs b/src/cmd/write.rs index 168bbb9..e4352c5 100644 --- a/src/cmd/write.rs +++ b/src/cmd/write.rs @@ -3,6 +3,7 @@ * * Author: Patrick MARIE */ +use std::error; use std::str::FromStr; use crate::Session; @@ -12,12 +13,12 @@ use crate::create_metric; use crate::fetch_metric; use cassandra_cpp::stmt; -use cassandra_cpp::{BindRustType,Error}; +use cassandra_cpp::BindRustType; use cassandra_cpp::Uuid as CassUuid; use std::convert::TryFrom; -pub fn metric_write(session: &Session, metric_name: &str, value: f64, retention: &str, timestamp: i64) -> Result<(), Error> { +pub fn metric_write(session: &Session, metric_name: &str, value: f64, retention: &str, timestamp: i64) -> Result<(), Box> { let mut query = stmt!("SELECT * FROM biggraphite_metadata.metrics_metadata WHERE name = ?"); query.bind(0, metric_name)?; diff --git a/src/main.rs b/src/main.rs index f940788..c2bc1fa 100644 --- a/src/main.rs +++ b/src/main.rs @@ -26,6 +26,7 @@ use crate::cmd::clean::*; use crate::cmd::delete::*; use crate::cmd::info::*; use crate::cmd::list::*; +use crate::cmd::local_clean::*; use crate::cmd::stats::*; use crate::cmd::write::*; @@ -110,7 +111,7 @@ fn main() -> Result<(), Box> { .long("end-key") .takes_value(true))) .subcommand(SubCommand::with_name("clean") - .about("Stats") + .about("Clean outdated metrics & empty directories") .arg(Arg::with_name("start-key") .long("start-key") .takes_value(true)) @@ -121,6 +122,9 @@ fn main() -> Result<(), Box> { .long("clean-metrics")) .arg(Arg::with_name("clean-directories") .long("clean-directories"))) + .subcommand(SubCommand::with_name("local-clean") + .about("Clean a directory of outdated metrics & empty sub-directories") + .arg(Arg::with_name("directory"))) .get_matches(); let mut contact_points_metadata = "localhost"; @@ -278,6 +282,12 @@ fn main() -> Result<(), Box> { metrics_clean(&session, start_key, end_key, clean_metrics, clean_directories)?; }, + Some("local-clean") => { + let matches = matches.subcommand_matches("local-clean").unwrap(); + let directory = matches.value_of("directory").unwrap(); + + metrics_local_clean(&session, directory)?; + } None => { eprintln!("No command was used."); return Ok(()); diff --git a/src/metric.rs b/src/metric.rs index 704ccac..f05db28 100644 --- a/src/metric.rs +++ b/src/metric.rs @@ -29,6 +29,10 @@ impl Metric { &self.name } + pub fn updated_on(self: &Self) -> u64 { + self.updated_on + } + pub fn config(self: &Self, name: String) -> Result { let res = self.config.get(&name); if let Some(v) = res {