Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

History type support #176

Merged
merged 4 commits into from
Oct 12, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions cache/instrumented_redis.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package cache

// Code generated by gowrap. DO NOT EDIT.
// template: ../.prom-gowrap.tmpl
// gowrap: http://github.com/hexdigest/gowrap
// DO NOT EDIT!
// This code is generated with http://github.com/hexdigest/gowrap tool
// using ../.prom-gowrap.tmpl template

//go:generate gowrap gen -p github.com/brave/go-sync/cache -i RedisClient -t ../.prom-gowrap.tmpl -o instrumented_redis.go -l ""
//go:generate gowrap gen -p github.com/brave/go-sync/cache -i RedisClient -t ../.prom-gowrap.tmpl -o instrumented_redis.go

import (
"context"
Expand Down
16 changes: 15 additions & 1 deletion command/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ const (
setSyncPollInterval int32 = 30
nigoriTypeID int32 = 47745
deviceInfoTypeID int = 154522
historyTypeID int = 963985
maxActiveDevices int = 50
)

Expand Down Expand Up @@ -241,8 +242,21 @@ func handleCommitRequest(cache *cache.Cache, commitMsg *sync_pb.CommitMessage, c
}

oldVersion := *entityToCommit.Version
isUpdateOp := oldVersion != 0
*entityToCommit.Version = *entityToCommit.Mtime
if oldVersion == 0 { // Create
if *entityToCommit.DataType == historyTypeID {
// Check if item exists using client_unique_tag
isUpdateOp, err = db.HasItem(clientID, *entityToCommit.ClientDefinedUniqueTag)
if err != nil {
log.Error().Err(err).Msg("Insert history sync entity failed")
rspType := sync_pb.CommitResponse_TRANSIENT_ERROR
entryRsp.ResponseType = &rspType
entryRsp.ErrorMessage = aws.String(fmt.Sprintf("Insert history sync entity failed: %v", err.Error()))
continue
Comment on lines +251 to +255
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Maybe Error checking if client_unique_tag exists rather than Insert history sync entity failed is more clear?

}
}

if !isUpdateOp { // Create
if itemCount+count >= maxClientObjectQuota {
rspType := sync_pb.CommitResponse_OVER_QUOTA
entryRsp.ResponseType = &rspType
Expand Down
2 changes: 2 additions & 0 deletions datastore/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,6 @@ type Datastore interface {
DisableSyncChain(clientID string) error
// IsSyncChainDisabled checks whether a given sync chain is deleted
IsSyncChainDisabled(clientID string) (bool, error)
// Checks if sync item exists for a client
HasItem(clientID string, ID string) (bool, error)
}
5 changes: 5 additions & 0 deletions datastore/datastoretest/mock_datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ func (m *MockDatastore) HasServerDefinedUniqueTag(clientID string, tag string) (
return args.Bool(0), args.Error(1)
}

func (m *MockDatastore) HasItem(clientID string, ID string) (bool, error) {
args := m.Called(clientID, ID)
return args.Bool(0), args.Error(1)
}

// GetClientItemCount mocks calls to GetClientItemCount
func (m *MockDatastore) GetClientItemCount(clientID string) (int, error) {
args := m.Called(clientID)
Expand Down
2 changes: 1 addition & 1 deletion datastore/dynamo.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ const (
projPk string = "ClientID, ID"

// Strings for (ClientID, DataTypeMtime) GSI
clientIDDataTypeMtimeIdx string = "ClientIDDataTypeMtimeIndex"
clientIDDataTypeMtimeIdx string = "ClientIDDataTypeMtimeIndexV2"
clientIDDataTypeMtimeIdxPk string = "ClientID"
clientIDDataTypeMtimeIdxSk string = "DataTypeMtime"
)
Expand Down
22 changes: 18 additions & 4 deletions datastore/instrumented_datastore.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package datastore

// Code generated by gowrap. DO NOT EDIT.
// template: ../.prom-gowrap.tmpl
// gowrap: http://github.com/hexdigest/gowrap
// DO NOT EDIT!
// This code is generated with http://github.com/hexdigest/gowrap tool
// using ../.prom-gowrap.tmpl template

//go:generate gowrap gen -p github.com/brave/go-sync/datastore -i Datastore -t ../.prom-gowrap.tmpl -o instrumented_datastore.go -l ""
//go:generate gowrap gen -p github.com/brave/go-sync/datastore -i Datastore -t ../.prom-gowrap.tmpl -o instrumented_datastore.go

import (
"time"
Expand Down Expand Up @@ -93,6 +93,20 @@ func (_d DatastoreWithPrometheus) GetUpdatesForType(dataType int, clientToken in
return _d.base.GetUpdatesForType(dataType, clientToken, fetchFolders, clientID, maxSize)
}

// HasItem implements Datastore
func (_d DatastoreWithPrometheus) HasItem(clientID string, ID string) (b1 bool, err error) {
_since := time.Now()
defer func() {
result := "ok"
if err != nil {
result = "error"
}

datastoreDurationSummaryVec.WithLabelValues(_d.instanceName, "HasItem", result).Observe(time.Since(_since).Seconds())
}()
return _d.base.HasItem(clientID, ID)
}

// HasServerDefinedUniqueTag implements Datastore
func (_d DatastoreWithPrometheus) HasServerDefinedUniqueTag(clientID string, tag string) (b1 bool, err error) {
_since := time.Now()
Expand Down
92 changes: 71 additions & 21 deletions datastore/sync_entity.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,18 @@ import (
)

const (
maxBatchGetItemSize = 100 // Limited by AWS.
maxTransactDeleteItemSize = 10 // Limited by AWS.
clientTagItemPrefix = "Client#"
serverTagItemPrefix = "Server#"
conditionalCheckFailed = "ConditionalCheckFailed"
disabledChainID = "disabled_chain"
reasonDeleted = "deleted"
maxBatchGetItemSize = 100 // Limited by AWS.
maxTransactDeleteItemSize = 10 // Limited by AWS.
clientTagItemPrefix = "Client#"
serverTagItemPrefix = "Server#"
conditionalCheckFailed = "ConditionalCheckFailed"
disabledChainID = "disabled_chain"
reasonDeleted = "deleted"
historyTypeID int = 963985
historyDeleteDirectiveTypeID int = 150251
// Expiration time for history and history delete directive
// entities in seconds
HistoryExpirationIntervalSecs = 60 * 60 * 24 * 90 // 90 days
)

// SyncEntity is used to marshal and unmarshal sync items in dynamoDB.
Expand All @@ -50,6 +55,7 @@ type SyncEntity struct {
ClientDefinedUniqueTag *string `dynamodbav:",omitempty"`
UniquePosition []byte `dynamodbav:",omitempty"`
DataTypeMtime *string
ExpirationTime *int64
}

// SyncEntityByClientIDID implements sort.Interface for []SyncEntity based on
Expand Down Expand Up @@ -159,7 +165,9 @@ func (dynamo *Dynamo) InsertSyncEntity(entity *SyncEntity) (bool, error) {
return false, fmt.Errorf("error building expression to insert sync entity: %w", err)
}

if entity.ClientDefinedUniqueTag != nil {
// Write tag item for all data types, except for
// the history type, which does not use tag items.
if entity.ClientDefinedUniqueTag != nil && *entity.DataType != historyTypeID {
DJAndries marked this conversation as resolved.
Show resolved Hide resolved
items := []*dynamodb.TransactWriteItem{}
// Additional item for ensuring tag's uniqueness for a specific client.
item := NewServerClientUniqueTagItem(entity.ClientID, *entity.ClientDefinedUniqueTag, false)
Expand Down Expand Up @@ -253,6 +261,28 @@ func (dynamo *Dynamo) HasServerDefinedUniqueTag(clientID string, tag string) (bo
return out.Item != nil, nil
}

func (dynamo *Dynamo) HasItem(clientID string, ID string) (bool, error) {
primaryKey := PrimaryKey{ClientID: clientID, ID: ID}
key, err := dynamodbattribute.MarshalMap(primaryKey)

if err != nil {
return false, fmt.Errorf("error marshalling key to check if item existed: %w", err)
}

input := &dynamodb.GetItemInput{
Key: key,
ProjectionExpression: aws.String(projPk),
TableName: aws.String(Table),
}

out, err := dynamo.GetItem(input)
if err != nil {
return false, fmt.Errorf("error calling GetItem to check if item existed: %w", err)
}

return out.Item != nil, nil
}

// InsertSyncEntitiesWithServerTags is used to insert sync entities with
// server-defined unique tags. To ensure the uniqueness, for each sync entity,
// we will write a tag item and a sync item. Items for all the entities in the
Expand Down Expand Up @@ -470,10 +500,12 @@ func (dynamo *Dynamo) UpdateSyncEntity(entity *SyncEntity, oldVersion int64) (bo
return false, false, fmt.Errorf("error marshalling key to update sync entity: %w", err)
}

// condition to ensure to be update only and the version is matched.
cond := expression.And(
expression.AttributeExists(expression.Name(pk)),
expression.Name("Version").Equal(expression.Value(oldVersion)))
// condition to ensure the request is update only...
cond := expression.AttributeExists(expression.Name(pk))
// ...and the version matches, if applicable
if *entity.DataType != historyTypeID {
cond = expression.And(cond, expression.Name("Version").Equal(expression.Value(oldVersion)))
}

update := expression.Set(expression.Name("Version"), expression.Value(entity.Version))
update = update.Set(expression.Name("Mtime"), expression.Value(entity.Mtime))
Expand Down Expand Up @@ -507,7 +539,7 @@ func (dynamo *Dynamo) UpdateSyncEntity(entity *SyncEntity, oldVersion int64) (bo

// Soft-delete a sync item with a client tag, use a transaction to delete its
// tag item too.
if entity.Deleted != nil && entity.ClientDefinedUniqueTag != nil && *entity.Deleted {
if entity.Deleted != nil && entity.ClientDefinedUniqueTag != nil && *entity.Deleted && *entity.DataType != historyTypeID {
pk := PrimaryKey{
ClientID: entity.ClientID, ID: clientTagItemPrefix + *entity.ClientDefinedUniqueTag}
tagItemKey, err := dynamodbattribute.MarshalMap(pk)
Expand Down Expand Up @@ -613,12 +645,16 @@ func (dynamo *Dynamo) GetUpdatesForType(dataType int, clientToken int64, fetchFo
expression.Value(dataTypeMtimeLowerBound),
expression.Value(dataTypeMtimeUpperBound))
keyCond := expression.KeyAnd(pkCond, skCond)
exprs := expression.NewBuilder().WithKeyCondition(keyCond)

expirationTimeName := expression.Name("ExpirationTime")
filterCond := expression.Or(expression.AttributeNotExists(expirationTimeName),
expression.Equal(expirationTimeName, expression.Value(nil)),
expression.GreaterThan(expirationTimeName, expression.Value(time.Now().Unix())))
if !fetchFolders { // Filter folder entities out if fetchFolder is false.
exprs = exprs.WithFilter(
expression.Equal(expression.Name("Folder"), expression.Value(false)))
filterCond = expression.And(filterCond, expression.Equal(expression.Name("Folder"), expression.Value(false)))
}
expr, err := exprs.Build()

expr, err := expression.NewBuilder().WithKeyCondition(keyCond).WithFilter(filterCond).Build()
if err != nil {
return false, syncEntities, fmt.Errorf("error building expression to get updates: %w", err)
}
Expand Down Expand Up @@ -745,16 +781,29 @@ func CreateDBSyncEntity(entity *sync_pb.SyncEntity, cacheGUID *string, clientID
originatorClientItemID = entity.IdString
}

now := aws.Int64(utils.UnixMilli(time.Now()))
// The client tag hash must be used as the primary key
// for the history type.
if dataType == historyTypeID {
DJAndries marked this conversation as resolved.
Show resolved Hide resolved
id = *entity.ClientTagHash
}

now := time.Now()

var expirationTime *int64
if dataType == historyTypeID || dataType == historyDeleteDirectiveTypeID {
expirationTime = aws.Int64(now.Unix() + HistoryExpirationIntervalSecs)
}

nowMillis := aws.Int64(now.UnixMilli())
// ctime is only used when inserting a new entity, here we use client passed
// ctime if it is passed, otherwise, use current server time as the creation
// time. When updating, ctime will be ignored later in the query statement.
cTime := now
cTime := nowMillis
if entity.Ctime != nil {
cTime = entity.Ctime
}

dataTypeMtime := strconv.Itoa(dataType) + "#" + strconv.FormatInt(*now, 10)
dataTypeMtime := strconv.Itoa(dataType) + "#" + strconv.FormatInt(*nowMillis, 10)

// Set default values on Deleted and Folder attributes for new entities, the
// default values are specified by sync.proto protocol.
Expand All @@ -775,7 +824,7 @@ func CreateDBSyncEntity(entity *sync_pb.SyncEntity, cacheGUID *string, clientID
ParentID: entity.ParentIdString,
Version: entity.Version,
Ctime: cTime,
Mtime: now,
Mtime: nowMillis,
Name: entity.Name,
NonUniqueName: entity.NonUniqueName,
ServerDefinedUniqueTag: entity.ServerDefinedUniqueTag,
Expand All @@ -788,6 +837,7 @@ func CreateDBSyncEntity(entity *sync_pb.SyncEntity, cacheGUID *string, clientID
UniquePosition: uniquePosition,
DataType: aws.Int(dataType),
DataTypeMtime: aws.String(dataTypeMtime),
ExpirationTime: expirationTime,
}, nil
}

Expand Down
Loading