Skip to content

Commit

Permalink
add max_channel_client_connection_count
Browse files Browse the repository at this point in the history
  • Loading branch information
andyxning committed Feb 20, 2019
1 parent cbdcd54 commit b871a1c
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 12 deletions.
1 change: 1 addition & 0 deletions apps/nsqd/nsqd.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ func nsqdFlagSet(opts *nsqd.Options) *flag.FlagSet {
flagSet.Duration("max-output-buffer-timeout", opts.MaxOutputBufferTimeout, "maximum client configurable duration of time between flushing to a client")
flagSet.Duration("min-output-buffer-timeout", opts.MinOutputBufferTimeout, "minimum client configurable duration of time between flushing to a client")
flagSet.Duration("output-buffer-timeout", opts.OutputBufferTimeout, "default duration of time between flushing data to clients")
flagSet.Int("max-channel-client-connection-count", opts.MaxChannelClientConnectionCount, "maximum channel client connection count")

// statsd integration options
flagSet.String("statsd-address", opts.StatsdAddress, "UDP <addr>:<port> of a statsd daemon for pushing stats")
Expand Down
8 changes: 8 additions & 0 deletions nsqd/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,14 @@ func (c *Channel) RequeueMessage(clientID int64, id MessageID, timeout time.Dura
return c.StartDeferredTimeout(msg, timeout)
}

// IsClientConnectionCountExceeded checks whether client connection count exceeds max_channel_client_connection_count.
func (c *Channel) IsClientConnectionCountExceeded() bool {
c.Lock()
defer c.Unlock()

return len(c.clients) >= c.ctx.nsqd.getOpts().MaxChannelClientConnectionCount
}

// AddClient adds a client to the Channel's client list
func (c *Channel) AddClient(clientID int64, client Consumer) {
c.Lock()
Expand Down
26 changes: 14 additions & 12 deletions nsqd/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,13 @@ type Options struct {
ClientTimeout time.Duration

// client overridable configuration options
MaxHeartbeatInterval time.Duration `flag:"max-heartbeat-interval"`
MaxRdyCount int64 `flag:"max-rdy-count"`
MaxOutputBufferSize int64 `flag:"max-output-buffer-size"`
MaxOutputBufferTimeout time.Duration `flag:"max-output-buffer-timeout"`
MinOutputBufferTimeout time.Duration `flag:"min-output-buffer-timeout"`
OutputBufferTimeout time.Duration `flag:"output-buffer-timeout"`
MaxHeartbeatInterval time.Duration `flag:"max-heartbeat-interval"`
MaxRdyCount int64 `flag:"max-rdy-count"`
MaxOutputBufferSize int64 `flag:"max-output-buffer-size"`
MaxOutputBufferTimeout time.Duration `flag:"max-output-buffer-timeout"`
MinOutputBufferTimeout time.Duration `flag:"min-output-buffer-timeout"`
OutputBufferTimeout time.Duration `flag:"output-buffer-timeout"`
MaxChannelClientConnectionCount int `flag:"max-channel-client-connection-count"`

// statsd integration
StatsdAddress string `flag:"statsd-address"`
Expand Down Expand Up @@ -128,12 +129,13 @@ func NewOptions() *Options {
MaxReqTimeout: 1 * time.Hour,
ClientTimeout: 60 * time.Second,

MaxHeartbeatInterval: 60 * time.Second,
MaxRdyCount: 2500,
MaxOutputBufferSize: 64 * 1024,
MaxOutputBufferTimeout: 30 * time.Second,
MinOutputBufferTimeout: 25 * time.Millisecond,
OutputBufferTimeout: 250 * time.Millisecond,
MaxHeartbeatInterval: 60 * time.Second,
MaxRdyCount: 2500,
MaxOutputBufferSize: 64 * 1024,
MaxOutputBufferTimeout: 30 * time.Second,
MinOutputBufferTimeout: 25 * time.Millisecond,
OutputBufferTimeout: 250 * time.Millisecond,
MaxChannelClientConnectionCount: 1000,

StatsdPrefix: "nsq.%s",
StatsdInterval: 60 * time.Second,
Expand Down
4 changes: 4 additions & 0 deletions nsqd/protocol_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -622,6 +622,10 @@ func (p *protocolV2) SUB(client *clientV2, params [][]byte) ([]byte, error) {
time.Sleep(1 * time.Millisecond)
continue
}
if channel.IsClientConnectionCountExceeded() {
return nil, protocol.NewFatalClientErr(nil, "E_TOO_MANY_CHANNEL_CLIENTS",
fmt.Sprintf("TOO many channel clients for %s:%s", topicName, channelName))
}
break
}
atomic.StoreInt32(&client.State, stateSubscribed)
Expand Down

0 comments on commit b871a1c

Please sign in to comment.