feat: Sum and average aggregation queries (#1097)
* Initial sum aggregation

This commit introduces the sum aggregation with a simple test to ensure it works

* Modify encoding

This change modifies the encoding so that the right data reaches the gRPC layer.

* PropertyAggregateField with tests

Adds helper functions and a super class so that average and sum can share the same properties.

* Improve transaction tests

Add sum and average to the transaction tests here to improve test coverage for transactions.

* Change the description in the describe block

* Change return type to average

The return type for this function is wrong. The function returns an Average so we should use Average.

* Make alias optional

Make alias optional since the query still works without providing an alias.

* Fix the transaction tests to fail on rollback

The transaction tests should not pass if the transaction is rolled back like they do currently. We must not catch errors and instead let the test fail.

* Add additional assertions to existing tests

Ensure that the addAggregation function works correctly with an additional assertion check.

* Revert "Add additional assertions to existing tests"

This reverts commit 970c0f4.

* Add describe block for comparing equivalent query

This test ensures that all the aggregate queries are actually the same no matter how you build them

* Average, sum and count toProto tests

Write tests to effectively document the toProto output of the various aggregate fields

* Add tests for the sum aggregation

System tests equivalent to the existing count tests are written for sum, along with some additional tests to meet requirements outlined by the team.

* Add a test for sum and snapshot reads

The test for sum and snapshot reads should look at the database before the data is created and run the tests based on the database in that state

* Add two test blocks for special cases

Add a test block for a dataset with overflow values and a dataset with NaN values.

* Export aggregate field from the client

Aggregate field should be exported from the client so that it can be used easily by users.

* PR follow-up changes

Some idiomatic changes to improve the state of the code in the PR.

* Adjust the values so that tests pass

Values for sum and average should be different from those of count and these tests provide the right values now.

# Conflicts:
#	system-test/datastore.ts

* Add average aggregations

Average aggregations over appearances have been added to the tests, and the correct expected values have been assigned.

* Add snapshot reads for run query and aggregate q

The future refactor must implement the TODOs so that there is less repeated code in the codebase. Also, this commit implements snapshot reads for queries and adds a test for the snapshot reads.

* Remove Google error and entity filter

Remove some unused imports as they do not apply to the code anymore

* Should use null for an aggregation query read time

Snapshot reads read at a time when there is no data, so sums and averages should reflect that.

* Remove tests from a bad cherry pick

Tests for sum that have values corresponding to count are still there in the test cases. They should be removed.

* Linting fix

We don’t care about a loss of precision since the literal value indicated is contained in a test and the loss of precision won’t affect the code.

* Do the test on rating instead of appearances

At this point the datastore is populated with data about ratings so computations should be done on that instead.

* The assertion says the request should have failed

An assertion error should be thrown so that the test doesn’t pass if the request is successful.

* Add a comment about using limits in test

The query with the limit will include all data points with the lowest appearance values. This is likely desired, but also important to document.

* Add rollbacks to transaction tests

The rollbacks for the transaction tests ensure that if a test fails then the data gets reset to where it was before.

* refactor getSharedOptionsOnly

Introducing getSharedOptionsOnly allows us to use that function in two different places to avoid a repeated block of code.

* Remove test related to snapshot reads

This test belongs inside another PR because it is not directly related to sum/avg.

* Add a test for multiple types of aggregates

A test should be included that looks at multiple aggregations in a single query.

* Correct descriptions of two tests on overflow

The tests themselves should include the word overflow so that it is clear they are working with an overflow dataset.

* Add a comment for setting the alias

The comment for setting the alias should not make any mention of count since it is agnostic to the aggregation type.

* Add tests to compare various ways to encode alias

No matter how alias is encoded, the data structures should store the aggregations the same way inside an aggregate field so as not to create any confusion.

* Added tests for when an empty alias is provided

Tests for when an empty alias is provided should check that each aggregate query still works.

* Add a comment clarifying the use of snapshot reads

The sleep function should enable us to test snapshot reads for aggregate queries

* Add two tests to explore mixed aggregations alias

Two tests should be added: one that evaluates what happens when multiple aggregations are used, and one for when too many aggregations are used.

* Better names for some internal private functions

Shared options functions could be given a better name so that they make more sense to the code that is using them.

* Add a comment explaining why the sleep is needed

The code must explain why the sleep function is needed in the test because its purpose should be clear.

* Add getReadTime function and use for sum/avg

A sum/average test uses snapshot reads for an aggregate query. The key here is to write code that guarantees the read time occurs well before all the data is saved, so that the test isn’t flaky.

* Rename variable to emptyData

emptyData is a more accurate variable name for the datastore save data
danieljbruce authored Sep 1, 2023
1 parent e7c3aaf commit 44ba6f8
Showing 5 changed files with 772 additions and 70 deletions.
104 changes: 97 additions & 7 deletions src/aggregate.ts
@@ -46,11 +46,35 @@ class AggregateQuery {
* @param {string} alias
* @returns {AggregateQuery}
*/
count(alias: string): AggregateQuery {
count(alias?: string): AggregateQuery {
this.aggregations.push(AggregateField.count().alias(alias));
return this;
}

/**
* Add a `sum` aggregate query to the list of aggregations.
*
* @param {string} property
* @param {string} alias
* @returns {AggregateQuery}
*/
sum(property: string, alias?: string): AggregateQuery {
this.aggregations.push(AggregateField.sum(property).alias(alias));
return this;
}

/**
* Add an `average` aggregate query to the list of aggregations.
*
* @param {string} property
* @param {string} alias
* @returns {AggregateQuery}
*/
average(property: string, alias?: string): AggregateQuery {
this.aggregations.push(AggregateField.average(property).alias(alias));
return this;
}

/**
* Add a custom aggregation to the list of aggregations.
*
@@ -99,7 +123,6 @@ class AggregateQuery {
* Get the proto for the list of aggregations.
*
*/
// eslint-disable-next-line
toProto(): any {
return this.aggregations.map(aggregation => aggregation.toProto());
}
@@ -122,22 +145,41 @@ abstract class AggregateField {
}

/**
* Gets a copy of the Count aggregate field.
* Gets a copy of the Sum aggregate field.
*
* @returns {Sum}
*/
static sum(property: string): Sum {
return new Sum(property);
}

/**
* Gets a copy of the Average aggregate field.
*
* @returns {Average}
*/
static average(property: string): Average {
return new Average(property);
}

/**
* Sets the alias on the aggregate field that should be used.
*
* @param {string} alias The label used in the results to describe this
* aggregate field when a query is run.
* @returns {AggregateField}
*/
alias(alias: string): AggregateField {
this.alias_ = alias;
alias(alias?: string): AggregateField {
if (alias) {
this.alias_ = alias;
}
return this;
}

/**
* Gets the proto for the aggregate field.
*
*/
// eslint-disable-next-line
abstract toProto(): any;
}

@@ -146,7 +188,6 @@ abstract class AggregateField {
*
*/
class Count extends AggregateField {
// eslint-disable-next-line
/**
* Gets the proto for the count aggregate field.
*
@@ -157,4 +198,53 @@ class Count extends AggregateField {
}
}

/**
* A PropertyAggregateField is a class that contains data that defines any
* aggregation that is performed on a property.
*
*/
abstract class PropertyAggregateField extends AggregateField {
abstract operator: string;

/**
* Build a PropertyAggregateField object.
*
* @param {string} property
*/
constructor(public property_: string) {
super();
}

/**
* Gets the proto for the property aggregate field.
*
*/
toProto(): any {
const aggregation = this.property_
? {property: {name: this.property_}}
: {};
return Object.assign(
{operator: this.operator},
this.alias_ ? {alias: this.alias_} : null,
{[this.operator]: aggregation}
);
}
}

/**
* A Sum is a class that contains data that defines a Sum aggregation.
*
*/
class Sum extends PropertyAggregateField {
operator = 'sum';
}

/**
* An Average is a class that contains data that defines an Average aggregation.
*
*/
class Average extends PropertyAggregateField {
operator = 'avg';
}

export {AggregateField, AggregateQuery, AGGREGATE_QUERY};
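
The proto shape produced by the new `PropertyAggregateField.toProto()` can be hard to read off the diff. Here is a minimal standalone sketch of it, assuming nothing beyond what the hunk above shows. The `*Sketch` class names and the `AggregationProto` type are hypothetical stand-ins, not the library's exports, and object spread is used in place of `Object.assign` purely for readability.

```typescript
// Hypothetical stand-in types and classes mirroring the diff above.
type AggregationProto = Record<string, unknown>;

abstract class AggregateFieldSketch {
  alias_?: string;

  // A missing or empty alias leaves alias_ unset, as in the diff.
  alias(alias?: string): this {
    if (alias) {
      this.alias_ = alias;
    }
    return this;
  }

  abstract toProto(): AggregationProto;
}

abstract class PropertyAggregateFieldSketch extends AggregateFieldSketch {
  abstract operator: string;
  property_: string;

  constructor(property: string) {
    super();
    this.property_ = property;
  }

  // The operator name doubles as the key that holds the property reference.
  toProto(): AggregationProto {
    const aggregation = this.property_
      ? {property: {name: this.property_}}
      : {};
    return {
      operator: this.operator,
      ...(this.alias_ ? {alias: this.alias_} : {}),
      [this.operator]: aggregation,
    };
  }
}

class SumSketch extends PropertyAggregateFieldSketch {
  operator = 'sum';
}

class AverageSketch extends PropertyAggregateFieldSketch {
  operator = 'avg';
}

// With an alias, the proto carries it; without one, the key is omitted.
const sumProto = new SumSketch('appearances').alias('total').toProto();
const avgProto = new AverageSketch('appearances').alias().toProto();
console.log(JSON.stringify(sumProto));
console.log(JSON.stringify(avgProto));
```

Because `Sum` and `Average` differ only in `operator`, any further property-based aggregation would presumably need just one more subclass; that is an inference from the diff, not something the commit states.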
4 changes: 2 additions & 2 deletions src/index.ts
@@ -40,8 +40,9 @@ import * as is from 'is';
import {Transform, pipeline} from 'stream';

import {entity, Entities, Entity, EntityProto, ValueProto} from './entity';
import {AggregateField} from './aggregate';
import Key = entity.Key;
export {Entity, Key};
export {Entity, Key, AggregateField};
import {PropertyFilter, and, or} from './filter';
export {PropertyFilter, and, or};
import {
@@ -1818,7 +1819,6 @@ promisifyAll(Datastore, {
'isDouble',
'geoPoint',
'getProjectId',
'getSharedQueryOptions',
'isGeoPoint',
'index',
'int',
53 changes: 25 additions & 28 deletions src/request.ts
@@ -276,28 +276,8 @@ class DatastoreRequest {
}

const makeRequest = (keys: entity.Key[] | KeyProto[]) => {
const reqOpts: RequestOptions = {
keys,
};

if (options.consistency) {
const code = CONSISTENCY_PROTO_CODE[options.consistency.toLowerCase()];

reqOpts.readOptions = {
readConsistency: code,
};
}
if (options.readTime) {
if (reqOpts.readOptions === undefined) {
reqOpts.readOptions = {};
}
const readTime = options.readTime;
const seconds = readTime / 1000;
reqOpts.readOptions.readTime = {
seconds: Math.floor(seconds),
};
}

const reqOpts = this.getRequestOptions(options);
Object.assign(reqOpts, {keys});
this.request_(
{
client: 'DatastoreClient',
@@ -596,7 +576,7 @@ class DatastoreRequest {
setImmediate(callback, e as Error);
return;
}
const sharedQueryOpts = this.getSharedQueryOptions(query.query, options);
const sharedQueryOpts = this.getQueryOptions(query.query, options);
const aggregationQueryOptions: AggregationQueryOptions = {
nestedQuery: queryProto,
aggregations: query.toProto(),
@@ -811,7 +791,7 @@ class DatastoreRequest {
setImmediate(onResultSet, e as Error);
return;
}
const sharedQueryOpts = this.getSharedQueryOptions(query, options);
const sharedQueryOpts = this.getQueryOptions(query, options);

const reqOpts: RequestOptions = sharedQueryOpts;
reqOpts.query = queryProto;
@@ -887,9 +867,8 @@ class DatastoreRequest {
return stream;
}

private getSharedQueryOptions(
query: Query,
options: RunQueryStreamOptions = {}
private getRequestOptions(
options: RunQueryStreamOptions
): SharedQueryOptions {
const sharedQueryOpts = {} as SharedQueryOptions;
if (options.consistency) {
@@ -898,6 +877,24 @@ class DatastoreRequest {
readConsistency: code,
};
}
if (options.readTime) {
if (sharedQueryOpts.readOptions === undefined) {
sharedQueryOpts.readOptions = {};
}
const readTime = options.readTime;
const seconds = readTime / 1000;
sharedQueryOpts.readOptions.readTime = {
seconds: Math.floor(seconds),
};
}
return sharedQueryOpts;
}

private getQueryOptions(
query: Query,
options: RunQueryStreamOptions = {}
): SharedQueryOptions {
const sharedQueryOpts = this.getRequestOptions(options);
if (query.namespace) {
sharedQueryOpts.partitionId = {
namespaceId: query.namespace,
@@ -1191,7 +1188,7 @@ export type DeleteResponse = CommitResponse;
* that a callback is omitted.
*/
promisifyAll(DatastoreRequest, {
exclude: ['getSharedQueryOptions'],
exclude: ['getQueryOptions', 'getRequestOptions'],
});
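
The refactor in `src/request.ts` splits option building into two layers: `getRequestOptions` handles consistency and read time, and `getQueryOptions` adds the namespace on top. The following standalone sketch shows that layering under stated assumptions. The `*Sketch` names and interfaces are hypothetical, and the consistency code table is an assumed mapping because `CONSISTENCY_PROTO_CODE` is not shown in this diff.

```typescript
// Hypothetical, trimmed-down shapes for illustration only.
interface ReadOptionsSketch {
  readConsistency?: number;
  readTime?: {seconds: number};
}

interface RequestOptionsSketch {
  readOptions?: ReadOptionsSketch;
  partitionId?: {namespaceId: string};
}

interface CallOptionsSketch {
  consistency?: string;
  readTime?: number; // milliseconds since the epoch
}

// Assumed values; the real CONSISTENCY_PROTO_CODE table is outside this diff.
const CONSISTENCY_CODE_SKETCH: {[key: string]: number} = {
  eventual: 2,
  strong: 1,
};

// Shared by lookups and queries: consistency plus read time in whole seconds.
function getRequestOptionsSketch(
  options: CallOptionsSketch
): RequestOptionsSketch {
  const reqOpts: RequestOptionsSketch = {};
  if (options.consistency) {
    reqOpts.readOptions = {
      readConsistency:
        CONSISTENCY_CODE_SKETCH[options.consistency.toLowerCase()],
    };
  }
  if (options.readTime) {
    if (reqOpts.readOptions === undefined) {
      reqOpts.readOptions = {};
    }
    reqOpts.readOptions.readTime = {
      seconds: Math.floor(options.readTime / 1000),
    };
  }
  return reqOpts;
}

// Query-only layer: reuses the shared options, then adds the namespace.
function getQueryOptionsSketch(
  namespace: string | undefined,
  options: CallOptionsSketch = {}
): RequestOptionsSketch {
  const reqOpts = getRequestOptionsSketch(options);
  if (namespace) {
    reqOpts.partitionId = {namespaceId: namespace};
  }
  return reqOpts;
}

// A lookup merges its keys into the shared options, as the diff does.
const lookupOpts = Object.assign(
  getRequestOptionsSketch({readTime: 1693526400999}),
  {keys: ['k1']}
);
const queryOpts = getQueryOptionsSketch('ns', {consistency: 'EVENTUAL'});
console.log(lookupOpts);
console.log(queryOpts);
```

Note that the conversion truncates sub-second precision with `Math.floor`, so a read time of 1693526400999 ms is sent as 1693526400 seconds.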

/**