first commit

This commit is contained in:
2021-02-20 17:25:07 +01:00
commit df462caa1b
7 changed files with 921 additions and 0 deletions

196
src/main.rs Normal file
View File

@ -0,0 +1,196 @@
/*
* bgutil-rs
*
* Author: Patrick MARIE <pm@mkz.me>
*/
use std::str::FromStr;
use std::convert::TryFrom;
use cassandra_cpp::*;
use chrono::Utc;
use clap::{App,AppSettings,Arg,SubCommand};
mod metric;
mod stage;
use crate::metric::*;
use crate::stage::*;
#[allow(dead_code)]
fn describe_result(result: &CassResult) {
println!("Result has {} record(s).", result.row_count());
println!("Schema is:");
for column_id in 0..result.column_count() {
println!("{:?}: {:?}",
result.column_type(column_id as usize),
result.column_name(column_id as usize)
);
}
}
fn connect(contact_points: &str) -> Result<Session> {
set_level(LogLevel::DISABLED);
let mut cluster = Cluster::default();
cluster.set_contact_points(contact_points).unwrap();
cluster.set_load_balance_round_robin();
cluster.set_protocol_version(4)?;
cluster.connect()
}
fn fetch_metric(session: &Session, metric_name: &str) -> Result<Metric> {
let mut query = stmt!("SELECT * FROM biggraphite_metadata.metrics_metadata WHERE name = ?");
query.bind(0, metric_name)?;
let result = session.execute(&query).wait()?;
Ok(result.first_row().unwrap().into())
}
fn metric_info(session: &Session, metric_name: &str) -> Result<()> {
let metric = fetch_metric(session, metric_name)?;
println!("{}", metric);
Ok(())
}
fn fetch_points(session_points: &Session, m: &Metric, s: &Stage, time_start: i64, time_end: i64) -> Result<()> {
let table_name = s.table_name();
let q = format!(
"SELECT time_start_ms, offset, value FROM biggraphite.{} WHERE metric = ? AND time_start_ms = ? AND offset >= ? AND offset < ? ORDER BY offset",
table_name
);
let ranges = TimeRange::new(&s, time_start, time_end).ranges();
// XXX concurrent
for range in ranges.iter() {
let mut query = stmt!(q.as_str());
query.bind(0, Uuid::from_str(m.id().as_str())?)?;
query.bind(1, range.0)?;
query.bind(2, range.1 as i16)?;
query.bind(3, range.2 as i16)?;
let result = session_points.execute(&query).wait()?;
for row in result.iter() {
let ts : i64 = row.get_column_by_name("time_start_ms".to_string())?.get_i64()?;
let offset : i16 = row.get_column_by_name("offset".to_string())?.get_i16()?;
let value : f64 = row.get_column_by_name("value".to_string())?.get_f64()?;
let ts = ts / 1000;
let offset : i64 = offset as i64 * s.precision_as_seconds();
println!("{:?};{:?}", ts + offset, value);
}
}
Ok(())
}
fn main() -> Result<()> {
let matches = App::new("bgutil-rs")
.setting(AppSettings::SubcommandRequired)
.subcommand(SubCommand::with_name("info")
.about("Information about a metric")
.arg(Arg::with_name("metric")
.help("metric to retrieve info about")
.index(1)
.required(true)
))
.subcommand(SubCommand::with_name("read")
.about("Read a metric contents")
.arg(Arg::with_name("stage")
.long("stage")
.takes_value(true))
.arg(Arg::with_name("time-start")
.long("time-start")
.takes_value(true))
.arg(Arg::with_name("time-end")
.long("time-end")
.takes_value(true))
.arg(Arg::with_name("metric")
.help("metric to get values")
.index(1)
.required(true)
))
.get_matches();
let contact_points_metadata = "tag--cstars07--cassandra-cstars07.query.consul.preprod.crto.in";
let contact_points_data = "tag--cstars04--cassandra-cstars04.query.consul.preprod.crto.in";
let session_metadata = match connect(contact_points_metadata) {
Ok(session) => session,
Err(err) => {
eprintln!("{:?}", err);
return Ok(());
}
};
let session_points = match connect(contact_points_data) {
Ok(session) => session,
Err(err) => {
eprintln!("{:?}", err);
return Ok(());
}
};
match matches.subcommand_name() {
Some("info") => {
let matches = matches.subcommand_matches("info").unwrap();
metric_info(&session_metadata, matches.value_of("metric").unwrap())?;
},
Some("read") => {
let matches = matches.subcommand_matches("read").unwrap();
let stage = matches.value_of("stage").unwrap_or("11520*60s");
// XXX: Change default value relative to stage's precision to have more or less data
let time_start = matches.value_of("time-start"); // default now - 1h
let time_end= matches.value_of("time-end"); // default: now
let time_start = match time_start {
None => Utc::now().timestamp() - 3600,
Some(s) => match s.parse::<i64>() {
Ok(n) => n,
Err(_) => {
eprintln!("Could not parse {}", s);
return Ok(())
}
}
};
let time_end = match time_end {
None => time_start + 3600,
Some(s) => match s.parse::<i64>() {
Ok(n) => n,
Err(_) => {
eprintln!("Could not parse {}", s);
return Ok(())
}
}
};
let metric_name = matches.value_of("metric").unwrap();
let metric = fetch_metric(&session_metadata, metric_name)?;
let available_stages = metric.stages()?;
let stage = Stage::try_from(stage)?;
if !available_stages.iter().any(|x| *x == stage) {
eprintln!("Could not find any stage matching {}", stage);
return Ok(());
}
fetch_points(&session_points, &metric, &stage, time_start, time_end)?;
}
None => {
eprintln!("No command was used.");
return Ok(());
},
_ => {}
}
Ok(())
}

103
src/metric.rs Normal file
View File

@ -0,0 +1,103 @@
/*
* bgutil-rs
*
* Author: Patrick MARIE <pm@mkz.me>
*/
use crate::stage::Stage;
use std::collections::HashMap;
use std::fmt;
use std::convert::TryFrom;
use cassandra_cpp::Row;
#[derive(Debug)]
pub struct Metric {
id: String,
name: String,
config: HashMap<String, String>,
created_on: u64,
updated_on: u64
}
impl Metric {
pub fn id(self: &Self) -> &String {
&self.id
}
pub fn config(self: &Self, name: String) -> Result<String, String> {
let res = self.config.get(&name);
if let Some(v) = res {
Ok(v.to_string())
} else {
Err("Invalid key".to_string())
}
}
pub fn stages(self: &Self) -> Result<Vec<Stage>, String> {
let mut out = vec![];
let stages = self.config("retention".to_string());
if let Err(err) = stages {
return Err(err);
}
for stage in stages.unwrap().split(":") {
match Stage::try_from(stage) {
Ok(stage) => out.push(stage),
Err(err) => {
return Err(err.to_string())
}
};
}
Ok(out)
}
}
impl fmt::Display for Metric {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{} created_on:{} updated_on:{}\n{:?}",
self.name,
self.created_on,
self.updated_on,
self.config,
)
}
}
impl From<String> for Metric {
fn from(name: String) -> Self {
Metric {
id: String::from(""),
name: name,
config: HashMap::new(),
created_on: 0,
updated_on: 0
}
}
}
impl From<Row> for Metric {
fn from(row: Row) -> Self {
let config_collection = row.get_column_by_name("config".to_string()).unwrap().get_map().unwrap();
let mut config : HashMap<String, String> = HashMap::new();
config_collection
.map(|(k, v)| config.insert(k.to_string(), v.to_string()))
.count();
let created_on = row.get_column_by_name("created_on".to_string()).unwrap();
let created_on_timestamp = created_on.get_uuid().unwrap().timestamp();
let updated_on = row.get_column_by_name("updated_on".to_string()).unwrap();
let updated_on_timestamp = updated_on.get_uuid().unwrap().timestamp();
Self {
id: row.get_column_by_name("id".to_string()).unwrap().get_uuid().unwrap().to_string(),
name: row.get_column_by_name("name".to_string()).unwrap().to_string(),
config: config,
created_on: created_on_timestamp,
updated_on: updated_on_timestamp
}
}
}

181
src/stage.rs Normal file
View File

@ -0,0 +1,181 @@
/*
* bgutil-rs
*
* Author: Patrick MARIE <pm@mkz.me>
*/
use std::fmt;
use std::convert::TryFrom;
use regex::Regex;
#[derive(Debug)]
pub struct Stage {
stage: String,
points: u32,
precision: u32,
factor: char,
}
impl TryFrom<&str> for Stage {
type Error = &'static str;
fn try_from(stage: &str) -> Result<Self, Self::Error> {
let re = Regex::new(r"^(\d+)\*(\d+)(.)");
if let Err(_) = re {
return Err("regex initialisation failed");
}
let captures = match re.unwrap().captures(&stage) {
None => return Err("invalid regex capture"),
Some(c) => c,
};
let points = captures.get(1).unwrap().as_str().parse::<u32>().unwrap();
let factor = captures.get(3).unwrap().as_str();
if factor.len() != 1 {
return Err("invalid factor length")
}
let factor = factor.chars().nth(0).unwrap();
match factor {
's' | 'm' | 'h' | 'd' | 'w' | 'y' => {},
_ => {
return Err("invalid precision unit")
}
};
let precision = captures.get(2).unwrap().as_str().parse::<u32>().unwrap();
Ok(Stage {
stage: String::from(stage),
points: points,
precision: precision,
factor: factor,
})
}
}
impl Stage {
pub fn precision_as_seconds(self: &Self) -> i64 {
let factor = match self.factor {
's' => 1,
'm' => 60,
'h' => 60 * 60,
'd' => 60 * 60 * 24,
'w' => 60 * 60 * 24 * 7,
'y' => 60 * 60 * 24 * 365,
_ => unreachable!()
};
factor * self.precision as i64
}
pub fn time_offset_ms(self: &Self, ts: i64) -> (i64, i64) {
let table_row_size_ms = self.table_row_size_ms();
let time_offset_ms = ts * 1000 % table_row_size_ms;
let time_start_ms = ts * 1000 - time_offset_ms;
(
time_start_ms,
time_offset_ms / (self.precision_as_seconds() * 1000)
)
}
pub fn table_name(self: &Self) -> String {
// XXX aggregations?
format!("datapoints_{}p_{}{}_0", self.points, self.precision, self.factor)
}
pub fn table_row_size_ms(self: &Self) -> i64 {
let hour = 3600;
let _max_partition_size = 25000;
let _expected_points_per_read = 2000;
let _min_partition_size_ms = 6 * hour;
std::cmp::min(
self.precision_as_seconds() * 1000 * _max_partition_size,
std::cmp::max(
self.precision_as_seconds() * 1000 * _expected_points_per_read,
_min_partition_size_ms,
)
)
}
}
impl Clone for Stage {
fn clone(&self) -> Stage {
Stage {
stage: self.stage.to_string(),
..*self
}
}
}
impl PartialEq for Stage {
fn eq(&self, other: &Self) -> bool {
self.points == other.points
&& self.precision == other.precision
&& self.factor == other.factor
}
}
impl Eq for Stage {}
impl fmt::Display for Stage {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.table_name())
}
}
pub struct TimeRange {
stage: Stage,
time_start: i64,
time_end: i64
}
impl TimeRange {
pub fn new(stage: &Stage, time_start: i64, time_end: i64) -> Self {
TimeRange {
stage: stage.clone(),
time_start: time_start,
time_end: time_end,
}
}
pub fn ranges(&self) -> Vec<(i64, i64, i64)> {
let first_offset = self.stage.time_offset_ms(self.time_start);
let last_offset = self.stage.time_offset_ms(self.time_end);
let mut offset = first_offset.0;
let mut offset_start = first_offset.1;
let mut out = vec![];
while offset != last_offset.0 {
out.push((offset, offset_start, self.stage.table_row_size_ms()));
offset_start = 0;
offset += self.stage.table_row_size_ms();
}
out.push((offset, offset_start, last_offset.1));
out
}
}
impl fmt::Display for TimeRange {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{} ({} -> {})", self.stage, self.time_start, self.time_end)
}
}
#[test]
fn timerange() {
let stage = Stage::try_from("11520*60s").unwrap();
assert_eq!(vec![(0, 2, 4)], TimeRange::new(&stage, 120, 240).ranges());
assert_eq!(vec![(0, 2, 11520),(691200000, 0, 4)], TimeRange::new(&stage, 120, 691200 + 240).ranges());
}