[DOP-11752] - update tests
maxim-lixakov committed Jan 24, 2024
1 parent 4d2c7a1 commit 8507fa2
Showing 4 changed files with 41 additions and 14 deletions.
File renamed without changes.
6 changes: 6 additions & 0 deletions tests/fixtures/processing/kafka.py
@@ -1,5 +1,6 @@
from __future__ import annotations

+import json
import os
from typing import TYPE_CHECKING

@@ -158,6 +159,11 @@ def delete_topic(self, topics: list[str], timeout: float = DEFAULT_TIMEOUT):
# https://github.com/confluentinc/confluent-kafka-python/issues/813
admin.delete_topics(topics, request_timeout=timeout)

+    def insert_pandas_df_into_topic(self, df: pandas.DataFrame, topic: str):
+        for _, row in df.iterrows():
+            message = json.dumps(row.to_dict())
+            self.send_message(topic, message.encode("utf-8"))
+
def topic_exists(self, topic: str, timeout: float = DEFAULT_TIMEOUT) -> bool:
admin = self.get_admin_client()
topic_metadata = admin.list_topics(timeout=timeout)
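For reference, a minimal self-contained sketch of the per-row serialization the new insert_pandas_df_into_topic helper performs (the sample columns are illustrative, not taken from the fixture):

import json

import pandas

df = pandas.DataFrame([{"id_int": 1, "text_string": "abc"}])
for _, row in df.iterrows():
    # each row becomes one UTF-8 encoded JSON message,
    # e.g. b'{"id_int": 1, "text_string": "abc"}'
    payload = json.dumps(row.to_dict()).encode("utf-8")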
@@ -32,6 +32,7 @@ def test_strategy_kafka_with_batch_strategy_error(strategy, spark):
table="topic",
hwm=DBReader.AutoDetectHWM(name=secrets.token_hex(5), expression="offset"),
)
with pytest.raises(TypeError, match=re.escape("unsupported operand type(s) for +: 'frozendict' and 'int'")):
# raises as at current version there is no way to distribute step size between kafka partitions
with pytest.raises(TypeError, match=re.escape("unsupported operand type(s) for +")):
for _ in batches:
reader.run()
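The matched error comes from trying to add an integer batch step to the Kafka HWM value, which is a per-partition offset mapping (a frozendict, as the old match string shows). A hedged sketch of the failure shape, using a plain dict to stay dependency-free:

# partition -> last read offset; the real HWM stores this as a frozendict
offsets = {0: 10, 1: 15}
step = 100
try:
    offsets + step  # there is no way to distribute the step across partitions
except TypeError as e:
    print(e)  # unsupported operand type(s) for +: 'dict' and 'int'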
@@ -1,4 +1,3 @@
-import json
import secrets

import pytest
@@ -93,9 +92,7 @@ def test_kafka_strategy_incremental(
second_span_max = len(second_span) + first_span_max

# insert first span
-    for _, row in first_span.iterrows():
-        message = json.dumps(row.to_dict())
-        processing.send_message(topic, message.encode("utf-8"))
+    processing.insert_pandas_df_into_topic(first_span, topic)

# hwm is not in the store
assert store.get_hwm(hwm_name) is None
@@ -114,9 +111,7 @@
processing.assert_equal_df(df=deserialized_first_df, other_frame=first_span, order_by="id_int")

# insert second span
-    for _, row in second_span.iterrows():
-        message = json.dumps(row.to_dict())
-        processing.send_message(topic, message.encode("utf-8"))
+    processing.insert_pandas_df_into_topic(second_span, topic)

with IncrementalStrategy():
second_df = reader.run()
@@ -184,9 +179,7 @@ def test_kafka_strategy_incremental_nothing_to_read(spark, processing, schema, n
assert sum(value for value in hwm.value.values()) == 0

# insert first span
-    for _, row in first_span.iterrows():
-        message = json.dumps(row.to_dict())
-        processing.send_message(topic, message.encode("utf-8"))
+    processing.insert_pandas_df_into_topic(first_span, topic)

# .run() is not called - dataframe still empty - HWM not updated
assert not df.count()
@@ -212,9 +205,7 @@ def test_kafka_strategy_incremental_nothing_to_read(spark, processing, schema, n
assert sum(value for value in hwm.value.values()) == first_span_max

# insert second span
-    for _, row in second_span.iterrows():
-        message = json.dumps(row.to_dict())
-        processing.send_message(topic, message.encode("utf-8"))
+    processing.insert_pandas_df_into_topic(second_span, topic)

# .run() is not called - dataframe still empty - HWM not updated
assert not df.count()
@@ -262,3 +253,32 @@ def test_kafka_strategy_incremental_wrong_hwm(
source=topic,
hwm=DBReader.AutoDetectHWM(name=hwm_name, expression=hwm_column),
)


+def test_kafka_incremental_read_no_new_data(spark, processing):
+    topic = secrets.token_hex(6)
+    hwm_name = secrets.token_hex(5)
+
+    kafka = Kafka(
+        addresses=[f"{processing.host}:{processing.port}"],
+        cluster="cluster",
+        spark=spark,
+    )
+
+    reader = DBReader(
+        connection=kafka,
+        source=topic,
+        hwm=DBReader.AutoDetectHWM(name=hwm_name, expression="offset"),
+    )
+
+    first_span = processing.create_pandas_df(min_id=0, max_id=100)
+    processing.insert_pandas_df_into_topic(first_span, topic)
+
+    with IncrementalStrategy():
+        reader.run()
+
+    # no new data was added to kafka
+    with IncrementalStrategy():
+        new_data_df = reader.run()
+
+    assert new_data_df.rdd.isEmpty()
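An aside on the final assertion: rdd.isEmpty() only needs to find a single row to decide, while count() must scan every partition, so the former is the cheaper emptiness check. Both forms give the same result:

# two equivalent ways to assert the second read returned nothing
assert new_data_df.rdd.isEmpty()  # can stop after the first row found
assert new_data_df.count() == 0   # same result, full scan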
