Skip to content

Commit

Permalink
protect client add/remove when exiting
Browse files Browse the repository at this point in the history
  • Loading branch information
mreiferson committed Mar 1, 2021
1 parent c299baa commit 1a3a6f6
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 7 deletions.
28 changes: 22 additions & 6 deletions nsqd/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,8 +289,8 @@ func (c *Channel) IsPaused() bool {

// PutMessage writes a Message to the queue
func (c *Channel) PutMessage(m *Message) error {
c.RLock()
defer c.RUnlock()
c.exitMutex.RLock()
defer c.exitMutex.RUnlock()
if c.Exiting() {
return errors.New("exiting")
}
Expand Down Expand Up @@ -391,10 +391,16 @@ func (c *Channel) RequeueMessage(clientID int64, id MessageID, timeout time.Dura

// AddClient adds a client to the Channel's client list
func (c *Channel) AddClient(clientID int64, client Consumer) error {
c.Lock()
defer c.Unlock()
c.exitMutex.RLock()
defer c.exitMutex.RUnlock()

if c.Exiting() {
return errors.New("exiting")
}

c.RLock()
_, ok := c.clients[clientID]
c.RUnlock()
if ok {
return nil
}
Expand All @@ -405,20 +411,30 @@ func (c *Channel) AddClient(clientID int64, client Consumer) error {
c.topicName, c.name, c.nsqd.getOpts().MaxChannelConsumers)
}

c.Lock()
c.clients[clientID] = client
c.Unlock()
return nil
}

// RemoveClient removes a client from the Channel's client list
func (c *Channel) RemoveClient(clientID int64) {
c.Lock()
defer c.Unlock()
c.exitMutex.RLock()
defer c.exitMutex.RUnlock()

if c.Exiting() {
return
}

c.RLock()
_, ok := c.clients[clientID]
c.RUnlock()
if !ok {
return
}
c.Lock()
delete(c.clients, clientID)
c.Unlock()

if len(c.clients) == 0 && c.ephemeral == true {
go c.deleter.Do(func() { c.deleteCallback(c) })
Expand Down
2 changes: 1 addition & 1 deletion nsqd/nsqd.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,8 +371,8 @@ func (n *NSQD) PersistMetadata() error {
channelData := make(map[string]interface{})
channelData["name"] = channel.name
channelData["paused"] = channel.IsPaused()
channels = append(channels, channelData)
channel.Unlock()
channels = append(channels, channelData)
}
topic.Unlock()
topicData["channels"] = channels
Expand Down

0 comments on commit 1a3a6f6

Please sign in to comment.