dataframe support setitem of paillier tensor, add psi
Signed-off-by: mgqa34 <mgq3374541@163.com>
Signed-off-by: weiwee <wbwmat@gmail.com>
mgqa34 authored and sagewe committed Aug 9, 2023
1 parent 35c7d9d commit 2ff5aaf
Showing 16 changed files with 616 additions and 246 deletions.
106 changes: 20 additions & 86 deletions python/fate/arch/dataframe/_dataframe.py
@@ -21,7 +21,6 @@
import pandas as pd

from .manager import DataManager, Schema
from .ops import aggregate_indexer, get_partition_order_mappings
from fate.arch.tensor import DTensor


@@ -153,7 +152,8 @@ def as_tensor(self, dtype=None):
"""
from .ops._transformer import transform_to_tensor

return transform_to_tensor(self._block_table, self._data_manager, dtype)
return transform_to_tensor(self._block_table, self._data_manager,
dtype, partition_order_mappings=self.partition_order_mappings)
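A hedged usage sketch of the updated path; df is an assumed existing DataFrame, and the note on partition alignment is an inference from the keyword's name rather than from this diff:

# Sketch: df is an assumed fate DataFrame instance.
t = df.as_tensor(dtype="float32")
# transform_to_tensor now also receives df.partition_order_mappings,
# presumably so the resulting DTensor can align its shards with the
# frame's partition layout.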

def as_pd_df(self) -> "pd.DataFrame":
from .ops._transformer import transform_to_pandas_dataframe
@@ -376,11 +376,13 @@ def __cmp_operate(self, op, other) -> "DataFrame":

return cmp_operate(self, other, op)

"""
def __getattr__(self, attr):
if attr not in self._data_manager.schema.columns:
raise ValueError(f"DataFrame does not have attribute {attr}")
return self.__getitem__(attr)
"""

def __setattr__(self, key, value):
property_attr_mapping = dict(block_table="_block_table", data_manager="_data_manager")
@@ -497,92 +499,24 @@ def get_indexer(self, target):
return indexer

def loc(self, indexer, target="sample_id", preserve_order=False):
self_indexer = self.get_indexer(target)
if preserve_order:
indexer = self_indexer.join(indexer, lambda lhs, rhs: (lhs, rhs))
else:
indexer = self_indexer.join(indexer, lambda lhs, rhs: (lhs, lhs))

if indexer.count() == 0:
return self.empty_frame()

agg_indexer = aggregate_indexer(indexer)

if not preserve_order:

def _convert_block(blocks, retrieval_indexes):
row_indexes = [retrieval_index[0] for retrieval_index in retrieval_indexes]
return [block[row_indexes] for block in blocks]
from .ops._indexer import loc
return loc(self, indexer, target=target, preserve_order=preserve_order)

block_table = self._block_table.join(agg_indexer, _convert_block)
else:
def iloc(self, indexes, return_new_indexer=False):
"""
indexes: table, row: (key=random_key, value=[(partition_id, offset)])
"""
from .ops._indexer import iloc
return iloc(self, indexes, return_new_indexer)

def _convert_to_block(kvs):
ret_dict = {}
for block_id, (blocks, block_indexer) in kvs:
"""
block_indexer: row_id, (new_block_id, new_row_id)
"""
for src_row_id, (dst_block_id, dst_row_id) in block_indexer:
if dst_block_id not in ret_dict:
ret_dict[dst_block_id] = []

ret_dict[dst_block_id].append(
(dst_row_id,
[
block[src_row_id] if isinstance(block, pd.Index) else block[src_row_id].tolist()
for block in blocks
]
)
)

for dst_block_id, value_list in ret_dict.items():
yield dst_block_id, sorted(value_list)

def _merge_list(lhs, rhs):
if not lhs:
return rhs
if not rhs:
return lhs

l_len = len(lhs)
r_len = len(rhs)
ret = [[] for i in range(l_len + r_len)]
i, j, k = 0, 0, 0
while i < l_len and j < r_len:
if lhs[i][0] < rhs[j][0]:
ret[k] = lhs[i]
i += 1
else:
ret[k] = rhs[j]
j += 1

k += 1

while i < l_len:
ret[k] = lhs[i]
i += 1
k += 1

while j < r_len:
ret[k] = rhs[j]
j += 1
k += 1

return ret

from .ops._transformer import transform_list_block_to_frame_block

block_table = self._block_table.join(agg_indexer, lambda lhs, rhs: (lhs, rhs))
block_table = block_table.mapReducePartitions(_convert_to_block, _merge_list)
block_table = block_table.mapValues(lambda values: [v[1] for v in values])
block_table = transform_list_block_to_frame_block(block_table, self._data_manager)

partition_order_mappings = get_partition_order_mappings(block_table)
return DataFrame(self._ctx, block_table, partition_order_mappings, self._data_manager.duplicate())

def iloc(self, indexes):
...
def loc_with_sample_id_replacement(self, indexer):
"""
indexer: table,
row: (key=random_key,
value=((src_partition_id, src_offset), [(sample_id, dst_partition_id, dst_offset) ...])
"""
from .ops._indexer import loc_with_sample_id_replacement
return loc_with_sample_id_replacement(self, indexer)
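
The indexer layouts these delegated methods consume are spelled out in their docstrings. A hedged sketch of those shapes, with plain Python dicts standing in for fate tables (keys and ids are illustrative):

# iloc: random key -> list of (partition_id, offset) rows to pull
iloc_indexes = {
    "rk-0": [(0, 3), (1, 7)],  # row 3 of partition 0, row 7 of partition 1
}

# loc_with_sample_id_replacement: source position -> destination placements;
# with replacement sampling, one source row may land in several slots
replacement_indexer = {
    "rk-1": ((0, 3), [("sample-a", 1, 0), ("sample-b", 2, 5)]),
}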

def copy(self) -> "DataFrame":
return DataFrame(
2 changes: 1 addition & 1 deletion python/fate/arch/dataframe/manager/__init__.py
@@ -1,3 +1,3 @@
from .schema_manager import Schema
from .data_manager import DataManager
from .block_manager import BlockType
from .block_manager import BlockType, Block
82 changes: 79 additions & 3 deletions python/fate/arch/dataframe/manager/block_manager.py
@@ -23,6 +23,8 @@
from enum import Enum
from typing import Union, Tuple, List, Dict
from .schema_manager import SchemaManager
from fate.arch.tensor.phe._tensor import PHETensor
from fate_utils.paillier import FixedpointPaillierVector


class BlockType(str, Enum):
@@ -32,6 +34,7 @@ class BlockType(str, Enum):
float64 = "float64"
bool = "bool"
index = "index"
phe_tensor = "phe_tensor"
np_object = "np_object"

@staticmethod
@@ -58,13 +61,16 @@ def __lt__(self, other):
return True

if self == BlockType.int32:
return other not in [BlockType.bool, BlockType.int32]
return other not in [BlockType.bool, BlockType.int32, BlockType]

if self == BlockType.int64:
return other not in [BlockType.bool, BlockType.int32, BlockType.int64]

if self == BlockType.float32:
return other == BlockType.float64
return other in [BlockType.float64, BlockType.phe_tensor, BlockType.np_object]

if self == BlockType.float64:
return other in [BlockType.phe_tensor, BlockType.np_object]

return False
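
Taken together, these comparisons define a widening order roughly bool < int32 < int64 < float32 < float64 < phe_tensor/np_object. A minimal sketch of driving type promotion with it; promote is a hypothetical helper, not part of this commit:

def promote(current, incoming):
    # Pick the wider BlockType under the __lt__ ordering above, e.g. when a
    # setitem writes PHE ciphertexts into a float64 column.
    return incoming if current < incoming else current

# promote(BlockType.int64, BlockType.float32)      -> BlockType.float32
# promote(BlockType.float64, BlockType.phe_tensor) -> BlockType.phe_tensor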

@@ -76,6 +82,8 @@ def __gt__(self, other):

@staticmethod
def get_block_type(data_type):
if isinstance(data_type, PHETensor):
return BlockType.phe_tensor
if hasattr(data_type, "dtype"):
data_type = data_type.dtype
if hasattr(data_type, "name"):
@@ -163,7 +171,8 @@ def derive_block(self, field_indexes) -> Tuple["Block", bool, list]:
src_field_indexes.append(src_field_index)
dst_field_indexes.append(dst_field_index)

new_block = type(self)(dst_field_indexes)
new_block = copy.deepcopy(self)
# new_block = type(self)(dst_field_indexes)
new_block.should_compress = self._should_compress

# TODO: can be optimized as sub_field_indexes is ordered, but this is not a bottleneck
@@ -186,6 +195,9 @@ def is_numeric(self):
BlockType.float32, BlockType.float64
}

def is_phe_tensor(self):
return self._block_type == BlockType.phe_tensor

def to_dict(self):
return dict(
block_type=json.dumps(self._block_type),
@@ -218,6 +230,8 @@ def get_block_by_type(block_type):
return BoolBlock
elif block_type == block_type.index:
return IndexBlock
elif block_type == block_type.phe_tensor:
return PHETensorBlock
else:
return NPObjectBlock

@@ -234,6 +248,22 @@ def convert_block_type(self, block_type):

return converted_block

@classmethod
def retrieval_row(cls, block, indexes):
if isinstance(block, FixedpointPaillierVector):
return block.slice_indexes(indexes)
else:
return block[indexes]

@classmethod
def transform_row_to_raw(cls, block, index):
if isinstance(block, pd.Index):
return block[index]
elif isinstance(block, FixedpointPaillierVector):
return block.slice_indexes([index])
else:
return block[index].tolist()
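
A hedged sketch of the dispatch above; the numpy and pandas blocks are illustrative stand-ins, and the paillier branch is described rather than run since it needs encrypted data:

import numpy as np
import pandas as pd

dense_block = np.arange(12).reshape(4, 3)
index_block = pd.Index(["a", "b", "c", "d"])

Block.retrieval_row(dense_block, [0, 2])    # ndarray fancy indexing: rows 0 and 2
Block.transform_row_to_raw(dense_block, 1)  # -> [3, 4, 5], via tolist()
Block.transform_row_to_raw(index_block, 1)  # -> "b"; pd.Index rows stay scalar
# A FixedpointPaillierVector block would instead go through slice_indexes(...),
# keeping ciphertexts in their packed vector form.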


class Int32Block(Block):
def __init__(self, *args, **kwargs):
@@ -310,6 +340,52 @@ def convert_block(block):
return pd.Index(block, dtype=str)


class PHETensorBlock(Block):
def __init__(self, *args, **kwargs):
kwargs["should_compress"] = False

super(PHETensorBlock, self).__init__(*args, **kwargs)
self._block_type = BlockType.phe_tensor
self._pk = None
self._evaluator = None
self._coder = None
self._dtype = None
self._device = None

def set_extra_kwargs(self, pk, evaluator, coder, dtype, device):
self._pk = pk
self._evaluator = evaluator
self._coder = coder
self._dtype = dtype
self._device = device

@staticmethod
def convert_block(block):
return block

def convert_to_phe_tensor(self, block, shape):
if isinstance(block, PHETensor):
return block

if isinstance(block, list):
return block[0].cat(block[1:])

return PHETensor(pk=self._pk,
evaluator=self._evaluator,
coder=self._coder,
shape=shape,
data=block,
dtype=self._dtype)

@property
def device(self):
return self._device

@property
def dtype(self):
return self._dtype
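
A hedged sketch of wiring a PHETensorBlock before use; pk, evaluator and coder are assumed to come from a fate PHE key/context setup and are not constructed here:

blk = PHETensorBlock(field_indexes)  # field_indexes: assumed column positions
blk.set_extra_kwargs(pk=pk, evaluator=evaluator, coder=coder,
                     dtype="float32", device=None)  # dtype/device values assumed

# convert_to_phe_tensor then has three cases: a PHETensor passes through
# unchanged; a list of PHETensor chunks is concatenated via cat(); anything
# else (a raw ciphertext vector) is wrapped into a PHETensor with the
# captured pk/evaluator/coder/dtype.
# tensor = blk.convert_to_phe_tensor(payload, shape=(n_rows, n_cols))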


class NPObjectBlock(Block):
def __init__(self, *args, **kwargs):
super(NPObjectBlock, self).__init__(*args, **kwargs)
7 changes: 5 additions & 2 deletions python/fate/arch/dataframe/manager/schema_manager.py
@@ -447,8 +447,11 @@ def init_field_types(self, label_type="float32", weight_type="float32", dtype="f
self._type_mapping[column] = dtype.get(column, default_type)

def init_name_mapping(self):
offset = 1
self._name_offset_mapping[self._schema.sample_id_name] = 0
offset = 0

if self._schema.sample_id_name:
offset = 1
self._name_offset_mapping[self._schema.sample_id_name] = 0

if self._schema.match_id_name:
self._name_offset_mapping[self._schema.match_id_name] = offset
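
With the new guard, offset 0 is reserved for sample_id only when the schema actually has one. A worked illustration of the resulting name-to-offset mapping (column names assumed):

# schema with sample_id:    {"sid": 0, "mid": 1, "y": 2, "x0": 3, ...}
# schema without sample_id: {"mid": 0, "y": 1, "x0": 2, ...}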
10 changes: 0 additions & 10 deletions python/fate/arch/dataframe/ops/__init__.py
@@ -12,13 +12,3 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ._indexer import (
aggregate_indexer,
get_partition_order_mappings
)


__all__ = [
"aggregate_indexer",
"get_partition_order_mappings"
]