diff --git a/python/fate/arch/dataframe/manager/block_manager.py b/python/fate/arch/dataframe/manager/block_manager.py index c8d623e87e..e21a1ed2e7 100644 --- a/python/fate/arch/dataframe/manager/block_manager.py +++ b/python/fate/arch/dataframe/manager/block_manager.py @@ -268,6 +268,21 @@ def transform_row_to_raw(cls, block, index): else: return block[index].tolist() + @classmethod + def vstack(cls, blocks): + ret = blocks[0] + if isinstance(ret, pd.Index): + for block in blocks[1:]: + ret = ret.union(block, sort=False) + elif isinstance(ret, torch.Tensor): + ret = torch.vstack(blocks) + elif isinstance(ret, np.ndarray): + ret = np.vstack(blocks) + else: + raise ValueError(f"Not implemented block vstack for type {type(ret)}") + + return ret + class Int32Block(Block): def __init__(self, *args, **kwargs): @@ -367,6 +382,8 @@ def set_extra_kwargs(self, pk, evaluator, coder, dtype, device): @staticmethod def convert_block(block): + if isinstance(block, list): + block = block[0].cat(block[1:]) return block def convert_to_phe_tensor(self, block, shape): diff --git a/python/fate/arch/dataframe/ops/_apply_row.py b/python/fate/arch/dataframe/ops/_apply_row.py index 7931cec527..c6b1505cf1 100644 --- a/python/fate/arch/dataframe/ops/_apply_row.py +++ b/python/fate/arch/dataframe/ops/_apply_row.py @@ -118,6 +118,17 @@ def _apply(blocks, func=None, src_field_names=None, ret_blocks[idx] = blocks[bid] for idx, bid in enumerate(block_indexes): - ret_blocks[bid] = dm.blocks[bid].convert_block(apply_blocks[idx]) + if dm.blocks[bid].is_phe_tensor(): + single_value = apply_blocks[idx][0][0] + dm.blocks[bid].set_extra_kwargs(pk=single_value.pk, + evaluator=single_value.evaluator, + coder=single_value.coder, + dtype=single_value.dtype, + device=single_value.device) + ret = [v[0]._data for v in apply_blocks[idx]] + ret_blocks[bid] = dm.blocks[bid].convert_block(ret) + # ret_blocks[bid] = dm.blocks[bid].convert_to_phe_tensor(ret, shape=(len(ret), 1)) + else: + ret_blocks[bid] = dm.blocks[bid].convert_block(apply_blocks[idx]) return ret_blocks, dm diff --git a/python/fate/arch/dataframe/ops/_dimension_scaling.py b/python/fate/arch/dataframe/ops/_dimension_scaling.py index d8078b4df6..2f8c7eed9d 100644 --- a/python/fate/arch/dataframe/ops/_dimension_scaling.py +++ b/python/fate/arch/dataframe/ops/_dimension_scaling.py @@ -21,7 +21,7 @@ from ..manager.data_manager import DataManager from ..manager.block_manager import Block from ._compress_block import compress_blocks -from ._indexer import get_partition_order_by_raw_table +from ._indexer import get_partition_order_by_raw_table, get_partition_order_mappings_by_block_table from ._promote_types import promote_partial_block_types from ._set_item import set_item from fate.arch.tensor import DTensor @@ -118,8 +118,8 @@ def _align_blocks(blocks, align_fields_loc=None, full_block_migrate_set=None, ds l_block_table = promote_partial_block_types(l_block_table, narrow_blocks=narrow_blocks, dst_blocks=dst_blocks, data_manager=data_manager, dst_fields_loc=changed_fields_loc) - l_flatten_func = functools.partial(_flatten_partition, block_num=data_manager.block_num) - l_flatten = l_block_table.mapPartitions(l_flatten_func, use_previous_behavior=False) + # l_flatten_func = functools.partial(_flatten_partition, block_num=data_manager.block_num) + # l_flatten = l_block_table.mapPartitions(l_flatten_func, use_previous_behavior=False) for r_df in data_frames[1:]: r_field_names = r_df.data_manager.get_field_name_list() @@ -144,18 +144,31 @@ def _align_blocks(blocks, align_fields_loc=None, full_block_migrate_set=None, ds full_block_migrate_set=full_migrate_set, dst_dm=data_manager) r_block_table = r_block_table.mapValues(_align_func) - r_flatten_func = functools.partial(_flatten_partition, block_num=data_manager.block_num) - r_flatten = r_block_table.mapPartitions(r_flatten_func, use_previous_behavior=False) - l_flatten = l_flatten.union(r_flatten) + # r_flatten_func = functools.partial(_flatten_partition, block_num=data_manager.block_num) + # r_flatten = r_block_table.mapPartitions(r_flatten_func, use_previous_behavior=False) + # l_flatten = l_flatten.union(r_flatten) + l_block_table = l_block_table.union( + r_block_table, + lambda l_blocks, r_blocks: [ + Block.vstack([l_block, r_block]) for l_block, r_block in zip(l_blocks, r_blocks) + ] + ) - partition_order_mappings = get_partition_order_by_raw_table(l_flatten, data_manager.block_row_size) - _convert_to_block_func = functools.partial(to_blocks, dm=data_manager, partition_mappings=partition_order_mappings) - block_table = l_flatten.mapPartitions(_convert_to_block_func, use_previous_behavior=False) - block_table, data_manager = compress_blocks(block_table, data_manager) + # partition_order_mappings = get_partition_order_by_raw_table(l_flatten, data_manager.block_row_size) + # _convert_to_block_func = functools.partial(to_blocks, dm=data_manager, partition_mappings=partition_order_mappings) + # block_table = l_flatten.mapPartitions(_convert_to_block_func, use_previous_behavior=False) + # block_table, data_manager = compress_blocks(block_table, data_manager) + partition_order_mappings = get_partition_order_mappings_by_block_table(l_block_table, data_manager.block_row_size) + _balance_block_func = functools.partial(_balance_blocks, + partition_order_mappings=partition_order_mappings, + block_row_size=data_manager.block_row_size) + l_block_table = l_block_table.mapPartitions(_balance_block_func, + use_previous_behavior=False) + l_block_table, data_manager = compress_blocks(l_block_table, data_manager) return DataFrame( l_df._ctx, - block_table, + l_block_table, partition_order_mappings, data_manager ) @@ -180,10 +193,7 @@ def drop(df: "DataFrame", index: "DataFrame" = None) -> "DataFrame": ) l_flatten_table = df.block_table.mapPartitions(l_flatten_func, use_previous_behavior=False) - r_flatten_func = functools.partial( - _flatten_partition, - block_num=index.data_manager.block_num - ) + r_flatten_func = functools.partial(_flatten_partition_without_value) r_flatten_table = index.block_table.mapPartitions(r_flatten_func, use_previous_behavior=False) drop_flatten = l_flatten_table.subtractByKey(r_flatten_table) @@ -240,7 +250,6 @@ def sample(df: "DataFrame", n=None, frac: float =None, random_state=None) -> "Da def retrieval_row(df: "DataFrame", indexer: "DTensor"): if indexer.shape[1] != 1: raise ValueError("Row indexing by DTensor should have only one column filling with True/False") - block_num = df.data_manager.block_num block_num = df.data_manager.block_num def _flatten_data(kvs): @@ -272,6 +281,12 @@ def _flatten_data(kvs): ) +def _flatten_partition_without_value(kvs): + for block_id, blocks in kvs: + for sample_id in blocks[0]: + yield sample_id, [] + + def _flatten_partition(kvs, block_num=0): for block_id, blocks in kvs: flat_blocks = [Block.transform_block_to_list(block) for block in blocks] @@ -280,6 +295,30 @@ def _flatten_partition(kvs, block_num=0): yield flat_blocks[0][i], [flat_blocks[j][i] for j in range(1, block_num)] +def _balance_blocks(kvs, partition_order_mappings: dict=None, block_row_size: int=None): + block_id = None + previous_blocks = list() + for _, blocks in kvs: + if block_id is None: + sample_id = blocks[0][0] + block_id = partition_order_mappings[sample_id]["start_block_id"] + + if previous_blocks: + blocks = [Block.vstack([pre_block, block]) for pre_block, block in zip(previous_blocks, blocks)] + previous_blocks = list() + + row_size = len(blocks[0]) + for i in range(0, row_size, block_row_size): + if row_size - i < block_row_size: + previous_blocks = [block[i: row_size] for block in blocks] + else: + yield block_id, [block[i: i + block_row_size] for block in blocks] + block_id += 1 + + if previous_blocks: + yield block_id, previous_blocks + + def to_blocks(kvs, dm: DataManager = None, partition_mappings: dict = None): ret_blocks = [[] for _ in range(dm.block_num)] diff --git a/python/fate/arch/dataframe/ops/_indexer.py b/python/fate/arch/dataframe/ops/_indexer.py index f95262cc5e..7d60104fcf 100644 --- a/python/fate/arch/dataframe/ops/_indexer.py +++ b/python/fate/arch/dataframe/ops/_indexer.py @@ -14,6 +14,7 @@ # limitations under the License. # import functools +import numpy as np from ..manager import Block, DataManager from .._dataframe import DataFrame @@ -77,7 +78,8 @@ def _block_counter(kvs): first_block_id = 0 for k, v in kvs: if partition_key is None: - partition_key = k + partition_key = v[0][0] + # partition_key = k size += len(v[0]) @@ -170,7 +172,7 @@ def _merge_list(lhs, rhs): l_len = len(lhs) r_len = len(rhs) - ret = [[] for i in range(l_len + r_len)] + ret = [None] * (l_len + r_len) i, j, k = 0, 0, 0 while i < l_len and j < r_len: if lhs[i][0] < rhs[j][0]: @@ -233,15 +235,11 @@ def _convert_to_row(kvs): for dst_block_id, value_list in ret_dict.items(): yield dst_block_id, sorted(value_list) - from ._transformer import transform_list_block_to_frame_block - block_table = df.block_table.join(agg_indexer, lambda lhs, rhs: (lhs, rhs)) block_table = block_table.mapReducePartitions(_convert_to_row, _merge_list) _convert_to_frame_block_func = functools.partial(_convert_to_frame_block, data_manager=df.data_manager) block_table = block_table.mapValues(_convert_to_frame_block_func) - # block_table = block_table.mapValues(lambda values: [v[1] for v in values]) - # block_table = transform_list_block_to_frame_block(block_table, df.data_manager) partition_order_mappings = get_partition_order_mappings_by_block_table(block_table, df.data_manager.block_row_size) return DataFrame( diff --git a/python/fate/arch/dataframe/ops/_set_item.py b/python/fate/arch/dataframe/ops/_set_item.py index d2159feb18..cbd817f695 100644 --- a/python/fate/arch/dataframe/ops/_set_item.py +++ b/python/fate/arch/dataframe/ops/_set_item.py @@ -82,10 +82,15 @@ def _append_multi(blocks, item_list, bid_list=None, dm: DataManager=None): return ret_blocks - def _append_df(l_blocks, r_blocks, r_blocks_loc=None): + def _append_df(l_blocks, r_blocks, r_blocks_loc=None, dm=None): ret_blocks = [block for block in l_blocks] + l_bid = len(ret_blocks) for bid, offset in r_blocks_loc: - ret_blocks.append(r_blocks[bid][:, [offset]]) + if dm.blocks[bid].is_phe_tensor(): + ret_blocks.append(r_blocks[bid]) + else: + ret_blocks.append(r_blocks[bid][:, [offset]]) + l_bid += 1 return ret_blocks @@ -128,7 +133,7 @@ def _append_phe_tensor(l_blocks, r_tensor): raise ValueError("Setitem with rhs=DataFrame must have equal len keys") data_manager.append_columns(keys, block_types) - _append_func = functools.partial(_append_df, r_blocks_loc=operable_blocks_loc) + _append_func = functools.partial(_append_df, r_blocks_loc=operable_blocks_loc, dm=data_manager) block_table = df.block_table.join(items.block_table, _append_func) elif isinstance(items, DTensor): meta_data = items.shardings._data.mapValues(