Module refactoring.

This commit is contained in:
Patrick MARIE 2021-02-25 21:25:00 +01:00
parent b8a670bbce
commit 640954ab5a
9 changed files with 519 additions and 432 deletions

View File

@ -3,9 +3,7 @@
* *
* Author: Patrick MARIE <pm@mkz.me> * Author: Patrick MARIE <pm@mkz.me>
*/ */
use std::str::FromStr; use std::str::FromStr;
use std::convert::TryFrom;
use crate::metric::Metric; use crate::metric::Metric;
use crate::session::Session; use crate::session::Session;
@ -14,7 +12,7 @@ use crate::timerange::TimeRange;
use cassandra_cpp::Session as CassSession; use cassandra_cpp::Session as CassSession;
use cassandra_cpp::Uuid as CassUuid; use cassandra_cpp::Uuid as CassUuid;
use cassandra_cpp::{Batch,BatchType,BindRustType,CassCollection,Cluster,Error,LogLevel,Map}; use cassandra_cpp::{Batch,BatchType,BindRustType,CassCollection,Cluster,Error,LogLevel,Map,Statement};
use cassandra_cpp::{set_level,stmt}; use cassandra_cpp::{set_level,stmt};
use uuid::Uuid; use uuid::Uuid;
@ -31,6 +29,124 @@ pub fn connect(contact_points: &str) -> Result<CassSession, Error> {
cluster.connect() cluster.connect()
} }
/// Build a SELECT on `biggraphite_metadata.<table_name>` matching each glob
/// component against its `component_<n>` column.
///
/// `"*"` components match anything (no condition is emitted); a component
/// containing `*` becomes a `LIKE` with `*` rewritten to `%`. A final
/// `component_<len> = '__END__'` condition pins the component count.
///
/// # Errors
/// Returns a `cassandra_cpp::Error` if binding a parameter fails.
pub fn prepare_component_query(table_name: &str, arguments: &Vec<&str>) -> Result<Statement, Error> {
    let mut q = format!("SELECT parent, name FROM biggraphite_metadata.{} WHERE ", table_name);

    let mut component_number = 0;
    let mut conditions = vec![];
    let mut components = vec![];

    for (id, component) in arguments.iter().enumerate() {
        if *component == "*" {
            // Wildcard component: no condition, but it still occupies a slot.
            component_number += 1;
            continue;
        }

        let operator = if component.ends_with("*") { "LIKE" } else { "=" };
        conditions.push(format!("component_{} {} ? ", id, operator));

        component_number += 1;
        components.push(component.replace("*", "%"));
    }

    // Adding last component for __END__.
    conditions.push(format!("component_{} = ? ", component_number));
    components.push("__END__".to_string());

    // Join conditions with "AND " only *between* them. The previous version
    // keyed the AND prefix on component_number, which emitted a leading
    // "WHERE AND …" (invalid CQL) whenever the glob started with "*".
    q.push_str(&conditions.join("AND "));
    q.push_str("ALLOW FILTERING;");

    let mut query = stmt!(q.as_str());
    for (id, arg) in components.iter().enumerate() {
        query.bind(id, arg.as_str())?;
    }

    Ok(query)
}
/// Expand a glob containing a single `"**"` into one SELECT per possible
/// shift of the trailing components, up to 16 components total.
///
/// A glob without exactly one `"**"` falls back to
/// [`prepare_component_query`]. Head components before the `"**"` keep their
/// absolute positions; tail components (plus the `__END__` sentinel) are
/// generated at every admissible offset.
///
/// # Errors
/// Returns a `cassandra_cpp::Error` if binding a parameter fails.
pub fn prepare_component_query_globstar(table_name: &str, arguments: &Vec<&str>) -> Result<Vec<Statement>, Error> {
    let _q = format!("SELECT parent, name FROM biggraphite_metadata.{} WHERE ", table_name);
    let mut out = vec![];

    let pos_globstar = arguments.iter().enumerate().filter(|(_, &x)| x == "**").map(|(id, _)| id).collect::<Vec<usize>>();
    if pos_globstar.len() != 1 {
        // XXX return error — only exactly one "**" is supported for expansion.
        return Ok(vec![prepare_component_query(table_name, arguments)?]);
    }

    let pos_globstar = pos_globstar[0];
    let mut queries = vec![];

    let mut init_args = vec![];
    let mut end_args = arguments[pos_globstar+1..].to_vec();
    end_args.push("__END__");

    // Fixed head conditions: (component index, operator, bound value).
    for (id, el) in arguments[0..pos_globstar].iter().enumerate() {
        if *el == "*" {
            continue;
        }

        if el.ends_with("*") {
            init_args.push((id, "LIKE", el.replace("*", "%")));
        } else {
            init_args.push((id, "=", el.to_string()));
        }
    }

    let components = 16;

    // NOTE(review): the start offset uses init_args.len(), which is smaller
    // than pos_globstar when the head contains "*" wildcards — confirm the
    // intended first position of the tail components in that case.
    for id in init_args.len()..(components-end_args.len()+1) {
        let mut current_query = init_args.to_vec();

        for (sub_id, el) in end_args.iter().enumerate() {
            if *el == "*" {
                continue;
            }

            if el.ends_with("*") {
                current_query.push((sub_id + id, "LIKE", el.replace("*", "%")));
            } else {
                current_query.push((sub_id + id, "=", el.to_string()));
            }
        }

        queries.push(current_query);
    }

    for query in &queries {
        let mut current_query = _q.to_string();

        // Fixed: join conditions on their *position* in the list, not on the
        // component id — the old `el.0 != 0` test produced a leading
        // "WHERE AND …" (invalid CQL) whenever the first condition's
        // component id was non-zero (e.g. glob "**.foo" shifted by one).
        for (cond_id, el) in query.iter().enumerate() {
            if cond_id != 0 {
                current_query.push_str("AND ");
            }
            current_query.push_str(&format!("component_{} {} ? ", el.0, el.1));
        }

        current_query.push_str("ALLOW FILTERING;");

        let mut statement = stmt!(&current_query);
        for (id, el) in query.iter().enumerate() {
            statement.bind(id, el.2.as_str())?;
        }

        out.push(statement);
    }

    Ok(out)
}
pub fn fetch_metric(session: &Session, metric_name: &str) -> Result<Metric, Error> { pub fn fetch_metric(session: &Session, metric_name: &str) -> Result<Metric, Error> {
let mut query = stmt!("SELECT * FROM biggraphite_metadata.metrics_metadata WHERE name = ?"); let mut query = stmt!("SELECT * FROM biggraphite_metadata.metrics_metadata WHERE name = ?");
query.bind(0, metric_name)?; query.bind(0, metric_name)?;
@ -195,62 +311,3 @@ pub fn create_metric(session: &Session, metric: &str) -> Result<(), Error> {
Ok(()) Ok(())
} }
pub fn metric_delete(session: &Session, metric_name: &str) -> Result<(), Error> {
let mut query = stmt!("SELECT * FROM biggraphite_metadata.metrics_metadata WHERE name = ?");
query.bind(0, metric_name)?;
let result = session.metadata_session().execute(&query).wait()?;
if result.row_count() == 0 {
println!("Metric is not existing");
return Ok(());
}
let _metric = fetch_metric(session, metric_name)?;
let mut query = stmt!("DELETE FROM biggraphite_metadata.metrics_metadata WHERE name = ?;");
query.bind(0, metric_name)?;
query.set_consistency(session.write_consistency())?;
session.metadata_session().execute(&query).wait()?;
let mut query = stmt!("DELETE FROM biggraphite_metadata.metrics_metadata WHERE name = ?;");
query.bind(0, metric_name)?;
query.set_consistency(session.write_consistency())?;
session.metadata_session().execute(&query).wait()?;
let mut query = stmt!("DELETE FROM biggraphite_metadata.directories WHERE name = ?;");
query.bind(0, metric_name)?;
query.set_consistency(session.write_consistency())?;
session.metadata_session().execute(&query).wait()?;
Ok(())
}
pub fn metric_write(session: &Session, metric_name: &str, value: f64, retention: &str, timestamp: i64) -> Result<(), Error> {
let mut query = stmt!("SELECT * FROM biggraphite_metadata.metrics_metadata WHERE name = ?");
query.bind(0, metric_name)?;
let result = session.metadata_session().execute(&query).wait()?;
if result.row_count() == 0 {
create_metric(session, metric_name)?;
}
let stage = Stage::try_from(retention)?;
let metric = fetch_metric(session, metric_name)?;
let (time_start_ms, offset) = stage.time_offset_ms(timestamp);
let query = format!(
"INSERT INTO biggraphite.{} (metric, time_start_ms, offset, value) VALUES (?, ?, ?, ?);",
stage.table_name()
);
let mut query = stmt!(&query);
query.bind(0, CassUuid::from_str(metric.id().as_str())?)?;
query.bind(1, time_start_ms)?;
query.bind(2, offset as i16)?;
query.bind(3, value)?;
session.points_session().execute(&query).wait()?;
Ok(())
}

11
src/cmd.rs Normal file
View File

@ -0,0 +1,11 @@
/*
* bgutil-rs
*
* Author: Patrick MARIE <pm@mkz.me>
*/
pub mod clean;
pub mod delete;
pub mod info;
pub mod list;
pub mod stats;
pub mod write;

148
src/cmd/clean.rs Normal file
View File

@ -0,0 +1,148 @@
/*
* bgutil-rs
*
* Author: Patrick MARIE <pm@mkz.me>
*/
use crate::Session;
use cassandra_cpp::{BindRustType,Error};
use cassandra_cpp::stmt;
use chrono::Utc;
/// Sweep the token range `[start_key, end_key)` and remove:
///  - metrics whose metadata was not updated within the last 14 days
///    (when `clean_metrics` is set),
///  - directories that no longer contain any metric
///    (when `clean_directories` is set).
///
/// Work is paged in batches of 1000 rows; DELETEs within a page are issued
/// asynchronously and awaited before the next page.
///
/// # Errors
/// Returns a `cassandra_cpp::Error` on query/bind failures; individual
/// metric-delete failures are logged to stderr and skipped.
pub fn metrics_clean(session: &Session, start_key: i64, end_key: i64, clean_metrics: bool, clean_directories: bool) -> Result<(), Error> {
    let mut current_token = start_key;

    // Metrics not touched for 14 days are considered outdated (ms precision
    // for maxTimeuuid).
    let cutoff : u64 = (Utc::now().timestamp() as u64 - 86400 * 14) * 1000;
    let namespace = "biggraphite_metadata";
    let batch_limit = 1000;

    let query = format!("SELECT name, token(name) FROM {}.metrics_metadata \
        WHERE updated_on <= maxTimeuuid({}) and token(name) > ? and token(name) < ? LIMIT {};",
        namespace, cutoff, batch_limit);

    let delete_metric_query = format!("DELETE FROM {}.metrics WHERE name = ?;", namespace);
    let delete_metadata_query = format!("DELETE FROM {}.metrics_metadata WHERE name = ?;", namespace);

    let mut deleted_metrics_count = 0;
    let mut scanned_metrics_count = 0;
    let mut scanned_directories_count = 0;
    let mut deleted_directories_count = 0;

    // clean metrics
    loop {
        if !clean_metrics || current_token >= end_key {
            break;
        }

        let mut outdated_metrics_query = stmt!(query.as_str());
        outdated_metrics_query.set_consistency(session.read_consistency())?;
        outdated_metrics_query.bind(0, current_token)?;
        outdated_metrics_query.bind(1, end_key)?;

        let result = session.metadata_session().execute(&outdated_metrics_query).wait()?;
        if result.row_count() == 0 {
            break;
        }

        // Fire both DELETEs per metric asynchronously; awaited below.
        let mut queries = vec![];

        for row in result.iter() {
            let name = row.get_column_by_name("name".to_string())?.to_string();
            scanned_metrics_count += 1;

            let mut delete_metric_query = stmt!(delete_metric_query.as_str());
            delete_metric_query.set_consistency(session.write_consistency())?;
            delete_metric_query.bind(0, name.as_str())?;
            queries.push(session.metadata_session().execute(&delete_metric_query));

            let mut delete_metadata_query = stmt!(delete_metadata_query.as_str());
            // Fixed: consistency was previously set on delete_metric_query a
            // second time, leaving the metadata DELETE at default consistency.
            delete_metadata_query.set_consistency(session.write_consistency())?;
            delete_metadata_query.bind(0, name.as_str())?;
            queries.push(session.metadata_session().execute(&delete_metadata_query));

            deleted_metrics_count += 1;
            current_token = row.get_column(1)?.get_i64()?;
        }

        // Fixed: await the pending deletes *before* deciding to stop — the old
        // order broke out on a short page and dropped that page's futures
        // unawaited (the directory loop below already waits first).
        for query in queries {
            if let Err(err) = query.wait() {
                eprintln!("Failed: {:?}", err);
            }
        }

        if result.row_count() != batch_limit {
            // Short page: the range is exhausted.
            break;
        }
    }

    let list_directories_query = format!("SELECT name, token(name) FROM {}.directories WHERE token(name) > ? AND token(name) < ? LIMIT {};",
        namespace, batch_limit);
    let metric_query = format!("SELECT name FROM {}.metrics WHERE parent LIKE ? LIMIT 1", namespace);
    let delete_directory_query = format!("DELETE FROM {}.directories WHERE name = ?;", namespace);

    current_token = start_key;

    // clean directories
    loop {
        if !clean_directories || current_token >= end_key {
            break;
        }

        let mut list_directories_query = stmt!(list_directories_query.as_str());
        list_directories_query.set_consistency(session.read_consistency())?;
        list_directories_query.bind(0, current_token)?;
        list_directories_query.bind(1, end_key)?;

        let list_result = session.metadata_session().execute(&list_directories_query).wait()?;
        if list_result.row_count() == 0 {
            break;
        }

        // For each directory, probe (async) whether any metric still lives
        // under it ("<name>.%" LIKE pattern).
        let mut queries = vec![];

        for row in list_result.iter() {
            let mut name = row.get_column_by_name("name".to_string())?.to_string();
            let orig_name = name.clone();
            name.push_str(".%");

            current_token = row.get_column(1)?.get_i64()?;

            let mut metric_query = stmt!(metric_query.as_str());
            metric_query.set_consistency(session.read_consistency())?;
            metric_query.bind(0, name.as_str())?;

            let query = session.metadata_session().execute(&metric_query);
            queries.push((orig_name, query));
        }

        // Delete only directories whose probe came back empty.
        let mut to_delete_queries = vec![];

        for el in queries {
            let result = el.1.wait()?;
            scanned_directories_count += 1;

            if result.row_count() != 0 {
                continue;
            }

            let mut delete_directory_query = stmt!(delete_directory_query.as_str());
            delete_directory_query.set_consistency(session.write_consistency())?;
            delete_directory_query.bind(0, el.0.as_str())?;

            to_delete_queries.push(session.metadata_session().execute(&delete_directory_query));
            deleted_directories_count += 1;
        }

        for to_delete in to_delete_queries {
            to_delete.wait()?;
        }

        if list_result.row_count() != batch_limit {
            break;
        }
    }

    println!("Deleted {} metrics, {} directories.", deleted_metrics_count, deleted_directories_count);
    println!("Scanned {} metrics, {} directories", scanned_metrics_count, scanned_directories_count);

    Ok(())
}

41
src/cmd/delete.rs Normal file
View File

@ -0,0 +1,41 @@
/*
* bgutil-rs
*
* Author: Patrick MARIE <pm@mkz.me>
*/
use cassandra_cpp::stmt;
use cassandra_cpp::{BindRustType,Error};
use crate::fetch_metric;
use crate::Session;
/// Delete a metric by name: its metadata row, its `metrics` row, and its
/// `directories` row.
///
/// Prints a notice and returns `Ok(())` if the metric does not exist.
///
/// # Errors
/// Returns a `cassandra_cpp::Error` on query/bind failures.
pub fn metric_delete(session: &Session, metric_name: &str) -> Result<(), Error> {
    let mut query = stmt!("SELECT * FROM biggraphite_metadata.metrics_metadata WHERE name = ?");
    query.bind(0, metric_name)?;

    let result = session.metadata_session().execute(&query).wait()?;
    if result.row_count() == 0 {
        println!("Metric is not existing");
        return Ok(());
    }

    let _metric = fetch_metric(session, metric_name)?;

    let mut query = stmt!("DELETE FROM biggraphite_metadata.metrics_metadata WHERE name = ?;");
    query.bind(0, metric_name)?;
    query.set_consistency(session.write_consistency())?;
    session.metadata_session().execute(&query).wait()?;

    // Fixed: this second DELETE previously targeted metrics_metadata again,
    // leaving the row in biggraphite_metadata.metrics behind (the same table
    // metrics_clean deletes from).
    let mut query = stmt!("DELETE FROM biggraphite_metadata.metrics WHERE name = ?;");
    query.bind(0, metric_name)?;
    query.set_consistency(session.write_consistency())?;
    session.metadata_session().execute(&query).wait()?;

    let mut query = stmt!("DELETE FROM biggraphite_metadata.directories WHERE name = ?;");
    query.bind(0, metric_name)?;
    query.set_consistency(session.write_consistency())?;
    session.metadata_session().execute(&query).wait()?;

    Ok(())
}

17
src/cmd/info.rs Normal file
View File

@ -0,0 +1,17 @@
/*
* bgutil-rs
*
* Author: Patrick MARIE <pm@mkz.me>
*/
use crate::Session;
use crate::fetch_metric;
use cassandra_cpp::Error;
/// Fetch a metric's metadata by name and print it via its `Display` impl.
///
/// # Errors
/// Propagates any `cassandra_cpp::Error` from the metadata lookup.
pub fn metric_info(session: &Session, metric_name: &str) -> Result<(), Error> {
    fetch_metric(session, metric_name).map(|metric| println!("{}", metric))
}

56
src/cmd/list.rs Normal file
View File

@ -0,0 +1,56 @@
/*
* bgutil-rs
*
* Author: Patrick MARIE <pm@mkz.me>
*/
use crate::prepare_component_query_globstar;
use crate::fetch_metrics;
use crate::Session;
use cassandra_cpp::{Error};
/// List directories and metrics matching a dotted glob expression.
///
/// Directories are printed first, prefixed `d `; metrics follow, prefixed
/// `m ` with their full metadata `Display` output. For each table the
/// expanded glob queries are fired asynchronously and drained afterwards.
///
/// # Errors
/// Returns a `cassandra_cpp::Error` on query preparation or execution failure.
pub fn metric_list(session: &Session, glob: &str) -> Result<(), Error> {
    let components = glob.split(".").collect::<Vec<&str>>();

    // Directories: fan out all expanded queries, then drain the futures.
    let mut pending = vec![];
    for mut statement in prepare_component_query_globstar("directories", &components)? {
        statement.set_consistency(session.read_consistency())?;
        pending.push(session.metadata_session().execute(&statement));
    }

    for future in pending {
        for row in future.wait()?.iter() {
            let name = row.get_column_by_name("name".to_string()).unwrap().to_string();
            println!("d {}", name);
        }
    }

    // Metrics: same fan-out, but names are resolved into full Metric records.
    let mut pending = vec![];
    for mut statement in prepare_component_query_globstar("metrics", &components)? {
        statement.set_consistency(session.read_consistency())?;
        pending.push(session.metadata_session().execute(&statement));
    }

    for future in pending {
        let rows = future.wait()?;
        let mut names = vec![];
        for row in rows.iter() {
            names.push(row.get_column_by_name("name".to_string()).unwrap().to_string());
        }

        for metric in fetch_metrics(session, &names)? {
            println!("m {}", metric);
        }
    }

    Ok(())
}

70
src/cmd/stats.rs Normal file
View File

@ -0,0 +1,70 @@
/*
* bgutil-rs
*
* Author: Patrick MARIE <pm@mkz.me>
*/
use cassandra_cpp::{BindRustType,Error};
use cassandra_cpp::stmt;
use std::collections::HashMap;
use crate::Metric;
use crate::Session;
/// Walk the token range `[start_key, end_key)` in pages of 1000 metrics and
/// print totals (metric count, point count) plus a breakdown by top-level
/// name prefix, sorted by descending count.
///
/// # Errors
/// Returns a `cassandra_cpp::Error` on query/bind failures.
pub fn metric_stats(session: &Session, start_key: i64, end_key: i64) -> Result<(), Error> {
    let q =
        "SELECT id, name, token(name), config, created_on, updated_on, read_on \
        FROM biggraphite_metadata.metrics_metadata WHERE token(name) > ? LIMIT 1000";

    let mut current_token = start_key;
    let mut n = 0;
    let mut points : u64 = 0;
    let mut stats : HashMap<String, usize> = HashMap::new();

    while current_token < end_key {
        let mut query = stmt!(q);
        query.bind(0, current_token)?;

        let results = session.metadata_session().execute(&query).wait()?;
        // Fixed: an empty page previously left current_token unchanged and
        // re-issued the identical query forever (infinite loop).
        if results.row_count() == 0 {
            break;
        }

        for row in results.iter() {
            current_token = row.get_column(2)?.get_i64()?;

            let metric : Metric = row.into();
            let stages = match metric.stages() {
                Ok(stages) => stages,
                // Unparseable retention config: skip the metric's points.
                Err(_) => continue,
            };

            for stage in stages {
                points += stage.points() as u64;
            }

            let parts = metric.name().split(".").collect::<Vec<&str>>();
            *stats.entry(String::from(parts[0])).or_insert(0) += 1;

            n += 1;
        }
    }

    // Fixed: the old `(delta / i64::MAX) as f64` did integer division first,
    // so the percentage always printed as 0.0000. Divide in f64.
    let p : f64 = (current_token - start_key) as f64 / std::i64::MAX as f64;
    println!("Range: {} -> {} ({:.4}%)", start_key, current_token, 100. * p);
    println!("{} metrics", n);
    println!("{} points", points);
    println!("-----");

    let mut vec : Vec<(&String, &usize)> = stats.iter().collect();
    vec.sort_by(|a, b| b.1.cmp(a.1));

    // NOTE(review): this prints 11 entries (ids 0..=10) — confirm whether a
    // top-10 was intended before changing the cutoff.
    for (id, v) in vec.iter().enumerate() {
        println!("{} {}", v.0, v.1);
        if id == 10 {
            break;
        }
    }

    Ok(())
}

48
src/cmd/write.rs Normal file
View File

@ -0,0 +1,48 @@
/*
* bgutil-rs
*
* Author: Patrick MARIE <pm@mkz.me>
*/
use std::str::FromStr;
use crate::Session;
use crate::Stage;
use crate::create_metric;
use crate::fetch_metric;
use cassandra_cpp::stmt;
use cassandra_cpp::{BindRustType,Error};
use cassandra_cpp::Uuid as CassUuid;
use std::convert::TryFrom;
/// Write a single point `(timestamp, value)` for `metric_name` into the
/// points table of the stage described by `retention` (e.g. "11520*60s").
///
/// The metric's metadata is created on first write. The timestamp is split
/// into the stage's `(time_start_ms, offset)` pair before insertion.
///
/// # Errors
/// Returns a `cassandra_cpp::Error` on query failures, invalid retention
/// strings, or an unparseable metric id.
pub fn metric_write(session: &Session, metric_name: &str, value: f64, retention: &str, timestamp: i64) -> Result<(), Error> {
    // Ensure the metadata row exists; create it lazily on first write.
    let mut exists_query = stmt!("SELECT * FROM biggraphite_metadata.metrics_metadata WHERE name = ?");
    exists_query.bind(0, metric_name)?;
    if session.metadata_session().execute(&exists_query).wait()?.row_count() == 0 {
        create_metric(session, metric_name)?;
    }

    let stage = Stage::try_from(retention)?;
    let metric = fetch_metric(session, metric_name)?;
    let (time_start_ms, offset) = stage.time_offset_ms(timestamp);

    let insert_cql = format!(
        "INSERT INTO biggraphite.{} (metric, time_start_ms, offset, value) VALUES (?, ?, ?, ?);",
        stage.table_name()
    );

    let mut insert = stmt!(&insert_cql);
    insert.bind(0, CassUuid::from_str(metric.id().as_str())?)?;
    insert.bind(1, time_start_ms)?;
    insert.bind(2, offset as i16)?;
    insert.bind(3, value)?;

    session.points_session().execute(&insert).wait()?;

    Ok(())
}

View File

@ -3,13 +3,10 @@
* *
* Author: Patrick MARIE <pm@mkz.me> * Author: Patrick MARIE <pm@mkz.me>
*/ */
use std::collections::HashMap;
use std::convert::TryFrom; use std::convert::TryFrom;
use std::error; use std::error;
use cassandra_cpp::{BindRustType,CassResult,Error,Statement}; use cassandra_cpp::CassResult;
use cassandra_cpp::{stmt};
use chrono::Utc; use chrono::Utc;
use clap::{App,AppSettings,Arg,SubCommand}; use clap::{App,AppSettings,Arg,SubCommand};
@ -18,12 +15,19 @@ mod metric;
mod session; mod session;
mod stage; mod stage;
mod timerange; mod timerange;
mod cmd;
use crate::cassandra::*; use crate::cassandra::*;
use crate::session::Session; use crate::session::Session;
use crate::stage::*; use crate::stage::Stage;
use crate::metric::Metric; use crate::metric::Metric;
use crate::cmd::clean::*;
use crate::cmd::delete::*;
use crate::cmd::info::*;
use crate::cmd::list::*;
use crate::cmd::stats::*;
use crate::cmd::write::*;
#[allow(dead_code)] #[allow(dead_code)]
fn describe_result(result: &CassResult) { fn describe_result(result: &CassResult) {
@ -38,178 +42,6 @@ fn describe_result(result: &CassResult) {
} }
} }
pub fn metric_info(session: &Session, metric_name: &str) -> Result<(), Error> {
let metric = fetch_metric(session, metric_name)?;
println!("{}", metric);
Ok(())
}
fn prepare_component_query(table_name: &str, arguments: &Vec<&str>) -> Result<Statement, Error> {
let mut q = format!("SELECT parent, name FROM biggraphite_metadata.{} WHERE ", table_name);
let mut component_number = 0;
let mut components = vec![];
for (id, component) in arguments.iter().enumerate() {
let mut operator = "=";
if *component == "*" {
component_number += 1;
continue;
}
if component_number != 0 {
q.push_str("AND ");
}
if component.ends_with("*") {
operator = "LIKE";
}
q.push_str(format!("component_{} {} ? ", id, operator).as_str());
component_number += 1;
components.push(component.replace("*", "%"));
}
if component_number != 0 {
q.push_str("AND ");
}
// Adding last component for __END__.
q.push_str(format!("component_{} = ? ALLOW FILTERING;", component_number).as_str());
components.push("__END__".to_string());
let mut query = stmt!(q.as_str());
for (id, arg) in components.iter().enumerate() {
query.bind(id, arg.as_str())?;
}
Ok(query)
}
fn prepare_component_query_globstar(table_name: &str, arguments: &Vec<&str>) -> Result<Vec<Statement>, Error> {
let _q = format!("SELECT parent, name FROM biggraphite_metadata.{} WHERE ", table_name);
let _component_number = 0;
let mut out = vec![];
let pos_globstar = arguments.iter().enumerate().filter(|(_, &x)| x == "**").map(|(id, _)| id).collect::<Vec<usize>>();
if pos_globstar.len() != 1 {
// XXX return error
return Ok(vec![prepare_component_query(table_name, arguments)?]);
}
let pos_globstar = pos_globstar[0];
let mut queries = vec![];
let mut init_args = vec![];
let mut end_args = arguments[pos_globstar+1..].to_vec();
end_args.push("__END__");
for (id, el) in arguments[0..pos_globstar].iter().enumerate() {
if *el == "*" {
continue;
}
if el.ends_with("*") {
init_args.push((id, "LIKE", el.replace("*", "%")));
} else {
init_args.push((id, "=", el.to_string()));
}
}
let components = 16;
for id in init_args.len()..(components-end_args.len()+1) {
let mut current_query = init_args.to_vec();
for (sub_id, el) in end_args.iter().enumerate() {
if *el == "*" {
continue;
}
if el.ends_with("*") {
current_query.push((sub_id + id, "LIKE", el.replace("*", "%")));
} else {
current_query.push((sub_id + id, "=", el.to_string()));
}
}
queries.push(current_query);
}
for query in &queries {
let mut current_query = _q.to_string();
for el in query {
if el.0 != 0 {
current_query.push_str("AND ");
}
current_query.push_str(&format!("component_{} {} ? ", el.0, el.1));
}
current_query.push_str(&String::from("ALLOW FILTERING;"));
let mut statement = stmt!(&current_query);
for (id, el) in query.iter().enumerate() {
statement.bind(id, el.2.as_str())?;
}
out.push(statement);
}
Ok(out)
}
fn metric_list(session: &Session, glob: &str) -> Result<(), Error> {
let components = glob.split(".").collect::<Vec<&str>>();
let query_directories = prepare_component_query_globstar("directories", &components)?;
let mut results = vec![];
for mut q in query_directories {
q.set_consistency(session.read_consistency())?;
results.push(session.metadata_session().execute(&q));
}
for result in results {
let rows = result.wait()?;
for row in rows.iter() {
let name = row.get_column_by_name("name".to_string()).unwrap().to_string();
println!("d {}", name);
}
}
let query_metrics = prepare_component_query_globstar("metrics", &components)?;
let mut results = vec![];
for mut q in query_metrics {
q.set_consistency(session.read_consistency())?;
results.push(session.metadata_session().execute(&q));
}
for result in results {
let rows = result.wait()?;
let names = rows
.iter()
.map(|x| {
x.get_column_by_name("name".to_string()).unwrap().to_string()
})
.collect::<Vec<String>>();
let metrics = fetch_metrics(session, &names)?;
for metric in metrics {
println!("m {}", metric);
}
}
Ok(())
}
fn main() -> Result<(), Box<dyn error::Error>> { fn main() -> Result<(), Box<dyn error::Error>> {
let matches = App::new("bgutil-rs") let matches = App::new("bgutil-rs")
.setting(AppSettings::SubcommandRequired) .setting(AppSettings::SubcommandRequired)
@ -456,197 +288,4 @@ fn main() -> Result<(), Box<dyn error::Error>> {
Ok(()) Ok(())
} }
fn metric_stats(session: &Session, start_key: i64, end_key: i64) -> Result<(), Error> {
let q =
"SELECT id, name, token(name), config, created_on, updated_on, read_on \
FROM biggraphite_metadata.metrics_metadata WHERE token(name) > ? LIMIT 1000";
let mut current_token = start_key;
let mut n = 0;
let mut points : u64 = 0;
let mut stats : HashMap<String, usize> = HashMap::new();
while current_token < end_key {
let mut query = stmt!(q);
query.bind(0, current_token)?;
let results = session.metadata_session().execute(&query).wait()?;
for row in results.iter() {
current_token = row.get_column(2)?.get_i64()?;
let metric : Metric = row.into();
let stages = match metric.stages() {
Ok(stages) => stages,
Err(_) => continue,
};
for stage in stages {
points += stage.points() as u64;
}
let parts = metric.name().split(".").collect::<Vec<&str>>();
*stats.entry(String::from(parts[0])).or_insert(0) += 1;
n += 1;
}
}
let p : f64 = ((current_token - start_key) / std::i64::MAX) as f64;
println!("Range: {} -> {} ({:.4}%)", start_key, current_token, 100. * p);
println!("{} metrics", n);
println!("{} points", points);
println!("-----");
let mut vec : Vec<(&String, &usize)> = stats.iter().collect();
vec.sort_by(|a, b| b.1.cmp(a.1));
for (id, v) in vec.iter().enumerate() {
println!("{} {}", v.0, v.1);
if id == 10 {
break;
}
}
Ok(())
}
fn metrics_clean(session: &Session, start_key: i64, end_key: i64, clean_metrics: bool, clean_directories: bool) -> Result<(), Error> {
let mut current_token = start_key;
let cutoff : u64 = (Utc::now().timestamp() as u64 - 86400 * 14) * 1000;
let namespace = "biggraphite_metadata";
let batch_limit = 1000;
let query = format!("SELECT name, token(name) FROM {}.metrics_metadata \
WHERE updated_on <= maxTimeuuid({}) and token(name) > ? and token(name) < ? LIMIT {};",
namespace, cutoff, batch_limit);
let delete_metric_query = format!("DELETE FROM {}.metrics WHERE name = ?;", namespace);
let delete_metadata_query = format!("DELETE FROM {}.metrics_metadata WHERE name = ?;", namespace);
let mut deleted_metrics_count = 0;
let mut scanned_metrics_count = 0;
let mut scanned_directories_count = 0;
let mut deleted_directories_count = 0;
// clean metrics
loop {
if !clean_metrics || current_token >= end_key {
// println!("Stopping: {} >= {}", current_token, end_key);
break;
}
let mut outdated_metrics_query = stmt!(query.as_str());
outdated_metrics_query.set_consistency(session.read_consistency())?;
outdated_metrics_query.bind(0, current_token)?;
outdated_metrics_query.bind(1, end_key)?;
let result = session.metadata_session().execute(&outdated_metrics_query).wait()?;
if result.row_count() == 0 {
break;
}
let mut queries = vec![];
for row in result.iter() {
let name = row.get_column_by_name("name".to_string())?.to_string();
scanned_metrics_count += 1;
let mut delete_metric_query = stmt!(delete_metric_query.as_str());
delete_metric_query.set_consistency(session.write_consistency())?;
delete_metric_query.bind(0, name.as_str())?;
queries.push(session.metadata_session().execute(&delete_metric_query));
let mut delete_metadata_query = stmt!(delete_metadata_query.as_str());
delete_metric_query.set_consistency(session.write_consistency())?;
delete_metadata_query.bind(0, name.as_str())?;
queries.push(session.metadata_session().execute(&delete_metadata_query));
deleted_metrics_count += 1;
current_token = row.get_column(1)?.get_i64()?;
}
if result.row_count() != batch_limit {
// println!("Stopping because count == 0");
break;
}
for query in queries {
if let Err(err) = query.wait() {
eprintln!("Failed: {:?}", err);
}
}
}
let list_directories_query = format!("SELECT name, token(name) FROM {}.directories WHERE token(name) > ? AND token(name) < ? LIMIT {};",
namespace, batch_limit);
let metric_query = format!("SELECT name FROM {}.metrics WHERE parent LIKE ? LIMIT 1", namespace);
let delete_directory_query = format!("DELETE FROM {}.directories WHERE name = ?;", namespace);
current_token = start_key;
// clean directories
loop {
if !clean_directories || current_token >= end_key {
break;
}
let mut list_directories_query = stmt!(list_directories_query.as_str());
list_directories_query.set_consistency(session.read_consistency())?;
list_directories_query.bind(0, current_token)?;
list_directories_query.bind(1, end_key)?;
let list_result = session.metadata_session().execute(&list_directories_query).wait()?;
if list_result.row_count() == 0 {
break;
}
let mut queries = vec![];
for row in list_result.iter() {
let mut name = row.get_column_by_name("name".to_string())?.to_string();
let orig_name = name.clone();
name.push_str(".%");
current_token = row.get_column(1)?.get_i64()?;
let mut metric_query = stmt!(metric_query.as_str());
metric_query.set_consistency(session.read_consistency())?;
metric_query.bind(0, name.as_str())?;
let query = session.metadata_session().execute(&metric_query);
queries.push((orig_name, query));
}
let mut to_delete_queries = vec![];
for el in queries {
let result = el.1.wait()?;
scanned_directories_count += 1;
if result.row_count() != 0 {
continue;
}
let mut delete_directory_query = stmt!(delete_directory_query.as_str());
delete_directory_query.set_consistency(session.write_consistency())?;
delete_directory_query.bind(0, el.0.as_str())?;
to_delete_queries.push(session.metadata_session().execute(&delete_directory_query));
deleted_directories_count += 1;
}
for to_delete in to_delete_queries {
to_delete.wait()?;
}
if list_result.row_count() != batch_limit {
break;
}
}
println!("Deleted {} metrics, {} directories.", deleted_metrics_count, deleted_directories_count);
println!("Scanned {} metrics, {} directories", scanned_metrics_count, scanned_directories_count);
Ok(())
}