diff --git a/common/common/src/main/java/io/helidon/common/FeatureCatalog.java b/common/common/src/main/java/io/helidon/common/FeatureCatalog.java index 494a6cbef4a..7163f2f718b 100644 --- a/common/common/src/main/java/io/helidon/common/FeatureCatalog.java +++ b/common/common/src/main/java/io/helidon/common/FeatureCatalog.java @@ -361,7 +361,7 @@ final class FeatureCatalog { .description("Reactive messaging connector for Kafka") .path("Messaging", "Kafka") .experimental(true) - .nativeSupported(false)); + .nativeSupported(true)); add("io.helidon.messaging.connectors.jms", FeatureDescriptor.builder() .name("JMS Connector") diff --git a/examples/messaging/docker/kafka/Dockerfile.kafka b/examples/messaging/docker/kafka/Dockerfile.kafka index 10654a0086d..82ffa4cc426 100644 --- a/examples/messaging/docker/kafka/Dockerfile.kafka +++ b/examples/messaging/docker/kafka/Dockerfile.kafka @@ -18,7 +18,7 @@ FROM openjdk:8-jre-alpine ENV VERSION=2.5.0 -RUN apk add --no-cache bash curl jq +RUN apk add --no-cache bash curl jq gcompat # Find closest mirror, download and extract Kafka RUN MIRROR=$(curl -s 'https://www.apache.org/dyn/closer.cgi?as_json=1' | jq -r '.http[0]') \ diff --git a/messaging/kafka/pom.xml b/messaging/kafka/pom.xml index 0f9d3d96494..f1fa29d03a8 100644 --- a/messaging/kafka/pom.xml +++ b/messaging/kafka/pom.xml @@ -64,6 +64,11 @@ provided true + + org.graalvm.nativeimage + svm + provided + org.apache.kafka kafka-clients diff --git a/messaging/kafka/src/main/java/io/helidon/messaging/connectors/kafka/AppInfoParserSubstitution.java b/messaging/kafka/src/main/java/io/helidon/messaging/connectors/kafka/AppInfoParserSubstitution.java new file mode 100644 index 00000000000..dfbe19ec610 --- /dev/null +++ b/messaging/kafka/src/main/java/io/helidon/messaging/connectors/kafka/AppInfoParserSubstitution.java @@ -0,0 +1,39 @@ +/* + * Copyright (c) 2020 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package io.helidon.messaging.connectors.kafka; + +import com.oracle.svm.core.annotate.Substitute; +import com.oracle.svm.core.annotate.TargetClass; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.utils.AppInfoParser; + +/** + * JMX not supported in native-image. + */ +@TargetClass(AppInfoParser.class) +@SuppressWarnings("checkstyle:HideUtilityClassConstructor") +final class AppInfoParserSubstitution { + + @Substitute + public static void registerAppInfo(String p, String i, Metrics m, long n) { + } + + @Substitute + public static void unregisterAppInfo(String p, String i, Metrics m) { + } +} diff --git a/messaging/kafka/src/main/java/io/helidon/messaging/connectors/kafka/CompressionTypeSubstitution.java b/messaging/kafka/src/main/java/io/helidon/messaging/connectors/kafka/CompressionTypeSubstitution.java new file mode 100644 index 00000000000..5a6c84ad596 --- /dev/null +++ b/messaging/kafka/src/main/java/io/helidon/messaging/connectors/kafka/CompressionTypeSubstitution.java @@ -0,0 +1,156 @@ +/* + * Copyright (c) 2020 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package io.helidon.messaging.connectors.kafka; + +import java.io.InputStream; +import java.io.OutputStream; +import java.lang.reflect.Constructor; +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.nio.ByteBuffer; + +import io.helidon.common.LazyValue; + +import com.oracle.svm.core.annotate.Substitute; +import com.oracle.svm.core.annotate.TargetClass; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.record.BufferSupplier; +import org.apache.kafka.common.utils.ByteBufferInputStream; +import org.apache.kafka.common.utils.ByteBufferOutputStream; + +/** + * Helper for creating ZSTD or SNAPPY compression stream wrappers without method handles. + */ +@SuppressWarnings("checkstyle:OuterTypeFilename") +final class CompressionTypeHelper { + + private CompressionTypeHelper() { + } + + private static boolean zstdNativeLibLoaded = false; + + static final LazyValue> LAZY_INPUT_ZSTD = + LazyValue.create(() -> findConstructor("com.github.luben.zstd.ZstdInputStream", InputStream.class)); + static final LazyValue> LAZY_OUTPUT_ZSTD = + LazyValue.create(() -> findConstructor("com.github.luben.zstd.ZstdOutputStream", OutputStream.class)); + static final LazyValue> LAZY_INPUT_SNAPPY = + LazyValue.create(() -> findConstructor("org.xerial.snappy.SnappyInputStream", InputStream.class)); + static final LazyValue> LAZY_OUTPUT_SNAPPY = + LazyValue.create(() -> findConstructor("org.xerial.snappy.SnappyOutputStream", OutputStream.class)); + + static OutputStream snappyOutputStream(OutputStream orig) { + try { + return (OutputStream) LAZY_OUTPUT_SNAPPY.get().newInstance(orig); + } catch (KafkaException e) { + throw e; + } catch (Exception e) { + throw new KafkaException(e); + } + } + + static InputStream snappyInputStream(ByteBuffer orig) { + try { + return (InputStream) LAZY_INPUT_SNAPPY.get().newInstance(new ByteBufferInputStream(orig)); + } catch (KafkaException e) { + throw e; + } catch (Exception e) { + throw new KafkaException(e); + } + } + + static void zstdLoadNativeLibs() throws ReflectiveOperationException { + // loading jni libs in static blocks is not supported + // see https://github.com/oracle/graal/issues/439#issuecomment-394341725 + if (!zstdNativeLibLoaded) { + Class clazz = Class.forName("com.github.luben.zstd.util.Native"); + Field loadedField = clazz.getDeclaredField("loaded"); + loadedField.setAccessible(true); + loadedField.setBoolean(null, false); + Method loadMethod = clazz.getDeclaredMethod("load"); + loadMethod.invoke(null); + zstdNativeLibLoaded = true; + } + } + + static OutputStream zstdOutputStream(OutputStream orig) { + try { + zstdLoadNativeLibs(); + return (OutputStream) LAZY_OUTPUT_ZSTD.get().newInstance(orig); + } catch (KafkaException e) { + throw e; + } catch (Exception e) { + throw new KafkaException(e); + } + } + + static InputStream zstdInputStream(ByteBuffer orig) { + try { + zstdLoadNativeLibs(); + return (InputStream) LAZY_INPUT_ZSTD.get().newInstance(new ByteBufferInputStream(orig)); + } catch (KafkaException e) { + throw e; + } catch (Exception e) { + throw new KafkaException(e); + } + } + + static Constructor findConstructor(String className, Class... paramTypes) { + try { + return Class.forName(className) + .getDeclaredConstructor(paramTypes); + } catch (NoSuchMethodException | ClassNotFoundException e) { + throw new KafkaException(e); + } + } +} + +/** + * Substitution for {@link org.apache.kafka.common.record.CompressionType#SNAPPY CompressionType.SNAPPY}. + */ +@TargetClass(className = "org.apache.kafka.common.record.CompressionType$3") +@SuppressWarnings("checkstyle:OneTopLevelClass") +final class SnappySubstitution { + + @Substitute + public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion) { + return CompressionTypeHelper.snappyOutputStream(buffer); + } + + @Substitute + public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) { + return CompressionTypeHelper.snappyInputStream(buffer); + } +} + +/** + * Substitution for {@link org.apache.kafka.common.record.CompressionType#ZSTD CompressionType.ZSTD}. + */ +@TargetClass(className = "org.apache.kafka.common.record.CompressionType$5") +@SuppressWarnings("checkstyle:OneTopLevelClass") +final class ZstdSubstitution { + + @Substitute + public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion) { + return CompressionTypeHelper.zstdOutputStream(buffer); + } + + @Substitute + public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) { + return CompressionTypeHelper.zstdInputStream(buffer); + } +} diff --git a/messaging/kafka/src/main/java/io/helidon/messaging/connectors/kafka/Crc32CSubstitution.java b/messaging/kafka/src/main/java/io/helidon/messaging/connectors/kafka/Crc32CSubstitution.java new file mode 100644 index 00000000000..834d451a4c7 --- /dev/null +++ b/messaging/kafka/src/main/java/io/helidon/messaging/connectors/kafka/Crc32CSubstitution.java @@ -0,0 +1,58 @@ +/* + * Copyright (c) 2020 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package io.helidon.messaging.connectors.kafka; + +import java.nio.ByteBuffer; +import java.util.zip.Checksum; + +import com.oracle.svm.core.annotate.Substitute; +import com.oracle.svm.core.annotate.TargetClass; +import org.apache.kafka.common.utils.Checksums; + + +/** + * Method handles are not supported by native-image, + * invoke {@link java.util.zip.CRC32C CRC32C} directly. + * + * Helidon runs only on Java 11 and newer, {@link java.util.zip.CRC32C CRC32C} + * doesn't have to be instantiated by method handles. + */ +@TargetClass(org.apache.kafka.common.utils.Crc32C.class) +@Substitute +@SuppressWarnings("checkstyle:HideUtilityClassConstructor") +final class Crc32CSubstitution { + + @Substitute + public static long compute(byte[] bytes, int offset, int size) { + Checksum crc = create(); + crc.update(bytes, offset, size); + return crc.getValue(); + } + + @Substitute + public static long compute(ByteBuffer buffer, int offset, int size) { + Checksum crc = create(); + Checksums.update(crc, buffer, offset, size); + return crc.getValue(); + } + + @Substitute + public static Checksum create() { + return new java.util.zip.CRC32C(); + } +} diff --git a/messaging/kafka/src/main/java/io/helidon/messaging/connectors/kafka/JmxReporterSubstitution.java b/messaging/kafka/src/main/java/io/helidon/messaging/connectors/kafka/JmxReporterSubstitution.java new file mode 100644 index 00000000000..ec0c2c63212 --- /dev/null +++ b/messaging/kafka/src/main/java/io/helidon/messaging/connectors/kafka/JmxReporterSubstitution.java @@ -0,0 +1,40 @@ +/* + * Copyright (c) 2020 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package io.helidon.messaging.connectors.kafka; + +import com.oracle.svm.core.annotate.Substitute; +import com.oracle.svm.core.annotate.TargetClass; +import org.apache.kafka.common.metrics.KafkaMetric; + +/** + * JMX not supported in native-image. + */ +@TargetClass(org.apache.kafka.common.metrics.JmxReporter.class) +final class JmxReporterSubstitution { + + @Substitute + private Object addAttribute(KafkaMetric metric) { + return null; + } + + @Substitute + public void metricChange(KafkaMetric metric) { + } + +} + diff --git a/messaging/kafka/src/main/java/module-info.java b/messaging/kafka/src/main/java/module-info.java index 98342b59e10..21c3c8b6e38 100644 --- a/messaging/kafka/src/main/java/module-info.java +++ b/messaging/kafka/src/main/java/module-info.java @@ -30,6 +30,7 @@ requires io.helidon.common.configurable; requires io.helidon.messaging; requires microprofile.config.api; + requires static svm; exports io.helidon.messaging.connectors.kafka; } \ No newline at end of file diff --git a/messaging/kafka/src/main/resources/META-INF/helidon/native-image/reflection-config.json b/messaging/kafka/src/main/resources/META-INF/helidon/native-image/reflection-config.json new file mode 100644 index 00000000000..c1ba1188cd7 --- /dev/null +++ b/messaging/kafka/src/main/resources/META-INF/helidon/native-image/reflection-config.json @@ -0,0 +1,19 @@ +{ + "annotated":[ + ], + "class-hierarchy": [ + "org.apache.kafka.common.serialization.Serializer", + "org.apache.kafka.common.serialization.Deserializer", + "org.apache.kafka.clients.consumer.ConsumerPartitionAssignor", + "org.apache.kafka.clients.producer.Partitioner" + ], + "classes": [ + "org.xerial.snappy.SnappyInputStream", + "org.xerial.snappy.SnappyOutputStream", + "com.github.luben.zstd.ZstdInputStream", + "com.github.luben.zstd.ZstdOutputStream", + "com.github.luben.zstd.util.Native" + ], + "exclude": [ + ] +} diff --git a/messaging/kafka/src/main/resources/META-INF/native-image/io.helidon.messaging.connectors.kafka/jni-config.json b/messaging/kafka/src/main/resources/META-INF/native-image/io.helidon.messaging.connectors.kafka/jni-config.json new file mode 100644 index 00000000000..c4b02e82d15 --- /dev/null +++ b/messaging/kafka/src/main/resources/META-INF/native-image/io.helidon.messaging.connectors.kafka/jni-config.json @@ -0,0 +1,45 @@ +[ + { + "name": "com.github.luben.zstd.ZstdInputStream", + "fields": [ + { + "name": "dstPos" + }, + { + "name": "srcPos" + } + ] + }, + { + "name": "com.github.luben.zstd.ZstdOutputStream", + "fields": [ + { + "name": "dstPos" + }, + { + "name": "srcPos" + } + ] + }, + { + "name": "java.lang.ClassLoader", + "methods": [ + { + "name": "getPlatformClassLoader", + "parameterTypes": [] + }, + { + "name": "loadClass", + "parameterTypes": [ + "java.lang.String" + ] + } + ] + }, + { + "name": "java.lang.ClassNotFoundException" + }, + { + "name": "java.lang.NoSuchMethodError" + } +] diff --git a/messaging/kafka/src/main/resources/META-INF/native-image/io.helidon.messaging.connectors.kafka/native-image.properties b/messaging/kafka/src/main/resources/META-INF/native-image/io.helidon.messaging.connectors.kafka/native-image.properties new file mode 100644 index 00000000000..6206d06860b --- /dev/null +++ b/messaging/kafka/src/main/resources/META-INF/native-image/io.helidon.messaging.connectors.kafka/native-image.properties @@ -0,0 +1,25 @@ +# +# Copyright (c) 2020 Oracle and/or its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +Args=--initialize-at-build-time=org.slf4j \ + --initialize-at-build-time=org.xerial.snappy.SnappyInputStream \ + --initialize-at-build-time=org.xerial.snappy.SnappyOutputStream \ + --initialize-at-build-time=com.github.luben.zstd.ZstdInputStream \ + --initialize-at-build-time=com.github.luben.zstd.ZstdOutputStream \ + --initialize-at-build-time=com.github.luben.zstd.util.Native \ + --initialize-at-build-time=org.apache.kafka \ + -H:+JNI \ + -H:IncludeResources=.*\\.so$|.*\\.h$|.*\\.dll$|.*\\.dylib$