
fix: fix kafka proxy tests (#14)
* fix: fix kafka proxy tests

* extract produce request payload generation to a function

* produce request encoder for tests

* remove const
smoya authored Jul 8, 2021
1 parent 41593ba commit a3e8448
Showing 2 changed files with 110 additions and 8 deletions.
8 changes: 4 additions & 4 deletions kafka/proxy.go
@@ -66,14 +66,14 @@ func (r *requestKeyHandler) Handle(requestKeyVersion *kafkaprotocol.RequestKeyVe
return
}

-msg := make([]byte, int64(requestKeyVersion.Length-int32(4+len(bufferRead.Bytes()))))
+msg := make([]byte, int64(requestKeyVersion.Length-int32(4+bufferRead.Len())))
if _, err = io.ReadFull(io.TeeReader(src, bufferRead), msg); err != nil {
return
}

var req protocol.ProduceRequest
if err = protocol.VersionedDecode(msg, &req, requestKeyVersion.ApiVersion); err != nil {
-logrus.Errorln(errors.Wrap(err, "error decoding ProduceRequest"))
+// TODO notify error to a given notifier

// Do not return an error but log it.
return shouldReply, nil
@@ -84,7 +84,7 @@ func (r *requestKeyHandler) Handle(requestKeyVersion *kafkaprotocol.RequestKeyVe
if s.RecordBatch != nil {
for _, r := range s.RecordBatch.Records {
if !isValid(r.Value) {
logrus.Errorln("Message is not valid")
logrus.Debugln("Message is not valid")
} else {
logrus.Debugln("Message is valid")
}
@@ -93,7 +93,7 @@ func (r *requestKeyHandler) Handle(requestKeyVersion *kafkaprotocol.RequestKeyVe
if s.MsgSet != nil {
for _, mb := range s.MsgSet.Messages {
if !isValid(mb.Msg.Value) {
logrus.Errorln("Message is not valid")
logrus.Debugln("Message is not valid")
} else {
logrus.Debugln("Message is valid")
}
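The first hunk in kafka/proxy.go keeps the original sizing rule: requestKeyVersion.Length counts every byte after Kafka's 4-byte size prefix, and the 2-byte ApiKey plus 2-byte ApiVersion have already been read before Handle is called, so the handler only needs Length minus 4 minus whatever was already teed into bufferRead. A minimal sketch of that arithmetic, with remainingBody as a hypothetical helper name (not part of the commit):

// remainingBody reports how many bytes of the request body are still unread.
func remainingBody(length int32, alreadyBuffered int) int64 {
	// 4 = ApiKey (int16) + ApiVersion (int16), consumed before Handle runs.
	return int64(length - int32(4+alreadyBuffered))
}

The same rule is why the tests below build RequestKeyVersion with Length set to int32(len(test.request) + 4).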
110 changes: 106 additions & 4 deletions kafka/proxy_test.go
@@ -2,12 +2,16 @@ package kafka

import (
"bytes"
"encoding/binary"
"hash/crc32"
"testing"

"github.com/asyncapi/event-gateway/proxy"
kafkaproxy "github.com/grepplabs/kafka-proxy/proxy"
kafkaprotocol "github.com/grepplabs/kafka-proxy/proxy/protocol"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
logrustest "github.com/sirupsen/logrus/hooks/test"
"github.com/stretchr/testify/assert"
)

@@ -53,12 +57,12 @@ func TestRequestKeyHandler_Handle(t *testing.T) {
}{
{
name: "Valid message",
-request: []byte{0, 0, 0, 7, 0, 16, 99, 111, 110, 115, 111, 108, 101, 45, 112, 114, 111, 100, 117, 99, 101, 114, 255, 255, 0, 1}, // payload: 'valid message'
+request: generateProduceRequestV8("valid message"),
shouldReply: true,
},
{
name: "Invalid message",
-request: []byte{0, 0, 0, 8, 0, 16, 99, 111, 110, 115, 111, 108, 101, 45, 112, 114, 111, 100, 117, 99, 101, 114, 255, 255, 0, 1}, // payload: 'invalid message'
+request: generateProduceRequestV8("invalid message"),
shouldReply: true,
},
{
@@ -71,13 +75,16 @@ func TestRequestKeyHandler_Handle(t *testing.T) {
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
log := logrustest.NewGlobal()
kv := &kafkaprotocol.RequestKeyVersion{
-ApiKey: test.apiKey, // default is 0, which is a Produce Request
-Length: int32(len(test.request)) + 4, // 4 bytes are ApiKey + Version located in all request headers (already read by the time of validating the msg).
+ApiVersion: 8, // All test data was grabbed from a Produce Request version 8.
+ApiKey: test.apiKey, // default is 0, which is a Produce Request
+Length: int32(len(test.request) + 4), // 4 bytes are ApiKey + Version located in all request headers (already read by the time of validating the msg).
}

readBytes := bytes.NewBuffer(nil)
var h requestKeyHandler

shouldReply, err := h.Handle(kv, bytes.NewReader(test.request), &kafkaproxy.RequestsLoopContext{}, readBytes)
assert.NoError(t, err)
assert.Equal(t, test.shouldReply, shouldReply)
@@ -87,6 +94,101 @@ func TestRequestKeyHandler_Handle(t *testing.T) {
} else {
assert.Equal(t, readBytes.Len(), len(test.request))
}

for _, l := range log.AllEntries() {
assert.NotEqualf(t, l.Level, logrus.ErrorLevel, "%q logged error unexpected", l.Message) // We don't have a notification mechanism for errors yet
}
})
}
}

//nolint:funlen
func generateProduceRequestV8(payload string) []byte {
// Note: Taking V8 as random version.
buf := bytes.NewBuffer(nil)

// Request headers
//
// correlation_id: 0, 0, 0, 6
// client_id_size: 0, 16
// client_id: 99, 111, 110, 115, 111, 108, 101, 45, 112, 114, 111, 100, 117, 99, 101, 114
// transactional_id_size: 255, 255
// acks: 0, 1
buf.Write([]byte{0, 0, 0, 6, 0, 16, 99, 111, 110, 115, 111, 108, 101, 45, 112, 114, 111, 100, 117, 99, 101, 114, 255, 255, 0, 1})
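// (The client_id bytes above are ASCII for "console-producer"; 255, 255 is -1,
// i.e. a null transactional_id, and acks is set to 1.)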

// timeout: 0, 0, 5, 220
// topics count: 0, 0, 0, 1
// topic name string len: 0, 4
// topic name: 100, 101, 109, 111
// partition count: 0, 0, 0, 1
// partition: 0, 0, 0, 0
buf.Write([]byte{0, 0, 5, 220, 0, 0, 0, 1, 0, 4, 100, 101, 109, 111, 0, 0, 0, 1, 0, 0, 0, 0})
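// (0, 0, 5, 220 encodes a 1500 ms timeout, and the topic name bytes 100, 101,
// 109, 111 spell "demo".)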

// request size <int32>
requestSize := make([]byte, 4)
requestSizeInt := uint32(68 + len(payload))
binary.BigEndian.PutUint32(requestSize, requestSizeInt)
buf.Write(requestSize)

// base offset: 0, 0, 0, 0, 0, 0, 0, 0
baseOffset := []byte{0, 0, 0, 0, 0, 0, 0, 0}
buf.Write(baseOffset)

// batch len <int32>
batchLen := make([]byte, 4)
binary.BigEndian.Uint32(requestSize)
binary.BigEndian.PutUint32(batchLen, requestSizeInt+uint32(len(baseOffset)+len(payload)))
buf.Write(batchLen)

// partition leader epoch: 255, 255, 255, 255
// version: 2
leaderEpochAndVersion := []byte{255, 255, 255, 255, 2}
buf.Write(leaderEpochAndVersion)

// CRC32 <int32>
crc32ReservationStart := buf.Len()
buf.Write([]byte{0, 0, 0, 0}) // reserving int32 for crc calculation once the rest of the request is generated.

// attributes: 0, 0
// last offset delta: 0, 0, 0, 0
// first timestamp: 0, 0, 1, 122, 129, 58, 129, 47
// max timestamp: 0, 0, 1, 122, 129, 58, 129, 47
// producer id: 255, 255, 255, 255, 255, 255, 255, 255
// producer epoch: 255, 255
// base sequence: 255, 255, 255, 255
// records https://kafka.apache.org/documentation/#record
// amount: 0, 0, 0, 1
buf.Write([]byte{0, 0, 0, 0, 0, 0, 0, 0, 1, 122, 129, 58, 129, 47, 0, 0, 1, 122, 129, 58, 129, 47, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 0, 0, 0, 1})
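// (The 255-filled fields decode to -1: no producer id, producer epoch or base
// sequence, i.e. a non-idempotent, non-transactional producer.)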

// record len <int8>
recordLenInt := 27 + len(payload)
buf.WriteByte(byte(recordLenInt))

// attributes: 0
// timestamp delta: 0
// offset delta: 0
// key: 1
buf.Write([]byte{0, 0, 0, 1})

// message payload len
payloadLen := make([]byte, 1)
binary.PutVarint(payloadLen, int64(len(payload)))
buf.Write(payloadLen)
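// (binary.PutVarint zigzag-encodes the value, so the 13-byte "valid message"
// payload is written as the single byte 26; a 1-byte buffer only fits payload
// lengths whose varint encoding is one byte.)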

// Payload
buf.Write([]byte(payload))

// Headers: 0
buf.WriteByte(0)

table := crc32.MakeTable(crc32.Castagnoli)
crc32Calculator := crc32.New(table)
crc32Calculator.Write(buf.Bytes()[crc32ReservationStart+4:])

hash := crc32Calculator.Sum(make([]byte, 0))
for i := 0; i < len(hash); i++ {
buf.Bytes()[crc32ReservationStart+i] = hash[i]
}

return buf.Bytes()
}
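For reference, the reserve-and-backfill CRC step can be written more compactly with the packages the test already imports; a minimal sketch, with fillCRC32C as a hypothetical helper (not part of the commit), assuming the four reserved bytes start at reservationStart and the checksum covers everything after them, as in Kafka's record batch format:

// fillCRC32C computes CRC-32C (Castagnoli) over the bytes that follow the
// reserved slot and writes it back into the slot in big-endian order.
func fillCRC32C(batch []byte, reservationStart int) {
	sum := crc32.Checksum(batch[reservationStart+4:], crc32.MakeTable(crc32.Castagnoli))
	binary.BigEndian.PutUint32(batch[reservationStart:reservationStart+4], sum)
}

This is equivalent to the manual loop above because the hash/crc32 digest's Sum emits the checksum in big-endian byte order.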
