diff --git a/events/kafka.go b/events/kafka.go index 263cb766..abbc03f3 100644 --- a/events/kafka.go +++ b/events/kafka.go @@ -2,6 +2,10 @@ package events +import ( + "encoding/json" +) + type KafkaEvent struct { EventSource string `json:"eventSource"` EventSourceARN string `json:"eventSourceArn"` @@ -10,12 +14,37 @@ type KafkaEvent struct { } type KafkaRecord struct { - Topic string `json:"topic"` - Partition int64 `json:"partition"` - Offset int64 `json:"offset"` - Timestamp MilliSecondsEpochTime `json:"timestamp"` - TimestampType string `json:"timestampType"` - Key string `json:"key,omitempty"` - Value string `json:"value,omitempty"` - Headers []map[string][]int8 `json:"headers"` + Topic string `json:"topic"` + Partition int64 `json:"partition"` + Offset int64 `json:"offset"` + Timestamp MilliSecondsEpochTime `json:"timestamp"` + TimestampType string `json:"timestampType"` + Key string `json:"key,omitempty"` + Value string `json:"value,omitempty"` + Headers []map[string]JSONNumberBytes `json:"headers"` +} + +// JSONNumberBytes represents array of bytes in Headers field. +type JSONNumberBytes []byte + +// MarshalJSON converts byte array into array of signed integers. +func (b JSONNumberBytes) MarshalJSON() ([]byte, error) { + signedNumbers := make([]int8, len(b)) + for i, value := range b { + signedNumbers[i] = int8(value) + } + return json.Marshal(signedNumbers) +} + +// UnmarshalJSON converts a given json with potential negative values into byte array. +func (b *JSONNumberBytes) UnmarshalJSON(data []byte) error { + var signedNumbers []int8 + if err := json.Unmarshal(data, &signedNumbers); err != nil { + return err + } + *b = make(JSONNumberBytes, len(signedNumbers)) + for i, value := range signedNumbers { + (*b)[i] = byte(value) + } + return nil } diff --git a/events/kafka_test.go b/events/kafka_test.go index 50b52306..f4ad6577 100644 --- a/events/kafka_test.go +++ b/events/kafka_test.go @@ -21,12 +21,12 @@ func TestKafkaEventMarshaling(t *testing.T) { } // expected values for header - var headerValues [5]int8 + var headerValues [5]byte headerValues[0] = 118 - headerValues[1] = -36 + headerValues[1] = 220 // -36 + 256 headerValues[2] = 0 headerValues[3] = 127 - headerValues[4] = -128 + headerValues[4] = 128 // -128 + 256 assert.Equal(t, inputEvent.BootstrapServers, "b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092") assert.Equal(t, inputEvent.EventSource, "aws:kafka")