diff --git a/nsqd/channel.go b/nsqd/channel.go index ad9c66ce9..606387267 100644 --- a/nsqd/channel.go +++ b/nsqd/channel.go @@ -77,11 +77,15 @@ func NewChannel(topicName string, channelName string, ctx *context, c := &Channel{ topicName: topicName, name: channelName, - memoryMsgChan: make(chan *Message, ctx.nsqd.getOpts().MemQueueSize), + memoryMsgChan: nil, clients: make(map[int64]Consumer), deleteCallback: deleteCallback, ctx: ctx, } + // create mem-queue only if size > 0 (do not use unbuffered chan) + if ctx.nsqd.getOpts().MemQueueSize > 0 { + c.memoryMsgChan = make(chan *Message, ctx.nsqd.getOpts().MemQueueSize) + } if len(ctx.nsqd.getOpts().E2EProcessingLatencyPercentiles) > 0 { c.e2eProcessingLatencyStream = quantile.New( ctx.nsqd.getOpts().E2EProcessingLatencyWindowTime, diff --git a/nsqd/topic.go b/nsqd/topic.go index c9884fe22..cd2b83065 100644 --- a/nsqd/topic.go +++ b/nsqd/topic.go @@ -47,7 +47,7 @@ func NewTopic(topicName string, ctx *context, deleteCallback func(*Topic)) *Topi t := &Topic{ name: topicName, channelMap: make(map[string]*Channel), - memoryMsgChan: make(chan *Message, ctx.nsqd.getOpts().MemQueueSize), + memoryMsgChan: nil, startChan: make(chan int, 1), exitChan: make(chan int), channelUpdateChan: make(chan int), @@ -57,7 +57,10 @@ func NewTopic(topicName string, ctx *context, deleteCallback func(*Topic)) *Topi deleteCallback: deleteCallback, idFactory: NewGUIDFactory(ctx.nsqd.getOpts().ID), } - + // create mem-queue only if size > 0 (do not use unbuffered chan) + if ctx.nsqd.getOpts().MemQueueSize > 0 { + t.memoryMsgChan = make(chan *Message, ctx.nsqd.getOpts().MemQueueSize) + } if strings.HasSuffix(topicName, "#ephemeral") { t.ephemeral = true t.backend = newDummyBackendQueue()