dataframe: fix onehot to support higher version of sklearn #5207

Merged · 2 commits · Sep 20, 2023
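Background on the fix (my summary, not text from the PR): the previous code fitted a OneHotEncoder on the placeholder `[[1]]` and then overwrote `enc.categories_` by hand. Recent scikit-learn releases check at transform time that the input has the same number of feature columns seen during fit, so the one-column placeholder fit fails on multi-column blocks. The PR instead fits on mock data whose column count and per-column value ranges match the real categories. Below is a minimal sketch of the failure and the workaround, assuming a recent scikit-learn release; the data is made up and the exact error message varies by version.

import numpy as np
from sklearn.preprocessing import OneHotEncoder

# Hypothetical two-column block and its known per-column categories.
block = np.array([[0, 1], [1, 0], [2, 1]])
categories = [np.array([0, 1, 2]), np.array([0, 1])]

# Old approach: fit on a one-column placeholder, then patch categories_.
enc = OneHotEncoder(dtype=np.float64)
enc.fit([[1]])
enc.categories_ = categories
# enc.transform(block)  # newer scikit-learn rejects this: it expects as many
#                       # input columns at transform time as it saw during fit.

# Approach taken by the PR: fit on mock data with the right shape, where each
# column cycles through every index its category can take, then overwrite
# categories_ with the real category arrays.
shapes = [len(c) for c in categories]
mock = np.zeros((max(shapes), len(categories)))
for rid in range(mock.shape[0]):
    for cid in range(mock.shape[1]):
        mock[rid][cid] = rid % shapes[cid]

enc = OneHotEncoder(dtype=np.float64)
enc.fit(mock)
enc.categories_ = categories
print(enc.transform(block).toarray())  # (3, 5) one-hot matrix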
92 changes: 16 additions & 76 deletions python/fate/arch/dataframe/ops/_encoder.py
@@ -83,7 +83,20 @@ def _reducer(categories1_, categories2_):


def _one_hot_encode(block_table, block_indexes, data_manager, categories, dtype):
-    categories = [np.array(category) for category in categories]
+    transform_categories = []
+    mock_data = []
+    for category in categories:
+        transform_categories.append([np.array(cate) for cate in category])
+        shapes = [cate.shape[0] for cate in transform_categories[-1]]
+        row_shape = max(shapes)
+        column_shape = len(categories[-1])
+        mock_cate_data = torch.zeros((row_shape, column_shape), dtype=getattr(torch, dtype))
+        for rid in range(row_shape):
+            for cid in range(column_shape):
+                mock_cate_data[rid][cid] = rid % shapes[cid]
+
+        mock_data.append(mock_cate_data)
+
    block_index_set = set(block_indexes)

    def _encode(blocks):
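A note on the mock-block construction in the hunk above (my reading; the PR does not spell this out): filling entry (rid, cid) with rid % shapes[cid] makes column cid cycle through 0 .. shapes[cid] - 1, and choosing row_shape = max(shapes) guarantees every column actually reaches its largest index. The encoder is therefore fitted on data with the correct number of columns and the correct number of distinct values per column before categories_ is replaced with the real category arrays. A small standalone check with made-up category sizes:

import torch

shapes = [3, 2, 4]  # hypothetical per-column category sizes
row_shape, column_shape = max(shapes), len(shapes)
mock = torch.zeros((row_shape, column_shape))
for rid in range(row_shape):
    for cid in range(column_shape):
        mock[rid][cid] = rid % shapes[cid]

# Every column contains exactly the indices 0 .. shapes[cid] - 1.
for cid in range(column_shape):
    assert set(mock[:, cid].tolist()) == {float(v) for v in range(shapes[cid])}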
@@ -96,8 +109,8 @@ def _encode(blocks):
                continue

            enc = OneHotEncoder(dtype=dtype)
-            enc.fit([[1]])  # one hot encoder need to fit first.
-            enc.categories_ = categories[idx]
+            enc.fit(mock_data[idx])  # one hot encoder need to fit first.
+            enc.categories_ = transform_categories[idx]
            idx += 1
            enc_blocks.append(enc.transform(block).toarray())

@@ -108,79 +121,6 @@ def _encode(blocks):
    return block_table.mapValues(_encode)


-def bucketize(df: DataFrame, boundaries: Union[pd.DataFrame, dict]):
-    if isinstance(boundaries, pd.DataFrame):
-        boundaries = dict([(_name, boundaries[_name].tolist()) for _name in boundaries])
-    elif not isinstance(boundaries, dict):
-        raise ValueError("boundaries should be pd.DataFrame or dict")
-
-    data_manager = df.data_manager.duplicate()
-    field_names = list(filter(lambda field_name: field_name in boundaries, data_manager.infer_operable_field_names()))
-    blocks_loc = data_manager.loc_block(field_names)
-
-    _boundaries_list = []
-    for name, (_bid, _) in zip(field_names, blocks_loc):
-        if BlockType.is_tensor(data_manager.blocks[_bid].block_type):
-            _boundary = torch.tensor(boundaries[name])
-            _boundary[-1] = torch.inf
-        else:
-            _boundary = np.array(boundaries[name])
-            _boundary[-1] = np.inf
-
-        _boundaries_list.append((_bid, _, _boundary))
-
-    narrow_blocks, dst_blocks = data_manager.split_columns(
-        field_names, BlockType.get_block_type(BUCKETIZE_RESULT_TYPE)
-    )
-
-    def _mapper(
-        blocks, boundaries_list: list = None, narrow_loc: list = None, dst_bids: list = None, dm: DataManager = None
-    ):
-        ret_blocks = []
-        for block in blocks:
-            if isinstance(block, torch.Tensor):
-                ret_blocks.append(block.clone())
-            elif isinstance(block, np.ndarray):
-                ret_blocks.append(block.copy())
-            else:
-                ret_blocks.append(block)
-
-        for i in range(len(ret_blocks), dm.block_num):
-            ret_blocks.append([])
-
-        for bid, offsets in narrow_loc:
-            ret_blocks[bid] = ret_blocks[bid][:, offsets]
-
-        for dst_bid, (src_bid, src_offset, boundary) in zip(dst_bids, boundaries_list):
-            if isinstance(blocks[src_bid], torch.Tensor):
-                ret = torch.bucketize(blocks[src_bid][:, [src_offset]], boundary, out_int32=False)
-            else:
-                ret = torch.bucketize(blocks[src_bid][:, [src_offset]], boundary)
-
-            ret_blocks[dst_bid] = dm.blocks[dst_bid].convert_block(ret)
-
-        return ret_blocks
-
-    bucketize_mapper = functools.partial(
-        _mapper, boundaries_list=_boundaries_list, narrow_loc=narrow_blocks, dst_bids=dst_blocks, dm=data_manager
-    )
-
-    block_table = df.block_table.mapValues(bucketize_mapper)
-
-    block_indexes = data_manager.infer_operable_blocks()
-    if len(block_indexes) > 1:
-        to_promote_types = []
-        for bid in block_indexes:
-            to_promote_types.append((bid, BlockType.get_block_type(BUCKETIZE_RESULT_TYPE)))
-
-        data_manager.promote_types(to_promote_types)
-        block_table, data_manager = compress_blocks(block_table, data_manager)
-
-    return DataFrame(
-        df._ctx, block_table, partition_order_mappings=df.partition_order_mappings, data_manager=data_manager
-    )
-
-
def bucketize(df: DataFrame, boundaries: Union[pd.DataFrame, dict]):
    if isinstance(boundaries, pd.DataFrame):
        boundaries = dict([(_name, boundaries[_name].tolist()) for _name in boundaries])
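For reference (not part of the diff): the retained bucketize helper relies on torch.bucketize, which maps each value to the index of the first boundary that is greater than or equal to it (with the default right=False). Forcing the last boundary to inf, as the code above does, keeps values beyond the second-to-last split in the final bucket instead of spilling into an extra index. A minimal sketch with made-up boundaries:

import torch

values = torch.tensor([[0.5], [3.0], [10.0]])        # one column, as in blocks[src_bid][:, [src_offset]]
boundaries = torch.tensor([1.0, 5.0, float("inf")])  # last boundary forced to inf

# Each value maps to the index of the first boundary that is >= the value.
print(torch.bucketize(values, boundaries))            # tensor([[0], [1], [2]])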
3 changes: 1 addition & 2 deletions python/requirements-fate.txt
@@ -15,5 +15,4 @@ beautifultable
requests<2.26.0
grpcio==1.46.3
protobuf==3.19.6
-scikit-learn==1.2.1; sys_platform == 'darwin' and platform_machine == 'arm64'
-scikit-learn==1.0.1; sys_platform != 'darwin' or platform_machine != 'arm64'
+scikit-learn
3 changes: 1 addition & 2 deletions python/setup.py
@@ -13,8 +13,7 @@
"cloudpickle==2.1.0",
"click",
"ruamel.yaml==0.16",
"scikit-learn==1.2.1; sys_platform == 'darwin' and platform_machine == 'arm64'",
"scikit-learn==1.0.1; sys_platform != 'darwin' or platform_machine != 'arm64'",
"scikit-learn",
"numpy",
"pandas",
"transformers",