Skip to content

Commit

Permalink
fixup! chore(router): job worker availability observability & stop it…
Browse files Browse the repository at this point in the history
…eration if throttled
  • Loading branch information
atzoum committed Jun 14, 2023
1 parent afed61a commit d1eaa01
Showing 1 changed file with 59 additions and 0 deletions.
59 changes: 59 additions & 0 deletions router/isolation/isolation_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package isolation_test

import (
"context"
"testing"

"github.com/rudderlabs/rudder-server/jobsdb"
"github.com/rudderlabs/rudder-server/router/isolation"
"github.com/rudderlabs/rudder-server/router/types"
"github.com/stretchr/testify/require"
)

func TestIsolationStrategy(t *testing.T) {
t.Run("none", func(r *testing.T) {
strategy, err := isolation.GetStrategy(isolation.ModeNone, "", func(_ string) bool { return true })
require.NoError(t, err)

t.Run("active partitions", func(t *testing.T) {
partitions, err := strategy.ActivePartitions(context.Background(), nil)
require.NoError(t, err)
require.Equal(t, []string{""}, partitions)
})
t.Run("augment query params", func(t *testing.T) {
var params jobsdb.GetQueryParamsT
toAugment := params
strategy.AugmentQueryParams("partition", &toAugment)
require.Equal(t, params, toAugment)
})
t.Run("stop iteration", func(t *testing.T) {
require.False(t, strategy.StopIteration(types.ErrBarrierExists))
require.False(t, strategy.StopIteration(types.ErrDestinationThrottled))
})
})
t.Run("workspace", func(r *testing.T) {
strategy, err := isolation.GetStrategy(isolation.ModeWorkspace, "", func(_ string) bool { return true })
require.NoError(t, err)

t.Run("augment query params", func(t *testing.T) {
var params jobsdb.GetQueryParamsT
strategy.AugmentQueryParams("partition", &params)
var expected jobsdb.GetQueryParamsT
expected.WorkspaceID = "partition"
require.Equal(t, expected, params)
})

t.Run("stop iteration", func(t *testing.T) {
require.False(t, strategy.StopIteration(types.ErrBarrierExists))
require.False(t, strategy.StopIteration(types.ErrDestinationThrottled))
})
})
t.Run("destination", func(r *testing.T) {
strategy, err := isolation.GetStrategy(isolation.ModeDestination, "", func(_ string) bool { return true })
require.NoError(t, err)
t.Run("stop iteration", func(t *testing.T) {
require.False(t, strategy.StopIteration(types.ErrBarrierExists))
require.True(t, strategy.StopIteration(types.ErrDestinationThrottled))
})
})
}

0 comments on commit d1eaa01

Please sign in to comment.