dataframe: op optimize
Signed-off-by: mgqa34 <mgq3374541@163.com>
mgqa34 committed Aug 18, 2023
1 parent 504d537 commit 306f086
Showing 8 changed files with 80 additions and 133 deletions.
2 changes: 1 addition & 1 deletion python/fate/arch/dataframe/_dataframe.py
@@ -153,7 +153,7 @@ def as_pd_df(self) -> "pd.DataFrame":
 
         return transform_to_pandas_dataframe(self._block_table, self._data_manager)
 
-    def apply_row(self, func, columns=None, with_label=False, with_weight=False, enable_type_align_checking=True):
+    def apply_row(self, func, columns=None, with_label=False, with_weight=False, enable_type_align_checking=False):
         from .ops._apply_row import apply_row
 
         return apply_row(
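The only substantive change here flips the default of `enable_type_align_checking` from True to False, so `apply_row` now skips per-row type alignment checks unless a caller opts in. A hypothetical call-site sketch (the dataframe and the column names x0/x1 are invented for illustration, not the FATE test suite):

# df is a fate.arch.dataframe.DataFrame; "x0"/"x1" are invented column names
ret = df.apply_row(lambda row: row["x0"] + row["x1"])  # new default: checking skipped
ret = df.apply_row(lambda row: row["x0"] + row["x1"],
                   enable_type_align_checking=True)    # opt back in explicitly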
7 changes: 7 additions & 0 deletions python/fate/arch/dataframe/manager/block_manager.py
@@ -252,6 +252,13 @@ def retrieval_row(cls, block, indexes):
         else:
             return block[indexes]
 
+    @classmethod
+    def transform_block_to_list(cls, block):
+        if isinstance(block, FixedpointPaillierVector):
+            return [block.slice_indexes([i]) for i in range(len(block))]
+        else:
+            return block.tolist()
+
     @classmethod
     def transform_row_to_raw(cls, block, index):
         if isinstance(block, pd.Index):
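The new `transform_block_to_list` helper flattens a whole block to plain Python values in one call, so hot row loops stop paying per-element tensor indexing and `.item()` costs; the `FixedpointPaillierVector` branch covers encrypted blocks, whose rows are sliced out individually. A minimal sketch of the pattern, with torch tensors standing in for blocks:

import torch

# Toy stand-ins for blocks: each is a (rows x cols) tensor.
blocks = [torch.arange(6).reshape(3, 2), torch.ones(3, 2)]

# Old pattern: per-element tensor indexing + .item() inside the row loop.
row_0_old = [blocks[b][0][1].item() for b in range(2)]

# New pattern: flatten each block once, then index plain Python lists.
flat_blocks = [block.tolist() for block in blocks]
row_0_new = [flat_blocks[b][0][1] for b in range(2)]

print(row_0_old, row_0_new)  # [1, 1.0] [1, 1.0]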
24 changes: 9 additions & 15 deletions python/fate/arch/dataframe/ops/_apply_row.py
@@ -14,12 +14,11 @@
 # limitations under the License.
 import functools
 import pandas as pd
-import torch
 
 from collections import Iterable
 
 from .._dataframe import DataFrame
-from ..manager.block_manager import BlockType
+from ..manager.block_manager import Block, BlockType
 from ..manager.data_manager import DataManager
 from ..utils._auto_column_name_generated import generated_default_column_names
 
@@ -32,10 +31,10 @@ def apply_row(df: "DataFrame", func,
     """
     data_manager = df.data_manager
     dst_data_manager, _ = data_manager.derive_new_data_manager(with_sample_id=True,
-                                                              with_match_id=True,
-                                                              with_label=not with_weight,
-                                                              with_weight=not with_weight,
-                                                              columns=None)
+                                                               with_match_id=True,
+                                                               with_label=not with_weight,
+                                                               with_weight=not with_weight,
+                                                               columns=None)
 
     non_operable_field_names = dst_data_manager.get_field_name_list()
     non_operable_blocks = [data_manager.loc_block(field_name,
@@ -75,14 +74,9 @@ def _apply(blocks, func=None, src_field_names=None,
     ret_column_len = len(ret_columns) if ret_columns is not None else None
     block_types = []
 
+    flat_blocks = [Block.transform_block_to_list(block) for block in blocks]
     for lid in range(lines):
-        apply_row_data = []
-        for bid, offset in src_fields_loc:
-            if isinstance(blocks[bid], torch.Tensor):
-                apply_row_data.append(blocks[bid][lid][offset].item())
-            else:
-                apply_row_data.append(blocks[bid][lid][offset])
-
+        apply_row_data = [flat_blocks[bid][lid][offset] for bid, offset in src_fields_loc]
         apply_ret = func(pd.Series(apply_row_data, index=src_field_names))
 
         if isinstance(apply_ret, Iterable):
@@ -101,7 +95,7 @@
 
         if not block_types:
             block_types = [BlockType.get_block_type(value) for value in apply_ret]
-            apply_blocks = [[] for idx in range(ret_column_len)]
+            apply_blocks = [[] for _ in range(ret_column_len)]
 
         for idx, value in enumerate(apply_ret):
             apply_blocks[idx].append([value])
@@ -119,7 +113,7 @@ def _apply(blocks, func=None, src_field_names=None,
         ret_columns, block_types
     )
 
-    ret_blocks = [[] for idx in range(len(src_non_operable_blocks) + ret_column_len)]
+    ret_blocks = [[] for _ in range(len(src_non_operable_blocks) + ret_column_len)]
     for idx, bid in enumerate(src_non_operable_blocks):
         ret_blocks[idx] = blocks[bid]
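For context, `_apply` still hands each row to the user function as a pandas Series keyed by the source field names; only the gathering of `apply_row_data` changed, now reading from the pre-flattened blocks. A toy illustration of that per-row contract (the field names and function are invented):

import pandas as pd

src_field_names = ["x0", "x1"]

def func(row: pd.Series):
    # The user function receives one row as a Series indexed by field names.
    return row["x0"] + row["x1"], row["x0"] * row["x1"]

apply_row_data = [3, 4]  # values gathered from flat_blocks for one line
print(func(pd.Series(apply_row_data, index=src_field_names)))  # (7, 12)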
24 changes: 7 additions & 17 deletions python/fate/arch/dataframe/ops/_dimension_scaling.py
@@ -15,11 +15,11 @@
 import copy
 import functools
 from typing import List
-import pandas as pd
 import torch
 from sklearn.utils import resample
 from .._dataframe import DataFrame
 from ..manager.data_manager import DataManager
+from ..manager.block_manager import Block
 from ._compress_block import compress_blocks
 from ._indexer import get_partition_order_by_raw_table
 from ._promote_types import promote_partial_block_types
@@ -270,21 +270,11 @@ def _retrieval(blocks, t: torch.Tensor):
 
 
 def _flatten_partition(kvs, block_num=0):
-    _flattens = []
-    for partition_id, blocks in kvs:
-        lines = len(blocks[0])
-        for idx in range(lines):
-            sample_id = blocks[0][idx]
-            row = []
-            for bid in range(1, block_num):
-                if isinstance(blocks[bid], pd.Index):
-                    row.append(blocks[bid][idx])
-                else:
-                    row.append(blocks[bid][idx].tolist())
-
-            _flattens.append((sample_id, row))
-
-    return _flattens
+    for block_id, blocks in kvs:
+        flat_blocks = [Block.transform_block_to_list(block) for block in blocks]
+        lines = len(flat_blocks[0])
+        for i in range(lines):
+            yield flat_blocks[0][i], [flat_blocks[j][i] for j in range(1, block_num)]
 
 
 def to_blocks(kvs, dm: DataManager = None, partition_mappings: dict = None):
@@ -300,7 +290,7 @@ def to_blocks(kvs, dm: DataManager = None, partition_mappings: dict = None):
 
         if (lid + 1) % dm.block_row_size == 0:
             yield block_id, dm.convert_to_blocks(ret_blocks)
-            ret_blocks = [[] for i in range(dm.block_num)]
+            ret_blocks = [[] for _ in range(dm.block_num)]
             block_id += 1
 
     if ret_blocks[0]:
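`_flatten_partition` becomes a generator: instead of building a `_flattens` list that duplicates the whole partition in memory, it yields one `(sample_id, row)` pair at a time. A self-contained sketch of the same shape, with made-up key-value data and `list()` standing in for `Block.transform_block_to_list`:

def flatten_partition(kvs, block_num=0):
    # Rows are yielded lazily; nothing is accumulated in a second list.
    for _, blocks in kvs:
        flat_blocks = [list(block) for block in blocks]  # stand-in for Block.transform_block_to_list
        for i in range(len(flat_blocks[0])):
            yield flat_blocks[0][i], [flat_blocks[j][i] for j in range(1, block_num)]

kvs = [(0, [["id0", "id1"], [1, 2], [3, 4]])]
print(list(flatten_partition(kvs, block_num=3)))
# [('id0', [1, 3]), ('id1', [2, 4])]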
9 changes: 1 addition & 8 deletions python/fate/arch/dataframe/ops/_encoder.py
@@ -209,14 +209,7 @@ def bucketize(df: DataFrame, boundaries: Union[pd.DataFrame, dict]):
     def _mapper(
         blocks, boundaries_list: list = None, narrow_loc: list = None, dst_bids: list = None, dm: DataManager = None
     ):
-        ret_blocks = []
-        for block in blocks:
-            if isinstance(block, torch.Tensor):
-                ret_blocks.append(block.clone())
-            elif isinstance(block, np.ndarray):
-                ret_blocks.append(block.copy())
-            else:
-                ret_blocks.append(block)
+        ret_blocks = [block for block in blocks]
 
         for i in range(len(ret_blocks), dm.block_num):
             ret_blocks.append([])
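`_mapper` now copies only the outer list and keeps references to the source blocks, dropping the per-block `clone()`/`copy()`. That is safe as long as downstream code rebinds entries of `ret_blocks` rather than mutating the referenced tensors in place, which is presumably what the bucketize path does. A small sketch of the distinction:

import torch

blocks = [torch.zeros(2, 2), torch.ones(2, 2)]

ret_blocks = [block for block in blocks]  # shallow copy: entries share storage
ret_blocks[0] = torch.full((2, 2), 5.0)   # rebinding an entry leaves the source intact
print(blocks[0][0, 0])                    # tensor(0.)

ret_blocks[1][0, 0] = 9.0                 # an in-place write would leak through
print(blocks[1][0, 0])                    # tensor(9.)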
9 changes: 1 addition & 8 deletions python/fate/arch/dataframe/ops/_field_extract.py
@@ -12,7 +12,6 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-import pandas as pd
 from .._dataframe import DataFrame
 
 
@@ -27,13 +26,7 @@ def _extract_columns(src_blocks):
     for src_block_id, dst_block_id, is_changed, block_column_indexes in blocks_loc:
         block = src_blocks[src_block_id]
         if is_changed:
-            """
-            multiple columns, maybe pandas or fate.arch.tensor object
-            """
-            if isinstance(block, pd.DataFrame):
-                extract_blocks[dst_block_id] = block.iloc[:, block_column_indexes]
-            else:
-                extract_blocks[dst_block_id] = block[:, block_column_indexes]
+            extract_blocks[dst_block_id] = block[:, block_column_indexes]
         else:
             extract_blocks[dst_block_id] = block
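With the pandas branch gone, every changed multi-column block is assumed to support 2-D indexing, so a single slice replaces the type dispatch. The slicing pattern, on a toy tensor with invented column indexes:

import torch

block = torch.arange(12).reshape(3, 4)
block_column_indexes = [0, 2]

# One indexing expression now covers every changed block.
print(block[:, block_column_indexes])
# tensor([[ 0,  2],
#         [ 4,  6],
#         [ 8, 10]])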
103 changes: 31 additions & 72 deletions python/fate/arch/dataframe/ops/_indexer.py
@@ -14,9 +14,8 @@
 # limitations under the License.
 #
 import functools
-import pandas as pd
 
-from ..manager import Block, BlockType, DataManager
+from ..manager import Block, DataManager
 from .._dataframe import DataFrame
 
 
@@ -28,17 +27,24 @@ def aggregate_indexer(indexer):
         agg_indexer: key=old_block_id, value=(old_row_id, (new_block_id, new_row_id))
     """
 
     def _aggregate(kvs):
-        aggregate_ret = dict()
+        flat_values = []
         for k, values in kvs:
            old_msg, new_msg = values
-            if old_msg[0] not in aggregate_ret:
-                aggregate_ret[old_msg[0]] = []
-
-            aggregate_ret[old_msg[0]].append([old_msg[1], new_msg])
+            flat_values.append((old_msg[0], [old_msg[1], new_msg]))
 
-        return list(aggregate_ret.items())
+        flat_values.sort()
+        i = 0
+        l = len(flat_values)
+        while i < l:
+            j = i
+            while j < l and flat_values[i][0] == flat_values[j][0]:
+                j += 1
+
+            agg_ret = [flat_values[k][1] for k in range(i, j)]
+            yield flat_values[i][0], agg_ret
+            i = j
 
     agg_indexer = indexer.mapReducePartitions(_aggregate, lambda l1, l2: l1 + l2)
 
@@ -47,13 +53,9 @@ def _aggregate(kvs):
 
 
 def transform_to_table(block_table, block_index, partition_order_mappings):
     def _convert_to_order_index(kvs):
-        order_indexes = []
-
         for block_id, blocks in kvs:
             for _idx, _id in enumerate(blocks[block_index]):
-                order_indexes.append((_id, (block_id, _idx)))
-
-        return order_indexes
+                yield _id, (block_id, _idx)
 
     return block_table.mapPartitions(_convert_to_order_index,
                                      use_previous_behavior=False)
@@ -197,25 +199,26 @@ def loc(df: DataFrame, indexer, target, preserve_order=False):
     agg_indexer = aggregate_indexer(indexer)
 
     if not preserve_order:
-
         def _convert_block(blocks, retrieval_indexes):
             row_indexes = [retrieval_index[0] for retrieval_index in retrieval_indexes]
             return [Block.retrieval_row(block, row_indexes) for block in blocks]
 
         block_table = df.block_table.join(agg_indexer, _convert_block)
     else:
-        def _convert_to_block(kvs):
+        def _convert_to_row(kvs):
             ret_dict = {}
             for block_id, (blocks, block_indexer) in kvs:
                 """
                 block_indexer: row_id, (new_block_id, new_row_id)
                 """
+                flat_blocks = [Block.transform_block_to_list(block) for block in blocks]
+                block_num = len(flat_blocks)
                 for src_row_id, (dst_block_id, dst_row_id) in block_indexer:
                     if dst_block_id not in ret_dict:
                         ret_dict[dst_block_id] = []
 
                     ret_dict[dst_block_id].append(
-                        (dst_row_id, [Block.transform_row_to_raw(block, src_row_id) for block in blocks])
+                        (dst_row_id, [flat_blocks[i][src_row_id] for i in range(block_num)])
                     )
 
             for dst_block_id, value_list in ret_dict.items():
@@ -224,7 +227,7 @@ def _convert_to_block(kvs):
     from ._transformer import transform_list_block_to_frame_block
 
     block_table = df.block_table.join(agg_indexer, lambda lhs, rhs: (lhs, rhs))
-    block_table = block_table.mapReducePartitions(_convert_to_block, _merge_list)
+    block_table = block_table.mapReducePartitions(_convert_to_row, _merge_list)
     block_table = block_table.mapValues(lambda values: [v[1] for v in values])
     block_table = transform_list_block_to_frame_block(block_table, df.data_manager)
 
@@ -247,30 +250,19 @@ def flatten_data(df: DataFrame, key_type="block_id", with_sample_id=True):
 
     def _flatten_with_block_id_key(kvs):
         for block_id, blocks in kvs:
+            flat_blocks = [Block.transform_block_to_list(block) for block in blocks]
+            block_num = len(flat_blocks)
             for row_id in range(len(blocks[0])):
                 if with_sample_id:
                     yield (block_id, row_id), (
-                        blocks[sample_id_index][row_id],
-                        [Block.transform_row_to_raw(block, row_id) for block in blocks]
+                        flat_blocks[sample_id_index][row_id],
+                        [flat_blocks[i][row_id] for i in range(block_num)]
                     )
                 else:
-                    yield (block_id, row_id), [Block.transform_row_to_raw(block, row_id) for block in blocks]
-
-    """
-    def _flatten_with_block_id_key(block_id, blocks):
-        for row_id in range(len(blocks[0])):
-            if with_sample_id:
-                yield (block_id, row_id), (
-                    blocks[sample_id_index][row_id],
-                    [Block.transform_row_to_raw(block, row_id) for block in blocks]
-                )
-            else:
-                yield (block_id, row_id), [Block.transform_row_to_raw(block, row_id) for block in blocks]
-    """
+                    yield (block_id, row_id), [flat_blocks[i][row_id] for i in range(block_num)]
 
     if key_type == "block_id":
         return df.block_table.mapPartitions(_flatten_with_block_id_key, use_previous_behavior=False)
-        # return df.block_table.flatMap(_flatten_with_block_id_key)
     else:
         raise ValueError(f"Not Implement key_type={key_type} of flatten_data.")

Expand Down Expand Up @@ -343,59 +335,32 @@ def _aggregate(kvs):
offset = 0
bid += 1

flat_ret.sort(key=lambda value: value[0])
flat_ret.sort()
i = 0
l = len(flat_ret)
while i < l:
j = i
while j < l and flat_ret[i][0] == flat_ret[j][0]:
j += 1

agg_ret = [(flat_ret[k][1], flat_ret[k][2], flat_ret[k][3], flat_ret[k][4])
for k in range(i, j)]
agg_ret = [flat_ret[k][1:] for k in range(i, j)]
yield flat_ret[i][0], agg_ret

i = j
"""
aggregate_ret = dict()
offset = 0
bid = None
for k, values in kvs:
sample_id, (src_block_id, src_offset) = values
if bid is None:
bid = partition_order_mappings[sample_id]["start_block_id"]
if src_block_id not in aggregate_ret:
aggregate_ret[src_block_id] = []
aggregate_ret[src_block_id].append((src_offset, sample_id, bid, offset))
offset += 1
if offset == data_manager.block_row_size:
bid += 1
offset = 0

return list(aggregate_ret.items())
"""

sample_id_index = data_manager.loc_block(data_manager.schema.sample_id_name, with_offset=False)
block_num = data_manager.block_num

def _convert_to_row(kvs):
ret_dict = {}
for block_id, (blocks, block_indexer) in kvs:
flat_blocks = [Block.transform_block_to_list(block) for block in blocks]
for src_row_id, sample_id, dst_block_id, dst_row_id in block_indexer:
if dst_block_id not in ret_dict:
ret_dict[dst_block_id] = []

row_data = []
for i in range(block_num):
if i == sample_id_index:
row_data.append(sample_id)
elif data_manager.blocks[i].block_type == BlockType.index:
row_data.append(blocks[i][src_row_id])
else:
row_data.append(blocks[i][src_row_id].tolist())
row_data = [flat_blocks[i][src_row_id] for i in range(block_num)]
row_data[sample_id_index] = sample_id

ret_dict[dst_block_id].append(
(dst_row_id, row_data)
Expand All @@ -416,13 +381,7 @@ def _convert_to_frame_block(blocks):
block_table = df.block_table.join(agg_indexer, lambda v1, v2: (v1, v2))
block_table = block_table.mapReducePartitions(_convert_to_row, _merge_list)
block_table = block_table.mapValues(_convert_to_frame_block)
"""
block_table = block_table.mapValues(lambda values: [v[1] for v in values])
from ._transformer import transform_list_block_to_frame_block
block_table = transform_list_block_to_frame_block(block_table, df.data_manager)
"""


return DataFrame(
ctx=df._ctx,
block_table=block_table,
Expand Down
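Both `_aggregate` rewrites in this file use the same replacement for dict-based grouping: collect `(key, value)` pairs flat, sort once, then scan runs of equal keys and yield one group per run, which keeps output keys ordered and avoids a large intermediate dict. A standalone sketch of the pattern:

def sort_group(pairs):
    # Append flat, sort once, then emit one (key, [values]) group per run of equal keys.
    flat_values = sorted(pairs)
    i, l = 0, len(flat_values)
    while i < l:
        j = i
        while j < l and flat_values[j][0] == flat_values[i][0]:
            j += 1
        yield flat_values[i][0], [flat_values[k][1] for k in range(i, j)]
        i = j

print(list(sort_group([(2, "b"), (1, "a"), (2, "c")])))
# [(1, ['a']), (2, ['b', 'c'])]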