Skip to content

Commit

Permalink
allow for runtime override of KV replica count (#28)
Browse files Browse the repository at this point in the history
  • Loading branch information
DoctorVin authored Jun 22, 2023
1 parent 2237e85 commit 07ab55f
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 6 deletions.
3 changes: 3 additions & 0 deletions cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ var (
dryrun bool
faultInjection bool
storeKind string
replicas int
)

var (
Expand Down Expand Up @@ -80,6 +81,7 @@ func runWorker(ctx context.Context) {
useStatusKV,
faultInjection,
flasher.Config.Concurrency,
replicas,
stream,
inv,
flasher.Logger,
Expand All @@ -105,6 +107,7 @@ func init() {
cmdRun.PersistentFlags().BoolVarP(&dryrun, "dry-run", "", false, "In dryrun mode, the worker actions the task without installing firmware")
cmdRun.PersistentFlags().BoolVarP(&useStatusKV, "use-kv", "", false, "when this is true, flasher writes status to a NATS KV store instead of sending reply messages")
cmdRun.PersistentFlags().BoolVarP(&faultInjection, "fault-injection", "", false, "Tasks can include a Fault attribute to allow fault injection for development purposes")
cmdRun.PersistentFlags().IntVarP(&replicas, "nats-replicas", "r", 1, "the default number of replicas to use for NATS data")

if err := cmdRun.MarkPersistentFlagRequired("store"); err != nil {
log.Fatal(err)
Expand Down
5 changes: 1 addition & 4 deletions internal/worker/kv_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
var (
statusKVName = string(cotyp.FirmwareInstall)
defaultKVOpts = []kv.Option{
kv.WithReplicas(3),
kv.WithDescription("flasher condition status tracking"),
kv.WithTTL(10 * 24 * time.Hour),
}
Expand Down Expand Up @@ -78,9 +77,7 @@ func NewStatusKVPublisher(s events.Stream, log *logrus.Logger, opts ...kv.Option
}

kvOpts := defaultKVOpts
if len(opts) > 0 {
kvOpts = opts
}
kvOpts = append(kvOpts, opts...)

statusKV, err := kv.CreateOrBindKVBucket(js, statusKVName, kvOpts...)
if err != nil {
Expand Down
9 changes: 7 additions & 2 deletions internal/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"go.hollow.sh/toolbox/events"
"go.hollow.sh/toolbox/events/pkg/kv"
"go.hollow.sh/toolbox/events/registry"

cpv1types "github.com/metal-toolbox/conditionorc/pkg/api/v1/types"
Expand Down Expand Up @@ -55,6 +56,7 @@ type Worker struct {
dryrun bool
faultInjection bool
useStatusKV bool
replicaCount int
}

// NewOutofbandWorker returns a out of band firmware install worker instance
Expand All @@ -63,7 +65,8 @@ func New(
dryrun,
useStatusKV,
faultInjection bool,
concurrency int,
concurrency,
replicaCount int,
stream events.Stream,
repository store.Repository,
logger *logrus.Logger,
Expand All @@ -77,6 +80,7 @@ func New(
useStatusKV: useStatusKV,
faultInjection: faultInjection,
concurrency: concurrency,
replicaCount: replicaCount,
syncWG: &sync.WaitGroup{},
stream: stream,
store: repository,
Expand Down Expand Up @@ -106,6 +110,7 @@ func (o *Worker) Run(ctx context.Context) {

o.logger.WithFields(
logrus.Fields{
"replica-count": o.replicaCount,
"concurrency": o.concurrency,
"dry-run": o.dryrun,
"fault-injection": o.faultInjection,
Expand Down Expand Up @@ -305,7 +310,7 @@ func (o *Worker) runTaskWithMonitor(ctx context.Context, task *model.Task, asset

func (o *Worker) getStatusPublisher() sm.Publisher {
if o.useStatusKV {
return NewStatusKVPublisher(o.stream, o.logger)
return NewStatusKVPublisher(o.stream, o.logger, kv.WithReplicas(o.replicaCount))
}
return &statusEmitter{o.stream, o.logger}
}
Expand Down

0 comments on commit 07ab55f

Please sign in to comment.