Skip to content

Commit

Permalink
nsqd: start Topic messagePump separately from create
Browse files Browse the repository at this point in the history
with new Topic.Start()

and make Topic.Pause() and Topic.GetChannel() no longer block
on the notification channels for Topic.messagePump()

and simplify some calls to WaitGroupWrapper.Wrap()
  • Loading branch information
ploxiln committed Jul 7, 2018
1 parent 2b68a3d commit bda9582
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 27 deletions.
25 changes: 11 additions & 14 deletions nsqd/nsqd.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,10 +250,10 @@ func (n *NSQD) Main() {
})
}

n.waitGroup.Wrap(func() { n.queueScanLoop() })
n.waitGroup.Wrap(func() { n.lookupLoop() })
n.waitGroup.Wrap(n.queueScanLoop)
n.waitGroup.Wrap(n.lookupLoop)
if n.getOpts().StatsdAddress != "" {
n.waitGroup.Wrap(func() { n.statsdLoop() })
n.waitGroup.Wrap(n.statsdLoop)
}
}

Expand Down Expand Up @@ -343,7 +343,9 @@ func (n *NSQD) LoadMetadata() error {
continue
}
topic := n.GetTopic(t.Name)

if t.Paused {
topic.Pause()
}
for _, c := range t.Channels {
if !protocol.IsValidChannelName(c.Name) {
n.logf(LOG_WARN, "skipping creation of invalid channel %s", c.Name)
Expand All @@ -354,12 +356,7 @@ func (n *NSQD) LoadMetadata() error {
channel.Pause()
}
}

// this logic is reversed, and done _after_ channel creation to ensure that all channels
// are created before messages begin flowing
if !t.Paused {
topic.UnPause()
}
topic.Start()
}
return nil
}
Expand Down Expand Up @@ -507,9 +504,9 @@ func (n *NSQD) GetTopic(topicName string) *Topic {
n.Unlock()

n.logf(LOG_INFO, "TOPIC(%s): created", t.name)
// topic is created paused, only un-pause after adding channels
// topic is created but messagePump not yet started

// if loading metadata at startup, no lookupd connections yet, and topic un-paused elsewhere
// if loading metadata at startup, no lookupd connections yet, topic started after load
if atomic.LoadInt32(&n.isLoading) == 1 {
return t
}
Expand All @@ -532,8 +529,8 @@ func (n *NSQD) GetTopic(topicName string) *Topic {
n.logf(LOG_ERROR, "no available nsqlookupd to query for channels to pre-create for topic %s", t.name)
}

// now that all channels are added we can un-pause topic
t.UnPause()
// now that all channels are added, start topic messagePump
t.Start()
return t
}

Expand Down
28 changes: 15 additions & 13 deletions nsqd/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type Topic struct {
deleter sync.Once

paused int32
pauseChan chan bool
pauseChan chan int

ctx *context
}
Expand All @@ -47,10 +47,10 @@ func NewTopic(topicName string, ctx *context, deleteCallback func(*Topic)) *Topi
channelMap: make(map[string]*Channel),
memoryMsgChan: make(chan *Message, ctx.nsqd.getOpts().MemQueueSize),
exitChan: make(chan int),
channelUpdateChan: make(chan int),
channelUpdateChan: make(chan int, 1),
ctx: ctx,
paused: 1,
pauseChan: make(chan bool),
paused: 0,
pauseChan: make(chan int, 1),
deleteCallback: deleteCallback,
idFactory: NewGUIDFactory(ctx.nsqd.getOpts().ID),
}
Expand All @@ -75,13 +75,15 @@ func NewTopic(topicName string, ctx *context, deleteCallback func(*Topic)) *Topi
)
}

t.waitGroup.Wrap(func() { t.messagePump() })

t.ctx.nsqd.Notify(t)

return t
}

func (t *Topic) Start() {
t.waitGroup.Wrap(t.messagePump)
}

// Exiting returns a boolean indicating if this topic is closed/exiting
func (t *Topic) Exiting() bool {
return atomic.LoadInt32(&t.exitFlag) == 1
Expand All @@ -99,7 +101,7 @@ func (t *Topic) GetChannel(channelName string) *Channel {
// update messagePump state
select {
case t.channelUpdateChan <- 1:
case <-t.exitChan:
default:
}
}

Expand Down Expand Up @@ -153,7 +155,7 @@ func (t *Topic) DeleteExistingChannel(channelName string) error {
// update messagePump state
select {
case t.channelUpdateChan <- 1:
case <-t.exitChan:
default:
}

if numChannels == 0 && t.ephemeral == true {
Expand Down Expand Up @@ -226,7 +228,7 @@ func (t *Topic) messagePump() {
var chans []*Channel
var memoryMsgChan chan *Message
var backendChan chan []byte
// always starts with no channels, channels added async with notification
// channelUpdateChan will activate first (or pauseChan, which will do nothing if first)

for {
select {
Expand All @@ -252,8 +254,8 @@ func (t *Topic) messagePump() {
backendChan = t.backend.ReadChan()
}
continue
case pause := <-t.pauseChan:
if pause || len(chans) == 0 {
case <-t.pauseChan:
if len(chans) == 0 || t.IsPaused() {
memoryMsgChan = nil
backendChan = nil
} else {
Expand Down Expand Up @@ -427,8 +429,8 @@ func (t *Topic) doPause(pause bool) error {
}

select {
case t.pauseChan <- pause:
case <-t.exitChan:
case t.pauseChan <- 1:
default:
}

return nil
Expand Down

0 comments on commit bda9582

Please sign in to comment.