Error serializing value data when using type array using Avro schema #266
Comments
Hey @ricardpons, Can you give me a snippet of your script/schema? Please remove sensitive information before you post. |
Hello @mostafa, here is a fragment of my schema. {
"type": "record",
"name": "EmailMessageEventRestrictedInValue",
"namespace": "internalservices.itoperation.emailmessage",
"fields": [
{
"name": "payload",
"type": {
"type": "record",
"name": "EmailMessage",
"fields": [
{
"name": "id",
"type": [
"null",
{
"type": "string",
"avro.java.string": "String"
}
],
"default": null
},
{
"name": "smtpConfigName",
"type": [
"null",
{
"type": "string",
"avro.java.string": "String"
}
],
"default": null
},
{
"name": "emailAddressFrom",
"type": {
"type": "record",
"name": "EmailAddressBasicInfo",
"fields": [
{
"name": "emailAddress",
"type": [
"null",
{
"type": "string",
"avro.java.string": "String"
}
],
"default": null
},
{
"name": "alias",
"type": [
"null",
{
"type": "string",
"avro.java.string": "String"
}
],
"default": null
}
]
}
},
{
"name": "emailAddressTo",
"type": {
"type": "array",
"items": "EmailAddressBasicInfo"
}
},
{
"name": "emailAddressCc",
"type": [
"null",
{
"type": "array",
"items": "EmailAddressBasicInfo"
}
],
"default": null
}, The rest of the fields are record types. |
Your issue might be related to #220 (comment). As far as I can tell, you're trying to use Avro union types, which must be encoded in a specific way that I explained in that comment. |
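For readers who cannot open the linked comment, the union convention can be sketched roughly as follows. This is a minimal sketch, assuming the Avro JSON encoding that goavro's textual codec expects: the null branch is written as a plain null, and any other branch is wrapped in a single-key object whose key is the branch's type name. The helper wrapUnion is hypothetical and is not part of xk6-kafka.

```javascript
// Hypothetical helper (not part of xk6-kafka): wrap a value for an Avro
// union field following the Avro JSON encoding convention.
function wrapUnion(typeName, value) {
  // The null branch of a union is encoded as a bare null.
  if (value === null || value === undefined) {
    return null;
  }
  // Any other branch is a single-key object: { "<type name>": value }.
  return { [typeName]: value };
}

// Fields declared as ["null", "string"] and ["null", "int"] in the schema.
const unionExample = {
  id: wrapUnion("string", "1234567890"),
  subject: wrapUnion("string", null),
  priority: wrapUnion("int", 0),
};

console.log(JSON.stringify(unionExample));
```

The id and priority values come out in the same shape as the working valueData posted in this thread, for example { string: "1234567890" } and { int: 0 }.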
Hello @mostafa, I'm a colleague of @ricardpons. From the original schema that he posted, most of the problem was solved, but... One snippet of the schema is the following: {
"name": "emailAddressReplyTo",
"type": [
"null",
"EmailAddressBasicInfo"
],
"default": null
}, When I send {"emailAddressReplyTo": null} in the message, it works fine. But when I try to send this in the message:
I got the following error message: ERRO[0003] GoError: Failed to encode data, OriginalError: cannot decode textual record "internalservices.itoperation.emailmessage.EmailMessageEventRestrictedInValue": cannot decode textual record "internalservices.itoperation.emailmessage.EmailMessage": cannot decode textual union: cannot decode textual map: cannot determine codec: "EmailAddressBasicInfo" for key: "emailAddressReplyTo" for key: "payload" So I decided to look for the reference in the Schema Registry of "EmailAddressBasicInfo", found it and substituted it with: {
"name": "emailAddressReplyTo",
"type": [
"null",
{
"type": "record",
"name": "EmailAddressBasicInfo",
"fields": [
{
"name": "emailAddress",
"type": [
"null",
{
"type": "string",
"avro.java.string": "String"
}
],
"default": null
},
{
"name": "alias",
"type": [
"null",
{
"type": "string",
"avro.java.string": "String"
}
],
"default": null
}
]
}
],
"default": null
}, After this, I tried to type my message but nothing worked:
I got this error message: ERRO[0003] GoError: Failed to encode data, OriginalError: cannot decode textual record "internalservices.itoperation.emailmessage.EmailMessageEventRestrictedInValue": cannot decode textual record "internalservices.itoperation.emailmessage.EmailMessage": cannot decode textual union: cannot decode textual map: cannot determine codec: "alias" for key: "emailAddressReplyTo" for key: "payload"
I got this error message: ERRO[0003] GoError: Failed to encode data, OriginalError: cannot decode textual record "internalservices.itoperation.emailmessage.EmailMessageEventRestrictedInValue": cannot decode textual record "internalservices.itoperation.emailmessage.EmailMessage": cannot decode textual union: cannot decode textual map: cannot determine codec: "record" for key: "emailAddressReplyTo" for key: "payload" If you have any suggestions on how I should structure the message so that it is sent correctly, that would be great. |
Hey @irenedmg, xk6-kafka uses srclient for interactions with the Schema Registry and it internally uses goavro for dealing with Avro schema. I recommend having a look at this nested example in the goavro repo, and possibly other examples in that repo to see if you can find the solution. Otherwise we need to debug the code to see what goes wrong, which is honestly not an easy task, since the extension is built into k6. |
Hi @mostafa Thanks in advance cc: @irenedmg @ricardpons |
Can you give me a complete working schema and a working JSON object that I can test with? The snippets you (and your colleagues) provided are malformed. Please remove sensitive info before posting. |
@mostafa, here is the full schema, which works fine when we send the message like this: {
"type": "record",
"name": "EmailMessageEventRestrictedInValue",
"namespace": "internalservices.itoperation.emailmessage",
"fields": [
{
"name": "payload",
"type": {
"type": "record",
"name": "EmailMessage",
"fields": [
{
"name": "id",
"type": [
"null",
{
"type": "string",
"avro.java.string": "String"
}
],
"default": null
},
{
"name": "smtpConfigName",
"type": [
"null",
{
"type": "string",
"avro.java.string": "String"
}
],
"default": null
},
{
"name": "emailAddressFrom",
"type": {
"type": "record",
"name": "EmailAddressBasicInfo",
"fields": [
{
"name": "emailAddress",
"type": [
"null",
{
"type": "string",
"avro.java.string": "String"
}
],
"default": null
},
{
"name": "alias",
"type": [
"null",
{
"type": "string",
"avro.java.string": "String"
}
],
"default": null
}
]
}
},
{
"name": "emailAddressTo",
"type": {
"type": "array",
"items": "EmailAddressBasicInfo"
}
},
{
"name": "emailAddressCc",
"type": [
"null",
{
"type": "array",
"items": "EmailAddressBasicInfo"
}
],
"default": null
},
{
"name": "emailAddressBcc",
"type": [
"null",
{
"type": "array",
"items": "EmailAddressBasicInfo"
}
],
"default": null
},
{
"name": "subject",
"type": [
"null",
{
"type": "string",
"avro.java.string": "String"
}
],
"default": null
},
{
"name": "body",
"type": [
"null",
{
"type": "string",
"avro.java.string": "String"
}
],
"default": null
},
{
"name": "isHtmlBody",
"type": [
"null",
"boolean"
],
"default": null
},
{
"name": "emailAddressReplyTo",
"type": [
"null",
"EmailAddressBasicInfo"
],
"default": null
},
{
"name": "attachments",
"type": [
"null",
{
"type": "array",
"items": {
"type": "record",
"name": "EmailMessageAttachment",
"fields": [
{
"name": "name",
"type": [
"null",
{
"type": "string",
"avro.java.string": "String"
}
],
"default": null
},
{
"name": "attachment",
"type": [
"null",
{
"type": "string",
"avro.java.string": "String"
}
],
"default": null
}
]
}
}
],
"default": null
},
{
"name": "priority",
"type": [
"null",
"int"
],
"default": null
}
]
}
}
]
} The problem arises when emailAddressReplyTo is not null, as @irenedmg showed above. Thanks in advance! |
@ferrange Can you also provide a complete JSON object that you think should match this schema and it works in your system (producers, consumers) now? I'll test the variations myself. |
Hi @mostafa, sure. Here is an example of a message we produce successfully. The fragment that I think should work, but doesn't, is commented out. You will see the same property "emailAddressReplyTo" set to null, which works fine:
|
This is not a valid JSON object. |
Hi @mostafa, this is a code snippet copied directly from our k6 script that works with the schema @ferrange provided:
Script
import * as kafka from "k6/x/kafka"; // import the module object
// Authentication and connection to Kafka and Schema Registry
const brokers = ['KAFKA_URL:443'];
const topic= "topic";
// Authentication
const jks = kafka.LoadJKS({
path: "./c_segac_cacerts/JKS/cacerts",
password: "changeit"
})
const tlsConfig = {
enableTls: true,
insecureSkipTlsVerify: true,
minVersion: kafka.TLS_1_2,
};
const saslConfig = {
username: "user",
password: "password",
algorithm: kafka.SASL_PLAIN
};
const sslConfig = {
enabled: true,
certificate: './c_segac_cacerts/CRT/cacerts'
};
// create Readers and Writers for all listed topics
let writer = new kafka.Writer({
brokers: brokers,
topic: topic,
sasl: saslConfig,
tls: tlsConfig,
});
let reader = new kafka.Reader({
brokers: brokers,
topic: topic,
sasl: saslConfig,
tls: tlsConfig,
});
// Establish Kafka connection
const connection = new kafka.Connection({
address: brokers[0],
sasl: saslConfig,
ssl: sslConfig,
tls: tlsConfig
});
const schemaRegistry = new kafka.SchemaRegistry({
url: "https://schemaregistry.com", // pre
tls: tlsConfig,
basicAuth: {
username: "kafka_notis",
password: "...",
}
});
// Get key schema
const keySubjectName = schemaRegistry.getSubjectName({
topic: topic,
element: kafka.KEY,
subjectNameStrategy: kafka.TOPIC_NAME_STRATEGY,
});
const keySchemaObject = schemaRegistry.getSchema({
subject: keySubjectName,
schemaType: kafka.SCHEMA_TYPE_AVRO,
});
// Get value schema
const valueSubjectName = schemaRegistry.getSubjectName({
topic: topic,
element: kafka.VALUE,
subjectNameStrategy: kafka.TOPIC_NAME_STRATEGY,
});
const valueSchemaObject = schemaRegistry.getSchema({
subject: valueSubjectName,
schemaType: kafka.SCHEMA_TYPE_AVRO,
});
let keyData = { "id": "123456"};
let valueData = {
"payload": {
"id": { string: "1234567890" },
"smtpConfigName": { string: "Config 1" },
"emailAddressFrom": {
"emailAddress": { string: "nameapp@mail.test.com" },
"alias": { string: "nameapp" },
},
"emailAddressTo": [
{
"emailAddress": { string: "dest1@test.com" },
"alias": { string: "dest 1" },
}
],
"emailAddressCc": {
array: [
{
"emailAddress": { string: "dest2@test.com" },
"alias": { string: "dest 2" },
}
]
},
"emailAddressBcc": {
array: [
{
"emailAddress": { string: "dest3@test.com" },
"alias": { string: "dest 3" },
}
]
},
"subject": { string: "test info" },
"body": { string: "test info body" },
"isHtmlBody": { boolean: true },
"emailAddressReplyTo": null,
"attachments": {
array: [
{
"name": { string: "attachmentfile.png" },
"attachment": { string: "http://www.test.com" }
}
]
},
"priority": { int: 0 }
}
}
let message = [
{
// The data type of the key is AVRO
key: schemaRegistry.serialize({
data: keyData,
schema: keySchemaObject, // the key schema fetched from the Schema Registry above
schemaType: kafka.SCHEMA_TYPE_AVRO,
}),
// The data type of the value is AVRO
value: schemaRegistry.serialize({
data: valueData,
schema: valueSchemaObject, // the value schema fetched from the Schema Registry above
schemaType: kafka.SCHEMA_TYPE_AVRO,
}),
},
];
// Produce messages to Kafka
writer.produce({ messages: message });
// Consume messages from Kafka
let messages = reader.consume({
limit: 100,
}); |
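@ricardpons @ferrange @irenedmg Found the issue! You have to use the type with its namespace to provide meaningful data to the Avro codec. So, instead of using the bare EmailAddressBasicInfo as the union key in the data, use the fully qualified internalservices.itoperation.emailmessage.EmailAddressBasicInfo.
Schema
{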
"type": "record",
"name": "EmailMessageEventRestrictedInValue",
"namespace": "internalservices.itoperation.emailmessage",
"fields": [
{
"name": "payload",
"type": {
"type": "record",
"name": "EmailMessage",
"fields": [
{
"name": "id",
"type": [
"null",
{
"type": "string",
"avro.java.string": "String"
}
],
"default": null
},
{
"name": "smtpConfigName",
"type": [
"null",
{
"type": "string",
"avro.java.string": "String"
}
],
"default": null
},
{
"name": "emailAddressFrom",
"type": {
"type": "record",
"name": "EmailAddressBasicInfo",
"fields": [
{
"name": "emailAddress",
"type": [
"null",
{
"type": "string",
"avro.java.string": "String"
}
],
"default": null
},
{
"name": "alias",
"type": [
"null",
{
"type": "string",
"avro.java.string": "String"
}
],
"default": null
}
]
}
},
{
"name": "emailAddressTo",
"type": {
"type": "array",
"items": "EmailAddressBasicInfo"
}
},
{
"name": "emailAddressCc",
"type": [
"null",
{
"type": "array",
"items": "EmailAddressBasicInfo"
}
],
"default": null
},
{
"name": "emailAddressBcc",
"type": [
"null",
{
"type": "array",
"items": "EmailAddressBasicInfo"
}
],
"default": null
},
{
"name": "subject",
"type": [
"null",
{
"type": "string",
"avro.java.string": "String"
}
],
"default": null
},
{
"name": "body",
"type": [
"null",
{
"type": "string",
"avro.java.string": "String"
}
],
"default": null
},
{
"name": "isHtmlBody",
"type": [
"null",
"boolean"
],
"default": null
},
{
"name": "emailAddressReplyTo",
"type": [
"null",
"EmailAddressBasicInfo"
],
"default": null
},
{
"name": "attachments",
"type": [
"null",
{
"type": "array",
"items": {
"type": "record",
"name": "EmailMessageAttachment",
"fields": [
{
"name": "name",
"type": [
"null",
{
"type": "string",
"avro.java.string": "String"
}
],
"default": null
},
{
"name": "attachment",
"type": [
"null",
{
"type": "string",
"avro.java.string": "String"
}
],
"default": null
}
]
}
}
],
"default": null
},
{
"name": "priority",
"type": [
"null",
"int"
],
"default": null
}
]
}
}
]
}
Data
{
"payload": {
"id": {
"string": "1234567890"
},
"smtpConfigName": {
"string": "Config 1"
},
"emailAddressFrom": {
"emailAddress": {
"string": "nameapp@mail.test.com"
},
"alias": {
"string": "nameapp"
}
},
"emailAddressTo": [
{
"emailAddress": {
"string": "dest1@test.com"
},
"alias": {
"string": "dest 1"
}
}
],
"emailAddressCc": {
"array": [
{
"emailAddress": {
"string": "dest2@test.com"
},
"alias": {
"string": "dest 2"
}
}
]
},
"emailAddressBcc": {
"array": [
{
"emailAddress": {
"string": "dest3@test.com"
},
"alias": {
"string": "dest 3"
}
}
]
},
"subject": {
"string": "test info"
},
"body": {
"string": "test info body"
},
"isHtmlBody": {
"boolean": true
},
"emailAddressReplyTo": {
"internalservices.itoperation.emailmessage.EmailAddressBasicInfo": {
"emailAddress": {
"string": "app-nameapp@test.com"
},
"alias": {
"string": "App name App"
}
}
},
"attachments": {
"array": [
{
"name": {
"string": "attachmentfile.png"
},
"attachment": {
"string": "http://www.test.com"
}
}
]
},
"priority": {
"int": 0
}
}
} I created this repo to test your schema and data. |
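The fix above can be sketched as a small helper for k6 scripts. This is a minimal sketch, assuming (per the Avro specification) that a named type without its own namespace inherits the namespace of the enclosing record, and that a name containing a dot is already fully qualified. The helpers qualifiedName and wrapNamedUnion are hypothetical and are not part of xk6-kafka or goavro.

```javascript
// Hypothetical helpers, not part of xk6-kafka or goavro.

// Build the full name of a named Avro type (record, enum, fixed).
function qualifiedName(namespace, name) {
  // A name that already contains a dot is treated as fully qualified.
  if (name.includes(".") || !namespace) {
    return name;
  }
  return `${namespace}.${name}`;
}

// Wrap a value for a union such as ["null", "EmailAddressBasicInfo"].
function wrapNamedUnion(namespace, name, value) {
  // The null branch stays a bare null.
  if (value === null || value === undefined) {
    return null;
  }
  // The record branch is keyed by the fully qualified type name.
  return { [qualifiedName(namespace, name)]: value };
}

// Namespace taken from the top-level record in the schema above.
const NAMESPACE = "internalservices.itoperation.emailmessage";

const emailAddressReplyTo = wrapNamedUnion(NAMESPACE, "EmailAddressBasicInfo", {
  emailAddress: { string: "app-nameapp@test.com" },
  alias: { string: "App name App" },
});

console.log(JSON.stringify(emailAddressReplyTo, null, 2));
```

Passing null as the value keeps the field as a bare null, which matches the case that already worked earlier in the thread.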
Hello @mostafa it worked! Thanks for your help :) |
Good to hear! 🚀 I'll close this issue. |
Hello,
I'm running some Kafka load tests using xk6-kafka and I'm getting errors when serializing objects. My Avro schema has some fields of array type, and it seems that when calling
schemaRegistry.serialize()
I'm getting the following error:
k6-1 | time="2024-01-09T06:55:21Z" level=error msg="panic: runtime error: invalid memory address or nil pointer dereference\ngoroutine 59 [running]:\nruntime/debug.Stack()\n\truntime/debug/stack.go:24 +0x5e\ngo.k6.io/k6/js/common.RunWithPanicCatching.func1()\n\tgo.k6.io/k6@v0.48.0/js/common/util.go:82 +0x1df\npanic({0x17c3520?, 0x2d13300?})\n\truntime/panic.go:914 +0x21f\ngithub.com/dop251/goja.(*Runtime).runWrapped.func1()\n\tgithub.com/dop251/goja@v0.0.0-20231027120936-b396bb4c349d/runtime.go:2442 +0x171\npanic({0x17c3520?, 0x2d13300?})\n\truntime/panic.go:914 +0x21f\ngithub.com/dop251/goja.(*vm).handleThrow(0xc002b547e0, {0x17c3520, 0x2d13300})\n\tgithub.com/dop251/goja@v0.0.0-20231027120936-b396bb4c349d/vm.go:788 +0x425\ngithub.com/dop251/goja.(*vm).try.func1()\n\tgithub.com/dop251/goja@v0.0.0-20231027120936-b396bb4c349d/vm.go:807 +0x3f\npanic({0x17c3520?, 0x2d13300?})\n\truntime/panic.go:914 +0x21f\ngithub.com/dop251/goja.(*vm).handleThrow(0xc002b547e0, {0x17c3520, 0x2d13300})\n\tgithub.com/dop251/goja@v0.0.0-20231027120936-b396bb4c349d/vm.go:788 +0x425\ngithub.com/dop251/goja.(*vm).runTryInner.func1()\n\tgithub.com/dop251/goja@v0.0.0-20231027120936
I tried another Avro schema without array-type fields and it worked.
Is there a way to pass that data to the serialize method?
Thanks in advance.
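Judging by the working data posted elsewhere in this thread, how an array field is passed depends on how the schema declares it. The following is a minimal sketch in plain JavaScript using field names from the schema in this thread. The helper nullableArray is hypothetical, and the sketch only illustrates the data shape; it does not diagnose the panic itself.

```javascript
// Hypothetical helper: encode a field declared in the schema as
// ["null", {"type": "array", "items": ...}].
function nullableArray(items) {
  // The null branch of the union stays a bare null.
  if (items === null || items === undefined) {
    return null;
  }
  // The array branch is wrapped under the "array" key.
  return { array: items };
}

const recipient = {
  emailAddress: { string: "dest1@test.com" },
  alias: { string: "dest 1" },
};

const payloadFragment = {
  // Declared as a plain array: pass the JavaScript array directly.
  emailAddressTo: [recipient],
  // Declared as ["null", array]: wrap the array under the "array" key.
  emailAddressCc: nullableArray([recipient]),
  // Same declaration, but with no value to send.
  emailAddressBcc: nullableArray(null),
};

console.log(JSON.stringify(payloadFragment, null, 2));
```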