diff --git a/router/handle.go b/router/handle.go index 7ac41a48d3..117c4f5de8 100644 --- a/router/handle.go +++ b/router/handle.go @@ -199,7 +199,8 @@ func (rt *Handle) pickup(ctx context.Context, lastQueryRunTime time.Time, partit firstJob = job } lastJob = job - if slot := rt.findWorkerSlot(workers, job, blockedOrderKeys); slot != nil { + slot, err := rt.findWorkerSlot(workers, job, blockedOrderKeys) + if err == nil { status := jobsdb.JobStatusT{ JobID: job.JobID, AttemptNum: job.LastJobStatus.AttemptNum, @@ -215,8 +216,12 @@ func (rt *Handle) pickup(ctx context.Context, lastQueryRunTime time.Time, partit statusList = append(statusList, &status) reservedJobs = append(reservedJobs, reservedJob{slot: slot, job: job}) } else { + stats.Default.NewTaggedStat("router_iterator_stats_discarded_job_count", stats.CountType, stats.Tags{"destType": rt.destType, "partition": partition, "reason": err.Error()}).Increment() iterator.Discard(job) discardedCount++ + if rt.isolationStrategy.StopIteration(err) { + break + } } } iteratorStats := iterator.Stats() @@ -444,16 +449,15 @@ func (rt *Handle) getQueryParams(partition string, pickUpCount int) jobsdb.GetQu return params } -func (rt *Handle) findWorkerSlot(workers []*worker, job *jobsdb.JobT, blockedOrderKeys map[string]struct{}) *workerSlot { +func (rt *Handle) findWorkerSlot(workers []*worker, job *jobsdb.JobT, blockedOrderKeys map[string]struct{}) (*workerSlot, error) { if rt.backgroundCtx.Err() != nil { - return nil + return nil, types.ErrContextCancelled } var parameters JobParameters - err := json.Unmarshal(job.Parameters, &parameters) - if err != nil { + if err := json.Unmarshal(job.Parameters, &parameters); err != nil { rt.logger.Errorf(`[%v Router] :: Unmarshalling parameters failed with the error %v . 
Returning nil worker`, err) - return nil + return nil, types.ErrParamsUnmarshal } orderKey := jobOrderKey(job.UserID, parameters.DestinationID) @@ -461,27 +465,38 @@ func (rt *Handle) findWorkerSlot(workers []*worker, job *jobsdb.JobT, blockedOrd // this check is done to maintain order. if _, ok := blockedOrderKeys[orderKey]; ok { rt.logger.Debugf(`[%v Router] :: Skipping processing of job:%d of orderKey:%s as orderKey has earlier jobs in throttled map`, rt.destType, job.JobID, orderKey) - return nil + return nil, types.ErrJobOrderBlocked } if !rt.guaranteeUserEventOrder { availableWorkers := lo.Filter(workers, func(w *worker, _ int) bool { return w.AvailableSlots() > 0 }) - if len(availableWorkers) == 0 || rt.shouldThrottle(job, parameters) || rt.shouldBackoff(job) { - return nil + if len(availableWorkers) == 0 { + return nil, types.ErrWorkerNoSlot + } + if rt.shouldBackoff(job) { + return nil, types.ErrJobBackoff + } + if rt.shouldThrottle(job, parameters) { + return nil, types.ErrDestinationThrottled + } + + if slot := availableWorkers[rand.Intn(len(availableWorkers))].ReserveSlot(); slot != nil { // skipcq: GSC-G404 + return slot, nil } - return availableWorkers[rand.Intn(len(availableWorkers))].ReserveSlot() // skipcq: GSC-G404 + return nil, types.ErrWorkerNoSlot + } //#JobOrder (see other #JobOrder comment) worker := workers[getWorkerPartition(orderKey, len(workers))] if rt.shouldBackoff(job) { // backoff blockedOrderKeys[orderKey] = struct{}{} - return nil + return nil, types.ErrJobBackoff } slot := worker.ReserveSlot() if slot == nil { blockedOrderKeys[orderKey] = struct{}{} - return nil + return nil, types.ErrWorkerNoSlot } enter, previousFailedJobID := worker.barrier.Enter(orderKey, job.JobID) @@ -491,9 +506,9 @@ func (rt *Handle) findWorkerSlot(workers []*worker, job *jobsdb.JobT, blockedOrd blockedOrderKeys[orderKey] = struct{}{} worker.barrier.Leave(orderKey, job.JobID) slot.Release() - return nil + return nil, types.ErrDestinationThrottled } - return 
slot + return slot, nil } previousFailedJobIDStr := "" if previousFailedJobID != nil { @@ -501,7 +516,7 @@ func (rt *Handle) findWorkerSlot(workers []*worker, job *jobsdb.JobT, blockedOrd } rt.logger.Debugf("EventOrder: job %d of orderKey %s is blocked (previousFailedJobID: %s)", job.JobID, orderKey, previousFailedJobIDStr) slot.Release() - return nil + return nil, types.ErrBarrierExists //#EndJobOrder } diff --git a/router/isolation/isolation.go b/router/isolation/isolation.go index 83d1925951..5b3e10f65f 100644 --- a/router/isolation/isolation.go +++ b/router/isolation/isolation.go @@ -5,6 +5,7 @@ import ( "errors" "github.com/rudderlabs/rudder-server/jobsdb" + "github.com/rudderlabs/rudder-server/router/types" "github.com/samber/lo" ) @@ -36,6 +37,8 @@ type Strategy interface { ActivePartitions(ctx context.Context, db jobsdb.MultiTenantJobsDB) ([]string, error) // AugmentQueryParams augments the given GetQueryParamsT with the strategy specific parameters AugmentQueryParams(partition string, params *jobsdb.GetQueryParamsT) + // StopIteration returns true if the iteration should be stopped for the given error + StopIteration(err error) bool } // noneStrategy implements isolation at no level @@ -49,6 +52,10 @@ func (noneStrategy) AugmentQueryParams(_ string, _ *jobsdb.GetQueryParamsT) { // no-op } +func (noneStrategy) StopIteration(_ error) bool { + return false +} + // workspaceStrategy implements isolation at workspace level type workspaceStrategy struct { customVal string @@ -63,6 +70,10 @@ func (workspaceStrategy) AugmentQueryParams(partition string, params *jobsdb.Get params.WorkspaceID = partition } +func (workspaceStrategy) StopIteration(_ error) bool { + return false +} + // destinationStrategy implements isolation at destination level type destinationStrategy struct { destinationFilter func(destinationID string) bool @@ -83,3 +94,8 @@ func (ds destinationStrategy) ActivePartitions(ctx context.Context, db jobsdb.Mu func (destinationStrategy) 
AugmentQueryParams(partition string, params *jobsdb.GetQueryParamsT) { params.ParameterFilters = append(params.ParameterFilters, jobsdb.ParameterFilterT{Name: "destination_id", Value: partition}) } + +// StopIteration returns true if the error is ErrDestinationThrottled +func (destinationStrategy) StopIteration(err error) bool { + return errors.Is(err, types.ErrDestinationThrottled) +} diff --git a/router/isolation/isolation_test.go b/router/isolation/isolation_test.go new file mode 100644 index 0000000000..55fcbae602 --- /dev/null +++ b/router/isolation/isolation_test.go @@ -0,0 +1,59 @@ +package isolation_test + +import ( + "context" + "testing" + + "github.com/rudderlabs/rudder-server/jobsdb" + "github.com/rudderlabs/rudder-server/router/isolation" + "github.com/rudderlabs/rudder-server/router/types" + "github.com/stretchr/testify/require" +) + +func TestIsolationStrategy(t *testing.T) { + t.Run("none", func(r *testing.T) { + strategy, err := isolation.GetStrategy(isolation.ModeNone, "", func(_ string) bool { return true }) + require.NoError(t, err) + + t.Run("active partitions", func(t *testing.T) { + partitions, err := strategy.ActivePartitions(context.Background(), nil) + require.NoError(t, err) + require.Equal(t, []string{""}, partitions) + }) + t.Run("augment query params", func(t *testing.T) { + var params jobsdb.GetQueryParamsT + toAugment := params + strategy.AugmentQueryParams("partition", &toAugment) + require.Equal(t, params, toAugment) + }) + t.Run("stop iteration", func(t *testing.T) { + require.False(t, strategy.StopIteration(types.ErrBarrierExists)) + require.False(t, strategy.StopIteration(types.ErrDestinationThrottled)) + }) + }) + t.Run("workspace", func(r *testing.T) { + strategy, err := isolation.GetStrategy(isolation.ModeWorkspace, "", func(_ string) bool { return true }) + require.NoError(t, err) + + t.Run("augment query params", func(t *testing.T) { + var params jobsdb.GetQueryParamsT + strategy.AugmentQueryParams("partition", &params) + var 
expected jobsdb.GetQueryParamsT + expected.WorkspaceID = "partition" + require.Equal(t, expected, params) + }) + + t.Run("stop iteration", func(t *testing.T) { + require.False(t, strategy.StopIteration(types.ErrBarrierExists)) + require.False(t, strategy.StopIteration(types.ErrDestinationThrottled)) + }) + }) + t.Run("destination", func(r *testing.T) { + strategy, err := isolation.GetStrategy(isolation.ModeDestination, "", func(_ string) bool { return true }) + require.NoError(t, err) + t.Run("stop iteration", func(t *testing.T) { + require.False(t, strategy.StopIteration(types.ErrBarrierExists)) + require.True(t, strategy.StopIteration(types.ErrDestinationThrottled)) + }) + }) +} diff --git a/router/router_test.go b/router/router_test.go index b6d262216c..3a5fa9421c 100644 --- a/router/router_test.go +++ b/router/router_test.go @@ -231,21 +231,90 @@ func TestBackoff(t *testing.T) { t.Run("eventorder disabled", func(t *testing.T) { r.guaranteeUserEventOrder = false workers[0].inputReservations = 0 - require.Nil(t, r.findWorkerSlot(workers, backoffJob, map[string]struct{}{})) - require.NotNil(t, r.findWorkerSlot(workers, noBackoffJob1, map[string]struct{}{})) - require.NotNil(t, r.findWorkerSlot(workers, noBackoffJob2, map[string]struct{}{})) - require.NotNil(t, r.findWorkerSlot(workers, noBackoffJob3, map[string]struct{}{})) - require.Nil(t, r.findWorkerSlot(workers, noBackoffJob4, map[string]struct{}{}), "worker's input channel should be full") + + slot, err := r.findWorkerSlot(workers, backoffJob, map[string]struct{}{}) + require.Nil(t, slot) + require.ErrorIs(t, err, types.ErrJobBackoff) + + slot, err = r.findWorkerSlot(workers, noBackoffJob1, map[string]struct{}{}) + require.NotNil(t, slot) + require.NoError(t, err) + + slot, err = r.findWorkerSlot(workers, noBackoffJob2, map[string]struct{}{}) + require.NotNil(t, slot) + require.NoError(t, err) + + slot, err = r.findWorkerSlot(workers, noBackoffJob3, map[string]struct{}{}) + require.NotNil(t, slot) + 
require.NoError(t, err) + + slot, err = r.findWorkerSlot(workers, noBackoffJob4, map[string]struct{}{}) + require.Nil(t, slot) + require.ErrorIs(t, err, types.ErrWorkerNoSlot) }) t.Run("eventorder enabled", func(t *testing.T) { r.guaranteeUserEventOrder = true workers[0].inputReservations = 0 - require.Nil(t, r.findWorkerSlot(workers, backoffJob, map[string]struct{}{})) - require.NotNil(t, r.findWorkerSlot(workers, noBackoffJob1, map[string]struct{}{})) - require.NotNil(t, r.findWorkerSlot(workers, noBackoffJob2, map[string]struct{}{})) - require.NotNil(t, r.findWorkerSlot(workers, noBackoffJob3, map[string]struct{}{})) - require.Nil(t, r.findWorkerSlot(workers, noBackoffJob4, map[string]struct{}{}), "worker's input channel should be full") + + slot, err := r.findWorkerSlot(workers, backoffJob, map[string]struct{}{}) + require.Nil(t, slot) + require.ErrorIs(t, err, types.ErrJobBackoff) + + slot, err = r.findWorkerSlot(workers, noBackoffJob1, map[string]struct{}{}) + require.NotNil(t, slot) + require.NoError(t, err) + + slot, err = r.findWorkerSlot(workers, noBackoffJob2, map[string]struct{}{}) + require.NotNil(t, slot) + require.NoError(t, err) + + slot, err = r.findWorkerSlot(workers, noBackoffJob3, map[string]struct{}{}) + require.NotNil(t, slot) + require.NoError(t, err) + + slot, err = r.findWorkerSlot(workers, noBackoffJob4, map[string]struct{}{}) + require.Nil(t, slot) + require.ErrorIs(t, err, types.ErrWorkerNoSlot) + }) + + t.Run("context canceled", func(t *testing.T) { + defer func() { r.backgroundCtx = context.Background() }() + r.backgroundCtx, r.backgroundCancel = context.WithCancel(context.Background()) + r.backgroundCancel() + slot, err := r.findWorkerSlot(workers, backoffJob, map[string]struct{}{}) + require.Nil(t, slot) + require.ErrorIs(t, err, types.ErrContextCancelled) + }) + + t.Run("unmarshalling params failure", func(t *testing.T) { + invalidJob := &jobsdb.JobT{ + JobID: 1, + Parameters: []byte(`{"destination_id": "destination"`), + 
LastJobStatus: jobsdb.JobStatusT{ + JobState: jobsdb.Failed.State, + AttemptNum: 1, + RetryTime: time.Now().Add(1 * time.Hour), + }, + } + slot, err := r.findWorkerSlot(workers, invalidJob, map[string]struct{}{}) + require.Nil(t, slot) + require.ErrorIs(t, err, types.ErrParamsUnmarshal) + }) + + t.Run("blocked job", func(t *testing.T) { + job := &jobsdb.JobT{ + JobID: 1, + Parameters: []byte(`{"destination_id": "destination"}`), + LastJobStatus: jobsdb.JobStatusT{ + JobState: jobsdb.Failed.State, + AttemptNum: 1, + RetryTime: time.Now().Add(1 * time.Hour), + }, + } + slot, err := r.findWorkerSlot(workers, backoffJob, map[string]struct{}{jobOrderKey(job.UserID, "destination"): {}}) + require.Nil(t, slot) + require.ErrorIs(t, err, types.ErrJobOrderBlocked) }) }) } diff --git a/router/types/types.go b/router/types/types.go index 2bfee0f102..e9ead0a0a8 100644 --- a/router/types/types.go +++ b/router/types/types.go @@ -2,6 +2,7 @@ package types import ( "encoding/json" + "errors" "time" backendconfig "github.com/rudderlabs/rudder-server/backend-config" @@ -124,3 +125,20 @@ func (e *EventTypeThrottlingCost) Cost(eventType string) (cost int64) { } return 1 } + +var ( + // ErrContextCancelled is returned when the context is cancelled + ErrContextCancelled = errors.New("context cancelled") + // ErrParamsUnmarshal is returned when it is not possible to unmarshal the job parameters + ErrParamsUnmarshal = errors.New("unmarhall params") + // ErrJobOrderBlocked is returned when the job is blocked by another job discarded by the router in the same loop + ErrJobOrderBlocked = errors.New("blocked") + // ErrWorkerNoSlot is returned when the worker doesn't have an available slot + ErrWorkerNoSlot = errors.New("no slot") + // ErrJobBackoff is returned when the job is backoffed + ErrJobBackoff = errors.New("backoff") + // ErrDestinationThrottled is returned when the destination is being throttled + ErrDestinationThrottled = errors.New("throttled") + // ErrBarrierExists is returned when 
a job ordering barrier exists for the job's ordering key + ErrBarrierExists = errors.New("barrier") +)