Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/refact/2.0.0-beta/tensor-over-storage-temp' into feature-2.0.0-beta-flow_upload
Browse files Browse the repository at this point in the history
  • Loading branch information
zhihuiwan committed Jun 19, 2023
2 parents 3c82fe0 + 87ec298 commit dd9d2f7
Show file tree
Hide file tree
Showing 38 changed files with 1,118 additions and 1,209 deletions.
114 changes: 0 additions & 114 deletions python/fate/arch/computing/_address.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,33 +16,6 @@
from fate.interface import Address


class StandaloneAddress(Address):
    """Address record holding ``home``, ``name``, ``namespace`` and ``storage_type``.

    Every constructor argument is optional and is stored unchanged on the
    instance under the same name.
    """

    def __init__(self, home=None, name=None, namespace=None, storage_type=None):
        self.home, self.name = home, name
        self.namespace, self.storage_type = namespace, storage_type

    def __hash__(self):
        # All four stored fields take part in the hash.
        fields = (self.home, self.name, self.namespace, self.storage_type)
        return hash(fields)

    def __str__(self):
        # Only name and namespace are shown; home and storage_type are omitted.
        return f"StandaloneAddress(name={self.name}, namespace={self.namespace})"

    def __repr__(self):
        return self.__str__()

    @property
    def connector(self):
        """Return a new dict with the single key ``"home"``."""
        return dict(home=self.home)


class EggRollAddress(Address):
def __init__(self, home=None, name=None, namespace=None):
self.name = name
Expand All @@ -58,10 +31,6 @@ def __str__(self):
def __repr__(self):
return self.__str__()

@property
def connector(self):
return {"home": self.home}


class HDFSAddress(Address):
def __init__(self, name_node=None, path=None):
Expand All @@ -77,78 +46,6 @@ def __str__(self):
def __repr__(self):
return self.__str__()

@property
def connector(self):
return {"name_node": self.name_node}


class PathAddress(Address):
    """Address identified solely by a ``path`` value.

    ``path`` is required and is stored unchanged on the instance.
    """

    def __init__(self, path):
        self.path = path

    def __hash__(self):
        # The hash is delegated entirely to the stored path object.
        path = self.path
        return path.__hash__()

    def __str__(self):
        return "PathAddress(path={})".format(self.path)

    def __repr__(self):
        return self.__str__()


class ApiAddress(Address):
    """Address describing an API call: ``method``, ``url``, ``header`` and ``body``.

    ``method`` defaults to ``"POST"``. A falsy ``header`` or ``body``
    (including the default ``None``) is replaced by a fresh empty dict.
    """

    def __init__(self, method="POST", url=None, header=None, body=None):
        self.method, self.url = method, url
        # ``or`` builds a new dict per instance, so no default is shared.
        self.header = header or {}
        self.body = body or {}

    def __hash__(self):
        # Only method and url take part in the hash; header and body do not.
        key = (self.method, self.url)
        return hash(key)

    def __str__(self):
        return "ApiAddress(url={})".format(self.url)

    def __repr__(self):
        return self.__str__()


class MysqlAddress(Address):
    """Address record holding ``user``, ``passwd``, ``host``, ``port``, ``db`` and ``name``.

    Every constructor argument is optional and is stored unchanged on the
    instance under the same name.
    """

    def __init__(self, user=None, passwd=None, host=None, port=None, db=None, name=None):
        self.user, self.passwd = user, passwd
        self.host, self.port = host, port
        self.db, self.name = db, name

    def __hash__(self):
        # Credentials (user, passwd) are not part of the hash.
        key = (self.host, self.port, self.db, self.name)
        return hash(key)

    def __str__(self):
        # Only db and name are shown; credentials and host/port are omitted.
        return "MysqlAddress(db={}, name={})".format(self.db, self.name)

    def __repr__(self):
        return self.__str__()

    @property
    def connector(self):
        """Return a new dict with ``user``, ``passwd``, ``host``, ``port`` and ``db``.

        ``name`` is not included. The stored ``passwd`` is returned as-is.
        """
        connection_fields = ("user", "passwd", "host", "port", "db")
        return {field: getattr(self, field) for field in connection_fields}


class HiveAddress(Address):
def __init__(
Expand Down Expand Up @@ -178,17 +75,6 @@ def __str__(self):
def __repr__(self):
return self.__str__()

@property
def connector(self):
return {
"host": self.host,
"port": self.port,
"username": self.username,
"password": self.password,
"auth_mechanism": self.auth_mechanism,
"database": self.database,
}


class LinkisHiveAddress(Address):
def __init__(
Expand Down
4 changes: 2 additions & 2 deletions python/fate/arch/computing/standalone/_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,9 @@ def copy(self):

@computing_profile
def save(self, address, partitions, schema, **kwargs):
from .._address import StandaloneAddress
from .._address import EggRollAddress

if isinstance(address, StandaloneAddress):
if isinstance(address, EggRollAddress):
self._table.save_as(
name=address.name,
namespace=address.namespace,
Expand Down
2 changes: 0 additions & 2 deletions python/fate/arch/context/_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
from ._cipher import CipherKit
from ._federation import GC, Parties, Party
from ._namespace import NS, default_ns
from .io.kit import IOKit
from .metric import MetricsWrap

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -60,7 +59,6 @@ def __init__(

self.metrics = MetricsWrap(metrics_handler)
self.cipher: CipherKit = CipherKit(device)
self._io_kit: IOKit = IOKit()
self._role_to_parties = None
self._gc = GC()
self._is_destroyed = False
Expand Down
16 changes: 5 additions & 11 deletions python/fate/arch/context/io/data/csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,12 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ....unify import URI
from .df import Dataframe


class CSVReader:
def __init__(self, ctx, uri: URI, metadata: dict) -> None:
def __init__(self, ctx, path, metadata: dict) -> None:
self.ctx = ctx
self.uri = uri
self.path = path
self.metadata = metadata

def read_dataframe(self):
Expand All @@ -34,17 +32,13 @@ def read_dataframe(self):
if k in parameter_keys:
kwargs[k] = v

dataframe_reader = dataframe.CSVReader(**kwargs).to_frame(self.ctx, self.uri.path)
# s_df = dataframe.serialize(self.ctx, dataframe_reader)
# dataframe_reader = dataframe.deserialize(self.ctx, s_df)
return Dataframe(dataframe_reader, dataframe_reader.shape[1], dataframe_reader.shape[0])
return dataframe.CSVReader(**kwargs).to_frame(self.ctx, self.path)


class CSVWriter:
def __init__(self, ctx, name: str, uri: URI, metadata: dict) -> None:
self.name = name
def __init__(self, ctx, path, metadata: dict) -> None:
self.ctx = ctx
self.uri = uri
self.path = path
self.metadata = metadata

def write_dataframe(self, df):
Expand Down
14 changes: 0 additions & 14 deletions python/fate/arch/context/io/data/dataframe.py

This file was deleted.

25 changes: 0 additions & 25 deletions python/fate/arch/context/io/data/df.py

This file was deleted.

86 changes: 8 additions & 78 deletions python/fate/arch/context/io/data/eggroll.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,67 +13,20 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from ....unify import EggrollURI


class EggrollDataFrameWriter:
    """Saves a serialized dataframe and a one-row ``schema`` meta table.

    The given ``uri`` is wrapped in ``EggrollMetaURI``, which supplies the
    name/namespace pairs for the data table and the meta table.
    """

    def __init__(self, ctx, uri: EggrollURI, metadata: dict) -> None:
        self.ctx = ctx
        self.uri = EggrollMetaURI(uri)
        self.metadata = metadata

    def write_dataframe(self, df):
        """Serialize ``df``, save it, then save its schema as a meta table.

        ``self.metadata`` is forwarded as keyword arguments to both ``save``
        calls. Returns ``None``.
        """
        from fate.arch import dataframe
        from fate.arch.computing._address import EggRollAddress

        target = self.uri
        options = self.metadata

        serialized = dataframe.serialize(self.ctx, df)

        # The same dict object is handed to ``save`` and then stored in the
        # meta table below; presumably ``save`` fills it in place -- TODO confirm.
        schema = {}
        data_address = EggRollAddress(
            name=target.get_data_name(),
            namespace=target.get_data_namespace(),
        )
        # "num_partitions" from metadata wins; otherwise keep the table's own count.
        partition_count = int(options.get("num_partitions", serialized.partitions))
        serialized.save(
            address=data_address,
            partitions=partition_count,
            schema=schema,
            **options,
        )

        # save meta
        schema_rows = [("schema", schema)]
        meta_table = self.ctx.computing.parallelize(schema_rows, partition=1, include_key=True)
        meta_address = EggRollAddress(
            name=target.get_meta_name(),
            namespace=target.get_meta_namespace(),
        )
        meta_table.save(
            address=meta_address,
            partitions=1,
            schema={},
            **options,
        )


class EggrollDataFrameReader:
    """Loads a table via ``load_table`` and deserializes it into a dataframe.

    The given ``uri`` is wrapped in ``EggrollMetaURI`` before being stored.
    """

    def __init__(self, ctx, uri: EggrollURI, metadata: dict) -> None:
        self.ctx = ctx
        self.uri = EggrollMetaURI(uri)
        self.metadata = metadata

    def read_dataframe(self):
        """Return a ``Dataframe`` wrapping the deserialized table.

        The wrapper receives the frame followed by ``shape[1]`` and
        ``shape[0]``, in that order.
        """
        from fate.arch import dataframe

        from .df import Dataframe

        loaded = load_table(self.ctx, self.uri, self.metadata)
        frame = dataframe.deserialize(self.ctx, loaded)
        second_dim = frame.shape[1]
        first_dim = frame.shape[0]
        return Dataframe(frame, second_dim, first_dim)


class EggrollRawTableReader:
def __init__(self, ctx, uri: EggrollURI, metadata: dict) -> None:
def __init__(self, ctx, namespace, name, metadata: dict) -> None:
self.ctx = ctx
self.uri = EggrollMetaURI(uri)
self.name = name
self.namespace = namespace
self.metadata = metadata

def read_dataframe(self):
import inspect

from fate.arch import dataframe

from .df import Dataframe

table = load_table(self.ctx, self.uri, self.metadata)
table = load_table(self.ctx, self.namespace, self.name, self.metadata)

kwargs = {}
p = inspect.signature(dataframe.RawTableReader.__init__).parameters
Expand All @@ -82,45 +35,22 @@ def read_dataframe(self):
if k in parameter_keys:
kwargs[k] = v

dataframe_reader = dataframe.RawTableReader(**kwargs).to_frame(self.ctx, table)
return Dataframe(dataframe_reader, dataframe_reader.shape[1], dataframe_reader.shape[0])


class EggrollMetaURI:
    """Derives data-table and meta-table coordinates from one wrapped URI.

    Both tables use the wrapped URI's namespace; the meta table's name is
    the data table's name with ``.meta`` appended.
    """

    def __init__(self, uri: EggrollURI) -> None:
        self.uri = uri

    def get_data_namespace(self):
        """Return the wrapped URI's namespace."""
        wrapped = self.uri
        return wrapped.namespace

    def get_data_name(self):
        """Return the wrapped URI's name."""
        wrapped = self.uri
        return wrapped.name

    def get_meta_namespace(self):
        """Return the wrapped URI's namespace (same as the data table's)."""
        wrapped = self.uri
        return wrapped.namespace

    def get_meta_name(self):
        """Return the wrapped URI's name followed by ``.meta``."""
        return "{}.meta".format(self.uri.name)
return dataframe.RawTableReader(**kwargs).to_frame(self.ctx, table)


def load_table(ctx, uri: EggrollMetaURI, metadata: dict):
def load_table(ctx, namespace, name, metadata: dict):
from fate.arch.computing._address import EggRollAddress

meta_name = f"{name}.meta"
meta_key, meta = list(
ctx.computing.load(
address=EggRollAddress(name=uri.get_meta_name(), namespace=uri.get_meta_namespace()),
address=EggRollAddress(name=meta_name, namespace=namespace),
partitions=1,
schema={},
**metadata,
).collect()
)[0]
assert meta_key == "schema"
num_partitions = metadata.get("num_partitions")
table = ctx.computing.load(
address=EggRollAddress(name=uri.get_data_name(), namespace=uri.get_data_namespace()),
partitions=num_partitions,
schema=meta,
**metadata,
)

return table
Loading

0 comments on commit dd9d2f7

Please sign in to comment.