From 9fb6975188f527ced8d1a1945d36702ef24bafca Mon Sep 17 00:00:00 2001 From: Patrick MARIE Date: Wed, 24 Feb 2021 21:41:26 +0100 Subject: [PATCH] Implementing "clean" command. --- src/main.rs | 189 ++++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 184 insertions(+), 5 deletions(-) diff --git a/src/main.rs b/src/main.rs index 5255a63..357b7e5 100644 --- a/src/main.rs +++ b/src/main.rs @@ -269,6 +269,18 @@ fn main() -> Result<(), Box> { .arg(Arg::with_name("end-key") .long("end-key") .takes_value(true))) + .subcommand(SubCommand::with_name("clean") + .about("Stats") + .arg(Arg::with_name("start-key") + .long("start-key") + .takes_value(true)) + .arg(Arg::with_name("end-key") + .long("end-key") + .takes_value(true)) + .arg(Arg::with_name("clean-metrics") + .long("clean-metrics")) + .arg(Arg::with_name("clean-directories") + .long("clean-directories"))) .get_matches(); let contact_points_metadata = "tag--cstars07--cassandra-cstars07.query.consul.preprod.crto.in"; @@ -359,8 +371,8 @@ fn main() -> Result<(), Box> { }, Some("stats") => { let matches = matches.subcommand_matches("stats").unwrap(); - let start_key = matches.value_of("start-key"); // 0 - let end_key = matches.value_of("end-key"); // 100000000000000 + let start_key = matches.value_of("start-key"); + let end_key = matches.value_of("end-key"); let start_key = match start_key { None => 0, @@ -386,6 +398,39 @@ fn main() -> Result<(), Box> { metric_stats(&session, start_key, end_key)?; }, + Some("clean") => { + let matches = matches.subcommand_matches("clean").unwrap(); + + let start_key = matches.value_of("start-key"); + let end_key = matches.value_of("end-key"); + + let start_key = match start_key { + None => std::i64::MIN, + Some(s) => match s.parse::() { + Ok(n) => n, + Err(_) => { + eprintln!("Could not parse {}", s); + return Ok(()) + } + } + }; + + let end_key = match end_key { + None => std::i64::MAX, + Some(s) => match s.parse::() { + Ok(n) => n, + Err(_) => { + eprintln!("Could not parse {}", s); + return Ok(()) + } + } + }; + + let clean_metrics = matches.is_present("clean-metrics"); + let clean_directories = matches.is_present("clean-directories"); + + metrics_clean(&session, start_key, end_key, clean_metrics, clean_directories)?; + }, None => { eprintln!("No command was used."); return Ok(()); @@ -396,7 +441,6 @@ fn main() -> Result<(), Box> { Ok(()) } - fn metric_stats(session: &Session, start_key: i64, end_key: i64) -> Result<(), Error> { let q = "SELECT id, name, token(name), config, created_on, updated_on, read_on \ @@ -428,10 +472,8 @@ fn metric_stats(session: &Session, start_key: i64, end_key: i64) -> Result<(), E } let parts = metric.name().split(".").collect::>(); - *stats.entry(String::from(parts[0])).or_insert(0) += 1; - // println!("{}: {}", current_token, metric.name()); n += 1; } } @@ -456,3 +498,140 @@ fn metric_stats(session: &Session, start_key: i64, end_key: i64) -> Result<(), E Ok(()) } + +fn metrics_clean(session: &Session, start_key: i64, end_key: i64, clean_metrics: bool, clean_directories: bool) -> Result<(), Error> { + let mut current_token = start_key; + let cutoff : u64 = (Utc::now().timestamp() as u64 - 86400 * 14) * 1000; + + let namespace = "biggraphite_metadata"; + let batch_limit = 1000; + + let query = format!("SELECT name, token(name) FROM {}.metrics_metadata \ + WHERE updated_on <= maxTimeuuid({}) and token(name) > ? and token(name) < ? LIMIT {};", + namespace, cutoff, batch_limit); + let delete_metric_query = format!("DELETE FROM {}.metrics WHERE name = ?;", namespace); + let delete_metadata_query = format!("DELETE FROM {}.metrics_metadata WHERE name = ?;", namespace); + + let mut deleted_metrics_count = 0; + let mut scanned_metrics_count = 0; + let mut scanned_directories_count = 0; + let mut deleted_directories_count = 0; + + // clean metrics + loop { + if !clean_metrics || current_token >= end_key { + // println!("Stopping: {} >= {}", current_token, end_key); + break; + } + + let mut outdated_metrics_query = stmt!(query.as_str()); + outdated_metrics_query.set_consistency(session.read_consistency())?; + outdated_metrics_query.bind(0, current_token)?; + outdated_metrics_query.bind(1, end_key)?; + + let result = session.metadata_session().execute(&outdated_metrics_query).wait()?; + if result.row_count() == 0 { + break; + } + let mut queries = vec![]; + + for row in result.iter() { + let name = row.get_column_by_name("name".to_string())?.to_string(); + scanned_metrics_count += 1; + let mut delete_metric_query = stmt!(delete_metric_query.as_str()); + delete_metric_query.set_consistency(session.write_consistency())?; + delete_metric_query.bind(0, name.as_str())?; + queries.push(session.metadata_session().execute(&delete_metric_query)); + + let mut delete_metadata_query = stmt!(delete_metadata_query.as_str()); + delete_metric_query.set_consistency(session.write_consistency())?; + delete_metadata_query.bind(0, name.as_str())?; + queries.push(session.metadata_session().execute(&delete_metadata_query)); + + deleted_metrics_count += 1; + current_token = row.get_column(1)?.get_i64()?; + } + + if result.row_count() != batch_limit { + // println!("Stopping because count == 0"); + break; + } + + for query in queries { + if let Err(err) = query.wait() { + eprintln!("Failed: {:?}", err); + } + } + } + + let list_directories_query = format!("SELECT name, token(name) FROM {}.directories WHERE token(name) > ? AND token(name) < ? LIMIT {};", + namespace, batch_limit); + let metric_query = format!("SELECT name FROM {}.metrics WHERE parent LIKE ? LIMIT 1", namespace); + let delete_directory_query = format!("DELETE FROM {}.directories WHERE name = ?;", namespace); + + current_token = start_key; + + // clean directories + loop { + if !clean_directories || current_token >= end_key { + break; + } + + let mut list_directories_query = stmt!(list_directories_query.as_str()); + list_directories_query.set_consistency(session.read_consistency())?; + list_directories_query.bind(0, current_token)?; + list_directories_query.bind(1, end_key)?; + + let list_result = session.metadata_session().execute(&list_directories_query).wait()?; + if list_result.row_count() == 0 { + break; + } + + let mut queries = vec![]; + + for row in list_result.iter() { + let mut name = row.get_column_by_name("name".to_string())?.to_string(); + let orig_name = name.clone(); + name.push_str(".%"); + current_token = row.get_column(1)?.get_i64()?; + + let mut metric_query = stmt!(metric_query.as_str()); + metric_query.set_consistency(session.read_consistency())?; + metric_query.bind(0, name.as_str())?; + let query = session.metadata_session().execute(&metric_query); + + queries.push((orig_name, query)); + } + + let mut to_delete_queries = vec![]; + + for el in queries { + let result = el.1.wait()?; + scanned_directories_count += 1; + if result.row_count() != 0 { + continue; + } + + let mut delete_directory_query = stmt!(delete_directory_query.as_str()); + delete_directory_query.set_consistency(session.write_consistency())?; + delete_directory_query.bind(0, el.0.as_str())?; + + to_delete_queries.push(session.metadata_session().execute(&delete_directory_query)); + + deleted_directories_count += 1; + } + + for to_delete in to_delete_queries { + to_delete.wait()?; + } + + if list_result.row_count() != batch_limit { + break; + } + } + + println!("Deleted {} metrics, {} directories.", deleted_metrics_count, deleted_directories_count); + println!("Scanned {} metrics, {} directories", scanned_metrics_count, scanned_directories_count); + + Ok(()) +}