From 46dcc81a6d08ee95f35d7f60f573cc96f8d983ef Mon Sep 17 00:00:00 2001 From: Robert Yokota Date: Wed, 18 Sep 2024 11:55:38 -0700 Subject: [PATCH] Fix use of RecordNameStrategy (#40) --- .../GenericDeserializerImpl.cs | 32 +++++++------- .../GenericSerializerImpl.cs | 9 +--- .../SpecificDeserializerImpl.cs | 33 +++++++------- .../SpecificSerializerImpl.cs | 9 +--- .../JsonDeserializer.cs | 44 ++++++++++--------- .../JsonSerializer.cs | 9 +--- .../ProtobufDeserializer.cs | 14 +----- .../ProtobufSerializer.cs | 9 +--- src/Confluent.SchemaRegistry/AsyncSerde.cs | 25 +++++++++++ .../Rest/RestService.cs | 2 +- .../BaseSerializeDeserialize.cs | 2 +- .../SerializeDeserialize.cs | 44 +++++++++++++++++++ 12 files changed, 135 insertions(+), 97 deletions(-) diff --git a/src/Confluent.SchemaRegistry.Serdes.Avro/GenericDeserializerImpl.cs b/src/Confluent.SchemaRegistry.Serdes.Avro/GenericDeserializerImpl.cs index 6830aa939..4a8df357d 100644 --- a/src/Confluent.SchemaRegistry.Serdes.Avro/GenericDeserializerImpl.cs +++ b/src/Confluent.SchemaRegistry.Serdes.Avro/GenericDeserializerImpl.cs @@ -71,20 +71,13 @@ public async Task Deserialize(string topic, Headers headers, byte throw new InvalidDataException($"Expecting data framing of length 5 bytes or more but total data size is {array.Length} bytes"); } - string subject = this.subjectNameStrategy != null - // use the subject name strategy specified in the serializer config if available. - ? this.subjectNameStrategy( - new SerializationContext(isKey ? MessageComponentType.Key : MessageComponentType.Value, topic), - null) - // else fall back to the deprecated config from (or default as currently supplied by) SchemaRegistry. - : schemaRegistryClient == null - ? null - : isKey - ? schemaRegistryClient.ConstructKeySubjectName(topic) - : schemaRegistryClient.ConstructValueSubjectName(topic); - - Schema latestSchema = await GetReaderSchema(subject) - .ConfigureAwait(continueOnCapturedContext: false); + string subject = GetSubjectName(topic, isKey, null); + Schema latestSchema = null; + if (subject != null) + { + latestSchema = await GetReaderSchema(subject) + .ConfigureAwait(continueOnCapturedContext: false); + } Schema writerSchemaJson; Avro.Schema writerSchema; @@ -101,7 +94,16 @@ public async Task Deserialize(string topic, Headers headers, byte var writerId = IPAddress.NetworkToHostOrder(reader.ReadInt32()); (writerSchemaJson, writerSchema) = await GetSchema(subject, writerId); - + if (subject == null) + { + subject = GetSubjectName(topic, isKey, writerSchema.Fullname); + if (subject != null) + { + latestSchema = await GetReaderSchema(subject) + .ConfigureAwait(continueOnCapturedContext: false); + } + } + if (latestSchema != null) { migrations = await GetMigrations(subject, writerSchemaJson, latestSchema) diff --git a/src/Confluent.SchemaRegistry.Serdes.Avro/GenericSerializerImpl.cs b/src/Confluent.SchemaRegistry.Serdes.Avro/GenericSerializerImpl.cs index 16f269b1f..9a6a39c7a 100644 --- a/src/Confluent.SchemaRegistry.Serdes.Avro/GenericSerializerImpl.cs +++ b/src/Confluent.SchemaRegistry.Serdes.Avro/GenericSerializerImpl.cs @@ -131,14 +131,7 @@ public async Task Serialize(string topic, Headers headers, GenericRecord // better to use hash functions based on the writerSchemaString // object reference, not value. - subject = this.subjectNameStrategy != null - // use the subject name strategy specified in the serializer config if available. - ? this.subjectNameStrategy(new SerializationContext(isKey ? MessageComponentType.Key : MessageComponentType.Value, topic), data.Schema.Fullname) - // else fall back to the deprecated config from (or default as currently supplied by) SchemaRegistry. - : isKey - ? schemaRegistryClient.ConstructKeySubjectName(topic, data.Schema.Fullname) - : schemaRegistryClient.ConstructValueSubjectName(topic, data.Schema.Fullname); - + subject = GetSubjectName(topic, isKey, data.Schema.Fullname); latestSchema = await GetReaderSchema(subject) .ConfigureAwait(continueOnCapturedContext: false); diff --git a/src/Confluent.SchemaRegistry.Serdes.Avro/SpecificDeserializerImpl.cs b/src/Confluent.SchemaRegistry.Serdes.Avro/SpecificDeserializerImpl.cs index 9736a2bd3..c26f0401c 100644 --- a/src/Confluent.SchemaRegistry.Serdes.Avro/SpecificDeserializerImpl.cs +++ b/src/Confluent.SchemaRegistry.Serdes.Avro/SpecificDeserializerImpl.cs @@ -121,23 +121,17 @@ public async Task Deserialize(string topic, Headers headers, byte[] array, bo if (array.Length < 5) { - throw new InvalidDataException($"Expecting data framing of length 5 bytes or more but total data size is {array.Length} bytes"); + throw new InvalidDataException( + $"Expecting data framing of length 5 bytes or more but total data size is {array.Length} bytes"); } - - string subject = this.subjectNameStrategy != null - // use the subject name strategy specified in the serializer config if available. - ? this.subjectNameStrategy( - new SerializationContext(isKey ? MessageComponentType.Key : MessageComponentType.Value, topic), - null) - // else fall back to the deprecated config from (or default as currently supplied by) SchemaRegistry. - : schemaRegistryClient == null - ? null - : isKey - ? schemaRegistryClient.ConstructKeySubjectName(topic) - : schemaRegistryClient.ConstructValueSubjectName(topic); - Schema latestSchema = await GetReaderSchema(subject) - .ConfigureAwait(continueOnCapturedContext: false); + string subject = GetSubjectName(topic, isKey, null); + Schema latestSchema = null; + if (subject != null) + { + latestSchema = await GetReaderSchema(subject) + .ConfigureAwait(continueOnCapturedContext: false); + } Schema writerSchemaJson = null; Avro.Schema writerSchema = null; @@ -154,6 +148,15 @@ public async Task Deserialize(string topic, Headers headers, byte[] array, bo var writerId = IPAddress.NetworkToHostOrder(reader.ReadInt32()); (writerSchemaJson, writerSchema) = await GetSchema(subject, writerId); + if (subject == null) + { + subject = GetSubjectName(topic, isKey, writerSchema.Fullname); + if (subject != null) + { + latestSchema = await GetReaderSchema(subject) + .ConfigureAwait(continueOnCapturedContext: false); + } + } if (latestSchema != null) { diff --git a/src/Confluent.SchemaRegistry.Serdes.Avro/SpecificSerializerImpl.cs b/src/Confluent.SchemaRegistry.Serdes.Avro/SpecificSerializerImpl.cs index 4a4cf0e92..07efa10ad 100644 --- a/src/Confluent.SchemaRegistry.Serdes.Avro/SpecificSerializerImpl.cs +++ b/src/Confluent.SchemaRegistry.Serdes.Avro/SpecificSerializerImpl.cs @@ -204,14 +204,7 @@ public async Task Serialize(string topic, Headers headers, T data, bool fullname = ((Avro.RecordSchema)((ISpecificRecord)data).Schema).Fullname; } - subject = this.subjectNameStrategy != null - // use the subject name strategy specified in the serializer config if available. - ? this.subjectNameStrategy(new SerializationContext(isKey ? MessageComponentType.Key : MessageComponentType.Value, topic), fullname) - // else fall back to the deprecated config from (or default as currently supplied by) SchemaRegistry. - : isKey - ? schemaRegistryClient.ConstructKeySubjectName(topic, fullname) - : schemaRegistryClient.ConstructValueSubjectName(topic, fullname); - + subject = GetSubjectName(topic, isKey, fullname); latestSchema = await GetReaderSchema(subject) .ConfigureAwait(continueOnCapturedContext: false); diff --git a/src/Confluent.SchemaRegistry.Serdes.Json/JsonDeserializer.cs b/src/Confluent.SchemaRegistry.Serdes.Json/JsonDeserializer.cs index 89dabb6ed..a3c0a07af 100644 --- a/src/Confluent.SchemaRegistry.Serdes.Json/JsonDeserializer.cs +++ b/src/Confluent.SchemaRegistry.Serdes.Json/JsonDeserializer.cs @@ -54,11 +54,11 @@ namespace Confluent.SchemaRegistry.Serdes public class JsonDeserializer : AsyncDeserializer where T : class { private readonly JsonSchemaGeneratorSettings jsonSchemaGeneratorSettings; - + private JsonSchemaValidator validator = new JsonSchemaValidator(); private JsonSchema schema = null; - + /// /// Initialize a new JsonDeserializer instance. /// @@ -74,12 +74,12 @@ public JsonDeserializer(IEnumerable> config = null, { } - public JsonDeserializer(ISchemaRegistryClient schemaRegistryClient, IEnumerable> config = null, JsonSchemaGeneratorSettings jsonSchemaGeneratorSettings = null) + public JsonDeserializer(ISchemaRegistryClient schemaRegistryClient, IEnumerable> config = null, JsonSchemaGeneratorSettings jsonSchemaGeneratorSettings = null) : this(schemaRegistryClient, config != null ? new JsonDeserializerConfig(config) : null, jsonSchemaGeneratorSettings) { } - public JsonDeserializer(ISchemaRegistryClient schemaRegistryClient, JsonDeserializerConfig config, + public JsonDeserializer(ISchemaRegistryClient schemaRegistryClient, JsonDeserializerConfig config, JsonSchemaGeneratorSettings jsonSchemaGeneratorSettings = null, RuleRegistry ruleRegistry = null) : base(schemaRegistryClient, config, ruleRegistry) { @@ -108,7 +108,7 @@ public JsonDeserializer(ISchemaRegistryClient schemaRegistryClient, JsonDeserial /// /// /// Schema to use for validation, used when external - /// schema references are present in the schema. + /// schema references are present in the schema. /// Populate the References list of the schema for /// the same. Assuming the referenced schemas have /// already been registered in the registry. @@ -155,24 +155,17 @@ public override async Task DeserializeAsync(ReadOnlyMemory data, bool i { throw new InvalidDataException($"Expecting data framing of length 6 bytes or more but total data size is {array.Length} bytes"); } - + bool isKey = context.Component == MessageComponentType.Key; string topic = context.Topic; - string subject = this.subjectNameStrategy != null - // use the subject name strategy specified in the serializer config if available. - ? this.subjectNameStrategy( - new SerializationContext(isKey ? MessageComponentType.Key : MessageComponentType.Value, topic), - null) - // else fall back to the deprecated config from (or default as currently supplied by) SchemaRegistry. - : schemaRegistryClient == null - ? null - : isKey - ? schemaRegistryClient.ConstructKeySubjectName(topic) - : schemaRegistryClient.ConstructValueSubjectName(topic); - - Schema latestSchema = await GetReaderSchema(subject) - .ConfigureAwait(continueOnCapturedContext: false); - + string subject = GetSubjectName(topic, isKey, null); + Schema latestSchema = null; + if (subject != null) + { + latestSchema = await GetReaderSchema(subject) + .ConfigureAwait(continueOnCapturedContext: false); + } + try { Schema writerSchema = null; @@ -193,6 +186,15 @@ public override async Task DeserializeAsync(ReadOnlyMemory data, bool i if (schemaRegistryClient != null) { (writerSchema, writerSchemaJson) = await GetSchema(subject, writerId); + if (subject == null) + { + subject = GetSubjectName(topic, isKey, writerSchemaJson.Title); + if (subject != null) + { + latestSchema = await GetReaderSchema(subject) + .ConfigureAwait(continueOnCapturedContext: false); + } + } } if (latestSchema != null) diff --git a/src/Confluent.SchemaRegistry.Serdes.Json/JsonSerializer.cs b/src/Confluent.SchemaRegistry.Serdes.Json/JsonSerializer.cs index c023acc83..d7a1aa6de 100644 --- a/src/Confluent.SchemaRegistry.Serdes.Json/JsonSerializer.cs +++ b/src/Confluent.SchemaRegistry.Serdes.Json/JsonSerializer.cs @@ -183,14 +183,7 @@ public override async Task SerializeAsync(T value, SerializationContext await serdeMutex.WaitAsync().ConfigureAwait(continueOnCapturedContext: false); try { - subject = this.subjectNameStrategy != null - // use the subject name strategy specified in the serializer config if available. - ? this.subjectNameStrategy(context, this.schemaFullname) - // else fall back to the deprecated config from (or default as currently supplied by) SchemaRegistry. - : context.Component == MessageComponentType.Key - ? schemaRegistryClient.ConstructKeySubjectName(context.Topic, this.schemaFullname) - : schemaRegistryClient.ConstructValueSubjectName(context.Topic, this.schemaFullname); - + subject = GetSubjectName(context.Topic, context.Component == MessageComponentType.Key, this.schemaFullname); latestSchema = await GetReaderSchema(subject, new Schema(schemaText, ReferenceList, SchemaType.Json)) .ConfigureAwait(continueOnCapturedContext: false); diff --git a/src/Confluent.SchemaRegistry.Serdes.Protobuf/ProtobufDeserializer.cs b/src/Confluent.SchemaRegistry.Serdes.Protobuf/ProtobufDeserializer.cs index 2f112d9ba..dff67e64e 100644 --- a/src/Confluent.SchemaRegistry.Serdes.Protobuf/ProtobufDeserializer.cs +++ b/src/Confluent.SchemaRegistry.Serdes.Protobuf/ProtobufDeserializer.cs @@ -123,18 +123,8 @@ public override async Task DeserializeAsync(ReadOnlyMemory data, bool i bool isKey = context.Component == MessageComponentType.Key; string topic = context.Topic; - string subject = this.subjectNameStrategy != null - // use the subject name strategy specified in the serializer config if available. - ? this.subjectNameStrategy( - new SerializationContext(isKey ? MessageComponentType.Key : MessageComponentType.Value, topic), - null) - // else fall back to the deprecated config from (or default as currently supplied by) SchemaRegistry. - : schemaRegistryClient == null - ? null - : isKey - ? schemaRegistryClient.ConstructKeySubjectName(topic) - : schemaRegistryClient.ConstructValueSubjectName(topic); - + string subject = GetSubjectName(topic, isKey, null); + // Currently Protobuf does not support migration rules because of lack of support for DynamicMessage // See https://github.com/protocolbuffers/protobuf/issues/658 /* diff --git a/src/Confluent.SchemaRegistry.Serdes.Protobuf/ProtobufSerializer.cs b/src/Confluent.SchemaRegistry.Serdes.Protobuf/ProtobufSerializer.cs index a18de9287..7de427c11 100644 --- a/src/Confluent.SchemaRegistry.Serdes.Protobuf/ProtobufSerializer.cs +++ b/src/Confluent.SchemaRegistry.Serdes.Protobuf/ProtobufSerializer.cs @@ -257,14 +257,7 @@ public override async Task SerializeAsync(T value, SerializationContext await serdeMutex.WaitAsync().ConfigureAwait(continueOnCapturedContext: false); try { - subject = this.subjectNameStrategy != null - // use the subject name strategy specified in the serializer config if available. - ? this.subjectNameStrategy(context, fullname) - // else fall back to the deprecated config from (or default as currently supplied by) SchemaRegistry. - : context.Component == MessageComponentType.Key - ? schemaRegistryClient.ConstructKeySubjectName(context.Topic, fullname) - : schemaRegistryClient.ConstructValueSubjectName(context.Topic, fullname); - + subject = GetSubjectName(context.Topic, context.Component == MessageComponentType.Key, fullname); latestSchema = await GetReaderSchema(subject) .ConfigureAwait(continueOnCapturedContext: false); diff --git a/src/Confluent.SchemaRegistry/AsyncSerde.cs b/src/Confluent.SchemaRegistry/AsyncSerde.cs index f135c8eb4..082f2dfd9 100644 --- a/src/Confluent.SchemaRegistry/AsyncSerde.cs +++ b/src/Confluent.SchemaRegistry/AsyncSerde.cs @@ -63,6 +63,31 @@ protected AsyncSerde(ISchemaRegistryClient schemaRegistryClient, SerdeConfig con } } + protected string GetSubjectName(string topic, bool isKey, string recordType) + { + try + { + string subject = this.subjectNameStrategy != null + // use the subject name strategy specified in the serializer config if available. + ? this.subjectNameStrategy( + new SerializationContext( + isKey ? MessageComponentType.Key : MessageComponentType.Value, + topic), + recordType) + // else fall back to the deprecated config from (or default as currently supplied by) SchemaRegistry. + : schemaRegistryClient == null + ? null + : isKey + ? schemaRegistryClient.ConstructKeySubjectName(topic, recordType) + : schemaRegistryClient.ConstructValueSubjectName(topic, recordType); + return subject; + } + catch (Exception e) + { + return null; + } + } + protected async Task<(Schema, TParsedSchema)> GetSchema(string subject, int writerId, string format = null) { Schema writerSchema = await schemaRegistryClient.GetSchemaBySubjectAndIdAsync(subject, writerId, format) diff --git a/src/Confluent.SchemaRegistry/Rest/RestService.cs b/src/Confluent.SchemaRegistry/Rest/RestService.cs index 94a1427b0..ef91a46a7 100644 --- a/src/Confluent.SchemaRegistry/Rest/RestService.cs +++ b/src/Confluent.SchemaRegistry/Rest/RestService.cs @@ -323,7 +323,7 @@ public async Task GetSchemaAsync(int id, string format) public async Task GetSchemaBySubjectAndIdAsync(string subject, int id, string format) => SanitizeSchema( - (await RequestAsync($"schemas/ids/{id}?subject={subject}{(format != null ? "&format=" + format : "")}", + (await RequestAsync($"schemas/ids/{id}?subject={(subject ?? "")}{(format != null ? "&format=" + format : "")}", HttpMethod.Get) .ConfigureAwait(continueOnCapturedContext: false))); diff --git a/test/Confluent.SchemaRegistry.Serdes.UnitTests/BaseSerializeDeserialize.cs b/test/Confluent.SchemaRegistry.Serdes.UnitTests/BaseSerializeDeserialize.cs index e6ca34ee2..8fff067a6 100644 --- a/test/Confluent.SchemaRegistry.Serdes.UnitTests/BaseSerializeDeserialize.cs +++ b/test/Confluent.SchemaRegistry.Serdes.UnitTests/BaseSerializeDeserialize.cs @@ -44,7 +44,7 @@ public BaseSerializeDeserializeTests() testTopic = "topic"; var schemaRegistryMock = new Mock(); schemaRegistryMock.Setup(x => x.ConstructValueSubjectName(testTopic, It.IsAny())).Returns($"{testTopic}-value"); - schemaRegistryMock.Setup(x => x.RegisterSchemaAsync("topic-value", It.IsAny(), It.IsAny())).ReturnsAsync( + schemaRegistryMock.Setup(x => x.RegisterSchemaAsync(It.IsAny(), It.IsAny(), It.IsAny())).ReturnsAsync( (string topic, string schema, bool normalize) => store.TryGetValue(schema, out int id) ? id : store[schema] = store.Count + 1 ); schemaRegistryMock.Setup(x => x.GetSchemaBySubjectAndIdAsync(It.IsAny(), It.IsAny(), It.IsAny())).ReturnsAsync( diff --git a/test/Confluent.SchemaRegistry.Serdes.UnitTests/SerializeDeserialize.cs b/test/Confluent.SchemaRegistry.Serdes.UnitTests/SerializeDeserialize.cs index 9bc74e529..ffe258bca 100644 --- a/test/Confluent.SchemaRegistry.Serdes.UnitTests/SerializeDeserialize.cs +++ b/test/Confluent.SchemaRegistry.Serdes.UnitTests/SerializeDeserialize.cs @@ -133,6 +133,50 @@ public void ISpecificRecord() var bytes = serializer.SerializeAsync(user, new SerializationContext(MessageComponentType.Value, testTopic)).Result; var result = deserializer.DeserializeAsync(bytes, false, new SerializationContext(MessageComponentType.Value, testTopic)).Result; + Assert.Equal(user.name, result.name); + Assert.Equal(user.favorite_color, result.favorite_color); + Assert.Equal(user.favorite_number, result.favorite_number); + + // serialize second object + user = new User + { + favorite_color = "red", + favorite_number = 100, + name = "awesome" + }; + + bytes = serializer.SerializeAsync(user, new SerializationContext(MessageComponentType.Value, testTopic)).Result; + result = deserializer.DeserializeAsync(bytes, false, new SerializationContext(MessageComponentType.Value, testTopic)).Result; + + Assert.Equal(user.name, result.name); + Assert.Equal(user.favorite_color, result.favorite_color); + Assert.Equal(user.favorite_number, result.favorite_number); + } + + [Fact] + public void ISpecificRecordRecordNameStrategy() + { + var serializerConfig = new AvroSerializerConfig + { + SubjectNameStrategy = SubjectNameStrategy.Record + }; + var serializer = new AvroSerializer(schemaRegistryClient, serializerConfig); + var deserializerConfig = new AvroDeserializerConfig + { + SubjectNameStrategy = SubjectNameStrategy.Record + }; + var deserializer = new AvroDeserializer(schemaRegistryClient, deserializerConfig); + + var user = new User + { + favorite_color = "blue", + favorite_number = 100, + name = "awesome" + }; + + var bytes = serializer.SerializeAsync(user, new SerializationContext(MessageComponentType.Value, testTopic)).Result; + var result = deserializer.DeserializeAsync(bytes, false, new SerializationContext(MessageComponentType.Value, testTopic)).Result; + Assert.Equal(user.name, result.name); Assert.Equal(user.favorite_color, result.favorite_color); Assert.Equal(user.favorite_number, result.favorite_number);