From f9c2404a6ff1ba20a3a32ed4b65681aae43b4b5d Mon Sep 17 00:00:00 2001 From: Darnell Andries Date: Sat, 7 Oct 2023 00:11:48 -0700 Subject: [PATCH] Add retention policy/expiration of history related entities --- datastore/dynamo.go | 2 +- datastore/sync_entity.go | 47 +++++++++++++++++++++++------------ datastore/sync_entity_test.go | 23 ++++++++++++++++- dynamo.Dockerfile | 3 +++ schema/dynamodb/table.json | 4 +-- 5 files changed, 59 insertions(+), 20 deletions(-) diff --git a/datastore/dynamo.go b/datastore/dynamo.go index 98a24b90..4b2e0110 100644 --- a/datastore/dynamo.go +++ b/datastore/dynamo.go @@ -16,7 +16,7 @@ const ( projPk string = "ClientID, ID" // Strings for (ClientID, DataTypeMtime) GSI - clientIDDataTypeMtimeIdx string = "ClientIDDataTypeMtimeIndex" + clientIDDataTypeMtimeIdx string = "ClientIDDataTypeMtimeIndexV2" clientIDDataTypeMtimeIdxPk string = "ClientID" clientIDDataTypeMtimeIdxSk string = "DataTypeMtime" ) diff --git a/datastore/sync_entity.go b/datastore/sync_entity.go index 839443f3..b50ea35b 100644 --- a/datastore/sync_entity.go +++ b/datastore/sync_entity.go @@ -21,14 +21,18 @@ import ( ) const ( - maxBatchGetItemSize = 100 // Limited by AWS. - maxTransactDeleteItemSize = 10 // Limited by AWS. - clientTagItemPrefix = "Client#" - serverTagItemPrefix = "Server#" - conditionalCheckFailed = "ConditionalCheckFailed" - disabledChainID = "disabled_chain" - reasonDeleted = "deleted" - historyTypeID int = 963985 + maxBatchGetItemSize = 100 // Limited by AWS. + maxTransactDeleteItemSize = 10 // Limited by AWS. + clientTagItemPrefix = "Client#" + serverTagItemPrefix = "Server#" + conditionalCheckFailed = "ConditionalCheckFailed" + disabledChainID = "disabled_chain" + reasonDeleted = "deleted" + historyTypeID int = 963985 + historyDeleteDirectiveTypeID int = 150251 + // Expiration time for history and history delete directive + // entities in seconds + HistoryExpirationIntervalSecs = 60 * 60 * 24 * 90 // 90 days ) // SyncEntity is used to marshal and unmarshal sync items in dynamoDB. @@ -51,6 +55,7 @@ type SyncEntity struct { ClientDefinedUniqueTag *string `dynamodbav:",omitempty"` UniquePosition []byte `dynamodbav:",omitempty"` DataTypeMtime *string + ExpirationTime *int64 } // SyncEntityByClientIDID implements sort.Interface for []SyncEntity based on @@ -638,12 +643,14 @@ func (dynamo *Dynamo) GetUpdatesForType(dataType int, clientToken int64, fetchFo expression.Value(dataTypeMtimeLowerBound), expression.Value(dataTypeMtimeUpperBound)) keyCond := expression.KeyAnd(pkCond, skCond) - exprs := expression.NewBuilder().WithKeyCondition(keyCond) + + expirationTimeName := expression.Name("ExpirationTime") + filterCond := expression.Or(expression.AttributeNotExists(expirationTimeName), expression.Equal(expirationTimeName, expression.Value(nil)), expression.GreaterThan(expirationTimeName, expression.Value(time.Now().Unix()))) if !fetchFolders { // Filter folder entities out if fetchFolder is false. - exprs = exprs.WithFilter( - expression.Equal(expression.Name("Folder"), expression.Value(false))) + filterCond = expression.And(filterCond, expression.Equal(expression.Name("Folder"), expression.Value(false))) } - expr, err := exprs.Build() + + expr, err := expression.NewBuilder().WithKeyCondition(keyCond).WithFilter(filterCond).Build() if err != nil { return false, syncEntities, fmt.Errorf("error building expression to get updates: %w", err) } @@ -774,16 +781,23 @@ func CreateDBSyncEntity(entity *sync_pb.SyncEntity, cacheGUID *string, clientID id = *entity.ClientTagHash } - now := aws.Int64(utils.UnixMilli(time.Now())) + now := time.Now() + + var expirationTime *int64 + if dataType == historyTypeID || dataType == historyDeleteDirectiveTypeID { + expirationTime = aws.Int64(now.Unix() + HistoryExpirationIntervalSecs) + } + + nowMillis := aws.Int64(now.UnixMilli()) // ctime is only used when inserting a new entity, here we use client passed // ctime if it is passed, otherwise, use current server time as the creation // time. When updating, ctime will be ignored later in the query statement. - cTime := now + cTime := nowMillis if entity.Ctime != nil { cTime = entity.Ctime } - dataTypeMtime := strconv.Itoa(dataType) + "#" + strconv.FormatInt(*now, 10) + dataTypeMtime := strconv.Itoa(dataType) + "#" + strconv.FormatInt(*nowMillis, 10) // Set default values on Deleted and Folder attributes for new entities, the // default values are specified by sync.proto protocol. @@ -804,7 +818,7 @@ func CreateDBSyncEntity(entity *sync_pb.SyncEntity, cacheGUID *string, clientID ParentID: entity.ParentIdString, Version: entity.Version, Ctime: cTime, - Mtime: now, + Mtime: nowMillis, Name: entity.Name, NonUniqueName: entity.NonUniqueName, ServerDefinedUniqueTag: entity.ServerDefinedUniqueTag, @@ -817,6 +831,7 @@ func CreateDBSyncEntity(entity *sync_pb.SyncEntity, cacheGUID *string, clientID UniquePosition: uniquePosition, DataType: aws.Int(dataType), DataTypeMtime: aws.String(dataTypeMtime), + ExpirationTime: expirationTime, }, nil } diff --git a/datastore/sync_entity_test.go b/datastore/sync_entity_test.go index eaf27514..ec040c8f 100644 --- a/datastore/sync_entity_test.go +++ b/datastore/sync_entity_test.go @@ -528,9 +528,17 @@ func (suite *SyncEntityTestSuite) TestGetUpdatesForType() { entity3.DataType = aws.Int(124) entity3.DataTypeMtime = aws.String("124#12345679") + // non-expired item entity4 := entity2 entity4.ClientID = "client2" entity4.ID = "id4" + entity4.ExpirationTime = aws.Int64(time.Now().Unix() + 300) + + // expired item + entity5 := entity2 + entity5.ClientID = "client2" + entity5.ID = "id5" + entity5.ExpirationTime = aws.Int64(time.Now().Unix() - 300) _, err := suite.dynamo.InsertSyncEntity(&entity1) suite.Require().NoError(err, "InsertSyncEntity should succeed") @@ -540,6 +548,8 @@ func (suite *SyncEntityTestSuite) TestGetUpdatesForType() { suite.Require().NoError(err, "InsertSyncEntity should succeed") _, err = suite.dynamo.InsertSyncEntity(&entity4) suite.Require().NoError(err, "InsertSyncEntity should succeed") + _, err = suite.dynamo.InsertSyncEntity(&entity5) + suite.Require().NoError(err, "InsertSyncEntity should succeed") // Get all updates for type 123 and client1 using token = 0. hasChangesRemaining, syncItems, err := suite.dynamo.GetUpdatesForType(123, 0, true, "client1", 100) @@ -675,6 +685,7 @@ func (suite *SyncEntityTestSuite) TestCreateDBSyncEntity() { DataType: aws.Int(47745), // nigori type ID OriginatorCacheGUID: guid, OriginatorClientItemID: pbEntity.IdString, + ExpirationTime: nil, } dbEntity, err := datastore.CreateDBSyncEntity(&pbEntity, guid, "client1") @@ -699,6 +710,7 @@ func (suite *SyncEntityTestSuite) TestCreateDBSyncEntity() { expectedDBEntity.Mtime = dbEntity.Mtime expectedDBEntity.DataTypeMtime = aws.String("47745#" + strconv.FormatInt(*dbEntity.Mtime, 10)) suite.Assert().Equal(dbEntity, &expectedDBEntity) + suite.Assert().Nil(dbEntity.ExpirationTime) pbEntity.Deleted = nil pbEntity.Folder = nil @@ -706,6 +718,7 @@ func (suite *SyncEntityTestSuite) TestCreateDBSyncEntity() { suite.Require().NoError(err, "CreateDBSyncEntity should succeed") suite.Assert().False(*dbEntity.Deleted, "Default value should be set for Deleted for new entities") suite.Assert().False(*dbEntity.Folder, "Default value should be set for Deleted for new entities") + suite.Assert().Nil(dbEntity.ExpirationTime) // Check the case when Ctime and Mtime are provided by the client. pbEntity.Ctime = aws.Int64(12345678) @@ -714,6 +727,7 @@ func (suite *SyncEntityTestSuite) TestCreateDBSyncEntity() { suite.Require().NoError(err, "CreateDBSyncEntity should succeed") suite.Assert().Equal(*dbEntity.Ctime, *pbEntity.Ctime, "Client's Ctime should be respected") suite.Assert().NotEqual(*dbEntity.Mtime, *pbEntity.Mtime, "Client's Mtime should be replaced") + suite.Assert().Nil(dbEntity.ExpirationTime) // When cacheGUID is nil, ID should be kept and no originator info are filled. dbEntity, err = datastore.CreateDBSyncEntity(&pbEntity, nil, "client1") @@ -721,6 +735,7 @@ func (suite *SyncEntityTestSuite) TestCreateDBSyncEntity() { suite.Assert().Equal(dbEntity.ID, *pbEntity.IdString) suite.Assert().Nil(dbEntity.OriginatorCacheGUID) suite.Assert().Nil(dbEntity.OriginatorClientItemID) + suite.Assert().Nil(dbEntity.ExpirationTime) // Check that when updating from a previous version with guid, ID will not be // replaced. @@ -730,19 +745,25 @@ func (suite *SyncEntityTestSuite) TestCreateDBSyncEntity() { suite.Assert().Equal(dbEntity.ID, *pbEntity.IdString) suite.Assert().Nil(dbEntity.Deleted, "Deleted won't apply its default value for updated entities") suite.Assert().Nil(dbEntity.Folder, "Deleted won't apply its default value for updated entities") + suite.Assert().Nil(dbEntity.ExpirationTime) // Empty unique position should be marshalled to nil without error. pbEntity.UniquePosition = nil dbEntity, err = datastore.CreateDBSyncEntity(&pbEntity, guid, "client1") suite.Require().NoError(err) suite.Assert().Nil(dbEntity.UniquePosition) + suite.Assert().Nil(dbEntity.ExpirationTime) - // A history entity should have the client tag hash as the ID. + // A history entity should have the client tag hash as the ID, + // and an expiration time. historyEntitySpecific := &sync_pb.EntitySpecifics_History{} pbEntity.Specifics = &sync_pb.EntitySpecifics{SpecificsVariant: historyEntitySpecific} dbEntity, err = datastore.CreateDBSyncEntity(&pbEntity, guid, "client1") suite.Require().NoError(err) suite.Assert().Equal(dbEntity.ID, "client_tag") + expectedExpirationTime := time.Now().Unix() + datastore.HistoryExpirationIntervalSecs + suite.Assert().Greater(*dbEntity.ExpirationTime+2, expectedExpirationTime) + suite.Assert().Less(*dbEntity.ExpirationTime-2, expectedExpirationTime) // Empty specifics should report marshal error. pbEntity.Specifics = nil diff --git a/dynamo.Dockerfile b/dynamo.Dockerfile index e5336b82..4c08c517 100644 --- a/dynamo.Dockerfile +++ b/dynamo.Dockerfile @@ -18,6 +18,9 @@ RUN mkdir -p ${DB_LOCATION} && \ DYNAMO_PID=$! && \ aws dynamodb create-table --cli-input-json file://table.json \ --endpoint-url ${AWS_ENDPOINT} --region ${AWS_REGION} && \ + aws dynamodb update-time-to-live --table-name client-entity-dev \ + --time-to-live-specification "Enabled=true, AttributeName=ExpirationTime" \ + --endpoint-url http://localhost:8000 && \ kill $DYNAMO_PID FROM amazon/dynamodb-local:2.0.0 diff --git a/schema/dynamodb/table.json b/schema/dynamodb/table.json index 253e735e..fadbc8a8 100644 --- a/schema/dynamodb/table.json +++ b/schema/dynamodb/table.json @@ -12,7 +12,7 @@ ], "GlobalSecondaryIndexes": [ { - "IndexName": "ClientIDDataTypeMtimeIndex", + "IndexName": "ClientIDDataTypeMtimeIndexV2", "KeySchema": [ { "KeyType": "HASH", @@ -25,7 +25,7 @@ ], "Projection": { "ProjectionType": "INCLUDE", - "NonKeyAttributes": ["Folder"] + "NonKeyAttributes": ["Folder", "ExpirationTime"] }, "ProvisionedThroughput": { "ReadCapacityUnits": 1,