From 3f3b559a083e7524ea69b8085bd020747f2d3dc0 Mon Sep 17 00:00:00 2001 From: David Juhasz Date: Wed, 29 Nov 2023 17:46:45 -0800 Subject: [PATCH] Add worker capacity config option Fixes #793. Add a configuration option for the a3m and am workers to allow setting the maximum concurrent sessions the worker can handle, which effectively throttles the number of transfers the worker can process concurrently. --- cmd/enduro-a3m-worker/main.go | 4 +++- cmd/enduro-am-worker/main.go | 4 +++- enduro.toml | 8 ++++++++ internal/a3m/config.go | 5 +++++ internal/am/config.go | 4 ++++ internal/config/config.go | 8 +++++--- internal/config/config_test.go | 6 ++++-- 7 files changed, 32 insertions(+), 7 deletions(-) diff --git a/cmd/enduro-a3m-worker/main.go b/cmd/enduro-a3m-worker/main.go index f118f494a..61c03f417 100644 --- a/cmd/enduro-a3m-worker/main.go +++ b/cmd/enduro-a3m-worker/main.go @@ -120,11 +120,13 @@ func main() { // Activity worker. { + logger.V(1).Info("a3m worker config", "capacity", cfg.A3m.Capacity) + done := make(chan struct{}) workerOpts := temporalsdk_worker.Options{ DisableWorkflowWorker: true, EnableSessionWorker: true, - MaxConcurrentSessionExecutionSize: 1000, + MaxConcurrentSessionExecutionSize: cfg.A3m.Capacity, MaxConcurrentActivityExecutionSize: 1, } w := temporalsdk_worker.New(temporalClient, temporal.A3mWorkerTaskQueue, workerOpts) diff --git a/cmd/enduro-am-worker/main.go b/cmd/enduro-am-worker/main.go index 4503be22e..07bde6aad 100644 --- a/cmd/enduro-am-worker/main.go +++ b/cmd/enduro-am-worker/main.go @@ -103,11 +103,13 @@ func main() { // Activity worker. { + logger.V(1).Info("AM worker config", "capacity", cfg.AM.Capacity) + done := make(chan struct{}) workerOpts := temporalsdk_worker.Options{ DisableWorkflowWorker: true, EnableSessionWorker: true, - MaxConcurrentSessionExecutionSize: 1000, + MaxConcurrentSessionExecutionSize: cfg.AM.Capacity, MaxConcurrentActivityExecutionSize: 1, } w := temporalsdk_worker.New(temporalClient, temporal.AmWorkerTaskQueue, workerOpts) diff --git a/enduro.toml b/enduro.toml index 850c51de6..d9ba6a98b 100644 --- a/enduro.toml +++ b/enduro.toml @@ -71,6 +71,10 @@ taskqueue = "a3m" address = "127.0.0.1:7000" shareDir = "/home/a3m/.local/share/a3m/share" +# capacity limits the number of transfers a worker can process at one time +# (default: 1) +capacity = 1 + [a3m.processing] AssignUuidsToDirectories = true ExamineContents = true @@ -94,6 +98,10 @@ user = "" # Secret: set with env var ENDURO_AM_USER. apiKey = "" # Secret: set with env var ENDURO_AM_APIKEY. processingConfig = "automated" +# capacity limits the number of transfers a worker can process at one time +# (default: 1) +capacity = 1 + # pollInterval is the time to wait between AM polling requests in a string # format compatible with https://pkg.go.dev/time#ParseDuration (Default: 10s). pollInterval = "10s" diff --git a/internal/a3m/config.go b/internal/a3m/config.go index 6fb5f2d42..8dc28683b 100644 --- a/internal/a3m/config.go +++ b/internal/a3m/config.go @@ -6,6 +6,11 @@ type Config struct { Name string ShareDir string Address string + + // Capacity sets the maximum number of worker sessions the worker can + // handle at one time (default: 1). + Capacity int + Processing } diff --git a/internal/am/config.go b/internal/am/config.go index 6af2c8add..2e421025a 100644 --- a/internal/am/config.go +++ b/internal/am/config.go @@ -22,6 +22,10 @@ type Config struct { // SFTP configuration for uploading transfers to Archivematica. SFTP sftp.Config + // Capacity sets the maximum number of worker sessions the worker can + // handle at one time (default: 1). + Capacity int + // PollInterval is the time to wait between poll requests to the AM API. PollInterval time.Duration diff --git a/internal/config/config.go b/internal/config/config.go index be7fb7e6a..d84a71d13 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -65,13 +65,15 @@ func Read(config *Configuration, configFile string) (found bool, configFileUsed v.AddConfigPath("$HOME/.config/") v.AddConfigPath("/etc") v.SetConfigName("enduro") + v.SetDefault("a3m.capacity", 1) v.SetDefault("a3m.processing", a3m.ProcessingDefault) + v.SetDefault("am.capacity", 1) + v.SetDefault("am.pollInterval", 10*time.Second) + v.SetDefault("api.listen", "127.0.0.1:9000") + v.SetDefault("debugListen", "127.0.0.1:9001") v.SetDefault("preservation.taskqueue", temporal.A3mWorkerTaskQueue) v.SetDefault("storage.taskqueue", temporal.GlobalTaskQueue) v.SetDefault("temporal.taskqueue", temporal.GlobalTaskQueue) - v.SetDefault("debugListen", "127.0.0.1:9001") - v.SetDefault("api.listen", "127.0.0.1:9000") - v.SetDefault("am.pollInterval", 10*time.Second) v.SetEnvPrefix("enduro") v.SetEnvKeyReplacer(strings.NewReplacer(".", "_")) v.AutomaticEnv() diff --git a/internal/config/config_test.go b/internal/config/config_test.go index 74f52977c..1b1922dcf 100644 --- a/internal/config/config_test.go +++ b/internal/config/config_test.go @@ -54,12 +54,14 @@ func TestConfig(t *testing.T) { assert.Equal(t, c.Database.DSN, "") // Valued defaults. - assert.Equal(t, c.DebugListen, "127.0.0.1:9001") + assert.Equal(t, c.A3m.Capacity, 1) assert.Equal(t, c.A3m.Processing, a3m.ProcessingDefault) + assert.Equal(t, c.AM.Capacity, 1) + assert.Equal(t, c.AM.PollInterval, 10*time.Second) assert.Equal(t, c.API.Listen, "127.0.0.1:9000") + assert.Equal(t, c.DebugListen, "127.0.0.1:9001") assert.Equal(t, c.Preservation.TaskQueue, temporal.A3mWorkerTaskQueue) assert.Equal(t, c.Storage.TaskQueue, temporal.GlobalTaskQueue) assert.Equal(t, c.Temporal.TaskQueue, temporal.GlobalTaskQueue) - assert.Equal(t, c.AM.PollInterval, 10*time.Second) }) }