From 3f8f0c435af330da1d4e9168673e84677d63f0ec Mon Sep 17 00:00:00 2001 From: mgqa34 Date: Wed, 11 Jan 2023 21:34:48 +0800 Subject: [PATCH] update pipeline: support get_output_model/metrics Signed-off-by: mgqa34 --- .../pipeline/components/component_base.py | 4 +- .../fate_client/pipeline/entity/__init__.py | 5 +- .../{executor => entity}/model_info.py | 15 +++- .../pipeline/entity/model_structure.py | 56 ++++++++++++ .../fate_client/pipeline/entity/task_info.py | 57 ++++++++++++ .../pipeline/executor/task_executor.py | 26 +++++- .../pipeline/manager/metric_manager.py | 12 ++- .../pipeline/manager/model_manager.py | 36 ++++++-- .../pipeline/manager/resource_manager.py | 8 +- .../pipeline/manager/status_manager.py | 89 ++++++++++--------- python/fate_client/pipeline/pipeline.py | 18 ++++ .../pipeline/scheduler/runtime_constructor.py | 36 +++++++- python/fate_client/pipeline/test/test_dag.py | 31 ++++--- .../pipeline/test/test_dag_flow.py | 80 +++++++++++++++-- .../fate_client/pipeline/test/test_upload.py | 14 +++ .../utils/fateflow/fate_flow_job_invoker.py | 50 +++++++++-- .../pipeline/utils/fateflow/flow_client.py | 39 ++++++++ .../pipeline/utils/standalone/job_process.py | 2 +- .../fate_client/pipeline/worker/__init__.py | 0 19 files changed, 482 insertions(+), 96 deletions(-) rename python/fate_client/pipeline/{executor => entity}/model_info.py (78%) create mode 100644 python/fate_client/pipeline/entity/model_structure.py create mode 100644 python/fate_client/pipeline/entity/task_info.py create mode 100644 python/fate_client/pipeline/test/test_upload.py delete mode 100644 python/fate_client/pipeline/worker/__init__.py diff --git a/python/fate_client/pipeline/components/component_base.py b/python/fate_client/pipeline/components/component_base.py index 81f63603ee..640bf74a14 100644 --- a/python/fate_client/pipeline/components/component_base.py +++ b/python/fate_client/pipeline/components/component_base.py @@ -1,8 +1,8 @@ import copy from ..conf.types import SupportRole, PlaceHolder, ArtifactSourceType from ..conf.job_configuration import TaskConf -from python.fate_client.pipeline.utils.standalone.id_gen import get_uuid -from pipeline.entity.component_structures import load_component_spec +from ..utils.standalone.id_gen import get_uuid +from ..entity.component_structures import load_component_spec from ..interface import ArtifactChannel from ..entity.dag_structures import RuntimeTaskOutputChannelSpec, ModelWarehouseChannelSpec diff --git a/python/fate_client/pipeline/entity/__init__.py b/python/fate_client/pipeline/entity/__init__.py index 75ab3de3b5..4b39f4c95b 100644 --- a/python/fate_client/pipeline/entity/__init__.py +++ b/python/fate_client/pipeline/entity/__init__.py @@ -1,6 +1,9 @@ from .dag import DAG +from .task_info import FateFlowTaskInfo, StandaloneTaskInfo __all__ = [ - "DAG" + "DAG", + "FateFlowTaskInfo", + "StandaloneTaskInfo" ] diff --git a/python/fate_client/pipeline/executor/model_info.py b/python/fate_client/pipeline/entity/model_info.py similarity index 78% rename from python/fate_client/pipeline/executor/model_info.py rename to python/fate_client/pipeline/entity/model_info.py index 376929f4e9..dc48a5ec09 100644 --- a/python/fate_client/pipeline/executor/model_info.py +++ b/python/fate_client/pipeline/entity/model_info.py @@ -1,14 +1,15 @@ -from typing import Union, Dict -from ..scheduler.runtime_constructor import RuntimeConstructor +from typing import Dict class StandaloneModelInfo(object): - def __init__(self, job_id: str, task_info: Dict[str, RuntimeConstructor], + 
def __init__(self, job_id: str, task_info, local_role: str, local_party_id: str, model_id: str = None, model_version: int = None): self._job_id = job_id self._task_info = task_info self._model_id = model_id self._model_version = model_version + self._local_role = local_role + self._local_party_id = local_party_id @property def job_id(self): @@ -26,6 +27,14 @@ def model_id(self): def model_version(self): return self._model_version + @property + def local_role(self): + return self._local_role + + @property + def local_party_id(self): + return self._local_party_id + class FateFlowModelInfo(object): def __init__(self, job_id: str, local_role: str, local_party_id: str, diff --git a/python/fate_client/pipeline/entity/model_structure.py b/python/fate_client/pipeline/entity/model_structure.py new file mode 100644 index 0000000000..aa75b6780f --- /dev/null +++ b/python/fate_client/pipeline/entity/model_structure.py @@ -0,0 +1,56 @@ +# +# Copyright 2019 The FATE Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from datetime import datetime +from typing import List + +import pydantic + + +class MLModelComponentSpec(pydantic.BaseModel): + name: str + provider: str + version: str + metadata: dict + + +class MLModelPartiesSpec(pydantic.BaseModel): + guest: List[str] + host: List[str] + arbiter: List[str] + + +class MLModelFederatedSpec(pydantic.BaseModel): + task_id: str + parties: MLModelPartiesSpec + component: MLModelComponentSpec + + +class MLModelModelSpec(pydantic.BaseModel): + name: str + created_time: datetime + file_format: str + metadata: dict + + +class MLModelPartySpec(pydantic.BaseModel): + party_task_id: str + role: str + partyid: str + models: List[MLModelModelSpec] + + +class MLModelSpec(pydantic.BaseModel): + federated: MLModelFederatedSpec + party: MLModelPartySpec diff --git a/python/fate_client/pipeline/entity/task_info.py b/python/fate_client/pipeline/entity/task_info.py new file mode 100644 index 0000000000..2651893bd5 --- /dev/null +++ b/python/fate_client/pipeline/entity/task_info.py @@ -0,0 +1,57 @@ +import abc +import typing +from .model_info import StandaloneModelInfo, FateFlowModelInfo +from ..utils.fateflow.fate_flow_job_invoker import FATEFlowJobInvoker + + +class TaskInfo(object): + def __init__(self, task_name: str, model_info: typing.Union[StandaloneModelInfo, FateFlowModelInfo]): + self._model_info = model_info + self._task_name = task_name + + @abc.abstractmethod + def get_output_data(self, *args, **kwargs): + ... + + @abc.abstractmethod + def get_output_model(self, *args, **kwargs): + ... + + @abc.abstractmethod + def get_output_metrics(self, *args, **kwargs): + ... + + +class StandaloneTaskInfo(TaskInfo): + def get_output_data(self): + ... 
+
+    def get_output_model(self, role=None, party_id=None):
+        party_id = party_id if party_id else self._model_info.local_party_id
+        role = role if role else self._model_info.local_role
+        return self._model_info.task_info[self._task_name].get_output_model(role, party_id)
+
+    def get_output_metrics(self, role=None, party_id=None):
+        party_id = party_id if party_id else self._model_info.local_party_id
+        role = role if role else self._model_info.local_role
+        return self._model_info.task_info[self._task_name].get_output_metrics(role, party_id)
+
+
+class FateFlowTaskInfo(TaskInfo):
+    def get_output_model(self):
+        return FATEFlowJobInvoker().get_output_model(job_id=self._model_info.job_id,
+                                                     role=self._model_info.local_role,
+                                                     party_id=self._model_info.local_party_id,
+                                                     task_name=self._task_name)
+
+    def get_output_data(self, limits=None):
+        ...
+
+    def get_output_metrics(self):
+        return FATEFlowJobInvoker().get_output_metrics(job_id=self._model_info.job_id,
+                                                       role=self._model_info.local_role,
+                                                       party_id=self._model_info.local_party_id,
+                                                       task_name=self._task_name)
+
+
diff --git a/python/fate_client/pipeline/executor/task_executor.py b/python/fate_client/pipeline/executor/task_executor.py
index 1f5a0fa596..c050acb589 100644
--- a/python/fate_client/pipeline/executor/task_executor.py
+++ b/python/fate_client/pipeline/executor/task_executor.py
@@ -8,7 +8,7 @@
 from ..scheduler.dag_parser import DagParser
 from ..scheduler.runtime_constructor import RuntimeConstructor
 from ..utils.fateflow.fate_flow_job_invoker import FATEFlowJobInvoker
-from .model_info import StandaloneModelInfo, FateFlowModelInfo
+from ..entity.model_info import StandaloneModelInfo, FateFlowModelInfo
 
 
 class StandaloneExecutor(object):
@@ -23,9 +23,12 @@ def fit(self, dag_schema: DAGSchema, component_specs: Dict[str, ComponentSpec],
         self._dag_parser.parse_dag(dag_schema, component_specs)
         self._run()
 
+        local_party_id = self.get_site_party_id(dag_schema, local_role, local_party_id)
         return StandaloneModelInfo(
             job_id=self._job_id,
             task_info=self._runtime_constructor_dict,
+            local_role=local_role,
+            local_party_id=local_party_id,
             model_id=self._job_id,
             model_version=0
         )
@@ -38,7 +41,9 @@ def predict(self,
         self._run(fit_model_info)
         return StandaloneModelInfo(
             job_id=self._job_id,
-            task_info=self._runtime_constructor_dict
+            task_info=self._runtime_constructor_dict,
+            local_role=fit_model_info.local_role,
+            local_party_id=fit_model_info.local_party_id
         )
 
     def _run(self, fit_model_info: StandaloneModelInfo = None):
@@ -62,12 +67,12 @@ def _run(self, fit_model_info: StandaloneModelInfo = None):
                                                        job_id=self._job_id,
                                                        task_name=task_name,
                                                        component_ref=task_node.component_ref,
+                                                       component_spec=task_node.component_spec,
                                                        stage=stage,
                                                        runtime_parameters=runtime_parameters,
                                                        log_dir=log_dir)
             runtime_constructor.construct_input_artifacts(upstream_inputs,
                                                           runtime_constructor_dict,
-                                                          component_spec,
                                                           fit_model_info)
             runtime_constructor.construct_outputs()
             # runtime_constructor.construct_output_artifacts(output_definitions)
@@ -103,6 +108,18 @@ def _exec_task(task_type, task_name, runtime_constructor):
 
         return ret_msg
 
+    @staticmethod
+    def get_site_party_id(dag_schema, role, party_id):
+        if party_id:
+            return party_id
+
+        if party_id is None:
+            for party in dag_schema.dag.parties:
+                if role == party.role:
+                    return party.party_id[0]
+
+        raise ValueError(f"Cannot retrieve the site's party_id for role {role}")
+
 
 class FateFlowExecutor(object):
     def __init__(self):
@@ -163,7 +180,8 @@ def get_site_party_id(flow_job_invoker,
dag_schema, role, party_id): raise ValueError(f"Can not retrieval site's party_id from site's role {role}") - def upload(self, file: str, head: int, + @staticmethod + def upload(file: str, head: int, namespace: str, name: str, meta: dict, partitions=4, storage_engine=None, **kwargs): flow_job_invoker = FATEFlowJobInvoker() diff --git a/python/fate_client/pipeline/manager/metric_manager.py b/python/fate_client/pipeline/manager/metric_manager.py index 32f2eba849..54561d534f 100644 --- a/python/fate_client/pipeline/manager/metric_manager.py +++ b/python/fate_client/pipeline/manager/metric_manager.py @@ -1,3 +1,4 @@ +import json from typing import Union from ..utils.uri_tools import parse_uri, replace_uri_path, get_schema_from_uri from ..utils.file_utils import construct_local_dir @@ -14,17 +15,14 @@ def generate_output_metric_uri(cls, output_dir_uri: str, job_id: str, task_name: uri_obj = replace_uri_path(uri_obj, str(local_path)) return uri_obj.geturl() - -class LMDBMetricManager(object): @classmethod - def generate_output_metric_uri(cls, output_dir_uri: str, job_id: str, task_name: str, - role: str, party_id: Union[str, int]): - ... + def get_output_metrics(cls, uri): + uri_obj = parse_uri(uri) + with open(uri_obj.path, "r") as fin: + return json.loads(fin.read()) def get_metric_manager(metric_uri: str): uri_type = get_schema_from_uri(metric_uri) if uri_type == UriTypes.LOCAL: return LocalFSMetricManager - else: - return LMDBMetricManager diff --git a/python/fate_client/pipeline/manager/model_manager.py b/python/fate_client/pipeline/manager/model_manager.py index f161b4f5aa..d483c9fa1a 100644 --- a/python/fate_client/pipeline/manager/model_manager.py +++ b/python/fate_client/pipeline/manager/model_manager.py @@ -1,6 +1,12 @@ +import json +import os +import tarfile +import tempfile +import yaml from ..utils.uri_tools import parse_uri, replace_uri_path, get_schema_from_uri from ..utils.file_utils import construct_local_dir from ..conf.types import UriTypes +from ..entity.model_structure import MLModelSpec class LocalFSModelManager(object): @@ -8,23 +14,39 @@ class LocalFSModelManager(object): def generate_output_model_uri(cls, output_dir_uri: str, job_id: str, task_name: str, role: str, party_id: str): model_id = "_".join([job_id, task_name, role, str(party_id)]) - model_version = "v0" + model_version = "0" uri_obj = parse_uri(output_dir_uri) local_path = construct_local_dir(uri_obj.path, *[model_id, model_version]) uri_obj = replace_uri_path(uri_obj, str(local_path)) return uri_obj.geturl() - -class LMDBModelManager(object): @classmethod - def generate_output_model_uri(cls, uri_obj, session_id: str, role: str, party_id: str, namespace: str, name: str): - ... 
+ def get_output_model(cls, output_dir_uri): + uri_obj = parse_uri(output_dir_uri) + models = dict() + with tempfile.TemporaryDirectory() as temp_dir: + tar = tarfile.open(uri_obj.path, "r:") + tar.extractall(path=temp_dir) + tar.close() + for file_name in os.listdir(temp_dir): + if file_name.endswith("FMLModel.yaml"): + with open(os.path.join(temp_dir, file_name), "r") as fp: + model_meta = yaml.safe_load(fp) + model_spec = MLModelSpec.parse_obj(model_meta) + + for model in model_spec.party.models: + file_format = model.file_format + model_name = model.name + + if file_format == "json": + with open(os.path.join(temp_dir, model_name), "r") as fp: + models[model_name] = json.loads(fp.read()) + + return models def get_model_manager(model_uri: str): uri_type = get_schema_from_uri(model_uri) if uri_type == UriTypes.LOCAL: return LocalFSModelManager - else: - return LMDBModelManager diff --git a/python/fate_client/pipeline/manager/resource_manager.py b/python/fate_client/pipeline/manager/resource_manager.py index 2a217baf40..ac3480d700 100644 --- a/python/fate_client/pipeline/manager/resource_manager.py +++ b/python/fate_client/pipeline/manager/resource_manager.py @@ -1,4 +1,4 @@ -from python.fate_client.pipeline.utils.standalone.id_gen import get_uuid +from ..utils.standalone.id_gen import get_uuid from ..utils.file_utils import construct_local_dir from ..conf.env_config import StandaloneConfig from ..entity.task_structure import OutputArtifact @@ -93,6 +93,12 @@ def generate_output_terminate_status_uri(self, job_id, task_name, role, party_id role, party_id) + def get_output_model(self, uri): + return self._model_manager.get_output_model(uri) + + def get_output_metrics(self, uri): + return self._metric_manager.get_output_metrics(uri) + @staticmethod def generate_log_uri(log_dir_prefix, role, party_id): return str(construct_local_dir(log_dir_prefix, *[role, str(party_id)])) diff --git a/python/fate_client/pipeline/manager/status_manager.py b/python/fate_client/pipeline/manager/status_manager.py index c25ca0be81..1a5a73cdc5 100644 --- a/python/fate_client/pipeline/manager/status_manager.py +++ b/python/fate_client/pipeline/manager/status_manager.py @@ -54,7 +54,7 @@ def get_task_results(self, tasks_info): return ret def get_task_outputs(self, task_id): - return self._meta_manager.get_artifacts(task_id) + return self._meta_manager.get_artifacts(task_id)["output"] def get_status_manager(): @@ -84,9 +84,17 @@ def get_artifacts(self, taskid): artifacts = self.store.get_artifacts_by_context(context_id) # parameters parameters = [] - data = [] - model = [] - metric = [] + input_data, output_data = [], [] + input_model, output_model = [], [] + input_metric, output_metric = [], [] + + def _to_dict(artifact): + return dict( + uri=artifact.uri, + name=artifact.properties["name"].string_value, + metadata=json.loads(artifact.properties["metadata"].string_value), + ) + for artifact in artifacts: if self.parameter_type_id == artifact.type_id: parameters.append( @@ -96,34 +104,31 @@ def get_artifacts(self, taskid): type=artifact.properties["type"].string_value, ) ) - if self.data_type_id == artifact.type_id: - data.append( - dict( - uri=artifact.uri, - name=artifact.properties["name"].string_value, - metadata=json.loads(artifact.properties["metadata"].string_value), - ) - ) - - if self.model_type_id == artifact.type_id: - data.append( - dict( - uri=artifact.uri, - name=artifact.properties["name"].string_value, - metadata=json.loads(artifact.properties["metadata"].string_value), - ) - ) - - if 
self.metric_type_id == artifact.type_id: - data.append( - dict( - uri=artifact.uri, - name=artifact.properties["name"].string_value, - metadata=json.loads(artifact.properties["metadata"].string_value), - ) - ) - - return dict(parameters=parameters, data=data, model=model, metric=metric) + if artifact.type_id in {self.data_type_id, self.model_type_id, self.metric_type_id}: + is_input = artifact.properties["is_input"].bool_value + + if self.data_type_id == artifact.type_id: + if is_input: + input_data.append(_to_dict(artifact)) + else: + output_data.append(_to_dict(artifact)) + + if self.model_type_id == artifact.type_id: + if is_input: + input_model.append(_to_dict(artifact)) + else: + output_model.append(_to_dict(artifact)) + + if self.metric_type_id == artifact.type_id: + if is_input: + input_metric.append(_to_dict(artifact)) + else: + output_metric.append(_to_dict(artifact)) + return dict( + parameters=parameters, + input=dict(data=input_data, model=input_model, metric=input_metric), + output=dict(data=output_data, model=output_model, metric=output_metric), + ) def get_or_create_task_context(self, taskid): task_context_run = self.store.get_context_by_type_and_name("TaskContext", taskid) @@ -162,8 +167,8 @@ def get_or_create_task(self, taskid): task_run.name = taskid task_run.properties["state"].string_value = "INIT" task_run.properties["safe_terminate"].bool_value = False - [task_run_id] = self.store.put_executions([task_run]) - task_run.id = task_run_id + [task_run_id] = self.store.put_executions([task_run]) + task_run.id = task_run_id return task_run def get_task_safe_terminate_flag(self, taskid: str): @@ -198,19 +203,20 @@ def add_parameter(self, name: str, value): [artifact_id] = self.store.put_artifacts([artifact]) return artifact_id - def add_data_artifact(self, name: str, uri: str, metadata: dict): - return self.add_artifact(self.data_type_id, name, uri, metadata) + def add_data_artifact(self, name: str, uri: str, metadata: dict, is_input): + return self.add_artifact(self.data_type_id, name, uri, metadata, is_input) - def add_model_artifact(self, name: str, uri: str, metadata: dict): - return self.add_artifact(self.model_type_id, name, uri, metadata) + def add_model_artifact(self, name: str, uri: str, metadata: dict, is_input): + return self.add_artifact(self.model_type_id, name, uri, metadata, is_input) - def add_metric_artifact(self, name: str, uri: str, metadata: dict): - return self.add_artifact(self.metric_type_id, name, uri, metadata) + def add_metric_artifact(self, name: str, uri: str, metadata: dict, is_input): + return self.add_artifact(self.metric_type_id, name, uri, metadata, is_input) - def add_artifact(self, type_id: int, name: str, uri: str, metadata: dict): + def add_artifact(self, type_id: int, name: str, uri: str, metadata: dict, is_input): artifact = metadata_store_pb2.Artifact() artifact.uri = uri artifact.properties["name"].string_value = name + artifact.properties["is_input"].bool_value = is_input artifact.properties["metadata"].string_value = json.dumps(metadata) artifact.type_id = type_id [artifact_id] = self.store.put_artifacts([artifact]) @@ -270,6 +276,7 @@ def create_artifact_type(self, name): artifact_type.name = name artifact_type.properties["uri"] = metadata_store_pb2.STRING artifact_type.properties["name"] = metadata_store_pb2.STRING + artifact_type.properties["is_input"] = metadata_store_pb2.BOOLEAN artifact_type.properties["metadata"] = metadata_store_pb2.STRING artifact_type_id = self.store.put_artifact_type(artifact_type) return artifact_type_id 
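Note on the reshaped MLMD bookkeeping above: each data/model/metric artifact now records an `is_input` property, so `get_artifacts` returns `parameters` plus separate `input` and `output` groups, and `get_task_outputs` selects only the `"output"` group. A minimal consumption sketch (not part of the patch; the `status_manager` and `task_id` bindings are assumed):

    # Hypothetical usage of the reshaped get_artifacts()/get_task_outputs() payload.
    outputs = status_manager.get_task_outputs(task_id)
    # outputs == {"data": [...], "model": [...], "metric": [...]}
    for model_artifact in outputs["model"]:
        # each entry carries the uri, name, and decoded metadata recorded at write time
        print(model_artifact["name"], model_artifact["uri"], model_artifact["metadata"])
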
diff --git a/python/fate_client/pipeline/pipeline.py b/python/fate_client/pipeline/pipeline.py
index a27ec8a903..5dea575d1c 100644
--- a/python/fate_client/pipeline/pipeline.py
+++ b/python/fate_client/pipeline/pipeline.py
@@ -3,6 +3,7 @@
 import yaml
 from .executor import StandaloneExecutor, FateFlowExecutor
 from .entity import DAG
+from .entity import FateFlowTaskInfo, StandaloneTaskInfo
 from .entity.runtime_entity import Roles
 from .conf.env_config import SiteInfo
 from .conf.types import SupportRole, PlaceHolder
@@ -26,9 +27,11 @@ def __init__(self, executor, *args):
 
     def set_site_role(self, role):
         self._local_role = role
+        return self
 
     def set_site_party_id(self, party_id):
         self._local_party_id = party_id
+        return self
 
     def set_stage(self, stage):
         self._stage = stage
@@ -171,6 +174,9 @@ def get_component_specs(self):
 
         return component_specs
 
+    def get_task_info(self, task_name):
+        raise NotImplementedError
+
     def fit(self) -> "Pipeline":
         self._model_info = self._executor.fit(self._dag.dag_spec,
                                               self.get_component_specs(),
@@ -229,6 +235,12 @@ class StandalonePipeline(Pipeline):
     def __init__(self, *args):
         super(StandalonePipeline, self).__init__(StandaloneExecutor(), *args)
 
+    def get_task_info(self, task):
+        if isinstance(task, Component):
+            task = task.name
+
+        return StandaloneTaskInfo(task_name=task, model_info=self._model_info)
+
 
 class FateFlowPipeline(Pipeline):
     def __init__(self, *args):
@@ -239,3 +251,9 @@ def upload(self, file: str, head: int,
                meta: dict, partitions=4,
                storage_engine=None, **kwargs):
         self._executor.upload(file, head, namespace, name, meta,
                               partitions, storage_engine, **kwargs)
+
+    def get_task_info(self, task):
+        if isinstance(task, Component):
+            task = task.name
+
+        return FateFlowTaskInfo(task_name=task, model_info=self._model_info)
diff --git a/python/fate_client/pipeline/scheduler/runtime_constructor.py b/python/fate_client/pipeline/scheduler/runtime_constructor.py
index c3c2c806e0..33093f4741 100644
--- a/python/fate_client/pipeline/scheduler/runtime_constructor.py
+++ b/python/fate_client/pipeline/scheduler/runtime_constructor.py
@@ -1,22 +1,25 @@
 from ..conf.env_config import StandaloneConfig
+from ..conf.types import ArtifactType
 from ..entity.dag_structures import RuntimeTaskOutputChannelSpec
 from ..entity.task_structure import TaskScheduleSpec, LOGGERSpec, TaskRuntimeInputSpec, \
     MLMDSpec, RuntimeConfSpec, ComputingEngineSpec, DeviceSpec, FederationPartySpec, \
     ComputingEngineMetadata, FederationEngineSpec, FederationEngineMetadata, InputArtifact
 from ..manager.resource_manager import StandaloneResourceManager
-from python.fate_client.pipeline.utils.standalone.id_gen import gen_computing_id, gen_federation_id, gen_task_id
+from ..utils.standalone.id_gen import gen_computing_id, gen_federation_id, gen_task_id
 
 
 class RuntimeConstructor(object):
     OUTPUT_KEYS = ["model", "metric", "data"]
 
-    def __init__(self, runtime_parties, stage, job_id, task_name, component_ref, runtime_parameters, log_dir):
+    def __init__(self, runtime_parties, stage, job_id, task_name,
+                 component_ref, component_spec, runtime_parameters, log_dir):
         self._task_name = task_name
         self._runtime_parties = runtime_parties
         self._job_id = job_id
         self._federation_id = gen_federation_id(job_id, task_name)
         self._stage = stage
         self._component_ref = component_ref
+        self._component_spec = component_spec
         self._runtime_parameters = runtime_parameters
         self._log_dir = log_dir
 
@@ -70,8 +73,8 @@ def get_output_artifact(self, role, party_id, output_key):
         return self._output_artifacts[role][party_id].get(output_key,
None) def construct_input_artifacts(self, upstream_inputs, runtime_constructor_dict, - component_spec, fit_model_info=None): - input_artifacts = component_spec.input_definitions.artifacts + fit_model_info=None): + input_artifacts = self._component_spec.input_definitions.artifacts for input_key, channels in upstream_inputs.items(): artifact_spec = input_artifacts[input_key] if self._stage not in set(artifact_spec.stages): @@ -237,3 +240,28 @@ def retrieval_task_outputs(self): for output in output_list: output_artifact = InputArtifact(**output) self._output_artifacts[party.role][party.party_id].update({output_artifact.name: output_artifact}) + + def get_output_model(self, role, party_id): + output_artifacts = self._output_artifacts[role][party_id] + models = dict() + for artifact_key, artifact in output_artifacts.items(): + artifact_spec = self._component_spec.output_definitions.artifacts[artifact_key] + uri = artifact.uri + if artifact_spec.type in [ArtifactType.MODEL, ArtifactType.MODELS]: + models.update(self._resource_manager.get_output_model(uri)) + + return models + + def get_output_metrics(self, role, party_id): + output_artifacts = self._output_artifacts[role][party_id] + metrics = dict() + for artifact_key, artifact in output_artifacts.items(): + artifact_spec = self._component_spec.output_definitions.artifacts[artifact_key] + uri = artifact.uri + if ArtifactType.METRIC in artifact_spec.type: + metric_name = uri.split("/", -1)[-1] + metrics[metric_name] = self._resource_manager.get_output_metrics(uri) + + return metrics + + diff --git a/python/fate_client/pipeline/test/test_dag.py b/python/fate_client/pipeline/test/test_dag.py index 509c96e1d8..083308c977 100644 --- a/python/fate_client/pipeline/test/test_dag.py +++ b/python/fate_client/pipeline/test/test_dag.py @@ -7,7 +7,7 @@ pipeline = StandalonePipeline().set_scheduler_party_id(party_id=10001).set_roles( - guest=9999, host=[10000, 10001], arbiter=10001) + guest=9999, host=10000, arbiter=10001) reader_0 = Reader(name="reader_0") reader_0.guest.component_param(path="file:///Users/maguoqiang/mgq/FATE-2.0-alpha-with-flow/FATE/" "examples/data/breast_hetero_guest.csv", @@ -17,8 +17,9 @@ label_name="y", label_type="float32", dtype="float32") +reader_0.guest.conf.set("test_reader_guest", 2) -reader_0.hosts[[0, 1]].component_param(path="file:///Users/maguoqiang/mgq/FATE-2.0-alpha-with-flow/FATE/" +reader_0.hosts[0].component_param(path="file:///Users/maguoqiang/mgq/FATE-2.0-alpha-with-flow/FATE/" "examples/data/breast_hetero_host.csv", format="csv", id_name="id", @@ -26,6 +27,8 @@ label_name=None, dtype="float32") +reader_0.hosts[[0, 1]].conf.set("test_reader_guest", 2) + intersection_0 = Intersection(name="intersection_0", method="raw", input_data=reader_0.outputs["output_data"]) @@ -43,14 +46,13 @@ input_model=feature_scale_0.outputs["output_model"]) lr_0 = HeteroLR(name="lr_0", - # train_data=feature_scale_0.outputs["train_output_data"], - # validate_data=feature_scale_1.outputs["test_output_data"], - train_data=reader_0.outputs["output_data"], - validate_data=reader_0.outputs["output_data"], + train_data=feature_scale_0.outputs["train_output_data"], + validate_data=feature_scale_1.outputs["test_output_data"], max_iter=1, learning_rate=0.01, - batch_size=569) + batch_size=100) +lr_0.conf.set("backend", "gpu") lr_1 = HeteroLR(name="lr_1", test_data=feature_scale_1.outputs["test_output_data"], input_model=lr_0.outputs["output_model"]) @@ -60,8 +62,8 @@ input_data=lr_0.outputs["train_output_data"]) pipeline.add_task(reader_0) -# 
pipeline.add_task(feature_scale_0) -# pipeline.add_task(feature_scale_1) +pipeline.add_task(feature_scale_0) +pipeline.add_task(feature_scale_1) pipeline.add_task(intersection_0) pipeline.add_task(intersection_1) pipeline.add_task(lr_0) @@ -72,12 +74,16 @@ pipeline.compile() print(pipeline.get_dag()) pipeline.fit() -print(pipeline.deploy([intersection_0, lr_0])) +print(pipeline.get_task_info("feature_scale_0").get_output_model()) +print(pipeline.get_task_info("lr_0").get_output_model()) +print(pipeline.get_task_info("evaluation_0").get_output_metrics()) +print(pipeline.deploy([intersection_0, feature_scale_0, lr_0])) predict_pipeline = StandalonePipeline() reader_1 = Reader(name="reader_1") -reader_1.guest.component_param(path="examples/data/breast_hetero_guest.csv", +reader_1.guest.component_param(path="file:///Users/maguoqiang/mgq/FATE-2.0-alpha-with-flow/FATE/" + "examples/data/breast_hetero_guest.csv", format="csv", id_name="id", delimiter=",", @@ -85,7 +91,8 @@ label_type="float32", dtype="float32") -reader_1.hosts[[0, 1]].component_param(path="examples/data/breast_hetero_host.csv", +reader_1.hosts[0].component_param(path="file:///Users/maguoqiang/mgq/FATE-2.0-alpha-with-flow/FATE/" + "examples/data/breast_hetero_host.csv", format="csv", id_name="id", delimiter=",", diff --git a/python/fate_client/pipeline/test/test_dag_flow.py b/python/fate_client/pipeline/test/test_dag_flow.py index 9f2ea3eda9..2bc66d314d 100644 --- a/python/fate_client/pipeline/test/test_dag_flow.py +++ b/python/fate_client/pipeline/test/test_dag_flow.py @@ -2,13 +2,14 @@ from pipeline.components.fate import Reader from pipeline.components.fate import FeatureScale from pipeline.components.fate import Intersection +from pipeline.components.fate import Evaluation from pipeline.pipeline import FateFlowPipeline pipeline = FateFlowPipeline().set_scheduler_party_id(party_id=10001).set_roles( - guest=9999, host=[10000, 10001], arbiter=10001) + guest=9999, host=10000, arbiter=10001) reader_0 = Reader(name="reader_0") -reader_0.guest.component_param(path="/Users/maguoqiang/mgq/FATE-2.0-alpha-with-flow/FATE/" +reader_0.guest.component_param(path="file:///Users/maguoqiang/mgq/FATE-2.0-alpha-with-flow/FATE/" "examples/data/breast_hetero_guest.csv", format="csv", id_name="id", @@ -16,15 +17,18 @@ label_name="y", label_type="float32", dtype="float32") +reader_0.guest.conf.set("test_reader_guest", 2) -reader_0.hosts[[0, 1]].component_param(path="/Users/maguoqiang/mgq/FATE-2.0-alpha-with-flow/FATE/" - "examples/data/breast_hetero_host.csv", +reader_0.hosts[0].component_param(path="file:///Users/maguoqiang/mgq/FATE-2.0-alpha-with-flow/FATE/" + "examples/data/breast_hetero_host.csv", format="csv", id_name="id", delimiter=",", label_name=None, dtype="float32") +reader_0.hosts[0].conf.set("test_reader_guest", 2) + intersection_0 = Intersection(name="intersection_0", method="raw", input_data=reader_0.outputs["output_data"]) @@ -33,17 +37,77 @@ method="raw", input_data=reader_0.outputs["output_data"]) +feature_scale_0 = FeatureScale(name="feature_scale_0", + method="standard", + train_data=intersection_0.outputs["output_data"]) + +feature_scale_1 = FeatureScale(name="feature_scale_1", + test_data=intersection_1.outputs["output_data"], + input_model=feature_scale_0.outputs["output_model"]) + lr_0 = HeteroLR(name="lr_0", - train_data=intersection_0.outputs["train_output_data"], - eval_data=intersection_1.outputs["test_output_data"], + # train_data=feature_scale_0.outputs["train_output_data"], + # 
validate_data=feature_scale_1.outputs["test_output_data"],
+                train_data=intersection_0.outputs["output_data"],
+                validate_data=intersection_0.outputs["output_data"],
                 max_iter=1,
                 learning_rate=0.01,
-                batch_size=-1)
+                batch_size=100)
+
+lr_0.conf.set("backend", "gpu")
+lr_1 = HeteroLR(name="lr_1",
+                test_data=feature_scale_1.outputs["test_output_data"],
+                input_model=lr_0.outputs["output_model"])
+
+evaluation_0 = Evaluation(name="evaluation_0",
+                          runtime_roles="guest",
+                          input_data=lr_0.outputs["train_output_data"])
 
 pipeline.add_task(reader_0)
+# pipeline.add_task(feature_scale_0)
+# pipeline.add_task(feature_scale_1)
 pipeline.add_task(intersection_0)
 pipeline.add_task(intersection_1)
 pipeline.add_task(lr_0)
+pipeline.add_task(evaluation_0)
+
+# pipeline.add_task(lr_1)
+pipeline.conf.set("task_parallelism", 1)
 pipeline.compile()
-print (pipeline.get_dag())
+print(pipeline.get_dag())
 pipeline.fit()
+print(pipeline.get_task_info("lr_0").get_output_model())
+print(pipeline.get_task_info("evaluation_0").get_output_metrics())
+exit(0)
+print(pipeline.deploy([intersection_0, lr_0]))
+
+
+predict_pipeline = FateFlowPipeline()
+reader_1 = Reader(name="reader_1")
+reader_1.guest.component_param(path="file:///Users/maguoqiang/mgq/FATE-2.0-alpha-with-flow/FATE/"
+                                    "examples/data/breast_hetero_guest.csv",
+                               format="csv",
+                               id_name="id",
+                               delimiter=",",
+                               label_name="y",
+                               label_type="float32",
+                               dtype="float32")
+
+reader_1.hosts[0].component_param(path="file:///Users/maguoqiang/mgq/FATE-2.0-alpha-with-flow/FATE/"
+                                       "examples/data/breast_hetero_host.csv",
+                                  format="csv",
+                                  id_name="id",
+                                  delimiter=",",
+                                  label_name=None,
+                                  dtype="float32")
+
+
+deployed_pipeline = pipeline.get_deployed_pipeline()
+deployed_pipeline.intersection_0.input_data = reader_1.outputs["output_data"]
+
+predict_pipeline.add_task(deployed_pipeline)
+predict_pipeline.add_task(reader_1)
+
+print("\n\n\n")
+print(predict_pipeline.compile().get_dag())
+predict_pipeline.predict()
diff --git a/python/fate_client/pipeline/test/test_upload.py b/python/fate_client/pipeline/test/test_upload.py
new file mode 100644
index 0000000000..3fe0a8def1
--- /dev/null
+++ b/python/fate_client/pipeline/test/test_upload.py
@@ -0,0 +1,14 @@
+from pipeline.pipeline import FateFlowPipeline
+
+pipeline = FateFlowPipeline()
+pipeline.upload(file="/Users/maguoqiang/mgq/FATE-2.0-alpha-with-flow/FATE/examples/data/breast_hetero_guest.csv",
+                head=1,
+                partitions=4,
+                namespace="experiment",
+                name="breast_hetero_guest",
+                storage_engine="standalone",
+                meta={
+                    "label_name": "y",
+                    "label_type": "float32",
+                    "dtype": "float32"
+                })
diff --git a/python/fate_client/pipeline/utils/fateflow/fate_flow_job_invoker.py b/python/fate_client/pipeline/utils/fateflow/fate_flow_job_invoker.py
index adec54bf00..d5c5573198 100644
--- a/python/fate_client/pipeline/utils/fateflow/fate_flow_job_invoker.py
+++ b/python/fate_client/pipeline/utils/fateflow/fate_flow_job_invoker.py
@@ -89,16 +89,56 @@ def query_task(self, job_id, role, party_id, status):
     def query_site_info(self):
         response = self._client.query_site_info()
         try:
+            code = response["code"]
+            if code != 0:
+                return None
+
             party_id = response["data"]["party_id"]
-            return '9999'
+            return party_id
             # TODO: fix it later
             # return party_id
-        except BaseException:
-            raise ValueError(f"query site info is failed, response={response}")
+        except (KeyError, TypeError, ValueError):
+            return None
 
     def upload_data(self, upload_conf):
-        resource = self._client.upload_data(upload_conf)
-        return resource
+        response = self._client.upload_data(upload_conf)
+
try:
+            code = response["code"]
+            if code != 0:
+                raise ValueError(f"Return code {code}!=0")
+
+            namespace = response["data"]["namespace"]
+            name = response["data"]["name"]
+            print(f"Upload data succeeded, please use eggroll:///{namespace}/{name} as the input uri")
+        except BaseException:
+            raise ValueError(f"Failed to upload data, response={response}")
+
+    def get_output_data(self):
+        ...
+
+    def get_output_model(self, job_id, role, party_id, task_name):
+        response = self._client.query_model(job_id, role, party_id, task_name)
+        try:
+            code = response["code"]
+            if code != 0:
+                raise ValueError(f"Return code {code}!=0")
+            model = response["data"]["output_model"]
+            return model
+        except BaseException:
+            raise ValueError(f"Failed to query output model of task={job_id}, role={role}, "
+                             f"party_id={party_id}, response={response}")
+
+    def get_output_metrics(self, job_id, role, party_id, task_name):
+        response = self._client.query_metrics(job_id, role, party_id, task_name)
+        try:
+            code = response["code"]
+            if code != 0:
+                raise ValueError(f"Return code {code}!=0")
+            metrics = response["data"]
+            return metrics
+        except BaseException:
+            raise ValueError(f"Failed to query output metrics of task={job_id}, role={role}, "
+                             f"party_id={party_id}, response={response}")
 
 
 class JobStatus(object):
diff --git a/python/fate_client/pipeline/utils/fateflow/flow_client.py b/python/fate_client/pipeline/utils/fateflow/flow_client.py
index 48f0a89b1c..e057e2ca2e 100644
--- a/python/fate_client/pipeline/utils/fateflow/flow_client.py
+++ b/python/fate_client/pipeline/utils/fateflow/flow_client.py
@@ -8,6 +8,8 @@ class Address(object):
     QUERY_TASK = "/job/task/query"
     SITE_INFO = "/site/info/query"
     UPLOAD_DATA = "/data/upload"
+    QUERY_MODEL = "/output/model/query"
+    QUERY_METRIC = "/output/metric/query"
 
     def __init__(self, server_url):
         self._submit_job_url = server_url + self.SUBMIT_JOB
@@ -19,6 +21,9 @@ def __init__(self, server_url):
 
         self._upload_url = server_url + self.UPLOAD_DATA
 
+        self._query_model_url = server_url + self.QUERY_MODEL
+        self._query_metric_url = server_url + self.QUERY_METRIC
+
     @property
     def submit_job_url(self):
         return self._submit_job_url
@@ -43,6 +48,14 @@ def site_info_url(self):
     def upload_data_url(self):
         return self._upload_url
 
+    @property
+    def query_model_url(self):
+        return self._query_model_url
+
+    @property
+    def query_metric_url(self):
+        return self._query_metric_url
+
 
 class FlowClient(object):
     def __init__(self, ip, port, version):
@@ -107,3 +120,29 @@ def upload_data(self, upload_conf):
         )
 
         return response.json()
+
+    def query_model(self, job_id, role, party_id, task_name):
+        response = requests.get(
+            self._address.query_model_url,
+            params={
+                "job_id": job_id,
+                "role": role,
+                "party_id": party_id,
+                "task_name": task_name
+            }
+        )
+
+        return response.json()
+
+    def query_metrics(self, job_id, role, party_id, task_name):
+        response = requests.get(
+            self._address.query_metric_url,
+            params={
+                "job_id": job_id,
+                "role": role,
+                "party_id": party_id,
+                "task_name": task_name
+            }
+        )
+
+        return response.json()
diff --git a/python/fate_client/pipeline/utils/standalone/job_process.py b/python/fate_client/pipeline/utils/standalone/job_process.py
index 1ad6b8f1b3..444af461b8 100644
--- a/python/fate_client/pipeline/utils/standalone/job_process.py
+++ b/python/fate_client/pipeline/utils/standalone/job_process.py
@@ -6,7 +6,7 @@
 from pathlib import Path
 from types import SimpleNamespace
 
-from python.fate_client.pipeline.scheduler.runtime_constructor import RuntimeConstructor
+from 
...scheduler.runtime_constructor import RuntimeConstructor def run_subprocess(exec_cmd, std_log_fd): diff --git a/python/fate_client/pipeline/worker/__init__.py b/python/fate_client/pipeline/worker/__init__.py deleted file mode 100644 index e69de29bb2..0000000000
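Usage sketch for the task-info API this patch introduces (not part of the diff). It assumes a pipeline that has already run fit() with tasks named "lr_0" and "evaluation_0", as in test_dag.py above; the same calls work on FateFlowPipeline via FateFlowTaskInfo:

    # `pipeline` is assumed to be a fitted StandalonePipeline from the tests above.
    lr_info = pipeline.get_task_info("lr_0")       # a Component instance is accepted too
    print(lr_info.get_output_model())              # dict of deserialized model files
    print(pipeline.get_task_info("evaluation_0").get_output_metrics())

For the standalone backend, get_output_model/get_output_metrics default to the local role and party_id recorded on StandaloneModelInfo, and accept explicit role/party_id arguments to read another party's outputs.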