Skip to content

Commit

Permalink
nsqd: clamp requeue timeout to range instead of dropping connection
Browse files Browse the repository at this point in the history
  • Loading branch information
Tom Holmes committed Mar 24, 2017
1 parent 62c3858 commit 315096f
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 3 deletions.
15 changes: 12 additions & 3 deletions nsqd/protocol_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -712,9 +712,18 @@ func (p *protocolV2) REQ(client *clientV2, params [][]byte) ([]byte, error) {
}
timeoutDuration := time.Duration(timeoutMs) * time.Millisecond

if timeoutDuration < 0 || timeoutDuration > p.ctx.nsqd.getOpts().MaxReqTimeout {
return nil, protocol.NewFatalClientErr(nil, "E_INVALID",
fmt.Sprintf("REQ timeout %d out of range 0-%d", timeoutDuration, p.ctx.nsqd.getOpts().MaxReqTimeout))
maxReqTimeout := p.ctx.nsqd.getOpts().MaxReqTimeout
clampedTimeout := timeoutDuration

if timeoutDuration < 0 {
clampedTimeout = 0
} else if timeoutDuration > maxReqTimeout {
clampedTimeout = maxReqTimeout
}
if clampedTimeout != timeoutDuration {
p.ctx.nsqd.logf("PROTOCOL(V2): [%s] REQ timeout %d out of range 0-%d. Setting to %d",
client, timeoutDuration, maxReqTimeout, clampedTimeout)
timeoutDuration = clampedTimeout
}

err = client.Channel.RequeueMessage(client.ID, *id, timeoutDuration)
Expand Down
60 changes: 60 additions & 0 deletions nsqd/protocol_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1403,6 +1403,66 @@ func TestBadFin(t *testing.T) {
test.Equal(t, "E_INVALID Invalid Message ID", string(data))
}

func TestReqTimeoutRange(t *testing.T) {
opts := NewOptions()
opts.Logger = test.NewTestLogger(t)
opts.Verbose = true
opts.MaxReqTimeout = 1 * time.Minute
tcpAddr, _, nsqd := mustStartNSQD(opts)
defer os.RemoveAll(opts.DataPath)
defer nsqd.Exit()

topicName := "test_req" + strconv.Itoa(int(time.Now().Unix()))

conn, err := mustConnectNSQD(tcpAddr)
test.Nil(t, err)
defer conn.Close()

identify(t, conn, nil, frameTypeResponse)
sub(t, conn, topicName, "ch")

topic := nsqd.GetTopic(topicName)
channel := topic.GetChannel("ch")
msg := NewMessage(topic.GenerateID(), []byte("test body"))
topic.PutMessage(msg)

_, err = nsq.Ready(1).WriteTo(conn)
test.Nil(t, err)

resp, err := nsq.ReadResponse(conn)
test.Nil(t, err)
frameType, data, err := nsq.UnpackResponse(resp)
msgOut, _ := decodeMessage(data)
test.Equal(t, frameTypeMessage, frameType)
test.Equal(t, msg.ID, msgOut.ID)

_, err = nsq.Requeue(nsq.MessageID(msg.ID), -1).WriteTo(conn)
test.Nil(t, err)

// It should be immediately available for another attempt
resp, err = nsq.ReadResponse(conn)
test.Nil(t, err)
frameType, data, err = nsq.UnpackResponse(resp)
msgOut, _ = decodeMessage(data)
test.Equal(t, frameTypeMessage, frameType)
test.Equal(t, msg.ID, msgOut.ID)

// The priority (processing time) should be >= this
minTs := time.Now().Add(opts.MaxReqTimeout).UnixNano()

_, err = nsq.Requeue(nsq.MessageID(msg.ID), opts.MaxReqTimeout*2).WriteTo(conn)
test.Nil(t, err)

time.Sleep(100 * time.Millisecond)

channel.deferredMutex.Lock()
pqItem := channel.deferredMessages[msg.ID]
channel.deferredMutex.Unlock()

test.NotNil(t, pqItem)
test.Equal(t, true, pqItem.Priority >= minTs)
}

func TestClientAuth(t *testing.T) {
authResponse := `{"ttl":1, "authorizations":[]}`
authSecret := "testsecret"
Expand Down

0 comments on commit 315096f

Please sign in to comment.