diff --git a/apps/nsqd/nsqd.go b/apps/nsqd/nsqd.go index 25f052d9c..1a7ca34c2 100644 --- a/apps/nsqd/nsqd.go +++ b/apps/nsqd/nsqd.go @@ -119,6 +119,7 @@ func nsqdFlagSet(opts *nsqd.Options) *flag.FlagSet { flagSet.Duration("max-output-buffer-timeout", opts.MaxOutputBufferTimeout, "maximum client configurable duration of time between flushing to a client") flagSet.Duration("min-output-buffer-timeout", opts.MinOutputBufferTimeout, "minimum client configurable duration of time between flushing to a client") flagSet.Duration("output-buffer-timeout", opts.OutputBufferTimeout, "default duration of time between flushing data to clients") + flagSet.Int("max-channel-client-connection-count", opts.MaxChannelClientConnectionCount, "maximum channel client connection count") // statsd integration options flagSet.String("statsd-address", opts.StatsdAddress, "UDP : of a statsd daemon for pushing stats") diff --git a/nsqd/channel.go b/nsqd/channel.go index 95a7adb21..f070c605e 100644 --- a/nsqd/channel.go +++ b/nsqd/channel.go @@ -388,6 +388,14 @@ func (c *Channel) RequeueMessage(clientID int64, id MessageID, timeout time.Dura return c.StartDeferredTimeout(msg, timeout) } +// IsClientConnectionCountExceeded checks whether client connection count exceeds max_channel_client_connection_count. +func (c *Channel) IsClientConnectionCountExceeded() bool { + c.Lock() + defer c.Unlock() + + return len(c.clients) >= c.ctx.nsqd.getOpts().MaxChannelClientConnectionCount +} + // AddClient adds a client to the Channel's client list func (c *Channel) AddClient(clientID int64, client Consumer) { c.Lock() diff --git a/nsqd/options.go b/nsqd/options.go index fdfda8572..9cb6a51fc 100644 --- a/nsqd/options.go +++ b/nsqd/options.go @@ -52,12 +52,13 @@ type Options struct { ClientTimeout time.Duration // client overridable configuration options - MaxHeartbeatInterval time.Duration `flag:"max-heartbeat-interval"` - MaxRdyCount int64 `flag:"max-rdy-count"` - MaxOutputBufferSize int64 `flag:"max-output-buffer-size"` - MaxOutputBufferTimeout time.Duration `flag:"max-output-buffer-timeout"` - MinOutputBufferTimeout time.Duration `flag:"min-output-buffer-timeout"` - OutputBufferTimeout time.Duration `flag:"output-buffer-timeout"` + MaxHeartbeatInterval time.Duration `flag:"max-heartbeat-interval"` + MaxRdyCount int64 `flag:"max-rdy-count"` + MaxOutputBufferSize int64 `flag:"max-output-buffer-size"` + MaxOutputBufferTimeout time.Duration `flag:"max-output-buffer-timeout"` + MinOutputBufferTimeout time.Duration `flag:"min-output-buffer-timeout"` + OutputBufferTimeout time.Duration `flag:"output-buffer-timeout"` + MaxChannelClientConnectionCount int `flag:"max-channel-client-connection-count"` // statsd integration StatsdAddress string `flag:"statsd-address"` @@ -128,12 +129,13 @@ func NewOptions() *Options { MaxReqTimeout: 1 * time.Hour, ClientTimeout: 60 * time.Second, - MaxHeartbeatInterval: 60 * time.Second, - MaxRdyCount: 2500, - MaxOutputBufferSize: 64 * 1024, - MaxOutputBufferTimeout: 30 * time.Second, - MinOutputBufferTimeout: 25 * time.Millisecond, - OutputBufferTimeout: 250 * time.Millisecond, + MaxHeartbeatInterval: 60 * time.Second, + MaxRdyCount: 2500, + MaxOutputBufferSize: 64 * 1024, + MaxOutputBufferTimeout: 30 * time.Second, + MinOutputBufferTimeout: 25 * time.Millisecond, + OutputBufferTimeout: 250 * time.Millisecond, + MaxChannelClientConnectionCount: 1000, StatsdPrefix: "nsq.%s", StatsdInterval: 60 * time.Second, diff --git a/nsqd/protocol_v2.go b/nsqd/protocol_v2.go index c2e7d7b42..49b2b7b0b 100644 --- a/nsqd/protocol_v2.go +++ b/nsqd/protocol_v2.go @@ -622,6 +622,10 @@ func (p *protocolV2) SUB(client *clientV2, params [][]byte) ([]byte, error) { time.Sleep(1 * time.Millisecond) continue } + if channel.IsClientConnectionCountExceeded() { + return nil, protocol.NewFatalClientErr(nil, "E_TOO_MANY_CHANNEL_CLIENTS", + fmt.Sprintf("TOO many channel clients for %s:%s", topicName, channelName)) + } break } atomic.StoreInt32(&client.State, stateSubscribed)