Adding write & delete commands.

2021-02-21 22:04:23 +01:00
parent a388c18408
commit de38b241ca
8 changed files with 545 additions and 132 deletions
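Summary: this change moves the Cassandra access code out of src/main.rs into a new src/cassandra.rs module (connect, fetch_metric, fetch_points, fetch_metrics, create_metric, metric_write, metric_delete) and splits TimeRange into its own src/timerange.rs. metric_list now resolves matched names through the new fetch_metrics helper, trailing "*" glob components are turned into LIKE queries, and two new subcommands are wired into the CLI: write (metric, value, optional --timestamp/--retention) and delete (metric, with a --recursive flag that is declared but still unimplemented).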

245
src/cassandra.rs Normal file

@@ -0,0 +1,245 @@
/*
* bgutil-rs
*
* Author: Patrick MARIE <pm@mkz.me>
*/
use crate::*;
use cassandra_cpp::{BindRustType,Cluster,Error,LogLevel,Session};
use cassandra_cpp::{set_level,stmt};
pub fn connect(contact_points: &str) -> Result<Session, Error> {
set_level(LogLevel::DISABLED);
let mut cluster = Cluster::default();
cluster.set_contact_points(contact_points).unwrap();
cluster.set_load_balance_round_robin();
cluster.set_protocol_version(4)?;
cluster.connect()
}
pub fn fetch_metric(session: &Session, metric_name: &str) -> Result<Metric, Error> {
let mut query = stmt!("SELECT * FROM biggraphite_metadata.metrics_metadata WHERE name = ?");
query.bind(0, metric_name)?;
// XXX set consistency
// query.set_consistency(Consistency::QUORUM);
let result = session.execute(&query).wait()?;
Ok(result.first_row().unwrap().into())
}
pub fn fetch_points(session_points: &Session, m: &Metric, s: &Stage, time_start: i64, time_end: i64) -> Result<(), Error> {
let table_name = s.table_name();
let q = format!(
"SELECT time_start_ms, offset, value FROM biggraphite.{} WHERE metric = ? AND time_start_ms = ? AND offset >= ? AND offset < ? ORDER BY offset",
table_name
);
let ranges = TimeRange::new(&s, time_start, time_end).ranges();
// XXX concurrent
for range in ranges.iter() {
let mut query = stmt!(q.as_str());
query.bind(0, CassUuid::from_str(m.id().as_str())?)?;
query.bind(1, range.0)?;
query.bind(2, range.1 as i16)?;
query.bind(3, range.2 as i16)?;
let result = session_points.execute(&query).wait()?;
for row in result.iter() {
let ts : i64 = row.get_column_by_name("time_start_ms".to_string())?.get_i64()?;
let offset : i16 = row.get_column_by_name("offset".to_string())?.get_i16()?;
let value : f64 = row.get_column_by_name("value".to_string())?.get_f64()?;
let ts = ts / 1000;
let offset : i64 = offset as i64 * s.precision_as_seconds();
println!("{:?};{:?}", ts + offset, value);
}
}
Ok(())
}
/// Fetch multiple metrics at once: all queries are issued first, then awaited.
pub fn fetch_metrics(session: &Session, metric_names: &Vec<String>) -> Result<Vec<Metric>, Error> {
let mut results = vec![];
let mut out = vec![];
for metric_name in metric_names.iter() {
let mut query = stmt!("SELECT * FROM biggraphite_metadata.metrics_metadata WHERE name = ?");
query.bind(0, metric_name.as_str())?;
query.set_consistency(Consistency::QUORUM)?;
let result = session.execute(&query);
results.push(result);
}
for result in results {
let result = result.wait()?;
if result.row_count() < 1 {
continue;
}
out.push(result.first_row().unwrap().into())
}
Ok(out)
}
pub fn create_metric(session_metadata: &Session, metric: &str) -> Result<(), Error> {
let mut batch = Batch::new(BatchType::LOGGED);
let metrics_parts = metric.split(".").collect::<Vec<&str>>();
for d in 0..metrics_parts.len() {
let mut fields = vec![String::from("name"), String::from("parent")];
let mut values = vec![];
let n = metrics_parts.len() - d;
let path = &metrics_parts[0..n].join(".");
let parent_path = &mut metrics_parts[0..n-1].join(".");
parent_path.push('.');
values.push(String::from(path));
values.push(parent_path.to_string());
for id in 0..=n {
let field = format!("component_{}", id);
fields.push(field);
if id != n {
values.push(String::from(metrics_parts[id]));
} else {
values.push(String::from("__END__"));
}
}
let query = format!("INSERT INTO biggraphite_metadata.{}({}) VALUES ({});",
String::from("directories"),
fields.join(", "),
fields.iter().map(|_| String::from("?")).collect::<Vec<String>>().join(", ")
);
// before anything, create the "metrics" record.
if d == 0 {
let query_metrics = format!("INSERT INTO biggraphite_metadata.{}({}) VALUES ({});",
String::from("metrics"),
fields.join(", "),
fields.iter().map(|_| String::from("?")).collect::<Vec<String>>().join(", ")
);
let mut query_metrics = stmt!(query_metrics.as_str());
for (id, arg) in values.iter().enumerate() {
query_metrics.bind(id, arg.as_str())?;
}
batch.add_statement(&query_metrics)?;
}
let mut query = stmt!(query.as_str());
for (id, arg) in values.iter().enumerate() {
query.bind(id, arg.as_str())?;
}
batch.add_statement(&query)?;
}
let query = format!(
"INSERT INTO biggraphite_metadata.metrics_metadata(name, config, id, created_on, updated_on) VALUES (?, ?, ?, now(), now())"
);
let uuid = Uuid::new_v4();
let mut config = Map::new(0);
config.append_string("aggregator")?;
config.append_string("average")?;
config.append_string("carbon_xfilesfactor")?;
config.append_string("0.500000")?;
config.append_string("retention")?;
config.append_string("11520*60s:720*3600s:730*86400s")?;
let mut query = stmt!(&query);
query.bind(0, metric)?; // name
query.bind(1, config)?; // config
query.bind(2, CassUuid::from_str(&uuid.to_hyphenated().to_string())?)?;
query.set_consistency(Consistency::LOCAL_QUORUM)?;
session_metadata.execute(&query).wait()?;
// Write directories
session_metadata.execute_batch(batch).wait()?;
println!("Metric was written.");
Ok(())
}
pub fn metric_delete(session_metadata: &Session, metric_name: &str) -> Result<(), Error> {
let mut query = stmt!("SELECT * FROM biggraphite_metadata.metrics_metadata WHERE name = ?");
query.bind(0, metric_name)?;
let result = session_metadata.execute(&query).wait()?;
if result.row_count() == 0 {
println!("Metric is not existing");
return Ok(());
}
let _metric = fetch_metric(&session_metadata, metric_name)?;
let mut query = stmt!("DELETE FROM biggraphite_metadata.metrics_metadata WHERE name = ?;");
query.bind(0, metric_name)?;
query.set_consistency(Consistency::LOCAL_QUORUM)?;
session_metadata.execute(&query).wait()?;
let mut query = stmt!("DELETE FROM biggraphite_metadata.metrics_metadata WHERE name = ?;");
query.bind(0, metric_name)?;
query.set_consistency(Consistency::LOCAL_QUORUM)?;
session_metadata.execute(&query).wait()?;
let mut query = stmt!("DELETE FROM biggraphite_metadata.directories WHERE name = ?;");
query.bind(0, metric_name)?;
query.set_consistency(Consistency::LOCAL_QUORUM)?;
session_metadata.execute(&query).wait()?;
Ok(())
}
pub fn metric_write(session_metadata: &Session, session_points: &Session, metric_name: &str, value: f64, retention: &str, timestamp: i64) -> Result<(), Error> {
let mut query = stmt!("SELECT * FROM biggraphite_metadata.metrics_metadata WHERE name = ?");
query.bind(0, metric_name)?;
let result = session_metadata.execute(&query).wait()?;
if result.row_count() == 0 {
create_metric(session_metadata, metric_name)?;
}
let stage = Stage::try_from(retention)?;
let metric = fetch_metric(&session_metadata, metric_name)?;
let (time_start_ms, offset) = stage.time_offset_ms(timestamp);
let query = format!(
"INSERT INTO biggraphite.{} (metric, time_start_ms, offset, value) VALUES (?, ?, ?, ?);",
stage.table_name()
);
let mut query = stmt!(&query);
query.bind(0, CassUuid::from_str(metric.id().as_str())?)?;
query.bind(1, time_start_ms)?;
query.bind(2, offset as i16)?;
query.bind(3, value)?;
session_points.execute(&query).wait()?;
Ok(())
}
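For reference, a minimal sketch of how the new helpers fit together, assuming the caller is inside this crate; the contact points and metric name below are placeholders, and the two sessions mirror the metadata/points split used in main.rs:

use crate::cassandra::{connect, metric_delete, metric_write};
use chrono::Utc;

fn example_roundtrip() -> Result<(), cassandra_cpp::Error> {
    // Two sessions, as in main(): one cluster for metadata, one for datapoints.
    let session_metadata = connect("metadata.cassandra.example")?; // placeholder contact point
    let session_points = connect("points.cassandra.example")?;    // placeholder contact point

    // metric_write creates the metric (metadata + directories) on first use,
    // then inserts one point into the stage's datapoints table.
    metric_write(&session_metadata, &session_points,
                 "example.some.metric", 42.0, "11520*60s", Utc::now().timestamp())?;

    // metric_delete removes the metadata, metrics and directories rows;
    // datapoints are left in place.
    metric_delete(&session_metadata, "example.some.metric")?;

    Ok(())
}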

src/main.rs

@@ -5,16 +5,25 @@
*/
use std::str::FromStr;
use std::convert::TryFrom;
use std::error;
use cassandra_cpp::*;
use cassandra_cpp::{Batch,BatchType,BindRustType,CassCollection,CassResult,Consistency,Error,Map,Session,Statement};
use cassandra_cpp::Uuid as CassUuid;
use cassandra_cpp::{stmt};
use chrono::Utc;
use clap::{App,AppSettings,Arg,SubCommand};
use uuid::Uuid;
mod cassandra;
mod metric;
mod stage;
mod timerange;
use crate::cassandra::*;
use crate::metric::*;
use crate::stage::*;
use crate::timerange::*;
#[allow(dead_code)]
fn describe_result(result: &CassResult) {
@@ -29,27 +38,7 @@ fn describe_result(result: &CassResult) {
}
}
fn connect(contact_points: &str) -> Result<Session> {
set_level(LogLevel::DISABLED);
let mut cluster = Cluster::default();
cluster.set_contact_points(contact_points).unwrap();
cluster.set_load_balance_round_robin();
cluster.set_protocol_version(4)?;
cluster.connect()
}
fn fetch_metric(session: &Session, metric_name: &str) -> Result<Metric> {
let mut query = stmt!("SELECT * FROM biggraphite_metadata.metrics_metadata WHERE name = ?");
query.bind(0, metric_name)?;
let result = session.execute(&query).wait()?;
Ok(result.first_row().unwrap().into())
}
fn metric_info(session: &Session, metric_name: &str) -> Result<()> {
pub fn metric_info(session: &Session, metric_name: &str) -> Result<(), Error> {
let metric = fetch_metric(session, metric_name)?;
println!("{}", metric);
@@ -57,46 +46,14 @@ fn metric_info(session: &Session, metric_name: &str) -> Result<()> {
Ok(())
}
fn fetch_points(session_points: &Session, m: &Metric, s: &Stage, time_start: i64, time_end: i64) -> Result<()> {
let table_name = s.table_name();
let q = format!(
"SELECT time_start_ms, offset, value FROM biggraphite.{} WHERE metric = ? AND time_start_ms = ? AND offset >= ? AND offset < ? ORDER BY offset",
table_name
);
let ranges = TimeRange::new(&s, time_start, time_end).ranges();
// XXX concurrent
for range in ranges.iter() {
let mut query = stmt!(q.as_str());
query.bind(0, Uuid::from_str(m.id().as_str())?)?;
query.bind(1, range.0)?;
query.bind(2, range.1 as i16)?;
query.bind(3, range.2 as i16)?;
let result = session_points.execute(&query).wait()?;
for row in result.iter() {
let ts : i64 = row.get_column_by_name("time_start_ms".to_string())?.get_i64()?;
let offset : i16 = row.get_column_by_name("offset".to_string())?.get_i16()?;
let value : f64 = row.get_column_by_name("value".to_string())?.get_f64()?;
let ts = ts / 1000;
let offset : i64 = offset as i64 * s.precision_as_seconds();
println!("{:?};{:?}", ts + offset, value);
}
}
Ok(())
}
fn prepare_component_query(table_name: &str, arguments: &Vec<&str>) -> Result<Statement> {
fn prepare_component_query(table_name: &str, arguments: &Vec<&str>) -> Result<Statement, Error> {
let mut q = format!("SELECT parent, name FROM biggraphite_metadata.{} WHERE ", table_name);
let mut component_number = 0;
let mut components = vec![];
for (id, component) in arguments.iter().enumerate() {
let mut operator = "=";
if *component == "*" {
component_number += 1;
continue;
@@ -106,9 +63,13 @@ fn prepare_component_query(table_name: &str, arguments: &Vec<&str>) -> Result<St
q.push_str("AND ");
}
q.push_str(format!("component_{} = ? ", id).as_str());
if component.ends_with("*") {
operator = "LIKE";
}
q.push_str(format!("component_{} {} ? ", id, operator).as_str());
component_number += 1;
components.push(component);
components.push(component.replace("*", "%"));
}
if component_number != 0 {
@@ -117,39 +78,48 @@ fn prepare_component_query(table_name: &str, arguments: &Vec<&str>) -> Result<St
// Adding last component for __END__.
q.push_str(format!("component_{} = ? ALLOW FILTERING;", component_number).as_str());
components.push(&"__END__");
components.push("__END__".to_string());
let mut query = stmt!(q.as_str());
for (id, &arg) in components.iter().enumerate() {
query.bind(id, *arg)?;
for (id, arg) in components.iter().enumerate() {
query.bind(id, arg.as_str())?;
}
Ok(query)
}
fn metric_list(session_metadata: &Session, glob: &str) -> Result<()> {
fn metric_list(session_metadata: &Session, glob: &str) -> Result<(), Error> {
let components = glob.split(".").collect::<Vec<&str>>();
let query_directories = prepare_component_query("directories", &components)?;
let mut query_directories = prepare_component_query("directories", &components)?;
query_directories.set_consistency(Consistency::QUORUM)?;
let result = session_metadata.execute(&query_directories).wait()?;
for row in result.iter() {
let name = row.get_column_by_name("name".to_string()).unwrap().to_string();
println!("d {}", name);
}
let query = prepare_component_query("metrics", &components)?;
let mut query = prepare_component_query("metrics", &components)?;
query.set_consistency(Consistency::QUORUM)?;
let result = session_metadata.execute(&query).wait()?;
for row in result.iter() {
let name = row.get_column_by_name("name".to_string()).unwrap().to_string();
let metric = fetch_metric(session_metadata, &name)?;
let names = result
.iter()
.map(|x| {
x.get_column_by_name("name".to_string()).unwrap().to_string()
})
.collect::<Vec<String>>();
let metrics = fetch_metrics(session_metadata, &names)?;
for metric in metrics {
println!("m {}", metric);
}
Ok(())
}
fn main() -> Result<()> {
fn main() -> Result<(), Box<dyn error::Error>> {
let matches = App::new("bgutil-rs")
.setting(AppSettings::SubcommandRequired)
.subcommand(SubCommand::with_name("info")
@@ -157,8 +127,7 @@ fn main() -> Result<()> {
.arg(Arg::with_name("metric")
.help("metric to retrieve info about")
.index(1)
.required(true)
))
.required(true)))
.subcommand(SubCommand::with_name("read")
.about("Read a metric contents")
.arg(Arg::with_name("stage")
@@ -173,14 +142,34 @@ fn main() -> Result<()> {
.arg(Arg::with_name("metric")
.help("metric to get values")
.index(1)
.required(true)
))
.required(true)))
.subcommand(SubCommand::with_name("list")
.about("List metrics with given pattern")
.arg(Arg::with_name("glob")
.index(1)
.required(true)
))
.required(true)))
.subcommand(SubCommand::with_name("write")
.about("Write a metric and its value")
.arg(Arg::with_name("metric")
.index(1)
.required(true))
.arg(Arg::with_name("value")
.index(2)
.required(true))
.arg(Arg::with_name("timestamp")
.short("t")
.long("timestamp")
.takes_value(true))
.arg(Arg::with_name("retention")
.long("retention")
.takes_value(true)))
.subcommand(SubCommand::with_name("delete")
.about("Delete metric(s)")
.arg(Arg::with_name("recursive")
.long("recursive"))
.arg(Arg::with_name("metric")
.index(1)
.required(true)))
.get_matches();
let contact_points_metadata = "tag--cstars07--cassandra-cstars07.query.consul.preprod.crto.in";
@@ -212,7 +201,7 @@ fn main() -> Result<()> {
let stage = matches.value_of("stage").unwrap_or("11520*60s");
// XXX: Change default value relative to stage's precision to have more or less data
let time_start = matches.value_of("time-start"); // default now - 1h
let time_end= matches.value_of("time-end"); // default: now
let time_end = matches.value_of("time-end"); // default: now
let time_start = match time_start {
None => Utc::now().timestamp() - 3600,
@@ -252,6 +241,36 @@ fn main() -> Result<()> {
Some("list") => {
let matches = matches.subcommand_matches("list").unwrap();
metric_list(&session_metadata, matches.value_of("glob").unwrap())?;
},
Some("write") => {
let matches = matches.subcommand_matches("write").unwrap();
let metric = matches.value_of("metric").unwrap();
let value = matches.value_of("value").unwrap().parse::<f64>()?;
let retention = matches.value_of("retention").unwrap_or("11520*60s");
let timestamp = match matches.value_of("timestamp") {
None => Utc::now().timestamp(),
Some(s) => match s.parse::<i64>() {
Ok(n) => n,
Err(_) => {
eprintln!("Could not parse {}", s);
return Ok(())
}
}
};
metric_write(&session_metadata, &session_points, metric, value, retention, timestamp)?;
},
Some("delete") => {
let matches = matches.subcommand_matches("delete").unwrap();
let metric = matches.value_of("metric").unwrap();
if matches.is_present("recursive") {
unimplemented!();
}
metric_delete(&session_metadata, &metric)?;
}
None => {
eprintln!("No command was used.");
@@ -262,3 +281,4 @@ fn main() -> Result<()> {
Ok(())
}
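Going by the clap definitions above, the new subcommands should be invoked roughly as "bgutil-rs write some.metric 42.0 --retention 11520*60s -t 1613941463" and "bgutil-rs delete some.metric" (binary name and values here are illustrative). --timestamp defaults to the current time, --retention to 11520*60s, and passing --recursive to delete currently panics via unimplemented!().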

src/metric.rs

@@ -3,7 +3,7 @@
*
* Author: Patrick MARIE <pm@mkz.me>
*/
use crate::stage::Stage;
use crate::Stage;
use std::collections::HashMap;
use std::fmt;

src/stage.rs

@@ -5,6 +5,7 @@
*/
use std::fmt;
use std::convert::TryFrom;
use std::string::String;
use regex::Regex;
@@ -129,53 +130,3 @@ impl fmt::Display for Stage {
write!(f, "{}", self.table_name())
}
}
pub struct TimeRange {
stage: Stage,
time_start: i64,
time_end: i64
}
impl TimeRange {
pub fn new(stage: &Stage, time_start: i64, time_end: i64) -> Self {
TimeRange {
stage: stage.clone(),
time_start: time_start,
time_end: time_end,
}
}
pub fn ranges(&self) -> Vec<(i64, i64, i64)> {
let first_offset = self.stage.time_offset_ms(self.time_start);
let last_offset = self.stage.time_offset_ms(self.time_end);
let mut offset = first_offset.0;
let mut offset_start = first_offset.1;
let mut out = vec![];
while offset != last_offset.0 {
out.push((offset, offset_start, self.stage.table_row_size_ms()));
offset_start = 0;
offset += self.stage.table_row_size_ms();
}
out.push((offset, offset_start, last_offset.1));
out
}
}
impl fmt::Display for TimeRange {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{} ({} -> {})", self.stage, self.time_start, self.time_end)
}
}
#[test]
fn timerange() {
let stage = Stage::try_from("11520*60s").unwrap();
assert_eq!(vec![(0, 2, 4)], TimeRange::new(&stage, 120, 240).ranges());
assert_eq!(vec![(0, 2, 11520),(691200000, 0, 4)], TimeRange::new(&stage, 120, 691200 + 240).ranges());
}

51
src/timerange.rs Normal file

@@ -0,0 +1,51 @@
/*
* bgutil-rs
*
* Author: Patrick MARIE <pm@mkz.me>
*/
use std::fmt;
use crate::stage::Stage;
pub struct TimeRange {
stage: Stage,
time_start: i64,
time_end: i64
}
impl TimeRange {
pub fn new(stage: &Stage, time_start: i64, time_end: i64) -> Self {
TimeRange {
stage: stage.clone(),
time_start: time_start,
time_end: time_end,
}
}
pub fn ranges(&self) -> Vec<(i64, i64, i64)> {
let first_offset = self.stage.time_offset_ms(self.time_start);
let last_offset = self.stage.time_offset_ms(self.time_end);
let mut offset = first_offset.0;
let mut offset_start = first_offset.1;
let mut out = vec![];
while offset != last_offset.0 {
out.push((offset, offset_start, self.stage.table_row_size_ms()));
offset_start = 0;
offset += self.stage.table_row_size_ms();
}
out.push((offset, offset_start, last_offset.1));
out
}
}
impl fmt::Display for TimeRange {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{} ({} -> {})", self.stage, self.time_start, self.time_end)
}
}
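As a reading aid: ranges() yields one (time_start_ms, offset_start, offset_end) tuple per Cassandra row touched by the query window; these are exactly the values fetch_points binds. A small check mirroring the assertions from the test removed from src/stage.rs above:

#[test]
fn timerange_ranges() {
    use std::convert::TryFrom;

    // Stage "11520*60s": 11520 points at 60 s resolution per row.
    let stage = Stage::try_from("11520*60s").unwrap();

    // A 120 s..240 s window stays within a single row.
    assert_eq!(vec![(0, 2, 4)], TimeRange::new(&stage, 120, 240).ranges());

    // A window crossing a row boundary is split into two ranges.
    assert_eq!(vec![(0, 2, 11520), (691200000, 0, 4)],
               TimeRange::new(&stage, 120, 691200 + 240).ranges());
}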