Compare commits

..

2 Commits

Author SHA1 Message Date
9dbf1542b2 Updating documentation. 2021-02-24 21:43:31 +01:00
9fb6975188 Implementing "clean" command. 2021-02-24 21:41:26 +01:00
2 changed files with 208 additions and 5 deletions

View File

@ -21,11 +21,13 @@ USAGE:
bgutil-rs <SUBCOMMAND> bgutil-rs <SUBCOMMAND>
SUBCOMMANDS: SUBCOMMANDS:
clean Stats
delete Delete metric(s) delete Delete metric(s)
help Prints this message or the help of the given subcommand(s) help Prints this message or the help of the given subcommand(s)
info Information about a metric info Information about a metric
list List metrics with given pattern list List metrics with given pattern
read Read a metric contents read Read a metric contents
stats Stats
write Write a metric and its value write Write a metric and its value
``` ```
@ -150,6 +152,25 @@ ARGS:
<metric> <metric>
``` ```
### Clean
```sh
$ cargo run -- clean --help
bgutil-rs-clean
Stats
USAGE:
bgutil-rs clean [FLAGS] [OPTIONS]
FLAGS:
--clean-directories
--clean-metrics
OPTIONS:
--end-key <end-key>
--start-key <start-key>
```
## Todo ## Todo
* command: read * command: read
@ -160,3 +181,6 @@ ARGS:
- Arguments handling - Arguments handling
* command: delete * command: delete
- with recursive - with recursive
* command: clean
- progress bar
* ...

View File

@ -269,6 +269,18 @@ fn main() -> Result<(), Box<dyn error::Error>> {
.arg(Arg::with_name("end-key") .arg(Arg::with_name("end-key")
.long("end-key") .long("end-key")
.takes_value(true))) .takes_value(true)))
.subcommand(SubCommand::with_name("clean")
.about("Stats")
.arg(Arg::with_name("start-key")
.long("start-key")
.takes_value(true))
.arg(Arg::with_name("end-key")
.long("end-key")
.takes_value(true))
.arg(Arg::with_name("clean-metrics")
.long("clean-metrics"))
.arg(Arg::with_name("clean-directories")
.long("clean-directories")))
.get_matches(); .get_matches();
let contact_points_metadata = "tag--cstars07--cassandra-cstars07.query.consul.preprod.crto.in"; let contact_points_metadata = "tag--cstars07--cassandra-cstars07.query.consul.preprod.crto.in";
@ -359,8 +371,8 @@ fn main() -> Result<(), Box<dyn error::Error>> {
}, },
Some("stats") => { Some("stats") => {
let matches = matches.subcommand_matches("stats").unwrap(); let matches = matches.subcommand_matches("stats").unwrap();
let start_key = matches.value_of("start-key"); // 0 let start_key = matches.value_of("start-key");
let end_key = matches.value_of("end-key"); // 100000000000000 let end_key = matches.value_of("end-key");
let start_key = match start_key { let start_key = match start_key {
None => 0, None => 0,
@ -386,6 +398,39 @@ fn main() -> Result<(), Box<dyn error::Error>> {
metric_stats(&session, start_key, end_key)?; metric_stats(&session, start_key, end_key)?;
}, },
Some("clean") => {
let matches = matches.subcommand_matches("clean").unwrap();
let start_key = matches.value_of("start-key");
let end_key = matches.value_of("end-key");
let start_key = match start_key {
None => std::i64::MIN,
Some(s) => match s.parse::<i64>() {
Ok(n) => n,
Err(_) => {
eprintln!("Could not parse {}", s);
return Ok(())
}
}
};
let end_key = match end_key {
None => std::i64::MAX,
Some(s) => match s.parse::<i64>() {
Ok(n) => n,
Err(_) => {
eprintln!("Could not parse {}", s);
return Ok(())
}
}
};
let clean_metrics = matches.is_present("clean-metrics");
let clean_directories = matches.is_present("clean-directories");
metrics_clean(&session, start_key, end_key, clean_metrics, clean_directories)?;
},
None => { None => {
eprintln!("No command was used."); eprintln!("No command was used.");
return Ok(()); return Ok(());
@ -396,7 +441,6 @@ fn main() -> Result<(), Box<dyn error::Error>> {
Ok(()) Ok(())
} }
fn metric_stats(session: &Session, start_key: i64, end_key: i64) -> Result<(), Error> { fn metric_stats(session: &Session, start_key: i64, end_key: i64) -> Result<(), Error> {
let q = let q =
"SELECT id, name, token(name), config, created_on, updated_on, read_on \ "SELECT id, name, token(name), config, created_on, updated_on, read_on \
@ -428,10 +472,8 @@ fn metric_stats(session: &Session, start_key: i64, end_key: i64) -> Result<(), E
} }
let parts = metric.name().split(".").collect::<Vec<&str>>(); let parts = metric.name().split(".").collect::<Vec<&str>>();
*stats.entry(String::from(parts[0])).or_insert(0) += 1; *stats.entry(String::from(parts[0])).or_insert(0) += 1;
// println!("{}: {}", current_token, metric.name());
n += 1; n += 1;
} }
} }
@ -456,3 +498,140 @@ fn metric_stats(session: &Session, start_key: i64, end_key: i64) -> Result<(), E
Ok(()) Ok(())
} }
fn metrics_clean(session: &Session, start_key: i64, end_key: i64, clean_metrics: bool, clean_directories: bool) -> Result<(), Error> {
let mut current_token = start_key;
let cutoff : u64 = (Utc::now().timestamp() as u64 - 86400 * 14) * 1000;
let namespace = "biggraphite_metadata";
let batch_limit = 1000;
let query = format!("SELECT name, token(name) FROM {}.metrics_metadata \
WHERE updated_on <= maxTimeuuid({}) and token(name) > ? and token(name) < ? LIMIT {};",
namespace, cutoff, batch_limit);
let delete_metric_query = format!("DELETE FROM {}.metrics WHERE name = ?;", namespace);
let delete_metadata_query = format!("DELETE FROM {}.metrics_metadata WHERE name = ?;", namespace);
let mut deleted_metrics_count = 0;
let mut scanned_metrics_count = 0;
let mut scanned_directories_count = 0;
let mut deleted_directories_count = 0;
// clean metrics
loop {
if !clean_metrics || current_token >= end_key {
// println!("Stopping: {} >= {}", current_token, end_key);
break;
}
let mut outdated_metrics_query = stmt!(query.as_str());
outdated_metrics_query.set_consistency(session.read_consistency())?;
outdated_metrics_query.bind(0, current_token)?;
outdated_metrics_query.bind(1, end_key)?;
let result = session.metadata_session().execute(&outdated_metrics_query).wait()?;
if result.row_count() == 0 {
break;
}
let mut queries = vec![];
for row in result.iter() {
let name = row.get_column_by_name("name".to_string())?.to_string();
scanned_metrics_count += 1;
let mut delete_metric_query = stmt!(delete_metric_query.as_str());
delete_metric_query.set_consistency(session.write_consistency())?;
delete_metric_query.bind(0, name.as_str())?;
queries.push(session.metadata_session().execute(&delete_metric_query));
let mut delete_metadata_query = stmt!(delete_metadata_query.as_str());
delete_metric_query.set_consistency(session.write_consistency())?;
delete_metadata_query.bind(0, name.as_str())?;
queries.push(session.metadata_session().execute(&delete_metadata_query));
deleted_metrics_count += 1;
current_token = row.get_column(1)?.get_i64()?;
}
if result.row_count() != batch_limit {
// println!("Stopping because count == 0");
break;
}
for query in queries {
if let Err(err) = query.wait() {
eprintln!("Failed: {:?}", err);
}
}
}
let list_directories_query = format!("SELECT name, token(name) FROM {}.directories WHERE token(name) > ? AND token(name) < ? LIMIT {};",
namespace, batch_limit);
let metric_query = format!("SELECT name FROM {}.metrics WHERE parent LIKE ? LIMIT 1", namespace);
let delete_directory_query = format!("DELETE FROM {}.directories WHERE name = ?;", namespace);
current_token = start_key;
// clean directories
loop {
if !clean_directories || current_token >= end_key {
break;
}
let mut list_directories_query = stmt!(list_directories_query.as_str());
list_directories_query.set_consistency(session.read_consistency())?;
list_directories_query.bind(0, current_token)?;
list_directories_query.bind(1, end_key)?;
let list_result = session.metadata_session().execute(&list_directories_query).wait()?;
if list_result.row_count() == 0 {
break;
}
let mut queries = vec![];
for row in list_result.iter() {
let mut name = row.get_column_by_name("name".to_string())?.to_string();
let orig_name = name.clone();
name.push_str(".%");
current_token = row.get_column(1)?.get_i64()?;
let mut metric_query = stmt!(metric_query.as_str());
metric_query.set_consistency(session.read_consistency())?;
metric_query.bind(0, name.as_str())?;
let query = session.metadata_session().execute(&metric_query);
queries.push((orig_name, query));
}
let mut to_delete_queries = vec![];
for el in queries {
let result = el.1.wait()?;
scanned_directories_count += 1;
if result.row_count() != 0 {
continue;
}
let mut delete_directory_query = stmt!(delete_directory_query.as_str());
delete_directory_query.set_consistency(session.write_consistency())?;
delete_directory_query.bind(0, el.0.as_str())?;
to_delete_queries.push(session.metadata_session().execute(&delete_directory_query));
deleted_directories_count += 1;
}
for to_delete in to_delete_queries {
to_delete.wait()?;
}
if list_result.row_count() != batch_limit {
break;
}
}
println!("Deleted {} metrics, {} directories.", deleted_metrics_count, deleted_directories_count);
println!("Scanned {} metrics, {} directories", scanned_metrics_count, scanned_directories_count);
Ok(())
}