Skip to content

Commit

Permalink
add component io meta
Browse files Browse the repository at this point in the history
Signed-off-by: weiwee <wbwmat@gmail.com>
  • Loading branch information
sagewe committed Jun 19, 2023
1 parent 87ec298 commit 3552824
Show file tree
Hide file tree
Showing 13 changed files with 206 additions and 82 deletions.
39 changes: 35 additions & 4 deletions python/fate/components/core/component_desc/_component.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,23 +152,54 @@ def _dict(self):
)
)

def dump_yaml(self, stream=None):
def _io_dict(self):
    """Summarise the declared input/output artifacts as an IO type spec.

    Returns:
        A ``ComponentIOArtifactsTypeSpec`` whose ``inputs`` list the declared
        data and model inputs and whose ``outputs`` list the declared data,
        model and metric outputs, one ``ComponentIOArtifactTypeSpec`` each.
    """
    from fate.components.core.spec.component import (
        ComponentIOArtifactsTypeSpec,
        ComponentIOArtifactTypeSpec,
        ComponentIOInputsArtifactsTypeSpec,
        ComponentIOOutputsArtifactsTypeSpec,
    )

    def _describe(artifact):
        # ``get_type()`` yields the artifact wrapper class; its ``type``
        # attribute carries the name/path/uri metadata copied into the spec.
        artifact_type = artifact.get_type().type
        return ComponentIOArtifactTypeSpec(
            type_name=artifact_type.type_name,
            path_type=artifact_type.path_type,
            uri_types=artifact_type.uri_types,
            is_multi=artifact.multi,
        )

    def _describe_all(declared):
        return [_describe(artifact) for artifact in declared.values()]

    declared_artifacts = self.artifacts
    inputs_spec = ComponentIOInputsArtifactsTypeSpec(
        data=_describe_all(declared_artifacts.data_inputs),
        model=_describe_all(declared_artifacts.model_inputs),
    )
    outputs_spec = ComponentIOOutputsArtifactsTypeSpec(
        data=_describe_all(declared_artifacts.data_outputs),
        model=_describe_all(declared_artifacts.model_outputs),
        metric=_describe_all(declared_artifacts.metric_outputs),
    )
    return ComponentIOArtifactsTypeSpec(inputs=inputs_spec, outputs=outputs_spec)

def dump_io_yaml(self, stream=None):
    """Write the component's IO artifact spec as YAML.

    The document is built from ``self._flatten_stages()._io_dict().dict()``;
    only that IO spec is emitted.

    Args:
        stream: Writable text stream to receive the YAML. When ``None`` the
            output is collected in an in-memory buffer instead.

    Returns:
        The YAML text when ``stream`` was ``None``; otherwise ``None``.
    """
    from io import StringIO

    import ruamel.yaml

    # Without a caller-supplied stream we buffer in memory and return the text.
    return_text = stream is None
    if return_text:
        stream = StringIO()
    yaml = ruamel.yaml.YAML()
    yaml.indent(mapping=2, sequence=4, offset=2)
    yaml.dump(self._flatten_stages()._io_dict().dict(), stream=stream)
    if return_text:
        return stream.getvalue()
    return None

def dump_simplified_yaml(self, stream=None):
def dump_yaml(self, stream=None):
from io import StringIO

import ruamel.yaml

spec = self.dict()
inefficient = False
if stream is None:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from ._base_type import ArtifactDescribe, ArtifactType
from ._base_type import ArtifactDescribe, _ArtifactType
from .data import (
data_directory_input,
data_directory_inputs,
Expand All @@ -24,7 +24,7 @@
)

__all__ = [
"ArtifactType",
"_ArtifactType",
"ArtifactDescribe",
"json_model_input",
"json_model_inputs",
Expand Down
24 changes: 12 additions & 12 deletions python/fate/components/core/component_desc/artifacts/_base_type.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from typing import Dict, Generic, List, TypeVar, Union

from fate.components.core.essential import Role, Stage
from fate.components.core.essential import ArtifactType, Role, Stage
from fate.components.core.spec.artifact import URI, Metadata
from fate.components.core.spec.component import ArtifactSpec
from fate.components.core.spec.task import (
Expand All @@ -9,15 +9,15 @@
)


class ArtifactType:
type: str
class _ArtifactType:
type: ArtifactType

@classmethod
def _load(cls, uri: URI, metadata: Metadata) -> "ArtifactType":
def _load(cls, uri: URI, metadata: Metadata) -> "_ArtifactType":
raise NotImplementedError(f"load artifact from spec `{cls}`")

@classmethod
def load_input(cls, spec: ArtifactInputApplySpec) -> "ArtifactType":
def load_input(cls, spec: ArtifactInputApplySpec) -> "_ArtifactType":
return cls._load(spec.get_uri(), spec.metadata)

@classmethod
Expand Down Expand Up @@ -50,12 +50,12 @@ def is_active_for(self, stage: Stage, role: Role):
return stage in self.stages and role in self.roles

def __str__(self) -> str:
    """Return a one-line, human-readable summary of this artifact declaration."""
    # Uses ``get_type()`` (the current accessor name); the text layout is unchanged.
    return (
        f"ArtifactDeclare<name={self.name}, type={self.get_type()}, roles={self.roles}, "
        f"stages={self.stages}, optional={self.optional}>"
    )

def merge(self, a: "ArtifactDescribe"):
if self.__class__ != a.__class__ or self.multi != a.multi:
raise ValueError(
f"artifact {self.name} declare multiple times with different optional: `{self._get_type()}` vs `{a._get_type()}`"
f"artifact {self.name} declare multiple times with different optional: `{self.get_type()}` vs `{a.get_type()}`"
)
if set(self.roles) != set(a.roles):
raise ValueError(
Expand All @@ -74,15 +74,15 @@ def merge(self, a: "ArtifactDescribe"):

def dict(self, roles):
    """Describe this artifact declaration as an ``ArtifactSpec``.

    Args:
        roles: Roles to record in the spec; passed through unchanged.

    Returns:
        An ``ArtifactSpec`` built from this declaration's type name,
        optional flag, stages, description and multiplicity.
    """
    return ArtifactSpec(
        # ``get_type().type`` is an ``ArtifactType`` descriptor, so the spec
        # stores its ``type_name`` rather than the descriptor itself.
        type=self.get_type().type.type_name,
        optional=self.optional,
        roles=roles,
        stages=self.stages,
        description=self.desc,
        is_multi=self.multi,
    )

def get_type(self) -> AT:
    """Return the artifact wrapper class for this declaration.

    Subclasses must override this; the base implementation always raises.

    Raises:
        NotImplementedError: Always, in the base class.
    """
    raise NotImplementedError()

def _load_as_component_execute_arg(self, ctx, artifact: AT):
Expand All @@ -98,12 +98,12 @@ def load_as_input(self, ctx, apply_config):
if apply_config is not None:
try:
if self.multi:
artifacts = [self._get_type().load_input(c) for c in apply_config]
artifacts = [self.get_type().load_input(c) for c in apply_config]
metas = [c.dict() for c in artifacts]
args = [self._load_as_component_execute_arg(ctx, artifact) for artifact in artifacts]
return metas, args
else:
artifact = self._get_type().load_input(apply_config)
artifact = self.get_type().load_input(apply_config)
meta = artifact.dict()
return meta, self._load_as_component_execute_arg(ctx, artifact)
except Exception as e:
Expand All @@ -115,7 +115,7 @@ def load_as_input(self, ctx, apply_config):

def load_as_output_slot(self, ctx, apply_config):
if apply_config is not None:
output_iter = self._get_type().load_output(apply_config)
output_iter = self.get_type().load_output(apply_config)
try:
if self.multi:
return _generator_recorder(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
import typing

from .._base_type import URI, ArtifactDescribe, ArtifactType, Metadata
from fate.components.core.essential import DataframeArtifactType

from .._base_type import URI, ArtifactDescribe, Metadata, _ArtifactType

if typing.TYPE_CHECKING:
from fate.arch.dataframe._dataframe import DataFrame


class DataframeArtifactType(ArtifactType):
type = "dataframe"
class _DataframeArtifactType(_ArtifactType):
type = DataframeArtifactType

class EggrollAddress:
def __init__(self, name: str, namespace: str, metadata: dict):
Expand Down Expand Up @@ -99,7 +101,7 @@ def dict(self):


class DataframeWriter:
def __init__(self, artifact: DataframeArtifactType) -> None:
def __init__(self, artifact: _DataframeArtifactType) -> None:
self.artifact = artifact

def write(self, ctx, dataframe: "DataFrame", name=None, namespace=None):
Expand All @@ -116,12 +118,12 @@ def __repr__(self):
return str(self)


class DataframeArtifactDescribe(ArtifactDescribe[_DataframeArtifactType]):
    """Artifact declaration backed by ``_DataframeArtifactType``."""

    def get_type(self):
        """Return the wrapper class used to load dataframe artifacts."""
        return _DataframeArtifactType

    def _load_as_component_execute_arg(self, ctx, artifact: _DataframeArtifactType):
        """Read the artifact through its address and return the result."""
        return artifact.address.read(ctx)

    def _load_as_component_execute_arg_writer(self, ctx, artifact: _DataframeArtifactType):
        """Wrap the artifact in a ``DataframeWriter``."""
        return DataframeWriter(artifact)
Original file line number Diff line number Diff line change
@@ -1,18 +1,20 @@
from pathlib import Path

from .._base_type import URI, ArtifactDescribe, ArtifactType, Metadata
from fate.components.core.essential import DataDirectoryArtifactType

from .._base_type import URI, ArtifactDescribe, Metadata, _ArtifactType

class DataDirectoryArtifactType(ArtifactType):
type = "data_directory"

class _DataDirectoryArtifactType(_ArtifactType):
type = DataDirectoryArtifactType

def __init__(self, path, metadata: Metadata) -> None:
self.path = path
self.metadata = metadata

@classmethod
def _load(cls, uri: URI, metadata: Metadata):
return DataDirectoryArtifactType(uri.path, metadata)
return _DataDirectoryArtifactType(uri.path, metadata)

def dict(self):
return {
Expand All @@ -22,7 +24,7 @@ def dict(self):


class DataDirectoryWriter:
def __init__(self, artifact: DataDirectoryArtifactType) -> None:
def __init__(self, artifact: _DataDirectoryArtifactType) -> None:
self.artifact = artifact

def get_directory(self) -> Path:
Expand All @@ -45,11 +47,11 @@ def __repr__(self):


class DataDirectoryArtifactDescribe(ArtifactDescribe[_DataDirectoryArtifactType]):
    """Artifact declaration backed by ``_DataDirectoryArtifactType``."""

    def get_type(self):
        """Return the wrapper class used to load data-directory artifacts."""
        return _DataDirectoryArtifactType

    def _load_as_component_execute_arg(self, ctx, artifact: _DataDirectoryArtifactType):
        """Hand the loaded artifact to the component as-is."""
        return artifact

    def _load_as_component_execute_arg_writer(self, ctx, artifact: _DataDirectoryArtifactType):
        """Wrap the artifact in a ``DataDirectoryWriter``."""
        return DataDirectoryWriter(artifact)
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
from .._base_type import URI, ArtifactDescribe, ArtifactType, Metadata
from fate.components.core.essential import TableArtifactType

from .._base_type import URI, ArtifactDescribe, Metadata, _ArtifactType

class TableArtifactType(ArtifactType):
type = "table"

class _TableArtifactType(_ArtifactType):
type = TableArtifactType

class EggrollAddress:
def __init__(self, name: str, namespace: str, metadata: dict):
Expand Down Expand Up @@ -94,7 +96,7 @@ def dict(self):


class TableWriter:
def __init__(self, artifact: TableArtifactType) -> None:
def __init__(self, artifact: _TableArtifactType) -> None:
self.artifact = artifact

def write(self, slot):
Expand All @@ -108,12 +110,12 @@ def __repr__(self):


class TableArtifactDescribe(ArtifactDescribe[_TableArtifactType]):
    """Artifact declaration backed by ``_TableArtifactType``."""

    def get_type(self):
        """Return the wrapper class used to load table artifacts."""
        return _TableArtifactType

    def _load_as_component_execute_arg(self, ctx, artifact: _TableArtifactType):
        """Return ``None``; reading a table input is not implemented here."""
        # TODO: load the table for the component; an earlier draft read it via
        # ctx.reader(apply_config.name, apply_config.uri, apply_config.metadata).read_dataframe()
        return None

    def _load_as_component_execute_arg_writer(self, ctx, artifact: _TableArtifactType):
        """Wrap the artifact in a ``TableWriter``."""
        return TableWriter(artifact)
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@
from pathlib import Path
from typing import Dict, Optional

from .._base_type import URI, ArtifactDescribe, ArtifactType, Metadata
from fate.components.core.essential import JsonMetricArtifactType

from .._base_type import URI, ArtifactDescribe, Metadata, _ArtifactType

class JsonMetricArtifactType(ArtifactType):
type = "metrics_json"

class _JsonMetricArtifactType(_ArtifactType):
type = JsonMetricArtifactType

def __init__(self, path, metadata: Metadata) -> None:
self.path = path
Expand All @@ -21,7 +23,7 @@ def dict(self):


class JsonMetricWriter:
def __init__(self, artifact: JsonMetricArtifactType) -> None:
def __init__(self, artifact: _JsonMetricArtifactType) -> None:
self._artifact = artifact

def write(
Expand All @@ -40,16 +42,16 @@ def write(
json.dump(data, fw)


class JsonMetricArtifactDescribe(ArtifactDescribe[_JsonMetricArtifactType]):
    """Artifact declaration backed by ``_JsonMetricArtifactType``."""

    def get_type(self):
        """Return the wrapper class used to load JSON metric artifacts."""
        return _JsonMetricArtifactType

    def _load_as_component_execute_arg(self, ctx, artifact: _JsonMetricArtifactType):
        """Parse the JSON file at ``artifact.path`` and return its content.

        Raises:
            RuntimeError: If the file cannot be opened or parsed; the
                underlying exception is attached as ``__cause__``.
        """
        try:
            with open(artifact.path, "r") as fr:
                return json.load(fr)
        except Exception as e:
            # Chain the cause so the original traceback is not lost.
            raise RuntimeError(f"load json metric from {artifact} failed: {e}") from e

    def _load_as_component_execute_arg_writer(self, ctx, artifact: _JsonMetricArtifactType):
        """Wrap the artifact in a ``JsonMetricWriter``."""
        return JsonMetricWriter(artifact)
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
from pathlib import Path

from .._base_type import URI, ArtifactDescribe, ArtifactType, Metadata
from fate.components.core.essential import ModelDirectoryArtifactType

from .._base_type import URI, ArtifactDescribe, Metadata, _ArtifactType

class ModelDirectoryArtifactType(ArtifactType):
type = "model_directory"

class _ModelDirectoryArtifactType(_ArtifactType):
type = ModelDirectoryArtifactType

def __init__(self, path, metadata: Metadata) -> None:
self.path = path
Expand All @@ -19,7 +21,7 @@ def dict(self):


class ModelDirectoryWriter:
def __init__(self, artifact: ModelDirectoryArtifactType) -> None:
def __init__(self, artifact: _ModelDirectoryArtifactType) -> None:
self._artifact = artifact

def write(self, data):
Expand All @@ -28,12 +30,12 @@ def write(self, data):
return self._artifact.path


class ModelDirectoryArtifactDescribe(ArtifactDescribe[_ModelDirectoryArtifactType]):
    """Artifact declaration backed by ``_ModelDirectoryArtifactType``."""

    def get_type(self):
        """Return the wrapper class used to load model-directory artifacts."""
        return _ModelDirectoryArtifactType

    def _load_as_component_execute_arg(self, ctx, artifact: _ModelDirectoryArtifactType):
        """Hand the loaded artifact to the component as-is."""
        return artifact

    def _load_as_component_execute_arg_writer(self, ctx, artifact: _ModelDirectoryArtifactType):
        """Wrap the artifact in a ``ModelDirectoryWriter``."""
        return ModelDirectoryWriter(artifact)
Loading

0 comments on commit 3552824

Please sign in to comment.