From 891b6a2f0a28bb84bf3294ae44324a1dcb652efc Mon Sep 17 00:00:00 2001 From: Robert Yokota Date: Wed, 2 Oct 2024 12:47:28 -0700 Subject: [PATCH] Fix handling of Avro schema evolution (#43) (#2319) --- .../GenericDeserializerImpl.cs | 14 ++++- .../SerializeDeserialize.cs | 56 ++++++++++++++++++- 2 files changed, 67 insertions(+), 3 deletions(-) diff --git a/src/Confluent.SchemaRegistry.Serdes.Avro/GenericDeserializerImpl.cs b/src/Confluent.SchemaRegistry.Serdes.Avro/GenericDeserializerImpl.cs index 4a8df357d..abb0cbcbe 100644 --- a/src/Confluent.SchemaRegistry.Serdes.Avro/GenericDeserializerImpl.cs +++ b/src/Confluent.SchemaRegistry.Serdes.Avro/GenericDeserializerImpl.cs @@ -141,7 +141,17 @@ public async Task Deserialize(string topic, Headers headers, byte } else { - datumReader = await GetDatumReader(writerSchema, writerSchema); + Avro.Schema readerSchema; + if (latestSchema != null) + { + var latestSchemaAvro = await GetParsedSchema(latestSchema); + readerSchema = latestSchemaAvro; + } + else + { + readerSchema = writerSchema; + } + datumReader = await GetDatumReader(writerSchema, readerSchema); data = datumReader.Read(default(GenericRecord), new BinaryDecoder(stream)); } } @@ -190,7 +200,7 @@ private async Task> GetDatumReader(Avro.Schema writer { readerSchema = writerSchema; } - datumReader = new GenericReader(writerSchema, writerSchema); + datumReader = new GenericReader(writerSchema, readerSchema); datumReaderBySchema[(writerSchema, readerSchema)] = datumReader; return datumReader; } diff --git a/test/Confluent.SchemaRegistry.Serdes.UnitTests/SerializeDeserialize.cs b/test/Confluent.SchemaRegistry.Serdes.UnitTests/SerializeDeserialize.cs index ffe258bca..9a8962188 100644 --- a/test/Confluent.SchemaRegistry.Serdes.UnitTests/SerializeDeserialize.cs +++ b/test/Confluent.SchemaRegistry.Serdes.UnitTests/SerializeDeserialize.cs @@ -717,7 +717,7 @@ public void GenericRecord() Assert.Equal(user["name"], result["name"]); Assert.Equal(user["favorite_color"], result["favorite_color"]); Assert.Equal(user["favorite_number"], result["favorite_number"]); - + // serialize second object user = new GenericRecord((RecordSchema) User._SCHEMA); user.Add("name", "cool"); @@ -732,6 +732,60 @@ public void GenericRecord() Assert.Equal(user["favorite_number"], result["favorite_number"]); } + [Fact] + public void GenericRecordSchemaEvolution() + { + var schemaStr1 = "{\n" + + " \"name\": \"SchemaEvolution\",\n" + + " \"type\": \"record\",\n" + + " \"fields\": [\n" + + " {\n" + + " \"name\": \"fieldToDelete\",\n" + + " \"type\": \"string\"\n" + + " }\n" + + " ]\n" + + "}"; + var schemaStr2 = "{\n" + + " \"name\": \"SchemaEvolution\",\n" + + " \"type\": \"record\",\n" + + " \"fields\": [\n" + + " {\n" + + " \"name\": \"newOptionalField\",\n" + + " \"type\": [\"string\", \"null\"],\n" + + " \"default\": \"optional\"\n" + + " }\n" + + " ]\n" + + "}"; + + var schema1 = new RegisteredSchema("topic-value", 1, 1, schemaStr1, SchemaType.Avro, null); + var schema2 = new RegisteredSchema("topic-value", 2, 2, schemaStr2, SchemaType.Avro, null); + store[schemaStr1] = 1; + subjectStore["topic-value"] = new List { schema1 }; + var serConfig = new AvroSerializerConfig + { + AutoRegisterSchemas = false, + UseLatestVersion = true + }; + var serializer = new AvroSerializer(schemaRegistryClient, serConfig); + var deserConfig = new AvroDeserializerConfig + { + UseLatestVersion = true + }; + var deserializer = new AvroDeserializer(schemaRegistryClient, deserConfig); + + var obj = new GenericRecord((RecordSchema) Avro.Schema.Parse(schemaStr1)); + obj.Add("fieldToDelete", "bye"); + + Headers headers = new Headers(); + var bytes = serializer.SerializeAsync(obj, new SerializationContext(MessageComponentType.Value, testTopic, headers)).Result; + + store[schemaStr2] = 2; + subjectStore["topic-value"] = new List { schema1, schema2 }; + var result = deserializer.DeserializeAsync(bytes, false, new SerializationContext(MessageComponentType.Value, testTopic, headers)).Result; + + Assert.Equal(result["newOptionalField"], "optional"); + } + [Fact] public void GenericRecordCELCondition() {