Skip to content

Commit

Permalink
feat(rx): add distinct operator (#164)
Browse files Browse the repository at this point in the history
  • Loading branch information
plastikfan committed Apr 9, 2024
1 parent 754ec36 commit c8aa638
Show file tree
Hide file tree
Showing 4 changed files with 331 additions and 5 deletions.
116 changes: 116 additions & 0 deletions rx/observable-operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,122 @@ func (op *defaultIfEmptyOperator[T]) gatherNext(_ context.Context, _ Item[T],
) {
}

// Distinct suppresses duplicate items in the original Observable and returns
// a new Observable.
func (o *ObservableImpl[T]) Distinct(apply Func[T], opts ...Option[T]) Observable[T] {
const (
forceSeq = false
bypassGather = false
)

return observable(o.parent, o, func() operator[T] {
return &distinctOperator[T]{
apply: apply,
keyset: make(map[interface{}]interface{}),
}
}, forceSeq, bypassGather, opts...)
}

type distinctOperator[T any] struct {
apply Func[T]
keyset map[interface{}]interface{}
}

func (op *distinctOperator[T]) next(ctx context.Context, item Item[T],
dst chan<- Item[T], operatorOptions operatorOptions[T],
) {
key, err := op.apply(ctx, item.V)
if err != nil {
Error[T](err).SendContext(ctx, dst)
operatorOptions.stop()

return
}

_, ok := op.keyset[key]

if !ok {
item.SendContext(ctx, dst)
}

op.keyset[key] = nil
}

func (op *distinctOperator[T]) err(ctx context.Context, item Item[T],
dst chan<- Item[T], operatorOptions operatorOptions[T],
) {
defaultErrorFuncOperator(ctx, item, dst, operatorOptions)
}

func (op *distinctOperator[T]) end(_ context.Context, _ chan<- Item[T]) {
}

func (op *distinctOperator[T]) gatherNext(ctx context.Context, item Item[T],
dst chan<- Item[T], _ operatorOptions[T],
) {
if _, contains := op.keyset[item.V]; !contains {
Of(item.V).SendContext(ctx, dst)

op.keyset[item.V] = nil
}
}

// DistinctUntilChanged suppresses consecutive duplicate items in the original Observable.
// Cannot be run in parallel.
func (o *ObservableImpl[T]) DistinctUntilChanged(apply Func[T], comparator Comparator[T],
opts ...Option[T],
) Observable[T] {
const (
forceSeq = true
bypassGather = false
)

return observable(o.parent, o, func() operator[T] {
return &distinctUntilChangedOperator[T]{
apply: apply,
comparator: comparator,
}
}, forceSeq, bypassGather, opts...)
}

type distinctUntilChangedOperator[T any] struct {
apply Func[T]
current T
comparator Comparator[T]
}

func (op *distinctUntilChangedOperator[T]) next(ctx context.Context, item Item[T],
dst chan<- Item[T], operatorOptions operatorOptions[T]) {
key, err := op.apply(ctx, item.V)

if err != nil {
Error[T](err).SendContext(ctx, dst)
operatorOptions.stop()

return
}

if op.comparator(op.current, key) != 0 {
item.SendContext(ctx, dst)

op.current = key
}
}

func (op *distinctUntilChangedOperator[T]) err(ctx context.Context, item Item[T],
dst chan<- Item[T], operatorOptions operatorOptions[T],
) {
defaultErrorFuncOperator(ctx, item, dst, operatorOptions)
}

func (op *distinctUntilChangedOperator[T]) end(_ context.Context, _ chan<- Item[T]) {
}

func (op *distinctUntilChangedOperator[T]) gatherNext(_ context.Context, _ Item[T],
_ chan<- Item[T], _ operatorOptions[T],
) {
}

// Max determines and emits the maximum-valued item emitted by an Observable according to a comparator.
func (o *ObservableImpl[T]) Max(comparator Comparator[T], initLimit InitLimit[T],
opts ...Option[T],
Expand Down
3 changes: 2 additions & 1 deletion rx/observable.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ type Observable[T any] interface {
Contains(equal Predicate[T], opts ...Option[T]) Single[T]
Count(opts ...Option[T]) Single[T]
DefaultIfEmpty(defaultValue T, opts ...Option[T]) Observable[T]

Distinct(apply Func[T], opts ...Option[T]) Observable[T]
DistinctUntilChanged(apply Func[T], comparator Comparator[T], opts ...Option[T]) Observable[T]
Max(comparator Comparator[T], initLimit InitLimit[T], opts ...Option[T]) OptionalSingle[T]
Map(apply Func[T], opts ...Option[T]) Observable[T]
Min(comparator Comparator[T], initLimit InitLimit[T], opts ...Option[T]) OptionalSingle[T]
Expand Down
209 changes: 209 additions & 0 deletions rx/observable_operator_distinct_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
package rx_test

import (
"context"

"github.com/fortytw2/leaktest"
. "github.com/onsi/ginkgo/v2" //nolint:revive // ginkgo ok

"github.com/snivilised/lorax/rx"
)

var _ = Describe("Observable operator", func() {
Context("Distinct", func() {
When("duplicates present", func() {
It("🧪 should: suppress duplicates", func() {
// rxgo: Test_Observable_Distinct
defer leaktest.Check(GinkgoT())()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

obs := testObservable[int](ctx, 1, 2, 2, 1, 3).Distinct(
func(_ context.Context, item int) (int, error) {
return item, nil
},
)

rx.Assert(ctx, obs,
rx.HasItems[int]{
Expected: []int{1, 2, 3},
},
rx.HasNoError[int]{},
)
})
})

Context("Errors", func() {
When("error present", func() {
It("🧪 should: emit values before error and has error", func() {
// rxgo: Test_Observable_Distinct_Error
defer leaktest.Check(GinkgoT())()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

obs := testObservable[int](ctx, 1, 2, 2, errFoo, 3).Distinct(
func(_ context.Context, item int) (int, error) {
return item, nil
},
)

rx.Assert(ctx, obs,
rx.HasItems[int]{
Expected: []int{1, 2},
},
rx.HasError[int]{
Expected: []error{errFoo},
},
)
})
})

When("error present", func() {
It("🧪 should: emit values before error and has error", func() {
// rxgo: Test_Observable_Distinct_Error2
defer leaktest.Check(GinkgoT())()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

obs := testObservable[int](ctx, 1, 2, 2, 2, 3, 4).Distinct(
func(_ context.Context, v int) (int, error) {
if v == 3 {
return 0, errFoo
}

return v, nil
},
)

rx.Assert(ctx, obs,
rx.HasItems[int]{
Expected: []int{1, 2},
}, rx.HasError[int]{
Expected: []error{errFoo},
},
)
})
})
})

Context("Parallel", func() {
Context("foo", func() {
It("🧪 should: ", func() {
// rxgo: Test_Observable_Distinct_Parallel
defer leaktest.Check(GinkgoT())()

/*
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
obs := testObservable[int](ctx, 1, 2, 2, 1, 3).Distinct(
func(_ context.Context, item int) (int, error) {
return item, nil
}, rx.WithCPUPool[int]())
rx.Assert(ctx, obs,
rx.HasItemsNoOrder[int]{
Expected: []int{1, 2, 3},
},
rx.HasNoError[int]{},
)
*/
})
})
})

Context("Parallel/Error", func() {
When("given: foo", func() {
It("🧪 should: ", func() {
// rxgo: Test_Observable_Distinct_Parallel_Error
defer leaktest.Check(GinkgoT())()

/*
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
obs := testObservable[int](ctx, 1, 2, 2, errFoo).Distinct(
func(_ context.Context, item int) (int, error) {
return item, nil
}, rx.WithContext[int](ctx), rx.WithCPUPool[int]())
rx.Assert(ctx, obs, rx.HasError[int]{
Expected: []error{errFoo},
})
*/
})
})

When("given: foo", func() {
It("🧪 should: ", func() {
// rxgo: Test_Observable_Distinct_Parallel_Error2
defer leaktest.Check(GinkgoT())()

/*
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
obs := testObservable[int](ctx, 1, 2, 2, 2, 3, 4).Distinct(
func(_ context.Context, item int) (int, error) {
if item == 3 {
return 0, errFoo
}
return item, nil
}, rx.WithContext[int](ctx), rx.WithCPUPool[int](),
)
rx.Assert[int](ctx, obs, rx.HasError[int]{
Expected: []error{errFoo},
})
*/
})
})
})
})

Context("DistinctUntilChanged", func() {
Context("principle", func() {
It("🧪 should: ", func() {
// rxgo: Test_Observable_DistinctUntilChanged
defer leaktest.Check(GinkgoT())()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

obs := testObservable[int](ctx, 1, 2, 2, 1, 3).DistinctUntilChanged(
func(_ context.Context, v int) (int, error) {
return v, nil
}, rx.LimitComparator, rx.WithCPUPool[int]())

rx.Assert(ctx, obs,
rx.HasItems[int]{
Expected: []int{1, 2, 1, 3},
})
})
})

Context("Parallel", func() {
Context("given: foo", func() {
It("🧪 should: ", func() {
// rxgo: Test_Observable_DistinctUntilChanged_Parallel
defer leaktest.Check(GinkgoT())()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

obs := testObservable[int](ctx, 1, 2, 2, 1, 3).DistinctUntilChanged(
func(_ context.Context, item int) (int, error) {
return item, nil
}, rx.LimitComparator, rx.WithCPUPool[int]())

rx.Assert(ctx, obs, rx.HasItems[int]{
Expected: []int{1, 2, 1, 3},
})
})
})
})
})
})
8 changes: 4 additions & 4 deletions rx/operator-skeleton_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (

var _ = Describe("Observable operator", func() {
XContext("${{OPERATOR-NAME}}", func() {
Context("principle", func() {
When("principle", func() {
// success path
It("🧪 should: ", func() {
// rxgo: Test_
Expand All @@ -22,7 +22,7 @@ var _ = Describe("Observable operator", func() {
})

Context("Errors", func() {
Context("given: foo", func() {
When("foo", func() {
It("🧪 should: ", func() {
// rxgo: Test_
defer leaktest.Check(GinkgoT())()
Expand All @@ -31,7 +31,7 @@ var _ = Describe("Observable operator", func() {
})

Context("Parallel", func() {
Context("given: foo", func() {
When("foo", func() {
It("🧪 should: ", func() {
// rxgo: Test_
defer leaktest.Check(GinkgoT())()
Expand All @@ -40,7 +40,7 @@ var _ = Describe("Observable operator", func() {
})

Context("Parallel/Error", func() {
Context("given: foo", func() {
When("foo", func() {
It("🧪 should: ", func() {
// rxgo: Test_
defer leaktest.Check(GinkgoT())()
Expand Down

0 comments on commit c8aa638

Please sign in to comment.