Dev 2.0.0 beta debugging #4983

Merged · 11 commits · Jul 14, 2023
38 changes: 38 additions & 0 deletions examples/pipeline/test_linr_sid_cv.py
@@ -0,0 +1,38 @@
#
# Copyright 2019 The FATE Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from fate_client.pipeline import FateFlowPipeline
from fate_client.pipeline.components.fate import CoordinatedLinR, Intersection
from fate_client.pipeline.interface import DataWarehouseChannel

pipeline = FateFlowPipeline().set_roles(guest="9999", host="9998", arbiter="9998")

intersect_0 = Intersection("intersect_0", method="raw")
intersect_0.guest.component_setting(input_data=DataWarehouseChannel(name="motor_hetero_guest",
namespace="experiment_sid"))
intersect_0.hosts[0].component_setting(input_data=DataWarehouseChannel(name="motor_hetero_host",
namespace="experiment_sid"))
linr_0 = CoordinatedLinR("linr_0",
epochs=2,
batch_size=100,
optimizer={"method": "sgd", "optimizer_params": {"lr": 0.2}},
init_param={"fit_intercept": True},
cv_data=intersect_0.outputs["output_data"],
cv_param={"n_splits": 3})

pipeline.add_task(intersect_0)
pipeline.add_task(linr_0)
pipeline.compile()
print(pipeline.get_dag())
pipeline.fit()
38 changes: 38 additions & 0 deletions examples/pipeline/test_lr_sid_cv.py
@@ -0,0 +1,38 @@
#
# Copyright 2019 The FATE Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from fate_client.pipeline import FateFlowPipeline
from fate_client.pipeline.components.fate import CoordinatedLR, Intersection
from fate_client.pipeline.interface import DataWarehouseChannel

pipeline = FateFlowPipeline().set_roles(guest="9999", host="9998", arbiter="9998")

intersect_0 = Intersection("intersect_0", method="raw")
intersect_0.guest.component_setting(input_data=DataWarehouseChannel(name="breast_hetero_guest",
namespace="experiment_sid"))
intersect_0.hosts[0].component_setting(input_data=DataWarehouseChannel(name="breast_hetero_host",
namespace="experiment_sid"))
lr_0 = CoordinatedLR("lr_0",
epochs=2,
batch_size=100,
optimizer={"method": "sgd", "optimizer_params": {"lr": 0.01}},
init_param={"fit_intercept": True},
cv_data=intersect_0.outputs["output_data"],
cv_param={"n_splits": 3})

pipeline.add_task(intersect_0)
pipeline.add_task(lr_0)
pipeline.compile()
print(pipeline.get_dag())
pipeline.fit()
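Both scripts follow the same pattern: a raw intersection feeds its output into a coordinated model whose cross-validation is switched on through cv_data and cv_param instead of train_data. Assuming a reachable FATE Flow service and that the referenced guest/host datasets have already been uploaded under the experiment_sid namespace, each script can be run directly, e.g. python examples/pipeline/test_lr_sid_cv.py.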
16 changes: 12 additions & 4 deletions python/fate/arch/dataframe/_dataframe.py
@@ -272,10 +272,18 @@ def describe(self, ddof=1, unbiased=False):
def quantile(
self,
q,
axis=0,
method="quantile",
relative_error: float = 1e-4
):
...
from .ops._quantile import quantile
return quantile(self, q, relative_error)

def qcut(self, q: int):
from .ops._quantile import qcut
return qcut(self, q)

def bucketize(self, boundaries: Union[dict, pd.DataFrame]) -> "DataFrame":
from .ops._encoder import bucketize
return bucketize(self, boundaries)

def __add__(self, other: Union[int, float, list, "np.ndarray", "DataFrame", "pd.Series"]) -> "DataFrame":
return self.__arithmetic_operate(operator.add, other)
@@ -519,7 +527,7 @@ def _merge_list(lhs, rhs):
block_table = transform_list_block_to_frame_block(block_table, self._data_manager)

partition_order_mappings = get_partition_order_mappings(block_table)
-        return DataFrame(self._ctx, block_table, partition_order_mappings, self._data_manager)
+        return DataFrame(self._ctx, block_table, partition_order_mappings, self._data_manager.duplicate())

def iloc(self, indexes):
...
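The three new DataFrame entry points above simply delegate to the ops modules. A minimal usage sketch, assuming df is a FATE DataFrame with numeric feature columns (the variable names here are hypothetical):

# q may be a single float or a list; the result is a pandas DataFrame indexed by q
quantiles = df.quantile([0.25, 0.5, 0.75], relative_error=1e-4)

# q equal-width buckets per column; the result holds per-column cut points
boundaries = df.qcut(4)

# replace each operable column with int32 bucket indices
binned = df.bucketize(boundaries)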
76 changes: 75 additions & 1 deletion python/fate/arch/dataframe/ops/_encoder.py
@@ -13,10 +13,18 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
import functools

import pandas as pd
import numpy as np
import torch
from sklearn.preprocessing import OneHotEncoder
from typing import Union
from .._dataframe import DataFrame
from ..manager import BlockType
from ..manager import BlockType, DataManager


BUCKETIZE_RESULT_TYPE = "int32"


def get_dummies(df: "DataFrame", dtype="int32"):
@@ -98,3 +106,69 @@ def _encode(blocks):
return ret_blocks

return block_table.mapValues(_encode)


def bucketize(df: DataFrame, boundaries: Union[pd.DataFrame, dict]):
if isinstance(boundaries, pd.DataFrame):
boundaries = dict([(_name, boundaries[_name].tolist()) for _name in boundaries])
elif not isinstance(boundaries, dict):
raise ValueError("boundaries should be pd.DataFrame or dict")

data_manager = df.data_manager.duplicate()
field_names = list(filter(lambda field_name: field_name in boundaries, data_manager.infer_operable_field_names()))
blocks_loc = data_manager.loc_block(field_names)

_boundaries_list = []
for name, (_bid, _) in zip(field_names, blocks_loc):
if BlockType.is_tensor(data_manager.blocks[_bid].block_type):
_boundary = torch.tensor(boundaries[name])
_boundary[-1] = torch.inf
else:
_boundary = np.array(boundaries[name])
_boundary[-1] = np.inf

_boundaries_list.append((_bid, _, _boundary))

narrow_blocks, dst_blocks = data_manager.split_columns(field_names, BlockType.get_block_type(BUCKETIZE_RESULT_TYPE))

def _mapper(blocks, boundaries_list: list = None, narrow_loc: list = None,
dst_bids: list = None, dm: DataManager = None):
ret_blocks = []
for block in blocks:
if isinstance(block, torch.Tensor):
ret_blocks.append(block.clone())
elif isinstance(block, np.ndarray):
ret_blocks.append(block.copy())
else:
ret_blocks.append(block)

for i in range(len(ret_blocks), dm.block_num):
ret_blocks.append([])

for bid, offsets in narrow_loc:
ret_blocks[bid] = ret_blocks[bid][:, offsets]

for dst_bid, (src_bid, src_offset, boundary) in zip(dst_bids, boundaries_list):
if isinstance(blocks[src_bid], torch.Tensor):
ret = torch.bucketize(blocks[src_bid][:, [src_offset]], boundary, out_int32=False)
else:
ret = torch.bucketize(blocks[src_bid][:, [src_offset]], boundary)

ret_blocks[dst_bid] = dm.blocks[dst_bid].convert_block(ret)

return ret_blocks

bucketize_mapper = functools.partial(_mapper,
boundaries_list=_boundaries_list,
narrow_loc=narrow_blocks,
dst_bids=dst_blocks,
dm=data_manager)

block_table = df.block_table.mapValues(bucketize_mapper)

return DataFrame(
df._ctx,
block_table,
partition_order_mappings=df.partition_order_mappings,
data_manager=data_manager
)
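bucketize forces the last boundary of every column to infinity, so the top bucket is unbounded, and then leans on torch.bucketize for the index lookup. A standalone sketch of that boundary behavior (values are illustrative only):

import torch

boundary = torch.tensor([0.5, 1.5, float("inf")])  # last edge forced open, as in _boundaries_list
x = torch.tensor([[0.1], [0.7], [2.0]])
print(torch.bucketize(x, boundary))  # tensor([[0], [1], [2]])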
74 changes: 74 additions & 0 deletions python/fate/arch/dataframe/ops/_quantile.py
@@ -0,0 +1,74 @@
#
# Copyright 2019 The FATE Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import functools
import pandas as pd
from .._dataframe import DataFrame
from fate.arch.tensor.inside import GKSummary


def quantile(df: DataFrame, q, relative_error: float):
if isinstance(q, float):
q = [q]
elif not isinstance(q, list):
q = list(q)

data_manager = df.data_manager
column_names = data_manager.infer_operable_field_names()
blocks_loc = [data_manager.loc_block(name) for name in column_names]

def _mapper(blocks, columns_loc=None, error=None):
column_size = len(columns_loc)
gk_summary_obj_list = [GKSummary(error) for _ in range(column_size)]

for idx, (bid, offset) in enumerate(columns_loc):
gk_summary_obj_list[idx] += blocks[bid][:, offset]

return gk_summary_obj_list

def _reducer(l_gk_summary_obj_list, r_gk_summary_obj_list):
rets = []
for l_gk_summary_obj, r_gk_summary_obj in zip(l_gk_summary_obj_list, r_gk_summary_obj_list):
rets.append(l_gk_summary_obj + r_gk_summary_obj)

return rets

gk_summary_func = functools.partial(_mapper, columns_loc=blocks_loc, error=relative_error)
ret_gk_summary_obj_list = df.block_table.mapValues(gk_summary_func).reduce(_reducer)

quantile_rets = dict()
for column_name, gk_summary_obj in zip(column_names, ret_gk_summary_obj_list):
query_ret = gk_summary_obj.queries(q)
quantile_rets[column_name] = query_ret

quantile_df = pd.DataFrame(quantile_rets, index=q)

return quantile_df


def qcut(df: DataFrame, q: int):
assert isinstance(q, int) and q > 0, f"to use qcut, q should be a positive integer, got {q}"
max_ret = df.max()
min_ret = df.min()

dist = (max_ret - min_ret) / q

cut_ret = []
for i in range(1, q):
cut_ret.append(min_ret + i * dist)

cut_ret.append(max_ret)

return pd.DataFrame(cut_ret, index=range(1, q + 1, 1))
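quantile builds one Greenwald-Khanna summary per column inside each partition (the mapper), merges the summaries pairwise (the reducer), and only then answers the quantile queries. A sketch of that protocol on a single column, assuming GKSummary behaves exactly as it is used above; note also that qcut derives equal-width cut points from the column min/max rather than frequency-based ones.

import numpy as np
from fate.arch.tensor.inside import GKSummary

gk_left, gk_right = GKSummary(1e-4), GKSummary(1e-4)
gk_left += np.array([1.0, 2.0, 3.0])   # summary of partition 1
gk_right += np.array([4.0, 5.0, 6.0])  # summary of partition 2
merged = gk_left + gk_right            # the reduce step
print(merged.queries([0.5]))           # approximate median over all six values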
4 changes: 2 additions & 2 deletions python/fate/arch/dataframe/utils/_dataloader.py
@@ -33,7 +33,7 @@ def __init__(
self._dataset = dataset
self._batch_size = batch_size
if dataset:
-            if batch_size == -1:
+            if batch_size is None:
self._batch_size = len(dataset)
else:
self._batch_size = min(batch_size, len(dataset))
@@ -83,7 +83,7 @@ def __init__(self, dataset, ctx, mode, role, batch_size, shuffle, random_seed, n
self._mode = mode
self._role = role
self._batch_size = batch_size
-        if self._batch_size < 0 and self._role != "arbiter":
+        if self._batch_size is None and self._role != "arbiter":
self._batch_size = len(self._dataset)
self._shuffle = shuffle
self._random_seed = random_seed
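Both hunks move the "use the whole dataset" sentinel from -1 to None. A minimal sketch of the resulting resolution rule, with resolve_batch_size as a hypothetical helper mirroring the first hunk:

def resolve_batch_size(batch_size, dataset_len):
    # batch_size=None now means one batch covering the full dataset
    if batch_size is None:
        return dataset_len
    return min(batch_size, dataset_len)

assert resolve_batch_size(None, 500) == 500
assert resolve_batch_size(128, 500) == 128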
5 changes: 5 additions & 0 deletions python/fate/components/components/__init__.py
@@ -108,6 +108,11 @@ def hetero_feature_selection(self):

return hetero_feature_selection

@_lazy_cpn
def toy_example(self):
from .toy_example import toy_example
return toy_example

@_lazy_cpn
def dataframe_io_test(self):
from .dataframe_io_test import dataframe_io_test
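toy_example is registered with the same lazy-import pattern as the surrounding components: the target module is imported only when the attribute is first accessed. A hedged sketch of the idea behind such a registry (not FATE's actual _lazy_cpn implementation):

import importlib

class LazyComponents:
    # Hypothetical stand-in: import on first access, then cache the attribute.
    _modules = {"toy_example": "fate.components.components.toy_example"}

    def __getattr__(self, name):
        if name in self._modules:
            component = getattr(importlib.import_module(self._modules[name]), name)
            setattr(self, name, component)  # cache so __getattr__ is not hit again
            return component
        raise AttributeError(name)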