Skip to content

Commit

Permalink
[Pipeline] Enable pipeline input as runsetting value (Azure#27774)
Browse files Browse the repository at this point in the history
* enable pipeline input as runsetting value

* fix comment

* fix test case
  • Loading branch information
lalala123123 committed Dec 2, 2022
1 parent ac75f0f commit 89deaed
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,11 @@ def get_organization_project_by_git():
f"Azure DevOps or Azure DevOps Server repository."
)

@classmethod
def _get_checksum_path(cls, path):
    """Return the path of the checksum file that sits next to ``path``.

    The checksum file lives in the same directory as ``path`` and is named
    after it, with ``_`` and ``cls.POSTFIX_CHECKSUM`` appended.

    :param path: Location of the artifact package (anything ``Path`` accepts).
    :return: A ``Path`` pointing at the sibling checksum file.
    """
    package = Path(path)
    checksum_name = "{}_{}".format(package.name, cls.POSTFIX_CHECKSUM)
    # Join on the parent (rather than with_name) so that name-less paths
    # behave exactly as a plain "parent / name" join would.
    return package.parent.joinpath(checksum_name)

def _redirect_artifacts_tool_path(self, organization):
"""To avoid transient issues when downloading artifacts,
download the artifacts tool and redirect the az artifact command to it."""
Expand Down Expand Up @@ -216,7 +221,7 @@ def _check_artifacts(self, artifact_package_path):
path = Path(artifact_package_path)
if not path.exists():
return False
checksum_path = path.parent / f"{path.name}_{self.POSTFIX_CHECKSUM}"
checksum_path = self._get_checksum_path(artifact_package_path)
if checksum_path.exists():
with open(checksum_path, "r") as f:
checksum = f.read()
Expand Down Expand Up @@ -264,6 +269,9 @@ def get(
# When the cache folder of the artifact package exists, the package is certain to have been downloaded.
return artifact_package_path.absolute().resolve()
if resolve:
check_sum_path = self._get_checksum_path(artifact_package_path)
if Path(check_sum_path).exists():
os.unlink(check_sum_path)
if artifact_package_path.exists():
# Remove invalid artifact package to avoid affecting download artifact.
temp_folder = tempfile.mktemp() # nosec B306
Expand Down
5 changes: 5 additions & 0 deletions sdk/ml/azure-ai-ml/azure/ai/ml/entities/_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,8 @@ def get_rest_dict_for_node_attrs(target_obj, clear_empty_value=False):
Allow a data binding expression as a value, regardless of the type defined in the rest object.
"""
# pylint: disable=too-many-return-statements
from azure.ai.ml.entities._job.pipeline._io import PipelineInput

if target_obj is None:
return None
if isinstance(target_obj, dict):
Expand Down Expand Up @@ -269,6 +271,9 @@ def get_rest_dict_for_node_attrs(target_obj, clear_empty_value=False):
# can't use result.as_dict() as data binding expression may not fit rest object structure
return get_rest_dict_for_node_attrs(target_obj.__dict__, clear_empty_value=clear_empty_value)

if isinstance(target_obj, PipelineInput):
return get_rest_dict_for_node_attrs(str(target_obj), clear_empty_value=clear_empty_value)

if not isinstance(target_obj, (str, int, float, bool)):
raise ValueError("Unexpected type {}".format(type(target_obj)))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
ImageInstanceSegmentationJob,
ImageObjectDetectionJob,
)
from azure.ai.ml.entities._job.job_resource_configuration import JobResourceConfiguration
from azure.ai.ml.entities._job.automl.nlp import TextClassificationJob, TextClassificationMultilabelJob, TextNerJob
from azure.ai.ml.entities._job.automl.tabular import ClassificationJob, ForecastingJob, RegressionJob
from azure.ai.ml.entities._job.pipeline._io import PipelineInput, _GroupAttrDict
Expand Down Expand Up @@ -1463,4 +1464,28 @@ def empty_value_pipeline(integer: int, boolean: bool, number: float,
assert "integer" in rest_obj.properties.inputs
assert "boolean" in rest_obj.properties.inputs
assert "number" in rest_obj.properties.inputs
assert "str_param" in rest_obj.properties.inputs
assert "str_param" in rest_obj.properties.inputs

def test_pipeline_input_as_runsettings_value(self, client: MLClient) -> None:
    """Check that pipeline inputs used as run settings are emitted as bindings.

    A pipeline is built whose node ``resources`` are populated from the
    pipeline's own inputs; the REST object is then expected to carry the
    ``${{parent.inputs.*}}`` expressions instead of literal values.
    """
    component_yaml = "./tests/test_configs/components/input_types_component.yml"
    input_types_func = load_component(source=component_yaml)

    # NOTE: the inner function name and the local name ``component`` are kept
    # as-is; the assertion below looks the node up under the key "component".
    @dsl.pipeline(
        default_compute="cpu-cluster",
        description="Set pipeline input to runsettings",
    )
    def empty_value_pipeline(integer: int, boolean: bool, number: float, str_param: str, shm_size: str):
        component = input_types_func(
            component_in_string=str_param,
            component_in_ranged_integer=integer,
            component_in_boolean=boolean,
            component_in_ranged_number=number,
        )
        # Run settings receive pipeline inputs rather than concrete values.
        component.resources = JobResourceConfiguration(instance_count=integer, shm_size=shm_size)

    pipeline_job = empty_value_pipeline(
        integer=0,
        boolean=False,
        number=0,
        str_param="str_param",
        shm_size="20g",
    )
    actual_resources = pipeline_job._to_rest_object().properties.jobs["component"]["resources"]

    expected_resources = {
        "instance_count": "${{parent.inputs.integer}}",
        "shm_size": "${{parent.inputs.shm_size}}",
    }
    assert actual_resources == expected_resources

0 comments on commit 89deaed

Please sign in to comment.