Skip to content

Commit

Permalink
chore(queue): refactor queue package (#34)
Browse files Browse the repository at this point in the history
  • Loading branch information
appleboy authored Jan 8, 2023
1 parent df9ad16 commit 4dce24a
Show file tree
Hide file tree
Showing 6 changed files with 20 additions and 163 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ jobs:
- name: Setup golangci-lint
uses: golangci/golangci-lint-action@v3
with:
version: v1.45.0
version: latest
args: --verbose

# Label of the container job
Expand Down
4 changes: 0 additions & 4 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ linters:
fast: false
enable:
- bodyclose
- deadcode
- depguard
- dogsled
- dupl
Expand All @@ -27,15 +26,12 @@ linters:
- nakedret
- noctx
- nolintlint
- rowserrcheck
- staticcheck
- structcheck
- stylecheck
- typecheck
- unconvert
- unparam
- unused
- varcheck
- whitespace
- gofumpt

Expand Down
7 changes: 3 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,16 @@ module github.com/golang-queue/nats
go 1.18

require (
github.com/golang-queue/queue v0.1.3
github.com/nats-io/nats.go v1.16.0
github.com/golang-queue/queue v0.1.4-0.20221230133718-0314ef173f98
github.com/nats-io/nats.go v1.22.1
github.com/stretchr/testify v1.8.1
go.uber.org/goleak v1.2.0
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/goccy/go-json v0.9.7 // indirect
github.com/goccy/go-json v0.10.0 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/nats-io/nats-server/v2 v2.7.4 // indirect
github.com/nats-io/nkeys v0.3.0 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
Expand Down
14 changes: 6 additions & 8 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/goccy/go-json v0.9.7 h1:IcB+Aqpx/iMHu5Yooh7jEzJk1JZ7Pjtmys2ukPr7EeM=
github.com/goccy/go-json v0.9.7/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=
github.com/golang-queue/queue v0.1.3 h1:FGIrn8e0fN8EmL3glP0rFEcYVtWUGMEeqX4h4nnzh40=
github.com/golang-queue/queue v0.1.3/go.mod h1:h/PhaoMwT5Jc4sQNus7APgWBUItm6QC9k6JtmwrsRos=
github.com/goccy/go-json v0.10.0 h1:mXKd9Qw4NuzShiRlOXKews24ufknHO7gx30lsDyokKA=
github.com/goccy/go-json v0.10.0/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=
github.com/golang-queue/queue v0.1.4-0.20221230133718-0314ef173f98 h1:T2DoUcMWZr6uSUQAr5wCEzOiwHB1zJOiATAZ4BUAefg=
github.com/golang-queue/queue v0.1.4-0.20221230133718-0314ef173f98/go.mod h1:8P7IgwdxwKh0/W1I9yCuQQGI8OHIuc7fIHi4OYr1COU=
github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw=
Expand All @@ -14,13 +13,12 @@ github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/klauspost/compress v1.14.4 h1:eijASRJcobkVtSt81Olfh7JX43osYLwy5krOJo6YEu4=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g=
github.com/nats-io/jwt/v2 v2.2.1-0.20220113022732-58e87895b296 h1:vU9tpM3apjYlLLeY23zRWJ9Zktr5jp+mloR942LEOpY=
github.com/nats-io/nats-server/v2 v2.7.4 h1:c+BZJ3rGzUKCBIM4IXO8uNT2u1vajGbD1kPA6wqCEaM=
github.com/nats-io/nats-server/v2 v2.7.4/go.mod h1:1vZ2Nijh8tcyNe8BDVyTviCd9NYzRbubQYiEHsvOQWc=
github.com/nats-io/nats.go v1.16.0 h1:zvLE7fGBQYW6MWaFaRdsgm9qT39PJDQoju+DS8KsO1g=
github.com/nats-io/nats.go v1.16.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
github.com/nats-io/nats.go v1.22.1 h1:XzfqDspY0RNufzdrB8c4hFR+R3dahkxlpWe5+IWJzbE=
github.com/nats-io/nats.go v1.22.1/go.mod h1:tLqubohF7t4z3du1QDPYJIQQyhb4wl6DhjxEajSI7UA=
github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8=
github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
Expand Down
64 changes: 6 additions & 58 deletions nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@ import (
"context"
"encoding/json"
"sync"
"sync/atomic"
"sync/atomic" //nolint:typecheck,nolintlint
"time"

"github.com/golang-queue/queue"
"github.com/golang-queue/queue/core"
"github.com/golang-queue/queue/job"

"github.com/nats-io/nats.go"
nats "github.com/nats-io/nats.go"
)

var _ core.Worker = (*Worker)(nil)
Expand Down Expand Up @@ -71,62 +72,9 @@ func (w *Worker) startConsumer() (err error) {
return err
}

func (w *Worker) handle(job *queue.Job) error {
// create channel with buffer size 1 to avoid goroutine leak
done := make(chan error, 1)
panicChan := make(chan interface{}, 1)
startTime := time.Now()
ctx, cancel := context.WithTimeout(context.Background(), job.Timeout)
defer func() {
cancel()
}()

// run the job
go func() {
// handle panic issue
defer func() {
if p := recover(); p != nil {
panicChan <- p
}
}()

// run custom process function
done <- w.opts.runFunc(ctx, job)
}()

select {
case p := <-panicChan:
panic(p)
case <-ctx.Done(): // timeout reached
return ctx.Err()
case <-w.stop: // shutdown service
// cancel job
cancel()

leftTime := job.Timeout - time.Since(startTime)
// wait job
select {
case <-time.After(leftTime):
return context.DeadlineExceeded
case err := <-done: // job finish
return err
case p := <-panicChan:
panic(p)
}
case err := <-done: // job finish
return err
}
}

// Run start the worker
func (w *Worker) Run(task core.QueuedMessage) error {
data, _ := task.(*queue.Job)

if err := w.handle(data); err != nil {
return err
}

return nil
func (w *Worker) Run(ctx context.Context, task core.QueuedMessage) error {
return w.opts.runFunc(ctx, task)
}

// Shutdown worker
Expand Down Expand Up @@ -177,7 +125,7 @@ loop:
if !ok {
return nil, queue.ErrQueueHasBeenClosed
}
var data queue.Job
var data job.Message
_ = json.Unmarshal(task.Data, &data)
return &data, nil
case <-time.After(1 * time.Second):
Expand Down
92 changes: 4 additions & 88 deletions nats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/golang-queue/queue"
"github.com/golang-queue/queue/core"
"github.com/golang-queue/queue/job"

"github.com/stretchr/testify/assert"
"go.uber.org/goleak"
Expand Down Expand Up @@ -155,7 +156,7 @@ func TestJobReachTimeout(t *testing.T) {
assert.NoError(t, err)
q.Start()
time.Sleep(50 * time.Millisecond)
assert.NoError(t, q.QueueWithTimeout(20*time.Millisecond, m))
assert.NoError(t, q.Queue(m, job.WithTimeout(20*time.Millisecond)))
time.Sleep(100 * time.Millisecond)
q.Shutdown()
q.Wait()
Expand Down Expand Up @@ -194,7 +195,7 @@ func TestCancelJobAfterShutdown(t *testing.T) {
assert.NoError(t, err)
q.Start()
time.Sleep(50 * time.Millisecond)
assert.NoError(t, q.QueueWithTimeout(150*time.Millisecond, m))
assert.NoError(t, q.Queue(m, job.WithTimeout(150*time.Millisecond)))
time.Sleep(100 * time.Millisecond)
q.Shutdown()
q.Wait()
Expand Down Expand Up @@ -272,93 +273,8 @@ func TestGoroutinePanic(t *testing.T) {
q.Wait()
}

func TestHandleTimeout(t *testing.T) {
job := &queue.Job{
Timeout: 100 * time.Millisecond,
Payload: []byte("foo"),
}
w := NewWorker(
WithAddr(host+":4222"),
WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error {
time.Sleep(200 * time.Millisecond)
return nil
}),
)

err := w.handle(job)
assert.Error(t, err)
assert.Equal(t, context.DeadlineExceeded, err)
assert.NoError(t, w.Shutdown())

job = &queue.Job{
Timeout: 150 * time.Millisecond,
Payload: []byte("foo"),
}

w = NewWorker(
WithAddr(host+":4222"),
WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error {
time.Sleep(200 * time.Millisecond)
return nil
}),
)

done := make(chan error)
go func() {
done <- w.handle(job)
}()

assert.NoError(t, w.Shutdown())

err = <-done
assert.Error(t, err)
assert.Equal(t, context.DeadlineExceeded, err)
}

func TestJobComplete(t *testing.T) {
job := &queue.Job{
Timeout: 100 * time.Millisecond,
Payload: []byte("foo"),
}
w := NewWorker(
WithAddr(host+":4222"),
WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error {
return errors.New("job completed")
}),
)

err := w.handle(job)
assert.Error(t, err)
assert.Equal(t, errors.New("job completed"), err)
assert.NoError(t, w.Shutdown())

job = &queue.Job{
Timeout: 250 * time.Millisecond,
Payload: []byte("foo"),
}

w = NewWorker(
WithAddr(host+":4222"),
WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error {
time.Sleep(200 * time.Millisecond)
return errors.New("job completed")
}),
)

done := make(chan error)
go func() {
done <- w.handle(job)
}()

assert.NoError(t, w.Shutdown())

err = <-done
assert.Error(t, err)
assert.Equal(t, errors.New("job completed"), err)
}

func TestReQueueTaskInWorkerBeforeShutdown(t *testing.T) {
job := &queue.Job{
job := &job.Message{
Payload: []byte("foo"),
}
w := NewWorker(
Expand Down

0 comments on commit 4dce24a

Please sign in to comment.