Compare commits

...

7 Commits

Author SHA1 Message Date
7c78a7f17a Updating README.md. 2021-02-26 12:02:31 +01:00
a526f388df Adding a dry-run mode on local-clean & a retry policy.
Minor fixes to handle invalid metric record.
2021-02-26 11:45:30 +01:00
3d316921e1 Implementing "local_clean" command. 2021-02-25 23:28:09 +01:00
640954ab5a Module refactoring. 2021-02-25 21:25:00 +01:00
b8a670bbce Set cassandra servers with args or env var. 2021-02-24 23:57:41 +01:00
9dbf1542b2 Updating documentation. 2021-02-24 21:43:31 +01:00
9fb6975188 Implementing "clean" command. 2021-02-24 21:41:26 +01:00
13 changed files with 894 additions and 316 deletions

View File

@@ -11,22 +11,35 @@ Don't forget to download & install [cassandra-cpp](https://downloads.datastax.co
## Run
```sh
$ cargo build
Finished dev [unoptimized + debuginfo] target(s) in 0.04s
$ cargo run -- --help
bgutil-rs
USAGE:
bgutil-rs <SUBCOMMAND>
bgutil-rs [FLAGS] [OPTIONS] <SUBCOMMAND>
FLAGS:
--dry-run Do not write in database (local-clean only)
-h, --help Prints help information
-V, --version Prints version information
OPTIONS:
--contact-metadata <contact-metadata>
[env: CASSANDRA_CONTACT_METADATA=localhost]
--contact-points <contact-points>
[env: CASSANDRA_CONTACT_POINTS=localhost]
SUBCOMMANDS:
delete Delete metric(s)
help Prints this message or the help of the given subcommand(s)
info Information about a metric
list List metrics with given pattern
read Read a metric contents
write Write a metric and its value
clean Clean outdated metrics & empty directories
delete Delete metric(s)
help Prints this message or the help of the given subcommand(s)
info Information about a metric
list List metrics with given pattern
local-clean Clean a directory of outdated metrics & empty sub-directories
read Read a metric contents
stats Stats
write Write a metric and its value
```
### Info
@@ -150,6 +163,46 @@ ARGS:
<metric>
```
### Clean
```sh
$ cargo run -- clean --help
bgutil-rs-clean
Stats
USAGE:
bgutil-rs clean [FLAGS] [OPTIONS]
FLAGS:
--clean-directories
--clean-metrics
OPTIONS:
--end-key <end-key>
--start-key <start-key>
```
### Local-clean
Clean outdated metrics in a given directory.
```sh
$ cargo run -- local-clean --help
bgutil-rs-local-clean
Clean a directory of outdated metrics & empty sub-directories
USAGE:
bgutil-rs local-clean <directory>
FLAGS:
-h, --help Prints help information
-V, --version Prints version information
ARGS:
<directory>
```
## Todo
* command: read
@@ -160,3 +213,11 @@ ARGS:
- Arguments handling
* command: delete
- with recursive
* command: clean
- progress bar
* ...
## Dedication
This piece of software was written during the mourning of Jean-Yves Moyart, aka Maître Mô, 21/10/1967-20/02/2021. My thoughts were with him, his family, his friends and all of us, who really appreciated him. Rest in Peace.

View File

@@ -3,9 +3,9 @@
*
* Author: Patrick MARIE <pm@mkz.me>
*/
use std::str::FromStr;
use std::convert::TryFrom;
use std::fmt;
use std::error;
use crate::metric::Metric;
use crate::session::Session;
@@ -14,31 +14,165 @@ use crate::timerange::TimeRange;
use cassandra_cpp::Session as CassSession;
use cassandra_cpp::Uuid as CassUuid;
use cassandra_cpp::{Batch,BatchType,BindRustType,CassCollection,Cluster,Error,LogLevel,Map};
use cassandra_cpp::{Batch,BatchType,BindRustType,CassCollection,Cluster,Error,LogLevel,Map,RetryPolicy,Statement};
use cassandra_cpp::{set_level,stmt};
use chrono::Duration;
use uuid::Uuid;
#[derive(Debug, Clone)]
struct NoRecord;
impl fmt::Display for NoRecord {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "no record suitable")
}
}
impl error::Error for NoRecord {}
pub fn connect(contact_points: &str) -> Result<CassSession, Error> {
set_level(LogLevel::DISABLED);
let mut cluster = Cluster::default();
cluster.set_contact_points(contact_points).unwrap();
cluster.set_load_balance_round_robin();
cluster.set_retry_policy(RetryPolicy::downgrading_consistency_new());
cluster.set_request_timeout(Duration::seconds(30));
cluster.set_protocol_version(4)?;
cluster.connect()
}
pub fn fetch_metric(session: &Session, metric_name: &str) -> Result<Metric, Error> {
pub fn prepare_component_query(table_name: &str, arguments: &Vec<&str>) -> Result<Statement, Error> {
let mut q = format!("SELECT parent, name FROM biggraphite_metadata.{} WHERE ", table_name);
let mut component_number = 0;
let mut components = vec![];
for (id, component) in arguments.iter().enumerate() {
let mut operator = "=";
if *component == "*" {
component_number += 1;
continue;
}
if component_number != 0 {
q.push_str("AND ");
}
if component.ends_with("*") {
operator = "LIKE";
}
q.push_str(format!("component_{} {} ? ", id, operator).as_str());
component_number += 1;
components.push(component.replace("*", "%"));
}
if component_number != 0 {
q.push_str("AND ");
}
// Adding last component for __END__.
q.push_str(format!("component_{} = ? ALLOW FILTERING;", component_number).as_str());
components.push("__END__".to_string());
let mut query = stmt!(q.as_str());
for (id, arg) in components.iter().enumerate() {
query.bind(id, arg.as_str())?;
}
Ok(query)
}
pub fn prepare_component_query_globstar(table_name: &str, arguments: &Vec<&str>) -> Result<Vec<Statement>, Error> {
let _q = format!("SELECT parent, name FROM biggraphite_metadata.{} WHERE ", table_name);
let _component_number = 0;
let mut out = vec![];
let pos_globstar = arguments.iter().enumerate().filter(|(_, &x)| x == "**").map(|(id, _)| id).collect::<Vec<usize>>();
if pos_globstar.len() != 1 {
// XXX return error
return Ok(vec![prepare_component_query(table_name, arguments)?]);
}
let pos_globstar = pos_globstar[0];
let mut queries = vec![];
let mut init_args = vec![];
let mut end_args = arguments[pos_globstar+1..].to_vec();
end_args.push("__END__");
for (id, el) in arguments[0..pos_globstar].iter().enumerate() {
if *el == "*" {
continue;
}
if el.ends_with("*") {
init_args.push((id, "LIKE", el.replace("*", "%")));
} else {
init_args.push((id, "=", el.to_string()));
}
}
let components = 16;
for id in init_args.len()..(components-end_args.len()+1) {
let mut current_query = init_args.to_vec();
for (sub_id, el) in end_args.iter().enumerate() {
if *el == "*" {
continue;
}
if el.ends_with("*") {
current_query.push((sub_id + id, "LIKE", el.replace("*", "%")));
} else {
current_query.push((sub_id + id, "=", el.to_string()));
}
}
queries.push(current_query);
}
for query in &queries {
let mut current_query = _q.to_string();
for el in query {
if el.0 != 0 {
current_query.push_str("AND ");
}
current_query.push_str(&format!("component_{} {} ? ", el.0, el.1));
}
current_query.push_str(&String::from("ALLOW FILTERING;"));
let mut statement = stmt!(&current_query);
for (id, el) in query.iter().enumerate() {
statement.bind(id, el.2.as_str())?;
}
out.push(statement);
}
Ok(out)
}
pub fn fetch_metric(session: &Session, metric_name: &str) -> Result<Metric, Box<dyn error::Error>> {
let mut query = stmt!("SELECT * FROM biggraphite_metadata.metrics_metadata WHERE name = ?");
query.set_consistency(session.read_consistency())?;
query.bind(0, metric_name)?;
// XXX set consistency
// query.set_consistency(session.read_consistency());
let result = session.metadata_session().execute(&query).wait()?;
if result.row_count() == 0 {
return Err(NoRecord.into());
}
let result = session.metadata_session().execute(&query).wait()?;
Ok(result.first_row().unwrap().into())
}
@@ -195,62 +329,33 @@ pub fn create_metric(session: &Session, metric: &str) -> Result<(), Error> {
Ok(())
}
pub fn metric_delete(session: &Session, metric_name: &str) -> Result<(), Error> {
let mut query = stmt!("SELECT * FROM biggraphite_metadata.metrics_metadata WHERE name = ?");
query.bind(0, metric_name)?;
pub fn delete_metric(session: &Session, name: &str) -> Result<(), Error> {
let namespace = "biggraphite_metadata";
let delete_metric_query = format!("DELETE FROM {}.metrics WHERE name = ?;", namespace);
let delete_metadata_query = format!("DELETE FROM {}.metrics_metadata WHERE name = ?;", namespace);
let result = session.metadata_session().execute(&query).wait()?;
if result.row_count() == 0 {
println!("Metric is not existing");
return Ok(());
}
let mut delete_metric_query = stmt!(delete_metric_query.as_str());
delete_metric_query.set_consistency(session.write_consistency())?;
delete_metric_query.bind(0, name)?;
session.metadata_session().execute(&delete_metric_query).wait()?;
let _metric = fetch_metric(session, metric_name)?;
let mut query = stmt!("DELETE FROM biggraphite_metadata.metrics_metadata WHERE name = ?;");
query.bind(0, metric_name)?;
query.set_consistency(session.write_consistency())?;
session.metadata_session().execute(&query).wait()?;
let mut query = stmt!("DELETE FROM biggraphite_metadata.metrics_metadata WHERE name = ?;");
query.bind(0, metric_name)?;
query.set_consistency(session.write_consistency())?;
session.metadata_session().execute(&query).wait()?;
let mut query = stmt!("DELETE FROM biggraphite_metadata.directories WHERE name = ?;");
query.bind(0, metric_name)?;
query.set_consistency(session.write_consistency())?;
session.metadata_session().execute(&query).wait()?;
let mut delete_metadata_query = stmt!(delete_metadata_query.as_str());
delete_metric_query.set_consistency(session.write_consistency())?;
delete_metadata_query.bind(0, name)?;
session.metadata_session().execute(&delete_metric_query).wait()?;
Ok(())
}
pub fn metric_write(session: &Session, metric_name: &str, value: f64, retention: &str, timestamp: i64) -> Result<(), Error> {
let mut query = stmt!("SELECT * FROM biggraphite_metadata.metrics_metadata WHERE name = ?");
query.bind(0, metric_name)?;
let result = session.metadata_session().execute(&query).wait()?;
if result.row_count() == 0 {
create_metric(session, metric_name)?;
}
pub fn delete_directory(session: &Session, name: &str) -> Result<(), Error> {
let namespace = "biggraphite_metadata";
let delete_directory_query = format!("DELETE FROM {}.directories WHERE name = ?;", namespace);
let stage = Stage::try_from(retention)?;
let metric = fetch_metric(session, metric_name)?;
let (time_start_ms, offset) = stage.time_offset_ms(timestamp);
let query = format!(
"INSERT INTO biggraphite.{} (metric, time_start_ms, offset, value) VALUES (?, ?, ?, ?);",
stage.table_name()
);
let mut query = stmt!(&query);
query.bind(0, CassUuid::from_str(metric.id().as_str())?)?;
query.bind(1, time_start_ms)?;
query.bind(2, offset as i16)?;
query.bind(3, value)?;
session.points_session().execute(&query).wait()?;
let mut delete_directory_query = stmt!(delete_directory_query.as_str());
delete_directory_query.set_consistency(session.write_consistency())?;
delete_directory_query.bind(0, name)?;
session.metadata_session().execute(&delete_directory_query).wait()?;
Ok(())
}
}

12
src/cmd.rs Normal file
View File

@@ -0,0 +1,12 @@
/*
* bgutil-rs
*
* Author: Patrick MARIE <pm@mkz.me>
*/
pub mod clean;
pub mod delete;
pub mod info;
pub mod list;
pub mod local_clean;
pub mod stats;
pub mod write;

148
src/cmd/clean.rs Normal file
View File

@@ -0,0 +1,148 @@
/*
* bgutil-rs
*
* Author: Patrick MARIE <pm@mkz.me>
*/
use crate::Session;
use cassandra_cpp::{BindRustType,Error};
use cassandra_cpp::stmt;
use chrono::Utc;
pub fn metrics_clean(session: &Session, start_key: i64, end_key: i64, clean_metrics: bool, clean_directories: bool) -> Result<(), Error> {
let mut current_token = start_key;
let cutoff : u64 = (Utc::now().timestamp() as u64 - 86400 * 14) * 1000;
let namespace = "biggraphite_metadata";
let batch_limit = 1000;
let query = format!("SELECT name, token(name) FROM {}.metrics_metadata \
WHERE updated_on <= maxTimeuuid({}) and token(name) > ? and token(name) < ? LIMIT {};",
namespace, cutoff, batch_limit);
let delete_metric_query = format!("DELETE FROM {}.metrics WHERE name = ?;", namespace);
let delete_metadata_query = format!("DELETE FROM {}.metrics_metadata WHERE name = ?;", namespace);
let mut deleted_metrics_count = 0;
let mut scanned_metrics_count = 0;
let mut scanned_directories_count = 0;
let mut deleted_directories_count = 0;
// clean metrics
loop {
if !clean_metrics || current_token >= end_key {
// println!("Stopping: {} >= {}", current_token, end_key);
break;
}
let mut outdated_metrics_query = stmt!(query.as_str());
outdated_metrics_query.set_consistency(session.read_consistency())?;
outdated_metrics_query.bind(0, current_token)?;
outdated_metrics_query.bind(1, end_key)?;
let result = session.metadata_session().execute(&outdated_metrics_query).wait()?;
if result.row_count() == 0 {
break;
}
let mut queries = vec![];
for row in result.iter() {
let name = row.get_column_by_name("name".to_string())?.to_string();
scanned_metrics_count += 1;
let mut delete_metric_query = stmt!(delete_metric_query.as_str());
delete_metric_query.set_consistency(session.write_consistency())?;
delete_metric_query.bind(0, name.as_str())?;
queries.push(session.metadata_session().execute(&delete_metric_query));
let mut delete_metadata_query = stmt!(delete_metadata_query.as_str());
delete_metric_query.set_consistency(session.write_consistency())?;
delete_metadata_query.bind(0, name.as_str())?;
queries.push(session.metadata_session().execute(&delete_metadata_query));
deleted_metrics_count += 1;
current_token = row.get_column(1)?.get_i64()?;
}
if result.row_count() != batch_limit {
// println!("Stopping because count == 0");
break;
}
for query in queries {
if let Err(err) = query.wait() {
eprintln!("Failed: {:?}", err);
}
}
}
let list_directories_query = format!("SELECT name, token(name) FROM {}.directories WHERE token(name) > ? AND token(name) < ? LIMIT {};",
namespace, batch_limit);
let metric_query = format!("SELECT name FROM {}.metrics WHERE parent LIKE ? LIMIT 1", namespace);
let delete_directory_query = format!("DELETE FROM {}.directories WHERE name = ?;", namespace);
current_token = start_key;
// clean directories
loop {
if !clean_directories || current_token >= end_key {
break;
}
let mut list_directories_query = stmt!(list_directories_query.as_str());
list_directories_query.set_consistency(session.read_consistency())?;
list_directories_query.bind(0, current_token)?;
list_directories_query.bind(1, end_key)?;
let list_result = session.metadata_session().execute(&list_directories_query).wait()?;
if list_result.row_count() == 0 {
break;
}
let mut queries = vec![];
for row in list_result.iter() {
let mut name = row.get_column_by_name("name".to_string())?.to_string();
let orig_name = name.clone();
name.push_str(".%");
current_token = row.get_column(1)?.get_i64()?;
let mut metric_query = stmt!(metric_query.as_str());
metric_query.set_consistency(session.read_consistency())?;
metric_query.bind(0, name.as_str())?;
let query = session.metadata_session().execute(&metric_query);
queries.push((orig_name, query));
}
let mut to_delete_queries = vec![];
for el in queries {
let result = el.1.wait()?;
scanned_directories_count += 1;
if result.row_count() != 0 {
continue;
}
let mut delete_directory_query = stmt!(delete_directory_query.as_str());
delete_directory_query.set_consistency(session.write_consistency())?;
delete_directory_query.bind(0, el.0.as_str())?;
to_delete_queries.push(session.metadata_session().execute(&delete_directory_query));
deleted_directories_count += 1;
}
for to_delete in to_delete_queries {
to_delete.wait()?;
}
if list_result.row_count() != batch_limit {
break;
}
}
println!("Deleted {} metrics, {} directories.", deleted_metrics_count, deleted_directories_count);
println!("Scanned {} metrics, {} directories", scanned_metrics_count, scanned_directories_count);
Ok(())
}

42
src/cmd/delete.rs Normal file
View File

@@ -0,0 +1,42 @@
/*
* bgutil-rs
*
* Author: Patrick MARIE <pm@mkz.me>
*/
use std::error;
use cassandra_cpp::stmt;
use cassandra_cpp::BindRustType;
use crate::fetch_metric;
use crate::Session;
pub fn metric_delete(session: &Session, metric_name: &str) -> Result<(), Box<dyn error::Error>> {
let mut query = stmt!("SELECT * FROM biggraphite_metadata.metrics_metadata WHERE name = ?");
query.bind(0, metric_name)?;
let result = session.metadata_session().execute(&query).wait()?;
if result.row_count() == 0 {
println!("Metric is not existing");
return Ok(());
}
let _metric = fetch_metric(session, metric_name)?;
let mut query = stmt!("DELETE FROM biggraphite_metadata.metrics_metadata WHERE name = ?;");
query.bind(0, metric_name)?;
query.set_consistency(session.write_consistency())?;
session.metadata_session().execute(&query).wait()?;
let mut query = stmt!("DELETE FROM biggraphite_metadata.metrics_metadata WHERE name = ?;");
query.bind(0, metric_name)?;
query.set_consistency(session.write_consistency())?;
session.metadata_session().execute(&query).wait()?;
let mut query = stmt!("DELETE FROM biggraphite_metadata.directories WHERE name = ?;");
query.bind(0, metric_name)?;
query.set_consistency(session.write_consistency())?;
session.metadata_session().execute(&query).wait()?;
Ok(())
}

17
src/cmd/info.rs Normal file
View File

@@ -0,0 +1,17 @@
/*
* bgutil-rs
*
* Author: Patrick MARIE <pm@mkz.me>
*/
use std::error;
use crate::Session;
use crate::fetch_metric;
pub fn metric_info(session: &Session, metric_name: &str) -> Result<(), Box<dyn error::Error>> {
let metric = fetch_metric(session, metric_name)?;
println!("{}", metric);
Ok(())
}

62
src/cmd/list.rs Normal file
View File

@@ -0,0 +1,62 @@
/*
* bgutil-rs
*
* Author: Patrick MARIE <pm@mkz.me>
*/
use crate::prepare_component_query_globstar;
use crate::fetch_metrics;
use crate::Session;
use cassandra_cpp::{Error};
pub fn metric_list(session: &Session, glob: &str) -> Result<(), Error> {
let components = glob.split(".").collect::<Vec<&str>>();
let query_directories = prepare_component_query_globstar("directories", &components)?;
let mut results = vec![];
for mut q in query_directories {
q.set_consistency(session.read_consistency())?;
results.push(session.metadata_session().execute(&q));
}
for result in results {
let rows = result.wait()?;
for row in rows.iter() {
let name = row.get_column_by_name("name".to_string()).unwrap().to_string();
println!("d {}", name);
}
}
let query_metrics = prepare_component_query_globstar("metrics", &components)?;
let mut results = vec![];
for mut q in query_metrics {
q.set_consistency(session.read_consistency())?;
results.push(session.metadata_session().execute(&q));
}
for result in results {
let res = result.wait();
if let Err(err) = res {
eprintln!("Query failed: {}", err);
continue;
}
let rows = res.unwrap();
let names = rows
.iter()
.map(|x| {
x.get_column_by_name("name".to_string()).unwrap().to_string()
})
.collect::<Vec<String>>();
let metrics = fetch_metrics(session, &names)?;
for metric in metrics {
println!("m {}", metric);
}
}
Ok(())
}

135
src/cmd/local_clean.rs Normal file
View File

@@ -0,0 +1,135 @@
/*
* bgutil-rs
*
* Author: Patrick MARIE <pm@mkz.me>
*/
use std::error;
use crate::Session;
use crate::delete_directory;
use crate::delete_metric;
use crate::fetch_metric;
use crate::prepare_component_query_globstar;
use cassandra_cpp::stmt;
use cassandra_cpp::BindRustType;
use chrono::Utc;
fn clean_metrics_in_directory(session: &Session, directory: &str) -> Result<(), Box<dyn error::Error>> {
// println!("Cleaning metrics in directory: '{}'", directory);
let mut directory = String::from(directory);
directory.push_str(".**");
let components = directory.split(".").collect::<Vec<&str>>();
let mut results = vec![];
let query_metrics = prepare_component_query_globstar("metrics", &components)?;
let outdated_ts : u64 = (Utc::now().timestamp() as u64 - 14 * 86400) * 1000;
for mut q in query_metrics {
q.set_consistency(session.read_consistency())?;
results.push(session.metadata_session().execute(&q));
}
for result in results {
let rows = result.wait()?;
for row in rows.iter() {
let name = row.get_column_by_name("name".to_string()).unwrap().to_string();
let metric = fetch_metric(session, &name);
if let Err(e) = metric {
eprintln!("Error while retrieving metric: {}", e);
continue;
}
let metric = metric.unwrap();
if metric.updated_on() > outdated_ts {
continue;
}
println!("Deleting metric {}", metric.name());
if session.is_dry_run() {
continue;
}
delete_metric(session, metric.name())?;
}
}
Ok(())
}
fn directory_has_metrics(session: &Session, directory: &str) -> Result<bool, Box<dyn error::Error>> {
let query = format!("SELECT name FROM biggraphite_metadata.metrics WHERE parent LIKE ? LIMIT 1;");
let mut query = stmt!(query.as_str());
let mut directory = String::from(directory);
directory.push_str(".%");
query.bind(0, directory.as_str())?;
let result = session.metadata_session().execute(&query).wait()?;
Ok(result.row_count() != 0)
}
fn clean_empty_directories_in_directory(session: &Session, directory: &str) -> Result<(), Box<dyn error::Error>> {
// println!("Cleaning empty directories in directory '{}'", directory);
let mut directory = String::from(directory);
directory.push_str(".**");
let components = directory.split(".").collect::<Vec<&str>>();
let mut results = vec![];
let query_directories = prepare_component_query_globstar("directories", &components)?;
for mut q in query_directories {
q.set_consistency(session.read_consistency())?;
results.push(session.metadata_session().execute(&q));
}
for result in results {
let rows = result.wait()?;
for row in rows.iter() {
let name = row.get_column_by_name("name".to_string()).unwrap().to_string();
if directory_has_metrics(session, &name)? {
continue;
}
println!("Deleting directory {}", name);
if session.is_dry_run() {
continue;
}
delete_directory(session, &name)?;
}
}
Ok(())
}
pub fn metrics_local_clean(session: &Session, directory: &str) -> Result<(), Box<dyn error::Error>> {
let components = directory.split(".").collect::<Vec<&str>>();
let query_directories = prepare_component_query_globstar("directories", &components)?;
let mut results = vec![];
for mut q in query_directories {
q.set_consistency(session.read_consistency())?;
results.push(session.metadata_session().execute(&q));
}
for result in results {
let rows = result.wait()?;
for row in rows.iter() {
let name = row.get_column_by_name("name".to_string()).unwrap().to_string();
clean_metrics_in_directory(session, &name)?;
clean_empty_directories_in_directory(session, &name)?;
}
}
Ok(())
}

70
src/cmd/stats.rs Normal file
View File

@@ -0,0 +1,70 @@
/*
* bgutil-rs
*
* Author: Patrick MARIE <pm@mkz.me>
*/
use cassandra_cpp::{BindRustType,Error};
use cassandra_cpp::stmt;
use std::collections::HashMap;
use crate::Metric;
use crate::Session;
pub fn metric_stats(session: &Session, start_key: i64, end_key: i64) -> Result<(), Error> {
let q =
"SELECT id, name, token(name), config, created_on, updated_on, read_on \
FROM biggraphite_metadata.metrics_metadata WHERE token(name) > ? LIMIT 1000";
let mut current_token = start_key;
let mut n = 0;
let mut points : u64 = 0;
let mut stats : HashMap<String, usize> = HashMap::new();
while current_token < end_key {
let mut query = stmt!(q);
query.bind(0, current_token)?;
let results = session.metadata_session().execute(&query).wait()?;
for row in results.iter() {
current_token = row.get_column(2)?.get_i64()?;
let metric : Metric = row.into();
let stages = match metric.stages() {
Ok(stages) => stages,
Err(_) => continue,
};
for stage in stages {
points += stage.points() as u64;
}
let parts = metric.name().split(".").collect::<Vec<&str>>();
*stats.entry(String::from(parts[0])).or_insert(0) += 1;
n += 1;
}
}
let p : f64 = ((current_token - start_key) / std::i64::MAX) as f64;
println!("Range: {} -> {} ({:.4}%)", start_key, current_token, 100. * p);
println!("{} metrics", n);
println!("{} points", points);
println!("-----");
let mut vec : Vec<(&String, &usize)> = stats.iter().collect();
vec.sort_by(|a, b| b.1.cmp(a.1));
for (id, v) in vec.iter().enumerate() {
println!("{} {}", v.0, v.1);
if id == 10 {
break;
}
}
Ok(())
}

49
src/cmd/write.rs Normal file
View File

@@ -0,0 +1,49 @@
/*
* bgutil-rs
*
* Author: Patrick MARIE <pm@mkz.me>
*/
use std::error;
use std::str::FromStr;
use crate::Session;
use crate::Stage;
use crate::create_metric;
use crate::fetch_metric;
use cassandra_cpp::stmt;
use cassandra_cpp::BindRustType;
use cassandra_cpp::Uuid as CassUuid;
use std::convert::TryFrom;
pub fn metric_write(session: &Session, metric_name: &str, value: f64, retention: &str, timestamp: i64) -> Result<(), Box<dyn error::Error>> {
let mut query = stmt!("SELECT * FROM biggraphite_metadata.metrics_metadata WHERE name = ?");
query.bind(0, metric_name)?;
let result = session.metadata_session().execute(&query).wait()?;
if result.row_count() == 0 {
create_metric(session, metric_name)?;
}
let stage = Stage::try_from(retention)?;
let metric = fetch_metric(session, metric_name)?;
let (time_start_ms, offset) = stage.time_offset_ms(timestamp);
let query = format!(
"INSERT INTO biggraphite.{} (metric, time_start_ms, offset, value) VALUES (?, ?, ?, ?);",
stage.table_name()
);
let mut query = stmt!(&query);
query.bind(0, CassUuid::from_str(metric.id().as_str())?)?;
query.bind(1, time_start_ms)?;
query.bind(2, offset as i16)?;
query.bind(3, value)?;
session.points_session().execute(&query).wait()?;
Ok(())
}

View File

@@ -3,13 +3,10 @@
*
* Author: Patrick MARIE <pm@mkz.me>
*/
use std::collections::HashMap;
use std::convert::TryFrom;
use std::error;
use cassandra_cpp::{BindRustType,CassResult,Error,Statement};
use cassandra_cpp::{stmt};
use cassandra_cpp::CassResult;
use chrono::Utc;
use clap::{App,AppSettings,Arg,SubCommand};
@@ -18,12 +15,20 @@ mod metric;
mod session;
mod stage;
mod timerange;
mod cmd;
use crate::cassandra::*;
use crate::session::Session;
use crate::stage::*;
use crate::stage::Stage;
use crate::metric::Metric;
use crate::cmd::clean::*;
use crate::cmd::delete::*;
use crate::cmd::info::*;
use crate::cmd::list::*;
use crate::cmd::local_clean::*;
use crate::cmd::stats::*;
use crate::cmd::write::*;
#[allow(dead_code)]
fn describe_result(result: &CassResult) {
@@ -38,181 +43,20 @@ fn describe_result(result: &CassResult) {
}
}
pub fn metric_info(session: &Session, metric_name: &str) -> Result<(), Error> {
let metric = fetch_metric(session, metric_name)?;
println!("{}", metric);
Ok(())
}
fn prepare_component_query(table_name: &str, arguments: &Vec<&str>) -> Result<Statement, Error> {
let mut q = format!("SELECT parent, name FROM biggraphite_metadata.{} WHERE ", table_name);
let mut component_number = 0;
let mut components = vec![];
for (id, component) in arguments.iter().enumerate() {
let mut operator = "=";
if *component == "*" {
component_number += 1;
continue;
}
if component_number != 0 {
q.push_str("AND ");
}
if component.ends_with("*") {
operator = "LIKE";
}
q.push_str(format!("component_{} {} ? ", id, operator).as_str());
component_number += 1;
components.push(component.replace("*", "%"));
}
if component_number != 0 {
q.push_str("AND ");
}
// Adding last component for __END__.
q.push_str(format!("component_{} = ? ALLOW FILTERING;", component_number).as_str());
components.push("__END__".to_string());
let mut query = stmt!(q.as_str());
for (id, arg) in components.iter().enumerate() {
query.bind(id, arg.as_str())?;
}
Ok(query)
}
fn prepare_component_query_globstar(table_name: &str, arguments: &Vec<&str>) -> Result<Vec<Statement>, Error> {
let _q = format!("SELECT parent, name FROM biggraphite_metadata.{} WHERE ", table_name);
let _component_number = 0;
let mut out = vec![];
let pos_globstar = arguments.iter().enumerate().filter(|(_, &x)| x == "**").map(|(id, _)| id).collect::<Vec<usize>>();
if pos_globstar.len() != 1 {
// XXX return error
return Ok(vec![prepare_component_query(table_name, arguments)?]);
}
let pos_globstar = pos_globstar[0];
let mut queries = vec![];
let mut init_args = vec![];
let mut end_args = arguments[pos_globstar+1..].to_vec();
end_args.push("__END__");
for (id, el) in arguments[0..pos_globstar].iter().enumerate() {
if *el == "*" {
continue;
}
if el.ends_with("*") {
init_args.push((id, "LIKE", el.replace("*", "%")));
} else {
init_args.push((id, "=", el.to_string()));
}
}
let components = 16;
for id in init_args.len()..(components-end_args.len()+1) {
let mut current_query = init_args.to_vec();
for (sub_id, el) in end_args.iter().enumerate() {
if *el == "*" {
continue;
}
if el.ends_with("*") {
current_query.push((sub_id + id, "LIKE", el.replace("*", "%")));
} else {
current_query.push((sub_id + id, "=", el.to_string()));
}
}
queries.push(current_query);
}
for query in &queries {
let mut current_query = _q.to_string();
for el in query {
if el.0 != 0 {
current_query.push_str("AND ");
}
current_query.push_str(&format!("component_{} {} ? ", el.0, el.1));
}
current_query.push_str(&String::from("ALLOW FILTERING;"));
let mut statement = stmt!(&current_query);
for (id, el) in query.iter().enumerate() {
statement.bind(id, el.2.as_str())?;
}
out.push(statement);
}
Ok(out)
}
fn metric_list(session: &Session, glob: &str) -> Result<(), Error> {
let components = glob.split(".").collect::<Vec<&str>>();
let query_directories = prepare_component_query_globstar("directories", &components)?;
let mut results = vec![];
for mut q in query_directories {
q.set_consistency(session.read_consistency())?;
results.push(session.metadata_session().execute(&q));
}
for result in results {
let rows = result.wait()?;
for row in rows.iter() {
let name = row.get_column_by_name("name".to_string()).unwrap().to_string();
println!("d {}", name);
}
}
let query_metrics = prepare_component_query_globstar("metrics", &components)?;
let mut results = vec![];
for mut q in query_metrics {
q.set_consistency(session.read_consistency())?;
results.push(session.metadata_session().execute(&q));
}
for result in results {
let rows = result.wait()?;
let names = rows
.iter()
.map(|x| {
x.get_column_by_name("name".to_string()).unwrap().to_string()
})
.collect::<Vec<String>>();
let metrics = fetch_metrics(session, &names)?;
for metric in metrics {
println!("m {}", metric);
}
}
Ok(())
}
fn main() -> Result<(), Box<dyn error::Error>> {
let matches = App::new("bgutil-rs")
.setting(AppSettings::SubcommandRequired)
.arg(Arg::with_name("contact-metadata")
.long("contact-metadata")
.env("CASSANDRA_CONTACT_METADATA")
.takes_value(true))
.arg(Arg::with_name("contact-points")
.long("contact-points")
.env("CASSANDRA_CONTACT_POINTS")
.takes_value(true))
.arg(Arg::with_name("dry-run")
.help("Do not write in database (local-clean only)")
.long("dry-run"))
.subcommand(SubCommand::with_name("info")
.about("Information about a metric")
.arg(Arg::with_name("metric")
@@ -269,12 +113,39 @@ fn main() -> Result<(), Box<dyn error::Error>> {
.arg(Arg::with_name("end-key")
.long("end-key")
.takes_value(true)))
.subcommand(SubCommand::with_name("clean")
.about("Clean outdated metrics & empty directories")
.arg(Arg::with_name("start-key")
.long("start-key")
.takes_value(true))
.arg(Arg::with_name("end-key")
.long("end-key")
.takes_value(true))
.arg(Arg::with_name("clean-metrics")
.long("clean-metrics"))
.arg(Arg::with_name("clean-directories")
.long("clean-directories")))
.subcommand(SubCommand::with_name("local-clean")
.about("Clean a directory of outdated metrics & empty sub-directories")
.arg(Arg::with_name("directory")
.index(1)
.required(true)))
.get_matches();
let contact_points_metadata = "tag--cstars07--cassandra-cstars07.query.consul.preprod.crto.in";
let contact_points_data = "tag--cstars04--cassandra-cstars04.query.consul.preprod.crto.in";
let mut contact_points_metadata = "localhost";
if matches.is_present("contact-metadata") {
contact_points_metadata = matches.value_of("contact-metadata").unwrap();
}
let session = Session::new(&contact_points_metadata, &contact_points_data)?;
let mut contact_points_data = "localhost";
if matches.is_present("contact-points") {
contact_points_data = matches.value_of("contact-points").unwrap();
}
let dry_run = matches.is_present("dry-run");
let mut session = Session::new(&contact_points_metadata, &contact_points_data)?;
session.set_dry_run(dry_run);
match matches.subcommand_name() {
Some("info") => {
@@ -359,8 +230,8 @@ fn main() -> Result<(), Box<dyn error::Error>> {
},
Some("stats") => {
let matches = matches.subcommand_matches("stats").unwrap();
let start_key = matches.value_of("start-key"); // 0
let end_key = matches.value_of("end-key"); // 100000000000000
let start_key = matches.value_of("start-key");
let end_key = matches.value_of("end-key");
let start_key = match start_key {
None => 0,
@@ -386,6 +257,45 @@ fn main() -> Result<(), Box<dyn error::Error>> {
metric_stats(&session, start_key, end_key)?;
},
Some("clean") => {
let matches = matches.subcommand_matches("clean").unwrap();
let start_key = matches.value_of("start-key");
let end_key = matches.value_of("end-key");
let start_key = match start_key {
None => std::i64::MIN,
Some(s) => match s.parse::<i64>() {
Ok(n) => n,
Err(_) => {
eprintln!("Could not parse {}", s);
return Ok(())
}
}
};
let end_key = match end_key {
None => std::i64::MAX,
Some(s) => match s.parse::<i64>() {
Ok(n) => n,
Err(_) => {
eprintln!("Could not parse {}", s);
return Ok(())
}
}
};
let clean_metrics = matches.is_present("clean-metrics");
let clean_directories = matches.is_present("clean-directories");
metrics_clean(&session, start_key, end_key, clean_metrics, clean_directories)?;
},
Some("local-clean") => {
let matches = matches.subcommand_matches("local-clean").unwrap();
let directory = matches.value_of("directory").unwrap();
metrics_local_clean(&session, directory)?;
}
None => {
eprintln!("No command was used.");
return Ok(());
@@ -397,62 +307,3 @@ fn main() -> Result<(), Box<dyn error::Error>> {
}
fn metric_stats(session: &Session, start_key: i64, end_key: i64) -> Result<(), Error> {
let q =
"SELECT id, name, token(name), config, created_on, updated_on, read_on \
FROM biggraphite_metadata.metrics_metadata WHERE token(name) > ? LIMIT 1000";
let mut current_token = start_key;
let mut n = 0;
let mut points : u64 = 0;
let mut stats : HashMap<String, usize> = HashMap::new();
while current_token < end_key {
let mut query = stmt!(q);
query.bind(0, current_token)?;
let results = session.metadata_session().execute(&query).wait()?;
for row in results.iter() {
current_token = row.get_column(2)?.get_i64()?;
let metric : Metric = row.into();
let stages = match metric.stages() {
Ok(stages) => stages,
Err(_) => continue,
};
for stage in stages {
points += stage.points() as u64;
}
let parts = metric.name().split(".").collect::<Vec<&str>>();
*stats.entry(String::from(parts[0])).or_insert(0) += 1;
// println!("{}: {}", current_token, metric.name());
n += 1;
}
}
let p : f64 = ((current_token - start_key) / std::i64::MAX) as f64;
println!("Range: {} -> {} ({:.4}%)", start_key, current_token, 100. * p);
println!("{} metrics", n);
println!("{} points", points);
println!("-----");
let mut vec : Vec<(&String, &usize)> = stats.iter().collect();
vec.sort_by(|a, b| b.1.cmp(a.1));
for (id, v) in vec.iter().enumerate() {
println!("{} {}", v.0, v.1);
if id == 10 {
break;
}
}
Ok(())
}

View File

@@ -10,6 +10,7 @@ use std::fmt;
use std::convert::TryFrom;
use cassandra_cpp::Row;
use chrono::Utc;
#[derive(Debug)]
pub struct Metric {
@@ -29,6 +30,10 @@ impl Metric {
&self.name
}
pub fn updated_on(self: &Self) -> u64 {
self.updated_on
}
pub fn config(self: &Self, name: String) -> Result<String, String> {
let res = self.config.get(&name);
if let Some(v) = res {
@@ -91,14 +96,25 @@ impl From<Row> for Metric {
Err(_) => {},
};
let created_on = row.get_column_by_name("created_on".to_string()).unwrap();
let created_on_timestamp = match created_on.get_uuid() {
Err(_) => 0,
Ok(v) => v.timestamp(),
let created_on = row.get_column_by_name("created_on".to_string());
let created_on_timestamp = if let Ok(creation_time) = created_on {
match creation_time.get_uuid() {
Err(_) => 0,
Ok(v) => v.timestamp(),
}
} else {
Utc::now().timestamp() as u64
};
let updated_on = row.get_column_by_name("updated_on".to_string()).unwrap();
let updated_on_timestamp = updated_on.get_uuid().unwrap().timestamp();
let updated_on = row.get_column_by_name("updated_on".to_string());
let updated_on_timestamp = if let Ok(updated_time) = updated_on {
match updated_time.get_uuid() {
Err(_) => 0,
Ok(v) => v.timestamp(),
}
} else {
Utc::now().timestamp() as u64
};
let uuid = match row.get_column_by_name("id".to_string()).unwrap().get_uuid() {
Ok(v) => v.to_string(),

View File

@@ -13,6 +13,7 @@ use crate::cassandra::*;
pub struct Session {
metadata: CassSession,
points: CassSession,
dry_run: bool,
}
impl Session {
@@ -23,11 +24,16 @@ impl Session {
let session = Self {
metadata: metadata,
points: points,
dry_run: false,
};
Ok(session)
}
pub fn set_dry_run(&mut self, dry_run: bool) {
self.dry_run = dry_run
}
pub fn metadata_session(&self) -> &CassSession {
&self.metadata
}
@@ -45,4 +51,8 @@ impl Session {
pub fn write_consistency(&self) -> Consistency {
Consistency::LOCAL_QUORUM
}
}
pub fn is_dry_run(&self) -> bool {
self.dry_run
}
}