Skip to content

Commit

Permalink
add Stream: Count, AnyMatchm NoneMatch, AllMatch
Browse files Browse the repository at this point in the history
  • Loading branch information
seborama committed Apr 1, 2022
1 parent 3018433 commit 268e101
Show file tree
Hide file tree
Showing 2 changed files with 267 additions and 1 deletion.
70 changes: 69 additions & 1 deletion stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,12 +273,12 @@ func (s Stream[T]) Intersperse(e T) Stream[T] {

go func() {
defer close(outstream)

if s.stream == nil {
return
}

// this is to get around the inability to test generic types for nil in Go 1.18
// nolint: wsl
select {
case val, ok := <-s.stream:
if !ok {
Expand Down Expand Up @@ -318,6 +318,74 @@ func (s Stream[T]) GroupBy(classifier Function[T, Any]) map[Any][]T {
return resultMap
}

// Count the number of elements in the stream.
//
// This is a continuous terminal operation and hence expects
// the producer to close the stream in order to complete (or
// it will block).
func (s Stream[T]) Count() int {
if s.stream == nil {
return 0
}

count := 0
for range s.stream {
count++
}

return count
}

// AllMatch returns whether all of the elements in the stream
// satisfy the predicate.
//
// This is a continuous terminal operation and hence expects
// the producer to close the stream in order to complete (or
// it will block).
func (s Stream[T]) AllMatch(p Predicate[T]) bool {
if s.stream == nil {
return false
}

for val := range s.stream {
if !p(val) {
return false
}
}

return true
}

// AnyMatch returns whether any of the elements in the stream
// satisfies the predicate.
//
// This is a continuous terminal operation and hence expects
// the producer to close the stream in order to complete (or
// it will block).
func (s Stream[T]) AnyMatch(p Predicate[T]) bool {
if s.stream == nil {
return false
}

for val := range s.stream {
if p(val) {
return true
}
}

return false
}

// NoneMatch returns whether none of the elements in the stream
// satisfies the predicate. It is the opposite of AnyMatch.
//
// This is a continuous terminal operation and hence expects
// the producer to close the stream in order to complete (or
// it will block).
func (s Stream[T]) NoneMatch(p Predicate[T]) bool {
return !s.AnyMatch(p)
}

// ForEach executes the given consumer function for each entry in this stream.
//
// This is a continuous terminal operation. It will only complete if the producer closes the stream.
Expand Down
198 changes: 198 additions & 0 deletions stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,204 @@ func TestStream_GroupBy(t *testing.T) {
}
}

func TestStream_Count(t *testing.T) {
tt := map[string]struct {
stream chan int
want int
}{
"Should return 0 for a nil channel": {
stream: nil,
want: 0,
},
"Should return 0 for an empty closed channel": {
stream: func() chan int {
c := make(chan int)
go func() {
defer close(c)
}()
return c
}(),
want: 0,
},
"Should return 3 for a size 3 closed channel": {
stream: func() chan int {
c := make(chan int, 1)
go func() {
defer close(c)
c <- 1
c <- 2
c <- 1
}()
return c
}(),
want: 3,
},
}

for name, tc := range tt {
tc := tc

t.Run(name, func(t *testing.T) {
s := Stream[int]{
stream: tc.stream,
}
if got := s.Count(); got != tc.want {
t.Errorf("Stream.Count() = %v, want %v", got, tc.want)
}
})
}
}

func TestStream_AnyMatch(t *testing.T) {
dataGenerator := func() chan any {
c := make(chan any, 2)
go func() {
defer close(c)
c <- "a"
c <- false
c <- "b"
c <- -17
c <- "c"
}()
return c
}

tt := map[string]struct {
stream chan any
predicate Predicate[any]
want bool
}{
"Should not match any when channel is nil": {
stream: nil,
predicate: True[any](),
want: false,
},
"Should not match any": {
stream: dataGenerator(),
predicate: func(e any) bool { return e == "not in here" },
want: false,
},
"Should match any": {
stream: dataGenerator(),
predicate: func(e any) bool { return e == "b" },
want: true,
},
}

for name, tc := range tt {
tc := tc

t.Run(name, func(t *testing.T) {
s := Stream[any]{
stream: tc.stream,
}
if got := s.AnyMatch(tc.predicate); got != tc.want {
t.Errorf("Stream.AnyMatch() = %v, want %v", got, tc.want)
}
})
}
}

func TestStream_NoneMatch(t *testing.T) {
dataGenerator := func() chan any {
c := make(chan any, 2)
go func() {
defer close(c)
c <- "a"
c <- false
c <- "b"
c <- -17
c <- "c"
}()
return c
}

tt := map[string]struct {
stream chan any
predicate Predicate[any]
want bool
}{
"Should satisfy when channel is nil": {
stream: nil,
predicate: True[any](),
want: true,
},
"Should satisfy": {
stream: dataGenerator(),
predicate: func(e any) bool { return e == "not in here" },
want: true,
},
"Should not satisfy": {
stream: dataGenerator(),
predicate: func(e any) bool { return e == "b" },
want: false,
},
}

for name, tc := range tt {
tc := tc

t.Run(name, func(t *testing.T) {
s := Stream[any]{
stream: tc.stream,
}
if got := s.NoneMatch(tc.predicate); got != tc.want {
t.Errorf("Stream.NoneMatch() = %v, want %v", got, tc.want)
}
})
}
}

func TestStream_AllMatch(t *testing.T) {
dataGenerator := func() chan any {
c := make(chan any, 2)
go func() {
defer close(c)
c <- "a"
c <- false
c <- "b"
c <- -17
c <- "c"
}()
return c
}

tt := map[string]struct {
stream chan any
predicate Predicate[any]
want bool
}{
"Should not match all when channel is nil": {
stream: nil,
predicate: True[any](),
want: false,
},
"Should match all": {
stream: dataGenerator(),
predicate: func(e any) bool { return e != "not in here" },
want: true,
},
"Should not match all": {
stream: dataGenerator(),
predicate: func(e any) bool { return e == "b" },
want: false,
},
}

for name, tc := range tt {
tc := tc

t.Run(name, func(t *testing.T) {
s := Stream[any]{
stream: tc.stream,
}
if got := s.AllMatch(tc.predicate); got != tc.want {
t.Errorf("Stream.AllMatch() = %v, want %v", got, tc.want)
}
})
}
}

func TestStream_ForEach(t *testing.T) {
computeSumTotal := func(callCount, total *int) Consumer[int] {
return func(value int) {
Expand Down

0 comments on commit 268e101

Please sign in to comment.