[DOP-11904] - add has_data, raise_error_if_no_data methods in DBReader
maxim-lixakov committed Jan 29, 2024
1 parent 02a255c commit fef6ba5
Showing 13 changed files with 206 additions and 34 deletions.
1 change: 1 addition & 0 deletions docs/changelog/next_release/203.feature.rst
@@ -0,0 +1 @@
Add ``has_data``, ``raise_error_if_no_data`` methods to ``DBReader`` class.
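
For context, here is a minimal usage sketch of the two new methods (not part of this commit; the Postgres connection, credentials, and table name below are placeholder assumptions):

    from pyspark.sql import SparkSession

    from onetl.connection import Postgres
    from onetl.db import DBReader

    # Spark session with the Postgres JDBC driver pulled in via the connection class.
    spark = (
        SparkSession.builder
        .appName("has_data_example")
        .config("spark.jars.packages", ",".join(Postgres.get_packages()))
        .getOrCreate()
    )

    postgres = Postgres(
        host="postgres.example.com",  # hypothetical host and credentials
        user="etl_user",
        password="***",
        database="analytics",
        spark=spark,
    )

    reader = DBReader(connection=postgres, source="public.orders")

    # Fail fast: raises "No data in the source: public.orders" if the table is empty.
    reader.raise_error_if_no_data()

    # Or branch explicitly; has_data() fetches at most one row under the hood.
    if reader.has_data():
        df = reader.run()
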
2 changes: 1 addition & 1 deletion docs/db/db_reader.rst
@@ -11,4 +11,4 @@ DB Reader
DBReader.run

.. autoclass:: DBReader
:members: run
:members: run, has_data, raise_error_if_no_data
39 changes: 37 additions & 2 deletions onetl/db/db_reader/db_reader.py
@@ -501,6 +501,37 @@ def validate_options(cls, options, values):

return None

def has_data(self) -> bool:
"""Returns ``True`` if there is some data in the source, ``False`` otherwise."""
self._check_strategy()

if not self._connection_checked:
self._log_parameters()
self.connection.check()

window, limit = self._calculate_window_and_limit()
if limit == 0:
return False

df = self.connection.read_source_as_df(
source=str(self.source),
columns=self.columns,
hint=self.hint,
where=self.where,
df_schema=self.df_schema,
window=window,
limit=1,
**self._get_read_kwargs(),
)

return bool(df.take(1))

def raise_error_if_no_data(self) -> None:
"""Raises exception if source does not contain any data."""

if not self.has_data():
raise Exception(f"No data in the source: {self.source}") # noqa: WPS454

@slot
def run(self) -> DataFrame:
"""
@@ -541,6 +572,12 @@ def run(self) -> DataFrame:
self._connection_checked = True

window, limit = self._calculate_window_and_limit()

# update the HWM with the stop value
if self.hwm and window:
strategy: HWMStrategy = StrategyManager.get_current() # type: ignore[assignment]
strategy.update_hwm(window.stop_at.value)

df = self.connection.read_source_as_df(
source=str(self.source),
columns=self.columns,
@@ -673,7 +710,6 @@ def _calculate_window_and_limit(self) -> tuple[Window | None, int | None]: # no
if start_value is not None and stop_value is not None:
# we already have start and stop values, nothing to do
window = Window(self.hwm.expression, start_from=strategy.current, stop_at=strategy.next)
strategy.update_hwm(window.stop_at.value)
return window, None

if not isinstance(self.connection, ContainsGetMinMaxValues):
@@ -737,7 +773,6 @@ def _calculate_window_and_limit(self) -> tuple[Window | None, int | None]: # no
stop_at=Edge(value=max_value),
)

strategy.update_hwm(window.stop_at.value)
return window, None

def _log_parameters(self) -> None:
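
Note that strategy.update_hwm(...) was moved out of _calculate_window_and_limit() and into run() (see the hunks above), so has_data() can compute the read window without advancing the HWM; only an actual run() updates it. A hedged sketch of how this might be used with an incremental read, assuming the onetl 0.10 HWM API (DBReader.AutoDetectHWM, IncrementalStrategy); the connection, table, and HWM name are placeholders:

    from onetl.db import DBReader
    from onetl.strategy import IncrementalStrategy

    reader = DBReader(
        connection=postgres,  # any supported connection, e.g. the one from the sketch above
        source="public.orders",
        hwm=DBReader.AutoDetectHWM(name="public.orders.id", expression="id"),
    )

    with IncrementalStrategy():
        # Checking for new rows does not move the HWM forward...
        if not reader.has_data():
            raise RuntimeError("nothing new to load")

        # ...the HWM is updated with the window's upper bound only here, inside run().
        df = reader.run()
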
@@ -338,6 +338,11 @@ def test_clickhouse_reader_snapshot_nothing_to_read(spark, processing, prepare_s
first_span = processing.create_pandas_df(min_id=first_span_begin, max_id=first_span_end)
second_span = processing.create_pandas_df(min_id=second_span_begin, max_id=second_span_end)

with pytest.raises(Exception, match="No data in the source:"):
reader.raise_error_if_no_data()

assert not reader.has_data()

# no data yet, nothing to read
df = reader.run()
assert not df.count()
@@ -354,6 +359,9 @@ def test_clickhouse_reader_snapshot_nothing_to_read(spark, processing, prepare_s

# read data explicitly
df = reader.run()

# check that read df has data
assert reader.has_data()
processing.assert_equal_df(df=df, other_frame=first_span, order_by="id_int")

# insert second span
@@ -280,6 +280,11 @@ def test_greenplum_reader_snapshot_nothing_to_read(spark, processing, prepare_sc
first_span = processing.create_pandas_df(min_id=first_span_begin, max_id=first_span_end)
second_span = processing.create_pandas_df(min_id=second_span_begin, max_id=second_span_end)

with pytest.raises(Exception, match="No data in the source:"):
reader.raise_error_if_no_data()

assert not reader.has_data()

# no data yet, nothing to read
df = reader.run()
assert not df.count()
@@ -296,6 +301,9 @@ def test_greenplum_reader_snapshot_nothing_to_read(spark, processing, prepare_sc

# read data explicitly
df = reader.run()

# check that read df has data
assert reader.has_data()
processing.assert_equal_df(df=df, other_frame=first_span, order_by="id_int")

# insert second span
@@ -220,6 +220,11 @@ def test_hive_reader_snapshot_nothing_to_read(spark, processing, prepare_schema_
first_span = processing.create_pandas_df(min_id=first_span_begin, max_id=first_span_end)
second_span = processing.create_pandas_df(min_id=second_span_begin, max_id=second_span_end)

with pytest.raises(Exception, match="No data in the source:"):
reader.raise_error_if_no_data()

assert not reader.has_data()

# no data yet, nothing to read
df = reader.run()
assert not df.count()
@@ -236,6 +241,9 @@ def test_hive_reader_snapshot_nothing_to_read(spark, processing, prepare_schema_

# read data explicitly
df = reader.run()

# check that read df has data
assert reader.has_data()
processing.assert_equal_df(df=df, other_frame=first_span, order_by="id_int")

# insert second span
@@ -1,4 +1,3 @@
import json
import secrets

import pytest
@@ -100,25 +99,8 @@ def kafka_schema_with_headers():
return schema # noqa: WPS331


@pytest.fixture(name="kafka_processing")
def create_kafka_data(spark):
from tests.fixtures.processing.kafka import KafkaProcessing

topic = secrets.token_hex(5)
proc = KafkaProcessing()
df = proc.create_spark_df(spark)
rows = [row.asDict() for row in df.collect()]

for row_to_send in rows:
proc.send_message(topic, json.dumps(row_to_send).encode("utf-8"))

yield topic, proc, df
# Release
proc.delete_topic([topic])


def test_kafka_reader(spark, kafka_processing, schema):
topic, processing, expected_df = kafka_processing
def test_kafka_reader(spark, processing, schema):
topic = secrets.token_hex(6)

kafka = Kafka(
spark=spark,
@@ -130,13 +112,18 @@ def test_kafka_reader(spark, kafka_processing, schema):
connection=kafka,
source=topic,
)

first_span = processing.create_pandas_df(min_id=0, max_id=100)
processing.insert_pandas_df_into_topic(first_span, topic)
df = reader.run()

processing.assert_equal_df(processing.json_deserialize(df, df_schema=schema), other_frame=expected_df)
processing.assert_equal_df(processing.json_deserialize(df, df_schema=schema), other_frame=first_span)

processing.delete_topic([topic])

def test_kafka_reader_columns_and_types_without_headers(spark, kafka_processing, kafka_schema):
topic, processing, _ = kafka_processing

def test_kafka_reader_columns_and_types_without_headers(spark, processing, kafka_schema):
topic = secrets.token_hex(6)

kafka = Kafka(
spark=spark,
@@ -149,16 +136,20 @@ def test_kafka_reader_columns_and_types_without_headers(spark, kafka_processing,
source=topic,
)

first_span = processing.create_pandas_df(min_id=0, max_id=100)
processing.insert_pandas_df_into_topic(first_span, topic)

df = reader.run()

assert df.schema == kafka_schema # headers aren't included in schema if includeHeaders=False
processing.delete_topic([topic])


def test_kafka_reader_columns_and_types_with_headers(spark, kafka_processing, kafka_schema_with_headers):
def test_kafka_reader_columns_and_types_with_headers(spark, processing, kafka_schema_with_headers):
if get_spark_version(spark).major < 3:
pytest.skip("Spark 3.x or later is required to write/read 'headers' from Kafka messages")

topic, processing, _ = kafka_processing
topic = secrets.token_hex(6)

kafka = Kafka(
spark=spark,
@@ -173,14 +164,17 @@ def test_kafka_reader_columns_and_types_with_headers(spark, kafka_processing, ka
options=Kafka.ReadOptions(includeHeaders=True),
)

first_span = processing.create_pandas_df(min_id=0, max_id=100)
processing.insert_pandas_df_into_topic(first_span, topic)

df = reader.run()

assert df.schema == kafka_schema_with_headers

processing.delete_topic([topic])

def test_kafka_reader_topic_does_not_exist(spark, kafka_processing):
_, processing, _ = kafka_processing

def test_kafka_reader_topic_does_not_exist(spark, processing):
kafka = Kafka(
spark=spark,
addresses=[f"{processing.host}:{processing.port}"],
@@ -197,11 +191,13 @@ def test_kafka_reader_topic_does_not_exist(spark, kafka_processing):


@pytest.mark.parametrize("group_id_option", ["group.id", "groupIdPrefix"])
def test_kafka_reader_with_group_id(group_id_option, spark, kafka_processing, schema):
def test_kafka_reader_with_group_id(group_id_option, spark, processing, schema):
if get_spark_version(spark).major < 3:
pytest.skip("Spark 3.x or later is required to pass group.id")
topic = secrets.token_hex(6)

topic, processing, expected_df = kafka_processing
first_span = processing.create_pandas_df(min_id=0, max_id=100)
processing.insert_pandas_df_into_topic(first_span, topic)

kafka = Kafka(
spark=spark,
@@ -215,9 +211,79 @@ def test_kafka_reader_with_group_id(group_id_option, spark, kafka_processing, sc
source=topic,
)
df = reader.run()
processing.assert_equal_df(processing.json_deserialize(df, df_schema=schema), other_frame=expected_df)
processing.assert_equal_df(processing.json_deserialize(df, df_schema=schema), other_frame=first_span)

# Spark does not report to Kafka which messages were read, so Kafka does not remember latest offsets for groupId
# https://stackoverflow.com/a/64003569
df = reader.run()
processing.assert_equal_df(processing.json_deserialize(df, df_schema=schema), other_frame=expected_df)
processing.assert_equal_df(processing.json_deserialize(df, df_schema=schema), other_frame=first_span)

processing.delete_topic([topic])


def test_kafka_reader_snapshot_nothing_to_read(spark, processing, schema):
topic = secrets.token_hex(6)
processing.create_topic(topic, num_partitions=1)

kafka = Kafka(
spark=spark,
addresses=[f"{processing.host}:{processing.port}"],
cluster="cluster",
)

reader = DBReader(
connection=kafka,
source=topic,
)

# 0..100
first_span_begin = 0
first_span_end = 100

# 110..210
second_span_begin = 110
second_span_end = 210

first_span = processing.create_pandas_df(min_id=first_span_begin, max_id=first_span_end)
second_span = processing.create_pandas_df(min_id=second_span_begin, max_id=second_span_end)

with pytest.raises(Exception, match="No data in the source:"):
reader.raise_error_if_no_data()

assert not reader.has_data()

# no data yet, nothing to read
df = reader.run()
assert not df.count()

# insert first span
processing.insert_pandas_df_into_topic(first_span, topic)

# .run() is not called, but dataframes are lazy, so it now contains all data from the source
deserialized_df = processing.json_deserialize(df, df_schema=schema)
processing.assert_equal_df(df=deserialized_df, other_frame=first_span, order_by="id_int")

# read data explicitly
df = reader.run()

# check that read df has data
assert reader.has_data()

deserialized_df = processing.json_deserialize(df, df_schema=schema)
processing.assert_equal_df(df=deserialized_df, other_frame=first_span, order_by="id_int")

# insert second span
processing.insert_pandas_df_into_topic(second_span, topic)
total_span = pandas.concat([first_span, second_span], ignore_index=True)

# .run() is not called, but dataframes are lazy, so it now contains all data from the source
deserialized_df = processing.json_deserialize(df, df_schema=schema)
processing.assert_equal_df(df=deserialized_df, other_frame=total_span, order_by="id_int")

# read data explicitly
df = reader.run()

deserialized_df = processing.json_deserialize(df, df_schema=schema)
processing.assert_equal_df(df=deserialized_df, other_frame=total_span, order_by="id_int")

processing.delete_topic([topic])
@@ -160,6 +160,11 @@ def test_mongodb_reader_snapshot_nothing_to_read(spark, processing, prepare_sche
first_span = processing.create_pandas_df(min_id=first_span_begin, max_id=first_span_end)
second_span = processing.create_pandas_df(min_id=second_span_begin, max_id=second_span_end)

with pytest.raises(Exception, match="No data in the source:"):
reader.raise_error_if_no_data()

assert not reader.has_data()

# no data yet, nothing to read
df = reader.run()
assert not df.count()
@@ -176,6 +181,9 @@ def test_mongodb_reader_snapshot_nothing_to_read(spark, processing, prepare_sche

# read data explicitly
df = reader.run()

# check that read df has data
assert reader.has_data()
processing.assert_equal_df(df=df, other_frame=first_span, order_by="_id")

# insert second span
@@ -315,6 +315,11 @@ def test_mssql_reader_snapshot_nothing_to_read(spark, processing, prepare_schema
first_span = processing.create_pandas_df(min_id=first_span_begin, max_id=first_span_end)
second_span = processing.create_pandas_df(min_id=second_span_begin, max_id=second_span_end)

with pytest.raises(Exception, match="No data in the source:"):
reader.raise_error_if_no_data()

assert not reader.has_data()

# no data yet, nothing to read
df = reader.run()
assert not df.count()
@@ -331,6 +336,9 @@ def test_mssql_reader_snapshot_nothing_to_read(spark, processing, prepare_schema

# read data explicitly
df = reader.run()

# check that read df has data
assert reader.has_data()
processing.assert_equal_df(df=df, other_frame=first_span, order_by="id_int")

# insert second span
