Skip to content

Commit

Permalink
chore(router): periodic flush during pickup
Browse files Browse the repository at this point in the history
  • Loading branch information
atzoum committed Jun 14, 2023
1 parent 5021989 commit 863dbe3
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 16 deletions.
45 changes: 29 additions & 16 deletions router/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,31 @@ func (rt *Handle) pickup(ctx context.Context, lastQueryRunTime time.Time, partit
var reservedJobs []reservedJob
blockedOrderKeys := make(map[string]struct{})

flushTime := time.Now()
shouldFlush := func() bool {
return time.Since(flushTime) > rt.reloadableConfig.pickupFlushInterval
}
flush := func() {
flushTime = time.Now()
// Mark the jobs as executing
err := misc.RetryWithNotify(context.Background(), rt.reloadableConfig.jobsDBCommandTimeout, rt.reloadableConfig.jobdDBMaxRetries, func(ctx context.Context) error {
return rt.jobsDB.UpdateJobStatus(ctx, statusList, []string{rt.destType}, nil)
}, rt.sendRetryUpdateStats)
if err != nil {
rt.logger.Errorf("Error occurred while marking %s jobs statuses as executing. Panicking. Err: %v", rt.destType, err)
panic(err)
}

rt.logger.Debugf("[DRAIN DEBUG] counts %v final jobs length being processed %v", rt.destType, len(reservedJobs))
assignedTime := time.Now()
for _, reservedJob := range reservedJobs {
reservedJob.slot.Use(workerJob{job: reservedJob.job, assignedAt: assignedTime})
}
pickupCount += len(reservedJobs)
reservedJobs = nil
statusList = nil
}

// Identify jobs which can be processed
for iterator.HasNext() {
if ctx.Err() != nil {
Expand Down Expand Up @@ -215,6 +240,9 @@ func (rt *Handle) pickup(ctx context.Context, lastQueryRunTime time.Time, partit
}
statusList = append(statusList, &status)
reservedJobs = append(reservedJobs, reservedJob{slot: slot, job: job})
if shouldFlush() {
flush()
}
} else {
stats.Default.NewTaggedStat("router_iterator_stats_discarded_job_count", stats.CountType, stats.Tags{"destType": rt.destType, "partition": partition, "reason": err.Error()}).Increment()
iterator.Discard(job)
Expand All @@ -229,22 +257,7 @@ func (rt *Handle) pickup(ctx context.Context, lastQueryRunTime time.Time, partit
stats.Default.NewTaggedStat("router_iterator_stats_total_jobs", stats.GaugeType, stats.Tags{"destType": rt.destType, "partition": partition}).Gauge(iteratorStats.TotalJobs)
stats.Default.NewTaggedStat("router_iterator_stats_discarded_jobs", stats.GaugeType, stats.Tags{"destType": rt.destType, "partition": partition}).Gauge(iteratorStats.DiscardedJobs)

// Mark the jobs as executing
err := misc.RetryWithNotify(context.Background(), rt.reloadableConfig.jobsDBCommandTimeout, rt.reloadableConfig.jobdDBMaxRetries, func(ctx context.Context) error {
return rt.jobsDB.UpdateJobStatus(ctx, statusList, []string{rt.destType}, nil)
}, rt.sendRetryUpdateStats)
if err != nil {
rt.logger.Errorf("Error occurred while marking %s jobs statuses as executing. Panicking. Err: %v", rt.destType, err)
panic(err)
}

rt.logger.Debugf("[DRAIN DEBUG] counts %v final jobs length being processed %v", rt.destType, len(reservedJobs))
assignedTime := time.Now()
for _, reservedJob := range reservedJobs {
reservedJob.slot.Use(workerJob{job: reservedJob.job, assignedAt: assignedTime})
}

pickupCount = len(reservedJobs)
flush()
limitsReached = iteratorStats.LimitsReached
rt.pipelineDelayStats(partition, firstJob, lastJob)
return
Expand Down
1 change: 1 addition & 0 deletions router/handle_lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ func (rt *Handle) Setup(
config.RegisterDurationConfigVariable(10, &rt.reloadableConfig.minRetryBackoff, true, time.Second, []string{"Router.minRetryBackoff", "Router.minRetryBackoffInS"}...)
config.RegisterDurationConfigVariable(300, &rt.reloadableConfig.maxRetryBackoff, true, time.Second, []string{"Router.maxRetryBackoff", "Router.maxRetryBackoffInS"}...)
config.RegisterStringConfigVariable("", &rt.reloadableConfig.toAbortDestinationIDs, true, "Router.toAbortDestinationIDs")
config.RegisterDurationConfigVariable(2, &rt.reloadableConfig.pickupFlushInterval, true, time.Second, "Router.pickupFlushInterval")

config.RegisterDurationConfigVariable(60, &rt.diagnosisTickerTime, false, time.Second, []string{"Diagnostics.routerTimePeriod", "Diagnostics.routerTimePeriodInS"}...)

Expand Down
1 change: 1 addition & 0 deletions router/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ type reloadableConfig struct {
payloadLimit int64
routerTimeout time.Duration
retryTimeWindow time.Duration
pickupFlushInterval time.Duration
maxDSQuerySize int
jobIteratorMaxQueries int
jobIteratorDiscardedPercentageTolerance int
Expand Down

0 comments on commit 863dbe3

Please sign in to comment.