diff --git a/python/fate/arch/dataframe/_dataframe.py b/python/fate/arch/dataframe/_dataframe.py
index b3f33d5b94..8073a03938 100644
--- a/python/fate/arch/dataframe/_dataframe.py
+++ b/python/fate/arch/dataframe/_dataframe.py
@@ -503,6 +503,10 @@ def loc(self, indexer, target="sample_id", preserve_order=False):
         return loc(self, indexer, target=target, preserve_order=preserve_order)

+    def iloc(self, indexer: "DataFrame") -> "DataFrame":
+        from .ops._dimension_scaling import retrieval_row
+        return retrieval_row(self, indexer)
+
     def loc_with_sample_id_replacement(self, indexer):
         """
         indexer: table,
diff --git a/python/fate/arch/dataframe/manager/block_manager.py b/python/fate/arch/dataframe/manager/block_manager.py
index 5073685d8a..08ea708b52 100644
--- a/python/fate/arch/dataframe/manager/block_manager.py
+++ b/python/fate/arch/dataframe/manager/block_manager.py
@@ -249,6 +249,11 @@ def convert_block_type(self, block_type):
     def retrieval_row(cls, block, indexes):
         if isinstance(block, CiphertextVector):
             return block.slice_indexes(indexes)
+        elif isinstance(block, pd.Index):
+            if isinstance(indexes, list):
+                return block[indexes]
+            else:
+                return pd.Index(block[indexes])
         else:
             return block[indexes]
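The two hunks above add a positional row-retrieval path: `DataFrame.iloc` delegates to `retrieval_row` in `_dimension_scaling`, and the block manager learns to slice `pd.Index` blocks (used for sample ids) in addition to tensor-backed blocks. A minimal standalone sketch of that per-block dispatch, using plain `pd.Index` and `torch.Tensor` objects rather than FATE's real block classes:

```python
import pandas as pd
import torch


def retrieval_row(block, indexes):
    # pd.Index blocks (e.g. sample_id columns) should stay pd.Index
    # whether sliced by a position list or by a boolean mask.
    if isinstance(block, pd.Index):
        if isinstance(indexes, list):
            return block[indexes]
        return pd.Index(block[indexes])
    # tensor-like blocks already support positional indexing directly
    return block[indexes]


ids = pd.Index(["s0", "s1", "s2", "s3"])
feats = torch.tensor([[1.0], [2.0], [3.0], [4.0]])
rows = [0, 2]
print(retrieval_row(ids, rows))    # Index(['s0', 's2'], dtype='object')
print(retrieval_row(feats, rows))  # tensor([[1.], [3.]])
```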
diff --git a/python/fate/arch/dataframe/ops/_dimension_scaling.py b/python/fate/arch/dataframe/ops/_dimension_scaling.py
index 2f8c7eed9d..309ad087fc 100644
--- a/python/fate/arch/dataframe/ops/_dimension_scaling.py
+++ b/python/fate/arch/dataframe/ops/_dimension_scaling.py
@@ -14,7 +14,7 @@
 # limitations under the License.
 import copy
 import functools
-from typing import List
+from typing import List, Union
 import torch
 from sklearn.utils import resample
 from .._dataframe import DataFrame
@@ -247,6 +247,7 @@ def sample(df: "DataFrame", n=None, frac: float =None, random_state=None) -> "Da
     return sample_frame


+"""
 def retrieval_row(df: "DataFrame", indexer: "DTensor"):
     if indexer.shape[1] != 1:
         raise ValueError("Row indexing by DTensor should have only one column filling with True/False")
@@ -279,6 +280,69 @@ def _flatten_data(kvs):
         partition_order_mappings,
         df.data_manager
     )
+"""
+def retrieval_row(df: "DataFrame", indexer: Union["DTensor", "DataFrame"]):
+    if indexer.shape[1] != 1:
+        raise ValueError("Row indexing by DTensor or DataFrame should have only one column filled with True/False")
+
+    data_manager = df.data_manager
+
+    def _block_counter(kvs, value_type="tensor"):
+        size = 0
+        first_block_id = None
+        for k, value in kvs:
+            if first_block_id is None:
+                first_block_id = k
+
+            if value_type == "tensor":
+                size += value.sum().item()
+            else:
+                size += len(value[0])
+
+        return first_block_id, size
+
+    if isinstance(indexer, DataFrame):
+        _block_counter_func = functools.partial(_block_counter, value_type="dataframe")
+        block_info = sorted([summary[1] for summary in indexer.block_table.applyPartitions(_block_counter_func).collect()])
+    else:
+        block_info = sorted([summary[1] for summary in indexer.shardings._data.applyPartitions(_block_counter).collect()])
+    block_order_mappings = dict()
+    start_index = 0
+    acc_block_num = 0
+    for block_id, block_size in block_info:
+        block_num = (block_size + data_manager.block_row_size - 1) // data_manager.block_row_size
+        block_order_mappings[block_id] = dict(
+            start_index=start_index,
+            end_index=start_index + block_size - 1,
+            start_block_id=acc_block_num,
+            end_block_id=acc_block_num + block_num - 1
+        )
+        start_index += block_size
+        acc_block_num += block_num
+
+    if start_index == 0:
+        return df.empty_frame()
+
+    if isinstance(indexer, DataFrame):
+        bid = indexer.data_manager.infer_operable_blocks()[0]
+        block_table = df.block_table.join(indexer.block_table, lambda v1, v2: (v1, v2[bid]))
+    else:
+        block_table = df.block_table.join(indexer.shardings._data, lambda v1, v2: (v1, v2))
+
+    _balance_block_func = functools.partial(_balance_blocks_with_index,
+                                            partition_order_mappings=block_order_mappings,
+                                            data_manager=data_manager)
+    block_table = block_table.mapPartitions(_balance_block_func,
+                                            use_previous_behavior=False)
+    block_table, data_manager = compress_blocks(block_table, data_manager)
+    partition_order_mappings = get_partition_order_mappings_by_block_table(block_table, data_manager.block_row_size)
+
+    return DataFrame(
+        df._ctx,
+        block_table,
+        partition_order_mappings,
+        data_manager
+    )


 def _flatten_partition_without_value(kvs):
@@ -299,7 +363,7 @@ def _balance_blocks(kvs, partition_order_mappings: dict=None, block_row_size: in
     block_id = None
     previous_blocks = list()
     for _, blocks in kvs:
-        if block_id is None:
+        if block_id is None and len(blocks[0]):
             sample_id = blocks[0][0]
             block_id = partition_order_mappings[sample_id]["start_block_id"]

@@ -318,6 +382,38 @@ def _balance_blocks(kvs, partition_order_mappings: dict=None, block_row_size: in
     if previous_blocks:
         yield block_id, previous_blocks

+    if block_id is None:
+        return []
+
+
+def _balance_blocks_with_index(kvs, partition_order_mappings: dict=None, data_manager: DataManager=None):
+    block_id = None
+    block_num = data_manager.block_num
+    ret_blocks = [[] for _ in range(block_num)]
+    block_size = 0
+    for key, (blocks, t) in kvs:
+        if block_id is None:
+            block_id = partition_order_mappings[key]["start_block_id"]
+
+        flat_blocks = [Block.transform_block_to_list(block) for block in blocks]
+        for i, v in enumerate(t):
+            v = v.item()
+            if not v:
+                continue
+
+            block_size += 1
+            for j in range(block_num):
+                ret_blocks[j].append(flat_blocks[j][i])
+
+            if block_size == data_manager.block_row_size:
+                yield block_id, data_manager.convert_to_blocks(ret_blocks)
+                block_size = 0
+                block_id += 1
+                ret_blocks = [[] for _ in range(block_num)]
+
+    if block_size:
+        yield block_id, data_manager.convert_to_blocks(ret_blocks)
+

 def to_blocks(kvs, dm: DataManager = None, partition_mappings: dict = None):
     ret_blocks = [[] for _ in range(dm.block_num)]
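The heart of the new `retrieval_row` is `_balance_blocks_with_index`: rows whose mask value is true are appended column by column, and a block is flushed every time it reaches `block_row_size`, with block ids running consecutively from the partition's `start_block_id`. A simplified pure-Python sketch of that repacking, with plain lists standing in for FATE blocks and a single "column" for brevity:

```python
def balance_blocks_with_index(rows, mask, start_block_id, block_row_size):
    """Repack the rows selected by mask into fixed-size blocks whose
    ids run consecutively from start_block_id."""
    block_id, buf = start_block_id, []
    for row, keep in zip(rows, mask):
        if not keep:
            continue
        buf.append(row)
        if len(buf) == block_row_size:  # block full: emit it and advance
            yield block_id, buf
            block_id, buf = block_id + 1, []
    if buf:                             # trailing, partially filled block
        yield block_id, buf


print(list(balance_blocks_with_index(
    rows=[10, 11, 12, 13, 14],
    mask=[True, False, True, True, True],
    start_block_id=3,
    block_row_size=2,
)))
# [(3, [10, 12]), (4, [13, 14])]
```

The `block_order_mappings` pass above plays the same role as `start_block_id` here: it pre-computes, per partition, where that partition's surviving rows begin so the flushed blocks line up globally.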
diff --git a/python/fate/arch/dataframe/ops/_indexer.py b/python/fate/arch/dataframe/ops/_indexer.py
index e32e13d66a..b141f43a2d 100644
--- a/python/fate/arch/dataframe/ops/_indexer.py
+++ b/python/fate/arch/dataframe/ops/_indexer.py
@@ -14,6 +14,7 @@
 # limitations under the License.
 #
 import functools
+import uuid

 from ..manager import Block, DataManager
 from .._dataframe import DataFrame
@@ -76,12 +77,14 @@ def _block_counter(kvs):
         size = 0
         first_block_id = 0
         for k, v in kvs:
-            if partition_key is None:
+            if size == 0 and len(v[0]):
                 partition_key = v[0][0]
-                # partition_key = k

             size += len(v[0])

+        if size == 0:
+            partition_key = str(first_block_id) + "_" + str(uuid.uuid1())
+
         return first_block_id, (partition_key, size)

     block_info = sorted([summary[1] for summary in block_table.applyPartitions(_block_counter).collect()])
diff --git a/python/fate/arch/dataframe/utils/_k_fold.py b/python/fate/arch/dataframe/utils/_k_fold.py
index c175576983..9d0b58f741 100644
--- a/python/fate/arch/dataframe/utils/_k_fold.py
+++ b/python/fate/arch/dataframe/utils/_k_fold.py
@@ -52,8 +52,8 @@ def _hetero_split(self, df: DataFrame):
         else:
             for _, iter_ctx in self._ctx.sub_ctx("KFold").ctxs_range(self._n_splits):
                 train_indexer, test_indexer = iter_ctx.guest.get("fold_indexes")
-                train_frame = df.loc(train_indexer)
-                test_frame = df.loc(test_indexer)
+                train_frame = df.loc(train_indexer, preserve_order=True)
+                test_frame = df.loc(test_indexer, preserve_order=True)

                 yield train_frame, test_frame

@@ -77,7 +77,8 @@ def _homo_split(self, df: DataFrame, return_indexer):
             test_frame = df.loc(test_indexer)

             if return_indexer:
-                yield train_frame, test_frame, train_indexer, test_indexer
+                yield train_frame, test_frame, \
+                    train_frame.get_indexer(target="sample_id"), test_frame.get_indexer(target="sample_id")
             else:
                 yield train_frame, test_frame
diff --git a/python/fate/ml/ensemble/learner/decision_tree/hetero/guest.py b/python/fate/ml/ensemble/learner/decision_tree/hetero/guest.py
index 83731c5f36..ca37ec13e0 100644
--- a/python/fate/ml/ensemble/learner/decision_tree/hetero/guest.py
+++ b/python/fate/ml/ensemble/learner/decision_tree/hetero/guest.py
@@ -110,7 +110,8 @@ def _update_sample_pos(self, ctx, cur_layer_nodes: List[Node], sample_pos: DataF
         data_with_pos = DataFrame.hstack([data, sample_pos])
         map_func = functools.partial(_get_sample_on_local_nodes, cur_layer_node=cur_layer_nodes, node_map=node_map, sitename=sitename)
         local_sample_idx = data_with_pos.apply_row(map_func)
-        local_samples = data_with_pos[local_sample_idx.values.as_tensor()]
+        # local_samples = data_with_pos[local_sample_idx.as_tensor()]
+        local_samples = data_with_pos.iloc(local_sample_idx)
         logger.info('{}/{} samples on local nodes'.format(len(local_samples), len(data)))
         if len(local_samples) == 0:
             updated_sample_pos = None
@@ -147,7 +148,8 @@ def _update_sample_pos(self, ctx, cur_layer_nodes: List[Node], sample_pos: DataF
             new_sample_pos = new_sample_pos  # all samples are on host

         # share new sample position with all hosts
-        ctx.hosts.put('new_sample_pos', (new_sample_pos.as_tensor(), new_sample_pos.get_indexer(target='sample_id')))
+        # ctx.hosts.put('new_sample_pos', (new_sample_pos.as_tensor(), new_sample_pos.get_indexer(target='sample_id')))
+        ctx.hosts.put('new_sample_pos', new_sample_pos)
         self.sample_pos = new_sample_pos

         return new_sample_pos
@@ -268,6 +270,7 @@ def booster_fit(self, ctx: Context, bin_train_data: DataFrame, grad_and_hess: Da
         # Prepare for training
         node_map = {}
         cur_layer_node = [root_node]
+        grad_and_hess["cnt"] = 1

         for cur_depth, sub_ctx in ctx.on_iterations.ctxs_range(self.max_depth):
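Worth calling out in the `_indexer.py` hunk above: an empty partition previously left `partition_key` as `None`, which can collide across partitions and break the subsequent `sorted(...)` over `(partition_key, size)` pairs; the fix synthesizes a unique key from the block id plus a UUID. A self-contained sketch of that fallback (the inputs are illustrative, not FATE's real key/value layout):

```python
import uuid


def block_counter(kvs):
    """Count rows in a partition, keyed by its first sample_id, or by a
    synthesized unique key when the partition is empty."""
    partition_key, size, first_block_id = None, 0, 0
    for _, v in kvs:
        if size == 0 and len(v[0]):
            partition_key = v[0][0]  # first sample_id in the partition
        size += len(v[0])
    if size == 0:                    # empty partition: never sort on None
        partition_key = str(first_block_id) + "_" + str(uuid.uuid1())
    return first_block_id, (partition_key, size)


print(block_counter([(0, [["alice", "bob"]])]))  # (0, ('alice', 2))
print(block_counter([]))                         # (0, ('0_<uuid1>', 0))
```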
diff --git a/python/fate/ml/ensemble/learner/decision_tree/hetero/host.py b/python/fate/ml/ensemble/learner/decision_tree/hetero/host.py
index 5a056280e2..1d5cb3e10f 100644
--- a/python/fate/ml/ensemble/learner/decision_tree/hetero/host.py
+++ b/python/fate/ml/ensemble/learner/decision_tree/hetero/host.py
@@ -82,16 +82,18 @@ def _update_sample_pos(self, ctx, cur_layer_nodes: List[Node], sample_pos: DataF
         sitename = ctx.local.party[0] + '_' + ctx.local.party[1]
         data_with_pos = DataFrame.hstack([data, sample_pos])
         map_func = functools.partial(_get_sample_on_local_nodes, cur_layer_node=cur_layer_nodes, node_map=node_map, sitename=sitename)
-        local_sample_idx = data_with_pos.apply_row(map_func).values.as_tensor()
-        local_samples = data_with_pos[local_sample_idx]
+        # local_sample_idx = data_with_pos.apply_row(map_func).as_tensor()
+        # local_samples = data_with_pos[local_sample_idx]
+        local_sample_idx = data_with_pos.apply_row(map_func)
+        local_samples = data_with_pos.iloc(local_sample_idx)
         logger.info('{} samples on local nodes'.format(len(local_samples)))
         if len(local_samples) == 0:
             updated_sample_pos = None
         else:
-            updated_sample_pos = sample_pos.loc(local_samples.get_indexer(target="sample_id"), preserve_order=True).create_frame()
             update_func = functools.partial(_update_sample_pos, cur_layer_node=cur_layer_nodes, node_map=node_map)
-            updated_sample_pos['node_idx'] = local_samples.apply_row(update_func)
+            updated_sample_pos = local_samples.create_frame()
+            updated_sample_pos["node_idx"] = local_samples.apply_row(update_func)

         # synchronize sample pos
         if updated_sample_pos is None:
@@ -101,10 +103,13 @@ def _update_sample_pos(self, ctx, cur_layer_nodes: List[Node], sample_pos: DataF
             pos_index = updated_sample_pos.get_indexer(target='sample_id')
             update_data = (True, (pos_data, pos_index))
             ctx.guest.put('updated_data', update_data)
+        """
         new_pos_data, new_pos_indexer = ctx.guest.get('new_sample_pos')
         new_sample_pos = sample_pos.create_frame()
         new_sample_pos = new_sample_pos.loc(new_pos_indexer, preserve_order=True)
         new_sample_pos['node_idx'] = new_pos_data
+        """
+        new_sample_pos = ctx.guest.get('new_sample_pos')

         return new_sample_pos

@@ -145,6 +150,7 @@ def booster_fit(self, ctx: Context, bin_train_data: DataFrame, binning_dict: dic

         node_map = {}
         cur_layer_node = [root_node]
+        en_grad_and_hess["cnt"] = 1

         for cur_depth, sub_ctx in ctx.on_iterations.ctxs_range(self.max_depth):
             if len(cur_layer_node) == 0:
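The host-side change mirrors the guest: instead of shipping a `(tensor, indexer)` pair and re-assembling the frame with `loc(..., preserve_order=True)`, the guest now `put`s the whole `new_sample_pos` DataFrame and the host simply `get`s it. A toy sketch of the simplified exchange; `Channel` is an in-process stand-in for the `ctx.guest`/`ctx.hosts` transport, not FATE's API:

```python
class Channel:
    """In-process stand-in for a guest/host communication pair."""

    def __init__(self):
        self._store = {}

    def put(self, name, value):
        self._store[name] = value

    def get(self, name):
        return self._store[name]


channel = Channel()

# guest side: one put with the full sample-position frame
new_sample_pos = {"sample_id": ["s0", "s1", "s2"], "node_idx": [1, 1, 2]}
channel.put("new_sample_pos", new_sample_pos)

# host side: one get; no tensor/indexer re-assembly step
assert channel.get("new_sample_pos") == new_sample_pos
```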
diff --git a/python/fate/ml/ensemble/learner/decision_tree/tree_core/decision_tree.py b/python/fate/ml/ensemble/learner/decision_tree/tree_core/decision_tree.py
index 4095915a8f..0b7ead89e9 100644
--- a/python/fate/ml/ensemble/learner/decision_tree/tree_core/decision_tree.py
+++ b/python/fate/ml/ensemble/learner/decision_tree/tree_core/decision_tree.py
@@ -269,8 +269,9 @@ def _initialize_root_node(self, ctx: Context, train_df: DataFrame, gh: DataFrame
         if gh is None:
             sum_g, sum_h = 0, 0
         else:
-            sum_g = float(gh['g'].sum().iloc[0])
-            sum_h = float(gh['h'].sum().iloc[0])
+            sum_gh = gh.sum()
+            sum_g = float(sum_gh['g'])
+            sum_h = float(sum_gh['h'])
         root_node = Node(nid=0, grad=sum_g, hess=sum_h, sitename=sitename, sample_num=len(train_df))

         return root_node
@@ -365,15 +366,14 @@ def _drop_samples_on_leaves(self, new_sample_pos: DataFrame, data: DataFrame):
         assert len(new_sample_pos) == len(data), 'sample pos num not match data num, got {} sample pos vs {} data'.format(len(new_sample_pos), len(data))
         x = (new_sample_pos >= 0)
         indexer = x.get_indexer('sample_id')
-        x_t = x.as_tensor()
-        update_pos = new_sample_pos[x_t]
-        new_data = data.loc(indexer, preserve_order=True)[x_t]
+        update_pos = new_sample_pos.iloc(x)
+        new_data = data.loc(indexer, preserve_order=True).iloc(x)
         logger.info('drop leaf samples, new sample count is {}, {} samples dropped'.format(len(new_sample_pos), len(data) - len(new_data)))
         return new_data, update_pos

     def _get_samples_on_leaves(self, sample_pos: DataFrame):
         x = (sample_pos < 0)
-        samples_on_leaves = sample_pos[x.as_tensor()]
+        samples_on_leaves = sample_pos.iloc(x)
         return samples_on_leaves

     def _get_column_max_bin(self, result_dict):
diff --git a/python/fate/ml/ensemble/learner/decision_tree/tree_core/hist.py b/python/fate/ml/ensemble/learner/decision_tree/tree_core/hist.py
index 484fdda088..a179e53ed9 100644
--- a/python/fate/ml/ensemble/learner/decision_tree/tree_core/hist.py
+++ b/python/fate/ml/ensemble/learner/decision_tree/tree_core/hist.py
@@ -152,7 +152,7 @@ def compute_hist(self, ctx: Context, nodes: List[Node], bin_train_data: DataFram
         )
         indexer = bin_train_data.get_indexer('sample_id')
         gh = gh.loc(indexer, preserve_order=True)
-        gh["cnt"] = 1
+        # gh["cnt"] = 1
         sample_pos = sample_pos.loc(indexer, preserve_order=True)
         map_sample_pos = sample_pos.create_frame()
         map_sample_pos['node_idx'] = sample_pos.apply_row(lambda x: node_map[x['node_idx']])
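Two final simplifications ride along above: `_initialize_root_node` computes one `sum()` over the gradient/hessian frame instead of two per-column reductions, and the constant `cnt` column moves out of the per-layer `compute_hist` into `booster_fit`, so it is written once per boosting round rather than once per depth. The pandas snippet below illustrates the shape of the `sum()` change; FATE's `DataFrame.sum` is assumed to behave analogously, returning a per-column result indexed by column name:

```python
import pandas as pd

gh = pd.DataFrame({"g": [0.1, -0.2, 0.3], "h": [1.0, 1.0, 1.0]})

# before: two separate per-column reductions
sum_g_old = float(gh["g"].sum())
sum_h_old = float(gh["h"].sum())

# after: a single reduction, indexed by column name
sum_gh = gh.sum()
sum_g, sum_h = float(sum_gh["g"]), float(sum_gh["h"])

assert (sum_g, sum_h) == (sum_g_old, sum_h_old)
```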