
Merge pull request #4993 from FederatedAI/dev-2.0.0-beta-debugging
Dev 2.0.0 beta debugging
sagewe authored Jul 21, 2023
2 parents 5249377 + 03e0dfb commit fbb47af
Showing 28 changed files with 1,524 additions and 334 deletions.
3 changes: 2 additions & 1 deletion python/fate/arch/context/_federation.py
@@ -17,9 +17,10 @@
from typing import Any, List, Tuple, TypeVar, Union

from fate.arch.abc import FederationEngine, PartyMeta
from ._namespace import NS

from ..computing import is_table
from ..federation._gc import IterationGC
from ._namespace import NS

T = TypeVar("T")

5 changes: 5 additions & 0 deletions python/fate/arch/dataframe/_dataframe.py
@@ -22,6 +22,7 @@

from .manager import DataManager, Schema
from .ops import aggregate_indexer, get_partition_order_mappings
from fate.arch.tensor import DTensor


class DataFrame(object):
@@ -397,6 +398,10 @@ def __getitem__(self, items) -> "DataFrame":

return where(self, items)

if isinstance(items, DTensor):
from .ops._dimension_scaling import retrieval_row
return retrieval_row(self, items)

if isinstance(items, pd.Index):
items = items.tolist()
elif not isinstance(items, list):
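The new branch in __getitem__ lets a DataFrame be filtered row-wise by a single-column boolean DTensor, which is what the stratified-sampling fix in utils/_sample.py below relies on. A minimal usage sketch, assuming df is an already-loaded FATE DataFrame with a label column:

# select only the rows whose label equals 0; the mask is a one-column DTensor of True/False
mask = (df.label == 0).as_tensor()      # same pattern as df[(df.label == label).as_tensor()] below
label_df = df[mask]                     # dispatched to ops._dimension_scaling.retrieval_row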
30 changes: 30 additions & 0 deletions python/fate/arch/dataframe/ops/_dimension_scaling.py
@@ -22,6 +22,7 @@
from ._compress_block import compress_blocks
from ._indexer import get_partition_order_by_raw_table
from ._set_item import set_item
from fate.arch.tensor import DTensor


def hstack(data_frames: List["DataFrame"]) -> "DataFrame":
@@ -188,6 +189,35 @@ def sample(df: "DataFrame", n=None, frac: float =None, random_state=None) -> "DataFrame":
return sample_frame


def retrieval_row(df: "DataFrame", indexer: "DTensor"):
if indexer.shape[1] != 1:
raise ValueError("Row indexing by DTensor should have only one column filling with True/False")

def _retrieval(blocks, t: torch.Tensor):
index = t.reshape(-1).tolist()
ret_blocks = [block[index] for block in blocks]

return ret_blocks

_retrieval_func = functools.partial(_retrieval)
retrieval_block_table = df.block_table.join(indexer.shardings._data, _retrieval_func)

_flatten_func = functools.partial(_flatten_partition, block_num=df.data_manager.block_num)
retrieval_raw_table = retrieval_block_table.mapPartitions(_flatten_func, use_previous_behavior=False)

partition_order_mappings = get_partition_order_by_raw_table(retrieval_raw_table)
to_blocks_func = functools.partial(to_blocks, dm=df.data_manager, partition_mappings=partition_order_mappings)

block_table = retrieval_raw_table.mapPartitions(to_blocks_func, use_previous_behavior=False)

return DataFrame(
df._ctx,
block_table,
partition_order_mappings,
df.data_manager
)


def _flatten_partition(kvs, block_num=0):
_flattens = []
for partition_id, blocks in kvs:
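Within retrieval_row, the per-partition _retrieval helper simply applies the boolean mask to every block of that partition; the masked rows are then flattened and re-blocked into a new DataFrame. A self-contained sketch of that masking step on toy local tensors (not the distributed tables):

import torch

# two column-blocks of one partition, three rows each
blocks = [torch.tensor([[1.0], [2.0], [3.0]]), torch.tensor([[10.0], [20.0], [30.0]])]
# the matching DTensor shard: one True/False per row
mask = torch.tensor([[True], [False], [True]])
index = mask.reshape(-1)                          # _retrieval flattens the mask (via .tolist()) before indexing
ret_blocks = [block[index] for block in blocks]   # keeps rows 0 and 2 of every block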
13 changes: 8 additions & 5 deletions python/fate/arch/dataframe/ops/_encoder.py
@@ -14,16 +14,16 @@
# limitations under the License.
#
import functools
from typing import Union

import numpy as np
import pandas as pd
import numpy as np
import torch
from sklearn.preprocessing import OneHotEncoder

from typing import Union
from ._compress_block import compress_blocks
from .._dataframe import DataFrame
from ..manager import BlockType, DataManager
from ._compress_block import compress_blocks


BUCKETIZE_RESULT_TYPE = "int32"

@@ -45,7 +45,10 @@ def get_dummies(df: "DataFrame", dtype="int32"):
block_table = _one_hot_encode(df.block_table, block_indexes, dst_data_manager, [[categories]], dtype=dtype)

return DataFrame(
df._ctx, block_table, partition_order_mappings=df.partition_order_mappings, data_manager=dst_data_manager
df._ctx,
block_table,
partition_order_mappings=df.partition_order_mappings,
data_manager=dst_data_manager
)


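Aside from the reformatted return statement, get_dummies remains a module-level helper that one-hot encodes the selected column blocks into a widened DataFrame sharing the input's partition order. A usage sketch, assuming df holds a single categorical column (the import path is inferred from this file's location):

from fate.arch.dataframe.ops._encoder import get_dummies

encoded = get_dummies(df, dtype="int32")   # new DataFrame with one 0/1 column per observed category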
17 changes: 8 additions & 9 deletions python/fate/arch/dataframe/utils/_sample.py
@@ -100,15 +100,14 @@ def _federated_sample_guest(
regenerated_sample_id_prefix = generate_sample_id_prefix()
choice_with_regenerated_ids = None
for label, f in frac.items():
label_df = df[df.label == label]
choices = resample(list(range(label_df.shape[0])), replace=True, n_samples=n, random_state=random_state)
label_df = df[(df.label == label).as_tensor()]
label_n = max(1, int(label_df.shape[0] * f))
choices = resample(list(range(label_df.shape[0])), replace=True,
n_samples=label_n, random_state=random_state)
label_indexer = list(label_df.get_indexer(target="sample_id").collect())
regenerated_ids = generate_sample_id(n, regenerated_sample_id_prefix)
label_choice_with_regenerated_ids = _agg_choices(ctx,
label_indexer,
choices,
regenerated_ids,
df.block_table.partitions)
regenerated_ids = generate_sample_id(label_n, regenerated_sample_id_prefix)
label_choice_with_regenerated_ids = _agg_choices(ctx, label_indexer, choices,
regenerated_ids, df.block_table.partitions)
if choice_with_regenerated_ids is None:
choice_with_regenerated_ids = label_choice_with_regenerated_ids
else:
@@ -123,7 +122,7 @@
else:
sample_df = None
for label, f in frac.items():
label_df = df[df.label == label]
label_df = df[(df.label == label).as_tensor()]
label_n = max(1, int(label_df.shape[0] * f))
sample_label_df = label_df.sample(n=label_n, random_state=random_state)

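The corrected guest-side loop now draws label_n samples per label instead of a single global n. A worked sketch of that arithmetic with toy numbers, using sklearn.utils.resample as the code above does:

from sklearn.utils import resample

frac = {0: 0.5, 1: 0.8}        # label -> sampling fraction, as passed to _federated_sample_guest
label_rows = {0: 100, 1: 40}   # toy stand-ins for label_df.shape[0]
for label, f in frac.items():
    label_n = max(1, int(label_rows[label] * f))             # 50 for label 0, 32 for label 1
    choices = resample(list(range(label_rows[label])), replace=True,
                       n_samples=label_n, random_state=42)   # row positions drawn within that label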
14 changes: 13 additions & 1 deletion python/fate/components/components/__init__.py
@@ -75,7 +75,7 @@ def homo_nn(self):
@_lazy_cpn
def homo_lr(self):
from .homo_lr import homo_lr

return homo_lr

@_lazy_cpn
@@ -126,6 +126,18 @@ def feature_union(self):

return feature_union

@_lazy_cpn
def sample(self):
from .sample import sample

return sample

@_lazy_cpn
def data_split(self):
from .data_split import data_split

return data_split

@_lazy_cpn
def toy_example(self):
from .toy_example import toy_example
66 changes: 66 additions & 0 deletions python/fate/components/components/data_split.py
@@ -0,0 +1,66 @@
#
# Copyright 2023 The FATE Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Union

from fate.arch import Context
from fate.components.core import GUEST, HOST, Role, cpn, params
from fate.ml.model_selection.data_split import DataSplitModuleGuest, DataSplitModuleHost


@cpn.component(roles=[GUEST, HOST], provider="fate")
def data_split(
ctx: Context,
role: Role,
input_data: cpn.dataframe_input(roles=[GUEST, HOST]),
train_size: cpn.parameter(type=Union[params.conint(ge=0), params.confloat(ge=0.0)], default=None,
desc="size of output training data, should be either int for exact sample size or float for fraction"),
validate_size: cpn.parameter(type=Union[params.conint(ge=0), params.confloat(ge=0.0)], default=None,
desc="size of output validation data, should be either int for exact sample size or float for fraction"),
test_size: cpn.parameter(type=Union[params.conint(ge=0), params.confloat(ge=0.0)], default=None,
desc="size of output test data, should be either int for exact sample size or float for fraction"),
stratified: cpn.parameter(type=bool, default=False,
desc="whether sample with stratification, "
"should not use this for data with continuous label values"),
random_state: cpn.parameter(type=params.conint(ge=0), default=None, desc="random state"),
ctx_mode: cpn.parameter(type=params.string_choice(["hetero", "homo", "local"]), default="hetero",
desc="sampling mode, 'homo' & 'local' will both sample locally"),
train_output_data: cpn.dataframe_output(roles=[GUEST, HOST], optional=True),
validate_output_data: cpn.dataframe_output(roles=[GUEST, HOST], optional=True),
test_output_data: cpn.dataframe_output(roles=[GUEST, HOST], optional=True),
):
if isinstance(train_size, float) or isinstance(validate_size, float) or isinstance(test_size, float):
if train_size + validate_size + test_size > 1:
raise ValueError("(train_size + validate_size + test_size) should be less than or equal to 1.0")
if train_size is None and validate_size is None and test_size is None:
train_size = 0.8
validate_size = 0.2
test_size = 0.0

sub_ctx = ctx.sub_ctx("train")
if role.is_guest:
module = DataSplitModuleGuest(train_size, validate_size, test_size, stratified, random_state, ctx_mode)
elif role.is_host:
module = DataSplitModuleHost(train_size, validate_size, test_size, stratified, random_state, ctx_mode)
input_data = input_data.read()

train_data_set, validate_data_set, test_data_set = module.fit(sub_ctx, input_data)
# train_data_set, validate_data_set, test_data_set = module.split_data(train_data)
if train_data_set:
train_output_data.write(train_data_set)
if validate_data_set:
validate_output_data.write(validate_data_set)
if test_data_set:
test_output_data.write(test_data_set)
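Each size parameter accepts either an exact row count (int) or a fraction (float), and when all three are left unset the component falls back to a 0.8/0.2/0.0 split. The sketch below restates that validation as standalone code for illustration; unlike the component it also substitutes 0 for a missing size before summing, which is an assumption rather than the committed behaviour:

def resolve_sizes(train_size=None, validate_size=None, test_size=None):
    # all unset -> the component's default 80/20/0 split
    if train_size is None and validate_size is None and test_size is None:
        return 0.8, 0.2, 0.0
    # treat a missing size as 0 so the fraction check never sees None (illustrative choice)
    sizes = tuple(0 if s is None else s for s in (train_size, validate_size, test_size))
    if any(isinstance(s, float) for s in sizes) and sum(sizes) > 1:
        raise ValueError("(train_size + validate_size + test_size) should be less than or equal to 1.0")
    return sizes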
75 changes: 75 additions & 0 deletions python/fate/components/components/sample.py
@@ -0,0 +1,75 @@
#
# Copyright 2023 The FATE Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Union, Mapping

from fate.arch import Context
from fate.components.core import GUEST, HOST, Role, cpn, params
from fate.ml.model_selection.sample import SampleModuleGuest, SampleModuleHost


@cpn.component(roles=[GUEST, HOST], provider="fate")
def sample(
ctx: Context,
role: Role,
input_data: cpn.dataframe_input(roles=[GUEST, HOST]),
mode: cpn.parameter(type=params.string_choice(['random', 'stratified', 'weight']),
default='random',
desc="sample mode, if select 'weight', "
"will use dataframe's weight as sampling weight, default 'random'"),
replace: cpn.parameter(type=bool, default=False,
desc="whether allow sampling with replacement, default False"),
frac: cpn.parameter(type=Union[params.confloat(gt=0.0),
Mapping[Union[params.conint(), params.confloat()], params.confloat(gt=0.0)]],
default=None, optional=True,
desc="if mode equals to random, it should be a float number greater than 0,"
"otherwise a dict of pairs like [label_i, sample_rate_i],"
"e.g. {0: 0.5, 1: 0.8, 2: 0.3}, any label unspecified in dict will not be sampled,"
"default: 1.0, cannot be used with n"),
n: cpn.parameter(type=Union[params.conint(gt=0),
Mapping[Union[params.conint(), params.confloat()], params.conint(gt=0)]], default=None, optional=True,
desc="exact sample size, it should be an int greater than 0, "
"otherwise a dict of pairs like [label_i, sample_count_i],"
"e.g. {0: 50, 1: 20, 2: 30}, any label unspecified in dict will not be sampled,"
"default: None, cannot be used with frac"),
random_state: cpn.parameter(type=params.conint(ge=0), default=None,
desc="random state"),
ctx_mode: cpn.parameter(type=params.string_choice(["hetero", "homo", "local"]), default="hetero",
desc="sampling mode, 'homo' & 'local' will both sample locally"),
output_data: cpn.dataframe_output(roles=[GUEST, HOST])
):
if frac is not None and n is not None:
raise ValueError(f"n and frac cannot be used at the same time")
if mode in ["random"] and (isinstance(frac, dict) or isinstance(n, dict)):
raise ValueError(f"frac or n must be single value when mode set to {mode}")
if frac is not None and frac > 1 and not replace:
raise ValueError(f"replace has to be set to True when sampling frac greater than 1.")
if n is None and frac is None:
frac = 1.0

sub_ctx = ctx.sub_ctx("train")
if role.is_guest:
module = SampleModuleGuest(mode=mode, replace=replace, frac=frac, n=n,
random_state=random_state, ctx_mode=ctx_mode)
elif role.is_host:
module = SampleModuleHost(mode=mode, replace=replace, frac=frac, n=n,
random_state=random_state, ctx_mode=ctx_mode)
else:
raise ValueError(f"unknown role")
input_data = input_data.read()

sampled_data = module.fit(sub_ctx, input_data)

output_data.write(sampled_data)
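frac and n are mutually exclusive, and the mapping form is only accepted outside 'random' mode. A few illustrative parameter combinations (toy values; only the rules stated above come from the component itself):

# accepted: plain random down-sampling to 80% of rows, no replacement
params_random = dict(mode="random", frac=0.8, n=None, replace=False)
# accepted: stratified sampling with per-label fractions; labels missing from the dict are not sampled
params_stratified = dict(mode="stratified", frac={0: 0.5, 1: 0.8}, n=None, replace=False)
# rejected: frac and n together -> "n and frac cannot be used at the same time"
# rejected: frac > 1 with replace=False -> replace must be True when over-sampling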
74 changes: 71 additions & 3 deletions python/fate/ml/ensemble/learner/decision_tree/hetero/guest.py
@@ -1,14 +1,82 @@
from fate.ml.ensemble.learner.decision_tree.tree_core.decision_tree import DecisionTree
from fate.ml.ensemble.learner.decision_tree.tree_core.decision_tree import DecisionTree, Node
from fate.ml.ensemble.learner.decision_tree.tree_core.hist import SklearnHistBuilder
from fate.ml.ensemble.learner.decision_tree.tree_core.splitter import SklearnSplitter
from fate.arch import Context
from fate.arch.dataframe import DataFrame
import numpy as np


class HeteroDecisionTreeGuest(DecisionTree):

def __init__(self, max_depth=3, feature_importance_type='split', valid_features=None):
def __init__(self, max_depth=3, feature_importance_type='split', valid_features=None, max_split_nodes=1024):
super().__init__(max_depth, use_missing=False, zero_as_missing=False, feature_importance_type=feature_importance_type, valid_features=valid_features)
self.max_split_nodes = max_split_nodes
self.hist_builder = None
self.splitter = None

def fit(self, ctx: Context, train_data: DataFrame, grad_and_hess: DataFrame, encryptor):
def _compute_best_splits(self):
pass

def get_column_max_bin(self, result_dict):
bin_len = {}

for column, values in result_dict.items():
bin_num = len(values)
bin_len[column] = bin_num

max_max_value = max(bin_len.values())

return bin_len, max_max_value

def get_sklearn_hist_builder(self, bin_train_data, grad_and_hess, root_node, max_bin):
collect_data = bin_train_data.as_pd_df().sort_values(by='sample_id')
collect_gh = grad_and_hess.as_pd_df().sort_values(by='sample_id')
root_node.set_inst_indices(collect_gh['sample_id'].values)
feat_arr = collect_data.drop(columns=[bin_train_data.schema.sample_id_name, bin_train_data.schema.label_name, bin_train_data.schema.match_id_name]).values
g = collect_gh['g'].values
h = collect_gh['h'].values
feat_arr = np.asfortranarray(feat_arr.astype(np.uint8))
return SklearnHistBuilder(feat_arr, max_bin, g, h)

def get_sklearn_splitter(self, bin_train_data, grad_and_hess, root_node, max_bin):
pass

def get_distribute_hist_builder(self, bin_train_data, grad_and_hess, root_node, max_bin):
pass

def booster_fit(self, ctx: Context, bin_train_data: DataFrame, grad_and_hess: DataFrame, bining_dict: dict):


feat_max_bin, max_bin = self.get_column_max_bin(bining_dict)
sample_pos = self._init_sample_pos(bin_train_data)
root_node = self._initialize_root_node(grad_and_hess, ctx.guest.party[0] + '_' + ctx.guest.party[1])

self._nodes.append(root_node)

# init histogram builder
self.hist_builder = self.get_sklearn_hist_builder(bin_train_data, grad_and_hess, root_node, max_bin)
# init splitter
self.splitter = SklearnSplitter(bining_dict)

self.cur_layer_node = [root_node]
for cur_depth in range(self.max_depth):

# select samples on cur nodes

# compute histogram
hist = self.hist_builder.compute_hist(self.cur_layer_node, grad_and_hess)
# compute best splits
# update tree with best splits

# update sample position

# update cur_layer_node
break


return root_node, feat_max_bin, max_bin, hist, self.splitter

def fit(self, ctx: Context, train_data: DataFrame):
pass

def predict(self, ctx: Context, data_inst: DataFrame):
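get_column_max_bin only counts how many bins quantile binning produced per feature and keeps the largest count, which booster_fit then hands to SklearnHistBuilder. A toy illustration (the column-to-split-points shape of the binning dict is assumed from its usage here):

binning_dict = {"x0": [0.1, 0.5, 0.9], "x1": [0.2, 0.4]}               # column -> split points (assumed shape)
bin_len = {col: len(points) for col, points in binning_dict.items()}   # {"x0": 3, "x1": 2}
max_bin = max(bin_len.values())                                        # 3 -> fed to the histogram builder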