Skip to content

Commit

Permalink
feat(rx): add sequence equal operator (#200)
Browse files Browse the repository at this point in the history
  • Loading branch information
plastikfan committed Apr 12, 2024
1 parent ac4a77b commit 038923a
Show file tree
Hide file tree
Showing 3 changed files with 249 additions and 3 deletions.
99 changes: 99 additions & 0 deletions rx/observable-operator-sequence-equal_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package rx_test

import (
"context"

"github.com/fortytw2/leaktest"
. "github.com/onsi/ginkgo/v2" //nolint:revive // ginkgo ok
"github.com/snivilised/lorax/rx"
)

var _ = Describe("Observable operator", func() {
Context("SequenceEqual", func() {
When("even sequence", func() {
It("🧪 should: ", func() {
// rxgo: Test_Observable_SequenceEqual_EvenSequence
defer leaktest.Check(GinkgoT())()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

sequence := testObservable[int](ctx, 2, 5, 12, 43, 98, 100, 213)
result := testObservable[int](ctx, 2, 5, 12, 43, 98, 100, 213).SequenceEqual(
sequence,
rx.NumericItemLimitComparator,
)
rx.Assert(ctx, result,
rx.IsTrue[int]{},
)
})
})

When("Uneven sequence", func() {
It("🧪 should: ", func() {
// rxgo: Test_Observable_SequenceEqual_UnevenSequence
defer leaktest.Check(GinkgoT())()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

sequence := testObservable[int](ctx, 2, 5, 12, 43, 98, 100, 213)
result := testObservable[int](ctx, 2, 5, 12, 43, 15, 100, 213).SequenceEqual(
sequence,
rx.NumericItemLimitComparator,
rx.WithContext[int](ctx),
)
rx.Assert(ctx, result,
rx.IsTrue[int]{},
)
})
})

When("Different sequence length", func() {
It("🧪 should: ", func() {
// rxgo: Test_Observable_SequenceEqual_DifferentLengthSequence
defer leaktest.Check(GinkgoT())()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

sequenceShorter := testObservable[int](ctx, 2, 5, 12, 43, 98, 100)
sequenceLonger := testObservable[int](ctx, 2, 5, 12, 43, 98, 100, 213, 512)

resultForShorter := testObservable[int](ctx, 2, 5, 12, 43, 98, 100, 213).SequenceEqual(
sequenceShorter,
rx.NumericItemLimitComparator,
)
rx.Assert(ctx, resultForShorter,
rx.IsFalse[int]{},
)

resultForLonger := testObservable[int](ctx, 2, 5, 12, 43, 98, 100, 213).SequenceEqual(
sequenceLonger,
rx.NumericItemLimitComparator,
)
rx.Assert(ctx, resultForLonger,
rx.IsFalse[int]{},
)
})
})

When("empty", func() {
It("🧪 should: ", func() {
// rxgo: Test_Observable_SequenceEqual_Empty
defer leaktest.Check(GinkgoT())()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

result := rx.Empty[int]().SequenceEqual(
rx.Empty[int](),
rx.NumericItemLimitComparator,
)
rx.Assert(ctx, result,
rx.IsTrue[int]{},
)
})
})
})
})
152 changes: 149 additions & 3 deletions rx/observable-operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -1755,9 +1755,8 @@ func (op *scanOperator[T]) next(ctx context.Context, item Item[T],
return
}

it := Of(v)
it.SendContext(ctx, dst)
op.current = it
op.current = Of(v)
op.current.SendContext(ctx, dst)
}

func (op *scanOperator[T]) err(ctx context.Context, item Item[T],
Expand All @@ -1774,6 +1773,153 @@ func (op *scanOperator[T]) gatherNext(_ context.Context, _ Item[T],
) {
}

// Compares first items of two sequences and returns true if they are equal and false if
// they are not. Besides, it returns two new sequences - input sequences without compared items.
func popAndCompareFirstItems[T any]( //nolint:gocritic // foo
inputSequence1 []Item[T],
inputSequence2 []Item[T],
comparator Comparator[T],
) (bool, []Item[T], []Item[T]) {
if len(inputSequence1) > 0 && len(inputSequence2) > 0 {
s1, sequence1 := inputSequence1[0], inputSequence1[1:]
s2, sequence2 := inputSequence2[0], inputSequence2[1:]

return comparator(s1, s2) == 0, sequence1, sequence2
}

return true, inputSequence1, inputSequence2
}

// Send sends the items to a given channel.
func (o *ObservableImpl[T]) Send(output chan<- Item[T], opts ...Option[T]) {
go func() {
option := parseOptions(opts...)
ctx := option.buildContext(o.parent)
observe := o.Observe(opts...)
loop:
for {
select {
case <-ctx.Done():
break loop
case i, ok := <-observe:
if !ok {
break loop
}

if i.IsError() {
output <- i
break loop
}

i.SendContext(ctx, output)
}
}
close(output)
}()
}

// SequenceEqual emits true if an Observable and the input Observable emit the same items,
// in the same order, with the same termination state. Otherwise, it emits false.
func (o *ObservableImpl[T]) SequenceEqual(iterable Iterable[T],
comparator Comparator[T],
opts ...Option[T],
) Single[T] {
option := parseOptions(opts...)
next := option.buildChannel()
ctx := option.buildContext(o.parent)
itCh := make(chan Item[T])
obsCh := make(chan Item[T])

go func() {
defer close(obsCh)

observe := o.Observe(opts...)

for {
select {
case <-ctx.Done():
return
case i, ok := <-observe:
if !ok {
return
}

i.SendContext(ctx, obsCh)
}
}
}()

go func() {
defer close(itCh)

observe := iterable.Observe(opts...)

for {
select {
case <-ctx.Done():
return
case i, ok := <-observe:
if !ok {
return
}

i.SendContext(ctx, itCh)
}
}
}()

go func() {
var (
mainSequence, obsSequence []Item[T]
)

areCorrect := true
isMainChannelClosed := false
isObsChannelClosed := false

for {
select {
case item, ok := <-itCh:
if ok {
mainSequence = append(mainSequence, item)

areCorrect, mainSequence, obsSequence = popAndCompareFirstItems(
mainSequence, obsSequence,
comparator,
)
} else {
isMainChannelClosed = true
}

case item, ok := <-obsCh:
if ok {
obsSequence = append(obsSequence, item)
areCorrect, mainSequence, obsSequence = popAndCompareFirstItems(
mainSequence, obsSequence,
comparator,
)
} else {
isObsChannelClosed = true
}
}

if !areCorrect || (isMainChannelClosed && isObsChannelClosed) {
break
}
}

Bool[T](
areCorrect && len(mainSequence) == 0 && len(obsSequence) == 0,
).SendContext(ctx, next)

close(next)
}()

return &SingleImpl[T]{
iterable: newChannelIterable(next),
}
}

// !!!

// ToSlice collects all items from an Observable and emit them in a slice and
Expand Down
1 change: 1 addition & 0 deletions rx/observable.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ type Observable[T any] interface {
Run(opts ...Option[T]) Disposed
Sample(iterable Iterable[T], opts ...Option[T]) Observable[T]
Scan(apply Func2[T], opts ...Option[T]) Observable[T]
SequenceEqual(iterable Iterable[T], comparator Comparator[T], opts ...Option[T]) Single[T]
ToSlice(initialCapacity int, opts ...Option[T]) ([]Item[T], error)
}

Expand Down

0 comments on commit 038923a

Please sign in to comment.