From 6385fd70f09c8ffe7c42c359f831d3938420c76e Mon Sep 17 00:00:00 2001 From: Samuel Tissot Date: Mon, 10 Jun 2019 13:04:57 -0400 Subject: [PATCH] added waitgroup to the Queue. This give the ability to drain all jobs before quiting --- queue.go | 5 +++++ queue_test.go | 27 +++++++++++++++++++++++++++ worker.go | 2 ++ 3 files changed, 34 insertions(+) diff --git a/queue.go b/queue.go index 6ae4463..cdb8746 100644 --- a/queue.go +++ b/queue.go @@ -4,6 +4,10 @@ // interface `Job` package bqueue +import "sync" + +var WG sync.WaitGroup + // Queue that process jobs reveived type Queue struct { maxWorker int @@ -35,6 +39,7 @@ func (q *Queue) Start() { // CollectJob Adds a job to the Queue func (q *Queue) CollectJob(job Job) { + WG.Add(1) q.JobReceived <- job } diff --git a/queue_test.go b/queue_test.go index 54d3c19..15dfc50 100644 --- a/queue_test.go +++ b/queue_test.go @@ -3,8 +3,11 @@ package bqueue import ( "bytes" "errors" + "fmt" "log" "os" + "strconv" + "strings" "testing" "time" @@ -37,6 +40,30 @@ func Test_Collect_and_process_job(t *testing.T) { } +func Test_WG(t *testing.T) { + q := New(1) + q.Start() + + //var output string + var output string + output += captureStout(func() { + for i := 0; i <= 200; i++ { + j := aJob{strconv.Itoa(i)} + q.CollectJob(j) + + } + WG.Wait() + }) + + // look for all instances of "test job: N" + // not very efficient but minimizes false positive + for i := 0; i <= 200; i++ { + if !strings.Contains(output, fmt.Sprintf("test job: %d", i)) { + t.FailNow() + } + } +} + func captureStout(f func()) string { var buf bytes.Buffer output := "" diff --git a/worker.go b/worker.go index 33b4924..4141c5a 100644 --- a/worker.go +++ b/worker.go @@ -52,4 +52,6 @@ func (w *worker) do(j Job) { if err != nil { fmt.Println(err) } + + WG.Done() }