Skip to content

Commit

Permalink
add Stream.Drop* (#24)
Browse files Browse the repository at this point in the history
* add Stream.Drop*

* add Stream: Head* / Last* / Take* / Drop* / StartsWith / EndsWith
  • Loading branch information
seborama committed Apr 2, 2022
1 parent b2683ec commit f410f84
Show file tree
Hide file tree
Showing 4 changed files with 1,105 additions and 18 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,8 @@ Streams:
- All/Any/None -Match
- Intersperse
- Distinct
- Head* / Last* / Take* / Drop*
- StartsWith / EndsWith
- ForEach / Peek
- ...
- ComparableStream
Expand Down
2 changes: 1 addition & 1 deletion errors.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package fuego

// PanicMissingChannel signifies that the Stream is missing a channel.
const PanicMissingChannel = "stream creation requires a channel"
const PanicMissingChannel = "stream requires a channel"

// PanicNoSuchElement signifies that the requested element is not present.
// Examples: when the Stream is empty, or when an Optional does not have a value.
Expand Down
251 changes: 251 additions & 0 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package fuego
import (
"fmt"

"github.com/google/go-cmp/cmp"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -386,6 +387,256 @@ func (s Stream[T]) NoneMatch(p Predicate[T]) bool {
return !s.AnyMatch(p)
}

// Drop the first 'n' elements of this stream and returns a new stream.
//
// This function streams continuously until the in-stream is closed at
// which point the out-stream will be closed too.
func (s Stream[T]) Drop(n uint64) Stream[T] {
return s.DropWhile(func() func(e T) bool {
count := uint64(0)
return func(e T) bool {
count++
return count <= n
}
}())
}

// DropWhile drops the first elements of this stream while the predicate
// is satisfied and returns a new stream.
//
// This function streams continuously until the in-stream is closed at
// which point the out-stream will be closed too.
func (s Stream[T]) DropWhile(p Predicate[T]) Stream[T] {
outstream := make(chan T, cap(s.stream))

go func() {
defer close(outstream)

if s.stream == nil {
return
}

// drop elements as required
for val := range s.stream {
if p(val) {
continue
}
outstream <- val

break
}

// flush the remainder to outstream
for val := range s.stream {
outstream <- val
}
}()

return NewConcurrentStream(outstream, s.concurrency)
}

// DropUntil drops the first elements of this stream until the predicate
// is satisfied and returns a new stream.
//
// This function streams continuously until the in-stream is closed at
// which point the out-stream will be closed too.
func (s Stream[T]) DropUntil(p Predicate[T]) Stream[T] {
return s.DropWhile(p.Negate())
}

// Last returns the last Entry in this stream.
//
// This function streams continuously until the in-stream is closed at
// which point the out-stream will be closed too.
func (s Stream[T]) Last() T {
return s.LastN(1)[0]
}

// LastN returns a slice of the last n elements in this stream.
//
// This function streams continuously until the in-stream is closed at
// which point the out-stream will be closed too.
func (s Stream[T]) LastN(n uint64) []T {
const flushTriggerDefault = uint64(100)

if s.stream == nil {
panic(PanicMissingChannel)
}

if n < 1 {
panic(PanicNoSuchElement)
}

val, ok := <-s.stream
if !ok {
panic(PanicNoSuchElement)
}

result := []T{val}

count := uint64(len(result))
flushTrigger := flushTriggerDefault

if n > flushTrigger {
flushTrigger = n
}

for val = range s.stream {
result = append(result, val)
if count++; count > flushTrigger {
// this is simply to reduce the number of
// slice resizing operations
result = result[uint64(len(result))-n:]
count = 0
}
}

if uint64(len(result)) > n {
return result[uint64(len(result))-n:]
}

return result
}

// Head returns the first Entry in this stream.
//
// This function only consumes at most one element from the stream.
func (s Stream[T]) Head() T {
head := s.HeadN(1)
if len(head) != 1 {
panic(PanicNoSuchElement)
}

return head[0]
}

// HeadN returns a slice of the first n elements in this stream.
//
// This function only consumes at most 'n' elements from the stream.
func (s Stream[T]) HeadN(n uint64) []T {
return Collect(
s.Take(n),
NewCollector(
func() []T { return []T{} },
func(e1 []T, e2 T) []T { return append(e1, e2) },
IdentityFinisher[[]T],
))
}

// Take returns a stream of the first 'n' elements of this stream.
//
// This function streams continuously until the 'n' elements are picked
// or the in-stream is closed at which point the out-stream
// will be closed too.
func (s Stream[T]) Take(n uint64) Stream[T] {
counterIsLessThanOrEqualTo := func(maxCount uint64) Predicate[T] {
counter := uint64(0)

return func(t T) bool {
counter++
return counter <= maxCount
}
}

return s.TakeWhile(counterIsLessThanOrEqualTo(n))
}

// Limit is a synonym for Take.
func (s Stream[T]) Limit(n uint64) Stream[T] {
return s.Take(n)
}

// TakeWhile returns a stream of the first elements of this
// stream while the predicate is satisfied.
//
// This function streams continuously until the in-stream is closed at
// which point the out-stream will be closed too.
func (s Stream[T]) TakeWhile(p Predicate[T]) Stream[T] {
if s.stream == nil {
panic(PanicMissingChannel)
}

outstream := make(chan T, cap(s.stream))

go func() {
defer close(outstream)

for val := range s.stream {
if !p(val) {
return
}
outstream <- val
}
}()

return NewConcurrentStream(outstream, s.concurrency)
}

// TakeUntil returns a stream of the first elements
// of this stream until the predicate is satisfied.
//
// This function streams continuously until the in-stream is closed at
// which point the out-stream will be closed too.
func (s Stream[T]) TakeUntil(p Predicate[T]) Stream[T] {
return s.TakeWhile(p.Negate())
}

// StartsWith returns true when this stream starts
// with the elements in the supplied slice.
//
// This function only consume as much data from the stream as
// is necessary to prove (or disprove) it starts with the supplied
// slice data.
func (s Stream[T]) StartsWith(slice []T) bool {
startElements := s.HeadN(uint64(len(slice)))
if len(slice) == 0 || len(startElements) != len(slice) {
return false
}

for idx, el := range slice {
if !cmp.Equal(el, startElements[idx]) {
return false
}
}

return true
}

// EndsWith returns true when this stream ends
// with the supplied elements.
//
// This is a potentially expensive method since it has
// to consume all the elements in the Stream.
//
// This function streams continuously until the in-stream is closed at
// which point the out-stream will be closed too.
func (s Stream[T]) EndsWith(slice []T) bool {
if len(slice) == 0 {
return false
}

endElements := func() []T {
defer func() {
// TODO: this doesn't look great... Need to re-write LastN like HeadN as a collect of TakeRight (to be implemented)
_ = recover()
}()

return s.LastN(uint64(len(slice)))
}()

if len(endElements) != len(slice) {
return false
}

for idx, el := range slice {
if !cmp.Equal(el, endElements[idx]) {
return false
}
}

return true
}

// ForEach executes the given consumer function for each entry in this stream.
//
// This is a continuous terminal operation. It will only complete if the producer closes the stream.
Expand Down
Loading

0 comments on commit f410f84

Please sign in to comment.