Should Schema Registry client be exported to JS? #55

Closed
mostafa opened this issue May 9, 2022 · 3 comments · Fixed by #149
Labels
✨ Feature Request New feature or request ❓ Question Further information is requested

Comments

mostafa (Owner) commented May 9, 2022

It could be in line with #50. In my opinion, explicit is better than implicit, and it makes things easier for the user, too. Yet the script becomes bigger and needs more initialization, and I suspect it might be less performant, but that must be tested. These are related:

Update:
I am working on this issue in this branch along with #50, #53, and #54, and the results are not as promising as I initially thought. Performance degraded because most of the conversions (serdes) are now handled in JS, which slows things down: as shown below, the number of messages produced dropped by 14%. On the other hand, the SchemaRegistry client makes creating and retrieving schemas much easier, so the convenience comes at a cost. Maybe I should find a better way, or develop the extension as-is while exposing more configuration of the schema registry client and the serdes. 🤷

This is the resulting API, as of now:

import { SchemaRegistry } from "k6/x/kafka";

const schemaRegistry = SchemaRegistry({
    url: "...",
    basicAuth: {...},
    tls: {...},
});

schemaRegistry.getSchema({ version: 0, subject: "..." });
schemaRegistry.createSchema({ version: 0, subject: "...", schema: "...", schemaType: "AVRO" });
schemaRegistry.getSubjectName({ element: "...", topic: "...", schema: "...", subjectNameStrategy: "..." });
// Automatically figures out data type, whether the schema is passed or not.
// You can pass a string, byte array, and JSON objects as data.
// If the schema is passed, either Avro, JSONSchema, or others, the data will be (de)serialized using that schema.
schemaRegistry.serialize({ schema: "...", data: "...", schemaType: "AVRO" });
schemaRegistry.deserialize({ schema: "...", data: "...", schemaType: "AVRO" });

The following is the result of running test_json.js for a minute with 50 VUs.

$ ./k6 run -d 60s --vus 50 scripts/test_json.js

          /\      |‾‾| /‾‾/   /‾‾/
     /\  /  \     |  |/  /   /  /
    /  \/    \    |     (   /   ‾‾\
   /          \   |  |\  \ |  (‾)  |
  / __________ \  |__| \__\ \_____/ .io

  execution: local
     script: scripts/test_json.js
     output: -

  scenarios: (100.00%) 1 scenario, 50 max VUs, 1m30s max duration (incl. graceful stop):
           * default: 50 looping VUs for 1m0s (gracefulStop: 30s)


running (1m02.3s), 00/50 VUs, 11835 complete and 0 interrupted iterations
default ✓ [======================================] 50 VUs  1m0s

     ✓ 10 messages are received
     ✓ Topic equals to xk6_kafka_json_topic
     ✓ Key is correct
     ✓ Value is correct
     ✓ Header equals {'mykey': 'myvalue'}
     ✓ Time is past
     ✓ Partition is zero
     ✓ Offset is gte zero
     ✓ High watermark is gte zero

     █ teardown

     checks.........................: 100.00% ✓ 106515       ✗ 0
     data_received..................: 0 B     0 B/s
     data_sent......................: 0 B     0 B/s
     iteration_duration.............: avg=257.04ms min=8.38ms med=35.86ms max=6.88s    p(90)=355.36ms p(95)=811.4ms
     iterations.....................: 11835   190.026501/s
     kafka.reader.dial.count........: 51      0.818872/s
     kafka.reader.dial.seconds......: avg=331.69µs min=0s     med=0s      max=214.01ms p(90)=0s       p(95)=0s
   ✗ kafka.reader.error.count.......: 1       0.016056/s
     kafka.reader.fetch_bytes.max...: 1000000 min=1000000    max=1000000
     kafka.reader.fetch_bytes.min...: 1       min=1          max=1
     kafka.reader.fetch_wait.max....: 200ms   min=200ms      max=200ms
     kafka.reader.fetch.bytes.......: 7.7 MB  124 kB/s
     kafka.reader.fetch.size........: 19737   316.903511/s
     kafka.reader.fetches.count.....: 101     1.621688/s
     kafka.reader.lag...............: 107492  min=-1         max=1004616
     kafka.reader.message.bytes.....: 23 MB   373 kB/s
     kafka.reader.message.count.....: 118398  1901.035712/s
     kafka.reader.offset............: 2360    min=9          max=3370
     kafka.reader.queue.capacity....: 1       min=1          max=1
     kafka.reader.queue.length......: 1       min=0          max=1
     kafka.reader.read.seconds......: avg=43.31ms  min=0s     med=0s      max=34.49s   p(90)=0s       p(95)=0s
     kafka.reader.rebalance.count...: 0       0/s
     kafka.reader.timeouts.count....: 50      0.802816/s
     kafka.reader.wait.seconds......: avg=668.76µs min=0s     med=0s      max=229.48ms p(90)=0s       p(95)=0s
     kafka.writer.acks.required.....: 0       min=0          max=0
     kafka.writer.async.............: 0.00%   ✓ 0            ✗ 1183500
     kafka.writer.attempts.max......: 0       min=0          max=0
     kafka.writer.batch.bytes.......: 259 MB  4.2 MB/s
     kafka.writer.batch.max.........: 1       min=1          max=1
     kafka.writer.batch.size........: 1183500 19002.65009/s
     kafka.writer.batch.timeout.....: 0s      min=0s         max=0s
   ✓ kafka.writer.error.count.......: 0       0/s
     kafka.writer.message.bytes.....: 518 MB  8.3 MB/s
     kafka.writer.message.count.....: 2367000 38005.300179/s
     kafka.writer.read.timeout......: 0s      min=0s         max=0s
     kafka.writer.retries.count.....: 0       0/s
     kafka.writer.wait.seconds......: avg=0s       min=0s     med=0s      max=0s       p(90)=0s       p(95)=0s
     kafka.writer.write.count.......: 2367000 38005.300179/s
     kafka.writer.write.seconds.....: avg=880.66µs min=4.16µs med=12.01µs max=3.43s    p(90)=31.9µs   p(95)=130.16µs
     kafka.writer.write.timeout.....: 0s      min=0s         max=0s
     vus............................: 6       min=6          max=50
     vus_max........................: 50      min=50         max=50

ERRO[0063] some thresholds have failed
mostafa added the ✨ Feature Request and ❓ Question labels on May 9, 2022
@fdahunsibread

I think that since the extension is used by dev teams with a lot of different flows and specific needs, it's helpful to have this functionality. It could be an optional setting where users decide whether serialization is implicit or explicit.

mostafa moved this to Todo in xk6-kafka on May 10, 2022
mostafa (Owner, Author) commented May 21, 2022

I'll probably refactor it and include it in the APIs.

enamrik (Contributor) commented Jun 15, 2022

Yes, this would be very helpful. I've been in scenarios where I was forced to produce messages without the magic schema-ID prefix, but whose schema still lived in the schema registry. Weird, I know. If the schema registry client were exposed, I could use it directly to grab the schema I need; then, given the possible refactoring mentioned in #50, I could use a plain AvroSerde instead of a SchemaRegistrySerde to publish the message.
