
Commit

add data overview
Signed-off-by: weiwee <wbwmat@gmail.com>
sagewe committed Jun 20, 2023
1 parent aef0306 commit 883d07b
Showing 4 changed files with 88 additions and 61 deletions.
135 changes: 80 additions & 55 deletions python/fate/arch/dataframe/_dataframe.py
@@ -13,17 +13,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
-import numpy as np
import operator
-import pandas as pd

from typing import List, Union

-from .ops import (
-    aggregate_indexer,
-    get_partition_order_mappings
-)
+import numpy as np
+import pandas as pd

from .manager import DataManager, Schema
+from .ops import aggregate_indexer, get_partition_order_mappings


class DataFrame(object):
@@ -50,19 +47,17 @@ def __init__(self, ctx, block_table, partition_order_mappings, data_manager: Dat
    @property
    def sample_id(self):
        if self._sample_id is None:
-            self._sample_id = self.__extract_fields(with_sample_id=True,
-                                                    with_match_id=False,
-                                                    with_label=False,
-                                                    with_weight=False)
+            self._sample_id = self.__extract_fields(
+                with_sample_id=True, with_match_id=False, with_label=False, with_weight=False
+            )
        return self._sample_id

    @property
    def match_id(self):
        if self._match_id is None:
-            self._match_id = self.__extract_fields(with_sample_id=True,
-                                                   with_match_id=True,
-                                                   with_label=False,
-                                                   with_weight=False)
+            self._match_id = self.__extract_fields(
+                with_sample_id=True, with_match_id=True, with_label=False, with_weight=False
+            )

        return self._match_id

@@ -75,11 +70,7 @@ def values(self):
            return None

        return self.__extract_fields(
-            with_sample_id=True,
-            with_match_id=True,
-            with_label=False,
-            with_weight=False,
-            columns=self.columns.tolist()
+            with_sample_id=True, with_match_id=True, with_label=False, with_weight=False, columns=self.columns.tolist()
        )

    @property
@@ -89,10 +80,7 @@ def label(self):

        if self._label is None:
            self._label = self.__extract_fields(
-                with_sample_id=True,
-                with_match_id=True,
-                with_label=True,
-                with_weight=False
+                with_sample_id=True, with_match_id=True, with_label=True, with_weight=False
            )

        return self._label
@@ -104,10 +92,7 @@ def weight(self):

        if self._weight is None:
            self._weight = self.__extract_fields(
-                with_sample_id=True,
-                with_match_id=True,
-                with_label=False,
-                with_weight=False
+                with_sample_id=True, with_match_id=True, with_label=False, with_weight=False
            )

        return self._weight
@@ -121,7 +106,8 @@ def shape(self) -> "tuple":
            items = self._match_id_indexer.count()
        else:
            items = self._block_table.mapValues(lambda block: 0 if block is None else len(block[0])).reduce(
-                lambda size1, size2: size1 + size2)
+                lambda size1, size2: size1 + size2
+            )
            self.__count = items

        return self.__count, len(self._data_manager.schema.columns)
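
The reworked shape hunk above is a distributed row count: each partition maps its block to a local size and the sizes are folded with a plain sum. A rough single-process analogue of the mapValues/reduce pipeline, with a toy dict standing in for the distributed block table (illustrative only, not FATE's actual table API):

from functools import reduce

# Toy stand-in for a distributed block table: partition_id -> (index_block, value_block)
toy_block_table = {
    0: (["s1", "s2"], [[1.0, 2.0], [3.0, 4.0]]),
    1: (["s3"], [[5.0, 6.0]]),
    2: None,  # an empty partition contributes zero rows
}

# mapValues: each partition reports the length of its first block
sizes = [0 if block is None else len(block[0]) for block in toy_block_table.values()]

# reduce: fold the partial sizes into the total row count
total_rows = reduce(lambda size1, size2: size1 + size2, sizes)
print(total_rows)  # 3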
@@ -161,105 +147,123 @@ def as_tensor(self, dtype=None):
        df.values.as_tensor()
        """
        from .ops._transformer import transform_to_tensor
+
        return transform_to_tensor(self._block_table, self._data_manager, dtype)

    def as_pd_df(self) -> "pd.DataFrame":
        from .ops._transformer import transform_to_pandas_dataframe
-        return transform_to_pandas_dataframe(
-            self._block_table,
-            self._data_manager
-        )
+
+        return transform_to_pandas_dataframe(self._block_table, self._data_manager)

-    def apply_row(self, func, columns=None, with_label=False,
-                  with_weight=False, enable_type_align_checking=True):
+    def apply_row(self, func, columns=None, with_label=False, with_weight=False, enable_type_align_checking=True):
        from .ops._apply_row import apply_row
+
        return apply_row(
            self,
            func,
            columns=columns,
            with_label=with_label,
            with_weight=with_weight,
-            enable_type_align_checking=enable_type_align_checking
+            enable_type_align_checking=enable_type_align_checking,
        )

    def create_frame(self, with_label=False, with_weight=False, columns: list = None) -> "DataFrame":
-        return self.__extract_fields(with_sample_id=True,
-                                     with_match_id=True,
-                                     with_label=with_label,
-                                     with_weight=with_weight,
-                                     columns=columns)
+        return self.__extract_fields(
+            with_sample_id=True, with_match_id=True, with_label=with_label, with_weight=with_weight, columns=columns
+        )

    def drop(self, index) -> "DataFrame":
        from .ops._dimension_scaling import drop
+
        return drop(self, index)

    def fillna(self, value):
        from .ops._fillna import fillna
+
        return fillna(self, value)

    def get_dummies(self, dtype="int32"):
        from .ops._encoder import get_dummies
+
        return get_dummies(self, dtype=dtype)

    def isna(self):
        from .ops._missing import isna
+
        return isna(self)

    def isin(self, values):
        from .ops._isin import isin
+
        return isin(self, values)

    def na_count(self):
        return self.isna().sum()

    def max(self) -> "pd.Series":
        from .ops._stat import max
+
        return max(self)

    def min(self, *args, **kwargs) -> "pd.Series":
        from .ops._stat import min
+
        return min(self)

    def mean(self, *args, **kwargs) -> "pd.Series":
        from .ops._stat import mean
+
        return mean(self)

    def sum(self, *args, **kwargs) -> "pd.Series":
        from .ops._stat import sum
+
        return sum(self)

    def std(self, ddof=1, **kwargs) -> "pd.Series":
        from .ops._stat import std
+
        return std(self, ddof=ddof)

    def var(self, ddof=1, **kwargs):
        from .ops._stat import var
+
        return var(self, ddof=ddof)

    def variation(self, ddof=1):
        from .ops._stat import variation
+
        return variation(self, ddof=ddof)

    def skew(self, unbiased=False):
        from .ops._stat import skew
+
        return skew(self, unbiased=unbiased)

    def kurt(self, unbiased=False):
        from .ops._stat import kurt
+
        return kurt(self, unbiased=unbiased)

    def sigmoid(self) -> "DataFrame":
        from .ops._activation import sigmoid
+
        return sigmoid(self)

    def count(self) -> "int":
        return self.shape[0]

    def describe(self, ddof=1, unbiased=False):
        from .ops._stat import describe
+
        return describe(self, ddof=ddof, unbiased=unbiased)

-    def quantile(self, q, axis=0, method="quantile", ):
+    def quantile(
+        self,
+        q,
+        axis=0,
+        method="quantile",
+    ):
        ...

    def __add__(self, other: Union[int, float, list, "np.ndarray", "DataFrame", "pd.Series"]) -> "DataFrame":
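
The dunder operators in this stretch all funnel into the two private helpers further down: arithmetic ones into __arithmetic_operate (backed by ops._arithmetic.arith_operate) and comparisons into __cmp_operate. A hedged usage sketch of that dispatch; df is an assumed, already-built DataFrame, and __gt__ is inferred from the visible __ge__ and __invert__ siblings rather than shown in this diff:

def demo_operator_dispatch(df):
    # df + 1 resolves to df.__add__(1), which delegates to arith_operate
    # and yields a new DataFrame with the shifted values.
    shifted = df + 1

    # Comparisons go through __cmp_operate and return a boolean DataFrame...
    mask = df > 0

    # ...which __getitem__ hands to ops._where.where to filter rows.
    return shifted, df[mask]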
@@ -300,6 +304,7 @@ def __ge__(self, other) -> "DataFrame":

    def __invert__(self):
        from .ops._unary_operator import invert
+
        return invert(self)

def __arithmetic_operate(self, op, other) -> "DataFrame":
Expand All @@ -312,10 +317,12 @@ def __arithmetic_operate(self, op, other) -> "DataFrame":
        Note: int/float operands may be uniformly promoted to float, which involves block type changes and re-compression.
        """
        from .ops._arithmetic import arith_operate
+
        return arith_operate(self, other, op)

    def __cmp_operate(self, op, other) -> "DataFrame":
        from .ops._cmp import cmp_operate
+
        return cmp_operate(self, other, op)

def __getattr__(self, attr):
Expand All @@ -325,8 +332,7 @@ def __getattr__(self, attr):
        return self.__getitem__(attr)

    def __setattr__(self, key, value):
-        property_attr_mapping = dict(block_table="_block_table",
-                                     data_manager="_data_manager")
+        property_attr_mapping = dict(block_table="_block_table", data_manager="_data_manager")
        if key not in ["label", "weight"] and key not in property_attr_mapping:
            self.__dict__[key] = value
            return
@@ -339,16 +345,19 @@ def __setattr__(self, key, value):
            if self._label is not None:
                self.__dict__["_label"] = None
            from .ops._set_item import set_label_or_weight
+
            set_label_or_weight(self, value, key_type=key)
        else:
            if self._weight is not None:
                self.__dict__["_weight"] = None
            from .ops._set_item import set_label_or_weight
+
            set_label_or_weight(self, value, key_type=key)

    def __getitem__(self, items) -> "DataFrame":
        if isinstance(items, DataFrame):
            from .ops._where import where
+
            return where(self, items)

        if isinstance(items, pd.Index):
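
The __setattr__ hunk gives label and weight write-through semantics: assigning either one drops the cached extracted frame and routes the value through ops._set_item.set_label_or_weight, while ordinary attributes land in __dict__ directly. A hedged sketch of the observable behavior; df and label_frame are assumed inputs, and the handling of block_table/data_manager assignments is behind the fold here:

def demo_setattr(df, label_frame):
    # An ordinary attribute passes straight through to __dict__.
    df.some_flag = True

    # Assigning label invalidates the cached _label frame and calls
    # set_label_or_weight(df, label_frame, key_type="label").
    df.label = label_frame

    # Reading it back re-extracts the label frame on demand.
    return df.label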
@@ -433,12 +442,14 @@ def loc(self, indexer, target="sample_id", preserve_order=False):
        agg_indexer = aggregate_indexer(indexer)

        if not preserve_order:
+
            def _convert_block(blocks, retrieval_indexes):
                row_indexes = [retrieval_index[0] for retrieval_index in retrieval_indexes]
                return [block[row_indexes] for block in blocks]

            block_table = self._block_table.join(agg_indexer, _convert_block)
        else:
+
            def _convert_to_block(kvs):
                ret_dict = {}
                for block_id, (blocks, block_indexer) in kvs:
@@ -449,8 +460,12 @@ def _convert_to_block(kvs):
                        if dst_block_id not in ret_dict:
                            ret_dict[dst_block_id] = []

-                        ret_dict[dst_block_id].append([block[src_row_id] if isinstance(block, pd.Index)
-                                                       else block[src_row_id].tolist() for block in blocks])
+                        ret_dict[dst_block_id].append(
+                            [
+                                block[src_row_id] if isinstance(block, pd.Index) else block[src_row_id].tolist()
+                                for block in blocks
+                            ]
+                        )

                return list(ret_dict.items())

@@ -487,45 +502,55 @@ def _merge_list(lhs, rhs):
                return ret

            from .ops._transformer import transform_list_block_to_frame_block
+
            block_table = self._block_table.join(agg_indexer, lambda lhs, rhs: (lhs, rhs))
            block_table = block_table.mapReducePartitions(_convert_to_block, _merge_list)
-            block_table = transform_list_block_to_frame_block(block_table,
-                                                              self._data_manager)
+            block_table = transform_list_block_to_frame_block(block_table, self._data_manager)

        partition_order_mappings = get_partition_order_mappings(block_table)
-        return DataFrame(self._ctx,
-                         block_table,
-                         partition_order_mappings,
-                         self._data_manager)
+        return DataFrame(self._ctx, block_table, partition_order_mappings, self._data_manager)

    def iloc(self, indexes):
        ...

    @classmethod
    def hstack(cls, stacks: List["DataFrame"]) -> "DataFrame":
        from .ops._dimension_scaling import hstack
+
        return hstack(stacks)

    @classmethod
    def vstack(cls, stacks: List["DataFrame"]) -> "DataFrame":
        from .ops._dimension_scaling import vstack
+
        return vstack(stacks)

-    def __extract_fields(self, with_sample_id=True, with_match_id=True,
-                         with_label=True, with_weight=True, columns: Union[str, list]=None) -> "DataFrame":
+    def __extract_fields(
+        self,
+        with_sample_id=True,
+        with_match_id=True,
+        with_label=True,
+        with_weight=True,
+        columns: Union[str, list] = None,
+    ) -> "DataFrame":
        from .ops._field_extract import field_extract
+
        return field_extract(
            self,
            with_sample_id=with_sample_id,
            with_match_id=with_match_id,
            with_label=with_label,
            with_weight=with_weight,
-            columns=columns
+            columns=columns,
        )
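
__extract_fields is the single extraction primitive behind sample_id, match_id, label, weight, values, and create_frame. A hedged sketch of the public path, create_frame; df is an assumed existing frame and the column names are invented for illustration:

def demo_create_frame(df):
    # Keep sample_id/match_id, drop label and weight, restrict to two columns.
    features_only = df.create_frame(with_label=False, with_weight=False, columns=["x0", "x1"])

    # Carry the label along, e.g. for a training path.
    with_label = df.create_frame(with_label=True)
    return features_only, with_label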

    def __convert_to_table(self, target_name):
        block_loc = self._data_manager.loc_block(target_name)
        assert block_loc[1] == 0, "support only one indexer in current version"

        from .ops._indexer import transform_to_table
+
        return transform_to_table(self._block_table, block_loc[0], self._partition_order_mappings)
+
+    def data_overview(self, num=100):
+        return [[1, 2, 3, 4], [2, 2, 3, 4]]
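
The commit's namesake, data_overview, is plainly a stub at this point: it ignores num and returns two hard-coded rows. A hedged sketch of how it would presumably be called once wired to real data (the helper below is mine, not part of the diff):

def print_overview(df, num=5):
    # data_overview(num) is expected to yield up to `num` rows as plain lists;
    # in this commit it still returns the placeholder [[1, 2, 3, 4], [2, 2, 3, 4]].
    for row in df.data_overview(num=num):
        print(row)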
@@ -12,11 +12,9 @@
if typing.TYPE_CHECKING:
    from fate.arch import URI

-W = TypeVar("W")
-

-class _ArtifactTypeWriter(Generic[W]):
-    def __init__(self, artifact: W) -> None:
+class _ArtifactTypeWriter:
+    def __init__(self, artifact: "_ArtifactType") -> None:
        self.artifact = artifact

    def __str__(self):
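
The second rendered diff (its file path is not shown on this page, and the commit's remaining two changed files are not rendered at all) drops an unused type parameter: every writer wraps the same _ArtifactType, so Generic[W] bought nothing over a concrete forward reference. A minimal sketch of the before/after pattern; the class names are mine, and _ArtifactType itself is not defined on this page:

from typing import Generic, TypeVar

W = TypeVar("W")


class GenericWriter(Generic[W]):
    # Before: parameterized, but W was only ever bound to one artifact type.
    def __init__(self, artifact: W) -> None:
        self.artifact = artifact


class PlainWriter:
    # After: a concrete forward reference says the same thing more directly.
    def __init__(self, artifact: "_ArtifactType") -> None:
        self.artifact = artifact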
