
kafka s3 sink connect complains Error retrieving Avro value schema version for id 12345 #284

Open
jma562 opened this issue Nov 14, 2022 · 4 comments


jma562 commented Nov 14, 2022

I'm trying to set up a Kafka S3 sink connector that will consume messages in Avro format and dump them to S3-compatible storage (MinIO) in Parquet format.

This pipeline works for certain topics but fails for others. After some investigation, I've found that if a topic has a corresponding schema entry of the same name in the schema registry, the connector can successfully deserialize the messages and convert them to Parquet.

If a topic has no schema entry of the same name in the schema registry (the schema is there, as Offset Explorer and NiFi flows can still deserialize messages from such a topic), the connector fails with

Caused by: org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic <topic name> to Avro:
        at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:124)
        at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:86)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$2(WorkerSinkTask.java:488)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
        ... 13 more
Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro value schema version for id 12345

I tried to dig deeper in the hope of finding a solution but got stuck on two fronts:

Finding the schema based on the schema ID, which might be related to io.confluent.kafka.serializers.subject.TopicNameStrategy. Schema entries were published to the schema registry over a long period of time, and not consistently: some subjects follow topic names, others do not. How do I deal with this situation?
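For context, here is a minimal sketch of how Confluent's three subject naming strategies (TopicNameStrategy, RecordNameStrategy, TopicRecordNameStrategy) derive the registry subject for a topic's value schema; the topic and record names below are made up for illustration:

```python
# Sketch of the subject each naming strategy produces for a value schema.
# "orders" and "com.example.Order" are hypothetical names.

def topic_name_strategy(topic: str, record_fullname: str) -> str:
    # Subject depends only on the topic: "<topic>-value"
    return f"{topic}-value"

def record_name_strategy(topic: str, record_fullname: str) -> str:
    # Subject is the Avro record's fully qualified name
    return record_fullname

def topic_record_name_strategy(topic: str, record_fullname: str) -> str:
    # Subject combines both: "<topic>-<record fullname>"
    return f"{topic}-{record_fullname}"

print(topic_name_strategy("orders", "com.example.Order"))         # orders-value
print(record_name_strategy("orders", "com.example.Order"))        # com.example.Order
print(topic_record_name_strategy("orders", "com.example.Order"))  # orders-com.example.Order
```

This is why a subject registered under a name that matches neither pattern is invisible to a converter using the default TopicNameStrategy, even though the schema itself exists in the registry.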

I did take a look at https://schema.registry.url/schemas/12345, where 12345 is the schema ID shown in the exception message. I did see a default schema version declared there:

{"type":"record","name":"schema_name","namespace":"name_space","fields":
... ... [{"name":"schemaVersion","type":"string","default":"1.47"},

Finding the schema version based on the schema ID, which might be related to use.latest.version=true and auto.register.schemas=false. But setting these does not help; the connector complains that the options are not recognized. I tried value.converter.use.latest.version=true as well, with no difference.

How do we force it to use the latest version of the schema regardless of the default version?
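For anyone hitting the same wall: Kafka Connect only hands options to a converter when they carry the converter prefix, so bare serializer options in the worker config are silently ignored. A sketch of what the relevant config might look like (the registry URL is a placeholder; note that use.latest.version primarily affects serialization-side subject lookups, so it may not change deserialization behavior):

```properties
# Hypothetical worker/connector config sketch; the registry URL is a placeholder.
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://schema-registry:8081
# serializer-side options also need the value.converter. prefix to reach the converter
value.converter.auto.register.schemas=false
value.converter.use.latest.version=true
```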

Your help is greatly appreciated.


jma562 commented Nov 14, 2022

Is it a hard requirement that only these three subject naming strategies are supported by the connector?

TopicNameStrategy | RecordNameStrategy | TopicRecordNameStrategy

Must the schema subject name follow one of these patterns?
Why can Offset Explorer and NiFi consume and deserialize messages from these topics, but not the Kafka connector?


jma562 commented Nov 15, 2022

Looking further into the schema registry, I've found that the schema ID is unique per schema name per schema version, so why does the Kafka connector still throw that exception? I verified that the schema can be found in the schema registry. The only thing that could mess this up is the lookup strategy (TopicNameStrategy): even though the only valid schema exists, its name does not match the topic name, so the converter skips it. If that is the logic implemented in the converter, it does not make sense to me.


jma562 commented Nov 16, 2022

Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro value schema version for id 11025
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Subject not found. io.confluent.rest.exceptions.RestNotFoundException: Subject not found.
io.confluent.rest.exceptions.RestNotFoundException: Subject not found.
        at io.confluent.kafka.schemaregistry.rest.exceptions.Errors.subjectNotFoundException(Errors.java:51)
        at io.confluent.kafka.schemaregistry.rest.resources.SubjectsResource.lookUpSchemaUnderSubject(SubjectsResource.java:93)

The schema ID is unique, so why does the subject matter here? I could imagine it's a performance enhancement, but the client should at least fall back to a lookup by global schema ID alone; the local cache built on schema ID can still keep performance acceptable (which CachedSchemaRegistryClient already does).
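One way to check which subjects a schema ID is actually registered under is the registry's /schemas/ids/{id}/versions endpoint. A small Python sketch that just builds the two lookup URLs (the base URL and ID are placeholders; no request is actually sent here):

```python
# Sketch: build Confluent Schema Registry lookup URLs for a global schema ID.
# base_url and schema_id are placeholders for illustration.

def schema_by_id_url(base_url: str, schema_id: int) -> str:
    # Returns the URL that serves the raw schema text for a global schema ID
    return f"{base_url}/schemas/ids/{schema_id}"

def subjects_for_id_url(base_url: str, schema_id: int) -> str:
    # Returns the URL that lists the (subject, version) pairs the ID is registered under
    return f"{base_url}/schemas/ids/{schema_id}/versions"

base = "http://schema-registry:8081"
print(schema_by_id_url(base, 11025))       # http://schema-registry:8081/schemas/ids/11025
print(subjects_for_id_url(base, 11025))    # http://schema-registry:8081/schemas/ids/11025/versions
```

Comparing the subjects returned there against the topic name makes it easy to see whether TopicNameStrategy could ever find the schema.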


jma562 commented Nov 21, 2022

Hi,

I configured value.subject.name.strategy=io.confluent.kafka.serializers.subject.RecordNameStrategy in worker.properties and started the Connect worker with connect-distributed.sh /plugin/worker.properties.

However, I kept seeing
[2022-11-21 16:40:23,690] WARN The configuration 'value.subject.name.strategy' was supplied but isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig:355)

I also tried different variations as follows:

grep -i "name.strategy" /plugins/worker.properties 
value.subject.name.strategy=io.confluent.kafka.serializers.subject.RecordNameStrategy
value.converter.subject.name.strategy=io.confluent.kafka.serializers.subject.RecordNameStrategy
consumer.value.subject.name.strategy=io.confluent.kafka.serializers.subject.RecordNameStrategy
consumer.value.converter.subject.name.strategy=io.confluent.kafka.serializers.subject.RecordNameStrategy

but nothing seems to be working.

I'd greatly appreciate it if anyone could point me to how to change the subject naming strategy (in the Confluent S3 sink connector).
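In case it helps later readers: that WARN from ConsumerConfig suggests the setting never reached the converter. Converter options need the value.converter. prefix, which for this particular option produces a doubled-looking key. A sketch of the key I believe is expected (assuming the standard Confluent AvroConverter; the strategy class is the one from the attempts above):

```properties
# full key = prefix "value.converter." + option "value.subject.name.strategy"
value.converter.value.subject.name.strategy=io.confluent.kafka.serializers.subject.RecordNameStrategy
```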

Regards
