Skip to content

Commit

Permalink
Um, kinda better
Browse files Browse the repository at this point in the history
  • Loading branch information
d1ngd0 committed May 11, 2024
1 parent be7fe25 commit adf5440
Show file tree
Hide file tree
Showing 6 changed files with 134 additions and 49 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ base64 = "0.21.5"
byteorder = "1.5.0"
cityhash = "0.1.1"
dyn-clone = "1.0.17"
parse_duration = "2.1.1"
regex = "1.10.4"
serde = { version = "1.0.115", features = ["derive"] }
serde_json = "1.0.115"
Expand Down
21 changes: 7 additions & 14 deletions src/bin/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use std::{
env,
io::{self, BufRead},
process,
time::SystemTime,
};

use dapt::{query::Query, Dapt};
Expand All @@ -22,8 +21,6 @@ fn main() {
}
};

let mut n = SystemTime::now();

let stdin = io::stdin();
let lines = stdin.lock().lines();

Expand All @@ -33,17 +30,13 @@ fn main() {
Err(_) => continue,
};

let _ = q.process(&d);
if SystemTime::now().duration_since(n).unwrap().as_secs() > 1 {
let res = match q.collect() {
Ok(res) => res,
Err(_) => continue,
};

for r in res.iter() {
println!("{}", serde_json::to_string(&r).unwrap());
}
n = SystemTime::now();
let res = match q.process_and_collect(&d) {
Ok(res) => res,
Err(_) => continue,
};

for r in res.iter().flatten() {
println!("{}", serde_json::to_string(&r).unwrap());
}
}

Expand Down
34 changes: 34 additions & 0 deletions src/binary/value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,40 @@ impl From<Any<'_>> for OwnedAny {
}
}

impl From<OwnedAny> for Any<'_> {
fn from(value: OwnedAny) -> Self {
match value {
OwnedAny::USize(val) => Any::USize(val),
OwnedAny::U8(val) => Any::U8(val),
OwnedAny::U16(val) => Any::U16(val),
OwnedAny::U32(val) => Any::U32(val),
OwnedAny::U64(val) => Any::U64(val),
OwnedAny::U128(val) => Any::U128(val),
OwnedAny::ISize(val) => Any::ISize(val),
OwnedAny::I8(val) => Any::I8(val),
OwnedAny::I16(val) => Any::I16(val),
OwnedAny::I32(val) => Any::I32(val),
OwnedAny::I64(val) => Any::I64(val),
OwnedAny::I128(val) => Any::I128(val),
OwnedAny::F32(val) => Any::F32(val),
OwnedAny::F64(val) => Any::F64(val),
OwnedAny::Str(val) => Any::String(val.to_string()),
OwnedAny::Bytes(val) => Any::Null, // this is gross, fix it
OwnedAny::Char(val) => Any::Char(val),
OwnedAny::Bool(val) => Any::Bool(val),
OwnedAny::Array(val) => {
let mut items = Vec::with_capacity(val.len());
for item in val {
items.push(Any::from(item));
}
Any::Array(items)
}
OwnedAny::Map(val) => Any::Null, // this is gross, fix it
OwnedAny::Null => Any::Null,
}
}
}

impl<'a> From<&'a OwnedAny> for Any<'a> {
fn from(value: &'a OwnedAny) -> Self {
match value {
Expand Down
4 changes: 3 additions & 1 deletion src/query/aggregation/expression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,9 @@ impl Aggregation for ExpressionAggregation {
}

fn result<'a>(&'a mut self) -> Option<Any<'a>> {
Some(self.value.as_ref()?.into())
let v = self.value.take()?;
Some(Any::from(v)) // This actually doesn't work for maps and bytes, because we drop the
// data TODO: we need to fix this at some point
}

// since this is essentially first, we can just return the first value
Expand Down
6 changes: 6 additions & 0 deletions src/query/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,12 @@ impl From<DaptError> for Error {
}
}

impl From<parse_duration::parse::Error> for Error {
fn from(e: parse_duration::parse::Error) -> Self {
Error::InvalidQuery(format!("Duration parsing error: {}", e.to_string()))
}
}

impl Display for Error {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Expand Down
117 changes: 83 additions & 34 deletions src/query/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use core::fmt;
use std::{
collections::HashMap,
fmt::{Display, Formatter},
time::{Duration, SystemTime},
};

use cityhash::city_hash_64;
Expand Down Expand Up @@ -32,6 +33,7 @@ pub const BY: &str = "BY";
pub const ORDER_ASC: &str = "ASC";
pub const ORDER_DESC: &str = "DESC";
pub const TOP: &str = "TOP";
pub const INTERVAL: &str = "INTERVAL";
pub const SUB_CONDITION: &str = "(";
pub const SUB_CONDITION_END: &str = ")";
pub const EQUAL: &str = "=";
Expand Down Expand Up @@ -148,7 +150,6 @@ impl GroupBy {

fn collect(&mut self, having: &HavingClause) -> QueryResult<Vec<Dapt>> {
let mut results = Vec::new();

for group in self.groups.values_mut() {
let d = group.collect()?;
match having.filter(&d) {
Expand Down Expand Up @@ -342,6 +343,26 @@ impl Display for FromClause {
}
}

#[derive(Clone)]
struct Interval {
last_output: SystemTime,
duration: Duration,
}

impl Interval {
fn should_fire_and_reset(&mut self) -> bool {
let now = SystemTime::now();
let elapsed = now.duration_since(self.last_output).unwrap();

if elapsed >= self.duration {
self.last_output = now;
return true;
}

false
}
}

// Query is the parsed representation of a query. It holds a from, where, having
// group by (which houses the select clause), order by and top. The only thing required
// to parse a query is the `SELECT` portion of the query. The rest is optional.
Expand All @@ -354,6 +375,7 @@ pub struct Query {
group: GroupBy,
order: OrderBy,
top: Option<Top>,
interval: Interval,
}

impl Query {
Expand All @@ -380,6 +402,18 @@ impl Query {
Ok(set)
}

// process_and_collect utilizes the defined interval to determine if the
// query should return data. If there is no data to return option will be
// none
pub fn process_and_collect(&mut self, d: &Dapt) -> QueryResult<Option<Vec<Dapt>>> {
self.process(d)?;
if self.interval.should_fire_and_reset() {
Ok(Some(self.collect()?))
} else {
Ok(None)
}
}

// composite returns two query objects, The first query can be
// run on multiple data sets concurrently, the second query
// is then used to combine the responses of the first query into
Expand All @@ -397,6 +431,7 @@ impl Query {
group: composable,
order: OrderBy { fields: Vec::new() },
top: None,
interval: self.interval.clone(),
},
// where can only be done at the composable step
Query {
Expand All @@ -408,6 +443,7 @@ impl Query {
group: combine,
order: self.order.clone(),
top: self.top.clone(),
interval: self.interval.clone(),
},
)
}
Expand Down Expand Up @@ -637,56 +673,58 @@ impl<'a> Parser<'a> {

// from is optional, so we can create an empty from cluase
// if there is no value.
let from = if let Some(FROM) = self.lex.peak() {
self.parse_from()?
} else {
FromClause(Vec::new())
let from = match self.lex.peak() {
Some(v) if v.to_uppercase() == FROM => self.parse_from()?,
_ => FromClause(Vec::new()),
};

// where is optional, so we can create an empty where cluase
// if there is no value.
let where_clause = if let Some(WHERE) = self.lex.peak() {
self.parse_where()?
} else {
// this always evaluates to true
WhereClause {
let where_clause = match self.lex.peak() {
Some(v) if v.to_uppercase() == WHERE => self.parse_where()?,
_ => WhereClause {
condition: Conjunction::Single(Box::new(NoopCondition::default())),
}
},
};

// having is also optional
let having = if let Some(HAVING) = self.lex.peak() {
self.parse_having()?
} else {
// this always evaluates to true
HavingClause {
let having = match self.lex.peak() {
Some(v) if v.to_uppercase() == HAVING => self.parse_having()?,
_ => HavingClause {
condition: Conjunction::Single(Box::new(NoopCondition::default())),
}
},
};

let group = if let Some(GROUP) = self.lex.peak() {
self.parse_group(select)?
} else {
let mut groups = HashMap::new();
groups.insert(0, select.clone());
let group = match self.lex.peak() {
Some(v) if v.to_uppercase() == GROUP => self.parse_group(select)?,
_ => {
let mut groups = HashMap::new();
groups.insert(0, select.clone());

GroupBy {
fields: Vec::new(),
groups,
template: select,
GroupBy {
fields: Vec::new(),
groups,
template: select,
}
}
};

let order = if let Some(ORDER) = self.lex.peak() {
self.parse_order()?
} else {
OrderBy { fields: Vec::new() }
let order = match self.lex.peak() {
Some(v) if v.to_uppercase() == ORDER => self.parse_order()?,
_ => OrderBy { fields: Vec::new() },
};

let top = if let Some(TOP) = self.lex.peak() {
Some(self.parse_top()?)
} else {
None
let top = match self.lex.peak() {
Some(v) if v.to_uppercase() == TOP => Some(self.parse_top()?),
_ => None,
};

let interval = match self.lex.peak() {
Some(v) if v.to_uppercase() == INTERVAL => self.parse_interval()?,
_ => Interval {
last_output: SystemTime::now(),
duration: Duration::from_secs(0),
},
};

Ok(Query {
Expand All @@ -696,6 +734,17 @@ impl<'a> Parser<'a> {
group,
order,
top,
interval,
})
}

fn parse_interval(&mut self) -> QueryResult<Interval> {
self.consume_token(INTERVAL)?;
let duration = self.parse_string(STRING_WRAP)?;

Ok(Interval {
last_output: SystemTime::now(),
duration: parse_duration::parse(&duration)?,
})
}

Expand Down

0 comments on commit adf5440

Please sign in to comment.