From 4dce24a1aca31a436cb924eaeb088b7374ce3999 Mon Sep 17 00:00:00 2001 From: Bo-Yi Wu Date: Sun, 8 Jan 2023 13:01:34 +0800 Subject: [PATCH] chore(queue): refactor queue package (#34) --- .github/workflows/go.yml | 2 +- .golangci.yml | 4 -- go.mod | 7 ++- go.sum | 14 +++--- nats.go | 64 +++------------------------- nats_test.go | 92 ++-------------------------------------- 6 files changed, 20 insertions(+), 163 deletions(-) diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml index 5b4bfb3..9056e3b 100644 --- a/.github/workflows/go.yml +++ b/.github/workflows/go.yml @@ -14,7 +14,7 @@ jobs: - name: Setup golangci-lint uses: golangci/golangci-lint-action@v3 with: - version: v1.45.0 + version: latest args: --verbose # Label of the container job diff --git a/.golangci.yml b/.golangci.yml index 5a0031c..3e40d74 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -4,7 +4,6 @@ linters: fast: false enable: - bodyclose - - deadcode - depguard - dogsled - dupl @@ -27,15 +26,12 @@ linters: - nakedret - noctx - nolintlint - - rowserrcheck - staticcheck - - structcheck - stylecheck - typecheck - unconvert - unparam - unused - - varcheck - whitespace - gofumpt diff --git a/go.mod b/go.mod index 6f652b5..afc2917 100644 --- a/go.mod +++ b/go.mod @@ -3,17 +3,16 @@ module github.com/golang-queue/nats go 1.18 require ( - github.com/golang-queue/queue v0.1.3 - github.com/nats-io/nats.go v1.16.0 + github.com/golang-queue/queue v0.1.4-0.20221230133718-0314ef173f98 + github.com/nats-io/nats.go v1.22.1 github.com/stretchr/testify v1.8.1 go.uber.org/goleak v1.2.0 ) require ( github.com/davecgh/go-spew v1.1.1 // indirect - github.com/goccy/go-json v0.9.7 // indirect + github.com/goccy/go-json v0.10.0 // indirect github.com/golang/protobuf v1.5.2 // indirect - github.com/kr/text v0.2.0 // indirect github.com/nats-io/nats-server/v2 v2.7.4 // indirect github.com/nats-io/nkeys v0.3.0 // indirect github.com/nats-io/nuid v1.0.1 // indirect diff --git a/go.sum b/go.sum index 627087a..9b092ee 100644 --- a/go.sum +++ b/go.sum @@ -1,11 +1,10 @@ -github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/goccy/go-json v0.9.7 h1:IcB+Aqpx/iMHu5Yooh7jEzJk1JZ7Pjtmys2ukPr7EeM= -github.com/goccy/go-json v0.9.7/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= -github.com/golang-queue/queue v0.1.3 h1:FGIrn8e0fN8EmL3glP0rFEcYVtWUGMEeqX4h4nnzh40= -github.com/golang-queue/queue v0.1.3/go.mod h1:h/PhaoMwT5Jc4sQNus7APgWBUItm6QC9k6JtmwrsRos= +github.com/goccy/go-json v0.10.0 h1:mXKd9Qw4NuzShiRlOXKews24ufknHO7gx30lsDyokKA= +github.com/goccy/go-json v0.10.0/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= +github.com/golang-queue/queue v0.1.4-0.20221230133718-0314ef173f98 h1:T2DoUcMWZr6uSUQAr5wCEzOiwHB1zJOiATAZ4BUAefg= +github.com/golang-queue/queue v0.1.4-0.20221230133718-0314ef173f98/go.mod h1:8P7IgwdxwKh0/W1I9yCuQQGI8OHIuc7fIHi4OYr1COU= github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc= github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw= @@ -14,13 +13,12 @@ github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/klauspost/compress v1.14.4 h1:eijASRJcobkVtSt81Olfh7JX43osYLwy5krOJo6YEu4= github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= -github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g= github.com/nats-io/jwt/v2 v2.2.1-0.20220113022732-58e87895b296 h1:vU9tpM3apjYlLLeY23zRWJ9Zktr5jp+mloR942LEOpY= github.com/nats-io/nats-server/v2 v2.7.4 h1:c+BZJ3rGzUKCBIM4IXO8uNT2u1vajGbD1kPA6wqCEaM= github.com/nats-io/nats-server/v2 v2.7.4/go.mod h1:1vZ2Nijh8tcyNe8BDVyTviCd9NYzRbubQYiEHsvOQWc= -github.com/nats-io/nats.go v1.16.0 h1:zvLE7fGBQYW6MWaFaRdsgm9qT39PJDQoju+DS8KsO1g= -github.com/nats-io/nats.go v1.16.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= +github.com/nats-io/nats.go v1.22.1 h1:XzfqDspY0RNufzdrB8c4hFR+R3dahkxlpWe5+IWJzbE= +github.com/nats-io/nats.go v1.22.1/go.mod h1:tLqubohF7t4z3du1QDPYJIQQyhb4wl6DhjxEajSI7UA= github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8= github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= diff --git a/nats.go b/nats.go index 30765f1..52648db 100644 --- a/nats.go +++ b/nats.go @@ -4,13 +4,14 @@ import ( "context" "encoding/json" "sync" - "sync/atomic" + "sync/atomic" //nolint:typecheck,nolintlint "time" "github.com/golang-queue/queue" "github.com/golang-queue/queue/core" + "github.com/golang-queue/queue/job" - "github.com/nats-io/nats.go" + nats "github.com/nats-io/nats.go" ) var _ core.Worker = (*Worker)(nil) @@ -71,62 +72,9 @@ func (w *Worker) startConsumer() (err error) { return err } -func (w *Worker) handle(job *queue.Job) error { - // create channel with buffer size 1 to avoid goroutine leak - done := make(chan error, 1) - panicChan := make(chan interface{}, 1) - startTime := time.Now() - ctx, cancel := context.WithTimeout(context.Background(), job.Timeout) - defer func() { - cancel() - }() - - // run the job - go func() { - // handle panic issue - defer func() { - if p := recover(); p != nil { - panicChan <- p - } - }() - - // run custom process function - done <- w.opts.runFunc(ctx, job) - }() - - select { - case p := <-panicChan: - panic(p) - case <-ctx.Done(): // timeout reached - return ctx.Err() - case <-w.stop: // shutdown service - // cancel job - cancel() - - leftTime := job.Timeout - time.Since(startTime) - // wait job - select { - case <-time.After(leftTime): - return context.DeadlineExceeded - case err := <-done: // job finish - return err - case p := <-panicChan: - panic(p) - } - case err := <-done: // job finish - return err - } -} - // Run start the worker -func (w *Worker) Run(task core.QueuedMessage) error { - data, _ := task.(*queue.Job) - - if err := w.handle(data); err != nil { - return err - } - - return nil +func (w *Worker) Run(ctx context.Context, task core.QueuedMessage) error { + return w.opts.runFunc(ctx, task) } // Shutdown worker @@ -177,7 +125,7 @@ loop: if !ok { return nil, queue.ErrQueueHasBeenClosed } - var data queue.Job + var data job.Message _ = json.Unmarshal(task.Data, &data) return &data, nil case <-time.After(1 * time.Second): diff --git a/nats_test.go b/nats_test.go index 9bec21e..a4cc168 100644 --- a/nats_test.go +++ b/nats_test.go @@ -11,6 +11,7 @@ import ( "github.com/golang-queue/queue" "github.com/golang-queue/queue/core" + "github.com/golang-queue/queue/job" "github.com/stretchr/testify/assert" "go.uber.org/goleak" @@ -155,7 +156,7 @@ func TestJobReachTimeout(t *testing.T) { assert.NoError(t, err) q.Start() time.Sleep(50 * time.Millisecond) - assert.NoError(t, q.QueueWithTimeout(20*time.Millisecond, m)) + assert.NoError(t, q.Queue(m, job.WithTimeout(20*time.Millisecond))) time.Sleep(100 * time.Millisecond) q.Shutdown() q.Wait() @@ -194,7 +195,7 @@ func TestCancelJobAfterShutdown(t *testing.T) { assert.NoError(t, err) q.Start() time.Sleep(50 * time.Millisecond) - assert.NoError(t, q.QueueWithTimeout(150*time.Millisecond, m)) + assert.NoError(t, q.Queue(m, job.WithTimeout(150*time.Millisecond))) time.Sleep(100 * time.Millisecond) q.Shutdown() q.Wait() @@ -272,93 +273,8 @@ func TestGoroutinePanic(t *testing.T) { q.Wait() } -func TestHandleTimeout(t *testing.T) { - job := &queue.Job{ - Timeout: 100 * time.Millisecond, - Payload: []byte("foo"), - } - w := NewWorker( - WithAddr(host+":4222"), - WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error { - time.Sleep(200 * time.Millisecond) - return nil - }), - ) - - err := w.handle(job) - assert.Error(t, err) - assert.Equal(t, context.DeadlineExceeded, err) - assert.NoError(t, w.Shutdown()) - - job = &queue.Job{ - Timeout: 150 * time.Millisecond, - Payload: []byte("foo"), - } - - w = NewWorker( - WithAddr(host+":4222"), - WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error { - time.Sleep(200 * time.Millisecond) - return nil - }), - ) - - done := make(chan error) - go func() { - done <- w.handle(job) - }() - - assert.NoError(t, w.Shutdown()) - - err = <-done - assert.Error(t, err) - assert.Equal(t, context.DeadlineExceeded, err) -} - -func TestJobComplete(t *testing.T) { - job := &queue.Job{ - Timeout: 100 * time.Millisecond, - Payload: []byte("foo"), - } - w := NewWorker( - WithAddr(host+":4222"), - WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error { - return errors.New("job completed") - }), - ) - - err := w.handle(job) - assert.Error(t, err) - assert.Equal(t, errors.New("job completed"), err) - assert.NoError(t, w.Shutdown()) - - job = &queue.Job{ - Timeout: 250 * time.Millisecond, - Payload: []byte("foo"), - } - - w = NewWorker( - WithAddr(host+":4222"), - WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error { - time.Sleep(200 * time.Millisecond) - return errors.New("job completed") - }), - ) - - done := make(chan error) - go func() { - done <- w.handle(job) - }() - - assert.NoError(t, w.Shutdown()) - - err = <-done - assert.Error(t, err) - assert.Equal(t, errors.New("job completed"), err) -} - func TestReQueueTaskInWorkerBeforeShutdown(t *testing.T) { - job := &queue.Job{ + job := &job.Message{ Payload: []byte("foo"), } w := NewWorker(