Skip to content

Commit

Permalink
nsqd: set memoryMsgChan as nil when -mem-queue-size=0
Browse files Browse the repository at this point in the history
do not use unbuffered chan for in-memory queue when size=0
because user intended all messages to go through backend queue (disk)
  • Loading branch information
bitpeng authored and ploxiln committed Sep 6, 2019
1 parent 6774510 commit b3b29b7
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 3 deletions.
6 changes: 5 additions & 1 deletion nsqd/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,15 @@ func NewChannel(topicName string, channelName string, ctx *context,
c := &Channel{
topicName: topicName,
name: channelName,
memoryMsgChan: make(chan *Message, ctx.nsqd.getOpts().MemQueueSize),
memoryMsgChan: nil,
clients: make(map[int64]Consumer),
deleteCallback: deleteCallback,
ctx: ctx,
}
// create mem-queue only if size > 0 (do not use unbuffered chan)
if ctx.nsqd.getOpts().MemQueueSize > 0 {
c.memoryMsgChan = make(chan *Message, ctx.nsqd.getOpts().MemQueueSize)
}
if len(ctx.nsqd.getOpts().E2EProcessingLatencyPercentiles) > 0 {
c.e2eProcessingLatencyStream = quantile.New(
ctx.nsqd.getOpts().E2EProcessingLatencyWindowTime,
Expand Down
7 changes: 5 additions & 2 deletions nsqd/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func NewTopic(topicName string, ctx *context, deleteCallback func(*Topic)) *Topi
t := &Topic{
name: topicName,
channelMap: make(map[string]*Channel),
memoryMsgChan: make(chan *Message, ctx.nsqd.getOpts().MemQueueSize),
memoryMsgChan: nil,
startChan: make(chan int, 1),
exitChan: make(chan int),
channelUpdateChan: make(chan int),
Expand All @@ -57,7 +57,10 @@ func NewTopic(topicName string, ctx *context, deleteCallback func(*Topic)) *Topi
deleteCallback: deleteCallback,
idFactory: NewGUIDFactory(ctx.nsqd.getOpts().ID),
}

// create mem-queue only if size > 0 (do not use unbuffered chan)
if ctx.nsqd.getOpts().MemQueueSize > 0 {
t.memoryMsgChan = make(chan *Message, ctx.nsqd.getOpts().MemQueueSize)
}
if strings.HasSuffix(topicName, "#ephemeral") {
t.ephemeral = true
t.backend = newDummyBackendQueue()
Expand Down

0 comments on commit b3b29b7

Please sign in to comment.