
Unable to use zstd compression with parquet #570

Closed
BDeus opened this issue Oct 18, 2022 · 3 comments

Comments


BDeus commented Oct 18, 2022

Related to #316 and #394

I tried to use Parquet with the zstd codec on the latest Confluent OSS Kafka version (7.2.1) + kafka-connect-s3 (10.2.2).
Kafka Connect still needs the native zstd library, which is not provided with the Confluent OSS distribution.

Has anybody succeeded in using it and adding the native Hadoop libraries to their Kafka Connect cluster?
If so, it would be nice to add documentation to this repository explaining how to proceed.
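
For reference, a minimal sink configuration exercising that code path would look roughly like this (topic, bucket and region names are placeholders, and parquet.codec is assumed to be the property selecting the zstd codec):

{
  "name": "s3-parquet-zstd-sink",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max": "1",
    "topics": "my-topic",
    "s3.bucket.name": "my-bucket",
    "s3.region": "eu-west-1",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
    "parquet.codec": "zstd",
    "flush.size": "1000"
  }
}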

2022-10-18T18:12:44.684  ERROR [org.apache.kafka.connect.runtime.WorkerSinkTask] - WorkerSinkTask{id=#####} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: native zStandard library not available: this version of libhadoop was built without zstd support.
java.lang.RuntimeException: native zStandard library not available: this version of libhadoop was built without zstd support.
        at org.apache.hadoop.io.compress.ZStandardCodec.checkNativeCodeLoaded(ZStandardCodec.java:65)
        at org.apache.hadoop.io.compress.ZStandardCodec.getCompressorType(ZStandardCodec.java:153)
        at org.apache.hadoop.io.compress.CodecPool.getCompressor(CodecPool.java:150)
        at org.apache.hadoop.io.compress.CodecPool.getCompressor(CodecPool.java:168)
        at org.apache.parquet.hadoop.CodecFactory$HeapBytesCompressor.<init>(CodecFactory.java:144)
        at org.apache.parquet.hadoop.CodecFactory.createCompressor(CodecFactory.java:206)
        at org.apache.parquet.hadoop.CodecFactory.getCompressor(CodecFactory.java:189)
        at org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:287)
        at org.apache.parquet.hadoop.ParquetWriter$Builder.build(ParquetWriter.java:564)
        at io.confluent.connect.s3.format.parquet.ParquetRecordWriterProvider$1.write(ParquetRecordWriterProvider.java:102)
        at io.confluent.connect.s3.format.S3RetriableRecordWriter.write(S3RetriableRecordWriter.java:46)
        at io.confluent.connect.s3.format.KeyValueHeaderRecordWriterProvider$1.write(KeyValueHeaderRecordWriterProvider.java:107)
        at io.confluent.connect.s3.TopicPartitionWriter.writeRecord(TopicPartitionWriter.java:562)
        at io.confluent.connect.s3.TopicPartitionWriter.checkRotationOrAppend(TopicPartitionWriter.java:311)
        at io.confluent.connect.s3.TopicPartitionWriter.executeState(TopicPartitionWriter.java:254)
        at io.confluent.connect.s3.TopicPartitionWriter.write(TopicPartitionWriter.java:205)
        at io.confluent.connect.s3.S3SinkTask.put(S3SinkTask.java:234)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
BDeus changed the title from "Unable to use zstd compression" to "Unable to use zstd compression with parquet" on Oct 18, 2022

gsanon commented Jan 16, 2023

Hi,

If you are using the confluentinc/cp-kafka-connect image to run your cluster, you can use this workaround:

  • Compile your hadoop lib with zstd support
  • Create a docker image based on confluentinc/cp-kafka-connect
  • Copy your self-compiled hadoop lib into the image
COPY libhadoop.so.1.0.0 /usr/lib64/libhadoop.so.1.0.0
COPY libhadoop.so.1.0.0 /usr/lib64/libhadoop.so
  • Update the kafka-run-class file (located at /usr/bin/kafka-run-class in the base image) by adding -Djava.library.path=/usr/lib64 to the launch commands (after the # Launch mode comment). Without that, the hadoop lib won't be picked up.

Then you should be able to use zstd compression.
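
A minimal Dockerfile sketch of those steps (assuming libhadoop.so.1.0.0 is your self-compiled, zstd-enabled native library sitting next to the Dockerfile, e.g. built from the Hadoop sources with the native profile and -Drequire.zstd; the sed edit is just one way to add the flag to kafka-run-class, and the USER switch back to appuser assumes the Confluent image's default user):

FROM confluentinc/cp-kafka-connect:7.2.1

# Self-compiled Hadoop native library, built with zstd support
COPY libhadoop.so.1.0.0 /usr/lib64/libhadoop.so.1.0.0
COPY libhadoop.so.1.0.0 /usr/lib64/libhadoop.so

# Add -Djava.library.path=/usr/lib64 to the launch command in kafka-run-class
# so the JVM can find the native library (editing the file by hand after the
# "# Launch mode" comment works just as well)
USER root
RUN sed -i 's|exec "\$JAVA"|exec "$JAVA" -Djava.library.path=/usr/lib64|g' /usr/bin/kafka-run-class
USER appuser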

@github-louis-fruleux

Thanks for the pointers @gsanon.
I was able to make it work using the official Hadoop release v3.3.5 and the Docker base image confluentinc/cp-kafka-connect:7.3.1.

Only COPY libhadoop.so.1.0.0 /usr/lib64/libhadoop.so is needed, because of the way JNI works: it only looks for "libhadoop.so", cf. this line of code.

Also, the library path can be passed via
ENV KAFKA_OPTS="${KAFKA_OPTS} -Djava.library.path=/usr/lib64/"
This works because the line that actually launches Kafka Connect is exec "$JAVA" $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp "$CLASSPATH" $KAFKA_OPTS "$@", which lets the developer pass any arguments they want through the KAFKA_OPTS environment variable.
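
Combining both points, the whole workaround shrinks to a short Dockerfile sketch like this (same target path as above; libhadoop.so.1.0.0 taken from the official Hadoop 3.3.5 binary release):

FROM confluentinc/cp-kafka-connect:7.3.1

# Native Hadoop library with zstd support from the Hadoop 3.3.5 release;
# only the libhadoop.so name is needed, since JNI resolves loadLibrary("hadoop") to it
COPY libhadoop.so.1.0.0 /usr/lib64/libhadoop.so

# kafka-run-class passes $KAFKA_OPTS to the java command, so this is enough
# for the JVM to pick up the native library
ENV KAFKA_OPTS="${KAFKA_OPTS} -Djava.library.path=/usr/lib64/"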

It would be simpler to update the parquet version, but I hope this will help future developers looking for the same answer.


BDeus commented Jan 29, 2024

I'm closing the issue.
A parquet update would make this easier, but COPY libhadoop.so.1.0.0 does the trick.
