
Feature/84 metastore get latest available date for delta #116

Conversation

ValeriiKhalimendik
Collaborator

No description provided.

Collaborator

@yruslan yruslan left a comment


Looks good. Just a couple of nitpicks

4 resolved review threads on pramen-py/src/pramen_py/metastore/reader.py (outdated)
df_select = self._read_table(
    metastore_table.format, metastore_table.path
).select(f"{metastore_table.info_date_settings.column}")
dates_list = [data[0] for data in df_select.distinct().collect()]
Collaborator

Take a look here: I'm not sure what you are doing, but I am confused by the .collect and the hard-to-read data[0] for data.

Collaborator

collect() returns records as an array, and [0] is the first column. At least this is how it works in Scala.
How would you do it differently?
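For context, here is a minimal stand-in sketch of the pattern under discussion. Plain tuples play the role of pyspark.sql.Row (which also supports positional indexing), so the idiom runs without Spark:

```python
# Stand-in for df.select(...).distinct().collect(): a list of one-column
# "rows" (tuples here; pyspark.sql.Row supports the same row[0] indexing).
collected = [("2022-11-01",), ("2022-11-03",), ("2022-11-02",)]

# The idiom being questioned: take the first column of each collected row,
# then pick the latest date on the driver.
dates_list = [data[0] for data in collected]
latest_date = max(dates_list)
```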

Collaborator

@zhukovgreen zhukovgreen Jan 9, 2023

Something like this would be more "sparktonic", based on my sense of the Spark nature :)

latest_date = df.select("info_date").distinct().rdd.map(lambda x: x[0]).max()
  • replace lambda x: x[0] with a typed function, e.g.:
def get_info_date_value(row: Row) -> str:
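A self-contained sketch of this suggestion, with the lambda replaced by a typed function (tuples again stand in for pyspark.sql.Row so it runs without Spark; the body shown here is an assumption based on the thread):

```python
from typing import Tuple

def get_info_date_value(row: Tuple[str, ...]) -> str:
    # A real version would take a pyspark.sql.Row, which supports the
    # same positional indexing.
    return row[0]

rows = [("2022-11-01",), ("2022-11-03",), ("2022-11-02",)]
latest_date = max(map(get_info_date_value, rows))
```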

Collaborator

And it is more readable (no obscure data[0] for data). In this case you need to know that Row supports indexing...

Collaborator

From a taste perspective I like the original code. I have seen many Spark functions that do this trick.
Quoting my own code here, but I have seen similar things in other code bases, and it does not trigger a wtf for me 😄: https://github.com/AbsaOSS/pramen/blob/main/pramen/core/src/main/scala/za/co/absa/pramen/core/reader/TableReaderJdbc.scala#L79-L79

Extracting the processing logic as a lambda won't do much, since you can't avoid data[0] or something like it.
Do I understand it correctly that you propose:

def get_info_date_value(row: Row) -> str:
    return row[0]

Also, I'm not sure why you want to convert to an RDD. It seems redundant to me, and might even be less performant.

Collaborator

@zhukovgreen zhukovgreen Jan 9, 2023

I'd be happy to chat about this tomorrow, sure do not be blocked on this!

Collaborator

Sure, thanks!

Collaborator

Maybe the low-hanging fruit here would be to rename data to row which (maybe) makes it clearer that we are extracting the first column but I guess it's still pretty much a matter of preference. I have used this "pattern" many times so to me it was familiar.

Collaborator

I like the idea of renaming data to row

def test_metastore_get_latest_available_date_for_delta(
    spark, get_data_stub, tmp_path
):
    def save_delta_table(df: DataFrame, path: str) -> None:
Collaborator

Looks like something reusable. Consider moving it to a fixture.
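A sketch of what such a fixture could look like in conftest.py. The fixture name and the Delta write options are assumptions for illustration, not the PR's actual code:

```python
import pytest

@pytest.fixture
def save_delta_table():
    # Returns a helper so tests can call save_delta_table(df, path)
    # instead of redefining the function in each test.
    def _save(df, path: str) -> None:
        df.write.format("delta").mode("overwrite").save(path)

    return _save
```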

@yruslan yruslan requested a review from jirifilip January 9, 2023 13:16

df_union = get_data_stub.union(
    spark.createDataFrame(
        spark.sparkContext.parallelize([(17, 18, d(2022, 11, 2))]),
Collaborator

Sorry for nitpicking, but I think you can create a DataFrame directly from a list of tuples (without using an RDD).

Suggested change
spark.sparkContext.parallelize([(17, 18, d(2022, 11, 2))]),
[(17, 18, d(2022, 11, 2))],

Collaborator Author

You're right, thanks!

@ValeriiKhalimendik ValeriiKhalimendik force-pushed the feature/84_metastore_get_latest_available_date_for_delta branch 2 times, most recently from aa529ab to 55c360d Compare January 9, 2023 17:42
@ValeriiKhalimendik ValeriiKhalimendik force-pushed the feature/84_metastore_get_latest_available_date_for_delta branch from 55c360d to 8ab3541 Compare January 9, 2023 17:55
    self, path: str, target_partition_name: str
) -> Optional[str]:
    def is_file_hidden(column_name_: str, date_: str) -> bool:
        if len(column_name_) > 0 and column_name_[0] == "_":
Collaborator

Maybe this function could be simplified and made more readable by using column_name_.startswith("_").
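A runnable sketch of the suggested simplification (the helper and sample names below are illustrative):

```python
def is_file_hidden(column_name: str) -> bool:
    # Equivalent to: len(column_name) > 0 and column_name[0] == "_",
    # but startswith reads more clearly and handles the empty string.
    return column_name.startswith("_")
```

This is useful when filtering directory listings that contain Spark bookkeeping entries such as _delta_log or _SUCCESS.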

Collaborator

@yruslan yruslan left a comment

Looks great. Just a couple of tiny things left.

3 resolved review threads on pramen-py/src/pramen_py/metastore/reader.py and 1 on pramen-py/tests/test_metastore/test_metastore.py (outdated)
@ValeriiKhalimendik ValeriiKhalimendik force-pushed the feature/84_metastore_get_latest_available_date_for_delta branch from 032da8c to 7dc3797 Compare January 10, 2023 18:27
@ValeriiKhalimendik ValeriiKhalimendik force-pushed the feature/84_metastore_get_latest_available_date_for_delta branch from 7dc3797 to b9b8091 Compare January 10, 2023 18:46
Resolved review threads on pramen-py/src/pramen_py/metastore/reader.py and pramen-py/tests/conftest.py (outdated)
@ValeriiKhalimendik ValeriiKhalimendik force-pushed the feature/84_metastore_get_latest_available_date_for_delta branch from 007835b to bacfc76 Compare January 11, 2023 09:48
yruslan
yruslan previously approved these changes Jan 11, 2023
Collaborator

@yruslan yruslan left a comment

Looks great!

Just a couple of optional considerations.

Comment on lines +53 to +55
@staticmethod
def _apply_uppercase_to_columns_names(
-    self, df: DataFrame, uppercase_columns: bool
+    df: DataFrame, uppercase_columns: bool
Collaborator

Nice!

Comment on lines 136 to 137
logger.error(f"Unable to access directory: {path}")
raise Exception(f"Unable to access directory : {path}")
Collaborator

Logging and throwing is an anti-pattern. Pick one. Probably raising an exception is the best here.

Why do you want to raise a custom exception here? Is AnalysisException error message meaningful?


Comment on lines 223 to 227
logger.error(
    f"The directory does not contain partitions by "
    f"'{metastore_table.info_date_settings.column}': {metastore_table.path}"
)
raise ValueError("No partitions are available")
Collaborator

Same here: you can raise the error only, but it can contain a more detailed message.

Comment on lines 242 to 248
        logger.error(
            f"No partitions are available for the given '{metastore_table.name}'.\n"
            f"The table is available for the following dates:\n"
            f"{str_date_list}\n"
            f"Only partitions earlier than {str(until)} might be included."
        )
        raise ValueError("No partitions are available") from err
    else:
        logger.info(f"Latest date for {table_name} is {latest_date}")
        return latest_date
Collaborator

Same here: you can raise the error only, but it can contain a more detailed message.


from pramen_py import MetastoreReader, MetastoreWriter
from pramen_py.models import InfoDateSettings, MetastoreTable, TableFormat


@pytest.mark.parametrize(
Collaborator

Nice test suite!

@ValeriiKhalimendik ValeriiKhalimendik force-pushed the feature/84_metastore_get_latest_available_date_for_delta branch from bacfc76 to 70403e4 Compare January 11, 2023 12:47
try:
    return self.spark.read.format(table_format.value).load(path)
except AnalysisException:
    raise Exception(f"Unable to access directory: {path}")
Collaborator

You mentioned you wanted to improve the error message by adding the cause of the error (from the original AnalysisException)?

Collaborator Author

The original exception is output this way.
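For illustration: in Python, re-raising inside an except block keeps the original exception as implicit context, and `raise ... from exc` makes it the explicit cause, so its message survives in the traceback. A minimal stand-in sketch (OSError substitutes for Spark's AnalysisException; names are illustrative):

```python
def read_table(path: str):
    try:
        raise OSError(f"Path does not exist: {path}")  # stand-in failure
    except OSError as exc:
        # Chaining with "from" preserves the original error as __cause__.
        raise RuntimeError(f"Unable to access directory: {path}") from exc

try:
    read_table("/data/missing")
except RuntimeError as err:
    cause = err.__cause__  # the original OSError
    msg = str(err)
```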

Collaborator

@yruslan yruslan left a comment

Looks good

@ValeriiKhalimendik ValeriiKhalimendik merged commit 518eabd into main Jan 11, 2023
@ValeriiKhalimendik ValeriiKhalimendik deleted the feature/84_metastore_get_latest_available_date_for_delta branch January 11, 2023 15:47