Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for using different subject name strategies #88

Merged
merged 2 commits into from
Jun 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 23 additions & 5 deletions avro.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,12 @@ func SerializeAvro(configuration Configuration, topic string, data interface{},
bytesData := []byte(data.(string))

client := SchemaRegistryClientWithConfiguration(configuration.SchemaRegistry)
subject := topic + "-" + string(element)

var subject, subjectNameError = GetSubjectName(schema, topic, element, configuration.Producer.SubjectNameStrategy)
if subjectNameError != nil {
return nil, subjectNameError
}

var schemaInfo *srclient.Schema
schemaID := 0

Expand Down Expand Up @@ -86,22 +91,35 @@ func SerializeAvro(configuration Configuration, topic string, data interface{},
// is used to configure the Schema Registry client. The element is used to define the subject.
// The data should be a byte array.
func DeserializeAvro(configuration Configuration, topic string, data []byte, element Element, schema string, version int) (interface{}, *Xk6KafkaError) {
bytesDecodedData, err := DecodeWireFormat(data)
schemaID, bytesDecodedData, err := DecodeWireFormat(data)
if err != nil {
return nil, NewXk6KafkaError(failedDecodeFromWireFormat,
"Failed to remove wire format from the binary data",
err)
}

client := SchemaRegistryClientWithConfiguration(configuration.SchemaRegistry)
subject := topic + "-" + string(element)
var schemaInfo *srclient.Schema

var xk6KafkaError *Xk6KafkaError
var getSchemaError error

client := SchemaRegistryClientWithConfiguration(configuration.SchemaRegistry)

var subject, subjectNameError = GetSubjectName(schema, topic, element, configuration.Consumer.SubjectNameStrategy)
if subjectNameError != nil {
return nil, subjectNameError
}

if schema != "" {
// Schema is provided, so we need to create it and get the schema ID
schemaInfo, xk6KafkaError = CreateSchema(client, subject, schema, srclient.Avro)
} else if configuration.Consumer.UseMagicPrefix {
// Schema not provided and no valid version flag, so we use te schemaID in the magic prefix
schemaInfo, getSchemaError = client.GetSchema(schemaID)
if getSchemaError != nil {
xk6KafkaError = NewXk6KafkaError(failedCreateAvroCodec,
"Failed to get schema by magic prefix",
getSchemaError)
}
} else {
// Schema is not provided, so we need to fetch the schema from the Schema Registry
schemaInfo, xk6KafkaError = GetSchema(client, subject, schema, srclient.Avro, version)
Expand Down
235 changes: 232 additions & 3 deletions avro_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package kafka

import (
"github.com/riferrei/srclient"
"testing"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -51,7 +52,7 @@ func TestSerializeDeserializeAvroFailsOnSchemaError(t *testing.T) {
assert.Equal(t, failedCreateAvroCodec, err.Code)

// Deserialize the key or value
deserialized, err := DeserializeAvro(avroConfig, "topic", []byte{1, 2, 3, 4, 5, 6}, element, jsonSchema, 0)
deserialized, err := DeserializeAvro(avroConfig, "topic", []byte{0, 1, 2, 3, 4, 5}, element, jsonSchema, 0)
assert.Nil(t, deserialized)
assert.Error(t, err.Unwrap())
assert.Equal(t, "Failed to create codec for decoding Avro", err.Message)
Expand All @@ -73,7 +74,7 @@ func TestSerializeDeserializeAvroFailsOnWireFormatError(t *testing.T) {

// Deserialize a broken key or value
// Proper wire-formatted message has 5 bytes (the wire format) plus data
deserialized, err = DeserializeAvro(avroConfig, "topic", []byte{1, 2, 3, 4}, element, schema, 0)
deserialized, err = DeserializeAvro(avroConfig, "topic", []byte{0, 1, 2, 3}, element, schema, 0)
assert.Nil(t, deserialized)
assert.Error(t, err.Unwrap())
assert.Equal(t, "Failed to remove wire format from the binary data", err.Message)
Expand All @@ -92,10 +93,238 @@ func TestSerializeDeserializeAvroFailsOnEncodeDecodeError(t *testing.T) {
assert.Equal(t, "Failed to encode data into Avro", err.Message)
assert.Equal(t, failedEncodeToAvro, err.Code)

deserialized, err := DeserializeAvro(avroConfig, "topic", []byte{1, 2, 3, 4, 5, 6}, element, avroSchema, 0)
deserialized, err := DeserializeAvro(avroConfig, "topic", []byte{0, 1, 2, 3, 5}, element, avroSchema, 0)
assert.Nil(t, deserialized)
assert.Error(t, err.Unwrap())
assert.Equal(t, "Failed to decode data from Avro", err.Message)
assert.Equal(t, failedDecodeAvroFromBinary, err.Code)
}
}

func TestAvroSerializeTopicNameStrategy(t *testing.T) {
data := `{"field":"value"}`
topic := "TestAvroSerializeTopicNameStrategy-topic"
config := Configuration{
Producer: ProducerConfiguration{
ValueSerializer: AvroSerializer,
SubjectNameStrategy: TopicNameStrategy,
},
SchemaRegistry: SchemaRegistryConfiguration{
Url: "http://localhost:8081",
},
}

schema := `{"type":"record","name":"TestAvroSerializeTopicNameStrategyIsDefaultStrategy","namespace":"io.confluent.kafka.avro","fields":[{"name":"field","type":"string"}]}`

serialized, err := SerializeAvro(config, topic, data, Value, schema, 0)
assert.Nil(t, err)
assert.NotNil(t, serialized)

expectedSubject := topic + "-value"
srClient := SchemaRegistryClientWithConfiguration(config.SchemaRegistry)
schemaResult, err := GetSchema(srClient, expectedSubject, schema, srclient.Avro, 0)
assert.Nil(t, err)
assert.NotNil(t, schemaResult)
}

func TestAvroSerializeTopicNameStrategyIsDefaultStrategy(t *testing.T) {
data := `{"field":"value"}`
topic := "TestAvroSerializeTopicNameStrategyIsDefaultStrategy-topic"
config := Configuration{
Producer: ProducerConfiguration{
ValueSerializer: AvroSerializer,
},
SchemaRegistry: SchemaRegistryConfiguration{
Url: "http://localhost:8081",
},
}

schema := `{"type":"record","name":"TestAvroSerializeTopicNameStrategyIsDefaultStrategy","namespace":"io.confluent.kafka.avro","fields":[{"name":"field","type":"string"}]}`

serialized, err := SerializeAvro(config, topic, data, Value, schema, 0)
assert.Nil(t, err)
assert.NotNil(t, serialized)

expectedSubject := topic + "-value"
srClient := SchemaRegistryClientWithConfiguration(config.SchemaRegistry)
schemaResult, err := GetSchema(srClient, expectedSubject, schema, srclient.Avro, 0)
assert.Nil(t, err)
assert.NotNil(t, schemaResult)
}

func TestAvroSerializeTopicRecordNameStrategy(t *testing.T) {
data := `{"field":"value"}`
topic := "TestAvroSerializeTopicRecordNameStrategy-topic"
config := Configuration{
Producer: ProducerConfiguration{
ValueSerializer: AvroSerializer,
SubjectNameStrategy: TopicRecordNameStrategy,
},
SchemaRegistry: SchemaRegistryConfiguration{
Url: "http://localhost:8081",
},
}
schema := `{"type":"record","name":"TestAvroSerializeTopicRecordNameStrategy","namespace":"io.confluent.kafka.avro","fields":[{"name":"field","type":"string"}]}`

serialized, err := SerializeAvro(config, topic, data, Value, schema, 0)
assert.Nil(t, err)
assert.NotNil(t, serialized)

expectedSubject := topic + "-io.confluent.kafka.avro.TestAvroSerializeTopicRecordNameStrategy"
srClient := SchemaRegistryClientWithConfiguration(config.SchemaRegistry)
schemaResult, err := GetSchema(srClient, expectedSubject, schema, srclient.Avro, 0)
assert.Nil(t, err)
assert.NotNil(t, schemaResult)
}

func TestAvroSerializeRecordNameStrategy(t *testing.T) {
data := `{"field":"value"}`
topic := "TestAvroSerializeRecordNameStrategy-topic"
config := Configuration{
Producer: ProducerConfiguration{
ValueSerializer: AvroSerializer,
SubjectNameStrategy: RecordNameStrategy,
},
SchemaRegistry: SchemaRegistryConfiguration{
Url: "http://localhost:8081",
},
}
schema := `{"type":"record","name":"TestAvroSerializeRecordNameStrategy","namespace":"io.confluent.kafka.avro","fields":[{"name":"field","type":"string"}]}`

serialized, err := SerializeAvro(config, topic, data, Value, schema, 0)
assert.Nil(t, err)
assert.NotNil(t, serialized)

expectedSubject := "io.confluent.kafka.avro.TestAvroSerializeRecordNameStrategy"
srClient := SchemaRegistryClientWithConfiguration(config.SchemaRegistry)
resultSchema, err := GetSchema(srClient, expectedSubject, avroSchema, srclient.Avro, 0)
assert.Nil(t, err)
assert.NotNil(t, resultSchema)
}

func TestAvroDeserializeUsingMagicPrefix(t *testing.T) {
data := `{"field":"value"}`
topic := "TestAvroDeserializeUsingMagicPrefix-topic"
config := Configuration{
Consumer: ConsumerConfiguration{
UseMagicPrefix: true,
},
Producer: ProducerConfiguration{
ValueSerializer: AvroSerializer,
SubjectNameStrategy: RecordNameStrategy,
},
SchemaRegistry: SchemaRegistryConfiguration{
Url: "http://localhost:8081",
},
}
schema := `{"type":"record","name":"TestAvroDeserializeUsingMagicPrefix","namespace":"io.confluent.kafka.avro","fields":[{"name":"field","type":"string"}]}`

serialized, err := SerializeAvro(config, topic, data, Value, schema, 0)
assert.Nil(t, err)

dData, dErr := DeserializeAvro(config, topic, serialized, Value, "", 0)
assert.Equal(t, "value", dData.(map[string]interface{})["field"])
assert.Nil(t, dErr)
}

func TestAvroDeserializeUsingDefaultSubjectNameStrategy(t *testing.T) {
data := `{"field":"value"}`
topic := "TestAvroDeserializeUsingDefaultSubjectNameStrategy-topic"
config := Configuration{
Producer: ProducerConfiguration{
ValueSerializer: AvroSerializer,
},
Consumer: ConsumerConfiguration{
ValueDeserializer: AvroSerializer,
},
SchemaRegistry: SchemaRegistryConfiguration{
Url: "http://localhost:8081",
},
}
schema := `{"type":"record","name":"TestAvroDeserializeUsingDefaultSubjectNameStrategy","namespace":"io.confluent.kafka.avro","fields":[{"name":"field","type":"string"}]}`

serialized, err := SerializeAvro(config, topic, data, Value, schema, 0)
assert.Nil(t, err)

dData, dErr := DeserializeAvro(config, topic, serialized, Value, "", 0)
assert.Equal(t, "value", dData.(map[string]interface{})["field"])
assert.Nil(t, dErr)
}

func TestAvroDeserializeUsingSubjectNameStrategyRecordName(t *testing.T) {
data := `{"field":"value"}`
topic := "TestAvroDeserializeUsingSubjectNameStrategyRecordName-topic"
config := Configuration{
Producer: ProducerConfiguration{
ValueSerializer: AvroSerializer,
SubjectNameStrategy: RecordNameStrategy,
},
Consumer: ConsumerConfiguration{
ValueDeserializer: AvroSerializer,
SubjectNameStrategy: RecordNameStrategy,
},
SchemaRegistry: SchemaRegistryConfiguration{
Url: "http://localhost:8081",
},
}
schema := `{"type":"record","name":"TestAvroDeserializeUsingSubjectNameStrategyRecordName","namespace":"io.confluent.kafka.avro","fields":[{"name":"field","type":"string"}]}`

serialized, err := SerializeAvro(config, topic, data, Value, schema, 0)
assert.Nil(t, err)

dData, dErr := DeserializeAvro(config, topic, serialized, Value, schema, 0)
assert.Equal(t, "value", dData.(map[string]interface{})["field"])
assert.Nil(t, dErr)
}

func TestAvroDeserializeUsingSubjectNameStrategyTopicRecordName(t *testing.T) {
data := `{"field":"value"}`
topic := "TestAvroDeserializeUsingSubjectNameStrategyTopicRecordName-topic"
config := Configuration{
Producer: ProducerConfiguration{
ValueSerializer: AvroSerializer,
SubjectNameStrategy: TopicRecordNameStrategy,
},
Consumer: ConsumerConfiguration{
ValueDeserializer: AvroSerializer,
SubjectNameStrategy: TopicRecordNameStrategy,
},
SchemaRegistry: SchemaRegistryConfiguration{
Url: "http://localhost:8081",
},
}
schema := `{"type":"record","name":"TestAvroDeserializeUsingSubjectNameStrategyTopicRecordName","namespace":"io.confluent.kafka.avro","fields":[{"name":"field","type":"string"}]}`

serialized, err := SerializeAvro(config, topic, data, Value, schema, 0)
assert.Nil(t, err)

dData, dErr := DeserializeAvro(config, topic, serialized, Value, schema, 0)
assert.Equal(t, "value", dData.(map[string]interface{})["field"])
assert.Nil(t, dErr)
}

func TestAvroDeserializeUsingSubjectNameStrategyTopicName(t *testing.T) {
data := `{"field":"value"}`
topic := "TestAvroDeserializeUsingSubjectNameStrategyTopicName-topic"
config := Configuration{
Producer: ProducerConfiguration{
ValueSerializer: AvroSerializer,
SubjectNameStrategy: TopicNameStrategy,
},
Consumer: ConsumerConfiguration{
ValueDeserializer: AvroSerializer,
SubjectNameStrategy: TopicNameStrategy,
},
SchemaRegistry: SchemaRegistryConfiguration{
Url: "http://localhost:8081",
},
}
schema := `{"type":"record","name":"TestAvroDeserializeUsingSubjectNameStrategyTopicName","namespace":"io.confluent.kafka.avro","fields":[{"name":"field","type":"string"}]}`

serialized, err := SerializeAvro(config, topic, data, Value, schema, 0)
assert.Nil(t, err)

dData, dErr := DeserializeAvro(config, topic, serialized, Value, schema, 0)
assert.Equal(t, "value", dData.(map[string]interface{})["field"])
assert.Nil(t, dErr)
}
11 changes: 7 additions & 4 deletions configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,16 @@ import (
)

type ConsumerConfiguration struct {
KeyDeserializer string `json:"keyDeserializer"`
ValueDeserializer string `json:"valueDeserializer"`
KeyDeserializer string `json:"keyDeserializer"`
ValueDeserializer string `json:"valueDeserializer"`
SubjectNameStrategy string `json:"subjectNameStrategy"`
UseMagicPrefix bool `json:"useMagicPrefix"`
mostafa marked this conversation as resolved.
Show resolved Hide resolved
}

type ProducerConfiguration struct {
KeySerializer string `json:"keySerializer"`
ValueSerializer string `json:"valueSerializer"`
KeySerializer string `json:"keySerializer"`
ValueSerializer string `json:"valueSerializer"`
SubjectNameStrategy string `json:"subjectNameStrategy"`
}

type Configuration struct {
Expand Down
1 change: 1 addition & 0 deletions error_codes.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ const (
failedEncodeToJson errCode = 2009
failedEncodeJsonToBinary errCode = 2010
failedDecodeJsonFromBinary errCode = 2011
failedToUnmarshalSchema errCode = 2012

// producer
failedWriteMessage errCode = 3000
Expand Down
2 changes: 1 addition & 1 deletion jsonschema.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func SerializeJson(configuration Configuration, topic string, data interface{},
// configuration is used to configure the Schema Registry client. The element is
// used to define the subject. The data should be a byte array.
func DeserializeJson(configuration Configuration, topic string, data []byte, element Element, schema string, version int) (interface{}, *Xk6KafkaError) {
bytesDecodedData, err := DecodeWireFormat(data)
_, bytesDecodedData, err := DecodeWireFormat(data)
if err != nil {
return nil, NewXk6KafkaError(failedDecodeFromWireFormat,
"Failed to remove wire format from the binary data",
Expand Down
4 changes: 2 additions & 2 deletions jsonschema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func TestSerializeDeserializeJsonFailsOnSchemaError(t *testing.T) {
assert.Equal(t, failedCreateJsonSchemaCodec, err.Code)

// Deserialize the key or value
deserialized, err := DeserializeJson(jsonConfig, "topic", []byte{1, 2, 3, 4, 5, 6}, element, schema, 0)
deserialized, err := DeserializeJson(jsonConfig, "topic", []byte{0, 2, 3, 4, 5, 6}, element, schema, 0)
assert.Nil(t, deserialized)
assert.Error(t, err.Unwrap())
assert.Equal(t, "Failed to create codec for decoding JSON data", err.Message)
Expand Down Expand Up @@ -96,7 +96,7 @@ func TestSerializeDeserializeJsonFailsOnMarshalError(t *testing.T) {
assert.Equal(t, "Failed to unmarshal JSON data", err.Message)
assert.Equal(t, failedUnmarshalJson, err.Code)

deserialized, err := DeserializeJson(jsonConfig, "topic", []byte{1, 2, 3, 4, 5, 6}, element, jsonSchema, 0)
deserialized, err := DeserializeJson(jsonConfig, "topic", []byte{0, 2, 3, 4, 5, 6}, element, jsonSchema, 0)
assert.Nil(t, deserialized)
assert.Error(t, err.Unwrap())
assert.Equal(t, "Failed to unmarshal JSON data", err.Message)
Expand Down
Loading