
Feature/65 delta format in pramen py #81

Merged — 15 commits merged into main on Dec 2, 2022

Conversation

ValeriiKhalimendik (Collaborator)

No description provided.

@yruslan (Collaborator) commented Nov 24, 2022

Hmm, the CI is still failing.

Try changing the CI to use actions/setup-python@v4.

@yruslan (Collaborator) commented Nov 24, 2022

I see it didn't work. Try removing Python 3.6 from the CI.

@yruslan (Collaborator) left a comment

Looks good overall. Haven't finished the review yet. Will continue later.

But there are already a couple of issues to address.

loguru = "^0.6.0"
pytest = "6.2.5"
pytest-asyncio = "0.16"
pytest-cov = "2.12.1"
types-PyYAML = "^6.0.4"
pyspark-stubs = "^3.0.0"
pyspark-stubs = "2.3.0.post2"

How are the PySpark and pyspark-stubs versions related?


pramen-py/src/pramen_py/metastore/reader.py (outdated review thread, resolved)
pramen-py/src/pramen_py/metastore/writer.py (outdated review thread, resolved)
@ValeriiKhalimendik force-pushed the feature/65-Delta-format-in-Pramen-Py branch from 776e57e to 8417693 on November 25, 2022 09:33
@ValeriiKhalimendik force-pushed the feature/65-Delta-format-in-Pramen-Py branch from 8417693 to 3a1f6c3 on November 25, 2022 09:39
@yruslan (Collaborator) commented Nov 25, 2022

Nice! So running the tests sequentially actually solved the CI issue?

@zhukovgreen (Collaborator) left a comment

Hey, this is excellent work! A couple of notes, though :)

@@ -35,7 +35,7 @@ build: install
  poetry build

  test: install .env
- poetry run pytest --cov
+ poetry run pytest -n 1

Why did you remove --cov?


If we want the tests to run sequentially, then just uninstall pytest-xdist.


Ideally we want the tests to run in parallel, but something prevents it. They fail on an attempt to reuse a stopped Spark session, IIRC.


Can we use both --cov and -n 1 for now?


Yes, these options are not related: --cov is for coverage, -n 1 is for parallelization.

.github/workflows/python.yml (review thread, resolved)

@@ -42,6 +43,9 @@ class MetastoreReader(MetastoreReaderBase):
a KeyError will be raised.
"""

def _read_table(self, format_value: str, path: str) -> DataFrame:

nit: all arguments are values, so I think there is no need to add a _value suffix to the argument names.


I'd pass table_format: TableFormat and do the resolution logic inside the method.
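
For illustration, a minimal sketch of that suggestion, assuming a TableFormat enum with parquet and delta members and a Spark session held on the reader (names here are illustrative, not the actual pramen-py code):

```python
from enum import Enum

from pyspark.sql import DataFrame, SparkSession


class TableFormat(Enum):
    parquet = "parquet"
    delta = "delta"


class MetastoreReader:
    def __init__(self, spark: SparkSession) -> None:
        self.spark = spark

    def _read_table(self, table_format: TableFormat, path: str) -> DataFrame:
        # Resolve the Spark format string from the enum inside the method,
        # so callers deal only with TableFormat members, not raw strings.
        return self.spark.read.format(table_format.value).load(path)
```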

format_table_path = (table_path / format_.value).as_posix()
logger.info("Creating sample DataFrame partitioned by info_date")
get_data_stub.write.partitionBy("info_date").format(
format_.value

That's a bit risky, as you don't know for sure that TableFormat values are compatible with the Spark write formats.


We enforce this; I'd say we can trust it.

If this is a real concern, I'd suggest adding a comment to TableFormat specifying that the string value of the format should match the Spark format name.


Sounds good. In that case, let's add this information to the TableFormat class docstring.
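
A hedged sketch of what such a docstring could look like (the member names are assumed; the real enum lives elsewhere in pramen-py):

```python
from enum import Enum


class TableFormat(Enum):
    """Metastore table formats supported by pramen-py.

    The string value of each member must match a format name understood by
    Spark's DataFrameReader/DataFrameWriter ``format(...)`` call, because the
    value is passed to Spark verbatim when reading and writing tables.
    """

    parquet = "parquet"
    delta = "delta"
```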

pramen-py/tests/conftest.py (outdated review thread, resolved)
@@ -121,12 +135,15 @@ def load_and_patch_config(
  object.__setattr__(
      config.metastore_tables[0],
      "path",
-     create_parquet_data_stubs[0].resolve().as_posix(),
+     pathlib.Path(create_data_stubs_and_paths["parquet"])

Maybe it makes more sense to use delta as a default format for tests?

pramen-py/tests/conftest.py (outdated review thread, resolved)
@@ -121,12 +135,15 @@ def load_and_patch_config(
  object.__setattr__(
      config.metastore_tables[0],
      "path",
-     create_parquet_data_stubs[0].resolve().as_posix(),
+     pathlib.Path(create_data_stubs_and_paths["parquet"])

I think it is better to parametrize this fixture, so it is possible to say which format to use.
For example, something like:

@pytest.mark.metastore(format="delta")
def test_foo(load_and_patch_config): ...

That's easy to achieve; see for example https://jaketrent.com/post/pass-params-pytest-fixture/
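
A minimal sketch of that marker-driven fixture idea (the metastore marker and the fixture body are illustrative, not the real conftest.py; a custom marker would also need to be registered in the pytest configuration to avoid warnings):

```python
import pytest


@pytest.fixture
def load_and_patch_config(request):
    # Default to parquet unless the test carries @pytest.mark.metastore(format=...).
    marker = request.node.get_closest_marker("metastore")
    table_format = marker.kwargs.get("format", "parquet") if marker else "parquet"
    # ...here the real fixture would create the data stubs and patch the
    # metastore config to point at the chosen format...
    return table_format


@pytest.mark.metastore(format="delta")
def test_foo(load_and_patch_config):
    assert load_and_patch_config == "delta"
```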

)
expected = spark.read.parquet(
load_and_patch_config.metastore_tables[0].path
for format_ in TableFormat:

It is better to use @pytest.mark.parametrize and set the two formats, so pytest handles each format as a separate test.
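
A minimal sketch of that parametrization, assuming the spark and load_and_patch_config fixtures from the project's conftest (the test body is illustrative):

```python
import pytest


@pytest.mark.parametrize("table_format", ["parquet", "delta"])
def test_metastore_roundtrip(spark, load_and_patch_config, table_format):
    # pytest generates one test per value, e.g. test_metastore_roundtrip[parquet]
    # and test_metastore_roundtrip[delta], so a failing format is visible in the
    # test name without any extra assert message.
    df = spark.read.format(table_format).load(
        load_and_patch_config.metastore_tables[0].path
    )
    assert df.count() > 0
```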

@zhukovgreen (Collaborator)

Also, you are missing a license header...

@yruslan requested a review from DzMakatun on November 29, 2022 08:07
@yruslan (Collaborator) left a comment

Great job!


logger.info(f"Looking for {table_name} in the metastore.")
logger.debug(f"info_date range: {info_date_from} - {info_date_to}")
logger.debug(

Maybe it makes sense to put this info into the logger.info() call above?
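
A small sketch of folding the debug details into the single info-level message (loguru is already a project dependency; the variable values here are placeholders):

```python
from datetime import date

from loguru import logger

table_name = "users1"
info_date_from = date(2022, 11, 1)
info_date_to = date(2022, 11, 30)

# One info-level message instead of a separate info() + debug() pair.
logger.info(
    f"Looking for {table_name} in the metastore "
    f"(info_date range: {info_date_from} - {info_date_to})."
)
```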


Comment on lines 97 to 102
  if uppercase_columns:
-     return df.select([F.col(c).alias(c.upper()) for c in df.columns])
+     return df_filtered.select(
+         [F.col(c).alias(c.upper()) for c in df.columns]
+     )
  else:
-     return df
+     return df_filtered

Consider extracting this as a separate method, since you use it in at least two places: get_table() and get_latest().
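
A minimal sketch of that extraction, mirroring the snippet above (the function name and placement are illustrative):

```python
import pyspark.sql.functions as F
from pyspark.sql import DataFrame


def _apply_uppercase_columns(df: DataFrame, uppercase_columns: bool) -> DataFrame:
    """Rename all columns to upper case when requested.

    Shared by get_table() and get_latest() so the column-renaming logic lives
    in one place.
    """
    if uppercase_columns:
        return df.select([F.col(c).alias(c.upper()) for c in df.columns])
    return df
```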

until or info_date,
)
expected = expected.filter(F.col("info_date") == latest_date)
assert_df_equality(actual, expected, ignore_row_order=True)

Would this assert report which format failed the test?
Maybe you can add the format as part of the error message (it can be a parameter of the assert).


With pytest.mark.parametrize, yes, it will be explicitly noticeable: the test name will be autogenerated from the parametrization values.

.github/workflows/python.yml (review thread, resolved)
  os-name: [ ubuntu-latest ]
  runs-on: ${{ matrix.os-name }}
  steps:
    - uses: actions/checkout@v2
-   - uses: actions/setup-python@v2
+   - uses: actions/setup-python@v4

This change makes it impossible to check against the Python 3.6 version.

pramen-py/tests/conftest.py (outdated review thread, resolved)


@ValeriiKhalimendik force-pushed the feature/65-Delta-format-in-Pramen-Py branch 2 times, most recently from cc44552 to 2b87532 on November 30, 2022 15:47
@ValeriiKhalimendik force-pushed the feature/65-Delta-format-in-Pramen-Py branch from 2b87532 to 8c424c4 on December 1, 2022 09:56
@ValeriiKhalimendik force-pushed the feature/65-Delta-format-in-Pramen-Py branch 2 times, most recently from 09cbf63 to fbe6887 on December 1, 2022 10:44
@yruslan previously approved these changes on Dec 1, 2022
@yruslan (Collaborator) commented Dec 1, 2022

@zhukovgreen, there are still a couple of pending comments related to improving the unit tests. But we decided to merge the code now and create separate issues for parametrizing the tests and for the other minor issues identified in this PR.

Issues: #88, #89, #84, #85

@ValeriiKhalimendik merged commit 8387dd7 into main on Dec 2, 2022
@ValeriiKhalimendik deleted the feature/65-Delta-format-in-Pramen-Py branch on December 2, 2022 12:48