diff --git a/bom/application/pom.xml b/bom/application/pom.xml
index 29dd585205c7c..67b9b4692260f 100644
--- a/bom/application/pom.xml
+++ b/bom/application/pom.xml
@@ -1402,6 +1402,16 @@
quarkus-confluent-registry-avro
${project.version}
+
+ io.quarkus
+ quarkus-confluent-registry-json-schema
+ ${project.version}
+
+
+ io.quarkus
+ quarkus-confluent-registry-json-schema-deployment
+ ${project.version}
+
io.quarkus
quarkus-confluent-registry-avro-deployment
@@ -3397,7 +3407,6 @@
apicurio-common-rest-client-vertx
${apicurio-common-rest-client.version}
-
io.quarkus
quarkus-mutiny
diff --git a/core/deployment/src/main/java/io/quarkus/deployment/Feature.java b/core/deployment/src/main/java/io/quarkus/deployment/Feature.java
index 3c7fa9726a664..69c785ab424aa 100644
--- a/core/deployment/src/main/java/io/quarkus/deployment/Feature.java
+++ b/core/deployment/src/main/java/io/quarkus/deployment/Feature.java
@@ -19,6 +19,7 @@ public enum Feature {
CDI,
CONFIG_YAML,
CONFLUENT_REGISTRY_AVRO,
+ CONFLUENT_REGISTRY_JSON,
ELASTICSEARCH_REST_CLIENT_COMMON,
ELASTICSEARCH_REST_CLIENT,
ELASTICSEARCH_REST_HIGH_LEVEL_CLIENT,
diff --git a/devtools/bom-descriptor-json/pom.xml b/devtools/bom-descriptor-json/pom.xml
index 66b67abc53c13..fdbe228c16269 100644
--- a/devtools/bom-descriptor-json/pom.xml
+++ b/devtools/bom-descriptor-json/pom.xml
@@ -330,6 +330,19 @@
+
+ io.quarkus
+ quarkus-confluent-registry-json-schema
+ ${project.version}
+ pom
+ test
+
+
+ *
+ *
+
+
+
io.quarkus
quarkus-container-image
diff --git a/docs/pom.xml b/docs/pom.xml
index b97445b213b81..fb3527a65497f 100644
--- a/docs/pom.xml
+++ b/docs/pom.xml
@@ -346,6 +346,19 @@
+
+ io.quarkus
+ quarkus-confluent-registry-json-schema-deployment
+ ${project.version}
+ pom
+ test
+
+
+ *
+ *
+
+
+
io.quarkus
quarkus-container-image-deployment
diff --git a/extensions/schema-registry/confluent/json-schema/deployment/pom.xml b/extensions/schema-registry/confluent/json-schema/deployment/pom.xml
new file mode 100644
index 0000000000000..94501c8f5d7c0
--- /dev/null
+++ b/extensions/schema-registry/confluent/json-schema/deployment/pom.xml
@@ -0,0 +1,49 @@
+
+
+ 4.0.0
+
+
+ io.quarkus
+ quarkus-confluent-registry-json-schema-parent
+ 999-SNAPSHOT
+
+
+ quarkus-confluent-registry-json-schema-deployment
+ Quarkus - Confluent Schema Registry - Json Schema - Deployment
+
+
+
+ io.quarkus
+ quarkus-confluent-registry-json-schema
+
+
+
+ io.quarkus
+ quarkus-confluent-registry-common-deployment
+
+
+ io.quarkus
+ quarkus-schema-registry-devservice-deployment
+
+
+
+
+
+
+ maven-compiler-plugin
+
+
+
+ io.quarkus
+ quarkus-extension-processor
+ ${project.version}
+
+
+
+
+
+
+
+
diff --git a/extensions/schema-registry/confluent/json-schema/deployment/src/main/java/io/quarkus/confluent/registry/json/ConfluentRegistryJsonProcessor.java b/extensions/schema-registry/confluent/json-schema/deployment/src/main/java/io/quarkus/confluent/registry/json/ConfluentRegistryJsonProcessor.java
new file mode 100644
index 0000000000000..0edb834b39656
--- /dev/null
+++ b/extensions/schema-registry/confluent/json-schema/deployment/src/main/java/io/quarkus/confluent/registry/json/ConfluentRegistryJsonProcessor.java
@@ -0,0 +1,76 @@
+package io.quarkus.confluent.registry.json;
+
+import java.util.Collection;
+import java.util.Optional;
+import java.util.function.Predicate;
+
+import org.jboss.logging.Logger;
+
+import io.quarkus.deployment.Feature;
+import io.quarkus.deployment.annotations.BuildProducer;
+import io.quarkus.deployment.annotations.BuildStep;
+import io.quarkus.deployment.builditem.ExtensionSslNativeSupportBuildItem;
+import io.quarkus.deployment.builditem.FeatureBuildItem;
+import io.quarkus.deployment.builditem.nativeimage.NativeImageConfigBuildItem;
+import io.quarkus.deployment.builditem.nativeimage.ReflectiveClassBuildItem;
+import io.quarkus.deployment.pkg.builditem.CurateOutcomeBuildItem;
+import io.quarkus.maven.dependency.ResolvedDependency;
+
+public class ConfluentRegistryJsonProcessor {
+
+ public static final String CONFLUENT_GROUP_ID = "io.confluent";
+ public static final String CONFLUENT_ARTIFACT_ID = "kafka-json-schema-serializer";
+
+ private static final Logger LOGGER = Logger.getLogger(ConfluentRegistryJsonProcessor.class.getName());
+ public static final String CONFLUENT_REPO = "https://packages.confluent.io/maven/";
+ public static final String GUIDE_URL = "https://quarkus.io/guides/kafka-schema-registry-json-schema";
+
+ @BuildStep
+ FeatureBuildItem featureAndCheckDependency(CurateOutcomeBuildItem cp) {
+ if (findConfluentSerde(cp.getApplicationModel().getDependencies()).isEmpty()) {
+ LOGGER.warnf("The application uses the `quarkus-confluent-registry-json-schema` extension, but does not " +
+ "depend on `%s:%s`. Note that this dependency is only available from the `%s` Maven " +
+ "repository. Check %s for more details.",
+ CONFLUENT_GROUP_ID, CONFLUENT_ARTIFACT_ID, CONFLUENT_REPO, GUIDE_URL);
+ }
+
+ return new FeatureBuildItem(Feature.CONFLUENT_REGISTRY_JSON);
+ }
+
+ @BuildStep
+ public void confluentRegistryJson(BuildProducer reflectiveClass,
+ BuildProducer sslNativeSupport) {
+ reflectiveClass
+ .produce(ReflectiveClassBuildItem.builder("io.confluent.kafka.serializers.json.KafkaJsonSchemaDeserializer",
+ "io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer").methods().build());
+ }
+
+ @BuildStep
+ public void configureNative(BuildProducer config, CurateOutcomeBuildItem cp) {
+ Optional serde = findConfluentSerde(cp.getApplicationModel().getDependencies());
+ if (serde.isPresent()) {
+ String version = serde.get().getVersion();
+ if (version.startsWith("7.1") || version.startsWith("7.2")) {
+ // Only required for Confluent Serde 7.1.x and 7.2.x
+ config.produce(NativeImageConfigBuildItem.builder()
+ .addRuntimeInitializedClass("io.confluent.kafka.schemaregistry.client.rest.utils.UrlList")
+ .build());
+ }
+ }
+ }
+
+ @BuildStep
+ ExtensionSslNativeSupportBuildItem enableSslInNative() {
+ return new ExtensionSslNativeSupportBuildItem(Feature.CONFLUENT_REGISTRY_JSON);
+ }
+
+ private Optional findConfluentSerde(Collection dependencies) {
+ return dependencies.stream().filter(new Predicate() {
+ @Override
+ public boolean test(ResolvedDependency rd) {
+ return rd.getGroupId().equalsIgnoreCase(CONFLUENT_GROUP_ID)
+ && rd.getArtifactId().equalsIgnoreCase(CONFLUENT_ARTIFACT_ID);
+ }
+ }).findAny();
+ }
+}
diff --git a/extensions/schema-registry/confluent/json-schema/pom.xml b/extensions/schema-registry/confluent/json-schema/pom.xml
new file mode 100644
index 0000000000000..cdfaed3577c55
--- /dev/null
+++ b/extensions/schema-registry/confluent/json-schema/pom.xml
@@ -0,0 +1,21 @@
+
+
+
+ quarkus-confluent-registry-parent
+ io.quarkus
+ 999-SNAPSHOT
+ ../pom.xml
+
+
+ 4.0.0
+ quarkus-confluent-registry-json-schema-parent
+ Quarkus - Confluent Schema Registry - Json Schema
+ pom
+
+
+ deployment
+ runtime
+
+
diff --git a/extensions/schema-registry/confluent/json-schema/runtime/pom.xml b/extensions/schema-registry/confluent/json-schema/runtime/pom.xml
new file mode 100644
index 0000000000000..c84a5d24fb394
--- /dev/null
+++ b/extensions/schema-registry/confluent/json-schema/runtime/pom.xml
@@ -0,0 +1,90 @@
+
+
+ 4.0.0
+
+
+ io.quarkus
+ quarkus-confluent-registry-json-schema-parent
+ 999-SNAPSHOT
+
+
+ quarkus-confluent-registry-json-schema
+ Quarkus - Confluent Schema Registry - Json Schema - Runtime
+ Use Confluent as Json Schema schema registry
+
+
+
+
+ io.quarkus
+ quarkus-confluent-registry-common
+
+
+ io.quarkus
+ quarkus-schema-registry-devservice
+
+
+ io.confluent
+ kafka-json-schema-serializer
+ 7.5.1
+
+
+ org.checkerframework
+ checker-qual
+
+
+ commons-logging
+ commons-logging
+
+
+ validation-api
+ javax.validation
+
+
+
+
+ org.graalvm.sdk
+ graal-sdk
+ provided
+
+
+
+
+
+ confluent
+ https://packages.confluent.io/maven/
+
+ false
+
+
+
+
+
+
+
+
+ io.quarkus
+ quarkus-extension-maven-plugin
+
+
+ io.quarkus.confluent.registry.json
+
+
+
+
+ maven-compiler-plugin
+
+
+
+ io.quarkus
+ quarkus-extension-processor
+ ${project.version}
+
+
+
+
+
+
+
+
diff --git a/extensions/schema-registry/confluent/json-schema/runtime/src/main/java/io/quarkus/confluent/registry/json/ConfluentJsonSubstitutions.java b/extensions/schema-registry/confluent/json-schema/runtime/src/main/java/io/quarkus/confluent/registry/json/ConfluentJsonSubstitutions.java
new file mode 100644
index 0000000000000..96bb8fee05ae9
--- /dev/null
+++ b/extensions/schema-registry/confluent/json-schema/runtime/src/main/java/io/quarkus/confluent/registry/json/ConfluentJsonSubstitutions.java
@@ -0,0 +1,72 @@
+package io.quarkus.confluent.registry.json;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.oracle.svm.core.annotate.Substitute;
+import com.oracle.svm.core.annotate.TargetClass;
+
+import io.confluent.kafka.schemaregistry.annotations.Schema;
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaReference;
+import io.confluent.kafka.schemaregistry.json.JsonSchema;
+import io.confluent.kafka.schemaregistry.json.SpecificationVersion;
+
+@TargetClass(className = "io.confluent.kafka.schemaregistry.json.JsonSchemaUtils")
+final class Target_io_confluent_kafka_schemaregistry_json_JsonSchemaUtils {
+
+ @Substitute
+ public static JsonSchema getSchema(
+ Object object,
+ SpecificationVersion specVersion,
+ boolean useOneofForNullables,
+ boolean failUnknownProperties,
+ ObjectMapper objectMapper,
+ SchemaRegistryClient client) throws IOException {
+
+ if (object == null) {
+ return null;
+ }
+
+ Class> cls = object.getClass();
+ //We only support the scenario of having the schema defined in the annotation in the java bean, since it does not rely on outdated libraries.
+ if (cls.isAnnotationPresent(Schema.class)) {
+ Schema schema = cls.getAnnotation(Schema.class);
+ List references = Arrays.stream(schema.refs())
+ .map(new Function() {
+ @Override
+ public SchemaReference apply(
+ io.confluent.kafka.schemaregistry.annotations.SchemaReference schemaReference) {
+ return new SchemaReference(schemaReference.name(), schemaReference.subject(),
+ schemaReference.version());
+ }
+ })
+ .collect(Collectors.toList());
+ if (client == null) {
+ if (!references.isEmpty()) {
+ throw new IllegalArgumentException("Cannot resolve schema " + schema.value()
+ + " with refs " + references);
+ }
+ return new JsonSchema(schema.value());
+ } else {
+ return (JsonSchema) client.parseSchema(JsonSchema.TYPE, schema.value(), references)
+ .orElseThrow(new Supplier() {
+ @Override
+ public IOException get() {
+ return new IOException("Invalid schema " + schema.value()
+ + " with refs " + references);
+ }
+ });
+ }
+ }
+ return null;
+ }
+}
+
+class ConfluentJsonSubstitutions {
+}
diff --git a/extensions/schema-registry/confluent/json-schema/runtime/src/main/resources/META-INF/quarkus-extension.yaml b/extensions/schema-registry/confluent/json-schema/runtime/src/main/resources/META-INF/quarkus-extension.yaml
new file mode 100644
index 0000000000000..46e9af1a164e4
--- /dev/null
+++ b/extensions/schema-registry/confluent/json-schema/runtime/src/main/resources/META-INF/quarkus-extension.yaml
@@ -0,0 +1,10 @@
+---
+artifact: ${project.groupId}:${project.artifactId}:${project.version}
+name: "Confluent Schema Registry - Json Schema"
+metadata:
+ keywords:
+ - "confluent"
+ - "json-schema"
+ categories:
+ - "serialization"
+ status: "preview"
diff --git a/extensions/schema-registry/confluent/pom.xml b/extensions/schema-registry/confluent/pom.xml
index 08e3f6c6262ee..f1f3fd770436f 100644
--- a/extensions/schema-registry/confluent/pom.xml
+++ b/extensions/schema-registry/confluent/pom.xml
@@ -15,8 +15,29 @@
Quarkus - Confluent Schema Registry
pom
+
+
+
+ joda-time
+ joda-time
+ 2.10.14
+
+
+ org.jetbrains.kotlin
+ kotlin-scripting-compiler-embeddable
+ 1.6.0
+
+
+ org.json
+ json
+ 20230227
+
+
+
+
common
avro
+ json-schema
diff --git a/integration-tests/kafka-json-schema-apicurio2/pom.xml b/integration-tests/kafka-json-schema-apicurio2/pom.xml
index 88a9216d54975..7fa3b388a9ff8 100644
--- a/integration-tests/kafka-json-schema-apicurio2/pom.xml
+++ b/integration-tests/kafka-json-schema-apicurio2/pom.xml
@@ -13,6 +13,26 @@
Quarkus - Integration Tests - Kafka Json Schema with Apicurio 2.x
The Apache Kafka Json Schema with Apicurio Registry 2.x integration tests module
+
+
+
+ joda-time
+ joda-time
+ 2.10.14
+
+
+ org.jetbrains.kotlin
+ kotlin-scripting-compiler-embeddable
+ 1.6.0
+
+
+ org.json
+ json
+ 20230227
+
+
+
+
io.quarkus
@@ -43,11 +63,16 @@
com.fasterxml.jackson.dataformat
jackson-dataformat-csv
-
+
+
io.quarkus
quarkus-apicurio-registry-json-schema
+
+ io.quarkus
+ quarkus-confluent-registry-json-schema
+
@@ -149,6 +174,19 @@
+
+ io.quarkus
+ quarkus-confluent-registry-json-schema-deployment
+ ${project.version}
+ pom
+ test
+
+
+ *
+ *
+
+
+
diff --git a/integration-tests/kafka-json-schema-apicurio2/src/main/java/io/quarkus/it/kafka/jsonschema/JsonSchemaEndpoint.java b/integration-tests/kafka-json-schema-apicurio2/src/main/java/io/quarkus/it/kafka/jsonschema/JsonSchemaEndpoint.java
index f65ff696a15a2..31c114e1b583e 100644
--- a/integration-tests/kafka-json-schema-apicurio2/src/main/java/io/quarkus/it/kafka/jsonschema/JsonSchemaEndpoint.java
+++ b/integration-tests/kafka-json-schema-apicurio2/src/main/java/io/quarkus/it/kafka/jsonschema/JsonSchemaEndpoint.java
@@ -24,6 +24,20 @@ public class JsonSchemaEndpoint {
@Inject
JsonSchemaKafkaCreator creator;
+ @GET
+ @Path("/confluent")
+ public JsonObject getConfluent() {
+ return get(
+ creator.createConfluentConsumer("test-json-schema-confluent-consumer", "test-json-schema-confluent-consumer"));
+ }
+
+ @POST
+ @Path("/confluent")
+ public void sendConfluent(Pet pet) {
+ KafkaProducer p = creator.createConfluentProducer("test-json-schema-confluent");
+ send(p, pet, "test-json-schema-confluent-producer");
+ }
+
@GET
@Path("/apicurio")
public JsonObject getApicurio() {
diff --git a/integration-tests/kafka-json-schema-apicurio2/src/main/java/io/quarkus/it/kafka/jsonschema/JsonSchemaKafkaCreator.java b/integration-tests/kafka-json-schema-apicurio2/src/main/java/io/quarkus/it/kafka/jsonschema/JsonSchemaKafkaCreator.java
index 989d2f0e10667..119beaf837785 100644
--- a/integration-tests/kafka-json-schema-apicurio2/src/main/java/io/quarkus/it/kafka/jsonschema/JsonSchemaKafkaCreator.java
+++ b/integration-tests/kafka-json-schema-apicurio2/src/main/java/io/quarkus/it/kafka/jsonschema/JsonSchemaKafkaCreator.java
@@ -17,6 +17,10 @@
import io.apicurio.registry.serde.SerdeConfig;
import io.apicurio.registry.serde.jsonschema.JsonSchemaKafkaDeserializer;
import io.apicurio.registry.serde.jsonschema.JsonSchemaKafkaSerializer;
+import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
+import io.confluent.kafka.serializers.KafkaJsonDeserializerConfig;
+import io.confluent.kafka.serializers.json.KafkaJsonSchemaDeserializer;
+import io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer;
/**
* Create Json Schema Kafka Consumers and Producers
@@ -30,18 +34,34 @@ public class JsonSchemaKafkaCreator {
@ConfigProperty(name = "mp.messaging.connector.smallrye-kafka.apicurio.registry.url")
String apicurioRegistryUrl;
+ @ConfigProperty(name = "mp.messaging.connector.smallrye-kafka.schema.registry.url")
+ String confluentRegistryUrl;
+
public JsonSchemaKafkaCreator() {
}
- public JsonSchemaKafkaCreator(String bootstrap, String apicurioRegistryUrl) {
+ public JsonSchemaKafkaCreator(String bootstrap, String apicurioRegistryUrl, String confluentRegistryUrl) {
this.bootstrap = bootstrap;
this.apicurioRegistryUrl = apicurioRegistryUrl;
+ this.confluentRegistryUrl = confluentRegistryUrl;
}
public String getApicurioRegistryUrl() {
return apicurioRegistryUrl;
}
+ public String getConfluentRegistryUrl() {
+ return confluentRegistryUrl;
+ }
+
+ public KafkaConsumer createConfluentConsumer(String groupdIdConfig, String subscribtionName) {
+ return createConfluentConsumer(bootstrap, getConfluentRegistryUrl(), groupdIdConfig, subscribtionName);
+ }
+
+ public KafkaProducer createConfluentProducer(String clientId) {
+ return createConfluentProducer(bootstrap, getConfluentRegistryUrl(), clientId);
+ }
+
public KafkaConsumer createApicurioConsumer(String groupdIdConfig, String subscribtionName) {
return createApicurioConsumer(bootstrap, getApicurioRegistryUrl(), groupdIdConfig, subscribtionName);
}
@@ -50,6 +70,12 @@ public KafkaProducer createApicurioProducer(String clientId) {
return createApicurioProducer(bootstrap, getApicurioRegistryUrl(), clientId);
}
+ public static KafkaConsumer createConfluentConsumer(String bootstrap, String confluent,
+ String groupdIdConfig, String subscribtionName) {
+ Properties p = getConfluentConsumerProperties(bootstrap, confluent, groupdIdConfig);
+ return createConsumer(p, subscribtionName);
+ }
+
public static KafkaConsumer createApicurioConsumer(String bootstrap, String apicurio,
String groupdIdConfig, String subscribtionName) {
Properties p = getApicurioConsumerProperties(bootstrap, apicurio, groupdIdConfig);
@@ -62,6 +88,12 @@ public static KafkaProducer createApicurioProducer(String bootstra
return createProducer(p);
}
+ public static KafkaProducer createConfluentProducer(String bootstrap, String confluent,
+ String clientId) {
+ Properties p = getConfluentProducerProperties(bootstrap, confluent, clientId);
+ return createProducer(p);
+ }
+
private static KafkaConsumer createConsumer(Properties props, String subscribtionName) {
if (!props.containsKey(ConsumerConfig.CLIENT_ID_CONFIG)) {
props.put(ConsumerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString());
@@ -78,6 +110,15 @@ private static KafkaProducer createProducer(Properties props) {
return new KafkaProducer<>(props);
}
+ private static Properties getConfluentConsumerProperties(String bootstrap, String confluent,
+ String groupdIdConfig) {
+ Properties props = getGenericConsumerProperties(bootstrap, groupdIdConfig);
+ props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaJsonSchemaDeserializer.class.getName());
+ props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, confluent);
+ props.put(KafkaJsonDeserializerConfig.JSON_VALUE_TYPE, Pet.class.getName());
+ return props;
+ }
+
public static Properties getApicurioConsumerProperties(String bootstrap, String apicurio, String groupdIdConfig) {
Properties props = getGenericConsumerProperties(bootstrap, groupdIdConfig);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonSchemaKafkaDeserializer.class.getName());
@@ -107,6 +148,13 @@ private static Properties getApicurioProducerProperties(String bootstrap, String
return props;
}
+ private static Properties getConfluentProducerProperties(String bootstrap, String confluent, String clientId) {
+ Properties props = getGenericProducerProperties(bootstrap, clientId);
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaJsonSchemaSerializer.class.getName());
+ props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, confluent);
+ return props;
+ }
+
private static Properties getGenericProducerProperties(String bootstrap, String clientId) {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap);
diff --git a/integration-tests/kafka-json-schema-apicurio2/src/main/java/io/quarkus/it/kafka/jsonschema/Pet.java b/integration-tests/kafka-json-schema-apicurio2/src/main/java/io/quarkus/it/kafka/jsonschema/Pet.java
index ee47fb2fe9482..fd53166cae9b7 100644
--- a/integration-tests/kafka-json-schema-apicurio2/src/main/java/io/quarkus/it/kafka/jsonschema/Pet.java
+++ b/integration-tests/kafka-json-schema-apicurio2/src/main/java/io/quarkus/it/kafka/jsonschema/Pet.java
@@ -1,5 +1,25 @@
package io.quarkus.it.kafka.jsonschema;
+import io.confluent.kafka.schemaregistry.annotations.Schema;
+
+//This class is used by both serializers, but for it to be usable by the Confluent serializer the schema must be attached here in the annotation
+@Schema(value = """
+ {
+ "$id": "https://example.com/person.schema.json",
+ "$schema": "http://json-schema.org/draft-07/schema#",
+ "title": "Pet",
+ "type": "object",
+ "properties": {
+ "name": {
+ "type": "string",
+ "description": "The pet's name."
+ },
+ "color": {
+ "type": "string",
+ "description": "The pet's color."
+ }
+ }
+ }""", refs = {})
public class Pet {
private String name;
diff --git a/integration-tests/kafka-json-schema-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaJsonSchemaTestBase.java b/integration-tests/kafka-json-schema-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaJsonSchemaTestBase.java
index 796540becc0a7..729b8956fd47e 100644
--- a/integration-tests/kafka-json-schema-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaJsonSchemaTestBase.java
+++ b/integration-tests/kafka-json-schema-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaJsonSchemaTestBase.java
@@ -17,6 +17,8 @@ public abstract class KafkaJsonSchemaTestBase {
static final String APICURIO_PATH = "/json-schema/apicurio";
+ static final String CONFLUENT_PATH = "/json-schema/confluent";
+
abstract JsonSchemaKafkaCreator creator();
@Test
@@ -41,6 +43,20 @@ public void testApicurioJsonSchemaConsumer() {
testJsonSchemaConsumer(producer, APICURIO_PATH, topic);
}
+ @Test
+ public void testConfluentJsonSchemaProducer() {
+ KafkaConsumer consumer = creator().createConfluentConsumer(
+ "test-json-schema-confluent",
+ "test-json-schema-confluent-producer");
+ testJsonSchemaProducer(consumer, CONFLUENT_PATH);
+ }
+
+ @Test
+ public void testConfluentJsonSchemaConsumer() {
+ KafkaProducer producer = creator().createConfluentProducer("test-json-schema-confluent-test");
+ testJsonSchemaConsumer(producer, CONFLUENT_PATH, "test-json-schema-confluent-consumer");
+ }
+
protected void testJsonSchemaProducer(KafkaConsumer consumer, String path) {
RestAssured.given()
.header("content-type", "application/json")
diff --git a/integration-tests/kafka-json-schema-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaResource.java b/integration-tests/kafka-json-schema-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaResource.java
index dabe27a7715ed..652fdc47b9641 100644
--- a/integration-tests/kafka-json-schema-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaResource.java
+++ b/integration-tests/kafka-json-schema-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaResource.java
@@ -17,7 +17,8 @@ public void setIntegrationTestContext(DevServicesContext context) {
String bootstrapServers = devServicesProperties.get("kafka.bootstrap.servers");
if (bootstrapServers != null) {
String apicurioUrl = devServicesProperties.get("mp.messaging.connector.smallrye-kafka.apicurio.registry.url");
- creator = new JsonSchemaKafkaCreator(bootstrapServers, apicurioUrl);
+ String confluentUrl = devServicesProperties.get("mp.messaging.connector.smallrye-kafka.schema.registry.url");
+ creator = new JsonSchemaKafkaCreator(bootstrapServers, apicurioUrl, confluentUrl);
}
}