Refactoring: sessions.

This commit is contained in:
Patrick MARIE 2021-02-22 13:39:45 +01:00
parent 3bbc68f431
commit 6fcd46fb88
2 changed files with 46 additions and 40 deletions

View File

@ -3,12 +3,23 @@
* *
* Author: Patrick MARIE <pm@mkz.me> * Author: Patrick MARIE <pm@mkz.me>
*/ */
use crate::*;
use cassandra_cpp::{BindRustType,Cluster,Error,LogLevel,Session}; use std::str::FromStr;
use std::convert::TryFrom;
use crate::metric::Metric;
use crate::session::Session;
use crate::stage::Stage;
use crate::timerange::TimeRange;
use cassandra_cpp::Session as CassSession;
use cassandra_cpp::Uuid as CassUuid;
use cassandra_cpp::{Batch,BatchType,BindRustType,CassCollection,Cluster,Consistency,Error,LogLevel,Map};
use cassandra_cpp::{set_level,stmt}; use cassandra_cpp::{set_level,stmt};
pub fn connect(contact_points: &str) -> Result<Session, Error> { use uuid::Uuid;
pub fn connect(contact_points: &str) -> Result<CassSession, Error> {
set_level(LogLevel::DISABLED); set_level(LogLevel::DISABLED);
let mut cluster = Cluster::default(); let mut cluster = Cluster::default();
@ -27,11 +38,11 @@ pub fn fetch_metric(session: &Session, metric_name: &str) -> Result<Metric, Erro
// XXX set consistency // XXX set consistency
// query.set_consistency(Consistency::QUORUM); // query.set_consistency(Consistency::QUORUM);
let result = session.execute(&query).wait()?; let result = session.metadata_session().execute(&query).wait()?;
Ok(result.first_row().unwrap().into()) Ok(result.first_row().unwrap().into())
} }
pub fn fetch_points(session_points: &Session, m: &Metric, s: &Stage, time_start: i64, time_end: i64) -> Result<(), Error> { pub fn fetch_points(session: &Session, m: &Metric, s: &Stage, time_start: i64, time_end: i64) -> Result<(), Error> {
let table_name = s.table_name(); let table_name = s.table_name();
let q = format!( let q = format!(
@ -48,7 +59,7 @@ pub fn fetch_points(session_points: &Session, m: &Metric, s: &Stage, time_start:
query.bind(2, range.1 as i16)?; query.bind(2, range.1 as i16)?;
query.bind(3, range.2 as i16)?; query.bind(3, range.2 as i16)?;
let result = session_points.execute(&query).wait()?; let result = session.points_session().execute(&query).wait()?;
for row in result.iter() { for row in result.iter() {
let ts : i64 = row.get_column_by_name("time_start_ms".to_string())?.get_i64()?; let ts : i64 = row.get_column_by_name("time_start_ms".to_string())?.get_i64()?;
@ -75,7 +86,7 @@ pub fn fetch_metrics(session: &Session, metric_names: &Vec<String>) -> Result<Ve
query.bind(0, metric_name.as_str())?; query.bind(0, metric_name.as_str())?;
query.set_consistency(Consistency::QUORUM)?; query.set_consistency(Consistency::QUORUM)?;
let result = session.execute(&query); let result = session.metadata_session().execute(&query);
results.push(result); results.push(result);
} }
@ -92,7 +103,7 @@ pub fn fetch_metrics(session: &Session, metric_names: &Vec<String>) -> Result<Ve
Ok(out) Ok(out)
} }
pub fn create_metric(session_metadata: &Session, metric: &str) -> Result<(), Error> { pub fn create_metric(session: &Session, metric: &str) -> Result<(), Error> {
let mut batch = Batch::new(BatchType::LOGGED); let mut batch = Batch::new(BatchType::LOGGED);
let metrics_parts = metric.split(".").collect::<Vec<&str>>(); let metrics_parts = metric.split(".").collect::<Vec<&str>>();
@ -174,58 +185,58 @@ pub fn create_metric(session_metadata: &Session, metric: &str) -> Result<(), Err
query.set_consistency(Consistency::LOCAL_QUORUM)?; query.set_consistency(Consistency::LOCAL_QUORUM)?;
session_metadata.execute(&query).wait()?; session.metadata_session().execute(&query).wait()?;
// Write directories // Write directories
session_metadata.execute_batch(batch).wait()?; session.metadata_session().execute_batch(batch).wait()?;
println!("Metric was written."); println!("Metric was written.");
Ok(()) Ok(())
} }
pub fn metric_delete(session_metadata: &Session, metric_name: &str) -> Result<(), Error> { pub fn metric_delete(session: &Session, metric_name: &str) -> Result<(), Error> {
let mut query = stmt!("SELECT * FROM biggraphite_metadata.metrics_metadata WHERE name = ?"); let mut query = stmt!("SELECT * FROM biggraphite_metadata.metrics_metadata WHERE name = ?");
query.bind(0, metric_name)?; query.bind(0, metric_name)?;
let result = session_metadata.execute(&query).wait()?; let result = session.metadata_session().execute(&query).wait()?;
if result.row_count() == 0 { if result.row_count() == 0 {
println!("Metric is not existing"); println!("Metric is not existing");
return Ok(()); return Ok(());
} }
let _metric = fetch_metric(&session_metadata, metric_name)?; let _metric = fetch_metric(session, metric_name)?;
let mut query = stmt!("DELETE FROM biggraphite_metadata.metrics_metadata WHERE name = ?;"); let mut query = stmt!("DELETE FROM biggraphite_metadata.metrics_metadata WHERE name = ?;");
query.bind(0, metric_name)?; query.bind(0, metric_name)?;
query.set_consistency(Consistency::LOCAL_QUORUM)?; query.set_consistency(Consistency::LOCAL_QUORUM)?;
session_metadata.execute(&query).wait()?; session.metadata_session().execute(&query).wait()?;
let mut query = stmt!("DELETE FROM biggraphite_metadata.metrics_metadata WHERE name = ?;"); let mut query = stmt!("DELETE FROM biggraphite_metadata.metrics_metadata WHERE name = ?;");
query.bind(0, metric_name)?; query.bind(0, metric_name)?;
query.set_consistency(Consistency::LOCAL_QUORUM)?; query.set_consistency(Consistency::LOCAL_QUORUM)?;
session_metadata.execute(&query).wait()?; session.metadata_session().execute(&query).wait()?;
let mut query = stmt!("DELETE FROM biggraphite_metadata.directories WHERE name = ?;"); let mut query = stmt!("DELETE FROM biggraphite_metadata.directories WHERE name = ?;");
query.bind(0, metric_name)?; query.bind(0, metric_name)?;
query.set_consistency(Consistency::LOCAL_QUORUM)?; query.set_consistency(Consistency::LOCAL_QUORUM)?;
session_metadata.execute(&query).wait()?; session.metadata_session().execute(&query).wait()?;
Ok(()) Ok(())
} }
pub fn metric_write(session_metadata: &Session, session_points: &Session, metric_name: &str, value: f64, retention: &str, timestamp: i64) -> Result<(), Error> { pub fn metric_write(session: &Session, metric_name: &str, value: f64, retention: &str, timestamp: i64) -> Result<(), Error> {
let mut query = stmt!("SELECT * FROM biggraphite_metadata.metrics_metadata WHERE name = ?"); let mut query = stmt!("SELECT * FROM biggraphite_metadata.metrics_metadata WHERE name = ?");
query.bind(0, metric_name)?; query.bind(0, metric_name)?;
let result = session_metadata.execute(&query).wait()?; let result = session.metadata_session().execute(&query).wait()?;
if result.row_count() == 0 { if result.row_count() == 0 {
create_metric(session_metadata, metric_name)?; create_metric(session, metric_name)?;
} }
let stage = Stage::try_from(retention)?; let stage = Stage::try_from(retention)?;
let metric = fetch_metric(&session_metadata, metric_name)?; let metric = fetch_metric(session, metric_name)?;
let (time_start_ms, offset) = stage.time_offset_ms(timestamp); let (time_start_ms, offset) = stage.time_offset_ms(timestamp);
let query = format!( let query = format!(
@ -239,7 +250,7 @@ pub fn metric_write(session_metadata: &Session, session_points: &Session, metric
query.bind(2, offset as i16)?; query.bind(2, offset as i16)?;
query.bind(3, value)?; query.bind(3, value)?;
session_points.execute(&query).wait()?; session.points_session().execute(&query).wait()?;
Ok(()) Ok(())
} }

View File

@ -3,19 +3,15 @@
* *
* Author: Patrick MARIE <pm@mkz.me> * Author: Patrick MARIE <pm@mkz.me>
*/ */
use std::str::FromStr;
use std::convert::TryFrom; use std::convert::TryFrom;
use std::error; use std::error;
use cassandra_cpp::{Batch,BatchType,BindRustType,CassCollection,CassResult,Consistency,Error,Map,Statement}; use cassandra_cpp::{BindRustType,CassResult,Consistency,Error,Statement};
use cassandra_cpp::Session as CassSession;
use cassandra_cpp::Uuid as CassUuid;
use cassandra_cpp::{stmt}; use cassandra_cpp::{stmt};
use chrono::Utc; use chrono::Utc;
use clap::{App,AppSettings,Arg,SubCommand}; use clap::{App,AppSettings,Arg,SubCommand};
use uuid::Uuid;
mod cassandra; mod cassandra;
mod metric; mod metric;
mod session; mod session;
@ -23,10 +19,9 @@ mod stage;
mod timerange; mod timerange;
use crate::cassandra::*; use crate::cassandra::*;
use crate::metric::*;
use crate::session::Session; use crate::session::Session;
use crate::stage::*; use crate::stage::*;
use crate::timerange::*;
#[allow(dead_code)] #[allow(dead_code)]
fn describe_result(result: &CassResult) { fn describe_result(result: &CassResult) {
@ -41,7 +36,7 @@ fn describe_result(result: &CassResult) {
} }
} }
pub fn metric_info(session: &CassSession, metric_name: &str) -> Result<(), Error> { pub fn metric_info(session: &Session, metric_name: &str) -> Result<(), Error> {
let metric = fetch_metric(session, metric_name)?; let metric = fetch_metric(session, metric_name)?;
println!("{}", metric); println!("{}", metric);
@ -92,12 +87,12 @@ fn prepare_component_query(table_name: &str, arguments: &Vec<&str>) -> Result<St
Ok(query) Ok(query)
} }
fn metric_list(session_metadata: &CassSession, glob: &str) -> Result<(), Error> { fn metric_list(session: &Session, glob: &str) -> Result<(), Error> {
let components = glob.split(".").collect::<Vec<&str>>(); let components = glob.split(".").collect::<Vec<&str>>();
let mut query_directories = prepare_component_query("directories", &components)?; let mut query_directories = prepare_component_query("directories", &components)?;
query_directories.set_consistency(Consistency::QUORUM)?; query_directories.set_consistency(Consistency::QUORUM)?;
let result = session_metadata.execute(&query_directories).wait()?; let result = session.metadata_session().execute(&query_directories).wait()?;
for row in result.iter() { for row in result.iter() {
let name = row.get_column_by_name("name".to_string()).unwrap().to_string(); let name = row.get_column_by_name("name".to_string()).unwrap().to_string();
println!("d {}", name); println!("d {}", name);
@ -105,7 +100,7 @@ fn metric_list(session_metadata: &CassSession, glob: &str) -> Result<(), Error>
let mut query = prepare_component_query("metrics", &components)?; let mut query = prepare_component_query("metrics", &components)?;
query.set_consistency(Consistency::QUORUM)?; query.set_consistency(Consistency::QUORUM)?;
let result = session_metadata.execute(&query).wait()?; let result = session.metadata_session().execute(&query).wait()?;
let names = result let names = result
.iter() .iter()
@ -114,7 +109,7 @@ fn metric_list(session_metadata: &CassSession, glob: &str) -> Result<(), Error>
}) })
.collect::<Vec<String>>(); .collect::<Vec<String>>();
let metrics = fetch_metrics(session_metadata, &names)?; let metrics = fetch_metrics(session, &names)?;
for metric in metrics { for metric in metrics {
println!("m {}", metric); println!("m {}", metric);
} }
@ -183,7 +178,7 @@ fn main() -> Result<(), Box<dyn error::Error>> {
match matches.subcommand_name() { match matches.subcommand_name() {
Some("info") => { Some("info") => {
let matches = matches.subcommand_matches("info").unwrap(); let matches = matches.subcommand_matches("info").unwrap();
metric_info(session.metadata_session(), matches.value_of("metric").unwrap())?; metric_info(&session, matches.value_of("metric").unwrap())?;
}, },
Some("read") => { Some("read") => {
let matches = matches.subcommand_matches("read").unwrap(); let matches = matches.subcommand_matches("read").unwrap();
@ -215,7 +210,7 @@ fn main() -> Result<(), Box<dyn error::Error>> {
}; };
let metric_name = matches.value_of("metric").unwrap(); let metric_name = matches.value_of("metric").unwrap();
let metric = fetch_metric(session.metadata_session(), metric_name)?; let metric = fetch_metric(&session, metric_name)?;
let available_stages = metric.stages()?; let available_stages = metric.stages()?;
let stage = Stage::try_from(stage)?; let stage = Stage::try_from(stage)?;
@ -225,11 +220,11 @@ fn main() -> Result<(), Box<dyn error::Error>> {
return Ok(()); return Ok(());
} }
fetch_points(session.points_session(), &metric, &stage, time_start, time_end)?; fetch_points(&session, &metric, &stage, time_start, time_end)?;
}, },
Some("list") => { Some("list") => {
let matches = matches.subcommand_matches("list").unwrap(); let matches = matches.subcommand_matches("list").unwrap();
metric_list(session.metadata_session(), matches.value_of("glob").unwrap())?; metric_list(&session, matches.value_of("glob").unwrap())?;
}, },
Some("write") => { Some("write") => {
let matches = matches.subcommand_matches("write").unwrap(); let matches = matches.subcommand_matches("write").unwrap();
@ -249,7 +244,7 @@ fn main() -> Result<(), Box<dyn error::Error>> {
} }
}; };
metric_write(session.metadata_session(), session.points_session(), metric, value, retention, timestamp)?; metric_write(&session, metric, value, retention, timestamp)?;
}, },
Some("delete") => { Some("delete") => {
let matches = matches.subcommand_matches("delete").unwrap(); let matches = matches.subcommand_matches("delete").unwrap();
@ -259,7 +254,7 @@ fn main() -> Result<(), Box<dyn error::Error>> {
unimplemented!(); unimplemented!();
} }
metric_delete(session.metadata_session(), &metric)?; metric_delete(&session, &metric)?;
} }
None => { None => {
eprintln!("No command was used."); eprintln!("No command was used.");