Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: added parquet sampling #1070

Merged
merged 11 commits into from
Dec 12, 2023
114 changes: 89 additions & 25 deletions dataprofiler/data_readers/data_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import json
import logging
import os
import random
import re
import urllib
from collections import OrderedDict
Expand All @@ -24,6 +25,7 @@
import boto3
import botocore
import dateutil
import numpy as np
import pandas as pd
import pyarrow.parquet as pq
import requests
Expand Down Expand Up @@ -439,52 +441,114 @@ def read_csv_df(
return data


def read_parquet_df(
def sample_parquet(
file_path: str,
sample_nrows: int,
selected_columns: Optional[List[str]] = None,
read_in_string: bool = False,
) -> Tuple[pd.DataFrame, pd.Series]:
"""
Return an iterator that returns one row group each time.
Read parquet file, sample n row from it and return a dataframe.
menglinw marked this conversation as resolved.
Show resolved Hide resolved

:param file_path: path to the Parquet file.
:type file_path: str
:param sample_nrows: number of rows being sampled
:type sample_nrows: int
:param selected_columns: columns need to be read
:type selected_columns: list
:param read_in_string: return as string type
:type read_in_string: bool
:return:
:rtype: Iterator(pd.DataFrame)
:rtype:
menglinw marked this conversation as resolved.
Show resolved Hide resolved
"""
parquet_file = pq.ParquetFile(file_path)
data = pd.DataFrame()
for i in range(parquet_file.num_row_groups):
# read parquet file into table
table = pq.read_table(file_path, columns=selected_columns)

# sample
n_rows = table.num_rows
if n_rows > sample_nrows:
select = np.array([False] * n_rows)
menglinw marked this conversation as resolved.
Show resolved Hide resolved
select[random.sample(range(n_rows), sample_nrows)] = True
else:
select = np.array([True] * n_rows)
out = table.filter(select).to_pandas()
menglinw marked this conversation as resolved.
Show resolved Hide resolved

data_row_df = parquet_file.read_row_group(i).to_pandas()
# Convert all the unicode columns to utf-8
types = out.apply(lambda x: pd.api.types.infer_dtype(x.values, skipna=True))
menglinw marked this conversation as resolved.
Show resolved Hide resolved

# Convert all the unicode columns to utf-8
types = data_row_df.apply(
lambda x: pd.api.types.infer_dtype(x.values, skipna=True)
)
mixed_and_unicode_cols = types[types == "unicode"].index.union(
types[types == "mixed"].index
)

mixed_and_unicode_cols = types[types == "unicode"].index.union(
types[types == "mixed"].index
for col in mixed_and_unicode_cols:
out[col] = out[col].apply(
lambda x: x.encode("utf-8").strip() if isinstance(x, str) else x
)
out[col] = out[col].apply(
lambda x: x.decode("utf-8").strip() if isinstance(x, bytes) else x
menglinw marked this conversation as resolved.
Show resolved Hide resolved
menglinw marked this conversation as resolved.
Show resolved Hide resolved
)
original_df_dtypes = out.dtypes
if read_in_string:
out = out.astype(str)

return out, original_df_dtypes


def read_parquet_df(
file_path: str,
sample_nrows: Optional[int] = None,
selected_columns: Optional[List[str]] = None,
read_in_string: bool = False,
) -> Tuple[pd.DataFrame, pd.Series]:
"""
Return an iterator that returns one row group each time.

for col in mixed_and_unicode_cols:
data_row_df[col] = data_row_df[col].apply(
lambda x: x.encode("utf-8").strip() if isinstance(x, str) else x
:param file_path: path to the Parquet file.
:type file_path: str
:return:
:rtype: Iterator(pd.DataFrame)
"""
if sample_nrows is None:
parquet_file = pq.ParquetFile(file_path)
data = pd.DataFrame()
for i in range(parquet_file.num_row_groups):

data_row_df = parquet_file.read_row_group(i).to_pandas()

# Convert all the unicode columns to utf-8
types = data_row_df.apply(
lambda x: pd.api.types.infer_dtype(x.values, skipna=True)
)
data_row_df[col] = data_row_df[col].apply(
lambda x: x.decode("utf-8").strip() if isinstance(x, bytes) else x

mixed_and_unicode_cols = types[types == "unicode"].index.union(
types[types == "mixed"].index
)

if selected_columns:
data_row_df = data_row_df[selected_columns]
for col in mixed_and_unicode_cols:
data_row_df[col] = data_row_df[col].apply(
lambda x: x.encode("utf-8").strip() if isinstance(x, str) else x
)
data_row_df[col] = data_row_df[col].apply(
lambda x: x.decode("utf-8").strip() if isinstance(x, bytes) else x
)
menglinw marked this conversation as resolved.
Show resolved Hide resolved

data = pd.concat([data, data_row_df])
if selected_columns:
data_row_df = data_row_df[selected_columns]

original_df_dtypes = data.dtypes
if read_in_string:
data = data.astype(str)
data = pd.concat([data, data_row_df])

return data, original_df_dtypes
original_df_dtypes = data.dtypes
if read_in_string:
data = data.astype(str)
return data, original_df_dtypes
else:
data, original_df_dtypes = sample_parquet(
file_path,
sample_nrows,
selected_columns=selected_columns,
read_in_string=read_in_string,
)
return data, original_df_dtypes


def read_text_as_list_of_strs(
Expand Down
11 changes: 10 additions & 1 deletion dataprofiler/data_readers/parquet_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ def __init__(
self._data_formats["json"] = self._get_data_as_json
self._selected_data_format: str = options.get("data_format", "dataframe")
self._selected_columns: List[str] = options.get("selected_columns", list())
self._sample_nrows: Optional[int] = options.get("sample_nrows", None)

if data is not None:
self._load_data(data)
Expand All @@ -83,6 +84,11 @@ def selected_columns(self) -> List[str]:
"""Return selected columns."""
return self._selected_columns

@property
def sample_nrows(self) -> Optional[int]:
"""Return sample_nrows."""
return self._sample_nrows

taylorfturner marked this conversation as resolved.
Show resolved Hide resolved
@property
def is_structured(self) -> bool:
"""Determine compatibility with StructuredProfiler."""
Expand All @@ -100,7 +106,10 @@ def _load_data_from_str(self, data_as_str: str) -> pd.DataFrame:
def _load_data_from_file(self, input_file_path: str) -> pd.DataFrame:
"""Return data from file."""
data, original_df_dtypes = data_utils.read_parquet_df(
input_file_path, self.selected_columns, read_in_string=True
input_file_path,
selected_columns=self.selected_columns,
read_in_string=True,
sample_nrows=self.sample_nrows,
)
self._original_df_dtypes = original_df_dtypes
return data
Expand Down
53 changes: 53 additions & 0 deletions dataprofiler/tests/data_readers/test_parquet_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,16 @@ def test_specifying_data_type(self):
input_data_obj = Data(input_file["path"], data_type="parquet")
self.assertEqual(input_data_obj.data_type, "parquet")

def test_specifying_data_type_when_sampled(self):
"""
Determine if the parquet file can be loaded with manual data_type setting when sampled
"""
for input_file in self.file_or_buf_list:
input_data_obj = Data(
input_file["path"], data_type="parquet", options={"sample_nrows": 100}
)
self.assertEqual(input_data_obj.data_type, "parquet")

def test_reload_data(self):
"""
Determine if the parquet file can be reloaded
Expand All @@ -112,6 +122,16 @@ def test_reload_data(self):
self.assertEqual(input_data_obj.data_type, "parquet")
self.assertEqual(input_file["path"], input_data_obj.input_file_path)

def test_reload_data_when_sampled(self):
"""
Determine if the parquet file can be reloaded when sampled
"""
for input_file in self.file_or_buf_list:
input_data_obj = Data(input_file["path"], options={"sample_nrows": 100})
input_data_obj.reload(input_file["path"], options={"sample_nrows": 100})
self.assertEqual(input_data_obj.data_type, "parquet")
self.assertEqual(input_file["path"], input_data_obj.input_file_path)

def test_data_formats(self):
"""
Determine if the parquet file data_formats can be used
Expand All @@ -130,6 +150,24 @@ def test_data_formats(self):
self.assertIsInstance(data, list)
self.assertIsInstance(data[0], str)

def test_data_formats_when_sampled(self):
"""
Determine if the parquet file data_formats can be used when sampled
"""
for input_file in self.file_or_buf_list:
input_data_obj = Data(input_file["path"], options={"sample_nrows": 100})
for data_format in list(input_data_obj._data_formats.keys()):
input_data_obj.data_format = data_format
self.assertEqual(input_data_obj.data_format, data_format)
data = input_data_obj.data
if data_format == "dataframe":
import pandas as pd
menglinw marked this conversation as resolved.
Show resolved Hide resolved

self.assertIsInstance(data, pd.DataFrame)
elif data_format in ["records", "json"]:
self.assertIsInstance(data, list)
self.assertIsInstance(data[0], str)

def test_mixed_string_col(self):
"""
Determine if parquet can handle mixed string column types.
Expand Down Expand Up @@ -181,6 +219,21 @@ def test_len_data(self):
self.assertEqual(input_file["count"], len(data), msg=input_file["path"])
self.assertEqual(input_file["count"], data.length, msg=input_file["path"])

def test_len_sampled_data(self):
"""
Validate that length called on ParquetData with sample_nrows option is
appropriately determining the length value.
"""

for input_file in self.file_or_buf_list:
data = Data(input_file["path"], options={"sample_nrows": 100})
self.assertEqual(
min(100, input_file["count"]), len(data), msg=input_file["path"]
)
self.assertEqual(
min(100, input_file["count"]), data.length, msg=input_file["path"]
)

def test_file_encoding(self):
"""Tests to ensure file_encoding set to None"""
for input_file in self.file_or_buf_list:
Expand Down