Skip to content

Commit

Permalink
eval time progression (#210) (#211)
Browse files Browse the repository at this point in the history
* initial work to compare evals

* remove unnecessary div

* design

---------

Co-authored-by: Dinmukhamed Mailibay <47117969+dinmukhamedm@users.noreply.github.com>
Co-authored-by: Din <dinmukhamed.mailibay@gmail.com>
  • Loading branch information
3 people authored Nov 15, 2024
1 parent 467a09a commit a56639c
Show file tree
Hide file tree
Showing 23 changed files with 986 additions and 464 deletions.
166 changes: 109 additions & 57 deletions app-server/src/ch/evaluation_scores.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use uuid::Uuid;

use crate::evaluations::utils::EvaluationDatapointResult;

use super::utils::{chrono_to_nanoseconds, execute_query, validate_string_against_injection};
use super::utils::chrono_to_nanoseconds;

fn serialize_timestamp<S>(timestamp: &DateTime<Utc>, serializer: S) -> Result<S::Ok, S::Error>
where
Expand Down Expand Up @@ -110,28 +110,37 @@ pub async fn get_average_evaluation_score(
evaluation_id: Uuid,
name: String,
) -> Result<f64> {
validate_string_against_injection(&name)?;

let query = format!(
"SELECT avg(value) as average_value
let row = clickhouse
.query(
"SELECT avg(value) as average_value
FROM evaluation_scores
WHERE project_id = '{project_id}'
AND evaluation_id = '{evaluation_id}'
AND name = '{name}'",
);
WHERE project_id = ?
AND evaluation_id = ?
AND name = ?
",
)
.bind(project_id)
.bind(evaluation_id)
.bind(name)
.fetch_one::<AverageEvaluationScore>()
.await?;

let rows: Vec<AverageEvaluationScore> = execute_query(&clickhouse, &query).await?;
Ok(rows[0].average_value)
Ok(row.average_value)
}

#[derive(Row, Deserialize)]
#[derive(Row, Deserialize, Clone, Debug)]
pub struct EvaluationScoreBucket {
pub lower_bound: f64,
pub upper_bound: f64,
pub height: u64,
}

pub async fn get_evaluation_score_buckets_based_on_bounds(
#[derive(Row, Deserialize)]
struct TotalCount {
total_count: u64,
}

pub async fn get_evaluation_score_single_bucket(
clickhouse: clickhouse::Client,
project_id: Uuid,
evaluation_id: Uuid,
Expand All @@ -140,53 +149,98 @@ pub async fn get_evaluation_score_buckets_based_on_bounds(
upper_bound: f64,
bucket_count: u64,
) -> Result<Vec<EvaluationScoreBucket>> {
validate_string_against_injection(&name)?;
// If the bounds are the same, we only need one bucket.
// We fill in the rest with 0s.
let total_count = clickhouse
.query(
"SELECT COUNT() as total_count
FROM evaluation_scores
WHERE project_id = ?
AND evaluation_id = ?
AND name = ?",
)
.bind(project_id)
.bind(evaluation_id)
.bind(name)
.fetch_one::<TotalCount>()
.await?;
let mut res = vec![
EvaluationScoreBucket {
lower_bound,
upper_bound,
height: 0,
};
bucket_count as usize - 1
];
res.push(EvaluationScoreBucket {
lower_bound,
upper_bound,
height: total_count.total_count,
});
return Ok(res);
}

pub async fn get_evaluation_score_buckets_based_on_bounds(
clickhouse: clickhouse::Client,
project_id: Uuid,
evaluation_id: Uuid,
name: String,
lower_bound: f64,
upper_bound: f64,
bucket_count: u64,
) -> Result<Vec<EvaluationScoreBucket>> {
let step_size = (upper_bound - lower_bound) / bucket_count as f64;
let interval_nums = (1..=bucket_count)
.map(|num| num.to_string())
.collect::<Vec<String>>()
.join(",");

// This query uses {:?} with the purpose to render floats like 1.0 as 1.0 instead of 1
let query = format!(
"
let interval_nums = (1..=bucket_count).collect::<Vec<_>>();

let rows: Vec<EvaluationScoreBucket> = clickhouse
.query(
"
WITH intervals AS (
SELECT
arrayJoin([{interval_nums}]) AS interval_num,
{:?} + ((interval_num - 1) * {:?}) AS lower_bound,
arrayJoin(?) AS interval_num,
? + ((interval_num - 1) * ?) AS lower_bound,
CASE
WHEN interval_num = {bucket_count} THEN {:?}
ELSE {:?} + (interval_num * {:?})
WHEN interval_num = ? THEN ? -- to avoid floating point precision issues
ELSE ? + (interval_num * ?)
END AS upper_bound
)
SELECT
intervals.lower_bound,
intervals.upper_bound,
COUNT(CASE
WHEN value >= intervals.lower_bound AND value < intervals.upper_bound THEN 1
WHEN intervals.interval_num = {bucket_count}
AND value >= intervals.lower_bound
AND value <= intervals.upper_bound THEN 1
ELSE NULL
CAST(intervals.lower_bound AS Float64) AS lower_bound,
CAST(intervals.upper_bound AS Float64) AS upper_bound,
SUM(CASE
-- exclusive on upper bound to avoid counting the same value twice
WHEN (value >= intervals.lower_bound AND value < intervals.upper_bound)
OR value = ? THEN 1
ELSE 0
END) AS height
FROM evaluation_scores
JOIN intervals ON 1 = 1
WHERE project_id = '{project_id}'
AND evaluation_id = '{evaluation_id}'
AND name = '{name}'
WHERE project_id = ?
AND evaluation_id = ?
AND name = ?
GROUP BY intervals.lower_bound, intervals.upper_bound, intervals.interval_num
ORDER BY intervals.interval_num",
lower_bound, step_size, upper_bound, lower_bound, step_size
);

let rows: Vec<EvaluationScoreBucket> = execute_query(&clickhouse, &query).await?;
)
.bind(interval_nums)
.bind(lower_bound)
.bind(step_size)
.bind(bucket_count)
.bind(upper_bound)
.bind(lower_bound)
.bind(step_size)
.bind(upper_bound)
.bind(project_id)
.bind(evaluation_id)
.bind(name)
.fetch_all::<EvaluationScoreBucket>()
.await?;

Ok(rows)
}

#[derive(Row, Deserialize, Clone)]
pub struct ComparedEvaluationScoresBounds {
pub lower_bound: f64,
pub upper_bound: f64,
}

Expand All @@ -196,24 +250,22 @@ pub async fn get_global_evaluation_scores_bounds(
evaluation_ids: &Vec<Uuid>,
name: String,
) -> Result<ComparedEvaluationScoresBounds> {
validate_string_against_injection(&name)?;

let evaluation_ids_str = evaluation_ids
.iter()
.map(|id| format!("'{}'", id))
.collect::<Vec<String>>()
.join(",");

let query = format!(
"
let row = clickhouse
.query(
"
SELECT
MIN(value) AS lower_bound,
MAX(value) AS upper_bound
FROM evaluation_scores
WHERE project_id = '{project_id}'
AND evaluation_id IN ({evaluation_ids_str})
AND name = '{name}'",
);
WHERE project_id = ?
AND evaluation_id IN ?
AND name = ?",
)
.bind(project_id)
.bind(evaluation_ids)
.bind(name)
.fetch_one()
.await?;

let rows: Vec<ComparedEvaluationScoresBounds> = execute_query(&clickhouse, &query).await?;
Ok(rows[0].clone())
Ok(row)
}
38 changes: 27 additions & 11 deletions app-server/src/ch/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,7 @@ use crate::db::{self, event_templates::EventTemplate};
use super::{
modifiers::GroupByInterval,
utils::{
chrono_to_nanoseconds, execute_query, group_by_time_absolute_statement,
group_by_time_relative_statement,
chrono_to_nanoseconds, group_by_time_absolute_statement, group_by_time_relative_statement,
},
MetricTimeValue,
};
Expand Down Expand Up @@ -132,14 +131,22 @@ pub async fn get_total_event_count_metrics_relative(
COUNT(DISTINCT id) AS value
FROM events
WHERE
project_id = '{project_id}'
AND template_id = '{template_id}'
AND timestamp >= now() - INTERVAL {past_hours} HOUR
project_id = ?
AND template_id = ?
AND timestamp >= now() - INTERVAL ? HOUR
{}",
group_by_time_relative_statement(past_hours, group_by_interval),
);

execute_query(&clickhouse, &query_string).await
let rows: Vec<MetricTimeValue<i64>> = clickhouse
.query(&query_string)
.bind(project_id)
.bind(template_id)
.bind(past_hours)
.fetch_all::<MetricTimeValue<i64>>()
.await?;

Ok(rows)
}

pub async fn get_total_event_count_metrics_absolute(
Expand All @@ -161,13 +168,22 @@ pub async fn get_total_event_count_metrics_absolute(
COUNT(DISTINCT id) AS value
FROM events
WHERE
project_id = '{project_id}'
AND template_id = '{template_id}'
AND timestamp >= fromUnixTimestamp({ch_start_time})
AND timestamp <= fromUnixTimestamp({ch_end_time})
project_id = ?
AND template_id = ?
AND timestamp >= fromUnixTimestamp(?)
AND timestamp <= fromUnixTimestamp(?)
{}",
group_by_time_absolute_statement(start_time, end_time, group_by_interval)
);

execute_query(&clickhouse, &query_string).await
let rows: Vec<MetricTimeValue<i64>> = clickhouse
.query(&query_string)
.bind(project_id)
.bind(template_id)
.bind(ch_start_time)
.bind(ch_end_time)
.fetch_all::<MetricTimeValue<i64>>()
.await?;

Ok(rows)
}
Loading

0 comments on commit a56639c

Please sign in to comment.