aiokafka sometimes fails, possibly because ZSTD compression is not supported #795

Closed
katyukha opened this issue Nov 17, 2021 · 0 comments · Fixed by #798
@katyukha

The Kafka consumer periodically fails with the traceback below. About 80% of the time everything works fine, but sometimes the consumer fails with an UnboundLocalError, possibly because ZSTD compression is not available.

May be related to #501 and #708

See traceback below:

Traceback (most recent call last):
  File "/home/user/projects/my-kafka-consumer/venv/bin/my-kafka-consumer", line 11, in <module>
    load_entry_point('my-kafka-consumer==0.0.1', 'console_scripts', 'my-kafka-consumer')()
  File "/home/user/projects/my-kafka-consumer/venv/lib/python3.9/site-packages/typer/main.py", line 214, in __call__
    return get_command(self)(*args, **kwargs)
  File "/home/user/projects/my-kafka-consumer/venv/lib/python3.9/site-packages/click/core.py", line 1128, in __call__
    return self.main(*args, **kwargs)
  File "/home/user/projects/my-kafka-consumer/venv/lib/python3.9/site-packages/click/core.py", line 1053, in main
    rv = self.invoke(ctx)
  File "/home/user/projects/my-kafka-consumer/venv/lib/python3.9/site-packages/click/core.py", line 1395, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/home/user/projects/my-kafka-consumer/venv/lib/python3.9/site-packages/click/core.py", line 754, in invoke
    return __callback(*args, **kwargs)
  File "/home/user/projects/my-kafka-consumer/venv/lib/python3.9/site-packages/typer/main.py", line 500, in wrapper
    return callback(**use_params)  # type: ignore
  File "/home/user/projects/my-kafka-consumer/venv/lib/python3.9/site-packages/my_kafka_consumer/app.py", line 87, in main
    consumer.start()
  File "/home/user/projects/my-kafka-consumer/venv/lib/python3.9/site-packages/my_kafka_consumer/my_consumer.py", line 151, in start
    asyncio.run(self.consume())
  File "/usr/lib/python3.9/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/usr/lib/python3.9/asyncio/base_events.py", line 642, in run_until_complete
    return future.result()
  File "/home/user/projects/my-kafka-consumer/venv/lib/python3.9/site-packages/my_kafka_consumer/my_consumer.py", line 135, in consume
    async for msg in self.consumer:
  File "/home/user/projects/my-kafka-consumer/venv/lib/python3.9/site-packages/aiokafka/consumer/consumer.py", line 1248, in __anext__
    return (await self.getone())
  File "/home/user/projects/my-kafka-consumer/venv/lib/python3.9/site-packages/aiokafka/consumer/consumer.py", line 1136, in getone
    msg = await self._fetcher.next_record(partitions)
  File "/home/user/projects/my-kafka-consumer/venv/lib/python3.9/site-packages/aiokafka/consumer/fetcher.py", line 1030, in next_record
    message = res_or_error.getone()
  File "/home/user/projects/my-kafka-consumer/venv/lib/python3.9/site-packages/aiokafka/consumer/fetcher.py", line 117, in getone
    msg = next(self._partition_records)
  File "/home/user/projects/my-kafka-consumer/venv/lib/python3.9/site-packages/aiokafka/consumer/fetcher.py", line 198, in __next__
    return next(self._records_iterator)
  File "/home/user/projects/my-kafka-consumer/venv/lib/python3.9/site-packages/aiokafka/consumer/fetcher.py", line 241, in _unpack_records
    for record in next_batch:
  File "aiokafka/record/_crecords/default_records.pyx", line 354, in aiokafka.record._crecords.default_records.DefaultRecordBatch.__iter__
  File "aiokafka/record/_crecords/default_records.pyx", line 227, in aiokafka.record._crecords.default_records.DefaultRecordBatch._maybe_uncompress
UnboundLocalError: local variable 'uncompressed' referenced before assignment

I think the code should be modified to report why this happens, instead of failing with an UnboundLocalError.
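
To illustrate the kind of failure I mean, here is a minimal pure-Python sketch. It is not the actual aiokafka code: the function names, the `UnsupportedCodecError` class defined here, and the gzip-only decoder are assumptions made up for the example. The only thing taken from the Kafka record format is that the codec lives in the low 3 bits of the batch attributes (0 = none, 1 = gzip, 2 = snappy, 3 = lz4, 4 = zstd). The first function shows how a branch chain with no fallback can end in UnboundLocalError; the second shows the more descriptive error I would expect.

```python
import gzip

# Kafka record batches keep the compression codec in the low 3 bits
# of the attributes field.
CODEC_MASK = 0x07
CODEC_NONE, CODEC_GZIP, CODEC_SNAPPY, CODEC_LZ4, CODEC_ZSTD = range(5)
CODEC_NAMES = {
    CODEC_GZIP: "gzip",
    CODEC_SNAPPY: "snappy",
    CODEC_LZ4: "lz4",
    CODEC_ZSTD: "zstd",
}


class UnsupportedCodecError(Exception):
    """Hypothetical error type, defined here only for this sketch."""


def uncompress_buggy(attributes: int, payload: bytes) -> bytes:
    """Illustrative only: a branch chain without a fallback."""
    codec = attributes & CODEC_MASK
    if codec == CODEC_NONE:
        return payload
    if codec == CODEC_GZIP:
        uncompressed = gzip.decompress(payload)
    # No branch handles the remaining codecs, so for them
    # 'uncompressed' is never assigned and the return below
    # raises UnboundLocalError.
    return uncompressed


def uncompress_checked(attributes: int, payload: bytes) -> bytes:
    """Same logic, but unknown codecs produce a descriptive error."""
    codec = attributes & CODEC_MASK
    if codec == CODEC_NONE:
        return payload
    if codec == CODEC_GZIP:
        return gzip.decompress(payload)
    name = CODEC_NAMES.get(codec, f"unknown ({codec})")
    raise UnsupportedCodecError(
        f"Cannot read record batch: compression codec {name} is not supported"
    )
```

With this sketch, only batches whose attributes select an unhandled codec fail, which would match a consumer that works most of the time and breaks only on some messages.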

ods added a commit to ods/aiokafka that referenced this issue Nov 20, 2021
@ods ods closed this as completed in #798 Nov 22, 2021
ods added a commit that referenced this issue Nov 22, 2021
* Fix requirements conflict

* Fix handling unsupported compression codec (#795)

* Add changelog entry