Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace Cassandra visibility TTL with explicit DELETE #2387

Merged
merged 7 commits into from
Jan 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
600 changes: 334 additions & 266 deletions api/persistence/v1/executions.pb.go

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions common/persistence/serialization/task_serializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -962,6 +962,7 @@ func (s *TaskSerializer) VisibilityDeleteTaskToProto(
Version: deleteVisibilityTask.Version,
TaskId: deleteVisibilityTask.TaskID,
VisibilityTime: &deleteVisibilityTask.VisibilityTimestamp,
CloseTime: deleteVisibilityTask.CloseTime,
}
}

Expand All @@ -977,6 +978,7 @@ func (s *TaskSerializer) visibilityDeleteTaskFromProto(
VisibilityTimestamp: *deleteVisibilityTask.VisibilityTime,
TaskID: deleteVisibilityTask.TaskId,
Version: deleteVisibilityTask.Version,
CloseTime: deleteVisibilityTask.CloseTime,
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@ type (
*VisibilityRequestBase
CloseTime time.Time
HistoryLength int64
Retention *time.Duration // not persisted, used for cassandra ttl
}

// UpsertWorkflowExecutionRequest is used to upsert workflow execution
Expand Down Expand Up @@ -168,6 +167,7 @@ type (
RunID string
WorkflowID string
TaskID int64
CloseTime time.Time
}
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,13 @@ import (
commonpb "go.temporal.io/api/common/v1"
enumspb "go.temporal.io/api/enums/v1"
workflowpb "go.temporal.io/api/workflow/v1"

"go.temporal.io/server/common/dynamicconfig"
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/payload"
"go.temporal.io/server/common/persistence"
"go.temporal.io/server/common/persistence/cassandra"
persistencetests "go.temporal.io/server/common/persistence/persistence-tests"
"go.temporal.io/server/common/persistence/visibility"
"go.temporal.io/server/common/persistence/visibility/manager"
Expand Down Expand Up @@ -196,72 +196,6 @@ func (s *VisibilityPersistenceSuite) TestBasicVisibilityShortWorkflow() {
s.assertClosedExecutionEquals(closedRecord, resp.Executions[0])
}

// TestVisibilityRetention checks that a closed-execution visibility record
// written with a Retention value is listed right after it is recorded and is
// no longer listed after sleeping for that retention period. The test only
// runs when the default test cluster is a Cassandra test cluster; for any
// other cluster type it returns immediately without asserting anything.
func (s *VisibilityPersistenceSuite) TestVisibilityRetention() {
	_, isCassandra := s.DefaultTestCluster.(*cassandra.TestCluster)
	if !isCassandra {
		return
	}

	nsID := namespace.ID(uuid.New())
	execution := commonpb.WorkflowExecution{
		WorkflowId: "visibility-workflow-test-visibility-retention",
		RunId:      "3c095198-0c33-4136-939a-c29fbbb6a802",
	}

	startTime := time.Now().UTC().Add(-1 * time.Hour)
	closeTime := startTime.Add(1 * time.Minute)
	retention := 1 * time.Second

	// newBase builds a fresh request base for each record call so the two
	// requests never share a pointer.
	newBase := func() *manager.VisibilityRequestBase {
		return &manager.VisibilityRequestBase{
			NamespaceID:      nsID,
			Execution:        execution,
			WorkflowTypeName: "visibility-workflow",
			StartTime:        startTime,
		}
	}

	// closedAtCloseTime builds the list request used before and after the
	// retention sleep. For closed listings the start-time bounds are
	// matched against close_time.
	closedAtCloseTime := func() *manager.ListWorkflowExecutionsRequest {
		return &manager.ListWorkflowExecutionsRequest{
			NamespaceID:       nsID,
			PageSize:          1,
			EarliestStartTime: closeTime, // This is actually close_time
			LatestStartTime:   closeTime,
		}
	}

	startedErr := s.VisibilityMgr.RecordWorkflowExecutionStarted(&manager.RecordWorkflowExecutionStartedRequest{
		VisibilityRequestBase: newBase(),
	})
	s.NoError(startedErr)

	closedErr := s.VisibilityMgr.RecordWorkflowExecutionClosed(&manager.RecordWorkflowExecutionClosedRequest{
		VisibilityRequestBase: newBase(),
		CloseTime:             closeTime,
		Retention:             &retention,
	})
	s.NoError(closedErr)

	// After the close is recorded, the execution must not be listed as open.
	openResp, openErr := s.VisibilityMgr.ListOpenWorkflowExecutions(&manager.ListWorkflowExecutionsRequest{
		NamespaceID:       nsID,
		PageSize:          1,
		EarliestStartTime: startTime,
		LatestStartTime:   startTime,
	})
	s.NoError(openErr)
	s.Equal(0, len(openResp.Executions))

	// The closed record is expected to be visible before retention elapses.
	beforeResp, beforeErr := s.VisibilityMgr.ListClosedWorkflowExecutions(closedAtCloseTime())
	s.NoError(beforeErr)
	s.Equal(1, len(beforeResp.Executions))

	// Sleep for retention to fire.
	time.Sleep(retention)

	afterResp, afterErr := s.VisibilityMgr.ListClosedWorkflowExecutions(closedAtCloseTime())
	s.NoError(afterErr)
	s.Equal(0, len(afterResp.Executions))
}

// TestVisibilityPagination test
func (s *VisibilityPersistenceSuite) TestVisibilityPagination() {
testNamespaceUUID := namespace.ID(uuid.New())
Expand Down Expand Up @@ -572,13 +506,10 @@ func (s *VisibilityPersistenceSuite) TestFilteringByStatus() {

// TestDelete test
func (s *VisibilityPersistenceSuite) TestDelete() {
if s.VisibilityMgr.GetName() == "cassandra" {
// This test is not applicable for cassandra.
return
}
nRows := 5
testNamespaceUUID := namespace.ID(uuid.New())
startTime := time.Now().UTC().Add(time.Second * -5)
closeTime := time.Now().UTC()
startTime := closeTime.Add(-5 * time.Second)
for i := 0; i < nRows; i++ {
workflowExecution := commonpb.WorkflowExecution{
WorkflowId: uuid.New(),
Expand All @@ -601,7 +532,7 @@ func (s *VisibilityPersistenceSuite) TestDelete() {
StartTime: startTime,
Status: enumspb.WORKFLOW_EXECUTION_STATUS_FAILED,
},
CloseTime: time.Now(),
CloseTime: closeTime,
HistoryLength: 3,
}
err1 := s.VisibilityMgr.RecordWorkflowExecutionClosed(closeReq)
Expand All @@ -611,7 +542,7 @@ func (s *VisibilityPersistenceSuite) TestDelete() {
resp, err3 := s.VisibilityMgr.ListClosedWorkflowExecutions(&manager.ListWorkflowExecutionsRequest{
NamespaceID: testNamespaceUUID,
EarliestStartTime: startTime,
LatestStartTime: time.Now(),
LatestStartTime: closeTime,
PageSize: 10,
})
s.Nil(err3)
Expand All @@ -621,14 +552,16 @@ func (s *VisibilityPersistenceSuite) TestDelete() {
for _, row := range resp.Executions {
err4 := s.VisibilityMgr.DeleteWorkflowExecution(&manager.VisibilityDeleteWorkflowExecutionRequest{
NamespaceID: testNamespaceUUID,
WorkflowID: row.GetExecution().GetWorkflowId(),
RunID: row.GetExecution().GetRunId(),
CloseTime: closeTime,
})
s.Nil(err4)
remaining--
resp, err5 := s.VisibilityMgr.ListClosedWorkflowExecutions(&manager.ListWorkflowExecutionsRequest{
NamespaceID: testNamespaceUUID,
EarliestStartTime: startTime,
LatestStartTime: time.Now(),
LatestStartTime: closeTime,
PageSize: 10,
})
s.Nil(err5)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"time"

enumspb "go.temporal.io/api/enums/v1"

"go.temporal.io/server/common/config"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/log/tag"
Expand All @@ -42,9 +43,6 @@ import (
const (
// namespacePartition is the value bound to the namespace_partition column
// in the visibility queries issued from this file.
namespacePartition = 0
// cassandraPersistenceName is the string identifier "cassandra".
// NOTE(review): presumably returned as this store's name — confirm against the caller.
cassandraPersistenceName = "cassandra"

// ref: https://docs.datastax.com/en/dse-trblshoot/doc/troubleshooting/recoveringTtlYear2038Problem.html
maxCassandraTTL = int64(315360000) // 10 years in seconds (10 * 365 * 24 * 3600). Cassandra cannot store expiration times later than 2038-01-19T03:14:06+00:00, so the TTL cap was set to 10 years, which keeps TTL writes valid until year 2028.
)

const (
Expand All @@ -58,14 +56,16 @@ const (
`AND start_time = ? ` +
`AND run_id = ?`

templateCreateWorkflowExecutionClosedWithTTL = `INSERT INTO closed_executions (` +
`namespace_id, namespace_partition, workflow_id, run_id, start_time, execution_time, close_time, workflow_type_name, status, history_length, memo, encoding, task_queue) ` +
`VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) using TTL ?`

templateCreateWorkflowExecutionClosed = `INSERT INTO closed_executions (` +
`namespace_id, namespace_partition, workflow_id, run_id, start_time, execution_time, close_time, workflow_type_name, status, history_length, memo, encoding, task_queue) ` +
`VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`

templateDeleteWorkflowExecutionClosed = `DELETE FROM closed_executions ` +
`WHERE namespace_id = ? ` +
`AND namespace_partition = ? ` +
`AND close_time = ? ` +
`AND run_id = ?`

templateGetOpenWorkflowExecutions = `SELECT workflow_id, run_id, start_time, execution_time, workflow_type_name, memo, encoding, task_queue ` +
`FROM open_executions ` +
`WHERE namespace_id = ? ` +
Expand Down Expand Up @@ -190,49 +190,21 @@ func (v *visibilityStore) RecordWorkflowExecutionClosed(request *store.InternalR
)

// Next, add a row in the closed table.

// Find how long to keep the row
var retentionSeconds int64
if request.Retention != nil {
retentionSeconds = int64(request.Retention.Seconds())
} else {
retentionSeconds = maxCassandraTTL + 1
}

if retentionSeconds > maxCassandraTTL {
batch.Query(templateCreateWorkflowExecutionClosed,
request.NamespaceID,
namespacePartition,
request.WorkflowID,
request.RunID,
persistence.UnixMilliseconds(request.StartTime),
persistence.UnixMilliseconds(request.ExecutionTime),
persistence.UnixMilliseconds(request.CloseTime),
request.WorkflowTypeName,
request.Status,
request.HistoryLength,
request.Memo.Data,
request.Memo.EncodingType.String(),
request.TaskQueue,
)
} else {
batch.Query(templateCreateWorkflowExecutionClosedWithTTL,
request.NamespaceID,
namespacePartition,
request.WorkflowID,
request.RunID,
persistence.UnixMilliseconds(request.StartTime),
persistence.UnixMilliseconds(request.ExecutionTime),
persistence.UnixMilliseconds(request.CloseTime),
request.WorkflowTypeName,
request.Status,
request.HistoryLength,
request.Memo.Data,
request.Memo.EncodingType.String(),
request.TaskQueue,
retentionSeconds,
)
}
batch.Query(templateCreateWorkflowExecutionClosed,
request.NamespaceID,
namespacePartition,
request.WorkflowID,
request.RunID,
persistence.UnixMilliseconds(request.StartTime),
persistence.UnixMilliseconds(request.ExecutionTime),
persistence.UnixMilliseconds(request.CloseTime),
request.WorkflowTypeName,
request.Status,
request.HistoryLength,
request.Memo.Data,
request.Memo.EncodingType.String(),
request.TaskQueue,
)

// RecordWorkflowExecutionStarted is using StartTime as the timestamp for every query in `open_executions` table.
// Due to the fact that cross DC using mutable state creation time as workflow start time and visibility using event time
Expand Down Expand Up @@ -460,8 +432,16 @@ func (v *visibilityStore) ListClosedWorkflowExecutionsByStatus(
return response, nil
}

// DeleteWorkflowExecution is a no-op since deletes are auto-handled by cassandra TTLs
func (v *visibilityStore) DeleteWorkflowExecution(_ *manager.VisibilityDeleteWorkflowExecutionRequest) error {
// DeleteWorkflowExecution removes a closed workflow execution's visibility
// record by executing templateDeleteWorkflowExecutionClosed. The row is
// addressed by the request's namespace ID, the fixed namespacePartition, the
// request's CloseTime (converted to Unix milliseconds) and its RunID; the
// request's WorkflowID is not part of the statement. The query runs at the
// store's low consistency level. Any execution error is passed through
// gocql.ConvertError; nil is returned on success.
func (v *visibilityStore) DeleteWorkflowExecution(request *manager.VisibilityDeleteWorkflowExecutionRequest) error {
	err := v.session.Query(
		templateDeleteWorkflowExecutionClosed,
		request.NamespaceID.String(),
		namespacePartition,
		persistence.UnixMilliseconds(request.CloseTime),
		request.RunID,
	).Consistency(v.lowConslevel).Exec()
	if err == nil {
		return nil
	}
	return gocql.ConvertError("DeleteWorkflowExecution", err)
}

Expand Down
1 change: 0 additions & 1 deletion common/persistence/visibility/store/visibility_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,6 @@ type (
*InternalVisibilityRequestBase
CloseTime time.Time
HistoryLength int64
Retention *time.Duration
}

// InternalUpsertWorkflowExecutionRequest is request to UpsertWorkflowExecution
Expand Down
1 change: 0 additions & 1 deletion common/persistence/visibility/visibility_manager_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@ func (p *visibilityManagerImpl) RecordWorkflowExecutionClosed(request *manager.R
InternalVisibilityRequestBase: requestBase,
CloseTime: request.CloseTime,
HistoryLength: request.HistoryLength,
Retention: request.Retention,
}
return p.store.RecordWorkflowExecutionClosed(req)
}
Expand Down
Loading