From 07ab55f704d4f4a571063d84decf4528d4b3a64b Mon Sep 17 00:00:00 2001 From: Doctor Vince Date: Thu, 22 Jun 2023 10:04:18 -0400 Subject: [PATCH] allow for runtime override of KV replica count (#28) --- cmd/run.go | 3 +++ internal/worker/kv_status.go | 5 +---- internal/worker/worker.go | 9 +++++++-- 3 files changed, 11 insertions(+), 6 deletions(-) diff --git a/cmd/run.go b/cmd/run.go index bba66599..d307168f 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -33,6 +33,7 @@ var ( dryrun bool faultInjection bool storeKind string + replicas int ) var ( @@ -80,6 +81,7 @@ func runWorker(ctx context.Context) { useStatusKV, faultInjection, flasher.Config.Concurrency, + replicas, stream, inv, flasher.Logger, @@ -105,6 +107,7 @@ func init() { cmdRun.PersistentFlags().BoolVarP(&dryrun, "dry-run", "", false, "In dryrun mode, the worker actions the task without installing firmware") cmdRun.PersistentFlags().BoolVarP(&useStatusKV, "use-kv", "", false, "when this is true, flasher writes status to a NATS KV store instead of sending reply messages") cmdRun.PersistentFlags().BoolVarP(&faultInjection, "fault-injection", "", false, "Tasks can include a Fault attribute to allow fault injection for development purposes") + cmdRun.PersistentFlags().IntVarP(&replicas, "nats-replicas", "r", 1, "the default number of replicas to use for NATS data") if err := cmdRun.MarkPersistentFlagRequired("store"); err != nil { log.Fatal(err) diff --git a/internal/worker/kv_status.go b/internal/worker/kv_status.go index db2d7968..4f3d952c 100644 --- a/internal/worker/kv_status.go +++ b/internal/worker/kv_status.go @@ -18,7 +18,6 @@ import ( var ( statusKVName = string(cotyp.FirmwareInstall) defaultKVOpts = []kv.Option{ - kv.WithReplicas(3), kv.WithDescription("flasher condition status tracking"), kv.WithTTL(10 * 24 * time.Hour), } @@ -78,9 +77,7 @@ func NewStatusKVPublisher(s events.Stream, log *logrus.Logger, opts ...kv.Option } kvOpts := defaultKVOpts - if len(opts) > 0 { - kvOpts = opts - } + kvOpts = append(kvOpts, opts...) statusKV, err := kv.CreateOrBindKVBucket(js, statusKVName, kvOpts...) if err != nil { diff --git a/internal/worker/worker.go b/internal/worker/worker.go index 8dfbf062..f35f5262 100644 --- a/internal/worker/worker.go +++ b/internal/worker/worker.go @@ -17,6 +17,7 @@ import ( "github.com/pkg/errors" "github.com/sirupsen/logrus" "go.hollow.sh/toolbox/events" + "go.hollow.sh/toolbox/events/pkg/kv" "go.hollow.sh/toolbox/events/registry" cpv1types "github.com/metal-toolbox/conditionorc/pkg/api/v1/types" @@ -55,6 +56,7 @@ type Worker struct { dryrun bool faultInjection bool useStatusKV bool + replicaCount int } // NewOutofbandWorker returns a out of band firmware install worker instance @@ -63,7 +65,8 @@ func New( dryrun, useStatusKV, faultInjection bool, - concurrency int, + concurrency, + replicaCount int, stream events.Stream, repository store.Repository, logger *logrus.Logger, @@ -77,6 +80,7 @@ func New( useStatusKV: useStatusKV, faultInjection: faultInjection, concurrency: concurrency, + replicaCount: replicaCount, syncWG: &sync.WaitGroup{}, stream: stream, store: repository, @@ -106,6 +110,7 @@ func (o *Worker) Run(ctx context.Context) { o.logger.WithFields( logrus.Fields{ + "replica-count": o.replicaCount, "concurrency": o.concurrency, "dry-run": o.dryrun, "fault-injection": o.faultInjection, @@ -305,7 +310,7 @@ func (o *Worker) runTaskWithMonitor(ctx context.Context, task *model.Task, asset func (o *Worker) getStatusPublisher() sm.Publisher { if o.useStatusKV { - return NewStatusKVPublisher(o.stream, o.logger) + return NewStatusKVPublisher(o.stream, o.logger, kv.WithReplicas(o.replicaCount)) } return &statusEmitter{o.stream, o.logger} }