Skip to content

Commit

Permalink
Merge pull request #1376 from ploxiln/topic_always_memqueue
Browse files Browse the repository at this point in the history
nsqd: allow unbuffered memory chan if ephemeral or deferred
  • Loading branch information
ploxiln authored Aug 2, 2022
2 parents ae2e77a + 4f5d227 commit 4138c44
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 19 deletions.
8 changes: 4 additions & 4 deletions nsqd/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,10 @@ func NewChannel(topicName string, channelName string, nsqd *NSQD,
clients: make(map[int64]Consumer),
deleteCallback: deleteCallback,
nsqd: nsqd,
ephemeral: strings.HasSuffix(channelName, "#ephemeral"),
}
// create mem-queue only if size > 0 (do not use unbuffered chan)
if nsqd.getOpts().MemQueueSize > 0 {
// avoid mem-queue if size == 0 for more consistent ordering
if nsqd.getOpts().MemQueueSize > 0 || c.ephemeral {
c.memoryMsgChan = make(chan *Message, nsqd.getOpts().MemQueueSize)
}
if len(nsqd.getOpts().E2EProcessingLatencyPercentiles) > 0 {
Expand All @@ -96,8 +97,7 @@ func NewChannel(topicName string, channelName string, nsqd *NSQD,

c.initPQ()

if strings.HasSuffix(channelName, "#ephemeral") {
c.ephemeral = true
if c.ephemeral {
c.backend = newDummyBackendQueue()
} else {
dqLogf := func(level diskqueue.LogLevel, f string, args ...interface{}) {
Expand Down
33 changes: 18 additions & 15 deletions nsqd/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func NewTopic(topicName string, nsqd *NSQD, deleteCallback func(*Topic)) *Topic
t := &Topic{
name: topicName,
channelMap: make(map[string]*Channel),
memoryMsgChan: nil,
memoryMsgChan: make(chan *Message, nsqd.getOpts().MemQueueSize),
startChan: make(chan int, 1),
exitChan: make(chan int),
channelUpdateChan: make(chan int),
Expand All @@ -56,10 +56,6 @@ func NewTopic(topicName string, nsqd *NSQD, deleteCallback func(*Topic)) *Topic
deleteCallback: deleteCallback,
idFactory: NewGUIDFactory(nsqd.getOpts().ID),
}
// create mem-queue only if size > 0 (do not use unbuffered chan)
if nsqd.getOpts().MemQueueSize > 0 {
t.memoryMsgChan = make(chan *Message, nsqd.getOpts().MemQueueSize)
}
if strings.HasSuffix(topicName, "#ephemeral") {
t.ephemeral = true
t.backend = newDummyBackendQueue()
Expand Down Expand Up @@ -222,18 +218,25 @@ func (t *Topic) PutMessages(msgs []*Message) error {
}

func (t *Topic) put(m *Message) error {
select {
case t.memoryMsgChan <- m:
default:
err := writeMessageToBackend(m, t.backend)
t.nsqd.SetHealth(err)
if err != nil {
t.nsqd.logf(LOG_ERROR,
"TOPIC(%s) ERROR: failed to write message to backend - %s",
t.name, err)
return err
// If mem-queue-size == 0, avoid memory chan, for more consistent ordering,
// but try to use memory chan for deferred messages (they lose deferred timer
// in backend queue) or if topic is ephemeral (there is no backend queue).
if cap(t.memoryMsgChan) > 0 || t.ephemeral || m.deferred != 0 {
select {
case t.memoryMsgChan <- m:
return nil
default:
break // write to backend
}
}
err := writeMessageToBackend(m, t.backend)
t.nsqd.SetHealth(err)
if err != nil {
t.nsqd.logf(LOG_ERROR,
"TOPIC(%s) ERROR: failed to write message to backend - %s",
t.name, err)
return err
}
return nil
}

Expand Down

0 comments on commit 4138c44

Please sign in to comment.