Feature/84 metastore get latest available date for delta #116
Conversation
…r delta format table
Looks good. Just a couple of nitpicks
```python
df_select = self._read_table(
    metastore_table.format, metastore_table.path
).select(f"{metastore_table.info_date_settings.column}")
dates_list = [data[0] for data in df_select.distinct().collect()]
```
Take a look here. I'm not sure what you are doing, but I am confused by the `.collect()` and the hard-to-read `data[0] for data`.
`collect()` returns records as an array, and `[0]` is the first column. At least this is how it works in Scala.
How would you do it differently?
Something like this would be more "Sparktonic", based on my sense of the nature of Spark :) :

```python
latest_date = df.select("info_date").distinct().rdd.map(lambda x: x[0]).max()
```

Then replace `lambda x: x[0]` with some typed function, e.g. `def get_info_date_value(row: Row) -> str:`
And it is more readable (no obscure `data[0] for data`). In this case you need to know that `Row` supports indexing...
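For context, a minimal standalone sketch (names hypothetical) of how PySpark `Row` indexing behaves:

```python
from pyspark.sql import Row

row = Row(info_date="2022-11-02", value=17)

assert row[0] == "2022-11-02"      # positional indexing, as in data[0]
assert row["info_date"] == row[0]  # field-name access also works
```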
From a taste perspective I like the original code. I've seen many Spark functions that do this trick. Quoting my own code here, but I have seen similar things in other code bases, and it does not trigger a wtf for me 😄 : https://github.com/AbsaOSS/pramen/blob/main/pramen/core/src/main/scala/za/co/absa/pramen/core/reader/TableReaderJdbc.scala#L79-L79
Extracting the processing logic as a lambda won't do much, since you can't avoid `data[0]` or something like it.
Do I understand it correctly that you propose:

```python
def get_info_date_value(row: Row) -> str:
    return row[0]
```
Also, I'm not sure why you want to convert to an RDD. It seems redundant to me, and might even be less performant.
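For what it's worth, a sketch of a DataFrame-only way to get the latest date, assuming the goal is just the maximum of the info-date column (so neither collecting all distinct dates nor converting to an RDD is needed):

```python
from pyspark.sql import functions as F

# The max is computed on the executors; only a single
# one-row result comes back to the driver.
latest_date = df.agg(F.max("info_date")).collect()[0][0]
```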
I'd be happy to chat about this tomorrow; for sure, do not be blocked on this!
Sure, thanks!
Maybe the low-hanging fruit here would be to rename `data` to `row`, which (maybe) makes it clearer that we are extracting the first column, but I guess it's still pretty much a matter of preference. I have used this "pattern" many times, so to me it was familiar.
I like the idea of renaming `data` to `row`.
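Applied to the comprehension from the diff above, the rename would read:

```python
dates_list = [row[0] for row in df_select.distinct().collect()]
```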
```python
def test_metastore_get_latest_available_date_for_delta(
    spark, get_data_stub, tmp_path
):
    def save_delta_table(df: DataFrame, path: str) -> None:
```
Looks like something reusable. Consider moving it out as a fixture.
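A sketch of what that could look like as a factory fixture (the fixture name and the exact Delta write options are assumptions, not the project's actual code):

```python
import pytest
from pyspark.sql import DataFrame


@pytest.fixture
def save_delta_table():
    # Return a helper so tests can write any DataFrame as a Delta table.
    def _save(df: DataFrame, path: str) -> None:
        df.write.format("delta").mode("overwrite").save(path)

    return _save
```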
```python
df_union = get_data_stub.union(
    spark.createDataFrame(
        spark.sparkContext.parallelize([(17, 18, d(2022, 11, 2))]),
```
Sorry for nitpicking, but I think you can create a DataFrame directly from a list of tuples (without using an RDD).
```diff
-        spark.sparkContext.parallelize([(17, 18, d(2022, 11, 2))]),
+        [(17, 18, d(2022, 11, 2))],
```
You're right, thanks!
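For reference, a sketch of the direct construction; the schema string here is hypothetical and would need to match `get_data_stub`'s actual columns:

```python
df = spark.createDataFrame(
    [(17, 18, d(2022, 11, 2))],
    schema="A: int, B: int, info_date: date",
)
```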
```python
    self, path: str, target_partition_name: str
) -> Optional[str]:
    def is_file_hidden(column_name_: str, date_: str) -> bool:
        if len(column_name_) > 0 and column_name_[0] == "_":
```
Maybe this function could be simplified to be more readable, using `column_name_.startswith("_")`.
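A sketch of the simplified check (only the name test from the visible hunk; whatever the function does with `date_` afterwards is not shown in the diff, so this is just the predicate):

```python
# An empty string never starts with "_", so the explicit
# length check becomes redundant.
if column_name_.startswith("_"):
    ...
```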
Looks great. Just a couple of tiny things left.
Looks great!
Just a couple of optional considerations.
```diff
 @staticmethod
 def _apply_uppercase_to_columns_names(
-    self, df: DataFrame, uppercase_columns: bool
+    df: DataFrame, uppercase_columns: bool
```
Nice!
logger.error(f"Unable to access directory: {path}") | ||
raise Exception(f"Unable to access directory : {path}") |
Logging and throwing is an anti-pattern; pick one. Raising an exception is probably the best here.
Why do you want to raise a custom exception here? Is the `AnalysisException` error message meaningful?
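A sketch of the raise-only variant of this hunk, keeping the detail in the exception message and dropping the separate log call:

```python
# One signal instead of two: the caller (or a top-level handler)
# decides whether and how to log it.
raise Exception(f"Unable to access directory: {path}")
```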
```python
logger.error(
    f"The directory does not contain partitions by "
    f"'{metastore_table.info_date_settings.column}': {metastore_table.path}"
)
raise ValueError("No partitions are available")
```
Same here: you can raise the error only, but have it carry the more detailed message.
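A sketch of folding the logged detail into the exception itself, reusing the f-strings from the hunk above:

```python
raise ValueError(
    f"No partitions are available. The directory does not contain partitions "
    f"by '{metastore_table.info_date_settings.column}': {metastore_table.path}"
)
```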
```python
logger.error(
    f"No partitions are available for the given '{metastore_table.name}'.\n"
    f"The table is available for the following dates:\n"
    f"{str_date_list}\n"
    f"Only partitions earlier than {str(until)} might be included."
) from err
else:
    logger.info(f"Latest date for {table_name} is {latest_date}")
    return latest_date
)
raise ValueError("No partitions are available")
```
Same here: you can raise the error only, but have it carry the more detailed message.
```python
from pramen_py import MetastoreReader, MetastoreWriter
from pramen_py.models import InfoDateSettings, MetastoreTable, TableFormat


@pytest.mark.parametrize(
```
Nice test suite!
```python
try:
    return self.spark.read.format(table_format.value).load(path)
except AnalysisException:
    raise Exception(f"Unable to access directory: {path}")
```
You mentioned you wanted to improve the error message by adding the cause of the error (from the original `AnalysisException`)?
The original exception already comes out this way (raising inside the `except` block keeps it attached as context in the traceback).
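For completeness, a sketch of making the cause explicit with `raise ... from`, so the traceback reports the `AnalysisException` as the direct cause instead of just the implicit "during handling of the above exception" context:

```python
try:
    return self.spark.read.format(table_format.value).load(path)
except AnalysisException as err:
    # "from err" sets __cause__, chaining the original error explicitly.
    raise Exception(f"Unable to access directory: {path}") from err
```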
Looks good