Skip to content

Commit

Permalink
Add worker capacity config option
Browse files Browse the repository at this point in the history
Fixes #793.

Add a configuration option for the a3m and am workers to allow setting
the maximum concurrent sessions the worker can handle, which
effectively throttles the number of transfers the worker can process
concurrently.
  • Loading branch information
djjuhasz committed Nov 30, 2023
1 parent 68f1e1e commit 3f3b559
Show file tree
Hide file tree
Showing 7 changed files with 32 additions and 7 deletions.
4 changes: 3 additions & 1 deletion cmd/enduro-a3m-worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,11 +120,13 @@ func main() {

// Activity worker.
{
logger.V(1).Info("a3m worker config", "capacity", cfg.A3m.Capacity)

done := make(chan struct{})
workerOpts := temporalsdk_worker.Options{
DisableWorkflowWorker: true,
EnableSessionWorker: true,
MaxConcurrentSessionExecutionSize: 1000,
MaxConcurrentSessionExecutionSize: cfg.A3m.Capacity,
MaxConcurrentActivityExecutionSize: 1,
}
w := temporalsdk_worker.New(temporalClient, temporal.A3mWorkerTaskQueue, workerOpts)
Expand Down
4 changes: 3 additions & 1 deletion cmd/enduro-am-worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,11 +103,13 @@ func main() {

// Activity worker.
{
logger.V(1).Info("AM worker config", "capacity", cfg.AM.Capacity)

done := make(chan struct{})
workerOpts := temporalsdk_worker.Options{
DisableWorkflowWorker: true,
EnableSessionWorker: true,
MaxConcurrentSessionExecutionSize: 1000,
MaxConcurrentSessionExecutionSize: cfg.AM.Capacity,
MaxConcurrentActivityExecutionSize: 1,
}
w := temporalsdk_worker.New(temporalClient, temporal.AmWorkerTaskQueue, workerOpts)
Expand Down
8 changes: 8 additions & 0 deletions enduro.toml
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ taskqueue = "a3m"
address = "127.0.0.1:7000"
shareDir = "/home/a3m/.local/share/a3m/share"

# capacity limits the number of transfers a worker can process at one time
# (default: 1)
capacity = 1

[a3m.processing]
AssignUuidsToDirectories = true
ExamineContents = true
Expand All @@ -94,6 +98,10 @@ user = "" # Secret: set with env var ENDURO_AM_USER.
apiKey = "" # Secret: set with env var ENDURO_AM_APIKEY.
processingConfig = "automated"

# capacity limits the number of transfers a worker can process at one time
# (default: 1)
capacity = 1

# pollInterval is the time to wait between AM polling requests in a string
# format compatible with https://pkg.go.dev/time#ParseDuration (Default: 10s).
pollInterval = "10s"
Expand Down
5 changes: 5 additions & 0 deletions internal/a3m/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@ type Config struct {
Name string
ShareDir string
Address string

// Capacity sets the maximum number of worker sessions the worker can
// handle at one time (default: 1).
Capacity int

Processing
}

Expand Down
4 changes: 4 additions & 0 deletions internal/am/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ type Config struct {
// SFTP configuration for uploading transfers to Archivematica.
SFTP sftp.Config

// Capacity sets the maximum number of worker sessions the worker can
// handle at one time (default: 1).
Capacity int

// PollInterval is the time to wait between poll requests to the AM API.
PollInterval time.Duration

Expand Down
8 changes: 5 additions & 3 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,15 @@ func Read(config *Configuration, configFile string) (found bool, configFileUsed
v.AddConfigPath("$HOME/.config/")
v.AddConfigPath("/etc")
v.SetConfigName("enduro")
v.SetDefault("a3m.capacity", 1)
v.SetDefault("a3m.processing", a3m.ProcessingDefault)
v.SetDefault("am.capacity", 1)
v.SetDefault("am.pollInterval", 10*time.Second)
v.SetDefault("api.listen", "127.0.0.1:9000")
v.SetDefault("debugListen", "127.0.0.1:9001")
v.SetDefault("preservation.taskqueue", temporal.A3mWorkerTaskQueue)
v.SetDefault("storage.taskqueue", temporal.GlobalTaskQueue)
v.SetDefault("temporal.taskqueue", temporal.GlobalTaskQueue)
v.SetDefault("debugListen", "127.0.0.1:9001")
v.SetDefault("api.listen", "127.0.0.1:9000")
v.SetDefault("am.pollInterval", 10*time.Second)
v.SetEnvPrefix("enduro")
v.SetEnvKeyReplacer(strings.NewReplacer(".", "_"))
v.AutomaticEnv()
Expand Down
6 changes: 4 additions & 2 deletions internal/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,14 @@ func TestConfig(t *testing.T) {
assert.Equal(t, c.Database.DSN, "")

// Valued defaults.
assert.Equal(t, c.DebugListen, "127.0.0.1:9001")
assert.Equal(t, c.A3m.Capacity, 1)
assert.Equal(t, c.A3m.Processing, a3m.ProcessingDefault)
assert.Equal(t, c.AM.Capacity, 1)
assert.Equal(t, c.AM.PollInterval, 10*time.Second)
assert.Equal(t, c.API.Listen, "127.0.0.1:9000")
assert.Equal(t, c.DebugListen, "127.0.0.1:9001")
assert.Equal(t, c.Preservation.TaskQueue, temporal.A3mWorkerTaskQueue)
assert.Equal(t, c.Storage.TaskQueue, temporal.GlobalTaskQueue)
assert.Equal(t, c.Temporal.TaskQueue, temporal.GlobalTaskQueue)
assert.Equal(t, c.AM.PollInterval, 10*time.Second)
})
}

0 comments on commit 3f3b559

Please sign in to comment.