From 640954ab5aebecee013a0a177d222f06aa3af1b6 Mon Sep 17 00:00:00 2001 From: Patrick MARIE Date: Thu, 25 Feb 2021 21:25:00 +0100 Subject: [PATCH] Module refactoring. --- src/cassandra.rs | 181 ++++++++++++++-------- src/cmd.rs | 11 ++ src/cmd/clean.rs | 148 ++++++++++++++++++ src/cmd/delete.rs | 41 +++++ src/cmd/info.rs | 17 +++ src/cmd/list.rs | 56 +++++++ src/cmd/stats.rs | 70 +++++++++ src/cmd/write.rs | 48 ++++++ src/main.rs | 379 ++-------------------------------------------- 9 files changed, 519 insertions(+), 432 deletions(-) create mode 100644 src/cmd.rs create mode 100644 src/cmd/clean.rs create mode 100644 src/cmd/delete.rs create mode 100644 src/cmd/info.rs create mode 100644 src/cmd/list.rs create mode 100644 src/cmd/stats.rs create mode 100644 src/cmd/write.rs diff --git a/src/cassandra.rs b/src/cassandra.rs index cebfe08..bb99d8f 100644 --- a/src/cassandra.rs +++ b/src/cassandra.rs @@ -3,9 +3,7 @@ * * Author: Patrick MARIE */ - use std::str::FromStr; -use std::convert::TryFrom; use crate::metric::Metric; use crate::session::Session; @@ -14,7 +12,7 @@ use crate::timerange::TimeRange; use cassandra_cpp::Session as CassSession; use cassandra_cpp::Uuid as CassUuid; -use cassandra_cpp::{Batch,BatchType,BindRustType,CassCollection,Cluster,Error,LogLevel,Map}; +use cassandra_cpp::{Batch,BatchType,BindRustType,CassCollection,Cluster,Error,LogLevel,Map,Statement}; use cassandra_cpp::{set_level,stmt}; use uuid::Uuid; @@ -31,6 +29,124 @@ pub fn connect(contact_points: &str) -> Result { cluster.connect() } +pub fn prepare_component_query(table_name: &str, arguments: &Vec<&str>) -> Result { + let mut q = format!("SELECT parent, name FROM biggraphite_metadata.{} WHERE ", table_name); + let mut component_number = 0; + let mut components = vec![]; + + for (id, component) in arguments.iter().enumerate() { + let mut operator = "="; + + if *component == "*" { + component_number += 1; + continue; + } + + if component_number != 0 { + q.push_str("AND "); + } + + if component.ends_with("*") { + operator = "LIKE"; + } + + q.push_str(format!("component_{} {} ? ", id, operator).as_str()); + component_number += 1; + components.push(component.replace("*", "%")); + } + + if component_number != 0 { + q.push_str("AND "); + } + + // Adding last component for __END__. + q.push_str(format!("component_{} = ? ALLOW FILTERING;", component_number).as_str()); + components.push("__END__".to_string()); + + let mut query = stmt!(q.as_str()); + + for (id, arg) in components.iter().enumerate() { + query.bind(id, arg.as_str())?; + } + + Ok(query) +} + +pub fn prepare_component_query_globstar(table_name: &str, arguments: &Vec<&str>) -> Result, Error> { + let _q = format!("SELECT parent, name FROM biggraphite_metadata.{} WHERE ", table_name); + let _component_number = 0; + + let mut out = vec![]; + + let pos_globstar = arguments.iter().enumerate().filter(|(_, &x)| x == "**").map(|(id, _)| id).collect::>(); + if pos_globstar.len() != 1 { + // XXX return error + return Ok(vec![prepare_component_query(table_name, arguments)?]); + } + + let pos_globstar = pos_globstar[0]; + let mut queries = vec![]; + + let mut init_args = vec![]; + let mut end_args = arguments[pos_globstar+1..].to_vec(); + end_args.push("__END__"); + + for (id, el) in arguments[0..pos_globstar].iter().enumerate() { + if *el == "*" { + continue; + } + + if el.ends_with("*") { + init_args.push((id, "LIKE", el.replace("*", "%"))); + } else { + init_args.push((id, "=", el.to_string())); + } + } + + let components = 16; + + for id in init_args.len()..(components-end_args.len()+1) { + let mut current_query = init_args.to_vec(); + + for (sub_id, el) in end_args.iter().enumerate() { + if *el == "*" { + continue; + } + + if el.ends_with("*") { + current_query.push((sub_id + id, "LIKE", el.replace("*", "%"))); + } else { + current_query.push((sub_id + id, "=", el.to_string())); + } + } + + queries.push(current_query); + } + + for query in &queries { + let mut current_query = _q.to_string(); + + for el in query { + if el.0 != 0 { + current_query.push_str("AND "); + } + + current_query.push_str(&format!("component_{} {} ? ", el.0, el.1)); + } + + current_query.push_str(&String::from("ALLOW FILTERING;")); + + let mut statement = stmt!(¤t_query); + for (id, el) in query.iter().enumerate() { + statement.bind(id, el.2.as_str())?; + } + + out.push(statement); + } + + Ok(out) +} + pub fn fetch_metric(session: &Session, metric_name: &str) -> Result { let mut query = stmt!("SELECT * FROM biggraphite_metadata.metrics_metadata WHERE name = ?"); query.bind(0, metric_name)?; @@ -195,62 +311,3 @@ pub fn create_metric(session: &Session, metric: &str) -> Result<(), Error> { Ok(()) } -pub fn metric_delete(session: &Session, metric_name: &str) -> Result<(), Error> { - let mut query = stmt!("SELECT * FROM biggraphite_metadata.metrics_metadata WHERE name = ?"); - query.bind(0, metric_name)?; - - let result = session.metadata_session().execute(&query).wait()?; - if result.row_count() == 0 { - println!("Metric is not existing"); - return Ok(()); - } - - let _metric = fetch_metric(session, metric_name)?; - - let mut query = stmt!("DELETE FROM biggraphite_metadata.metrics_metadata WHERE name = ?;"); - query.bind(0, metric_name)?; - query.set_consistency(session.write_consistency())?; - session.metadata_session().execute(&query).wait()?; - - let mut query = stmt!("DELETE FROM biggraphite_metadata.metrics_metadata WHERE name = ?;"); - query.bind(0, metric_name)?; - query.set_consistency(session.write_consistency())?; - session.metadata_session().execute(&query).wait()?; - - let mut query = stmt!("DELETE FROM biggraphite_metadata.directories WHERE name = ?;"); - query.bind(0, metric_name)?; - query.set_consistency(session.write_consistency())?; - session.metadata_session().execute(&query).wait()?; - - Ok(()) -} - -pub fn metric_write(session: &Session, metric_name: &str, value: f64, retention: &str, timestamp: i64) -> Result<(), Error> { - let mut query = stmt!("SELECT * FROM biggraphite_metadata.metrics_metadata WHERE name = ?"); - query.bind(0, metric_name)?; - - let result = session.metadata_session().execute(&query).wait()?; - if result.row_count() == 0 { - create_metric(session, metric_name)?; - } - - let stage = Stage::try_from(retention)?; - - let metric = fetch_metric(session, metric_name)?; - let (time_start_ms, offset) = stage.time_offset_ms(timestamp); - - let query = format!( - "INSERT INTO biggraphite.{} (metric, time_start_ms, offset, value) VALUES (?, ?, ?, ?);", - stage.table_name() - ); - - let mut query = stmt!(&query); - query.bind(0, CassUuid::from_str(metric.id().as_str())?)?; - query.bind(1, time_start_ms)?; - query.bind(2, offset as i16)?; - query.bind(3, value)?; - - session.points_session().execute(&query).wait()?; - - Ok(()) -} diff --git a/src/cmd.rs b/src/cmd.rs new file mode 100644 index 0000000..1ae306b --- /dev/null +++ b/src/cmd.rs @@ -0,0 +1,11 @@ +/* + * bgutil-rs + * + * Author: Patrick MARIE + */ +pub mod clean; +pub mod delete; +pub mod info; +pub mod list; +pub mod stats; +pub mod write; diff --git a/src/cmd/clean.rs b/src/cmd/clean.rs new file mode 100644 index 0000000..b568fc7 --- /dev/null +++ b/src/cmd/clean.rs @@ -0,0 +1,148 @@ +/* + * bgutil-rs + * + * Author: Patrick MARIE + */ +use crate::Session; + +use cassandra_cpp::{BindRustType,Error}; +use cassandra_cpp::stmt; + +use chrono::Utc; + +pub fn metrics_clean(session: &Session, start_key: i64, end_key: i64, clean_metrics: bool, clean_directories: bool) -> Result<(), Error> { + let mut current_token = start_key; + let cutoff : u64 = (Utc::now().timestamp() as u64 - 86400 * 14) * 1000; + + let namespace = "biggraphite_metadata"; + let batch_limit = 1000; + + let query = format!("SELECT name, token(name) FROM {}.metrics_metadata \ + WHERE updated_on <= maxTimeuuid({}) and token(name) > ? and token(name) < ? LIMIT {};", + namespace, cutoff, batch_limit); + let delete_metric_query = format!("DELETE FROM {}.metrics WHERE name = ?;", namespace); + let delete_metadata_query = format!("DELETE FROM {}.metrics_metadata WHERE name = ?;", namespace); + + let mut deleted_metrics_count = 0; + let mut scanned_metrics_count = 0; + let mut scanned_directories_count = 0; + let mut deleted_directories_count = 0; + + // clean metrics + loop { + if !clean_metrics || current_token >= end_key { + // println!("Stopping: {} >= {}", current_token, end_key); + break; + } + + let mut outdated_metrics_query = stmt!(query.as_str()); + outdated_metrics_query.set_consistency(session.read_consistency())?; + outdated_metrics_query.bind(0, current_token)?; + outdated_metrics_query.bind(1, end_key)?; + + let result = session.metadata_session().execute(&outdated_metrics_query).wait()?; + if result.row_count() == 0 { + break; + } + let mut queries = vec![]; + + for row in result.iter() { + let name = row.get_column_by_name("name".to_string())?.to_string(); + scanned_metrics_count += 1; + let mut delete_metric_query = stmt!(delete_metric_query.as_str()); + delete_metric_query.set_consistency(session.write_consistency())?; + delete_metric_query.bind(0, name.as_str())?; + queries.push(session.metadata_session().execute(&delete_metric_query)); + + let mut delete_metadata_query = stmt!(delete_metadata_query.as_str()); + delete_metric_query.set_consistency(session.write_consistency())?; + delete_metadata_query.bind(0, name.as_str())?; + queries.push(session.metadata_session().execute(&delete_metadata_query)); + + deleted_metrics_count += 1; + current_token = row.get_column(1)?.get_i64()?; + } + + if result.row_count() != batch_limit { + // println!("Stopping because count == 0"); + break; + } + + for query in queries { + if let Err(err) = query.wait() { + eprintln!("Failed: {:?}", err); + } + } + } + + let list_directories_query = format!("SELECT name, token(name) FROM {}.directories WHERE token(name) > ? AND token(name) < ? LIMIT {};", + namespace, batch_limit); + let metric_query = format!("SELECT name FROM {}.metrics WHERE parent LIKE ? LIMIT 1", namespace); + let delete_directory_query = format!("DELETE FROM {}.directories WHERE name = ?;", namespace); + + current_token = start_key; + + // clean directories + loop { + if !clean_directories || current_token >= end_key { + break; + } + + let mut list_directories_query = stmt!(list_directories_query.as_str()); + list_directories_query.set_consistency(session.read_consistency())?; + list_directories_query.bind(0, current_token)?; + list_directories_query.bind(1, end_key)?; + + let list_result = session.metadata_session().execute(&list_directories_query).wait()?; + if list_result.row_count() == 0 { + break; + } + + let mut queries = vec![]; + + for row in list_result.iter() { + let mut name = row.get_column_by_name("name".to_string())?.to_string(); + let orig_name = name.clone(); + name.push_str(".%"); + current_token = row.get_column(1)?.get_i64()?; + + let mut metric_query = stmt!(metric_query.as_str()); + metric_query.set_consistency(session.read_consistency())?; + metric_query.bind(0, name.as_str())?; + let query = session.metadata_session().execute(&metric_query); + + queries.push((orig_name, query)); + } + + let mut to_delete_queries = vec![]; + + for el in queries { + let result = el.1.wait()?; + scanned_directories_count += 1; + if result.row_count() != 0 { + continue; + } + + let mut delete_directory_query = stmt!(delete_directory_query.as_str()); + delete_directory_query.set_consistency(session.write_consistency())?; + delete_directory_query.bind(0, el.0.as_str())?; + + to_delete_queries.push(session.metadata_session().execute(&delete_directory_query)); + + deleted_directories_count += 1; + } + + for to_delete in to_delete_queries { + to_delete.wait()?; + } + + if list_result.row_count() != batch_limit { + break; + } + } + + println!("Deleted {} metrics, {} directories.", deleted_metrics_count, deleted_directories_count); + println!("Scanned {} metrics, {} directories", scanned_metrics_count, scanned_directories_count); + + Ok(()) +} diff --git a/src/cmd/delete.rs b/src/cmd/delete.rs new file mode 100644 index 0000000..95a8769 --- /dev/null +++ b/src/cmd/delete.rs @@ -0,0 +1,41 @@ +/* + * bgutil-rs + * + * Author: Patrick MARIE + */ + +use cassandra_cpp::stmt; +use cassandra_cpp::{BindRustType,Error}; +use crate::fetch_metric; + +use crate::Session; + +pub fn metric_delete(session: &Session, metric_name: &str) -> Result<(), Error> { + let mut query = stmt!("SELECT * FROM biggraphite_metadata.metrics_metadata WHERE name = ?"); + query.bind(0, metric_name)?; + + let result = session.metadata_session().execute(&query).wait()?; + if result.row_count() == 0 { + println!("Metric is not existing"); + return Ok(()); + } + + let _metric = fetch_metric(session, metric_name)?; + + let mut query = stmt!("DELETE FROM biggraphite_metadata.metrics_metadata WHERE name = ?;"); + query.bind(0, metric_name)?; + query.set_consistency(session.write_consistency())?; + session.metadata_session().execute(&query).wait()?; + + let mut query = stmt!("DELETE FROM biggraphite_metadata.metrics_metadata WHERE name = ?;"); + query.bind(0, metric_name)?; + query.set_consistency(session.write_consistency())?; + session.metadata_session().execute(&query).wait()?; + + let mut query = stmt!("DELETE FROM biggraphite_metadata.directories WHERE name = ?;"); + query.bind(0, metric_name)?; + query.set_consistency(session.write_consistency())?; + session.metadata_session().execute(&query).wait()?; + + Ok(()) +} diff --git a/src/cmd/info.rs b/src/cmd/info.rs new file mode 100644 index 0000000..ab62ad4 --- /dev/null +++ b/src/cmd/info.rs @@ -0,0 +1,17 @@ +/* + * bgutil-rs + * + * Author: Patrick MARIE + */ +use crate::Session; + +use crate::fetch_metric; +use cassandra_cpp::Error; + +pub fn metric_info(session: &Session, metric_name: &str) -> Result<(), Error> { + let metric = fetch_metric(session, metric_name)?; + + println!("{}", metric); + + Ok(()) +} diff --git a/src/cmd/list.rs b/src/cmd/list.rs new file mode 100644 index 0000000..35c5cf5 --- /dev/null +++ b/src/cmd/list.rs @@ -0,0 +1,56 @@ +/* + * bgutil-rs + * + * Author: Patrick MARIE + */ + +use crate::prepare_component_query_globstar; +use crate::fetch_metrics; +use crate::Session; +use cassandra_cpp::{Error}; + +pub fn metric_list(session: &Session, glob: &str) -> Result<(), Error> { + let components = glob.split(".").collect::>(); + + let query_directories = prepare_component_query_globstar("directories", &components)?; + let mut results = vec![]; + + for mut q in query_directories { + q.set_consistency(session.read_consistency())?; + results.push(session.metadata_session().execute(&q)); + } + + for result in results { + let rows = result.wait()?; + for row in rows.iter() { + let name = row.get_column_by_name("name".to_string()).unwrap().to_string(); + println!("d {}", name); + + } + } + + let query_metrics = prepare_component_query_globstar("metrics", &components)?; + let mut results = vec![]; + + for mut q in query_metrics { + q.set_consistency(session.read_consistency())?; + results.push(session.metadata_session().execute(&q)); + } + + for result in results { + let rows = result.wait()?; + let names = rows + .iter() + .map(|x| { + x.get_column_by_name("name".to_string()).unwrap().to_string() + }) + .collect::>(); + + let metrics = fetch_metrics(session, &names)?; + for metric in metrics { + println!("m {}", metric); + } + } + + Ok(()) +} diff --git a/src/cmd/stats.rs b/src/cmd/stats.rs new file mode 100644 index 0000000..6b1b73f --- /dev/null +++ b/src/cmd/stats.rs @@ -0,0 +1,70 @@ +/* + * bgutil-rs + * + * Author: Patrick MARIE + */ +use cassandra_cpp::{BindRustType,Error}; +use cassandra_cpp::stmt; + +use std::collections::HashMap; + +use crate::Metric; +use crate::Session; + +pub fn metric_stats(session: &Session, start_key: i64, end_key: i64) -> Result<(), Error> { + let q = + "SELECT id, name, token(name), config, created_on, updated_on, read_on \ + FROM biggraphite_metadata.metrics_metadata WHERE token(name) > ? LIMIT 1000"; + + let mut current_token = start_key; + let mut n = 0; + let mut points : u64 = 0; + + let mut stats : HashMap = HashMap::new(); + + while current_token < end_key { + let mut query = stmt!(q); + query.bind(0, current_token)?; + + let results = session.metadata_session().execute(&query).wait()?; + + for row in results.iter() { + current_token = row.get_column(2)?.get_i64()?; + + let metric : Metric = row.into(); + let stages = match metric.stages() { + Ok(stages) => stages, + Err(_) => continue, + }; + + for stage in stages { + points += stage.points() as u64; + } + + let parts = metric.name().split(".").collect::>(); + *stats.entry(String::from(parts[0])).or_insert(0) += 1; + + n += 1; + } + } + + let p : f64 = ((current_token - start_key) / std::i64::MAX) as f64; + + println!("Range: {} -> {} ({:.4}%)", start_key, current_token, 100. * p); + println!("{} metrics", n); + println!("{} points", points); + println!("-----"); + + let mut vec : Vec<(&String, &usize)> = stats.iter().collect(); + vec.sort_by(|a, b| b.1.cmp(a.1)); + + for (id, v) in vec.iter().enumerate() { + println!("{} {}", v.0, v.1); + + if id == 10 { + break; + } + } + + Ok(()) +} diff --git a/src/cmd/write.rs b/src/cmd/write.rs new file mode 100644 index 0000000..168bbb9 --- /dev/null +++ b/src/cmd/write.rs @@ -0,0 +1,48 @@ +/* + * bgutil-rs + * + * Author: Patrick MARIE + */ +use std::str::FromStr; + +use crate::Session; +use crate::Stage; + +use crate::create_metric; +use crate::fetch_metric; + +use cassandra_cpp::stmt; +use cassandra_cpp::{BindRustType,Error}; +use cassandra_cpp::Uuid as CassUuid; + +use std::convert::TryFrom; + +pub fn metric_write(session: &Session, metric_name: &str, value: f64, retention: &str, timestamp: i64) -> Result<(), Error> { + let mut query = stmt!("SELECT * FROM biggraphite_metadata.metrics_metadata WHERE name = ?"); + query.bind(0, metric_name)?; + + let result = session.metadata_session().execute(&query).wait()?; + if result.row_count() == 0 { + create_metric(session, metric_name)?; + } + + let stage = Stage::try_from(retention)?; + + let metric = fetch_metric(session, metric_name)?; + let (time_start_ms, offset) = stage.time_offset_ms(timestamp); + + let query = format!( + "INSERT INTO biggraphite.{} (metric, time_start_ms, offset, value) VALUES (?, ?, ?, ?);", + stage.table_name() + ); + + let mut query = stmt!(&query); + query.bind(0, CassUuid::from_str(metric.id().as_str())?)?; + query.bind(1, time_start_ms)?; + query.bind(2, offset as i16)?; + query.bind(3, value)?; + + session.points_session().execute(&query).wait()?; + + Ok(()) +} diff --git a/src/main.rs b/src/main.rs index 09e7e70..f940788 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,13 +3,10 @@ * * Author: Patrick MARIE */ - -use std::collections::HashMap; use std::convert::TryFrom; use std::error; -use cassandra_cpp::{BindRustType,CassResult,Error,Statement}; -use cassandra_cpp::{stmt}; +use cassandra_cpp::CassResult; use chrono::Utc; use clap::{App,AppSettings,Arg,SubCommand}; @@ -18,12 +15,19 @@ mod metric; mod session; mod stage; mod timerange; +mod cmd; use crate::cassandra::*; use crate::session::Session; -use crate::stage::*; +use crate::stage::Stage; use crate::metric::Metric; +use crate::cmd::clean::*; +use crate::cmd::delete::*; +use crate::cmd::info::*; +use crate::cmd::list::*; +use crate::cmd::stats::*; +use crate::cmd::write::*; #[allow(dead_code)] fn describe_result(result: &CassResult) { @@ -38,178 +42,6 @@ fn describe_result(result: &CassResult) { } } -pub fn metric_info(session: &Session, metric_name: &str) -> Result<(), Error> { - let metric = fetch_metric(session, metric_name)?; - - println!("{}", metric); - - Ok(()) -} - -fn prepare_component_query(table_name: &str, arguments: &Vec<&str>) -> Result { - let mut q = format!("SELECT parent, name FROM biggraphite_metadata.{} WHERE ", table_name); - let mut component_number = 0; - let mut components = vec![]; - - for (id, component) in arguments.iter().enumerate() { - let mut operator = "="; - - if *component == "*" { - component_number += 1; - continue; - } - - if component_number != 0 { - q.push_str("AND "); - } - - if component.ends_with("*") { - operator = "LIKE"; - } - - q.push_str(format!("component_{} {} ? ", id, operator).as_str()); - component_number += 1; - components.push(component.replace("*", "%")); - } - - if component_number != 0 { - q.push_str("AND "); - } - - // Adding last component for __END__. - q.push_str(format!("component_{} = ? ALLOW FILTERING;", component_number).as_str()); - components.push("__END__".to_string()); - - let mut query = stmt!(q.as_str()); - - for (id, arg) in components.iter().enumerate() { - query.bind(id, arg.as_str())?; - } - - Ok(query) -} - -fn prepare_component_query_globstar(table_name: &str, arguments: &Vec<&str>) -> Result, Error> { - let _q = format!("SELECT parent, name FROM biggraphite_metadata.{} WHERE ", table_name); - let _component_number = 0; - - let mut out = vec![]; - - let pos_globstar = arguments.iter().enumerate().filter(|(_, &x)| x == "**").map(|(id, _)| id).collect::>(); - if pos_globstar.len() != 1 { - // XXX return error - return Ok(vec![prepare_component_query(table_name, arguments)?]); - } - - let pos_globstar = pos_globstar[0]; - let mut queries = vec![]; - - let mut init_args = vec![]; - let mut end_args = arguments[pos_globstar+1..].to_vec(); - end_args.push("__END__"); - - for (id, el) in arguments[0..pos_globstar].iter().enumerate() { - if *el == "*" { - continue; - } - - if el.ends_with("*") { - init_args.push((id, "LIKE", el.replace("*", "%"))); - } else { - init_args.push((id, "=", el.to_string())); - } - } - - let components = 16; - - for id in init_args.len()..(components-end_args.len()+1) { - let mut current_query = init_args.to_vec(); - - for (sub_id, el) in end_args.iter().enumerate() { - if *el == "*" { - continue; - } - - if el.ends_with("*") { - current_query.push((sub_id + id, "LIKE", el.replace("*", "%"))); - } else { - current_query.push((sub_id + id, "=", el.to_string())); - } - } - - queries.push(current_query); - } - - for query in &queries { - let mut current_query = _q.to_string(); - - for el in query { - if el.0 != 0 { - current_query.push_str("AND "); - } - - current_query.push_str(&format!("component_{} {} ? ", el.0, el.1)); - } - - current_query.push_str(&String::from("ALLOW FILTERING;")); - - let mut statement = stmt!(¤t_query); - for (id, el) in query.iter().enumerate() { - statement.bind(id, el.2.as_str())?; - } - - out.push(statement); - } - - Ok(out) -} - -fn metric_list(session: &Session, glob: &str) -> Result<(), Error> { - let components = glob.split(".").collect::>(); - - let query_directories = prepare_component_query_globstar("directories", &components)?; - let mut results = vec![]; - - for mut q in query_directories { - q.set_consistency(session.read_consistency())?; - results.push(session.metadata_session().execute(&q)); - } - - for result in results { - let rows = result.wait()?; - for row in rows.iter() { - let name = row.get_column_by_name("name".to_string()).unwrap().to_string(); - println!("d {}", name); - - } - } - - let query_metrics = prepare_component_query_globstar("metrics", &components)?; - let mut results = vec![]; - - for mut q in query_metrics { - q.set_consistency(session.read_consistency())?; - results.push(session.metadata_session().execute(&q)); - } - - for result in results { - let rows = result.wait()?; - let names = rows - .iter() - .map(|x| { - x.get_column_by_name("name".to_string()).unwrap().to_string() - }) - .collect::>(); - - let metrics = fetch_metrics(session, &names)?; - for metric in metrics { - println!("m {}", metric); - } - } - - Ok(()) -} - fn main() -> Result<(), Box> { let matches = App::new("bgutil-rs") .setting(AppSettings::SubcommandRequired) @@ -456,197 +288,4 @@ fn main() -> Result<(), Box> { Ok(()) } -fn metric_stats(session: &Session, start_key: i64, end_key: i64) -> Result<(), Error> { - let q = - "SELECT id, name, token(name), config, created_on, updated_on, read_on \ - FROM biggraphite_metadata.metrics_metadata WHERE token(name) > ? LIMIT 1000"; - let mut current_token = start_key; - let mut n = 0; - let mut points : u64 = 0; - - let mut stats : HashMap = HashMap::new(); - - while current_token < end_key { - let mut query = stmt!(q); - query.bind(0, current_token)?; - - let results = session.metadata_session().execute(&query).wait()?; - - for row in results.iter() { - current_token = row.get_column(2)?.get_i64()?; - - let metric : Metric = row.into(); - let stages = match metric.stages() { - Ok(stages) => stages, - Err(_) => continue, - }; - - for stage in stages { - points += stage.points() as u64; - } - - let parts = metric.name().split(".").collect::>(); - *stats.entry(String::from(parts[0])).or_insert(0) += 1; - - n += 1; - } - } - - let p : f64 = ((current_token - start_key) / std::i64::MAX) as f64; - - println!("Range: {} -> {} ({:.4}%)", start_key, current_token, 100. * p); - println!("{} metrics", n); - println!("{} points", points); - println!("-----"); - - let mut vec : Vec<(&String, &usize)> = stats.iter().collect(); - vec.sort_by(|a, b| b.1.cmp(a.1)); - - for (id, v) in vec.iter().enumerate() { - println!("{} {}", v.0, v.1); - - if id == 10 { - break; - } - } - - Ok(()) -} - -fn metrics_clean(session: &Session, start_key: i64, end_key: i64, clean_metrics: bool, clean_directories: bool) -> Result<(), Error> { - let mut current_token = start_key; - let cutoff : u64 = (Utc::now().timestamp() as u64 - 86400 * 14) * 1000; - - let namespace = "biggraphite_metadata"; - let batch_limit = 1000; - - let query = format!("SELECT name, token(name) FROM {}.metrics_metadata \ - WHERE updated_on <= maxTimeuuid({}) and token(name) > ? and token(name) < ? LIMIT {};", - namespace, cutoff, batch_limit); - let delete_metric_query = format!("DELETE FROM {}.metrics WHERE name = ?;", namespace); - let delete_metadata_query = format!("DELETE FROM {}.metrics_metadata WHERE name = ?;", namespace); - - let mut deleted_metrics_count = 0; - let mut scanned_metrics_count = 0; - let mut scanned_directories_count = 0; - let mut deleted_directories_count = 0; - - // clean metrics - loop { - if !clean_metrics || current_token >= end_key { - // println!("Stopping: {} >= {}", current_token, end_key); - break; - } - - let mut outdated_metrics_query = stmt!(query.as_str()); - outdated_metrics_query.set_consistency(session.read_consistency())?; - outdated_metrics_query.bind(0, current_token)?; - outdated_metrics_query.bind(1, end_key)?; - - let result = session.metadata_session().execute(&outdated_metrics_query).wait()?; - if result.row_count() == 0 { - break; - } - let mut queries = vec![]; - - for row in result.iter() { - let name = row.get_column_by_name("name".to_string())?.to_string(); - scanned_metrics_count += 1; - let mut delete_metric_query = stmt!(delete_metric_query.as_str()); - delete_metric_query.set_consistency(session.write_consistency())?; - delete_metric_query.bind(0, name.as_str())?; - queries.push(session.metadata_session().execute(&delete_metric_query)); - - let mut delete_metadata_query = stmt!(delete_metadata_query.as_str()); - delete_metric_query.set_consistency(session.write_consistency())?; - delete_metadata_query.bind(0, name.as_str())?; - queries.push(session.metadata_session().execute(&delete_metadata_query)); - - deleted_metrics_count += 1; - current_token = row.get_column(1)?.get_i64()?; - } - - if result.row_count() != batch_limit { - // println!("Stopping because count == 0"); - break; - } - - for query in queries { - if let Err(err) = query.wait() { - eprintln!("Failed: {:?}", err); - } - } - } - - let list_directories_query = format!("SELECT name, token(name) FROM {}.directories WHERE token(name) > ? AND token(name) < ? LIMIT {};", - namespace, batch_limit); - let metric_query = format!("SELECT name FROM {}.metrics WHERE parent LIKE ? LIMIT 1", namespace); - let delete_directory_query = format!("DELETE FROM {}.directories WHERE name = ?;", namespace); - - current_token = start_key; - - // clean directories - loop { - if !clean_directories || current_token >= end_key { - break; - } - - let mut list_directories_query = stmt!(list_directories_query.as_str()); - list_directories_query.set_consistency(session.read_consistency())?; - list_directories_query.bind(0, current_token)?; - list_directories_query.bind(1, end_key)?; - - let list_result = session.metadata_session().execute(&list_directories_query).wait()?; - if list_result.row_count() == 0 { - break; - } - - let mut queries = vec![]; - - for row in list_result.iter() { - let mut name = row.get_column_by_name("name".to_string())?.to_string(); - let orig_name = name.clone(); - name.push_str(".%"); - current_token = row.get_column(1)?.get_i64()?; - - let mut metric_query = stmt!(metric_query.as_str()); - metric_query.set_consistency(session.read_consistency())?; - metric_query.bind(0, name.as_str())?; - let query = session.metadata_session().execute(&metric_query); - - queries.push((orig_name, query)); - } - - let mut to_delete_queries = vec![]; - - for el in queries { - let result = el.1.wait()?; - scanned_directories_count += 1; - if result.row_count() != 0 { - continue; - } - - let mut delete_directory_query = stmt!(delete_directory_query.as_str()); - delete_directory_query.set_consistency(session.write_consistency())?; - delete_directory_query.bind(0, el.0.as_str())?; - - to_delete_queries.push(session.metadata_session().execute(&delete_directory_query)); - - deleted_directories_count += 1; - } - - for to_delete in to_delete_queries { - to_delete.wait()?; - } - - if list_result.row_count() != batch_limit { - break; - } - } - - println!("Deleted {} metrics, {} directories.", deleted_metrics_count, deleted_directories_count); - println!("Scanned {} metrics, {} directories", scanned_metrics_count, scanned_directories_count); - - Ok(()) -}