diff --git a/dataprofiler/data_readers/data_utils.py b/dataprofiler/data_readers/data_utils.py
index d0cc7211..79b5b531 100644
--- a/dataprofiler/data_readers/data_utils.py
+++ b/dataprofiler/data_readers/data_utils.py
@@ -1,5 +1,6 @@
 """Contains functions for data readers."""
 import json
+import random
 import logging
 import os
 import re
@@ -24,6 +25,7 @@
 import boto3
 import botocore
 import dateutil
+import numpy as np
 import pandas as pd
 import pyarrow.parquet as pq
 import requests
@@ -439,52 +441,114 @@ def read_csv_df(
     return data


-def read_parquet_df(
+def sample_parquet(
     file_path: str,
+    sample_nrows: int,
     selected_columns: Optional[List[str]] = None,
     read_in_string: bool = False,
 ) -> Tuple[pd.DataFrame, pd.Series]:
     """
-    Return an iterator that returns one row group each time.
+    Read parquet file, sample n row from it and return a dataframe.

     :param file_path: path to the Parquet file.
     :type file_path: str
+    :param sample_nrows: number of rows being sampled
+    :type sample_nrows: int
+    :param selected_columns: columns need to be read
+    :type selected_columns: list
+    :param read_in_string: return as string type
+    :type read_in_string: bool
     :return:
-    :rtype: Iterator(pd.DataFrame)
+    :rtype:
     """
-    parquet_file = pq.ParquetFile(file_path)
-    data = pd.DataFrame()
-    for i in range(parquet_file.num_row_groups):
+    # read parquet file into table
+    table = pq.read_table(file_path, columns=selected_columns)
+
+    # sample
+    n_rows = table.num_rows
+    if n_rows > sample_nrows:
+        select = np.array([False] * n_rows)
+        select[random.sample(range(n_rows), sample_nrows)] = True
+    else:
+        select = np.array([True] * n_rows)
+    out = table.filter(select).to_pandas()

-        data_row_df = parquet_file.read_row_group(i).to_pandas()
+    # Convert all the unicode columns to utf-8
+    types = out.apply(lambda x: pd.api.types.infer_dtype(x.values, skipna=True))

-        # Convert all the unicode columns to utf-8
-        types = data_row_df.apply(
-            lambda x: pd.api.types.infer_dtype(x.values, skipna=True)
-        )
+    mixed_and_unicode_cols = types[types == "unicode"].index.union(
+        types[types == "mixed"].index
+    )

-        mixed_and_unicode_cols = types[types == "unicode"].index.union(
-            types[types == "mixed"].index
+    for col in mixed_and_unicode_cols:
+        out[col] = out[col].apply(
+            lambda x: x.encode("utf-8").strip() if isinstance(x, str) else x
+        )
+        out[col] = out[col].apply(
+            lambda x: x.decode("utf-8").strip() if isinstance(x, bytes) else x
         )

+    original_df_dtypes = out.dtypes
+    if read_in_string:
+        out = out.astype(str)
+
+    return out, original_df_dtypes
+
+
+def read_parquet_df(
+    file_path: str,
+    sample_nrows: Optional[int] = None,
+    selected_columns: Optional[List[str]] = None,
+    read_in_string: bool = False,
+) -> Tuple[pd.DataFrame, pd.Series]:
+    """
+    Return an iterator that returns one row group each time.

-        for col in mixed_and_unicode_cols:
-            data_row_df[col] = data_row_df[col].apply(
-                lambda x: x.encode("utf-8").strip() if isinstance(x, str) else x
+    :param file_path: path to the Parquet file.
+    :type file_path: str
+    :return:
+    :rtype: Iterator(pd.DataFrame)
+    """
+    if sample_nrows is None:
+        parquet_file = pq.ParquetFile(file_path)
+        data = pd.DataFrame()
+        for i in range(parquet_file.num_row_groups):
+
+            data_row_df = parquet_file.read_row_group(i).to_pandas()
+
+            # Convert all the unicode columns to utf-8
+            types = data_row_df.apply(
+                lambda x: pd.api.types.infer_dtype(x.values, skipna=True)
             )
-            data_row_df[col] = data_row_df[col].apply(
-                lambda x: x.decode("utf-8").strip() if isinstance(x, bytes) else x
+
+            mixed_and_unicode_cols = types[types == "unicode"].index.union(
+                types[types == "mixed"].index
             )

-        if selected_columns:
-            data_row_df = data_row_df[selected_columns]
+            for col in mixed_and_unicode_cols:
+                data_row_df[col] = data_row_df[col].apply(
+                    lambda x: x.encode("utf-8").strip() if isinstance(x, str) else x
+                )
+                data_row_df[col] = data_row_df[col].apply(
+                    lambda x: x.decode("utf-8").strip() if isinstance(x, bytes) else x
+                )

-        data = pd.concat([data, data_row_df])
+            if selected_columns:
+                data_row_df = data_row_df[selected_columns]

-    original_df_dtypes = data.dtypes
-    if read_in_string:
-        data = data.astype(str)
+            data = pd.concat([data, data_row_df])

-    return data, original_df_dtypes
+        original_df_dtypes = data.dtypes
+        if read_in_string:
+            data = data.astype(str)
+        return data, original_df_dtypes
+    else:
+        data, original_df_dtypes = sample_parquet(
+            file_path,
+            sample_nrows,
+            selected_columns=selected_columns,
+            read_in_string=read_in_string,
+        )
+        return data, original_df_dtypes


 def read_text_as_list_of_strs(
diff --git a/dataprofiler/data_readers/parquet_data.py b/dataprofiler/data_readers/parquet_data.py
index 4ee6dde0..4fa567b8 100644
--- a/dataprofiler/data_readers/parquet_data.py
+++ b/dataprofiler/data_readers/parquet_data.py
@@ -61,6 +61,7 @@ def __init__(
         self._data_formats["json"] = self._get_data_as_json
         self._selected_data_format: str = options.get("data_format", "dataframe")
         self._selected_columns: List[str] = options.get("selected_columns", list())
+        self._sample_nrows: Optional[int] = options.get("sample_nrows", None)

         if data is not None:
             self._load_data(data)
@@ -83,6 +84,11 @@ def selected_columns(self) -> List[str]:
         """Return selected columns."""
         return self._selected_columns

+    @property
+    def sample_nrows(self) -> Optional[int]:
+        """Return sample_nrows."""
+        return self._sample_nrows
+
     @property
     def is_structured(self) -> bool:
         """Determine compatibility with StructuredProfiler."""
@@ -100,7 +106,10 @@ def _load_data_from_str(self, data_as_str: str) -> pd.DataFrame:
     def _load_data_from_file(self, input_file_path: str) -> pd.DataFrame:
         """Return data from file."""
         data, original_df_dtypes = data_utils.read_parquet_df(
-            input_file_path, self.selected_columns, read_in_string=True
+            input_file_path,
+            selected_columns=self.selected_columns,
+            read_in_string=True,
+            sample_nrows=self.sample_nrows,
         )
         self._original_df_dtypes = original_df_dtypes
         return data
diff --git a/dataprofiler/tests/data_readers/test_parquet_data.py b/dataprofiler/tests/data_readers/test_parquet_data.py
index 56b07618..fdabd413 100644
--- a/dataprofiler/tests/data_readers/test_parquet_data.py
+++ b/dataprofiler/tests/data_readers/test_parquet_data.py
@@ -181,6 +181,21 @@ def test_len_data(self):
             self.assertEqual(input_file["count"], len(data), msg=input_file["path"])
             self.assertEqual(input_file["count"], data.length, msg=input_file["path"])

+    def test_len_sampled_data(self):
+        """
+        Validate that length called on ParquetData with sample_nrows option is
+        appropriately determining the length value.
+        """
+
+        for input_file in self.file_or_buf_list:
+            data = Data(input_file["path"], options={"sample_nrows": 100})
+            self.assertEqual(
+                min(100, input_file["count"]), len(data), msg=input_file["path"]
+            )
+            self.assertEqual(
+                min(100, input_file["count"]), data.length, msg=input_file["path"]
+            )
+
     def test_file_encoding(self):
         """Tests to ensure file_encoding set to None"""
         for input_file in self.file_or_buf_list:
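For reviewers, a minimal usage sketch (not part of the patch) of the `sample_nrows` option introduced above, mirroring the added test; the file name `data.parquet` is a placeholder:

```python
import dataprofiler as dp

# Default behavior: every row group of the Parquet file is read and concatenated.
full_data = dp.Data("data.parquet")

# New behavior: at most 100 rows are randomly sampled via sample_parquet().
sampled_data = dp.Data("data.parquet", options={"sample_nrows": 100})

# The sampled reader never returns more rows than requested.
assert len(sampled_data) == min(100, len(full_data))
```

Note that sampling uses `random.sample` over row indices without seeding, so the selected rows differ between runs unless the caller seeds `random` beforehand.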