Skip to content

Commit

Permalink
Merge pull request #4936 from FederatedAI/feature-2.0.0-beta-flow_upload
Browse files Browse the repository at this point in the history
Feature 2.0.0 beta flow upload
  • Loading branch information
sagewe authored Jun 20, 2023
2 parents 5d07bc1 + 3b5923b commit ec0abe6
Show file tree
Hide file tree
Showing 5 changed files with 23 additions and 20 deletions.
7 changes: 3 additions & 4 deletions python/fate/arch/dataframe/_frame_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ def to_frame(self, ctx, table):
def _dense_format_to_frame(self, ctx, table):
data_manager = DataManager()
columns = self._header.split(self._delimiter, -1)
columns = columns.remove(self._sample_id_name)
columns.remove(self._sample_id_name)
retrieval_index_dict = data_manager.init_from_local_file(
sample_id_name=self._sample_id_name, columns=columns, match_id_list=self._match_id_list,
match_id_name=self._match_id_name, label_name=self._label_name, weight_name=self._weight_name,
Expand All @@ -88,9 +88,8 @@ def _dense_format_to_frame(self, ctx, table):
# partition_order_mappings = _get_partition_order(table)
functools.partial(_to_blocks,
data_manager=data_manager,
index_dict=retrieval_index_dict,
partition_order_mappings=partition_order_mappings,
na_values=self._na_values)
retrieval_index_dict=retrieval_index_dict,
partition_order_mappings=partition_order_mappings)
block_table = table.mapPartitions(
_to_blocks,
use_previous_behavior=False
Expand Down
3 changes: 1 addition & 2 deletions python/fate/components/components/dataframe_transformer.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,7 @@ def dataframe_transformer(
anonymous_party_id,
):
from fate.arch.dataframe import TableReader

metadata = table.schema["meta"]
metadata = table.schema
table_reader = TableReader(
sample_id_name=metadata.get("sample_id_name", None),
match_id_name=metadata.get("match_id_name", None),
Expand Down
2 changes: 1 addition & 1 deletion python/fate/components/core/spec/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@


class PartySpec(pydantic.BaseModel):
role: Literal["guest", "host", "arbiter"]
role: Literal["guest", "host", "arbiter", "local"]
partyid: str

def tuple(self):
Expand Down
24 changes: 14 additions & 10 deletions python/fate/components/entrypoint/component.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,15 @@ def cleanup_component_execution(config: TaskCleanupConfigSpec):
raise e


def execute_component_from_config(config: TaskConfigSpec):
status_file_name = "task_finalize.json"
cwd = os.path.abspath(os.path.curdir)
logger.debug(f"component execution in path `{cwd}`")
logger.debug(f"logging final status to `{os.path.join(cwd, status_file_name)}`")
def execute_component_from_config(config: TaskConfigSpec, output_path=None):
if not output_path:
cwd = os.path.abspath(os.path.curdir)
output_path = os.path.join(cwd, "task_finalize.json")
logger.debug(f"component execution in path `{cwd}`")

else:
os.makedirs(os.path.dirname(output_path), exist_ok=True)
logger.debug(f"logging final status to `{output_path}`")
try:
party_task_id = config.party_task_id
device = load_device(config.conf.device)
Expand Down Expand Up @@ -87,18 +91,18 @@ def execute_component_from_config(config: TaskConfigSpec):
# final execution io meta
execution_io_meta = execution_io.dump_io_meta()
try:
with open(status_file_name, "w") as fw:
json.dump(dict(status=dict(final_status="finish"), io_meta=execution_io_meta), fw)
with open(output_path, "w") as fw:
json.dump(dict(status=dict(code=0), io_meta=execution_io_meta), fw, indent=4)
except Exception as e:
raise RuntimeError(
f"failed to dump execution io meta to `{os.path.join(cwd, status_file_name)}`: meta={execution_io_meta}"
f"failed to dump execution io meta to `{output_path}`: meta={execution_io_meta}"
) from e

logger.debug("done without error, waiting signal to terminate")
logger.debug("terminating, bye~")

except Exception as e:
logger.error(e, exc_info=True)
with open(status_file_name, "w") as fw:
json.dump(dict(status=dict(final_status="exception", exceptions=traceback.format_exc())), fw)
with open(output_path, "w") as fw:
json.dump(dict(status=dict(code=-1, exceptions=traceback.format_exc())), fw)
raise e
7 changes: 4 additions & 3 deletions python/fate/components/entrypoint/component_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,14 @@ def component():


@component.command()
@click.option("--process-tag", required=True, help="unique id to identify this execution process")
@click.option("--process-tag", required=False, help="unique id to identify this execution process")
@click.option("--config", required=False, type=click.File(), help="config path")
@click.option("--config-entrypoint", required=False, help="enctypoint to get config")
@click.option("--properties", "-p", multiple=True, help="properties config")
@click.option("--env-prefix", "-e", type=str, default="runtime.component_desc.", help="prefix for env config")
@click.option("--env-name", required=False, type=str, help="env name for config")
def execute(process_tag, config, config_entrypoint, properties, env_prefix, env_name):
@click.option("--output-path", type=str, help="output path")
def execute(process_tag, config, config_entrypoint, properties, env_prefix, env_name, output_path=None):
"execute component_desc"
import logging

Expand Down Expand Up @@ -58,7 +59,7 @@ def execute(process_tag, config, config_entrypoint, properties, env_prefix, env_

from fate.components.entrypoint.component import execute_component_from_config

execute_component_from_config(task_config)
execute_component_from_config(task_config, output_path)


@component.command()
Expand Down

0 comments on commit ec0abe6

Please sign in to comment.