Implementing "clean" command.

This commit is contained in:
Patrick MARIE 2021-02-24 21:41:26 +01:00
parent f6b412ca5b
commit 9fb6975188

View File

@ -269,6 +269,18 @@ fn main() -> Result<(), Box<dyn error::Error>> {
.arg(Arg::with_name("end-key")
.long("end-key")
.takes_value(true)))
.subcommand(SubCommand::with_name("clean")
.about("Stats")
.arg(Arg::with_name("start-key")
.long("start-key")
.takes_value(true))
.arg(Arg::with_name("end-key")
.long("end-key")
.takes_value(true))
.arg(Arg::with_name("clean-metrics")
.long("clean-metrics"))
.arg(Arg::with_name("clean-directories")
.long("clean-directories")))
.get_matches();
let contact_points_metadata = "tag--cstars07--cassandra-cstars07.query.consul.preprod.crto.in";
@ -359,8 +371,8 @@ fn main() -> Result<(), Box<dyn error::Error>> {
},
Some("stats") => {
let matches = matches.subcommand_matches("stats").unwrap();
let start_key = matches.value_of("start-key"); // 0
let end_key = matches.value_of("end-key"); // 100000000000000
let start_key = matches.value_of("start-key");
let end_key = matches.value_of("end-key");
let start_key = match start_key {
None => 0,
@ -386,6 +398,39 @@ fn main() -> Result<(), Box<dyn error::Error>> {
metric_stats(&session, start_key, end_key)?;
},
Some("clean") => {
let matches = matches.subcommand_matches("clean").unwrap();
let start_key = matches.value_of("start-key");
let end_key = matches.value_of("end-key");
let start_key = match start_key {
None => std::i64::MIN,
Some(s) => match s.parse::<i64>() {
Ok(n) => n,
Err(_) => {
eprintln!("Could not parse {}", s);
return Ok(())
}
}
};
let end_key = match end_key {
None => std::i64::MAX,
Some(s) => match s.parse::<i64>() {
Ok(n) => n,
Err(_) => {
eprintln!("Could not parse {}", s);
return Ok(())
}
}
};
let clean_metrics = matches.is_present("clean-metrics");
let clean_directories = matches.is_present("clean-directories");
metrics_clean(&session, start_key, end_key, clean_metrics, clean_directories)?;
},
None => {
eprintln!("No command was used.");
return Ok(());
@ -396,7 +441,6 @@ fn main() -> Result<(), Box<dyn error::Error>> {
Ok(())
}
fn metric_stats(session: &Session, start_key: i64, end_key: i64) -> Result<(), Error> {
let q =
"SELECT id, name, token(name), config, created_on, updated_on, read_on \
@ -428,10 +472,8 @@ fn metric_stats(session: &Session, start_key: i64, end_key: i64) -> Result<(), E
}
let parts = metric.name().split(".").collect::<Vec<&str>>();
*stats.entry(String::from(parts[0])).or_insert(0) += 1;
// println!("{}: {}", current_token, metric.name());
n += 1;
}
}
@ -456,3 +498,140 @@ fn metric_stats(session: &Session, start_key: i64, end_key: i64) -> Result<(), E
Ok(())
}
fn metrics_clean(session: &Session, start_key: i64, end_key: i64, clean_metrics: bool, clean_directories: bool) -> Result<(), Error> {
let mut current_token = start_key;
let cutoff : u64 = (Utc::now().timestamp() as u64 - 86400 * 14) * 1000;
let namespace = "biggraphite_metadata";
let batch_limit = 1000;
let query = format!("SELECT name, token(name) FROM {}.metrics_metadata \
WHERE updated_on <= maxTimeuuid({}) and token(name) > ? and token(name) < ? LIMIT {};",
namespace, cutoff, batch_limit);
let delete_metric_query = format!("DELETE FROM {}.metrics WHERE name = ?;", namespace);
let delete_metadata_query = format!("DELETE FROM {}.metrics_metadata WHERE name = ?;", namespace);
let mut deleted_metrics_count = 0;
let mut scanned_metrics_count = 0;
let mut scanned_directories_count = 0;
let mut deleted_directories_count = 0;
// clean metrics
loop {
if !clean_metrics || current_token >= end_key {
// println!("Stopping: {} >= {}", current_token, end_key);
break;
}
let mut outdated_metrics_query = stmt!(query.as_str());
outdated_metrics_query.set_consistency(session.read_consistency())?;
outdated_metrics_query.bind(0, current_token)?;
outdated_metrics_query.bind(1, end_key)?;
let result = session.metadata_session().execute(&outdated_metrics_query).wait()?;
if result.row_count() == 0 {
break;
}
let mut queries = vec![];
for row in result.iter() {
let name = row.get_column_by_name("name".to_string())?.to_string();
scanned_metrics_count += 1;
let mut delete_metric_query = stmt!(delete_metric_query.as_str());
delete_metric_query.set_consistency(session.write_consistency())?;
delete_metric_query.bind(0, name.as_str())?;
queries.push(session.metadata_session().execute(&delete_metric_query));
let mut delete_metadata_query = stmt!(delete_metadata_query.as_str());
delete_metric_query.set_consistency(session.write_consistency())?;
delete_metadata_query.bind(0, name.as_str())?;
queries.push(session.metadata_session().execute(&delete_metadata_query));
deleted_metrics_count += 1;
current_token = row.get_column(1)?.get_i64()?;
}
if result.row_count() != batch_limit {
// println!("Stopping because count == 0");
break;
}
for query in queries {
if let Err(err) = query.wait() {
eprintln!("Failed: {:?}", err);
}
}
}
let list_directories_query = format!("SELECT name, token(name) FROM {}.directories WHERE token(name) > ? AND token(name) < ? LIMIT {};",
namespace, batch_limit);
let metric_query = format!("SELECT name FROM {}.metrics WHERE parent LIKE ? LIMIT 1", namespace);
let delete_directory_query = format!("DELETE FROM {}.directories WHERE name = ?;", namespace);
current_token = start_key;
// clean directories
loop {
if !clean_directories || current_token >= end_key {
break;
}
let mut list_directories_query = stmt!(list_directories_query.as_str());
list_directories_query.set_consistency(session.read_consistency())?;
list_directories_query.bind(0, current_token)?;
list_directories_query.bind(1, end_key)?;
let list_result = session.metadata_session().execute(&list_directories_query).wait()?;
if list_result.row_count() == 0 {
break;
}
let mut queries = vec![];
for row in list_result.iter() {
let mut name = row.get_column_by_name("name".to_string())?.to_string();
let orig_name = name.clone();
name.push_str(".%");
current_token = row.get_column(1)?.get_i64()?;
let mut metric_query = stmt!(metric_query.as_str());
metric_query.set_consistency(session.read_consistency())?;
metric_query.bind(0, name.as_str())?;
let query = session.metadata_session().execute(&metric_query);
queries.push((orig_name, query));
}
let mut to_delete_queries = vec![];
for el in queries {
let result = el.1.wait()?;
scanned_directories_count += 1;
if result.row_count() != 0 {
continue;
}
let mut delete_directory_query = stmt!(delete_directory_query.as_str());
delete_directory_query.set_consistency(session.write_consistency())?;
delete_directory_query.bind(0, el.0.as_str())?;
to_delete_queries.push(session.metadata_session().execute(&delete_directory_query));
deleted_directories_count += 1;
}
for to_delete in to_delete_queries {
to_delete.wait()?;
}
if list_result.row_count() != batch_limit {
break;
}
}
println!("Deleted {} metrics, {} directories.", deleted_metrics_count, deleted_directories_count);
println!("Scanned {} metrics, {} directories", scanned_metrics_count, scanned_directories_count);
Ok(())
}