diff --git a/python/fate/arch/_standalone.py b/python/fate/arch/_standalone.py index 4fb7e22c9f..09e47b47d6 100644 --- a/python/fate/arch/_standalone.py +++ b/python/fate/arch/_standalone.py @@ -20,7 +20,6 @@ import logging.config import os from typing import Callable, Any, Iterable, Optional -import pickle as c_pickle import shutil import signal import threading @@ -47,9 +46,6 @@ class FederationDataType(object): SPLIT_OBJECT = "split_obj" -serialize = c_pickle.dumps -deserialize = c_pickle.loads - # default message max size in bytes = 1MB DEFAULT_MESSAGE_MAX_SIZE = 1048576 @@ -409,12 +405,6 @@ def map_reduce_partitions_with_index( shutil.rmtree(path, ignore_errors=True) return output - def save_as(self, name, namespace, partitions=None, need_cleanup=True): - if partitions is not None and partitions != self.num_partitions: - return self._repartition(partitions=partitions, need_cleanup=True).copy_as(name, namespace, need_cleanup) - - return self.copy_as(name, namespace, need_cleanup) - def copy_as(self, name, namespace, need_cleanup=True): return self.map_reduce_partitions_with_index( map_partition_op=lambda i, x: x, @@ -433,7 +423,7 @@ def copy_as(self, name, namespace, need_cleanup=True): def _get_env_for_partition(self, p: int, write=False): return _get_env(self._namespace, self._name, str(p), write=write) - def put(self, k_bytes, v_bytes, partitioner: Callable[[bytes, int], int] = None): + def put(self, k_bytes: bytes, v_bytes: bytes, partitioner: Callable[[bytes, int], int] = None): p = partitioner(k_bytes, self._partitions) with self._get_env_for_partition(p, write=True) as env: with env.begin(write=True) as txn: @@ -459,12 +449,11 @@ def put_all(self, kv_list: Iterable[Tuple[bytes, bytes]], partitioner: Callable[ for p, (env, txn) in txn_map.items(): txn.commit() - def get(self, k_bytes: bytes, partitioner: Callable[[bytes, int], int]): + def get(self, k_bytes: bytes, partitioner: Callable[[bytes, int], int]) -> bytes: p = partitioner(k_bytes, self._partitions) with self._get_env_for_partition(p) as env: with env.begin(write=True) as txn: - old_value_bytes = txn.get(k_bytes) - return None if old_value_bytes is None else deserialize(old_value_bytes) + return txn.get(k_bytes) def delete(self, k_bytes: bytes, partitioner: Callable[[bytes, int], int]): p = partitioner(k_bytes, self._partitions) @@ -472,7 +461,7 @@ def delete(self, k_bytes: bytes, partitioner: Callable[[bytes, int], int]): with env.begin(write=True) as txn: old_value_bytes = txn.get(k_bytes) if txn.delete(k_bytes): - return None if old_value_bytes is None else deserialize(old_value_bytes) + return old_value_bytes return None @@ -662,21 +651,9 @@ def _submit_process(self, do_func, process_infos): return results -def _get_splits(obj, max_message_size): - obj_bytes = serialize(obj, protocol=4) - byte_size = len(obj_bytes) - num_slice = (byte_size - 1) // max_message_size + 1 - if num_slice <= 1: - return obj, num_slice - else: - _max_size = max_message_size - kv = [(serialize(i), obj_bytes[slice(i * _max_size, (i + 1) * _max_size)]) for i in range(num_slice)] - return kv, num_slice - - class Federation(object): - def _federation_object_key(self, name: str, tag: str, s_party: Tuple[str, str], d_party: Tuple[str, str]): - return f"{self._session_id}-{name}-{tag}-{s_party[0]}-{s_party[1]}-{d_party[0]}-{d_party[1]}" + def _federation_object_key(self, name: str, tag: str, s_party: Tuple[str, str], d_party: Tuple[str, str]) -> bytes: + return f"{self._session_id}-{name}-{tag}-{s_party[0]}-{s_party[1]}-{d_party[0]}-{d_party[1]}".encode("utf-8") def __init__(self, session: Session, session_id: str, party: Tuple[str, str]): self._session_id = session_id @@ -693,91 +670,46 @@ def __init__(self, session: Session, session_id: str, party: Tuple[str, str]): def destroy(self): self._session.cleanup(namespace=self._session_id, name="*") - # noinspection PyUnusedLocal - def remote(self, v, name: str, tag: str, parties: List[PartyMeta]): - log_str = f"federation.standalone.remote.{name}.{tag}" - - if v is None: - raise ValueError(f"[{log_str}]remote `None` to {parties}") - - LOGGER.debug(f"[{log_str}]remote data, type={type(v)}") - - if isinstance(v, Table): - dtype = FederationDataType.TABLE - LOGGER.debug( - f"[{log_str}]remote " - f"Table(namespace={v.namespace}, name={v.name}, partitions={v.partitions}), dtype={dtype}" - ) - else: - v_splits, num_slice = _get_splits(v, self._max_message_size) - if num_slice > 1: - v = _create_table( - session=self._session, - name=str(uuid.uuid1()), - namespace=self._session_id, - partitions=1, - need_cleanup=True, - error_if_exist=False, - ) - v.put_all(kv_list=v_splits) - dtype = FederationDataType.SPLIT_OBJECT - LOGGER.debug( - f"[{log_str}]remote " - f"Table(namespace={v.namespace}, name={v.name}, partitions={v.partitions}), dtype={dtype}" - ) - else: - LOGGER.debug(f"[{log_str}]remote object with type: {type(v)}") - dtype = FederationDataType.OBJECT - + def push_table(self, table, name: str, tag: str, parties: List[PartyMeta]): for party in parties: _tagged_key = self._federation_object_key(name, tag, self._party, party) - if isinstance(v, Table): - saved_name = str(uuid.uuid1()) - LOGGER.debug( - f"[{log_str}]save Table(namespace={v.namespace}, name={v.name}, partitions={v.partitions}) as " - f"Table(namespace={v.namespace}, name={saved_name}, partitions={v.partitions})" - ) - _v = v.copy_as(name=saved_name, namespace=v.namespace, need_cleanup=False) - self._meta.set_status(party, _tagged_key, (_v.name, _v.namespace, dtype)) - else: - self._meta.set_object(party, _tagged_key, v) - self._meta.set_status(party, _tagged_key, _tagged_key) + saved_name = str(uuid.uuid1()) + _table = table.copy_as(name=saved_name, namespace=table.namespace, need_cleanup=False) + self._meta.set_status(party, _tagged_key, _serialize_tuple_of_str(_table.name, _table.namespace)) - # noinspection PyProtectedMember - def get(self, name: str, tag: str, parties: List[PartyMeta]) -> List: - log_str = f"federation.standalone.get.{name}.{tag}" - LOGGER.debug(f"[{log_str}]") - results = [] + def push_bytes(self, v: bytes, name: str, tag: str, parties: List[PartyMeta]): + for party in parties: + _tagged_key = self._federation_object_key(name, tag, self._party, party) + self._meta.set_object(party, _tagged_key, v) + self._meta.set_status(party, _tagged_key, _tagged_key) + def pull_table(self, name: str, tag: str, parties: List[PartyMeta]) -> List[Table]: + results: List[bytes] = [] for party in parties: _tagged_key = self._federation_object_key(name, tag, party, self._party) results.append(self._meta.wait_status_set(_tagged_key)) rtn = [] for r in results: - if isinstance(r, tuple): - # noinspection PyTypeChecker - table: Table = _load_table(session=self._session, name=r[0], namespace=r[1], need_cleanup=True) - - dtype = r[2] - LOGGER.debug( - f"[{log_str}] got " - f"Table(namespace={table.namespace}, name={table.name}, partitions={table.partitions}), dtype={dtype}" - ) + name, namespace = _deserialize_tuple_of_str(self._meta.get_status(r)) + table: Table = _load_table(session=self._session, name=name, namespace=namespace, need_cleanup=True) + rtn.append(table) + self._meta.ack_status(r) + return rtn - if dtype == FederationDataType.SPLIT_OBJECT: - obj_bytes = b"".join(map(lambda t: t[1], sorted(table.collect(), key=lambda x: x[0]))) - obj = deserialize(obj_bytes) - rtn.append(obj) - else: - rtn.append(table) - else: - obj = self._meta.get_object(r) - if obj is None: - raise EnvironmentError(f"federation get None from {parties} with name {name}, tag {tag}") - rtn.append(obj) - self._meta.ack_object(r) - LOGGER.debug(f"[{log_str}] got object with type: {type(obj)}") + def pull_bytes(self, name: str, tag: str, parties: List[PartyMeta]) -> List[bytes]: + results = [] + for party in parties: + _tagged_key = self._federation_object_key(name, tag, party, self._party) + results.append(self._meta.wait_status_set(_tagged_key)) + + rtn = [] + for r in results: + obj = self._meta.get_object(r) + if obj is None: + raise EnvironmentError(f"object not found: {r}") + rtn.append(obj) + self._meta.ack_object(r) self._meta.ack_status(r) return rtn @@ -1179,30 +1111,29 @@ def __init__(self, session_id, party: Tuple[str, str]) -> None: self.party = party self._env = {} - def wait_status_set(self, key): + def wait_status_set(self, key: bytes) -> bytes: value = self.get_status(key) while value is None: time.sleep(0.1) value = self.get_status(key) - LOGGER.debug("[GET] Got {} type {}".format(key, "Table" if isinstance(value, tuple) else "Object")) - return value + return key - def get_status(self, key): + def get_status(self, key: bytes): return self._get(self._get_status_table_name(self.party), key) - def set_status(self, party: Tuple[str, str], key: str, value): + def set_status(self, party: Tuple[str, str], key: bytes, value: bytes): return self._set(self._get_status_table_name(party), key, value) - def ack_status(self, key): + def ack_status(self, key: bytes): return self._ack(self._get_status_table_name(self.party), key) - def get_object(self, key): + def get_object(self, key: bytes): return self._get(self._get_object_table_name(self.party), key) - def set_object(self, party: Tuple[str, str], key, value): + def set_object(self, party: Tuple[str, str], key: bytes, value: bytes): return self._set(self._get_object_table_name(party), key, value) - def ack_object(self, key): + def ack_object(self, key: bytes): return self._ack(self._get_object_table_name(self.party), key) def _get_status_table_name(self, party: Tuple[str, str]): @@ -1216,23 +1147,20 @@ def _get_env(self, name): self._env[name] = _get_env(self.session_id, name, str(0), write=True) return self._env[name] - def _get(self, name, key): + def _get(self, name: str, key: bytes) -> bytes: env = self._get_env(name) with env.begin(write=False) as txn: - old_value_bytes = txn.get(serialize(key)) - if old_value_bytes is not None: - old_value_bytes = deserialize(old_value_bytes) - return old_value_bytes + return txn.get(key) - def _set(self, name, key, value): + def _set(self, name, key: bytes, value: bytes): env = self._get_env(name) with env.begin(write=True) as txn: - return txn.put(serialize(key), serialize(value)) + return txn.put(key, value) - def _ack(self, name, key): + def _ack(self, name, key: bytes): env = self._get_env(name) with env.begin(write=True) as txn: - txn.delete(serialize(key)) + txn.delete(key) def _hash_namespace_name_to_partition(namespace: str, name: str, partitions: int) -> Tuple[bytes, int]: @@ -1280,12 +1208,7 @@ def get_table_meta(cls, namespace: str, name: str) -> "_TableMeta": with env.begin(write=False) as txn: old_value_bytes = txn.get(k_bytes) if old_value_bytes is not None: - try: - num_partitions = deserialize(old_value_bytes) - old_value_bytes = _TableMeta(num_partitions, 0, 0, 0) - except Exception: - old_value_bytes = _TableMeta.deserialize(old_value_bytes) - + old_value_bytes = _TableMeta.deserialize(old_value_bytes) return old_value_bytes @classmethod @@ -1318,3 +1241,17 @@ def deserialize(cls, serialized_bytes: bytes) -> "_TableMeta": value_serdes_type = int.from_bytes(serialized_bytes[8:12], "big") partitioner_type = int.from_bytes(serialized_bytes[12:16], "big") return cls(num_partitions, key_serdes_type, value_serdes_type, partitioner_type) + + +def _serialize_tuple_of_str(name: str, namespace: str): + name_bytes = name.encode("utf-8") + namespace_bytes = namespace.encode("utf-8") + split_index_bytes = len(name_bytes).to_bytes(4, "big") + return split_index_bytes + name_bytes + namespace_bytes + + +def _deserialize_tuple_of_str(serialized_bytes: bytes): + split_index = int.from_bytes(serialized_bytes[:4], "big") + name = serialized_bytes[4 : 4 + split_index].decode("utf-8") + namespace = serialized_bytes[4 + split_index :].decode("utf-8") + return name, namespace diff --git a/python/fate/arch/computing/standalone/_csession.py b/python/fate/arch/computing/standalone/_csession.py index fcab40b290..9fccc02b14 100644 --- a/python/fate/arch/computing/standalone/_csession.py +++ b/python/fate/arch/computing/standalone/_csession.py @@ -57,11 +57,9 @@ def _load( raise ValueError(f"uri `{uri}` not valid, demo format: standalone://database_path/namespace/name") from e raw_table = self._session.load(name=name, namespace=namespace) - partitions = raw_table.partitions - raw_table = raw_table.save_as( + raw_table = raw_table.copy_as( name=f"{name}_{uuid()}", namespace=namespace, - partitions=partitions, need_cleanup=True, ) table = Table(raw_table) diff --git a/python/fate/arch/computing/standalone/_table.py b/python/fate/arch/computing/standalone/_table.py index 89b67b0cbc..8efc481490 100644 --- a/python/fate/arch/computing/standalone/_table.py +++ b/python/fate/arch/computing/standalone/_table.py @@ -37,6 +37,14 @@ def __init__(self, table: StandaloneTable): num_partitions=table.partitions, ) + @property + def table(self): + return self._table + + @property + def partitions(self): + return self._table.partitions + @property def engine(self): return self._engine @@ -116,25 +124,17 @@ def _count(self): def _reduce(self, func, **kwargs): return self._table.reduce(func) - @property - def partitions(self): - return self._table.partitions - @computing_profile - def save(self, uri: URI, schema, options: dict = None): - if options is None: - options = {} - + def _save(self, uri: URI, schema, options: dict = None): if uri.scheme != "standalone": raise ValueError(f"uri scheme `{uri.scheme}` not supported with standalone backend") try: *database, namespace, name = uri.path_splits() except Exception as e: raise ValueError(f"uri `{uri}` not supported with standalone backend") from e - self._table.save_as( + self._table.copy_as( name=name, namespace=namespace, - partitions=options.get("partitions", self.partitions), need_cleanup=False, ) # TODO: self.schema is a bit confusing here, it set by property assignment directly, not by constructor diff --git a/python/fate/arch/computing/table.py b/python/fate/arch/computing/table.py index 8ed3cba478..44d43c5199 100644 --- a/python/fate/arch/computing/table.py +++ b/python/fate/arch/computing/table.py @@ -411,8 +411,14 @@ def repartition_with(self, other: "KVTable") -> Tuple["KVTable", "KVTable"]: else: return self.repartition(other.num_partitions, other.partitioner_type), other - # def save_as(self, name, namespace, partition=None, options=None): - # return self.rp.save_as(name=name, namespace=namespace, partition=partition, options=options) + def save(self, uri: URI, schema, options: dict = None): + options = options or {} + if (partition := options.get("partition")) is not None and partition != self.num_partitions: + self.repartition(partition)._save(uri, schema, options) + return self._save(uri, schema, options) + + def _save(self, uri: URI, schema, options: dict = None): + raise NotImplementedError(f"{self.__class__.__name__}._save") def _serdes_wrapped_generator(_iter, key_serdes, value_serdes): diff --git a/python/fate/arch/context/_federation.py b/python/fate/arch/context/_federation.py index 6708468c9e..2c50b4534e 100644 --- a/python/fate/arch/context/_federation.py +++ b/python/fate/arch/context/_federation.py @@ -28,6 +28,7 @@ if typing.TYPE_CHECKING: from fate.arch.context import Context + from fate.arch.federation.federation import Federation class GC: @@ -107,7 +108,7 @@ class Parties: def __init__( self, ctx: "Context", - federation: FederationEngine, + federation: "Federation", parties: List[Tuple[int, PartyMeta]], namespace: NS, ) -> None: @@ -148,7 +149,7 @@ def get(self, name: str): def _push( - federation: FederationEngine, + federation: "Federation", name: str, namespace: NS, parties: List[PartyMeta], @@ -202,19 +203,19 @@ def _push_int(federation: FederationEngine, name: str, namespace: NS, parties: L def _pull( ctx: "Context", - federation: FederationEngine, + federation: "Federation", name: str, namespace: NS, parties: List[PartyMeta], ): tag = namespace.federation_tag - raw_values = federation.pull( + buffer_list = federation.pull_bytes( name=name, tag=tag, parties=parties, ) values = [] - for party, buffers in zip(parties, raw_values): + for party, buffers in zip(parties, buffer_list): values.append(_TableRemotePersistentUnpickler.pull(buffers, ctx, federation, name, tag, party)) return values @@ -232,7 +233,7 @@ def __init__(self, key) -> None: class _TableRemotePersistentPickler(pickle.Pickler): def __init__( self, - federation: FederationEngine, + federation: "Federation", name: str, tag: str, parties: List[PartyMeta], @@ -256,7 +257,7 @@ def persistent_id(self, obj: Any) -> Any: if is_table(obj): key = self._get_next_table_key() - self._federation.push(v=obj, name=key, tag=self._tag, parties=self._parties) + self._federation.push_table(table=obj, name=key, tag=self._tag, parties=self._parties) self._table_index += 1 return _TablePersistentId(key) if isinstance(obj, Context): @@ -267,7 +268,7 @@ def persistent_id(self, obj: Any) -> Any: def push( cls, value, - federation: FederationEngine, + federation: "Federation", name: str, tag: str, parties: List[PartyMeta], @@ -275,14 +276,14 @@ def push( with io.BytesIO() as f: pickler = _TableRemotePersistentPickler(federation, name, tag, parties, f) pickler.dump(value) - federation.push(v=f.getvalue(), name=name, tag=tag, parties=parties) + federation.push_bytes(v=f.getvalue(), name=name, tag=tag, parties=parties) class _TableRemotePersistentUnpickler(pickle.Unpickler): def __init__( self, ctx: "Context", - federation: FederationEngine, + federation: "Federation", name: str, tag: str, party: PartyMeta, @@ -297,7 +298,7 @@ def __init__( def persistent_load(self, pid: Any) -> Any: if isinstance(pid, _TablePersistentId): - table = self._federation.pull(pid.key, self._tag, [self._party])[0] + table = self._federation.pull_table(pid.key, self._tag, [self._party])[0] return table if isinstance(pid, _ContextPersistentId): return self._ctx @@ -307,7 +308,7 @@ def pull( cls, buffers, ctx: "Context", - federation: FederationEngine, + federation: "Federation", name: str, tag: str, party: PartyMeta, diff --git a/python/fate/arch/federation/federation.py b/python/fate/arch/federation/federation.py index ca6276c246..56ee992a8e 100644 --- a/python/fate/arch/federation/federation.py +++ b/python/fate/arch/federation/federation.py @@ -13,51 +13,122 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging -from typing import List, Tuple +import typing +from typing import List +from fate.arch.abc import PartyMeta + +if typing.TYPE_CHECKING: + from fate.arch.computing.table import KVTable LOGGER = logging.getLogger(__name__) class Federation: - def _push( + def __init__(self): + self._push_history = set() + self._pull_history = set() + + def _pull_table( + self, + name: str, + tag: str, + parties: List[PartyMeta], + ) -> List["KVTable"]: + raise NotImplementedError(f"pull table is not supported in {self.__class__.__name__}") + + def _pull_bytes( + self, + name: str, + tag: str, + parties: List[PartyMeta], + ) -> List[bytes]: + raise NotImplementedError(f"pull bytes is not supported in {self.__class__.__name__}") + + def _push_table( + self, + table: "KVTable", + name: str, + tag: str, + parties: List[PartyMeta], + ): + raise NotImplementedError(f"push table is not supported in {self.__class__.__name__}") + + def _push_bytes( self, - v, + v: bytes, name: str, tag: str, - parties: List[Tuple[str, str]], + parties: List[PartyMeta], ): - ... + raise NotImplementedError(f"push bytes is not supported in {self.__class__.__name__}") - def push( + def push_table( self, - v, + table: "KVTable", name: str, tag: str, - parties: List[Tuple[str, str]], + parties: List[PartyMeta], ): - self._push( + for party in parties: + if (name, tag, party) in self._push_history: + raise ValueError(f"push table to {parties} with duplicate name and tag: name={name}, tag={tag}") + self._push_history.add((name, tag, party)) + + self._push_table( + table=table, + name=name, + tag=tag, + parties=parties, + ) + + def push_bytes( + self, + v: bytes, + name: str, + tag: str, + parties: List[PartyMeta], + ): + for party in parties: + if (name, tag, party) in self._push_history: + raise ValueError(f"push bytes to {parties} with duplicate name and tag: name={name}, tag={tag}") + self._push_history.add((name, tag, party)) + + self._push_bytes( v=v, name=name, tag=tag, parties=parties, ) - def _pull( + def pull_table( self, name: str, tag: str, - parties: List[Tuple[str, str]], - ) -> List: - raise NotImplementedError("pull is not supported in standalone federation") + parties: List[PartyMeta], + ) -> List["KVTable"]: + for party in parties: + if (name, tag, party) in self._pull_history: + raise ValueError(f"pull table from {party} with duplicate name and tag: name={name}, tag={tag}") + self._pull_history.add((name, tag, party)) + + return self._pull_table( + name=name, + tag=tag, + parties=parties, + ) - def pull( + def pull_bytes( self, name: str, tag: str, - parties: List[Tuple[str, str]], - ) -> List: - return self._pull( + parties: List[PartyMeta], + ) -> List[bytes]: + for party in parties: + if (name, tag, party) in self._pull_history: + raise ValueError(f"pull bytes from {party} with duplicate name and tag: name={name}, tag={tag}") + self._pull_history.add((name, tag, party)) + return self._pull_bytes( name=name, tag=tag, parties=parties, diff --git a/python/fate/arch/federation/standalone/_federation.py b/python/fate/arch/federation/standalone/_federation.py index 7e0ecffe8e..86b85d1b9e 100644 --- a/python/fate/arch/federation/standalone/_federation.py +++ b/python/fate/arch/federation/standalone/_federation.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging -from typing import List, Tuple +from typing import List from fate.arch.abc import PartyMeta @@ -33,6 +33,7 @@ def __init__( party: PartyMeta, parties: List[PartyMeta], ): + super().__init__() LOGGER.debug( f"[federation.standalone]init federation: " f"standalone_session={standalone_session}, " @@ -42,12 +43,7 @@ def __init__( self._session_id = federation_session_id self._federation = RawFederation(standalone_session._session, federation_session_id, party) LOGGER.debug("[federation.standalone]init federation context done") - self._remote_history = set() - self._get_history = set() - # standalone has build in design of table clean - self.get_gc = None - self.remote_gc = None self.local_party = party self.parties = parties @@ -55,34 +51,41 @@ def __init__( def session_id(self) -> str: return self._session_id - def _push( + def _push_table( self, - v, + table: Table, name: str, tag: str, - parties: List[Tuple[str, str]], + parties: List[PartyMeta], + ): + return self._federation.push_table(table=table.table, name=name, tag=tag, parties=parties) + + def _push_bytes( + self, + v: bytes, + name: str, + tag: str, + parties: List[PartyMeta], ): - for party in parties: - if (name, tag, party) in self._remote_history: - raise ValueError(f"remote to {parties} with duplicate tag: {name}.{tag}") - self._remote_history.add((name, tag, party)) + return self._federation.push_bytes(v=v, name=name, tag=tag, parties=parties) - if isinstance(v, Table): - # noinspection PyProtectedMember - v = v._table - return self._federation.remote(v=v, name=name, tag=tag, parties=parties) + def _pull_table( + self, + name: str, + tag: str, + parties: List[PartyMeta], + ) -> List[Table]: + rtn = self._federation.pull_table(name=name, tag=tag, parties=parties) - def _pull( + return [Table(r) if isinstance(r, RawTable) else r for r in rtn] + + def _pull_bytes( self, name: str, tag: str, - parties: List[Tuple[str, str]], - ) -> List: - for party in parties: - if (name, tag, party) in self._get_history: - raise ValueError(f"get from {party} with duplicate tag: {name}.{tag}") - self._get_history.add((name, tag, party)) - rtn = self._federation.get(name=name, tag=tag, parties=parties) + parties: List[PartyMeta], + ) -> List[bytes]: + rtn = self._federation.pull_bytes(name=name, tag=tag, parties=parties) return [Table(r) if isinstance(r, RawTable) else r for r in rtn]