Skip to content

Commit

Permalink
dataframe: support qcut and bucketize
Browse files Browse the repository at this point in the history
Signed-off-by: mgqa34 <mgq3374541@163.com>
Signed-off-by: weiwee <wbwmat@gmail.com>
  • Loading branch information
mgqa34 authored and sagewe committed Jul 21, 2023
1 parent f6e9386 commit ff78d28
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 1 deletion.
8 changes: 8 additions & 0 deletions python/fate/arch/dataframe/_dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,14 @@ def quantile(
from .ops._quantile import quantile
return quantile(self, q, relative_error)

def qcut(self, q: int):
    """Return the cut points that split every column's value range into ``q`` bins.

    Thin wrapper: the work is done by ``ops._quantile.qcut``. The import is
    kept inside the method, as the neighbouring wrappers in this class do.
    """
    from .ops._quantile import qcut as _qcut_impl

    cut_points = _qcut_impl(self, q)
    return cut_points

def bucketize(self, boundaries: Union[dict, pd.DataFrame]) -> "DataFrame":
    """Map column values to bucket indices according to ``boundaries``.

    Thin wrapper: the work is done by ``ops._encoder.bucketize``, which
    accepts either a dict or a ``pd.DataFrame`` keyed by column name.
    """
    from .ops._encoder import bucketize as _bucketize_impl

    bucketed = _bucketize_impl(self, boundaries)
    return bucketed

def __add__(self, other: Union[int, float, list, "np.ndarray", "DataFrame", "pd.Series"]) -> "DataFrame":
    """Implement ``self + other`` by forwarding to the private arithmetic helper with ``operator.add``."""
    added = self.__arithmetic_operate(operator.add, other)
    return added

Expand Down
76 changes: 75 additions & 1 deletion python/fate/arch/dataframe/ops/_encoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,18 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
import functools

import pandas as pd
import numpy as np
import torch
from sklearn.preprocessing import OneHotEncoder
from typing import Union
from .._dataframe import DataFrame
from ..manager import BlockType
from ..manager import BlockType, DataManager


# Block type name requested (via ``BlockType.get_block_type``) for the columns
# that ``bucketize`` writes its bucket indices into.
BUCKETIZE_RESULT_TYPE = "int32"


def get_dummies(df: "DataFrame", dtype="int32"):
Expand Down Expand Up @@ -98,3 +106,69 @@ def _encode(blocks):
return ret_blocks

return block_table.mapValues(_encode)


def _build_open_ended_boundary(split_points, as_tensor: bool):
    """Return ``split_points`` as a float vector whose last entry is replaced by +inf."""
    if as_tensor:
        boundary = torch.tensor(split_points)
        # An integer tensor cannot store inf, so promote before patching the last entry.
        if not boundary.is_floating_point():
            boundary = boundary.to(torch.float64)
        boundary[-1] = torch.inf
    else:
        boundary = np.array(split_points)
        # Same reason as above: assigning inf into an integer array raises.
        if not np.issubdtype(boundary.dtype, np.floating):
            boundary = boundary.astype(np.float64)
        boundary[-1] = np.inf

    return boundary


def bucketize(df: DataFrame, boundaries: Union[pd.DataFrame, dict]):
    """Replace the values of selected columns by the index of the bucket they fall in.

    Only operable fields of ``df`` that also appear in ``boundaries`` are
    bucketized; every other column is carried over unchanged.

    Parameters
    ----------
    df:
        Source DataFrame. Its block table is not modified; a duplicated data
        manager and a newly mapped block table are used for the result.
    boundaries:
        Sorted split points per column, either as a dict
        ``{column_name: sequence}`` or as a ``pd.DataFrame`` whose columns are
        the column names. The last split point of every column is replaced by
        +inf, so a finite value can never be placed past the final bucket.

    Returns
    -------
    DataFrame
        A new DataFrame sharing ``df``'s context and partition order mappings,
        whose bucketized columns are stored in ``BUCKETIZE_RESULT_TYPE`` blocks.
        Bucket ``i`` holds values ``v`` with
        ``boundary[i - 1] < v <= boundary[i]``.

    Raises
    ------
    ValueError
        If ``boundaries`` is neither a ``pd.DataFrame`` nor a dict.
    """
    if isinstance(boundaries, pd.DataFrame):
        boundaries = {_name: boundaries[_name].tolist() for _name in boundaries}
    elif not isinstance(boundaries, dict):
        raise ValueError("boundaries should be pd.DataFrame or dict")

    data_manager = df.data_manager.duplicate()
    field_names = [
        field_name for field_name in data_manager.infer_operable_field_names() if field_name in boundaries
    ]
    blocks_loc = data_manager.loc_block(field_names)

    # Boundaries are resolved against the source layout, i.e. before
    # split_columns() below rearranges the duplicated data manager.
    _boundaries_list = []
    for name, (_bid, _offset) in zip(field_names, blocks_loc):
        use_tensor = BlockType.is_tensor(data_manager.blocks[_bid].block_type)
        _boundary = _build_open_ended_boundary(boundaries[name], use_tensor)
        _boundaries_list.append((_bid, _offset, _boundary))

    narrow_blocks, dst_blocks = data_manager.split_columns(field_names, BlockType.get_block_type(BUCKETIZE_RESULT_TYPE))

    def _mapper(blocks, boundaries_list: list = None, narrow_loc: list = None,
                dst_bids: list = None, dm: DataManager = None):
        # Work on copies so the blocks of the source table are left untouched.
        ret_blocks = []
        for block in blocks:
            if isinstance(block, torch.Tensor):
                ret_blocks.append(block.clone())
            elif isinstance(block, np.ndarray):
                ret_blocks.append(block.copy())
            else:
                ret_blocks.append(block)

        # Placeholders for the blocks that split_columns() added to the layout.
        ret_blocks.extend([] for _ in range(len(ret_blocks), dm.block_num))

        # Keep only the listed column offsets in each narrowed block.
        for bid, offsets in narrow_loc:
            ret_blocks[bid] = ret_blocks[bid][:, offsets]

        for dst_bid, (src_bid, src_offset, boundary) in zip(dst_bids, boundaries_list):
            column = blocks[src_bid][:, [src_offset]]
            if isinstance(blocks[src_bid], torch.Tensor):
                ret = torch.bucketize(column, boundary, out_int32=False)
            else:
                # torch.bucketize only accepts tensors; searchsorted(side="left")
                # yields the same indices as bucketize(right=False).
                ret = torch.from_numpy(np.searchsorted(boundary, column, side="left"))

            ret_blocks[dst_bid] = dm.blocks[dst_bid].convert_block(ret)

        return ret_blocks

    bucketize_mapper = functools.partial(_mapper,
                                         boundaries_list=_boundaries_list,
                                         narrow_loc=narrow_blocks,
                                         dst_bids=dst_blocks,
                                         dm=data_manager)

    block_table = df.block_table.mapValues(bucketize_mapper)

    return DataFrame(
        df._ctx,
        block_table,
        partition_order_mappings=df.partition_order_mappings,
        data_manager=data_manager
    )
16 changes: 16 additions & 0 deletions python/fate/arch/dataframe/ops/_quantile.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,19 @@ def _reducer(l_gk_summary_obj_list, r_gk_summary_obj_list):
quantile_df = pd.DataFrame(quantile_rets, index=q)

return quantile_df


def qcut(df: "DataFrame", q: int):
    """Compute the upper edges of ``q`` equal-width bins for every column of ``df``.

    Despite the name, the cut points are not quantile based: for each column
    the range ``[min, max]`` is divided into ``q`` intervals of equal width.

    Parameters
    ----------
    df:
        Object exposing ``max()`` and ``min()``; presumably each returns a
        per-column ``pd.Series`` (verify against ``DataFrame.max``/``min``).
    q:
        Number of bins, a positive integer.

    Returns
    -------
    pd.DataFrame
        One row per bin, indexed ``1..q``; row ``i`` holds ``min + i * width``
        for every column, and row ``q`` holds the column maximum.

    Raises
    ------
    ValueError
        If ``q`` is not a positive integer.
    """
    # Explicit check instead of ``assert`` so the validation survives ``python -O``.
    if not isinstance(q, int) or q <= 0:
        raise ValueError(f"to use qcut, {q} should be positive integer")

    max_ret = df.max()
    min_ret = df.min()

    dist = (max_ret - min_ret) / q

    cut_ret = [min_ret + i * dist for i in range(1, q)]
    # The last edge is the maximum itself rather than min + q * dist, which
    # avoids floating point drift on the upper bound.
    cut_ret.append(max_ret)

    return pd.DataFrame(cut_ret, index=range(1, q + 1, 1))

0 comments on commit ff78d28

Please sign in to comment.