Skip to content
This repository has been archived by the owner on Apr 11, 2024. It is now read-only.

fix: fix kafka proxy tests #14

Merged
merged 4 commits into from
Jul 8, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions kafka/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,14 @@ func (r *requestKeyHandler) Handle(requestKeyVersion *kafkaprotocol.RequestKeyVe
return
}

msg := make([]byte, int64(requestKeyVersion.Length-int32(4+len(bufferRead.Bytes()))))
msg := make([]byte, int64(requestKeyVersion.Length-int32(4+bufferRead.Len())))
if _, err = io.ReadFull(io.TeeReader(src, bufferRead), msg); err != nil {
return
}

var req protocol.ProduceRequest
if err = protocol.VersionedDecode(msg, &req, requestKeyVersion.ApiVersion); err != nil {
logrus.Errorln(errors.Wrap(err, "error decoding ProduceRequest"))
// TODO notify error to a given notifier

// Do not return an error but log it.
return shouldReply, nil
Expand All @@ -84,7 +84,7 @@ func (r *requestKeyHandler) Handle(requestKeyVersion *kafkaprotocol.RequestKeyVe
if s.RecordBatch != nil {
for _, r := range s.RecordBatch.Records {
if !isValid(r.Value) {
logrus.Errorln("Message is not valid")
logrus.Debugln("Message is not valid")
} else {
logrus.Debugln("Message is valid")
}
Expand All @@ -93,7 +93,7 @@ func (r *requestKeyHandler) Handle(requestKeyVersion *kafkaprotocol.RequestKeyVe
if s.MsgSet != nil {
for _, mb := range s.MsgSet.Messages {
if !isValid(mb.Msg.Value) {
logrus.Errorln("Message is not valid")
logrus.Debugln("Message is not valid")
} else {
logrus.Debugln("Message is valid")
}
Expand Down
26 changes: 22 additions & 4 deletions kafka/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,19 @@ import (
kafkaproxy "github.com/grepplabs/kafka-proxy/proxy"
kafkaprotocol "github.com/grepplabs/kafka-proxy/proxy/protocol"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
logrustest "github.com/sirupsen/logrus/hooks/test"
"github.com/stretchr/testify/assert"
)

// Note: Taking V8 as random version.
// correlation_id: 0, 0, 0, 6
// client_id_size: 0, 16
// client_id: 99, 111, 110, 115, 111, 108, 101, 45, 112, 114, 111, 100, 117, 99, 101, 114
// transactional_id_size: 255, 255
// acks: 0, 1
var produceRequestV8Headers = []byte{0, 0, 0, 6, 0, 16, 99, 111, 110, 115, 111, 108, 101, 45, 112, 114, 111, 100, 117, 99, 101, 114, 255, 255, 0, 1}

func TestNewKafka(t *testing.T) {
tests := []struct {
name string
Expand Down Expand Up @@ -53,12 +63,12 @@ func TestRequestKeyHandler_Handle(t *testing.T) {
}{
{
name: "Valid message",
request: []byte{0, 0, 0, 7, 0, 16, 99, 111, 110, 115, 111, 108, 101, 45, 112, 114, 111, 100, 117, 99, 101, 114, 255, 255, 0, 1}, // payload: 'valid message'
request: []byte{0, 0, 5, 220, 0, 0, 0, 1, 0, 4, 100, 101, 109, 111, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 81, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 69, 255, 255, 255, 255, 2, 42, 190, 231, 201, 0, 0, 0, 0, 0, 0, 0, 0, 1, 122, 129, 58, 51, 194, 0, 0, 1, 122, 129, 58, 51, 194, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 0, 0, 0, 1, 38, 0, 0, 0, 1, 26, 118, 97, 108, 105, 100, 32, 109, 101, 115, 115, 97, 103, 101, 0}, // payload: 'valid message'
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we provide the exact request body in tests and then convert it to bytes? At the moment I don't see which message is valid/invalid.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could, but at this moment we do not have a proper encoder.
Encoder/Decoder code comes from Sarama library and at this stage I only copied (yes, I had to copy because their interfaces are not public) the decoder logic, which is the only thing we need in our app by now.

I would postpone the encoding logic for later once the project turns to a more mature stage. There will be several request headers and message versions to support.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I understand, but you can extend the request body as comment with a "shape" of valid/invalid message? Then it will be easy to check if tests passed or not by changes or by bug in changes 😅

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I dont get what you mean.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@magicmatatjahu Look at my last commit. Not sure if this is what you meant but I hope it looks more clear now.
Ideally, we would be doing this through the ProduceRequest struct, but the cons are all of the ones I mentioned in my previous message.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops, have an error in the test, but fixing anyway

Copy link
Collaborator

@magicmatatjahu magicmatatjahu Jul 8, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I had this situation 😅

image

So yeah, the comment was for describing the shape of bytes. Ok, we can stay with the last commit :)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So please fix the lint error and I will accept.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nah, I ended up creating a very handmade encoder that only works for tests. Enjoy checking the bytes back and forth dance 😆 . Btw, I ended up using some "magic" bytes so I simplify the work, otherwise, I will be spending a lot of time.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now I feel responsible for your efforts. I only had problem with these bytes and I don't like stop you (because it's only a test), so I accept changes 🚀 Good job!

shouldReply: true,
},
{
name: "Invalid message",
request: []byte{0, 0, 0, 8, 0, 16, 99, 111, 110, 115, 111, 108, 101, 45, 112, 114, 111, 100, 117, 99, 101, 114, 255, 255, 0, 1}, // payload: 'invalid message'
request: []byte{0, 0, 5, 220, 0, 0, 0, 1, 0, 4, 100, 101, 109, 111, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 83, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 71, 255, 255, 255, 255, 2, 122, 198, 121, 91, 0, 0, 0, 0, 0, 0, 0, 0, 1, 122, 129, 58, 129, 47, 0, 0, 1, 122, 129, 58, 129, 47, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 0, 0, 0, 1, 42, 0, 0, 0, 1, 30, 105, 110, 118, 97, 108, 105, 100, 32, 109, 101, 115, 115, 97, 103, 101, 0}, // payload: 'invalid message'
shouldReply: true,
},
{
Expand All @@ -71,13 +81,17 @@ func TestRequestKeyHandler_Handle(t *testing.T) {
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
log := logrustest.NewGlobal()
test.request = append(produceRequestV8Headers, test.request...) // This appends the Produce Request headers which we don't care for this test.
kv := &kafkaprotocol.RequestKeyVersion{
ApiKey: test.apiKey, // default is 0, which is a Produce Request
Length: int32(len(test.request)) + 4, // 4 bytes are ApiKey + Version located in all request headers (already read by the time of validating the msg).
ApiVersion: 8, // All test data was grabbed from a Produce Request version 8.
ApiKey: test.apiKey, // default is 0, which is a Produce Request
Length: int32(len(test.request) + 4), // 4 bytes are ApiKey + Version located in all request headers (already read by the time of validating the msg).
}

readBytes := bytes.NewBuffer(nil)
var h requestKeyHandler

shouldReply, err := h.Handle(kv, bytes.NewReader(test.request), &kafkaproxy.RequestsLoopContext{}, readBytes)
assert.NoError(t, err)
assert.Equal(t, test.shouldReply, shouldReply)
Expand All @@ -87,6 +101,10 @@ func TestRequestKeyHandler_Handle(t *testing.T) {
} else {
assert.Equal(t, readBytes.Len(), len(test.request))
}

for _, l := range log.AllEntries() {
assert.NotEqualf(t, l.Level, logrus.ErrorLevel, "%q logged error unexpected", l.Message) // We don't have a notification mechanism for errors yet
}
})
}
}