diff --git a/nsqd/topic.go b/nsqd/topic.go index 874e7ea08..41dae4336 100644 --- a/nsqd/topic.go +++ b/nsqd/topic.go @@ -236,28 +236,27 @@ func (t *Topic) messagePump() { var memoryMsgChan chan *Message var backendChan chan []byte - // before Start() - gotChannels := false - started := false - for !started { + // before Start(), avoid blocking Pause() or GetChannel() + for { select { case <-t.channelUpdateChan: - gotChannels = true + continue case <-t.pauseChan: - // no effect before channels added + continue case <-t.exitChan: goto exit case <-t.startChan: - started = true } + break; } - if gotChannels { - go func() { - select { - case t.channelUpdateChan <- 1: - case <-t.exitChan: - } - }() + t.RLock() + for _, c := range t.channelMap { + chans = append(chans, c) + } + t.RUnlock() + if len(chans) > 0 && !t.IsPaused() { + memoryMsgChan = t.memoryMsgChan + backendChan = t.backend.ReadChan() } // main loop