diff --git a/nats.go b/nats.go index 26e4f1b..79733c3 100644 --- a/nats.go +++ b/nats.go @@ -49,6 +49,9 @@ func NewWorker(opts ...Option) *Worker { } func (w *Worker) startConsumer() error { + if w.opts.disableConsumer { + return nil + } var err error w.subscription, err = w.client.QueueSubscribe(w.opts.subj, w.opts.queue, func(msg *nats.Msg) { select { diff --git a/nats_test.go b/nats_test.go index 7967fce..2f446a8 100644 --- a/nats_test.go +++ b/nats_test.go @@ -364,10 +364,6 @@ func TestReQueueTaskInWorkerBeforeShutdown(t *testing.T) { WithAddr(host+":4222"), WithSubj("test02"), WithQueue("test02"), - WithRunFunc(func(ctx context.Context, m queue.QueuedMessage) error { - log.Println(string(m.Bytes())) - return nil - }), ) assert.NoError(t, w.Queue(job)) @@ -377,3 +373,23 @@ func TestReQueueTaskInWorkerBeforeShutdown(t *testing.T) { // see "re-queue the old job" message assert.NoError(t, w.Shutdown()) } + +func TestWithDisableConsumer(t *testing.T) { + job := queue.Job{ + Payload: []byte("foo"), + } + w := NewWorker( + WithAddr(host+":4222"), + WithSubj("test02"), + WithQueue("test02"), + WithDisableConsumer(), + ) + + assert.NoError(t, w.Queue(job)) + assert.NoError(t, w.Queue(job)) + assert.NoError(t, w.Queue(job)) + time.Sleep(100 * time.Millisecond) + assert.Equal(t, 0, len(w.tasks)) + // see "re-queue the old job" message + assert.NoError(t, w.Shutdown()) +} diff --git a/options.go b/options.go index 77a2e97..770f88a 100644 --- a/options.go +++ b/options.go @@ -10,11 +10,12 @@ import ( type Option func(*options) type options struct { - runFunc func(context.Context, queue.QueuedMessage) error - logger queue.Logger - addr string - subj string - queue string + runFunc func(context.Context, queue.QueuedMessage) error + logger queue.Logger + addr string + subj string + queue string + disableConsumer bool } // WithAddr setup the addr of NATS @@ -52,6 +53,13 @@ func WithLogger(l queue.Logger) Option { } } +// WithDisableConsumer disable consumer +func WithDisableConsumer() Option { + return func(w *options) { + w.disableConsumer = true + } +} + func newOptions(opts ...Option) options { defaultOpts := options{ addr: "127.0.0.1:4222",