diff --git a/factory.go b/factory.go index a5a3415c..01af6cdd 100644 --- a/factory.go +++ b/factory.go @@ -292,7 +292,8 @@ func Never() Observable { } } -// Range creates an Observable that emits a particular range of sequential integers. +// Range creates an Observable that emits count sequential integers beginning +// at start. func Range(start, count int, opts ...Option) Observable { if count < 0 { return Thrown(IllegalInputError{error: "count must be positive"}) diff --git a/factory_connectable_test.go b/factory_connectable_test.go index 2931c14a..f9191a3a 100644 --- a/factory_connectable_test.go +++ b/factory_connectable_test.go @@ -197,7 +197,7 @@ func Test_Connectable_IterableRange_Single(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() obs := &ObservableImpl{ - iterable: newRangeIterable(1, 2, WithPublishStrategy(), WithContext(ctx)), + iterable: newRangeIterable(1, 3, WithPublishStrategy(), WithContext(ctx)), } testConnectableSingle(t, obs) } @@ -207,7 +207,7 @@ func Test_Connectable_IterableRange_Composed(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() obs := &ObservableImpl{ - iterable: newRangeIterable(1, 2, WithPublishStrategy(), WithContext(ctx)), + iterable: newRangeIterable(1, 3, WithPublishStrategy(), WithContext(ctx)), } testConnectableComposed(t, obs) } diff --git a/factory_test.go b/factory_test.go index f5ab749e..614234b0 100644 --- a/factory_test.go +++ b/factory_test.go @@ -465,9 +465,9 @@ func Test_Merge_Interval(t *testing.T) { func Test_Range(t *testing.T) { defer goleak.VerifyNone(t) obs := Range(5, 3) - Assert(context.Background(), t, obs, HasItems(5, 6, 7, 8)) + Assert(context.Background(), t, obs, HasItems(5, 6, 7)) // Test whether the observable is reproducible - Assert(context.Background(), t, obs, HasItems(5, 6, 7, 8)) + Assert(context.Background(), t, obs, HasItems(5, 6, 7)) } func Test_Range_NegativeCount(t *testing.T) { diff --git a/iterable_range.go b/iterable_range.go index a2c91069..c4031747 100644 --- a/iterable_range.go +++ b/iterable_range.go @@ -19,7 +19,7 @@ func (i *rangeIterable) Observe(opts ...Option) <-chan Item { next := option.buildChannel() go func() { - for idx := i.start; idx <= i.start+i.count; idx++ { + for idx := i.start; idx <= i.start+i.count-1; idx++ { select { case <-ctx.Done(): return diff --git a/observable_operator_test.go b/observable_operator_test.go index 182a7821..dd6794c0 100644 --- a/observable_operator_test.go +++ b/observable_operator_test.go @@ -370,7 +370,7 @@ func Test_Observable_Count(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() Assert(ctx, t, Range(1, 10000).Count(), - HasItem(int64(10001))) + HasItem(int64(10000))) } func Test_Observable_Count_Parallel(t *testing.T) { @@ -378,7 +378,7 @@ func Test_Observable_Count_Parallel(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() Assert(ctx, t, Range(1, 10000).Count(WithCPUPool()), - HasItem(int64(10001))) + HasItem(int64(10000))) } // FIXME @@ -584,16 +584,16 @@ func Test_Observable_ElementAt(t *testing.T) { defer goleak.VerifyNone(t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - obs := Range(0, 10000).ElementAt(10000) - Assert(ctx, t, obs, HasItems(10000)) + obs := Range(0, 10000).ElementAt(9999) + Assert(ctx, t, obs, HasItems(9999)) } func Test_Observable_ElementAt_Parallel(t *testing.T) { defer goleak.VerifyNone(t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - obs := Range(0, 10000).ElementAt(10000, WithCPUPool()) - Assert(ctx, t, obs, HasItems(10000)) + obs := Range(0, 10000).ElementAt(9999, WithCPUPool()) + Assert(ctx, t, obs, HasItems(9999)) } func Test_Observable_ElementAt_Error(t *testing.T) { @@ -912,18 +912,18 @@ func Test_Observable_GroupBy(t *testing.T) { defer goleak.VerifyNone(t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - count := 3 - max := 10 + length := 3 + count := 11 - obs := Range(0, max).GroupBy(count, func(item Item) int { - return item.V.(int) % count - }, WithBufferedChannel(max)) + obs := Range(0, count).GroupBy(length, func(item Item) int { + return item.V.(int) % length + }, WithBufferedChannel(count)) s, err := obs.ToSlice(0) if err != nil { assert.FailNow(t, err.Error()) } - if len(s) != count { - assert.FailNow(t, "length", "got=%d, expected=%d", len(s), count) + if len(s) != length { + assert.FailNow(t, "length", "got=%d, expected=%d", len(s), length) } Assert(ctx, t, s[0].(Observable), HasItems(0, 3, 6, 9), HasNoError()) @@ -935,18 +935,18 @@ func Test_Observable_GroupBy_Error(t *testing.T) { defer goleak.VerifyNone(t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - count := 3 - max := 10 + length := 3 + count := 11 - obs := Range(0, max).GroupBy(count, func(item Item) int { + obs := Range(0, count).GroupBy(length, func(item Item) int { return 4 - }, WithBufferedChannel(max)) + }, WithBufferedChannel(count)) s, err := obs.ToSlice(0) if err != nil { assert.FailNow(t, err.Error()) } - if len(s) != count { - assert.FailNow(t, "length", "got=%d, expected=%d", len(s), count) + if len(s) != length { + assert.FailNow(t, "length", "got=%d, expected=%d", len(s), length) } Assert(ctx, t, s[0].(Observable), HasAnError()) @@ -958,15 +958,15 @@ func Test_Observable_GroupByDynamic(t *testing.T) { defer goleak.VerifyNone(t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - count := 3 - max := 10 + length := 3 + count := 11 - obs := Range(0, max).GroupByDynamic(func(item Item) string { + obs := Range(0, count).GroupByDynamic(func(item Item) string { if item.V == 10 { return "10" } - return strconv.Itoa(item.V.(int) % count) - }, WithBufferedChannel(max)) + return strconv.Itoa(item.V.(int) % length) + }, WithBufferedChannel(count)) s, err := obs.ToSlice(0) if err != nil { assert.FailNow(t, err.Error()) @@ -1335,7 +1335,7 @@ func Test_Observable_Max(t *testing.T) { return 0 } }) - Assert(ctx, t, obs, HasItem(10000)) + Assert(ctx, t, obs, HasItem(9999)) } func Test_Observable_Max_Parallel(t *testing.T) { @@ -1365,7 +1365,7 @@ func Test_Observable_Max_Parallel(t *testing.T) { return 0 } }, WithCPUPool()) - Assert(ctx, t, obs, HasItem(10000)) + Assert(ctx, t, obs, HasItem(9999)) } func Test_Observable_Min(t *testing.T) { @@ -1458,7 +1458,7 @@ func Test_Observable_Reduce(t *testing.T) { } return 0, errFoo }) - Assert(ctx, t, obs, HasItem(50015001), HasNoError()) + Assert(ctx, t, obs, HasItem(50005000), HasNoError()) } func Test_Observable_Reduce_Empty(t *testing.T) { @@ -1508,7 +1508,7 @@ func Test_Observable_Reduce_Parallel(t *testing.T) { } return 0, errFoo }, WithCPUPool()) - Assert(ctx, t, obs, HasItem(50015001), HasNoError()) + Assert(ctx, t, obs, HasItem(50005000), HasNoError()) } func Test_Observable_Reduce_Parallel_Error(t *testing.T) { @@ -1548,7 +1548,7 @@ func Test_Observable_Reduce_Parallel_WithErrorStrategy(t *testing.T) { } return 0, errFoo }, WithCPUPool(), WithErrorStrategy(ContinueOnError)) - Assert(ctx, t, obs, HasItem(50015000), HasError(errFoo)) + Assert(ctx, t, obs, HasItem(50004999), HasError(errFoo)) } func Test_Observable_Repeat(t *testing.T) {