From d1eaa012004a76645a34cbf967ed2f0d1c49d51b Mon Sep 17 00:00:00 2001 From: Aris Tzoumas Date: Wed, 14 Jun 2023 08:48:28 +0300 Subject: [PATCH] fixup! chore(router): job worker availability observability & stop iteration if throttled --- router/isolation/isolation_test.go | 59 ++++++++++++++++++++++++++++++ 1 file changed, 59 insertions(+) create mode 100644 router/isolation/isolation_test.go diff --git a/router/isolation/isolation_test.go b/router/isolation/isolation_test.go new file mode 100644 index 0000000000..55fcbae602 --- /dev/null +++ b/router/isolation/isolation_test.go @@ -0,0 +1,59 @@ +package isolation_test + +import ( + "context" + "testing" + + "github.com/rudderlabs/rudder-server/jobsdb" + "github.com/rudderlabs/rudder-server/router/isolation" + "github.com/rudderlabs/rudder-server/router/types" + "github.com/stretchr/testify/require" +) + +func TestIsolationStrategy(t *testing.T) { + t.Run("none", func(r *testing.T) { + strategy, err := isolation.GetStrategy(isolation.ModeNone, "", func(_ string) bool { return true }) + require.NoError(t, err) + + t.Run("active partitions", func(t *testing.T) { + partitions, err := strategy.ActivePartitions(context.Background(), nil) + require.NoError(t, err) + require.Equal(t, []string{""}, partitions) + }) + t.Run("augment query params", func(t *testing.T) { + var params jobsdb.GetQueryParamsT + toAugment := params + strategy.AugmentQueryParams("partition", &toAugment) + require.Equal(t, params, toAugment) + }) + t.Run("stop iteration", func(t *testing.T) { + require.False(t, strategy.StopIteration(types.ErrBarrierExists)) + require.False(t, strategy.StopIteration(types.ErrDestinationThrottled)) + }) + }) + t.Run("workspace", func(r *testing.T) { + strategy, err := isolation.GetStrategy(isolation.ModeWorkspace, "", func(_ string) bool { return true }) + require.NoError(t, err) + + t.Run("augment query params", func(t *testing.T) { + var params jobsdb.GetQueryParamsT + strategy.AugmentQueryParams("partition", ¶ms) + var expected jobsdb.GetQueryParamsT + expected.WorkspaceID = "partition" + require.Equal(t, expected, params) + }) + + t.Run("stop iteration", func(t *testing.T) { + require.False(t, strategy.StopIteration(types.ErrBarrierExists)) + require.False(t, strategy.StopIteration(types.ErrDestinationThrottled)) + }) + }) + t.Run("destination", func(r *testing.T) { + strategy, err := isolation.GetStrategy(isolation.ModeDestination, "", func(_ string) bool { return true }) + require.NoError(t, err) + t.Run("stop iteration", func(t *testing.T) { + require.False(t, strategy.StopIteration(types.ErrBarrierExists)) + require.True(t, strategy.StopIteration(types.ErrDestinationThrottled)) + }) + }) +}