From 28389b0637bce5e8ca44fb98eacf6b98bbaecc10 Mon Sep 17 00:00:00 2001 From: Andy Xie Date: Fri, 28 Jul 2017 15:19:52 +0800 Subject: [PATCH] fix mput binary param parse --- nsqd/http.go | 41 ++++++++++++++++++++++++++++++++++------- nsqd/http_test.go | 29 +++++++++++++++++++++++++++++ 2 files changed, 63 insertions(+), 7 deletions(-) diff --git a/nsqd/http.go b/nsqd/http.go index 0323fa79e..5e38cab09 100644 --- a/nsqd/http.go +++ b/nsqd/http.go @@ -24,6 +24,13 @@ import ( "github.com/nsqio/nsq/internal/version" ) +var boolParams = map[string]bool{ + "true": true, + "1": true, + "false": false, + "0": false, +} + type httpServer struct { ctx *context tlsEnabled bool @@ -241,15 +248,35 @@ func (s *httpServer) doMPUB(w http.ResponseWriter, req *http.Request, ps httprou return nil, err } - _, ok := reqParams["binary"] + // if `binary` param is not specified, `text` mode is used. + // + // if `binary` param is specified: + // 1. "true" or "1", use `binary` mode + // 2. "false" or "0", use `text` mode + // 3. All other **empty** or **non-empty** values will be logged as deprecated + // and will be parsed as `true`, i.e., `binary` mode + // + vals, ok := reqParams["binary"] + + binaryMode := false if ok { - tmp := make([]byte, 4) - msgs, err = readMPUB(req.Body, tmp, topic, - s.ctx.nsqd.getOpts().MaxMsgSize, s.ctx.nsqd.getOpts().MaxBodySize) - if err != nil { - return nil, http_api.Err{413, err.(*protocol.FatalClientErr).Code[2:]} + var exists bool + if binaryMode, exists = boolParams[vals[0]]; !exists { + binaryMode = true + s.ctx.nsqd.logf(LOG_WARN, "deprecated value '%s' used for /mpub binary param", vals[0]) } - } else { + + if binaryMode { + tmp := make([]byte, 4) + msgs, err = readMPUB(req.Body, tmp, topic, + s.ctx.nsqd.getOpts().MaxMsgSize, s.ctx.nsqd.getOpts().MaxBodySize) + if err != nil { + return nil, http_api.Err{413, err.(*protocol.FatalClientErr).Code[2:]} + } + } + } + + if !binaryMode { // add 1 so that it's greater than our max when we test for it // (LimitReader returns a "fake" EOF) readMax := s.ctx.nsqd.getOpts().MaxBodySize + 1 diff --git a/nsqd/http_test.go b/nsqd/http_test.go index 7bcb78f06..c3be21d4c 100644 --- a/nsqd/http_test.go +++ b/nsqd/http_test.go @@ -172,6 +172,35 @@ func TestHTTPmpubBinary(t *testing.T) { test.Equal(t, int64(5), topic.Depth()) } +func TestHTTPmpubForNonNormalizedBinaryParam(t *testing.T) { + opts := NewOptions() + opts.Logger = test.NewTestLogger(t) + _, httpAddr, nsqd := mustStartNSQD(opts) + defer os.RemoveAll(opts.DataPath) + defer nsqd.Exit() + + topicName := "test_http_mpub_bin" + strconv.Itoa(int(time.Now().Unix())) + topic := nsqd.GetTopic(topicName) + + mpub := make([][]byte, 5) + for i := range mpub { + mpub[i] = make([]byte, 100) + } + cmd, _ := nsq.MultiPublish(topicName, mpub) + buf := bytes.NewBuffer(cmd.Body) + + url := fmt.Sprintf("http://%s/mpub?topic=%s&binary=non_normalized_binary_param", httpAddr, topicName) + resp, err := http.Post(url, "application/octet-stream", buf) + test.Nil(t, err) + defer resp.Body.Close() + body, _ := ioutil.ReadAll(resp.Body) + test.Equal(t, "OK", string(body)) + + time.Sleep(5 * time.Millisecond) + + test.Equal(t, int64(5), topic.Depth()) +} + func TestHTTPpubDefer(t *testing.T) { opts := NewOptions() opts.Logger = test.NewTestLogger(t)