Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(deps): update to openjd-sessions 0.5.x #160

Merged
merged 1 commit into from
Feb 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ dependencies = [
"requests ~= 2.31",
"boto3 >= 1.28.80",
"deadline == 0.37.*",
"openjd-sessions == 0.2.*",
"openjd-sessions == 0.5.*",
# tomli became tomllib in standard library in Python 3.11
"tomli == 2.0.* ; python_version<'3.11'",
"typing_extensions ~= 4.8",
Expand Down
5 changes: 1 addition & 4 deletions scripts/create_service_resources.sh
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ then

echo "Creating Amazon Deadline Cloud Farm $farm_name"
FARM_ID=$(aws deadline create-farm --display-name $farm_name | jq -r ".farmId")
echo "Created Farm: ${FARM_ID}"
fi

if [ "${QUEUE_ID_1:-}" == "" ]
Expand All @@ -47,7 +48,6 @@ then
{
"farmId": "$FARM_ID",
"displayName": "$queue_name",
"status": "IDLE",
"jobRunAsUser": {
"posix": {
"user": "jobuser",
Expand All @@ -66,7 +66,6 @@ EOF
{
"farmId": "$FARM_ID",
"displayName": "$queue_name",
"status": "IDLE",
"roleArn": "$queue_1_iam_role",
"jobRunAsUser": {
"posix": {
Expand Down Expand Up @@ -112,7 +111,6 @@ then
{
"farmId": "$FARM_ID",
"displayName": "$queue_name",
"status": "IDLE",
"jobRunAsUser": {
"posix": {
"user": "jobuser",
Expand All @@ -131,7 +129,6 @@ EOF
{
"farmId": "$FARM_ID",
"displayName": "$queue_name",
"status": "IDLE",
"roleArn": "$queue_2_iam_role",
"jobRunAsUser": {
"posix": {
Expand Down
5 changes: 3 additions & 2 deletions scripts/submit_jobs/sleep/sleep_job.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
{
"name": "duration",
"type": "INT",
"default": 45,
"default": 60,
"minValue": 10,
"maxValue": 600
}
Expand All @@ -17,7 +17,8 @@
"actions": {
"onRun": {
"command": "{{ Task.File.runScript }}",
"args": ["{{ Param.duration }}"]
"args": ["{{ Param.duration }}"],
"timeout": 45
}
},
"embeddedFiles": [
Expand Down
4 changes: 2 additions & 2 deletions src/deadline_worker_agent/scheduler/session_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
JobEntityUnsupportedSchemaError,
StepDetailsError,
)
from ..sessions.job_entities.job_details import parameters_data_to_list
from ..sessions.job_entities.job_details import parameters_from_api_response

if TYPE_CHECKING:
from ..sessions.job_entities import JobEntities
Expand Down Expand Up @@ -374,7 +374,7 @@ def dequeue(self) -> SessionActionDefinition | None:
except (ValueError, RuntimeError) as e:
raise StepDetailsError(action_id, str(e)) from e
task_parameters_data: dict = action_definition.get("parameters", {})
task_parameters = parameters_data_to_list(task_parameters_data)
task_parameters = parameters_from_api_response(task_parameters_data)

next_action = RunStepTaskAction(
id=action_id,
Expand Down
10 changes: 5 additions & 5 deletions src/deadline_worker_agent/sessions/actions/run_step_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from concurrent.futures import Executor
from typing import Any, TYPE_CHECKING

from openjd.sessions import Parameter
from openjd.model import TaskParameterSet

from .openjd_action import OpenjdAction

Expand All @@ -26,14 +26,14 @@ class RunStepTaskAction(OpenjdAction):
The environment details
task_id : str
The unique task identifier
task_parameter_values : list[Parameter]
task_parameter_values : TaskParameterSet
The task parameter values
"""

step_id: str
task_id: str
_details: StepDetails
_task_parameter_values: list[Parameter]
_task_parameter_values: TaskParameterSet

def __init__(
self,
Expand All @@ -42,7 +42,7 @@ def __init__(
step_id: str,
details: StepDetails,
task_id: str,
task_parameter_values: list[Parameter],
task_parameter_values: TaskParameterSet,
) -> None:
super(RunStepTaskAction, self).__init__(
id=id,
Expand Down Expand Up @@ -79,6 +79,6 @@ def start(self, *, session: Session, executor: Executor) -> None:

def human_readable(self) -> str:
param_str = ", ".join(
f"{param.name}={repr(param.value)}" for param in self._task_parameter_values
f"{name}={repr(param.value)}" for name, param in self._task_parameter_values.items()
)
return f"step[{self.step_id}].run({param_str})"
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from dataclasses import dataclass
from typing import Any, cast

from openjd.model import parse_model, SchemaVersion, UnsupportedSchema
from openjd.model import parse_model, TemplateSpecificationVersion, UnsupportedSchema
from openjd.model.v2023_09 import Environment as Environment_2023_09
from openjd.sessions import EnvironmentModel

Expand Down Expand Up @@ -44,9 +44,12 @@ def from_boto(cls, environment_details_data: EnvironmentDetailsData) -> Environm
If the environment's Open Job Description schema version not unsupported
"""

schema_version = SchemaVersion(environment_details_data["schemaVersion"])
schema_version = TemplateSpecificationVersion(environment_details_data["schemaVersion"])

if schema_version == SchemaVersion.v2023_09:
if schema_version in (
TemplateSpecificationVersion.JOBTEMPLATE_v2023_09,
TemplateSpecificationVersion.ENVIRONMENT_v2023_09,
):
environment = parse_model(
model=Environment_2023_09, obj=environment_details_data["template"]
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,48 +4,15 @@
from dataclasses import dataclass
from typing import Any, cast

from openjd.sessions import Parameter, ParameterType
from deadline.job_attachments.models import JobAttachmentsFileSystem

from ...api_models import (
FloatParameter,
IntParameter,
JobAttachmentDetailsData,
PathParameter,
StringParameter,
)
from .job_entity_type import JobEntityType
from .validation import Field, validate_object


def parameters_data_to_list(
params: dict[str, StringParameter | PathParameter | IntParameter | FloatParameter | str]
) -> list[Parameter]:
result = list[Parameter]()
for name, value in params.items():
# TODO: Change to the correct type once typing information is available
# in the task_run action details.
if isinstance(value, str):
# old style for the API - TODO remove this once the assign API is updated
result.append(Parameter(ParameterType.STRING, name, value))
elif "string" in value:
value = cast(StringParameter, value)
result.append(Parameter(ParameterType.STRING, name, value["string"]))
elif "int" in value:
value = cast(IntParameter, value)
result.append(Parameter(ParameterType.INT, name, value["int"]))
elif "float" in value:
value = cast(FloatParameter, value)
result.append(Parameter(ParameterType.FLOAT, name, value["float"]))
elif "path" in value:
value = cast(PathParameter, value)
result.append(Parameter(ParameterType.PATH, name, value["path"]))
else:
# TODO - PATH parameter types
raise ValueError(f"Parameter {name} -- unknown form in API response: {str(value)}")
return result


@dataclass(frozen=True)
class JobAttachmentManifestProperties:
"""Information used to facilitate the transfer of input/output job attachments and mapping of
Expand Down
49 changes: 26 additions & 23 deletions src/deadline_worker_agent/sessions/job_entities/job_details.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,15 @@
from typing import Any, cast
import os

from openjd.model import SchemaVersion, UnsupportedSchema
from openjd.model import (
JobParameterValues,
ParameterValue,
ParameterValueType,
SpecificationRevision,
TemplateSpecificationVersion,
UnsupportedSchema,
)
from openjd.sessions import (
Parameter,
ParameterType,
PathFormat,
PosixSessionUser,
)
Expand All @@ -29,31 +34,27 @@
from .validation import Field, validate_object


def parameters_data_to_list(
def parameters_from_api_response(
params: dict[str, StringParameter | PathParameter | IntParameter | FloatParameter | str]
) -> list[Parameter]:
result = list[Parameter]()
) -> dict[str, ParameterValue]:
result = dict[str, ParameterValue]()
for name, value in params.items():
# TODO: Change to the correct type once typing information is available
# in the task_run action details.
if isinstance(value, str):
# old style for the API - TODO remove this once the assign API is updated
result.append(Parameter(ParameterType.STRING, name, value))
elif "string" in value:
print(name, value)
if "string" in value:
value = cast(StringParameter, value)
result.append(Parameter(ParameterType.STRING, name, value["string"]))
param_value = ParameterValue(type=ParameterValueType.STRING, value=value["string"])
elif "int" in value:
value = cast(IntParameter, value)
result.append(Parameter(ParameterType.INT, name, value["int"]))
param_value = ParameterValue(type=ParameterValueType.INT, value=value["int"])
elif "float" in value:
value = cast(FloatParameter, value)
result.append(Parameter(ParameterType.FLOAT, name, value["float"]))
param_value = ParameterValue(type=ParameterValueType.FLOAT, value=value["float"])
elif "path" in value:
value = cast(PathParameter, value)
result.append(Parameter(ParameterType.PATH, name, value["path"]))
param_value = ParameterValue(type=ParameterValueType.PATH, value=value["path"])
else:
# TODO - PATH parameter types
raise ValueError(f"Parameter {name} -- unknown form in API response: {str(value)}")
result[name] = param_value
return result


Expand Down Expand Up @@ -153,13 +154,13 @@ class JobDetails:
log_group_name: str
"""The name of the log group for the session"""

schema_version: SchemaVersion
schema_version: SpecificationRevision
"""The Open Job Description schema version"""

job_attachment_settings: JobAttachmentSettings | None = None
"""The job attachment settings of the job's queue"""

parameters: list[Parameter] = field(default_factory=list)
parameters: JobParameterValues = field(default_factory=dict)
"""The job's parameters"""

job_run_as_user: JobRunAsUser | None = None
Expand Down Expand Up @@ -187,7 +188,7 @@ def from_boto(cls, job_details_data: JobDetailsData) -> JobDetails:
"""

job_parameters_data: dict = job_details_data.get("parameters", {})
job_parameters = parameters_data_to_list(job_parameters_data)
job_parameters = parameters_from_api_response(job_parameters_data)
path_mapping_rules: list[OPENJDPathMappingRule] = []
path_mapping_rules_data = job_details_data.get("pathMappingRules", None)
if path_mapping_rules_data:
Expand All @@ -209,10 +210,12 @@ def from_boto(cls, job_details_data: JobDetailsData) -> JobDetails:
or None
)

schema_version = SchemaVersion(job_details_data["schemaVersion"])
given_schema_version = TemplateSpecificationVersion(job_details_data["schemaVersion"])

if schema_version != SchemaVersion.v2023_09:
raise UnsupportedSchema(schema_version.value)
if given_schema_version == TemplateSpecificationVersion.JOBTEMPLATE_v2023_09:
schema_version = SpecificationRevision.v2023_09
else:
raise UnsupportedSchema(given_schema_version.value)

return JobDetails(
parameters=job_parameters,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Any, cast

from openjd.model import parse_model, SchemaVersion, UnsupportedSchema
from openjd.model import parse_model, TemplateSpecificationVersion, UnsupportedSchema
from openjd.model.v2023_09 import StepTemplate as StepTemplate_2023_09

from ...api_models import StepDetailsData
Expand Down Expand Up @@ -53,9 +53,9 @@ def from_boto(cls, step_details_data: StepDetailsData) -> StepDetails:
If the environment's Open Job Description schema version not unsupported
"""

schema_version = SchemaVersion(step_details_data["schemaVersion"])
schema_version = TemplateSpecificationVersion(step_details_data["schemaVersion"])

if schema_version == SchemaVersion.v2023_09:
if schema_version == TemplateSpecificationVersion.JOBTEMPLATE_v2023_09:
# Jan 23, 2024: Forwards compatibility. The 'template' field is changing from a StepScript to
# a StepTemplate. Remove the StepScript case after the transition is complete.
details_data = step_details_data["template"]
Expand Down
15 changes: 10 additions & 5 deletions src/deadline_worker_agent/sessions/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,19 +36,19 @@
from .actions import SessionActionDefinition
from .job_entities import JobAttachmentDetails, JobDetails

from openjd.model import TaskParameterSet
from openjd.sessions import (
ActionState,
ActionStatus,
EnvironmentIdentifier,
EnvironmentModel,
Parameter,
LOG as OPENJD_LOG,
PathMappingRule,
PosixSessionUser,
StepScriptModel,
Session as OPENJDSession,
SessionUser,
)
from openjd.sessions import Session as OPENJDSession
from openjd.sessions import LOG as OPENJD_LOG

from deadline.job_attachments.asset_sync import AssetSync
from deadline.job_attachments.asset_sync import logger as ASSET_SYNC_LOGGER
Expand Down Expand Up @@ -79,6 +79,7 @@
ActionState.CANCELED: "CANCELED",
ActionState.FAILED: "FAILED",
ActionState.SUCCESS: "SUCCEEDED",
ActionState.TIMEOUT: "FAILED",
}
TIME_DELTA_ZERO = timedelta()

Expand Down Expand Up @@ -960,7 +961,11 @@ def _action_updated_impl(
assert self._stop.is_set(), "current_action is None or stopping"
return

is_unsuccessful = action_status.state in (ActionState.FAILED, ActionState.CANCELED)
is_unsuccessful = action_status.state in (
ActionState.FAILED,
ActionState.CANCELED,
ActionState.TIMEOUT,
)

if (
action_status.state == ActionState.SUCCESS
Expand Down Expand Up @@ -1139,7 +1144,7 @@ def run_task(
self,
*,
step_script: StepScriptModel,
task_parameter_values: list[Parameter],
task_parameter_values: TaskParameterSet,
) -> None:
self._session.run_task(
step_script=step_script,
Expand Down
Loading