Skip to content

Commit

Permalink
refact: format and makes import from fate.arch rel
Browse files Browse the repository at this point in the history
Signed-off-by: weiwee <wbwmat@gmail.com>
  • Loading branch information
sagewe committed Sep 22, 2022
1 parent 644c8ae commit 433ef47
Show file tree
Hide file tree
Showing 113 changed files with 3,555 additions and 2,226 deletions.
23 changes: 13 additions & 10 deletions python/fate/arch/_standalone.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,9 @@
import lmdb
import numpy as np

from fate_arch.common import Party, file_utils
from fate_arch.common.log import getLogger
from fate_arch.federation import FederationDataType

from .common import Party, file_utils
from .common.log import getLogger
from .federation import FederationDataType

LOGGER = getLogger()

Expand Down Expand Up @@ -341,9 +340,7 @@ def get(self, k):
with self._get_env_for_partition(p) as env:
with env.begin(write=True) as txn:
old_value_bytes = txn.get(k_bytes)
return (
None if old_value_bytes is None else deserialize(old_value_bytes)
)
return None if old_value_bytes is None else deserialize(old_value_bytes)

def delete(self, k):
k_bytes = _k_to_bytes(k=k)
Expand Down Expand Up @@ -487,7 +484,10 @@ def _get_splits(obj, max_message_size):
return obj, num_slice
else:
_max_size = max_message_size
kv = [(i, obj_bytes[slice(i * _max_size, (i + 1) * _max_size)]) for i in range(num_slice)]
kv = [
(i, obj_bytes[slice(i * _max_size, (i + 1) * _max_size)])
for i in range(num_slice)
]
return kv, num_slice


Expand Down Expand Up @@ -667,10 +667,13 @@ def get(self, name: str, tag: str, parties: typing.List[Party]) -> typing.List:
dtype = r[2]
LOGGER.debug(
f"[{log_str}] got "
f"Table(namespace={table.namespace}, name={table.name}, partitions={table.partitions}), dtype={dtype}")
f"Table(namespace={table.namespace}, name={table.name}, partitions={table.partitions}), dtype={dtype}"
)

if dtype == FederationDataType.SPLIT_OBJECT:
obj_bytes = b''.join(map(lambda t: t[1], sorted(table.collect(), key=lambda x: x[0])))
obj_bytes = b"".join(
map(lambda t: t[1], sorted(table.collect(), key=lambda x: x[0]))
)
obj = deserialize(obj_bytes)
rtn.append(obj)
else:
Expand Down
13 changes: 6 additions & 7 deletions python/fate/arch/abc/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@

from fate_arch.abc._gc import GarbageCollectionABC
from fate_arch.abc._address import AddressABC
from fate_arch.abc._computing import CTableABC, CSessionABC
from fate_arch.abc._storage import StorageTableABC, StorageSessionABC, StorageTableMetaABC
from fate_arch.abc._federation import FederationABC
from fate_arch.abc._components import Components, ComponentMeta
from ._address import AddressABC
from ._components import ComponentMeta, Components
from ._computing import CSessionABC, CTableABC
from ._federation import FederationABC
from ._gc import GarbageCollectionABC
from ._storage import StorageSessionABC, StorageTableABC, StorageTableMetaABC
52 changes: 32 additions & 20 deletions python/fate/arch/abc/_computing.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
from abc import ABCMeta
from collections import Iterable

from fate_arch.abc._address import AddressABC
from fate_arch.abc._path import PathABC
from ._address import AddressABC
from ._path import PathABC

__all__ = ["CTableABC", "CSessionABC"]

Expand Down Expand Up @@ -148,7 +148,7 @@ def count(self) -> int:
...

@abc.abstractmethod
def map(self, func) -> 'CTableABC':
def map(self, func) -> "CTableABC":
"""
apply `func` to each data
Expand All @@ -164,7 +164,7 @@ def map(self, func) -> 'CTableABC':
Examples
--------
>>> from fate_arch.session import computing_session
>>> from fate.arch.session import computing_session
>>> a = computing_session.parallelize([('k1', 1), ('k2', 2), ('k3', 3)], include_key=True, partition=2)
>>> b = a.map(lambda k, v: (k, v**2))
>>> list(b.collect())
Expand All @@ -189,7 +189,7 @@ def mapValues(self, func):
Examples
--------
>>> from fate_arch.session import computing_session
>>> from fate.arch.session import computing_session
>>> a = computing_session.parallelize([('a', ['apple', 'banana', 'lemon']), ('b', ['grapes'])], include_key=True, partition=2)
>>> b = a.mapValues(lambda x: len(x))
>>> list(b.collect())
Expand All @@ -198,7 +198,9 @@ def mapValues(self, func):
...

@abc.abstractmethod
def mapPartitions(self, func, use_previous_behavior=True, preserves_partitioning=False):
def mapPartitions(
self, func, use_previous_behavior=True, preserves_partitioning=False
):
"""
apply ``func`` to each partition of table
Expand All @@ -218,7 +220,7 @@ def mapPartitions(self, func, use_previous_behavior=True, preserves_partitioning
Examples
--------
>>> from fate_arch.session import computing_session
>>> from fate.arch.session import computing_session
>>> a = computing_session.parallelize([1, 2, 3, 4, 5], include_key=False, partition=2)
>>> def f(iterator):
... s = 0
Expand Down Expand Up @@ -251,7 +253,7 @@ def mapReducePartitions(self, mapper, reducer, **kwargs):
Examples
--------
>>> from fate_arch.session import computing_session
>>> from fate.arch.session import computing_session
>>> table = computing_session.parallelize([(1, 2), (2, 3), (3, 4), (4, 5)], include_key=False, partition=2)
>>> def _mapper(it):
... r = []
Expand Down Expand Up @@ -286,7 +288,7 @@ def applyPartitions(self, func):
Examples
--------
>>> from fate_arch.session import computing_session
>>> from fate.arch.session import computing_session
>>> a = computing_session.parallelize([1, 2, 3], partition=3, include_key=False)
>>> def f(it):
... r = []
Expand Down Expand Up @@ -319,7 +321,7 @@ def flatMap(self, func):
Examples
--------
>>> from fate_arch.session import computing_session
>>> from fate.arch.session import computing_session
>>> a = computing_session.parallelize([(1, 1), (2, 2)], include_key=True, partition=2)
>>> b = a.flatMap(lambda x, y: [(x, y), (x + 10, y ** 2)])
>>> c = list(b.collect())
Expand Down Expand Up @@ -347,7 +349,7 @@ def reduce(self, func):
Examples
--------
>>> from fate_arch.session import computing_session
>>> from fate.arch.session import computing_session
>>> a = computing_session.parallelize(range(100), include_key=False, partition=4)
>>> assert a.reduce(lambda x, y: x + y) == sum(range(100))
Expand All @@ -370,15 +372,21 @@ def glom(self):
Examples
--------
>>> from fate_arch.session import computing_session
>>> from fate.arch.session import computing_session
>>> a = computing_session.parallelize(range(5), include_key=False, partition=3).glom().collect()
>>> list(a)
[(2, [(2, 2)]), (3, [(0, 0), (3, 3)]), (4, [(1, 1), (4, 4)])]
"""
...

@abc.abstractmethod
def sample(self, *, fraction: typing.Optional[float] = None, num: typing.Optional[int] = None, seed=None):
def sample(
self,
*,
fraction: typing.Optional[float] = None,
num: typing.Optional[int] = None,
seed=None
):
"""
return a sampled subset of this Table.
Parameters
Expand All @@ -399,7 +407,7 @@ def sample(self, *, fraction: typing.Optional[float] = None, num: typing.Optiona
Examples
--------
>>> from fate_arch.session import computing_session
>>> from fate.arch.session import computing_session
>>> x = computing_session.parallelize(range(100), include_key=False, partition=4)
>>> 6 <= x.sample(fraction=0.1, seed=81).count() <= 14
True
Expand Down Expand Up @@ -428,7 +436,7 @@ def filter(self, func):
Examples
--------
>>> from fate_arch.session import computing_session
>>> from fate.arch.session import computing_session
>>> a = computing_session.parallelize([0, 1, 2], include_key=False, partition=2)
>>> b = a.filter(lambda k, v : k % 2 == 0)
>>> list(b.collect())
Expand Down Expand Up @@ -461,7 +469,7 @@ def join(self, other, func):
Examples
--------
>>> from fate_arch.session import computing_session
>>> from fate.arch.session import computing_session
>>> a = computing_session.parallelize([1, 2, 3], include_key=False, partition=2) # [(0, 1), (1, 2), (2, 3)]
>>> b = computing_session.parallelize([(1, 1), (2, 2), (3, 3)], include_key=True, partition=2)
>>> c = a.join(b, lambda v1, v2 : v1 + v2)
Expand Down Expand Up @@ -492,7 +500,7 @@ def union(self, other, func=lambda v1, v2: v1):
Examples
--------
>>> from fate_arch.session import computing_session
>>> from fate.arch.session import computing_session
>>> a = computing_session.parallelize([1, 2, 3], include_key=False, partition=2) # [(0, 1), (1, 2), (2, 3)]
>>> b = computing_session.parallelize([(1, 1), (2, 2), (3, 3)], include_key=True, partition=2)
>>> c = a.union(b, lambda v1, v2 : v1 + v2)
Expand All @@ -518,7 +526,7 @@ def subtractByKey(self, other):
Examples
--------
>>> from fate_arch.session import computing_session
>>> from fate.arch.session import computing_session
>>> a = computing_session.parallelize(range(10), include_key=False, partition=2)
>>> b = computing_session.parallelize(range(5), include_key=False, partition=2)
>>> c = a.subtractByKey(b)
Expand All @@ -544,7 +552,9 @@ class CSessionABC(metaclass=ABCMeta):
"""

@abc.abstractmethod
def load(self, address: AddressABC, partitions, schema: dict, **kwargs) -> typing.Union[PathABC, CTableABC]:
def load(
self, address: AddressABC, partitions, schema: dict, **kwargs
) -> typing.Union[PathABC, CTableABC]:
"""
load a table from given address
Expand All @@ -565,7 +575,9 @@ def load(self, address: AddressABC, partitions, schema: dict, **kwargs) -> typin
...

@abc.abstractmethod
def parallelize(self, data: Iterable, partition: int, include_key: bool, **kwargs) -> CTableABC:
def parallelize(
self, data: Iterable, partition: int, include_key: bool, **kwargs
) -> CTableABC:
"""
create table from iterable data
Expand Down
24 changes: 13 additions & 11 deletions python/fate/arch/abc/_federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
import typing
from abc import ABCMeta

from fate_arch.abc._gc import GarbageCollectionABC
from fate_arch.common import Party
from ..common import Party
from ._gc import GarbageCollectionABC

__all__ = ["FederationABC"]

Expand All @@ -19,10 +19,9 @@ def session_id(self) -> str:
...

@abc.abstractmethod
def get(self, name: str,
tag: str,
parties: typing.List[Party],
gc: GarbageCollectionABC) -> typing.List:
def get(
self, name: str, tag: str, parties: typing.List[Party], gc: GarbageCollectionABC
) -> typing.List:
"""
get objects/tables from ``parties``
Expand All @@ -46,11 +45,14 @@ def get(self, name: str,
...

@abc.abstractmethod
def remote(self, v,
name: str,
tag: str,
parties: typing.List[Party],
gc: GarbageCollectionABC):
def remote(
self,
v,
name: str,
tag: str,
parties: typing.List[Party],
gc: GarbageCollectionABC,
):
"""
remote object/table to ``parties``
Expand Down
1 change: 0 additions & 1 deletion python/fate/arch/abc/_gc.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,5 @@


class GarbageCollectionABC(metaclass=abc.ABCMeta):

def add_gc_action(self, tag: str, obj, method, args_dict):
...
42 changes: 28 additions & 14 deletions python/fate/arch/abc/_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,6 @@
import abc
from typing import Iterable

from fate_arch.common.log import getLogger

LOGGER = getLogger()


class StorageTableMetaABC(metaclass=abc.ABCMeta):
@abc.abstractmethod
Expand All @@ -37,7 +33,15 @@ def query_table_meta(self, filter_fields, query_fields=None):
...

@abc.abstractmethod
def update_metas(self, schema=None, count=None, part_of_data=None, description=None, partitions=None, **kwargs):
def update_metas(
self,
schema=None,
count=None,
part_of_data=None,
description=None,
partitions=None,
**kwargs
):
...

@abc.abstractmethod
Expand Down Expand Up @@ -169,13 +173,15 @@ def meta(self, meta: StorageTableMetaABC):
...

@abc.abstractmethod
def update_meta(self,
schema=None,
count=None,
part_of_data=None,
description=None,
partitions=None,
**kwargs) -> StorageTableMetaABC:
def update_meta(
self,
schema=None,
count=None,
part_of_data=None,
description=None,
partitions=None,
**kwargs
) -> StorageTableMetaABC:
...

@abc.abstractmethod
Expand Down Expand Up @@ -209,8 +215,16 @@ def check_address(self):

class StorageSessionABC(metaclass=abc.ABCMeta):
@abc.abstractmethod
def create_table(self, address, name, namespace, partitions, storage_type=None, options=None,
**kwargs) -> StorageTableABC:
def create_table(
self,
address,
name,
namespace,
partitions,
storage_type=None,
options=None,
**kwargs
) -> StorageTableABC:
...

@abc.abstractmethod
Expand Down
13 changes: 10 additions & 3 deletions python/fate/arch/common/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
from fate_arch.common._types import FederatedMode, FederatedCommunicationType, EngineType, CoordinationProxyService, \
CoordinationCommunicationProtocol
from fate_arch.common._types import BaseType, Party, DTable
from ._types import (
BaseType,
CoordinationCommunicationProtocol,
CoordinationProxyService,
DTable,
EngineType,
FederatedCommunicationType,
FederatedMode,
Party,
)
Loading

0 comments on commit 433ef47

Please sign in to comment.