Compare commits
7 Commits
f6b412ca5b
...
master
Author | SHA1 | Date | |
---|---|---|---|
7c78a7f17a | |||
a526f388df | |||
3d316921e1 | |||
640954ab5a | |||
b8a670bbce | |||
9dbf1542b2 | |||
9fb6975188 |
83
README.md
83
README.md
@@ -11,22 +11,35 @@ Don't forget to download & install [cassandra-cpp](https://downloads.datastax.co
|
|||||||
## Run
|
## Run
|
||||||
|
|
||||||
```sh
|
```sh
|
||||||
$ cargo build
|
|
||||||
Finished dev [unoptimized + debuginfo] target(s) in 0.04s
|
|
||||||
|
|
||||||
$ cargo run -- --help
|
|
||||||
bgutil-rs
|
bgutil-rs
|
||||||
|
|
||||||
USAGE:
|
USAGE:
|
||||||
bgutil-rs <SUBCOMMAND>
|
bgutil-rs [FLAGS] [OPTIONS] <SUBCOMMAND>
|
||||||
|
|
||||||
|
FLAGS:
|
||||||
|
--dry-run Do not write in database (local-clean only)
|
||||||
|
-h, --help Prints help information
|
||||||
|
-V, --version Prints version information
|
||||||
|
|
||||||
|
OPTIONS:
|
||||||
|
--contact-metadata <contact-metadata>
|
||||||
|
[env: CASSANDRA_CONTACT_METADATA=localhost]
|
||||||
|
|
||||||
|
--contact-points <contact-points>
|
||||||
|
[env: CASSANDRA_CONTACT_POINTS=localhost]
|
||||||
|
|
||||||
|
|
||||||
SUBCOMMANDS:
|
SUBCOMMANDS:
|
||||||
delete Delete metric(s)
|
clean Clean outdated metrics & empty directories
|
||||||
help Prints this message or the help of the given subcommand(s)
|
delete Delete metric(s)
|
||||||
info Information about a metric
|
help Prints this message or the help of the given subcommand(s)
|
||||||
list List metrics with given pattern
|
info Information about a metric
|
||||||
read Read a metric contents
|
list List metrics with given pattern
|
||||||
write Write a metric and its value
|
local-clean Clean a directory of outdated metrics & empty sub-directories
|
||||||
|
read Read a metric contents
|
||||||
|
stats Stats
|
||||||
|
write Write a metric and its value
|
||||||
|
|
||||||
```
|
```
|
||||||
|
|
||||||
### Info
|
### Info
|
||||||
@@ -150,6 +163,46 @@ ARGS:
|
|||||||
<metric>
|
<metric>
|
||||||
```
|
```
|
||||||
|
|
||||||
|
### Clean
|
||||||
|
|
||||||
|
```sh
|
||||||
|
$ cargo run -- clean --help
|
||||||
|
bgutil-rs-clean
|
||||||
|
Stats
|
||||||
|
|
||||||
|
USAGE:
|
||||||
|
bgutil-rs clean [FLAGS] [OPTIONS]
|
||||||
|
|
||||||
|
FLAGS:
|
||||||
|
--clean-directories
|
||||||
|
--clean-metrics
|
||||||
|
|
||||||
|
OPTIONS:
|
||||||
|
--end-key <end-key>
|
||||||
|
--start-key <start-key>
|
||||||
|
```
|
||||||
|
|
||||||
|
### Local-clean
|
||||||
|
|
||||||
|
Clean outdated metrics in a given directory.
|
||||||
|
|
||||||
|
```sh
|
||||||
|
$ cargo run -- local-clean --help
|
||||||
|
bgutil-rs-local-clean
|
||||||
|
Clean a directory of outdated metrics & empty sub-directories
|
||||||
|
|
||||||
|
USAGE:
|
||||||
|
bgutil-rs local-clean <directory>
|
||||||
|
|
||||||
|
FLAGS:
|
||||||
|
-h, --help Prints help information
|
||||||
|
-V, --version Prints version information
|
||||||
|
|
||||||
|
ARGS:
|
||||||
|
<directory>
|
||||||
|
```
|
||||||
|
|
||||||
|
|
||||||
## Todo
|
## Todo
|
||||||
|
|
||||||
* command: read
|
* command: read
|
||||||
@@ -160,3 +213,11 @@ ARGS:
|
|||||||
- Arguments handling
|
- Arguments handling
|
||||||
* command: delete
|
* command: delete
|
||||||
- with recursive
|
- with recursive
|
||||||
|
* command: clean
|
||||||
|
- progress bar
|
||||||
|
* ...
|
||||||
|
|
||||||
|
|
||||||
|
## Dedication
|
||||||
|
|
||||||
|
This piece of software was written during the mourning of Jean-Yves Moyart, aka Maître Mô, 21/10/1967-20/02/2021. My thoughts were with him, his family, his friends and all of us, who really appreciated him. Rest in Peace.
|
||||||
|
219
src/cassandra.rs
219
src/cassandra.rs
@@ -3,9 +3,9 @@
|
|||||||
*
|
*
|
||||||
* Author: Patrick MARIE <pm@mkz.me>
|
* Author: Patrick MARIE <pm@mkz.me>
|
||||||
*/
|
*/
|
||||||
|
|
||||||
use std::str::FromStr;
|
use std::str::FromStr;
|
||||||
use std::convert::TryFrom;
|
use std::fmt;
|
||||||
|
use std::error;
|
||||||
|
|
||||||
use crate::metric::Metric;
|
use crate::metric::Metric;
|
||||||
use crate::session::Session;
|
use crate::session::Session;
|
||||||
@@ -14,31 +14,165 @@ use crate::timerange::TimeRange;
|
|||||||
|
|
||||||
use cassandra_cpp::Session as CassSession;
|
use cassandra_cpp::Session as CassSession;
|
||||||
use cassandra_cpp::Uuid as CassUuid;
|
use cassandra_cpp::Uuid as CassUuid;
|
||||||
use cassandra_cpp::{Batch,BatchType,BindRustType,CassCollection,Cluster,Error,LogLevel,Map};
|
use cassandra_cpp::{Batch,BatchType,BindRustType,CassCollection,Cluster,Error,LogLevel,Map,RetryPolicy,Statement};
|
||||||
use cassandra_cpp::{set_level,stmt};
|
use cassandra_cpp::{set_level,stmt};
|
||||||
|
|
||||||
|
use chrono::Duration;
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
struct NoRecord;
|
||||||
|
|
||||||
|
impl fmt::Display for NoRecord {
|
||||||
|
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||||
|
write!(f, "no record suitable")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl error::Error for NoRecord {}
|
||||||
|
|
||||||
pub fn connect(contact_points: &str) -> Result<CassSession, Error> {
|
pub fn connect(contact_points: &str) -> Result<CassSession, Error> {
|
||||||
set_level(LogLevel::DISABLED);
|
set_level(LogLevel::DISABLED);
|
||||||
|
|
||||||
let mut cluster = Cluster::default();
|
let mut cluster = Cluster::default();
|
||||||
cluster.set_contact_points(contact_points).unwrap();
|
cluster.set_contact_points(contact_points).unwrap();
|
||||||
cluster.set_load_balance_round_robin();
|
cluster.set_load_balance_round_robin();
|
||||||
|
cluster.set_retry_policy(RetryPolicy::downgrading_consistency_new());
|
||||||
|
cluster.set_request_timeout(Duration::seconds(30));
|
||||||
cluster.set_protocol_version(4)?;
|
cluster.set_protocol_version(4)?;
|
||||||
|
|
||||||
cluster.connect()
|
cluster.connect()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn fetch_metric(session: &Session, metric_name: &str) -> Result<Metric, Error> {
|
pub fn prepare_component_query(table_name: &str, arguments: &Vec<&str>) -> Result<Statement, Error> {
|
||||||
|
let mut q = format!("SELECT parent, name FROM biggraphite_metadata.{} WHERE ", table_name);
|
||||||
|
let mut component_number = 0;
|
||||||
|
let mut components = vec![];
|
||||||
|
|
||||||
|
for (id, component) in arguments.iter().enumerate() {
|
||||||
|
let mut operator = "=";
|
||||||
|
|
||||||
|
if *component == "*" {
|
||||||
|
component_number += 1;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if component_number != 0 {
|
||||||
|
q.push_str("AND ");
|
||||||
|
}
|
||||||
|
|
||||||
|
if component.ends_with("*") {
|
||||||
|
operator = "LIKE";
|
||||||
|
}
|
||||||
|
|
||||||
|
q.push_str(format!("component_{} {} ? ", id, operator).as_str());
|
||||||
|
component_number += 1;
|
||||||
|
components.push(component.replace("*", "%"));
|
||||||
|
}
|
||||||
|
|
||||||
|
if component_number != 0 {
|
||||||
|
q.push_str("AND ");
|
||||||
|
}
|
||||||
|
|
||||||
|
// Adding last component for __END__.
|
||||||
|
q.push_str(format!("component_{} = ? ALLOW FILTERING;", component_number).as_str());
|
||||||
|
components.push("__END__".to_string());
|
||||||
|
|
||||||
|
let mut query = stmt!(q.as_str());
|
||||||
|
|
||||||
|
for (id, arg) in components.iter().enumerate() {
|
||||||
|
query.bind(id, arg.as_str())?;
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(query)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn prepare_component_query_globstar(table_name: &str, arguments: &Vec<&str>) -> Result<Vec<Statement>, Error> {
|
||||||
|
let _q = format!("SELECT parent, name FROM biggraphite_metadata.{} WHERE ", table_name);
|
||||||
|
let _component_number = 0;
|
||||||
|
|
||||||
|
let mut out = vec![];
|
||||||
|
|
||||||
|
let pos_globstar = arguments.iter().enumerate().filter(|(_, &x)| x == "**").map(|(id, _)| id).collect::<Vec<usize>>();
|
||||||
|
if pos_globstar.len() != 1 {
|
||||||
|
// XXX return error
|
||||||
|
return Ok(vec![prepare_component_query(table_name, arguments)?]);
|
||||||
|
}
|
||||||
|
|
||||||
|
let pos_globstar = pos_globstar[0];
|
||||||
|
let mut queries = vec![];
|
||||||
|
|
||||||
|
let mut init_args = vec![];
|
||||||
|
let mut end_args = arguments[pos_globstar+1..].to_vec();
|
||||||
|
end_args.push("__END__");
|
||||||
|
|
||||||
|
for (id, el) in arguments[0..pos_globstar].iter().enumerate() {
|
||||||
|
if *el == "*" {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if el.ends_with("*") {
|
||||||
|
init_args.push((id, "LIKE", el.replace("*", "%")));
|
||||||
|
} else {
|
||||||
|
init_args.push((id, "=", el.to_string()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let components = 16;
|
||||||
|
|
||||||
|
for id in init_args.len()..(components-end_args.len()+1) {
|
||||||
|
let mut current_query = init_args.to_vec();
|
||||||
|
|
||||||
|
for (sub_id, el) in end_args.iter().enumerate() {
|
||||||
|
if *el == "*" {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if el.ends_with("*") {
|
||||||
|
current_query.push((sub_id + id, "LIKE", el.replace("*", "%")));
|
||||||
|
} else {
|
||||||
|
current_query.push((sub_id + id, "=", el.to_string()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
queries.push(current_query);
|
||||||
|
}
|
||||||
|
|
||||||
|
for query in &queries {
|
||||||
|
let mut current_query = _q.to_string();
|
||||||
|
|
||||||
|
for el in query {
|
||||||
|
if el.0 != 0 {
|
||||||
|
current_query.push_str("AND ");
|
||||||
|
}
|
||||||
|
|
||||||
|
current_query.push_str(&format!("component_{} {} ? ", el.0, el.1));
|
||||||
|
}
|
||||||
|
|
||||||
|
current_query.push_str(&String::from("ALLOW FILTERING;"));
|
||||||
|
|
||||||
|
let mut statement = stmt!(¤t_query);
|
||||||
|
for (id, el) in query.iter().enumerate() {
|
||||||
|
statement.bind(id, el.2.as_str())?;
|
||||||
|
}
|
||||||
|
|
||||||
|
out.push(statement);
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(out)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn fetch_metric(session: &Session, metric_name: &str) -> Result<Metric, Box<dyn error::Error>> {
|
||||||
let mut query = stmt!("SELECT * FROM biggraphite_metadata.metrics_metadata WHERE name = ?");
|
let mut query = stmt!("SELECT * FROM biggraphite_metadata.metrics_metadata WHERE name = ?");
|
||||||
|
query.set_consistency(session.read_consistency())?;
|
||||||
query.bind(0, metric_name)?;
|
query.bind(0, metric_name)?;
|
||||||
|
|
||||||
// XXX set consistency
|
let result = session.metadata_session().execute(&query).wait()?;
|
||||||
// query.set_consistency(session.read_consistency());
|
|
||||||
|
if result.row_count() == 0 {
|
||||||
|
return Err(NoRecord.into());
|
||||||
|
}
|
||||||
|
|
||||||
let result = session.metadata_session().execute(&query).wait()?;
|
|
||||||
Ok(result.first_row().unwrap().into())
|
Ok(result.first_row().unwrap().into())
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -195,62 +329,33 @@ pub fn create_metric(session: &Session, metric: &str) -> Result<(), Error> {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn metric_delete(session: &Session, metric_name: &str) -> Result<(), Error> {
|
pub fn delete_metric(session: &Session, name: &str) -> Result<(), Error> {
|
||||||
let mut query = stmt!("SELECT * FROM biggraphite_metadata.metrics_metadata WHERE name = ?");
|
let namespace = "biggraphite_metadata";
|
||||||
query.bind(0, metric_name)?;
|
let delete_metric_query = format!("DELETE FROM {}.metrics WHERE name = ?;", namespace);
|
||||||
|
let delete_metadata_query = format!("DELETE FROM {}.metrics_metadata WHERE name = ?;", namespace);
|
||||||
|
|
||||||
let result = session.metadata_session().execute(&query).wait()?;
|
let mut delete_metric_query = stmt!(delete_metric_query.as_str());
|
||||||
if result.row_count() == 0 {
|
delete_metric_query.set_consistency(session.write_consistency())?;
|
||||||
println!("Metric is not existing");
|
delete_metric_query.bind(0, name)?;
|
||||||
return Ok(());
|
session.metadata_session().execute(&delete_metric_query).wait()?;
|
||||||
}
|
|
||||||
|
|
||||||
let _metric = fetch_metric(session, metric_name)?;
|
let mut delete_metadata_query = stmt!(delete_metadata_query.as_str());
|
||||||
|
delete_metric_query.set_consistency(session.write_consistency())?;
|
||||||
let mut query = stmt!("DELETE FROM biggraphite_metadata.metrics_metadata WHERE name = ?;");
|
delete_metadata_query.bind(0, name)?;
|
||||||
query.bind(0, metric_name)?;
|
session.metadata_session().execute(&delete_metric_query).wait()?;
|
||||||
query.set_consistency(session.write_consistency())?;
|
|
||||||
session.metadata_session().execute(&query).wait()?;
|
|
||||||
|
|
||||||
let mut query = stmt!("DELETE FROM biggraphite_metadata.metrics_metadata WHERE name = ?;");
|
|
||||||
query.bind(0, metric_name)?;
|
|
||||||
query.set_consistency(session.write_consistency())?;
|
|
||||||
session.metadata_session().execute(&query).wait()?;
|
|
||||||
|
|
||||||
let mut query = stmt!("DELETE FROM biggraphite_metadata.directories WHERE name = ?;");
|
|
||||||
query.bind(0, metric_name)?;
|
|
||||||
query.set_consistency(session.write_consistency())?;
|
|
||||||
session.metadata_session().execute(&query).wait()?;
|
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn metric_write(session: &Session, metric_name: &str, value: f64, retention: &str, timestamp: i64) -> Result<(), Error> {
|
|
||||||
let mut query = stmt!("SELECT * FROM biggraphite_metadata.metrics_metadata WHERE name = ?");
|
|
||||||
query.bind(0, metric_name)?;
|
|
||||||
|
|
||||||
let result = session.metadata_session().execute(&query).wait()?;
|
pub fn delete_directory(session: &Session, name: &str) -> Result<(), Error> {
|
||||||
if result.row_count() == 0 {
|
let namespace = "biggraphite_metadata";
|
||||||
create_metric(session, metric_name)?;
|
let delete_directory_query = format!("DELETE FROM {}.directories WHERE name = ?;", namespace);
|
||||||
}
|
|
||||||
|
|
||||||
let stage = Stage::try_from(retention)?;
|
let mut delete_directory_query = stmt!(delete_directory_query.as_str());
|
||||||
|
delete_directory_query.set_consistency(session.write_consistency())?;
|
||||||
let metric = fetch_metric(session, metric_name)?;
|
delete_directory_query.bind(0, name)?;
|
||||||
let (time_start_ms, offset) = stage.time_offset_ms(timestamp);
|
session.metadata_session().execute(&delete_directory_query).wait()?;
|
||||||
|
|
||||||
let query = format!(
|
|
||||||
"INSERT INTO biggraphite.{} (metric, time_start_ms, offset, value) VALUES (?, ?, ?, ?);",
|
|
||||||
stage.table_name()
|
|
||||||
);
|
|
||||||
|
|
||||||
let mut query = stmt!(&query);
|
|
||||||
query.bind(0, CassUuid::from_str(metric.id().as_str())?)?;
|
|
||||||
query.bind(1, time_start_ms)?;
|
|
||||||
query.bind(2, offset as i16)?;
|
|
||||||
query.bind(3, value)?;
|
|
||||||
|
|
||||||
session.points_session().execute(&query).wait()?;
|
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
12
src/cmd.rs
Normal file
12
src/cmd.rs
Normal file
@@ -0,0 +1,12 @@
|
|||||||
|
/*
|
||||||
|
* bgutil-rs
|
||||||
|
*
|
||||||
|
* Author: Patrick MARIE <pm@mkz.me>
|
||||||
|
*/
|
||||||
|
pub mod clean;
|
||||||
|
pub mod delete;
|
||||||
|
pub mod info;
|
||||||
|
pub mod list;
|
||||||
|
pub mod local_clean;
|
||||||
|
pub mod stats;
|
||||||
|
pub mod write;
|
148
src/cmd/clean.rs
Normal file
148
src/cmd/clean.rs
Normal file
@@ -0,0 +1,148 @@
|
|||||||
|
/*
|
||||||
|
* bgutil-rs
|
||||||
|
*
|
||||||
|
* Author: Patrick MARIE <pm@mkz.me>
|
||||||
|
*/
|
||||||
|
use crate::Session;
|
||||||
|
|
||||||
|
use cassandra_cpp::{BindRustType,Error};
|
||||||
|
use cassandra_cpp::stmt;
|
||||||
|
|
||||||
|
use chrono::Utc;
|
||||||
|
|
||||||
|
pub fn metrics_clean(session: &Session, start_key: i64, end_key: i64, clean_metrics: bool, clean_directories: bool) -> Result<(), Error> {
|
||||||
|
let mut current_token = start_key;
|
||||||
|
let cutoff : u64 = (Utc::now().timestamp() as u64 - 86400 * 14) * 1000;
|
||||||
|
|
||||||
|
let namespace = "biggraphite_metadata";
|
||||||
|
let batch_limit = 1000;
|
||||||
|
|
||||||
|
let query = format!("SELECT name, token(name) FROM {}.metrics_metadata \
|
||||||
|
WHERE updated_on <= maxTimeuuid({}) and token(name) > ? and token(name) < ? LIMIT {};",
|
||||||
|
namespace, cutoff, batch_limit);
|
||||||
|
let delete_metric_query = format!("DELETE FROM {}.metrics WHERE name = ?;", namespace);
|
||||||
|
let delete_metadata_query = format!("DELETE FROM {}.metrics_metadata WHERE name = ?;", namespace);
|
||||||
|
|
||||||
|
let mut deleted_metrics_count = 0;
|
||||||
|
let mut scanned_metrics_count = 0;
|
||||||
|
let mut scanned_directories_count = 0;
|
||||||
|
let mut deleted_directories_count = 0;
|
||||||
|
|
||||||
|
// clean metrics
|
||||||
|
loop {
|
||||||
|
if !clean_metrics || current_token >= end_key {
|
||||||
|
// println!("Stopping: {} >= {}", current_token, end_key);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut outdated_metrics_query = stmt!(query.as_str());
|
||||||
|
outdated_metrics_query.set_consistency(session.read_consistency())?;
|
||||||
|
outdated_metrics_query.bind(0, current_token)?;
|
||||||
|
outdated_metrics_query.bind(1, end_key)?;
|
||||||
|
|
||||||
|
let result = session.metadata_session().execute(&outdated_metrics_query).wait()?;
|
||||||
|
if result.row_count() == 0 {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
let mut queries = vec![];
|
||||||
|
|
||||||
|
for row in result.iter() {
|
||||||
|
let name = row.get_column_by_name("name".to_string())?.to_string();
|
||||||
|
scanned_metrics_count += 1;
|
||||||
|
let mut delete_metric_query = stmt!(delete_metric_query.as_str());
|
||||||
|
delete_metric_query.set_consistency(session.write_consistency())?;
|
||||||
|
delete_metric_query.bind(0, name.as_str())?;
|
||||||
|
queries.push(session.metadata_session().execute(&delete_metric_query));
|
||||||
|
|
||||||
|
let mut delete_metadata_query = stmt!(delete_metadata_query.as_str());
|
||||||
|
delete_metric_query.set_consistency(session.write_consistency())?;
|
||||||
|
delete_metadata_query.bind(0, name.as_str())?;
|
||||||
|
queries.push(session.metadata_session().execute(&delete_metadata_query));
|
||||||
|
|
||||||
|
deleted_metrics_count += 1;
|
||||||
|
current_token = row.get_column(1)?.get_i64()?;
|
||||||
|
}
|
||||||
|
|
||||||
|
if result.row_count() != batch_limit {
|
||||||
|
// println!("Stopping because count == 0");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
for query in queries {
|
||||||
|
if let Err(err) = query.wait() {
|
||||||
|
eprintln!("Failed: {:?}", err);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let list_directories_query = format!("SELECT name, token(name) FROM {}.directories WHERE token(name) > ? AND token(name) < ? LIMIT {};",
|
||||||
|
namespace, batch_limit);
|
||||||
|
let metric_query = format!("SELECT name FROM {}.metrics WHERE parent LIKE ? LIMIT 1", namespace);
|
||||||
|
let delete_directory_query = format!("DELETE FROM {}.directories WHERE name = ?;", namespace);
|
||||||
|
|
||||||
|
current_token = start_key;
|
||||||
|
|
||||||
|
// clean directories
|
||||||
|
loop {
|
||||||
|
if !clean_directories || current_token >= end_key {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut list_directories_query = stmt!(list_directories_query.as_str());
|
||||||
|
list_directories_query.set_consistency(session.read_consistency())?;
|
||||||
|
list_directories_query.bind(0, current_token)?;
|
||||||
|
list_directories_query.bind(1, end_key)?;
|
||||||
|
|
||||||
|
let list_result = session.metadata_session().execute(&list_directories_query).wait()?;
|
||||||
|
if list_result.row_count() == 0 {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut queries = vec![];
|
||||||
|
|
||||||
|
for row in list_result.iter() {
|
||||||
|
let mut name = row.get_column_by_name("name".to_string())?.to_string();
|
||||||
|
let orig_name = name.clone();
|
||||||
|
name.push_str(".%");
|
||||||
|
current_token = row.get_column(1)?.get_i64()?;
|
||||||
|
|
||||||
|
let mut metric_query = stmt!(metric_query.as_str());
|
||||||
|
metric_query.set_consistency(session.read_consistency())?;
|
||||||
|
metric_query.bind(0, name.as_str())?;
|
||||||
|
let query = session.metadata_session().execute(&metric_query);
|
||||||
|
|
||||||
|
queries.push((orig_name, query));
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut to_delete_queries = vec![];
|
||||||
|
|
||||||
|
for el in queries {
|
||||||
|
let result = el.1.wait()?;
|
||||||
|
scanned_directories_count += 1;
|
||||||
|
if result.row_count() != 0 {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut delete_directory_query = stmt!(delete_directory_query.as_str());
|
||||||
|
delete_directory_query.set_consistency(session.write_consistency())?;
|
||||||
|
delete_directory_query.bind(0, el.0.as_str())?;
|
||||||
|
|
||||||
|
to_delete_queries.push(session.metadata_session().execute(&delete_directory_query));
|
||||||
|
|
||||||
|
deleted_directories_count += 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
for to_delete in to_delete_queries {
|
||||||
|
to_delete.wait()?;
|
||||||
|
}
|
||||||
|
|
||||||
|
if list_result.row_count() != batch_limit {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
println!("Deleted {} metrics, {} directories.", deleted_metrics_count, deleted_directories_count);
|
||||||
|
println!("Scanned {} metrics, {} directories", scanned_metrics_count, scanned_directories_count);
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
42
src/cmd/delete.rs
Normal file
42
src/cmd/delete.rs
Normal file
@@ -0,0 +1,42 @@
|
|||||||
|
/*
|
||||||
|
* bgutil-rs
|
||||||
|
*
|
||||||
|
* Author: Patrick MARIE <pm@mkz.me>
|
||||||
|
*/
|
||||||
|
use std::error;
|
||||||
|
|
||||||
|
use cassandra_cpp::stmt;
|
||||||
|
use cassandra_cpp::BindRustType;
|
||||||
|
use crate::fetch_metric;
|
||||||
|
|
||||||
|
use crate::Session;
|
||||||
|
|
||||||
|
pub fn metric_delete(session: &Session, metric_name: &str) -> Result<(), Box<dyn error::Error>> {
|
||||||
|
let mut query = stmt!("SELECT * FROM biggraphite_metadata.metrics_metadata WHERE name = ?");
|
||||||
|
query.bind(0, metric_name)?;
|
||||||
|
|
||||||
|
let result = session.metadata_session().execute(&query).wait()?;
|
||||||
|
if result.row_count() == 0 {
|
||||||
|
println!("Metric is not existing");
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
|
let _metric = fetch_metric(session, metric_name)?;
|
||||||
|
|
||||||
|
let mut query = stmt!("DELETE FROM biggraphite_metadata.metrics_metadata WHERE name = ?;");
|
||||||
|
query.bind(0, metric_name)?;
|
||||||
|
query.set_consistency(session.write_consistency())?;
|
||||||
|
session.metadata_session().execute(&query).wait()?;
|
||||||
|
|
||||||
|
let mut query = stmt!("DELETE FROM biggraphite_metadata.metrics_metadata WHERE name = ?;");
|
||||||
|
query.bind(0, metric_name)?;
|
||||||
|
query.set_consistency(session.write_consistency())?;
|
||||||
|
session.metadata_session().execute(&query).wait()?;
|
||||||
|
|
||||||
|
let mut query = stmt!("DELETE FROM biggraphite_metadata.directories WHERE name = ?;");
|
||||||
|
query.bind(0, metric_name)?;
|
||||||
|
query.set_consistency(session.write_consistency())?;
|
||||||
|
session.metadata_session().execute(&query).wait()?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
17
src/cmd/info.rs
Normal file
17
src/cmd/info.rs
Normal file
@@ -0,0 +1,17 @@
|
|||||||
|
/*
|
||||||
|
* bgutil-rs
|
||||||
|
*
|
||||||
|
* Author: Patrick MARIE <pm@mkz.me>
|
||||||
|
*/
|
||||||
|
use std::error;
|
||||||
|
|
||||||
|
use crate::Session;
|
||||||
|
use crate::fetch_metric;
|
||||||
|
|
||||||
|
pub fn metric_info(session: &Session, metric_name: &str) -> Result<(), Box<dyn error::Error>> {
|
||||||
|
let metric = fetch_metric(session, metric_name)?;
|
||||||
|
|
||||||
|
println!("{}", metric);
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
62
src/cmd/list.rs
Normal file
62
src/cmd/list.rs
Normal file
@@ -0,0 +1,62 @@
|
|||||||
|
/*
|
||||||
|
* bgutil-rs
|
||||||
|
*
|
||||||
|
* Author: Patrick MARIE <pm@mkz.me>
|
||||||
|
*/
|
||||||
|
|
||||||
|
use crate::prepare_component_query_globstar;
|
||||||
|
use crate::fetch_metrics;
|
||||||
|
use crate::Session;
|
||||||
|
use cassandra_cpp::{Error};
|
||||||
|
|
||||||
|
pub fn metric_list(session: &Session, glob: &str) -> Result<(), Error> {
|
||||||
|
let components = glob.split(".").collect::<Vec<&str>>();
|
||||||
|
|
||||||
|
let query_directories = prepare_component_query_globstar("directories", &components)?;
|
||||||
|
let mut results = vec![];
|
||||||
|
|
||||||
|
for mut q in query_directories {
|
||||||
|
q.set_consistency(session.read_consistency())?;
|
||||||
|
results.push(session.metadata_session().execute(&q));
|
||||||
|
}
|
||||||
|
|
||||||
|
for result in results {
|
||||||
|
let rows = result.wait()?;
|
||||||
|
for row in rows.iter() {
|
||||||
|
let name = row.get_column_by_name("name".to_string()).unwrap().to_string();
|
||||||
|
println!("d {}", name);
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let query_metrics = prepare_component_query_globstar("metrics", &components)?;
|
||||||
|
let mut results = vec![];
|
||||||
|
|
||||||
|
for mut q in query_metrics {
|
||||||
|
q.set_consistency(session.read_consistency())?;
|
||||||
|
results.push(session.metadata_session().execute(&q));
|
||||||
|
}
|
||||||
|
|
||||||
|
for result in results {
|
||||||
|
let res = result.wait();
|
||||||
|
if let Err(err) = res {
|
||||||
|
eprintln!("Query failed: {}", err);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
let rows = res.unwrap();
|
||||||
|
let names = rows
|
||||||
|
.iter()
|
||||||
|
.map(|x| {
|
||||||
|
x.get_column_by_name("name".to_string()).unwrap().to_string()
|
||||||
|
})
|
||||||
|
.collect::<Vec<String>>();
|
||||||
|
|
||||||
|
let metrics = fetch_metrics(session, &names)?;
|
||||||
|
for metric in metrics {
|
||||||
|
println!("m {}", metric);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
135
src/cmd/local_clean.rs
Normal file
135
src/cmd/local_clean.rs
Normal file
@@ -0,0 +1,135 @@
|
|||||||
|
/*
|
||||||
|
* bgutil-rs
|
||||||
|
*
|
||||||
|
* Author: Patrick MARIE <pm@mkz.me>
|
||||||
|
*/
|
||||||
|
use std::error;
|
||||||
|
|
||||||
|
use crate::Session;
|
||||||
|
|
||||||
|
use crate::delete_directory;
|
||||||
|
use crate::delete_metric;
|
||||||
|
use crate::fetch_metric;
|
||||||
|
use crate::prepare_component_query_globstar;
|
||||||
|
|
||||||
|
use cassandra_cpp::stmt;
|
||||||
|
use cassandra_cpp::BindRustType;
|
||||||
|
|
||||||
|
use chrono::Utc;
|
||||||
|
|
||||||
|
fn clean_metrics_in_directory(session: &Session, directory: &str) -> Result<(), Box<dyn error::Error>> {
|
||||||
|
// println!("Cleaning metrics in directory: '{}'", directory);
|
||||||
|
|
||||||
|
let mut directory = String::from(directory);
|
||||||
|
directory.push_str(".**");
|
||||||
|
|
||||||
|
let components = directory.split(".").collect::<Vec<&str>>();
|
||||||
|
let mut results = vec![];
|
||||||
|
|
||||||
|
let query_metrics = prepare_component_query_globstar("metrics", &components)?;
|
||||||
|
let outdated_ts : u64 = (Utc::now().timestamp() as u64 - 14 * 86400) * 1000;
|
||||||
|
|
||||||
|
for mut q in query_metrics {
|
||||||
|
q.set_consistency(session.read_consistency())?;
|
||||||
|
results.push(session.metadata_session().execute(&q));
|
||||||
|
}
|
||||||
|
|
||||||
|
for result in results {
|
||||||
|
let rows = result.wait()?;
|
||||||
|
for row in rows.iter() {
|
||||||
|
let name = row.get_column_by_name("name".to_string()).unwrap().to_string();
|
||||||
|
let metric = fetch_metric(session, &name);
|
||||||
|
|
||||||
|
if let Err(e) = metric {
|
||||||
|
eprintln!("Error while retrieving metric: {}", e);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
let metric = metric.unwrap();
|
||||||
|
|
||||||
|
if metric.updated_on() > outdated_ts {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
println!("Deleting metric {}", metric.name());
|
||||||
|
if session.is_dry_run() {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
delete_metric(session, metric.name())?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn directory_has_metrics(session: &Session, directory: &str) -> Result<bool, Box<dyn error::Error>> {
|
||||||
|
let query = format!("SELECT name FROM biggraphite_metadata.metrics WHERE parent LIKE ? LIMIT 1;");
|
||||||
|
let mut query = stmt!(query.as_str());
|
||||||
|
let mut directory = String::from(directory);
|
||||||
|
directory.push_str(".%");
|
||||||
|
query.bind(0, directory.as_str())?;
|
||||||
|
|
||||||
|
let result = session.metadata_session().execute(&query).wait()?;
|
||||||
|
|
||||||
|
Ok(result.row_count() != 0)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn clean_empty_directories_in_directory(session: &Session, directory: &str) -> Result<(), Box<dyn error::Error>> {
|
||||||
|
// println!("Cleaning empty directories in directory '{}'", directory);
|
||||||
|
|
||||||
|
let mut directory = String::from(directory);
|
||||||
|
directory.push_str(".**");
|
||||||
|
|
||||||
|
let components = directory.split(".").collect::<Vec<&str>>();
|
||||||
|
let mut results = vec![];
|
||||||
|
|
||||||
|
let query_directories = prepare_component_query_globstar("directories", &components)?;
|
||||||
|
|
||||||
|
for mut q in query_directories {
|
||||||
|
q.set_consistency(session.read_consistency())?;
|
||||||
|
results.push(session.metadata_session().execute(&q));
|
||||||
|
}
|
||||||
|
|
||||||
|
for result in results {
|
||||||
|
let rows = result.wait()?;
|
||||||
|
for row in rows.iter() {
|
||||||
|
let name = row.get_column_by_name("name".to_string()).unwrap().to_string();
|
||||||
|
|
||||||
|
if directory_has_metrics(session, &name)? {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
println!("Deleting directory {}", name);
|
||||||
|
if session.is_dry_run() {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
delete_directory(session, &name)?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn metrics_local_clean(session: &Session, directory: &str) -> Result<(), Box<dyn error::Error>> {
|
||||||
|
let components = directory.split(".").collect::<Vec<&str>>();
|
||||||
|
|
||||||
|
let query_directories = prepare_component_query_globstar("directories", &components)?;
|
||||||
|
let mut results = vec![];
|
||||||
|
|
||||||
|
for mut q in query_directories {
|
||||||
|
q.set_consistency(session.read_consistency())?;
|
||||||
|
results.push(session.metadata_session().execute(&q));
|
||||||
|
}
|
||||||
|
|
||||||
|
for result in results {
|
||||||
|
let rows = result.wait()?;
|
||||||
|
for row in rows.iter() {
|
||||||
|
let name = row.get_column_by_name("name".to_string()).unwrap().to_string();
|
||||||
|
|
||||||
|
clean_metrics_in_directory(session, &name)?;
|
||||||
|
clean_empty_directories_in_directory(session, &name)?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
70
src/cmd/stats.rs
Normal file
70
src/cmd/stats.rs
Normal file
@@ -0,0 +1,70 @@
|
|||||||
|
/*
|
||||||
|
* bgutil-rs
|
||||||
|
*
|
||||||
|
* Author: Patrick MARIE <pm@mkz.me>
|
||||||
|
*/
|
||||||
|
use cassandra_cpp::{BindRustType,Error};
|
||||||
|
use cassandra_cpp::stmt;
|
||||||
|
|
||||||
|
use std::collections::HashMap;
|
||||||
|
|
||||||
|
use crate::Metric;
|
||||||
|
use crate::Session;
|
||||||
|
|
||||||
|
pub fn metric_stats(session: &Session, start_key: i64, end_key: i64) -> Result<(), Error> {
|
||||||
|
let q =
|
||||||
|
"SELECT id, name, token(name), config, created_on, updated_on, read_on \
|
||||||
|
FROM biggraphite_metadata.metrics_metadata WHERE token(name) > ? LIMIT 1000";
|
||||||
|
|
||||||
|
let mut current_token = start_key;
|
||||||
|
let mut n = 0;
|
||||||
|
let mut points : u64 = 0;
|
||||||
|
|
||||||
|
let mut stats : HashMap<String, usize> = HashMap::new();
|
||||||
|
|
||||||
|
while current_token < end_key {
|
||||||
|
let mut query = stmt!(q);
|
||||||
|
query.bind(0, current_token)?;
|
||||||
|
|
||||||
|
let results = session.metadata_session().execute(&query).wait()?;
|
||||||
|
|
||||||
|
for row in results.iter() {
|
||||||
|
current_token = row.get_column(2)?.get_i64()?;
|
||||||
|
|
||||||
|
let metric : Metric = row.into();
|
||||||
|
let stages = match metric.stages() {
|
||||||
|
Ok(stages) => stages,
|
||||||
|
Err(_) => continue,
|
||||||
|
};
|
||||||
|
|
||||||
|
for stage in stages {
|
||||||
|
points += stage.points() as u64;
|
||||||
|
}
|
||||||
|
|
||||||
|
let parts = metric.name().split(".").collect::<Vec<&str>>();
|
||||||
|
*stats.entry(String::from(parts[0])).or_insert(0) += 1;
|
||||||
|
|
||||||
|
n += 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let p : f64 = ((current_token - start_key) / std::i64::MAX) as f64;
|
||||||
|
|
||||||
|
println!("Range: {} -> {} ({:.4}%)", start_key, current_token, 100. * p);
|
||||||
|
println!("{} metrics", n);
|
||||||
|
println!("{} points", points);
|
||||||
|
println!("-----");
|
||||||
|
|
||||||
|
let mut vec : Vec<(&String, &usize)> = stats.iter().collect();
|
||||||
|
vec.sort_by(|a, b| b.1.cmp(a.1));
|
||||||
|
|
||||||
|
for (id, v) in vec.iter().enumerate() {
|
||||||
|
println!("{} {}", v.0, v.1);
|
||||||
|
|
||||||
|
if id == 10 {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
49
src/cmd/write.rs
Normal file
49
src/cmd/write.rs
Normal file
@@ -0,0 +1,49 @@
|
|||||||
|
/*
|
||||||
|
* bgutil-rs
|
||||||
|
*
|
||||||
|
* Author: Patrick MARIE <pm@mkz.me>
|
||||||
|
*/
|
||||||
|
use std::error;
|
||||||
|
use std::str::FromStr;
|
||||||
|
|
||||||
|
use crate::Session;
|
||||||
|
use crate::Stage;
|
||||||
|
|
||||||
|
use crate::create_metric;
|
||||||
|
use crate::fetch_metric;
|
||||||
|
|
||||||
|
use cassandra_cpp::stmt;
|
||||||
|
use cassandra_cpp::BindRustType;
|
||||||
|
use cassandra_cpp::Uuid as CassUuid;
|
||||||
|
|
||||||
|
use std::convert::TryFrom;
|
||||||
|
|
||||||
|
pub fn metric_write(session: &Session, metric_name: &str, value: f64, retention: &str, timestamp: i64) -> Result<(), Box<dyn error::Error>> {
|
||||||
|
let mut query = stmt!("SELECT * FROM biggraphite_metadata.metrics_metadata WHERE name = ?");
|
||||||
|
query.bind(0, metric_name)?;
|
||||||
|
|
||||||
|
let result = session.metadata_session().execute(&query).wait()?;
|
||||||
|
if result.row_count() == 0 {
|
||||||
|
create_metric(session, metric_name)?;
|
||||||
|
}
|
||||||
|
|
||||||
|
let stage = Stage::try_from(retention)?;
|
||||||
|
|
||||||
|
let metric = fetch_metric(session, metric_name)?;
|
||||||
|
let (time_start_ms, offset) = stage.time_offset_ms(timestamp);
|
||||||
|
|
||||||
|
let query = format!(
|
||||||
|
"INSERT INTO biggraphite.{} (metric, time_start_ms, offset, value) VALUES (?, ?, ?, ?);",
|
||||||
|
stage.table_name()
|
||||||
|
);
|
||||||
|
|
||||||
|
let mut query = stmt!(&query);
|
||||||
|
query.bind(0, CassUuid::from_str(metric.id().as_str())?)?;
|
||||||
|
query.bind(1, time_start_ms)?;
|
||||||
|
query.bind(2, offset as i16)?;
|
||||||
|
query.bind(3, value)?;
|
||||||
|
|
||||||
|
session.points_session().execute(&query).wait()?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
333
src/main.rs
333
src/main.rs
@@ -3,13 +3,10 @@
|
|||||||
*
|
*
|
||||||
* Author: Patrick MARIE <pm@mkz.me>
|
* Author: Patrick MARIE <pm@mkz.me>
|
||||||
*/
|
*/
|
||||||
|
|
||||||
use std::collections::HashMap;
|
|
||||||
use std::convert::TryFrom;
|
use std::convert::TryFrom;
|
||||||
use std::error;
|
use std::error;
|
||||||
|
|
||||||
use cassandra_cpp::{BindRustType,CassResult,Error,Statement};
|
use cassandra_cpp::CassResult;
|
||||||
use cassandra_cpp::{stmt};
|
|
||||||
use chrono::Utc;
|
use chrono::Utc;
|
||||||
use clap::{App,AppSettings,Arg,SubCommand};
|
use clap::{App,AppSettings,Arg,SubCommand};
|
||||||
|
|
||||||
@@ -18,12 +15,20 @@ mod metric;
|
|||||||
mod session;
|
mod session;
|
||||||
mod stage;
|
mod stage;
|
||||||
mod timerange;
|
mod timerange;
|
||||||
|
mod cmd;
|
||||||
|
|
||||||
use crate::cassandra::*;
|
use crate::cassandra::*;
|
||||||
use crate::session::Session;
|
use crate::session::Session;
|
||||||
use crate::stage::*;
|
use crate::stage::Stage;
|
||||||
use crate::metric::Metric;
|
use crate::metric::Metric;
|
||||||
|
|
||||||
|
use crate::cmd::clean::*;
|
||||||
|
use crate::cmd::delete::*;
|
||||||
|
use crate::cmd::info::*;
|
||||||
|
use crate::cmd::list::*;
|
||||||
|
use crate::cmd::local_clean::*;
|
||||||
|
use crate::cmd::stats::*;
|
||||||
|
use crate::cmd::write::*;
|
||||||
|
|
||||||
#[allow(dead_code)]
|
#[allow(dead_code)]
|
||||||
fn describe_result(result: &CassResult) {
|
fn describe_result(result: &CassResult) {
|
||||||
@@ -38,181 +43,20 @@ fn describe_result(result: &CassResult) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn metric_info(session: &Session, metric_name: &str) -> Result<(), Error> {
|
|
||||||
let metric = fetch_metric(session, metric_name)?;
|
|
||||||
|
|
||||||
println!("{}", metric);
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
fn prepare_component_query(table_name: &str, arguments: &Vec<&str>) -> Result<Statement, Error> {
|
|
||||||
let mut q = format!("SELECT parent, name FROM biggraphite_metadata.{} WHERE ", table_name);
|
|
||||||
let mut component_number = 0;
|
|
||||||
let mut components = vec![];
|
|
||||||
|
|
||||||
for (id, component) in arguments.iter().enumerate() {
|
|
||||||
let mut operator = "=";
|
|
||||||
|
|
||||||
if *component == "*" {
|
|
||||||
component_number += 1;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
if component_number != 0 {
|
|
||||||
q.push_str("AND ");
|
|
||||||
}
|
|
||||||
|
|
||||||
if component.ends_with("*") {
|
|
||||||
operator = "LIKE";
|
|
||||||
}
|
|
||||||
|
|
||||||
q.push_str(format!("component_{} {} ? ", id, operator).as_str());
|
|
||||||
component_number += 1;
|
|
||||||
components.push(component.replace("*", "%"));
|
|
||||||
}
|
|
||||||
|
|
||||||
if component_number != 0 {
|
|
||||||
q.push_str("AND ");
|
|
||||||
}
|
|
||||||
|
|
||||||
// Adding last component for __END__.
|
|
||||||
q.push_str(format!("component_{} = ? ALLOW FILTERING;", component_number).as_str());
|
|
||||||
components.push("__END__".to_string());
|
|
||||||
|
|
||||||
let mut query = stmt!(q.as_str());
|
|
||||||
|
|
||||||
for (id, arg) in components.iter().enumerate() {
|
|
||||||
query.bind(id, arg.as_str())?;
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(query)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn prepare_component_query_globstar(table_name: &str, arguments: &Vec<&str>) -> Result<Vec<Statement>, Error> {
|
|
||||||
let _q = format!("SELECT parent, name FROM biggraphite_metadata.{} WHERE ", table_name);
|
|
||||||
let _component_number = 0;
|
|
||||||
|
|
||||||
let mut out = vec![];
|
|
||||||
|
|
||||||
let pos_globstar = arguments.iter().enumerate().filter(|(_, &x)| x == "**").map(|(id, _)| id).collect::<Vec<usize>>();
|
|
||||||
if pos_globstar.len() != 1 {
|
|
||||||
// XXX return error
|
|
||||||
return Ok(vec![prepare_component_query(table_name, arguments)?]);
|
|
||||||
}
|
|
||||||
|
|
||||||
let pos_globstar = pos_globstar[0];
|
|
||||||
let mut queries = vec![];
|
|
||||||
|
|
||||||
let mut init_args = vec![];
|
|
||||||
let mut end_args = arguments[pos_globstar+1..].to_vec();
|
|
||||||
end_args.push("__END__");
|
|
||||||
|
|
||||||
for (id, el) in arguments[0..pos_globstar].iter().enumerate() {
|
|
||||||
if *el == "*" {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
if el.ends_with("*") {
|
|
||||||
init_args.push((id, "LIKE", el.replace("*", "%")));
|
|
||||||
} else {
|
|
||||||
init_args.push((id, "=", el.to_string()));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
let components = 16;
|
|
||||||
|
|
||||||
for id in init_args.len()..(components-end_args.len()+1) {
|
|
||||||
let mut current_query = init_args.to_vec();
|
|
||||||
|
|
||||||
for (sub_id, el) in end_args.iter().enumerate() {
|
|
||||||
if *el == "*" {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
if el.ends_with("*") {
|
|
||||||
current_query.push((sub_id + id, "LIKE", el.replace("*", "%")));
|
|
||||||
} else {
|
|
||||||
current_query.push((sub_id + id, "=", el.to_string()));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
queries.push(current_query);
|
|
||||||
}
|
|
||||||
|
|
||||||
for query in &queries {
|
|
||||||
let mut current_query = _q.to_string();
|
|
||||||
|
|
||||||
for el in query {
|
|
||||||
if el.0 != 0 {
|
|
||||||
current_query.push_str("AND ");
|
|
||||||
}
|
|
||||||
|
|
||||||
current_query.push_str(&format!("component_{} {} ? ", el.0, el.1));
|
|
||||||
}
|
|
||||||
|
|
||||||
current_query.push_str(&String::from("ALLOW FILTERING;"));
|
|
||||||
|
|
||||||
let mut statement = stmt!(¤t_query);
|
|
||||||
for (id, el) in query.iter().enumerate() {
|
|
||||||
statement.bind(id, el.2.as_str())?;
|
|
||||||
}
|
|
||||||
|
|
||||||
out.push(statement);
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(out)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn metric_list(session: &Session, glob: &str) -> Result<(), Error> {
|
|
||||||
let components = glob.split(".").collect::<Vec<&str>>();
|
|
||||||
|
|
||||||
let query_directories = prepare_component_query_globstar("directories", &components)?;
|
|
||||||
let mut results = vec![];
|
|
||||||
|
|
||||||
for mut q in query_directories {
|
|
||||||
q.set_consistency(session.read_consistency())?;
|
|
||||||
results.push(session.metadata_session().execute(&q));
|
|
||||||
}
|
|
||||||
|
|
||||||
for result in results {
|
|
||||||
let rows = result.wait()?;
|
|
||||||
for row in rows.iter() {
|
|
||||||
let name = row.get_column_by_name("name".to_string()).unwrap().to_string();
|
|
||||||
println!("d {}", name);
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
let query_metrics = prepare_component_query_globstar("metrics", &components)?;
|
|
||||||
let mut results = vec![];
|
|
||||||
|
|
||||||
for mut q in query_metrics {
|
|
||||||
q.set_consistency(session.read_consistency())?;
|
|
||||||
results.push(session.metadata_session().execute(&q));
|
|
||||||
}
|
|
||||||
|
|
||||||
for result in results {
|
|
||||||
let rows = result.wait()?;
|
|
||||||
let names = rows
|
|
||||||
.iter()
|
|
||||||
.map(|x| {
|
|
||||||
x.get_column_by_name("name".to_string()).unwrap().to_string()
|
|
||||||
})
|
|
||||||
.collect::<Vec<String>>();
|
|
||||||
|
|
||||||
let metrics = fetch_metrics(session, &names)?;
|
|
||||||
for metric in metrics {
|
|
||||||
println!("m {}", metric);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
fn main() -> Result<(), Box<dyn error::Error>> {
|
fn main() -> Result<(), Box<dyn error::Error>> {
|
||||||
let matches = App::new("bgutil-rs")
|
let matches = App::new("bgutil-rs")
|
||||||
.setting(AppSettings::SubcommandRequired)
|
.setting(AppSettings::SubcommandRequired)
|
||||||
|
.arg(Arg::with_name("contact-metadata")
|
||||||
|
.long("contact-metadata")
|
||||||
|
.env("CASSANDRA_CONTACT_METADATA")
|
||||||
|
.takes_value(true))
|
||||||
|
.arg(Arg::with_name("contact-points")
|
||||||
|
.long("contact-points")
|
||||||
|
.env("CASSANDRA_CONTACT_POINTS")
|
||||||
|
.takes_value(true))
|
||||||
|
.arg(Arg::with_name("dry-run")
|
||||||
|
.help("Do not write in database (local-clean only)")
|
||||||
|
.long("dry-run"))
|
||||||
.subcommand(SubCommand::with_name("info")
|
.subcommand(SubCommand::with_name("info")
|
||||||
.about("Information about a metric")
|
.about("Information about a metric")
|
||||||
.arg(Arg::with_name("metric")
|
.arg(Arg::with_name("metric")
|
||||||
@@ -269,12 +113,39 @@ fn main() -> Result<(), Box<dyn error::Error>> {
|
|||||||
.arg(Arg::with_name("end-key")
|
.arg(Arg::with_name("end-key")
|
||||||
.long("end-key")
|
.long("end-key")
|
||||||
.takes_value(true)))
|
.takes_value(true)))
|
||||||
|
.subcommand(SubCommand::with_name("clean")
|
||||||
|
.about("Clean outdated metrics & empty directories")
|
||||||
|
.arg(Arg::with_name("start-key")
|
||||||
|
.long("start-key")
|
||||||
|
.takes_value(true))
|
||||||
|
.arg(Arg::with_name("end-key")
|
||||||
|
.long("end-key")
|
||||||
|
.takes_value(true))
|
||||||
|
.arg(Arg::with_name("clean-metrics")
|
||||||
|
.long("clean-metrics"))
|
||||||
|
.arg(Arg::with_name("clean-directories")
|
||||||
|
.long("clean-directories")))
|
||||||
|
.subcommand(SubCommand::with_name("local-clean")
|
||||||
|
.about("Clean a directory of outdated metrics & empty sub-directories")
|
||||||
|
.arg(Arg::with_name("directory")
|
||||||
|
.index(1)
|
||||||
|
.required(true)))
|
||||||
.get_matches();
|
.get_matches();
|
||||||
|
|
||||||
let contact_points_metadata = "tag--cstars07--cassandra-cstars07.query.consul.preprod.crto.in";
|
let mut contact_points_metadata = "localhost";
|
||||||
let contact_points_data = "tag--cstars04--cassandra-cstars04.query.consul.preprod.crto.in";
|
if matches.is_present("contact-metadata") {
|
||||||
|
contact_points_metadata = matches.value_of("contact-metadata").unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
let session = Session::new(&contact_points_metadata, &contact_points_data)?;
|
let mut contact_points_data = "localhost";
|
||||||
|
if matches.is_present("contact-points") {
|
||||||
|
contact_points_data = matches.value_of("contact-points").unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
let dry_run = matches.is_present("dry-run");
|
||||||
|
|
||||||
|
let mut session = Session::new(&contact_points_metadata, &contact_points_data)?;
|
||||||
|
session.set_dry_run(dry_run);
|
||||||
|
|
||||||
match matches.subcommand_name() {
|
match matches.subcommand_name() {
|
||||||
Some("info") => {
|
Some("info") => {
|
||||||
@@ -359,8 +230,8 @@ fn main() -> Result<(), Box<dyn error::Error>> {
|
|||||||
},
|
},
|
||||||
Some("stats") => {
|
Some("stats") => {
|
||||||
let matches = matches.subcommand_matches("stats").unwrap();
|
let matches = matches.subcommand_matches("stats").unwrap();
|
||||||
let start_key = matches.value_of("start-key"); // 0
|
let start_key = matches.value_of("start-key");
|
||||||
let end_key = matches.value_of("end-key"); // 100000000000000
|
let end_key = matches.value_of("end-key");
|
||||||
|
|
||||||
let start_key = match start_key {
|
let start_key = match start_key {
|
||||||
None => 0,
|
None => 0,
|
||||||
@@ -386,6 +257,45 @@ fn main() -> Result<(), Box<dyn error::Error>> {
|
|||||||
|
|
||||||
metric_stats(&session, start_key, end_key)?;
|
metric_stats(&session, start_key, end_key)?;
|
||||||
},
|
},
|
||||||
|
Some("clean") => {
|
||||||
|
let matches = matches.subcommand_matches("clean").unwrap();
|
||||||
|
|
||||||
|
let start_key = matches.value_of("start-key");
|
||||||
|
let end_key = matches.value_of("end-key");
|
||||||
|
|
||||||
|
let start_key = match start_key {
|
||||||
|
None => std::i64::MIN,
|
||||||
|
Some(s) => match s.parse::<i64>() {
|
||||||
|
Ok(n) => n,
|
||||||
|
Err(_) => {
|
||||||
|
eprintln!("Could not parse {}", s);
|
||||||
|
return Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let end_key = match end_key {
|
||||||
|
None => std::i64::MAX,
|
||||||
|
Some(s) => match s.parse::<i64>() {
|
||||||
|
Ok(n) => n,
|
||||||
|
Err(_) => {
|
||||||
|
eprintln!("Could not parse {}", s);
|
||||||
|
return Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let clean_metrics = matches.is_present("clean-metrics");
|
||||||
|
let clean_directories = matches.is_present("clean-directories");
|
||||||
|
|
||||||
|
metrics_clean(&session, start_key, end_key, clean_metrics, clean_directories)?;
|
||||||
|
},
|
||||||
|
Some("local-clean") => {
|
||||||
|
let matches = matches.subcommand_matches("local-clean").unwrap();
|
||||||
|
let directory = matches.value_of("directory").unwrap();
|
||||||
|
|
||||||
|
metrics_local_clean(&session, directory)?;
|
||||||
|
}
|
||||||
None => {
|
None => {
|
||||||
eprintln!("No command was used.");
|
eprintln!("No command was used.");
|
||||||
return Ok(());
|
return Ok(());
|
||||||
@@ -397,62 +307,3 @@ fn main() -> Result<(), Box<dyn error::Error>> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
fn metric_stats(session: &Session, start_key: i64, end_key: i64) -> Result<(), Error> {
|
|
||||||
let q =
|
|
||||||
"SELECT id, name, token(name), config, created_on, updated_on, read_on \
|
|
||||||
FROM biggraphite_metadata.metrics_metadata WHERE token(name) > ? LIMIT 1000";
|
|
||||||
|
|
||||||
let mut current_token = start_key;
|
|
||||||
let mut n = 0;
|
|
||||||
let mut points : u64 = 0;
|
|
||||||
|
|
||||||
let mut stats : HashMap<String, usize> = HashMap::new();
|
|
||||||
|
|
||||||
while current_token < end_key {
|
|
||||||
let mut query = stmt!(q);
|
|
||||||
query.bind(0, current_token)?;
|
|
||||||
|
|
||||||
let results = session.metadata_session().execute(&query).wait()?;
|
|
||||||
|
|
||||||
for row in results.iter() {
|
|
||||||
current_token = row.get_column(2)?.get_i64()?;
|
|
||||||
|
|
||||||
let metric : Metric = row.into();
|
|
||||||
let stages = match metric.stages() {
|
|
||||||
Ok(stages) => stages,
|
|
||||||
Err(_) => continue,
|
|
||||||
};
|
|
||||||
|
|
||||||
for stage in stages {
|
|
||||||
points += stage.points() as u64;
|
|
||||||
}
|
|
||||||
|
|
||||||
let parts = metric.name().split(".").collect::<Vec<&str>>();
|
|
||||||
|
|
||||||
*stats.entry(String::from(parts[0])).or_insert(0) += 1;
|
|
||||||
|
|
||||||
// println!("{}: {}", current_token, metric.name());
|
|
||||||
n += 1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
let p : f64 = ((current_token - start_key) / std::i64::MAX) as f64;
|
|
||||||
|
|
||||||
println!("Range: {} -> {} ({:.4}%)", start_key, current_token, 100. * p);
|
|
||||||
println!("{} metrics", n);
|
|
||||||
println!("{} points", points);
|
|
||||||
println!("-----");
|
|
||||||
|
|
||||||
let mut vec : Vec<(&String, &usize)> = stats.iter().collect();
|
|
||||||
vec.sort_by(|a, b| b.1.cmp(a.1));
|
|
||||||
|
|
||||||
for (id, v) in vec.iter().enumerate() {
|
|
||||||
println!("{} {}", v.0, v.1);
|
|
||||||
|
|
||||||
if id == 10 {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
@@ -10,6 +10,7 @@ use std::fmt;
|
|||||||
use std::convert::TryFrom;
|
use std::convert::TryFrom;
|
||||||
|
|
||||||
use cassandra_cpp::Row;
|
use cassandra_cpp::Row;
|
||||||
|
use chrono::Utc;
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct Metric {
|
pub struct Metric {
|
||||||
@@ -29,6 +30,10 @@ impl Metric {
|
|||||||
&self.name
|
&self.name
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn updated_on(self: &Self) -> u64 {
|
||||||
|
self.updated_on
|
||||||
|
}
|
||||||
|
|
||||||
pub fn config(self: &Self, name: String) -> Result<String, String> {
|
pub fn config(self: &Self, name: String) -> Result<String, String> {
|
||||||
let res = self.config.get(&name);
|
let res = self.config.get(&name);
|
||||||
if let Some(v) = res {
|
if let Some(v) = res {
|
||||||
@@ -91,14 +96,25 @@ impl From<Row> for Metric {
|
|||||||
Err(_) => {},
|
Err(_) => {},
|
||||||
};
|
};
|
||||||
|
|
||||||
let created_on = row.get_column_by_name("created_on".to_string()).unwrap();
|
let created_on = row.get_column_by_name("created_on".to_string());
|
||||||
let created_on_timestamp = match created_on.get_uuid() {
|
let created_on_timestamp = if let Ok(creation_time) = created_on {
|
||||||
Err(_) => 0,
|
match creation_time.get_uuid() {
|
||||||
Ok(v) => v.timestamp(),
|
Err(_) => 0,
|
||||||
|
Ok(v) => v.timestamp(),
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
Utc::now().timestamp() as u64
|
||||||
};
|
};
|
||||||
|
|
||||||
let updated_on = row.get_column_by_name("updated_on".to_string()).unwrap();
|
let updated_on = row.get_column_by_name("updated_on".to_string());
|
||||||
let updated_on_timestamp = updated_on.get_uuid().unwrap().timestamp();
|
let updated_on_timestamp = if let Ok(updated_time) = updated_on {
|
||||||
|
match updated_time.get_uuid() {
|
||||||
|
Err(_) => 0,
|
||||||
|
Ok(v) => v.timestamp(),
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
Utc::now().timestamp() as u64
|
||||||
|
};
|
||||||
|
|
||||||
let uuid = match row.get_column_by_name("id".to_string()).unwrap().get_uuid() {
|
let uuid = match row.get_column_by_name("id".to_string()).unwrap().get_uuid() {
|
||||||
Ok(v) => v.to_string(),
|
Ok(v) => v.to_string(),
|
||||||
|
@@ -13,6 +13,7 @@ use crate::cassandra::*;
|
|||||||
pub struct Session {
|
pub struct Session {
|
||||||
metadata: CassSession,
|
metadata: CassSession,
|
||||||
points: CassSession,
|
points: CassSession,
|
||||||
|
dry_run: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Session {
|
impl Session {
|
||||||
@@ -23,11 +24,16 @@ impl Session {
|
|||||||
let session = Self {
|
let session = Self {
|
||||||
metadata: metadata,
|
metadata: metadata,
|
||||||
points: points,
|
points: points,
|
||||||
|
dry_run: false,
|
||||||
};
|
};
|
||||||
|
|
||||||
Ok(session)
|
Ok(session)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn set_dry_run(&mut self, dry_run: bool) {
|
||||||
|
self.dry_run = dry_run
|
||||||
|
}
|
||||||
|
|
||||||
pub fn metadata_session(&self) -> &CassSession {
|
pub fn metadata_session(&self) -> &CassSession {
|
||||||
&self.metadata
|
&self.metadata
|
||||||
}
|
}
|
||||||
@@ -45,4 +51,8 @@ impl Session {
|
|||||||
pub fn write_consistency(&self) -> Consistency {
|
pub fn write_consistency(&self) -> Consistency {
|
||||||
Consistency::LOCAL_QUORUM
|
Consistency::LOCAL_QUORUM
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
pub fn is_dry_run(&self) -> bool {
|
||||||
|
self.dry_run
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Reference in New Issue
Block a user