From de38b241cae07f27f19a136fa32b6a06a5908ba4 Mon Sep 17 00:00:00 2001 From: Patrick MARIE Date: Sun, 21 Feb 2021 22:04:23 +0100 Subject: [PATCH] Adding write & delete commands. --- Cargo.lock | 23 ++++- Cargo.toml | 1 + README.md | 130 ++++++++++++++++++++++++- src/cassandra.rs | 245 +++++++++++++++++++++++++++++++++++++++++++++++ src/main.rs | 174 ++++++++++++++++++--------------- src/metric.rs | 2 +- src/stage.rs | 51 +--------- src/timerange.rs | 51 ++++++++++ 8 files changed, 545 insertions(+), 132 deletions(-) create mode 100644 src/cassandra.rs create mode 100644 src/timerange.rs diff --git a/Cargo.lock b/Cargo.lock index d7568da..c8fc874 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -72,6 +72,7 @@ dependencies = [ "chrono", "clap", "regex", + "uuid 0.8.2", ] [[package]] @@ -97,7 +98,7 @@ dependencies = [ "error-chain", "slog", "time", - "uuid", + "uuid 0.5.1", ] [[package]] @@ -170,6 +171,17 @@ version = "0.3.55" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8f5f3913fa0bfe7ee1fd8248b6b9f42a5af4b9d65ec2dd2c3c26132b950ecfc2" +[[package]] +name = "getrandom" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c9495705279e7140bf035dde1f6e750c162df8b625267cd52cc44e0b156732c8" +dependencies = [ + "cfg-if", + "libc", + "wasi", +] + [[package]] name = "gimli" version = "0.23.0" @@ -333,6 +345,15 @@ version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bcc7e3b898aa6f6c08e5295b6c89258d1331e9ac578cc992fb818759951bdc22" +[[package]] +name = "uuid" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bc5cf98d8186244414c848017f0e2676b3fcb46807f6668a97dfe67359a3c4b7" +dependencies = [ + "getrandom", +] + [[package]] name = "vec_map" version = "0.8.2" diff --git a/Cargo.toml b/Cargo.toml index e47d6c7..c680d31 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,3 +11,4 @@ cassandra-cpp = "0.15.1" chrono = "0.4" clap = "2.33.3" regex = "1.4.3" +uuid = { version = "0.8.2", features = ["v4"] } diff --git a/README.md b/README.md index 12ee140..1bf1772 100644 --- a/README.md +++ b/README.md @@ -97,13 +97,137 @@ Example: $ cargo run -- list observability.*.up d observability.testaroo.up m observability.testaroo.up {"retention": "11520*60s:720*3600s:730*86400s", "aggregator": "average", "carbon_xfilesfactor": "0.500000"} + +$ time cargo run -- list observability.test*.go* +d observability.testaroo.go_memstats_next_gc_bytes +d observability.testaroo.go_memstats_mallocs_total +... +m observability.testaroo.go_memstats_next_gc_bytes {"aggregator": "average", "carbon_xfilesfactor": "0.500000", "retention": "11520*60s:720*3600s:730*86400s"} +m observability.testaroo.go_memstats_mallocs_total {"aggregator": "average", "retention": "11520*60s:720*3600s:730*86400s", "carbon_xfilesfactor": "0.500000"} +... ``` ## Todo * command: read - - async - - human timestamps + - human timestamps (but unix timestamps are ok) * command: list - - Enhance pattern matching (with '{}', 'xxx*' or '*xxx'...) + - Missing pattern matching like {}, ** +* command: write + - Arguments handling +* command: delete + - with recursive + +``` +usage: bgutil delete [--help] [-r] [--dry-run] path + +positional arguments: + path One metric or subdirectory name + +optional arguments: + --help show this help message and exit + -r, --recursive Delete points for all metrics as a subtree + --dry-run Only show commands to create/upgrade the schema. +``` + +* command: copy + +``` +usage: bgutil copy [--help] [-r] [--time-start TIME_START] + [--time-end TIME_END] [--dry-run] + [--src_retention SRC_RETENTION] + [--dst_retention DST_RETENTION] + src dst + +positional arguments: + src One source metric or subdirectory name + dst One destination metric or subdirectory name + +optional arguments: + --help show this help message and exit + -r, --recursive Copy points for all metrics as a subtree + --time-start TIME_START + Copy points written later than this time. + --time-end TIME_END Copy points written earlier than this time. + --dry-run Only show commands to create/upgrade the schema. + --src_retention SRC_RETENTION + Retention used to read points from the source metrics. + --dst_retention DST_RETENTION + Retention used to write points to the destination + metrics. It only works if retentions are similar, i.e. + with same precisions. +``` + * command: clean + +``` +usage: bgutil clean [--help] [--clean-cache] [--clean-backend] + [--clean-corrupted] [--quiet] [--max-age MAX_AGE] + [--start-key START_KEY] [--end-key END_KEY] + [--shard SHARD] [--nshards NSHARDS] + [--disable-clean-directories] [--disable-clean-metrics] + +optional arguments: + --help show this help message and exit + --clean-cache clean cache + --clean-backend clean backend + --clean-corrupted clean corrupted metrics + --quiet Show no output unless there are problems. + --max-age MAX_AGE Specify the age of metrics in seconds to evict (ie: + 3600 to delete older than one hour metrics) + --start-key START_KEY + Start key. + --end-key END_KEY End key. + --shard SHARD Shard number. + --nshards NSHARDS Number of shards. + --disable-clean-directories + Disable cleaning directories + --disable-clean-metrics + Disable cleaning outdated metrics + +``` + +* command: repair + +``` +usage: bgutil repair [--help] [--start-key START_KEY] [--end-key END_KEY] + [--shard SHARD] [--nshards NSHARDS] [--quiet] + +optional arguments: + --help show this help message and exit + --start-key START_KEY + Start key. + --end-key END_KEY End key. + --shard SHARD Shard number. + --nshards NSHARDS Number of shards. + --quiet Show no output unless there are problems. +``` + +* command: write + +``` +usage: bgutil write [--help] [-t TIMESTAMP] [-c COUNT] + [--aggregator AGGREGATOR] [--retention RETENTION] + [--x-files-factor X_FILES_FACTOR] + metric value + +positional arguments: + metric Name of the metric to update. + value Value to write at the select time. + +optional arguments: + --help show this help message and exit + -t TIMESTAMP, --timestamp TIMESTAMP + Timestamp at which to write the new point. + -c COUNT, --count COUNT + Count associated with the value to be written. + --aggregator AGGREGATOR + Aggregator function for the metric (average, last, + max, min, sum). + --retention RETENTION + Retention configuration for the metric. + --x-files-factor X_FILES_FACTOR + Science fiction coefficient. +``` + +* command: test \ No newline at end of file diff --git a/src/cassandra.rs b/src/cassandra.rs new file mode 100644 index 0000000..38bbaa1 --- /dev/null +++ b/src/cassandra.rs @@ -0,0 +1,245 @@ +/* + * bgutil-rs + * + * Author: Patrick MARIE + */ +use crate::*; + +use cassandra_cpp::{BindRustType,Cluster,Error,LogLevel,Session}; +use cassandra_cpp::{set_level,stmt}; + +pub fn connect(contact_points: &str) -> Result { + set_level(LogLevel::DISABLED); + + let mut cluster = Cluster::default(); + cluster.set_contact_points(contact_points).unwrap(); + cluster.set_load_balance_round_robin(); + + cluster.set_protocol_version(4)?; + + cluster.connect() +} + +pub fn fetch_metric(session: &Session, metric_name: &str) -> Result { + let mut query = stmt!("SELECT * FROM biggraphite_metadata.metrics_metadata WHERE name = ?"); + query.bind(0, metric_name)?; + + // XXX set consistency + // query.set_consistency(Consistency::QUORUM); + + let result = session.execute(&query).wait()?; + Ok(result.first_row().unwrap().into()) +} + +pub fn fetch_points(session_points: &Session, m: &Metric, s: &Stage, time_start: i64, time_end: i64) -> Result<(), Error> { + let table_name = s.table_name(); + + let q = format!( + "SELECT time_start_ms, offset, value FROM biggraphite.{} WHERE metric = ? AND time_start_ms = ? AND offset >= ? AND offset < ? ORDER BY offset", + table_name + ); + + let ranges = TimeRange::new(&s, time_start, time_end).ranges(); + // XXX concurrent + for range in ranges.iter() { + let mut query = stmt!(q.as_str()); + query.bind(0, CassUuid::from_str(m.id().as_str())?)?; + query.bind(1, range.0)?; + query.bind(2, range.1 as i16)?; + query.bind(3, range.2 as i16)?; + + let result = session_points.execute(&query).wait()?; + + for row in result.iter() { + let ts : i64 = row.get_column_by_name("time_start_ms".to_string())?.get_i64()?; + let offset : i16 = row.get_column_by_name("offset".to_string())?.get_i16()?; + let value : f64 = row.get_column_by_name("value".to_string())?.get_f64()?; + + let ts = ts / 1000; + let offset : i64 = offset as i64 * s.precision_as_seconds(); + + println!("{:?};{:?}", ts + offset, value); + } + } + + Ok(()) +} + +/// async fetch multiple metrics +pub fn fetch_metrics(session: &Session, metric_names: &Vec) -> Result, Error> { + let mut results = vec![]; + let mut out = vec![]; + + for metric_name in metric_names.iter() { + let mut query = stmt!("SELECT * FROM biggraphite_metadata.metrics_metadata WHERE name = ?"); + query.bind(0, metric_name.as_str())?; + query.set_consistency(Consistency::QUORUM)?; + + let result = session.execute(&query); + results.push(result); + } + + for result in results { + let result = result.wait()?; + + if result.row_count() < 1 { + continue; + } + + out.push(result.first_row().unwrap().into()) + } + + Ok(out) +} + +pub fn create_metric(session_metadata: &Session, metric: &str) -> Result<(), Error> { + let mut batch = Batch::new(BatchType::LOGGED); + + let metrics_parts = metric.split(".").collect::>(); + + for d in 0..metrics_parts.len() { + let mut fields = vec![String::from("name"), String::from("parent")]; + let mut values = vec![]; + let n = metrics_parts.len() - d; + + let path = &metrics_parts[0..n].join("."); + let parent_path = &mut metrics_parts[0..n-1].join("."); + parent_path.push('.'); + + values.push(String::from(path)); + values.push(parent_path.to_string()); + + for id in 0..=n { + let field = format!("component_{}", id); + fields.push(field); + if id != n { + values.push(String::from(metrics_parts[id])); + } else { + values.push(String::from("__END__")); + } + } + + let query = format!("INSERT INTO biggraphite_metadata.{}({}) VALUES ({});", + String::from("directories"), + fields.join(", "), + fields.iter().map(|_| String::from("?")).collect::>().join(", ") + ); + + // before anything, create the "metrics" record. + if d == 0 { + let query_metrics = format!("INSERT INTO biggraphite_metadata.{}({}) VALUES ({});", + String::from("metrics"), + fields.join(", "), + fields.iter().map(|_| String::from("?")).collect::>().join(", ") + ); + + let mut query_metrics = stmt!(query_metrics.as_str()); + + for (id, arg) in values.iter().enumerate() { + query_metrics.bind(id, arg.as_str())?; + } + + batch.add_statement(&query_metrics)?; + } + + let mut query = stmt!(query.as_str()); + + for (id, arg) in values.iter().enumerate() { + query.bind(id, arg.as_str())?; + } + + batch.add_statement(&query)?; + } + + let query = format!( + "INSERT INTO biggraphite_metadata.metrics_metadata(name, config, id, created_on, updated_on) VALUES (?, ?, ?, now(), now())" + ); + + let uuid = Uuid::new_v4(); + + let mut config = Map::new(0); + config.append_string("aggregator")?; + config.append_string("average")?; + + config.append_string("carbon_xfilesfactor")?; + config.append_string("0.500000")?; + + config.append_string("retention")?; + config.append_string("11520*60s:720*3600s:730*86400s")?; + + let mut query = stmt!(&query); + query.bind(0, metric)?; // name + query.bind(1, config)?; // config + query.bind(2, CassUuid::from_str(&uuid.to_hyphenated().to_string())?)?; + + query.set_consistency(Consistency::LOCAL_QUORUM)?; + + session_metadata.execute(&query).wait()?; + + // Write directories + session_metadata.execute_batch(batch).wait()?; + + println!("Metric was written."); + + Ok(()) +} + +pub fn metric_delete(session_metadata: &Session, metric_name: &str) -> Result<(), Error> { + let mut query = stmt!("SELECT * FROM biggraphite_metadata.metrics_metadata WHERE name = ?"); + query.bind(0, metric_name)?; + + let result = session_metadata.execute(&query).wait()?; + if result.row_count() == 0 { + println!("Metric is not existing"); + return Ok(()); + } + + let _metric = fetch_metric(&session_metadata, metric_name)?; + + let mut query = stmt!("DELETE FROM biggraphite_metadata.metrics_metadata WHERE name = ?;"); + query.bind(0, metric_name)?; + query.set_consistency(Consistency::LOCAL_QUORUM)?; + session_metadata.execute(&query).wait()?; + + let mut query = stmt!("DELETE FROM biggraphite_metadata.metrics_metadata WHERE name = ?;"); + query.bind(0, metric_name)?; + query.set_consistency(Consistency::LOCAL_QUORUM)?; + session_metadata.execute(&query).wait()?; + + let mut query = stmt!("DELETE FROM biggraphite_metadata.directories WHERE name = ?;"); + query.bind(0, metric_name)?; + query.set_consistency(Consistency::LOCAL_QUORUM)?; + session_metadata.execute(&query).wait()?; + + Ok(()) +} + +pub fn metric_write(session_metadata: &Session, session_points: &Session, metric_name: &str, value: f64, retention: &str, timestamp: i64) -> Result<(), Error> { + let mut query = stmt!("SELECT * FROM biggraphite_metadata.metrics_metadata WHERE name = ?"); + query.bind(0, metric_name)?; + + let result = session_metadata.execute(&query).wait()?; + if result.row_count() == 0 { + create_metric(session_metadata, metric_name)?; + } + + let stage = Stage::try_from(retention)?; + + let metric = fetch_metric(&session_metadata, metric_name)?; + let (time_start_ms, offset) = stage.time_offset_ms(timestamp); + + let query = format!( + "INSERT INTO biggraphite.{} (metric, time_start_ms, offset, value) VALUES (?, ?, ?, ?);", + stage.table_name() + ); + + let mut query = stmt!(&query); + query.bind(0, CassUuid::from_str(metric.id().as_str())?)?; + query.bind(1, time_start_ms)?; + query.bind(2, offset as i16)?; + query.bind(3, value)?; + + session_points.execute(&query).wait()?; + + Ok(()) +} diff --git a/src/main.rs b/src/main.rs index 7bbe91d..6130fa1 100644 --- a/src/main.rs +++ b/src/main.rs @@ -5,16 +5,25 @@ */ use std::str::FromStr; use std::convert::TryFrom; +use std::error; -use cassandra_cpp::*; +use cassandra_cpp::{Batch,BatchType,BindRustType,CassCollection,CassResult,Consistency,Error,Map,Session,Statement}; +use cassandra_cpp::Uuid as CassUuid; +use cassandra_cpp::{stmt}; use chrono::Utc; use clap::{App,AppSettings,Arg,SubCommand}; +use uuid::Uuid; + +mod cassandra; mod metric; mod stage; +mod timerange; + +use crate::cassandra::*; use crate::metric::*; use crate::stage::*; - +use crate::timerange::*; #[allow(dead_code)] fn describe_result(result: &CassResult) { @@ -29,27 +38,7 @@ fn describe_result(result: &CassResult) { } } -fn connect(contact_points: &str) -> Result { - set_level(LogLevel::DISABLED); - - let mut cluster = Cluster::default(); - cluster.set_contact_points(contact_points).unwrap(); - cluster.set_load_balance_round_robin(); - - cluster.set_protocol_version(4)?; - - cluster.connect() -} - -fn fetch_metric(session: &Session, metric_name: &str) -> Result { - let mut query = stmt!("SELECT * FROM biggraphite_metadata.metrics_metadata WHERE name = ?"); - query.bind(0, metric_name)?; - - let result = session.execute(&query).wait()?; - Ok(result.first_row().unwrap().into()) -} - -fn metric_info(session: &Session, metric_name: &str) -> Result<()> { +pub fn metric_info(session: &Session, metric_name: &str) -> Result<(), Error> { let metric = fetch_metric(session, metric_name)?; println!("{}", metric); @@ -57,46 +46,14 @@ fn metric_info(session: &Session, metric_name: &str) -> Result<()> { Ok(()) } -fn fetch_points(session_points: &Session, m: &Metric, s: &Stage, time_start: i64, time_end: i64) -> Result<()> { - let table_name = s.table_name(); - - let q = format!( - "SELECT time_start_ms, offset, value FROM biggraphite.{} WHERE metric = ? AND time_start_ms = ? AND offset >= ? AND offset < ? ORDER BY offset", - table_name - ); - - let ranges = TimeRange::new(&s, time_start, time_end).ranges(); - // XXX concurrent - for range in ranges.iter() { - let mut query = stmt!(q.as_str()); - query.bind(0, Uuid::from_str(m.id().as_str())?)?; - query.bind(1, range.0)?; - query.bind(2, range.1 as i16)?; - query.bind(3, range.2 as i16)?; - - let result = session_points.execute(&query).wait()?; - - for row in result.iter() { - let ts : i64 = row.get_column_by_name("time_start_ms".to_string())?.get_i64()?; - let offset : i16 = row.get_column_by_name("offset".to_string())?.get_i16()?; - let value : f64 = row.get_column_by_name("value".to_string())?.get_f64()?; - - let ts = ts / 1000; - let offset : i64 = offset as i64 * s.precision_as_seconds(); - - println!("{:?};{:?}", ts + offset, value); - } - } - - Ok(()) -} - -fn prepare_component_query(table_name: &str, arguments: &Vec<&str>) -> Result { +fn prepare_component_query(table_name: &str, arguments: &Vec<&str>) -> Result { let mut q = format!("SELECT parent, name FROM biggraphite_metadata.{} WHERE ", table_name); let mut component_number = 0; let mut components = vec![]; for (id, component) in arguments.iter().enumerate() { + let mut operator = "="; + if *component == "*" { component_number += 1; continue; @@ -106,9 +63,13 @@ fn prepare_component_query(table_name: &str, arguments: &Vec<&str>) -> Result) -> Result Result<()> { +fn metric_list(session_metadata: &Session, glob: &str) -> Result<(), Error> { let components = glob.split(".").collect::>(); - let query_directories = prepare_component_query("directories", &components)?; + let mut query_directories = prepare_component_query("directories", &components)?; + query_directories.set_consistency(Consistency::QUORUM)?; let result = session_metadata.execute(&query_directories).wait()?; for row in result.iter() { let name = row.get_column_by_name("name".to_string()).unwrap().to_string(); println!("d {}", name); } - let query = prepare_component_query("metrics", &components)?; + let mut query = prepare_component_query("metrics", &components)?; + query.set_consistency(Consistency::QUORUM)?; let result = session_metadata.execute(&query).wait()?; - for row in result.iter() { - let name = row.get_column_by_name("name".to_string()).unwrap().to_string(); - let metric = fetch_metric(session_metadata, &name)?; + + let names = result + .iter() + .map(|x| { + x.get_column_by_name("name".to_string()).unwrap().to_string() + }) + .collect::>(); + + let metrics = fetch_metrics(session_metadata, &names)?; + for metric in metrics { println!("m {}", metric); } Ok(()) } -fn main() -> Result<()> { +fn main() -> Result<(), Box> { let matches = App::new("bgutil-rs") .setting(AppSettings::SubcommandRequired) .subcommand(SubCommand::with_name("info") @@ -157,8 +127,7 @@ fn main() -> Result<()> { .arg(Arg::with_name("metric") .help("metric to retrieve info about") .index(1) - .required(true) - )) + .required(true))) .subcommand(SubCommand::with_name("read") .about("Read a metric contents") .arg(Arg::with_name("stage") @@ -173,14 +142,34 @@ fn main() -> Result<()> { .arg(Arg::with_name("metric") .help("metric to get values") .index(1) - .required(true) - )) + .required(true))) .subcommand(SubCommand::with_name("list") .about("List metrics with given pattern") .arg(Arg::with_name("glob") .index(1) - .required(true) - )) + .required(true))) + .subcommand(SubCommand::with_name("write") + .about("Write a metric and its value") + .arg(Arg::with_name("metric") + .index(1) + .required(true)) + .arg(Arg::with_name("value") + .index(2) + .required(true)) + .arg(Arg::with_name("timestamp") + .short("t") + .long("timestamp") + .takes_value(true)) + .arg(Arg::with_name("retention") + .long("retention") + .takes_value(true))) + .subcommand(SubCommand::with_name("delete") + .about("Delete metric(s)") + .arg(Arg::with_name("recursive") + .long("recursive")) + .arg(Arg::with_name("metric") + .index(1) + .required(true))) .get_matches(); let contact_points_metadata = "tag--cstars07--cassandra-cstars07.query.consul.preprod.crto.in"; @@ -212,7 +201,7 @@ fn main() -> Result<()> { let stage = matches.value_of("stage").unwrap_or("11520*60s"); // XXX: Change default value relative to stage's precision to have more or less data let time_start = matches.value_of("time-start"); // default now - 1h - let time_end= matches.value_of("time-end"); // default: now + let time_end = matches.value_of("time-end"); // default: now let time_start = match time_start { None => Utc::now().timestamp() - 3600, @@ -252,6 +241,36 @@ fn main() -> Result<()> { Some("list") => { let matches = matches.subcommand_matches("list").unwrap(); metric_list(&session_metadata, matches.value_of("glob").unwrap())?; + }, + Some("write") => { + let matches = matches.subcommand_matches("write").unwrap(); + + let metric = matches.value_of("metric").unwrap(); + let value = matches.value_of("value").unwrap().parse::()?; + + let retention = matches.value_of("retention").unwrap_or("11520*60s"); + let timestamp = match matches.value_of("timestamp") { + None => Utc::now().timestamp(), + Some(s) => match s.parse::() { + Ok(n) => n, + Err(_) => { + eprintln!("Could not parse {}", s); + return Ok(()) + } + } + }; + + metric_write(&session_metadata, &session_points, metric, value, retention, timestamp)?; + }, + Some("delete") => { + let matches = matches.subcommand_matches("delete").unwrap(); + let metric = matches.value_of("metric").unwrap(); + + if matches.is_present("recursive") { + unimplemented!(); + } + + metric_delete(&session_metadata, &metric)?; } None => { eprintln!("No command was used."); @@ -262,3 +281,4 @@ fn main() -> Result<()> { Ok(()) } + diff --git a/src/metric.rs b/src/metric.rs index d593b9e..0a15286 100644 --- a/src/metric.rs +++ b/src/metric.rs @@ -3,7 +3,7 @@ * * Author: Patrick MARIE */ -use crate::stage::Stage; +use crate::Stage; use std::collections::HashMap; use std::fmt; diff --git a/src/stage.rs b/src/stage.rs index 0d211ca..3d96245 100644 --- a/src/stage.rs +++ b/src/stage.rs @@ -5,6 +5,7 @@ */ use std::fmt; use std::convert::TryFrom; +use std::string::String; use regex::Regex; @@ -129,53 +130,3 @@ impl fmt::Display for Stage { write!(f, "{}", self.table_name()) } } - -pub struct TimeRange { - stage: Stage, - time_start: i64, - time_end: i64 -} - -impl TimeRange { - pub fn new(stage: &Stage, time_start: i64, time_end: i64) -> Self { - TimeRange { - stage: stage.clone(), - time_start: time_start, - time_end: time_end, - } - } - - pub fn ranges(&self) -> Vec<(i64, i64, i64)> { - let first_offset = self.stage.time_offset_ms(self.time_start); - let last_offset = self.stage.time_offset_ms(self.time_end); - - let mut offset = first_offset.0; - let mut offset_start = first_offset.1; - - let mut out = vec![]; - - while offset != last_offset.0 { - out.push((offset, offset_start, self.stage.table_row_size_ms())); - - offset_start = 0; - offset += self.stage.table_row_size_ms(); - } - - out.push((offset, offset_start, last_offset.1)); - - out - } -} - -impl fmt::Display for TimeRange { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "{} ({} -> {})", self.stage, self.time_start, self.time_end) - } -} - -#[test] -fn timerange() { - let stage = Stage::try_from("11520*60s").unwrap(); - assert_eq!(vec![(0, 2, 4)], TimeRange::new(&stage, 120, 240).ranges()); - assert_eq!(vec![(0, 2, 11520),(691200000, 0, 4)], TimeRange::new(&stage, 120, 691200 + 240).ranges()); -} diff --git a/src/timerange.rs b/src/timerange.rs new file mode 100644 index 0000000..d05c214 --- /dev/null +++ b/src/timerange.rs @@ -0,0 +1,51 @@ +/* + * bgutil-rs + * + * Author: Patrick MARIE + */ +use std::fmt; + +use crate::stage::Stage; + +pub struct TimeRange { + stage: Stage, + time_start: i64, + time_end: i64 +} + +impl TimeRange { + pub fn new(stage: &Stage, time_start: i64, time_end: i64) -> Self { + TimeRange { + stage: stage.clone(), + time_start: time_start, + time_end: time_end, + } + } + + pub fn ranges(&self) -> Vec<(i64, i64, i64)> { + let first_offset = self.stage.time_offset_ms(self.time_start); + let last_offset = self.stage.time_offset_ms(self.time_end); + + let mut offset = first_offset.0; + let mut offset_start = first_offset.1; + + let mut out = vec![]; + + while offset != last_offset.0 { + out.push((offset, offset_start, self.stage.table_row_size_ms())); + + offset_start = 0; + offset += self.stage.table_row_size_ms(); + } + + out.push((offset, offset_start, last_offset.1)); + + out + } +} + +impl fmt::Display for TimeRange { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{} ({} -> {})", self.stage, self.time_start, self.time_end) + } +}