Skip to content

Commit

Permalink
Topic.Start() try 2: pre-Start messagePump
Browse files Browse the repository at this point in the history
  • Loading branch information
ploxiln committed Jul 9, 2018
1 parent bda9582 commit 8af7608
Showing 1 changed file with 38 additions and 7 deletions.
45 changes: 38 additions & 7 deletions nsqd/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type Topic struct {
channelMap map[string]*Channel
backend BackendQueue
memoryMsgChan chan *Message
startChan chan int
exitChan chan int
channelUpdateChan chan int
waitGroup util.WaitGroupWrapper
Expand All @@ -46,11 +47,12 @@ func NewTopic(topicName string, ctx *context, deleteCallback func(*Topic)) *Topi
name: topicName,
channelMap: make(map[string]*Channel),
memoryMsgChan: make(chan *Message, ctx.nsqd.getOpts().MemQueueSize),
startChan: make(chan int, 1),
exitChan: make(chan int),
channelUpdateChan: make(chan int, 1),
channelUpdateChan: make(chan int),
ctx: ctx,
paused: 0,
pauseChan: make(chan int, 1),
pauseChan: make(chan int),
deleteCallback: deleteCallback,
idFactory: NewGUIDFactory(ctx.nsqd.getOpts().ID),
}
Expand All @@ -75,13 +77,18 @@ func NewTopic(topicName string, ctx *context, deleteCallback func(*Topic)) *Topi
)
}

t.waitGroup.Wrap(t.messagePump)

t.ctx.nsqd.Notify(t)

return t
}

func (t *Topic) Start() {
t.waitGroup.Wrap(t.messagePump)
select {
case t.startChan <- 1:
default:
}
}

// Exiting returns a boolean indicating if this topic is closed/exiting
Expand All @@ -101,7 +108,7 @@ func (t *Topic) GetChannel(channelName string) *Channel {
// update messagePump state
select {
case t.channelUpdateChan <- 1:
default:
case <-t.exitChan:
}
}

Expand Down Expand Up @@ -155,7 +162,7 @@ func (t *Topic) DeleteExistingChannel(channelName string) error {
// update messagePump state
select {
case t.channelUpdateChan <- 1:
default:
case <-t.exitChan:
}

if numChannels == 0 && t.ephemeral == true {
Expand Down Expand Up @@ -228,8 +235,32 @@ func (t *Topic) messagePump() {
var chans []*Channel
var memoryMsgChan chan *Message
var backendChan chan []byte
// channelUpdateChan will activate first (or pauseChan, which will do nothing if first)

// before Start()
gotChannels := false
started := false
for !started {
select {
case <-t.channelUpdateChan:
gotChannels = true
case <-t.pauseChan:
// no effect before channels added
case <-t.exitChan:
goto exit
case <-t.startChan:
started = true
}
}
if gotChannels {
go func() {
select {
case t.channelUpdateChan <- 1:
case <-t.exitChan:
}
}()
}

// main loop
for {
select {
case msg = <-memoryMsgChan:
Expand Down Expand Up @@ -430,7 +461,7 @@ func (t *Topic) doPause(pause bool) error {

select {
case t.pauseChan <- 1:
default:
case <-t.exitChan:
}

return nil
Expand Down

0 comments on commit 8af7608

Please sign in to comment.