Skip to content
This repository has been archived by the owner on Apr 11, 2024. It is now read-only.

fix: fix kafka proxy tests #14

Merged
merged 4 commits into from
Jul 8, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions kafka/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,14 @@ func (r *requestKeyHandler) Handle(requestKeyVersion *kafkaprotocol.RequestKeyVe
return
}

msg := make([]byte, int64(requestKeyVersion.Length-int32(4+len(bufferRead.Bytes()))))
msg := make([]byte, int64(requestKeyVersion.Length-int32(4+bufferRead.Len())))
if _, err = io.ReadFull(io.TeeReader(src, bufferRead), msg); err != nil {
return
}

var req protocol.ProduceRequest
if err = protocol.VersionedDecode(msg, &req, requestKeyVersion.ApiVersion); err != nil {
logrus.Errorln(errors.Wrap(err, "error decoding ProduceRequest"))
// TODO notify error to a given notifier

// Do not return an error but log it.
return shouldReply, nil
Expand All @@ -84,7 +84,7 @@ func (r *requestKeyHandler) Handle(requestKeyVersion *kafkaprotocol.RequestKeyVe
if s.RecordBatch != nil {
for _, r := range s.RecordBatch.Records {
if !isValid(r.Value) {
logrus.Errorln("Message is not valid")
logrus.Debugln("Message is not valid")
} else {
logrus.Debugln("Message is valid")
}
Expand All @@ -93,7 +93,7 @@ func (r *requestKeyHandler) Handle(requestKeyVersion *kafkaprotocol.RequestKeyVe
if s.MsgSet != nil {
for _, mb := range s.MsgSet.Messages {
if !isValid(mb.Msg.Value) {
logrus.Errorln("Message is not valid")
logrus.Debugln("Message is not valid")
} else {
logrus.Debugln("Message is valid")
}
Expand Down
110 changes: 106 additions & 4 deletions kafka/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,16 @@ package kafka

import (
"bytes"
"encoding/binary"
"hash/crc32"
"testing"

"github.com/asyncapi/event-gateway/proxy"
kafkaproxy "github.com/grepplabs/kafka-proxy/proxy"
kafkaprotocol "github.com/grepplabs/kafka-proxy/proxy/protocol"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
logrustest "github.com/sirupsen/logrus/hooks/test"
"github.com/stretchr/testify/assert"
)

Expand Down Expand Up @@ -53,12 +57,12 @@ func TestRequestKeyHandler_Handle(t *testing.T) {
}{
{
name: "Valid message",
request: []byte{0, 0, 0, 7, 0, 16, 99, 111, 110, 115, 111, 108, 101, 45, 112, 114, 111, 100, 117, 99, 101, 114, 255, 255, 0, 1}, // payload: 'valid message'
request: generateProduceRequestV8("valid message"),
shouldReply: true,
},
{
name: "Invalid message",
request: []byte{0, 0, 0, 8, 0, 16, 99, 111, 110, 115, 111, 108, 101, 45, 112, 114, 111, 100, 117, 99, 101, 114, 255, 255, 0, 1}, // payload: 'invalid message'
request: generateProduceRequestV8("invalid message"),
shouldReply: true,
},
{
Expand All @@ -71,13 +75,16 @@ func TestRequestKeyHandler_Handle(t *testing.T) {
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
log := logrustest.NewGlobal()
kv := &kafkaprotocol.RequestKeyVersion{
ApiKey: test.apiKey, // default is 0, which is a Produce Request
Length: int32(len(test.request)) + 4, // 4 bytes are ApiKey + Version located in all request headers (already read by the time of validating the msg).
ApiVersion: 8, // All test data was grabbed from a Produce Request version 8.
ApiKey: test.apiKey, // default is 0, which is a Produce Request
Length: int32(len(test.request) + 4), // 4 bytes are ApiKey + Version located in all request headers (already read by the time of validating the msg).
}

readBytes := bytes.NewBuffer(nil)
var h requestKeyHandler

shouldReply, err := h.Handle(kv, bytes.NewReader(test.request), &kafkaproxy.RequestsLoopContext{}, readBytes)
assert.NoError(t, err)
assert.Equal(t, test.shouldReply, shouldReply)
Expand All @@ -87,6 +94,101 @@ func TestRequestKeyHandler_Handle(t *testing.T) {
} else {
assert.Equal(t, readBytes.Len(), len(test.request))
}

for _, l := range log.AllEntries() {
assert.NotEqualf(t, l.Level, logrus.ErrorLevel, "%q logged error unexpected", l.Message) // We don't have a notification mechanism for errors yet
}
})
}
}

//nolint:funlen
func generateProduceRequestV8(payload string) []byte {
// Note: Taking V8 as random version.
buf := bytes.NewBuffer(nil)

// Request headers
//
// correlation_id: 0, 0, 0, 6
// client_id_size: 0, 16
// client_id: 99, 111, 110, 115, 111, 108, 101, 45, 112, 114, 111, 100, 117, 99, 101, 114
// transactional_id_size: 255, 255
// acks: 0, 1
buf.Write([]byte{0, 0, 0, 6, 0, 16, 99, 111, 110, 115, 111, 108, 101, 45, 112, 114, 111, 100, 117, 99, 101, 114, 255, 255, 0, 1})

// timeout: 0, 0, 5, 200
// topics count: 0, 0, 0, 1
// topic name string len: 0, 4
// topic name: 100, 101, 109, 111
// partition count: 0, 0, 0, 1
// partition: 0, 0, 0, 0
buf.Write([]byte{0, 0, 5, 220, 0, 0, 0, 1, 0, 4, 100, 101, 109, 111, 0, 0, 0, 1, 0, 0, 0, 0})

// request size <int32>
requestSize := make([]byte, 4)
requestSizeInt := uint32(68 + len(payload))
binary.BigEndian.PutUint32(requestSize, requestSizeInt)
buf.Write(requestSize)

// base offset: 0, 0, 0, 0, 0, 0, 0, 0
baseOffset := []byte{0, 0, 0, 0, 0, 0, 0, 0}
buf.Write(baseOffset)

// batch len <int32>
batchLen := make([]byte, 4)
binary.BigEndian.Uint32(requestSize)
binary.BigEndian.PutUint32(batchLen, requestSizeInt+uint32(len(baseOffset)+len(payload)))
buf.Write(batchLen)

// partition leader epoch: 255, 255, 255, 255
// version: 2
leaderEpochAndVersion := []byte{255, 255, 255, 255, 2}
buf.Write(leaderEpochAndVersion)

// CRC32 <int32>
crc32ReservationStart := buf.Len()
buf.Write([]byte{0, 0, 0, 0}) // reserving int32 for crc calculation once the rest of the request is generated.

// attributes: 0, 0
// last offset delta: 0, 0, 0, 0
// first timestamp: 0, 0, 1, 122, 129, 58, 129, 47
// max timestamp: 0, 0, 1, 122, 129, 58, 129, 47
// producer id: 255, 255, 255, 255, 255, 255, 255, 255
// producer epoc: 255, 255
// base sequence: 255, 255, 255, 255
// records https://kafka.apache.org/documentation/#record
// amount: 0, 0, 0, 1
buf.Write([]byte{0, 0, 0, 0, 0, 0, 0, 0, 1, 122, 129, 58, 129, 47, 0, 0, 1, 122, 129, 58, 129, 47, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 0, 0, 0, 1})

// record len <int8>
recordLenInt := 27 + len(payload)
buf.WriteByte(byte(recordLenInt))

// attributes: 0
// timestamp delta: 0
// offset delta: 0
// key: 1
buf.Write([]byte{0, 0, 0, 1})

// message payload len
payloadLen := make([]byte, 1)
binary.PutVarint(payloadLen, int64(len(payload)))
buf.Write(payloadLen)

// Payload
buf.Write([]byte(payload))

// Headers: 0
buf.WriteByte(0)

table := crc32.MakeTable(crc32.Castagnoli)
crc32Calculator := crc32.New(table)
crc32Calculator.Write(buf.Bytes()[crc32ReservationStart+4:])

hash := crc32Calculator.Sum(make([]byte, 0))
for i := 0; i < len(hash); i++ {
buf.Bytes()[crc32ReservationStart+i] = hash[i]
}

return buf.Bytes()
}