Feature/65 delta format in pramen py #81
Conversation
…oreReader and corrected test for it
…rected test for it
… to "MetastoreTable" class
Hmm, the CI is still failing. Try changing the CI to use
…h x64 not found
I see it didn't work. Try removing Python 2.6 from CI
Looks good overall. Haven't finished the review yet. Will continue later.
But there are already a couple of issues to address.
pramen-py/pyproject.toml (Outdated)
 loguru = "^0.6.0"
 pytest = "6.2.5"
 pytest-asyncio = "0.16"
 pytest-cov = "2.12.1"
 types-PyYAML = "^6.0.4"
-pyspark-stubs = "^3.0.0"
+pyspark-stubs = "2.3.0.post2"
How are the PySpark and pyspark-stubs versions related?
…d all table, after filter by info date
Force-pushed 776e57e to 8417693
Force-pushed 8417693 to 3a1f6c3
Nice! So running the tests sequentially actually solved the CI issue?
Hey, this is excellent work! A couple of notes, though :)
pramen-py/Makefile (Outdated)
@@ -35,7 +35,7 @@ build: install
 poetry build

 test: install .env
-	poetry run pytest --cov
+	poetry run pytest -n 1
why did you remove --cov?
If we want the tests to run sequentially, then just uninstall pytest-xdist.
Ideally we want the tests to run in parallel, but something prevents it. They fail on an attempt to reuse a stopped Spark session, IIRC.
Can we use both --cov and -n 1 for now?
Yes, these options are unrelated: --cov is for coverage, -n 1 is for parallelization.
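A sketch of the Makefile target with both options combined (assuming pytest-cov and pytest-xdist both remain installed):

```make
test: install .env
	poetry run pytest --cov -n 1
```

Note that `-n 1` still runs through a single pytest-xdist worker process; a fully plain sequential run would disable the plugin with `-p no:xdist` instead.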
pramen-py/pyproject.toml
Outdated
loguru = "^0.6.0" | ||
pytest = "6.2.5" | ||
pytest-asyncio = "0.16" | ||
pytest-cov = "2.12.1" | ||
types-PyYAML = "^6.0.4" | ||
pyspark-stubs = "^3.0.0" | ||
pyspark-stubs = "2.3.0.post2" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@@ -42,6 +43,9 @@ class MetastoreReader(MetastoreReaderBase):
     a KeyError will be raised.
     """

+    def _read_table(self, format_value: str, path: str) -> DataFrame:
nit: all arguments are values, so there is no need to add a _value suffix to the argument names
I'd pass table_format: TableFormat and do the resolution logic inside the method.
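A minimal sketch of that suggestion (class and method names follow the diff; the Spark session is deliberately left untyped so the resolution logic stays visible, and the TableFormat members are assumed):

```python
from enum import Enum


class TableFormat(Enum):
    # Assumed to mirror the project's TableFormat enum.
    parquet = "parquet"
    delta = "delta"


class MetastoreReader:
    def __init__(self, spark):
        self.spark = spark

    def _read_table(self, table_format: TableFormat, path: str):
        # The enum-to-Spark-format resolution happens here, so callers
        # pass a TableFormat instead of a raw format string.
        return self.spark.read.format(table_format.value).load(path)
```

Callers then never touch the raw format string, which also makes the `_value` naming question go away.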
format_table_path = (table_path / format_.value).as_posix()
logger.info("Creating sample DataFrame partitioned by info_date")
get_data_stub.write.partitionBy("info_date").format(
    format_.value
That's a bit risky, as you don't know for sure that the TableFormat values are compatible with the Spark write formats.
We enforce this; I'd say we can trust it. If this is a real concern, I'd suggest adding a comment to TableFormat specifying that the string name of the format should match the Spark format name.
Sounds good. In that case, let's add this information to the TableFormat class docstring.
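For example, the docstring could state the invariant explicitly (a sketch; the member names are assumed):

```python
from enum import Enum, unique


@unique
class TableFormat(Enum):
    """Table formats supported by the metastore.

    Each member's value is passed verbatim to Spark's
    DataFrameReader.format() / DataFrameWriter.format(), so it must be
    a valid Spark data source name.
    """

    parquet = "parquet"
    delta = "delta"
```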
pramen-py/tests/conftest.py (Outdated)
@@ -121,12 +135,15 @@ def load_and_patch_config(
 object.__setattr__(
     config.metastore_tables[0],
     "path",
-    create_parquet_data_stubs[0].resolve().as_posix(),
+    pathlib.Path(create_data_stubs_and_paths["parquet"])
Maybe it makes more sense to use delta as a default format for tests?
I think it is better to parametrize this fixture, so it is possible to say which format to use.
For example, something like:
@pytest.mark.metastore(format="delta")
def test_foo(load_and_patch_config):...
That's easy to achieve; see, for example, https://jaketrent.com/post/pass-params-pytest-fixture/
)
expected = spark.read.parquet(
    load_and_patch_config.metastore_tables[0].path
for format_ in TableFormat:
It is better to use @pytest.mark.parametrize and set the two formats there, so pytest handles each case separately.
Also, you are missing a license header...
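A sketch of the parametrized version (TableFormat is mirrored here so the example is self-contained; the test name is illustrative):

```python
from enum import Enum

import pytest


class TableFormat(Enum):
    # Assumed to mirror the project's TableFormat.
    parquet = "parquet"
    delta = "delta"


@pytest.mark.parametrize("table_format", list(TableFormat), ids=lambda f: f.value)
def test_metastore_get_table(table_format):
    # pytest generates one test id per format, e.g.
    # test_metastore_get_table[parquet], so a failure names the format.
    assert table_format.value in {"parquet", "delta"}
```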
Great job!
logger.info(f"Looking for {table_name} in the metastore.")
logger.debug(f"info_date range: {info_date_from} - {info_date_to}")
logger.debug(
Maybe it makes sense to put this info into the logger.info() call above?
 if uppercase_columns:
-    return df.select([F.col(c).alias(c.upper()) for c in df.columns])
+    return df_filtered.select(
+        [F.col(c).alias(c.upper()) for c in df.columns]
+    )
-else:
-    return df
+return df_filtered
Consider extracting this as a separate method, since you use it in at least two places: get_table() and get_latest().
    until or info_date,
)
expected = expected.filter(F.col("info_date") == latest_date)
assert_df_equality(actual, expected, ignore_row_order=True)
Would this assert report which format failed the test?
Maybe you can add the format as part of the error message (it can be a parameter of the assert).
With pytest.mark.parametrize, yes, it will be explicitly visible: the test name is autogenerated from the parametrization values.
 os-name: [ ubuntu-latest ]
 runs-on: ${{ matrix.os-name }}
 steps:
   - uses: actions/checkout@v2
-  - uses: actions/setup-python@v2
+  - uses: actions/setup-python@v4
This change makes it impossible to test against Python 3.6.
Force-pushed cc44552 to 2b87532
Force-pushed 2b87532 to 8c424c4
Force-pushed 09cbf63 to fbe6887
@zhukovgreen, there are still a couple of pending comments related to improving the unit tests, but we decided to merge the code now and create separate issues for the parametrized tests and the other minor issues identified in this PR.
Force-pushed fbe6887 to cbef85f