Merge remote-tracking branch 'origin/feature-2.0.0-beta-dataframe_update' into dev-2.0.0-secureboost
talkingwallace committed Jul 24, 2023
2 parents 04facf1 + d845c7a commit 8411275
Showing 5 changed files with 111 additions and 69 deletions.
66 changes: 38 additions & 28 deletions python/fate/arch/dataframe/_dataframe.py
@@ -101,15 +101,18 @@ def weight(self):

@property
def shape(self) -> "tuple":
if not self.__count:
if self.__count is None:
if self._sample_id_indexer:
items = self._sample_id_indexer.count()
elif self._match_id_indexer:
items = self._match_id_indexer.count()
else:
items = self._block_table.mapValues(lambda block: 0 if block is None else len(block[0])).reduce(
lambda size1, size2: size1 + size2
)
if self._block_table.count() == 0:
items = 0
else:
items = self._block_table.mapValues(lambda block: 0 if block is None else len(block[0])).reduce(
lambda size1, size2: size1 + size2
)
self.__count = items

return self.__count, len(self._data_manager.schema.columns)
@@ -174,6 +177,14 @@ def create_frame(self, with_label=False, with_weight=False, columns: list = None
with_sample_id=True, with_match_id=True, with_label=with_label, with_weight=with_weight, columns=columns
)

def empty_frame(self) -> "DataFrame":
return DataFrame(
self._ctx,
self._ctx.computing.parallelize([], include_key=False, partition=self._block_table.partitions),
partition_order_mappings=dict(),
data_manager=self._data_manager.duplicate()
)

def drop(self, index) -> "DataFrame":
from .ops._dimension_scaling import drop

@@ -252,21 +263,16 @@ def sigmoid(self) -> "DataFrame":

return sigmoid(self)

def rename(
self,
sample_id_name: str = None,
match_id_name: str = None,
label_name: str = None,
weight_name: str = None,
columns: dict = None,
):
self._data_manager.rename(
sample_id_name=sample_id_name,
match_id_name=match_id_name,
label_name=label_name,
weight_name=weight_name,
columns=columns,
)
def rename(self, sample_id_name: str = None,
match_id_name: str = None,
label_name: str = None,
weight_name: str = None,
columns: dict = None):
self._data_manager.rename(sample_id_name=sample_id_name,
match_id_name=match_id_name,
label_name=label_name,
weight_name=weight_name,
columns=columns)

def count(self) -> "int":
return self.shape[0]
@@ -276,19 +282,20 @@ def describe(self, ddof=1, unbiased=False):

return describe(self, ddof=ddof, unbiased=unbiased)

def quantile(self, q, relative_error: float = 1e-4):
def quantile(
self,
q,
relative_error: float = 1e-4
):
from .ops._quantile import quantile

return quantile(self, q, relative_error)

def qcut(self, q: int):
from .ops._quantile import qcut

return qcut(self, q)

def bucketize(self, boundaries: Union[dict, pd.DataFrame]) -> "DataFrame":
from .ops._encoder import bucketize

return bucketize(self, boundaries)

def hist(self, targets):
@@ -298,7 +305,6 @@ def hist(self, targets):

def replace(self, to_replace=None) -> "DataFrame":
from .ops._replace import replace

return replace(self, to_replace)

def __add__(self, other: Union[int, float, list, "np.ndarray", "DataFrame", "pd.Series"]) -> "DataFrame":
@@ -468,6 +474,9 @@ def get_indexer(self, target):
if target not in ["sample_id", "match_id"]:
raise ValueError(f"Target should be sample_id or match_id, but {target} found")

if self.shape[0] == 0:
            return self._ctx.computing.parallelize([], include_key=False, partition=self._block_table.partitions)

target_name = getattr(self.schema, f"{target}_name")
indexer = self.__convert_to_table(target_name)
if target == "sample_id":
@@ -484,6 +493,9 @@ def loc(self, indexer, target="sample_id", preserve_order=False):
else:
indexer = self_indexer.join(indexer, lambda lhs, rhs: (lhs, lhs))

if indexer.count() == 0:
return self.empty_frame()

agg_indexer = aggregate_indexer(indexer)

if not preserve_order:
@@ -563,7 +575,7 @@ def copy(self) -> "DataFrame":
self._ctx,
self._block_table.mapValues(lambda v: v),
copy.deepcopy(self.partition_order_mappings),
self._data_manager.duplicate(),
self._data_manager.duplicate()
)

@classmethod
@@ -578,9 +590,8 @@ def vstack(cls, stacks: List["DataFrame"]) -> "DataFrame":

return vstack(stacks)

def sample(self, n: int = None, frac: float = None, random_state=None) -> "DataFrame":
def sample(self, n: int=None, frac: float=None, random_state=None) -> "DataFrame":
from .ops._dimension_scaling import sample

return sample(self, n, frac, random_state)

def __extract_fields(
@@ -612,5 +623,4 @@ def __convert_to_table(self, target_name):

def data_overview(self, num=100):
from .ops._data_overview import collect_data

        return collect_data(self, num=num)
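The shape guard added above is needed because a distributed reduce over an empty block table has nothing to fold. A minimal local sketch of the same failure mode, with functools.reduce standing in for the table's reduce (illustrative only, not FATE's API):

from functools import reduce

block_sizes = []  # stand-in for an empty block table

try:
    items = reduce(lambda size1, size2: size1 + size2, block_sizes)
except TypeError:
    # reduce() over an empty iterable with no initial value raises,
    # which is why shape now short-circuits to items = 0 when
    # self._block_table.count() == 0
    items = 0

print(items)  # 0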
19 changes: 18 additions & 1 deletion python/fate/arch/dataframe/ops/_dimension_scaling.py
@@ -12,6 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import copy
import functools
from typing import List
import pandas as pd
@@ -56,10 +57,15 @@ def hstack(data_frames: List["DataFrame"]) -> "DataFrame":


def vstack(data_frames: List["DataFrame"]) -> "DataFrame":
frame_0 = data_frames[0]
data_frames = list(filter(lambda df: df.shape[0], data_frames))
if not data_frames:
return frame_0

    if len(data_frames) == 1:
return data_frames[0]

def _align_blocks(blocks, src_fields_loc=None, src_dm: DataManager=None, dst_dm: DataManager=None):
def _align_blocks(blocks, src_fields_loc=None, src_dm: DataManager = None, dst_dm: DataManager = None):
ret_blocks = []
lines = None
for dst_bid, block in enumerate(dst_dm.blocks):
@@ -127,6 +133,14 @@ def _align_blocks(blocks, src_fields_loc=None, src_dm: DataManager=None, dst_dm:


def drop(df: "DataFrame", index: "DataFrame" = None) -> "DataFrame":
if index.shape[0] == 0:
return DataFrame(
df._ctx,
block_table=df.block_table,
partition_order_mappings=copy.deepcopy(df.partition_order_mappings),
data_manager=df.data_manager.duplicate()
)

data_manager = df.data_manager.duplicate()
l_flatten_func = functools.partial(
_flatten_partition,
@@ -205,6 +219,9 @@ def _retrieval(blocks, t: torch.Tensor):
_flatten_func = functools.partial(_flatten_partition, block_num=df.data_manager.block_num)
retrieval_raw_table = retrieval_block_table.mapPartitions(_flatten_func, use_previous_behavior=False)

if retrieval_raw_table.count() == 0:
return df.empty_frame()

partition_order_mappings = get_partition_order_by_raw_table(retrieval_raw_table)
to_blocks_func = functools.partial(to_blocks, dm=df.data_manager, partition_mappings=partition_order_mappings)

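The vstack change above follows a common pattern: discard empty inputs first, then short-circuit when zero or one frame remains, returning the first original frame so an all-empty call still yields something with the right schema. A hedged sketch of the same control flow over plain lists (stand-ins for DataFrames, not FATE's API):

from typing import List

def vstack_lists(frames: List[list]) -> list:
    # keep the first input so an all-empty call still returns
    # a frame-like object of the right kind
    frame_0 = frames[0]
    frames = [f for f in frames if len(f)]
    if not frames:
        return frame_0
    if len(frames) == 1:
        return frames[0]
    out = []
    for f in frames:
        out.extend(f)  # row-wise concatenation
    return out

print(vstack_lists([[1, 2], [], [3]]))  # [1, 2, 3]
print(vstack_lists([[], []]))           # []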
1 change: 1 addition & 0 deletions python/fate/arch/dataframe/utils/__init__.py
@@ -16,3 +16,4 @@
from ._dataloader import BatchEncoding
from ._k_fold import KFold
from ._sample import federated_sample
from ._sample import local_sample
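With this export, the non-federated sampling path added in _sample.py below becomes importable from the package's utils namespace. A hedged usage sketch (requires a FATE runtime context and DataFrame; the keyword arguments follow the local_sample signature shown below):

from fate.arch.dataframe.utils import local_sample

# assuming ctx and df come from a FATE runtime: sample 80% of rows
# locally, with no regenerated ids or indexer synced to hosts
# sampled = local_sample(ctx, df, frac=0.8, replace=False, random_state=42)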
34 changes: 15 additions & 19 deletions python/fate/arch/dataframe/utils/_dataloader.py
@@ -118,7 +118,7 @@ def _prepare(self):
return

if self._batch_size == len(self._dataset):
self._batch_splits.append(self._dataset)
self._batch_splits.append(BatchEncoding(self._dataset, batch_id=0))
else:
if self._mode in ["homo", "local"] or self._role == "guest":
indexer = list(self._dataset.get_indexer(target="sample_id").collect())
@@ -137,31 +137,21 @@
if self._role == "guest":
iter_ctx.hosts.put("batch_indexes", batch_indexer)

self._batch_splits.append(sub_frame)
self._batch_splits.append(BatchEncoding(sub_frame, batch_id=i))
elif self._mode == "hetero" and self._role == "host":
for i, iter_ctx in self._ctx.sub_ctx("dataloader_batch").ctxs_range(self._batch_num):
batch_indexes = iter_ctx.guest.get("batch_indexes")
sub_frame = self._dataset.loc(batch_indexes, preserve_order=True)
self._batch_splits.append(sub_frame)
self._batch_splits.append(BatchEncoding(sub_frame, batch_id=i))

def __next__(self):
if self._role == "arbiter":
for batch_id in range(self._batch_num):
yield BatchEncoding(batch_id=batch_id)
return

for bid, batch in enumerate(self._batch_splits):
if batch.label and batch.weight:
yield BatchEncoding(x=batch.values.as_tensor(),
label=batch.label.as_tensor(),
weight=batch.weight.as_tensor(),
batch_id=bid)
elif batch.label:
yield BatchEncoding(x=batch.values.as_tensor(),
label=batch.label.as_tensor(),
batch_id=bid)
else:
yield BatchEncoding(x=batch.values.as_tensor())
for batch in self._batch_splits:
yield batch

def __iter__(self):
return self.__next__()
@@ -172,10 +162,16 @@ def batch_num(self):


class BatchEncoding(object):
def __init__(self, x=None, label=None, weight=None, batch_id=None):
self._x = x
self._label = label
self._weight = weight
def __init__(self, batch_df=None, batch_id=None):
if batch_df:
self._x = batch_df.values.as_tensor()
self._label = batch_df.label.as_tensor() if batch_df.label else None
self._weight = batch_df.weight.as_tensor() if batch_df.weight else None
else:
self._x = None
self._label = None
self._weight = None

self._batch_id = batch_id

@property
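The refactor above moves tensor extraction out of __next__ and into BatchEncoding itself, so each batch is materialized once when the splits are prepared and the loader simply yields prebuilt objects. A self-contained sketch of the new constructor shape, with small stand-ins for the sliced DataFrame (illustrative classes, not FATE's):

import torch

class FakeColumn:
    """Stand-in for a FATE column handle exposing as_tensor()."""
    def __init__(self, data):
        self._data = data

    def as_tensor(self):
        return torch.tensor(self._data)

class FakeBatchDF:
    """Stand-in for a batch slice: values always set, label/weight optional."""
    def __init__(self, values, label=None, weight=None):
        self.values = FakeColumn(values)
        self.label = FakeColumn(label) if label is not None else None
        self.weight = FakeColumn(weight) if weight is not None else None

class BatchEncoding:
    def __init__(self, batch_df=None, batch_id=None):
        if batch_df:
            # tensors are built once, at construction time
            self._x = batch_df.values.as_tensor()
            self._label = batch_df.label.as_tensor() if batch_df.label else None
            self._weight = batch_df.weight.as_tensor() if batch_df.weight else None
        else:
            # arbiter-side batches carry only an id
            self._x = self._label = self._weight = None
        self._batch_id = batch_id

batch = BatchEncoding(FakeBatchDF(values=[[1.0, 2.0]], label=[1]), batch_id=0)
print(batch._x.shape, batch._label)  # torch.Size([1, 2]) tensor([1])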
60 changes: 39 additions & 21 deletions python/fate/arch/dataframe/utils/_sample.py
@@ -26,6 +26,17 @@
REGENERATED_IDS = "regenerated_ids"


def local_sample(
ctx,
df: DataFrame,
n: int=None,
frac: Union[float, Dict[Any, float]] = None,
replace: bool = True,
random_state=None
):
return _sample_guest(ctx, df, n, frac, replace, random_state, sync=False)


def federated_sample(
ctx,
df: DataFrame,
@@ -35,36 +46,37 @@ def federated_sample(
random_state=None,
role: str = "guest"):
if role == "guest":
return _federated_sample_guest(ctx, df, n, frac, replace, random_state)
return _sample_guest(ctx, df, n, frac, replace, random_state, sync=True)
else:
return _federated_sample_host(ctx, df)


def _federated_sample_guest(
def _sample_guest(
ctx,
df: DataFrame,
n: int = None,
frac: Union[float, Dict[Any, float]] = None,
replace: bool = True,
random_state=None
random_state=None,
sync=True,
):
if n is not None and frac is not None:
raise ValueError("federated_sample's parameters n and frac should not be set in the same time.")
raise ValueError("sample's parameters n and frac should not be set in the same time.")

if frac is not None:
if isinstance(frac, float):
if frac > 1:
raise ValueError(f"federated_sample's parameter frac={frac} should <= 1.0")
raise ValueError(f"sample's parameter frac={frac} should <= 1.0")
n = max(1, int(frac * df.shape[0]))
else:
for k, f in frac.items():
if f > 1 and replace is False:
raise ValueError(f"federated_sample's parameter frac's label={k}, fraction={f} "
raise ValueError(f"sample's parameter frac's label={k}, fraction={f} "
f"should <= 1.0 if replace=False")

if n is not None:
if n > df.shape[0] and replace is False:
raise ValueError(f"federated_sample's parameter n={n} should <= data_size={df.shape[0]} if replace=False")
raise ValueError(f"sample's parameter n={n} should <= data_size={df.shape[0]} if replace=False")

if replace:
choices = resample(list(range(df.shape[0])), replace=True, n_samples=n, random_state=random_state)
@@ -77,19 +89,22 @@ def _federated_sample_guest(
regenerated_ids,
df.block_table.partitions)

ctx.hosts.put(REGENERATED_TAG, True)
ctx.hosts.put(REGENERATED_IDS, choice_with_regenerated_ids)
if sync:
ctx.hosts.put(REGENERATED_TAG, True)
ctx.hosts.put(REGENERATED_IDS, choice_with_regenerated_ids)

regenerated_raw_table = _regenerated_sample_ids(df, choice_with_regenerated_ids)
sample_df = _convert_raw_table_to_df(df._ctx, regenerated_raw_table, df.data_manager)
sample_indexer = sample_df.get_indexer(target="sample_id")
ctx.hosts.put(SAMPLE_INDEX_TAG, sample_indexer)
if sync:
sample_indexer = sample_df.get_indexer(target="sample_id")
ctx.hosts.put(SAMPLE_INDEX_TAG, sample_indexer)

else:
sample_df = df.sample(n=n, random_state=random_state)
sample_indexer = sample_df.get_indexer(target="sample_id")
ctx.hosts.put(REGENERATED_TAG, False)
ctx.hosts.put(SAMPLE_INDEX_TAG, sample_indexer)
if sync:
sample_indexer = sample_df.get_indexer(target="sample_id")
ctx.hosts.put(REGENERATED_TAG, False)
ctx.hosts.put(SAMPLE_INDEX_TAG, sample_indexer)
else:
up_sample = False
for label, f in frac.items():
@@ -113,12 +128,14 @@ def _federated_sample_guest(
else:
choice_with_regenerated_ids = choice_with_regenerated_ids.union(label_choice_with_regenerated_ids)

ctx.hosts.put(REGENERATED_TAG, True)
ctx.hosts.put(REGENERATED_IDS, choice_with_regenerated_ids)
if sync:
ctx.hosts.put(REGENERATED_TAG, True)
ctx.hosts.put(REGENERATED_IDS, choice_with_regenerated_ids)
regenerated_raw_table = _regenerated_sample_ids(df, choice_with_regenerated_ids)
sample_df = _convert_raw_table_to_df(df._ctx, regenerated_raw_table, df.data_manager)
sample_indexer = sample_df.get_indexer(target="sample_id")
ctx.hosts.put(SAMPLE_INDEX_TAG, sample_indexer)
if sync:
sample_indexer = sample_df.get_indexer(target="sample_id")
ctx.hosts.put(SAMPLE_INDEX_TAG, sample_indexer)
else:
sample_df = None
for label, f in frac.items():
@@ -131,9 +148,10 @@ def _federated_sample_guest(
else:
                sample_df = DataFrame.hstack([sample_df, sample_label_df])

sample_indexer = sample_df.get_indexer(target="sample_id")
            ctx.hosts.put(REGENERATED_TAG, False)
ctx.hosts.put(SAMPLE_INDEX_TAG, sample_indexer)
if sync:
sample_indexer = sample_df.get_indexer(target="sample_id")
ctx.hosts.put(REGENERATED_IDS, False)
ctx.hosts.put(SAMPLE_INDEX_TAG, sample_indexer)

return sample_df

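local_sample and federated_sample now share one guest-side routine, with a sync flag guarding every hosts.put so the local path performs identical sampling without any federation traffic. A minimal local sketch of that guard pattern (push_to_hosts is an illustrative stand-in, not FATE's API):

import random

def sample_rows(rows, n, random_state=None, sync=True, push_to_hosts=None):
    rng = random.Random(random_state)
    chosen = rng.sample(rows, n)  # the replace=False branch
    if sync and push_to_hosts is not None:
        # federated path only: let hosts mirror the guest's choice
        push_to_hosts(chosen)
    return chosen

# local path: same sampling, no messages sent
print(sample_rows(list(range(10)), n=3, random_state=42, sync=False))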
