kafka s3 sink connect complains Error retrieving Avro value schema version for id 12345 #284
Is it a hard requirement that the connector supports only these three subject name strategies: TopicNameStrategy, RecordNameStrategy, and TopicRecordNameStrategy? Must the schema subject name follow one of these patterns?
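For context, the three strategies differ only in how they derive the Schema Registry subject name. A minimal sketch (the topic and record names below are illustrative, not from this issue):

```python
# How each Confluent subject name strategy derives the subject for a
# value schema. The real implementations live in
# io.confluent.kafka.serializers.subject.*; this just mirrors the naming rule.
def subject_name(strategy: str, topic: str, record_fqn: str) -> str:
    if strategy == "TopicNameStrategy":
        return f"{topic}-value"            # subject tied to the topic name
    if strategy == "RecordNameStrategy":
        return record_fqn                  # subject tied to the record type
    if strategy == "TopicRecordNameStrategy":
        return f"{topic}-{record_fqn}"     # tied to both
    raise ValueError(f"unknown strategy: {strategy}")

print(subject_name("TopicNameStrategy", "orders", "com.example.Order"))
# orders-value
print(subject_name("RecordNameStrategy", "orders", "com.example.Order"))
# com.example.Order
print(subject_name("TopicRecordNameStrategy", "orders", "com.example.Order"))
# orders-com.example.Order
```

This is why a subject that matches none of the three patterns is invisible to the converter's lookup.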
Looking further into the Schema Registry, I found that a schema ID is unique per schema name per schema version, so why does the connector still throw that exception? I verified that the schema can be found in the Schema Registry. The only thing that could cause this is the lookup strategy (TopicNameStrategy): even though the only valid schema exists, the converter skips it because the subject name does not match the topic name. If that is the logic implemented in the converter, it does not make sense to me.
Please add the exception stack trace here.
Hi, I configured value.subject.name.strategy=io.confluent.kafka.serializers.subject.RecordNameStrategy in worker.properties and started the Confluent connector with connect-distributed.sh /plugin/worker.properties. However, I kept seeing the same error. I also tried different variations as follows, but nothing seems to work. I'd greatly appreciate it if anyone could point me to how to change the subject naming strategy in the Confluent S3 sink connector. Regards
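One possible reason such options are ignored (an assumption, not verified against this connector version): in Kafka Connect, converter-level options only reach the converter when prefixed with the converter name, e.g. `value.converter.`. A hypothetical connector config fragment, assuming the AvroConverter and a placeholder registry URL:

```properties
# Hypothetical sink connector config fragment; keys below assume Connect
# strips the "value.converter." prefix and passes the rest to the converter.
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=https://schema.registry.url
value.converter.value.subject.name.strategy=io.confluent.kafka.serializers.subject.RecordNameStrategy
```

An unprefixed `value.subject.name.strategy` in worker.properties would never be seen by the converter at all.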
I'm trying to set up a Kafka S3 sink connector that consumes messages in Avro format and dumps them to S3-compatible storage (MinIO) in Parquet format.
This pipeline works for certain topics but fails for others. After some investigation, I've found that if a topic has a corresponding schema entry of the same name in the Schema Registry, the connector can successfully de-serialize the messages and convert them to Parquet.
If a topic has no schema entry of the same name in the Schema Registry (the schema is there, since Offset Explorer and a NiFi flow can still de-serialize messages from such a topic), the connector fails with:
I tried to dig deeper in hopes of finding a solution but got stuck.
Finding the schema based on the schema ID, which might be related to io.confluent.kafka.serializers.subject.TopicNameStrategy. In our case, schema entries have been published to the Schema Registry over a long period of time and not consistently: some entries follow topic names, others do not. How do I deal with this situation?
and
I did take a look at https://schema.registry.url/schemas/12345, where 12345 is the schema ID shown in the exception message. I did see a default schema version declared there:
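As a side note, the Schema Registry REST API can also report which subject/version pairs a schema ID is registered under, which helps check whether the subject name matches what the converter expects. A command sketch against the placeholder URL above (confirm the hostname for your deployment):

```shell
# Fetch the raw schema registered under ID 12345
curl -s https://schema.registry.url/schemas/ids/12345

# List the (subject, version) pairs that reference schema ID 12345 --
# useful for seeing which subject name the converter's strategy must produce
curl -s https://schema.registry.url/schemas/ids/12345/versions
```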
Finding the schema version based on the schema ID, which might be related to use.latest.version=true and auto.register.schemas=false. But setting these did not help: the connector complains that the options are not recognized. I tried value.converter.use.latest.version=true as well, with no difference.
How do we force the connector to use the latest version of the schema regardless of the default version?
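For what it's worth, one commonly suggested pairing (an assumption here, not confirmed for this connector version, and note the reporter already tried the first key without success) is to set both options together with the converter prefix, since `use.latest.version` is typically only honored when auto-registration is disabled:

```properties
# Hypothetical fragment; assumes the AvroConverter forwards these
# prefixed options to the underlying Schema Registry client.
value.converter.use.latest.version=true
value.converter.auto.register.schemas=false
```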
Your help is greatly appreciated.