diff --git a/nsqd/channel.go b/nsqd/channel.go index f38e081f3..ac396db22 100644 --- a/nsqd/channel.go +++ b/nsqd/channel.go @@ -82,9 +82,10 @@ func NewChannel(topicName string, channelName string, nsqd *NSQD, clients: make(map[int64]Consumer), deleteCallback: deleteCallback, nsqd: nsqd, + ephemeral: strings.HasSuffix(channelName, "#ephemeral"), } - // create mem-queue only if size > 0 (do not use unbuffered chan) - if nsqd.getOpts().MemQueueSize > 0 { + // avoid mem-queue if size == 0 for more consistent ordering + if nsqd.getOpts().MemQueueSize > 0 || c.ephemeral { c.memoryMsgChan = make(chan *Message, nsqd.getOpts().MemQueueSize) } if len(nsqd.getOpts().E2EProcessingLatencyPercentiles) > 0 { @@ -96,8 +97,7 @@ func NewChannel(topicName string, channelName string, nsqd *NSQD, c.initPQ() - if strings.HasSuffix(channelName, "#ephemeral") { - c.ephemeral = true + if c.ephemeral { c.backend = newDummyBackendQueue() } else { dqLogf := func(level diskqueue.LogLevel, f string, args ...interface{}) { diff --git a/nsqd/topic.go b/nsqd/topic.go index d02c82989..8c017f3e6 100644 --- a/nsqd/topic.go +++ b/nsqd/topic.go @@ -46,7 +46,7 @@ func NewTopic(topicName string, nsqd *NSQD, deleteCallback func(*Topic)) *Topic t := &Topic{ name: topicName, channelMap: make(map[string]*Channel), - memoryMsgChan: nil, + memoryMsgChan: make(chan *Message, nsqd.getOpts().MemQueueSize), startChan: make(chan int, 1), exitChan: make(chan int), channelUpdateChan: make(chan int), @@ -56,10 +56,6 @@ func NewTopic(topicName string, nsqd *NSQD, deleteCallback func(*Topic)) *Topic deleteCallback: deleteCallback, idFactory: NewGUIDFactory(nsqd.getOpts().ID), } - // create mem-queue only if size > 0 (do not use unbuffered chan) - if nsqd.getOpts().MemQueueSize > 0 { - t.memoryMsgChan = make(chan *Message, nsqd.getOpts().MemQueueSize) - } if strings.HasSuffix(topicName, "#ephemeral") { t.ephemeral = true t.backend = newDummyBackendQueue() @@ -222,18 +218,25 @@ func (t *Topic) PutMessages(msgs []*Message) error { } func (t *Topic) put(m *Message) error { - select { - case t.memoryMsgChan <- m: - default: - err := writeMessageToBackend(m, t.backend) - t.nsqd.SetHealth(err) - if err != nil { - t.nsqd.logf(LOG_ERROR, - "TOPIC(%s) ERROR: failed to write message to backend - %s", - t.name, err) - return err + // If mem-queue-size == 0, avoid memory chan, for more consistent ordering, + // but try to use memory chan for deferred messages (they lose deferred timer + // in backend queue) or if topic is ephemeral (there is no backend queue). + if cap(t.memoryMsgChan) > 0 || t.ephemeral || m.deferred != 0 { + select { + case t.memoryMsgChan <- m: + return nil + default: + break // write to backend } } + err := writeMessageToBackend(m, t.backend) + t.nsqd.SetHealth(err) + if err != nil { + t.nsqd.logf(LOG_ERROR, + "TOPIC(%s) ERROR: failed to write message to backend - %s", + t.name, err) + return err + } return nil }