Skip to content

Commit

Permalink
Fix fail workflow task and generate new workflow task (#2335)
Browse files Browse the repository at this point in the history
* Fix fail workflow task and new workflow task
  • Loading branch information
yiminc authored Dec 30, 2021
1 parent 05f584c commit 733a052
Show file tree
Hide file tree
Showing 2 changed files with 125 additions and 5 deletions.
121 changes: 121 additions & 0 deletions host/client_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -599,6 +599,115 @@ func (s *clientIntegrationSuite) Test_ActivityTimeouts() {
//s.printHistory(id, workflowRun.GetRunID())
}

// Test_UnhandledCommandAndNewTask checks that when a workflow task completion is
// rejected because a signal arrived while the task was running, the server
// schedules a fresh workflow task, and the workflow then observes the signal and
// completes.
func (s *clientIntegrationSuite) Test_UnhandledCommandAndNewTask() {
	/*
		Event sequence:
		1st WorkflowTask runs a local activity.
		While the local activity is running, a signal is received by the server.
		After the signal is received, the local activity completes, and the workflow drains the
		signal channel (no signal yet) and completes the workflow.
		Server fails the complete request because there is an unhandled signal.
		Server reschedules a new workflow task.
		Workflow runs the local activity again, drains the signal channel (with one signal) and
		completes the workflow.
		Server completes the workflow as requested.
	*/

	sigReadyToSendChan := make(chan struct{}, 1)
	sigSendDoneChan := make(chan struct{})
	localActivityFn := func(ctx context.Context) error {
		// Unblock signal sending, so the signal is sent after the first workflow task has started.
		// The send is non-blocking so a repeated run of this activity cannot stall here when the
		// buffer is already full.
		select {
		case sigReadyToSendChan <- struct{}{}:
		default:
		}

		// This blocks the workflow task and causes the signal to become a buffered event.
		select {
		case <-sigSendDoneChan:
		case <-ctx.Done():
		}

		return nil
	}

	// receivedSig is written by the workflow and read by the test once the run has finished.
	var receivedSig string
	workflowFn := func(ctx workflow.Context) error {
		ctx1 := workflow.WithLocalActivityOptions(ctx, workflow.LocalActivityOptions{
			StartToCloseTimeout: 10 * time.Second,
			RetryPolicy:         &temporal.RetryPolicy{MaximumAttempts: 1},
		})
		f1 := workflow.ExecuteLocalActivity(ctx1, localActivityFn)
		if err := f1.Get(ctx1, nil); err != nil {
			return err
		}

		sigCh := workflow.GetSignalChannel(ctx, "signal-name")

		// Drain whatever is currently queued without blocking; the last value wins.
		for {
			var sigVal string
			ok := sigCh.ReceiveAsync(&sigVal)
			if !ok {
				break
			}
			receivedSig = sigVal
		}

		return nil
	}

	s.worker.RegisterWorkflow(workflowFn)

	id := "integration-test-unhandled-command-new-task"
	workflowOptions := sdkclient.StartWorkflowOptions{
		ID:        id,
		TaskQueue: s.taskQueue,
		// Intentionally use the same timeout for WorkflowTaskTimeout and WorkflowRunTimeout so if the
		// workflow task is not correctly dispatched, it would time out, which would fail the workflow
		// and cause the test to fail.
		WorkflowTaskTimeout: 10 * time.Second,
		WorkflowRunTimeout:  10 * time.Second,
	}
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()
	workflowRun, err := s.sdkClient.ExecuteWorkflow(ctx, workflowOptions, workflowFn)
	// Report a start failure as a test failure, like the other error checks in this test.
	s.NoError(err, "Start workflow failed with err")
	s.NotNil(workflowRun)
	if err != nil || workflowRun == nil {
		// Already reported above; nothing below can run without a workflow run handle.
		return
	}
	s.NotEmpty(workflowRun.GetRunID())

	// Block until the first workflow task has started. If ctx expires first, the SignalWorkflow
	// call below fails on the expired context and reports it.
	select {
	case <-sigReadyToSendChan:
	case <-ctx.Done():
	}

	err = s.sdkClient.SignalWorkflow(ctx, id, workflowRun.GetRunID(), "signal-name", "signal-value")
	s.NoError(err)

	// Release the local activity so the first workflow task can attempt to complete.
	close(sigSendDoneChan)

	err = workflowRun.Get(ctx, nil)
	s.NoError(err) // if the new workflow task is not correctly dispatched, it would cause a timeout error here
	s.Equal("signal-value", receivedSig)

	// verify event sequence
	expectedHistory := []enumspb.EventType{
		enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED,
		enumspb.EVENT_TYPE_WORKFLOW_TASK_SCHEDULED,
		enumspb.EVENT_TYPE_WORKFLOW_TASK_STARTED,
		enumspb.EVENT_TYPE_WORKFLOW_TASK_FAILED,        // due to unhandled signal
		enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED, // this is the buffered signal
		enumspb.EVENT_TYPE_WORKFLOW_TASK_SCHEDULED,
		enumspb.EVENT_TYPE_WORKFLOW_TASK_STARTED,
		enumspb.EVENT_TYPE_WORKFLOW_TASK_COMPLETED,
		enumspb.EVENT_TYPE_MARKER_RECORDED,
		enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED,
	}
	s.assertHistory(id, workflowRun.GetRunID(), expectedHistory)
}

func (s *clientIntegrationSuite) Test_BufferedQuery() {
localActivityFn := func(ctx context.Context) error {
time.Sleep(5 * time.Second) // use local activity sleep to block workflow task to force query to be buffered
Expand Down Expand Up @@ -684,3 +793,15 @@ func (s *clientIntegrationSuite) printHistory(workflowID string, runID string) {
}
common.PrettyPrintHistory(history, s.Logger)
}

// assertHistory reads the history of the workflow run identified by wid and rid
// through the SDK client and asserts that the event types, in the order they
// are returned, equal expected.
func (s *clientIntegrationSuite) assertHistory(wid, rid string, expected []enumspb.EventType) {
	var actual []enumspb.EventType

	historyIter := s.sdkClient.GetWorkflowHistory(context.Background(), wid, rid, false, 0)
	for historyIter.HasNext() {
		historyEvent, nextErr := historyIter.Next()
		s.NoError(nextErr)
		// Only the event type is compared; all other event fields are ignored.
		actual = append(actual, historyEvent.GetEventType())
	}

	s.Equal(expected, actual)
}
9 changes: 4 additions & 5 deletions service/history/workflowTaskHandlerCallbacks.go
Original file line number Diff line number Diff line change
Expand Up @@ -496,25 +496,24 @@ Update_History_Loop:
createNewWorkflowTask := msBuilder.IsWorkflowExecutionRunning() && (hasUnhandledEvents || request.GetForceCreateNewWorkflowTask() || activityNotStartedCancelled)
var newWorkflowTaskScheduledID int64
if createNewWorkflowTask {
bypassTaskGeneration := request.GetReturnNewWorkflowTask() && wtFailedCause == nil
var newWorkflowTask *workflow.WorkflowTaskInfo
var err error
if workflowTaskHeartbeating && !workflowTaskHeartbeatTimeout {
newWorkflowTask, err = msBuilder.AddWorkflowTaskScheduledEventAsHeartbeat(
request.GetReturnNewWorkflowTask(),
bypassTaskGeneration,
currentWorkflowTask.OriginalScheduledTime,
)
} else {
newWorkflowTask, err = msBuilder.AddWorkflowTaskScheduledEvent(
request.GetReturnNewWorkflowTask(),
)
newWorkflowTask, err = msBuilder.AddWorkflowTaskScheduledEvent(bypassTaskGeneration)
}
if err != nil {
return nil, err
}

newWorkflowTaskScheduledID = newWorkflowTask.ScheduleID
// skip transfer task for workflow task if request asking to return new workflow task
if request.GetReturnNewWorkflowTask() {
if bypassTaskGeneration {
// start the new workflow task if request asked to do so
// TODO: replace the poll request
_, _, err := msBuilder.AddWorkflowTaskStartedEvent(
Expand Down

0 comments on commit 733a052

Please sign in to comment.