diff --git a/nsqd/channel.go b/nsqd/channel.go index 8838c1e3d..f38e081f3 100644 --- a/nsqd/channel.go +++ b/nsqd/channel.go @@ -3,6 +3,7 @@ package nsqd import ( "container/heap" "errors" + "fmt" "math" "strings" "sync" @@ -117,7 +118,7 @@ func NewChannel(topicName string, channelName string, nsqd *NSQD, ) } - c.nsqd.Notify(c) + c.nsqd.Notify(c, !c.ephemeral) return c } @@ -164,7 +165,7 @@ func (c *Channel) exit(deleted bool) error { // since we are explicitly deleting a channel (not just at system exit time) // de-register this from the lookupd - c.nsqd.Notify(c) + c.nsqd.Notify(c, !c.ephemeral) } else { c.nsqd.logf(LOG_INFO, "CHANNEL(%s): closing", c.name) } @@ -288,8 +289,8 @@ func (c *Channel) IsPaused() bool { // PutMessage writes a Message to the queue func (c *Channel) PutMessage(m *Message) error { - c.RLock() - defer c.RUnlock() + c.exitMutex.RLock() + defer c.exitMutex.RUnlock() if c.Exiting() { return errors.New("exiting") } @@ -390,33 +391,52 @@ func (c *Channel) RequeueMessage(clientID int64, id MessageID, timeout time.Dura // AddClient adds a client to the Channel's client list func (c *Channel) AddClient(clientID int64, client Consumer) error { - c.Lock() - defer c.Unlock() + c.exitMutex.RLock() + defer c.exitMutex.RUnlock() + if c.Exiting() { + return errors.New("exiting") + } + + c.RLock() _, ok := c.clients[clientID] + numClients := len(c.clients) + c.RUnlock() if ok { return nil } maxChannelConsumers := c.nsqd.getOpts().MaxChannelConsumers - if maxChannelConsumers != 0 && len(c.clients) >= maxChannelConsumers { - return errors.New("E_TOO_MANY_CHANNEL_CONSUMERS") + if maxChannelConsumers != 0 && numClients >= maxChannelConsumers { + return fmt.Errorf("consumers for %s:%s exceeds limit of %d", + c.topicName, c.name, maxChannelConsumers) } + c.Lock() c.clients[clientID] = client + c.Unlock() return nil } // RemoveClient removes a client from the Channel's client list func (c *Channel) RemoveClient(clientID int64) { - c.Lock() - defer c.Unlock() + c.exitMutex.RLock() + defer c.exitMutex.RUnlock() + if c.Exiting() { + return + } + + c.RLock() _, ok := c.clients[clientID] + c.RUnlock() if !ok { return } + + c.Lock() delete(c.clients, clientID) + c.Unlock() if len(c.clients) == 0 && c.ephemeral == true { go c.deleter.Do(func() { c.deleteCallback(c) }) diff --git a/nsqd/nsqd.go b/nsqd/nsqd.go index 550647d6e..10fc4ad4c 100644 --- a/nsqd/nsqd.go +++ b/nsqd/nsqd.go @@ -364,16 +364,15 @@ func (n *NSQD) PersistMetadata() error { channels := []interface{}{} topic.Lock() for _, channel := range topic.channelMap { - channel.Lock() if channel.ephemeral { - channel.Unlock() continue } + channel.Lock() channelData := make(map[string]interface{}) channelData["name"] = channel.name channelData["paused"] = channel.IsPaused() - channels = append(channels, channelData) channel.Unlock() + channels = append(channels, channelData) } topic.Unlock() topicData["channels"] = channels @@ -447,7 +446,7 @@ func (n *NSQD) Exit() { // GetTopic performs a thread safe operation // to return a pointer to a Topic object (potentially new) func (n *NSQD) GetTopic(topicName string) *Topic { - // most likely, we already have this topic, so try read lock first. + // most likely we already have this topic, so try read lock first n.RLock() t, ok := n.topicMap[topicName] n.RUnlock() @@ -473,13 +472,14 @@ func (n *NSQD) GetTopic(topicName string) *Topic { n.logf(LOG_INFO, "TOPIC(%s): created", t.name) // topic is created but messagePump not yet started - // if loading metadata at startup, no lookupd connections yet, topic started after load + // if this topic was created while loading metadata at startup don't do any further initialization + // (topic will be "started" after loading completes) if atomic.LoadInt32(&n.isLoading) == 1 { return t } - // if using lookupd, make a blocking call to get the topics, and immediately create them. - // this makes sure that any message received is buffered to the right channels + // if using lookupd, make a blocking call to get channels and immediately create them + // to ensure that all channels receive published messages lookupdHTTPAddrs := n.lookupdHTTPAddrs() if len(lookupdHTTPAddrs) > 0 { channelNames, err := n.ci.GetLookupdTopicChannels(t.name, lookupdHTTPAddrs) @@ -537,18 +537,18 @@ func (n *NSQD) DeleteExistingTopic(topicName string) error { return nil } -func (n *NSQD) Notify(v interface{}) { +func (n *NSQD) Notify(v interface{}, persist bool) { // since the in-memory metadata is incomplete, // should not persist metadata while loading it. // nsqd will call `PersistMetadata` it after loading - persist := atomic.LoadInt32(&n.isLoading) == 0 + loading := atomic.LoadInt32(&n.isLoading) == 1 n.waitGroup.Wrap(func() { // by selecting on exitChan we guarantee that // we do not block exit, see issue #123 select { case <-n.exitChan: case n.notifyChan <- v: - if !persist { + if loading || !persist { return } n.Lock() diff --git a/nsqd/protocol_v2.go b/nsqd/protocol_v2.go index ccac1b0f1..25f1303cd 100644 --- a/nsqd/protocol_v2.go +++ b/nsqd/protocol_v2.go @@ -615,19 +615,20 @@ func (p *protocolV2) SUB(client *clientV2, params [][]byte) ([]byte, error) { // last client can leave the channel between GetChannel() and AddClient(). // Avoid adding a client to an ephemeral channel / topic which has started exiting. var channel *Channel - for { + for i := 1; ; i++ { topic := p.nsqd.GetTopic(topicName) channel = topic.GetChannel(channelName) if err := channel.AddClient(client.ID, client); err != nil { - return nil, protocol.NewFatalClientErr(nil, "E_TOO_MANY_CHANNEL_CONSUMERS", - fmt.Sprintf("channel consumers for %s:%s exceeds limit of %d", - topicName, channelName, p.nsqd.getOpts().MaxChannelConsumers)) + return nil, protocol.NewFatalClientErr(err, "E_SUB_FAILED", "SUB failed "+err.Error()) } if (channel.ephemeral && channel.Exiting()) || (topic.ephemeral && topic.Exiting()) { channel.RemoveClient(client.ID) - time.Sleep(1 * time.Millisecond) - continue + if i < 2 { + time.Sleep(100 * time.Millisecond) + continue + } + return nil, protocol.NewFatalClientErr(nil, "E_SUB_FAILED", "SUB failed to deleted topic/channel") } break } diff --git a/nsqd/topic.go b/nsqd/topic.go index 76aad14bb..d02c82989 100644 --- a/nsqd/topic.go +++ b/nsqd/topic.go @@ -82,7 +82,7 @@ func NewTopic(topicName string, nsqd *NSQD, deleteCallback func(*Topic)) *Topic t.waitGroup.Wrap(t.messagePump) - t.nsqd.Notify(t) + t.nsqd.Notify(t, !t.ephemeral) return t } @@ -145,23 +145,28 @@ func (t *Topic) GetExistingChannel(channelName string) (*Channel, error) { // DeleteExistingChannel removes a channel from the topic only if it exists func (t *Topic) DeleteExistingChannel(channelName string) error { - t.Lock() + t.RLock() channel, ok := t.channelMap[channelName] + t.RUnlock() if !ok { - t.Unlock() return errors.New("channel does not exist") } - delete(t.channelMap, channelName) - // not defered so that we can continue while the channel async closes - numChannels := len(t.channelMap) - t.Unlock() t.nsqd.logf(LOG_INFO, "TOPIC(%s): deleting channel %s", t.name, channel.name) // delete empties the channel before closing // (so that we dont leave any messages around) + // + // we do this before removing the channel from map below (with no lock) + // so that any incoming subs will error and not create a new channel + // to enforce ordering channel.Delete() + t.Lock() + delete(t.channelMap, channelName) + numChannels := len(t.channelMap) + t.Unlock() + // update messagePump state select { case t.channelUpdateChan <- 1: @@ -355,7 +360,7 @@ func (t *Topic) exit(deleted bool) error { // since we are explicitly deleting a topic (not just at system exit time) // de-register this from the lookupd - t.nsqd.Notify(t) + t.nsqd.Notify(t, !t.ephemeral) } else { t.nsqd.logf(LOG_INFO, "TOPIC(%s): closing", t.name) } @@ -379,6 +384,7 @@ func (t *Topic) exit(deleted bool) error { } // close all the channels + t.RLock() for _, channel := range t.channelMap { err := channel.Close() if err != nil { @@ -386,6 +392,7 @@ func (t *Topic) exit(deleted bool) error { t.nsqd.logf(LOG_ERROR, "channel(%s) close - %s", channel.name, err) } } + t.RUnlock() // write anything leftover to disk t.flush()