Skip to content

Commit

Permalink
Update DB task manager (#2310)
Browse files Browse the repository at this point in the history
* Add necessary functionality to db task manager & UT
  • Loading branch information
wxing1292 authored Dec 20, 2021
1 parent c80744d commit e61e8fe
Show file tree
Hide file tree
Showing 10 changed files with 809 additions and 89 deletions.
230 changes: 162 additions & 68 deletions service/matching/db_task_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"time"

enumspb "go.temporal.io/api/enums/v1"
"go.temporal.io/api/serviceerror"

enumsspb "go.temporal.io/server/api/enums/v1"
persistencespb "go.temporal.io/server/api/persistence/v1"
Expand All @@ -53,21 +54,34 @@ const (
dbTaskUpdateQueueInterval = time.Minute
)

var (
// errDBTaskManagerNotReady is the Unavailable error carried by the ready
// future that BufferAndWriteTask returns when startupChan has not been
// closed yet, or when it has been closed but the manager is already stopped.
errDBTaskManagerNotReady = serviceerror.NewUnavailable("dbTaskManager is not ready")
)

type (
taskQueueOwnershipProviderFn func() dbTaskQueueOwnership
taskReaderProviderFn func(ownership dbTaskQueueOwnership) dbTaskReader
taskWriterProviderFn func(ownership dbTaskQueueOwnership) dbTaskWriter

dbTaskManager struct {
status int32
taskQueueKey persistence.TaskQueueKey
store persistence.TaskManager
taskQueueOwnership *dbTaskQueueOwnershipImpl
taskReader *dbTaskWriter
taskWriter *dbTaskReader

dispatchTaskFn func(context.Context, *internalTask) error
finishTaskFn func(*persistencespb.AllocatedTaskInfo, error)
logger log.Logger

shutdownChan chan struct{}
dispatchChan chan struct{}
status int32
taskQueueKey persistence.TaskQueueKey
taskQueueKind enumspb.TaskQueueKind
taskIDRangeSize int64
taskQueueOwnershipProvider taskQueueOwnershipProviderFn
taskReaderProvider taskReaderProviderFn
taskWriterProvider taskWriterProviderFn
dispatchTaskFn func(context.Context, *internalTask) error
store persistence.TaskManager
logger log.Logger

dispatchChan chan struct{}
startupChan chan struct{}
shutdownChan chan struct{}

taskQueueOwnership dbTaskQueueOwnership
taskReader dbTaskReader
taskWriter dbTaskWriter
maxDeletedTaskIDInclusive int64 // in mem only
}
)
Expand All @@ -76,46 +90,52 @@ func newDBTaskManager(
taskQueueKey persistence.TaskQueueKey,
taskQueueKind enumspb.TaskQueueKind,
taskIDRangeSize int64,
dispatchTaskFn func(context.Context, *internalTask) error,
store persistence.TaskManager,
logger log.Logger,
dispatchTaskFn func(context.Context, *internalTask) error,
finishTaskFn func(*persistencespb.AllocatedTaskInfo, error),
) (*dbTaskManager, error) {
taskOwnership := newDBTaskQueueOwnership(
taskQueueKey,
taskQueueKind,
taskIDRangeSize,
store,
logger,
)
if err := taskOwnership.takeTaskQueueOwnership(); err != nil {
return nil, err
}

) *dbTaskManager {
return &dbTaskManager{
status: common.DaemonStatusInitialized,
taskQueueKey: taskQueueKey,
store: store,
taskQueueOwnership: taskOwnership,
taskReader: newDBTaskWriter(
taskQueueKey,
taskOwnership,
logger,
),
taskWriter: newDBTaskReader(
taskQueueKey,
store,
taskOwnership.getAckedTaskID(),
logger,
),
status: common.DaemonStatusInitialized,
taskQueueKey: taskQueueKey,
taskQueueKind: taskQueueKind,
taskIDRangeSize: taskIDRangeSize,
taskQueueOwnershipProvider: func() dbTaskQueueOwnership {
return newDBTaskQueueOwnership(
taskQueueKey,
taskQueueKind,
taskIDRangeSize,
store,
logger,
)
},
taskReaderProvider: func(taskQueueOwnership dbTaskQueueOwnership) dbTaskReader {
return newDBTaskReader(
taskQueueKey,
store,
taskQueueOwnership.getAckedTaskID(),
logger,
)
},
taskWriterProvider: func(taskQueueOwnership dbTaskQueueOwnership) dbTaskWriter {
return newDBTaskWriter(
taskQueueKey,
taskQueueOwnership,
logger,
)
},
dispatchTaskFn: dispatchTaskFn,
finishTaskFn: finishTaskFn,
store: store,
logger: logger,

shutdownChan: make(chan struct{}),
dispatchChan: make(chan struct{}, 1),
maxDeletedTaskIDInclusive: taskOwnership.getAckedTaskID(),
}, nil
dispatchChan: make(chan struct{}, 1),
startupChan: make(chan struct{}),
shutdownChan: make(chan struct{}),

taskQueueOwnership: nil,
taskWriter: nil,
taskReader: nil,
maxDeletedTaskIDInclusive: 0,
}
}

func (d *dbTaskManager) Start() {
Expand All @@ -127,9 +147,10 @@ func (d *dbTaskManager) Start() {
return
}

d.SignalDispatch()
go d.readerEventLoop()
d.signalDispatch()
go d.acquireLoop()
go d.writerEventLoop()
go d.readerEventLoop()
}

func (d *dbTaskManager) Stop() {
Expand All @@ -144,18 +165,30 @@ func (d *dbTaskManager) Stop() {
close(d.shutdownChan)
}

// SignalDispatch requests a dispatch pass without ever blocking the caller.
// The request is dropped when dispatchChan cannot accept another event,
// because a pending event already guarantees a future dispatch pass.
func (d *dbTaskManager) SignalDispatch() {
	var event struct{}
	select {
	case d.dispatchChan <- event:
		// Event queued for the dispatch loop.
		return
	default:
		// An event is already waiting in the channel; nothing more to do.
		return
	}
}

// isStopped reports whether the manager's status, read atomically, equals
// common.DaemonStatusStopped.
func (d *dbTaskManager) isStopped() bool {
	current := atomic.LoadInt32(&d.status)
	return current == common.DaemonStatusStopped
}

// acquireLoop keeps trying to take ownership of the task queue until it
// succeeds, hits a non-transient persistence error, or the manager is stopped.
//
// startupChan is closed when the loop exits, whatever the outcome, which
// releases the goroutines that block on it (the reader and writer event
// loops). Those callers must therefore check isStopped / the ownership fields
// rather than assume that a closed startupChan means success.
//
// A non-transient error is logged and stops the manager. Transient errors are
// retried after a fixed backoff; the backoff wait is interrupted by
// shutdownChan so that Stop() is not held up for the full retry interval.
func (d *dbTaskManager) acquireLoop() {
	const retryInterval = 2 * time.Second

	defer close(d.startupChan)

	for !d.isStopped() {
		err := d.acquireOwnership()
		if err == nil {
			return
		}
		if !common.IsPersistenceTransientError(err) {
			// Retrying cannot help here; record why the manager is going down.
			d.logger.Error("dbTaskManager unable to acquire task queue ownership", tag.Error(err))
			d.Stop()
			return
		}

		// Back off before the next attempt, but give up right away on shutdown.
		timer := time.NewTimer(retryInterval)
		select {
		case <-timer.C:
		case <-d.shutdownChan:
			timer.Stop()
			return
		}
	}
}

func (d *dbTaskManager) writerEventLoop() {
<-d.startupChan

updateQueueTicker := time.NewTicker(dbTaskUpdateQueueInterval)
defer updateQueueTicker.Stop()
// TODO we should impl a more efficient method to
Expand All @@ -178,16 +211,18 @@ func (d *dbTaskManager) writerEventLoop() {
case <-updateQueueTicker.C:
d.persistTaskQueue()
case <-flushTicker.C:
d.taskReader.flushTasks()
d.SignalDispatch()
case <-d.taskReader.notifyFlushChan():
d.taskReader.flushTasks()
d.SignalDispatch()
d.taskWriter.flushTasks()
d.signalDispatch()
case <-d.taskWriter.notifyFlushChan():
d.taskWriter.flushTasks()
d.signalDispatch()
}
}
}

func (d *dbTaskManager) readerEventLoop() {
<-d.startupChan

updateAckTicker := time.NewTicker(dbTaskUpdateAckInterval)
defer updateAckTicker.Stop()

Expand Down Expand Up @@ -215,19 +250,46 @@ func (d *dbTaskManager) readerEventLoop() {
}
}

func (d *dbTaskManager) bufferAndWriteTask(
// acquireOwnership makes a single attempt to take ownership of the task queue.
//
// A fresh ownership object is obtained from taskQueueOwnershipProvider. If
// taking ownership fails, the error is returned and no field of d is modified.
// On success the reader and writer are built from their providers, the
// in-memory max deleted task ID is seeded from the acked task ID, and the
// ownership object is stored on the manager.
func (d *dbTaskManager) acquireOwnership() error {
	ownership := d.taskQueueOwnershipProvider()

	err := ownership.takeTaskQueueOwnership()
	if err != nil {
		return err
	}

	// Only publish state once ownership is actually held.
	d.taskReader = d.taskReaderProvider(ownership)
	d.taskWriter = d.taskWriterProvider(ownership)
	d.maxDeletedTaskIDInclusive = ownership.getAckedTaskID()
	d.taskQueueOwnership = ownership
	return nil
}

// signalDispatch asks for a dispatch pass without blocking. When dispatchChan
// cannot take another event the request is simply dropped: the event already
// sitting in the channel will trigger the pass.
func (d *dbTaskManager) signalDispatch() {
	var signal struct{}
	select {
	case d.dispatchChan <- signal:
		// Signal queued.
	default:
		// A signal is already pending; do not block the caller.
	}
}

func (d *dbTaskManager) BufferAndWriteTask(
task *persistencespb.TaskInfo,
) future.Future {
return d.taskReader.appendTask(task)
select {
case <-d.startupChan:
if d.isStopped() {
return future.NewReadyFuture(nil, errDBTaskManagerNotReady)
}
return d.taskWriter.appendTask(task)
default:
return future.NewReadyFuture(nil, errDBTaskManagerNotReady)
}
}

func (d *dbTaskManager) readAndDispatchTasks() {
iter := d.taskWriter.taskIterator(d.taskQueueOwnership.getLastAllocatedTaskID())
iter := d.taskReader.taskIterator(d.taskQueueOwnership.getLastAllocatedTaskID())
for iter.HasNext() {
item, err := iter.Next()
if err != nil {
d.logger.Error("dbTaskManager encountered error when fetching tasks", tag.Error(err))
d.SignalDispatch()
d.signalDispatch()
return
}

Expand All @@ -241,13 +303,13 @@ func (d *dbTaskManager) mustDispatch(
) {
for !d.isStopped() {
if taskqueue.IsTaskExpired(task) {
d.taskWriter.ackTask(task.TaskId)
d.taskReader.ackTask(task.TaskId)
return
}

err := d.dispatchTaskFn(context.Background(), newInternalTask(
task,
d.finishTaskFn,
d.finishTask,
enumsspb.TASK_SOURCE_DB_BACKLOG,
"",
false,
Expand All @@ -260,7 +322,7 @@ func (d *dbTaskManager) mustDispatch(
}

func (d *dbTaskManager) updateAckTaskID() {
ackedTaskID := d.taskWriter.moveAckedTaskID()
ackedTaskID := d.taskReader.moveAckedTaskID()
d.taskQueueOwnership.updateAckedTaskID(ackedTaskID)
}

Expand Down Expand Up @@ -289,3 +351,35 @@ func (d *dbTaskManager) persistTaskQueue() {
d.logger.Error("dbTaskManager encountered unknown error", tag.Error(err))
}
}

// finishTask is the completion callback attached to every task dispatched
// from the DB backlog.
//
// With a nil err the task is acked on the reader. With a non-nil err the task
// payload is first written back through BufferAndWriteTask; it is acked only
// after that write succeeds. If the write-back fails, the failure is logged,
// the manager is stopped, and the task is left un-acked.
func (d *dbTaskManager) finishTask(
	info *persistencespb.AllocatedTaskInfo,
	err error,
) {
	if err != nil {
		// TODO @wxing1292 logic below is subject to discussion
		// NOTE: this is the legacy behavior, which moves a task that failed
		// to the end of the queue so it can be retried later.
		//
		// The task failed to start. Deleting it from persistence would lose
		// it, so instead it is re-written with a higher task ID. Tasks queued
		// behind it can then make progress, and hopefully the cause of the
		// failure is gone by the time the task is picked up again.
		// RecordTaskStarted only fails after retrying for a long time, so a
		// single task will not be re-written to persistence frequently.
		_, rewriteErr := d.BufferAndWriteTask(info.Data).Get(context.Background())
		if rewriteErr != nil {
			d.logger.Error(
				"dbTaskManager encountered error when moving task to end of task queue",
				tag.Error(rewriteErr),
				tag.WorkflowTaskQueueName(d.taskQueueKey.TaskQueueName),
				tag.WorkflowTaskQueueType(d.taskQueueKey.TaskQueueType),
			)
			d.Stop()
			return
		}
	}

	// Either the task succeeded or its replacement is safely persisted.
	d.taskReader.ackTask(info.TaskId)
}
Loading

0 comments on commit e61e8fe

Please sign in to comment.