From 6fe8f3eb74e2ae90c1c70ef27ddac444aaf85cc8 Mon Sep 17 00:00:00 2001 From: Yichao Yang Date: Thu, 11 Aug 2022 16:38:07 -0700 Subject: [PATCH] Fix inline history archival (#3216) --- service/history/fx.go | 4 -- service/history/workflow/delete_manager.go | 37 ++++++++++++------- .../history/workflow/delete_manager_test.go | 20 +++------- service/worker/archiver/client.go | 18 --------- service/worker/archiver/client_test.go | 12 ------ 5 files changed, 29 insertions(+), 62 deletions(-) diff --git a/service/history/fx.go b/service/history/fx.go index c03b4cd969d..9ad533e2934 100644 --- a/service/history/fx.go +++ b/service/history/fx.go @@ -28,8 +28,6 @@ import ( "context" "net" - "go.temporal.io/server/api/historyservice/v1" - "go.uber.org/fx" "google.golang.org/grpc" @@ -272,7 +270,6 @@ func ArchivalClientProvider( logger log.Logger, metricsClient metrics.Client, config *configs.Config, - historyClient historyservice.HistoryServiceClient, ) warchiver.Client { return warchiver.NewClient( metricsClient, @@ -281,7 +278,6 @@ func ArchivalClientProvider( config.NumArchiveSystemWorkflows, config.ArchiveRequestRPS, archiverProvider, - historyClient, ) } diff --git a/service/history/workflow/delete_manager.go b/service/history/workflow/delete_manager.go index 69bde68fe35..7b051e10e44 100644 --- a/service/history/workflow/delete_manager.go +++ b/service/history/workflow/delete_manager.go @@ -214,17 +214,6 @@ func (m *DeleteManagerImpl) deleteWorkflowExecutionInternal( return err } - if archiveIfEnabled { - isArchived, err := m.archiveWorkflowIfEnabled(ctx, namespaceID, we, currentBranchToken, weCtx, ms, scope) - if err != nil { - return err - } - if isArchived { - // Don't delete workflow data. The workflow data will be deleted after history archived. - return nil - } - } - // These two fields are needed for cassandra standard visibility. // TODO (alex): Remove them when cassandra standard visibility is removed. 
var startTime *time.Time @@ -241,6 +230,26 @@ func (m *DeleteManagerImpl) deleteWorkflowExecutionInternal( } } + // NOTE: old versions (before server version 1.17.3) of archival workflow will delete workflow history directly + // after archiving history. But getting workflow close time requires workflow close event (for workflows closed by + // server version before 1.17), so this step needs to be done after getting workflow close time. + if archiveIfEnabled { + deletionPromised, err := m.archiveWorkflowIfEnabled(ctx, namespaceID, we, currentBranchToken, weCtx, ms, scope) + if err != nil { + return err + } + if deletionPromised { + // Don't delete workflow data. The workflow data will be deleted after history is archived. + // If we proceed to delete mutable state, then history scavenger may kick in and + // delete history before history archival is done. + + // HOWEVER, when rolling out this change, we don't know if worker is running an old version of the + // archival workflow (before 1.17.3), which will only delete workflow history. To prevent this from + // happening, worker role must be deployed first. 
+ return nil + } + } + if err := m.shard.DeleteWorkflowExecution( ctx, definition.WorkflowKey{ @@ -270,7 +279,7 @@ func (m *DeleteManagerImpl) archiveWorkflowIfEnabled( weCtx Context, ms MutableState, scope metrics.Scope, -) (isArchived bool, err error) { +) (deletionPromised bool, err error) { namespaceRegistryEntry := ms.GetNamespaceEntry() @@ -329,5 +338,7 @@ func (m *DeleteManagerImpl) archiveWorkflowIfEnabled( scope.IncCounter(metrics.WorkflowCleanupArchiveCount) } - return true, nil + // inline archival doesn't perform deletion + // only archival through archival workflow will + return !resp.HistoryArchivedInline, nil } diff --git a/service/history/workflow/delete_manager_test.go b/service/history/workflow/delete_manager_test.go index f26f6b04046..3b03c888009 100644 --- a/service/history/workflow/delete_manager_test.go +++ b/service/history/workflow/delete_manager_test.go @@ -374,9 +374,9 @@ func (s *deleteManagerWorkflowSuite) TestDeleteWorkflowExecutionRetention_Archiv mockMutableState := NewMockMutableState(s.controller) mockMutableState.EXPECT().GetCurrentBranchToken().Return([]byte{22, 8, 78}, nil) - mockMutableState.EXPECT().GetExecutionState().Return(&persistencespb.WorkflowExecutionState{State: enumsspb.WORKFLOW_EXECUTION_STATE_COMPLETED}).Times(0) + mockMutableState.EXPECT().GetExecutionState().Return(&persistencespb.WorkflowExecutionState{State: enumsspb.WORKFLOW_EXECUTION_STATE_COMPLETED}) closeTime := time.Date(1978, 8, 22, 1, 2, 3, 4, time.UTC) - mockMutableState.EXPECT().GetWorkflowCloseTime(gomock.Any()).Return(&closeTime, nil).Times(0) + mockMutableState.EXPECT().GetWorkflowCloseTime(gomock.Any()).Return(&closeTime, nil) // ====================== Archival mocks ======================================= mockMutableState.EXPECT().GetNamespaceEntry().Return(namespace.NewLocalNamespaceForTest( @@ -408,19 +408,6 @@ func (s *deleteManagerWorkflowSuite) TestDeleteWorkflowExecutionRetention_Archiv }, nil) // 
============================================================= - s.mockShardContext.EXPECT().DeleteWorkflowExecution( - gomock.Any(), - definition.WorkflowKey{ - NamespaceID: tests.NamespaceID.String(), - WorkflowID: tests.WorkflowID, - RunID: tests.RunID, - }, - nil, - nil, - &closeTime, - ).Return(nil).Times(0) - mockWeCtx.EXPECT().Clear().Times(0) - err := s.deleteManager.DeleteWorkflowExecutionByRetention( context.Background(), tests.NamespaceID, @@ -441,6 +428,9 @@ func (s *deleteManagerWorkflowSuite) TestDeleteWorkflowExecutionRetention_Archiv mockMutableState := NewMockMutableState(s.controller) mockMutableState.EXPECT().GetCurrentBranchToken().Return([]byte{22, 8, 78}, nil) + mockMutableState.EXPECT().GetExecutionState().Return(&persistencespb.WorkflowExecutionState{State: enumsspb.WORKFLOW_EXECUTION_STATE_COMPLETED}) + closeTime := time.Date(1978, 8, 22, 1, 2, 3, 4, time.UTC) + mockMutableState.EXPECT().GetWorkflowCloseTime(gomock.Any()).Return(&closeTime, nil) // ====================== Archival mocks ======================================= mockMutableState.EXPECT().GetNamespaceEntry().Return(namespace.NewLocalNamespaceForTest( diff --git a/service/worker/archiver/client.go b/service/worker/archiver/client.go index 137b88e3a53..a95401e3956 100644 --- a/service/worker/archiver/client.go +++ b/service/worker/archiver/client.go @@ -33,8 +33,6 @@ import ( "math/rand" "time" - "go.temporal.io/server/api/historyservice/v1" - commonpb "go.temporal.io/api/common/v1" enumspb "go.temporal.io/api/enums/v1" sdkclient "go.temporal.io/sdk/client" @@ -106,7 +104,6 @@ type ( numWorkflows dynamicconfig.IntPropertyFn rateLimiter quotas.RateLimiter archiverProvider provider.ArchiverProvider - historyClient historyservice.HistoryServiceClient } // ArchivalTarget is either history or visibility @@ -134,7 +131,6 @@ func NewClient( numWorkflows dynamicconfig.IntPropertyFn, requestRPS dynamicconfig.IntPropertyFn, archiverProvider provider.ArchiverProvider, - historyClient 
historyservice.HistoryServiceClient, ) Client { return &client{ metricsScope: metricsClient.Scope(metrics.ArchiverClientScope), @@ -145,7 +141,6 @@ func NewClient( func() float64 { return float64(requestRPS()) }, ), archiverProvider: archiverProvider, - historyClient: historyClient, } } @@ -232,19 +227,6 @@ func (c *client) archiveHistoryInline(ctx context.Context, request *ClientReques NextEventID: request.ArchiveRequest.NextEventID, CloseFailoverVersion: request.ArchiveRequest.CloseFailoverVersion, }) - if err != nil { - return - } - - _, err = c.historyClient.DeleteWorkflowExecution(ctx, &historyservice.DeleteWorkflowExecutionRequest{ - NamespaceId: request.ArchiveRequest.NamespaceID, - WorkflowExecution: &commonpb.WorkflowExecution{ - WorkflowId: request.ArchiveRequest.WorkflowID, - RunId: request.ArchiveRequest.RunID, - }, - WorkflowVersion: request.ArchiveRequest.CloseFailoverVersion, - ClosedWorkflowOnly: true, - }) } func (c *client) archiveVisibilityInline(ctx context.Context, request *ClientRequest, logger log.Logger, errCh chan error) { diff --git a/service/worker/archiver/client_test.go b/service/worker/archiver/client_test.go index a7a548c65ee..071eb9812b1 100644 --- a/service/worker/archiver/client_test.go +++ b/service/worker/archiver/client_test.go @@ -30,9 +30,6 @@ import ( "fmt" "testing" - "go.temporal.io/server/api/historyservice/v1" - "go.temporal.io/server/api/historyservicemock/v1" - "github.com/golang/mock/gomock" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" @@ -59,7 +56,6 @@ type clientSuite struct { metricsScope *metrics.MockScope sdkClientFactory *sdk.MockClientFactory sdkClient *mocksdk.MockClient - historyClient *historyservicemock.MockHistoryServiceClient client *client } @@ -80,7 +76,6 @@ func (s *clientSuite) SetupTest() { s.sdkClient = mocksdk.NewMockClient(s.controller) s.sdkClientFactory = sdk.NewMockClientFactory(s.controller) 
s.sdkClientFactory.EXPECT().GetSystemClient(gomock.Any()).Return(s.sdkClient).AnyTimes() - s.historyClient = historyservicemock.NewMockHistoryServiceClient(s.controller) s.client = NewClient( s.metricsClient, log.NewNoopLogger(), @@ -88,7 +83,6 @@ func (s *clientSuite) SetupTest() { dynamicconfig.GetIntPropertyFn(1000), dynamicconfig.GetIntPropertyFn(1000), s.archiverProvider, - s.historyClient, ).(*client) } @@ -163,8 +157,6 @@ func (s *clientSuite) TestArchiveHistoryInlineSuccess() { s.historyArchiver.EXPECT().Archive(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil) s.metricsScope.EXPECT().IncCounter(metrics.ArchiverClientHistoryRequestCount) s.metricsScope.EXPECT().IncCounter(metrics.ArchiverClientHistoryInlineArchiveAttemptCount) - s.historyClient.EXPECT().DeleteWorkflowExecution(gomock.Any(), gomock.Any()). - Return(&historyservice.DeleteWorkflowExecutionResponse{}, nil) resp, err := s.client.Archive(context.Background(), &ClientRequest{ ArchiveRequest: &ArchiveRequest{ HistoryURI: "test:///history/archival", @@ -252,8 +244,6 @@ func (s *clientSuite) TestArchiveInline_VisibilityFail_HistorySuccess() { s.archiverProvider.EXPECT().GetHistoryArchiver(gomock.Any(), gomock.Any()).Return(s.historyArchiver, nil) s.archiverProvider.EXPECT().GetVisibilityArchiver(gomock.Any(), gomock.Any()).Return(s.visibilityArchiver, nil) s.historyArchiver.EXPECT().Archive(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil) - s.historyClient.EXPECT().DeleteWorkflowExecution(gomock.Any(), gomock.Any()). 
- Return(&historyservice.DeleteWorkflowExecutionResponse{}, nil) s.visibilityArchiver.EXPECT().Archive(gomock.Any(), gomock.Any(), gomock.Any()).Return(errors.New("some random error")) s.metricsScope.EXPECT().IncCounter(metrics.ArchiverClientHistoryRequestCount) s.metricsScope.EXPECT().IncCounter(metrics.ArchiverClientHistoryInlineArchiveAttemptCount) @@ -309,8 +299,6 @@ func (s *clientSuite) TestArchiveInline_VisibilitySuccess_HistorySuccess() { s.archiverProvider.EXPECT().GetHistoryArchiver(gomock.Any(), gomock.Any()).Return(s.historyArchiver, nil) s.archiverProvider.EXPECT().GetVisibilityArchiver(gomock.Any(), gomock.Any()).Return(s.visibilityArchiver, nil) s.historyArchiver.EXPECT().Archive(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil) - s.historyClient.EXPECT().DeleteWorkflowExecution(gomock.Any(), gomock.Any()). - Return(&historyservice.DeleteWorkflowExecutionResponse{}, nil) s.visibilityArchiver.EXPECT().Archive(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil) s.metricsScope.EXPECT().IncCounter(metrics.ArchiverClientHistoryRequestCount) s.metricsScope.EXPECT().IncCounter(metrics.ArchiverClientHistoryInlineArchiveAttemptCount)