KSQL Avro format seems to be incompatible with the Avro format expected by Kafka Connect. #651

Closed
apurvam opened this issue Jan 23, 2018 · 4 comments

apurvam commented Jan 23, 2018

Using the JDBCSinkConnector to write to a MySQL table from a Kafka topic in Avro format, generated by a KSQL CREATE TABLE AS SELECT statement, results in the following error:

[2018-01-23 13:44:32,363] ERROR WorkerSinkTask{id=sink_mysql_groupbydevice-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:172)
org.apache.kafka.connect.errors.DataException: groupbydevice
        at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:96)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:453)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:287)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:198)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:166)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
[2018-01-23 13:44:32,365] ERROR WorkerSinkTask{id=sink_mysql_groupbydevice-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:173)

To reproduce, do the following:

$ ./bin/confluent start
$ ./bin/ksql-cli remote http://localhost:8080
ksql> CREATE STREAM pageviews_original (viewtime bigint, userid varchar, pageid varchar) WITH (kafka_topic='pageviews_kafka_topic_json', value_format='JSON');
ksql> CREATE table user_counts WITH (kafka_topic='groupbydevice', value_format='AVRO') AS SELECT userid, count(*) from pageviews_original group by userid;

 Message
---------------------------
 Table created and running
---------------------------
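
Optionally, before creating the sink, you can confirm what KSQL wrote to the topic. A minimal sketch, assuming the kafka-avro-console-consumer shipped with the Confluent distribution and a schema registry on its default port:

$ ./bin/kafka-avro-console-consumer --bootstrap-server localhost:9092 \
      --topic groupbydevice --from-beginning \
      --property schema.registry.url=http://localhost:8081

If the records print as decoded Avro, the value side of the topic is fine; the failure described below turns out to be on the key side.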

$ cat > ~/jdbcsink.json <<'EOF'
{
       "name": "sink_mysql_groupbydevice",
       "config": {
               "_comment": "Stream data from a Kafka topic to MySQL using JdbcSinkConnector",
               "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",

               "auto.create": "true",

               "_comment": "--- JDBC-specific configuration below here ---",
               "_comment": "JDBC connection URL. This will vary by RDBMS. Consult your vendor's documentation for more information",
               "connection.url": "jdbc:mysql://localhost:3306/demo2?user=root",

               "_comment": "Which topic(s) to read data from",
               "topics": "groupbydevice"
       }
}
EOF
$ ./bin/confluent load sink_mysql_groupbydevice -d ~/jdbcsink.json 

This tries to read from the groupbydevice topic, written by KSQL in Avro format, and write to a MySQL table using Kafka Connect's JDBC connector. It fails with the deserialization exception above: "Unknown magic byte!" means the record bytes do not start with the magic byte of the Confluent Schema Registry wire format that the AvroConverter expects.

apurvam commented Jan 23, 2018

The issue is that the messages written by KSQL have String keys and Avro values, whereas the Connect worker's default converters expect Avro for both keys and values. This can be resolved by adding the following to the connector configuration:

"key.converter": "org.apache.kafka.connect.storage.StringConverter",

apurvam closed this as completed Jan 23, 2018

apurvam commented Jan 23, 2018

Note that to write to MySQL you need to add the MySQL JDBC driver to the Connect classpath explicitly. You can get it from here: https://dev.mysql.com/downloads/connector/j/5.1.html
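
A minimal sketch of that step, assuming the Confluent distribution layout used in the repro above (the jar name below is an example; use whichever driver version you downloaded, and adjust the target directory to wherever your JDBC connector jars live):

$ cp mysql-connector-java-5.1.45-bin.jar ./share/java/kafka-connect-jdbc/
$ ./bin/confluent stop connect
$ ./bin/confluent start connect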

@zamirarif commented

@apurvam Thanks for the response. It helped me resolve my issue.

@yaobukeji123 commented

Hello, it worked for me too, but no data is inserted into my target table in MySQL. In the KSQL CLI, selecting from the table shows data. Do you know how to solve this? Thanks.
