Skip to content

Commit

Permalink
Merge pull request #1050 from ploxiln/create_topic_paused
Browse files Browse the repository at this point in the history
nsqd: create Topic paused
  • Loading branch information
mreiferson authored Jul 6, 2018
2 parents e3897d9 + 32f99eb commit 2b68a3d
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 37 deletions.
36 changes: 10 additions & 26 deletions nsqd/nsqd.go
Original file line number Diff line number Diff line change
Expand Up @@ -504,19 +504,15 @@ func (n *NSQD) GetTopic(topicName string) *Topic {
t = NewTopic(topicName, &context{n}, deleteCallback)
n.topicMap[topicName] = t

// if we're creating a new topic in this process while we're loading from metadata, the topic may already
// have message data persisted on disk. To ensure that all channels are created before message flow
// begins, we pause the topic.
if atomic.LoadInt32(&n.isLoading) == 1 {
t.Pause()
}
n.Unlock()

n.logf(LOG_INFO, "TOPIC(%s): created", t.name)
// topic is created paused, only un-pause after adding channels

// release our global nsqd lock, and switch to a more granular topic lock while we init our
// channels from lookupd. This blocks concurrent PutMessages to this topic.
t.Lock()
n.Unlock()
// if loading metadata at startup, no lookupd connections yet, and topic un-paused elsewhere
if atomic.LoadInt32(&n.isLoading) == 1 {
return t
}

// if using lookupd, make a blocking call to get the topics, and immediately create them.
// this makes sure that any message received is buffered to the right channels
Expand All @@ -528,28 +524,16 @@ func (n *NSQD) GetTopic(topicName string) *Topic {
}
for _, channelName := range channelNames {
if strings.HasSuffix(channelName, "#ephemeral") {
// we don't want to pre-create ephemeral channels
// because there isn't a client connected
continue
continue // do not create ephemeral channel with no consumer client
}
t.getOrCreateChannel(channelName)
t.GetChannel(channelName)
}
} else if len(n.getOpts().NSQLookupdTCPAddresses) > 0 {
n.logf(LOG_ERROR, "no available nsqlookupd to query for channels to pre-create for topic %s", t.name)
}

t.Unlock()

// NOTE: I would prefer for this to only happen in topic.GetChannel() but we're special
// casing the code above so that we can control the locks such that it is impossible
// for a message to be written to a (new) topic while we're looking up channels
// from lookupd...
//
// update messagePump state
select {
case t.channelUpdateChan <- 1:
case <-t.exitChan:
}
// now that all channels are added we can un-pause topic
t.UnPause()
return t
}

Expand Down
13 changes: 2 additions & 11 deletions nsqd/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ func NewTopic(topicName string, ctx *context, deleteCallback func(*Topic)) *Topi
exitChan: make(chan int),
channelUpdateChan: make(chan int),
ctx: ctx,
paused: 1,
pauseChan: make(chan bool),
deleteCallback: deleteCallback,
idFactory: NewGUIDFactory(ctx.nsqd.getOpts().ID),
Expand Down Expand Up @@ -225,17 +226,7 @@ func (t *Topic) messagePump() {
var chans []*Channel
var memoryMsgChan chan *Message
var backendChan chan []byte

t.RLock()
for _, c := range t.channelMap {
chans = append(chans, c)
}
t.RUnlock()

if len(chans) > 0 {
memoryMsgChan = t.memoryMsgChan
backendChan = t.backend.ReadChan()
}
// always starts with no channels, channels added async with notification

for {
select {
Expand Down

0 comments on commit 2b68a3d

Please sign in to comment.