Adding a dry-run mode on local-clean & a retry policy.
Minor fixes to handle invalid metric record.
This commit is contained in:
parent
3d316921e1
commit
a526f388df
@ -14,9 +14,10 @@ use crate::timerange::TimeRange;
|
||||
|
||||
use cassandra_cpp::Session as CassSession;
|
||||
use cassandra_cpp::Uuid as CassUuid;
|
||||
use cassandra_cpp::{Batch,BatchType,BindRustType,CassCollection,Cluster,Error,LogLevel,Map,Statement};
|
||||
use cassandra_cpp::{Batch,BatchType,BindRustType,CassCollection,Cluster,Error,LogLevel,Map,RetryPolicy,Statement};
|
||||
use cassandra_cpp::{set_level,stmt};
|
||||
|
||||
use chrono::Duration;
|
||||
use uuid::Uuid;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
@ -36,7 +37,8 @@ pub fn connect(contact_points: &str) -> Result<CassSession, Error> {
|
||||
let mut cluster = Cluster::default();
|
||||
cluster.set_contact_points(contact_points).unwrap();
|
||||
cluster.set_load_balance_round_robin();
|
||||
|
||||
cluster.set_retry_policy(RetryPolicy::downgrading_consistency_new());
|
||||
cluster.set_request_timeout(Duration::seconds(30));
|
||||
cluster.set_protocol_version(4)?;
|
||||
|
||||
cluster.connect()
|
||||
|
@ -52,6 +52,9 @@ fn clean_metrics_in_directory(session: &Session, directory: &str) -> Result<(),
|
||||
}
|
||||
|
||||
println!("Deleting metric {}", metric.name());
|
||||
if session.is_dry_run() {
|
||||
continue;
|
||||
}
|
||||
delete_metric(session, metric.name())?;
|
||||
}
|
||||
}
|
||||
@ -97,6 +100,9 @@ fn clean_empty_directories_in_directory(session: &Session, directory: &str) -> R
|
||||
}
|
||||
|
||||
println!("Deleting directory {}", name);
|
||||
if session.is_dry_run() {
|
||||
continue;
|
||||
}
|
||||
delete_directory(session, &name)?;
|
||||
}
|
||||
}
|
||||
@ -126,4 +132,4 @@ pub fn metrics_local_clean(session: &Session, directory: &str) -> Result<(), Box
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
12
src/main.rs
12
src/main.rs
@ -54,6 +54,9 @@ fn main() -> Result<(), Box<dyn error::Error>> {
|
||||
.long("contact-points")
|
||||
.env("CASSANDRA_CONTACT_POINTS")
|
||||
.takes_value(true))
|
||||
.arg(Arg::with_name("dry-run")
|
||||
.help("Do not write in database (local-clean only)")
|
||||
.long("dry-run"))
|
||||
.subcommand(SubCommand::with_name("info")
|
||||
.about("Information about a metric")
|
||||
.arg(Arg::with_name("metric")
|
||||
@ -124,7 +127,9 @@ fn main() -> Result<(), Box<dyn error::Error>> {
|
||||
.long("clean-directories")))
|
||||
.subcommand(SubCommand::with_name("local-clean")
|
||||
.about("Clean a directory of outdated metrics & empty sub-directories")
|
||||
.arg(Arg::with_name("directory")))
|
||||
.arg(Arg::with_name("directory")
|
||||
.index(1)
|
||||
.required(true)))
|
||||
.get_matches();
|
||||
|
||||
let mut contact_points_metadata = "localhost";
|
||||
@ -137,7 +142,10 @@ fn main() -> Result<(), Box<dyn error::Error>> {
|
||||
contact_points_data = matches.value_of("contact-points").unwrap();
|
||||
}
|
||||
|
||||
let session = Session::new(&contact_points_metadata, &contact_points_data)?;
|
||||
let dry_run = matches.is_present("dry-run");
|
||||
|
||||
let mut session = Session::new(&contact_points_metadata, &contact_points_data)?;
|
||||
session.set_dry_run(dry_run);
|
||||
|
||||
match matches.subcommand_name() {
|
||||
Some("info") => {
|
||||
|
@ -10,6 +10,7 @@ use std::fmt;
|
||||
use std::convert::TryFrom;
|
||||
|
||||
use cassandra_cpp::Row;
|
||||
use chrono::Utc;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Metric {
|
||||
@ -95,14 +96,25 @@ impl From<Row> for Metric {
|
||||
Err(_) => {},
|
||||
};
|
||||
|
||||
let created_on = row.get_column_by_name("created_on".to_string()).unwrap();
|
||||
let created_on_timestamp = match created_on.get_uuid() {
|
||||
Err(_) => 0,
|
||||
Ok(v) => v.timestamp(),
|
||||
let created_on = row.get_column_by_name("created_on".to_string());
|
||||
let created_on_timestamp = if let Ok(creation_time) = created_on {
|
||||
match creation_time.get_uuid() {
|
||||
Err(_) => 0,
|
||||
Ok(v) => v.timestamp(),
|
||||
}
|
||||
} else {
|
||||
Utc::now().timestamp() as u64
|
||||
};
|
||||
|
||||
let updated_on = row.get_column_by_name("updated_on".to_string()).unwrap();
|
||||
let updated_on_timestamp = updated_on.get_uuid().unwrap().timestamp();
|
||||
let updated_on = row.get_column_by_name("updated_on".to_string());
|
||||
let updated_on_timestamp = if let Ok(updated_time) = updated_on {
|
||||
match updated_time.get_uuid() {
|
||||
Err(_) => 0,
|
||||
Ok(v) => v.timestamp(),
|
||||
}
|
||||
} else {
|
||||
Utc::now().timestamp() as u64
|
||||
};
|
||||
|
||||
let uuid = match row.get_column_by_name("id".to_string()).unwrap().get_uuid() {
|
||||
Ok(v) => v.to_string(),
|
||||
|
@ -13,6 +13,7 @@ use crate::cassandra::*;
|
||||
pub struct Session {
|
||||
metadata: CassSession,
|
||||
points: CassSession,
|
||||
dry_run: bool,
|
||||
}
|
||||
|
||||
impl Session {
|
||||
@ -23,11 +24,16 @@ impl Session {
|
||||
let session = Self {
|
||||
metadata: metadata,
|
||||
points: points,
|
||||
dry_run: false,
|
||||
};
|
||||
|
||||
Ok(session)
|
||||
}
|
||||
|
||||
pub fn set_dry_run(&mut self, dry_run: bool) {
|
||||
self.dry_run = dry_run
|
||||
}
|
||||
|
||||
pub fn metadata_session(&self) -> &CassSession {
|
||||
&self.metadata
|
||||
}
|
||||
@ -45,4 +51,8 @@ impl Session {
|
||||
pub fn write_consistency(&self) -> Consistency {
|
||||
Consistency::LOCAL_QUORUM
|
||||
}
|
||||
}
|
||||
|
||||
pub fn is_dry_run(&self) -> bool {
|
||||
self.dry_run
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user