Skip to content

Commit

Permalink
refactor: refactor message handling and encoding in queue system (#138)
Browse files Browse the repository at this point in the history
- Remove `Encode` method call in `BenchmarkQueue` function
- Add `TaskMessage` interface extending `QueuedMessage` with `Payload` method
- Rename `Payload` field to `Body` in `Message` struct
- Add `Payload` method to `Message` struct to return `Body`
- Replace `Data` field and `Encode` method in `Message` struct with `Bytes` method using JSON marshalling
- Update `NewMessage` to use `Body` instead of `Payload`
- Add documentation for `Encode` method in `job` package
- Update `TestMessageEncodeDecode` to use `Payload()` method
- Replace `Encode` method call with direct handling of `task` in `Queue` methods
- Change logger from `Errorf` to `Fatalf` in `Queue` error handling
- Add type assertion and handling for `job.Message` in `Queue`'s `run` method
- Import `job` package in `ring.go`
- Add `Data` struct with `Payload` field and `Bytes` method in `ring.go`
- Update `Ring`'s `Run` method to use `Data` struct for task handling

Signed-off-by: Bo-Yi Wu <appleboy.tw@gmail.com>
  • Loading branch information
appleboy authored Jan 20, 2025
1 parent 71e206e commit cc471ea
Show file tree
Hide file tree
Showing 7 changed files with 59 additions and 25 deletions.
1 change: 0 additions & 1 deletion benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ func BenchmarkQueue(b *testing.B) {
m := job.NewMessage(&mockMessage{
message: "foo",
})
m.Encode()

for n := 0; n < b.N; n++ {
if err := q.queue(&m); err != nil {
Expand Down
5 changes: 5 additions & 0 deletions core/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,8 @@ type Worker interface {
type QueuedMessage interface {
Bytes() []byte
}

type TaskMessage interface {
QueuedMessage
Payload() []byte
}
40 changes: 30 additions & 10 deletions job/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ type Message struct {
Timeout time.Duration `json:"timeout" msgpack:"timeout"`

// Payload is the payload data of the task.
Payload []byte `json:"body" msgpack:"body"`
Body []byte `json:"body" msgpack:"body"`

// RetryCount set count of retry
// default is 0, no retry.
Expand All @@ -48,19 +48,30 @@ type Message struct {

// Jitter eases contention by randomizing backoff steps
Jitter bool `json:"jitter" msgpack:"jitter"`
}

// Data to save Unsafe cast
Data []byte
// Payload returns the payload data of the Message.
// It returns the byte slice of the payload.
//
// Returns:
// - A byte slice containing the payload data.
func (m *Message) Payload() []byte {
return m.Body
}

// Bytes get internal data
// Bytes returns the byte slice of the Message struct.
// If the marshalling process encounters an error, the function will panic.
// It returns the marshalled byte slice.
//
// Returns:
// - A byte slice containing the msgpack-encoded data.
func (m *Message) Bytes() []byte {
return m.Data
}
b, err := json.Marshal(m)
if err != nil {
panic(err)
}

// Encode for encoding the structure
func (m *Message) Encode() {
m.Data = Encode(m)
return b
}

// NewMessage create new message
Expand All @@ -74,7 +85,7 @@ func NewMessage(m core.QueuedMessage, opts ...AllowOption) Message {
RetryMin: o.retryMin,
RetryMax: o.retryMax,
Timeout: o.timeout,
Payload: m.Bytes(),
Body: m.Bytes(),
}
}

Expand All @@ -92,6 +103,15 @@ func NewTask(task TaskFunc, opts ...AllowOption) Message {
}
}

// Encode takes a Message struct and marshals it into a byte slice using msgpack.
// If the marshalling process encounters an error, the function will panic.
// It returns the marshalled byte slice.
//
// Parameters:
// - m: A pointer to the Message struct to be encoded.
//
// Returns:
// - A byte slice containing the msgpack-encoded data.
func Encode(m *Message) []byte {
b, err := json.Marshal(m)
if err != nil {
Expand Down
3 changes: 1 addition & 2 deletions job/job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,12 @@ func TestMessageEncodeDecode(t *testing.T) {
},
)

m.Encode()
out := Decode(m.Bytes())

assert.Equal(t, int64(100), out.RetryCount)
assert.Equal(t, 30*time.Millisecond, out.RetryDelay)
assert.Equal(t, 3*time.Millisecond, out.Timeout)
assert.Equal(t, "foo", string(out.Payload))
assert.Equal(t, "foo", string(out.Payload()))
assert.Equal(t, 200*time.Millisecond, out.RetryMin)
assert.Equal(t, 20*time.Second, out.RetryMax)
assert.Equal(t, 4.0, out.RetryFactor)
Expand Down
14 changes: 6 additions & 8 deletions queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,6 @@ func (q *Queue) Wait() {
// Queue to queue single job with binary
func (q *Queue) Queue(message core.QueuedMessage, opts ...job.AllowOption) error {
data := job.NewMessage(message, opts...)
data.Encode()

return q.queue(&data)
}
Expand Down Expand Up @@ -160,7 +159,7 @@ func (q *Queue) work(task core.QueuedMessage) {
q.metric.DecBusyWorker()
e := recover()
if e != nil {
q.logger.Errorf("panic error: %v", e)
q.logger.Fatalf("panic error: %v", e)
}
q.schedule()

Expand All @@ -182,13 +181,12 @@ func (q *Queue) work(task core.QueuedMessage) {
}

func (q *Queue) run(task core.QueuedMessage) error {
data := task.(*job.Message)
if data.Task == nil {
data = job.Decode(task.Bytes())
data.Data = data.Payload
switch t := task.(type) {
case *job.Message:
return q.handle(t)
default:
return errors.New("invalid task type")
}

return q.handle(data)
}

func (q *Queue) handle(m *job.Message) error {
Expand Down
6 changes: 3 additions & 3 deletions queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func TestNewQueueWithDefaultWorker(t *testing.T) {
func TestHandleTimeout(t *testing.T) {
m := &job.Message{
Timeout: 100 * time.Millisecond,
Payload: []byte("foo"),
Body: []byte("foo"),
}
w := NewRing(
WithFn(func(ctx context.Context, m core.QueuedMessage) error {
Expand Down Expand Up @@ -112,7 +112,7 @@ func TestHandleTimeout(t *testing.T) {
func TestJobComplete(t *testing.T) {
m := &job.Message{
Timeout: 100 * time.Millisecond,
Payload: []byte("foo"),
Body: []byte("foo"),
}
w := NewRing(
WithFn(func(ctx context.Context, m core.QueuedMessage) error {
Expand All @@ -132,7 +132,7 @@ func TestJobComplete(t *testing.T) {

m = &job.Message{
Timeout: 250 * time.Millisecond,
Payload: []byte("foo"),
Body: []byte("foo"),
}

w = NewRing(
Expand Down
15 changes: 14 additions & 1 deletion ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"sync/atomic"

"github.com/golang-queue/queue/core"
"github.com/golang-queue/queue/job"
)

var _ core.Worker = (*Ring)(nil)
Expand All @@ -25,9 +26,21 @@ type Ring struct {
stopFlag int32
}

type Data struct {
Payload []byte `json:"payload"`
}

func (d *Data) Bytes() []byte {
return d.Payload
}

// Run to execute new task
func (s *Ring) Run(ctx context.Context, task core.QueuedMessage) error {
return s.runFunc(ctx, task)
v, _ := task.(*job.Message)
data := &Data{
Payload: v.Body,
}
return s.runFunc(ctx, data)
}

// Shutdown the worker
Expand Down

0 comments on commit cc471ea

Please sign in to comment.