Skip to content

Commit

Permalink
chore(queues): added worker options
Browse files Browse the repository at this point in the history
  • Loading branch information
Yousuf Jawwad committed Aug 13, 2024
1 parent a85b5e6 commit 734b9e3
Show file tree
Hide file tree
Showing 4 changed files with 285 additions and 3 deletions.
18 changes: 18 additions & 0 deletions queues/defs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package queues

type (
WorkflowSignal string
)

func (s WorkflowSignal) String() string {
return string(s)
}

func (s WorkflowSignal) MarshalJSON() ([]byte, error) {
return []byte(s.String()), nil
}

func (s *WorkflowSignal) UnmarshalJSON(data []byte) error {
*s = WorkflowSignal(data)
return nil
}
7 changes: 4 additions & 3 deletions queues/queues.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ type (
SignalExternalWorkflow(ctx workflow.Context, options workflows.Options, signal string, payload any) (WorkflowFuture, error)

// CreateWorker creates a worker against the queue.
CreateWorker() worker.Worker
CreateWorker(opts ...WorkerOption) worker.Worker
}

// QueueOption is the option for a queue.
Expand Down Expand Up @@ -275,8 +275,9 @@ func (q *queue) RetryPolicy(opts workflows.Options) *temporal.RetryPolicy {
return &temporal.RetryPolicy{MaximumAttempts: attempts}
}

func (q *queue) CreateWorker() worker.Worker {
options := worker.Options{OnFatalError: func(err error) { panic(err) }}
func (q *queue) CreateWorker(opts ...WorkerOption) worker.Worker {
options := NewWorkerOptions(opts...)

return worker.New(q.client, q.Name().String(), options)
}

Expand Down
261 changes: 261 additions & 0 deletions queues/worker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,261 @@
package queues

import (
"context"
"time"

"go.temporal.io/sdk/interceptor"
"go.temporal.io/sdk/worker"
)

type (
// WorkerOption is a function that configures a worker.Options struct.
WorkerOption func(*worker.Options)
)

// WithWorkerOptionMaxConcurrentActivityExecutionSize sets the maximum concurrent activity executions this worker can have.
// The zero value of this uses the default value (1000).
func WithWorkerOptionMaxConcurrentActivityExecutionSize(size int) WorkerOption {
return func(o *worker.Options) {
o.MaxConcurrentActivityExecutionSize = size
}
}

// WithWorkerOptionWorkerActivitiesPerSecond sets the rate limiting on number of activities that can be executed per second per worker.
// This can be used to limit resources used by the worker. The zero value of this uses the default value (100,000).
func WithWorkerOptionWorkerActivitiesPerSecond(rate float64) WorkerOption {
return func(o *worker.Options) {
o.WorkerActivitiesPerSecond = rate
}
}

// WithWorkerOptionMaxConcurrentLocalActivityExecutionSize sets the maximum concurrent local activity executions this worker can have.
// The zero value of this uses the default value (1000).
func WithWorkerOptionMaxConcurrentLocalActivityExecutionSize(size int) WorkerOption {
return func(o *worker.Options) {
o.MaxConcurrentLocalActivityExecutionSize = size
}
}

// WithWorkerOptionWorkerLocalActivitiesPerSecond sets the rate limiting on number of local
// activities that can be executed per second per worker.
//
// This can be used to limit resources used by the worker. The zero value of this uses the default value (100,000).
func WithWorkerOptionWorkerLocalActivitiesPerSecond(rate float64) WorkerOption {
return func(o *worker.Options) {
o.WorkerLocalActivitiesPerSecond = rate
}
}

// WithWorkerOptionTaskQueueActivitiesPerSecond sets the rate limiting on number of
// activities that can be executed per second for the entire task queue.
//
// This is managed by the server. The zero value of this uses the default value (100,000).
func WithWorkerOptionTaskQueueActivitiesPerSecond(rate float64) WorkerOption {
return func(o *worker.Options) {
o.TaskQueueActivitiesPerSecond = rate
}
}

// WithWorkerOptionMaxConcurrentActivityTaskPollers sets the maximum number of goroutines that will concurrently poll the
// temporal-server to retrieve activity tasks. The default value is 2.
func WithWorkerOptionMaxConcurrentActivityTaskPollers(count int) WorkerOption {
return func(o *worker.Options) {
o.MaxConcurrentActivityTaskPollers = count
}
}

// WithWorkerOptionMaxConcurrentWorkflowTaskExecutionSize sets the maximum concurrent workflow task executions this worker can have.
// The zero value of this uses the default value (1000). This value cannot be 1.
func WithWorkerOptionMaxConcurrentWorkflowTaskExecutionSize(size int) WorkerOption {
return func(o *worker.Options) {
o.MaxConcurrentWorkflowTaskExecutionSize = size
}
}

// WithWorkerOptionMaxConcurrentWorkflowTaskPollers sets the maximum number of goroutines that will concurrently poll the
// temporal-server to retrieve workflow tasks. The default value is 2. This value cannot be 1.
func WithWorkerOptionMaxConcurrentWorkflowTaskPollers(count int) WorkerOption {
return func(o *worker.Options) {
o.MaxConcurrentWorkflowTaskPollers = count
}
}

// WithWorkerOptionEnableLoggingInReplay enables logging in replay mode. This is only useful for debugging purposes.
// The default value is false.
func WithWorkerOptionEnableLoggingInReplay(enable bool) WorkerOption {
return func(o *worker.Options) {
o.EnableLoggingInReplay = enable
}
}

// WithWorkerOptionStickyScheduleToStartTimeout sets the sticky schedule to start timeout.
// The default value is 5 seconds.
func WithWorkerOptionStickyScheduleToStartTimeout(timeout time.Duration) WorkerOption {
return func(o *worker.Options) {
o.StickyScheduleToStartTimeout = timeout
}
}

// WithWorkerOptionBackgroundActivityContext sets the root context for all activities.
func WithWorkerOptionBackgroundActivityContext(ctx context.Context) WorkerOption {
return func(o *worker.Options) {
o.BackgroundActivityContext = ctx
}
}

// WithWorkerOptionWorkflowPanicPolicy sets how the workflow worker deals with non-deterministic history events and panics.
// The default value is BlockWorkflow.
func WithWorkerOptionWorkflowPanicPolicy(policy worker.WorkflowPanicPolicy) WorkerOption {
return func(o *worker.Options) {
o.WorkflowPanicPolicy = policy
}
}

// WithWorkerOptionWorkerStopTimeout sets the worker graceful stop timeout.
// The default value is 0 seconds.
func WithWorkerOptionWorkerStopTimeout(timeout time.Duration) WorkerOption {
return func(o *worker.Options) {
o.WorkerStopTimeout = timeout
}
}

// WithWorkerOptionEnableSessionWorker enables running session workers for activities within a session.
// The default value is false.
func WithWorkerOptionEnableSessionWorker(enable bool) WorkerOption {
return func(o *worker.Options) {
o.EnableSessionWorker = enable
}
}

// WithWorkerOptionMaxConcurrentSessionExecutionSize sets the maximum number of concurrently running sessions the resource supports.
// The default value is 1000.
func WithWorkerOptionMaxConcurrentSessionExecutionSize(size int) WorkerOption {
return func(o *worker.Options) {
o.MaxConcurrentSessionExecutionSize = size
}
}

// WithWorkerOptionDisableWorkflowWorker disables the workflow worker for this worker.
// The default value is false.
func WithWorkerOptionDisableWorkflowWorker(disable bool) WorkerOption {
return func(o *worker.Options) {
o.DisableWorkflowWorker = disable
}
}

// WithWorkerOptionLocalActivityWorkerOnly sets the worker to only handle workflow tasks and local activities.
// The default value is false.
func WithWorkerOptionLocalActivityWorkerOnly(localOnly bool) WorkerOption {
return func(o *worker.Options) {
o.LocalActivityWorkerOnly = localOnly
}
}

// WithWorkerOptionIdentity sets the identity for the worker, overwriting the client-level Identity value.
func WithWorkerOptionIdentity(identity string) WorkerOption {
return func(o *worker.Options) {
o.Identity = identity
}
}

// WithWorkerOptionDeadlockDetectionTimeout sets the maximum amount of time that a workflow task will be allowed to run.
// The default value is 1 second.
func WithWorkerOptionDeadlockDetectionTimeout(timeout time.Duration) WorkerOption {
return func(o *worker.Options) {
o.DeadlockDetectionTimeout = timeout
}
}

// WithWorkerOptionMaxHeartbeatThrottleInterval sets the maximum amount of time between sending each pending heartbeat to the server.
// The default value is 60 seconds.
func WithWorkerOptionMaxHeartbeatThrottleInterval(interval time.Duration) WorkerOption {
return func(o *worker.Options) {
o.MaxHeartbeatThrottleInterval = interval
}
}

// WithWorkerOptionDefaultHeartbeatThrottleInterval sets the default amount of time between sending each pending heartbeat to the server.
// The default value is 30 seconds.
func WithWorkerOptionDefaultHeartbeatThrottleInterval(interval time.Duration) WorkerOption {
return func(o *worker.Options) {
o.DefaultHeartbeatThrottleInterval = interval
}
}

// WithWorkerOptionInterceptors sets the interceptors to apply to the worker.
func WithWorkerOptionInterceptors(interceptors []interceptor.WorkerInterceptor) WorkerOption {
return func(o *worker.Options) {
o.Interceptors = interceptors
}
}

// WithWorkerOptionOnFatalError sets the callback invoked on fatal error.
func WithWorkerOptionOnFatalError(fn func(error)) WorkerOption {
return func(o *worker.Options) {
o.OnFatalError = fn
}
}

// WithWorkerOptionDisableEagerActivities disables eager activities.
// The default value is false.
func WithWorkerOptionDisableEagerActivities(disable bool) WorkerOption {
return func(o *worker.Options) {
o.DisableEagerActivities = disable
}
}

// WithWorkerOptionMaxConcurrentEagerActivityExecutionSize sets the maximum number of eager activities that can be running.
// The default value of 0 means unlimited.
func WithWorkerOptionMaxConcurrentEagerActivityExecutionSize(size int) WorkerOption {
return func(o *worker.Options) {
o.MaxConcurrentEagerActivityExecutionSize = size
}
}

// WithWorkerOptionDisableRegistrationAliasing disables allowing workflow and activity functions registered with custom names
// from being called with their function references.
// The default value is false.
func WithWorkerOptionDisableRegistrationAliasing(disable bool) WorkerOption {
return func(o *worker.Options) {
o.DisableRegistrationAliasing = disable
}
}

// WithWorkerOptionBuildID assigns a BuildID to this worker.
// NOTE: Experimental.
func WithWorkerOptionBuildID(buildID string) WorkerOption {
return func(o *worker.Options) {
o.BuildID = buildID
}
}

// WithWorkerOptionUseBuildIDForVersioning opts this worker into the Worker Versioning feature.
// NOTE: Experimental.
func WithWorkerOptionUseBuildIDForVersioning(use bool) WorkerOption {
return func(o *worker.Options) {
o.UseBuildIDForVersioning = use
}
}

// NewWorkerOptions creates a new worker.Options struct with the given options applied.
//
// Example usage:
// workerOptions := NewWorkerOptions(
// WithWorkerOptionOnFatalError(func(err error) {
// // Handle fatal error
// }),
// WithWorkerOptionDisableEagerActivities(true),
// WithWorkerOptionMaxConcurrentEagerActivityExecutionSize(10),
// WithWorkerOptionDisableRegistrationAliasing(false),
// WithWorkerOptionBuildID("my-build-id"),
// WithWorkerOptionUseBuildIDForVersioning(true),
// )
func NewWorkerOptions(opts ...WorkerOption) worker.Options {
options := worker.Options{}
for _, opt := range opts {
opt(&options)
}

return options
}
2 changes: 2 additions & 0 deletions workflows/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,8 @@ func WithIgnoredError(err string) Option {
// WithModifierID("789"),
// // Sometimes we need to over-ride max attempts. The default is workflows.RetryForever.
// WithMaxAttempts(3),
// // If we want to ignore an error, we can do so.
// WithIgnoredError("SomeError"),
// )
//
// id := opts.ID()
Expand Down

0 comments on commit 734b9e3

Please sign in to comment.