Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(router): observability on job iterator discards & stop iteration if throttled with destination isolation enabled #3491

Merged
merged 2 commits into from
Jun 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 30 additions & 15 deletions router/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,8 @@ func (rt *Handle) pickup(ctx context.Context, lastQueryRunTime time.Time, partit
firstJob = job
}
lastJob = job
if slot := rt.findWorkerSlot(workers, job, blockedOrderKeys); slot != nil {
slot, err := rt.findWorkerSlot(workers, job, blockedOrderKeys)
if err == nil {
status := jobsdb.JobStatusT{
JobID: job.JobID,
AttemptNum: job.LastJobStatus.AttemptNum,
Expand All @@ -215,8 +216,12 @@ func (rt *Handle) pickup(ctx context.Context, lastQueryRunTime time.Time, partit
statusList = append(statusList, &status)
reservedJobs = append(reservedJobs, reservedJob{slot: slot, job: job})
} else {
stats.Default.NewTaggedStat("router_iterator_stats_discarded_job_count", stats.CountType, stats.Tags{"destType": rt.destType, "partition": partition, "reason": err.Error()}).Increment()
iterator.Discard(job)
discardedCount++
if rt.isolationStrategy.StopIteration(err) {
break
}
}
}
iteratorStats := iterator.Stats()
Expand Down Expand Up @@ -444,44 +449,54 @@ func (rt *Handle) getQueryParams(partition string, pickUpCount int) jobsdb.GetQu
return params
}

func (rt *Handle) findWorkerSlot(workers []*worker, job *jobsdb.JobT, blockedOrderKeys map[string]struct{}) *workerSlot {
func (rt *Handle) findWorkerSlot(workers []*worker, job *jobsdb.JobT, blockedOrderKeys map[string]struct{}) (*workerSlot, error) {
if rt.backgroundCtx.Err() != nil {
return nil
return nil, types.ErrContextCancelled
}

var parameters JobParameters
err := json.Unmarshal(job.Parameters, &parameters)
if err != nil {
if err := json.Unmarshal(job.Parameters, &parameters); err != nil {
rt.logger.Errorf(`[%v Router] :: Unmarshalling parameters failed with the error %v . Returning nil worker`, err)
return nil
return nil, types.ErrParamsUnmarshal
}
orderKey := jobOrderKey(job.UserID, parameters.DestinationID)

// checking if the orderKey is in blockedOrderKeys. If yes, returning nil.
// this check is done to maintain order.
if _, ok := blockedOrderKeys[orderKey]; ok {
rt.logger.Debugf(`[%v Router] :: Skipping processing of job:%d of orderKey:%s as orderKey has earlier jobs in throttled map`, rt.destType, job.JobID, orderKey)
return nil
return nil, types.ErrJobOrderBlocked
}

if !rt.guaranteeUserEventOrder {
availableWorkers := lo.Filter(workers, func(w *worker, _ int) bool { return w.AvailableSlots() > 0 })
if len(availableWorkers) == 0 || rt.shouldThrottle(job, parameters) || rt.shouldBackoff(job) {
return nil
if len(availableWorkers) == 0 {
return nil, types.ErrWorkerNoSlot
}
if rt.shouldBackoff(job) {
return nil, types.ErrJobBackoff
}
if rt.shouldThrottle(job, parameters) {
return nil, types.ErrDestinationThrottled
}

if slot := availableWorkers[rand.Intn(len(availableWorkers))].ReserveSlot(); slot != nil { // skipcq: GSC-G404
return slot, nil
}
return availableWorkers[rand.Intn(len(availableWorkers))].ReserveSlot() // skipcq: GSC-G404
return nil, types.ErrWorkerNoSlot

}

//#JobOrder (see other #JobOrder comment)
worker := workers[getWorkerPartition(orderKey, len(workers))]
if rt.shouldBackoff(job) { // backoff
blockedOrderKeys[orderKey] = struct{}{}
return nil
return nil, types.ErrJobBackoff
}
slot := worker.ReserveSlot()
if slot == nil {
blockedOrderKeys[orderKey] = struct{}{}
return nil
return nil, types.ErrWorkerNoSlot
}

enter, previousFailedJobID := worker.barrier.Enter(orderKey, job.JobID)
Expand All @@ -491,17 +506,17 @@ func (rt *Handle) findWorkerSlot(workers []*worker, job *jobsdb.JobT, blockedOrd
blockedOrderKeys[orderKey] = struct{}{}
worker.barrier.Leave(orderKey, job.JobID)
slot.Release()
return nil
return nil, types.ErrDestinationThrottled
}
return slot
return slot, nil
}
previousFailedJobIDStr := "<nil>"
if previousFailedJobID != nil {
previousFailedJobIDStr = strconv.FormatInt(*previousFailedJobID, 10)
}
rt.logger.Debugf("EventOrder: job %d of orderKey %s is blocked (previousFailedJobID: %s)", job.JobID, orderKey, previousFailedJobIDStr)
slot.Release()
return nil
return nil, types.ErrBarrierExists
//#EndJobOrder
}

Expand Down
16 changes: 16 additions & 0 deletions router/isolation/isolation.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"

"github.com/rudderlabs/rudder-server/jobsdb"
"github.com/rudderlabs/rudder-server/router/types"
"github.com/samber/lo"
)

Expand Down Expand Up @@ -36,6 +37,8 @@ type Strategy interface {
ActivePartitions(ctx context.Context, db jobsdb.MultiTenantJobsDB) ([]string, error)
// AugmentQueryParams augments the given GetQueryParamsT with the strategy specific parameters
AugmentQueryParams(partition string, params *jobsdb.GetQueryParamsT)
// StopIteration returns true if the iteration should be stopped for the given error
StopIteration(err error) bool
}

// noneStrategy implements isolation at no level
Expand All @@ -49,6 +52,10 @@ func (noneStrategy) AugmentQueryParams(_ string, _ *jobsdb.GetQueryParamsT) {
// no-op
}

// StopIteration always returns false: the error is ignored, so noneStrategy
// never reports that the iteration should be stopped.
func (noneStrategy) StopIteration(_ error) bool {
return false
}

// workspaceStrategy implements isolation at workspace level
type workspaceStrategy struct {
customVal string
Expand All @@ -63,6 +70,10 @@ func (workspaceStrategy) AugmentQueryParams(partition string, params *jobsdb.Get
params.WorkspaceID = partition
}

// StopIteration always returns false: the error is ignored, so
// workspaceStrategy never reports that the iteration should be stopped.
func (workspaceStrategy) StopIteration(_ error) bool {
return false
}

// destinationStrategy implements isolation at destination level
type destinationStrategy struct {
destinationFilter func(destinationID string) bool
Expand All @@ -83,3 +94,8 @@ func (ds destinationStrategy) ActivePartitions(ctx context.Context, db jobsdb.Mu
func (destinationStrategy) AugmentQueryParams(partition string, params *jobsdb.GetQueryParamsT) {
params.ParameterFilters = append(params.ParameterFilters, jobsdb.ParameterFilterT{Name: "destination_id", Value: partition})
}

// StopIteration reports whether the iteration should be stopped for the given
// error. It returns true only when err matches types.ErrDestinationThrottled;
// the match is done with errors.Is, so wrapped errors are recognised as well.
//
// NOTE(review): presumably this is safe because, with destination isolation,
// a partition is a single destination (see AugmentQueryParams), so the
// remaining jobs of the partition would be throttled too — confirm.
func (destinationStrategy) StopIteration(err error) bool {
return errors.Is(err, types.ErrDestinationThrottled)
}
59 changes: 59 additions & 0 deletions router/isolation/isolation_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package isolation_test

import (
"context"
"testing"

"github.com/rudderlabs/rudder-server/jobsdb"
"github.com/rudderlabs/rudder-server/router/isolation"
"github.com/rudderlabs/rudder-server/router/types"
"github.com/stretchr/testify/require"
)

// TestIsolationStrategy exercises the strategy returned by
// isolation.GetStrategy for each isolation mode (none, workspace and
// destination) and checks the results of ActivePartitions, AugmentQueryParams
// and StopIteration where applicable.
//
// Each mode-level closure names its parameter t on purpose, so that it shadows
// the parent *testing.T. Assertions and nested subtests are then bound to the
// subtest that is actually running: failures are reported against the right
// test, FailNow is invoked from the goroutine that owns the T, and subtest
// names are properly nested (e.g. "TestIsolationStrategy/none/stop_iteration").
func TestIsolationStrategy(t *testing.T) {
	t.Run("none", func(t *testing.T) {
		strategy, err := isolation.GetStrategy(isolation.ModeNone, "", func(_ string) bool { return true })
		require.NoError(t, err)

		t.Run("active partitions", func(t *testing.T) {
			partitions, err := strategy.ActivePartitions(context.Background(), nil)
			require.NoError(t, err)
			require.Equal(t, []string{""}, partitions)
		})
		t.Run("augment query params", func(t *testing.T) {
			// the strategy must leave the query params untouched
			var params jobsdb.GetQueryParamsT
			toAugment := params
			strategy.AugmentQueryParams("partition", &toAugment)
			require.Equal(t, params, toAugment)
		})
		t.Run("stop iteration", func(t *testing.T) {
			require.False(t, strategy.StopIteration(types.ErrBarrierExists))
			require.False(t, strategy.StopIteration(types.ErrDestinationThrottled))
		})
	})
	t.Run("workspace", func(t *testing.T) {
		strategy, err := isolation.GetStrategy(isolation.ModeWorkspace, "", func(_ string) bool { return true })
		require.NoError(t, err)

		t.Run("augment query params", func(t *testing.T) {
			// the partition is expected to end up in the WorkspaceID field
			var params jobsdb.GetQueryParamsT
			strategy.AugmentQueryParams("partition", &params)
			var expected jobsdb.GetQueryParamsT
			expected.WorkspaceID = "partition"
			require.Equal(t, expected, params)
		})

		t.Run("stop iteration", func(t *testing.T) {
			require.False(t, strategy.StopIteration(types.ErrBarrierExists))
			require.False(t, strategy.StopIteration(types.ErrDestinationThrottled))
		})
	})
	t.Run("destination", func(t *testing.T) {
		strategy, err := isolation.GetStrategy(isolation.ModeDestination, "", func(_ string) bool { return true })
		require.NoError(t, err)
		t.Run("stop iteration", func(t *testing.T) {
			// only a throttled destination stops the iteration
			require.False(t, strategy.StopIteration(types.ErrBarrierExists))
			require.True(t, strategy.StopIteration(types.ErrDestinationThrottled))
		})
	})
}
89 changes: 79 additions & 10 deletions router/router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,21 +231,90 @@ func TestBackoff(t *testing.T) {
t.Run("eventorder disabled", func(t *testing.T) {
r.guaranteeUserEventOrder = false
workers[0].inputReservations = 0
require.Nil(t, r.findWorkerSlot(workers, backoffJob, map[string]struct{}{}))
require.NotNil(t, r.findWorkerSlot(workers, noBackoffJob1, map[string]struct{}{}))
require.NotNil(t, r.findWorkerSlot(workers, noBackoffJob2, map[string]struct{}{}))
require.NotNil(t, r.findWorkerSlot(workers, noBackoffJob3, map[string]struct{}{}))
require.Nil(t, r.findWorkerSlot(workers, noBackoffJob4, map[string]struct{}{}), "worker's input channel should be full")

slot, err := r.findWorkerSlot(workers, backoffJob, map[string]struct{}{})
require.Nil(t, slot)
require.ErrorIs(t, err, types.ErrJobBackoff)

slot, err = r.findWorkerSlot(workers, noBackoffJob1, map[string]struct{}{})
require.NotNil(t, slot)
require.NoError(t, err)

slot, err = r.findWorkerSlot(workers, noBackoffJob2, map[string]struct{}{})
require.NotNil(t, slot)
require.NoError(t, err)

slot, err = r.findWorkerSlot(workers, noBackoffJob3, map[string]struct{}{})
require.NotNil(t, slot)
require.NoError(t, err)

slot, err = r.findWorkerSlot(workers, noBackoffJob4, map[string]struct{}{})
require.Nil(t, slot)
require.ErrorIs(t, err, types.ErrWorkerNoSlot)
})

t.Run("eventorder enabled", func(t *testing.T) {
r.guaranteeUserEventOrder = true
workers[0].inputReservations = 0
require.Nil(t, r.findWorkerSlot(workers, backoffJob, map[string]struct{}{}))
require.NotNil(t, r.findWorkerSlot(workers, noBackoffJob1, map[string]struct{}{}))
require.NotNil(t, r.findWorkerSlot(workers, noBackoffJob2, map[string]struct{}{}))
require.NotNil(t, r.findWorkerSlot(workers, noBackoffJob3, map[string]struct{}{}))
require.Nil(t, r.findWorkerSlot(workers, noBackoffJob4, map[string]struct{}{}), "worker's input channel should be full")

slot, err := r.findWorkerSlot(workers, backoffJob, map[string]struct{}{})
require.Nil(t, slot)
require.ErrorIs(t, err, types.ErrJobBackoff)

slot, err = r.findWorkerSlot(workers, noBackoffJob1, map[string]struct{}{})
require.NotNil(t, slot)
require.NoError(t, err)

slot, err = r.findWorkerSlot(workers, noBackoffJob2, map[string]struct{}{})
require.NotNil(t, slot)
require.NoError(t, err)

slot, err = r.findWorkerSlot(workers, noBackoffJob3, map[string]struct{}{})
require.NotNil(t, slot)
require.NoError(t, err)

slot, err = r.findWorkerSlot(workers, noBackoffJob4, map[string]struct{}{})
require.Nil(t, slot)
require.ErrorIs(t, err, types.ErrWorkerNoSlot)
})

t.Run("context canceled", func(t *testing.T) {
defer func() { r.backgroundCtx = context.Background() }()
r.backgroundCtx, r.backgroundCancel = context.WithCancel(context.Background())
r.backgroundCancel()
slot, err := r.findWorkerSlot(workers, backoffJob, map[string]struct{}{})
require.Nil(t, slot)
require.ErrorIs(t, err, types.ErrContextCancelled)
})

t.Run("unmarshalling params failure", func(t *testing.T) {
invalidJob := &jobsdb.JobT{
JobID: 1,
Parameters: []byte(`{"destination_id": "destination"`),
LastJobStatus: jobsdb.JobStatusT{
JobState: jobsdb.Failed.State,
AttemptNum: 1,
RetryTime: time.Now().Add(1 * time.Hour),
},
}
slot, err := r.findWorkerSlot(workers, invalidJob, map[string]struct{}{})
require.Nil(t, slot)
require.ErrorIs(t, err, types.ErrParamsUnmarshal)
})

t.Run("blocked job", func(t *testing.T) {
job := &jobsdb.JobT{
JobID: 1,
Parameters: []byte(`{"destination_id": "destination"}`),
LastJobStatus: jobsdb.JobStatusT{
JobState: jobsdb.Failed.State,
AttemptNum: 1,
RetryTime: time.Now().Add(1 * time.Hour),
},
}
slot, err := r.findWorkerSlot(workers, backoffJob, map[string]struct{}{jobOrderKey(job.UserID, "destination"): {}})
require.Nil(t, slot)
require.ErrorIs(t, err, types.ErrJobOrderBlocked)
})
})
}
Expand Down
18 changes: 18 additions & 0 deletions router/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package types

import (
"encoding/json"
"errors"
"time"

backendconfig "github.com/rudderlabs/rudder-server/backend-config"
Expand Down Expand Up @@ -124,3 +125,20 @@ func (e *EventTypeThrottlingCost) Cost(eventType string) (cost int64) {
}
return 1
}

// Sentinel errors describing why a job could not be handed to a worker.
// Compare against them with errors.Is. The messages are kept short and
// lowercase; callers may surface them verbatim (e.g. as a "reason" metric tag).
var (
	// ErrContextCancelled is returned when the context is cancelled
	ErrContextCancelled = errors.New("context cancelled")
	// ErrParamsUnmarshal is returned when it is not possible to unmarshal the job parameters
	ErrParamsUnmarshal = errors.New("unmarshal params")
	// ErrJobOrderBlocked is returned when the job is blocked by another job discarded by the router in the same loop
	ErrJobOrderBlocked = errors.New("blocked")
	// ErrWorkerNoSlot is returned when the worker doesn't have an available slot
	ErrWorkerNoSlot = errors.New("no slot")
	// ErrJobBackoff is returned when the job is in backoff
	ErrJobBackoff = errors.New("backoff")
	// ErrDestinationThrottled is returned when the destination is being throttled
	ErrDestinationThrottled = errors.New("throttled")
	// ErrBarrierExists is returned when a job ordering barrier exists for the job's ordering key
	ErrBarrierExists = errors.New("barrier")
)