dataframe: optimize op, modify histogram stat logic #5054

Merged 7 commits on Aug 21, 2023
python/fate/arch/dataframe/ops/_apply_row.py (30 additions, 30 deletions)

@@ -71,40 +71,40 @@ def _apply(blocks, func=None, src_field_names=None,
dm = dst_dm.duplicate()
apply_blocks = []
lines = len(blocks[0])
ret_column_len = len(ret_columns) if ret_columns is not None else None
block_types = []

flat_blocks = [Block.transform_block_to_list(block) for block in blocks]
for lid in range(lines):
apply_row_data = [flat_blocks[bid][lid][offset] for bid, offset in src_fields_loc]
apply_ret = func(pd.Series(apply_row_data, index=src_field_names))

if isinstance(apply_ret, Iterable):
apply_ret = list(apply_ret)
if ret_column_len is None:
ret_column_len = len(apply_ret)
elif ret_column_len != len(apply_ret):
raise ValueError("Result of apply row should have equal length")
else:
if ret_column_len and ret_column_len != 1:
raise ValueError("Result of apply row should have equal length")
apply_ret = [apply_ret]

if ret_column_len is None:
ret_column_len = len(apply_ret)

if not block_types:
block_types = [BlockType.get_block_type(value) for value in apply_ret]
apply_blocks = [[] for _ in range(ret_column_len)]

for idx, value in enumerate(apply_ret):
apply_blocks[idx].append([value])
apply_data = [[] for _ in range(lines)]
for bid, offset in src_fields_loc:
for lid in range(lines):
apply_data[lid].append(flat_blocks[bid][lid][offset])

apply_data = pd.DataFrame(apply_data, columns=src_field_names)
apply_ret = apply_data.apply(lambda row: func(row), axis=1).values.tolist()

if isinstance(apply_ret[0], Iterable):
first_row = list(apply_ret[0])
ret_column_len = len(first_row)
block_types = [BlockType.get_block_type(value) for value in first_row]
apply_blocks = [[] for _ in range(ret_column_len)]
for ret in apply_ret:
for idx, value in enumerate(ret):
apply_blocks[idx].append([value])

if enable_type_align_checking:
block_type = BlockType.get_block_type(value)
if block_types[idx] < block_type:
block_types[idx] = block_type
else:
block_types = [BlockType.get_block_type(apply_ret[0])]
apply_blocks.append([[ret] for ret in apply_ret])
ret_column_len = 1

if enable_type_align_checking:
for idx, value in enumerate(apply_ret):
block_type = BlockType.get_block_type(value)
if block_types[idx] < block_type:
block_types[idx] = block_type
for ret in apply_ret:
block_type = BlockType.get_block_type(ret)
if block_types[0] < block_type:
block_types[0] = block_type


if not ret_columns:
ret_columns = generated_default_column_names(ret_column_len)
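The rewrite above drops the per-row loop that built one pd.Series per line and instead gathers all rows into a single pd.DataFrame, letting pandas drive the user function via apply(axis=1). A minimal, self-contained sketch of the pattern; the blocks, field locations, and the two-column transform here are made up for illustration:

import pandas as pd

# Hypothetical flat blocks: two source columns, stored block-wise.
flat_blocks = [[[1.0], [2.0], [3.0]], [[10], [20], [30]]]
src_fields_loc = [(0, 0), (1, 0)]  # (block id, offset) per source field
src_field_names = ["x", "y"]
lines = 3

# Gather the rows once, then let pandas drive the per-row function call.
apply_data = [[] for _ in range(lines)]
for bid, offset in src_fields_loc:
    for lid in range(lines):
        apply_data[lid].append(flat_blocks[bid][lid][offset])

frame = pd.DataFrame(apply_data, columns=src_field_names)
ret = frame.apply(lambda row: row["x"] + row["y"], axis=1).values.tolist()
print(ret)  # [11.0, 22.0, 33.0]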
python/fate/arch/dataframe/ops/_compress_block.py (12 additions, 9 deletions)

@@ -13,6 +13,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
import numpy as np
import torch
from ..manager import BlockType
from ..manager import DataManager


@@ -22,23 +25,23 @@ def compress_blocks(block_table, data_manager: DataManager):
return block_table, data_manager

def _compress(blocks):
ret_blocks = [[] for idx in range(data_manager.block_num)]
ret_blocks = [[] for _ in range(data_manager.block_num)]
for src_bid, dst_bid in non_compress_block_changes.items():
ret_blocks[dst_bid] = blocks[src_bid]

lines = len(blocks[0])
for dst_bid, block_loc in to_compress_block_loc:
block_buf = []
field_len = len(data_manager.get_block(dst_bid).field_indexes)
for lid in range(lines):
row_value = [None for idx in range(field_len)]
for src_bid, field_indexes in block_loc:
for field_idx, val in zip(field_indexes, blocks[src_bid][lid]):
row_value[field_idx] = val
block = data_manager.get_block(dst_bid)
if BlockType.is_tensor(block.block_type):
block_buf = torch.empty((lines, field_len), dtype=getattr(torch, block.block_type.value))
else:
block_buf = np.empty((lines, field_len), dtype=getattr(np, block.block_type.value))

block_buf.append(row_value)
for src_bid, field_indexes in block_loc:
block_buf[:, field_indexes] = blocks[src_bid]

ret_blocks[dst_bid] = data_manager.get_block(dst_bid).convert_block(block_buf)
ret_blocks[dst_bid] = block.convert_block(block_buf)

return ret_blocks

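The change above stops assembling each compressed row as a Python list: the destination block is preallocated once as a typed torch/numpy buffer and whole source blocks are copied in with column-slice assignment. A small sketch of the numpy path; the shapes and block layout are invented for illustration:

import numpy as np

# Two hypothetical source blocks feeding one compressed destination block.
blocks = [np.arange(8.0).reshape(4, 2), np.arange(8.0, 12.0).reshape(4, 1)]
block_loc = [(0, [0, 1]), (1, [2])]  # (src block id, destination field indexes)
lines, field_len = 4, 3

block_buf = np.empty((lines, field_len), dtype=np.float64)
for src_bid, field_indexes in block_loc:
    # one vectorized copy per source block instead of a per-row Python loop
    block_buf[:, field_indexes] = blocks[src_bid]
print(block_buf)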
python/fate/arch/dataframe/ops/_dimension_scaling.py (12 additions, 9 deletions)

@@ -240,18 +240,21 @@ def sample(df: "DataFrame", n=None, frac: float =None, random_state=None) -> "Da
def retrieval_row(df: "DataFrame", indexer: "DTensor"):
if indexer.shape[1] != 1:
raise ValueError("Row indexing by DTensor should have only one column filling with True/False")
block_num = df.data_manager.block_num

def _retrieval(blocks, t: torch.Tensor):
index = t.reshape(-1).tolist()
ret_blocks = [block[index] for block in blocks]
block_num = df.data_manager.block_num
def _flatten_data(kvs):
for _, (blocks, t) in kvs:
flat_blocks = [Block.transform_block_to_list(block) for block in blocks]
for i, v in enumerate(t):
v = v.item()
if not v:
continue
yield flat_blocks[0][i], [flat_blocks[j][i] for j in range(1, block_num)]

return ret_blocks

_retrieval_func = functools.partial(_retrieval)
retrieval_block_table = df.block_table.join(indexer.shardings._data, _retrieval_func)

_flatten_func = functools.partial(_flatten_partition, block_num=df.data_manager.block_num)
retrieval_raw_table = retrieval_block_table.mapPartitions(_flatten_func, use_previous_behavior=False)
block_table_with_index = df.block_table.join(indexer.shardings._data, lambda v1, v2: (v1, v2))
retrieval_raw_table = block_table_with_index.mapPartitions(_flatten_data, use_previous_behavior=False)

if retrieval_raw_table.count() == 0:
return df.empty_frame()
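With this change, retrieval_row no longer fancy-indexes every block per partition; it joins the block table with the indexer shardings and streams out only the rows whose mask value is True. A stripped-down sketch of the generator, with plain Python lists standing in for Block data and the torch mask tensor:

def _flatten_data(kvs, block_num):
    for _, (flat_blocks, mask) in kvs:
        for i, keep in enumerate(mask):
            if not keep:
                continue
            # key: sample id from block 0; value: the remaining field values
            yield flat_blocks[0][i], [flat_blocks[j][i] for j in range(1, block_num)]

kvs = [("block-0", ([["id0", "id1"], [1, 2], [0.5, 0.6]], [True, False]))]
print(list(_flatten_data(kvs, block_num=3)))  # [('id0', [1, 0.5])]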
python/fate/arch/dataframe/ops/_histogram.py (42 additions, 13 deletions)

@@ -14,6 +14,9 @@
# limitations under the License.
#
import functools
import numpy as np
import torch
from typing import Union

from fate.arch.tensor.inside import Hist

@@ -43,28 +46,54 @@ def _reducer(l_histogram, r_histogram):
return block_table.join(targets.shardings._data, _mapper_func).reduce(_reducer)


def distributed_hist_stat(df: DataFrame, distributed_hist, position: DataFrame, targets: dict):
def distributed_hist_stat(df: DataFrame, distributed_hist, position: DataFrame, targets: Union[dict, DataFrame]):
block_table, data_manager = _try_to_compress_table(df.block_table, df.data_manager)
data_block_id = data_manager.infer_operable_blocks()[0]
position_block_id = position.data_manager.infer_operable_blocks()[0]

def _pack_data_with_position(l_blocks, r_blocks, l_block_id=None, r_block_id=None):
return l_blocks[l_block_id], r_blocks[r_block_id], dict()
if isinstance(targets, dict):
def _pack_data_with_position(l_blocks, r_blocks, l_block_id=None, r_block_id=None):
return l_blocks[l_block_id], r_blocks[r_block_id], dict()

def _pack_with_target(l_values, r_value, target_name):
l_values[2][target_name] = r_value
def _pack_with_target(l_values, r_value, target_name):
l_values[2][target_name] = r_value

return l_values
return l_values

_pack_func = functools.partial(_pack_data_with_position,
l_block_id=data_block_id,
r_block_id=position_block_id)
_pack_func = functools.partial(_pack_data_with_position,
l_block_id=data_block_id,
r_block_id=position_block_id)

data_with_position = block_table.join(position.block_table, _pack_func)
data_with_position = block_table.join(position.block_table, _pack_func)

for name, target in targets.items():
_pack_with_target_func = functools.partial(_pack_with_target, target_name=name)
data_with_position = data_with_position.join(target.shardings._data, _pack_with_target_func)
else:
data_with_position = block_table.join(
position.block_table,
lambda l_blocks, r_blocks: (l_blocks[data_block_id], r_blocks[position_block_id])
)

target_data_manager = targets.data_manager
target_field_names = target_data_manager.infer_operable_field_names()
fields_loc = target_data_manager.loc_block(target_field_names, with_offset=True)

def _pack_with_targets(l_blocks, r_blocks):
target_blocks = dict()
for field_name, (block_id, offset) in zip(target_field_names, fields_loc):
if (block := target_data_manager.get_block(block_id)).is_phe_tensor():
target_blocks[field_name] = block.convert_to_phe_tensor(
r_blocks[block_id],
shape=(len(r_blocks[0]), 1)
)
else:
target_blocks[field_name] = r_blocks[block_id][:, [offset]]

return l_blocks[0], l_blocks[1], target_blocks

data_with_position = data_with_position.join(targets.block_table, _pack_with_targets)

for name, target in targets.items():
_pack_with_target_func = functools.partial(_pack_with_target, target_name=name)
data_with_position = data_with_position.join(target.shardings._data, _pack_with_target_func)

return distributed_hist.i_update(data_with_position)

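After this change, distributed_hist_stat accepts targets either as a dict of tensors (the old path, kept under isinstance(targets, dict)) or as a DataFrame. In the DataFrame path each target field is sliced out of its block as a (rows, 1) column, and PHE-encrypted blocks are first rebuilt through convert_to_phe_tensor. A sketch of the plain (non-PHE) slicing, with numpy arrays standing in for the blocks and a made-up field layout:

import numpy as np

target_field_names = ["g", "h"]
fields_loc = [(0, 0), (0, 1)]  # both fields live in block 0 here
r_blocks = [np.array([[0.1, 0.9], [0.2, 0.8]])]

target_blocks = {}
for field_name, (block_id, offset) in zip(target_field_names, fields_loc):
    # [:, [offset]] keeps a (rows, 1) column, matching the dict-of-tensors shape
    target_blocks[field_name] = r_blocks[block_id][:, [offset]]
print(target_blocks["g"].shape)  # (2, 1)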
python/fate/arch/dataframe/ops/_indexer.py (19 additions, 3 deletions)

@@ -51,6 +51,15 @@ def _aggregate(kvs):
return agg_indexer


def _convert_to_frame_block(blocks, data_manager):
convert_blocks = []
for idx, block_schema in enumerate(data_manager.blocks):
block_content = [row_data[1][idx] for row_data in blocks]
convert_blocks.append(block_schema.convert_block(block_content))

return convert_blocks


def transform_to_table(block_table, block_index, partition_order_mappings):
def _convert_to_order_index(kvs):
for block_id, blocks in kvs:
@@ -228,8 +237,11 @@ def _convert_to_row(kvs):

block_table = df.block_table.join(agg_indexer, lambda lhs, rhs: (lhs, rhs))
block_table = block_table.mapReducePartitions(_convert_to_row, _merge_list)
block_table = block_table.mapValues(lambda values: [v[1] for v in values])
block_table = transform_list_block_to_frame_block(block_table, df.data_manager)

_convert_to_frame_block_func = functools.partial(_convert_to_frame_block, data_manager=df.data_manager)
block_table = block_table.mapValues(_convert_to_frame_block_func)
# block_table = block_table.mapValues(lambda values: [v[1] for v in values])
# block_table = transform_list_block_to_frame_block(block_table, df.data_manager)

partition_order_mappings = get_partition_order_mappings_by_block_table(block_table, df.data_manager.block_row_size)
return DataFrame(
@@ -369,18 +381,22 @@ def _convert_to_row(kvs):
for dst_block_id, value_list in ret_dict.items():
yield dst_block_id, sorted(value_list)

"""
def _convert_to_frame_block(blocks):
convert_blocks = []
for idx, block_schema in enumerate(data_manager.blocks):
block_content = [row_data[1][idx] for row_data in blocks]
convert_blocks.append(block_schema.convert_block(block_content))

return convert_blocks
"""

agg_indexer = indexer.mapReducePartitions(_aggregate, lambda l1, l2: l1 + l2)
block_table = df.block_table.join(agg_indexer, lambda v1, v2: (v1, v2))
block_table = block_table.mapReducePartitions(_convert_to_row, _merge_list)
block_table = block_table.mapValues(_convert_to_frame_block)

_convert_to_frame_block_func = functools.partial(_convert_to_frame_block, data_manager=data_manager)
block_table = block_table.mapValues(_convert_to_frame_block_func)

return DataFrame(
ctx=df._ctx,
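Both hunks above apply the same refactor: the nested _convert_to_frame_block closure is hoisted to module level and bound to its data_manager with functools.partial, so one helper serves both call sites. A runnable sketch of the pattern, where DummyBlock and DummyDataManager are stand-ins for the real manager types:

import functools

class DummyBlock:
    def convert_block(self, content):
        return content

class DummyDataManager:
    blocks = [DummyBlock(), DummyBlock()]

def _convert_to_frame_block(blocks, data_manager):
    convert_blocks = []
    for idx, block_schema in enumerate(data_manager.blocks):
        block_content = [row_data[1][idx] for row_data in blocks]
        convert_blocks.append(block_schema.convert_block(block_content))
    return convert_blocks

rows = [(0, (["a"], ["b"])), (1, (["c"], ["d"]))]  # (order key, per-block row values)
func = functools.partial(_convert_to_frame_block, data_manager=DummyDataManager())
print(func(rows))  # [[['a'], ['c']], [['b'], ['d']]]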
@@ -142,12 +142,12 @@ def compute_hist(self, ctx: Context, nodes: List[Node], bin_train_data: DataFram
)
indexer = bin_train_data.get_indexer('sample_id')
gh = gh.loc(indexer, preserve_order=True)
gh["cnt"] = 1
sample_pos = sample_pos.loc(indexer, preserve_order=True)
targets = {'g': gh['g'].as_tensor(), 'h': gh['h'].as_tensor(), 'cnt': bin_train_data.apply_row(lambda x: 1).as_tensor()}
map_sample_pos = sample_pos.create_frame()
map_sample_pos['node_idx'] = sample_pos.apply_row(lambda x: node_map[x['node_idx']])

stat_obj = bin_train_data.distributed_hist_stat(hist, map_sample_pos, targets)
stat_obj = bin_train_data.distributed_hist_stat(hist, map_sample_pos, gh)

return hist, stat_obj

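The compute_hist call site no longer materializes one tensor per target, nor runs an extra apply_row(lambda x: 1) pass just to get a count column; it appends gh["cnt"] = 1 and passes the gh DataFrame straight to distributed_hist_stat, which the new DataFrame branch above consumes. A pandas stand-in sketch of the shape of the change (pandas here is only a substitute for the FATE DataFrame):

import pandas as pd

gh = pd.DataFrame({"g": [0.1, 0.2], "h": [0.9, 0.8]})

# Before: targets = {"g": ..., "h": ..., "cnt": ...} built tensor by tensor.
# After: add the count column once and hand over the frame itself.
gh["cnt"] = 1
print(gh.columns.tolist())  # ['g', 'h', 'cnt']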