Skip to content

Commit

Permalink
feat(rx): add sample operator (#196)
Browse files Browse the repository at this point in the history
  • Loading branch information
plastikfan committed Apr 12, 2024
1 parent 0937cae commit 310c245
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 0 deletions.
30 changes: 30 additions & 0 deletions rx/observable-operator-sample_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package rx_test

import (
"context"

"github.com/fortytw2/leaktest"
. "github.com/onsi/ginkgo/v2" //nolint:revive // ginkgo ok

"github.com/snivilised/lorax/rx"
)

var _ = Describe("Observable operator", func() {
Context("Sample", func() {
When("empty", func() {
It("🧪 should: ", func() {
// rxgo: Test_Observable_Sample_Empty
defer leaktest.Check(GinkgoT())()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

obs := testObservable[int](ctx, 1).Sample(rx.Empty[int](), rx.WithContext[int](ctx))
rx.Assert(ctx, obs,
rx.IsEmpty[int]{},
rx.HasNoError[int]{},
)
})
})
})
})
82 changes: 82 additions & 0 deletions rx/observable-operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -1641,6 +1641,88 @@ func (o *ObservableImpl[T]) Run(opts ...Option[T]) Disposed {
return dispose
}

// Sample returns an Observable that emits the most recent items emitted by the source
// Iterable whenever the input Iterable emits an item.
func (o *ObservableImpl[T]) Sample(iterable Iterable[T], opts ...Option[T]) Observable[T] {
option := parseOptions(opts...)
next := option.buildChannel()
ctx := option.buildContext(o.parent)
itCh := make(chan Item[T])
obsCh := make(chan Item[T])

go func() {
defer close(obsCh)

observe := o.Observe(opts...)

for {
select {
case <-ctx.Done():
return
case i, ok := <-observe:
if !ok {
return
}

i.SendContext(ctx, obsCh)
}
}
}()

go func() {
defer close(itCh)

observe := iterable.Observe(opts...)

for {
select {
case <-ctx.Done():
return
case i, ok := <-observe:
if !ok {
return
}

i.SendContext(ctx, itCh)
}
}
}()

go func() {
defer close(next)

var lastEmittedItem Item[T]

isItemWaitingToBeEmitted := false

for {
select {
case _, ok := <-itCh:
if ok {
if isItemWaitingToBeEmitted {
next <- lastEmittedItem

isItemWaitingToBeEmitted = false
}
} else {
return
}
case item, ok := <-obsCh:
if ok {
lastEmittedItem = item
isItemWaitingToBeEmitted = true
} else {
return
}
}
}
}()

return &ObservableImpl[T]{
iterable: newChannelIterable(next),
}
}

// !!!

// ToSlice collects all items from an Observable and emit them in a slice and
Expand Down
1 change: 1 addition & 0 deletions rx/observable.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ type Observable[T any] interface {
Repeat(count int64, frequency Duration, opts ...Option[T]) Observable[T]
Retry(count int, shouldRetry ShouldRetryFunc, opts ...Option[T]) Observable[T]
Run(opts ...Option[T]) Disposed
Sample(iterable Iterable[T], opts ...Option[T]) Observable[T]
ToSlice(initialCapacity int, opts ...Option[T]) ([]Item[T], error)
}

Expand Down

0 comments on commit 310c245

Please sign in to comment.