diff --git a/redis.go b/redis.go index 369b3b0..33ede2e 100644 --- a/redis.go +++ b/redis.go @@ -3,7 +3,6 @@ package redisdb import ( "context" "encoding/json" - "fmt" "strings" "sync" "sync/atomic" @@ -153,16 +152,6 @@ func (w *Worker) Shutdown() error { return nil } -// Capacity for channel -func (w *Worker) Capacity() int { - return 0 -} - -// Usage for count of channel usage -func (w *Worker) Usage() int { - return 0 -} - // Queue send notification to queue func (w *Worker) Queue(job core.QueuedMessage) error { if atomic.LoadInt32(&w.stopFlag) == 1 { @@ -182,41 +171,35 @@ func (w *Worker) Queue(job core.QueuedMessage) error { // Run start the worker func (w *Worker) Run(task core.QueuedMessage) error { - for { - // check queue status - select { - case <-w.stop: - return nil - default: - } + data, _ := task.(queue.Job) - select { - case m, ok := <-w.channel: - select { - case <-w.stop: - return nil - default: - } + if err := w.handle(data); err != nil { + return err + } + return nil +} + +// Request a new task +func (w *Worker) Request() (core.QueuedMessage, error) { + clock := 0 +loop: + for { + select { + case task, ok := <-w.channel: if !ok { - return fmt.Errorf("redis pubsub: channel=%s closed", w.opts.channelName) + return nil, queue.ErrQueueHasBeenClosed } - var data queue.Job - if err := json.Unmarshal([]byte(m.Payload), &data); err != nil { - w.opts.logger.Error("json unmarshal error: ", err) - continue - } - if err := w.handle(data); err != nil { - w.opts.logger.Error("handle job error: ", err) + _ = json.Unmarshal([]byte(task.Payload), &data) + return data, nil + case <-time.After(1 * time.Second): + if clock == 5 { + break loop } - case <-w.stop: - return nil + clock += 1 } } -} -// Request a new task -func (w *Worker) Request() (core.QueuedMessage, error) { return nil, queue.ErrNoTaskInQueue }