Skip to content

Commit

Permalink
feat(rx): contains operator (#158)
Browse files Browse the repository at this point in the history
  • Loading branch information
plastikfan committed Apr 9, 2024
1 parent f68b9cd commit 08ba0da
Show file tree
Hide file tree
Showing 3 changed files with 147 additions and 0 deletions.
92 changes: 92 additions & 0 deletions rx/observable-operator-contains_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package rx_test

import (
"context"

"github.com/fortytw2/leaktest"
. "github.com/onsi/ginkgo/v2" //nolint:revive // ginkgo ok

"github.com/snivilised/lorax/rx"
)

func two(i rx.Item[int]) bool {
return i.V == 2
}

var _ = Describe("Observable operator", func() {
Context("Contains", func() {
When("sequence contains item", func() {
It("🧪 should: result in true", func() {
// rxgo: Test_Observable_Contain
defer leaktest.Check(GinkgoT())()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

rx.Assert(ctx,
testObservable[int](ctx, 1, 2, 3).Contains(two),
rx.IsTrue[int]{},
)
})
})

When("sequence does not contain item", func() {
It("🧪 should: result in false", func() {
// rxgo: Test_Observable_Contain
defer leaktest.Check(GinkgoT())()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

rx.Assert(ctx,
testObservable[int](ctx, 1, 4, 3).Contains(two),
rx.IsFalse[int]{},
)
})
})

Context("Parallel", func() {
When("sequence contains item", func() {
It("🧪 should: result in true", func() {
// rxgo: Test_Observable_Contain_Parallel
defer leaktest.Check(GinkgoT())()

/*
TODO(impl): CPUPool
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
rx.Assert(ctx,
testObservable[int](ctx, 1, 2, 3).Contains(two,
rx.WithContext[int](ctx),
rx.WithCPUPool[int](),
),
rx.IsTrue[int]{},
)
*/
})
})

When("sequence does not contain item", func() {
It("🧪 should: result in false", func() {
// rxgo: Test_Observable_Contain_Parallel
defer leaktest.Check(GinkgoT())()

/*
TODO(impl): CPUPool
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
rx.Assert(ctx,
testObservable[int](ctx, 1, 4, 3).Contains(two,
rx.WithContext[int](ctx),
rx.WithCPUPool[int](),
),
rx.IsFalse[int]{},
)
*/
})
})
})
})
})
54 changes: 54 additions & 0 deletions rx/observable-operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,60 @@ func (o *ObservableImpl[T]) Run(opts ...Option[T]) Disposed {
return dispose
}

func (o *ObservableImpl[T]) Contains(equal Predicate[T], opts ...Option[T]) Single[T] {
const (
forceSeq = false
bypassGather = false
)

return single(o.parent, o, func() operator[T] {
return &containsOperator[T]{
equal: equal,
contains: false,
}
}, forceSeq, bypassGather, opts...)
}

type containsOperator[T any] struct {
equal Predicate[T]
contains bool
}

func (op *containsOperator[T]) next(ctx context.Context, item Item[T],
dst chan<- Item[T], operatorOptions operatorOptions[T],
) {
if op.equal(item) {
True[T]().SendContext(ctx, dst)

op.contains = true

operatorOptions.stop()
}
}

func (op *containsOperator[T]) err(ctx context.Context, item Item[T],
dst chan<- Item[T], operatorOptions operatorOptions[T],
) {
defaultErrorFuncOperator(ctx, item, dst, operatorOptions)
}

func (op *containsOperator[T]) end(ctx context.Context, dst chan<- Item[T]) {
if !op.contains {
False[T]().SendContext(ctx, dst)
}
}

func (op *containsOperator[T]) gatherNext(ctx context.Context, item Item[T],
dst chan<- Item[T], operatorOptions operatorOptions[T],
) {
if item.IsBoolean() && item.B {
True[T]().SendContext(ctx, dst)
operatorOptions.stop()

op.contains = true
}
}

// Max determines and emits the maximum-valued item emitted by an Observable according to a comparator.
func (o *ObservableImpl[T]) Max(comparator Comparator[T], initLimit InitLimit[T],
opts ...Option[T],
Expand Down
1 change: 1 addition & 0 deletions rx/observable.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ type Observable[T any] interface {
Average(calc Calculator[T], opts ...Option[T]) Single[T]
BackOffRetry(backOffCfg backoff.BackOff, opts ...Option[T]) Observable[T]
Connect(ctx context.Context) (context.Context, Disposable)
Contains(equal Predicate[T], opts ...Option[T]) Single[T]

Max(comparator Comparator[T], initLimit InitLimit[T], opts ...Option[T]) OptionalSingle[T]
Map(apply Func[T], opts ...Option[T]) Observable[T]
Expand Down

0 comments on commit 08ba0da

Please sign in to comment.