Refactoring: sessions.

This commit is contained in:
Patrick MARIE 2021-02-22 13:13:54 +01:00
parent 87a3e5e865
commit 3bbc68f431
2 changed files with 61 additions and 25 deletions

View File

@ -7,7 +7,8 @@ use std::str::FromStr;
use std::convert::TryFrom; use std::convert::TryFrom;
use std::error; use std::error;
use cassandra_cpp::{Batch,BatchType,BindRustType,CassCollection,CassResult,Consistency,Error,Map,Session,Statement}; use cassandra_cpp::{Batch,BatchType,BindRustType,CassCollection,CassResult,Consistency,Error,Map,Statement};
use cassandra_cpp::Session as CassSession;
use cassandra_cpp::Uuid as CassUuid; use cassandra_cpp::Uuid as CassUuid;
use cassandra_cpp::{stmt}; use cassandra_cpp::{stmt};
use chrono::Utc; use chrono::Utc;
@ -17,11 +18,13 @@ use uuid::Uuid;
mod cassandra; mod cassandra;
mod metric; mod metric;
mod session;
mod stage; mod stage;
mod timerange; mod timerange;
use crate::cassandra::*; use crate::cassandra::*;
use crate::metric::*; use crate::metric::*;
use crate::session::Session;
use crate::stage::*; use crate::stage::*;
use crate::timerange::*; use crate::timerange::*;
@ -38,7 +41,7 @@ fn describe_result(result: &CassResult) {
} }
} }
pub fn metric_info(session: &Session, metric_name: &str) -> Result<(), Error> { pub fn metric_info(session: &CassSession, metric_name: &str) -> Result<(), Error> {
let metric = fetch_metric(session, metric_name)?; let metric = fetch_metric(session, metric_name)?;
println!("{}", metric); println!("{}", metric);
@ -89,7 +92,7 @@ fn prepare_component_query(table_name: &str, arguments: &Vec<&str>) -> Result<St
Ok(query) Ok(query)
} }
fn metric_list(session_metadata: &Session, glob: &str) -> Result<(), Error> { fn metric_list(session_metadata: &CassSession, glob: &str) -> Result<(), Error> {
let components = glob.split(".").collect::<Vec<&str>>(); let components = glob.split(".").collect::<Vec<&str>>();
let mut query_directories = prepare_component_query("directories", &components)?; let mut query_directories = prepare_component_query("directories", &components)?;
@ -175,26 +178,12 @@ fn main() -> Result<(), Box<dyn error::Error>> {
let contact_points_metadata = "tag--cstars07--cassandra-cstars07.query.consul.preprod.crto.in"; let contact_points_metadata = "tag--cstars07--cassandra-cstars07.query.consul.preprod.crto.in";
let contact_points_data = "tag--cstars04--cassandra-cstars04.query.consul.preprod.crto.in"; let contact_points_data = "tag--cstars04--cassandra-cstars04.query.consul.preprod.crto.in";
let session_metadata = match connect(contact_points_metadata) { let session = Session::new(&contact_points_metadata, &contact_points_data)?;
Ok(session) => session,
Err(err) => {
eprintln!("{:?}", err);
return Ok(());
}
};
let session_points = match connect(contact_points_data) {
Ok(session) => session,
Err(err) => {
eprintln!("{:?}", err);
return Ok(());
}
};
match matches.subcommand_name() { match matches.subcommand_name() {
Some("info") => { Some("info") => {
let matches = matches.subcommand_matches("info").unwrap(); let matches = matches.subcommand_matches("info").unwrap();
metric_info(&session_metadata, matches.value_of("metric").unwrap())?; metric_info(session.metadata_session(), matches.value_of("metric").unwrap())?;
}, },
Some("read") => { Some("read") => {
let matches = matches.subcommand_matches("read").unwrap(); let matches = matches.subcommand_matches("read").unwrap();
@ -226,7 +215,7 @@ fn main() -> Result<(), Box<dyn error::Error>> {
}; };
let metric_name = matches.value_of("metric").unwrap(); let metric_name = matches.value_of("metric").unwrap();
let metric = fetch_metric(&session_metadata, metric_name)?; let metric = fetch_metric(session.metadata_session(), metric_name)?;
let available_stages = metric.stages()?; let available_stages = metric.stages()?;
let stage = Stage::try_from(stage)?; let stage = Stage::try_from(stage)?;
@ -236,11 +225,11 @@ fn main() -> Result<(), Box<dyn error::Error>> {
return Ok(()); return Ok(());
} }
fetch_points(&session_points, &metric, &stage, time_start, time_end)?; fetch_points(session.points_session(), &metric, &stage, time_start, time_end)?;
}, },
Some("list") => { Some("list") => {
let matches = matches.subcommand_matches("list").unwrap(); let matches = matches.subcommand_matches("list").unwrap();
metric_list(&session_metadata, matches.value_of("glob").unwrap())?; metric_list(session.metadata_session(), matches.value_of("glob").unwrap())?;
}, },
Some("write") => { Some("write") => {
let matches = matches.subcommand_matches("write").unwrap(); let matches = matches.subcommand_matches("write").unwrap();
@ -260,7 +249,7 @@ fn main() -> Result<(), Box<dyn error::Error>> {
} }
}; };
metric_write(&session_metadata, &session_points, metric, value, retention, timestamp)?; metric_write(session.metadata_session(), session.points_session(), metric, value, retention, timestamp)?;
}, },
Some("delete") => { Some("delete") => {
let matches = matches.subcommand_matches("delete").unwrap(); let matches = matches.subcommand_matches("delete").unwrap();
@ -270,7 +259,7 @@ fn main() -> Result<(), Box<dyn error::Error>> {
unimplemented!(); unimplemented!();
} }
metric_delete(&session_metadata, &metric)?; metric_delete(session.metadata_session(), &metric)?;
} }
None => { None => {
eprintln!("No command was used."); eprintln!("No command was used.");
@ -281,4 +270,3 @@ fn main() -> Result<(), Box<dyn error::Error>> {
Ok(()) Ok(())
} }

48
src/session.rs Normal file
View File

@ -0,0 +1,48 @@
/*
* bgutil-rs
*
* Author: Patrick MARIE <pm@mkz.me>
*/
use cassandra_cpp::Session as CassSession;
use cassandra_cpp::Error as CassError;
use cassandra_cpp::Consistency;
use crate::cassandra::*;
pub struct Session {
metadata: CassSession,
points: CassSession,
}
impl Session {
pub fn new(metadata_contact: &str, points_contact: &str) -> Result<Self, CassError> {
let metadata = connect(metadata_contact)?;
let points = connect(points_contact)?;
let session = Self {
metadata: metadata,
points: points,
};
Ok(session)
}
pub fn metadata_session(&self) -> &CassSession {
&self.metadata
}
pub fn points_session(&self) -> &CassSession {
&self.points
}
// XXX to make configurable
pub fn read_consistency(&self) -> Consistency {
Consistency::LOCAL_QUORUM
}
// XXX to make configurable
pub fn write_consistency(&self) -> Consistency {
Consistency::LOCAL_QUORUM
}
}