diff --git a/rx/factory_test.go b/rx/factory_test.go index e5078e3..3cf503e 100644 --- a/rx/factory_test.go +++ b/rx/factory_test.go @@ -719,7 +719,7 @@ var _ = Describe("Factory", func() { }) When("given: custom structure", func() { - It("🧪 should: ", func() { + It("🧪 should: create observable without error", func() { // Test_Just_CustomStructure defer leaktest.Check(GinkgoT())() @@ -745,7 +745,7 @@ var _ = Describe("Factory", func() { }) When("given: channel", func() { - XIt("🧪 should: ???", func() { + XIt("🧪 should: ???", decorators.Label("sending chan not supported yet"), func() { // Test_Just_Channel defer leaktest.Check(GinkgoT())() @@ -1039,7 +1039,7 @@ var _ = Describe("Factory", func() { Context("NominatedRangeIterator", func() { When("positive count", func() { - It("🧪 should: create observable", decorators.Label("need pointer receiver on T"), func() { + It("🧪 should: create observable", func() { // Test_Range defer leaktest.Check(GinkgoT())() diff --git a/rx/observable-operator-average_test.go b/rx/observable-operator-average_test.go index ba6d952..95f8150 100644 --- a/rx/observable-operator-average_test.go +++ b/rx/observable-operator-average_test.go @@ -27,7 +27,6 @@ import ( "github.com/fortytw2/leaktest" . "github.com/onsi/ginkgo/v2" //nolint:revive // ginkgo ok - "github.com/onsi/ginkgo/v2/dsl/decorators" "github.com/snivilised/lorax/rx" ) @@ -146,8 +145,8 @@ var _ = Describe("Observable operator", func() { }) Context("Parallel/Error", func() { - Context("given: foo", func() { - XIt("🧪 should: ", decorators.Label("broken average.gatherNext"), func() { + Context("given: invalid input", func() { + It("🧪 should: result in error", func() { // rxgo: Test_Observable_AverageFloat32_Parallel_Error defer leaktest.Check(GinkgoT())() diff --git a/rx/observable-operator-reduce_test.go b/rx/observable-operator-reduce_test.go index 643ef6c..5fa6d3b 100644 --- a/rx/observable-operator-reduce_test.go +++ b/rx/observable-operator-reduce_test.go @@ -27,13 +27,12 @@ import ( "github.com/fortytw2/leaktest" . "github.com/onsi/ginkgo/v2" //nolint:revive // ginkgo ok - "github.com/onsi/ginkgo/v2/dsl/decorators" "github.com/snivilised/lorax/enums" "github.com/snivilised/lorax/rx" ) var _ = Describe("Observable operator", func() { - XContext("Reduce", decorators.Label("broken by reduce acc"), func() { + Context("Reduce", func() { When("using Range", func() { It("🧪 should: compute reduction ok", func() { // rxgo: Test_Observable_Reduce @@ -45,9 +44,9 @@ var _ = Describe("Observable operator", func() { obs := rx.Range(&rx.NumericRangeIterator[int]{ StartAt: 1, Whilst: rx.LessThan(10001), - }).Reduce( // 1, 10000 - func(_ context.Context, acc, num rx.Item[int]) (int, error) { - return acc.V + num.Num(), nil + }).Reduce( + func(_ context.Context, acc, item rx.Item[int]) (int, error) { + return acc.V + item.V, nil }, ) rx.Assert(ctx, obs, @@ -104,7 +103,7 @@ var _ = Describe("Observable operator", func() { Context("Parallel", func() { When("using Range", func() { - XIt("🧪 should: compute reduction ok", decorators.Label("repairing"), func() { + It("🧪 should: compute reduction ok", func() { // rxgo: Test_Observable_Reduce_Parallel defer leaktest.Check(GinkgoT())() @@ -113,10 +112,10 @@ var _ = Describe("Observable operator", func() { obs := rx.Range(&rx.NumericRangeIterator[int]{ StartAt: 1, - Whilst: rx.LessThan(6), + Whilst: rx.LessThan(10001), }).Reduce( - func(_ context.Context, acc, num rx.Item[int]) (int, error) { - return acc.Num() + num.Num(), nil + func(_ context.Context, acc, item rx.Item[int]) (int, error) { + return acc.V + item.V, nil }, rx.WithCPUPool[int](), ) rx.Assert(ctx, obs, @@ -131,7 +130,7 @@ var _ = Describe("Observable operator", func() { Context("Parallel/Error", func() { When("using Range", func() { - XIt("🧪 should: result in error", func() { + It("🧪 should: result in error", func() { // rxgo: Test_Observable_Reduce_Parallel_Error defer leaktest.Check(GinkgoT())() @@ -141,11 +140,11 @@ var _ = Describe("Observable operator", func() { StartAt: 1, Whilst: rx.LessThan(10001), }).Reduce( - func(_ context.Context, acc, num rx.Item[int]) (int, error) { - if num.Num() == 1000 { + func(_ context.Context, acc, item rx.Item[int]) (int, error) { + if item.V == 1000 { return 0, errFoo } - return acc.Num() + num.Num(), nil + return acc.V + item.V, nil }, rx.WithContext[int](ctx), rx.WithCPUPool[int](), ) rx.Assert(ctx, obs, @@ -157,21 +156,22 @@ var _ = Describe("Observable operator", func() { }) When("error with error strategy", func() { - XIt("🧪 should: result in error", func() { + It("🧪 should: result in error", func() { // rxgo: Test_Observable_Reduce_Parallel_WithErrorStrategy defer leaktest.Check(GinkgoT())() ctx, cancel := context.WithCancel(context.Background()) defer cancel() + obs := rx.Range(&rx.NumericRangeIterator[int]{ StartAt: 1, Whilst: rx.LessThan(10001), }).Reduce( - func(_ context.Context, acc, num rx.Item[int]) (int, error) { - if num.Num() == 1 { + func(_ context.Context, acc, item rx.Item[int]) (int, error) { + if item.V == 1 { return 0, errFoo } - return acc.Num() + num.Num(), nil + return acc.V + item.V, nil }, rx.WithCPUPool[int](), rx.WithErrorStrategy[int](enums.ContinueOnError), ) rx.Assert(ctx, obs, diff --git a/rx/observable-operator-sum_test.go b/rx/observable-operator-sum_test.go index 89dc035..e508b3a 100644 --- a/rx/observable-operator-sum_test.go +++ b/rx/observable-operator-sum_test.go @@ -27,7 +27,6 @@ import ( "github.com/fortytw2/leaktest" . "github.com/onsi/ginkgo/v2" //nolint:revive // ginkgo ok - "github.com/onsi/ginkgo/v2/dsl/decorators" "github.com/snivilised/lorax/rx" ) @@ -35,7 +34,7 @@ import ( var _ = Describe("Observable operator", func() { Context("Sum", func() { When("principle", func() { - XIt("🧪 should: return sum", decorators.Label("broken by reduce acc"), func() { + It("🧪 should: return sum", func() { // rxgo: Test_Observable_SumFloat32_OnlyFloat32 defer leaktest.Check(GinkgoT())() diff --git a/rx/observable-operator.go b/rx/observable-operator.go index 1d4af7b..dfaa5d7 100644 --- a/rx/observable-operator.go +++ b/rx/observable-operator.go @@ -93,6 +93,7 @@ func (op *allOperator[T]) gatherNext(ctx context.Context, item Item[T], } } +// Average calculates the average of numbers emitted by an Observable and emits the result. func (o *ObservableImpl[T]) Average(opts ...Option[T], ) Single[T] { const ( @@ -1496,7 +1497,7 @@ func (o *ObservableImpl[T]) Reduce(apply Func2[T], opts ...Option[T]) OptionalSi return optionalSingle(o.parent, o, func() operator[T] { return &reduceOperator[T]{ apply: apply, - acc: Num[T](0), // acc needs to be a Num: bomb!!! + acc: Zero[T](), empty: true, } }, forceSeq, bypassGather, opts...) @@ -1512,7 +1513,7 @@ func (op *reduceOperator[T]) next(ctx context.Context, item Item[T], dst chan<- Item[T], operatorOptions operatorOptions[T], ) { op.empty = false - v, err := op.apply(ctx, op.acc, item) // bomb!!! + v, err := op.apply(ctx, op.acc, item) if err != nil { Error[T](err).SendContext(ctx, dst) @@ -2233,10 +2234,9 @@ func (o *ObservableImpl[T]) StartWith(iterable Iterable[T], opts ...Option[T]) O // Sum calculates the average emitted by an Observable and emits the result func (o *ObservableImpl[T]) Sum(opts ...Option[T]) OptionalSingle[T] { - options := parseOptions[T]() + options := parseOptions(opts...) calc := options.calc() - // TODO: bomb!!!: do we use Num? return o.Reduce(func(_ context.Context, acc, item Item[T]) (T, error) { if calc == nil { var (