Skip to content

Commit

Permalink
Poll Kafka Continuously (#132)
Browse files Browse the repository at this point in the history
* Poll kafka continuously
  • Loading branch information
zprobst authored Nov 7, 2023
1 parent 8f4ebe3 commit 51a48f9
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 12 deletions.
13 changes: 7 additions & 6 deletions nodestream/pipeline/extractors/streams/extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,12 +79,13 @@ def poll(self):
async def extract_records(self):
await self.connector.connect()
try:
results = await self.poll()
if len(results) == 0:
yield Flush
else:
for record in results:
yield self.record_format.parse(record)
while True:
results = await self.poll()
if len(results) == 0:
yield Flush
else:
for record in results:
yield self.record_format.parse(record)
except Exception:
self.logger.exception("failed extracting records")
finally:
Expand Down
19 changes: 13 additions & 6 deletions tests/unit/pipeline/extractors/streams/test_extractor.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import json

import pytest
from hamcrest import assert_that, equal_to

Expand All @@ -16,10 +18,15 @@ def extractor(mocker):


@pytest.mark.asyncio
async def test_extract(extractor):
extractor.connector.poll.side_effect = [['{"key": "test-value"}'], ValueError]
async def test_extractor_polls_until_error(extractor):
input_batches = [['{"key": "test-value"}'] for _ in range(10)]
expected_results = [json.loads(r) for batch in input_batches for r in batch]
extractor.connector.poll.side_effect = [
*input_batches,
ValueError,
]
result = [record async for record in extractor.extract_records()]
assert_that(result, equal_to([{"key": "test-value"}]))
extractor.connector.poll.assert_called_once_with()
extractor.connector.connect.assert_called_once()
extractor.connector.disconnect.assert_called_once()
assert_that(result, equal_to(expected_results))
assert_that(extractor.connector.poll.call_count, equal_to(11))
extractor.connector.connect.assert_awaited_once()
extractor.connector.disconnect.assert_awaited_once()

0 comments on commit 51a48f9

Please sign in to comment.