Skip to content

Commit

Permalink
nsqd: start Topic messagePump separately from create
Browse files Browse the repository at this point in the history
Add Topic.Start() method, call after all initial channels are added
to topic. This way Topics can be created un-paused, like they always
have been until very recently.

Topic.messagePump() does not pass messages until after being
notified by Start() via the new startChan.
  • Loading branch information
ploxiln committed Jul 10, 2018
1 parent c1c9041 commit bc39d96
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 18 deletions.
19 changes: 8 additions & 11 deletions nsqd/nsqd.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,9 @@ func (n *NSQD) LoadMetadata() error {
continue
}
topic := n.GetTopic(t.Name)

if t.Paused {
topic.Pause()
}
for _, c := range t.Channels {
if !protocol.IsValidChannelName(c.Name) {
n.logf(LOG_WARN, "skipping creation of invalid channel %s", c.Name)
Expand All @@ -354,12 +356,7 @@ func (n *NSQD) LoadMetadata() error {
channel.Pause()
}
}

// this logic is reversed, and done _after_ channel creation to ensure that all channels
// are created before messages begin flowing
if !t.Paused {
topic.UnPause()
}
topic.Start()
}
return nil
}
Expand Down Expand Up @@ -507,9 +504,9 @@ func (n *NSQD) GetTopic(topicName string) *Topic {
n.Unlock()

n.logf(LOG_INFO, "TOPIC(%s): created", t.name)
// topic is created paused, only un-pause after adding channels
// topic is created but messagePump not yet started

// if loading metadata at startup, no lookupd connections yet, and topic un-paused elsewhere
// if loading metadata at startup, no lookupd connections yet, topic started after load
if atomic.LoadInt32(&n.isLoading) == 1 {
return t
}
Expand All @@ -532,8 +529,8 @@ func (n *NSQD) GetTopic(topicName string) *Topic {
n.logf(LOG_ERROR, "no available nsqlookupd to query for channels to pre-create for topic %s", t.name)
}

// now that all channels are added we can un-pause topic
t.UnPause()
// now that all channels are added, start topic messagePump
t.Start()
return t
}

Expand Down
46 changes: 39 additions & 7 deletions nsqd/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type Topic struct {
channelMap map[string]*Channel
backend BackendQueue
memoryMsgChan chan *Message
startChan chan int
exitChan chan int
channelUpdateChan chan int
waitGroup util.WaitGroupWrapper
Expand All @@ -35,7 +36,7 @@ type Topic struct {
deleter sync.Once

paused int32
pauseChan chan bool
pauseChan chan int

ctx *context
}
Expand All @@ -46,11 +47,12 @@ func NewTopic(topicName string, ctx *context, deleteCallback func(*Topic)) *Topi
name: topicName,
channelMap: make(map[string]*Channel),
memoryMsgChan: make(chan *Message, ctx.nsqd.getOpts().MemQueueSize),
startChan: make(chan int, 1),
exitChan: make(chan int),
channelUpdateChan: make(chan int),
ctx: ctx,
paused: 1,
pauseChan: make(chan bool),
paused: 0,
pauseChan: make(chan int),
deleteCallback: deleteCallback,
idFactory: NewGUIDFactory(ctx.nsqd.getOpts().ID),
}
Expand Down Expand Up @@ -82,6 +84,13 @@ func NewTopic(topicName string, ctx *context, deleteCallback func(*Topic)) *Topi
return t
}

func (t *Topic) Start() {
select {
case t.startChan <- 1:
default:
}
}

// Exiting returns a boolean indicating if this topic is closed/exiting
func (t *Topic) Exiting() bool {
return atomic.LoadInt32(&t.exitFlag) == 1
Expand Down Expand Up @@ -226,8 +235,31 @@ func (t *Topic) messagePump() {
var chans []*Channel
var memoryMsgChan chan *Message
var backendChan chan []byte
// always starts with no channels, channels added async with notification

// do not pass messages before Start(), but avoid blocking Pause() or GetChannel()
for {
select {
case <-t.channelUpdateChan:
continue
case <-t.pauseChan:
continue
case <-t.exitChan:
goto exit
case <-t.startChan:
}
break
}
t.RLock()
for _, c := range t.channelMap {
chans = append(chans, c)
}
t.RUnlock()
if len(chans) > 0 && !t.IsPaused() {
memoryMsgChan = t.memoryMsgChan
backendChan = t.backend.ReadChan()
}

// main message loop
for {
select {
case msg = <-memoryMsgChan:
Expand All @@ -252,8 +284,8 @@ func (t *Topic) messagePump() {
backendChan = t.backend.ReadChan()
}
continue
case pause := <-t.pauseChan:
if pause || len(chans) == 0 {
case <-t.pauseChan:
if len(chans) == 0 || t.IsPaused() {
memoryMsgChan = nil
backendChan = nil
} else {
Expand Down Expand Up @@ -427,7 +459,7 @@ func (t *Topic) doPause(pause bool) error {
}

select {
case t.pauseChan <- pause:
case t.pauseChan <- 1:
case <-t.exitChan:
}

Expand Down

0 comments on commit bc39d96

Please sign in to comment.