Compare commits
No commits in common. "9dbf1542b2b659c1139f3c4a899d8f77a494bd8a" and "f6b412ca5b9c40d8ad226298eac0368dac6b8c47" have entirely different histories.
9dbf1542b2
...
f6b412ca5b
24
README.md
24
README.md
@ -21,13 +21,11 @@ USAGE:
|
|||||||
bgutil-rs <SUBCOMMAND>
|
bgutil-rs <SUBCOMMAND>
|
||||||
|
|
||||||
SUBCOMMANDS:
|
SUBCOMMANDS:
|
||||||
clean Stats
|
|
||||||
delete Delete metric(s)
|
delete Delete metric(s)
|
||||||
help Prints this message or the help of the given subcommand(s)
|
help Prints this message or the help of the given subcommand(s)
|
||||||
info Information about a metric
|
info Information about a metric
|
||||||
list List metrics with given pattern
|
list List metrics with given pattern
|
||||||
read Read a metric contents
|
read Read a metric contents
|
||||||
stats Stats
|
|
||||||
write Write a metric and its value
|
write Write a metric and its value
|
||||||
```
|
```
|
||||||
|
|
||||||
@ -152,25 +150,6 @@ ARGS:
|
|||||||
<metric>
|
<metric>
|
||||||
```
|
```
|
||||||
|
|
||||||
### Clean
|
|
||||||
|
|
||||||
```sh
|
|
||||||
$ cargo run -- clean --help
|
|
||||||
bgutil-rs-clean
|
|
||||||
Stats
|
|
||||||
|
|
||||||
USAGE:
|
|
||||||
bgutil-rs clean [FLAGS] [OPTIONS]
|
|
||||||
|
|
||||||
FLAGS:
|
|
||||||
--clean-directories
|
|
||||||
--clean-metrics
|
|
||||||
|
|
||||||
OPTIONS:
|
|
||||||
--end-key <end-key>
|
|
||||||
--start-key <start-key>
|
|
||||||
```
|
|
||||||
|
|
||||||
## Todo
|
## Todo
|
||||||
|
|
||||||
* command: read
|
* command: read
|
||||||
@ -181,6 +160,3 @@ OPTIONS:
|
|||||||
- Arguments handling
|
- Arguments handling
|
||||||
* command: delete
|
* command: delete
|
||||||
- with recursive
|
- with recursive
|
||||||
* command: clean
|
|
||||||
- progress bar
|
|
||||||
* ...
|
|
189
src/main.rs
189
src/main.rs
@ -269,18 +269,6 @@ fn main() -> Result<(), Box<dyn error::Error>> {
|
|||||||
.arg(Arg::with_name("end-key")
|
.arg(Arg::with_name("end-key")
|
||||||
.long("end-key")
|
.long("end-key")
|
||||||
.takes_value(true)))
|
.takes_value(true)))
|
||||||
.subcommand(SubCommand::with_name("clean")
|
|
||||||
.about("Stats")
|
|
||||||
.arg(Arg::with_name("start-key")
|
|
||||||
.long("start-key")
|
|
||||||
.takes_value(true))
|
|
||||||
.arg(Arg::with_name("end-key")
|
|
||||||
.long("end-key")
|
|
||||||
.takes_value(true))
|
|
||||||
.arg(Arg::with_name("clean-metrics")
|
|
||||||
.long("clean-metrics"))
|
|
||||||
.arg(Arg::with_name("clean-directories")
|
|
||||||
.long("clean-directories")))
|
|
||||||
.get_matches();
|
.get_matches();
|
||||||
|
|
||||||
let contact_points_metadata = "tag--cstars07--cassandra-cstars07.query.consul.preprod.crto.in";
|
let contact_points_metadata = "tag--cstars07--cassandra-cstars07.query.consul.preprod.crto.in";
|
||||||
@ -371,8 +359,8 @@ fn main() -> Result<(), Box<dyn error::Error>> {
|
|||||||
},
|
},
|
||||||
Some("stats") => {
|
Some("stats") => {
|
||||||
let matches = matches.subcommand_matches("stats").unwrap();
|
let matches = matches.subcommand_matches("stats").unwrap();
|
||||||
let start_key = matches.value_of("start-key");
|
let start_key = matches.value_of("start-key"); // 0
|
||||||
let end_key = matches.value_of("end-key");
|
let end_key = matches.value_of("end-key"); // 100000000000000
|
||||||
|
|
||||||
let start_key = match start_key {
|
let start_key = match start_key {
|
||||||
None => 0,
|
None => 0,
|
||||||
@ -398,39 +386,6 @@ fn main() -> Result<(), Box<dyn error::Error>> {
|
|||||||
|
|
||||||
metric_stats(&session, start_key, end_key)?;
|
metric_stats(&session, start_key, end_key)?;
|
||||||
},
|
},
|
||||||
Some("clean") => {
|
|
||||||
let matches = matches.subcommand_matches("clean").unwrap();
|
|
||||||
|
|
||||||
let start_key = matches.value_of("start-key");
|
|
||||||
let end_key = matches.value_of("end-key");
|
|
||||||
|
|
||||||
let start_key = match start_key {
|
|
||||||
None => std::i64::MIN,
|
|
||||||
Some(s) => match s.parse::<i64>() {
|
|
||||||
Ok(n) => n,
|
|
||||||
Err(_) => {
|
|
||||||
eprintln!("Could not parse {}", s);
|
|
||||||
return Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
let end_key = match end_key {
|
|
||||||
None => std::i64::MAX,
|
|
||||||
Some(s) => match s.parse::<i64>() {
|
|
||||||
Ok(n) => n,
|
|
||||||
Err(_) => {
|
|
||||||
eprintln!("Could not parse {}", s);
|
|
||||||
return Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
let clean_metrics = matches.is_present("clean-metrics");
|
|
||||||
let clean_directories = matches.is_present("clean-directories");
|
|
||||||
|
|
||||||
metrics_clean(&session, start_key, end_key, clean_metrics, clean_directories)?;
|
|
||||||
},
|
|
||||||
None => {
|
None => {
|
||||||
eprintln!("No command was used.");
|
eprintln!("No command was used.");
|
||||||
return Ok(());
|
return Ok(());
|
||||||
@ -441,6 +396,7 @@ fn main() -> Result<(), Box<dyn error::Error>> {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
fn metric_stats(session: &Session, start_key: i64, end_key: i64) -> Result<(), Error> {
|
fn metric_stats(session: &Session, start_key: i64, end_key: i64) -> Result<(), Error> {
|
||||||
let q =
|
let q =
|
||||||
"SELECT id, name, token(name), config, created_on, updated_on, read_on \
|
"SELECT id, name, token(name), config, created_on, updated_on, read_on \
|
||||||
@ -472,8 +428,10 @@ fn metric_stats(session: &Session, start_key: i64, end_key: i64) -> Result<(), E
|
|||||||
}
|
}
|
||||||
|
|
||||||
let parts = metric.name().split(".").collect::<Vec<&str>>();
|
let parts = metric.name().split(".").collect::<Vec<&str>>();
|
||||||
|
|
||||||
*stats.entry(String::from(parts[0])).or_insert(0) += 1;
|
*stats.entry(String::from(parts[0])).or_insert(0) += 1;
|
||||||
|
|
||||||
|
// println!("{}: {}", current_token, metric.name());
|
||||||
n += 1;
|
n += 1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -498,140 +456,3 @@ fn metric_stats(session: &Session, start_key: i64, end_key: i64) -> Result<(), E
|
|||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn metrics_clean(session: &Session, start_key: i64, end_key: i64, clean_metrics: bool, clean_directories: bool) -> Result<(), Error> {
|
|
||||||
let mut current_token = start_key;
|
|
||||||
let cutoff : u64 = (Utc::now().timestamp() as u64 - 86400 * 14) * 1000;
|
|
||||||
|
|
||||||
let namespace = "biggraphite_metadata";
|
|
||||||
let batch_limit = 1000;
|
|
||||||
|
|
||||||
let query = format!("SELECT name, token(name) FROM {}.metrics_metadata \
|
|
||||||
WHERE updated_on <= maxTimeuuid({}) and token(name) > ? and token(name) < ? LIMIT {};",
|
|
||||||
namespace, cutoff, batch_limit);
|
|
||||||
let delete_metric_query = format!("DELETE FROM {}.metrics WHERE name = ?;", namespace);
|
|
||||||
let delete_metadata_query = format!("DELETE FROM {}.metrics_metadata WHERE name = ?;", namespace);
|
|
||||||
|
|
||||||
let mut deleted_metrics_count = 0;
|
|
||||||
let mut scanned_metrics_count = 0;
|
|
||||||
let mut scanned_directories_count = 0;
|
|
||||||
let mut deleted_directories_count = 0;
|
|
||||||
|
|
||||||
// clean metrics
|
|
||||||
loop {
|
|
||||||
if !clean_metrics || current_token >= end_key {
|
|
||||||
// println!("Stopping: {} >= {}", current_token, end_key);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
let mut outdated_metrics_query = stmt!(query.as_str());
|
|
||||||
outdated_metrics_query.set_consistency(session.read_consistency())?;
|
|
||||||
outdated_metrics_query.bind(0, current_token)?;
|
|
||||||
outdated_metrics_query.bind(1, end_key)?;
|
|
||||||
|
|
||||||
let result = session.metadata_session().execute(&outdated_metrics_query).wait()?;
|
|
||||||
if result.row_count() == 0 {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
let mut queries = vec![];
|
|
||||||
|
|
||||||
for row in result.iter() {
|
|
||||||
let name = row.get_column_by_name("name".to_string())?.to_string();
|
|
||||||
scanned_metrics_count += 1;
|
|
||||||
let mut delete_metric_query = stmt!(delete_metric_query.as_str());
|
|
||||||
delete_metric_query.set_consistency(session.write_consistency())?;
|
|
||||||
delete_metric_query.bind(0, name.as_str())?;
|
|
||||||
queries.push(session.metadata_session().execute(&delete_metric_query));
|
|
||||||
|
|
||||||
let mut delete_metadata_query = stmt!(delete_metadata_query.as_str());
|
|
||||||
delete_metric_query.set_consistency(session.write_consistency())?;
|
|
||||||
delete_metadata_query.bind(0, name.as_str())?;
|
|
||||||
queries.push(session.metadata_session().execute(&delete_metadata_query));
|
|
||||||
|
|
||||||
deleted_metrics_count += 1;
|
|
||||||
current_token = row.get_column(1)?.get_i64()?;
|
|
||||||
}
|
|
||||||
|
|
||||||
if result.row_count() != batch_limit {
|
|
||||||
// println!("Stopping because count == 0");
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
for query in queries {
|
|
||||||
if let Err(err) = query.wait() {
|
|
||||||
eprintln!("Failed: {:?}", err);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
let list_directories_query = format!("SELECT name, token(name) FROM {}.directories WHERE token(name) > ? AND token(name) < ? LIMIT {};",
|
|
||||||
namespace, batch_limit);
|
|
||||||
let metric_query = format!("SELECT name FROM {}.metrics WHERE parent LIKE ? LIMIT 1", namespace);
|
|
||||||
let delete_directory_query = format!("DELETE FROM {}.directories WHERE name = ?;", namespace);
|
|
||||||
|
|
||||||
current_token = start_key;
|
|
||||||
|
|
||||||
// clean directories
|
|
||||||
loop {
|
|
||||||
if !clean_directories || current_token >= end_key {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
let mut list_directories_query = stmt!(list_directories_query.as_str());
|
|
||||||
list_directories_query.set_consistency(session.read_consistency())?;
|
|
||||||
list_directories_query.bind(0, current_token)?;
|
|
||||||
list_directories_query.bind(1, end_key)?;
|
|
||||||
|
|
||||||
let list_result = session.metadata_session().execute(&list_directories_query).wait()?;
|
|
||||||
if list_result.row_count() == 0 {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
let mut queries = vec![];
|
|
||||||
|
|
||||||
for row in list_result.iter() {
|
|
||||||
let mut name = row.get_column_by_name("name".to_string())?.to_string();
|
|
||||||
let orig_name = name.clone();
|
|
||||||
name.push_str(".%");
|
|
||||||
current_token = row.get_column(1)?.get_i64()?;
|
|
||||||
|
|
||||||
let mut metric_query = stmt!(metric_query.as_str());
|
|
||||||
metric_query.set_consistency(session.read_consistency())?;
|
|
||||||
metric_query.bind(0, name.as_str())?;
|
|
||||||
let query = session.metadata_session().execute(&metric_query);
|
|
||||||
|
|
||||||
queries.push((orig_name, query));
|
|
||||||
}
|
|
||||||
|
|
||||||
let mut to_delete_queries = vec![];
|
|
||||||
|
|
||||||
for el in queries {
|
|
||||||
let result = el.1.wait()?;
|
|
||||||
scanned_directories_count += 1;
|
|
||||||
if result.row_count() != 0 {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
let mut delete_directory_query = stmt!(delete_directory_query.as_str());
|
|
||||||
delete_directory_query.set_consistency(session.write_consistency())?;
|
|
||||||
delete_directory_query.bind(0, el.0.as_str())?;
|
|
||||||
|
|
||||||
to_delete_queries.push(session.metadata_session().execute(&delete_directory_query));
|
|
||||||
|
|
||||||
deleted_directories_count += 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
for to_delete in to_delete_queries {
|
|
||||||
to_delete.wait()?;
|
|
||||||
}
|
|
||||||
|
|
||||||
if list_result.row_count() != batch_limit {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
println!("Deleted {} metrics, {} directories.", deleted_metrics_count, deleted_directories_count);
|
|
||||||
println!("Scanned {} metrics, {} directories", scanned_metrics_count, scanned_directories_count);
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user