From 6fcd46fb88abdbb6072ccf60f516ad2a524faf80 Mon Sep 17 00:00:00 2001 From: Patrick MARIE Date: Mon, 22 Feb 2021 13:39:45 +0100 Subject: [PATCH] Refactoring: sessions. --- src/cassandra.rs | 53 +++++++++++++++++++++++++++++------------------- src/main.rs | 33 +++++++++++++----------------- 2 files changed, 46 insertions(+), 40 deletions(-) diff --git a/src/cassandra.rs b/src/cassandra.rs index 38bbaa1..431be75 100644 --- a/src/cassandra.rs +++ b/src/cassandra.rs @@ -3,12 +3,23 @@ * * Author: Patrick MARIE */ -use crate::*; -use cassandra_cpp::{BindRustType,Cluster,Error,LogLevel,Session}; +use std::str::FromStr; +use std::convert::TryFrom; + +use crate::metric::Metric; +use crate::session::Session; +use crate::stage::Stage; +use crate::timerange::TimeRange; + +use cassandra_cpp::Session as CassSession; +use cassandra_cpp::Uuid as CassUuid; +use cassandra_cpp::{Batch,BatchType,BindRustType,CassCollection,Cluster,Consistency,Error,LogLevel,Map}; use cassandra_cpp::{set_level,stmt}; -pub fn connect(contact_points: &str) -> Result { +use uuid::Uuid; + +pub fn connect(contact_points: &str) -> Result { set_level(LogLevel::DISABLED); let mut cluster = Cluster::default(); @@ -27,11 +38,11 @@ pub fn fetch_metric(session: &Session, metric_name: &str) -> Result Result<(), Error> { +pub fn fetch_points(session: &Session, m: &Metric, s: &Stage, time_start: i64, time_end: i64) -> Result<(), Error> { let table_name = s.table_name(); let q = format!( @@ -48,7 +59,7 @@ pub fn fetch_points(session_points: &Session, m: &Metric, s: &Stage, time_start: query.bind(2, range.1 as i16)?; query.bind(3, range.2 as i16)?; - let result = session_points.execute(&query).wait()?; + let result = session.points_session().execute(&query).wait()?; for row in result.iter() { let ts : i64 = row.get_column_by_name("time_start_ms".to_string())?.get_i64()?; @@ -75,7 +86,7 @@ pub fn fetch_metrics(session: &Session, metric_names: &Vec) -> Result) -> Result Result<(), Error> { +pub fn create_metric(session: &Session, metric: &str) -> Result<(), Error> { let mut batch = Batch::new(BatchType::LOGGED); let metrics_parts = metric.split(".").collect::>(); @@ -174,58 +185,58 @@ pub fn create_metric(session_metadata: &Session, metric: &str) -> Result<(), Err query.set_consistency(Consistency::LOCAL_QUORUM)?; - session_metadata.execute(&query).wait()?; + session.metadata_session().execute(&query).wait()?; // Write directories - session_metadata.execute_batch(batch).wait()?; + session.metadata_session().execute_batch(batch).wait()?; println!("Metric was written."); Ok(()) } -pub fn metric_delete(session_metadata: &Session, metric_name: &str) -> Result<(), Error> { +pub fn metric_delete(session: &Session, metric_name: &str) -> Result<(), Error> { let mut query = stmt!("SELECT * FROM biggraphite_metadata.metrics_metadata WHERE name = ?"); query.bind(0, metric_name)?; - let result = session_metadata.execute(&query).wait()?; + let result = session.metadata_session().execute(&query).wait()?; if result.row_count() == 0 { println!("Metric is not existing"); return Ok(()); } - let _metric = fetch_metric(&session_metadata, metric_name)?; + let _metric = fetch_metric(session, metric_name)?; let mut query = stmt!("DELETE FROM biggraphite_metadata.metrics_metadata WHERE name = ?;"); query.bind(0, metric_name)?; query.set_consistency(Consistency::LOCAL_QUORUM)?; - session_metadata.execute(&query).wait()?; + session.metadata_session().execute(&query).wait()?; let mut query = stmt!("DELETE FROM biggraphite_metadata.metrics_metadata WHERE name = ?;"); query.bind(0, metric_name)?; query.set_consistency(Consistency::LOCAL_QUORUM)?; - session_metadata.execute(&query).wait()?; + session.metadata_session().execute(&query).wait()?; let mut query = stmt!("DELETE FROM biggraphite_metadata.directories WHERE name = ?;"); query.bind(0, metric_name)?; query.set_consistency(Consistency::LOCAL_QUORUM)?; - session_metadata.execute(&query).wait()?; + session.metadata_session().execute(&query).wait()?; Ok(()) } -pub fn metric_write(session_metadata: &Session, session_points: &Session, metric_name: &str, value: f64, retention: &str, timestamp: i64) -> Result<(), Error> { +pub fn metric_write(session: &Session, metric_name: &str, value: f64, retention: &str, timestamp: i64) -> Result<(), Error> { let mut query = stmt!("SELECT * FROM biggraphite_metadata.metrics_metadata WHERE name = ?"); query.bind(0, metric_name)?; - let result = session_metadata.execute(&query).wait()?; + let result = session.metadata_session().execute(&query).wait()?; if result.row_count() == 0 { - create_metric(session_metadata, metric_name)?; + create_metric(session, metric_name)?; } let stage = Stage::try_from(retention)?; - let metric = fetch_metric(&session_metadata, metric_name)?; + let metric = fetch_metric(session, metric_name)?; let (time_start_ms, offset) = stage.time_offset_ms(timestamp); let query = format!( @@ -239,7 +250,7 @@ pub fn metric_write(session_metadata: &Session, session_points: &Session, metric query.bind(2, offset as i16)?; query.bind(3, value)?; - session_points.execute(&query).wait()?; + session.points_session().execute(&query).wait()?; Ok(()) } diff --git a/src/main.rs b/src/main.rs index c8906a6..a5ea6bc 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,19 +3,15 @@ * * Author: Patrick MARIE */ -use std::str::FromStr; + use std::convert::TryFrom; use std::error; -use cassandra_cpp::{Batch,BatchType,BindRustType,CassCollection,CassResult,Consistency,Error,Map,Statement}; -use cassandra_cpp::Session as CassSession; -use cassandra_cpp::Uuid as CassUuid; +use cassandra_cpp::{BindRustType,CassResult,Consistency,Error,Statement}; use cassandra_cpp::{stmt}; use chrono::Utc; use clap::{App,AppSettings,Arg,SubCommand}; -use uuid::Uuid; - mod cassandra; mod metric; mod session; @@ -23,10 +19,9 @@ mod stage; mod timerange; use crate::cassandra::*; -use crate::metric::*; use crate::session::Session; use crate::stage::*; -use crate::timerange::*; + #[allow(dead_code)] fn describe_result(result: &CassResult) { @@ -41,7 +36,7 @@ fn describe_result(result: &CassResult) { } } -pub fn metric_info(session: &CassSession, metric_name: &str) -> Result<(), Error> { +pub fn metric_info(session: &Session, metric_name: &str) -> Result<(), Error> { let metric = fetch_metric(session, metric_name)?; println!("{}", metric); @@ -92,12 +87,12 @@ fn prepare_component_query(table_name: &str, arguments: &Vec<&str>) -> Result Result<(), Error> { +fn metric_list(session: &Session, glob: &str) -> Result<(), Error> { let components = glob.split(".").collect::>(); let mut query_directories = prepare_component_query("directories", &components)?; query_directories.set_consistency(Consistency::QUORUM)?; - let result = session_metadata.execute(&query_directories).wait()?; + let result = session.metadata_session().execute(&query_directories).wait()?; for row in result.iter() { let name = row.get_column_by_name("name".to_string()).unwrap().to_string(); println!("d {}", name); @@ -105,7 +100,7 @@ fn metric_list(session_metadata: &CassSession, glob: &str) -> Result<(), Error> let mut query = prepare_component_query("metrics", &components)?; query.set_consistency(Consistency::QUORUM)?; - let result = session_metadata.execute(&query).wait()?; + let result = session.metadata_session().execute(&query).wait()?; let names = result .iter() @@ -114,7 +109,7 @@ fn metric_list(session_metadata: &CassSession, glob: &str) -> Result<(), Error> }) .collect::>(); - let metrics = fetch_metrics(session_metadata, &names)?; + let metrics = fetch_metrics(session, &names)?; for metric in metrics { println!("m {}", metric); } @@ -183,7 +178,7 @@ fn main() -> Result<(), Box> { match matches.subcommand_name() { Some("info") => { let matches = matches.subcommand_matches("info").unwrap(); - metric_info(session.metadata_session(), matches.value_of("metric").unwrap())?; + metric_info(&session, matches.value_of("metric").unwrap())?; }, Some("read") => { let matches = matches.subcommand_matches("read").unwrap(); @@ -215,7 +210,7 @@ fn main() -> Result<(), Box> { }; let metric_name = matches.value_of("metric").unwrap(); - let metric = fetch_metric(session.metadata_session(), metric_name)?; + let metric = fetch_metric(&session, metric_name)?; let available_stages = metric.stages()?; let stage = Stage::try_from(stage)?; @@ -225,11 +220,11 @@ fn main() -> Result<(), Box> { return Ok(()); } - fetch_points(session.points_session(), &metric, &stage, time_start, time_end)?; + fetch_points(&session, &metric, &stage, time_start, time_end)?; }, Some("list") => { let matches = matches.subcommand_matches("list").unwrap(); - metric_list(session.metadata_session(), matches.value_of("glob").unwrap())?; + metric_list(&session, matches.value_of("glob").unwrap())?; }, Some("write") => { let matches = matches.subcommand_matches("write").unwrap(); @@ -249,7 +244,7 @@ fn main() -> Result<(), Box> { } }; - metric_write(session.metadata_session(), session.points_session(), metric, value, retention, timestamp)?; + metric_write(&session, metric, value, retention, timestamp)?; }, Some("delete") => { let matches = matches.subcommand_matches("delete").unwrap(); @@ -259,7 +254,7 @@ fn main() -> Result<(), Box> { unimplemented!(); } - metric_delete(session.metadata_session(), &metric)?; + metric_delete(&session, &metric)?; } None => { eprintln!("No command was used.");