Skip to content

Commit

Permalink
merge dev-2.0.0-beta
Browse files Browse the repository at this point in the history
Signed-off-by: weiwee <wbwmat@gmail.com>
  • Loading branch information
sagewe committed Jun 16, 2023
2 parents a206ab5 + afdb365 commit 1673108
Show file tree
Hide file tree
Showing 52 changed files with 2,747 additions and 519 deletions.
10 changes: 5 additions & 5 deletions doc/2.0/quick_start.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ host_data_path = os.path.join(base_path, "breast_hetero_host.csv")
# create pipeline
pipeline = StandalonePipeline().set_roles(guest="9999", host="10000", arbiter="10001")

# create reader component
# create reader component_desc
reader_0 = Reader(name="reader_0")
reader_0.guest.component_param(
path=f"file://${guest_data_path}",
Expand All @@ -54,11 +54,11 @@ reader_0.hosts[0].component_param(
dtype="float32",
)

# create intersection component
# create intersection component_desc
intersection_0 = Intersection(name="intersection_0", method="raw", input_data=reader_0.outputs["output_data"])
intersection_1 = Intersection(name="intersection_1", method="raw", input_data=reader_0.outputs["output_data"])

# create feature scale component
# create feature scale component_desc
feature_scale_0 = FeatureScale(
name="feature_scale_0", method="standard", train_data=intersection_0.outputs["output_data"]
)
Expand All @@ -68,7 +68,7 @@ feature_scale_1 = FeatureScale(
input_model=feature_scale_0.outputs["output_model"],
)

# create lr component
# create lr component_desc
lr_0 = HeteroLR(
name="lr_0",
train_data=feature_scale_0.outputs["train_output_data"],
Expand All @@ -78,7 +78,7 @@ lr_0 = HeteroLR(
batch_size=-1,
)

# create evaluation component
# create evaluation component_desc
evaluation_0 = Evaluation(name="evaluation_0", runtime_roles="guest", input_data=lr_0.outputs["train_output_data"])

# add components
Expand Down
162 changes: 162 additions & 0 deletions examples/pipeline/test_feature_scale.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
#
# Copyright 2019 The FATE Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse

from fate_client.pipeline import StandalonePipeline, FateFlowPipeline
from fate_client.pipeline.components.fate import FeatureScale
from fate_client.pipeline.components.fate import Intersection
from fate_client.pipeline.components.fate import Reader
from fate_client.pipeline.utils import test_utils


def main(config="./config.yaml", namespace=""):
if isinstance(config, str):
config = test_utils.load_job_config(config)

parties = config.parties
guest = parties.guest[0]
host = parties.host[0]
arbiter = parties.arbiter[0]

if config.work_mode == 0:
pipeline = StandalonePipeline().set_roles(guest=guest, host=host, arbiter=arbiter)
else:
pipeline = FateFlowPipeline().set_roles(guest=guest, host=host, arbiter=arbiter)
reader_0 = Reader(name="reader_0")
cluster = config.work_mode

if cluster:
reader_0.guest.component_param(table_name="breast_hetero_guest",
namespace=f"{namespace}experiment",
# path="file:///data/projects/fate/examples/data/breast_hetero_guest.csv",
# format="csv",
# match_id_name="id",
# delimiter=",",
label_name="y",
label_type="float32",
dtype="float32")

reader_0.hosts[0].component_param(table_name="breast_hetero_host",
namespace=f"{namespace}experiment",
# path="file:///data/projects/fate/examples/data/breast_hetero_host.csv",
# match_id_name="id",
# delimiter=",",
label_name=None,
dtype="float32")
else:
data_base = config.data_base_dir

reader_0.guest.component_param(path=f"file://{data_base}/examples/data/breast_hetero_guest.csv",
# path="file:///data/projects/fate/examples/data/breast_hetero_guest.csv",
format="csv",
match_id_name="id",
delimiter=",",
label_name="y",
label_type="float32",
dtype="float32")

reader_0.hosts[0].component_param(path=f"file://{data_base}/examples/data/breast_hetero_host.csv",
# path="file:///data/projects/fate/examples/data/breast_hetero_host.csv",
format="csv",
match_id_name="id",
delimiter=",",
label_name=None,
dtype="float32")

intersection_0 = Intersection(name="intersection_0",
method="raw",
input_data=reader_0.outputs["output_data"])

intersection_1 = Intersection(name="intersection_1",
method="raw",
input_data=reader_0.outputs["output_data"])

feature_scale_0 = FeatureScale(name="feature_scale_0",
method="standard",
train_data=intersection_0.outputs["output_data"])

feature_scale_1 = FeatureScale(name="feature_scale_1",
test_data=intersection_1.outputs["output_data"],
input_model=feature_scale_0.outputs["output_model"])

pipeline.add_task(reader_0)
pipeline.add_task(intersection_0)
pipeline.add_task(intersection_1)
pipeline.add_task(feature_scale_0)
pipeline.add_task(feature_scale_1)
pipeline.compile()
print(pipeline.get_dag())
pipeline.fit()
pipeline.deploy(["intersection_0", "feature_scale_0"])

predict_pipeline = StandalonePipeline()
reader_1 = Reader(name="reader_1")
if cluster:
reader_1.guest.component_param(table_name="breast_hetero_guest",
namespace=f"{namespace}experiment",
# path="file:///data/projects/fate/examples/data/breast_hetero_guest.csv",
# format="csv",
# match_id_name="id",
# delimiter=",",
label_name="y",
label_type="float32",
dtype="float32")

reader_1.hosts[0].component_param(table_name="breast_hetero_host",
namespace=f"{namespace}experiment",
# path="file:///data/projects/fate/examples/data/breast_hetero_host.csv",
# match_id_name="id",
# delimiter=",",
label_name=None,
dtype="float32")
else:
data_base = config.data_base_dir

reader_1.guest.component_param(path=f"file://{data_base}/examples/data/breast_hetero_guest.csv",
# path="file:///data/projects/fate/examples/data/breast_hetero_guest.csv",
format="csv",
match_id_name="id",
delimiter=",",
label_name="y",
label_type="float32",
dtype="float32")

reader_1.hosts[0].component_param(path=f"file://{data_base}/examples/data/breast_hetero_host.csv",
# path="file:///data/projects/fate/examples/data/breast_hetero_host.csv",
format="csv",
match_id_name="id",
delimiter=",",
label_name=None,
dtype="float32")

deployed_pipeline = pipeline.get_deployed_pipeline()
deployed_pipeline.intersection_0.input_data = reader_1.outputs["output_data"]

predict_pipeline.add_task(deployed_pipeline)
predict_pipeline.add_task(reader_1)

print("\n\n\n")
print(predict_pipeline.compile().get_dag())
predict_pipeline.predict()


if __name__ == "__main__":
parser = argparse.ArgumentParser("PIPELINE DEMO")
parser.add_argument("-config", type=str, default="",
help="config file")
parser.add_argument("-namespace", type=str, default="",
help="namespace for data stored in FATE")
args = parser.parse_args()
main(config=args.config, namespace=args.namespace)
118 changes: 118 additions & 0 deletions examples/pipeline/test_statistics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
#
# Copyright 2019 The FATE Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
import json

from fate_client.pipeline import StandalonePipeline, FateFlowPipeline
from fate_client.pipeline.components.fate import FeatureScale
from fate_client.pipeline.components.fate import Intersection
from fate_client.pipeline.components.fate import Reader
from fate_client.pipeline.components.fate import Statistics
from fate_client.pipeline.utils import test_utils


def main(config="./config.yaml", namespace=""):
if isinstance(config, str):
config = test_utils.load_job_config(config)

parties = config.parties
guest = parties.guest[0]
host = parties.host[0]
arbiter = parties.arbiter[0]

if config.work_mode == 0:
pipeline = StandalonePipeline().set_roles(guest=guest, host=host, arbiter=arbiter)
else:
pipeline = FateFlowPipeline().set_roles(guest=guest, host=host, arbiter=arbiter)
reader_0 = Reader(name="reader_0")
cluster = config.work_mode

if cluster:
reader_0.guest.component_param(table_name="breast_hetero_guest",
namespace=f"{namespace}experiment",
# path="file:///data/projects/fate/examples/data/breast_hetero_guest.csv",
# format="csv",
# match_id_name="id",
# delimiter=",",
label_name="y",
label_type="float32",
dtype="float32")

reader_0.hosts[0].component_param(table_name="breast_hetero_host",
namespace=f"{namespace}experiment",
# path="file:///data/projects/fate/examples/data/breast_hetero_host.csv",
# match_id_name="id",
# delimiter=",",
label_name=None,
dtype="float32")
else:
data_base = config.data_base_dir

reader_0.guest.component_param(path=f"file://{data_base}/examples/data/breast_hetero_guest.csv",
# path="file:///data/projects/fate/examples/data/breast_hetero_guest.csv",
format="csv",
match_id_name="id",
delimiter=",",
label_name="y",
label_type="float32",
dtype="float32")

reader_0.hosts[0].component_param(path=f"file://{data_base}/examples/data/breast_hetero_host.csv",
# path="file:///data/projects/fate/examples/data/breast_hetero_host.csv",
format="csv",
match_id_name="id",
delimiter=",",
label_name=None,
dtype="float32")

intersection_0 = Intersection(name="intersection_0",
method="raw",
input_data=reader_0.outputs["output_data"])

intersection_1 = Intersection(name="intersection_1",
method="raw",
input_data=reader_0.outputs["output_data"])

feature_scale_0 = FeatureScale(name="feature_scale_0",
method="standard",
train_data=intersection_0.outputs["output_data"])

feature_scale_1 = FeatureScale(name="feature_scale_1",
test_data=intersection_1.outputs["output_data"],
input_model=feature_scale_0.outputs["output_model"])

statistics_0 = Statistics(name="statistics_0", train_data=feature_scale_1.outputs["test_output_data"],
metrics=["mean", "max", "std", "var", "kurtosis", "skewness"])

pipeline.add_task(reader_0)
pipeline.add_task(feature_scale_0)
pipeline.add_task(feature_scale_1)
pipeline.add_task(intersection_0)
pipeline.add_task(intersection_1)
pipeline.add_task(statistics_0)
pipeline.compile()
print(pipeline.get_dag())
pipeline.fit()
print(json.dumps(pipeline.get_task_info("statistics_0").get_output_model(), indent=4))


if __name__ == "__main__":
parser = argparse.ArgumentParser("PIPELINE DEMO")
parser.add_argument("-config", type=str, default="",
help="config file")
parser.add_argument("-namespace", type=str, default="",
help="namespace for data stored in FATE")
args = parser.parse_args()
main(config=args.config, namespace=args.namespace)
13 changes: 1 addition & 12 deletions python/fate/arch/context/io/kit.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,18 +99,7 @@ def reader(self, ctx, path, metadata):

raise NotImplementedError(f"{artifact}")

def writer(self, ctx, artifact, **kwargs) -> "Writer":
name = artifact.name
metadata = artifact.metadata
if "metadata" in kwargs:
metadata = kwargs["metadata"]
for k, v in kwargs.items():
if k not in ["name", "metadata"]:
metadata[k] = v
writer_format = metadata.get("format")
if "name" in kwargs:
name = kwargs["name"]

def writer(self, ctx, path, metadata) -> "Writer":
if isinstance(artifact, MetricArtifact):
uri = URI.from_string(artifact.uri)
if uri.schema == "file":
Expand Down
Loading

0 comments on commit 1673108

Please sign in to comment.