Skip to content

Commit

Permalink
Fix inline history archival (#3216)
Browse files Browse the repository at this point in the history
  • Loading branch information
yycptt authored Aug 11, 2022
1 parent db5f14f commit 6fe8f3e
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 62 deletions.
4 changes: 0 additions & 4 deletions service/history/fx.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@ import (
"context"
"net"

"go.temporal.io/server/api/historyservice/v1"

"go.uber.org/fx"
"google.golang.org/grpc"

Expand Down Expand Up @@ -272,7 +270,6 @@ func ArchivalClientProvider(
logger log.Logger,
metricsClient metrics.Client,
config *configs.Config,
historyClient historyservice.HistoryServiceClient,
) warchiver.Client {
return warchiver.NewClient(
metricsClient,
Expand All @@ -281,7 +278,6 @@ func ArchivalClientProvider(
config.NumArchiveSystemWorkflows,
config.ArchiveRequestRPS,
archiverProvider,
historyClient,
)
}

Expand Down
37 changes: 24 additions & 13 deletions service/history/workflow/delete_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,17 +214,6 @@ func (m *DeleteManagerImpl) deleteWorkflowExecutionInternal(
return err
}

if archiveIfEnabled {
isArchived, err := m.archiveWorkflowIfEnabled(ctx, namespaceID, we, currentBranchToken, weCtx, ms, scope)
if err != nil {
return err
}
if isArchived {
// Don't delete workflow data. The workflow data will be deleted after history archived.
return nil
}
}

// These two fields are needed for cassandra standard visibility.
// TODO (alex): Remove them when cassandra standard visibility is removed.
var startTime *time.Time
Expand All @@ -241,6 +230,26 @@ func (m *DeleteManagerImpl) deleteWorkflowExecutionInternal(
}
}

// NOTE: old versions (before server version 1.17.3) of archival workflow will delete workflow history directly
// after archiving history. But getting workflow close time requires workflow close event (for workflows closed by
// server version before 1.17), so this step needs to be done after getting workflow close time.
if archiveIfEnabled {
deletionPromised, err := m.archiveWorkflowIfEnabled(ctx, namespaceID, we, currentBranchToken, weCtx, ms, scope)
if err != nil {
return err
}
if deletionPromised {
// Don't delete workflow data. The workflow data will be deleted after history archived.
// if we proceed to delete mutable state, then history scavanger may kick in and
// delete history before history archival is done.

// HOWEVER, when rolling out this change, we don't know if worker is running an old version of the
// archival workflow (before 1.17.3), which will only delete workflow history. To prevent this from
// happening, worker role must be deployed first.
return nil
}
}

if err := m.shard.DeleteWorkflowExecution(
ctx,
definition.WorkflowKey{
Expand Down Expand Up @@ -270,7 +279,7 @@ func (m *DeleteManagerImpl) archiveWorkflowIfEnabled(
weCtx Context,
ms MutableState,
scope metrics.Scope,
) (isArchived bool, err error) {
) (deletionPromised bool, err error) {

namespaceRegistryEntry := ms.GetNamespaceEntry()

Expand Down Expand Up @@ -329,5 +338,7 @@ func (m *DeleteManagerImpl) archiveWorkflowIfEnabled(
scope.IncCounter(metrics.WorkflowCleanupArchiveCount)
}

return true, nil
// inline archival don't perform deletion
// only archival through archival workflow will
return !resp.HistoryArchivedInline, nil
}
20 changes: 5 additions & 15 deletions service/history/workflow/delete_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,9 +374,9 @@ func (s *deleteManagerWorkflowSuite) TestDeleteWorkflowExecutionRetention_Archiv
mockMutableState := NewMockMutableState(s.controller)

mockMutableState.EXPECT().GetCurrentBranchToken().Return([]byte{22, 8, 78}, nil)
mockMutableState.EXPECT().GetExecutionState().Return(&persistencespb.WorkflowExecutionState{State: enumsspb.WORKFLOW_EXECUTION_STATE_COMPLETED}).Times(0)
mockMutableState.EXPECT().GetExecutionState().Return(&persistencespb.WorkflowExecutionState{State: enumsspb.WORKFLOW_EXECUTION_STATE_COMPLETED})
closeTime := time.Date(1978, 8, 22, 1, 2, 3, 4, time.UTC)
mockMutableState.EXPECT().GetWorkflowCloseTime(gomock.Any()).Return(&closeTime, nil).Times(0)
mockMutableState.EXPECT().GetWorkflowCloseTime(gomock.Any()).Return(&closeTime, nil)

// ====================== Archival mocks =======================================
mockMutableState.EXPECT().GetNamespaceEntry().Return(namespace.NewLocalNamespaceForTest(
Expand Down Expand Up @@ -408,19 +408,6 @@ func (s *deleteManagerWorkflowSuite) TestDeleteWorkflowExecutionRetention_Archiv
}, nil)
// =============================================================

s.mockShardContext.EXPECT().DeleteWorkflowExecution(
gomock.Any(),
definition.WorkflowKey{
NamespaceID: tests.NamespaceID.String(),
WorkflowID: tests.WorkflowID,
RunID: tests.RunID,
},
nil,
nil,
&closeTime,
).Return(nil).Times(0)
mockWeCtx.EXPECT().Clear().Times(0)

err := s.deleteManager.DeleteWorkflowExecutionByRetention(
context.Background(),
tests.NamespaceID,
Expand All @@ -441,6 +428,9 @@ func (s *deleteManagerWorkflowSuite) TestDeleteWorkflowExecutionRetention_Archiv
mockMutableState := NewMockMutableState(s.controller)

mockMutableState.EXPECT().GetCurrentBranchToken().Return([]byte{22, 8, 78}, nil)
mockMutableState.EXPECT().GetExecutionState().Return(&persistencespb.WorkflowExecutionState{State: enumsspb.WORKFLOW_EXECUTION_STATE_COMPLETED})
closeTime := time.Date(1978, 8, 22, 1, 2, 3, 4, time.UTC)
mockMutableState.EXPECT().GetWorkflowCloseTime(gomock.Any()).Return(&closeTime, nil)

// ====================== Archival mocks =======================================
mockMutableState.EXPECT().GetNamespaceEntry().Return(namespace.NewLocalNamespaceForTest(
Expand Down
18 changes: 0 additions & 18 deletions service/worker/archiver/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@ import (
"math/rand"
"time"

"go.temporal.io/server/api/historyservice/v1"

commonpb "go.temporal.io/api/common/v1"
enumspb "go.temporal.io/api/enums/v1"
sdkclient "go.temporal.io/sdk/client"
Expand Down Expand Up @@ -106,7 +104,6 @@ type (
numWorkflows dynamicconfig.IntPropertyFn
rateLimiter quotas.RateLimiter
archiverProvider provider.ArchiverProvider
historyClient historyservice.HistoryServiceClient
}

// ArchivalTarget is either history or visibility
Expand Down Expand Up @@ -134,7 +131,6 @@ func NewClient(
numWorkflows dynamicconfig.IntPropertyFn,
requestRPS dynamicconfig.IntPropertyFn,
archiverProvider provider.ArchiverProvider,
historyClient historyservice.HistoryServiceClient,
) Client {
return &client{
metricsScope: metricsClient.Scope(metrics.ArchiverClientScope),
Expand All @@ -145,7 +141,6 @@ func NewClient(
func() float64 { return float64(requestRPS()) },
),
archiverProvider: archiverProvider,
historyClient: historyClient,
}
}

Expand Down Expand Up @@ -232,19 +227,6 @@ func (c *client) archiveHistoryInline(ctx context.Context, request *ClientReques
NextEventID: request.ArchiveRequest.NextEventID,
CloseFailoverVersion: request.ArchiveRequest.CloseFailoverVersion,
})
if err != nil {
return
}

_, err = c.historyClient.DeleteWorkflowExecution(ctx, &historyservice.DeleteWorkflowExecutionRequest{
NamespaceId: request.ArchiveRequest.NamespaceID,
WorkflowExecution: &commonpb.WorkflowExecution{
WorkflowId: request.ArchiveRequest.WorkflowID,
RunId: request.ArchiveRequest.RunID,
},
WorkflowVersion: request.ArchiveRequest.CloseFailoverVersion,
ClosedWorkflowOnly: true,
})
}

func (c *client) archiveVisibilityInline(ctx context.Context, request *ClientRequest, logger log.Logger, errCh chan error) {
Expand Down
12 changes: 0 additions & 12 deletions service/worker/archiver/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,6 @@ import (
"fmt"
"testing"

"go.temporal.io/server/api/historyservice/v1"
"go.temporal.io/server/api/historyservicemock/v1"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
Expand All @@ -59,7 +56,6 @@ type clientSuite struct {
metricsScope *metrics.MockScope
sdkClientFactory *sdk.MockClientFactory
sdkClient *mocksdk.MockClient
historyClient *historyservicemock.MockHistoryServiceClient
client *client
}

Expand All @@ -80,15 +76,13 @@ func (s *clientSuite) SetupTest() {
s.sdkClient = mocksdk.NewMockClient(s.controller)
s.sdkClientFactory = sdk.NewMockClientFactory(s.controller)
s.sdkClientFactory.EXPECT().GetSystemClient(gomock.Any()).Return(s.sdkClient).AnyTimes()
s.historyClient = historyservicemock.NewMockHistoryServiceClient(s.controller)
s.client = NewClient(
s.metricsClient,
log.NewNoopLogger(),
s.sdkClientFactory,
dynamicconfig.GetIntPropertyFn(1000),
dynamicconfig.GetIntPropertyFn(1000),
s.archiverProvider,
s.historyClient,
).(*client)
}

Expand Down Expand Up @@ -163,8 +157,6 @@ func (s *clientSuite) TestArchiveHistoryInlineSuccess() {
s.historyArchiver.EXPECT().Archive(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil)
s.metricsScope.EXPECT().IncCounter(metrics.ArchiverClientHistoryRequestCount)
s.metricsScope.EXPECT().IncCounter(metrics.ArchiverClientHistoryInlineArchiveAttemptCount)
s.historyClient.EXPECT().DeleteWorkflowExecution(gomock.Any(), gomock.Any()).
Return(&historyservice.DeleteWorkflowExecutionResponse{}, nil)
resp, err := s.client.Archive(context.Background(), &ClientRequest{
ArchiveRequest: &ArchiveRequest{
HistoryURI: "test:///history/archival",
Expand Down Expand Up @@ -252,8 +244,6 @@ func (s *clientSuite) TestArchiveInline_VisibilityFail_HistorySuccess() {
s.archiverProvider.EXPECT().GetHistoryArchiver(gomock.Any(), gomock.Any()).Return(s.historyArchiver, nil)
s.archiverProvider.EXPECT().GetVisibilityArchiver(gomock.Any(), gomock.Any()).Return(s.visibilityArchiver, nil)
s.historyArchiver.EXPECT().Archive(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil)
s.historyClient.EXPECT().DeleteWorkflowExecution(gomock.Any(), gomock.Any()).
Return(&historyservice.DeleteWorkflowExecutionResponse{}, nil)
s.visibilityArchiver.EXPECT().Archive(gomock.Any(), gomock.Any(), gomock.Any()).Return(errors.New("some random error"))
s.metricsScope.EXPECT().IncCounter(metrics.ArchiverClientHistoryRequestCount)
s.metricsScope.EXPECT().IncCounter(metrics.ArchiverClientHistoryInlineArchiveAttemptCount)
Expand Down Expand Up @@ -309,8 +299,6 @@ func (s *clientSuite) TestArchiveInline_VisibilitySuccess_HistorySuccess() {
s.archiverProvider.EXPECT().GetHistoryArchiver(gomock.Any(), gomock.Any()).Return(s.historyArchiver, nil)
s.archiverProvider.EXPECT().GetVisibilityArchiver(gomock.Any(), gomock.Any()).Return(s.visibilityArchiver, nil)
s.historyArchiver.EXPECT().Archive(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil)
s.historyClient.EXPECT().DeleteWorkflowExecution(gomock.Any(), gomock.Any()).
Return(&historyservice.DeleteWorkflowExecutionResponse{}, nil)
s.visibilityArchiver.EXPECT().Archive(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil)
s.metricsScope.EXPECT().IncCounter(metrics.ArchiverClientHistoryRequestCount)
s.metricsScope.EXPECT().IncCounter(metrics.ArchiverClientHistoryInlineArchiveAttemptCount)
Expand Down

0 comments on commit 6fe8f3e

Please sign in to comment.