Compare commits
7 Commits
f6b412ca5b
...
master
Author | SHA1 | Date | |
---|---|---|---|
7c78a7f17a | |||
a526f388df | |||
3d316921e1 | |||
640954ab5a | |||
b8a670bbce | |||
9dbf1542b2 | |||
9fb6975188 |
83
README.md
83
README.md
@@ -11,22 +11,35 @@ Don't forget to download & install [cassandra-cpp](https://downloads.datastax.co
|
||||
## Run
|
||||
|
||||
```sh
|
||||
$ cargo build
|
||||
Finished dev [unoptimized + debuginfo] target(s) in 0.04s
|
||||
|
||||
$ cargo run -- --help
|
||||
bgutil-rs
|
||||
|
||||
USAGE:
|
||||
bgutil-rs <SUBCOMMAND>
|
||||
bgutil-rs [FLAGS] [OPTIONS] <SUBCOMMAND>
|
||||
|
||||
FLAGS:
|
||||
--dry-run Do not write in database (local-clean only)
|
||||
-h, --help Prints help information
|
||||
-V, --version Prints version information
|
||||
|
||||
OPTIONS:
|
||||
--contact-metadata <contact-metadata>
|
||||
[env: CASSANDRA_CONTACT_METADATA=localhost]
|
||||
|
||||
--contact-points <contact-points>
|
||||
[env: CASSANDRA_CONTACT_POINTS=localhost]
|
||||
|
||||
|
||||
SUBCOMMANDS:
|
||||
delete Delete metric(s)
|
||||
help Prints this message or the help of the given subcommand(s)
|
||||
info Information about a metric
|
||||
list List metrics with given pattern
|
||||
read Read a metric contents
|
||||
write Write a metric and its value
|
||||
clean Clean outdated metrics & empty directories
|
||||
delete Delete metric(s)
|
||||
help Prints this message or the help of the given subcommand(s)
|
||||
info Information about a metric
|
||||
list List metrics with given pattern
|
||||
local-clean Clean a directory of outdated metrics & empty sub-directories
|
||||
read Read a metric contents
|
||||
stats Stats
|
||||
write Write a metric and its value
|
||||
|
||||
```
|
||||
|
||||
### Info
|
||||
@@ -150,6 +163,46 @@ ARGS:
|
||||
<metric>
|
||||
```
|
||||
|
||||
### Clean
|
||||
|
||||
```sh
|
||||
$ cargo run -- clean --help
|
||||
bgutil-rs-clean
|
||||
Stats
|
||||
|
||||
USAGE:
|
||||
bgutil-rs clean [FLAGS] [OPTIONS]
|
||||
|
||||
FLAGS:
|
||||
--clean-directories
|
||||
--clean-metrics
|
||||
|
||||
OPTIONS:
|
||||
--end-key <end-key>
|
||||
--start-key <start-key>
|
||||
```
|
||||
|
||||
### Local-clean
|
||||
|
||||
Clean outdated metrics in a given directory.
|
||||
|
||||
```sh
|
||||
$ cargo run -- local-clean --help
|
||||
bgutil-rs-local-clean
|
||||
Clean a directory of outdated metrics & empty sub-directories
|
||||
|
||||
USAGE:
|
||||
bgutil-rs local-clean <directory>
|
||||
|
||||
FLAGS:
|
||||
-h, --help Prints help information
|
||||
-V, --version Prints version information
|
||||
|
||||
ARGS:
|
||||
<directory>
|
||||
```
|
||||
|
||||
|
||||
## Todo
|
||||
|
||||
* command: read
|
||||
@@ -160,3 +213,11 @@ ARGS:
|
||||
- Arguments handling
|
||||
* command: delete
|
||||
- with recursive
|
||||
* command: clean
|
||||
- progress bar
|
||||
* ...
|
||||
|
||||
|
||||
## Dedication
|
||||
|
||||
This piece of software was written during the mourning of Jean-Yves Moyart, aka Maître Mô, 21/10/1967-20/02/2021. My thoughts were with him, his family, his friends and all of us, who really appreciated him. Rest in Peace.
|
||||
|
219
src/cassandra.rs
219
src/cassandra.rs
@@ -3,9 +3,9 @@
|
||||
*
|
||||
* Author: Patrick MARIE <pm@mkz.me>
|
||||
*/
|
||||
|
||||
use std::str::FromStr;
|
||||
use std::convert::TryFrom;
|
||||
use std::fmt;
|
||||
use std::error;
|
||||
|
||||
use crate::metric::Metric;
|
||||
use crate::session::Session;
|
||||
@@ -14,31 +14,165 @@ use crate::timerange::TimeRange;
|
||||
|
||||
use cassandra_cpp::Session as CassSession;
|
||||
use cassandra_cpp::Uuid as CassUuid;
|
||||
use cassandra_cpp::{Batch,BatchType,BindRustType,CassCollection,Cluster,Error,LogLevel,Map};
|
||||
use cassandra_cpp::{Batch,BatchType,BindRustType,CassCollection,Cluster,Error,LogLevel,Map,RetryPolicy,Statement};
|
||||
use cassandra_cpp::{set_level,stmt};
|
||||
|
||||
use chrono::Duration;
|
||||
use uuid::Uuid;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
struct NoRecord;
|
||||
|
||||
impl fmt::Display for NoRecord {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
write!(f, "no record suitable")
|
||||
}
|
||||
}
|
||||
|
||||
impl error::Error for NoRecord {}
|
||||
|
||||
pub fn connect(contact_points: &str) -> Result<CassSession, Error> {
|
||||
set_level(LogLevel::DISABLED);
|
||||
|
||||
let mut cluster = Cluster::default();
|
||||
cluster.set_contact_points(contact_points).unwrap();
|
||||
cluster.set_load_balance_round_robin();
|
||||
|
||||
cluster.set_retry_policy(RetryPolicy::downgrading_consistency_new());
|
||||
cluster.set_request_timeout(Duration::seconds(30));
|
||||
cluster.set_protocol_version(4)?;
|
||||
|
||||
cluster.connect()
|
||||
}
|
||||
|
||||
pub fn fetch_metric(session: &Session, metric_name: &str) -> Result<Metric, Error> {
|
||||
pub fn prepare_component_query(table_name: &str, arguments: &Vec<&str>) -> Result<Statement, Error> {
|
||||
let mut q = format!("SELECT parent, name FROM biggraphite_metadata.{} WHERE ", table_name);
|
||||
let mut component_number = 0;
|
||||
let mut components = vec![];
|
||||
|
||||
for (id, component) in arguments.iter().enumerate() {
|
||||
let mut operator = "=";
|
||||
|
||||
if *component == "*" {
|
||||
component_number += 1;
|
||||
continue;
|
||||
}
|
||||
|
||||
if component_number != 0 {
|
||||
q.push_str("AND ");
|
||||
}
|
||||
|
||||
if component.ends_with("*") {
|
||||
operator = "LIKE";
|
||||
}
|
||||
|
||||
q.push_str(format!("component_{} {} ? ", id, operator).as_str());
|
||||
component_number += 1;
|
||||
components.push(component.replace("*", "%"));
|
||||
}
|
||||
|
||||
if component_number != 0 {
|
||||
q.push_str("AND ");
|
||||
}
|
||||
|
||||
// Adding last component for __END__.
|
||||
q.push_str(format!("component_{} = ? ALLOW FILTERING;", component_number).as_str());
|
||||
components.push("__END__".to_string());
|
||||
|
||||
let mut query = stmt!(q.as_str());
|
||||
|
||||
for (id, arg) in components.iter().enumerate() {
|
||||
query.bind(id, arg.as_str())?;
|
||||
}
|
||||
|
||||
Ok(query)
|
||||
}
|
||||
|
||||
pub fn prepare_component_query_globstar(table_name: &str, arguments: &Vec<&str>) -> Result<Vec<Statement>, Error> {
|
||||
let _q = format!("SELECT parent, name FROM biggraphite_metadata.{} WHERE ", table_name);
|
||||
let _component_number = 0;
|
||||
|
||||
let mut out = vec![];
|
||||
|
||||
let pos_globstar = arguments.iter().enumerate().filter(|(_, &x)| x == "**").map(|(id, _)| id).collect::<Vec<usize>>();
|
||||
if pos_globstar.len() != 1 {
|
||||
// XXX return error
|
||||
return Ok(vec![prepare_component_query(table_name, arguments)?]);
|
||||
}
|
||||
|
||||
let pos_globstar = pos_globstar[0];
|
||||
let mut queries = vec![];
|
||||
|
||||
let mut init_args = vec![];
|
||||
let mut end_args = arguments[pos_globstar+1..].to_vec();
|
||||
end_args.push("__END__");
|
||||
|
||||
for (id, el) in arguments[0..pos_globstar].iter().enumerate() {
|
||||
if *el == "*" {
|
||||
continue;
|
||||
}
|
||||
|
||||
if el.ends_with("*") {
|
||||
init_args.push((id, "LIKE", el.replace("*", "%")));
|
||||
} else {
|
||||
init_args.push((id, "=", el.to_string()));
|
||||
}
|
||||
}
|
||||
|
||||
let components = 16;
|
||||
|
||||
for id in init_args.len()..(components-end_args.len()+1) {
|
||||
let mut current_query = init_args.to_vec();
|
||||
|
||||
for (sub_id, el) in end_args.iter().enumerate() {
|
||||
if *el == "*" {
|
||||
continue;
|
||||
}
|
||||
|
||||
if el.ends_with("*") {
|
||||
current_query.push((sub_id + id, "LIKE", el.replace("*", "%")));
|
||||
} else {
|
||||
current_query.push((sub_id + id, "=", el.to_string()));
|
||||
}
|
||||
}
|
||||
|
||||
queries.push(current_query);
|
||||
}
|
||||
|
||||
for query in &queries {
|
||||
let mut current_query = _q.to_string();
|
||||
|
||||
for el in query {
|
||||
if el.0 != 0 {
|
||||
current_query.push_str("AND ");
|
||||
}
|
||||
|
||||
current_query.push_str(&format!("component_{} {} ? ", el.0, el.1));
|
||||
}
|
||||
|
||||
current_query.push_str(&String::from("ALLOW FILTERING;"));
|
||||
|
||||
let mut statement = stmt!(¤t_query);
|
||||
for (id, el) in query.iter().enumerate() {
|
||||
statement.bind(id, el.2.as_str())?;
|
||||
}
|
||||
|
||||
out.push(statement);
|
||||
}
|
||||
|
||||
Ok(out)
|
||||
}
|
||||
|
||||
pub fn fetch_metric(session: &Session, metric_name: &str) -> Result<Metric, Box<dyn error::Error>> {
|
||||
let mut query = stmt!("SELECT * FROM biggraphite_metadata.metrics_metadata WHERE name = ?");
|
||||
query.set_consistency(session.read_consistency())?;
|
||||
query.bind(0, metric_name)?;
|
||||
|
||||
// XXX set consistency
|
||||
// query.set_consistency(session.read_consistency());
|
||||
let result = session.metadata_session().execute(&query).wait()?;
|
||||
|
||||
if result.row_count() == 0 {
|
||||
return Err(NoRecord.into());
|
||||
}
|
||||
|
||||
let result = session.metadata_session().execute(&query).wait()?;
|
||||
Ok(result.first_row().unwrap().into())
|
||||
}
|
||||
|
||||
@@ -195,62 +329,33 @@ pub fn create_metric(session: &Session, metric: &str) -> Result<(), Error> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn metric_delete(session: &Session, metric_name: &str) -> Result<(), Error> {
|
||||
let mut query = stmt!("SELECT * FROM biggraphite_metadata.metrics_metadata WHERE name = ?");
|
||||
query.bind(0, metric_name)?;
|
||||
pub fn delete_metric(session: &Session, name: &str) -> Result<(), Error> {
|
||||
let namespace = "biggraphite_metadata";
|
||||
let delete_metric_query = format!("DELETE FROM {}.metrics WHERE name = ?;", namespace);
|
||||
let delete_metadata_query = format!("DELETE FROM {}.metrics_metadata WHERE name = ?;", namespace);
|
||||
|
||||
let result = session.metadata_session().execute(&query).wait()?;
|
||||
if result.row_count() == 0 {
|
||||
println!("Metric is not existing");
|
||||
return Ok(());
|
||||
}
|
||||
let mut delete_metric_query = stmt!(delete_metric_query.as_str());
|
||||
delete_metric_query.set_consistency(session.write_consistency())?;
|
||||
delete_metric_query.bind(0, name)?;
|
||||
session.metadata_session().execute(&delete_metric_query).wait()?;
|
||||
|
||||
let _metric = fetch_metric(session, metric_name)?;
|
||||
|
||||
let mut query = stmt!("DELETE FROM biggraphite_metadata.metrics_metadata WHERE name = ?;");
|
||||
query.bind(0, metric_name)?;
|
||||
query.set_consistency(session.write_consistency())?;
|
||||
session.metadata_session().execute(&query).wait()?;
|
||||
|
||||
let mut query = stmt!("DELETE FROM biggraphite_metadata.metrics_metadata WHERE name = ?;");
|
||||
query.bind(0, metric_name)?;
|
||||
query.set_consistency(session.write_consistency())?;
|
||||
session.metadata_session().execute(&query).wait()?;
|
||||
|
||||
let mut query = stmt!("DELETE FROM biggraphite_metadata.directories WHERE name = ?;");
|
||||
query.bind(0, metric_name)?;
|
||||
query.set_consistency(session.write_consistency())?;
|
||||
session.metadata_session().execute(&query).wait()?;
|
||||
let mut delete_metadata_query = stmt!(delete_metadata_query.as_str());
|
||||
delete_metric_query.set_consistency(session.write_consistency())?;
|
||||
delete_metadata_query.bind(0, name)?;
|
||||
session.metadata_session().execute(&delete_metric_query).wait()?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn metric_write(session: &Session, metric_name: &str, value: f64, retention: &str, timestamp: i64) -> Result<(), Error> {
|
||||
let mut query = stmt!("SELECT * FROM biggraphite_metadata.metrics_metadata WHERE name = ?");
|
||||
query.bind(0, metric_name)?;
|
||||
|
||||
let result = session.metadata_session().execute(&query).wait()?;
|
||||
if result.row_count() == 0 {
|
||||
create_metric(session, metric_name)?;
|
||||
}
|
||||
pub fn delete_directory(session: &Session, name: &str) -> Result<(), Error> {
|
||||
let namespace = "biggraphite_metadata";
|
||||
let delete_directory_query = format!("DELETE FROM {}.directories WHERE name = ?;", namespace);
|
||||
|
||||
let stage = Stage::try_from(retention)?;
|
||||
|
||||
let metric = fetch_metric(session, metric_name)?;
|
||||
let (time_start_ms, offset) = stage.time_offset_ms(timestamp);
|
||||
|
||||
let query = format!(
|
||||
"INSERT INTO biggraphite.{} (metric, time_start_ms, offset, value) VALUES (?, ?, ?, ?);",
|
||||
stage.table_name()
|
||||
);
|
||||
|
||||
let mut query = stmt!(&query);
|
||||
query.bind(0, CassUuid::from_str(metric.id().as_str())?)?;
|
||||
query.bind(1, time_start_ms)?;
|
||||
query.bind(2, offset as i16)?;
|
||||
query.bind(3, value)?;
|
||||
|
||||
session.points_session().execute(&query).wait()?;
|
||||
let mut delete_directory_query = stmt!(delete_directory_query.as_str());
|
||||
delete_directory_query.set_consistency(session.write_consistency())?;
|
||||
delete_directory_query.bind(0, name)?;
|
||||
session.metadata_session().execute(&delete_directory_query).wait()?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
12
src/cmd.rs
Normal file
12
src/cmd.rs
Normal file
@@ -0,0 +1,12 @@
|
||||
/*
|
||||
* bgutil-rs
|
||||
*
|
||||
* Author: Patrick MARIE <pm@mkz.me>
|
||||
*/
|
||||
pub mod clean;
|
||||
pub mod delete;
|
||||
pub mod info;
|
||||
pub mod list;
|
||||
pub mod local_clean;
|
||||
pub mod stats;
|
||||
pub mod write;
|
148
src/cmd/clean.rs
Normal file
148
src/cmd/clean.rs
Normal file
@@ -0,0 +1,148 @@
|
||||
/*
|
||||
* bgutil-rs
|
||||
*
|
||||
* Author: Patrick MARIE <pm@mkz.me>
|
||||
*/
|
||||
use crate::Session;
|
||||
|
||||
use cassandra_cpp::{BindRustType,Error};
|
||||
use cassandra_cpp::stmt;
|
||||
|
||||
use chrono::Utc;
|
||||
|
||||
pub fn metrics_clean(session: &Session, start_key: i64, end_key: i64, clean_metrics: bool, clean_directories: bool) -> Result<(), Error> {
|
||||
let mut current_token = start_key;
|
||||
let cutoff : u64 = (Utc::now().timestamp() as u64 - 86400 * 14) * 1000;
|
||||
|
||||
let namespace = "biggraphite_metadata";
|
||||
let batch_limit = 1000;
|
||||
|
||||
let query = format!("SELECT name, token(name) FROM {}.metrics_metadata \
|
||||
WHERE updated_on <= maxTimeuuid({}) and token(name) > ? and token(name) < ? LIMIT {};",
|
||||
namespace, cutoff, batch_limit);
|
||||
let delete_metric_query = format!("DELETE FROM {}.metrics WHERE name = ?;", namespace);
|
||||
let delete_metadata_query = format!("DELETE FROM {}.metrics_metadata WHERE name = ?;", namespace);
|
||||
|
||||
let mut deleted_metrics_count = 0;
|
||||
let mut scanned_metrics_count = 0;
|
||||
let mut scanned_directories_count = 0;
|
||||
let mut deleted_directories_count = 0;
|
||||
|
||||
// clean metrics
|
||||
loop {
|
||||
if !clean_metrics || current_token >= end_key {
|
||||
// println!("Stopping: {} >= {}", current_token, end_key);
|
||||
break;
|
||||
}
|
||||
|
||||
let mut outdated_metrics_query = stmt!(query.as_str());
|
||||
outdated_metrics_query.set_consistency(session.read_consistency())?;
|
||||
outdated_metrics_query.bind(0, current_token)?;
|
||||
outdated_metrics_query.bind(1, end_key)?;
|
||||
|
||||
let result = session.metadata_session().execute(&outdated_metrics_query).wait()?;
|
||||
if result.row_count() == 0 {
|
||||
break;
|
||||
}
|
||||
let mut queries = vec![];
|
||||
|
||||
for row in result.iter() {
|
||||
let name = row.get_column_by_name("name".to_string())?.to_string();
|
||||
scanned_metrics_count += 1;
|
||||
let mut delete_metric_query = stmt!(delete_metric_query.as_str());
|
||||
delete_metric_query.set_consistency(session.write_consistency())?;
|
||||
delete_metric_query.bind(0, name.as_str())?;
|
||||
queries.push(session.metadata_session().execute(&delete_metric_query));
|
||||
|
||||
let mut delete_metadata_query = stmt!(delete_metadata_query.as_str());
|
||||
delete_metric_query.set_consistency(session.write_consistency())?;
|
||||
delete_metadata_query.bind(0, name.as_str())?;
|
||||
queries.push(session.metadata_session().execute(&delete_metadata_query));
|
||||
|
||||
deleted_metrics_count += 1;
|
||||
current_token = row.get_column(1)?.get_i64()?;
|
||||
}
|
||||
|
||||
if result.row_count() != batch_limit {
|
||||
// println!("Stopping because count == 0");
|
||||
break;
|
||||
}
|
||||
|
||||
for query in queries {
|
||||
if let Err(err) = query.wait() {
|
||||
eprintln!("Failed: {:?}", err);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let list_directories_query = format!("SELECT name, token(name) FROM {}.directories WHERE token(name) > ? AND token(name) < ? LIMIT {};",
|
||||
namespace, batch_limit);
|
||||
let metric_query = format!("SELECT name FROM {}.metrics WHERE parent LIKE ? LIMIT 1", namespace);
|
||||
let delete_directory_query = format!("DELETE FROM {}.directories WHERE name = ?;", namespace);
|
||||
|
||||
current_token = start_key;
|
||||
|
||||
// clean directories
|
||||
loop {
|
||||
if !clean_directories || current_token >= end_key {
|
||||
break;
|
||||
}
|
||||
|
||||
let mut list_directories_query = stmt!(list_directories_query.as_str());
|
||||
list_directories_query.set_consistency(session.read_consistency())?;
|
||||
list_directories_query.bind(0, current_token)?;
|
||||
list_directories_query.bind(1, end_key)?;
|
||||
|
||||
let list_result = session.metadata_session().execute(&list_directories_query).wait()?;
|
||||
if list_result.row_count() == 0 {
|
||||
break;
|
||||
}
|
||||
|
||||
let mut queries = vec![];
|
||||
|
||||
for row in list_result.iter() {
|
||||
let mut name = row.get_column_by_name("name".to_string())?.to_string();
|
||||
let orig_name = name.clone();
|
||||
name.push_str(".%");
|
||||
current_token = row.get_column(1)?.get_i64()?;
|
||||
|
||||
let mut metric_query = stmt!(metric_query.as_str());
|
||||
metric_query.set_consistency(session.read_consistency())?;
|
||||
metric_query.bind(0, name.as_str())?;
|
||||
let query = session.metadata_session().execute(&metric_query);
|
||||
|
||||
queries.push((orig_name, query));
|
||||
}
|
||||
|
||||
let mut to_delete_queries = vec![];
|
||||
|
||||
for el in queries {
|
||||
let result = el.1.wait()?;
|
||||
scanned_directories_count += 1;
|
||||
if result.row_count() != 0 {
|
||||
continue;
|
||||
}
|
||||
|
||||
let mut delete_directory_query = stmt!(delete_directory_query.as_str());
|
||||
delete_directory_query.set_consistency(session.write_consistency())?;
|
||||
delete_directory_query.bind(0, el.0.as_str())?;
|
||||
|
||||
to_delete_queries.push(session.metadata_session().execute(&delete_directory_query));
|
||||
|
||||
deleted_directories_count += 1;
|
||||
}
|
||||
|
||||
for to_delete in to_delete_queries {
|
||||
to_delete.wait()?;
|
||||
}
|
||||
|
||||
if list_result.row_count() != batch_limit {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
println!("Deleted {} metrics, {} directories.", deleted_metrics_count, deleted_directories_count);
|
||||
println!("Scanned {} metrics, {} directories", scanned_metrics_count, scanned_directories_count);
|
||||
|
||||
Ok(())
|
||||
}
|
42
src/cmd/delete.rs
Normal file
42
src/cmd/delete.rs
Normal file
@@ -0,0 +1,42 @@
|
||||
/*
|
||||
* bgutil-rs
|
||||
*
|
||||
* Author: Patrick MARIE <pm@mkz.me>
|
||||
*/
|
||||
use std::error;
|
||||
|
||||
use cassandra_cpp::stmt;
|
||||
use cassandra_cpp::BindRustType;
|
||||
use crate::fetch_metric;
|
||||
|
||||
use crate::Session;
|
||||
|
||||
pub fn metric_delete(session: &Session, metric_name: &str) -> Result<(), Box<dyn error::Error>> {
|
||||
let mut query = stmt!("SELECT * FROM biggraphite_metadata.metrics_metadata WHERE name = ?");
|
||||
query.bind(0, metric_name)?;
|
||||
|
||||
let result = session.metadata_session().execute(&query).wait()?;
|
||||
if result.row_count() == 0 {
|
||||
println!("Metric is not existing");
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let _metric = fetch_metric(session, metric_name)?;
|
||||
|
||||
let mut query = stmt!("DELETE FROM biggraphite_metadata.metrics_metadata WHERE name = ?;");
|
||||
query.bind(0, metric_name)?;
|
||||
query.set_consistency(session.write_consistency())?;
|
||||
session.metadata_session().execute(&query).wait()?;
|
||||
|
||||
let mut query = stmt!("DELETE FROM biggraphite_metadata.metrics_metadata WHERE name = ?;");
|
||||
query.bind(0, metric_name)?;
|
||||
query.set_consistency(session.write_consistency())?;
|
||||
session.metadata_session().execute(&query).wait()?;
|
||||
|
||||
let mut query = stmt!("DELETE FROM biggraphite_metadata.directories WHERE name = ?;");
|
||||
query.bind(0, metric_name)?;
|
||||
query.set_consistency(session.write_consistency())?;
|
||||
session.metadata_session().execute(&query).wait()?;
|
||||
|
||||
Ok(())
|
||||
}
|
17
src/cmd/info.rs
Normal file
17
src/cmd/info.rs
Normal file
@@ -0,0 +1,17 @@
|
||||
/*
|
||||
* bgutil-rs
|
||||
*
|
||||
* Author: Patrick MARIE <pm@mkz.me>
|
||||
*/
|
||||
use std::error;
|
||||
|
||||
use crate::Session;
|
||||
use crate::fetch_metric;
|
||||
|
||||
pub fn metric_info(session: &Session, metric_name: &str) -> Result<(), Box<dyn error::Error>> {
|
||||
let metric = fetch_metric(session, metric_name)?;
|
||||
|
||||
println!("{}", metric);
|
||||
|
||||
Ok(())
|
||||
}
|
62
src/cmd/list.rs
Normal file
62
src/cmd/list.rs
Normal file
@@ -0,0 +1,62 @@
|
||||
/*
|
||||
* bgutil-rs
|
||||
*
|
||||
* Author: Patrick MARIE <pm@mkz.me>
|
||||
*/
|
||||
|
||||
use crate::prepare_component_query_globstar;
|
||||
use crate::fetch_metrics;
|
||||
use crate::Session;
|
||||
use cassandra_cpp::{Error};
|
||||
|
||||
pub fn metric_list(session: &Session, glob: &str) -> Result<(), Error> {
|
||||
let components = glob.split(".").collect::<Vec<&str>>();
|
||||
|
||||
let query_directories = prepare_component_query_globstar("directories", &components)?;
|
||||
let mut results = vec![];
|
||||
|
||||
for mut q in query_directories {
|
||||
q.set_consistency(session.read_consistency())?;
|
||||
results.push(session.metadata_session().execute(&q));
|
||||
}
|
||||
|
||||
for result in results {
|
||||
let rows = result.wait()?;
|
||||
for row in rows.iter() {
|
||||
let name = row.get_column_by_name("name".to_string()).unwrap().to_string();
|
||||
println!("d {}", name);
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
let query_metrics = prepare_component_query_globstar("metrics", &components)?;
|
||||
let mut results = vec![];
|
||||
|
||||
for mut q in query_metrics {
|
||||
q.set_consistency(session.read_consistency())?;
|
||||
results.push(session.metadata_session().execute(&q));
|
||||
}
|
||||
|
||||
for result in results {
|
||||
let res = result.wait();
|
||||
if let Err(err) = res {
|
||||
eprintln!("Query failed: {}", err);
|
||||
continue;
|
||||
}
|
||||
|
||||
let rows = res.unwrap();
|
||||
let names = rows
|
||||
.iter()
|
||||
.map(|x| {
|
||||
x.get_column_by_name("name".to_string()).unwrap().to_string()
|
||||
})
|
||||
.collect::<Vec<String>>();
|
||||
|
||||
let metrics = fetch_metrics(session, &names)?;
|
||||
for metric in metrics {
|
||||
println!("m {}", metric);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
135
src/cmd/local_clean.rs
Normal file
135
src/cmd/local_clean.rs
Normal file
@@ -0,0 +1,135 @@
|
||||
/*
|
||||
* bgutil-rs
|
||||
*
|
||||
* Author: Patrick MARIE <pm@mkz.me>
|
||||
*/
|
||||
use std::error;
|
||||
|
||||
use crate::Session;
|
||||
|
||||
use crate::delete_directory;
|
||||
use crate::delete_metric;
|
||||
use crate::fetch_metric;
|
||||
use crate::prepare_component_query_globstar;
|
||||
|
||||
use cassandra_cpp::stmt;
|
||||
use cassandra_cpp::BindRustType;
|
||||
|
||||
use chrono::Utc;
|
||||
|
||||
fn clean_metrics_in_directory(session: &Session, directory: &str) -> Result<(), Box<dyn error::Error>> {
|
||||
// println!("Cleaning metrics in directory: '{}'", directory);
|
||||
|
||||
let mut directory = String::from(directory);
|
||||
directory.push_str(".**");
|
||||
|
||||
let components = directory.split(".").collect::<Vec<&str>>();
|
||||
let mut results = vec![];
|
||||
|
||||
let query_metrics = prepare_component_query_globstar("metrics", &components)?;
|
||||
let outdated_ts : u64 = (Utc::now().timestamp() as u64 - 14 * 86400) * 1000;
|
||||
|
||||
for mut q in query_metrics {
|
||||
q.set_consistency(session.read_consistency())?;
|
||||
results.push(session.metadata_session().execute(&q));
|
||||
}
|
||||
|
||||
for result in results {
|
||||
let rows = result.wait()?;
|
||||
for row in rows.iter() {
|
||||
let name = row.get_column_by_name("name".to_string()).unwrap().to_string();
|
||||
let metric = fetch_metric(session, &name);
|
||||
|
||||
if let Err(e) = metric {
|
||||
eprintln!("Error while retrieving metric: {}", e);
|
||||
continue;
|
||||
}
|
||||
|
||||
let metric = metric.unwrap();
|
||||
|
||||
if metric.updated_on() > outdated_ts {
|
||||
continue;
|
||||
}
|
||||
|
||||
println!("Deleting metric {}", metric.name());
|
||||
if session.is_dry_run() {
|
||||
continue;
|
||||
}
|
||||
delete_metric(session, metric.name())?;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn directory_has_metrics(session: &Session, directory: &str) -> Result<bool, Box<dyn error::Error>> {
|
||||
let query = format!("SELECT name FROM biggraphite_metadata.metrics WHERE parent LIKE ? LIMIT 1;");
|
||||
let mut query = stmt!(query.as_str());
|
||||
let mut directory = String::from(directory);
|
||||
directory.push_str(".%");
|
||||
query.bind(0, directory.as_str())?;
|
||||
|
||||
let result = session.metadata_session().execute(&query).wait()?;
|
||||
|
||||
Ok(result.row_count() != 0)
|
||||
}
|
||||
|
||||
fn clean_empty_directories_in_directory(session: &Session, directory: &str) -> Result<(), Box<dyn error::Error>> {
|
||||
// println!("Cleaning empty directories in directory '{}'", directory);
|
||||
|
||||
let mut directory = String::from(directory);
|
||||
directory.push_str(".**");
|
||||
|
||||
let components = directory.split(".").collect::<Vec<&str>>();
|
||||
let mut results = vec![];
|
||||
|
||||
let query_directories = prepare_component_query_globstar("directories", &components)?;
|
||||
|
||||
for mut q in query_directories {
|
||||
q.set_consistency(session.read_consistency())?;
|
||||
results.push(session.metadata_session().execute(&q));
|
||||
}
|
||||
|
||||
for result in results {
|
||||
let rows = result.wait()?;
|
||||
for row in rows.iter() {
|
||||
let name = row.get_column_by_name("name".to_string()).unwrap().to_string();
|
||||
|
||||
if directory_has_metrics(session, &name)? {
|
||||
continue;
|
||||
}
|
||||
|
||||
println!("Deleting directory {}", name);
|
||||
if session.is_dry_run() {
|
||||
continue;
|
||||
}
|
||||
delete_directory(session, &name)?;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn metrics_local_clean(session: &Session, directory: &str) -> Result<(), Box<dyn error::Error>> {
|
||||
let components = directory.split(".").collect::<Vec<&str>>();
|
||||
|
||||
let query_directories = prepare_component_query_globstar("directories", &components)?;
|
||||
let mut results = vec![];
|
||||
|
||||
for mut q in query_directories {
|
||||
q.set_consistency(session.read_consistency())?;
|
||||
results.push(session.metadata_session().execute(&q));
|
||||
}
|
||||
|
||||
for result in results {
|
||||
let rows = result.wait()?;
|
||||
for row in rows.iter() {
|
||||
let name = row.get_column_by_name("name".to_string()).unwrap().to_string();
|
||||
|
||||
clean_metrics_in_directory(session, &name)?;
|
||||
clean_empty_directories_in_directory(session, &name)?;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
70
src/cmd/stats.rs
Normal file
70
src/cmd/stats.rs
Normal file
@@ -0,0 +1,70 @@
|
||||
/*
|
||||
* bgutil-rs
|
||||
*
|
||||
* Author: Patrick MARIE <pm@mkz.me>
|
||||
*/
|
||||
use cassandra_cpp::{BindRustType,Error};
|
||||
use cassandra_cpp::stmt;
|
||||
|
||||
use std::collections::HashMap;
|
||||
|
||||
use crate::Metric;
|
||||
use crate::Session;
|
||||
|
||||
pub fn metric_stats(session: &Session, start_key: i64, end_key: i64) -> Result<(), Error> {
|
||||
let q =
|
||||
"SELECT id, name, token(name), config, created_on, updated_on, read_on \
|
||||
FROM biggraphite_metadata.metrics_metadata WHERE token(name) > ? LIMIT 1000";
|
||||
|
||||
let mut current_token = start_key;
|
||||
let mut n = 0;
|
||||
let mut points : u64 = 0;
|
||||
|
||||
let mut stats : HashMap<String, usize> = HashMap::new();
|
||||
|
||||
while current_token < end_key {
|
||||
let mut query = stmt!(q);
|
||||
query.bind(0, current_token)?;
|
||||
|
||||
let results = session.metadata_session().execute(&query).wait()?;
|
||||
|
||||
for row in results.iter() {
|
||||
current_token = row.get_column(2)?.get_i64()?;
|
||||
|
||||
let metric : Metric = row.into();
|
||||
let stages = match metric.stages() {
|
||||
Ok(stages) => stages,
|
||||
Err(_) => continue,
|
||||
};
|
||||
|
||||
for stage in stages {
|
||||
points += stage.points() as u64;
|
||||
}
|
||||
|
||||
let parts = metric.name().split(".").collect::<Vec<&str>>();
|
||||
*stats.entry(String::from(parts[0])).or_insert(0) += 1;
|
||||
|
||||
n += 1;
|
||||
}
|
||||
}
|
||||
|
||||
let p : f64 = ((current_token - start_key) / std::i64::MAX) as f64;
|
||||
|
||||
println!("Range: {} -> {} ({:.4}%)", start_key, current_token, 100. * p);
|
||||
println!("{} metrics", n);
|
||||
println!("{} points", points);
|
||||
println!("-----");
|
||||
|
||||
let mut vec : Vec<(&String, &usize)> = stats.iter().collect();
|
||||
vec.sort_by(|a, b| b.1.cmp(a.1));
|
||||
|
||||
for (id, v) in vec.iter().enumerate() {
|
||||
println!("{} {}", v.0, v.1);
|
||||
|
||||
if id == 10 {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
49
src/cmd/write.rs
Normal file
49
src/cmd/write.rs
Normal file
@@ -0,0 +1,49 @@
|
||||
/*
|
||||
* bgutil-rs
|
||||
*
|
||||
* Author: Patrick MARIE <pm@mkz.me>
|
||||
*/
|
||||
use std::error;
|
||||
use std::str::FromStr;
|
||||
|
||||
use crate::Session;
|
||||
use crate::Stage;
|
||||
|
||||
use crate::create_metric;
|
||||
use crate::fetch_metric;
|
||||
|
||||
use cassandra_cpp::stmt;
|
||||
use cassandra_cpp::BindRustType;
|
||||
use cassandra_cpp::Uuid as CassUuid;
|
||||
|
||||
use std::convert::TryFrom;
|
||||
|
||||
pub fn metric_write(session: &Session, metric_name: &str, value: f64, retention: &str, timestamp: i64) -> Result<(), Box<dyn error::Error>> {
|
||||
let mut query = stmt!("SELECT * FROM biggraphite_metadata.metrics_metadata WHERE name = ?");
|
||||
query.bind(0, metric_name)?;
|
||||
|
||||
let result = session.metadata_session().execute(&query).wait()?;
|
||||
if result.row_count() == 0 {
|
||||
create_metric(session, metric_name)?;
|
||||
}
|
||||
|
||||
let stage = Stage::try_from(retention)?;
|
||||
|
||||
let metric = fetch_metric(session, metric_name)?;
|
||||
let (time_start_ms, offset) = stage.time_offset_ms(timestamp);
|
||||
|
||||
let query = format!(
|
||||
"INSERT INTO biggraphite.{} (metric, time_start_ms, offset, value) VALUES (?, ?, ?, ?);",
|
||||
stage.table_name()
|
||||
);
|
||||
|
||||
let mut query = stmt!(&query);
|
||||
query.bind(0, CassUuid::from_str(metric.id().as_str())?)?;
|
||||
query.bind(1, time_start_ms)?;
|
||||
query.bind(2, offset as i16)?;
|
||||
query.bind(3, value)?;
|
||||
|
||||
session.points_session().execute(&query).wait()?;
|
||||
|
||||
Ok(())
|
||||
}
|
333
src/main.rs
333
src/main.rs
@@ -3,13 +3,10 @@
|
||||
*
|
||||
* Author: Patrick MARIE <pm@mkz.me>
|
||||
*/
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::convert::TryFrom;
|
||||
use std::error;
|
||||
|
||||
use cassandra_cpp::{BindRustType,CassResult,Error,Statement};
|
||||
use cassandra_cpp::{stmt};
|
||||
use cassandra_cpp::CassResult;
|
||||
use chrono::Utc;
|
||||
use clap::{App,AppSettings,Arg,SubCommand};
|
||||
|
||||
@@ -18,12 +15,20 @@ mod metric;
|
||||
mod session;
|
||||
mod stage;
|
||||
mod timerange;
|
||||
mod cmd;
|
||||
|
||||
use crate::cassandra::*;
|
||||
use crate::session::Session;
|
||||
use crate::stage::*;
|
||||
use crate::stage::Stage;
|
||||
use crate::metric::Metric;
|
||||
|
||||
use crate::cmd::clean::*;
|
||||
use crate::cmd::delete::*;
|
||||
use crate::cmd::info::*;
|
||||
use crate::cmd::list::*;
|
||||
use crate::cmd::local_clean::*;
|
||||
use crate::cmd::stats::*;
|
||||
use crate::cmd::write::*;
|
||||
|
||||
#[allow(dead_code)]
|
||||
fn describe_result(result: &CassResult) {
|
||||
@@ -38,181 +43,20 @@ fn describe_result(result: &CassResult) {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn metric_info(session: &Session, metric_name: &str) -> Result<(), Error> {
|
||||
let metric = fetch_metric(session, metric_name)?;
|
||||
|
||||
println!("{}", metric);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn prepare_component_query(table_name: &str, arguments: &Vec<&str>) -> Result<Statement, Error> {
|
||||
let mut q = format!("SELECT parent, name FROM biggraphite_metadata.{} WHERE ", table_name);
|
||||
let mut component_number = 0;
|
||||
let mut components = vec![];
|
||||
|
||||
for (id, component) in arguments.iter().enumerate() {
|
||||
let mut operator = "=";
|
||||
|
||||
if *component == "*" {
|
||||
component_number += 1;
|
||||
continue;
|
||||
}
|
||||
|
||||
if component_number != 0 {
|
||||
q.push_str("AND ");
|
||||
}
|
||||
|
||||
if component.ends_with("*") {
|
||||
operator = "LIKE";
|
||||
}
|
||||
|
||||
q.push_str(format!("component_{} {} ? ", id, operator).as_str());
|
||||
component_number += 1;
|
||||
components.push(component.replace("*", "%"));
|
||||
}
|
||||
|
||||
if component_number != 0 {
|
||||
q.push_str("AND ");
|
||||
}
|
||||
|
||||
// Adding last component for __END__.
|
||||
q.push_str(format!("component_{} = ? ALLOW FILTERING;", component_number).as_str());
|
||||
components.push("__END__".to_string());
|
||||
|
||||
let mut query = stmt!(q.as_str());
|
||||
|
||||
for (id, arg) in components.iter().enumerate() {
|
||||
query.bind(id, arg.as_str())?;
|
||||
}
|
||||
|
||||
Ok(query)
|
||||
}
|
||||
|
||||
fn prepare_component_query_globstar(table_name: &str, arguments: &Vec<&str>) -> Result<Vec<Statement>, Error> {
|
||||
let _q = format!("SELECT parent, name FROM biggraphite_metadata.{} WHERE ", table_name);
|
||||
let _component_number = 0;
|
||||
|
||||
let mut out = vec![];
|
||||
|
||||
let pos_globstar = arguments.iter().enumerate().filter(|(_, &x)| x == "**").map(|(id, _)| id).collect::<Vec<usize>>();
|
||||
if pos_globstar.len() != 1 {
|
||||
// XXX return error
|
||||
return Ok(vec![prepare_component_query(table_name, arguments)?]);
|
||||
}
|
||||
|
||||
let pos_globstar = pos_globstar[0];
|
||||
let mut queries = vec![];
|
||||
|
||||
let mut init_args = vec![];
|
||||
let mut end_args = arguments[pos_globstar+1..].to_vec();
|
||||
end_args.push("__END__");
|
||||
|
||||
for (id, el) in arguments[0..pos_globstar].iter().enumerate() {
|
||||
if *el == "*" {
|
||||
continue;
|
||||
}
|
||||
|
||||
if el.ends_with("*") {
|
||||
init_args.push((id, "LIKE", el.replace("*", "%")));
|
||||
} else {
|
||||
init_args.push((id, "=", el.to_string()));
|
||||
}
|
||||
}
|
||||
|
||||
let components = 16;
|
||||
|
||||
for id in init_args.len()..(components-end_args.len()+1) {
|
||||
let mut current_query = init_args.to_vec();
|
||||
|
||||
for (sub_id, el) in end_args.iter().enumerate() {
|
||||
if *el == "*" {
|
||||
continue;
|
||||
}
|
||||
|
||||
if el.ends_with("*") {
|
||||
current_query.push((sub_id + id, "LIKE", el.replace("*", "%")));
|
||||
} else {
|
||||
current_query.push((sub_id + id, "=", el.to_string()));
|
||||
}
|
||||
}
|
||||
|
||||
queries.push(current_query);
|
||||
}
|
||||
|
||||
for query in &queries {
|
||||
let mut current_query = _q.to_string();
|
||||
|
||||
for el in query {
|
||||
if el.0 != 0 {
|
||||
current_query.push_str("AND ");
|
||||
}
|
||||
|
||||
current_query.push_str(&format!("component_{} {} ? ", el.0, el.1));
|
||||
}
|
||||
|
||||
current_query.push_str(&String::from("ALLOW FILTERING;"));
|
||||
|
||||
let mut statement = stmt!(¤t_query);
|
||||
for (id, el) in query.iter().enumerate() {
|
||||
statement.bind(id, el.2.as_str())?;
|
||||
}
|
||||
|
||||
out.push(statement);
|
||||
}
|
||||
|
||||
Ok(out)
|
||||
}
|
||||
|
||||
fn metric_list(session: &Session, glob: &str) -> Result<(), Error> {
|
||||
let components = glob.split(".").collect::<Vec<&str>>();
|
||||
|
||||
let query_directories = prepare_component_query_globstar("directories", &components)?;
|
||||
let mut results = vec![];
|
||||
|
||||
for mut q in query_directories {
|
||||
q.set_consistency(session.read_consistency())?;
|
||||
results.push(session.metadata_session().execute(&q));
|
||||
}
|
||||
|
||||
for result in results {
|
||||
let rows = result.wait()?;
|
||||
for row in rows.iter() {
|
||||
let name = row.get_column_by_name("name".to_string()).unwrap().to_string();
|
||||
println!("d {}", name);
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
let query_metrics = prepare_component_query_globstar("metrics", &components)?;
|
||||
let mut results = vec![];
|
||||
|
||||
for mut q in query_metrics {
|
||||
q.set_consistency(session.read_consistency())?;
|
||||
results.push(session.metadata_session().execute(&q));
|
||||
}
|
||||
|
||||
for result in results {
|
||||
let rows = result.wait()?;
|
||||
let names = rows
|
||||
.iter()
|
||||
.map(|x| {
|
||||
x.get_column_by_name("name".to_string()).unwrap().to_string()
|
||||
})
|
||||
.collect::<Vec<String>>();
|
||||
|
||||
let metrics = fetch_metrics(session, &names)?;
|
||||
for metric in metrics {
|
||||
println!("m {}", metric);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn main() -> Result<(), Box<dyn error::Error>> {
|
||||
let matches = App::new("bgutil-rs")
|
||||
.setting(AppSettings::SubcommandRequired)
|
||||
.arg(Arg::with_name("contact-metadata")
|
||||
.long("contact-metadata")
|
||||
.env("CASSANDRA_CONTACT_METADATA")
|
||||
.takes_value(true))
|
||||
.arg(Arg::with_name("contact-points")
|
||||
.long("contact-points")
|
||||
.env("CASSANDRA_CONTACT_POINTS")
|
||||
.takes_value(true))
|
||||
.arg(Arg::with_name("dry-run")
|
||||
.help("Do not write in database (local-clean only)")
|
||||
.long("dry-run"))
|
||||
.subcommand(SubCommand::with_name("info")
|
||||
.about("Information about a metric")
|
||||
.arg(Arg::with_name("metric")
|
||||
@@ -269,12 +113,39 @@ fn main() -> Result<(), Box<dyn error::Error>> {
|
||||
.arg(Arg::with_name("end-key")
|
||||
.long("end-key")
|
||||
.takes_value(true)))
|
||||
.subcommand(SubCommand::with_name("clean")
|
||||
.about("Clean outdated metrics & empty directories")
|
||||
.arg(Arg::with_name("start-key")
|
||||
.long("start-key")
|
||||
.takes_value(true))
|
||||
.arg(Arg::with_name("end-key")
|
||||
.long("end-key")
|
||||
.takes_value(true))
|
||||
.arg(Arg::with_name("clean-metrics")
|
||||
.long("clean-metrics"))
|
||||
.arg(Arg::with_name("clean-directories")
|
||||
.long("clean-directories")))
|
||||
.subcommand(SubCommand::with_name("local-clean")
|
||||
.about("Clean a directory of outdated metrics & empty sub-directories")
|
||||
.arg(Arg::with_name("directory")
|
||||
.index(1)
|
||||
.required(true)))
|
||||
.get_matches();
|
||||
|
||||
let contact_points_metadata = "tag--cstars07--cassandra-cstars07.query.consul.preprod.crto.in";
|
||||
let contact_points_data = "tag--cstars04--cassandra-cstars04.query.consul.preprod.crto.in";
|
||||
let mut contact_points_metadata = "localhost";
|
||||
if matches.is_present("contact-metadata") {
|
||||
contact_points_metadata = matches.value_of("contact-metadata").unwrap();
|
||||
}
|
||||
|
||||
let session = Session::new(&contact_points_metadata, &contact_points_data)?;
|
||||
let mut contact_points_data = "localhost";
|
||||
if matches.is_present("contact-points") {
|
||||
contact_points_data = matches.value_of("contact-points").unwrap();
|
||||
}
|
||||
|
||||
let dry_run = matches.is_present("dry-run");
|
||||
|
||||
let mut session = Session::new(&contact_points_metadata, &contact_points_data)?;
|
||||
session.set_dry_run(dry_run);
|
||||
|
||||
match matches.subcommand_name() {
|
||||
Some("info") => {
|
||||
@@ -359,8 +230,8 @@ fn main() -> Result<(), Box<dyn error::Error>> {
|
||||
},
|
||||
Some("stats") => {
|
||||
let matches = matches.subcommand_matches("stats").unwrap();
|
||||
let start_key = matches.value_of("start-key"); // 0
|
||||
let end_key = matches.value_of("end-key"); // 100000000000000
|
||||
let start_key = matches.value_of("start-key");
|
||||
let end_key = matches.value_of("end-key");
|
||||
|
||||
let start_key = match start_key {
|
||||
None => 0,
|
||||
@@ -386,6 +257,45 @@ fn main() -> Result<(), Box<dyn error::Error>> {
|
||||
|
||||
metric_stats(&session, start_key, end_key)?;
|
||||
},
|
||||
Some("clean") => {
|
||||
let matches = matches.subcommand_matches("clean").unwrap();
|
||||
|
||||
let start_key = matches.value_of("start-key");
|
||||
let end_key = matches.value_of("end-key");
|
||||
|
||||
let start_key = match start_key {
|
||||
None => std::i64::MIN,
|
||||
Some(s) => match s.parse::<i64>() {
|
||||
Ok(n) => n,
|
||||
Err(_) => {
|
||||
eprintln!("Could not parse {}", s);
|
||||
return Ok(())
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
let end_key = match end_key {
|
||||
None => std::i64::MAX,
|
||||
Some(s) => match s.parse::<i64>() {
|
||||
Ok(n) => n,
|
||||
Err(_) => {
|
||||
eprintln!("Could not parse {}", s);
|
||||
return Ok(())
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
let clean_metrics = matches.is_present("clean-metrics");
|
||||
let clean_directories = matches.is_present("clean-directories");
|
||||
|
||||
metrics_clean(&session, start_key, end_key, clean_metrics, clean_directories)?;
|
||||
},
|
||||
Some("local-clean") => {
|
||||
let matches = matches.subcommand_matches("local-clean").unwrap();
|
||||
let directory = matches.value_of("directory").unwrap();
|
||||
|
||||
metrics_local_clean(&session, directory)?;
|
||||
}
|
||||
None => {
|
||||
eprintln!("No command was used.");
|
||||
return Ok(());
|
||||
@@ -397,62 +307,3 @@ fn main() -> Result<(), Box<dyn error::Error>> {
|
||||
}
|
||||
|
||||
|
||||
fn metric_stats(session: &Session, start_key: i64, end_key: i64) -> Result<(), Error> {
|
||||
let q =
|
||||
"SELECT id, name, token(name), config, created_on, updated_on, read_on \
|
||||
FROM biggraphite_metadata.metrics_metadata WHERE token(name) > ? LIMIT 1000";
|
||||
|
||||
let mut current_token = start_key;
|
||||
let mut n = 0;
|
||||
let mut points : u64 = 0;
|
||||
|
||||
let mut stats : HashMap<String, usize> = HashMap::new();
|
||||
|
||||
while current_token < end_key {
|
||||
let mut query = stmt!(q);
|
||||
query.bind(0, current_token)?;
|
||||
|
||||
let results = session.metadata_session().execute(&query).wait()?;
|
||||
|
||||
for row in results.iter() {
|
||||
current_token = row.get_column(2)?.get_i64()?;
|
||||
|
||||
let metric : Metric = row.into();
|
||||
let stages = match metric.stages() {
|
||||
Ok(stages) => stages,
|
||||
Err(_) => continue,
|
||||
};
|
||||
|
||||
for stage in stages {
|
||||
points += stage.points() as u64;
|
||||
}
|
||||
|
||||
let parts = metric.name().split(".").collect::<Vec<&str>>();
|
||||
|
||||
*stats.entry(String::from(parts[0])).or_insert(0) += 1;
|
||||
|
||||
// println!("{}: {}", current_token, metric.name());
|
||||
n += 1;
|
||||
}
|
||||
}
|
||||
|
||||
let p : f64 = ((current_token - start_key) / std::i64::MAX) as f64;
|
||||
|
||||
println!("Range: {} -> {} ({:.4}%)", start_key, current_token, 100. * p);
|
||||
println!("{} metrics", n);
|
||||
println!("{} points", points);
|
||||
println!("-----");
|
||||
|
||||
let mut vec : Vec<(&String, &usize)> = stats.iter().collect();
|
||||
vec.sort_by(|a, b| b.1.cmp(a.1));
|
||||
|
||||
for (id, v) in vec.iter().enumerate() {
|
||||
println!("{} {}", v.0, v.1);
|
||||
|
||||
if id == 10 {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
@@ -10,6 +10,7 @@ use std::fmt;
|
||||
use std::convert::TryFrom;
|
||||
|
||||
use cassandra_cpp::Row;
|
||||
use chrono::Utc;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Metric {
|
||||
@@ -29,6 +30,10 @@ impl Metric {
|
||||
&self.name
|
||||
}
|
||||
|
||||
pub fn updated_on(self: &Self) -> u64 {
|
||||
self.updated_on
|
||||
}
|
||||
|
||||
pub fn config(self: &Self, name: String) -> Result<String, String> {
|
||||
let res = self.config.get(&name);
|
||||
if let Some(v) = res {
|
||||
@@ -91,14 +96,25 @@ impl From<Row> for Metric {
|
||||
Err(_) => {},
|
||||
};
|
||||
|
||||
let created_on = row.get_column_by_name("created_on".to_string()).unwrap();
|
||||
let created_on_timestamp = match created_on.get_uuid() {
|
||||
Err(_) => 0,
|
||||
Ok(v) => v.timestamp(),
|
||||
let created_on = row.get_column_by_name("created_on".to_string());
|
||||
let created_on_timestamp = if let Ok(creation_time) = created_on {
|
||||
match creation_time.get_uuid() {
|
||||
Err(_) => 0,
|
||||
Ok(v) => v.timestamp(),
|
||||
}
|
||||
} else {
|
||||
Utc::now().timestamp() as u64
|
||||
};
|
||||
|
||||
let updated_on = row.get_column_by_name("updated_on".to_string()).unwrap();
|
||||
let updated_on_timestamp = updated_on.get_uuid().unwrap().timestamp();
|
||||
let updated_on = row.get_column_by_name("updated_on".to_string());
|
||||
let updated_on_timestamp = if let Ok(updated_time) = updated_on {
|
||||
match updated_time.get_uuid() {
|
||||
Err(_) => 0,
|
||||
Ok(v) => v.timestamp(),
|
||||
}
|
||||
} else {
|
||||
Utc::now().timestamp() as u64
|
||||
};
|
||||
|
||||
let uuid = match row.get_column_by_name("id".to_string()).unwrap().get_uuid() {
|
||||
Ok(v) => v.to_string(),
|
||||
|
@@ -13,6 +13,7 @@ use crate::cassandra::*;
|
||||
pub struct Session {
|
||||
metadata: CassSession,
|
||||
points: CassSession,
|
||||
dry_run: bool,
|
||||
}
|
||||
|
||||
impl Session {
|
||||
@@ -23,11 +24,16 @@ impl Session {
|
||||
let session = Self {
|
||||
metadata: metadata,
|
||||
points: points,
|
||||
dry_run: false,
|
||||
};
|
||||
|
||||
Ok(session)
|
||||
}
|
||||
|
||||
pub fn set_dry_run(&mut self, dry_run: bool) {
|
||||
self.dry_run = dry_run
|
||||
}
|
||||
|
||||
pub fn metadata_session(&self) -> &CassSession {
|
||||
&self.metadata
|
||||
}
|
||||
@@ -45,4 +51,8 @@ impl Session {
|
||||
pub fn write_consistency(&self) -> Consistency {
|
||||
Consistency::LOCAL_QUORUM
|
||||
}
|
||||
}
|
||||
|
||||
pub fn is_dry_run(&self) -> bool {
|
||||
self.dry_run
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user