Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix first/last aggregation functions #5208

Merged
merged 5 commits into from
Feb 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions docs/aggregations.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,11 @@ populate the `Stats` aggregations whenever a given hour or day ends.

The type for the raw data points is defined with an `@entity(timeseries:
true)` annotation. Timeseries types are immutable, and must have an `id`
field and a `timestamp` field. The `timestamp` is set automatically by
`graph-node` to the timestamp of the current block; if mappings set this
field, it is silently overridden when the entity is saved.
field and a `timestamp` field. The `id` must be of type `Int8` and is set
automatically so that ids are increasing in insertion order. The `timestamp`
is set automatically by `graph-node` to the timestamp of the current block;
if mappings set this field, it is silently overridden when the entity is
saved.

Aggregations are declared with an `@aggregation` annotation instead of an
`@entity` annotation. They must have an `id` field and a `timestamp` field.
Expand Down Expand Up @@ -142,6 +144,10 @@ The following aggregation functions are currently supported:
| `first` | First value |
| `last` | Last value |

The `first` and `last` aggregation functions calculate the first and last
value in an interval by sorting the data by `id`; `graph-node` enforces
correctness here by automatically setting the `id` for timeseries entities.

## Querying

_This section is not implemented yet, and will require a bit more thought
Expand Down
2 changes: 1 addition & 1 deletion graph/src/blockchain/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ impl fmt::Display for ChainIdentifier {

/// The timestamp associated with a block. This is used whenever a time
/// needs to be connected to data within the block
// NOTE(review): `PartialOrd`/`Ord` on this newtype delegate to
// `DateTime<Utc>`, i.e. block times order chronologically. Presumably the
// ordering is needed so block times can be compared/sorted when
// aggregating over time intervals — confirm against the aggregation code.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct BlockTime(DateTime<Utc>);

impl BlockTime {
Expand Down
9 changes: 7 additions & 2 deletions graph/src/schema/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2213,13 +2213,13 @@ type Gravatar @entity {
fn aggregation() {
const SCHEMA: &str = r#"
type Data @entity(timeseries: true) {
id: Bytes!
id: Int8!
timestamp: Int8!
value: BigDecimal!
}

type Stats @aggregation(source: "Data", intervals: ["hour", "day"]) {
id: Bytes!
id: Int8!
timestamp: Int8!
sum: BigDecimal! @aggregate(fn: "sum", arg: "value")
}
Expand Down Expand Up @@ -2254,6 +2254,11 @@ type Gravatar @entity {
"_change_block",
"and",
"id",
"id_gt",
"id_gte",
"id_in",
"id_lt",
"id_lte",
"or",
"timestamp",
"timestamp_gt",
Expand Down
24 changes: 21 additions & 3 deletions graph/src/schema/input_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1951,13 +1951,30 @@ mod validations {
})
}

/// The `@entity` direactive accepts two flags `immutable` and
/// The `@entity` directive accepts two flags `immutable` and
/// `timeseries`, and when `timeseries` is `true`, `immutable` can
/// not be `false`.
///
/// For timeseries, also check that there is a `timestamp` field of
/// type `Int8`
/// type `Int8` and that the `id` field has type `Int8`
fn validate_entity_directives(&self) -> Vec<SchemaValidationError> {
/// Verify that `object_type` has an `id` field of type `Int8`.
///
/// Returns `None` when the field exists and has the right type, and the
/// appropriate validation error otherwise.
fn id_type_is_int8(object_type: &s::ObjectType) -> Option<SchemaValidationError> {
    // A missing `id` field is reported separately from a wrongly typed one.
    let Some(field) = object_type.field(&*ID) else {
        return Some(Err::IdFieldMissing(object_type.name.to_owned()));
    };

    if matches!(field.field_type.value_type(), Ok(ValueType::Int8)) {
        None
    } else {
        // Any other outcome — a different value type or a type that does
        // not parse to a `ValueType` at all — is illegal for timeseries.
        Some(Err::IllegalIdType(format!(
            "Timeseries `{}` must have an `id` field of type `Int8`",
            object_type.name
        )))
    }
}

fn bool_arg(
dir: &s::Directive,
name: &str,
Expand Down Expand Up @@ -1990,7 +2007,8 @@ mod validations {
object_type.name.clone(),
))
} else {
Self::valid_timestamp_field(object_type)
id_type_is_int8(object_type)
.or_else(|| Self::valid_timestamp_field(object_type))
}
} else {
None
Expand Down
10 changes: 7 additions & 3 deletions runtime/test/src/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1605,13 +1605,13 @@ async fn test_store_ts() {

let schema = r#"
type Data @entity(timeseries: true) {
id: Bytes!
id: Int8!
timestamp: Int8!
amount: BigDecimal!
}

type Stats @aggregation(intervals: ["hour"], source: "Data") {
id: Bytes!
id: Int8!
timestamp: Int8!
max: BigDecimal! @aggregate(fn: "max", arg:"amount")
}"#;
Expand All @@ -1635,8 +1635,12 @@ async fn test_store_ts() {
)
.expect("Setting 'Data' is allowed");

// This is very backhanded: we generate an id the same way that
// `store_setv` should have.
let did = IdType::Int8.generate_id(12, 0).unwrap();

// Set overrides the user-supplied timestamp for timeseries
let data = host.store_get(DATA, DID).unwrap().unwrap();
let data = host.store_get(DATA, &did.to_string()).unwrap().unwrap();
assert_eq!(Some(&Value::from(block_time)), data.get("timestamp"));

let err = host
Expand Down
7 changes: 6 additions & 1 deletion runtime/wasm/src/host_exports.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,12 @@ impl<C: Blockchain> HostExports<C> {

Self::expect_object_type(&entity_type, "set")?;

let entity_id = if entity_id == "auto" {
let entity_id = if entity_id == "auto"
|| entity_type
.object_type()
.map(|ot| ot.timeseries)
.unwrap_or(false)
{
if self.data_source_causality_region != CausalityRegion::ONCHAIN {
return Err(anyhow!(
"Autogenerated IDs are only supported for onchain data sources"
Expand Down
20 changes: 20 additions & 0 deletions store/postgres/migrations/2024-02-06-002353_arg_minmax/down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
-- This file was generated by generate.sh in this directory
set search_path = public;
-- Drop order matters: each aggregate references its transition/final
-- functions (sfunc/finalfunc), and those functions take the composite
-- `*_and_value` type as argument -- so for every type we drop the
-- aggregates first, then the functions, and the composite type last.
drop aggregate arg_min_int4(int4_and_value);
drop aggregate arg_max_int4(int4_and_value);
drop function arg_from_int4_and_value(int4_and_value);
drop function arg_max_agg_int4(int4_and_value, int4_and_value);
drop function arg_min_agg_int4(int4_and_value, int4_and_value);
drop type int4_and_value;
drop aggregate arg_min_int8(int8_and_value);
drop aggregate arg_max_int8(int8_and_value);
drop function arg_from_int8_and_value(int8_and_value);
drop function arg_max_agg_int8(int8_and_value, int8_and_value);
drop function arg_min_agg_int8(int8_and_value, int8_and_value);
drop type int8_and_value;
drop aggregate arg_min_numeric(numeric_and_value);
drop aggregate arg_max_numeric(numeric_and_value);
drop function arg_from_numeric_and_value(numeric_and_value);
drop function arg_max_agg_numeric(numeric_and_value, numeric_and_value);
drop function arg_min_agg_numeric(numeric_and_value, numeric_and_value);
drop type numeric_and_value;
98 changes: 98 additions & 0 deletions store/postgres/migrations/2024-02-06-002353_arg_minmax/generate.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
#! /bin/bash

# Generate up and down migrations to define arg_min and arg_max aggregates
# for the types listed in `types`.
#
# The aggregates can all be used like
#
#   select arg_min_int4((arg, value)) from t
#
# and return the `arg int4` belonging to the smallest `value int8`. If
# there are several rows with the smallest value, we try hard to return
# the first one, but that also depends on how Postgres calculates these
# aggregations. Note that the relation over which we are aggregating does
# not need to be ordered.
#
# Unfortunately, it is not possible to do this generically, so we have to
# monomorphize and define an aggregate for each data type that we want to
# use. The `value` is always an `int8`.
#
# If changes to these functions are needed, copy this script to a new
# migration, change it and regenerate the up and down migrations

types="int4 int8 numeric"
# Directory this script lives in; up.sql and down.sql are written next to
# it. Quoted so paths containing spaces work.
dir=$(dirname "$0")

# `read -d ''` slurps the entire heredoc into the variable. It exits with
# status 1 when it hits EOF, which is expected and harmless here.
read -d '' -r prelude <<'EOF'
-- This file was generated by generate.sh in this directory
set search_path = public;
EOF

read -d '' -r up_template <<'EOF'
create type public.@T@_and_value as (
arg @T@,
value int8
);

create or replace function arg_min_agg_@T@ (a @T@_and_value, b @T@_and_value)
returns @T@_and_value
language sql immutable strict parallel safe as
'select case when a.arg is null then b
when b.arg is null then a
when a.value <= b.value then a
else b end';

create or replace function arg_max_agg_@T@ (a @T@_and_value, b @T@_and_value)
returns @T@_and_value
language sql immutable strict parallel safe as
'select case when a.arg is null then b
when b.arg is null then a
when a.value > b.value then a
else b end';

create function arg_from_@T@_and_value(a @T@_and_value)
returns @T@
language sql immutable strict parallel safe as
'select a.arg';

create aggregate arg_min_@T@ (@T@_and_value) (
sfunc = arg_min_agg_@T@,
stype = @T@_and_value,
finalfunc = arg_from_@T@_and_value,
parallel = safe
);

comment on aggregate arg_min_@T@(@T@_and_value) is
'For ''select arg_min_@T@((arg, value)) from ..'' return the arg for the smallest value';

create aggregate arg_max_@T@ (@T@_and_value) (
sfunc = arg_max_agg_@T@,
stype = @T@_and_value,
finalfunc = arg_from_@T@_and_value,
parallel = safe
);

comment on aggregate arg_max_@T@(@T@_and_value) is
'For ''select arg_max_@T@((arg, value)) from ..'' return the arg for the largest value';
EOF

read -d '' -r down_template <<'EOF'
drop aggregate arg_min_@T@(@T@_and_value);
drop aggregate arg_max_@T@(@T@_and_value);
drop function arg_from_@T@_and_value(@T@_and_value);
drop function arg_max_agg_@T@(@T@_and_value, @T@_and_value);
drop function arg_min_agg_@T@(@T@_and_value, @T@_and_value);
drop type @T@_and_value;
EOF

# Write the up migration: prelude once, then one monomorphized copy of the
# template per type.
echo "$prelude" > "$dir/up.sql"
for typ in $types
do
echo "${up_template//@T@/$typ}" >> "$dir/up.sql"
done

# Write the down migration the same way; the template already drops objects
# in dependency order (aggregates, then functions, then the type).
echo "$prelude" > "$dir/down.sql"
for typ in $types
do
echo "${down_template//@T@/$typ}" >> "$dir/down.sql"
done
Loading
Loading