Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature 2.0.0 beta dataframe refact #5039

Merged
merged 3 commits into from
Aug 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 32 additions & 11 deletions python/fate/arch/dataframe/_frame_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from typing import Union


from .conf.default_config import DATAFRAME_BLOCK_ROW_SIZE
from .entity import types
from ._dataframe import DataFrame
from .manager import DataManager
Expand All @@ -41,7 +42,8 @@ def __init__(
na_values: Union[str, list, dict] = None,
input_format: str = "dense",
tag_with_value: bool = False,
tag_value_delimiter: str = ":"
tag_value_delimiter: str = ":",
block_row_size: int = None
):
self._sample_id_name = sample_id_name
self._match_id_name = match_id_name
Expand All @@ -60,21 +62,25 @@ def __init__(
self._input_format = input_format
self._tag_with_value = tag_with_value
self._tag_value_delimiter = tag_value_delimiter
self._block_row_size = block_row_size if block_row_size is not None else DATAFRAME_BLOCK_ROW_SIZE

self.check_params()

def check_params(self):
    """Validate the reader configuration supplied to ``__init__``.

    Raises:
        ValueError: if ``sample_id_name`` is empty/missing, or if
            ``block_row_size`` is not a strictly positive integer.
    """
    if not self._sample_id_name:
        raise ValueError("Please provide sample_id_name")

    # block_row_size is later used as a modulo divisor when chunking rows
    # into fixed-size blocks, so 0 must be rejected along with negatives.
    # (The previous `< 0` check let 0 through even though the error message
    # already said "positive integer".)  bool is a subclass of int, so it is
    # excluded explicitly.
    if (
        not isinstance(self._block_row_size, int)
        or isinstance(self._block_row_size, bool)
        or self._block_row_size <= 0
    ):
        raise ValueError("block_row_size should be positive integer")

def to_frame(self, ctx, table):
    """Build a DataFrame from *table*.

    Only the "dense" input format is handled here; any other configured
    format is rejected up front.
    """
    if self._input_format == "dense":
        return self._dense_format_to_frame(ctx, table)
    raise ValueError("Only support dense input format in this version.")

def _dense_format_to_frame(self, ctx, table):
data_manager = DataManager()
data_manager = DataManager(block_row_size=self._block_row_size)
columns = self._header.split(self._delimiter, -1)
columns.remove(self._sample_id_name)
retrieval_index_dict = data_manager.init_from_local_file(
Expand All @@ -84,7 +90,7 @@ def _dense_format_to_frame(self, ctx, table):
dtype=self._dtype, default_type=types.DEFAULT_DATA_TYPE)

from .ops._indexer import get_partition_order_by_raw_table
partition_order_mappings = get_partition_order_by_raw_table(table)
partition_order_mappings = get_partition_order_by_raw_table(table, data_manager.block_row_size)
# partition_order_mappings = _get_partition_order(table)
table = table.mapValues(lambda value: value.split(self._delimiter, -1))
to_block_func = functools.partial(_to_blocks,
Expand Down Expand Up @@ -129,7 +135,8 @@ def __init__(
weight_type: str = "float32",
dtype: str = "float32",
na_values: Union[None, str, list, dict] = None,
partition: int = 4
partition: int = 4,
block_row_size: int = None
):
self._sample_id_name = sample_id_name
self._match_id_list = match_id_list
Expand All @@ -142,6 +149,7 @@ def __init__(
self._dtype = dtype
self._na_values = na_values
self._partition = partition
self._block_row_size = block_row_size if block_row_size is not None else DATAFRAME_BLOCK_ROW_SIZE

def to_frame(self, ctx, path):
# TODO: use table put data instead of read all data
Expand All @@ -156,6 +164,7 @@ def to_frame(self, ctx, path):
weight_name=self._weight_name,
dtype=self._dtype,
partition=self._partition,
block_row_size=self._block_row_size
).to_frame(ctx, df)


Expand Down Expand Up @@ -194,6 +203,7 @@ def __init__(
weight_type: str = "float32",
dtype: str = "float32",
partition: int = 4,
block_row_size: int = None,
):
self._sample_id_name = sample_id_name
self._match_id_list = match_id_list
Expand All @@ -204,6 +214,7 @@ def __init__(
self._weight_type = weight_type
self._dtype = dtype
self._partition = partition
self._block_row_size = block_row_size if block_row_size is not None else DATAFRAME_BLOCK_ROW_SIZE

if self._sample_id_name and not self._match_id_name:
raise ValueError(f"As sample_id {self._sample_id_name} is given, match_id should be given too")
Expand All @@ -215,7 +226,7 @@ def to_frame(self, ctx, df: "pd.DataFrame"):
else:
df = df.set_index(self._sample_id_name)

data_manager = DataManager()
data_manager = DataManager(block_row_size=self._block_row_size)
retrieval_index_dict = data_manager.init_from_local_file(
sample_id_name=self._sample_id_name, columns=df.columns.tolist(), match_id_list=self._match_id_list,
match_id_name=self._match_id_name, label_name=self._label_name, weight_name=self._weight_name,
Expand Down Expand Up @@ -260,11 +271,11 @@ def _to_blocks(kvs,
"""
sample_id/match_id,label(maybe missing),weight(maybe missing),X
"""
partition_id = None
block_id = None

schema = data_manager.schema

splits = [[] for idx in range(data_manager.block_num)]
splits = [[] for _ in range(data_manager.block_num)]
sample_id_block = data_manager.loc_block(schema.sample_id_name, with_offset=False) if schema.sample_id_name else None

match_id_block = data_manager.loc_block(schema.match_id_name, with_offset=False)if schema.match_id_name else None
Expand All @@ -287,9 +298,13 @@ def _to_blocks(kvs,

column_blocks_mapping[bid].append(col_id)

block_row_size = data_manager.block_row_size

lid = 0
for key, value in kvs:
if partition_id is None:
partition_id = partition_order_mappings[key]["block_id"]
if block_id is None:
block_id = partition_order_mappings[key]["start_block_id"]
lid += 1

# columns = value.split(",", -1)
splits[sample_id_block].append(key)
Expand All @@ -303,6 +318,12 @@ def _to_blocks(kvs,
for bid, col_id_list in column_blocks_mapping.items():
splits[bid].append([value[col_id] for col_id in col_id_list])

converted_blocks = data_manager.convert_to_blocks(splits)
if lid % block_row_size == 0:
converted_blocks = data_manager.convert_to_blocks(splits)
yield block_id, converted_blocks
block_id += 1
splits = [[] for _ in range(data_manager.block_num)]

return [(partition_id, converted_blocks)]
if lid % block_row_size:
converted_blocks = data_manager.convert_to_blocks(splits)
yield block_id, converted_blocks
Empty file.
17 changes: 17 additions & 0 deletions python/fate/arch/dataframe/conf/default_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
#
# Copyright 2019 The FATE Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Default number of rows packed into a single DataFrame block (2**6 = 64).
# Used as the fallback whenever a caller does not pass an explicit
# block_row_size (readers and DataManager fall back to this value).
DATAFRAME_BLOCK_ROW_SIZE = 2**6

13 changes: 12 additions & 1 deletion python/fate/arch/dataframe/manager/data_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,19 @@
from .block_manager import BlockType
from ..entity import types
from typing import Union, List, Tuple
from ..conf.default_config import DATAFRAME_BLOCK_ROW_SIZE


class DataManager(object):
def __init__(self, schema_manager: SchemaManager = None, block_manager: BlockManager = None):
def __init__(
    self,
    schema_manager: SchemaManager = None,
    block_manager: BlockManager = None,
    block_row_size: int = DATAFRAME_BLOCK_ROW_SIZE
):
    """Compose the managers backing a distributed DataFrame.

    Args:
        schema_manager: presumably tracks column/schema metadata — confirm
            against schema_manager.py.
        block_manager: presumably tracks per-block layout (``blocks`` /
            ``block_num`` read from it) — confirm against block_manager.py.
        block_row_size: number of rows per block; defaults to
            DATAFRAME_BLOCK_ROW_SIZE (2**6).
    """
    self._schema_manager = schema_manager
    self._block_manager = block_manager
    self._block_row_size = block_row_size

@property
def blocks(self):
Expand All @@ -34,6 +41,10 @@ def blocks(self):
def block_num(self):
return len(self._block_manager.blocks)

@property
def block_row_size(self):
    """Number of rows held by each block (set at construction time)."""
    return self._block_row_size

@property
def schema(self):
    """Expose the schema object owned by the underlying schema manager."""
    return self._schema_manager.schema
Expand Down
26 changes: 16 additions & 10 deletions python/fate/arch/dataframe/ops/_dimension_scaling.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ def _align_blocks(blocks, align_fields_loc=None, full_block_migrate_set=None, ds
r_flatten = r_block_table.mapPartitions(r_flatten_func, use_previous_behavior=False)
l_flatten = l_flatten.union(r_flatten)

partition_order_mappings = get_partition_order_by_raw_table(l_flatten)
partition_order_mappings = get_partition_order_by_raw_table(l_flatten, data_manager.block_row_size)
_convert_to_block_func = functools.partial(to_blocks, dm=data_manager, partition_mappings=partition_order_mappings)
block_table = l_flatten.mapPartitions(_convert_to_block_func, use_previous_behavior=False)
block_table, data_manager = compress_blocks(block_table, data_manager)
Expand Down Expand Up @@ -187,7 +187,9 @@ def drop(df: "DataFrame", index: "DataFrame" = None) -> "DataFrame":
r_flatten_table = index.block_table.mapPartitions(r_flatten_func, use_previous_behavior=False)

drop_flatten = l_flatten_table.subtractByKey(r_flatten_table)
partition_order_mappings = get_partition_order_by_raw_table(drop_flatten) if drop_flatten.count() else dict()
partition_order_mappings = get_partition_order_by_raw_table(
drop_flatten, data_manager.block_row_size
) if drop_flatten.count() else dict()

_convert_to_block_func = functools.partial(to_blocks,
dm=data_manager,
Expand Down Expand Up @@ -254,7 +256,7 @@ def _retrieval(blocks, t: torch.Tensor):
if retrieval_raw_table.count() == 0:
return df.empty_frame()

partition_order_mappings = get_partition_order_by_raw_table(retrieval_raw_table)
partition_order_mappings = get_partition_order_by_raw_table(retrieval_raw_table, df.data_manager.block_row_size)
to_blocks_func = functools.partial(to_blocks, dm=df.data_manager, partition_mappings=partition_order_mappings)

block_table = retrieval_raw_table.mapPartitions(to_blocks_func, use_previous_behavior=False)
Expand Down Expand Up @@ -286,16 +288,20 @@ def _flatten_partition(kvs, block_num=0):


def to_blocks(kvs, dm: DataManager = None, partition_mappings: dict = None):
ret_blocks = [[] for i in range(dm.block_num)]
ret_blocks = [[] for _ in range(dm.block_num)]

partition_id = None
for sample_id, value in kvs:
if partition_id is None:
partition_id = partition_mappings[sample_id]["block_id"]
block_id = None
for lid, (sample_id, value) in enumerate(kvs):
if block_id is None:
block_id = partition_mappings[sample_id]["start_block_id"]
ret_blocks[0].append(sample_id)
for bid, buf in enumerate(value):
ret_blocks[bid + 1].append(buf)

ret_blocks = dm.convert_to_blocks(ret_blocks)
if (lid + 1) % dm.block_row_size == 0:
yield block_id, dm.convert_to_blocks(ret_blocks)
ret_blocks = [[] for i in range(dm.block_num)]
block_id += 1

return [(partition_id, ret_blocks)]
if ret_blocks[0]:
yield block_id, dm.convert_to_blocks(ret_blocks)
Loading