Skip to content

Commit

Permalink
Add retention policy/expiration of history related entities
Browse files Browse the repository at this point in the history
  • Loading branch information
DJAndries committed Oct 12, 2023
1 parent 57be8f4 commit 862e79d
Show file tree
Hide file tree
Showing 6 changed files with 69 additions and 28 deletions.
8 changes: 2 additions & 6 deletions command/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,15 +247,11 @@ func handleCommitRequest(cache *cache.Cache, commitMsg *sync_pb.CommitMessage, c
if *entityToCommit.DataType == historyTypeID {
// Check if item exists using client_unique_tag
isUpdateOp, err = db.HasItem(clientID, *entityToCommit.ClientDefinedUniqueTag)
log.Info().
Str("Tag", *entityToCommit.ClientDefinedUniqueTag).
Bool("IsUpdate", isUpdateOp).
Msg("History Info")
if err != nil {
log.Error().Err(err).Msg("Insert sync entity failed")
log.Error().Err(err).Msg("Insert history sync entity failed")
rspType := sync_pb.CommitResponse_TRANSIENT_ERROR
entryRsp.ResponseType = &rspType
entryRsp.ErrorMessage = aws.String(fmt.Sprintf("Insert sync entity failed: %v", err.Error()))
entryRsp.ErrorMessage = aws.String(fmt.Sprintf("Insert history sync entity failed: %v", err.Error()))
continue
}
}
Expand Down
2 changes: 1 addition & 1 deletion datastore/dynamo.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ const (
projPk string = "ClientID, ID"

// Strings for (ClientID, DataTypeMtime) GSI
clientIDDataTypeMtimeIdx string = "ClientIDDataTypeMtimeIndex"
clientIDDataTypeMtimeIdx string = "ClientIDDataTypeMtimeIndexV2"
clientIDDataTypeMtimeIdxPk string = "ClientID"
clientIDDataTypeMtimeIdxSk string = "DataTypeMtime"
)
Expand Down
57 changes: 39 additions & 18 deletions datastore/sync_entity.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,18 @@ import (
)

const (
maxBatchGetItemSize = 100 // Limited by AWS.
maxTransactDeleteItemSize = 10 // Limited by AWS.
clientTagItemPrefix = "Client#"
serverTagItemPrefix = "Server#"
conditionalCheckFailed = "ConditionalCheckFailed"
disabledChainID = "disabled_chain"
reasonDeleted = "deleted"
historyTypeID int = 963985
maxBatchGetItemSize = 100 // Limited by AWS.
maxTransactDeleteItemSize = 10 // Limited by AWS.
clientTagItemPrefix = "Client#"
serverTagItemPrefix = "Server#"
conditionalCheckFailed = "ConditionalCheckFailed"
disabledChainID = "disabled_chain"
reasonDeleted = "deleted"
historyTypeID int = 963985
historyDeleteDirectiveTypeID int = 150251
// Expiration time for history and history delete directive
// entities in seconds
HistoryExpirationIntervalSecs = 60 * 60 * 24 * 90 // 90 days
)

// SyncEntity is used to marshal and unmarshal sync items in dynamoDB.
Expand All @@ -51,6 +55,7 @@ type SyncEntity struct {
ClientDefinedUniqueTag *string `dynamodbav:",omitempty"`
UniquePosition []byte `dynamodbav:",omitempty"`
DataTypeMtime *string
ExpirationTime *int64
}

// SyncEntityByClientIDID implements sort.Interface for []SyncEntity based on
Expand Down Expand Up @@ -160,6 +165,8 @@ func (dynamo *Dynamo) InsertSyncEntity(entity *SyncEntity) (bool, error) {
return false, fmt.Errorf("error building expression to insert sync entity: %w", err)
}

// Write tag item for all data types, except for
// the history type, which does not use tag items.
if entity.ClientDefinedUniqueTag != nil && *entity.DataType != historyTypeID {
items := []*dynamodb.TransactWriteItem{}
// Additional item for ensuring tag's uniqueness for a specific client.
Expand Down Expand Up @@ -259,7 +266,7 @@ func (dynamo *Dynamo) HasItem(clientID string, ID string) (bool, error) {
key, err := dynamodbattribute.MarshalMap(primaryKey)

if err != nil {
return false, fmt.Errorf("error marshalling key to check if client tag existed: %w", err)
return false, fmt.Errorf("error marshalling key to check if item existed: %w", err)
}

input := &dynamodb.GetItemInput{
Expand All @@ -270,7 +277,7 @@ func (dynamo *Dynamo) HasItem(clientID string, ID string) (bool, error) {

out, err := dynamo.GetItem(input)
if err != nil {
return false, fmt.Errorf("error calling GetItem to check if client tag existed: %w", err)
return false, fmt.Errorf("error calling GetItem to check if item existed: %w", err)
}

return out.Item != nil, nil
Expand Down Expand Up @@ -638,12 +645,16 @@ func (dynamo *Dynamo) GetUpdatesForType(dataType int, clientToken int64, fetchFo
expression.Value(dataTypeMtimeLowerBound),
expression.Value(dataTypeMtimeUpperBound))
keyCond := expression.KeyAnd(pkCond, skCond)
exprs := expression.NewBuilder().WithKeyCondition(keyCond)

expirationTimeName := expression.Name("ExpirationTime")
filterCond := expression.Or(expression.AttributeNotExists(expirationTimeName),
expression.Equal(expirationTimeName, expression.Value(nil)),
expression.GreaterThan(expirationTimeName, expression.Value(time.Now().Unix())))
if !fetchFolders { // Filter folder entities out if fetchFolder is false.
exprs = exprs.WithFilter(
expression.Equal(expression.Name("Folder"), expression.Value(false)))
filterCond = expression.And(filterCond, expression.Equal(expression.Name("Folder"), expression.Value(false)))
}
expr, err := exprs.Build()

expr, err := expression.NewBuilder().WithKeyCondition(keyCond).WithFilter(filterCond).Build()
if err != nil {
return false, syncEntities, fmt.Errorf("error building expression to get updates: %w", err)
}
Expand Down Expand Up @@ -770,20 +781,29 @@ func CreateDBSyncEntity(entity *sync_pb.SyncEntity, cacheGUID *string, clientID
originatorClientItemID = entity.IdString
}

// The client tag hash must be used as the primary key
// for the history type.
if dataType == historyTypeID {
id = *entity.ClientTagHash
}

now := aws.Int64(utils.UnixMilli(time.Now()))
now := time.Now()

var expirationTime *int64
if dataType == historyTypeID || dataType == historyDeleteDirectiveTypeID {
expirationTime = aws.Int64(now.Unix() + HistoryExpirationIntervalSecs)
}

nowMillis := aws.Int64(now.UnixMilli())
// ctime is only used when inserting a new entity, here we use client passed
// ctime if it is passed, otherwise, use current server time as the creation
// time. When updating, ctime will be ignored later in the query statement.
cTime := now
cTime := nowMillis
if entity.Ctime != nil {
cTime = entity.Ctime
}

dataTypeMtime := strconv.Itoa(dataType) + "#" + strconv.FormatInt(*now, 10)
dataTypeMtime := strconv.Itoa(dataType) + "#" + strconv.FormatInt(*nowMillis, 10)

// Set default values on Deleted and Folder attributes for new entities, the
// default values are specified by sync.proto protocol.
Expand All @@ -804,7 +824,7 @@ func CreateDBSyncEntity(entity *sync_pb.SyncEntity, cacheGUID *string, clientID
ParentID: entity.ParentIdString,
Version: entity.Version,
Ctime: cTime,
Mtime: now,
Mtime: nowMillis,
Name: entity.Name,
NonUniqueName: entity.NonUniqueName,
ServerDefinedUniqueTag: entity.ServerDefinedUniqueTag,
Expand All @@ -817,6 +837,7 @@ func CreateDBSyncEntity(entity *sync_pb.SyncEntity, cacheGUID *string, clientID
UniquePosition: uniquePosition,
DataType: aws.Int(dataType),
DataTypeMtime: aws.String(dataTypeMtime),
ExpirationTime: expirationTime,
}, nil
}

Expand Down
23 changes: 22 additions & 1 deletion datastore/sync_entity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -528,9 +528,17 @@ func (suite *SyncEntityTestSuite) TestGetUpdatesForType() {
entity3.DataType = aws.Int(124)
entity3.DataTypeMtime = aws.String("124#12345679")

// non-expired item
entity4 := entity2
entity4.ClientID = "client2"
entity4.ID = "id4"
entity4.ExpirationTime = aws.Int64(time.Now().Unix() + 300)

// expired item
entity5 := entity2
entity5.ClientID = "client2"
entity5.ID = "id5"
entity5.ExpirationTime = aws.Int64(time.Now().Unix() - 300)

_, err := suite.dynamo.InsertSyncEntity(&entity1)
suite.Require().NoError(err, "InsertSyncEntity should succeed")
Expand All @@ -540,6 +548,8 @@ func (suite *SyncEntityTestSuite) TestGetUpdatesForType() {
suite.Require().NoError(err, "InsertSyncEntity should succeed")
_, err = suite.dynamo.InsertSyncEntity(&entity4)
suite.Require().NoError(err, "InsertSyncEntity should succeed")
_, err = suite.dynamo.InsertSyncEntity(&entity5)
suite.Require().NoError(err, "InsertSyncEntity should succeed")

// Get all updates for type 123 and client1 using token = 0.
hasChangesRemaining, syncItems, err := suite.dynamo.GetUpdatesForType(123, 0, true, "client1", 100)
Expand Down Expand Up @@ -675,6 +685,7 @@ func (suite *SyncEntityTestSuite) TestCreateDBSyncEntity() {
DataType: aws.Int(47745), // nigori type ID
OriginatorCacheGUID: guid,
OriginatorClientItemID: pbEntity.IdString,
ExpirationTime: nil,
}

dbEntity, err := datastore.CreateDBSyncEntity(&pbEntity, guid, "client1")
Expand All @@ -699,13 +710,15 @@ func (suite *SyncEntityTestSuite) TestCreateDBSyncEntity() {
expectedDBEntity.Mtime = dbEntity.Mtime
expectedDBEntity.DataTypeMtime = aws.String("47745#" + strconv.FormatInt(*dbEntity.Mtime, 10))
suite.Assert().Equal(dbEntity, &expectedDBEntity)
suite.Assert().Nil(dbEntity.ExpirationTime)

pbEntity.Deleted = nil
pbEntity.Folder = nil
dbEntity, err = datastore.CreateDBSyncEntity(&pbEntity, guid, "client1")
suite.Require().NoError(err, "CreateDBSyncEntity should succeed")
suite.Assert().False(*dbEntity.Deleted, "Default value should be set for Deleted for new entities")
suite.Assert().False(*dbEntity.Folder, "Default value should be set for Deleted for new entities")
suite.Assert().Nil(dbEntity.ExpirationTime)

// Check the case when Ctime and Mtime are provided by the client.
pbEntity.Ctime = aws.Int64(12345678)
Expand All @@ -714,13 +727,15 @@ func (suite *SyncEntityTestSuite) TestCreateDBSyncEntity() {
suite.Require().NoError(err, "CreateDBSyncEntity should succeed")
suite.Assert().Equal(*dbEntity.Ctime, *pbEntity.Ctime, "Client's Ctime should be respected")
suite.Assert().NotEqual(*dbEntity.Mtime, *pbEntity.Mtime, "Client's Mtime should be replaced")
suite.Assert().Nil(dbEntity.ExpirationTime)

// When cacheGUID is nil, ID should be kept and no originator info are filled.
dbEntity, err = datastore.CreateDBSyncEntity(&pbEntity, nil, "client1")
suite.Require().NoError(err, "CreateDBSyncEntity should succeed")
suite.Assert().Equal(dbEntity.ID, *pbEntity.IdString)
suite.Assert().Nil(dbEntity.OriginatorCacheGUID)
suite.Assert().Nil(dbEntity.OriginatorClientItemID)
suite.Assert().Nil(dbEntity.ExpirationTime)

// Check that when updating from a previous version with guid, ID will not be
// replaced.
Expand All @@ -730,19 +745,25 @@ func (suite *SyncEntityTestSuite) TestCreateDBSyncEntity() {
suite.Assert().Equal(dbEntity.ID, *pbEntity.IdString)
suite.Assert().Nil(dbEntity.Deleted, "Deleted won't apply its default value for updated entities")
suite.Assert().Nil(dbEntity.Folder, "Deleted won't apply its default value for updated entities")
suite.Assert().Nil(dbEntity.ExpirationTime)

// Empty unique position should be marshalled to nil without error.
pbEntity.UniquePosition = nil
dbEntity, err = datastore.CreateDBSyncEntity(&pbEntity, guid, "client1")
suite.Require().NoError(err)
suite.Assert().Nil(dbEntity.UniquePosition)
suite.Assert().Nil(dbEntity.ExpirationTime)

// A history entity should have the client tag hash as the ID.
// A history entity should have the client tag hash as the ID,
// and an expiration time.
historyEntitySpecific := &sync_pb.EntitySpecifics_History{}
pbEntity.Specifics = &sync_pb.EntitySpecifics{SpecificsVariant: historyEntitySpecific}
dbEntity, err = datastore.CreateDBSyncEntity(&pbEntity, guid, "client1")
suite.Require().NoError(err)
suite.Assert().Equal(dbEntity.ID, "client_tag")
expectedExpirationTime := time.Now().Unix() + datastore.HistoryExpirationIntervalSecs
suite.Assert().Greater(*dbEntity.ExpirationTime+2, expectedExpirationTime)
suite.Assert().Less(*dbEntity.ExpirationTime-2, expectedExpirationTime)

// Empty specifics should report marshal error.
pbEntity.Specifics = nil
Expand Down
3 changes: 3 additions & 0 deletions dynamo.Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ RUN mkdir -p ${DB_LOCATION} && \
DYNAMO_PID=$! && \
aws dynamodb create-table --cli-input-json file://table.json \
--endpoint-url ${AWS_ENDPOINT} --region ${AWS_REGION} && \
aws dynamodb update-time-to-live --table-name client-entity-dev \
--time-to-live-specification "Enabled=true, AttributeName=ExpirationTime" \
--endpoint-url http://localhost:8000 && \
kill $DYNAMO_PID

FROM amazon/dynamodb-local:2.0.0
Expand Down
4 changes: 2 additions & 2 deletions schema/dynamodb/table.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
],
"GlobalSecondaryIndexes": [
{
"IndexName": "ClientIDDataTypeMtimeIndex",
"IndexName": "ClientIDDataTypeMtimeIndexV2",
"KeySchema": [
{
"KeyType": "HASH",
Expand All @@ -25,7 +25,7 @@
],
"Projection": {
"ProjectionType": "INCLUDE",
"NonKeyAttributes": ["Folder"]
"NonKeyAttributes": ["Folder", "ExpirationTime"]
},
"ProvisionedThroughput": {
"ReadCapacityUnits": 1,
Expand Down

0 comments on commit 862e79d

Please sign in to comment.