From b460b6e4397f732b0e700a553c47d73ca3ccf00e Mon Sep 17 00:00:00 2001 From: Sergio Moya <1083296+smoya@users.noreply.github.com> Date: Wed, 7 Jul 2021 16:31:00 +0200 Subject: [PATCH 1/4] fix: fix kafka proxy tests --- kafka/proxy.go | 8 ++++---- kafka/proxy_test.go | 26 ++++++++++++++++++++++---- 2 files changed, 26 insertions(+), 8 deletions(-) diff --git a/kafka/proxy.go b/kafka/proxy.go index 8712802..7b9af25 100644 --- a/kafka/proxy.go +++ b/kafka/proxy.go @@ -66,14 +66,14 @@ func (r *requestKeyHandler) Handle(requestKeyVersion *kafkaprotocol.RequestKeyVe return } - msg := make([]byte, int64(requestKeyVersion.Length-int32(4+len(bufferRead.Bytes())))) + msg := make([]byte, int64(requestKeyVersion.Length-int32(4+bufferRead.Len()))) if _, err = io.ReadFull(io.TeeReader(src, bufferRead), msg); err != nil { return } - var req protocol.ProduceRequest if err = protocol.VersionedDecode(msg, &req, requestKeyVersion.ApiVersion); err != nil { logrus.Errorln(errors.Wrap(err, "error decoding ProduceRequest")) + // TODO notify error to a given notifier // Do not return an error but log it. return shouldReply, nil @@ -84,7 +84,7 @@ func (r *requestKeyHandler) Handle(requestKeyVersion *kafkaprotocol.RequestKeyVe if s.RecordBatch != nil { for _, r := range s.RecordBatch.Records { if !isValid(r.Value) { - logrus.Errorln("Message is not valid") + logrus.Debugln("Message is not valid") } else { logrus.Debugln("Message is valid") } @@ -93,7 +93,7 @@ func (r *requestKeyHandler) Handle(requestKeyVersion *kafkaprotocol.RequestKeyVe if s.MsgSet != nil { for _, mb := range s.MsgSet.Messages { if !isValid(mb.Msg.Value) { - logrus.Errorln("Message is not valid") + logrus.Debugln("Message is not valid") } else { logrus.Debugln("Message is valid") } diff --git a/kafka/proxy_test.go b/kafka/proxy_test.go index 3fc04fd..e54147e 100644 --- a/kafka/proxy_test.go +++ b/kafka/proxy_test.go @@ -8,9 +8,19 @@ import ( kafkaproxy "github.com/grepplabs/kafka-proxy/proxy" kafkaprotocol "github.com/grepplabs/kafka-proxy/proxy/protocol" "github.com/pkg/errors" + "github.com/sirupsen/logrus" + logrustest "github.com/sirupsen/logrus/hooks/test" "github.com/stretchr/testify/assert" ) +// Note: Taking V8 as random version. +// correlation_id: 0, 0, 0, 6 +// client_id_size: 0, 16 +// client_id: 99, 111, 110, 115, 111, 108, 101, 45, 112, 114, 111, 100, 117, 99, 101, 114 +// transactional_id_size: 255, 255 +// acks: 0, 1 +var produceRequestV8Headers = []byte{0, 0, 0, 6, 0, 16, 99, 111, 110, 115, 111, 108, 101, 45, 112, 114, 111, 100, 117, 99, 101, 114, 255, 255, 0, 1} + func TestNewKafka(t *testing.T) { tests := []struct { name string @@ -53,12 +63,12 @@ func TestRequestKeyHandler_Handle(t *testing.T) { }{ { name: "Valid message", - request: []byte{0, 0, 0, 7, 0, 16, 99, 111, 110, 115, 111, 108, 101, 45, 112, 114, 111, 100, 117, 99, 101, 114, 255, 255, 0, 1}, // payload: 'valid message' + request: []byte{0, 0, 5, 220, 0, 0, 0, 1, 0, 4, 100, 101, 109, 111, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 81, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 69, 255, 255, 255, 255, 2, 42, 190, 231, 201, 0, 0, 0, 0, 0, 0, 0, 0, 1, 122, 129, 58, 51, 194, 0, 0, 1, 122, 129, 58, 51, 194, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 0, 0, 0, 1, 38, 0, 0, 0, 1, 26, 118, 97, 108, 105, 100, 32, 109, 101, 115, 115, 97, 103, 101, 0}, // payload: 'valid message' shouldReply: true, }, { name: "Invalid message", - request: []byte{0, 0, 0, 8, 0, 16, 99, 111, 110, 115, 111, 108, 101, 45, 112, 114, 111, 100, 117, 99, 101, 114, 255, 255, 0, 1}, // payload: 'invalid message' + request: []byte{0, 0, 5, 220, 0, 0, 0, 1, 0, 4, 100, 101, 109, 111, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 83, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 71, 255, 255, 255, 255, 2, 122, 198, 121, 91, 0, 0, 0, 0, 0, 0, 0, 0, 1, 122, 129, 58, 129, 47, 0, 0, 1, 122, 129, 58, 129, 47, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 0, 0, 0, 1, 42, 0, 0, 0, 1, 30, 105, 110, 118, 97, 108, 105, 100, 32, 109, 101, 115, 115, 97, 103, 101, 0}, // payload: 'invalid message' shouldReply: true, }, { @@ -71,13 +81,17 @@ func TestRequestKeyHandler_Handle(t *testing.T) { } for _, test := range tests { t.Run(test.name, func(t *testing.T) { + log := logrustest.NewGlobal() + test.request = append(produceRequestV8Headers, test.request...) // This appends the Produce Request headers which we don't care for this test. kv := &kafkaprotocol.RequestKeyVersion{ - ApiKey: test.apiKey, // default is 0, which is a Produce Request - Length: int32(len(test.request)) + 4, // 4 bytes are ApiKey + Version located in all request headers (already read by the time of validating the msg). + ApiVersion: 8, // All test data was grabbed from a Produce Request version 8. + ApiKey: test.apiKey, // default is 0, which is a Produce Request + Length: int32(len(test.request) + 4), // 4 bytes are ApiKey + Version located in all request headers (already read by the time of validating the msg). } readBytes := bytes.NewBuffer(nil) var h requestKeyHandler + shouldReply, err := h.Handle(kv, bytes.NewReader(test.request), &kafkaproxy.RequestsLoopContext{}, readBytes) assert.NoError(t, err) assert.Equal(t, test.shouldReply, shouldReply) @@ -87,6 +101,10 @@ func TestRequestKeyHandler_Handle(t *testing.T) { } else { assert.Equal(t, readBytes.Len(), len(test.request)) } + + for _, l := range log.AllEntries() { + assert.NotEqualf(t, l.Level, logrus.ErrorLevel, "%q logged error unexpected", l.Message) // We don't have a notification mechanism for errors yet + } }) } } From 73afed354c9dca8743666eaf0f87fdd3b94df9f3 Mon Sep 17 00:00:00 2001 From: Sergio Moya <1083296+smoya@users.noreply.github.com> Date: Thu, 8 Jul 2021 12:15:34 +0200 Subject: [PATCH 2/4] extract produce request payload generation to a function --- kafka/proxy_test.go | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/kafka/proxy_test.go b/kafka/proxy_test.go index e54147e..67b0c26 100644 --- a/kafka/proxy_test.go +++ b/kafka/proxy_test.go @@ -2,6 +2,7 @@ package kafka import ( "bytes" + "encoding/binary" "testing" "github.com/asyncapi/event-gateway/proxy" @@ -63,12 +64,12 @@ func TestRequestKeyHandler_Handle(t *testing.T) { }{ { name: "Valid message", - request: []byte{0, 0, 5, 220, 0, 0, 0, 1, 0, 4, 100, 101, 109, 111, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 81, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 69, 255, 255, 255, 255, 2, 42, 190, 231, 201, 0, 0, 0, 0, 0, 0, 0, 0, 1, 122, 129, 58, 51, 194, 0, 0, 1, 122, 129, 58, 51, 194, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 0, 0, 0, 1, 38, 0, 0, 0, 1, 26, 118, 97, 108, 105, 100, 32, 109, 101, 115, 115, 97, 103, 101, 0}, // payload: 'valid message' + request: generateProduceRequestV8("valid message"), shouldReply: true, }, { name: "Invalid message", - request: []byte{0, 0, 5, 220, 0, 0, 0, 1, 0, 4, 100, 101, 109, 111, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 83, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 71, 255, 255, 255, 255, 2, 122, 198, 121, 91, 0, 0, 0, 0, 0, 0, 0, 0, 1, 122, 129, 58, 129, 47, 0, 0, 1, 122, 129, 58, 129, 47, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 0, 0, 0, 1, 42, 0, 0, 0, 1, 30, 105, 110, 118, 97, 108, 105, 100, 32, 109, 101, 115, 115, 97, 103, 101, 0}, // payload: 'invalid message' + request: generateProduceRequestV8("invalid message"), shouldReply: true, }, { @@ -108,3 +109,16 @@ func TestRequestKeyHandler_Handle(t *testing.T) { }) } } + +func generateProduceRequestV8(payload string) []byte { + raw := []byte{0, 0, 5, 220, 0, 0, 0, 1, 0, 4, 100, 101, 109, 111, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 81, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 69, 255, 255, 255, 255, 2, 42, 190, 231, 201, 0, 0, 0, 0, 0, 0, 0, 0, 1, 122, 129, 58, 51, 194, 0, 0, 1, 122, 129, 58, 51, 194, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 0, 0, 0, 1, 38, 0, 0, 0, 1} + + // Based on https://kafka.apache.org/documentation/#record + messageLen := make([]byte, 1) + binary.PutVarint(messageLen, int64(len(payload))) + + bytesPayload := append(messageLen, []byte(payload)...) + raw = append(raw, bytesPayload...) + + return append(raw, 0) // No headers +} From d365345c476096e6afc34bd6a49932359d02ecf5 Mon Sep 17 00:00:00 2001 From: Sergio Moya <1083296+smoya@users.noreply.github.com> Date: Thu, 8 Jul 2021 18:31:50 +0200 Subject: [PATCH 3/4] produce request encoder for tests --- kafka/proxy_test.go | 87 ++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 79 insertions(+), 8 deletions(-) diff --git a/kafka/proxy_test.go b/kafka/proxy_test.go index 67b0c26..7a44919 100644 --- a/kafka/proxy_test.go +++ b/kafka/proxy_test.go @@ -3,6 +3,7 @@ package kafka import ( "bytes" "encoding/binary" + "hash/crc32" "testing" "github.com/asyncapi/event-gateway/proxy" @@ -83,7 +84,6 @@ func TestRequestKeyHandler_Handle(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { log := logrustest.NewGlobal() - test.request = append(produceRequestV8Headers, test.request...) // This appends the Produce Request headers which we don't care for this test. kv := &kafkaprotocol.RequestKeyVersion{ ApiVersion: 8, // All test data was grabbed from a Produce Request version 8. ApiKey: test.apiKey, // default is 0, which is a Produce Request @@ -110,15 +110,86 @@ func TestRequestKeyHandler_Handle(t *testing.T) { } } +//nolint:funlen func generateProduceRequestV8(payload string) []byte { - raw := []byte{0, 0, 5, 220, 0, 0, 0, 1, 0, 4, 100, 101, 109, 111, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 81, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 69, 255, 255, 255, 255, 2, 42, 190, 231, 201, 0, 0, 0, 0, 0, 0, 0, 0, 1, 122, 129, 58, 51, 194, 0, 0, 1, 122, 129, 58, 51, 194, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 0, 0, 0, 1, 38, 0, 0, 0, 1} + buf := bytes.NewBuffer(nil) - // Based on https://kafka.apache.org/documentation/#record - messageLen := make([]byte, 1) - binary.PutVarint(messageLen, int64(len(payload))) + // Request headers + buf.Write(produceRequestV8Headers) - bytesPayload := append(messageLen, []byte(payload)...) - raw = append(raw, bytesPayload...) + // timeout: 0, 0, 5, 200 + // topics count: 0, 0, 0, 1 + // topic name string len: 0, 4 + // topic name: 100, 101, 109, 111 + // partition count: 0, 0, 0, 1 + // partition: 0, 0, 0, 0 + buf.Write([]byte{0, 0, 5, 220, 0, 0, 0, 1, 0, 4, 100, 101, 109, 111, 0, 0, 0, 1, 0, 0, 0, 0}) - return append(raw, 0) // No headers + // request size + requestSize := make([]byte, 4) + requestSizeInt := uint32(68 + len(payload)) + binary.BigEndian.PutUint32(requestSize, requestSizeInt) + buf.Write(requestSize) + + // base offset: 0, 0, 0, 0, 0, 0, 0, 0 + baseOffset := []byte{0, 0, 0, 0, 0, 0, 0, 0} + buf.Write(baseOffset) + + // batch len + batchLen := make([]byte, 4) + binary.BigEndian.Uint32(requestSize) + binary.BigEndian.PutUint32(batchLen, requestSizeInt+uint32(len(baseOffset)+len(payload))) + buf.Write(batchLen) + + // partition leader epoch: 255, 255, 255, 255 + // version: 2 + leaderEpochAndVersion := []byte{255, 255, 255, 255, 2} + buf.Write(leaderEpochAndVersion) + + // CRC32 + crc32ReservationStart := buf.Len() + buf.Write([]byte{0, 0, 0, 0}) // reserving int32 for crc calculation once the rest of the request is generated. + + // attributes: 0, 0 + // last offset delta: 0, 0, 0, 0 + // first timestamp: 0, 0, 1, 122, 129, 58, 129, 47 + // max timestamp: 0, 0, 1, 122, 129, 58, 129, 47 + // producer id: 255, 255, 255, 255, 255, 255, 255, 255 + // producer epoc: 255, 255 + // base sequence: 255, 255, 255, 255 + // records https://kafka.apache.org/documentation/#record + // amount: 0, 0, 0, 1 + buf.Write([]byte{0, 0, 0, 0, 0, 0, 0, 0, 1, 122, 129, 58, 129, 47, 0, 0, 1, 122, 129, 58, 129, 47, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 0, 0, 0, 1}) + + // record len + recordLenInt := 27 + len(payload) + buf.WriteByte(byte(recordLenInt)) + + // attributes: 0 + // timestamp delta: 0 + // offset delta: 0 + // key: 1 + buf.Write([]byte{0, 0, 0, 1}) + + // message payload len + payloadLen := make([]byte, 1) + binary.PutVarint(payloadLen, int64(len(payload))) + buf.Write(payloadLen) + + // Payload + buf.Write([]byte(payload)) + + // Headers: 0 + buf.WriteByte(0) + + table := crc32.MakeTable(crc32.Castagnoli) + crc32Calculator := crc32.New(table) + crc32Calculator.Write(buf.Bytes()[crc32ReservationStart+4:]) + + hash := crc32Calculator.Sum(make([]byte, 0)) + for i := 0; i < len(hash); i++ { + buf.Bytes()[crc32ReservationStart+i] = hash[i] + } + + return buf.Bytes() } From 2fd758d72a3eebbd05458c140ad0a2d24da8730a Mon Sep 17 00:00:00 2001 From: Sergio Moya <1083296+smoya@users.noreply.github.com> Date: Thu, 8 Jul 2021 18:49:40 +0200 Subject: [PATCH 4/4] remove const --- kafka/proxy_test.go | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/kafka/proxy_test.go b/kafka/proxy_test.go index 7a44919..f76db9f 100644 --- a/kafka/proxy_test.go +++ b/kafka/proxy_test.go @@ -15,14 +15,6 @@ import ( "github.com/stretchr/testify/assert" ) -// Note: Taking V8 as random version. -// correlation_id: 0, 0, 0, 6 -// client_id_size: 0, 16 -// client_id: 99, 111, 110, 115, 111, 108, 101, 45, 112, 114, 111, 100, 117, 99, 101, 114 -// transactional_id_size: 255, 255 -// acks: 0, 1 -var produceRequestV8Headers = []byte{0, 0, 0, 6, 0, 16, 99, 111, 110, 115, 111, 108, 101, 45, 112, 114, 111, 100, 117, 99, 101, 114, 255, 255, 0, 1} - func TestNewKafka(t *testing.T) { tests := []struct { name string @@ -112,10 +104,17 @@ func TestRequestKeyHandler_Handle(t *testing.T) { //nolint:funlen func generateProduceRequestV8(payload string) []byte { + // Note: Taking V8 as random version. buf := bytes.NewBuffer(nil) // Request headers - buf.Write(produceRequestV8Headers) + // + // correlation_id: 0, 0, 0, 6 + // client_id_size: 0, 16 + // client_id: 99, 111, 110, 115, 111, 108, 101, 45, 112, 114, 111, 100, 117, 99, 101, 114 + // transactional_id_size: 255, 255 + // acks: 0, 1 + buf.Write([]byte{0, 0, 0, 6, 0, 16, 99, 111, 110, 115, 111, 108, 101, 45, 112, 114, 111, 100, 117, 99, 101, 114, 255, 255, 0, 1}) // timeout: 0, 0, 5, 200 // topics count: 0, 0, 0, 1