Skip to content

Commit

Permalink
Merge pull request #5072 from FederatedAI/feature-2.0.0-beta-datafram…
Browse files Browse the repository at this point in the history
…e_refact

Feature 2.0.0 beta dataframe refact
  • Loading branch information
talkingwallace authored Aug 23, 2023
2 parents 94accbe + bddb36a commit e0d1161
Show file tree
Hide file tree
Showing 5 changed files with 96 additions and 26 deletions.
17 changes: 17 additions & 0 deletions python/fate/arch/dataframe/manager/block_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,21 @@ def transform_row_to_raw(cls, block, index):
else:
return block[index].tolist()

@classmethod
def vstack(cls, blocks):
    """
    Vertically stack same-typed blocks, keeping every row in input order.

    Parameters
    ----------
    blocks: list
        Non-empty sequence of blocks. The type of the first element selects
        the stacking routine: ``pd.Index``, ``torch.Tensor`` or ``np.ndarray``.

    Returns
    -------
    A single block of the same type as ``blocks[0]``.

    Raises
    ------
    ValueError
        If the first block is not one of the supported types.
    """
    ret = blocks[0]
    if isinstance(ret, pd.Index):
        # Index.union is a set operation and may drop repeated labels, which
        # would leave the index shorter than the value blocks stacked next to
        # it; Index.append is a plain row-wise concatenation.
        ret = ret.append(list(blocks[1:]))
    elif isinstance(ret, torch.Tensor):
        ret = torch.vstack(blocks)
    elif isinstance(ret, np.ndarray):
        ret = np.vstack(blocks)
    else:
        raise ValueError(f"Not implemented block vstack for type {type(ret)}")

    return ret


class Int32Block(Block):
def __init__(self, *args, **kwargs):
Expand Down Expand Up @@ -367,6 +382,8 @@ def set_extra_kwargs(self, pk, evaluator, coder, dtype, device):

@staticmethod
def convert_block(block):
    """
    Collapse a list of pieces into one block; pass anything else through.

    When ``block`` is a list, the first element's ``cat`` method is called
    with the remaining elements and its result is returned. Any non-list
    input is returned unchanged.
    """
    if not isinstance(block, list):
        return block

    first, rest = block[0], block[1:]
    return first.cat(rest)

def convert_to_phe_tensor(self, block, shape):
Expand Down
13 changes: 12 additions & 1 deletion python/fate/arch/dataframe/ops/_apply_row.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,17 @@ def _apply(blocks, func=None, src_field_names=None,
ret_blocks[idx] = blocks[bid]

for idx, bid in enumerate(block_indexes):
ret_blocks[bid] = dm.blocks[bid].convert_block(apply_blocks[idx])
if dm.blocks[bid].is_phe_tensor():
single_value = apply_blocks[idx][0][0]
dm.blocks[bid].set_extra_kwargs(pk=single_value.pk,
evaluator=single_value.evaluator,
coder=single_value.coder,
dtype=single_value.dtype,
device=single_value.device)
ret = [v[0]._data for v in apply_blocks[idx]]
ret_blocks[bid] = dm.blocks[bid].convert_block(ret)
# ret_blocks[bid] = dm.blocks[bid].convert_to_phe_tensor(ret, shape=(len(ret), 1))
else:
ret_blocks[bid] = dm.blocks[bid].convert_block(apply_blocks[idx])

return ret_blocks, dm
71 changes: 55 additions & 16 deletions python/fate/arch/dataframe/ops/_dimension_scaling.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from ..manager.data_manager import DataManager
from ..manager.block_manager import Block
from ._compress_block import compress_blocks
from ._indexer import get_partition_order_by_raw_table
from ._indexer import get_partition_order_by_raw_table, get_partition_order_mappings_by_block_table
from ._promote_types import promote_partial_block_types
from ._set_item import set_item
from fate.arch.tensor import DTensor
Expand Down Expand Up @@ -118,8 +118,8 @@ def _align_blocks(blocks, align_fields_loc=None, full_block_migrate_set=None, ds
l_block_table = promote_partial_block_types(l_block_table, narrow_blocks=narrow_blocks, dst_blocks=dst_blocks,
data_manager=data_manager, dst_fields_loc=changed_fields_loc)

l_flatten_func = functools.partial(_flatten_partition, block_num=data_manager.block_num)
l_flatten = l_block_table.mapPartitions(l_flatten_func, use_previous_behavior=False)
# l_flatten_func = functools.partial(_flatten_partition, block_num=data_manager.block_num)
# l_flatten = l_block_table.mapPartitions(l_flatten_func, use_previous_behavior=False)

for r_df in data_frames[1:]:
r_field_names = r_df.data_manager.get_field_name_list()
Expand All @@ -144,18 +144,31 @@ def _align_blocks(blocks, align_fields_loc=None, full_block_migrate_set=None, ds
full_block_migrate_set=full_migrate_set, dst_dm=data_manager)
r_block_table = r_block_table.mapValues(_align_func)

r_flatten_func = functools.partial(_flatten_partition, block_num=data_manager.block_num)
r_flatten = r_block_table.mapPartitions(r_flatten_func, use_previous_behavior=False)
l_flatten = l_flatten.union(r_flatten)
# r_flatten_func = functools.partial(_flatten_partition, block_num=data_manager.block_num)
# r_flatten = r_block_table.mapPartitions(r_flatten_func, use_previous_behavior=False)
# l_flatten = l_flatten.union(r_flatten)
l_block_table = l_block_table.union(
r_block_table,
lambda l_blocks, r_blocks: [
Block.vstack([l_block, r_block]) for l_block, r_block in zip(l_blocks, r_blocks)
]
)

partition_order_mappings = get_partition_order_by_raw_table(l_flatten, data_manager.block_row_size)
_convert_to_block_func = functools.partial(to_blocks, dm=data_manager, partition_mappings=partition_order_mappings)
block_table = l_flatten.mapPartitions(_convert_to_block_func, use_previous_behavior=False)
block_table, data_manager = compress_blocks(block_table, data_manager)
# partition_order_mappings = get_partition_order_by_raw_table(l_flatten, data_manager.block_row_size)
# _convert_to_block_func = functools.partial(to_blocks, dm=data_manager, partition_mappings=partition_order_mappings)
# block_table = l_flatten.mapPartitions(_convert_to_block_func, use_previous_behavior=False)
# block_table, data_manager = compress_blocks(block_table, data_manager)
partition_order_mappings = get_partition_order_mappings_by_block_table(l_block_table, data_manager.block_row_size)
_balance_block_func = functools.partial(_balance_blocks,
partition_order_mappings=partition_order_mappings,
block_row_size=data_manager.block_row_size)
l_block_table = l_block_table.mapPartitions(_balance_block_func,
use_previous_behavior=False)
l_block_table, data_manager = compress_blocks(l_block_table, data_manager)

return DataFrame(
l_df._ctx,
block_table,
l_block_table,
partition_order_mappings,
data_manager
)
Expand All @@ -180,10 +193,7 @@ def drop(df: "DataFrame", index: "DataFrame" = None) -> "DataFrame":
)
l_flatten_table = df.block_table.mapPartitions(l_flatten_func, use_previous_behavior=False)

r_flatten_func = functools.partial(
_flatten_partition,
block_num=index.data_manager.block_num
)
r_flatten_func = functools.partial(_flatten_partition_without_value)
r_flatten_table = index.block_table.mapPartitions(r_flatten_func, use_previous_behavior=False)

drop_flatten = l_flatten_table.subtractByKey(r_flatten_table)
Expand Down Expand Up @@ -240,7 +250,6 @@ def sample(df: "DataFrame", n=None, frac: float =None, random_state=None) -> "Da
def retrieval_row(df: "DataFrame", indexer: "DTensor"):
if indexer.shape[1] != 1:
raise ValueError("Row indexing by DTensor should have only one column filling with True/False")
block_num = df.data_manager.block_num

block_num = df.data_manager.block_num
def _flatten_data(kvs):
Expand Down Expand Up @@ -272,6 +281,12 @@ def _flatten_data(kvs):
)


def _flatten_partition_without_value(kvs):
    """
    Yield ``(sample_id, [])`` for every entry of each partition item's first block.

    Only ``blocks[0]`` is read; the remaining blocks are ignored and each
    emitted value is a fresh empty list.
    """
    for _, blocks in kvs:
        yield from ((sample_id, []) for sample_id in blocks[0])


def _flatten_partition(kvs, block_num=0):
for block_id, blocks in kvs:
flat_blocks = [Block.transform_block_to_list(block) for block in blocks]
Expand All @@ -280,6 +295,30 @@ def _flatten_partition(kvs, block_num=0):
yield flat_blocks[0][i], [flat_blocks[j][i] for j in range(1, block_num)]


def _balance_blocks(kvs, partition_order_mappings: dict=None, block_row_size: int=None):
    """
    Re-chunk a partition's blocks into pieces of ``block_row_size`` rows.

    The first emitted block id is looked up from ``partition_order_mappings``
    using the first entry of the first item's first block (key
    ``"start_block_id"``); ids then increase by one per emitted chunk.
    Rows left over after the last full chunk of an item are carried and
    stacked (via ``Block.vstack``) in front of the next item's blocks; any
    rows still carried once the input is exhausted are emitted as a final,
    shorter chunk.

    Yields ``(block_id, blocks)`` pairs.
    """
    next_block_id = None
    carry = []
    for _, blocks in kvs:
        if next_block_id is None:
            first_sample_id = blocks[0][0]
            next_block_id = partition_order_mappings[first_sample_id]["start_block_id"]

        if carry:
            # Prepend the leftover rows from the previous item, block by block.
            blocks = [Block.vstack([left, right]) for left, right in zip(carry, blocks)]
            carry = []

        total_rows = len(blocks[0])
        for start in range(0, total_rows, block_row_size):
            stop = start + block_row_size
            if stop <= total_rows:
                yield next_block_id, [piece[start: stop] for piece in blocks]
                next_block_id += 1
            else:
                # Short tail: hold it back until more rows arrive or input ends.
                carry = [piece[start: total_rows] for piece in blocks]

    if carry:
        yield next_block_id, carry


def to_blocks(kvs, dm: DataManager = None, partition_mappings: dict = None):
ret_blocks = [[] for _ in range(dm.block_num)]

Expand Down
10 changes: 4 additions & 6 deletions python/fate/arch/dataframe/ops/_indexer.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
# limitations under the License.
#
import functools
import numpy as np

from ..manager import Block, DataManager
from .._dataframe import DataFrame
Expand Down Expand Up @@ -77,7 +78,8 @@ def _block_counter(kvs):
first_block_id = 0
for k, v in kvs:
if partition_key is None:
partition_key = k
partition_key = v[0][0]
# partition_key = k

size += len(v[0])

Expand Down Expand Up @@ -170,7 +172,7 @@ def _merge_list(lhs, rhs):

l_len = len(lhs)
r_len = len(rhs)
ret = [[] for i in range(l_len + r_len)]
ret = [None] * (l_len + r_len)
i, j, k = 0, 0, 0
while i < l_len and j < r_len:
if lhs[i][0] < rhs[j][0]:
Expand Down Expand Up @@ -233,15 +235,11 @@ def _convert_to_row(kvs):
for dst_block_id, value_list in ret_dict.items():
yield dst_block_id, sorted(value_list)

from ._transformer import transform_list_block_to_frame_block

block_table = df.block_table.join(agg_indexer, lambda lhs, rhs: (lhs, rhs))
block_table = block_table.mapReducePartitions(_convert_to_row, _merge_list)

_convert_to_frame_block_func = functools.partial(_convert_to_frame_block, data_manager=df.data_manager)
block_table = block_table.mapValues(_convert_to_frame_block_func)
# block_table = block_table.mapValues(lambda values: [v[1] for v in values])
# block_table = transform_list_block_to_frame_block(block_table, df.data_manager)

partition_order_mappings = get_partition_order_mappings_by_block_table(block_table, df.data_manager.block_row_size)
return DataFrame(
Expand Down
11 changes: 8 additions & 3 deletions python/fate/arch/dataframe/ops/_set_item.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,15 @@ def _append_multi(blocks, item_list, bid_list=None, dm: DataManager=None):

return ret_blocks

def _append_df(l_blocks, r_blocks, r_blocks_loc=None, dm=None):
    """
    Append selected right-hand columns to the left-hand blocks.

    Parameters
    ----------
    l_blocks: list
        Blocks of the left frame; copied unchanged to the front of the result.
    r_blocks: list
        Blocks of the right frame.
    r_blocks_loc: iterable of (bid, offset)
        For each appended column, the right-hand block index and the column
        offset inside that block.
    dm: DataManager
        Manager whose ``blocks`` list describes the result layout
        (presumably the left manager after the new columns were registered;
        verify against caller).

    Returns
    -------
    list
        ``l_blocks`` followed by one new block per ``r_blocks_loc`` entry.
    """
    ret_blocks = [block for block in l_blocks]
    l_bid = len(ret_blocks)
    for bid, offset in r_blocks_loc:
        # `bid` indexes the right-hand blocks, while `dm` describes the
        # destination layout, so the block type must be looked up at the
        # destination position `l_bid`.
        if dm.blocks[l_bid].is_phe_tensor():
            ret_blocks.append(r_blocks[bid])
        else:
            ret_blocks.append(r_blocks[bid][:, [offset]])
        l_bid += 1

    return ret_blocks

Expand Down Expand Up @@ -128,7 +133,7 @@ def _append_phe_tensor(l_blocks, r_tensor):
raise ValueError("Setitem with rhs=DataFrame must have equal len keys")
data_manager.append_columns(keys, block_types)

_append_func = functools.partial(_append_df, r_blocks_loc=operable_blocks_loc)
_append_func = functools.partial(_append_df, r_blocks_loc=operable_blocks_loc, dm=data_manager)
block_table = df.block_table.join(items.block_table, _append_func)
elif isinstance(items, DTensor):
meta_data = items.shardings._data.mapValues(
Expand Down

0 comments on commit e0d1161

Please sign in to comment.