Skip to content

Commit

Permalink
nsqd: Don't drop connection on out of range requeue timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
Tom Holmes committed Mar 24, 2017
1 parent 62c3858 commit 4054b65
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 1 deletion.
2 changes: 1 addition & 1 deletion nsqd/protocol_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -713,7 +713,7 @@ func (p *protocolV2) REQ(client *clientV2, params [][]byte) ([]byte, error) {
timeoutDuration := time.Duration(timeoutMs) * time.Millisecond

if timeoutDuration < 0 || timeoutDuration > p.ctx.nsqd.getOpts().MaxReqTimeout {
return nil, protocol.NewFatalClientErr(nil, "E_INVALID",
return nil, protocol.NewClientErr(nil, "E_INVALID",
fmt.Sprintf("REQ timeout %d out of range 0-%d", timeoutDuration, p.ctx.nsqd.getOpts().MaxReqTimeout))
}

Expand Down
48 changes: 48 additions & 0 deletions nsqd/protocol_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1403,6 +1403,54 @@ func TestBadFin(t *testing.T) {
test.Equal(t, "E_INVALID Invalid Message ID", string(data))
}

func TestBadReq(t *testing.T) {
opts := NewOptions()
opts.Logger = test.NewTestLogger(t)
opts.Verbose = true
opts.MaxReqTimeout = 1 * time.Minute
tcpAddr, _, nsqd := mustStartNSQD(opts)
defer os.RemoveAll(opts.DataPath)
defer nsqd.Exit()

topicName := "test_touch" + strconv.Itoa(int(time.Now().Unix()))

conn, err := mustConnectNSQD(tcpAddr)
test.Nil(t, err)
defer conn.Close()

identify(t, conn, nil, frameTypeResponse)
sub(t, conn, topicName, "ch")

topic := nsqd.GetTopic(topicName)
channel := topic.GetChannel("ch")
msg := NewMessage(topic.GenerateID(), []byte("test body"))
topic.PutMessage(msg)

_, err = nsq.Ready(1).WriteTo(conn)
test.Nil(t, err)

resp, err := nsq.ReadResponse(conn)
test.Nil(t, err)
frameType, data, err := nsq.UnpackResponse(resp)
msgOut, _ := decodeMessage(data)
test.Equal(t, frameTypeMessage, frameType)
test.Equal(t, msg.ID, msgOut.ID)

_, err = nsq.Requeue(nsq.MessageID(msg.ID), opts.MaxReqTimeout*2).WriteTo(conn)
test.Nil(t, err)

resp, _ = nsq.ReadResponse(conn)
frameType, data, _ = nsq.UnpackResponse(resp)
test.Equal(t, frameTypeError, frameType)
test.Equal(t, "E_INVALID REQ timeout 120000000000 out of range 0-60000000000", string(data))

// assert that we didn't drop the client
nsqd.RLock()
clients := len(channel.clients)
nsqd.RUnlock()
test.Equal(t, 1, clients)
}

func TestClientAuth(t *testing.T) {
authResponse := `{"ttl":1, "authorizations":[]}`
authSecret := "testsecret"
Expand Down

0 comments on commit 4054b65

Please sign in to comment.