dataframe: fix batch loader #4925

Merged · 1 commit · Jun 14, 2023
2 changes: 1 addition & 1 deletion python/fate/arch/dataframe/_dataframe.py
@@ -21,7 +21,6 @@

from .ops import (
    aggregate_indexer,
    transform_to_table,
    get_partition_order_mappings
)
from .manager import DataManager, Schema
@@ -528,4 +527,5 @@ def __convert_to_table(self, target_name):
        block_loc = self._data_manager.loc_block(target_name)
        assert block_loc[1] == 0, "support only one indexer in current version"

        from .ops._indexer import transform_to_table
        return transform_to_table(self._block_table, block_loc[0], self._partition_order_mappings)
2 changes: 0 additions & 2 deletions python/fate/arch/dataframe/ops/__init__.py
@@ -14,13 +14,11 @@
# limitations under the License.
from ._indexer import (
    aggregate_indexer,
    transform_to_table,
    get_partition_order_mappings
)


__all__ = [
"transform_to_table",
"aggregate_indexer",
"get_partition_order_mappings"
]
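In `_dataframe.py` above, the module-level import of `transform_to_table` is replaced by an import inside `__convert_to_table`, and the name is dropped from the `ops` package exports. The PR carries no description, but deferring an import into the function body like this is a common way to break an import cycle between modules; below is a minimal sketch of that pattern, using hypothetical module names (`frame.py` / `ops_indexer.py`) rather than the actual FATE layout.

```python
# frame.py  (hypothetical module; only illustrates the deferred-import pattern)
class Frame:
    def to_table(self):
        # Imported at call time, so frame.py no longer requires ops_indexer
        # to be fully loaded while frame.py itself is still being imported.
        from ops_indexer import transform_to_table
        return transform_to_table(self)

# ops_indexer.py  (hypothetical module)
# This module can import frame without creating a cycle, because frame.py
# defers its own import of ops_indexer until to_table() is actually called.
def transform_to_table(frame):
    return [("sample-id", frame)]
```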
57 changes: 21 additions & 36 deletions python/fate/arch/dataframe/utils/_dataloader.py
@@ -14,11 +14,7 @@
# limitations under the License.
import random

import numpy as np
import pandas as pd
import torch
from fate.arch.context.io.data import df
from fate.arch.dataframe import PandasReader, TorchDataSetReader


class DataLoader(object):
@@ -39,7 +35,10 @@ def __init__(
        self._dataset = dataset
        self._batch_size = batch_size
        if dataset:
            self._batch_size = min(batch_size, len(dataset))
            if batch_size == -1:
                self._batch_size = len(dataset)
            else:
                self._batch_size = min(batch_size, len(dataset))
        self._shuffle = shuffle
        self._batch_strategy = batch_strategy
        self._random_seed = random_seed
@@ -53,16 +52,6 @@ def __init__(
    def _init_settings(self):
        if isinstance(self._dataset, df.Dataframe):
            self._dataset = self._dataset.data
        """
        if isinstance(self._dataset, pd.DataFrame):
            self._dataset = PandasReader().to_frame(self._ctx, self._dataset)
        elif isinstance(self._dataset, (np.ndarray, list)):
            self._dataset = pd.DataFrame(np.ndarray)
            self._dataset = PandasReader().to_frame(self._ctx, self._dataset)
        elif isinstance(self._dataset, torch.utils.data.Dataset):
            # Note: torch dataset items' order should be X, y
            self._dataset = TorchDataSetReader().to_frame(self._ctx, self._dataset)
        """

        if self._batch_strategy == "full":
            self._batch_generator = FullBatchDataLoader(
@@ -92,17 +81,11 @@ def batch_num(self):

    def __next__(self):
        for batch in self._batch_generator:
            if len(batch[1:]) == 1:
                yield batch[1]
            else:
                yield batch[1:]
            yield batch

    def __iter__(self):
        for batch in self._batch_generator:
            if len(batch[1:]) == 1:
                yield batch[1]
            else:
                yield batch[1:]
            yield batch

class FullBatchDataLoader(object):
@@ -150,25 +133,27 @@ def _prepare(self):
            self._batch_splits.append(self._dataset)
        else:
            if self._mode in ["homo", "local"] or self._role == "guest":
                indexes = self._dataset.index.tolist()

                indexer = list(self._dataset.get_indexer(target="sample_id").collect())
                if self._shuffle:
                    random.seed = self._random_seed
                    random.shuffle(indexes)
                    random.shuffle(indexer)

                for i, iter_ctx in self._ctx.range(self._batch_num):
                    batch_indexes = indexes[self._batch_size * i : self._batch_size * (i + 1)]
                    batch_indexer = indexer[self._batch_size * i: self._batch_size * (i + 1)]
                    batch_indexer = self._ctx.computing.parallelize(batch_indexer,
                                                                    include_key=True,
                                                                    partition=self._dataset.block_table.partitions)

                    sub_frame = self._dataset.loc(batch_indexes)
                    sub_frame = self._dataset.loc(batch_indexer, preserve_order=True)

                    if self._role == "guest":
                        iter_ctx.hosts.put("batch_indexes", batch_indexes)
                        iter_ctx.hosts.put("batch_indexes", batch_indexer)

                    self._batch_splits.append(sub_frame)
            elif self._mode == "hetero" and self._role == "host":
                for i, iter_ctx in self._ctx.range(self._batch_num):
                    batch_indexes = iter_ctx.guest.get("batch_indexes")
                    sub_frame = self._dataset.loc(batch_indexes)
                    sub_frame = self._dataset.loc(batch_indexes, preserve_order=True)
                    self._batch_splits.append(sub_frame)

    def __next__(self):
@@ -179,11 +164,11 @@ def __next__(self):

        for batch in self._batch_splits:
            if batch.label and batch.weight:
                yield batch.index, batch.values, batch.label, batch.weight
                yield batch.values, batch.label, batch.weight
            elif batch.label:
                yield batch.index, batch.values, batch.label
                yield batch.values, batch.label
            else:
                yield batch.index, batch.values
                yield batch.values

    def __iter__(self):
        if self._role == "arbiter":
@@ -193,11 +178,11 @@ def __iter__(self):

        for batch in self._batch_splits:
            if batch.label and batch.weight:
                yield batch.index, batch.values, batch.label, batch.weight
                yield batch.values, batch.label, batch.weight
            elif batch.label:
                yield batch.index, batch.values, batch.label
                yield batch.values, batch.label
            else:
                yield batch.index, batch.values
                yield batch.values

    @property
    def batch_num(self):
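Taken together, the `_dataloader.py` changes do three things: `batch_size == -1` is now treated as "the full dataset in a single batch"; the guest/local side slices a (optionally shuffled) sample-id indexer rather than a plain index list, ships each slice to the hosts, and both sides select rows with `loc(..., preserve_order=True)` so the parties see the same samples in the same order; and the loaders now yield `(values, label, weight)` tuples without the index. A rough, framework-free sketch of the slicing logic follows, under the assumption that plain Python lists stand in for FATE's distributed indexer and table objects (the real code uses `DataFrame.get_indexer`, `ctx.computing.parallelize`, and `loc`).

```python
import random

def split_batches(sample_ids, batch_size, shuffle=False, seed=None):
    """Simplified sketch of the guest-side batch splitting after this PR."""
    # batch_size == -1 now means "one batch holding the full dataset"
    if batch_size == -1:
        batch_size = len(sample_ids)
    else:
        batch_size = min(batch_size, len(sample_ids))

    indexer = list(sample_ids)
    if shuffle:
        random.seed(seed)
        random.shuffle(indexer)

    batch_num = (len(indexer) + batch_size - 1) // batch_size
    # Each batch keeps the (possibly shuffled) order of its slice of the indexer,
    # mirroring loc(batch_indexer, preserve_order=True) in the PR.
    return [indexer[i * batch_size:(i + 1) * batch_size] for i in range(batch_num)]

# Example: 5 samples, batch_size=2 -> [['s0', 's1'], ['s2', 's3'], ['s4']]
print(split_batches([f"s{i}" for i in range(5)], batch_size=2))
```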