Implementing "local_clean" command.

This commit is contained in:
Patrick MARIE 2021-02-25 23:28:09 +01:00
parent 640954ab5a
commit 3d316921e1
9 changed files with 211 additions and 13 deletions

View File

@ -4,6 +4,8 @@
* Author: Patrick MARIE <pm@mkz.me>
*/
use std::str::FromStr;
use std::fmt;
use std::error;
use crate::metric::Metric;
use crate::session::Session;
@ -17,6 +19,17 @@ use cassandra_cpp::{set_level,stmt};
use uuid::Uuid;
#[derive(Debug, Clone)]
struct NoRecord;
impl fmt::Display for NoRecord {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "no record suitable")
}
}
impl error::Error for NoRecord {}
pub fn connect(contact_points: &str) -> Result<CassSession, Error> {
set_level(LogLevel::DISABLED);
@ -147,14 +160,17 @@ pub fn prepare_component_query_globstar(table_name: &str, arguments: &Vec<&str>)
Ok(out)
}
pub fn fetch_metric(session: &Session, metric_name: &str) -> Result<Metric, Error> {
pub fn fetch_metric(session: &Session, metric_name: &str) -> Result<Metric, Box<dyn error::Error>> {
let mut query = stmt!("SELECT * FROM biggraphite_metadata.metrics_metadata WHERE name = ?");
query.set_consistency(session.read_consistency())?;
query.bind(0, metric_name)?;
// XXX set consistency
// query.set_consistency(session.read_consistency());
let result = session.metadata_session().execute(&query).wait()?;
if result.row_count() == 0 {
return Err(NoRecord.into());
}
Ok(result.first_row().unwrap().into())
}
@ -311,3 +327,33 @@ pub fn create_metric(session: &Session, metric: &str) -> Result<(), Error> {
Ok(())
}
pub fn delete_metric(session: &Session, name: &str) -> Result<(), Error> {
let namespace = "biggraphite_metadata";
let delete_metric_query = format!("DELETE FROM {}.metrics WHERE name = ?;", namespace);
let delete_metadata_query = format!("DELETE FROM {}.metrics_metadata WHERE name = ?;", namespace);
let mut delete_metric_query = stmt!(delete_metric_query.as_str());
delete_metric_query.set_consistency(session.write_consistency())?;
delete_metric_query.bind(0, name)?;
session.metadata_session().execute(&delete_metric_query).wait()?;
let mut delete_metadata_query = stmt!(delete_metadata_query.as_str());
delete_metric_query.set_consistency(session.write_consistency())?;
delete_metadata_query.bind(0, name)?;
session.metadata_session().execute(&delete_metric_query).wait()?;
Ok(())
}
pub fn delete_directory(session: &Session, name: &str) -> Result<(), Error> {
let namespace = "biggraphite_metadata";
let delete_directory_query = format!("DELETE FROM {}.directories WHERE name = ?;", namespace);
let mut delete_directory_query = stmt!(delete_directory_query.as_str());
delete_directory_query.set_consistency(session.write_consistency())?;
delete_directory_query.bind(0, name)?;
session.metadata_session().execute(&delete_directory_query).wait()?;
Ok(())
}

View File

@ -7,5 +7,6 @@ pub mod clean;
pub mod delete;
pub mod info;
pub mod list;
pub mod local_clean;
pub mod stats;
pub mod write;

View File

@ -3,14 +3,15 @@
*
* Author: Patrick MARIE <pm@mkz.me>
*/
use std::error;
use cassandra_cpp::stmt;
use cassandra_cpp::{BindRustType,Error};
use cassandra_cpp::BindRustType;
use crate::fetch_metric;
use crate::Session;
pub fn metric_delete(session: &Session, metric_name: &str) -> Result<(), Error> {
pub fn metric_delete(session: &Session, metric_name: &str) -> Result<(), Box<dyn error::Error>> {
let mut query = stmt!("SELECT * FROM biggraphite_metadata.metrics_metadata WHERE name = ?");
query.bind(0, metric_name)?;

View File

@ -3,12 +3,12 @@
*
* Author: Patrick MARIE <pm@mkz.me>
*/
use std::error;
use crate::Session;
use crate::fetch_metric;
use cassandra_cpp::Error;
pub fn metric_info(session: &Session, metric_name: &str) -> Result<(), Error> {
pub fn metric_info(session: &Session, metric_name: &str) -> Result<(), Box<dyn error::Error>> {
let metric = fetch_metric(session, metric_name)?;
println!("{}", metric);

View File

@ -38,7 +38,13 @@ pub fn metric_list(session: &Session, glob: &str) -> Result<(), Error> {
}
for result in results {
let rows = result.wait()?;
let res = result.wait();
if let Err(err) = res {
eprintln!("Query failed: {}", err);
continue;
}
let rows = res.unwrap();
let names = rows
.iter()
.map(|x| {

129
src/cmd/local_clean.rs Normal file
View File

@ -0,0 +1,129 @@
/*
* bgutil-rs
*
* Author: Patrick MARIE <pm@mkz.me>
*/
use std::error;
use crate::Session;
use crate::delete_directory;
use crate::delete_metric;
use crate::fetch_metric;
use crate::prepare_component_query_globstar;
use cassandra_cpp::stmt;
use cassandra_cpp::BindRustType;
use chrono::Utc;
fn clean_metrics_in_directory(session: &Session, directory: &str) -> Result<(), Box<dyn error::Error>> {
// println!("Cleaning metrics in directory: '{}'", directory);
let mut directory = String::from(directory);
directory.push_str(".**");
let components = directory.split(".").collect::<Vec<&str>>();
let mut results = vec![];
let query_metrics = prepare_component_query_globstar("metrics", &components)?;
let outdated_ts : u64 = (Utc::now().timestamp() as u64 - 14 * 86400) * 1000;
for mut q in query_metrics {
q.set_consistency(session.read_consistency())?;
results.push(session.metadata_session().execute(&q));
}
for result in results {
let rows = result.wait()?;
for row in rows.iter() {
let name = row.get_column_by_name("name".to_string()).unwrap().to_string();
let metric = fetch_metric(session, &name);
if let Err(e) = metric {
eprintln!("Error while retrieving metric: {}", e);
continue;
}
let metric = metric.unwrap();
if metric.updated_on() > outdated_ts {
continue;
}
println!("Deleting metric {}", metric.name());
delete_metric(session, metric.name())?;
}
}
Ok(())
}
fn directory_has_metrics(session: &Session, directory: &str) -> Result<bool, Box<dyn error::Error>> {
let query = format!("SELECT name FROM biggraphite_metadata.metrics WHERE parent LIKE ? LIMIT 1;");
let mut query = stmt!(query.as_str());
let mut directory = String::from(directory);
directory.push_str(".%");
query.bind(0, directory.as_str())?;
let result = session.metadata_session().execute(&query).wait()?;
Ok(result.row_count() != 0)
}
fn clean_empty_directories_in_directory(session: &Session, directory: &str) -> Result<(), Box<dyn error::Error>> {
// println!("Cleaning empty directories in directory '{}'", directory);
let mut directory = String::from(directory);
directory.push_str(".**");
let components = directory.split(".").collect::<Vec<&str>>();
let mut results = vec![];
let query_directories = prepare_component_query_globstar("directories", &components)?;
for mut q in query_directories {
q.set_consistency(session.read_consistency())?;
results.push(session.metadata_session().execute(&q));
}
for result in results {
let rows = result.wait()?;
for row in rows.iter() {
let name = row.get_column_by_name("name".to_string()).unwrap().to_string();
if directory_has_metrics(session, &name)? {
continue;
}
println!("Deleting directory {}", name);
delete_directory(session, &name)?;
}
}
Ok(())
}
pub fn metrics_local_clean(session: &Session, directory: &str) -> Result<(), Box<dyn error::Error>> {
let components = directory.split(".").collect::<Vec<&str>>();
let query_directories = prepare_component_query_globstar("directories", &components)?;
let mut results = vec![];
for mut q in query_directories {
q.set_consistency(session.read_consistency())?;
results.push(session.metadata_session().execute(&q));
}
for result in results {
let rows = result.wait()?;
for row in rows.iter() {
let name = row.get_column_by_name("name".to_string()).unwrap().to_string();
clean_metrics_in_directory(session, &name)?;
clean_empty_directories_in_directory(session, &name)?;
}
}
Ok(())
}

View File

@ -3,6 +3,7 @@
*
* Author: Patrick MARIE <pm@mkz.me>
*/
use std::error;
use std::str::FromStr;
use crate::Session;
@ -12,12 +13,12 @@ use crate::create_metric;
use crate::fetch_metric;
use cassandra_cpp::stmt;
use cassandra_cpp::{BindRustType,Error};
use cassandra_cpp::BindRustType;
use cassandra_cpp::Uuid as CassUuid;
use std::convert::TryFrom;
pub fn metric_write(session: &Session, metric_name: &str, value: f64, retention: &str, timestamp: i64) -> Result<(), Error> {
pub fn metric_write(session: &Session, metric_name: &str, value: f64, retention: &str, timestamp: i64) -> Result<(), Box<dyn error::Error>> {
let mut query = stmt!("SELECT * FROM biggraphite_metadata.metrics_metadata WHERE name = ?");
query.bind(0, metric_name)?;

View File

@ -26,6 +26,7 @@ use crate::cmd::clean::*;
use crate::cmd::delete::*;
use crate::cmd::info::*;
use crate::cmd::list::*;
use crate::cmd::local_clean::*;
use crate::cmd::stats::*;
use crate::cmd::write::*;
@ -110,7 +111,7 @@ fn main() -> Result<(), Box<dyn error::Error>> {
.long("end-key")
.takes_value(true)))
.subcommand(SubCommand::with_name("clean")
.about("Stats")
.about("Clean outdated metrics & empty directories")
.arg(Arg::with_name("start-key")
.long("start-key")
.takes_value(true))
@ -121,6 +122,9 @@ fn main() -> Result<(), Box<dyn error::Error>> {
.long("clean-metrics"))
.arg(Arg::with_name("clean-directories")
.long("clean-directories")))
.subcommand(SubCommand::with_name("local-clean")
.about("Clean a directory of outdated metrics & empty sub-directories")
.arg(Arg::with_name("directory")))
.get_matches();
let mut contact_points_metadata = "localhost";
@ -278,6 +282,12 @@ fn main() -> Result<(), Box<dyn error::Error>> {
metrics_clean(&session, start_key, end_key, clean_metrics, clean_directories)?;
},
Some("local-clean") => {
let matches = matches.subcommand_matches("local-clean").unwrap();
let directory = matches.value_of("directory").unwrap();
metrics_local_clean(&session, directory)?;
}
None => {
eprintln!("No command was used.");
return Ok(());

View File

@ -29,6 +29,10 @@ impl Metric {
&self.name
}
pub fn updated_on(self: &Self) -> u64 {
self.updated_on
}
pub fn config(self: &Self, name: String) -> Result<String, String> {
let res = self.config.get(&name);
if let Some(v) = res {