
Corner case: Aborted transaction yields KeyError in fetcher.py #781

Closed
pikulmar opened this issue Sep 9, 2021 · 1 comment · Fixed by #782
pikulmar (Contributor) commented Sep 9, 2021

Describe the bug
At least with recent versions of the bitnami/kafka:2 Kafka broker container image, aborted transactions can give rise to a situation in which the log for a given Kafka topic looks as follows:

$ /opt/bitnami/kafka/bin/kafka-dump-log.sh --files /bitnami/kafka/data/test-topic-0/00000000000000000000.log
Dumping /bitnami/kafka/data/test-topic-0/00000000000000000000.log
Starting offset: 0
baseOffset: 0 lastOffset: 0 count: 1 baseSequence: -1 lastSequence: -1 producerId: 3 producerEpoch: 1 partitionLeaderEpoch: 0 isTransactional: true isControl: true position: 0 CreateTime: 1631000770035 size: 78 magic: 2 compresscodec: NONE crc: 2811855458 isvalid: true

Here, the log consists of the transaction abort marker only (isControl: true); the aborted data batches themselves are absent. As a result,

self._aborted_producers.add(producer_id)

in fetcher.py is never reached, and

self._aborted_producers.remove(next_batch.producer_id)

raises a KeyError when attempting to consume the aforementioned topic with isolation_level="read_committed".
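For illustration, a minimal sketch of the bookkeeping described above (a simplified model, not the actual fetcher.py code; the identifiers mirror the two snippets quoted from it). Replacing set.remove() with set.discard() would be one defensive way to tolerate an abort marker whose producer was never recorded:

# Simplified model of the read_committed bookkeeping (a sketch for
# illustration, not the library's actual code).
aborted_producers = set()

def on_aborted_data_batch(producer_id):
    # Only reached when an aborted *data* batch appears in the fetched log.
    aborted_producers.add(producer_id)

def on_abort_control_marker(producer_id):
    # With a log that holds only the abort marker, the producer id was
    # never added above, so remove() raises KeyError:
    aborted_producers.remove(producer_id)
    # A tolerant alternative:
    # aborted_producers.discard(producer_id)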

Expected behaviour
The consumer should proceed without error.

Environment (please complete the following information):

  • aiokafka version (python -c "import aiokafka; print(aiokafka.__version__)"): 0.7.2
  • kafka-python version (python -c "import kafka; print(kafka.__version__)"): 2.0.2
  • Kafka Broker version (kafka-topics.sh --version): 2.8.0 (Commit:ebb1d6e21cc92130)
  • Other information (Confluent Cloud version, etc.):

Reproducible example

import logging
import asyncio

from aiokafka import AIOKafkaConsumer


async def _recv_all():
    consumer = AIOKafkaConsumer(
        bootstrap_servers=['172.30.0.3'],
        client_id='test123-test-consumer',
        auto_offset_reset="earliest",
        isolation_level="read_committed",  # required to hit the aborted-producer path
        enable_auto_commit=False,
    )
    async with consumer:
        consumer.subscribe(['test-topic'])
        while True:
            for messages in (await consumer.getmany(timeout_ms=20)).values():
                if messages:
                    print('Got message')


logging.basicConfig(level=logging.DEBUG)
loop = asyncio.new_event_loop()
try:
    loop.run_until_complete(_recv_all())
finally:
    loop.run_until_complete(loop.shutdown_asyncgens())
    loop.close()
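For completeness, a hedged sketch of the producer side that writes such an abort marker into the topic log (the transactional_id is hypothetical; whether only the marker remains afterwards depends on broker-side log cleanup, as observed with the bitnami image above):

import asyncio

from aiokafka import AIOKafkaProducer


async def _abort_one():
    producer = AIOKafkaProducer(
        bootstrap_servers=['172.30.0.3'],
        transactional_id='test123-test-producer',  # hypothetical id
    )
    await producer.start()
    try:
        await producer.begin_transaction()
        await producer.send_and_wait('test-topic', b'payload')
        # Abort instead of committing; the broker appends an abort
        # control marker to the partition log.
        await producer.abort_transaction()
    finally:
        await producer.stop()


asyncio.run(_abort_one())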
pikulmar (Contributor, Author) commented Sep 9, 2021

Presuming the bug report/analysis is accurate, similar issues could perhaps be caught automatically once #203 is in place.

ods closed this as completed in #782 on Sep 14, 2021