Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix intermittent CI failure in EmptyQueue #23753

Merged
55 changes: 29 additions & 26 deletions modules/queue/unique_queue_disk_channel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@
package queue

import (
"os"
"strconv"
"sync"
"sync/atomic"
"testing"
"time"

Expand All @@ -16,10 +16,7 @@ import (
)

func TestPersistableChannelUniqueQueue(t *testing.T) {
if os.Getenv("CI") != "" {
t.Skip("Skipping because test is flaky on CI")
}

// Create a temporary directory for the queue
tmpDir := t.TempDir()
_ = log.NewLogger(1000, "console", "console", `{"level":"warn","stacktracelevel":"NONE","stderr":true}`)

Expand Down Expand Up @@ -100,7 +97,7 @@ func TestPersistableChannelUniqueQueue(t *testing.T) {
executedInitial := map[string][]string{}
hasInitial := map[string][]string{}

fillQueue := func(name string, done chan struct{}) {
fillQueue := func(name string, done chan int64) {
t.Run("Initial Filling: "+name, func(t *testing.T) {
lock := sync.Mutex{}

Expand Down Expand Up @@ -157,33 +154,39 @@ func TestPersistableChannelUniqueQueue(t *testing.T) {
assert.Equal(t, 101, len(executedInitial[name])+len(hasInitial[name]))
mapLock.Unlock()
})
mapLock.Lock()
count := int64(len(hasInitial[name]))
mapLock.Unlock()
done <- count
close(done)
}

doneA := make(chan struct{})
doneB := make(chan struct{})
hasQueueAChan := make(chan int64)
hasQueueBChan := make(chan int64)

go fillQueue("QueueA", doneA)
go fillQueue("QueueB", doneB)
go fillQueue("QueueA", hasQueueAChan)
go fillQueue("QueueB", hasQueueBChan)

<-doneA
<-doneB
hasA := <-hasQueueAChan
hasB := <-hasQueueBChan

executedEmpty := map[string][]string{}
hasEmpty := map[string][]string{}
emptyQueue := func(name string, done chan struct{}) {
emptyQueue := func(name string, numInQueue int64, done chan struct{}) {
t.Run("Empty Queue: "+name, func(t *testing.T) {
lock := sync.Mutex{}
stop := make(chan struct{})

// collect the tasks that have been executed
atomicCount := int64(0)
handle := func(data ...Data) []Data {
lock.Lock()
for _, datum := range data {
mapLock.Lock()
executedEmpty[name] = append(executedEmpty[name], datum.(string))
mapLock.Unlock()
if datum.(string) == "final" {
count := atomic.AddInt64(&atomicCount, 1)
if count >= numInQueue {
close(stop)
}
}
Expand Down Expand Up @@ -217,11 +220,11 @@ func TestPersistableChannelUniqueQueue(t *testing.T) {
close(done)
}

doneA = make(chan struct{})
doneB = make(chan struct{})
doneA := make(chan struct{})
doneB := make(chan struct{})

go emptyQueue("QueueA", doneA)
go emptyQueue("QueueB", doneB)
go emptyQueue("QueueA", hasA, doneA)
go emptyQueue("QueueB", hasB, doneB)

<-doneA
<-doneB
Expand All @@ -237,20 +240,20 @@ func TestPersistableChannelUniqueQueue(t *testing.T) {
hasEmpty = map[string][]string{}
mapLock.Unlock()

doneA = make(chan struct{})
doneB = make(chan struct{})
hasQueueAChan = make(chan int64)
hasQueueBChan = make(chan int64)

go fillQueue("QueueA", doneA)
go fillQueue("QueueB", doneB)
go fillQueue("QueueA", hasQueueAChan)
go fillQueue("QueueB", hasQueueBChan)

<-doneA
<-doneB
hasA = <-hasQueueAChan
hasB = <-hasQueueBChan

doneA = make(chan struct{})
doneB = make(chan struct{})

go emptyQueue("QueueA", doneA)
go emptyQueue("QueueB", doneB)
go emptyQueue("QueueA", hasA, doneA)
go emptyQueue("QueueB", hasB, doneB)

<-doneA
<-doneB
Expand Down