Skip to content

Commit

Permalink
Delegate branch token serialization/deserialization to store layer (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
norberthu authored Aug 16, 2022
1 parent 4873231 commit 53d11ae
Show file tree
Hide file tree
Showing 15 changed files with 386 additions and 43 deletions.
6 changes: 6 additions & 0 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -739,6 +739,10 @@ const (
PersistenceAppendRawHistoryNodesScope
// PersistenceDeleteHistoryNodesScope tracks DeleteHistoryNodes calls made by service to persistence layer
PersistenceDeleteHistoryNodesScope
// PersistenceParseHistoryBranchInfoScope tracks NewHistoryBranch calls made by service to persistence layer
PersistenceParseHistoryBranchInfoScope
// PersistenceUpdateHistoryBranchInfoScope tracks NewHistoryBranch calls made by service to persistence layer
PersistenceUpdateHistoryBranchInfoScope
// PersistenceNewHistoryBranchScope tracks NewHistoryBranch calls made by service to persistence layer
PersistenceNewHistoryBranchScope
// PersistenceReadHistoryBranchScope tracks ReadHistoryBranch calls made by service to persistence layer
Expand Down Expand Up @@ -1408,6 +1412,8 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{
PersistenceAppendHistoryNodesScope: {operation: "AppendHistoryNodes"},
PersistenceAppendRawHistoryNodesScope: {operation: "AppendRawHistoryNodes"},
PersistenceDeleteHistoryNodesScope: {operation: "DeleteHistoryNodes"},
PersistenceParseHistoryBranchInfoScope: {operation: "ParseHistoryBranch"},
PersistenceUpdateHistoryBranchInfoScope: {operation: "UpdateHistoryBranch"},
PersistenceNewHistoryBranchScope: {operation: "NewHistoryBranch"},
PersistenceReadHistoryBranchScope: {operation: "ReadHistoryBranch"},
PersistenceReadHistoryBranchReverseScope: {operation: "ReadHistoryBranchReverse"},
Expand Down
30 changes: 30 additions & 0 deletions common/persistence/cassandra/history_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,36 @@ func (h *HistoryStore) DeleteHistoryNodes(
return nil
}

// ParseHistoryBranchInfo parses the history branch for branch information
func (h *HistoryStore) ParseHistoryBranchInfo(
ctx context.Context,
request *p.ParseHistoryBranchInfoRequest,
) (*p.ParseHistoryBranchInfoResponse, error) {

branchInfo, err := p.ParseHistoryBranchToken(request.BranchToken)
if err != nil {
return nil, err
}
return &p.ParseHistoryBranchInfoResponse{
BranchInfo: branchInfo,
}, nil
}

// UpdateHistoryBranchInfo updates the history branch with branch information
func (h *HistoryStore) UpdateHistoryBranchInfo(
ctx context.Context,
request *p.UpdateHistoryBranchInfoRequest,
) (*p.UpdateHistoryBranchInfoResponse, error) {

branchToken, err := p.UpdateHistoryBranchToken(request.BranchToken, request.BranchInfo)
if err != nil {
return nil, err
}
return &p.UpdateHistoryBranchInfoResponse{
BranchToken: branchToken,
}, nil
}

// NewHistoryBranch initializes a new history branch
func (h *HistoryStore) NewHistoryBranch(
ctx context.Context,
Expand Down
20 changes: 20 additions & 0 deletions common/persistence/client/fault_injection.go
Original file line number Diff line number Diff line change
Expand Up @@ -618,6 +618,26 @@ func (e *FaultInjectionExecutionStore) DeleteHistoryNodes(
return e.baseExecutionStore.DeleteHistoryNodes(ctx, request)
}

func (e *FaultInjectionExecutionStore) ParseHistoryBranchInfo(
ctx context.Context,
request *persistence.ParseHistoryBranchInfoRequest,
) (*persistence.ParseHistoryBranchInfoResponse, error) {
if err := e.ErrorGenerator.Generate(); err != nil {
return nil, err
}
return e.baseExecutionStore.ParseHistoryBranchInfo(ctx, request)
}

func (e *FaultInjectionExecutionStore) UpdateHistoryBranchInfo(
ctx context.Context,
request *persistence.UpdateHistoryBranchInfoRequest,
) (*persistence.UpdateHistoryBranchInfoResponse, error) {
if err := e.ErrorGenerator.Generate(); err != nil {
return nil, err
}
return e.baseExecutionStore.UpdateHistoryBranchInfo(ctx, request)
}

func (e *FaultInjectionExecutionStore) NewHistoryBranch(
ctx context.Context,
request *persistence.NewHistoryBranchRequest,
Expand Down
51 changes: 51 additions & 0 deletions common/persistence/dataInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -740,7 +740,30 @@ type (
NodeID int64
}

ParseHistoryBranchInfoRequest struct {
// The branch token to parse the branch info from
BranchToken []byte
}

ParseHistoryBranchInfoResponse struct {
// The branch info parsed from the branch token
BranchInfo *persistencespb.HistoryBranch
}

UpdateHistoryBranchInfoRequest struct {
// The original branch token
BranchToken []byte
// The branch info to update with
BranchInfo *persistencespb.HistoryBranch
}

UpdateHistoryBranchInfoResponse struct {
// The newly updated branch token
BranchToken []byte
}

NewHistoryBranchRequest struct {
// The tree ID for the new branch token
TreeID string
}

Expand Down Expand Up @@ -1060,6 +1083,10 @@ type (
AppendHistoryNodes(ctx context.Context, request *AppendHistoryNodesRequest) (*AppendHistoryNodesResponse, error)
// AppendRawHistoryNodes add a node of raw histories to history node table
AppendRawHistoryNodes(ctx context.Context, request *AppendRawHistoryNodesRequest) (*AppendHistoryNodesResponse, error)
// ParseHistoryBranchInfo parses the history branch for branch information
ParseHistoryBranchInfo(ctx context.Context, request *ParseHistoryBranchInfoRequest) (*ParseHistoryBranchInfoResponse, error)
// UpdateHistoryBranchInfo updates the history branch with branch information
UpdateHistoryBranchInfo(ctx context.Context, request *UpdateHistoryBranchInfoRequest) (*UpdateHistoryBranchInfoResponse, error)
// NewHistoryBranch initializes a new history branch
NewHistoryBranch(ctx context.Context, request *NewHistoryBranchRequest) (*NewHistoryBranchResponse, error)
// ReadHistoryBranch returns history node data for a branch
Expand Down Expand Up @@ -1196,6 +1223,28 @@ func UnixMilliseconds(t time.Time) int64 {
return unixNano / int64(time.Millisecond)
}

func ParseHistoryBranchToken(branchToken []byte) (*persistencespb.HistoryBranch, error) {
// TODO: instead of always using the implementation from the serialization package, this should be injected
return serialization.HistoryBranchFromBlob(branchToken, enumspb.ENCODING_TYPE_PROTO3.String())
}

func UpdateHistoryBranchToken(branchToken []byte, branchInfo *persistencespb.HistoryBranch) ([]byte, error) {
bi, err := ParseHistoryBranchToken(branchToken)
if err != nil {
return nil, err
}
bi.TreeId = branchInfo.TreeId
bi.BranchId = branchInfo.BranchId
bi.Ancestors = branchInfo.Ancestors

// TODO: instead of always using the implementation from the serialization package, this should be injected
blob, err := serialization.HistoryBranchToBlob(bi)
if err != nil {
return nil, err
}
return blob.Data, nil
}

// NewHistoryBranchToken return a new branch token
func NewHistoryBranchToken(treeID string) ([]byte, error) {
branchID := primitives.NewUUID().String()
Expand All @@ -1204,6 +1253,7 @@ func NewHistoryBranchToken(treeID string) ([]byte, error) {
BranchId: branchID,
Ancestors: []*persistencespb.HistoryBranchRange{},
}
// TODO: instead of always using the implementation from the serialization package, this should be injected
datablob, err := serialization.HistoryBranchToBlob(bi)
if err != nil {
return nil, err
Expand All @@ -1219,6 +1269,7 @@ func NewHistoryBranchTokenByBranchID(treeID, branchID string) ([]byte, error) {
BranchId: branchID,
Ancestors: []*persistencespb.HistoryBranchRange{},
}
// TODO: instead of always using the implementation from the serialization package, this should be injected
datablob, err := serialization.HistoryBranchToBlob(bi)
if err != nil {
return nil, err
Expand Down
30 changes: 30 additions & 0 deletions common/persistence/dataInterfaces_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 10 additions & 8 deletions common/persistence/execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (m *executionManagerImpl) CreateWorkflowExecution(
) (*CreateWorkflowExecutionResponse, error) {

newSnapshot := request.NewWorkflowSnapshot
newWorkflowNewEvents, newHistoryDiff, err := m.serializeWorkflowEventBatches(request.ShardID, request.NewWorkflowEvents)
newWorkflowNewEvents, newHistoryDiff, err := m.serializeWorkflowEventBatches(ctx, request.ShardID, request.NewWorkflowEvents)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -140,13 +140,13 @@ func (m *executionManagerImpl) UpdateWorkflowExecution(
updateMutation := request.UpdateWorkflowMutation
newSnapshot := request.NewWorkflowSnapshot

updateWorkflowNewEvents, updateWorkflowHistoryDiff, err := m.serializeWorkflowEventBatches(request.ShardID, request.UpdateWorkflowEvents)
updateWorkflowNewEvents, updateWorkflowHistoryDiff, err := m.serializeWorkflowEventBatches(ctx, request.ShardID, request.UpdateWorkflowEvents)
if err != nil {
return nil, err
}
updateMutation.ExecutionInfo.ExecutionStats.HistorySize += int64(updateWorkflowHistoryDiff.SizeDiff)

newWorkflowNewEvents, newWorkflowHistoryDiff, err := m.serializeWorkflowEventBatches(request.ShardID, request.NewWorkflowEvents)
newWorkflowNewEvents, newWorkflowHistoryDiff, err := m.serializeWorkflowEventBatches(ctx, request.ShardID, request.NewWorkflowEvents)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -230,21 +230,21 @@ func (m *executionManagerImpl) ConflictResolveWorkflowExecution(
newSnapshot := request.NewWorkflowSnapshot
currentMutation := request.CurrentWorkflowMutation

resetWorkflowEventsNewEvents, resetWorkflowHistoryDiff, err := m.serializeWorkflowEventBatches(request.ShardID, request.ResetWorkflowEvents)
resetWorkflowEventsNewEvents, resetWorkflowHistoryDiff, err := m.serializeWorkflowEventBatches(ctx, request.ShardID, request.ResetWorkflowEvents)
if err != nil {
return nil, err
}
resetSnapshot.ExecutionInfo.ExecutionStats.HistorySize += int64(resetWorkflowHistoryDiff.SizeDiff)

newWorkflowEventsNewEvents, newWorkflowHistoryDiff, err := m.serializeWorkflowEventBatches(request.ShardID, request.NewWorkflowEvents)
newWorkflowEventsNewEvents, newWorkflowHistoryDiff, err := m.serializeWorkflowEventBatches(ctx, request.ShardID, request.NewWorkflowEvents)
if err != nil {
return nil, err
}
if newSnapshot != nil {
newSnapshot.ExecutionInfo.ExecutionStats.HistorySize += int64(newWorkflowHistoryDiff.SizeDiff)
}

currentWorkflowEventsNewEvents, currentWorkflowHistoryDiff, err := m.serializeWorkflowEventBatches(request.ShardID, request.CurrentWorkflowEvents)
currentWorkflowEventsNewEvents, currentWorkflowHistoryDiff, err := m.serializeWorkflowEventBatches(ctx, request.ShardID, request.CurrentWorkflowEvents)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -388,6 +388,7 @@ func (m *executionManagerImpl) SetWorkflowExecution(
}

func (m *executionManagerImpl) serializeWorkflowEventBatches(
ctx context.Context,
shardID int32,
eventBatches []*WorkflowEvents,
) ([]*InternalAppendHistoryNodesRequest, *HistoryStatistics, error) {
Expand All @@ -398,7 +399,7 @@ func (m *executionManagerImpl) serializeWorkflowEventBatches(

workflowNewEvents := make([]*InternalAppendHistoryNodesRequest, 0, len(eventBatches))
for _, workflowEvents := range eventBatches {
newEvents, err := m.serializeWorkflowEvents(shardID, workflowEvents)
newEvents, err := m.serializeWorkflowEvents(ctx, shardID, workflowEvents)
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -432,6 +433,7 @@ func (m *executionManagerImpl) DeserializeBufferedEvents( // unexport
}

func (m *executionManagerImpl) serializeWorkflowEvents(
ctx context.Context,
shardID int32,
workflowEvents *WorkflowEvents,
) (*InternalAppendHistoryNodesRequest, error) {
Expand All @@ -452,7 +454,7 @@ func (m *executionManagerImpl) serializeWorkflowEvents(
request.Info = BuildHistoryGarbageCleanupInfo(workflowEvents.NamespaceID, workflowEvents.WorkflowID, workflowEvents.RunID)
}

return m.serializeAppendHistoryNodesRequest(request)
return m.serializeAppendHistoryNodesRequest(ctx, request)
}

func (m *executionManagerImpl) SerializeWorkflowMutation( // unexport
Expand Down
Loading

0 comments on commit 53d11ae

Please sign in to comment.