
Commit fea7667

Merge remote-tracking branch 'origin/dev-2.0-alpha' into feature-2.0-flow
zhihuiwan committed Jan 7, 2023
2 parents 12ca946 + b94905a commit fea7667
Showing 6 changed files with 26 additions and 35 deletions.
12 changes: 1 addition & 11 deletions python/fate/arch/dataframe/io/_json_schema.py
@@ -51,17 +51,7 @@ def build_schema(data):
     if schema.header is not None:
         values = data.values
         columns = schema.header
-        if isinstance(values, pd.DataFrame):
-            for col_name in columns:
-                fields.append(
-                    dict(
-                        type=values[col_name].dtype.name,
-                        name=col_name,
-                        property="value",
-                        source="pd.dataframe"
-                    )
-                )
-        elif isinstance(values, ValueStore):
+        if isinstance(values, ValueStore):
             dtypes = values.dtypes
             for col_name in columns:
                 fields.append(
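Note on this change: build_schema now emits value fields only for ValueStore-backed data. The shape of one emitted field, inferred from the surviving branch and from the source checks in _json_serialization.py below (the concrete values are illustrative, not taken from FATE docs):

field = dict(
    type="float64",                       # dtype name from values.dtypes
    name="x0",                            # a column name from schema.header
    property="value",
    source="fate.dataframe.value_store",  # the tag the deserializer matches on
)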
16 changes: 2 additions & 14 deletions python/fate/arch/dataframe/io/_json_serialization.py
@@ -130,9 +130,6 @@ def _flatten(index: list, t):
 
     serialize_data.schema = schema
     return serialize_data
-    # data_dict = dict(data=sorted(list(serialize_data.collect())),
-    #                  schema=schema)
-    # return data_dict
 
 
 def serialize(ctx, data):
@@ -143,15 +140,6 @@ def serialize(ctx, data):
 
 
 def deserialize(ctx, data):
-    # local_data = data["data"]
-    # schema = data["schema"]
-    # data = ctx.computing.parallelize(
-    #     local_data,
-    #     include_key=True,
-    #     partition=len(local_data)
-    # )
-    # data.schema = schema
-
     recovery_schema, global_ranks, block_partition_mapping, column_info = parse_schema(data.schema)
 
     def _recovery_index(kvs):
@@ -203,7 +191,7 @@ def _to_distributed_tensor(tensor_list):
     tensor_keywords = ["weight", "label", "values"]
     for keyword in tensor_keywords:
         if keyword in column_info:
-            if keyword == "values" and column_info["values"]["source"] == "pd.dataframe":
+            if keyword == "values" and column_info["values"]["source"] == "fate.dataframe.value_store":
                 continue
             _recovery_func = functools.partial(
                 _recovery_tensor,
@@ -212,7 +200,7 @@ def _to_distributed_tensor(tensor_list):
             tensors = [tensor for key, tensor in sorted(list(data.mapValues(_recovery_func).collect()))]
             ret_dict[keyword] = _to_distributed_tensor(tensors)
 
-    if "values" in column_info and column_info["values"]["source"] == "pd.dataframe":
+    if "values" in column_info and column_info["values"]["source"] == "fate.dataframe.value_store":
         _recovery_df_func = functools.partial(
             _recovery_distributed_value_store,
             value_info=column_info["values"],
8 changes: 8 additions & 0 deletions python/fate/arch/federation/osx/__init__.py
@@ -1,3 +1,11 @@
+import os
+import sys
+
+# add pythonpath
+_pb_path = os.path.abspath(os.path.join(__file__, os.path.pardir))
+if _pb_path not in sys.path:
+    sys.path.append(_pb_path)
+
 from ._federation import MQ, OSXFederation
 
 __all__ = ["OSXFederation", "MQ"]
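Note on this change: stubs emitted by grpc_tools (for example, a generated osx_pb2_grpc.py importing its sibling with a bare `import osx_pb2`) resolve only when the directory holding the generated files is itself on sys.path, so the package appends its own directory before importing ._federation. A minimal sketch of the same idiom (the helper name is hypothetical):

import os
import sys

def _ensure_importable(module_file):
    # Append the directory containing module_file to sys.path, once.
    pb_dir = os.path.abspath(os.path.join(module_file, os.path.pardir))
    if pb_dir not in sys.path:
        sys.path.append(pb_dir)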
2 changes: 1 addition & 1 deletion python/fate/arch/unify/_io.py
@@ -98,7 +98,7 @@ def create_file(self, name):
         return EggrollURI(namespace=self.namespace, name=name)
 
     def to_string(self):
-        return f"eggroll://{self.namespace}/{self.name}"
+        return f"eggroll:///{self.namespace}/{self.name}"
 
 
 @dataclass
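Note on this change: the third slash matters because `//` introduces a URI authority component; with only two slashes the namespace parses as the host rather than as part of the path. A quick standard-library check:

from urllib.parse import urlparse

two = urlparse("eggroll://my_namespace/my_name")
assert two.netloc == "my_namespace" and two.path == "/my_name"

three = urlparse("eggroll:///my_namespace/my_name")
assert three.netloc == "" and three.path == "/my_namespace/my_name"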
8 changes: 5 additions & 3 deletions python/fate/components/components/feature_scale.py
@@ -50,11 +50,12 @@ def train(ctx, train_data, train_output_data, output_model, method):
 
     scaler = FeatureScale(method)
     with ctx.sub_ctx("train") as sub_ctx:
-        train_data = sub_ctx.reader(train_data).read_dataframe().data.to_local()
+        train_data = sub_ctx.reader(train_data).read_dataframe().data
         scaler.fit(sub_ctx, train_data)
 
         model = scaler.to_model()
-        sub_ctx.writer(output_model).write_model(model)
+        with output_model as model_writer:
+            model_writer.write_model("feature_scale", model, metadata={})
 
     with ctx.sub_ctx("predict") as sub_ctx:
         output_data = scaler.transform(sub_ctx, train_data)
@@ -65,7 +66,8 @@ def predict(ctx, input_model, test_data, test_output_data):
     from fate.ml.feature_scale import FeatureScale
 
     with ctx.sub_ctx("predict") as sub_ctx:
-        model = sub_ctx.reader(input_model).read_model()
+        with input_model as model_reader:
+            model = model_reader.read_model()
         scaler = FeatureScale.from_model(model)
         test_data = sub_ctx.reader(test_data).read_dataframe().data
         output_data = scaler.transform(sub_ctx, test_data)
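Note on these changes: train now fits on the distributed dataframe (the to_local() call is gone), and model I/O on both sides goes through the artifact's context manager. A minimal sketch of the round trip this implies, assuming the wrapper behavior shown in fate/components/loader/model.py below (the payload is hypothetical):

def round_trip(output_model, input_model):
    model = {"method": "standard"}  # hypothetical payload from scaler.to_model()
    with output_model as model_writer:
        model_writer.write_model("feature_scale", model, metadata={})
    with input_model as model_reader:
        return model_reader.read_model()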
15 changes: 9 additions & 6 deletions python/fate/components/loader/model.py
@@ -16,6 +16,9 @@ def load_input_model_wrapper():
     return ComponentModelLoaderWrapper()
 
 
+_MODEL_META_NAME = "FMLModel.yaml"
+
+
 class ComponentModelWriterWrapper:
     def __init__(self, cpn, federation, task_id, party_task_id, role, party_id) -> None:
         self.task_id = task_id
@@ -91,9 +94,9 @@ def _get_meta(self):
 
     def _write_meta(self):
         with tempfile.NamedTemporaryFile("w") as f:
-            yaml.safe_dump(self._get_meta().json(), f)
+            yaml.safe_dump(self._get_meta().dict(), f)
             f.flush()
-            self._add(f.name, "FMLModel.yaml")
+            self._add(f.name, _MODEL_META_NAME)
 
     def write_model(self, name, model, metadata, created_time=None):
         if created_time is None:
@@ -129,8 +132,8 @@ def _get_meta(self):
         if self._tar is None:
             raise ValueError(f"should open first")
         with tempfile.TemporaryDirectory() as d:
-            path = f"{d}/FMLModel.yaml"
-            self._tar.extract("FMLModel.yaml", path)
+            path = f"{d}/{_MODEL_META_NAME}"
+            self._tar.extract(_MODEL_META_NAME, d)
             with open(path, "r") as f:
                 meta = yaml.safe_load(f)
 
@@ -145,8 +148,8 @@ def read_model(self, **kwargs):
         model_name = model_info.name
         with tempfile.TemporaryDirectory() as d:
             path = f"{d}/{model_name}"
-            self._tar.extract(model_name, path)
-            with open(model_name, "r") as f:
+            self._tar.extract(model_name, d)
+            with open(path, "r") as f:
                 return json.load(f)
 
 
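Note on these changes: both fixes rely on standard behaviors. Assuming the meta object is a pydantic v1 model, .json() returns a JSON string, so yaml.safe_dump would serialize one quoted scalar, whereas .dict() yields a mapping that dumps as structured YAML. Separately, TarFile.extract(member, path) treats path as the destination directory, so passing a full file path would nest the member under a directory of that name. A short sketch of the corrected extract pattern (file names are illustrative):

import json
import tarfile
import tempfile

with tarfile.open("model.tar") as tar, tempfile.TemporaryDirectory() as d:
    tar.extract("my_model.json", d)  # d is the destination directory
    with open(f"{d}/my_model.json", "r") as f:
        model = json.load(f)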
