Merge pull request #5104 from FederatedAI/feature-2.0.0-beta-dataframe_refact

dataframe & secureboost optimize
mgqa34 authored Aug 31, 2023
2 parents 57f0128 + 60b0304 commit a798bc6
Showing 9 changed files with 138 additions and 20 deletions.
4 changes: 4 additions & 0 deletions python/fate/arch/dataframe/_dataframe.py
@@ -503,6 +503,10 @@ def loc(self, indexer, target="sample_id", preserve_order=False):

return loc(self, indexer, target=target, preserve_order=preserve_order)

def iloc(self, indexer: "DataFrame") -> "DataFrame":
from .ops._dimension_scaling import retrieval_row
return retrieval_row(self, indexer)

def loc_with_sample_id_replacement(self, indexer):
"""
indexer: table,
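For context, the new iloc takes a one-column True/False DataFrame, typically produced by apply_row, and keeps only the rows whose indicator is true, delegating to ops._dimension_scaling.retrieval_row. A minimal pandas stand-in of that selection semantics (illustration only, not part of this commit):

# Pandas stand-in (illustration only) for the row selection iloc appears to perform.
import pandas as pd

data = pd.DataFrame({"feature": [0.1, 0.5, 0.9, 0.3], "node_idx": [0, 1, 0, 2]})

# One-column boolean frame, e.g. the output of an apply_row-style predicate.
indexer = (data["node_idx"] == 0).to_frame("keep")

# Keep only rows whose indicator is True; row order is preserved.
selected = data[indexer["keep"]]
print(selected)  # rows 0 and 2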
5 changes: 5 additions & 0 deletions python/fate/arch/dataframe/manager/block_manager.py
@@ -249,6 +249,11 @@ def convert_block_type(self, block_type):
def retrieval_row(cls, block, indexes):
if isinstance(block, CiphertextVector):
return block.slice_indexes(indexes)
elif isinstance(block, pd.Index):
if isinstance(indexes, list):
return block[indexes]
else:
return pd.Index(block[indexes])
else:
return block[indexes]

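For context, the new pd.Index branch keeps sample-id blocks typed as pd.Index after row retrieval. A small illustration (not part of this commit), with the indexer types assumed for the example:

# Illustration only: retrieving rows from a pd.Index block by a positional list vs. a boolean mask.
import numpy as np
import pandas as pd

sample_ids = pd.Index(["s1", "s2", "s3", "s4"])

print(sample_ids[[0, 2]])                    # a list of positions already yields a pd.Index
mask = np.array([True, False, True, False])  # an assumed non-list indexer
print(pd.Index(sample_ids[mask]))            # re-wrapped so the block stays a pd.Index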
100 changes: 98 additions & 2 deletions python/fate/arch/dataframe/ops/_dimension_scaling.py
@@ -14,7 +14,7 @@
# limitations under the License.
import copy
import functools
from typing import List
from typing import List, Union
import torch
from sklearn.utils import resample
from .._dataframe import DataFrame
@@ -247,6 +247,7 @@ def sample(df: "DataFrame", n=None, frac: float =None, random_state=None) -> "Da
return sample_frame


"""
def retrieval_row(df: "DataFrame", indexer: "DTensor"):
if indexer.shape[1] != 1:
raise ValueError("Row indexing by DTensor should have only one column filling with True/False")
Expand Down Expand Up @@ -279,6 +280,69 @@ def _flatten_data(kvs):
partition_order_mappings,
df.data_manager
)
"""
def retrieval_row(df: "DataFrame", indexer: Union["DTensor", "DataFrame"]):
if indexer.shape[1] != 1:
raise ValueError("Row indexing by DTensor should have only one column filling with True/False")

data_manager = df.data_manager

def _block_counter(kvs, value_type="tensor"):
size = 0
first_block_id = None
for k, value in kvs:
if first_block_id is None:
first_block_id = k

if value_type == "tensor":
size += value.sum().item()
else:
size += len(value[0])

return first_block_id, size

if isinstance(indexer, DataFrame):
_block_counter_func = functools.partial(_block_counter, value_type="dataframe")
block_info = sorted([summary[1] for summary in indexer.block_table.applyPartitions(_block_counter_func).collect()])
else:
block_info = sorted([summary[1] for summary in indexer.shardings._data.applyPartitions(_block_counter).collect()])
block_order_mappings = dict()
start_index = 0
acc_block_num = 0
for block_id, block_size in block_info:
block_num = (block_size + data_manager.block_row_size - 1) // data_manager.block_row_size
block_order_mappings[block_id] = dict(
start_index=start_index,
end_index=start_index + block_size - 1,
start_block_id=acc_block_num,
end_block_id=acc_block_num + block_num - 1
)
start_index += block_size
acc_block_num += block_num

if start_index == 0:
return df.empty_frame()

if isinstance(indexer, DataFrame):
bid = indexer.data_manager.infer_operable_blocks()[0]
block_table = df.block_table.join(indexer.block_table, lambda v1, v2: (v1, v2[bid]))
else:
block_table = df.block_table.join(indexer.shardings._data, lambda v1, v2: (v1, v2))

_balance_block_func = functools.partial(_balance_blocks_with_index,
partition_order_mappings=block_order_mappings,
data_manager=data_manager)
block_table = block_table.mapPartitions(_balance_block_func,
use_previous_behavior=False)
block_table, data_manager = compress_blocks(block_table, data_manager)
partition_order_mappings = get_partition_order_mappings_by_block_table(block_table, data_manager.block_row_size)

return DataFrame(
df._ctx,
block_table,
partition_order_mappings,
data_manager
)


def _flatten_partition_without_value(kvs):
@@ -299,7 +363,7 @@ def _balance_blocks(kvs, partition_order_mappings: dict=None, block_row_size: in
block_id = None
previous_blocks = list()
for _, blocks in kvs:
if block_id is None:
if block_id is None and len(blocks[0]):
sample_id = blocks[0][0]
block_id = partition_order_mappings[sample_id]["start_block_id"]

@@ -318,6 +382,38 @@ def _balance_blocks(kvs, partition_order_mappings: dict=None, block_row_size: in
if previous_blocks:
yield block_id, previous_blocks

if block_id is None:
return []


def _balance_blocks_with_index(kvs, partition_order_mappings: dict=None, data_manager: DataManager=None):
block_id = None
block_num = data_manager.block_num
ret_blocks = [[] for _ in range(block_num)]
block_size = 0
for _, (blocks, t) in kvs:
if block_id is None:
block_id = partition_order_mappings[_]["start_block_id"]

flat_blocks = [Block.transform_block_to_list(block) for block in blocks]
for i, v in enumerate(t):
v = v.item()
if not v:
continue

block_size += 1
for j in range(block_num):
ret_blocks[j].append(flat_blocks[j][i])

if block_size == data_manager.block_row_size:
yield block_id, data_manager.convert_to_blocks(ret_blocks)
block_size = 0
block_id += 1
ret_blocks = [[] for _ in range(block_num)]

if block_size:
yield block_id, data_manager.convert_to_blocks(ret_blocks)


def to_blocks(kvs, dm: DataManager = None, partition_mappings: dict = None):
ret_blocks = [[] for _ in range(dm.block_num)]
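For context, the block_order_mappings bookkeeping in the new retrieval_row re-packs the rows each partition retains into output blocks of at most block_row_size rows, i.e. ceil(block_size / block_row_size) blocks per partition. A small worked example with made-up sizes (illustration only, not part of this commit):

# Worked example of the block_order_mappings arithmetic (sizes are illustrative).
block_row_size = 1000
block_info = [(0, 1500), (1, 0), (2, 2500)]  # (first_block_id, rows retained in that partition)

start_index, acc_block_num = 0, 0
mappings = {}
for block_id, block_size in block_info:
    block_num = (block_size + block_row_size - 1) // block_row_size  # ceil division, as in the diff
    mappings[block_id] = dict(
        start_index=start_index,
        end_index=start_index + block_size - 1,
        start_block_id=acc_block_num,
        end_block_id=acc_block_num + block_num - 1,
    )
    start_index += block_size
    acc_block_num += block_num

# Partition 0 fills output blocks 0-1, the empty partition 1 contributes none,
# and partition 2 fills blocks 2-4; start_index ends at the 4000 retained rows.
print(mappings, start_index)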
7 changes: 5 additions & 2 deletions python/fate/arch/dataframe/ops/_indexer.py
@@ -14,6 +14,7 @@
# limitations under the License.
#
import functools
import uuid

from ..manager import Block, DataManager
from .._dataframe import DataFrame
@@ -76,12 +77,14 @@ def _block_counter(kvs):
size = 0
first_block_id = 0
for k, v in kvs:
if partition_key is None:
if size == 0 and len(v[0]):
partition_key = v[0][0]
# partition_key = k

size += len(v[0])

if size == 0:
partition_key = str(first_block_id) + "_" + str(uuid.uuid1())

return first_block_id, (partition_key, size)

block_info = sorted([summary[1] for summary in block_table.applyPartitions(_block_counter).collect()])
7 changes: 4 additions & 3 deletions python/fate/arch/dataframe/utils/_k_fold.py
@@ -52,8 +52,8 @@ def _hetero_split(self, df: DataFrame):
else:
for _, iter_ctx in self._ctx.sub_ctx("KFold").ctxs_range(self._n_splits):
train_indexer, test_indexer = iter_ctx.guest.get("fold_indexes")
train_frame = df.loc(train_indexer)
test_frame = df.loc(test_indexer)
train_frame = df.loc(train_indexer, preserve_order=True)
test_frame = df.loc(test_indexer, preserve_order=True)

yield train_frame, test_frame

@@ -77,7 +77,8 @@ def _homo_split(self, df: DataFrame, return_indexer):
test_frame = df.loc(test_indexer)

if return_indexer:
yield train_frame, test_frame, train_indexer, test_indexer
yield train_frame, test_frame, \
train_frame.get_indexer(target="sample_id"), test_frame.get_indexer(target="sample_id")
else:
yield train_frame, test_frame

7 changes: 5 additions & 2 deletions python/fate/ml/ensemble/learner/decision_tree/hetero/guest.py
@@ -110,7 +110,8 @@ def _update_sample_pos(self, ctx, cur_layer_nodes: List[Node], sample_pos: DataF
data_with_pos = DataFrame.hstack([data, sample_pos])
map_func = functools.partial(_get_sample_on_local_nodes, cur_layer_node=cur_layer_nodes, node_map=node_map, sitename=sitename)
local_sample_idx = data_with_pos.apply_row(map_func)
local_samples = data_with_pos[local_sample_idx.values.as_tensor()]
# local_samples = data_with_pos[local_sample_idx.as_tensor()]
local_samples = data_with_pos.iloc(local_sample_idx)
logger.info('{}/{} samples on local nodes'.format(len(local_samples), len(data)))
if len(local_samples) == 0:
updated_sample_pos = None
@@ -147,7 +148,8 @@ def _update_sample_pos(self, ctx, cur_layer_nodes: List[Node], sample_pos: DataF
new_sample_pos = new_sample_pos # all samples are on host

# share new sample position with all hosts
ctx.hosts.put('new_sample_pos', (new_sample_pos.as_tensor(), new_sample_pos.get_indexer(target='sample_id')))
# ctx.hosts.put('new_sample_pos', (new_sample_pos.as_tensor(), new_sample_pos.get_indexer(target='sample_id')))
ctx.hosts.put('new_sample_pos', new_sample_pos)
self.sample_pos = new_sample_pos

return new_sample_pos
@@ -268,6 +270,7 @@ def booster_fit(self, ctx: Context, bin_train_data: DataFrame, grad_and_hess: Da
# Prepare for training
node_map = {}
cur_layer_node = [root_node]
grad_and_hess["cnt"] = 1

for cur_depth, sub_ctx in ctx.on_iterations.ctxs_range(self.max_depth):

14 changes: 10 additions & 4 deletions python/fate/ml/ensemble/learner/decision_tree/hetero/host.py
@@ -82,16 +82,18 @@ def _update_sample_pos(self, ctx, cur_layer_nodes: List[Node], sample_pos: DataF
sitename = ctx.local.party[0] + '_' + ctx.local.party[1]
data_with_pos = DataFrame.hstack([data, sample_pos])
map_func = functools.partial(_get_sample_on_local_nodes, cur_layer_node=cur_layer_nodes, node_map=node_map, sitename=sitename)
local_sample_idx = data_with_pos.apply_row(map_func).values.as_tensor()
local_samples = data_with_pos[local_sample_idx]
# local_sample_idx = data_with_pos.apply_row(map_func).as_tensor()
# local_samples = data_with_pos[local_sample_idx]
local_sample_idx = data_with_pos.apply_row(map_func)
local_samples = data_with_pos.iloc(local_sample_idx)
logger.info('{} samples on local nodes'.format(len(local_samples)))

if len(local_samples) == 0:
updated_sample_pos = None
else:
updated_sample_pos = sample_pos.loc(local_samples.get_indexer(target="sample_id"), preserve_order=True).create_frame()
update_func = functools.partial(_update_sample_pos, cur_layer_node=cur_layer_nodes, node_map=node_map)
updated_sample_pos['node_idx'] = local_samples.apply_row(update_func)
updated_sample_pos = local_samples.create_frame()
updated_sample_pos["node_idx"] = local_samples.apply_row(update_func)

# synchronize sample pos
if updated_sample_pos is None:
@@ -101,10 +103,13 @@ def _update_sample_pos(self, ctx, cur_layer_nodes: List[Node], sample_pos: DataF
pos_index = updated_sample_pos.get_indexer(target='sample_id')
update_data = (True, (pos_data, pos_index))
ctx.guest.put('updated_data', update_data)
"""
new_pos_data, new_pos_indexer = ctx.guest.get('new_sample_pos')
new_sample_pos = sample_pos.create_frame()
new_sample_pos = new_sample_pos.loc(new_pos_indexer, preserve_order=True)
new_sample_pos['node_idx'] = new_pos_data
"""
new_sample_pos = ctx.guest.get('new_sample_pos')

return new_sample_pos

@@ -145,6 +150,7 @@ def booster_fit(self, ctx: Context, bin_train_data: DataFrame, binning_dict: dic

node_map = {}
cur_layer_node = [root_node]
en_grad_and_hess["cnt"] = 1
for cur_depth, sub_ctx in ctx.on_iterations.ctxs_range(self.max_depth):

if len(cur_layer_node) == 0:
Changed file (path not shown in this view)
@@ -269,8 +269,9 @@ def _initialize_root_node(self, ctx: Context, train_df: DataFrame, gh: DataFrame
if gh is None:
sum_g, sum_h = 0, 0
else:
sum_g = float(gh['g'].sum().iloc[0])
sum_h = float(gh['h'].sum().iloc[0])
sum_gh = gh.sum()
sum_g = float(sum_gh['g'])
sum_h = float(sum_gh['h'])
root_node = Node(nid=0, grad=sum_g, hess=sum_h, sitename=sitename, sample_num=len(train_df))

return root_node
@@ -365,15 +366,14 @@ def _drop_samples_on_leaves(self, new_sample_pos: DataFrame, data: DataFrame):
assert len(new_sample_pos) == len(data), 'sample pos num not match data num, got {} sample pos vs {} data'.format(len(new_sample_pos), len(data))
x = (new_sample_pos >= 0)
indexer = x.get_indexer('sample_id')
x_t = x.as_tensor()
update_pos = new_sample_pos[x_t]
new_data = data.loc(indexer, preserve_order=True)[x_t]
update_pos = new_sample_pos.iloc(x)
new_data = data.loc(indexer, preserve_order=True).iloc(x)
logger.info('drop leaf samples, new sample count is {}, {} samples dropped'.format(len(new_sample_pos), len(data) - len(new_data)))
return new_data, update_pos

def _get_samples_on_leaves(self, sample_pos: DataFrame):
x = (sample_pos < 0)
samples_on_leaves = sample_pos[x.as_tensor()]
samples_on_leaves = sample_pos.iloc(x)
return samples_on_leaves

def _get_column_max_bin(self, result_dict):
Changed file (path not shown in this view)
@@ -152,7 +152,7 @@ def compute_hist(self, ctx: Context, nodes: List[Node], bin_train_data: DataFram
)
indexer = bin_train_data.get_indexer('sample_id')
gh = gh.loc(indexer, preserve_order=True)
gh["cnt"] = 1
# gh["cnt"] = 1
sample_pos = sample_pos.loc(indexer, preserve_order=True)
map_sample_pos = sample_pos.create_frame()
map_sample_pos['node_idx'] = sample_pos.apply_row(lambda x: node_map[x['node_idx']])
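For context, the "cnt" column that moves out of compute_hist and into booster_fit on both guest and host is a constant-1 column carried next to g and h, so that summing it per node/bin cell during histogram building yields sample counts alongside the gradient statistics. A pandas stand-in of the idea (illustration only, not part of this commit; FATE's histogram builder works on distributed blocks):

# Illustration only: why a constant "cnt" column rides along with g and h.
import pandas as pd

gh = pd.DataFrame({
    "node_idx": [0, 0, 1, 1],
    "bin":      [2, 3, 2, 2],
    "g":        [0.1, -0.2, 0.4, 0.3],
    "h":        [1.0, 1.0, 1.0, 1.0],
})
gh["cnt"] = 1  # mirrors grad_and_hess["cnt"] = 1 / en_grad_and_hess["cnt"] = 1 added above

# Summing g, h and cnt per (node, bin) gives gradient statistics and sample counts in one pass.
hist = gh.groupby(["node_idx", "bin"])[["g", "h", "cnt"]].sum()
print(hist)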
