Skip to content

Commit

Permalink
NCR-14625 fix concurrent ProcessBatch tests execution
Browse files Browse the repository at this point in the history
* enable t.Parallel() (40s to 7.4s local tests execution reduction)
* update owners
  • Loading branch information
solokirrik committed Jan 22, 2024
1 parent 248cdf4 commit 8e1ba5a
Show file tree
Hide file tree
Showing 16 changed files with 82 additions and 13 deletions.
2 changes: 1 addition & 1 deletion CODEOWNERS
Original file line number Diff line number Diff line change
@@ -1 +1 @@
* mark.salpeter@deliveryhero.com
* @deliveryhero/adtech-tracking-billing-sdk
2 changes: 2 additions & 0 deletions apply_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
)

func TestLoopApply(t *testing.T) {
t.Parallel()

transform := NewProcessor(func(_ context.Context, s string) ([]string, error) {
return strings.Split(s, ","), nil
}, nil)
Expand Down
5 changes: 4 additions & 1 deletion cancel_example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,15 @@ package pipeline_test
import (
"context"
"log"
"testing"
"time"

"github.com/deliveryhero/pipeline/v2"
)

func ExampleCancel() {
func TestExampleCancel(t *testing.T) {
t.Parallel()

// Create a context that lasts for 1 second
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
Expand Down
2 changes: 2 additions & 0 deletions cancel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
)

func TestCancel(t *testing.T) {
t.Parallel()

const testDuration = time.Second

// Collect logs
Expand Down
4 changes: 4 additions & 0 deletions collect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (
// 5. After duration with nothing in the buffer, nothing is returned, channel remains open
// 6. Flushes the buffer if the context is canceled
func TestCollect(t *testing.T) {
t.Parallel()

const maxTestDuration = time.Second
type args struct {
maxSize int
Expand Down Expand Up @@ -109,6 +111,8 @@ func TestCollect(t *testing.T) {
},
}} {
t.Run(test.name, func(t *testing.T) {
t.Parallel()

// Create the in channel
in := make(chan int)
go func() {
Expand Down
4 changes: 4 additions & 0 deletions delay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
)

func TestDelay(t *testing.T) {
t.Parallel()

const maxTestDuration = time.Second
type args struct {
ctxTimeout time.Duration
Expand Down Expand Up @@ -57,6 +59,8 @@ func TestDelay(t *testing.T) {
},
}} {
t.Run(test.name, func(t *testing.T) {
t.Parallel()

// Create in channel
in := Emit(test.args.in...)

Expand Down
2 changes: 2 additions & 0 deletions join_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
)

func TestJoin(t *testing.T) {
t.Parallel()

// Emit 10 numbers
want := []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}
ins := Emit(want...)
Expand Down
3 changes: 2 additions & 1 deletion merge_example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@ package pipeline_test

import (
"fmt"
"testing"

"github.com/deliveryhero/pipeline/v2"
)

func ExampleMerge() {
func TestExampleMerge(t *testing.T) {
one := pipeline.Emit(1)
two := pipeline.Emit(2, 2)
three := pipeline.Emit(3, 3, 3)
Expand Down
4 changes: 4 additions & 0 deletions merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ func (t task) do() <-chan error {
// 2. Receives all error messages from the error chans
// 3. Stays open if one of its child chans never closes
func TestMerge(t *testing.T) {
t.Parallel()

maxTestDuration := time.Second
for _, test := range []struct {
description string
Expand Down Expand Up @@ -115,6 +117,8 @@ func TestMerge(t *testing.T) {
tasks: []task{},
}} {
t.Run(test.description, func(t *testing.T) {
t.Parallel()

// Start doing all of the tasks
var errChans []<-chan error
for _, task := range test.tasks {
Expand Down
13 changes: 10 additions & 3 deletions pipeline_example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,17 @@ import (
"os"
"os/signal"
"syscall"
"testing"
"time"

"github.com/deliveryhero/pipeline/v2"
)

// The following example shows how you can shutdown a pipeline
// gracefully when it receives an error message
func Example_pipelineShutsDownOnError() {
func TestExample_pipelineShutsDownOnError(t *testing.T) {
t.Parallel()

// Create a context that can be canceled
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down Expand Up @@ -57,7 +60,9 @@ func Example_pipelineShutsDownOnError() {

// The following example demonstrates a pipeline
// that naturally finishes its run when the input channel is closed
func Example_pipelineShutsDownWhenInputChannelIsClosed() {
func TestExample_pipelineShutsDownWhenInputChannelIsClosed(t *testing.T) {
t.Parallel()

// Create a context that can be canceled
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down Expand Up @@ -95,7 +100,9 @@ func Example_pipelineShutsDownWhenInputChannelIsClosed() {

// This example demonstrates a pipline
// that runs until the os / container the pipline is running in kills it
func Example_pipelineShutsDownWhenContainerIsKilled() {
func TestExample_pipelineShutsDownWhenContainerIsKilled(t *testing.T) {
t.Parallel()

// Gracefully shutdown the pipeline when the the system is shutting down
// by canceling the context when os.Kill or os.Interrupt signal is sent
ctx, cancel := signal.NotifyContext(context.Background(), os.Kill, os.Interrupt)
Expand Down
9 changes: 7 additions & 2 deletions process_batch_example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,15 @@ package pipeline_test
import (
"context"
"fmt"
"testing"
"time"

"github.com/deliveryhero/pipeline/v2"
)

func ExampleProcessBatch() {
func TestExampleProcessBatch(t *testing.T) {
t.Parallel()

// Create a context that times out after 5 seconds
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
Expand Down Expand Up @@ -38,7 +41,9 @@ func ExampleProcessBatch() {
// error: could not multiply [5 6], context deadline exceeded
}

func ExampleProcessBatchConcurrently() {
func TestExampleProcessBatchConcurrently(t *testing.T) {
t.Parallel()

// Create a context that times out after 5 seconds
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
Expand Down
22 changes: 19 additions & 3 deletions process_batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
)

func TestProcessBatch(t *testing.T) {
t.Parallel()

const maxTestDuration = time.Second
type args struct {
ctxTimeout time.Duration
Expand Down Expand Up @@ -55,8 +57,11 @@ func TestProcessBatch(t *testing.T) {
// Therefore the out channel should be closed when the test ends
wantOpen: false,
}}
for _, tt := range tests {
for i := range tests {
tt := tests[i]

t.Run(tt.name, func(t *testing.T) {
t.Parallel()

ctx, cancel := context.WithTimeout(context.Background(), tt.args.ctxTimeout)
defer cancel()
Expand Down Expand Up @@ -86,6 +91,8 @@ func TestProcessBatch(t *testing.T) {
}

func TestProcessBatchConcurrently(t *testing.T) {
t.Parallel()

const maxTestDuration = time.Second
type args struct {
ctxTimeout time.Duration
Expand Down Expand Up @@ -136,8 +143,11 @@ func TestProcessBatchConcurrently(t *testing.T) {
// Therefore the out channel should be closed by the end of the test
wantOpen: false,
}}
for _, tt := range tests {
for i := range tests {
tt := tests[i]

t.Run(tt.name, func(t *testing.T) {
t.Parallel()

ctx, cancel := context.WithTimeout(context.Background(), tt.args.ctxTimeout)
defer cancel()
Expand Down Expand Up @@ -167,6 +177,8 @@ func TestProcessBatchConcurrently(t *testing.T) {
}

func Test_processBatch(t *testing.T) {
t.Parallel()

drain := make(chan int, 10000)
const maxTestDuration = time.Second
type args struct {
Expand Down Expand Up @@ -282,8 +294,12 @@ func Test_processBatch(t *testing.T) {
},
},
}}
for _, tt := range tests {
for i := range tests {
tt := tests[i]

t.Run(tt.name, func(t *testing.T) {
t.Parallel()

ctx, cancel := context.WithTimeout(context.Background(), tt.args.ctxTimeout)
defer cancel()

Expand Down
9 changes: 7 additions & 2 deletions process_example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,15 @@ import (
"context"
"fmt"
"log"
"testing"
"time"

"github.com/deliveryhero/pipeline/v2"
)

func ExampleProcess() {
func TestExampleProcess(t *testing.T) {
t.Parallel()

// Create a context that times out after 5 seconds
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
Expand Down Expand Up @@ -38,7 +41,9 @@ func ExampleProcess() {
// error: could not multiply 6, context deadline exceeded
}

func ExampleProcessConcurrently() {
func TestExampleProcessConcurrently(t *testing.T) {
t.Parallel()

// Create a context that times out after 5 seconds
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
Expand Down
8 changes: 8 additions & 0 deletions process_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
)

func TestProcess(t *testing.T) {
t.Parallel()

const maxTestDuration = time.Second
type args struct {
ctxTimeout time.Duration
Expand Down Expand Up @@ -97,6 +99,8 @@ func TestProcess(t *testing.T) {
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
t.Parallel()

// Create the in channel
in := make(chan int)
go func() {
Expand Down Expand Up @@ -159,6 +163,8 @@ func TestProcess(t *testing.T) {
}

func TestProcessConcurrently(t *testing.T) {
t.Parallel()

const maxTestDuration = time.Second
type args struct {
ctxTimeout time.Duration
Expand Down Expand Up @@ -254,6 +260,8 @@ func TestProcessConcurrently(t *testing.T) {
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
t.Parallel()

// Create the in channel
in := Emit(test.args.in...)

Expand Down
2 changes: 2 additions & 0 deletions sequence_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
)

func TestSequence(t *testing.T) {
t.Parallel()

// Create a step that increments the integer by 1
inc := NewProcessor(func(_ context.Context, i int) (int, error) {
i++
Expand Down
4 changes: 4 additions & 0 deletions split_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
)

func TestSplit(t *testing.T) {
t.Parallel()

type args struct {
in []int
}
Expand All @@ -29,6 +31,8 @@ func TestSplit(t *testing.T) {
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
t.Parallel()

// Create the in channel
in := Emit(test.args.in)

Expand Down

0 comments on commit 8e1ba5a

Please sign in to comment.