Skip to content

Commit

Permalink
fix format
Browse files Browse the repository at this point in the history
  • Loading branch information
jeqo committed Sep 20, 2022
1 parent 6fff010 commit 268ab4d
Showing 1 changed file with 34 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,35 +59,39 @@ public PayloadGenerator(Config config, Properties producerConfig) {
this.avroData = new AvroData(1);
this.avroSchema = config.schema();
this.connectSchema = avroData.toConnectSchema(config.schema());
this.converter = switch (this.format) {
case JSON -> {
var jsonConverter = new JsonConverter();
var schemasEnabled = producerConfig.getProperty("schemas.enabled", "false");
jsonConverter.configure(Map.of("schemas.enable", schemasEnabled, "converter.type", "value"));
yield jsonConverter;
}
case AVRO -> {
var avroConverter = new AvroConverter();
avroConverter.configure(
producerConfig.keySet().stream().collect(Collectors.toMap(String::valueOf, producerConfig::get)),
false);
yield avroConverter;
}
case PROTOBUF -> {
var avroConverter = new ProtobufConverter();
avroConverter.configure(
producerConfig.keySet().stream().collect(Collectors.toMap(String::valueOf, producerConfig::get)),
false);
yield avroConverter;
}
case JSON_SCHEMA -> {
var avroConverter = new JsonSchemaConverter();
avroConverter.configure(
producerConfig.keySet().stream().collect(Collectors.toMap(String::valueOf, producerConfig::get)),
false);
yield avroConverter;
}
};
this.converter =
switch (this.format) {
case JSON -> {
var jsonConverter = new JsonConverter();
var schemasEnabled = producerConfig.getProperty("schemas.enabled", "false");
jsonConverter.configure(Map.of("schemas.enable", schemasEnabled, "converter.type", "value"));
yield jsonConverter;
}
case AVRO -> {
var avroConverter = new AvroConverter();
avroConverter.configure(
producerConfig.keySet().stream().collect(Collectors.toMap(String::valueOf, producerConfig::get)),
false
);
yield avroConverter;
}
case PROTOBUF -> {
var avroConverter = new ProtobufConverter();
avroConverter.configure(
producerConfig.keySet().stream().collect(Collectors.toMap(String::valueOf, producerConfig::get)),
false
);
yield avroConverter;
}
case JSON_SCHEMA -> {
var avroConverter = new JsonSchemaConverter();
avroConverter.configure(
producerConfig.keySet().stream().collect(Collectors.toMap(String::valueOf, producerConfig::get)),
false
);
yield avroConverter;
}
};
}

public GenericRecord get() {
Expand Down Expand Up @@ -219,7 +223,7 @@ public enum Format {
JSON,
AVRO,
JSON_SCHEMA,
PROTOBUF
PROTOBUF,
}

@SuppressWarnings("unchecked")
Expand Down

0 comments on commit 268ab4d

Please sign in to comment.