Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

nsqd: clamp requeue timeout to valid range instead of dropping connection #868

Merged
merged 1 commit into from
Mar 27, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 12 additions & 3 deletions nsqd/protocol_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -712,9 +712,18 @@ func (p *protocolV2) REQ(client *clientV2, params [][]byte) ([]byte, error) {
}
timeoutDuration := time.Duration(timeoutMs) * time.Millisecond

if timeoutDuration < 0 || timeoutDuration > p.ctx.nsqd.getOpts().MaxReqTimeout {
return nil, protocol.NewFatalClientErr(nil, "E_INVALID",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess I'm curious now if it should still return some indication that this happened rather than silently proceeding?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have no way of telling the client about it, since we don't want to return an error. We could log it on the server?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That could be noisy, but still seems like the best option.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah it's as noisy as the previous behavior, where we would log the fatal error and the connection drop every time someone did this ¯\(ツ)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mreiferson I think hanging up at IDENTIFY and REQ is worth thinking about some more. We actively monitor client errors, whereas early de-queues are not something we'd figure out right away. I would be happy with "nsqd requeued your message, but not exactly as you requested" but there's no mechanism for that. Our use case is we have some last ditch effort REQ's in the 12h range, if nsqd was configured incorrectly we'd see these come around again in an hour. Something to consider.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@judwhite nsqd does hang up on a bad IDENTIFY. On REQ, I agree this new behavior is more correct because the previous behavior would result in the intended message timing out rather than being requeued. In most cases that's probably less desirable if the actual time-to-reprocess is important.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the biggest issue here is that if the client has a max-in-flight > 1, any other message that was in flight would also be timed out, which can cause problems when you want exactly (or as close as possible to) 1 successful processing of each message.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mreiferson @tsholmes MaxReqTimeout is the maximum REQ delay the client can specify, it's not related to -msg-timeout. Is that right?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

correct, it's a separate option:

$ nsqd --help
...
  -max-msg-timeout duration
    	maximum duration before a message will timeout (default 15m0s)
...
  -max-req-timeout duration
    	maximum requeuing timeout for a message (default 1h0m0s)
...
  -msg-timeout string
    	duration to wait before auto-requeing a message (default "1m0s")

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mreiferson I think I see what you're saying. When REQ failed previously nsqd continued the message timeout and it gets requeued anyway with 0 delay (is that right?). I suppose 'it depends' if you'd rather see errors on the client or have the server override your request parameters without notification.

fmt.Sprintf("REQ timeout %d out of range 0-%d", timeoutDuration, p.ctx.nsqd.getOpts().MaxReqTimeout))
maxReqTimeout := p.ctx.nsqd.getOpts().MaxReqTimeout
clampedTimeout := timeoutDuration

if timeoutDuration < 0 {
clampedTimeout = 0
} else if timeoutDuration > maxReqTimeout {
clampedTimeout = maxReqTimeout
}
if clampedTimeout != timeoutDuration {
p.ctx.nsqd.logf("PROTOCOL(V2): [%s] REQ timeout %d out of range 0-%d. Setting to %d",
client, timeoutDuration, maxReqTimeout, clampedTimeout)
timeoutDuration = clampedTimeout
}

err = client.Channel.RequeueMessage(client.ID, *id, timeoutDuration)
Expand Down
60 changes: 60 additions & 0 deletions nsqd/protocol_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1403,6 +1403,66 @@ func TestBadFin(t *testing.T) {
test.Equal(t, "E_INVALID Invalid Message ID", string(data))
}

func TestReqTimeoutRange(t *testing.T) {
opts := NewOptions()
opts.Logger = test.NewTestLogger(t)
opts.Verbose = true
opts.MaxReqTimeout = 1 * time.Minute
tcpAddr, _, nsqd := mustStartNSQD(opts)
defer os.RemoveAll(opts.DataPath)
defer nsqd.Exit()

topicName := "test_req" + strconv.Itoa(int(time.Now().Unix()))

conn, err := mustConnectNSQD(tcpAddr)
test.Nil(t, err)
defer conn.Close()

identify(t, conn, nil, frameTypeResponse)
sub(t, conn, topicName, "ch")

topic := nsqd.GetTopic(topicName)
channel := topic.GetChannel("ch")
msg := NewMessage(topic.GenerateID(), []byte("test body"))
topic.PutMessage(msg)

_, err = nsq.Ready(1).WriteTo(conn)
test.Nil(t, err)

resp, err := nsq.ReadResponse(conn)
test.Nil(t, err)
frameType, data, err := nsq.UnpackResponse(resp)
msgOut, _ := decodeMessage(data)
test.Equal(t, frameTypeMessage, frameType)
test.Equal(t, msg.ID, msgOut.ID)

_, err = nsq.Requeue(nsq.MessageID(msg.ID), -1).WriteTo(conn)
test.Nil(t, err)

// It should be immediately available for another attempt
resp, err = nsq.ReadResponse(conn)
test.Nil(t, err)
frameType, data, err = nsq.UnpackResponse(resp)
msgOut, _ = decodeMessage(data)
test.Equal(t, frameTypeMessage, frameType)
test.Equal(t, msg.ID, msgOut.ID)

// The priority (processing time) should be >= this
minTs := time.Now().Add(opts.MaxReqTimeout).UnixNano()

_, err = nsq.Requeue(nsq.MessageID(msg.ID), opts.MaxReqTimeout*2).WriteTo(conn)
test.Nil(t, err)

time.Sleep(100 * time.Millisecond)

channel.deferredMutex.Lock()
pqItem := channel.deferredMessages[msg.ID]
channel.deferredMutex.Unlock()

test.NotNil(t, pqItem)
test.Equal(t, true, pqItem.Priority >= minTs)
}

func TestClientAuth(t *testing.T) {
authResponse := `{"ttl":1, "authorizations":[]}`
authSecret := "testsecret"
Expand Down