Skip to content

Commit

Permalink
feat(datastore): Support aggregation query in transaction (#8439)
Browse files Browse the repository at this point in the history
* feat(datastore): Support aggregation query in transaction

* feat(datastore): Refactoring integration test

* feat(datastore): Integration tests for sum and average
  • Loading branch information
bhshkh authored Aug 22, 2023
1 parent a9fff18 commit 37681ff
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 12 deletions.
57 changes: 51 additions & 6 deletions datastore/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -722,7 +722,15 @@ func TestIntegration_AggregationQueries(t *testing.T) {
for i := range keys {
keys[i] = IncompleteKey("SQChild", parent)
}
keys, err := client.PutMulti(ctx, keys, children)

// Create transaction with read before creating entities
readTime := time.Now()
txBeforeCreate, err := client.NewTransaction(ctx, []TransactionOption{ReadOnly, WithReadTime(readTime)}...)
if err != nil {
t.Fatalf("client.NewTransaction: %v", err)
}

keys, err = client.PutMulti(ctx, keys, children)
if err != nil {
t.Fatalf("client.PutMulti: %v", err)
}
Expand All @@ -733,13 +741,22 @@ func TestIntegration_AggregationQueries(t *testing.T) {
}
}()

// Create transaction with read after creating entities
readTime = time.Now()
txAfterCreate, err := client.NewTransaction(ctx, []TransactionOption{ReadOnly, WithReadTime(readTime)}...)
if err != nil {
t.Fatalf("client.NewTransaction: %v", err)
}

testCases := []struct {
desc string
aggQuery *AggregationQuery
wantFailure bool
wantErrMsg string
wantAggResult AggregationResult
desc string
aggQuery *AggregationQuery
transactionOpts []TransactionOption
wantFailure bool
wantErrMsg string
wantAggResult AggregationResult
}{

{
desc: "Count Failure - Missing index",
aggQuery: NewQuery("SQChild").Ancestor(parent).Filter("T>=", now).
Expand All @@ -757,6 +774,34 @@ func TestIntegration_AggregationQueries(t *testing.T) {
"count": &pb.Value{ValueType: &pb.Value_IntegerValue{IntegerValue: 5}},
},
},
{
desc: "Aggregations in transaction before creating entities",
aggQuery: NewQuery("SQChild").Ancestor(parent).Filter("T=", now).
Transaction(txBeforeCreate).
NewAggregationQuery().
WithCount("count").
WithSum("I", "sum").
WithAvg("I", "avg"),
wantAggResult: map[string]interface{}{
"count": &pb.Value{ValueType: &pb.Value_IntegerValue{IntegerValue: 0}},
"sum": &pb.Value{ValueType: &pb.Value_IntegerValue{IntegerValue: 0}},
"avg": &pb.Value{ValueType: &pb.Value_NullValue{NullValue: structpb.NullValue_NULL_VALUE}},
},
},
{
desc: "Aggregations in transaction after creating entities",
aggQuery: NewQuery("SQChild").Ancestor(parent).Filter("T=", now).
Transaction(txAfterCreate).
NewAggregationQuery().
WithCount("count").
WithSum("I", "sum").
WithAvg("I", "avg"),
wantAggResult: map[string]interface{}{
"count": &pb.Value{ValueType: &pb.Value_IntegerValue{IntegerValue: 8}},
"sum": &pb.Value{ValueType: &pb.Value_IntegerValue{IntegerValue: 28}},
"avg": &pb.Value{ValueType: &pb.Value_DoubleValue{DoubleValue: 3.5}},
},
},
{
desc: "Multiple aggregations",
aggQuery: NewQuery("SQChild").Ancestor(parent).Filter("T=", now).
Expand Down
1 change: 0 additions & 1 deletion datastore/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -1026,7 +1026,6 @@ func DecodeCursor(s string) (Cursor, error) {
// NewAggregationQuery returns an AggregationQuery with this query as its
// base query.
func (q *Query) NewAggregationQuery() *AggregationQuery {
q.eventual = true
return &AggregationQuery{
query: q,
aggregationQueries: make([]*pb.AggregationQuery_Aggregation, 0),
Expand Down
5 changes: 0 additions & 5 deletions datastore/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,11 +126,6 @@ func fakeRunAggregationQuery(req *pb.RunAggregationQueryRequest) (*pb.RunAggrega
},
},
},
ReadOptions: &pb.ReadOptions{
ConsistencyType: &pb.ReadOptions_ReadConsistency_{
ReadConsistency: pb.ReadOptions_EVENTUAL,
},
},
}
if !proto.Equal(req, expectedIn) {
return nil, fmt.Errorf("unsupported argument: got %v want %v", req, expectedIn)
Expand Down

0 comments on commit 37681ff

Please sign in to comment.