Skip to content

Commit

Permalink
chore: update dependencies and refactor message handling
Browse files Browse the repository at this point in the history
- Update `github.com/golang-queue/queue` dependency from v0.2.1 to v0.3.0
- Remove indirect dependencies `github.com/vmihailenco/msgpack/v5` and `github.com/vmihailenco/tagparser/v2`
- Change function signatures to use `core.TaskMessage` instead of `core.QueuedMessage`
- Modify test cases to use `core.TaskMessage` and update payload handling accordingly
- Rename `Payload` field to `Body` in `job.Message` struct

Signed-off-by: appleboy <appleboy.tw@gmail.com>
  • Loading branch information
appleboy committed Jan 20, 2025
1 parent ad78b30 commit 210c4c2
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 26 deletions.
4 changes: 1 addition & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/golang-queue/nats
go 1.22

require (
github.com/golang-queue/queue v0.2.1
github.com/golang-queue/queue v0.3.0
github.com/nats-io/nats.go v1.38.0
github.com/stretchr/testify v1.10.0
github.com/testcontainers/testcontainers-go v0.35.0
Expand Down Expand Up @@ -52,8 +52,6 @@ require (
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/tklauser/go-sysconf v0.3.12 // indirect
github.com/tklauser/numcpus v0.6.1 // indirect
github.com/vmihailenco/msgpack/v5 v5.4.1 // indirect
github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect
github.com/yusufpapurcu/wmi v1.2.3 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 // indirect
go.opentelemetry.io/otel v1.24.0 // indirect
Expand Down
8 changes: 2 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY=
github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/golang-queue/queue v0.2.1 h1:W0JeHniILD4oxvs+E/hDuV0zlnQh0qeTy73BAjrigCw=
github.com/golang-queue/queue v0.2.1/go.mod h1:eUZ3HH9GbhoEKQSlxCBQ4pPXeadbJ7QKBMZ0kIZNDHI=
github.com/golang-queue/queue v0.3.0 h1:gyBLNT9EDOsChazYScp8iLiwLfG0SdnCDmNUybcHig4=
github.com/golang-queue/queue v0.3.0/go.mod h1:SkjMwz1TjxZOrF7kABvbar1CagcMxwRtXt5Tx00wb4g=
github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
Expand Down Expand Up @@ -121,10 +121,6 @@ github.com/tklauser/go-sysconf v0.3.12 h1:0QaGUFOdQaIVdPgfITYzaTegZvdCjmYO52cSFA
github.com/tklauser/go-sysconf v0.3.12/go.mod h1:Ho14jnntGE1fpdOqQEEaiKRpvIavV0hSfmBq8nJbHYI=
github.com/tklauser/numcpus v0.6.1 h1:ng9scYS7az0Bk4OZLvrNXNSAO2Pxr1XXRAPyjhIx+Fk=
github.com/tklauser/numcpus v0.6.1/go.mod h1:1XfjsgE2zo8GVw7POkMbHENHzVg3GzmoZ9fESEdAacY=
github.com/vmihailenco/msgpack/v5 v5.4.1 h1:cQriyiUvjTwOHg8QZaPihLWeRAAVoCpE00IUPn0Bjt8=
github.com/vmihailenco/msgpack/v5 v5.4.1/go.mod h1:GaZTsDaehaPpQVyxrf5mtQlH+pc21PIudVV/E3rRQok=
github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g=
github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yusufpapurcu/wmi v1.2.3 h1:E1ctvB7uKFMOJw3fdOW32DwGE9I7t++CRUEMKvFoFiw=
Expand Down
6 changes: 3 additions & 3 deletions nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (w *Worker) startConsumer() (err error) {
}

// Run start the worker
func (w *Worker) Run(ctx context.Context, task core.QueuedMessage) error {
func (w *Worker) Run(ctx context.Context, task core.TaskMessage) error {
return w.opts.runFunc(ctx, task)
}

Expand Down Expand Up @@ -101,7 +101,7 @@ func (w *Worker) Shutdown() error {
}

// Queue send notification to queue
func (w *Worker) Queue(job core.QueuedMessage) error {
func (w *Worker) Queue(job core.TaskMessage) error {
if atomic.LoadInt32(&w.stopFlag) == 1 {
return queue.ErrQueueShutdown
}
Expand All @@ -115,7 +115,7 @@ func (w *Worker) Queue(job core.QueuedMessage) error {
}

// Request a new task
func (w *Worker) Request() (core.QueuedMessage, error) {
func (w *Worker) Request() (core.TaskMessage, error) {
_ = w.startConsumer()
clock := 0
loop:
Expand Down
22 changes: 11 additions & 11 deletions nats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,8 @@ func TestCustomFuncAndWait(t *testing.T) {
WithAddr(endpoint),
WithSubj("test"),
WithQueue("test"),
WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error {
log.Println("show message: " + string(m.Bytes()))
WithRunFunc(func(ctx context.Context, m core.TaskMessage) error {
log.Println("show message: " + string(m.Payload()))
time.Sleep(500 * time.Millisecond)
return nil
}),
Expand Down Expand Up @@ -192,11 +192,11 @@ func TestJobReachTimeout(t *testing.T) {
WithAddr(endpoint),
WithSubj("JobReachTimeout"),
WithQueue("test"),
WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error {
WithRunFunc(func(ctx context.Context, m core.TaskMessage) error {
for {
select {
case <-ctx.Done():
log.Println("get data:", string(m.Bytes()))
log.Println("get data:", string(m.Payload()))
if errors.Is(ctx.Err(), context.Canceled) {
log.Println("queue has been shutdown and cancel the job")
} else if errors.Is(ctx.Err(), context.DeadlineExceeded) {
Expand Down Expand Up @@ -236,11 +236,11 @@ func TestCancelJobAfterShutdown(t *testing.T) {
WithSubj("CancelJob"),
WithQueue("test"),
WithLogger(queue.NewLogger()),
WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error {
WithRunFunc(func(ctx context.Context, m core.TaskMessage) error {
for {
select {
case <-ctx.Done():
log.Println("get data:", string(m.Bytes()))
log.Println("get data:", string(m.Payload()))
if errors.Is(ctx.Err(), context.Canceled) {
log.Println("queue has been shutdown and cancel the job")
} else if errors.Is(ctx.Err(), context.DeadlineExceeded) {
Expand Down Expand Up @@ -280,19 +280,19 @@ func TestGoroutineLeak(t *testing.T) {
WithSubj("GoroutineLeak"),
WithQueue("test"),
WithLogger(queue.NewEmptyLogger()),
WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error {
WithRunFunc(func(ctx context.Context, m core.TaskMessage) error {
for {
select {
case <-ctx.Done():
log.Println("get data:", string(m.Bytes()))
log.Println("get data:", string(m.Payload()))
if errors.Is(ctx.Err(), context.Canceled) {
log.Println("queue has been shutdown and cancel the job")
} else if errors.Is(ctx.Err(), context.DeadlineExceeded) {
log.Println("job deadline exceeded")
}
return nil
default:
log.Println("get data:", string(m.Bytes()))
log.Println("get data:", string(m.Payload()))
time.Sleep(50 * time.Millisecond)
return nil
}
Expand Down Expand Up @@ -327,7 +327,7 @@ func TestGoroutinePanic(t *testing.T) {
w := NewWorker(
WithAddr(endpoint),
WithSubj("GoroutinePanic"),
WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error {
WithRunFunc(func(ctx context.Context, m core.TaskMessage) error {
panic("missing something")
}),
)
Expand All @@ -351,7 +351,7 @@ func TestReQueueTaskInWorkerBeforeShutdown(t *testing.T) {
natsC, endpoint := setupNatsContainer(ctx, t)
defer testcontainers.CleanupContainer(t, natsC)
job := &job.Message{
Payload: []byte("foo"),
Body: []byte("foo"),
}
w := NewWorker(
WithAddr(endpoint),
Expand Down
6 changes: 3 additions & 3 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
type Option func(*options)

type options struct {
runFunc func(context.Context, core.QueuedMessage) error
runFunc func(context.Context, core.TaskMessage) error
logger queue.Logger
addr string
subj string
Expand Down Expand Up @@ -45,7 +45,7 @@ func WithQueue(queue string) Option {
}

// WithRunFunc setup the run func of queue
func WithRunFunc(fn func(context.Context, core.QueuedMessage) error) Option {
func WithRunFunc(fn func(context.Context, core.TaskMessage) error) Option {
return func(w *options) {
w.runFunc = fn
}
Expand All @@ -64,7 +64,7 @@ func newOptions(opts ...Option) options {
subj: "foobar",
queue: "foobar",
logger: queue.NewLogger(),
runFunc: func(context.Context, core.QueuedMessage) error {
runFunc: func(context.Context, core.TaskMessage) error {
return nil
},
}
Expand Down

0 comments on commit 210c4c2

Please sign in to comment.