
Update Parquet Format version to address NoClassDefFoundError #273

Merged (1 commit) on Sep 27, 2022

Conversation

@sidd1809 (Contributor) commented Sep 27, 2022

Problem

We are observing confluentinc/kafka-connect-storage-cloud#553 (a NoClassDefFoundError at runtime) after the Parquet version was upgraded to 1.12.3 in PR #259.

Solution

  • Change the Parquet version to 1.11.2, the next patch release that addresses the CVEs present in 1.11.1 (a sketch of the bump follows this list)
  • Tested locally against the S3 sink with a snapshot build and was able to write to S3 in Parquet format
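
A minimal sketch of the bump itself, assuming the Parquet version is pinned by a parquet.version property in the pom (the property name is an assumption, not confirmed by this PR):

# versions-maven-plugin can set the property directly; adjust the
# property name to whatever actually pins Parquet in this pom.
mvn versions:set-property -Dproperty=parquet.version -DnewVersion=1.11.2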

Reproducing the NoClassDefFoundError:

➜  connect-aws-s3-sink git:(master) kafka-avro-console-producer --broker-list localhost:9092 --property schema.registry.url=http://localhost:8081 --topic s3_topic --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"u_name","type":"string"}, {"name":"u_quantity", "type": "int"}]}' << EOF
{"u_name": "scissors3", "u_quantity": 11}
{"u_name": "fudge", "u_quantity": 12}
EOF
➜  connect-aws-s3-sink git:(master) statusConnector s3-sink
{
  "name": "s3-sink",
  "connector": {
    "state": "RUNNING",
    "worker_id": "192.168.1.6:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "FAILED",
      "worker_id": "192.168.1.6:8083",
      "trace": "org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:618)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:334)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:235)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:204)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:200)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:255)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:750)\nCaused by: java.lang.NoClassDefFoundError: org/apache/hadoop/mapreduce/lib/output/FileOutputFormat\n\tat java.lang.ClassLoader.defineClass1(Native Method)\n\tat java.lang.ClassLoader.defineClass(ClassLoader.java:756)\n\tat java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)\n\tat java.net.URLClassLoader.defineClass(URLClassLoader.java:473)\n\tat java.net.URLClassLoader.access$100(URLClassLoader.java:74)\n\tat java.net.URLClassLoader$1.run(URLClassLoader.java:369)\n\tat java.net.URLClassLoader$1.run(URLClassLoader.java:363)\n\tat java.security.AccessController.doPrivileged(Native Method)\n\tat java.net.URLClassLoader.findClass(URLClassLoader.java:362)\n\tat org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:95)\n\tat java.lang.ClassLoader.loadClass(ClassLoader.java:351)\n\tat org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:285)\n\tat org.apache.parquet.hadoop.ParquetWriter$Builder.build(ParquetWriter.java:675)\n\tat io.confluent.connect.s3.format.parquet.ParquetRecordWriterProvider$1.write(ParquetRecordWriterProvider.java:102)\n\tat io.confluent.connect.s3.format.S3RetriableRecordWriter.write(S3RetriableRecordWriter.java:46)\n\tat io.confluent.connect.s3.format.KeyValueHeaderRecordWriterProvider$1.write(KeyValueHeaderRecordWriterProvider.java:107)\n\tat io.confluent.connect.s3.TopicPartitionWriter.writeRecord(TopicPartitionWriter.java:562)\n\tat io.confluent.connect.s3.TopicPartitionWriter.checkRotationOrAppend(TopicPartitionWriter.java:311)\n\tat io.confluent.connect.s3.TopicPartitionWriter.executeState(TopicPartitionWriter.java:254)\n\tat io.confluent.connect.s3.TopicPartitionWriter.write(TopicPartitionWriter.java:205)\n\tat io.confluent.connect.s3.S3SinkTask.put(S3SinkTask.java:234)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:584)\n\t... 10 more\nCaused by: java.lang.ClassNotFoundException: org.apache.hadoop.mapreduce.lib.output.FileOutputFormat\n\tat java.net.URLClassLoader.findClass(URLClassLoader.java:387)\n\tat java.lang.ClassLoader.loadClass(ClassLoader.java:418)\n\tat org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:103)\n\tat java.lang.ClassLoader.loadClass(ClassLoader.java:351)\n\t... 32 more\n"
    }
  ],
  "type": "sink"
}
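
The trace shows org.apache.hadoop.mapreduce.lib.output.FileOutputFormat failing to load from the connector's PluginClassLoader, suggesting the Hadoop mapreduce classes are not on the plugin's class path. A quick hedged check (the install path below is an assumption; substitute your own plugin.path):

# Scan the plugin's bundled jars for the class the trace reports missing.
for jar in /usr/share/java/kafka-connect-s3/*.jar; do
  unzip -l "$jar" 2>/dev/null \
    | grep -q 'org/apache/hadoop/mapreduce/lib/output/FileOutputFormat' \
    && echo "found in: $jar"
done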

With the fix in place, the NoClassDefFoundError is gone and files are pushed to S3:

➜  kafka-connect-s3 git:(10.0.x) ✗ kafka-avro-console-producer --broker-list localhost:9092 --property schema.registry.url=http://localhost:8081 --topic s3_topic --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"u_name","type":"string"}, {"name":"u_quantity", "type": "int"}]}' << EOF
{"u_name": "scissors3", "u_quantity": 11}
{"u_name": "fudge", "u_quantity": 12}
EOF
➜  kafka-connect-s3 git:(10.0.x) ✗ statusConnector s3-sink
{
  "name": "s3-sink",
  "connector": {
    "state": "RUNNING",
    "worker_id": "192.168.1.6:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "192.168.1.6:8083"
    }
  ],
  "type": "sink"
}
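
To double-check the objects on the S3 side (the bucket name below is a placeholder, not taken from this setup):

aws s3 ls s3://<bucket>/topics/s3_topic/ --recursive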

➜  kafka-connect-s3 git:(10.0.x) ✗ mvn dependency:tree | grep parquet
[INFO] +- org.apache.parquet:parquet-tools:jar:1.11.1:test
[INFO] |  +- org.apache.parquet:parquet-column:jar:1.11.2:compile
[INFO] |  |  +- org.apache.parquet:parquet-common:jar:1.11.2:compile
[INFO] |  |  \- org.apache.parquet:parquet-encoding:jar:1.11.2:compile
[INFO] |  \- org.apache.parquet:parquet-avro:jar:1.11.2:compile
[INFO] |     +- org.apache.parquet:parquet-hadoop:jar:1.11.2:compile
[INFO] |     \- org.apache.parquet:parquet-format-structures:jar:1.11.2:compile

Does this solution apply anywhere else?
  • yes -> bump the parent in the S3 sink (a hedged sketch follows below)
  • no
If yes, where?
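
If the parent bump route is taken, a minimal sketch with the versions-maven-plugin (assuming the S3 sink inherits its Parquet pin from the parent pom):

mvn versions:update-parent                           # move to the latest parent release
mvn dependency:tree -Dincludes=org.apache.parquet    # confirm 1.11.2 resolves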

Test Strategy

Testing done:
  • Unit tests
  • Integration tests
  • System tests
  • Manual tests

Release Plan

Release as a patch version

@sidd1809 requested a review from a team as a code owner on September 27, 2022, 14:07
@CLAassistant commented

CLA assistant check: Thank you for your submission! We really appreciate it. Like many open source projects, we ask that you sign our Contributor License Agreement before we can accept your contribution.

@sudeshwasnik (Member) left a comment

thanks! lgtm!
