Skip to content

Commit

Permalink
Fix handling of Avro schema evolution (#43) (#2319)
Browse files: browse the repository at this point in the history
  • Loading branch information
rayokota authored Oct 2, 2024
1 parent 15f4568 commit 891b6a2
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,17 @@ public async Task<GenericRecord> Deserialize(string topic, Headers headers, byte
}
else
{
datumReader = await GetDatumReader(writerSchema, writerSchema);
Avro.Schema readerSchema;
if (latestSchema != null)
{
var latestSchemaAvro = await GetParsedSchema(latestSchema);
readerSchema = latestSchemaAvro;
}
else
{
readerSchema = writerSchema;
}
datumReader = await GetDatumReader(writerSchema, readerSchema);
data = datumReader.Read(default(GenericRecord), new BinaryDecoder(stream));
}
}
Expand Down Expand Up @@ -190,7 +200,7 @@ private async Task<DatumReader<GenericRecord>> GetDatumReader(Avro.Schema writer
{
readerSchema = writerSchema;
}
datumReader = new GenericReader<GenericRecord>(writerSchema, writerSchema);
datumReader = new GenericReader<GenericRecord>(writerSchema, readerSchema);
datumReaderBySchema[(writerSchema, readerSchema)] = datumReader;
return datumReader;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -717,7 +717,7 @@ public void GenericRecord()
Assert.Equal(user["name"], result["name"]);
Assert.Equal(user["favorite_color"], result["favorite_color"]);
Assert.Equal(user["favorite_number"], result["favorite_number"]);

// serialize second object
user = new GenericRecord((RecordSchema) User._SCHEMA);
user.Add("name", "cool");
Expand All @@ -732,6 +732,60 @@ public void GenericRecord()
Assert.Equal(user["favorite_number"], result["favorite_number"]);
}

/// <summary>
/// Verifies that a <c>GenericRecord</c> written with an older schema is
/// deserialized against the latest registered schema when
/// <c>UseLatestVersion</c> is enabled on the deserializer: the field that
/// only exists in the newer schema must come back with its declared default.
/// </summary>
[Fact]
public void GenericRecordSchemaEvolution()
{
    // Version 1 (writer schema): a single required field that is dropped in version 2.
    var schemaStr1 = "{\n"
        + " \"name\": \"SchemaEvolution\",\n"
        + " \"type\": \"record\",\n"
        + " \"fields\": [\n"
        + " {\n"
        + " \"name\": \"fieldToDelete\",\n"
        + " \"type\": \"string\"\n"
        + " }\n"
        + " ]\n"
        + "}";
    // Version 2 (reader schema): replaces the old field with a new one that
    // declares a default value of "optional".
    var schemaStr2 = "{\n"
        + " \"name\": \"SchemaEvolution\",\n"
        + " \"type\": \"record\",\n"
        + " \"fields\": [\n"
        + " {\n"
        + " \"name\": \"newOptionalField\",\n"
        + " \"type\": [\"string\", \"null\"],\n"
        + " \"default\": \"optional\"\n"
        + " }\n"
        + " ]\n"
        + "}";

    var schema1 = new RegisteredSchema("topic-value", 1, 1, schemaStr1, SchemaType.Avro, null);
    var schema2 = new RegisteredSchema("topic-value", 2, 2, schemaStr2, SchemaType.Avro, null);

    // Only version 1 is registered while serializing, so it is the latest
    // version the serializer can see.
    store[schemaStr1] = 1;
    subjectStore["topic-value"] = new List<RegisteredSchema> { schema1 };
    var serConfig = new AvroSerializerConfig
    {
        AutoRegisterSchemas = false,
        UseLatestVersion = true
    };
    var serializer = new AvroSerializer<GenericRecord>(schemaRegistryClient, serConfig);
    var deserConfig = new AvroDeserializerConfig
    {
        UseLatestVersion = true
    };
    var deserializer = new AvroDeserializer<GenericRecord>(schemaRegistryClient, deserConfig);

    var obj = new GenericRecord((RecordSchema) Avro.Schema.Parse(schemaStr1));
    obj.Add("fieldToDelete", "bye");

    Headers headers = new Headers();
    var bytes = serializer.SerializeAsync(obj, new SerializationContext(MessageComponentType.Value, testTopic, headers)).Result;

    // Register version 2 after the payload was written, so the deserializer
    // sees a latest schema that differs from the writer schema.
    store[schemaStr2] = 2;
    subjectStore["topic-value"] = new List<RegisteredSchema> { schema1, schema2 };
    var result = deserializer.DeserializeAsync(bytes, false, new SerializationContext(MessageComponentType.Value, testTopic, headers)).Result;

    // xUnit's contract is Assert.Equal(expected, actual); the expected constant
    // goes first so a failure message labels the two values correctly.
    Assert.Equal("optional", result["newOptionalField"]);
}

[Fact]
public void GenericRecordCELCondition()
{
Expand Down

0 comments on commit 891b6a2

Please sign in to comment.