Skip to content

Commit

Permalink
nsqd: allow unbuffered memory chan if ephemeral or deferred
Browse files Browse the repository at this point in the history
In b3b29b7 / #1159 unbuffered
memoryMsgChan were replaced with nil chan for both Topic and Channel
if mem-queue-size=0. This inadvertently made deferred publish never
work (with mem-queue-size=0), because some metadata is lost when
messages go through the "backend" disk queue, instead of immediately
going through the topic memory chan to the channels' deferred pqueues.

The motivation for fully disabling the memoryMsgQueue for
mem-queue-size=0 was to avoid excessively shuffling message order,
which would happen if some messages jump instantly through the
memory-queue while others take the longer way through the disk-queue.
So, only allow using the memory queue in this case if really needed,
for deferred messages, or for ephemeral topic or channel which just
lose all messages if they all go through the no-op backend queue.

Granted, mem-queue-size=0 never worked very well, in many respects.
  • Loading branch information
ploxiln committed Aug 2, 2022
1 parent 784d911 commit 4f5d227
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 19 deletions.
8 changes: 4 additions & 4 deletions nsqd/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,10 @@ func NewChannel(topicName string, channelName string, nsqd *NSQD,
clients: make(map[int64]Consumer),
deleteCallback: deleteCallback,
nsqd: nsqd,
ephemeral: strings.HasSuffix(channelName, "#ephemeral"),
}
// create mem-queue only if size > 0 (do not use unbuffered chan)
if nsqd.getOpts().MemQueueSize > 0 {
// avoid mem-queue if size == 0 for more consistent ordering
if nsqd.getOpts().MemQueueSize > 0 || c.ephemeral {
c.memoryMsgChan = make(chan *Message, nsqd.getOpts().MemQueueSize)
}
if len(nsqd.getOpts().E2EProcessingLatencyPercentiles) > 0 {
Expand All @@ -96,8 +97,7 @@ func NewChannel(topicName string, channelName string, nsqd *NSQD,

c.initPQ()

if strings.HasSuffix(channelName, "#ephemeral") {
c.ephemeral = true
if c.ephemeral {
c.backend = newDummyBackendQueue()
} else {
dqLogf := func(level diskqueue.LogLevel, f string, args ...interface{}) {
Expand Down
33 changes: 18 additions & 15 deletions nsqd/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func NewTopic(topicName string, nsqd *NSQD, deleteCallback func(*Topic)) *Topic
t := &Topic{
name: topicName,
channelMap: make(map[string]*Channel),
memoryMsgChan: nil,
memoryMsgChan: make(chan *Message, nsqd.getOpts().MemQueueSize),
startChan: make(chan int, 1),
exitChan: make(chan int),
channelUpdateChan: make(chan int),
Expand All @@ -56,10 +56,6 @@ func NewTopic(topicName string, nsqd *NSQD, deleteCallback func(*Topic)) *Topic
deleteCallback: deleteCallback,
idFactory: NewGUIDFactory(nsqd.getOpts().ID),
}
// create mem-queue only if size > 0 (do not use unbuffered chan)
if nsqd.getOpts().MemQueueSize > 0 {
t.memoryMsgChan = make(chan *Message, nsqd.getOpts().MemQueueSize)
}
if strings.HasSuffix(topicName, "#ephemeral") {
t.ephemeral = true
t.backend = newDummyBackendQueue()
Expand Down Expand Up @@ -222,18 +218,25 @@ func (t *Topic) PutMessages(msgs []*Message) error {
}

func (t *Topic) put(m *Message) error {
select {
case t.memoryMsgChan <- m:
default:
err := writeMessageToBackend(m, t.backend)
t.nsqd.SetHealth(err)
if err != nil {
t.nsqd.logf(LOG_ERROR,
"TOPIC(%s) ERROR: failed to write message to backend - %s",
t.name, err)
return err
// If mem-queue-size == 0, avoid memory chan, for more consistent ordering,
// but try to use memory chan for deferred messages (they lose deferred timer
// in backend queue) or if topic is ephemeral (there is no backend queue).
if cap(t.memoryMsgChan) > 0 || t.ephemeral || m.deferred != 0 {
select {
case t.memoryMsgChan <- m:
return nil
default:
break // write to backend
}
}
err := writeMessageToBackend(m, t.backend)
t.nsqd.SetHealth(err)
if err != nil {
t.nsqd.logf(LOG_ERROR,
"TOPIC(%s) ERROR: failed to write message to backend - %s",
t.name, err)
return err
}
return nil
}

Expand Down

0 comments on commit 4f5d227

Please sign in to comment.