Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add commands to list and retrieve additional components #3114

Merged
merged 15 commits into from
Jan 16, 2025
Merged
2 changes: 2 additions & 0 deletions nvflare/apis/fl_constant.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,9 @@ class AdminCommandNames(object):
SUBMIT_JOB = "submit_job"
LIST_JOBS = "list_jobs"
GET_JOB_META = "get_job_meta"
LIST_JOB = "list_job"
DOWNLOAD_JOB = "download_job"
DOWNLOAD_JOB_COMPONENTS = "download_job_components"
DOWNLOAD_JOB_FILE = "download_job_file"
ABORT_JOB = "abort_job"
DELETE_JOB = "delete_job"
Expand Down
18 changes: 18 additions & 0 deletions nvflare/apis/impl/job_def_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,13 @@ def get_client_data(self, jid: str, client_name: str, data_type: str, fl_ctx: FL
except StorageException:
return None

def list_components(self, jid: str, fl_ctx: FLContext):
nvkevlu marked this conversation as resolved.
Show resolved Hide resolved
store = self._get_job_store(fl_ctx)
self.log_debug(
fl_ctx, f"list_components called for {jid}: {store.list_components_of_object(self.job_uri(jid))}"
)
return store.list_components_of_object(self.job_uri(jid))

def set_status(self, jid: str, status: RunStatus, fl_ctx: FLContext):
meta = {JobMetaKey.STATUS.value: status.value}
store = self._get_job_store(fl_ctx)
Expand Down Expand Up @@ -364,6 +371,17 @@ def get_storage_component(self, jid: str, component: str, fl_ctx: FLContext):
def get_storage_for_download(
self, jid: str, download_dir: str, component: str, download_file: str, fl_ctx: FLContext
):
"""Prepares the specified component of the job for download at the specified directory

The component is prepared for download at download_dir/jid/download_file.

Args:
jid: job ID
download_dir: directory to download the component to
component: component name
download_file: file name to save the downloaded component
fl_ctx: FLContext
"""
store = self._get_job_store(fl_ctx)
os.makedirs(os.path.join(download_dir, jid), exist_ok=True)
destination_file = os.path.join(download_dir, jid, download_file)
Expand Down
14 changes: 14 additions & 0 deletions nvflare/apis/job_def_manager_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,20 @@ def get_client_data(self, jid: str, client_name: str, data_type: str, fl_ctx: FL
"""
pass

@abstractmethod
def list_components(self, jid: str, fl_ctx: FLContext):
nvkevlu marked this conversation as resolved.
Show resolved Hide resolved
"""Get list of all the components for the specified job.

Args:
jid (str): Job ID
fl_ctx (FLContext): FLContext information

Returns:
list of components

"""
pass

@abstractmethod
def set_status(self, jid: str, status: RunStatus, fl_ctx: FLContext):
"""Set status of an existing Job.
Expand Down
16 changes: 16 additions & 0 deletions nvflare/apis/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,22 @@ def get_meta(self, uri: str) -> dict:
"""
pass

@abstractmethod
def list_components_of_object(self, uri: str) -> List[str]:
"""Gets all components of the specified object.

Args:
uri: URI of the object

Returns:
list of component names

Raises StorageException when:
- invalid args

"""
pass

@abstractmethod
def get_data(self, uri: str, component_name: str = DATA) -> bytes:
"""Gets data of the specified object.
Expand Down
21 changes: 21 additions & 0 deletions nvflare/app_common/storages/filesystem_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,27 @@ def get_meta(self, uri: str) -> dict:

return _decode_meta(_read(os.path.join(full_uri, META)))

def list_components_of_object(self, uri: str) -> List[str]:
"""Gets all components of the specified object.

Args:
uri: URI of the object

Returns:
list of component names

Raises:
TypeError: if invalid argument types
StorageException: if object does not exist

"""
full_uri = self._object_path(uri)

if not _object_exists(full_uri):
raise StorageException("object {} does not exist".format(uri))

return os.listdir(full_uri)

def get_data(self, uri: str, component_name: str = DATA) -> bytes:
"""Gets data of the specified object.

Expand Down
87 changes: 86 additions & 1 deletion nvflare/private/fed/server/job_cmds.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,13 @@ def get_spec(self):
handler_func=self.get_job_meta,
authz_func=self.authorize_job,
),
CommandSpec(
name=AdminCommandNames.LIST_JOB,
description="list additional components of specified job",
usage=f"{AdminCommandNames.LIST_JOB} job_id",
handler_func=self.list_job_components,
authz_func=self.authorize_job,
),
CommandSpec(
name=AdminCommandNames.DELETE_JOB,
description="delete a job and persisted workspace",
Expand Down Expand Up @@ -162,7 +169,13 @@ def get_spec(self):
handler_func=self.pull_file,
authz_func=self.authorize_job_file,
client_cmd=ftd.PULL_BINARY_FQN,
visible=False,
nvkevlu marked this conversation as resolved.
Show resolved Hide resolved
),
yanchengnv marked this conversation as resolved.
Show resolved Hide resolved
CommandSpec(
name=AdminCommandNames.DOWNLOAD_JOB_COMPONENTS,
description="download additional components for a specified job",
usage=f"{AdminCommandNames.DOWNLOAD_JOB_COMPONENTS} job_id",
handler_func=self.download_job_components,
client_cmd=ftd.PULL_FOLDER_FQN,
yanchengnv marked this conversation as resolved.
Show resolved Hide resolved
),
CommandSpec(
name=AdminCommandNames.APP_COMMAND,
Expand Down Expand Up @@ -422,6 +435,33 @@ def get_job_meta(self, conn: Connection, args: List[str]):
f"job {job_id} does not exist", meta=make_meta(MetaStatusValue.INVALID_JOB_ID, job_id)
)

def list_job_components(self, conn: Connection, args: List[str]):
if len(args) < 2:
conn.append_error("Usage: list_job_components job_id", meta=make_meta(MetaStatusValue.SYNTAX_ERROR))
return

job_id = conn.get_prop(self.JOB_ID)
engine = conn.app_ctx
job_def_manager = engine.job_def_manager
if not isinstance(job_def_manager, JobDefManagerSpec):
raise TypeError(
f"job_def_manager in engine is not of type JobDefManagerSpec, but got {type(job_def_manager)}"
)
with engine.new_context() as fl_ctx:
list_of_data = job_def_manager.list_components(jid=job_id, fl_ctx=fl_ctx)
if list_of_data:
system_components = {"workspace", "meta", "scheduled", "data"}
filtered_data = [item for item in list_of_data if item not in system_components]
if filtered_data:
data_str = ", ".join(filtered_data)
conn.append_string(data_str)
nvkevlu marked this conversation as resolved.
Show resolved Hide resolved
else:
conn.append_string("No additional job components found.")
nvkevlu marked this conversation as resolved.
Show resolved Hide resolved
else:
conn.append_error(
f"job {job_id} does not exist", meta=make_meta(MetaStatusValue.INVALID_JOB_ID, job_id)
)

def abort_job(self, conn: Connection, args: List[str]):
engine = conn.app_ctx
job_runner = engine.job_runner
Expand Down Expand Up @@ -656,6 +696,51 @@ def download_job(self, conn: Connection, args: List[str]):
self._clean_up_download(conn, tx_id)
conn.append_error("internal error", meta=make_meta(MetaStatusValue.INTERNAL_ERROR))

def download_job_components(self, conn: Connection, args: List[str]):
"""Download additional job components (e.g., ERRORLOG_site-1) for a specified job.

Based on job download but downloads the additional components for a job that job download does
not download.
"""
job_id = args[1]
engine = conn.app_ctx
job_def_manager = engine.job_def_manager
if not isinstance(job_def_manager, JobDefManagerSpec):
self.logger.error(
f"job_def_manager in engine is not of type JobDefManagerSpec, but got {type(job_def_manager)}"
)
conn.append_error("internal error", meta=make_meta(MetaStatusValue.INTERNAL_ERROR))
return

# It is possible that the same job is downloaded in multiple sessions at the same time.
# To allow this, we use a separate sub-folder in the download_dir for each download.
# This sub-folder is named with a transaction ID (tx_id), which is a UUID.
# The folder path for download the job is: <download_dir>/<tx_id>/<job_id>.
tx_id = str(uuid.uuid4()) # generate a new tx_id
job_download_dir = self.tx_path(conn, tx_id) # absolute path of the job download dir.
with engine.new_context() as fl_ctx:
try:
list_of_data = job_def_manager.list_components(jid=job_id, fl_ctx=fl_ctx)
if list_of_data:
job_components = [
item for item in list_of_data if item not in {"workspace", "meta", "scheduled", "data"}
]
for job_component in job_components:
job_def_manager.get_storage_for_download(
job_id + "_components", job_download_dir, job_component, job_component, fl_ctx
)
self.download_folder(
conn,
tx_id=tx_id,
folder_name=job_id + "_components",
download_file_cmd_name=AdminCommandNames.DOWNLOAD_JOB_FILE,
)
except Exception as e:
secure_log_traceback()
self.logger.error(f"exception downloading job {job_id}: {secure_format_exception(e)}")
self._clean_up_download(conn, tx_id)
conn.append_error("internal error", meta=make_meta(MetaStatusValue.INTERNAL_ERROR))

def do_app_command(self, conn: Connection, args: List[str]):
# cmd job_id topic
if len(args) != 3:
Expand Down
Loading