From a526f388df04507e8cfe257780761d8e982fdc33 Mon Sep 17 00:00:00 2001 From: Patrick MARIE Date: Fri, 26 Feb 2021 11:45:30 +0100 Subject: [PATCH] Adding a dry-run mode on local-clean & a retry policy. Minor fixes to handle invalid metric record. --- src/cassandra.rs | 6 ++++-- src/cmd/local_clean.rs | 8 +++++++- src/main.rs | 12 ++++++++++-- src/metric.rs | 24 ++++++++++++++++++------ src/session.rs | 12 +++++++++++- 5 files changed, 50 insertions(+), 12 deletions(-) diff --git a/src/cassandra.rs b/src/cassandra.rs index f2b1df7..d145883 100644 --- a/src/cassandra.rs +++ b/src/cassandra.rs @@ -14,9 +14,10 @@ use crate::timerange::TimeRange; use cassandra_cpp::Session as CassSession; use cassandra_cpp::Uuid as CassUuid; -use cassandra_cpp::{Batch,BatchType,BindRustType,CassCollection,Cluster,Error,LogLevel,Map,Statement}; +use cassandra_cpp::{Batch,BatchType,BindRustType,CassCollection,Cluster,Error,LogLevel,Map,RetryPolicy,Statement}; use cassandra_cpp::{set_level,stmt}; +use chrono::Duration; use uuid::Uuid; #[derive(Debug, Clone)] @@ -36,7 +37,8 @@ pub fn connect(contact_points: &str) -> Result { let mut cluster = Cluster::default(); cluster.set_contact_points(contact_points).unwrap(); cluster.set_load_balance_round_robin(); - + cluster.set_retry_policy(RetryPolicy::downgrading_consistency_new()); + cluster.set_request_timeout(Duration::seconds(30)); cluster.set_protocol_version(4)?; cluster.connect() diff --git a/src/cmd/local_clean.rs b/src/cmd/local_clean.rs index fed7663..81993d5 100644 --- a/src/cmd/local_clean.rs +++ b/src/cmd/local_clean.rs @@ -52,6 +52,9 @@ fn clean_metrics_in_directory(session: &Session, directory: &str) -> Result<(), } println!("Deleting metric {}", metric.name()); + if session.is_dry_run() { + continue; + } delete_metric(session, metric.name())?; } } @@ -97,6 +100,9 @@ fn clean_empty_directories_in_directory(session: &Session, directory: &str) -> R } println!("Deleting directory {}", name); + if session.is_dry_run() { + continue; + } delete_directory(session, &name)?; } } @@ -126,4 +132,4 @@ pub fn metrics_local_clean(session: &Session, directory: &str) -> Result<(), Box } Ok(()) -} \ No newline at end of file +} diff --git a/src/main.rs b/src/main.rs index c2bc1fa..ab460e7 100644 --- a/src/main.rs +++ b/src/main.rs @@ -54,6 +54,9 @@ fn main() -> Result<(), Box> { .long("contact-points") .env("CASSANDRA_CONTACT_POINTS") .takes_value(true)) + .arg(Arg::with_name("dry-run") + .help("Do not write in database (local-clean only)") + .long("dry-run")) .subcommand(SubCommand::with_name("info") .about("Information about a metric") .arg(Arg::with_name("metric") @@ -124,7 +127,9 @@ fn main() -> Result<(), Box> { .long("clean-directories"))) .subcommand(SubCommand::with_name("local-clean") .about("Clean a directory of outdated metrics & empty sub-directories") - .arg(Arg::with_name("directory"))) + .arg(Arg::with_name("directory") + .index(1) + .required(true))) .get_matches(); let mut contact_points_metadata = "localhost"; @@ -137,7 +142,10 @@ fn main() -> Result<(), Box> { contact_points_data = matches.value_of("contact-points").unwrap(); } - let session = Session::new(&contact_points_metadata, &contact_points_data)?; + let dry_run = matches.is_present("dry-run"); + + let mut session = Session::new(&contact_points_metadata, &contact_points_data)?; + session.set_dry_run(dry_run); match matches.subcommand_name() { Some("info") => { diff --git a/src/metric.rs b/src/metric.rs index f05db28..f1deec2 100644 --- a/src/metric.rs +++ b/src/metric.rs @@ -10,6 +10,7 @@ use std::fmt; use std::convert::TryFrom; use cassandra_cpp::Row; +use chrono::Utc; #[derive(Debug)] pub struct Metric { @@ -95,14 +96,25 @@ impl From for Metric { Err(_) => {}, }; - let created_on = row.get_column_by_name("created_on".to_string()).unwrap(); - let created_on_timestamp = match created_on.get_uuid() { - Err(_) => 0, - Ok(v) => v.timestamp(), + let created_on = row.get_column_by_name("created_on".to_string()); + let created_on_timestamp = if let Ok(creation_time) = created_on { + match creation_time.get_uuid() { + Err(_) => 0, + Ok(v) => v.timestamp(), + } + } else { + Utc::now().timestamp() as u64 }; - let updated_on = row.get_column_by_name("updated_on".to_string()).unwrap(); - let updated_on_timestamp = updated_on.get_uuid().unwrap().timestamp(); + let updated_on = row.get_column_by_name("updated_on".to_string()); + let updated_on_timestamp = if let Ok(updated_time) = updated_on { + match updated_time.get_uuid() { + Err(_) => 0, + Ok(v) => v.timestamp(), + } + } else { + Utc::now().timestamp() as u64 + }; let uuid = match row.get_column_by_name("id".to_string()).unwrap().get_uuid() { Ok(v) => v.to_string(), diff --git a/src/session.rs b/src/session.rs index 3be73ef..389d664 100644 --- a/src/session.rs +++ b/src/session.rs @@ -13,6 +13,7 @@ use crate::cassandra::*; pub struct Session { metadata: CassSession, points: CassSession, + dry_run: bool, } impl Session { @@ -23,11 +24,16 @@ impl Session { let session = Self { metadata: metadata, points: points, + dry_run: false, }; Ok(session) } + pub fn set_dry_run(&mut self, dry_run: bool) { + self.dry_run = dry_run + } + pub fn metadata_session(&self) -> &CassSession { &self.metadata } @@ -45,4 +51,8 @@ impl Session { pub fn write_consistency(&self) -> Consistency { Consistency::LOCAL_QUORUM } -} \ No newline at end of file + + pub fn is_dry_run(&self) -> bool { + self.dry_run + } +}