diff --git a/bom/application/pom.xml b/bom/application/pom.xml index f388fb3676652..dac7ae2ed350b 100644 --- a/bom/application/pom.xml +++ b/bom/application/pom.xml @@ -1389,6 +1389,16 @@ quarkus-apicurio-registry-json-schema-deployment ${project.version} + + io.quarkus + quarkus-apicurio-registry-protobuf + ${project.version} + + + io.quarkus + quarkus-apicurio-registry-protobuf-deployment + ${project.version} + io.quarkus quarkus-confluent-registry-common @@ -3404,6 +3414,11 @@ apicurio-registry-serdes-jsonschema-serde ${apicurio-registry.version} + + io.apicurio + apicurio-registry-serdes-protobuf-serde + ${apicurio-registry.version} + io.apicurio apicurio-common-rest-client-vertx diff --git a/core/deployment/src/main/java/io/quarkus/deployment/Feature.java b/core/deployment/src/main/java/io/quarkus/deployment/Feature.java index 69c785ab424aa..7eb9f903f3525 100644 --- a/core/deployment/src/main/java/io/quarkus/deployment/Feature.java +++ b/core/deployment/src/main/java/io/quarkus/deployment/Feature.java @@ -14,6 +14,7 @@ public enum Feature { AZURE_FUNCTIONS, APICURIO_REGISTRY_AVRO, APICURIO_REGISTRY_JSON_SCHEMA, + APICURIO_REGISTRY_PROTOBUF, AWT, CACHE, CDI, diff --git a/devtools/bom-descriptor-json/pom.xml b/devtools/bom-descriptor-json/pom.xml index fdbe228c16269..1d3facea6ce51 100644 --- a/devtools/bom-descriptor-json/pom.xml +++ b/devtools/bom-descriptor-json/pom.xml @@ -213,6 +213,19 @@ + + io.quarkus + quarkus-apicurio-registry-protobuf + ${project.version} + pom + test + + + * + * + + + io.quarkus quarkus-arc diff --git a/docs/pom.xml b/docs/pom.xml index 080a5326bfa27..33917d46eaf08 100644 --- a/docs/pom.xml +++ b/docs/pom.xml @@ -229,6 +229,19 @@ + + io.quarkus + quarkus-apicurio-registry-protobuf-deployment + ${project.version} + pom + test + + + * + * + + + io.quarkus quarkus-arc-deployment diff --git a/extensions/schema-registry/apicurio/pom.xml b/extensions/schema-registry/apicurio/pom.xml index 48249736defb5..1de3e8ff5643f 100644 --- a/extensions/schema-registry/apicurio/pom.xml +++ b/extensions/schema-registry/apicurio/pom.xml @@ -18,5 +18,6 @@ common avro json-schema + protobuf diff --git a/extensions/schema-registry/apicurio/protobuf/pom.xml b/extensions/schema-registry/apicurio/protobuf/pom.xml index d2e6866f46a43..45a1f5cdd660a 100644 --- a/extensions/schema-registry/apicurio/protobuf/pom.xml +++ b/extensions/schema-registry/apicurio/protobuf/pom.xml @@ -19,7 +19,7 @@ kotlinx-serialization-core-jvm org.jetbrains.kotlinx - 1.6.0 + 1.6.1 diff --git a/integration-tests/kafka-protobuf-apicurio2/pom.xml b/integration-tests/kafka-protobuf-apicurio2/pom.xml new file mode 100644 index 0000000000000..6766f47685a65 --- /dev/null +++ b/integration-tests/kafka-protobuf-apicurio2/pom.xml @@ -0,0 +1,269 @@ + + + + quarkus-integration-tests-parent + io.quarkus + 999-SNAPSHOT + + 4.0.0 + + quarkus-integration-test-kafka-protobuf-apicurio2 + Quarkus - Integration Tests - Kafka Protobuf with Apicurio 2.x + The Apache Kafka Protobuf with Apicurio Registry 2.x integration tests module + + + + + kotlinx-serialization-core-jvm + org.jetbrains.kotlinx + 1.6.0 + + + + + + + io.quarkus + quarkus-integration-test-class-transformer + + + io.quarkus + quarkus-integration-test-shared-library + + + + + io.quarkus + quarkus-resteasy + + + io.quarkus + quarkus-resteasy-jackson + + + + + io.quarkus + quarkus-kafka-client + + + + com.fasterxml.jackson.dataformat + jackson-dataformat-csv + + + + io.quarkus + quarkus-apicurio-registry-protobuf + + + + + io.quarkus + quarkus-junit5 + test + + + io.rest-assured + rest-assured + test + + + jakarta.xml.bind + jakarta.xml.bind-api + + + + + io.strimzi + strimzi-test-container + test + + + org.apache.logging.log4j + log4j-core + + + + + org.testcontainers + testcontainers + test + + + + + io.quarkus + quarkus-integration-test-class-transformer-deployment + ${project.version} + pom + test + + + * + * + + + + + io.quarkus + quarkus-kafka-client-deployment + ${project.version} + pom + test + + + * + * + + + + + io.quarkus + quarkus-resteasy-deployment + ${project.version} + pom + test + + + * + * + + + + + io.quarkus + quarkus-resteasy-jackson-deployment + ${project.version} + pom + test + + + * + * + + + + + io.quarkus + quarkus-apicurio-registry-protobuf-deployment + ${project.version} + pom + test + + + * + * + + + + + + + + confluent + https://packages.confluent.io/maven/ + + false + + + + + + + + io.quarkus + quarkus-maven-plugin + + + + generate-code + build + + + + + + + kr.motd.maven + os-maven-plugin + 1.7.1 + + + initialize + + detect + + + + + + + org.xolstice.maven.plugins + protobuf-maven-plugin + 0.6.1 + true + + + gencode + generate-sources + + compile + test-compile + + + ./src/main/proto + + com.google.protobuf:protoc:${protobuf-java.version}:exe:${os.detected.classifier} + + + + + + + + maven-failsafe-plugin + + true + + + + + maven-surefire-plugin + + true + + + + + + + + test-kafka + + + test-containers + + + + + + maven-surefire-plugin + + false + + + + maven-failsafe-plugin + + false + + + + + + + + diff --git a/integration-tests/kafka-protobuf-apicurio2/src/main/java/io/quarkus/it/kafka/protobuf/Pet.java b/integration-tests/kafka-protobuf-apicurio2/src/main/java/io/quarkus/it/kafka/protobuf/Pet.java new file mode 100644 index 0000000000000..97eb9b1696118 --- /dev/null +++ b/integration-tests/kafka-protobuf-apicurio2/src/main/java/io/quarkus/it/kafka/protobuf/Pet.java @@ -0,0 +1,31 @@ +package io.quarkus.it.kafka.protobuf; + +public class Pet { + + private String name; + private String color; + + public Pet() { + } + + public Pet(String name, String color) { + this.name = name; + this.color = color; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public String getColor() { + return color; + } + + public void setColor(String color) { + this.color = color; + } +} diff --git a/integration-tests/kafka-protobuf-apicurio2/src/main/java/io/quarkus/it/kafka/protobuf/ProtobufEndpoint.java b/integration-tests/kafka-protobuf-apicurio2/src/main/java/io/quarkus/it/kafka/protobuf/ProtobufEndpoint.java new file mode 100644 index 0000000000000..15244de51be67 --- /dev/null +++ b/integration-tests/kafka-protobuf-apicurio2/src/main/java/io/quarkus/it/kafka/protobuf/ProtobufEndpoint.java @@ -0,0 +1,65 @@ +package io.quarkus.it.kafka.protobuf; + +import java.time.Duration; + +import jakarta.inject.Inject; +import jakarta.ws.rs.GET; +import jakarta.ws.rs.POST; +import jakarta.ws.rs.Path; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; + +import io.vertx.core.json.JsonObject; + +/** + * Endpoint to test the Protobuf support + */ +@Path("/protobuf") +public class ProtobufEndpoint { + + @Inject + ProtobufKafkaCreator creator; + + @GET + @Path("/apicurio") + public JsonObject getApicurio() { + return get(creator.createApicurioConsumer("test-protobuf-apicurio-consumer", "test-protobuf-apicurio-consumer")); + } + + @POST + @Path("/apicurio") + public void sendApicurio(io.quarkus.it.kafka.protobuf.Pet pet) { + KafkaProducer p = creator + .createApicurioProducer("test-protobuf-apicurio"); + send(p, pet, "test-protobuf-apicurio-producer"); + } + + private JsonObject get(KafkaConsumer consumer) { + final ConsumerRecords records = consumer + .poll(Duration.ofMillis(60000)); + if (records.isEmpty()) { + return null; + } + ConsumerRecord consumerRecord = records.iterator().next(); + com.example.tutorial.PetOuterClass.Pet p = consumerRecord.value(); + // We cannot serialize the returned Pet directly, it contains non-serializable object such as the schema. + JsonObject result = new JsonObject(); + result.put("name", p.getName()); + result.put("color", p.getColor()); + return result; + } + + private void send(KafkaProducer producer, Pet pet, String topic) { + com.example.tutorial.PetOuterClass.Pet protoPet = com.example.tutorial.PetOuterClass.Pet.newBuilder() + .setColor(pet.getColor()) + .setName(pet.getName()) + .build(); + + producer.send(new ProducerRecord<>(topic, 0, protoPet)); + producer.flush(); + } +} diff --git a/integration-tests/kafka-protobuf-apicurio2/src/main/java/io/quarkus/it/kafka/protobuf/ProtobufKafkaCreator.java b/integration-tests/kafka-protobuf-apicurio2/src/main/java/io/quarkus/it/kafka/protobuf/ProtobufKafkaCreator.java new file mode 100644 index 0000000000000..f576b763183ed --- /dev/null +++ b/integration-tests/kafka-protobuf-apicurio2/src/main/java/io/quarkus/it/kafka/protobuf/ProtobufKafkaCreator.java @@ -0,0 +1,117 @@ +package io.quarkus.it.kafka.protobuf; + +import java.util.Collections; +import java.util.Properties; +import java.util.UUID; + +import jakarta.enterprise.context.ApplicationScoped; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.IntegerDeserializer; +import org.apache.kafka.common.serialization.IntegerSerializer; +import org.eclipse.microprofile.config.inject.ConfigProperty; + +import com.example.tutorial.PetOuterClass.Pet; + +import io.apicurio.registry.serde.SerdeConfig; +import io.apicurio.registry.serde.protobuf.ProtobufKafkaDeserializer; +import io.apicurio.registry.serde.protobuf.ProtobufKafkaSerializer; + +/** + * Create Protobuf Kafka Consumers and Producers + */ +@ApplicationScoped +public class ProtobufKafkaCreator { + + @ConfigProperty(name = "kafka.bootstrap.servers") + String bootstrap; + + @ConfigProperty(name = "mp.messaging.connector.smallrye-kafka.apicurio.registry.url") + String apicurioRegistryUrl; + + public ProtobufKafkaCreator() { + } + + public ProtobufKafkaCreator(String bootstrap, String apicurioRegistryUrl) { + this.bootstrap = bootstrap; + this.apicurioRegistryUrl = apicurioRegistryUrl; + } + + public String getApicurioRegistryUrl() { + return apicurioRegistryUrl; + } + + public KafkaConsumer createApicurioConsumer(String groupdIdConfig, String subscribtionName) { + return createApicurioConsumer(bootstrap, getApicurioRegistryUrl(), groupdIdConfig, subscribtionName); + } + + public KafkaProducer createApicurioProducer(String clientId) { + return createApicurioProducer(bootstrap, getApicurioRegistryUrl(), clientId); + } + + public static KafkaConsumer createApicurioConsumer(String bootstrap, String apicurio, + String groupdIdConfig, String subscribtionName) { + Properties p = getApicurioConsumerProperties(bootstrap, apicurio, groupdIdConfig); + return createConsumer(p, subscribtionName); + } + + public static KafkaProducer createApicurioProducer(String bootstrap, String apicurio, + String clientId) { + Properties p = getApicurioProducerProperties(bootstrap, apicurio, clientId); + return createProducer(p); + } + + private static KafkaConsumer createConsumer(Properties props, String subscribtionName) { + if (!props.containsKey(ConsumerConfig.CLIENT_ID_CONFIG)) { + props.put(ConsumerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString()); + } + KafkaConsumer consumer = new KafkaConsumer<>(props); + consumer.subscribe(Collections.singletonList(subscribtionName)); + return consumer; + } + + private static KafkaProducer createProducer(Properties props) { + if (!props.containsKey(ProducerConfig.CLIENT_ID_CONFIG)) { + props.put(ProducerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString()); + } + return new KafkaProducer<>(props); + } + + public static Properties getApicurioConsumerProperties(String bootstrap, String apicurio, String groupdIdConfig) { + Properties props = getGenericConsumerProperties(bootstrap, groupdIdConfig); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ProtobufKafkaDeserializer.class.getName()); + props.put(SerdeConfig.REGISTRY_URL, apicurio); + return props; + } + + private static Properties getGenericConsumerProperties(String bootstrap, String groupdIdConfig) { + Properties props = new Properties(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap); + props.put(ConsumerConfig.GROUP_ID_CONFIG, groupdIdConfig); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName()); + props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000"); + return props; + } + + private static Properties getApicurioProducerProperties(String bootstrap, String apicurio, String clientId) { + Properties props = getGenericProducerProperties(bootstrap, clientId); + props.put(ProducerConfig.ACKS_CONFIG, "all"); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ProtobufKafkaSerializer.class.getName()); + props.put(SerdeConfig.REGISTRY_URL, apicurio); + props.put(SerdeConfig.AUTO_REGISTER_ARTIFACT, true); + return props; + } + + private static Properties getGenericProducerProperties(String bootstrap, String clientId) { + Properties props = new Properties(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap); + props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName()); + return props; + } +} diff --git a/integration-tests/kafka-protobuf-apicurio2/src/main/proto/pet.proto b/integration-tests/kafka-protobuf-apicurio2/src/main/proto/pet.proto new file mode 100644 index 0000000000000..548f4bb384692 --- /dev/null +++ b/integration-tests/kafka-protobuf-apicurio2/src/main/proto/pet.proto @@ -0,0 +1,10 @@ +syntax = "proto3"; + +package tutorial; + +option java_package = "com.example.tutorial"; + +message Pet { + string name = 1; + string color = 2; +} \ No newline at end of file diff --git a/integration-tests/kafka-protobuf-apicurio2/src/main/resources/application.properties b/integration-tests/kafka-protobuf-apicurio2/src/main/resources/application.properties new file mode 100644 index 0000000000000..da7eb6b7cfb26 --- /dev/null +++ b/integration-tests/kafka-protobuf-apicurio2/src/main/resources/application.properties @@ -0,0 +1,8 @@ +quarkus.log.category.kafka.level=WARN +quarkus.log.category.\"org.apache.kafka\".level=WARN +quarkus.log.category.\"org.apache.zookeeper\".level=WARN + +# enable health check +quarkus.kafka.health.enabled=true + +quarkus.apicurio-registry.devservices.image-name=quay.io/apicurio/apicurio-registry-mem:2.4.2.Final diff --git a/integration-tests/kafka-protobuf-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaProtobufIT.java b/integration-tests/kafka-protobuf-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaProtobufIT.java new file mode 100644 index 0000000000000..1d14f92f50493 --- /dev/null +++ b/integration-tests/kafka-protobuf-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaProtobufIT.java @@ -0,0 +1,29 @@ +package io.quarkus.it.kafka; + +import org.junit.jupiter.api.BeforeAll; + +import io.apicurio.registry.rest.client.RegistryClientFactory; +import io.apicurio.rest.client.VertxHttpClientProvider; +import io.quarkus.it.kafka.protobuf.ProtobufKafkaCreator; +import io.quarkus.test.common.QuarkusTestResource; +import io.quarkus.test.junit.QuarkusIntegrationTest; +import io.vertx.core.Vertx; + +@QuarkusIntegrationTest +@QuarkusTestResource(value = KafkaResource.class, restrictToAnnotatedClass = true) +public class KafkaProtobufIT extends KafkaProtobufTestBase { + + ProtobufKafkaCreator creator; + + @Override + ProtobufKafkaCreator creator() { + return creator; + } + + @BeforeAll + public static void setUp() { + // this is for the test JVM, which also uses Kafka client, which in turn also interacts with the registry + RegistryClientFactory.setProvider(new VertxHttpClientProvider(Vertx.vertx())); + } + +} diff --git a/integration-tests/kafka-protobuf-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaProtobufTest.java b/integration-tests/kafka-protobuf-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaProtobufTest.java new file mode 100644 index 0000000000000..b632a7df73205 --- /dev/null +++ b/integration-tests/kafka-protobuf-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaProtobufTest.java @@ -0,0 +1,18 @@ +package io.quarkus.it.kafka; + +import jakarta.inject.Inject; + +import io.quarkus.it.kafka.protobuf.ProtobufKafkaCreator; +import io.quarkus.test.junit.QuarkusTest; + +@QuarkusTest +public class KafkaProtobufTest extends KafkaProtobufTestBase { + + @Inject + ProtobufKafkaCreator creator; + + @Override + ProtobufKafkaCreator creator() { + return creator; + } +} diff --git a/integration-tests/kafka-protobuf-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaProtobufTestBase.java b/integration-tests/kafka-protobuf-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaProtobufTestBase.java new file mode 100644 index 0000000000000..ed87f1cfa083b --- /dev/null +++ b/integration-tests/kafka-protobuf-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaProtobufTestBase.java @@ -0,0 +1,66 @@ +package io.quarkus.it.kafka; + +import java.time.Duration; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import com.example.tutorial.PetOuterClass.Pet; + +import io.quarkus.it.kafka.protobuf.ProtobufKafkaCreator; +import io.restassured.RestAssured; + +public abstract class KafkaProtobufTestBase { + + static final String APICURIO_PATH = "/protobuf/apicurio"; + + abstract ProtobufKafkaCreator creator(); + + @Test + public void testUrls() { + Assertions.assertTrue(creator().getApicurioRegistryUrl().endsWith("/apis/registry/v2")); + } + + @Test + public void testApicurioProtobufProducer() { + KafkaConsumer consumer = creator().createApicurioConsumer( + "test-protobuf-apicurio", + "test-protobuf-apicurio-producer"); + testProtobufProducer(consumer, APICURIO_PATH); + } + + @Test + public void testApicurioProtobufConsumer() { + KafkaProducer producer = creator().createApicurioProducer("test-protobuf-apicurio-test"); + testProtobufConsumer(producer, APICURIO_PATH, "test-protobuf-apicurio-consumer"); + } + + protected void testProtobufProducer(KafkaConsumer consumer, String path) { + RestAssured.given() + .header("content-type", "application/json") + .body("{\"name\":\"neo\", \"color\":\"tricolor\"}") + .post(path); + + ConsumerRecord records = consumer.poll(Duration.ofMillis(20000)).iterator().next(); + Assertions.assertEquals(records.key(), (Integer) 0); + Pet pet = records.value(); + Assertions.assertEquals("neo", pet.getName()); + Assertions.assertEquals("tricolor", pet.getColor()); + consumer.close(); + } + + protected void testProtobufConsumer(KafkaProducer producer, String path, String topic) { + producer.send(new ProducerRecord<>(topic, 1, createPet())); + producer.close(); + } + + private Pet createPet() { + return Pet.newBuilder() + .setName("neo") + .setColor("white").build(); + } +} diff --git a/integration-tests/kafka-protobuf-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaResource.java b/integration-tests/kafka-protobuf-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaResource.java new file mode 100644 index 0000000000000..edbd3647cb306 --- /dev/null +++ b/integration-tests/kafka-protobuf-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaResource.java @@ -0,0 +1,39 @@ +package io.quarkus.it.kafka; + +import java.util.Collections; +import java.util.Map; + +import io.quarkus.it.kafka.protobuf.ProtobufKafkaCreator; +import io.quarkus.test.common.DevServicesContext; +import io.quarkus.test.common.QuarkusTestResourceLifecycleManager; + +public class KafkaResource implements QuarkusTestResourceLifecycleManager, DevServicesContext.ContextAware { + + ProtobufKafkaCreator creator; + + @Override + public void setIntegrationTestContext(DevServicesContext context) { + Map devServicesProperties = context.devServicesProperties(); + String bootstrapServers = devServicesProperties.get("kafka.bootstrap.servers"); + if (bootstrapServers != null) { + String apicurioUrl = devServicesProperties.get("mp.messaging.connector.smallrye-kafka.apicurio.registry.url"); + creator = new ProtobufKafkaCreator(bootstrapServers, apicurioUrl); + } + } + + @Override + public Map start() { + return Collections.emptyMap(); + } + + @Override + public void stop() { + } + + @Override + public void inject(TestInjector testInjector) { + testInjector.injectIntoFields( + creator, + new TestInjector.MatchesType(ProtobufKafkaCreator.class)); + } +} diff --git a/integration-tests/pom.xml b/integration-tests/pom.xml index 7b9728847c268..86456858db167 100644 --- a/integration-tests/pom.xml +++ b/integration-tests/pom.xml @@ -222,6 +222,7 @@ kafka-snappy kafka-avro-apicurio2 kafka-json-schema-apicurio2 + kafka-protobuf-apicurio2 kafka-streams kafka-devservices jpa