diff --git a/benchmark_test.go b/benchmark_test.go index 4f00d2d..7faa36a 100644 --- a/benchmark_test.go +++ b/benchmark_test.go @@ -78,7 +78,6 @@ func BenchmarkQueue(b *testing.B) { m := job.NewMessage(&mockMessage{ message: "foo", }) - m.Encode() for n := 0; n < b.N; n++ { if err := q.queue(&m); err != nil { diff --git a/core/worker.go b/core/worker.go index bbe8014..52dc9ae 100644 --- a/core/worker.go +++ b/core/worker.go @@ -30,3 +30,8 @@ type Worker interface { type QueuedMessage interface { Bytes() []byte } + +type TaskMessage interface { + QueuedMessage + Payload() []byte +} diff --git a/job/job.go b/job/job.go index 32340fe..17246f4 100644 --- a/job/job.go +++ b/job/job.go @@ -21,7 +21,7 @@ type Message struct { Timeout time.Duration `json:"timeout" msgpack:"timeout"` // Payload is the payload data of the task. - Payload []byte `json:"body" msgpack:"body"` + Body []byte `json:"body" msgpack:"body"` // RetryCount set count of retry // default is 0, no retry. @@ -48,19 +48,30 @@ type Message struct { // Jitter eases contention by randomizing backoff steps Jitter bool `json:"jitter" msgpack:"jitter"` +} - // Data to save Unsafe cast - Data []byte +// Payload returns the payload data of the Message. +// It returns the byte slice of the payload. +// +// Returns: +// - A byte slice containing the payload data. +func (m *Message) Payload() []byte { + return m.Body } -// Bytes get internal data +// Bytes returns the byte slice of the Message struct. +// If the marshalling process encounters an error, the function will panic. +// It returns the marshalled byte slice. +// +// Returns: +// - A byte slice containing the msgpack-encoded data. func (m *Message) Bytes() []byte { - return m.Data -} + b, err := json.Marshal(m) + if err != nil { + panic(err) + } -// Encode for encoding the structure -func (m *Message) Encode() { - m.Data = Encode(m) + return b } // NewMessage create new message @@ -74,7 +85,7 @@ func NewMessage(m core.QueuedMessage, opts ...AllowOption) Message { RetryMin: o.retryMin, RetryMax: o.retryMax, Timeout: o.timeout, - Payload: m.Bytes(), + Body: m.Bytes(), } } @@ -92,6 +103,15 @@ func NewTask(task TaskFunc, opts ...AllowOption) Message { } } +// Encode takes a Message struct and marshals it into a byte slice using msgpack. +// If the marshalling process encounters an error, the function will panic. +// It returns the marshalled byte slice. +// +// Parameters: +// - m: A pointer to the Message struct to be encoded. +// +// Returns: +// - A byte slice containing the msgpack-encoded data. func Encode(m *Message) []byte { b, err := json.Marshal(m) if err != nil { diff --git a/job/job_test.go b/job/job_test.go index 012f949..ab92a56 100644 --- a/job/job_test.go +++ b/job/job_test.go @@ -30,13 +30,12 @@ func TestMessageEncodeDecode(t *testing.T) { }, ) - m.Encode() out := Decode(m.Bytes()) assert.Equal(t, int64(100), out.RetryCount) assert.Equal(t, 30*time.Millisecond, out.RetryDelay) assert.Equal(t, 3*time.Millisecond, out.Timeout) - assert.Equal(t, "foo", string(out.Payload)) + assert.Equal(t, "foo", string(out.Payload())) assert.Equal(t, 200*time.Millisecond, out.RetryMin) assert.Equal(t, 20*time.Second, out.RetryMax) assert.Equal(t, 4.0, out.RetryFactor) diff --git a/queue.go b/queue.go index 71a196b..248bd5b 100644 --- a/queue.go +++ b/queue.go @@ -127,7 +127,6 @@ func (q *Queue) Wait() { // Queue to queue single job with binary func (q *Queue) Queue(message core.QueuedMessage, opts ...job.AllowOption) error { data := job.NewMessage(message, opts...) - data.Encode() return q.queue(&data) } @@ -160,7 +159,7 @@ func (q *Queue) work(task core.QueuedMessage) { q.metric.DecBusyWorker() e := recover() if e != nil { - q.logger.Errorf("panic error: %v", e) + q.logger.Fatalf("panic error: %v", e) } q.schedule() @@ -182,13 +181,12 @@ func (q *Queue) work(task core.QueuedMessage) { } func (q *Queue) run(task core.QueuedMessage) error { - data := task.(*job.Message) - if data.Task == nil { - data = job.Decode(task.Bytes()) - data.Data = data.Payload + switch t := task.(type) { + case *job.Message: + return q.handle(t) + default: + return errors.New("invalid task type") } - - return q.handle(data) } func (q *Queue) handle(m *job.Message) error { diff --git a/queue_test.go b/queue_test.go index d9ede90..e99d4b6 100644 --- a/queue_test.go +++ b/queue_test.go @@ -80,7 +80,7 @@ func TestNewQueueWithDefaultWorker(t *testing.T) { func TestHandleTimeout(t *testing.T) { m := &job.Message{ Timeout: 100 * time.Millisecond, - Payload: []byte("foo"), + Body: []byte("foo"), } w := NewRing( WithFn(func(ctx context.Context, m core.QueuedMessage) error { @@ -112,7 +112,7 @@ func TestHandleTimeout(t *testing.T) { func TestJobComplete(t *testing.T) { m := &job.Message{ Timeout: 100 * time.Millisecond, - Payload: []byte("foo"), + Body: []byte("foo"), } w := NewRing( WithFn(func(ctx context.Context, m core.QueuedMessage) error { @@ -132,7 +132,7 @@ func TestJobComplete(t *testing.T) { m = &job.Message{ Timeout: 250 * time.Millisecond, - Payload: []byte("foo"), + Body: []byte("foo"), } w = NewRing( diff --git a/ring.go b/ring.go index 5b40255..a4f9e47 100644 --- a/ring.go +++ b/ring.go @@ -6,6 +6,7 @@ import ( "sync/atomic" "github.com/golang-queue/queue/core" + "github.com/golang-queue/queue/job" ) var _ core.Worker = (*Ring)(nil) @@ -25,9 +26,21 @@ type Ring struct { stopFlag int32 } +type Data struct { + Payload []byte `json:"payload"` +} + +func (d *Data) Bytes() []byte { + return d.Payload +} + // Run to execute new task func (s *Ring) Run(ctx context.Context, task core.QueuedMessage) error { - return s.runFunc(ctx, task) + v, _ := task.(*job.Message) + data := &Data{ + Payload: v.Body, + } + return s.runFunc(ctx, data) } // Shutdown the worker