diff --git a/nsqd/topic.go b/nsqd/topic.go index 16cab3667..874e7ea08 100644 --- a/nsqd/topic.go +++ b/nsqd/topic.go @@ -24,6 +24,7 @@ type Topic struct { channelMap map[string]*Channel backend BackendQueue memoryMsgChan chan *Message + startChan chan int exitChan chan int channelUpdateChan chan int waitGroup util.WaitGroupWrapper @@ -46,11 +47,12 @@ func NewTopic(topicName string, ctx *context, deleteCallback func(*Topic)) *Topi name: topicName, channelMap: make(map[string]*Channel), memoryMsgChan: make(chan *Message, ctx.nsqd.getOpts().MemQueueSize), + startChan: make(chan int, 1), exitChan: make(chan int), - channelUpdateChan: make(chan int, 1), + channelUpdateChan: make(chan int), ctx: ctx, paused: 0, - pauseChan: make(chan int, 1), + pauseChan: make(chan int), deleteCallback: deleteCallback, idFactory: NewGUIDFactory(ctx.nsqd.getOpts().ID), } @@ -75,13 +77,18 @@ func NewTopic(topicName string, ctx *context, deleteCallback func(*Topic)) *Topi ) } + t.waitGroup.Wrap(t.messagePump) + t.ctx.nsqd.Notify(t) return t } func (t *Topic) Start() { - t.waitGroup.Wrap(t.messagePump) + select { + case t.startChan <- 1: + default: + } } // Exiting returns a boolean indicating if this topic is closed/exiting @@ -101,7 +108,7 @@ func (t *Topic) GetChannel(channelName string) *Channel { // update messagePump state select { case t.channelUpdateChan <- 1: - default: + case <-t.exitChan: } } @@ -155,7 +162,7 @@ func (t *Topic) DeleteExistingChannel(channelName string) error { // update messagePump state select { case t.channelUpdateChan <- 1: - default: + case <-t.exitChan: } if numChannels == 0 && t.ephemeral == true { @@ -228,8 +235,32 @@ func (t *Topic) messagePump() { var chans []*Channel var memoryMsgChan chan *Message var backendChan chan []byte - // channelUpdateChan will activate first (or pauseChan, which will do nothing if first) + // before Start() + gotChannels := false + started := false + for !started { + select { + case <-t.channelUpdateChan: + gotChannels = true + case <-t.pauseChan: + // no effect before channels added + case <-t.exitChan: + goto exit + case <-t.startChan: + started = true + } + } + if gotChannels { + go func() { + select { + case t.channelUpdateChan <- 1: + case <-t.exitChan: + } + }() + } + + // main loop for { select { case msg = <-memoryMsgChan: @@ -430,7 +461,7 @@ func (t *Topic) doPause(pause bool) error { select { case t.pauseChan <- 1: - default: + case <-t.exitChan: } return nil