parquet sampling function developed in data_utils.py; Added sample_nrows argument in ParquetData class; Added test_len_sampled_data in test_parquet_data.py

menglinw committed Nov 16, 2023
1 parent ad2eb80 commit 09a868d
Showing 3 changed files with 114 additions and 26 deletions.
114 changes: 89 additions & 25 deletions dataprofiler/data_readers/data_utils.py
@@ -1,5 +1,6 @@
"""Contains functions for data readers."""
import json
import random
import logging
import os
import re
@@ -24,6 +25,7 @@
import boto3
import botocore
import dateutil
import numpy as np
import pandas as pd
import pyarrow.parquet as pq
import requests
@@ -439,52 +441,114 @@ def read_csv_df(
    return data


def sample_parquet(
    file_path: str,
    sample_nrows: int,
    selected_columns: Optional[List[str]] = None,
    read_in_string: bool = False,
) -> Tuple[pd.DataFrame, pd.Series]:
    """
    Read a Parquet file, sample sample_nrows rows from it, and return a dataframe.

    :param file_path: path to the Parquet file
    :type file_path: str
    :param sample_nrows: number of rows to sample
    :type sample_nrows: int
    :param selected_columns: columns to read
    :type selected_columns: list
    :param read_in_string: whether to return the data as strings
    :type read_in_string: bool
    :return: sampled data and the original column dtypes
    :rtype: Tuple[pd.DataFrame, pd.Series]
    """
    # read the parquet file into a pyarrow table
    table = pq.read_table(file_path, columns=selected_columns)

    # build a boolean row mask selecting sample_nrows rows at random,
    # without replacement; keep everything if the file is smaller
    n_rows = table.num_rows
    if n_rows > sample_nrows:
        select = np.array([False] * n_rows)
        select[random.sample(range(n_rows), sample_nrows)] = True
    else:
        select = np.array([True] * n_rows)
    out = table.filter(select).to_pandas()

    # Convert all the unicode columns to utf-8
    types = out.apply(lambda x: pd.api.types.infer_dtype(x.values, skipna=True))

    mixed_and_unicode_cols = types[types == "unicode"].index.union(
        types[types == "mixed"].index
    )

    for col in mixed_and_unicode_cols:
        out[col] = out[col].apply(
            lambda x: x.encode("utf-8").strip() if isinstance(x, str) else x
        )
        out[col] = out[col].apply(
            lambda x: x.decode("utf-8").strip() if isinstance(x, bytes) else x
        )

    original_df_dtypes = out.dtypes
    if read_in_string:
        out = out.astype(str)

    return out, original_df_dtypes


def read_parquet_df(
    file_path: str,
    sample_nrows: Optional[int] = None,
    selected_columns: Optional[List[str]] = None,
    read_in_string: bool = False,
) -> Tuple[pd.DataFrame, pd.Series]:
    """
    Read a Parquet file one row group at a time, or sample rows from it.

    When sample_nrows is None, every row group is read and concatenated;
    otherwise the read is delegated to sample_parquet.

    :param file_path: path to the Parquet file
    :type file_path: str
    :param sample_nrows: number of rows to sample; read all rows if None
    :type sample_nrows: Optional[int]
    :param selected_columns: columns to read
    :type selected_columns: list
    :param read_in_string: whether to return the data as strings
    :type read_in_string: bool
    :return: data and the original column dtypes
    :rtype: Tuple[pd.DataFrame, pd.Series]
    """
    if sample_nrows is None:
        parquet_file = pq.ParquetFile(file_path)
        data = pd.DataFrame()
        for i in range(parquet_file.num_row_groups):
            data_row_df = parquet_file.read_row_group(i).to_pandas()

            # Convert all the unicode columns to utf-8
            types = data_row_df.apply(
                lambda x: pd.api.types.infer_dtype(x.values, skipna=True)
            )

            mixed_and_unicode_cols = types[types == "unicode"].index.union(
                types[types == "mixed"].index
            )

            for col in mixed_and_unicode_cols:
                data_row_df[col] = data_row_df[col].apply(
                    lambda x: x.encode("utf-8").strip() if isinstance(x, str) else x
                )
                data_row_df[col] = data_row_df[col].apply(
                    lambda x: x.decode("utf-8").strip() if isinstance(x, bytes) else x
                )

            if selected_columns:
                data_row_df = data_row_df[selected_columns]

            data = pd.concat([data, data_row_df])

        original_df_dtypes = data.dtypes
        if read_in_string:
            data = data.astype(str)
        return data, original_df_dtypes

    data, original_df_dtypes = sample_parquet(
        file_path,
        sample_nrows,
        selected_columns=selected_columns,
        read_in_string=read_in_string,
    )
    return data, original_df_dtypes


def read_text_as_list_of_strs(
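The core of sample_parquet is a boolean row mask over the pyarrow table: random.sample picks sample_nrows distinct row positions without replacement, and Table.filter materializes only those rows, so the pandas conversion never sees the discarded ones. A minimal standalone sketch of the same idea, not part of the commit (the toy table and names are illustrative):

import random

import numpy as np
import pyarrow as pa

# Toy in-memory table standing in for a real Parquet file's contents.
table = pa.table({"id": list(range(1000)), "value": [i * 0.5 for i in range(1000)]})

sample_nrows = 100
n_rows = table.num_rows

# Mark sample_nrows distinct random positions True; Table.filter keeps
# only the marked rows and preserves their original order.
select = np.array([False] * n_rows)
select[random.sample(range(n_rows), sample_nrows)] = True

sampled = table.filter(select).to_pandas()
print(len(sampled))  # 100

Because random.sample draws without replacement, the result always has exactly min(sample_nrows, n_rows) rows, which is the invariant the new test below asserts.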
11 changes: 10 additions & 1 deletion dataprofiler/data_readers/parquet_data.py
@@ -61,6 +61,7 @@ def __init__(
        self._data_formats["json"] = self._get_data_as_json
        self._selected_data_format: str = options.get("data_format", "dataframe")
        self._selected_columns: List[str] = options.get("selected_columns", list())
        self._sample_nrows: Optional[int] = options.get("sample_nrows", None)

        if data is not None:
            self._load_data(data)
@@ -83,6 +84,11 @@ def selected_columns(self) -> List[str]:
        """Return selected columns."""
        return self._selected_columns

    @property
    def sample_nrows(self) -> Optional[int]:
        """Return sample_nrows."""
        return self._sample_nrows

    @property
    def is_structured(self) -> bool:
        """Determine compatibility with StructuredProfiler."""
@@ -100,7 +106,10 @@ def _load_data_from_str(self, data_as_str: str) -> pd.DataFrame:
    def _load_data_from_file(self, input_file_path: str) -> pd.DataFrame:
        """Return data from file."""
        data, original_df_dtypes = data_utils.read_parquet_df(
            input_file_path,
            selected_columns=self.selected_columns,
            read_in_string=True,
            sample_nrows=self.sample_nrows,
        )
        self._original_df_dtypes = original_df_dtypes
        return data
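Wired through ParquetData, the option becomes a one-liner at the Data entry point. A hedged usage sketch (the file path is a placeholder; dp.Data dispatches to ParquetData for Parquet inputs):

import dataprofiler as dp

# "large.parquet" is a placeholder path. sample_nrows caps how many rows
# are loaded, so large files need not be read in full before profiling.
data = dp.Data("large.parquet", options={"sample_nrows": 100})
print(len(data))  # min(100, total rows), as the new test below checks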
15 changes: 15 additions & 0 deletions dataprofiler/tests/data_readers/test_parquet_data.py
@@ -181,6 +181,21 @@ def test_len_data(self):
            self.assertEqual(input_file["count"], len(data), msg=input_file["path"])
            self.assertEqual(input_file["count"], data.length, msg=input_file["path"])

    def test_len_sampled_data(self):
        """
        Validate that the length of a ParquetData loaded with the
        sample_nrows option matches the requested sample size.
        """
        for input_file in self.file_or_buf_list:
            data = Data(input_file["path"], options={"sample_nrows": 100})
            self.assertEqual(
                min(100, input_file["count"]), len(data), msg=input_file["path"]
            )
            self.assertEqual(
                min(100, input_file["count"]), data.length, msg=input_file["path"]
            )

    def test_file_encoding(self):
        """Tests to ensure file_encoding is set to None."""
        for input_file in self.file_or_buf_list:
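To run just the new test from a development checkout, something like the following should work (the -k pattern filter assumes Python 3.7+ unittest):

python -m unittest -k test_len_sampled_data dataprofiler.tests.data_readers.test_parquet_data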
