dataframe: optimize op, modify histogram stat logic #5054

Merged: 7 commits, merged on Aug 21, 2023
Changes from 1 commit
60 changes: 30 additions & 30 deletions python/fate/arch/dataframe/ops/_apply_row.py
@@ -71,40 +71,40 @@ def _apply(blocks, func=None, src_field_names=None,
dm = dst_dm.duplicate()
apply_blocks = []
lines = len(blocks[0])
ret_column_len = len(ret_columns) if ret_columns is not None else None
block_types = []

flat_blocks = [Block.transform_block_to_list(block) for block in blocks]
for lid in range(lines):
apply_row_data = [flat_blocks[bid][lid][offset] for bid, offset in src_fields_loc]
apply_ret = func(pd.Series(apply_row_data, index=src_field_names))

if isinstance(apply_ret, Iterable):
apply_ret = list(apply_ret)
if ret_column_len is None:
ret_column_len = len(apply_ret)
elif ret_column_len != len(apply_ret):
raise ValueError("Result of apply row should have equal length")
else:
if ret_column_len and ret_column_len != 1:
raise ValueError("Result of apply row should have equal length")
apply_ret = [apply_ret]

if ret_column_len is None:
ret_column_len = len(apply_ret)

if not block_types:
block_types = [BlockType.get_block_type(value) for value in apply_ret]
apply_blocks = [[] for _ in range(ret_column_len)]

for idx, value in enumerate(apply_ret):
apply_blocks[idx].append([value])
apply_data = [[] for _ in range(lines)]
for bid, offset in src_fields_loc:
for lid in range(lines):
apply_data[lid].append(flat_blocks[bid][lid][offset])

apply_data = pd.DataFrame(apply_data, columns=src_field_names)
apply_ret = apply_data.apply(lambda row: func(row), axis=1).values.tolist()

if isinstance(apply_ret[0], Iterable):
first_row = list(apply_ret[0])
ret_column_len = len(first_row)
block_types = [BlockType.get_block_type(value) for value in first_row]
apply_blocks = [[] for _ in range(ret_column_len)]
for ret in apply_ret:
for idx, value in enumerate(ret):
apply_blocks[idx].append([value])

if enable_type_align_checking:
block_type = BlockType.get_block_type(value)
if block_types[idx] < block_type:
block_types[idx] = block_type
else:
block_types = [BlockType.get_block_type(apply_ret[0])]
apply_blocks.append([[ret] for ret in apply_ret])
ret_column_len = 1

if enable_type_align_checking:
for idx, value in enumerate(apply_ret):
block_type = BlockType.get_block_type(value)
if block_types[idx] < block_type:
block_types[idx] = block_type
for ret in apply_ret:
block_type = BlockType.get_block_type(ret)
if block_types[0] < block_type:
block_types[0] = block_type


if not ret_columns:
ret_columns = generated_default_column_names(ret_column_len)
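Note on the hunk above: the per-row Python loop that built a `pd.Series` per line is replaced by a single pandas `DataFrame.apply` over all rows, with the results transposed back into per-column blocks afterwards. A minimal sketch of the same pattern outside the FATE block machinery (the row data, column names, and the example `func` here are hypothetical stand-ins):

```python
import pandas as pd

# Hypothetical stand-in for the flattened blocks: one inner list per row,
# holding the selected source fields in order.
flat_rows = [[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]]
src_field_names = ["x0", "x1"]

# Build one DataFrame and run the user function once per row via apply,
# instead of constructing a pd.Series inside a Python-level loop.
apply_data = pd.DataFrame(flat_rows, columns=src_field_names)
func = lambda row: [row["x0"] + row["x1"], row["x0"] * row["x1"]]  # example multi-column result
apply_ret = apply_data.apply(func, axis=1).values.tolist()

# Transpose the per-row results into per-column blocks, mirroring the patched loop.
ret_column_len = len(apply_ret[0])
apply_blocks = [[] for _ in range(ret_column_len)]
for ret in apply_ret:
    for idx, value in enumerate(ret):
        apply_blocks[idx].append([value])

print(apply_blocks)  # [[[3.0], [7.0], [11.0]], [[2.0], [12.0], [30.0]]]
```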
55 changes: 42 additions & 13 deletions python/fate/arch/dataframe/ops/_histogram.py
@@ -14,6 +14,9 @@
# limitations under the License.
#
import functools
import numpy as np
import torch
from typing import Union

from fate.arch.tensor.inside import Hist

@@ -43,28 +46,54 @@ def _reducer(l_histogram, r_histogram):
return block_table.join(targets.shardings._data, _mapper_func).reduce(_reducer)


def distributed_hist_stat(df: DataFrame, distributed_hist, position: DataFrame, targets: dict):
def distributed_hist_stat(df: DataFrame, distributed_hist, position: DataFrame, targets: Union[dict, DataFrame]):
block_table, data_manager = _try_to_compress_table(df.block_table, df.data_manager)
data_block_id = data_manager.infer_operable_blocks()[0]
position_block_id = position.data_manager.infer_operable_blocks()[0]

def _pack_data_with_position(l_blocks, r_blocks, l_block_id=None, r_block_id=None):
return l_blocks[l_block_id], r_blocks[r_block_id], dict()
if isinstance(targets, dict):
def _pack_data_with_position(l_blocks, r_blocks, l_block_id=None, r_block_id=None):
return l_blocks[l_block_id], r_blocks[r_block_id], dict()

def _pack_with_target(l_values, r_value, target_name):
l_values[2][target_name] = r_value
def _pack_with_target(l_values, r_value, target_name):
l_values[2][target_name] = r_value

return l_values
return l_values

_pack_func = functools.partial(_pack_data_with_position,
l_block_id=data_block_id,
r_block_id=position_block_id)
_pack_func = functools.partial(_pack_data_with_position,
l_block_id=data_block_id,
r_block_id=position_block_id)

data_with_position = block_table.join(position.block_table, _pack_func)
data_with_position = block_table.join(position.block_table, _pack_func)

for name, target in targets.items():
_pack_with_target_func = functools.partial(_pack_with_target, target_name=name)
data_with_position = data_with_position.join(target.shardings._data, _pack_with_target_func)
else:
data_with_position = block_table.join(
position.block_table,
lambda l_blocks, r_blocks: (l_blocks[data_block_id], r_blocks[position_block_id])
)

target_data_manager = targets.data_manager
target_field_names = target_data_manager.infer_operable_field_names()
fields_loc = target_data_manager.loc_block(target_field_names, with_offset=True)

def _pack_with_targets(l_blocks, r_blocks):
target_blocks = dict()
for field_name, (block_id, offset) in zip(target_field_names, fields_loc):
if (block := target_data_manager.get_block(block_id)).is_phe_tensor():
target_blocks[field_name] = block.convert_to_phe_tensor(
r_blocks[block_id],
shape=(len(r_blocks[0]), 1)
)
else:
target_blocks[field_name] = r_blocks[block_id][:, [offset]]

return l_blocks[0], l_blocks[1], target_blocks

data_with_position = data_with_position.join(targets.block_table, _pack_with_targets)

for name, target in targets.items():
_pack_with_target_func = functools.partial(_pack_with_target, target_name=name)
data_with_position = data_with_position.join(target.shardings._data, _pack_with_target_func)

return distributed_hist.i_update(data_with_position)

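Note on the hunk above: `distributed_hist_stat` now also accepts `targets` as a DataFrame; in that branch each target field is located by its `(block_id, offset)` and sliced out of the right-hand blocks (PHE tensor blocks get converted separately). A small sketch of just the plain slicing step, with hypothetical numpy blocks in place of the real block table:

```python
import numpy as np

# Hypothetical block layout: a single 2-D block holding two target fields
# ("g" and "h") column-wise; fields_loc gives (block_id, offset) per field,
# in the same shape loc_block(..., with_offset=True) returns above.
r_blocks = [np.array([[0.1, 1.0],
                      [0.2, 1.0],
                      [0.3, 1.0]])]
target_field_names = ["g", "h"]
fields_loc = [(0, 0), (0, 1)]

def pack_with_targets(r_blocks):
    # Slice each target field out of its block, keeping the column shape,
    # as the plain (non-PHE) branch of _pack_with_targets does.
    return {
        name: r_blocks[block_id][:, [offset]]
        for name, (block_id, offset) in zip(target_field_names, fields_loc)
    }

target_blocks = pack_with_targets(r_blocks)
print(target_blocks["g"].shape, target_blocks["h"].shape)  # (3, 1) (3, 1)
```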
22 changes: 19 additions & 3 deletions python/fate/arch/dataframe/ops/_indexer.py
@@ -51,6 +51,15 @@ def _aggregate(kvs):
return agg_indexer


def _convert_to_frame_block(blocks, data_manager):
convert_blocks = []
for idx, block_schema in enumerate(data_manager.blocks):
block_content = [row_data[1][idx] for row_data in blocks]
convert_blocks.append(block_schema.convert_block(block_content))

return convert_blocks


def transform_to_table(block_table, block_index, partition_order_mappings):
def _convert_to_order_index(kvs):
for block_id, blocks in kvs:
@@ -228,8 +237,11 @@ def _convert_to_row(kvs):

block_table = df.block_table.join(agg_indexer, lambda lhs, rhs: (lhs, rhs))
block_table = block_table.mapReducePartitions(_convert_to_row, _merge_list)
block_table = block_table.mapValues(lambda values: [v[1] for v in values])
block_table = transform_list_block_to_frame_block(block_table, df.data_manager)

_convert_to_frame_block_func = functools.partial(_convert_to_frame_block, data_manager=df.data_manager)
block_table = block_table.mapValues(_convert_to_frame_block_func)
# block_table = block_table.mapValues(lambda values: [v[1] for v in values])
# block_table = transform_list_block_to_frame_block(block_table, df.data_manager)

partition_order_mappings = get_partition_order_mappings_by_block_table(block_table, df.data_manager.block_row_size)
return DataFrame(
@@ -369,18 +381,22 @@ def _convert_to_row(kvs):
for dst_block_id, value_list in ret_dict.items():
yield dst_block_id, sorted(value_list)

"""
def _convert_to_frame_block(blocks):
convert_blocks = []
for idx, block_schema in enumerate(data_manager.blocks):
block_content = [row_data[1][idx] for row_data in blocks]
convert_blocks.append(block_schema.convert_block(block_content))

return convert_blocks
"""

agg_indexer = indexer.mapReducePartitions(_aggregate, lambda l1, l2: l1 + l2)
block_table = df.block_table.join(agg_indexer, lambda v1, v2: (v1, v2))
block_table = block_table.mapReducePartitions(_convert_to_row, _merge_list)
block_table = block_table.mapValues(_convert_to_frame_block)

_convert_to_frame_block_func = functools.partial(_convert_to_frame_block, data_manager=data_manager)
block_table = block_table.mapValues(_convert_to_frame_block_func)

return DataFrame(
ctx=df._ctx,
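Note on the hunks above: the nested `_convert_to_frame_block` closure is hoisted to module level and its `data_manager` dependency is bound with `functools.partial`, so both `mapValues` call sites can share one helper. A minimal sketch of the binding pattern with a stand-in data manager (the names here are hypothetical, not FATE APIs):

```python
import functools

def convert_rows(values, data_manager):
    # Module-level helper: data_manager is passed explicitly instead of being
    # captured by a nested closure, so the same function serves both call sites.
    return [data_manager.transform(v) for v in values]

class FakeDataManager:
    def transform(self, v):
        return v * 2

dm = FakeDataManager()
convert_func = functools.partial(convert_rows, data_manager=dm)  # same shape as the mapValues callback
print(convert_func([1, 2, 3]))  # [2, 4, 6]
```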
126 changes: 124 additions & 2 deletions python/fate/ml/ensemble/learner/decision_tree/tree_core/hist.py
@@ -1,10 +1,33 @@
#
# Copyright 2019 The FATE Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import torch
from typing import Dict
from sklearn.ensemble._hist_gradient_boosting.grower import HistogramBuilder
from fate.arch.histogram.histogram import DistributedHistogram, Histogram
from fate.ml.ensemble.learner.decision_tree.tree_core.decision_tree import Node
from typing import List
import numpy as np
import pandas as pd
from fate.arch.dataframe import DataFrame
from fate.arch import Context
import logging



HIST_TYPE = ['distributed', 'sklearn']

class SklearnHistBuilder(object):

def __init__(self, bin_data, bin_num, g, h) -> None:
@@ -19,11 +42,13 @@ def __init__(self, bin_data, bin_num, g, h) -> None:
self.hist_builder = hist_builder


def compute_hist(self, nodes: List[Node], bin_train_data=None, gh=None, sample_pos: DataFrame = None, node_map={}):
def compute_hist(self, nodes: List[Node], bin_train_data=None, gh=None, sample_pos: DataFrame = None, node_map={}, debug=False):

grouped = sample_pos.as_pd_df().groupby('node_idx')['sample_id'].apply(np.array).apply(np.uint32)
data_indices = [None for i in range(len(nodes))]
inverse_node_map = {v: k for k, v in node_map.items()}
print('grouped is {}'.format(grouped.keys()))
print('node map is {}'.format(node_map))
for idx, node in enumerate(nodes):
data_indices[idx] = grouped[inverse_node_map[idx]]

@@ -38,4 +63,101 @@ def compute_hist(self, nodes: List[Node], bin_train_data=None, gh=None, sample_p
hists.append([g, h, count])
idx += 1

return hists
if debug:
return hists, data_indices
else:
return hists


# def get_hist_builder(bin_train_data, grad_and_hess, root_node, max_bin, bin_info, hist_type='distributed'):

# assert hist_type in HIST_TYPE, 'hist_type should be in {}'.format(HIST_TYPE)

# if hist_type == 'distributed':
# pass

# if hist_type == 'sklearn':

# if isinstance(bin_train_data, DataFrame):
# data = bin_train_data.as_pd_df()
# elif isinstance(bin_train_data, pd.DataFrame):
# data = bin_train_data

# if isinstance(grad_and_hess, DataFrame):
# gh = grad_and_hess.as_pd_df()
# elif isinstance(grad_and_hess, pd.DataFrame):
# gh = grad_and_hess

# data['sample_id'] = data['sample_id'].astype(np.uint32)
# gh['sample_id'] = gh['sample_id'].astype(np.uint32)
# collect_data = data.sort_values(by='sample_id')
# collect_gh = gh.sort_values(by='sample_id')
# if bin_train_data.schema.label_name is None:
# feat_arr = collect_data.drop(columns=[bin_train_data.schema.sample_id_name, bin_train_data.schema.match_id_name]).values
# else:
# feat_arr = collect_data.drop(columns=[bin_train_data.schema.sample_id_name, bin_train_data.schema.label_name, bin_train_data.schema.match_id_name]).values
# g = collect_gh['g'].values
# h = collect_gh['h'].values
# feat_arr = np.asfortranarray(feat_arr.astype(np.uint8))
# return SklearnHistBuilder(feat_arr, max_bin, g, h)

class SBTHistogramBuilder(object):

def __init__(self, bin_train_data: DataFrame, bin_info: dict, random_seed=None) -> None:

columns = bin_train_data.schema.columns
self.random_seed = random_seed
self.feat_bin_num = [len(bin_info[feat]) for feat in columns]

def _get_plain_text_schema(self):
return {
"g": {"type": "tensor", "stride": 1, "dtype": torch.float32},
"h": {"type": "tensor", "stride": 1, "dtype": torch.float32},
"cnt": {"type": "tensor", "stride": 1, "dtype": torch.float32},
}

def _get_enc_hist_schema(self, pk, evaluator):
return {
"g":{"type": "paillier", "stride": 1, "pk": pk, "evaluator": evaluator},
"h":{"type": "paillier", "stride": 1, "pk": pk, "evaluator": evaluator},
"cnt": {"type": "tensor", "stride": 1, "dtype": torch.float32},
}

def compute_hist(self, ctx: Context, nodes: List[Node], bin_train_data: DataFrame, gh: DataFrame, sample_pos: DataFrame = None, node_map={}, pk=None, evaluator=None):

node_num = len(nodes)
if ctx.is_on_guest:
schema = self._get_plain_text_schema()
elif ctx.is_on_host:
if pk is None or evaluator is None:
schema = self._get_plain_text_schema()
else:
schema = self._get_enc_hist_schema(pk, evaluator)

hist = DistributedHistogram(
node_size=node_num,
feature_bin_sizes=self.feat_bin_num,
value_schemas=schema,
seed=self.random_seed,
)
indexer = bin_train_data.get_indexer('sample_id')
gh = gh.loc(indexer, preserve_order=True)
gh["cnt"] = 1
sample_pos = sample_pos.loc(indexer, preserve_order=True)
map_sample_pos = sample_pos.create_frame()
map_sample_pos['node_idx'] = sample_pos.apply_row(lambda x: node_map[x['node_idx']])

stat_obj = bin_train_data.distributed_hist_stat(hist, map_sample_pos, gh)

return hist, stat_obj

def recover_feature_bins(self, hist: DistributedHistogram, nid_split_id: Dict[int, int], node_map: dict) -> Dict[int, int]:
if self.random_seed is None:
return nid_split_id # random seed of None means no shuffle, nothing to recover
else:
reverse_node_map = {v: k for k, v in node_map.items()}
nid_split_id_ = {node_map[k]: v for k, v in nid_split_id.items()}
recover = hist.recover_feature_bins(self.random_seed, nid_split_id_)
print('recover rs is', recover)
recover_rs = {reverse_node_map[k]: v for k, v in recover.items()}
return recover_rs
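
Note on `recover_feature_bins` above: it translates tree-node ids into histogram node indices via `node_map`, asks the distributed histogram to undo the seeded bin shuffle, then maps the result back through the reverse map. A sketch of just that remapping logic, with the histogram's recover call stubbed out (the stub and its identity behaviour are assumptions for illustration):

```python
from typing import Dict

def recover_feature_bins(hist_recover, random_seed, nid_split_id: Dict[int, int], node_map: dict) -> Dict[int, int]:
    # No shuffle seed means the bins were never permuted, so nothing to undo.
    if random_seed is None:
        return nid_split_id
    reverse_node_map = {v: k for k, v in node_map.items()}
    # Translate tree-node ids to histogram node indices, recover, translate back.
    mapped = {node_map[k]: v for k, v in nid_split_id.items()}
    recovered = hist_recover(random_seed, mapped)
    return {reverse_node_map[k]: v for k, v in recovered.items()}

# Usage with a stand-in recover function that leaves split ids unchanged:
identity_recover = lambda seed, split_ids: split_ids
print(recover_feature_bins(identity_recover, 42, {5: 3, 7: 1}, {5: 0, 7: 1}))  # {5: 3, 7: 1}
```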