From bc39d96e64dde1ca85d1b7d6ed7fd0ff93973543 Mon Sep 17 00:00:00 2001 From: Pierce Lopez Date: Mon, 9 Jul 2018 20:13:04 -0400 Subject: [PATCH] nsqd: start Topic messagePump separately from create Add Topic.Start() method, call after all initial channels are added to topic. This way Topics can be created un-paused, like they always have been until very recently. Topic.messagePump() does not pass messages until after being notified by Start() via the new startChan. --- nsqd/nsqd.go | 19 ++++++++----------- nsqd/topic.go | 46 +++++++++++++++++++++++++++++++++++++++------- 2 files changed, 47 insertions(+), 18 deletions(-) diff --git a/nsqd/nsqd.go b/nsqd/nsqd.go index b9528fca8..3c5437299 100644 --- a/nsqd/nsqd.go +++ b/nsqd/nsqd.go @@ -343,7 +343,9 @@ func (n *NSQD) LoadMetadata() error { continue } topic := n.GetTopic(t.Name) - + if t.Paused { + topic.Pause() + } for _, c := range t.Channels { if !protocol.IsValidChannelName(c.Name) { n.logf(LOG_WARN, "skipping creation of invalid channel %s", c.Name) @@ -354,12 +356,7 @@ func (n *NSQD) LoadMetadata() error { channel.Pause() } } - - // this logic is reversed, and done _after_ channel creation to ensure that all channels - // are created before messages begin flowing - if !t.Paused { - topic.UnPause() - } + topic.Start() } return nil } @@ -507,9 +504,9 @@ func (n *NSQD) GetTopic(topicName string) *Topic { n.Unlock() n.logf(LOG_INFO, "TOPIC(%s): created", t.name) - // topic is created paused, only un-pause after adding channels + // topic is created but messagePump not yet started - // if loading metadata at startup, no lookupd connections yet, and topic un-paused elsewhere + // if loading metadata at startup, no lookupd connections yet, topic started after load if atomic.LoadInt32(&n.isLoading) == 1 { return t } @@ -532,8 +529,8 @@ func (n *NSQD) GetTopic(topicName string) *Topic { n.logf(LOG_ERROR, "no available nsqlookupd to query for channels to pre-create for topic %s", t.name) } - // now that all channels are added we can un-pause topic - t.UnPause() + // now that all channels are added, start topic messagePump + t.Start() return t } diff --git a/nsqd/topic.go b/nsqd/topic.go index e6d625d96..1310bc74a 100644 --- a/nsqd/topic.go +++ b/nsqd/topic.go @@ -24,6 +24,7 @@ type Topic struct { channelMap map[string]*Channel backend BackendQueue memoryMsgChan chan *Message + startChan chan int exitChan chan int channelUpdateChan chan int waitGroup util.WaitGroupWrapper @@ -35,7 +36,7 @@ type Topic struct { deleter sync.Once paused int32 - pauseChan chan bool + pauseChan chan int ctx *context } @@ -46,11 +47,12 @@ func NewTopic(topicName string, ctx *context, deleteCallback func(*Topic)) *Topi name: topicName, channelMap: make(map[string]*Channel), memoryMsgChan: make(chan *Message, ctx.nsqd.getOpts().MemQueueSize), + startChan: make(chan int, 1), exitChan: make(chan int), channelUpdateChan: make(chan int), ctx: ctx, - paused: 1, - pauseChan: make(chan bool), + paused: 0, + pauseChan: make(chan int), deleteCallback: deleteCallback, idFactory: NewGUIDFactory(ctx.nsqd.getOpts().ID), } @@ -82,6 +84,13 @@ func NewTopic(topicName string, ctx *context, deleteCallback func(*Topic)) *Topi return t } +func (t *Topic) Start() { + select { + case t.startChan <- 1: + default: + } +} + // Exiting returns a boolean indicating if this topic is closed/exiting func (t *Topic) Exiting() bool { return atomic.LoadInt32(&t.exitFlag) == 1 @@ -226,8 +235,31 @@ func (t *Topic) messagePump() { var chans []*Channel var memoryMsgChan chan *Message var backendChan chan []byte - // always starts with no channels, channels added async with notification + // do not pass messages before Start(), but avoid blocking Pause() or GetChannel() + for { + select { + case <-t.channelUpdateChan: + continue + case <-t.pauseChan: + continue + case <-t.exitChan: + goto exit + case <-t.startChan: + } + break + } + t.RLock() + for _, c := range t.channelMap { + chans = append(chans, c) + } + t.RUnlock() + if len(chans) > 0 && !t.IsPaused() { + memoryMsgChan = t.memoryMsgChan + backendChan = t.backend.ReadChan() + } + + // main message loop for { select { case msg = <-memoryMsgChan: @@ -252,8 +284,8 @@ func (t *Topic) messagePump() { backendChan = t.backend.ReadChan() } continue - case pause := <-t.pauseChan: - if pause || len(chans) == 0 { + case <-t.pauseChan: + if len(chans) == 0 || t.IsPaused() { memoryMsgChan = nil backendChan = nil } else { @@ -427,7 +459,7 @@ func (t *Topic) doPause(pause bool) error { } select { - case t.pauseChan <- pause: + case t.pauseChan <- 1: case <-t.exitChan: }