Merge pull request #4701 from FederatedAI/feature-2.0.0-beta_dataframe_update

Feature 2.0.0 beta dataframe update
mgqa34 authored Mar 20, 2023
2 parents c45d66f + 01ec04a commit d5d58b3
Showing 10 changed files with 226 additions and 97 deletions.
61 changes: 14 additions & 47 deletions python/fate/arch/dataframe/_dataframe.py
@@ -21,7 +21,6 @@

from .ops import (
aggregate_indexer,
transform_to_tensor,
transform_to_table,
get_partition_order_mappings
)
@@ -77,8 +76,8 @@ def values(self):
return None

return self.__extract_fields(
with_sample_id=False,
with_match_id=False,
with_sample_id=True,
with_match_id=True,
with_label=False,
with_weight=False,
columns=self.columns.tolist()
@@ -162,29 +161,8 @@ def as_tensor(self, dtype=None):
df.label.as_tensor()
df.values.as_tensor()
"""
attr_status = 0
if self.schema.label_name:
attr_status |= 1

if self.schema.weight_name:
attr_status |= 2

if len(self.schema.columns):
attr_status |= 4

if attr_status == 0:
raise ValueError(f"label/weight/values attributes are None")

if attr_status & -attr_status != attr_status:
raise ValueError(f"Use df.label.as_tensor() or df.weight.as_tensor() or df.values.as_tensor(), "
f"don't mixed please")

if attr_status == 1:
return self.__convert_to_tensor(self.schema.label_name, dtype=dtype)
elif attr_status == 1:
return self.__convert_to_tensor(self.schema.weight_name, dtype=dtype)
else:
return self.__convert_to_tensor(self.schema.columns.tolist(), dtype=dtype)
from .ops._transformer import transform_to_tensor
return transform_to_tensor(self._block_table, self._data_manager, dtype)

def as_pd_df(self) -> "pd.DataFrame":
from .ops._transformer import transform_to_pandas_dataframe
@@ -216,17 +194,21 @@ def drop(self, index) -> "DataFrame":
from .ops._dimension_scaling import drop
return drop(self, index)

def max(self, *args, **kwargs) -> "pd.Series":
...
def max(self) -> "pd.Series":
from .ops._stat import max
return max(self)

def min(self, *args, **kwargs) -> "pd.Series":
...
from .ops._stat import min
return min(self)

def mean(self, *args, **kwargs) -> "pd.Series":
...
from .ops._stat import mean
return mean(self)

def sum(self, *args, **kwargs) -> "pd.Series":
...
from .ops._stat import sum
return sum(self)

def std(self, *args, **kwargs) -> "pd.Series":
...
@@ -454,7 +436,7 @@ def vstack(cls, stacks: List["DataFrame"]) -> "DataFrame":
return vstack(stacks)

def __extract_fields(self, with_sample_id=True, with_match_id=True,
with_label=True, with_weight=True, columns: Union[str, list] = None) -> "DataFrame":
with_label=True, with_weight=True, columns: Union[str, list]=None) -> "DataFrame":
from .ops._field_extract import field_extract
return field_extract(
self,
@@ -465,21 +447,6 @@ def __extract_fields(self, with_sample_id=True, with_match_id=True,
columns=columns
)

def __convert_to_tensor(self, columns: Union[str, list], dtype: str = None):
if isinstance(columns, str):
columns = [columns]

column_index_offsets = [self._schema_manager.get_column_offset(column) for column in columns]
block_indexes = [self._block_manager.get_block_id(column) for column in column_index_offsets]
_, block_retrieval_indexes = self._block_manager.derive_new_block_manager(column_index_offsets)

return transform_to_tensor(
self._ctx,
self._block_table,
block_indexes,
block_retrieval_indexes,
dtype=dtype)

def __convert_to_table(self, target_name):
block_loc = self._data_manager.loc_block(target_name)
assert block_loc[1] == 0, "support only one indexer in current version"
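Aside on the deleted `as_tensor` body above: it encoded label/weight/values as bit flags and used a lowest-set-bit check to reject mixed requests (note its dispatch also tested `attr_status == 1` twice, where the second branch presumably meant `== 2` for weight). A standalone sketch of that idiom, with a hypothetical helper name:

```python
# Standalone sketch of the bit-flag idiom used by the deleted as_tensor()
# body: label -> bit 0, weight -> bit 1, values -> bit 2. x & -x isolates
# the lowest set bit, so the check passes only when exactly one flag is set.

def exactly_one_flag(attr_status: int) -> bool:
    return attr_status != 0 and (attr_status & -attr_status) == attr_status

assert exactly_one_flag(0b001)       # label only
assert exactly_one_flag(0b100)       # values only
assert not exactly_one_flag(0b011)   # label + weight mixed -> rejected
assert not exactly_one_flag(0b000)   # nothing requested
```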
6 changes: 6 additions & 0 deletions python/fate/arch/dataframe/io/_json_serialization.py
@@ -40,8 +40,12 @@ def _serialize(ctx, data):
# new_df = DataFrame.hstack([data, df])
# print(data.drop(data).shape)
# print (new_df.as_pd_df())
# import pandas as pd
# print((data[["x0", "x1"]] * pd.Series([1,2])).as_pd_df())
# print((data[["x0", "x1"]] + pd.Series([1,2])).as_pd_df())
# print(DataFrame.hstack([data, empty_df]).as_pd_df())
# print(DataFrame.vstack([data, data * 3]).as_pd_df())
# print(data.values.as_tensor())
# data["x20"] = 1.0
# data["x21"] = [1, 2]
# data[["x22", "x23"]] = [3, 4]
@@ -53,6 +57,8 @@ def _serialize(ctx, data):
# apply_df = data.apply_row(lambda value: [1, {1:2, 2:3}])
# print(apply_df.as_pd_df())
# print(data.sigmoid().as_pd_df())
# print(data.min(), data.max(), data.sum(), data.mean())

"""
index, match_id, label, weight, values
"""
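The commented smoke tests added above exercise Series arithmetic against the frame. For reference, a plain-pandas sketch of the element-wise semantics being targeted; column names and values are illustrative, and note that plain pandas aligns by label (hence the indexed Series here), while the FATE test presumably relies on positional alignment:

```python
# Plain-pandas sketch of the element-wise semantics the commented smoke
# tests above exercise; the column names and values are illustrative only.
import pandas as pd

data = pd.DataFrame({"x0": [1.0, 2.0], "x1": [3.0, 4.0]})

# multiply column x0 by 1 and x1 by 2 (Series aligned to column labels)
print(data[["x0", "x1"]] * pd.Series([1, 2], index=["x0", "x1"]))
#     x0   x1
# 0  1.0  6.0
# 1  2.0  8.0
```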
2 changes: 2 additions & 0 deletions python/fate/arch/dataframe/manager/block_manager.py
@@ -70,6 +70,8 @@ def __lt__(self, other):

@staticmethod
def get_block_type(data_type):
if isinstance(data_type, np.dtype):
data_type = data_type.name
if isinstance(data_type, str):
if data_type == "str" or data_type == "object":
data_type = "np_object"
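The two added lines fold `np.dtype` inputs into the existing string-based dispatch via `.name`. A minimal standalone sketch of that normalization (not FATE's full `get_block_type`):

```python
# Minimal sketch of the normalization the two added lines perform in
# get_block_type: numpy dtypes are folded into the existing string-based
# dispatch via their .name attribute. Not FATE's full implementation.
import numpy as np

def normalize_dtype(data_type):
    if isinstance(data_type, np.dtype):
        data_type = data_type.name        # dtype('float64') -> "float64"
    if isinstance(data_type, str) and data_type in ("str", "object"):
        data_type = "np_object"
    return data_type

assert normalize_dtype(np.dtype("float64")) == "float64"
assert normalize_dtype(np.dtype("object")) == "np_object"
assert normalize_dtype("str") == "np_object"
```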
10 changes: 7 additions & 3 deletions python/fate/arch/dataframe/manager/data_manager.py
@@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
import numpy as np
from .schema_manager import SchemaManager
from .block_manager import BlockManager
from .block_manager import BlockType
@@ -169,6 +170,9 @@ def get_field_name_list(self, with_sample_id=True, with_match_id=True, with_labe
def get_field_type_by_name(self, name):
return self._schema_manager.get_field_types(name)

def get_field_offset(self, name):
return self._schema_manager.get_field_offset(name)

def get_block(self, block_id):
return self._block_manager.blocks[block_id]

@@ -187,9 +191,9 @@ def infer_non_operable_blocks(self):

def try_to_promote_types(self,
block_indexes: List[int],
block_type: Union[list, int, float, BlockType]) -> List[Tuple[int, BlockType]]:
block_type: Union[list, int, float, np.dtype, BlockType]) -> List[Tuple[int, BlockType]]:
promote_types = []
if isinstance(block_type, (int, float)):
if isinstance(block_type, (int, float, np.dtype)):
block_type = BlockType.get_block_type(block_type)

if isinstance(block_type, BlockType):
@@ -199,7 +203,7 @@ def try_to_promote_types(self,
(bid, block_type)
)
else:
for idx, (bid, r_type) in enumerate(zip(block_indexes, list)):
for idx, (bid, r_type) in enumerate(zip(block_indexes, block_type)):
block_type = BlockType.get_block_type(r_type)
if self.get_block(bid).block_type < block_type:
promote_types.append(
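`try_to_promote_types` now also accepts `np.dtype` values and only widens a block when the incoming type ranks strictly higher than the block's current type. A toy illustration of that rule, with an invented `ToyBlockType` standing in for FATE's ordered `BlockType`:

```python
# Toy illustration of the promotion rule try_to_promote_types applies:
# a block's type is widened only when the incoming value's type ranks
# strictly higher. ToyBlockType is invented here; FATE's BlockType enum
# defines its own ordering via __lt__.
from enum import IntEnum

class ToyBlockType(IntEnum):
    INT32 = 0
    INT64 = 1
    FLOAT32 = 2
    FLOAT64 = 3

def promote(current: ToyBlockType, incoming: ToyBlockType) -> ToyBlockType:
    # mirrors: if self.get_block(bid).block_type < block_type -> promote
    return incoming if current < incoming else current

assert promote(ToyBlockType.INT32, ToyBlockType.FLOAT64) is ToyBlockType.FLOAT64
assert promote(ToyBlockType.FLOAT64, ToyBlockType.INT32) is ToyBlockType.FLOAT64
```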
5 changes: 1 addition & 4 deletions python/fate/arch/dataframe/ops/__init__.py
@@ -17,12 +17,9 @@
transform_to_table,
get_partition_order_mappings
)
from ._transformer import (
transform_to_tensor,
)


__all__ = ["transform_to_tensor",
__all__ = [
"transform_to_table",
"aggregate_indexer",
"get_partition_order_mappings"
19 changes: 15 additions & 4 deletions python/fate/arch/dataframe/ops/_arithmetic.py
@@ -13,9 +13,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import numpy as np
import pandas as pd
from fate.arch.computing import is_table
from .._dataframe import DataFrame
from ._promote_types import promote_types
from .utils.series_align import series_to_ndarray


def arith_operate(lhs: DataFrame, rhs, op) -> "DataFrame":
@@ -32,19 +34,28 @@ def arith_operate(lhs: DataFrame, rhs, op) -> "DataFrame":
block_table = _operate(lhs.block_table, rhs.block_table, op, block_indexes, rhs_block_id)
to_promote_blocks = data_manager.try_to_promote_types(block_indexes,
rhs.data_manager.get_block(rhs_block_id).block_type)
elif isinstance(rhs, (np.ndarray, list)):
elif isinstance(rhs, (np.ndarray, list, pd.Series)):
if isinstance(rhs, pd.Series):
rhs = series_to_ndarray(rhs, column_names)
if isinstance(rhs, list):
rhs = np.array(rhs)
if len(rhs.shape) > 2:
raise ValueError("NdArray's Dimension should <= 2")
if len(column_names) != rhs.size:
raise ValueError(f"Size of List/NDArray should = {len(lhs.schema.columns)}")
rhs = rhs.reshape(-1)
rhs_blocks = [rhs[data_manager.get_block(bid).column_indexes] for bid in block_indexes]
block_table = _operate(lhs.block_table, rhs_blocks, op, block_indexes)
field_indexes = [data_manager.get_field_offset(name) for name in column_names]
field_indexes_mappings = dict(zip(field_indexes, range(len(field_indexes))))
rhs_blocks = [np.array([]) for i in range(data_manager.block_num)]
rhs_types = []
for bid in block_indexes:
indexer = [field_indexes_mappings[field] for field in data_manager.get_block(bid).field_indexes]
rhs_blocks[bid] = rhs[indexer]
rhs_types.append(rhs_blocks[bid].dtype)

rhs_types = [block.dtype for block in rhs_blocks]
block_table = _operate(lhs.block_table, rhs_blocks, op, block_indexes)
to_promote_blocks = data_manager.try_to_promote_types(block_indexes, rhs_types)

elif isinstance(rhs, (bool, int, float, np.int32, np.float32, np.int64, np.float64, np.bool)):
block_table = _operate(lhs.block_table, rhs, op, block_indexes)
to_promote_blocks = data_manager.try_to_promote_types(block_indexes, rhs)
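The new `pd.Series` branch converts the right-hand side to an ndarray via `series_to_ndarray`, then regroups values block by block through the field-offset mapping. The helper itself is not shown in this diff; a hypothetical stand-in for the alignment step, assuming label-based alignment with a positional fallback:

```python
# Hypothetical stand-in for utils.series_align.series_to_ndarray (not shown
# in this diff): align a pd.Series right-hand side with the frame's column
# order before the per-block regrouping done above.
import numpy as np
import pandas as pd

def series_to_ndarray_sketch(rhs: pd.Series, column_names) -> np.ndarray:
    if set(rhs.index) == set(column_names):
        return rhs.reindex(column_names).to_numpy()   # align by label
    if len(rhs) == len(column_names):
        return rhs.to_numpy()                         # positional fallback
    raise ValueError(f"Series of size {len(rhs)} is not alignable with {column_names}")

print(series_to_ndarray_sketch(pd.Series({"x1": 2.0, "x0": 1.0}), ["x0", "x1"]))  # [1. 2.]
```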
130 changes: 116 additions & 14 deletions python/fate/arch/dataframe/ops/_stat.py
@@ -12,24 +12,126 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import functools

import numpy as np
import pandas as pd
from fate.arch.storage import storage_ops
import torch
from .._dataframe import DataFrame
from ..manager import DataManager


def min(df: "DataFrame"):
data_manager = df.data_manager
operable_blocks = data_manager.infer_operable_blocks()

def _mapper(blocks, op_bids):
ret = []
for bid in op_bids:
if isinstance(blocks[bid], torch.Tensor):
ret.append(blocks[bid].min(axis=0).values)
else:
ret.append(blocks[bid].min(axis=0))

return ret

def _reducer(blocks1, blocks2):
ret = []
for block1, block2 in zip(blocks1, blocks2):
if isinstance(block1, torch.Tensor):
ret.append(torch.minimum(block1, block2))
else:
ret.append(np.minimum(block1, block2))

return ret

mapper_func = functools.partial(
_mapper,
op_bids=operable_blocks
)

reduce_ret = df.block_table.mapValues(mapper_func).reduce(_reducer)
return _post_process(reduce_ret, operable_blocks, data_manager)


def max(df: "DataFrame"):
data_manager = df.data_manager
operable_blocks = data_manager.infer_operable_blocks()

def _mapper(blocks, op_bids):
ret = []
for bid in op_bids:
if isinstance(blocks[bid], torch.Tensor):
ret.append(blocks[bid].max(axis=0).values)
else:
ret.append(blocks[bid].max(axis=0))

return ret

def _reducer(blocks1, blocks2):
ret = []
for block1, block2 in zip(blocks1, blocks2):
if isinstance(block1, torch.Tensor):
ret.append(torch.maximum(block1, block2))
else:
ret.append(np.maximum(block1, block2))

return ret

mapper_func = functools.partial(
_mapper,
op_bids=operable_blocks
)

reduce_ret = df.block_table.mapValues(mapper_func).reduce(_reducer)
return _post_process(reduce_ret, operable_blocks, data_manager)


def sum(df: DataFrame) -> "pd.Series":
data_manager = df.data_manager
operable_blocks = data_manager.infer_operable_blocks()

def _mapper(blocks, op_bids):
ret = []
for bid in op_bids:
ret.append(blocks[bid].sum(axis=0))

return ret

def _reducer(blocks1, blocks2):
return [block1 + block2 for block1, block2 in zip(blocks1, blocks2)]

mapper_func = functools.partial(
_mapper,
op_bids=operable_blocks
)

reduce_ret = df.block_table.mapValues(mapper_func).reduce(_reducer)
return _post_process(reduce_ret, operable_blocks, data_manager)


def mean(df: "DataFrame") -> "pd.Series":
return sum(df) / df.shape[0]


def min(block_table, block_indexes):
    map_ret = self._block_table.mapValues(lambda blocks: storage_ops.min(blocks[-1].storage, dim=0))
    reduce_ret = map_ret.reduce(lambda x, y: storage_ops.minimum(x, y))
    block_index_set = set(block_indexes)

    def _mapper(blocks):
        return [
            storage_ops.min(blocks[idx]) if idx in block_index_set else blocks[idx]
        ]

    def _reducer(lhs, rhs):
        ...

    block_table.mapValues(_mapper).reduce(_reducer)


def max(block_table, op_block_indexes):
    ...


def _post_process(reduce_ret, operable_blocks, data_manager: "DataManager") -> "pd.Series":
    field_names = data_manager.infer_operable_field_names()
    field_indexes = [data_manager.get_field_offset(name) for name in field_names]
    field_indexes_loc = dict(zip(field_indexes, range(len(field_indexes))))
    ret = [[] for i in range(len(field_indexes))]

    block_type = None

    reduce_ret = [r.tolist() for r in reduce_ret]
    for idx, bid in enumerate(operable_blocks):
        field_indexes = data_manager.blocks[bid].field_indexes
        for offset, field_index in enumerate(field_indexes):
            loc = field_indexes_loc[field_index]
            ret[loc] = reduce_ret[idx][offset]

        if block_type is None:
            block_type = data_manager.blocks[bid].block_type
        elif block_type < data_manager.blocks[bid].block_type:
            block_type = data_manager.blocks[bid].block_type

    return pd.Series(ret, index=field_names, dtype=block_type.value)
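All of the new statistics share one shape: `mapValues` computes per-partition, column-wise aggregates over the operable blocks, `reduce` folds the partial results pairwise, and `_post_process` reorders them into a `pd.Series` keyed by field name. A self-contained sketch of that map/reduce shape, with plain numpy arrays standing in for the blocks of `df.block_table`:

```python
# Self-contained sketch of the partition-wise aggregation shape used by
# min/max/sum above: map each partition to its column-wise aggregate, then
# fold partial results pairwise.
import functools
import numpy as np

partitions = [
    np.array([[3.0, 1.0],
              [2.0, 5.0]]),
    np.array([[0.5, 4.0]]),
]

mapped = [block.min(axis=0) for block in partitions]   # per-partition minima
col_min = functools.reduce(np.minimum, mapped)         # pairwise reduce
print(col_min)  # [0.5 1. ]
```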
(diffs for the remaining 3 changed files did not load)