From f378dcc4dcdf6209e989f4c2219e3eef11bf5f2d Mon Sep 17 00:00:00 2001 From: seborama Date: Thu, 31 Mar 2022 22:35:34 +0100 Subject: [PATCH] add Stream.Peek --- stream.go | 26 ++++++++++++---- stream_test.go | 81 ++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 101 insertions(+), 6 deletions(-) diff --git a/stream.go b/stream.go index 8f71cc2..c42bce6 100644 --- a/stream.go +++ b/stream.go @@ -78,12 +78,6 @@ func (s Stream[T]) Concurrency() int { // // This is used for concurrent methods such as Stream.Map. // -// Consumption is ordered by the stream's channel but output -// may be unordered (a slow consumer will be "out-raced" by faster -// consumers). Ordering is dependent on the implementation of -// concurrency. For instance Stream.Map() is orderly but -// Stream.ForEachC is not. -// // Note that to switch off concurrency, you should provide n = 0. // With n = 1, concurrency is internal whereby the Stream writer // will not block on writing a single element (i.e. buffered @@ -307,6 +301,26 @@ func (s Stream[T]) ForEach(c Consumer[T]) { } } +// Peek is akin to ForEach but returns the Stream. +// +// This is useful e.g. for debugging. +// +// This function streams continuously until the in-stream is closed at +// which point the out-stream will be closed too. +func (s Stream[T]) Peek(consumer Consumer[T]) Stream[T] { + outstream := make(chan T, cap(s.stream)) + + go func() { + defer close(outstream) + s.ForEach(func(e T) { + consumer(e) + outstream <- e + }) + }() + + return NewConcurrentStream(outstream, s.concurrency) +} + // ToSlice extracts the elements of the stream into a []T. // // This is a special case of a reduction. diff --git a/stream_test.go b/stream_test.go index bb8e289..383df6e 100644 --- a/stream_test.go +++ b/stream_test.go @@ -595,6 +595,87 @@ func TestStream_Distinct(t *testing.T) { } } +func TestStream_Peek(t *testing.T) { + computeSumTotal := func(callCount, total *int) Consumer[int] { + return func(value int) { + *callCount++ + *total += value + } + } + + tt := []struct { + name string + stream chan int + consumer func(callCount, total *int) Consumer[int] + want []int + wantTotal int + wantCallCount int + }{ + { + name: "Should peek and return empty stream when nil in-stream", + stream: nil, + consumer: computeSumTotal, + want: []int{}, + wantTotal: 0, + wantCallCount: 0, + }, + { + name: "Should peek and return empty stream when empty in-stream", + stream: func() chan int { + c := make(chan int) + go func() { + defer close(c) + }() + return c + }(), + consumer: computeSumTotal, + want: []int{}, + wantTotal: 0, + wantCallCount: 0, + }, + { + name: "Should peek and return stream when populated in-stream", + stream: func() chan int { + c := make(chan int) + go func() { + defer close(c) + c <- 1 + c <- 2 + c <- 3 + c <- 5 + c <- 8 + }() + return c + }(), + consumer: computeSumTotal, + want: []int{ + 1, + 2, + 3, + 5, + 8, + }, + wantTotal: 19, + wantCallCount: 5, + }, + } + + for _, tc := range tt { + t.Run(tc.name, func(t *testing.T) { + callCount, total := 0, 0 + + s := Stream[int]{ + stream: tc.stream, + } + + got := s.Peek(tc.consumer(&callCount, &total)) + assert.EqualValues(t, tc.want, got.ToSlice()) + assert.Equal(t, tc.wantTotal, total) + assert.Equal(t, tc.wantCallCount, callCount) + }) + } +} + var float2int = func() Function[float32, Any] { return func(f float32) Any { return int(f)