Skip to content

Commit

Permalink
Feature: added parquet sampling (#1070)
Browse files Browse the repository at this point in the history
* parquet sampling function developed in data_utils.py; Added sample_nrows argument in ParquetData class; Added test_len_sampled_data in test_parquet_data.py

* resolved conflict with dev, added more tests

* fixed sample empty column bug

* fixed comments in data_utils.py, including:
1. added type of return in sample_parquet function;
2. changed variable names in sample_parquet function to more descriptive names (select -> sample_index, out -> sample_df);
3. created convert_unicode_col_to_utf8 function to reduce repeating code in sample_parquet and read_parquet_df functions

* 1. renamed variable names in covert_unicode_col_to_utf8 function (data_utils.py) to be more descriptive (types -> input_column_types, col -> iter_column), other part unchanged

2. test_parquet_data.py, move import statement to the top of file

3. test_parquet_data.py, merged all tests about parquet sample feature to their original tests

* checked the datatype and input file path before and after reload with sampling option enabled

* test

* delete test edit in avro_data.py, updated fastavro version in  requirment.txt

* remove fastavro.reader type

* change fastavro version back to original

* 1. sample_parquet function description
2. test_len_data method keep one sample length test
3. remove sampling test in test_specifying_data_type
4. remove sampling test in test_reload_data
  • Loading branch information
menglinw authored Dec 12, 2023
1 parent ad2eb80 commit 0d56dac
Show file tree
Hide file tree
Showing 4 changed files with 138 additions and 31 deletions.
2 changes: 1 addition & 1 deletion dataprofiler/data_readers/avro_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ def _load_data_from_file(self, input_file_path: str) -> List:
# even when the option encoding='utf-8' is added. It may come from
# some special compression codec, e.g., snappy. Then, binary mode
# reading is currently used to get the dict-formatted lines.
df_reader: fastavro.reader = fastavro.reader(input_file)
df_reader = fastavro.reader(input_file)
lines: List = list()
for line in df_reader:
lines.append(line)
Expand Down
134 changes: 107 additions & 27 deletions dataprofiler/data_readers/data_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import json
import logging
import os
import random
import re
import urllib
from collections import OrderedDict
Expand All @@ -24,6 +25,7 @@
import boto3
import botocore
import dateutil
import numpy as np
import pandas as pd
import pyarrow.parquet as pq
import requests
Expand Down Expand Up @@ -439,52 +441,130 @@ def read_csv_df(
return data


def read_parquet_df(
def convert_unicode_col_to_utf8(input_df: pd.DataFrame) -> pd.DataFrame:
"""
Convert all unicode columns in input dataframe to utf-8.
:param input_df: input dataframe
:type input_df: pd.DataFrame
:return: corrected dataframe
:rtype: pd.DataFrame
"""
# Convert all the unicode columns to utf-8
input_column_types = input_df.apply(
lambda x: pd.api.types.infer_dtype(x.values, skipna=True)
)

mixed_and_unicode_cols = input_column_types[
input_column_types == "unicode"
].index.union(input_column_types[input_column_types == "mixed"].index)

for iter_column in mixed_and_unicode_cols:
# Encode sting to bytes
input_df[iter_column] = input_df[iter_column].apply(
lambda x: x.encode("utf-8").strip() if isinstance(x, str) else x
)

# Decode bytes back to string
input_df[iter_column] = input_df[iter_column].apply(
lambda x: x.decode("utf-8").strip() if isinstance(x, bytes) else x
)

return input_df


def sample_parquet(
file_path: str,
sample_nrows: int,
selected_columns: Optional[List[str]] = None,
read_in_string: bool = False,
) -> Tuple[pd.DataFrame, pd.Series]:
"""
Return an iterator that returns one row group each time.
Read parquet file, sample specified number of rows from it and return a data frame.
:param file_path: path to the Parquet file.
:type file_path: str
:param sample_nrows: number of rows being sampled
:type sample_nrows: int
:param selected_columns: columns need to be read
:type selected_columns: list
:param read_in_string: return as string type
:type read_in_string: bool
:return:
:rtype: Iterator(pd.DataFrame)
"""
parquet_file = pq.ParquetFile(file_path)
data = pd.DataFrame()
for i in range(parquet_file.num_row_groups):
# read parquet file into table
if selected_columns:
parquet_table = pq.read_table(file_path, columns=selected_columns)
else:
parquet_table = pq.read_table(file_path)

data_row_df = parquet_file.read_row_group(i).to_pandas()
# sample
n_rows = parquet_table.num_rows
if n_rows > sample_nrows:
sample_index = np.array([False] * n_rows)
sample_index[random.sample(range(n_rows), sample_nrows)] = True
else:
sample_index = np.array([True] * n_rows)
sample_df = parquet_table.filter(sample_index).to_pandas()

# Convert all the unicode columns to utf-8
types = data_row_df.apply(
lambda x: pd.api.types.infer_dtype(x.values, skipna=True)
)
# Convert all the unicode columns to utf-8
sample_df = convert_unicode_col_to_utf8(sample_df)

mixed_and_unicode_cols = types[types == "unicode"].index.union(
types[types == "mixed"].index
)
original_df_dtypes = sample_df.dtypes
if read_in_string:
sample_df = sample_df.astype(str)

for col in mixed_and_unicode_cols:
data_row_df[col] = data_row_df[col].apply(
lambda x: x.encode("utf-8").strip() if isinstance(x, str) else x
)
data_row_df[col] = data_row_df[col].apply(
lambda x: x.decode("utf-8").strip() if isinstance(x, bytes) else x
)
return sample_df, original_df_dtypes

if selected_columns:
data_row_df = data_row_df[selected_columns]

data = pd.concat([data, data_row_df])
def read_parquet_df(
file_path: str,
sample_nrows: Optional[int] = None,
selected_columns: Optional[List[str]] = None,
read_in_string: bool = False,
) -> Tuple[pd.DataFrame, pd.Series]:
"""
Return an iterator that returns one row group each time.
original_df_dtypes = data.dtypes
if read_in_string:
data = data.astype(str)
:param file_path: path to the Parquet file.
:type file_path: str
:param sample_nrows: number of rows being sampled
:type sample_nrows: int
:param selected_columns: columns need to be read
:type selected_columns: list
:param read_in_string: return as string type
:type read_in_string: bool
:return:
:rtype: Iterator(pd.DataFrame)
"""
if sample_nrows is None:
parquet_file = pq.ParquetFile(file_path)
data = pd.DataFrame()
for i in range(parquet_file.num_row_groups):

data_row_df = parquet_file.read_row_group(i).to_pandas()

return data, original_df_dtypes
# Convert all the unicode columns to utf-8
data_row_df = convert_unicode_col_to_utf8(data_row_df)

if selected_columns:
data_row_df = data_row_df[selected_columns]

data = pd.concat([data, data_row_df])

original_df_dtypes = data.dtypes
if read_in_string:
data = data.astype(str)
return data, original_df_dtypes
else:
data, original_df_dtypes = sample_parquet(
file_path,
sample_nrows,
selected_columns=selected_columns,
read_in_string=read_in_string,
)
return data, original_df_dtypes


def read_text_as_list_of_strs(
Expand Down
11 changes: 10 additions & 1 deletion dataprofiler/data_readers/parquet_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ def __init__(
self._data_formats["json"] = self._get_data_as_json
self._selected_data_format: str = options.get("data_format", "dataframe")
self._selected_columns: List[str] = options.get("selected_columns", list())
self._sample_nrows: Optional[int] = options.get("sample_nrows", None)

if data is not None:
self._load_data(data)
Expand All @@ -83,6 +84,11 @@ def selected_columns(self) -> List[str]:
"""Return selected columns."""
return self._selected_columns

@property
def sample_nrows(self) -> Optional[int]:
"""Return sample_nrows."""
return self._sample_nrows

@property
def is_structured(self) -> bool:
"""Determine compatibility with StructuredProfiler."""
Expand All @@ -100,7 +106,10 @@ def _load_data_from_str(self, data_as_str: str) -> pd.DataFrame:
def _load_data_from_file(self, input_file_path: str) -> pd.DataFrame:
"""Return data from file."""
data, original_df_dtypes = data_utils.read_parquet_df(
input_file_path, self.selected_columns, read_in_string=True
input_file_path,
selected_columns=self.selected_columns,
read_in_string=True,
sample_nrows=self.sample_nrows,
)
self._original_df_dtypes = original_df_dtypes
return data
Expand Down
22 changes: 20 additions & 2 deletions dataprofiler/tests/data_readers/test_parquet_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
import unittest
from io import BytesIO

import pandas as pd

from dataprofiler.data_readers.data import Data
from dataprofiler.data_readers.parquet_data import ParquetData

Expand Down Expand Up @@ -123,13 +125,24 @@ def test_data_formats(self):
self.assertEqual(input_data_obj.data_format, data_format)
data = input_data_obj.data
if data_format == "dataframe":
import pandas as pd

self.assertIsInstance(data, pd.DataFrame)
elif data_format in ["records", "json"]:
self.assertIsInstance(data, list)
self.assertIsInstance(data[0], str)

input_data_obj_sampled = Data(
input_file["path"], options={"sample_nrows": 100}
)
for data_format in list(input_data_obj_sampled._data_formats.keys()):
input_data_obj_sampled.data_format = data_format
self.assertEqual(input_data_obj_sampled.data_format, data_format)
data_sampled = input_data_obj_sampled.data
if data_format == "dataframe":
self.assertIsInstance(data_sampled, pd.DataFrame)
elif data_format in ["records", "json"]:
self.assertIsInstance(data_sampled, list)
self.assertIsInstance(data_sampled[0], str)

def test_mixed_string_col(self):
"""
Determine if parquet can handle mixed string column types.
Expand Down Expand Up @@ -181,6 +194,11 @@ def test_len_data(self):
self.assertEqual(input_file["count"], len(data), msg=input_file["path"])
self.assertEqual(input_file["count"], data.length, msg=input_file["path"])

data_sampled = Data(input_file["path"], options={"sample_nrows": 100})
self.assertEqual(
min(100, input_file["count"]), len(data_sampled), msg=input_file["path"]
)

def test_file_encoding(self):
"""Tests to ensure file_encoding set to None"""
for input_file in self.file_or_buf_list:
Expand Down

0 comments on commit 0d56dac

Please sign in to comment.