Skip to content

Commit

Permalink
Add normalize.schemas config to SR client (#1874)
Browse files (browse the repository at this point in the history)
* Add normalize.schemas config to SR client

* Fix tests

* Fix tests

* Incorporate review feedback

* Remove extra line
  • Loading branch information
rayokota authored Aug 15, 2022
1 parent 89b7c98 commit 7567e89
Show file tree
Hide file tree
Showing 19 changed files with 200 additions and 58 deletions.
6 changes: 4 additions & 2 deletions src/Confluent.SchemaRegistry.Serdes.Avro/AvroSerializer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ namespace Confluent.SchemaRegistry.Serdes
public class AvroSerializer<T> : IAsyncSerializer<T>
{
private bool autoRegisterSchema = true;
private bool normalizeSchemas = false;
private bool useLatestVersion = false;
private int initialBufferSize = DefaultInitialBufferSize;
private SubjectNameStrategyDelegate subjectNameStrategy = null;
Expand Down Expand Up @@ -111,6 +112,7 @@ public AvroSerializer(ISchemaRegistryClient schemaRegistryClient, AvroSerializer

if (config.BufferBytes != null) { this.initialBufferSize = config.BufferBytes.Value; }
if (config.AutoRegisterSchemas != null) { this.autoRegisterSchema = config.AutoRegisterSchemas.Value; }
if (config.NormalizeSchemas != null) { this.normalizeSchemas = config.NormalizeSchemas.Value; }
if (config.UseLatestVersion != null) { this.useLatestVersion = config.UseLatestVersion.Value; }
if (config.SubjectNameStrategy != null) { this.subjectNameStrategy = config.SubjectNameStrategy.Value.ToDelegate(); }

Expand Down Expand Up @@ -154,8 +156,8 @@ public async Task<byte[]> SerializeAsync(T value, SerializationContext context)
if (serializerImpl == null)
{
serializerImpl = typeof(T) == typeof(GenericRecord)
? (IAvroSerializerImpl<T>)new GenericSerializerImpl(schemaRegistryClient, autoRegisterSchema, useLatestVersion, initialBufferSize, subjectNameStrategy)
: new SpecificSerializerImpl<T>(schemaRegistryClient, autoRegisterSchema, useLatestVersion, initialBufferSize, subjectNameStrategy);
? (IAvroSerializerImpl<T>)new GenericSerializerImpl(schemaRegistryClient, autoRegisterSchema, normalizeSchemas, useLatestVersion, initialBufferSize, subjectNameStrategy)
: new SpecificSerializerImpl<T>(schemaRegistryClient, autoRegisterSchema, normalizeSchemas, useLatestVersion, initialBufferSize, subjectNameStrategy);
}

return await serializerImpl.Serialize(context.Topic, value, context.Component == MessageComponentType.Key).ConfigureAwait(continueOnCapturedContext: false);
Expand Down
21 changes: 21 additions & 0 deletions src/Confluent.SchemaRegistry.Serdes.Avro/AvroSerializerConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,14 @@ public static class PropertyNames
/// </summary>
public const string AutoRegisterSchemas = "avro.serializer.auto.register.schemas";

/// <summary>
/// Specifies whether to normalize schemas, which will transform schemas
/// to have a consistent format, including ordering properties and references.
/// This is the configuration key that the NormalizeSchemas property reads
/// and writes.
///
/// default: false
/// </summary>
public const string NormalizeSchemas = "avro.serializer.normalize.schemas";

/// <summary>
/// Specifies whether or not the Avro serializer should use the latest subject
/// version for serialization.
Expand Down Expand Up @@ -113,6 +121,19 @@ public bool? AutoRegisterSchemas
}


/// <summary>
/// Gets or sets whether schemas are normalized when the Avro serializer
/// registers a schema or looks up its id. Normalization transforms a schema
/// into a consistent format, including the ordering of properties and references.
/// When the value is null the serializer keeps its own default.
///
/// default: false
/// </summary>
public bool? NormalizeSchemas
{
    get => GetBool(PropertyNames.NormalizeSchemas);
    set => SetObject(PropertyNames.NormalizeSchemas, value);
}


/// <summary>
/// Specifies whether or not the Avro serializer should use the latest subject
/// version for serialization.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ internal class GenericSerializerImpl : IAvroSerializerImpl<GenericRecord>
{
private ISchemaRegistryClient schemaRegistryClient;
private bool autoRegisterSchema;
private bool normalizeSchemas;
private bool useLatestVersion;
private int initialBufferSize;
private SubjectNameStrategyDelegate subjectNameStrategy;
Expand All @@ -47,12 +48,14 @@ internal class GenericSerializerImpl : IAvroSerializerImpl<GenericRecord>
public GenericSerializerImpl(
ISchemaRegistryClient schemaRegistryClient,
bool autoRegisterSchema,
bool normalizeSchemas,
bool useLatestVersion,
int initialBufferSize,
SubjectNameStrategyDelegate subjectNameStrategy)
{
this.schemaRegistryClient = schemaRegistryClient;
this.autoRegisterSchema = autoRegisterSchema;
this.normalizeSchemas = normalizeSchemas;
this.useLatestVersion = useLatestVersion;
this.initialBufferSize = initialBufferSize;
this.subjectNameStrategy = subjectNameStrategy;
Expand Down Expand Up @@ -146,12 +149,12 @@ public async Task<byte[]> Serialize(string topic, GenericRecord data, bool isKey
if (autoRegisterSchema)
{
newSchemaId = await schemaRegistryClient
.RegisterSchemaAsync(subject, writerSchemaString)
.RegisterSchemaAsync(subject, writerSchemaString, normalizeSchemas)
.ConfigureAwait(continueOnCapturedContext: false);
}
else
{
newSchemaId = await schemaRegistryClient.GetSchemaIdAsync(subject, writerSchemaString)
newSchemaId = await schemaRegistryClient.GetSchemaIdAsync(subject, writerSchemaString, normalizeSchemas)
.ConfigureAwait(continueOnCapturedContext: false);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ public SpecificWriter<T> AvroWriter

private ISchemaRegistryClient schemaRegistryClient;
private bool autoRegisterSchema;
private bool normalizeSchemas;
private bool useLatestVersion;
private int initialBufferSize;
private SubjectNameStrategyDelegate subjectNameStrategy;
Expand All @@ -98,12 +99,14 @@ public SpecificWriter<T> AvroWriter
public SpecificSerializerImpl(
ISchemaRegistryClient schemaRegistryClient,
bool autoRegisterSchema,
bool normalizeSchemas,
bool useLatestVersion,
int initialBufferSize,
SubjectNameStrategyDelegate subjectNameStrategy)
{
this.schemaRegistryClient = schemaRegistryClient;
this.autoRegisterSchema = autoRegisterSchema;
this.normalizeSchemas = normalizeSchemas;
this.useLatestVersion = useLatestVersion;
this.initialBufferSize = initialBufferSize;
this.subjectNameStrategy = subjectNameStrategy;
Expand Down Expand Up @@ -222,10 +225,10 @@ public async Task<byte[]> Serialize(string topic, T data, bool isKey)
// first usage: register/get schema to check compatibility
currentSchemaData.WriterSchemaId = autoRegisterSchema
? await schemaRegistryClient
.RegisterSchemaAsync(subject, currentSchemaData.WriterSchemaString)
.RegisterSchemaAsync(subject, currentSchemaData.WriterSchemaString, normalizeSchemas)
.ConfigureAwait(continueOnCapturedContext: false)
: await schemaRegistryClient
.GetSchemaIdAsync(subject, currentSchemaData.WriterSchemaString)
.GetSchemaIdAsync(subject, currentSchemaData.WriterSchemaString, normalizeSchemas)
.ConfigureAwait(continueOnCapturedContext: false);
}

Expand Down
6 changes: 4 additions & 2 deletions src/Confluent.SchemaRegistry.Serdes.Json/JsonSerializer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ public class JsonSerializer<T> : IAsyncSerializer<T> where T : class
private const int DefaultInitialBufferSize = 1024;

private bool autoRegisterSchema = true;
private bool normalizeSchemas = false;
private bool useLatestVersion = false;
private int initialBufferSize = DefaultInitialBufferSize;
private SubjectNameStrategyDelegate subjectNameStrategy = null;
Expand Down Expand Up @@ -113,6 +114,7 @@ public JsonSerializer(ISchemaRegistryClient schemaRegistryClient, JsonSerializer

if (config.BufferBytes != null) { this.initialBufferSize = config.BufferBytes.Value; }
if (config.AutoRegisterSchemas != null) { this.autoRegisterSchema = config.AutoRegisterSchemas.Value; }
if (config.NormalizeSchemas != null) { this.normalizeSchemas = config.NormalizeSchemas.Value; }
if (config.UseLatestVersion != null) { this.useLatestVersion = config.UseLatestVersion.Value; }
if (config.SubjectNameStrategy != null) { this.subjectNameStrategy = config.SubjectNameStrategy.Value.ToDelegate(); }

Expand Down Expand Up @@ -180,10 +182,10 @@ public async Task<byte[]> SerializeAsync(T value, SerializationContext context)
// first usage: register/get schema to check compatibility
schemaId = autoRegisterSchema
? await schemaRegistryClient.RegisterSchemaAsync(subject,
new Schema(this.schemaText, EmptyReferencesList, SchemaType.Json))
new Schema(this.schemaText, EmptyReferencesList, SchemaType.Json), normalizeSchemas)
.ConfigureAwait(continueOnCapturedContext: false)
: await schemaRegistryClient.GetSchemaIdAsync(subject,
new Schema(this.schemaText, EmptyReferencesList, SchemaType.Json))
new Schema(this.schemaText, EmptyReferencesList, SchemaType.Json), normalizeSchemas)
.ConfigureAwait(continueOnCapturedContext: false);

// TODO: It may be better to fail fast if conflicting values for schemaId are seen here.
Expand Down
21 changes: 21 additions & 0 deletions src/Confluent.SchemaRegistry.Serdes.Json/JsonSerializerConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,14 @@ public static class PropertyNames
/// </summary>
public const string AutoRegisterSchemas = "json.serializer.auto.register.schemas";

/// <summary>
/// Specifies whether to normalize schemas, which will transform schemas
/// to have a consistent format, including ordering properties and references.
/// This is the configuration key that the NormalizeSchemas property reads
/// and writes.
///
/// default: false
/// </summary>
public const string NormalizeSchemas = "json.serializer.normalize.schemas";

/// <summary>
/// Specifies whether or not the JSON serializer should use the latest subject
/// version for serialization.
Expand Down Expand Up @@ -113,6 +121,19 @@ public bool? AutoRegisterSchemas
}


/// <summary>
/// Gets or sets whether schemas are normalized when the JSON serializer
/// registers a schema or looks up its id. Normalization transforms a schema
/// into a consistent format, including the ordering of properties and references.
/// When the value is null the serializer keeps its own default.
///
/// default: false
/// </summary>
public bool? NormalizeSchemas
{
    get => GetBool(PropertyNames.NormalizeSchemas);
    set => SetObject(PropertyNames.NormalizeSchemas, value);
}


/// <summary>
/// Specifies whether or not the JSON serializer should use the latest subject
/// version for serialization.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ namespace Confluent.SchemaRegistry.Serdes
private const int DefaultInitialBufferSize = 1024;

private bool autoRegisterSchema = true;
private bool normalizeSchemas = false;
private bool useLatestVersion = false;
private bool skipKnownTypes = false;
private bool useDeprecatedFormat = false;
Expand Down Expand Up @@ -96,6 +97,7 @@ public ProtobufSerializer(ISchemaRegistryClient schemaRegistryClient, ProtobufSe

if (config.BufferBytes != null) { this.initialBufferSize = config.BufferBytes.Value; }
if (config.AutoRegisterSchemas != null) { this.autoRegisterSchema = config.AutoRegisterSchemas.Value; }
if (config.NormalizeSchemas != null) { this.normalizeSchemas = config.NormalizeSchemas.Value; }
if (config.UseLatestVersion != null) { this.useLatestVersion = config.UseLatestVersion.Value; }
if (config.SkipKnownTypes != null) { this.skipKnownTypes = config.SkipKnownTypes.Value; }
if (config.UseDeprecatedFormat != null) { this.useDeprecatedFormat = config.UseDeprecatedFormat.Value; }
Expand Down Expand Up @@ -207,9 +209,9 @@ private async Task<List<SchemaReference>> RegisterOrGetReferences(FileDescriptor
var subject = referenceSubjectNameStrategy(context, dependency.Name);
var schema = new Schema(dependency.SerializedData.ToBase64(), dependencyReferences, SchemaType.Protobuf);
var schemaId = autoRegisterSchema
? await schemaRegistryClient.RegisterSchemaAsync(subject, schema).ConfigureAwait(continueOnCapturedContext: false)
: await schemaRegistryClient.GetSchemaIdAsync(subject, schema).ConfigureAwait(continueOnCapturedContext: false);
var registeredDependentSchema = await schemaRegistryClient.LookupSchemaAsync(subject, schema, true).ConfigureAwait(continueOnCapturedContext: false);
? await schemaRegistryClient.RegisterSchemaAsync(subject, schema, normalizeSchemas).ConfigureAwait(continueOnCapturedContext: false)
: await schemaRegistryClient.GetSchemaIdAsync(subject, schema, normalizeSchemas).ConfigureAwait(continueOnCapturedContext: false);
var registeredDependentSchema = await schemaRegistryClient.LookupSchemaAsync(subject, schema, true, normalizeSchemas).ConfigureAwait(continueOnCapturedContext: false);
return new SchemaReference(dependency.Name, subject, registeredDependentSchema.Version);
};
tasks.Add(t(fileDescriptor));
Expand Down Expand Up @@ -288,11 +290,11 @@ await RegisterOrGetReferences(value.Descriptor.File, context, autoRegisterSchema
schemaId = autoRegisterSchema
? await schemaRegistryClient.RegisterSchemaAsync(subject,
new Schema(value.Descriptor.File.SerializedData.ToBase64(), references,
SchemaType.Protobuf))
SchemaType.Protobuf), normalizeSchemas)
.ConfigureAwait(continueOnCapturedContext: false)
: await schemaRegistryClient.GetSchemaIdAsync(subject,
new Schema(value.Descriptor.File.SerializedData.ToBase64(), references,
SchemaType.Protobuf))
SchemaType.Protobuf), normalizeSchemas)
.ConfigureAwait(continueOnCapturedContext: false);

// note: different values for schemaId should never be seen here.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,14 @@ public static class PropertyNames
/// </summary>
public const string AutoRegisterSchemas = "protobuf.serializer.auto.register.schemas";

/// <summary>
/// Specifies whether to normalize schemas, which will transform schemas
/// to have a consistent format, including ordering properties and references.
/// This is the configuration key that the NormalizeSchemas property reads
/// and writes.
///
/// default: false
/// </summary>
public const string NormalizeSchemas = "protobuf.serializer.normalize.schemas";

/// <summary>
/// Specifies whether or not the Protobuf serializer should use the latest subject
/// version for serialization.
Expand Down Expand Up @@ -133,6 +141,19 @@ public bool? AutoRegisterSchemas
set { SetObject(PropertyNames.AutoRegisterSchemas, value); }
}


/// <summary>
/// Gets or sets whether schemas are normalized when the Protobuf serializer
/// registers or looks up a schema, including the schemas of its references.
/// Normalization transforms a schema into a consistent format, including the
/// ordering of properties and references.
/// When the value is null the serializer keeps its own default.
///
/// default: false
/// </summary>
public bool? NormalizeSchemas
{
    get => GetBool(PropertyNames.NormalizeSchemas);
    set => SetObject(PropertyNames.NormalizeSchemas, value);
}


/// <summary>
/// Specifies whether or not the Protobuf serializer should use the latest subject
Expand Down
Loading

0 comments on commit 7567e89

Please sign in to comment.