Skip to content

Commit

Permalink
Merge pull request #1314 from mreiferson/ephemeral-live-lock-1251
Browse files Browse the repository at this point in the history
nsqd: fix ephemeral channel sub live lock
  • Loading branch information
mreiferson authored Apr 4, 2021
2 parents 74f0dca + 62d202f commit 619ac10
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 34 deletions.
40 changes: 30 additions & 10 deletions nsqd/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package nsqd
import (
"container/heap"
"errors"
"fmt"
"math"
"strings"
"sync"
Expand Down Expand Up @@ -117,7 +118,7 @@ func NewChannel(topicName string, channelName string, nsqd *NSQD,
)
}

c.nsqd.Notify(c)
c.nsqd.Notify(c, !c.ephemeral)

return c
}
Expand Down Expand Up @@ -164,7 +165,7 @@ func (c *Channel) exit(deleted bool) error {

// since we are explicitly deleting a channel (not just at system exit time)
// de-register this from the lookupd
c.nsqd.Notify(c)
c.nsqd.Notify(c, !c.ephemeral)
} else {
c.nsqd.logf(LOG_INFO, "CHANNEL(%s): closing", c.name)
}
Expand Down Expand Up @@ -288,8 +289,8 @@ func (c *Channel) IsPaused() bool {

// PutMessage writes a Message to the queue
func (c *Channel) PutMessage(m *Message) error {
c.RLock()
defer c.RUnlock()
c.exitMutex.RLock()
defer c.exitMutex.RUnlock()
if c.Exiting() {
return errors.New("exiting")
}
Expand Down Expand Up @@ -390,33 +391,52 @@ func (c *Channel) RequeueMessage(clientID int64, id MessageID, timeout time.Dura

// AddClient adds a client to the Channel's client list
func (c *Channel) AddClient(clientID int64, client Consumer) error {
c.Lock()
defer c.Unlock()
c.exitMutex.RLock()
defer c.exitMutex.RUnlock()

if c.Exiting() {
return errors.New("exiting")
}

c.RLock()
_, ok := c.clients[clientID]
numClients := len(c.clients)
c.RUnlock()
if ok {
return nil
}

maxChannelConsumers := c.nsqd.getOpts().MaxChannelConsumers
if maxChannelConsumers != 0 && len(c.clients) >= maxChannelConsumers {
return errors.New("E_TOO_MANY_CHANNEL_CONSUMERS")
if maxChannelConsumers != 0 && numClients >= maxChannelConsumers {
return fmt.Errorf("consumers for %s:%s exceeds limit of %d",
c.topicName, c.name, maxChannelConsumers)
}

c.Lock()
c.clients[clientID] = client
c.Unlock()
return nil
}

// RemoveClient removes a client from the Channel's client list
func (c *Channel) RemoveClient(clientID int64) {
c.Lock()
defer c.Unlock()
c.exitMutex.RLock()
defer c.exitMutex.RUnlock()

if c.Exiting() {
return
}

c.RLock()
_, ok := c.clients[clientID]
c.RUnlock()
if !ok {
return
}

c.Lock()
delete(c.clients, clientID)
c.Unlock()

if len(c.clients) == 0 && c.ephemeral == true {
go c.deleter.Do(func() { c.deleteCallback(c) })
Expand Down
20 changes: 10 additions & 10 deletions nsqd/nsqd.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,16 +364,15 @@ func (n *NSQD) PersistMetadata() error {
channels := []interface{}{}
topic.Lock()
for _, channel := range topic.channelMap {
channel.Lock()
if channel.ephemeral {
channel.Unlock()
continue
}
channel.Lock()
channelData := make(map[string]interface{})
channelData["name"] = channel.name
channelData["paused"] = channel.IsPaused()
channels = append(channels, channelData)
channel.Unlock()
channels = append(channels, channelData)
}
topic.Unlock()
topicData["channels"] = channels
Expand Down Expand Up @@ -447,7 +446,7 @@ func (n *NSQD) Exit() {
// GetTopic performs a thread safe operation
// to return a pointer to a Topic object (potentially new)
func (n *NSQD) GetTopic(topicName string) *Topic {
// most likely, we already have this topic, so try read lock first.
// most likely we already have this topic, so try read lock first
n.RLock()
t, ok := n.topicMap[topicName]
n.RUnlock()
Expand All @@ -473,13 +472,14 @@ func (n *NSQD) GetTopic(topicName string) *Topic {
n.logf(LOG_INFO, "TOPIC(%s): created", t.name)
// topic is created but messagePump not yet started

// if loading metadata at startup, no lookupd connections yet, topic started after load
// if this topic was created while loading metadata at startup don't do any further initialization
// (topic will be "started" after loading completes)
if atomic.LoadInt32(&n.isLoading) == 1 {
return t
}

// if using lookupd, make a blocking call to get the topics, and immediately create them.
// this makes sure that any message received is buffered to the right channels
// if using lookupd, make a blocking call to get channels and immediately create them
// to ensure that all channels receive published messages
lookupdHTTPAddrs := n.lookupdHTTPAddrs()
if len(lookupdHTTPAddrs) > 0 {
channelNames, err := n.ci.GetLookupdTopicChannels(t.name, lookupdHTTPAddrs)
Expand Down Expand Up @@ -537,18 +537,18 @@ func (n *NSQD) DeleteExistingTopic(topicName string) error {
return nil
}

func (n *NSQD) Notify(v interface{}) {
func (n *NSQD) Notify(v interface{}, persist bool) {
// since the in-memory metadata is incomplete,
// should not persist metadata while loading it.
// nsqd will call `PersistMetadata` it after loading
persist := atomic.LoadInt32(&n.isLoading) == 0
loading := atomic.LoadInt32(&n.isLoading) == 1
n.waitGroup.Wrap(func() {
// by selecting on exitChan we guarantee that
// we do not block exit, see issue #123
select {
case <-n.exitChan:
case n.notifyChan <- v:
if !persist {
if loading || !persist {
return
}
n.Lock()
Expand Down
13 changes: 7 additions & 6 deletions nsqd/protocol_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -615,19 +615,20 @@ func (p *protocolV2) SUB(client *clientV2, params [][]byte) ([]byte, error) {
// last client can leave the channel between GetChannel() and AddClient().
// Avoid adding a client to an ephemeral channel / topic which has started exiting.
var channel *Channel
for {
for i := 1; ; i++ {
topic := p.nsqd.GetTopic(topicName)
channel = topic.GetChannel(channelName)
if err := channel.AddClient(client.ID, client); err != nil {
return nil, protocol.NewFatalClientErr(nil, "E_TOO_MANY_CHANNEL_CONSUMERS",
fmt.Sprintf("channel consumers for %s:%s exceeds limit of %d",
topicName, channelName, p.nsqd.getOpts().MaxChannelConsumers))
return nil, protocol.NewFatalClientErr(err, "E_SUB_FAILED", "SUB failed "+err.Error())
}

if (channel.ephemeral && channel.Exiting()) || (topic.ephemeral && topic.Exiting()) {
channel.RemoveClient(client.ID)
time.Sleep(1 * time.Millisecond)
continue
if i < 2 {
time.Sleep(100 * time.Millisecond)
continue
}
return nil, protocol.NewFatalClientErr(nil, "E_SUB_FAILED", "SUB failed to deleted topic/channel")
}
break
}
Expand Down
23 changes: 15 additions & 8 deletions nsqd/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func NewTopic(topicName string, nsqd *NSQD, deleteCallback func(*Topic)) *Topic

t.waitGroup.Wrap(t.messagePump)

t.nsqd.Notify(t)
t.nsqd.Notify(t, !t.ephemeral)

return t
}
Expand Down Expand Up @@ -145,23 +145,28 @@ func (t *Topic) GetExistingChannel(channelName string) (*Channel, error) {

// DeleteExistingChannel removes a channel from the topic only if it exists
func (t *Topic) DeleteExistingChannel(channelName string) error {
t.Lock()
t.RLock()
channel, ok := t.channelMap[channelName]
t.RUnlock()
if !ok {
t.Unlock()
return errors.New("channel does not exist")
}
delete(t.channelMap, channelName)
// not defered so that we can continue while the channel async closes
numChannels := len(t.channelMap)
t.Unlock()

t.nsqd.logf(LOG_INFO, "TOPIC(%s): deleting channel %s", t.name, channel.name)

// delete empties the channel before closing
// (so that we dont leave any messages around)
//
// we do this before removing the channel from map below (with no lock)
// so that any incoming subs will error and not create a new channel
// to enforce ordering
channel.Delete()

t.Lock()
delete(t.channelMap, channelName)
numChannels := len(t.channelMap)
t.Unlock()

// update messagePump state
select {
case t.channelUpdateChan <- 1:
Expand Down Expand Up @@ -355,7 +360,7 @@ func (t *Topic) exit(deleted bool) error {

// since we are explicitly deleting a topic (not just at system exit time)
// de-register this from the lookupd
t.nsqd.Notify(t)
t.nsqd.Notify(t, !t.ephemeral)
} else {
t.nsqd.logf(LOG_INFO, "TOPIC(%s): closing", t.name)
}
Expand All @@ -379,13 +384,15 @@ func (t *Topic) exit(deleted bool) error {
}

// close all the channels
t.RLock()
for _, channel := range t.channelMap {
err := channel.Close()
if err != nil {
// we need to continue regardless of error to close all the channels
t.nsqd.logf(LOG_ERROR, "channel(%s) close - %s", channel.name, err)
}
}
t.RUnlock()

// write anything leftover to disk
t.flush()
Expand Down

0 comments on commit 619ac10

Please sign in to comment.