Retry after sdk post-Start errors in per-namespace worker (#3385)
dnr authored Sep 15, 2022
1 parent 6494f88 commit 5742408
Showing 1 changed file with 156 additions and 86 deletions.
242 changes: 156 additions & 86 deletions service/worker/pernamespaceworker.go
@@ -26,13 +26,15 @@ package worker

import (
	"context"
	"errors"
	"fmt"
	"os"
	"sync"
	"sync/atomic"
	"time"

	enumspb "go.temporal.io/api/enums/v1"
	"go.temporal.io/api/serviceerror"
	sdkclient "go.temporal.io/sdk/client"
	sdkworker "go.temporal.io/sdk/worker"
	"go.uber.org/fx"
@@ -89,16 +91,23 @@ type (
	}

	perNamespaceWorker struct {
		wm     *perNamespaceWorkerManager
		logger log.Logger

		lock         sync.Mutex // protects below fields
		ns           *namespace.Namespace
		retrier      backoff.Retrier
		retryTimer   *time.Timer
		componentSet string
		client       sdkclient.Client
		worker       sdkworker.Worker
	}
)

var (
	errNoWorkerNeeded = errors.New("no worker needed") // sentinel value, not a real error
)

func NewPerNamespaceWorkerManager(params perNamespaceWorkerManagerInitParams) *perNamespaceWorkerManager {
	return &perNamespaceWorkerManager{
		logger: log.With(params.Logger, tag.ComponentPerNSWorkerManager),
@@ -165,7 +174,7 @@ func (wm *perNamespaceWorkerManager) Stop() {
	wm.lock.Unlock()

	for _, worker := range workers {
		worker.stopWorkerAndResetTimer()
	}

	wm.logger.Info("", tag.LifeCycleStopped)
@@ -194,8 +203,10 @@ func (wm *perNamespaceWorkerManager) getWorkerByNamespace(ns *namespace.Namespac
	}

	worker := &perNamespaceWorker{
		wm:      wm,
		logger:  log.With(wm.logger, tag.WorkflowNamespace(ns.Name().String())),
		retrier: backoff.NewRetrier(backoff.NewExponentialRetryPolicy(wm.initialRetry), backoff.SystemClock),
		ns:      ns,
	}

	wm.workers[ns.ID()] = worker
@@ -228,105 +239,138 @@ func (wm *perNamespaceWorkerManager) getWorkerMultiplicity(ns *namespace.Namespa
	return multiplicity, nil
}

// called on namespace state change callback
func (w *perNamespaceWorker) refreshWithNewNamespace(ns *namespace.Namespace, deleted bool) {
	w.lock.Lock()
	w.ns = ns
	// namespace name can change, but don't update w.logger, otherwise we'd have to hold w.lock
	// just to log.
	isRetrying := w.retryTimer != nil
	w.lock.Unlock()

	if deleted {
		w.stopWorkerAndResetTimer()
		// if namespace is fully deleted from db, we can remove from our map also
		w.wm.removeWorker(ns)
		return
	}

	if !isRetrying {
		w.refresh(ns)
	}
}

// called on all namespaces on membership change in worker ring
func (w *perNamespaceWorker) refreshWithExistingNamespace() {
	w.lock.Lock()
	ns := w.ns
	isRetrying := w.retryTimer != nil
	w.lock.Unlock()

	if !isRetrying {
		w.refresh(ns)
	}
}
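
// Note: both refresh callbacks above skip the refresh while a retry timer is
// pending (isRetrying): the timer callback re-reads w.ns and calls refresh
// itself, so a namespace update is picked up without racing the scheduled retry.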

// This is called after change to this namespace state _or_ any membership change in the server
// worker ring. It runs in its own goroutine (except for server shutdown), and multiple
// goroutines for the same namespace may be running at once. That's okay because they should
// eventually converge on the same state (running or not running, set of components) and exit.
func (w *perNamespaceWorker) refresh(ns *namespace.Namespace) {
	w.handleError(w.tryRefresh(ns))
}

// handleError should be called on errors from worker creation or run. it will attempt to
// refresh the worker again at a later time.
func (w *perNamespaceWorker) handleError(err error) {
	if err == nil {
		return
	} else if err == errNoWorkerNeeded {
		w.stopWorkerAndResetTimer()
		return
	}

	w.lock.Lock()
	defer w.lock.Unlock()

	if w.retryTimer != nil {
		// this shouldn't ever happen
		w.logger.Error("bug: handleError found existing timer")
		return
	}
	sleep := w.retrier.NextBackOff()
	if sleep < 0 {
		w.logger.Error("Failed to start sdk worker, out of retries", tag.Error(err))
		return
	}
	w.logger.Warn("Failed to start sdk worker", tag.Error(err), tag.NewDurationTag("sleep", sleep))
	w.retryTimer = time.AfterFunc(sleep, func() {
		w.lock.Lock()
		w.retryTimer = nil
		ns := w.ns
		w.lock.Unlock()
		w.refresh(ns)
	})
}
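
// NextBackOff returns a negative duration once the retry policy is exhausted,
// which is what the sleep < 0 check above relies on; stopWorkerAndResetTimer
// resets the retrier so the next failure starts the backoff sequence fresh.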

// Only call from refresh so that errors are handled properly. Returning an error from here
// means that we should retry creating/starting the worker. Returning errNoWorkerNeeded means any
// existing worker should be stopped.
func (w *perNamespaceWorker) tryRefresh(ns *namespace.Namespace) error {
	if !w.wm.Running() || ns.State() == enumspb.NAMESPACE_STATE_DELETED {
		return errNoWorkerNeeded
	}

	// figure out which components are enabled at all for this namespace
	var enabledComponents []workercommon.PerNSWorkerComponent
	var componentSet string
	for _, cmp := range w.wm.components {
		options := cmp.DedicatedWorkerOptions(ns)
		if options.Enabled {
			enabledComponents = append(enabledComponents, cmp)
			componentSet += fmt.Sprintf("%p,", cmp)
		}
	}

	if len(enabledComponents) == 0 {
		// no components enabled, we don't need a worker
		return errNoWorkerNeeded
	}

	// check if we are responsible for this namespace at all
	multiplicity, err := w.wm.getWorkerMultiplicity(ns)
	if err != nil {
		w.logger.Error("Failed to look up hosts", tag.Error(err))
		// TODO: add metric also
		return err
	}
	if multiplicity == 0 {
		// not ours, don't need a worker
		return errNoWorkerNeeded
	}
	// ensure this changes if multiplicity changes
	componentSet += fmt.Sprintf("%d", multiplicity)

	// we do need a worker, but maybe we have one already
	w.lock.Lock()
	defer w.lock.Unlock()

	if componentSet == w.componentSet {
		// no change in set of components enabled, leave existing running
		return nil
	}
	// set of components changed, need to recreate worker. first stop old one
	w.stopWorkerLocked()

	// create new one. note that even before startWorker returns, the worker may have started
	// and already called the fatal error handler. we need to set w.client+worker+componentSet
	// before releasing the lock to keep our state consistent.
	client, worker, err := w.startWorker(ns, enabledComponents, multiplicity)
	if err != nil {
		// TODO: add metric also
		return err
	}

	w.client = client
	w.worker = worker
	w.componentSet = componentSet
	return nil
}
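
// componentSet is a cheap change-detection key: the %p address of each enabled
// component plus the multiplicity. If the concatenated string matches what is
// already recorded, the running worker is already in the desired state.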

func (w *perNamespaceWorker) startWorker(
@@ -335,6 +379,7 @@ func (w *perNamespaceWorker) startWorker(
	multiplicity int,
) (sdkclient.Client, sdkworker.Worker, error) {
	nsName := ns.Name().String()
	// this should not block because it uses an existing grpc connection
	client := w.wm.sdkClientFactory.NewClient(sdkclient.Options{
		Namespace:     nsName,
		DataConverter: sdk.PreferProtoDataConverter,
@@ -347,19 +392,15 @@
	// other defaults are already large enough.
	sdkoptions.MaxConcurrentWorkflowTaskPollers = 2 * multiplicity
	sdkoptions.MaxConcurrentActivityTaskPollers = 2 * multiplicity
	sdkoptions.OnFatalError = w.onFatalError

	// this should not block because the client already has server capabilities
	sdkworker := w.wm.sdkWorkerFactory.New(client, primitives.PerNSWorkerTaskQueue, sdkoptions)
	for _, cmp := range components {
		cmp.Register(sdkworker, ns)
	}

	// this blocks by calling DescribeNamespace a few times (with a 10s timeout)
	err := sdkworker.Start()
	if err != nil {
		client.Close()
@@ -369,10 +410,39 @@
	return client, sdkworker, nil
}
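
// An error from Start is returned up through tryRefresh to handleError, which
// schedules a retry; errors after a successful Start arrive via the
// OnFatalError callback below instead.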

func (w *perNamespaceWorker) onFatalError(err error) {
	// clean up worker and client
	w.stopWorker()

	switch err.(type) {
	case *serviceerror.NamespaceNotFound:
		// if this is a NamespaceNotFound, we should retry
		w.handleError(err)
	default:
		// other sdk fatal errors:
		// serviceerror.InvalidArgument
		// serviceerror.ClientVersionNotSupported
		w.logger.Error("sdk worker got non-retryable error, not restarting", tag.Error(err))
	}
}
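
// Funneling the retryable case through handleError reuses the same
// backoff/retry path as a failed initial Start, which is the point of this
// change: a post-Start fatal error no longer leaves the namespace without a
// worker.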

func (w *perNamespaceWorker) stopWorker() {
	w.lock.Lock()
	defer w.lock.Unlock()

	w.stopWorkerLocked()
}

func (w *perNamespaceWorker) stopWorkerAndResetTimer() {
	w.lock.Lock()
	defer w.lock.Unlock()

	w.stopWorkerLocked()
	w.retrier.Reset()
	if w.retryTimer != nil {
		w.retryTimer.Stop()
		w.retryTimer = nil
	}
}

func (w *perNamespaceWorker) stopWorkerLocked() {
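
For reference, here is a minimal standalone sketch of the retry shape this commit introduces, using only the standard library. The names (retryingWorker, the always-failing tryRefresh stub, the doubling backoff) are invented for illustration; the real code uses Temporal's internal backoff.Retrier with a capped exponential policy and tracks more state under the lock.

package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

var errNoWorkerNeeded = errors.New("no worker needed") // sentinel, not a real error

type retryingWorker struct {
	mu         sync.Mutex
	retryTimer *time.Timer
	backoff    time.Duration
}

func (w *retryingWorker) refresh() {
	w.handleError(w.tryRefresh())
}

// tryRefresh stands in for the real create/start logic; here it always fails.
func (w *retryingWorker) tryRefresh() error {
	return errors.New("start failed")
}

func (w *retryingWorker) handleError(err error) {
	if err == nil || errors.Is(err, errNoWorkerNeeded) {
		return
	}
	w.mu.Lock()
	defer w.mu.Unlock()
	if w.retryTimer != nil {
		return // a retry is already scheduled
	}
	sleep := w.backoff
	w.backoff *= 2 // crude doubling; the real policy caps the interval
	fmt.Println("retrying in", sleep)
	w.retryTimer = time.AfterFunc(sleep, func() {
		w.mu.Lock()
		w.retryTimer = nil // let the next failure schedule another retry
		w.mu.Unlock()
		w.refresh()
	})
}

func main() {
	w := &retryingWorker{backoff: 10 * time.Millisecond}
	w.refresh()
	time.Sleep(100 * time.Millisecond) // let a few retries fire
}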
