Skip to content

Commit

Permalink
Fix use of RecordNameStrategy (#40) (#2315)
Browse files Browse the repository at this point in the history
  • Loading branch information
rayokota authored Sep 18, 2024
1 parent e3bdba8 commit 3d5663d
Show file tree
Hide file tree
Showing 12 changed files with 135 additions and 97 deletions.
32 changes: 17 additions & 15 deletions src/Confluent.SchemaRegistry.Serdes.Avro/GenericDeserializerImpl.cs
Original file line number Diff line number Diff line change
Expand Up @@ -71,20 +71,13 @@ public async Task<GenericRecord> Deserialize(string topic, Headers headers, byte
throw new InvalidDataException($"Expecting data framing of length 5 bytes or more but total data size is {array.Length} bytes");
}

string subject = this.subjectNameStrategy != null
// use the subject name strategy specified in the serializer config if available.
? this.subjectNameStrategy(
new SerializationContext(isKey ? MessageComponentType.Key : MessageComponentType.Value, topic),
null)
// else fall back to the deprecated config from (or default as currently supplied by) SchemaRegistry.
: schemaRegistryClient == null
? null
: isKey
? schemaRegistryClient.ConstructKeySubjectName(topic)
: schemaRegistryClient.ConstructValueSubjectName(topic);

Schema latestSchema = await GetReaderSchema(subject)
.ConfigureAwait(continueOnCapturedContext: false);
string subject = GetSubjectName(topic, isKey, null);
Schema latestSchema = null;
if (subject != null)
{
latestSchema = await GetReaderSchema(subject)
.ConfigureAwait(continueOnCapturedContext: false);
}

Schema writerSchemaJson;
Avro.Schema writerSchema;
Expand All @@ -101,7 +94,16 @@ public async Task<GenericRecord> Deserialize(string topic, Headers headers, byte
var writerId = IPAddress.NetworkToHostOrder(reader.ReadInt32());

(writerSchemaJson, writerSchema) = await GetSchema(subject, writerId);

if (subject == null)
{
subject = GetSubjectName(topic, isKey, writerSchema.Fullname);
if (subject != null)
{
latestSchema = await GetReaderSchema(subject)
.ConfigureAwait(continueOnCapturedContext: false);
}
}

if (latestSchema != null)
{
migrations = await GetMigrations(subject, writerSchemaJson, latestSchema)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,14 +131,7 @@ public async Task<byte[]> Serialize(string topic, Headers headers, GenericRecord
// better to use hash functions based on the writerSchemaString
// object reference, not value.

subject = this.subjectNameStrategy != null
// use the subject name strategy specified in the serializer config if available.
? this.subjectNameStrategy(new SerializationContext(isKey ? MessageComponentType.Key : MessageComponentType.Value, topic), data.Schema.Fullname)
// else fall back to the deprecated config from (or default as currently supplied by) SchemaRegistry.
: isKey
? schemaRegistryClient.ConstructKeySubjectName(topic, data.Schema.Fullname)
: schemaRegistryClient.ConstructValueSubjectName(topic, data.Schema.Fullname);

subject = GetSubjectName(topic, isKey, data.Schema.Fullname);
latestSchema = await GetReaderSchema(subject)
.ConfigureAwait(continueOnCapturedContext: false);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,23 +121,17 @@ public async Task<T> Deserialize(string topic, Headers headers, byte[] array, bo

if (array.Length < 5)
{
throw new InvalidDataException($"Expecting data framing of length 5 bytes or more but total data size is {array.Length} bytes");
throw new InvalidDataException(
$"Expecting data framing of length 5 bytes or more but total data size is {array.Length} bytes");
}

string subject = this.subjectNameStrategy != null
// use the subject name strategy specified in the serializer config if available.
? this.subjectNameStrategy(
new SerializationContext(isKey ? MessageComponentType.Key : MessageComponentType.Value, topic),
null)
// else fall back to the deprecated config from (or default as currently supplied by) SchemaRegistry.
: schemaRegistryClient == null
? null
: isKey
? schemaRegistryClient.ConstructKeySubjectName(topic)
: schemaRegistryClient.ConstructValueSubjectName(topic);

Schema latestSchema = await GetReaderSchema(subject)
.ConfigureAwait(continueOnCapturedContext: false);
string subject = GetSubjectName(topic, isKey, null);
Schema latestSchema = null;
if (subject != null)
{
latestSchema = await GetReaderSchema(subject)
.ConfigureAwait(continueOnCapturedContext: false);
}

Schema writerSchemaJson = null;
Avro.Schema writerSchema = null;
Expand All @@ -154,6 +148,15 @@ public async Task<T> Deserialize(string topic, Headers headers, byte[] array, bo
var writerId = IPAddress.NetworkToHostOrder(reader.ReadInt32());

(writerSchemaJson, writerSchema) = await GetSchema(subject, writerId);
if (subject == null)
{
subject = GetSubjectName(topic, isKey, writerSchema.Fullname);
if (subject != null)
{
latestSchema = await GetReaderSchema(subject)
.ConfigureAwait(continueOnCapturedContext: false);
}
}

if (latestSchema != null)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,14 +204,7 @@ public async Task<byte[]> Serialize(string topic, Headers headers, T data, bool
fullname = ((Avro.RecordSchema)((ISpecificRecord)data).Schema).Fullname;
}

subject = this.subjectNameStrategy != null
// use the subject name strategy specified in the serializer config if available.
? this.subjectNameStrategy(new SerializationContext(isKey ? MessageComponentType.Key : MessageComponentType.Value, topic), fullname)
// else fall back to the deprecated config from (or default as currently supplied by) SchemaRegistry.
: isKey
? schemaRegistryClient.ConstructKeySubjectName(topic, fullname)
: schemaRegistryClient.ConstructValueSubjectName(topic, fullname);

subject = GetSubjectName(topic, isKey, fullname);
latestSchema = await GetReaderSchema(subject)
.ConfigureAwait(continueOnCapturedContext: false);

Expand Down
44 changes: 23 additions & 21 deletions src/Confluent.SchemaRegistry.Serdes.Json/JsonDeserializer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,11 @@ namespace Confluent.SchemaRegistry.Serdes
public class JsonDeserializer<T> : AsyncDeserializer<T, JsonSchema> where T : class
{
private readonly JsonSchemaGeneratorSettings jsonSchemaGeneratorSettings;

private JsonSchemaValidator validator = new JsonSchemaValidator();

private JsonSchema schema = null;

/// <summary>
/// Initialize a new JsonDeserializer instance.
/// </summary>
Expand All @@ -74,12 +74,12 @@ public JsonDeserializer(IEnumerable<KeyValuePair<string, string>> config = null,
{
}

public JsonDeserializer(ISchemaRegistryClient schemaRegistryClient, IEnumerable<KeyValuePair<string, string>> config = null, JsonSchemaGeneratorSettings jsonSchemaGeneratorSettings = null)
public JsonDeserializer(ISchemaRegistryClient schemaRegistryClient, IEnumerable<KeyValuePair<string, string>> config = null, JsonSchemaGeneratorSettings jsonSchemaGeneratorSettings = null)
: this(schemaRegistryClient, config != null ? new JsonDeserializerConfig(config) : null, jsonSchemaGeneratorSettings)
{
}

public JsonDeserializer(ISchemaRegistryClient schemaRegistryClient, JsonDeserializerConfig config,
public JsonDeserializer(ISchemaRegistryClient schemaRegistryClient, JsonDeserializerConfig config,
JsonSchemaGeneratorSettings jsonSchemaGeneratorSettings = null, RuleRegistry ruleRegistry = null)
: base(schemaRegistryClient, config, ruleRegistry)
{
Expand Down Expand Up @@ -108,7 +108,7 @@ public JsonDeserializer(ISchemaRegistryClient schemaRegistryClient, JsonDeserial
/// </param>
/// <param name="schema">
/// Schema to use for validation, used when external
/// schema references are present in the schema.
/// schema references are present in the schema.
/// Populate the References list of the schema for
/// the same. Assuming the referenced schemas have
/// already been registered in the registry.
Expand Down Expand Up @@ -155,24 +155,17 @@ public override async Task<T> DeserializeAsync(ReadOnlyMemory<byte> data, bool i
{
throw new InvalidDataException($"Expecting data framing of length 6 bytes or more but total data size is {array.Length} bytes");
}

bool isKey = context.Component == MessageComponentType.Key;
string topic = context.Topic;
string subject = this.subjectNameStrategy != null
// use the subject name strategy specified in the serializer config if available.
? this.subjectNameStrategy(
new SerializationContext(isKey ? MessageComponentType.Key : MessageComponentType.Value, topic),
null)
// else fall back to the deprecated config from (or default as currently supplied by) SchemaRegistry.
: schemaRegistryClient == null
? null
: isKey
? schemaRegistryClient.ConstructKeySubjectName(topic)
: schemaRegistryClient.ConstructValueSubjectName(topic);

Schema latestSchema = await GetReaderSchema(subject)
.ConfigureAwait(continueOnCapturedContext: false);

string subject = GetSubjectName(topic, isKey, null);
Schema latestSchema = null;
if (subject != null)
{
latestSchema = await GetReaderSchema(subject)
.ConfigureAwait(continueOnCapturedContext: false);
}

try
{
Schema writerSchema = null;
Expand All @@ -193,6 +186,15 @@ public override async Task<T> DeserializeAsync(ReadOnlyMemory<byte> data, bool i
if (schemaRegistryClient != null)
{
(writerSchema, writerSchemaJson) = await GetSchema(subject, writerId);
if (subject == null)
{
subject = GetSubjectName(topic, isKey, writerSchemaJson.Title);
if (subject != null)
{
latestSchema = await GetReaderSchema(subject)
.ConfigureAwait(continueOnCapturedContext: false);
}
}
}

if (latestSchema != null)
Expand Down
9 changes: 1 addition & 8 deletions src/Confluent.SchemaRegistry.Serdes.Json/JsonSerializer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -183,14 +183,7 @@ public override async Task<byte[]> SerializeAsync(T value, SerializationContext
await serdeMutex.WaitAsync().ConfigureAwait(continueOnCapturedContext: false);
try
{
subject = this.subjectNameStrategy != null
// use the subject name strategy specified in the serializer config if available.
? this.subjectNameStrategy(context, this.schemaFullname)
// else fall back to the deprecated config from (or default as currently supplied by) SchemaRegistry.
: context.Component == MessageComponentType.Key
? schemaRegistryClient.ConstructKeySubjectName(context.Topic, this.schemaFullname)
: schemaRegistryClient.ConstructValueSubjectName(context.Topic, this.schemaFullname);

subject = GetSubjectName(context.Topic, context.Component == MessageComponentType.Key, this.schemaFullname);
latestSchema = await GetReaderSchema(subject, new Schema(schemaText, ReferenceList, SchemaType.Json))
.ConfigureAwait(continueOnCapturedContext: false);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,18 +123,8 @@ public override async Task<T> DeserializeAsync(ReadOnlyMemory<byte> data, bool i

bool isKey = context.Component == MessageComponentType.Key;
string topic = context.Topic;
string subject = this.subjectNameStrategy != null
// use the subject name strategy specified in the serializer config if available.
? this.subjectNameStrategy(
new SerializationContext(isKey ? MessageComponentType.Key : MessageComponentType.Value, topic),
null)
// else fall back to the deprecated config from (or default as currently supplied by) SchemaRegistry.
: schemaRegistryClient == null
? null
: isKey
? schemaRegistryClient.ConstructKeySubjectName(topic)
: schemaRegistryClient.ConstructValueSubjectName(topic);

string subject = GetSubjectName(topic, isKey, null);

// Currently Protobuf does not support migration rules because of lack of support for DynamicMessage
// See https://github.com/protocolbuffers/protobuf/issues/658
/*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,14 +257,7 @@ public override async Task<byte[]> SerializeAsync(T value, SerializationContext
await serdeMutex.WaitAsync().ConfigureAwait(continueOnCapturedContext: false);
try
{
subject = this.subjectNameStrategy != null
// use the subject name strategy specified in the serializer config if available.
? this.subjectNameStrategy(context, fullname)
// else fall back to the deprecated config from (or default as currently supplied by) SchemaRegistry.
: context.Component == MessageComponentType.Key
? schemaRegistryClient.ConstructKeySubjectName(context.Topic, fullname)
: schemaRegistryClient.ConstructValueSubjectName(context.Topic, fullname);

subject = GetSubjectName(context.Topic, context.Component == MessageComponentType.Key, fullname);
latestSchema = await GetReaderSchema(subject)
.ConfigureAwait(continueOnCapturedContext: false);

Expand Down
25 changes: 25 additions & 0 deletions src/Confluent.SchemaRegistry/AsyncSerde.cs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,31 @@ protected AsyncSerde(ISchemaRegistryClient schemaRegistryClient, SerdeConfig con
}
}

/// <summary>
///     Resolves the Schema Registry subject name for the key or value of a
///     message on <paramref name="topic"/>.
/// </summary>
/// <param name="topic">
///     The topic the message belongs to.
/// </param>
/// <param name="isKey">
///     true to resolve the subject for the message key, false for the message value.
/// </param>
/// <param name="recordType">
///     The record type name passed through to the subject name strategy (or to the
///     schema registry client fallback). May be null when the type is not yet known.
/// </param>
/// <returns>
///     The resolved subject name, or null when neither a subject name strategy nor a
///     schema registry client is configured, or when resolving the name threw.
/// </returns>
protected string GetSubjectName(string topic, bool isKey, string recordType)
{
    try
    {
        if (this.subjectNameStrategy != null)
        {
            // use the subject name strategy specified in the serializer config if available.
            var context = new SerializationContext(
                isKey ? MessageComponentType.Key : MessageComponentType.Value,
                topic);
            return this.subjectNameStrategy(context, recordType);
        }

        if (schemaRegistryClient == null)
        {
            // nothing configured that could produce a subject name.
            return null;
        }

        // else fall back to the deprecated config from (or default as currently supplied by) SchemaRegistry.
        return isKey
            ? schemaRegistryClient.ConstructKeySubjectName(topic, recordType)
            : schemaRegistryClient.ConstructValueSubjectName(topic, recordType);
    }
    catch (Exception)
    {
        // Deliberate best-effort: any failure to build the name is reported as
        // "no subject" (null) instead of propagating, so the caller can decide
        // what to do - e.g. try again later with a non-null recordType.
        // NOTE(review): presumably this targets strategies that reject a null
        // recordType; confirm before narrowing the caught exception type.
        return null;
    }
}

protected async Task<(Schema, TParsedSchema)> GetSchema(string subject, int writerId, string format = null)
{
Schema writerSchema = await schemaRegistryClient.GetSchemaBySubjectAndIdAsync(subject, writerId, format)
Expand Down
2 changes: 1 addition & 1 deletion src/Confluent.SchemaRegistry/Rest/RestService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ public async Task<Schema> GetSchemaAsync(int id, string format)

public async Task<Schema> GetSchemaBySubjectAndIdAsync(string subject, int id, string format)
=> SanitizeSchema(
(await RequestAsync<Schema>($"schemas/ids/{id}?subject={subject}{(format != null ? "&format=" + format : "")}",
(await RequestAsync<Schema>($"schemas/ids/{id}?subject={(subject ?? "")}{(format != null ? "&format=" + format : "")}",
HttpMethod.Get)
.ConfigureAwait(continueOnCapturedContext: false)));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public BaseSerializeDeserializeTests()
testTopic = "topic";
var schemaRegistryMock = new Mock<ISchemaRegistryClient>();
schemaRegistryMock.Setup(x => x.ConstructValueSubjectName(testTopic, It.IsAny<string>())).Returns($"{testTopic}-value");
schemaRegistryMock.Setup(x => x.RegisterSchemaAsync("topic-value", It.IsAny<string>(), It.IsAny<bool>())).ReturnsAsync(
schemaRegistryMock.Setup(x => x.RegisterSchemaAsync(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<bool>())).ReturnsAsync(
(string topic, string schema, bool normalize) => store.TryGetValue(schema, out int id) ? id : store[schema] = store.Count + 1
);
schemaRegistryMock.Setup(x => x.GetSchemaBySubjectAndIdAsync(It.IsAny<string>(), It.IsAny<int>(), It.IsAny<string>())).ReturnsAsync(
Expand Down
Loading

0 comments on commit 3d5663d

Please sign in to comment.