diff --git a/python/fate/arch/dataframe/_dataframe.py b/python/fate/arch/dataframe/_dataframe.py index af4df61f47..84418c0bf7 100644 --- a/python/fate/arch/dataframe/_dataframe.py +++ b/python/fate/arch/dataframe/_dataframe.py @@ -21,7 +21,6 @@ from .ops import ( aggregate_indexer, - transform_to_tensor, transform_to_table, get_partition_order_mappings ) @@ -77,8 +76,8 @@ def values(self): return None return self.__extract_fields( - with_sample_id=False, - with_match_id=False, + with_sample_id=True, + with_match_id=True, with_label=False, with_weight=False, columns=self.columns.tolist() @@ -162,29 +161,8 @@ def as_tensor(self, dtype=None): df.label.as_tensor() df.values.as_tensor() """ - attr_status = 0 - if self.schema.label_name: - attr_status |= 1 - - if self.schema.weight_name: - attr_status |= 2 - - if len(self.schema.columns): - attr_status |= 4 - - if attr_status == 0: - raise ValueError(f"label/weight/values attributes are None") - - if attr_status & -attr_status != attr_status: - raise ValueError(f"Use df.label.as_tensor() or df.weight.as_tensor() or df.values.as_tensor(), " - f"don't mixed please") - - if attr_status == 1: - return self.__convert_to_tensor(self.schema.label_name, dtype=dtype) - elif attr_status == 1: - return self.__convert_to_tensor(self.schema.weight_name, dtype=dtype) - else: - return self.__convert_to_tensor(self.schema.columns.tolist(), dtype=dtype) + from .ops._transformer import transform_to_tensor + return transform_to_tensor(self._block_table, self._data_manager, dtype) def as_pd_df(self) -> "pd.DataFrame": from .ops._transformer import transform_to_pandas_dataframe @@ -216,17 +194,21 @@ def drop(self, index) -> "DataFrame": from .ops._dimension_scaling import drop return drop(self, index) - def max(self, *args, **kwargs) -> "pd.Series": - ... + def max(self) -> "pd.Series": + from .ops._stat import max + return max(self) def min(self, *args, **kwargs) -> "pd.Series": - ... + from .ops._stat import min + return min(self) def mean(self, *args, **kwargs) -> "pd.Series": - ... + from .ops._stat import mean + return mean(self) def sum(self, *args, **kwargs) -> "pd.Series": - ... + from .ops._stat import sum + return sum(self) def std(self, *args, **kwargs) -> "pd.Series": ... @@ -454,7 +436,7 @@ def vstack(cls, stacks: List["DataFrame"]) -> "DataFrame": return vstack(stacks) def __extract_fields(self, with_sample_id=True, with_match_id=True, - with_label=True, with_weight=True, columns: Union[str, list] = None) -> "DataFrame": + with_label=True, with_weight=True, columns: Union[str, list]=None) -> "DataFrame": from .ops._field_extract import field_extract return field_extract( self, @@ -465,21 +447,6 @@ def __extract_fields(self, with_sample_id=True, with_match_id=True, columns=columns ) - def __convert_to_tensor(self, columns: Union[str, list], dtype: str = None): - if isinstance(columns, str): - columns = [columns] - - column_index_offsets = [self._schema_manager.get_column_offset(column) for column in columns] - block_indexes = [self._block_manager.get_block_id(column) for column in column_index_offsets] - _, block_retrieval_indexes = self._block_manager.derive_new_block_manager(column_index_offsets) - - return transform_to_tensor( - self._ctx, - self._block_table, - block_indexes, - block_retrieval_indexes, - dtype=dtype) - def __convert_to_table(self, target_name): block_loc = self._data_manager.loc_block(target_name) assert block_loc[1] == 0, "support only one indexer in current version" diff --git a/python/fate/arch/dataframe/io/_json_serialization.py b/python/fate/arch/dataframe/io/_json_serialization.py index 16e3f52e83..2ac6b05881 100644 --- a/python/fate/arch/dataframe/io/_json_serialization.py +++ b/python/fate/arch/dataframe/io/_json_serialization.py @@ -40,8 +40,12 @@ def _serialize(ctx, data): # new_df = DataFrame.hstack([data, df]) # print(data.drop(data).shape) # print (new_df.as_pd_df()) + # import pandas as pd + # print((data[["x0", "x1"]] * pd.Series([1,2])).as_pd_df()) + # print((data[["x0", "x1"]] + pd.Series([1,2])).as_pd_df()) # print(DataFrame.hstack([data, empty_df]).as_pd_df()) # print(DataFrame.vstack([data, data * 3]).as_pd_df()) + # print(data.values.as_tensor()) # data["x20"] = 1.0 # data["x21"] = [1, 2] # data[["x22", "x23"]] = [3, 4] @@ -53,6 +57,8 @@ def _serialize(ctx, data): # apply_df = data.apply_row(lambda value: [1, {1:2, 2:3}]) # print(apply_df.as_pd_df()) # print(data.sigmoid().as_pd_df()) + # print(data.min(), data.max(), data.sum(), data.mean()) + """ index, match_id, label, weight, values """ diff --git a/python/fate/arch/dataframe/manager/block_manager.py b/python/fate/arch/dataframe/manager/block_manager.py index fe1f425ba4..0347012be9 100644 --- a/python/fate/arch/dataframe/manager/block_manager.py +++ b/python/fate/arch/dataframe/manager/block_manager.py @@ -70,6 +70,8 @@ def __lt__(self, other): @staticmethod def get_block_type(data_type): + if isinstance(data_type, np.dtype): + data_type = data_type.name if isinstance(data_type, str): if data_type == "str" or data_type == "object": data_type = "np_object" diff --git a/python/fate/arch/dataframe/manager/data_manager.py b/python/fate/arch/dataframe/manager/data_manager.py index 060aef0d10..5135db200c 100644 --- a/python/fate/arch/dataframe/manager/data_manager.py +++ b/python/fate/arch/dataframe/manager/data_manager.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. # +import numpy as np from .schema_manager import SchemaManager from .block_manager import BlockManager from .block_manager import BlockType @@ -169,6 +170,9 @@ def get_field_name_list(self, with_sample_id=True, with_match_id=True, with_labe def get_field_type_by_name(self, name): return self._schema_manager.get_field_types(name) + def get_field_offset(self, name): + return self._schema_manager.get_field_offset(name) + def get_block(self, block_id): return self._block_manager.blocks[block_id] @@ -187,9 +191,9 @@ def infer_non_operable_blocks(self): def try_to_promote_types(self, block_indexes: List[int], - block_type: Union[list, int, float, BlockType]) -> List[Tuple[int, BlockType]]: + block_type: Union[list, int, float, np.dtype, BlockType]) -> List[Tuple[int, BlockType]]: promote_types = [] - if isinstance(block_type, (int, float)): + if isinstance(block_type, (int, float, np.dtype)): block_type = BlockType.get_block_type(block_type) if isinstance(block_type, BlockType): @@ -199,7 +203,7 @@ def try_to_promote_types(self, (bid, block_type) ) else: - for idx, (bid, r_type) in enumerate(zip(block_indexes, list)): + for idx, (bid, r_type) in enumerate(zip(block_indexes, block_type)): block_type = BlockType.get_block_type(r_type) if self.get_block(bid).block_type < block_type: promote_types.append( diff --git a/python/fate/arch/dataframe/ops/__init__.py b/python/fate/arch/dataframe/ops/__init__.py index 816b52af46..6d01183344 100644 --- a/python/fate/arch/dataframe/ops/__init__.py +++ b/python/fate/arch/dataframe/ops/__init__.py @@ -17,12 +17,9 @@ transform_to_table, get_partition_order_mappings ) -from ._transformer import ( - transform_to_tensor, -) -__all__ = ["transform_to_tensor", +__all__ = [ "transform_to_table", "aggregate_indexer", "get_partition_order_mappings" diff --git a/python/fate/arch/dataframe/ops/_arithmetic.py b/python/fate/arch/dataframe/ops/_arithmetic.py index f415739337..16dcd254d9 100644 --- a/python/fate/arch/dataframe/ops/_arithmetic.py +++ b/python/fate/arch/dataframe/ops/_arithmetic.py @@ -13,9 +13,11 @@ # See the License for the specific language governing permissions and # limitations under the License. import numpy as np +import pandas as pd from fate.arch.computing import is_table from .._dataframe import DataFrame from ._promote_types import promote_types +from .utils.series_align import series_to_ndarray def arith_operate(lhs: DataFrame, rhs, op) -> "DataFrame": @@ -32,7 +34,9 @@ def arith_operate(lhs: DataFrame, rhs, op) -> "DataFrame": block_table = _operate(lhs.block_table, rhs.block_table, op, block_indexes, rhs_block_id) to_promote_blocks = data_manager.try_to_promote_types(block_indexes, rhs.data_manager.get_block(rhs_block_id).block_type) - elif isinstance(rhs, (np.ndarray, list)): + elif isinstance(rhs, (np.ndarray, list, pd.Series)): + if isinstance(rhs, pd.Series): + rhs = series_to_ndarray(rhs, column_names) if isinstance(rhs, list): rhs = np.array(rhs) if len(rhs.shape) > 2: @@ -40,11 +44,18 @@ def arith_operate(lhs: DataFrame, rhs, op) -> "DataFrame": if len(column_names) != rhs.size: raise ValueError(f"Size of List/NDArray should = {len(lhs.schema.columns)}") rhs = rhs.reshape(-1) - rhs_blocks = [rhs[data_manager.get_block(bid).column_indexes] for bid in block_indexes] - block_table = _operate(lhs.block_table, rhs_blocks, op, block_indexes) + field_indexes = [data_manager.get_field_offset(name) for name in column_names] + field_indexes_mappings = dict(zip(field_indexes, range(len(field_indexes)))) + rhs_blocks = [np.array([]) for i in range(data_manager.block_num)] + rhs_types = [] + for bid in block_indexes: + indexer = [field_indexes_mappings[field] for field in data_manager.get_block(bid).field_indexes] + rhs_blocks[bid] = rhs[indexer] + rhs_types.append(rhs_blocks[bid].dtype) - rhs_types = [block.dtype for block in rhs_blocks] + block_table = _operate(lhs.block_table, rhs_blocks, op, block_indexes) to_promote_blocks = data_manager.try_to_promote_types(block_indexes, rhs_types) + elif isinstance(rhs, (bool, int, float, np.int32, np.float32, np.int64, np.float64, np.bool)): block_table = _operate(lhs.block_table, rhs, op, block_indexes) to_promote_blocks = data_manager.try_to_promote_types(block_indexes, rhs) diff --git a/python/fate/arch/dataframe/ops/_stat.py b/python/fate/arch/dataframe/ops/_stat.py index f00e78bed4..35700230e9 100644 --- a/python/fate/arch/dataframe/ops/_stat.py +++ b/python/fate/arch/dataframe/ops/_stat.py @@ -12,24 +12,126 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import functools + +import numpy as np import pandas as pd -from fate.arch.storage import storage_ops +import torch +from .._dataframe import DataFrame +from ..manager import DataManager + + +def min(df: "DataFrame"): + data_manager = df.data_manager + operable_blocks = data_manager.infer_operable_blocks() + + def _mapper(blocks, op_bids): + ret = [] + for bid in op_bids: + if isinstance(blocks[bid], torch.Tensor): + ret.append(blocks[bid].min(axis=0).values) + else: + ret.append(blocks[bid].min(axis=0)) + + return ret + + def _reducer(blocks1, blocks2): + ret = [] + for block1, block2 in zip(blocks1, blocks2): + if isinstance(block1, torch.Tensor): + ret.append(torch.minimum(block1, block2)) + else: + ret.append(np.minimum(block1, block2)) + + return ret + + mapper_func = functools.partial( + _mapper, + op_bids=operable_blocks + ) + + reduce_ret = df.block_table.mapValues(mapper_func).reduce(_reducer) + return _post_process(reduce_ret, operable_blocks, data_manager) + + +def max(df: "DataFrame"): + data_manager = df.data_manager + operable_blocks = data_manager.infer_operable_blocks() + + def _mapper(blocks, op_bids): + ret = [] + for bid in op_bids: + if isinstance(blocks[bid], torch.Tensor): + ret.append(blocks[bid].max(axis=0).values) + else: + ret.append(blocks[bid].max(axis=0)) + + return ret + + def _reducer(blocks1, blocks2): + ret = [] + for block1, block2 in zip(blocks1, blocks2): + if isinstance(block1, torch.Tensor): + ret.append(torch.maximum(block1, block2)) + else: + ret.append(np.maximum(block1, block2)) + + return ret + + mapper_func = functools.partial( + _mapper, + op_bids=operable_blocks + ) + + reduce_ret = df.block_table.mapValues(mapper_func).reduce(_reducer) + return _post_process(reduce_ret, operable_blocks, data_manager) + + +def sum(df: DataFrame) -> "pd.Series": + data_manager = df.data_manager + operable_blocks = data_manager.infer_operable_blocks() + + def _mapper(blocks, op_bids): + ret = [] + for bid in op_bids: + ret.append(blocks[bid].sum(axis=0)) + + return ret + + def _reducer(blocks1, blocks2): + return [block1 + block2 for block1, block2 in zip(blocks1, blocks2)] + + mapper_func = functools.partial( + _mapper, + op_bids=operable_blocks + ) + + reduce_ret = df.block_table.mapValues(mapper_func).reduce(_reducer) + return _post_process(reduce_ret, operable_blocks, data_manager) + + +def mean(df: "DataFrame") -> "pd.Series": + return sum(df) / df.shape[0] -def min(block_table, block_indexes): - map_ret = self._block_table.mapValues(lambda blocks: storage_ops.min(blocks[-1].storage, dim=0)) - reduce_ret = map_ret.reduce(lambda x, y: storage_ops.minimum(x, y)) - block_index_set = set(block_indexes) - def _mapper(blocks): - return [ - storage_ops.min(blocks[idx]) if idx in block_index_set else blocks[idx] - ] +def _post_process(reduce_ret, operable_blocks, data_manager: "DataManager") -> "pd.Series": + field_names = data_manager.infer_operable_field_names() + field_indexes = [data_manager.get_field_offset(name) for name in field_names] + field_indexes_loc = dict(zip(field_indexes, range(len(field_indexes)))) + ret = [[] for i in range(len(field_indexes))] - def _reducer(lhs, rhs): - ... + block_type = None - block_table.mapValues(_mapper).reduce(_reducer) + reduce_ret = [r.tolist() for r in reduce_ret] + for idx, bid in enumerate(operable_blocks): + field_indexes = data_manager.blocks[bid].field_indexes + for offset, field_index in enumerate(field_indexes): + loc = field_indexes_loc[field_index] + ret[loc] = reduce_ret[idx][offset] + if block_type is None: + block_type = data_manager.blocks[bid].block_type + elif block_type < data_manager.blocks[bid].block_type: + block_type = data_manager.blocks[bid].block_type -def max(block_table, op_block_indexes): - ... \ No newline at end of file + return pd.Series(ret, index=field_names, dtype=block_type.value) diff --git a/python/fate/arch/dataframe/ops/_transformer.py b/python/fate/arch/dataframe/ops/_transformer.py index 4ae65f9276..6947807d37 100644 --- a/python/fate/arch/dataframe/ops/_transformer.py +++ b/python/fate/arch/dataframe/ops/_transformer.py @@ -12,37 +12,31 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import functools + import pandas as pd from typing import List, Tuple import torch from fate.arch import tensor import numpy as np +from ..manager.data_manager import DataManager -def transform_to_tensor(ctx, block_table, - block_indexes: List[Tuple[int, int]], retrieval_block_indexes, dtype=None): - """ - column_indexes: column to retrieval - block_indexes: list, (block_id, block_indexes) - retrieval_block_indexes: list, each element: (src_block_id, dst_block_id, changed=True/False, block_indexes) - dtype: convert to tensor with dtype, default is None - """ - def _to_local_tensor(src_blocks): - if len(retrieval_block_indexes) == 1: - src_block_id, dst_block_id, is_changed, indexes = retrieval_block_indexes[0] - if not is_changed: - t = src_blocks[src_block_id] - else: - t = src_blocks[src_block_id][:, indexes] +def transform_to_tensor(block_table, + data_manager: "DataManager", dtype=None): + def _merge_blocks(src_blocks, bids=None, fields=None): + if len(bids) == 1: + bid = bids[0] + t = src_blocks[bid] else: i = 0 tensors = [] - while i < len(block_indexes): - bid = block_indexes[i][0] - indexes = [block_indexes[i][1]] + while i < len(fields): + bid = fields[i][0] + indexes = [fields[i][1]] j = i + 1 - while j < len(block_indexes) and block_indexes[j] == block_indexes[j - 1]: - indexes.append(block_indexes[j][1]) + while j < len(fields) and bid == fields[j][0] and indexes[-1] + 1 == fields[j][1]: + indexes.append(fields[1]) j += 1 tensors.append(src_blocks[bid][:, indexes]) @@ -55,12 +49,26 @@ def _to_local_tensor(src_blocks): return t - local_tensor_table = block_table.mapValues(_to_local_tensor) - local_tensor_blocks = [block_with_id[1] for block_with_id in sorted(local_tensor_table.collect())] + block_indexes = data_manager.infer_operable_blocks() + for block_id in block_indexes: + if not data_manager.get_block(block_id).is_numeric: + raise ValueError("Transform to distributed tensor should ensure every field is numeric") + + field_names = data_manager.infer_operable_field_names() + fields_loc = data_manager.loc_block(field_names) + + _merged_func = functools.partial( + _merge_blocks, + bids=block_indexes, + fields=fields_loc + ) + merged_table = block_table.mapValues(_merged_func) + + shape_table = merged_table.mapValues(lambda v: v.shape) + shapes = [shape_obj for k, shape_obj in sorted(shape_table.collect())] - return tensor.distributed_tensor(ctx, - local_tensor_blocks, - partitions=len(local_tensor_blocks)) + return tensor.DTensor.from_sharding_table(merged_table, + shapes=shapes) def transform_block_to_list(block_table, data_manager): diff --git a/python/fate/arch/dataframe/ops/utils/__init__.py b/python/fate/arch/dataframe/ops/utils/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/python/fate/arch/dataframe/ops/utils/series_align.py b/python/fate/arch/dataframe/ops/utils/series_align.py new file mode 100644 index 0000000000..b541943254 --- /dev/null +++ b/python/fate/arch/dataframe/ops/utils/series_align.py @@ -0,0 +1,32 @@ +# +# Copyright 2019 The FATE Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import pandas as pd +from typing import List + + +def series_to_ndarray(series_obj: "pd.Series", fields_to_align: List[str]=None): + if isinstance(series_obj.index, pd.RangeIndex) or not fields_to_align: + return series_obj.values + else: + if len(series_obj) != len(fields_to_align): + raise ValueError(f"Can't not align fields, src={fields_to_align}, dst={series_obj}") + + indexer = series_obj.index.get_indexer(fields_to_align) + + return series_obj[indexer] + + +def series_to_list(series_obj: "pd.Series", fields_to_align: List[str]=None): + return series_to_ndarray(series_obj, fields_to_align).tolist()