[DOP-11752] - fix incremental_with_new_partition test
maxim-lixakov committed Jan 26, 2024
1 parent a20fb58 commit 1f2ea3e
Showing 1 changed file with 18 additions and 9 deletions.
@@ -169,15 +169,15 @@ def test_kafka_strategy_incremental_nothing_to_read(spark, processing, schema, n

     assert not df.count()
     hwm = store.get_hwm(name=hwm_name)
-    assert sum(value for value in hwm.value.values()) == 0
+    assert all(value == 0 for value in hwm.value.values())
 
     # insert first span
     processing.insert_pandas_df_into_topic(first_span, topic)
 
     # .run() is not called - dataframe still empty - HWM not updated
     assert not df.count()
     hwm = store.get_hwm(name=hwm_name)
-    assert sum(value for value in hwm.value.values()) == 0
+    assert all(value == 0 for value in hwm.value.values())
 
     # set hwm value to 50
     with IncrementalStrategy():
@@ -239,6 +239,8 @@ def test_kafka_strategy_incremental_with_new_partition(
     initial_partitions,
     additional_partitions,
 ):
+    from pyspark.sql.functions import count as spark_count
+
     topic = secrets.token_hex(6)
     hwm_name = secrets.token_hex(5)
     store = HWMStoreStackManager.get_current()
@@ -271,7 +273,13 @@ def test_kafka_strategy_incremental_with_new_partition(

     processing.insert_pandas_df_into_topic(first_span, topic)
     with IncrementalStrategy():
-        reader.run()
+        first_df = reader.run()
+
+    # it is crucial to cache the dataframe right after reading: if the number of partitions is altered before any subsequent operation, Spark fails to run it with
+    # Caused by: java.lang.AssertionError: assertion failed: If startingOffsets contains specific offsets, you must specify all TopicPartitions.
+    # Use -1 for latest, -2 for earliest.
+    # Specified: Set(topic1, topic2) Assigned: Set(topic1, topic2, additional_topic3, additional_topic4)
+    first_df.cache()
 
     hwm = store.get_hwm(name=hwm_name)
     first_run_hwm_keys_num = len(hwm.value.keys())
@@ -280,13 +288,14 @@
     processing.insert_pandas_df_into_topic(second_span, topic)
 
     with IncrementalStrategy():
-        reader.run()
+        second_df = reader.run()
 
     hwm = store.get_hwm(name=hwm_name)
-    second_run_hwm_keys_num = len(hwm.value.keys())
+    second_run_hwm_keys_num = len(hwm.value)
     assert first_run_hwm_keys_num + additional_partitions == second_run_hwm_keys_num
 
-    # check that number of messages in hwm is equal to size of sparkDF
-    total_messages = sum(value for value in hwm.value.values())
-    second_span_max = len(second_span) + len(first_span)
-    assert total_messages == second_span_max
+    # check that HWM distribution of messages in partitions matches the distribution in sparkDF
+    combined_df = second_df.union(first_df)
+    partition_counts_combined = combined_df.groupBy("partition").agg(spark_count("*").alias("count"))
+    partition_count_dict_combined = {row["partition"]: row["count"] for row in partition_counts_combined.collect()}
+    assert hwm.value == partition_count_dict_combined
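For context on the `first_df.cache()` comment in the diff: `reader.run()` only plans the Kafka read, and that plan pins `startingOffsets` to the partitions that exist at planning time. Below is a minimal sketch of the same failure mode using a plain PySpark batch read; the broker address, topic name, and offset values are assumptions for illustration, not values from this test.

```python
from pyspark.sql import SparkSession

# assumes the spark-sql-kafka-0-10 package is on the Spark classpath
spark = SparkSession.builder.getOrCreate()

# Batch read pinned to specific starting offsets for the partitions that exist
# at planning time (here: partitions 0 and 1 of "some_topic"); this mirrors an
# incremental read resuming from a stored HWM.
df = (
    spark.read.format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")  # assumed address
    .option("subscribe", "some_topic")                     # assumed topic
    .option("startingOffsets", '{"some_topic": {"0": 0, "1": 0}}')
    .load()
)

# Materialize and cache while the offsets map still covers every partition.
df.cache()
df.count()

# ... partitions 2 and 3 are added to "some_topic" here ...

# Served from the cache, so this still works. Without cache(), the action would
# re-run the Kafka scan, and the pinned startingOffsets no longer cover the new
# partitions, raising:
#   java.lang.AssertionError: assertion failed: If startingOffsets contains
#   specific offsets, you must specify all TopicPartitions.
df.groupBy("partition").count().show()
```

Caching (plus an action to populate it) freezes the first read's data, so later operations such as the `union` in this test no longer re-plan the Kafka scan against the grown partition set.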
