From 89deaed372e4018124f17876c28dd9b2187ba349 Mon Sep 17 00:00:00 2001 From: zhen Date: Fri, 2 Dec 2022 14:13:20 +0800 Subject: [PATCH] [Pipeline] Enable pipeline input as runsetting value (#27774) * enable pipeline input as runsetting value * fix comment * fix test case --- .../ml/_internal/entities/_artifact_cache.py | 10 ++++++- .../azure-ai-ml/azure/ai/ml/entities/_util.py | 5 ++++ .../unittests/test_pipeline_job_entity.py | 27 ++++++++++++++++++- 3 files changed, 40 insertions(+), 2 deletions(-) diff --git a/sdk/ml/azure-ai-ml/azure/ai/ml/_internal/entities/_artifact_cache.py b/sdk/ml/azure-ai-ml/azure/ai/ml/_internal/entities/_artifact_cache.py index 2c0941ded194..8ed06cfb463e 100644 --- a/sdk/ml/azure-ai-ml/azure/ai/ml/_internal/entities/_artifact_cache.py +++ b/sdk/ml/azure-ai-ml/azure/ai/ml/_internal/entities/_artifact_cache.py @@ -133,6 +133,11 @@ def get_organization_project_by_git(): f"Azure DevOps or Azure DevOps Server repository." ) + @classmethod + def _get_checksum_path(cls, path): + artifact_path = Path(path) + return artifact_path.parent / f"{artifact_path.name}_{cls.POSTFIX_CHECKSUM}" + def _redirect_artifacts_tool_path(self, organization): """To avoid the transient issue when download artifacts, download the artifacts tool and redirect az artifact command to it.""" @@ -216,7 +221,7 @@ def _check_artifacts(self, artifact_package_path): path = Path(artifact_package_path) if not path.exists(): return False - checksum_path = path.parent / f"{path.name}_{self.POSTFIX_CHECKSUM}" + checksum_path = self._get_checksum_path(artifact_package_path) if checksum_path.exists(): with open(checksum_path, "r") as f: checksum = f.read() @@ -264,6 +269,9 @@ def get( # When the cache folder of artifact package exists, it's sure that the package has been downloaded. return artifact_package_path.absolute().resolve() if resolve: + check_sum_path = self._get_checksum_path(artifact_package_path) + if Path(check_sum_path).exists(): + os.unlink(check_sum_path) if artifact_package_path.exists(): # Remove invalid artifact package to avoid affecting download artifact. temp_folder = tempfile.mktemp() # nosec B306 diff --git a/sdk/ml/azure-ai-ml/azure/ai/ml/entities/_util.py b/sdk/ml/azure-ai-ml/azure/ai/ml/entities/_util.py index 0f4540007b8b..ddaa414262f0 100644 --- a/sdk/ml/azure-ai-ml/azure/ai/ml/entities/_util.py +++ b/sdk/ml/azure-ai-ml/azure/ai/ml/entities/_util.py @@ -240,6 +240,8 @@ def get_rest_dict_for_node_attrs(target_obj, clear_empty_value=False): Allow data binding expression as value, disregarding of the type defined in rest object. """ # pylint: disable=too-many-return-statements + from azure.ai.ml.entities._job.pipeline._io import PipelineInput + if target_obj is None: return None if isinstance(target_obj, dict): @@ -269,6 +271,9 @@ def get_rest_dict_for_node_attrs(target_obj, clear_empty_value=False): # can't use result.as_dict() as data binding expression may not fit rest object structure return get_rest_dict_for_node_attrs(target_obj.__dict__, clear_empty_value=clear_empty_value) + if isinstance(target_obj, PipelineInput): + return get_rest_dict_for_node_attrs(str(target_obj), clear_empty_value=clear_empty_value) + if not isinstance(target_obj, (str, int, float, bool)): raise ValueError("Unexpected type {}".format(type(target_obj))) diff --git a/sdk/ml/azure-ai-ml/tests/pipeline_job/unittests/test_pipeline_job_entity.py b/sdk/ml/azure-ai-ml/tests/pipeline_job/unittests/test_pipeline_job_entity.py index 967a158daf47..c23632111388 100644 --- a/sdk/ml/azure-ai-ml/tests/pipeline_job/unittests/test_pipeline_job_entity.py +++ b/sdk/ml/azure-ai-ml/tests/pipeline_job/unittests/test_pipeline_job_entity.py @@ -23,6 +23,7 @@ ImageInstanceSegmentationJob, ImageObjectDetectionJob, ) +from azure.ai.ml.entities._job.job_resource_configuration import JobResourceConfiguration from azure.ai.ml.entities._job.automl.nlp import TextClassificationJob, TextClassificationMultilabelJob, TextNerJob from azure.ai.ml.entities._job.automl.tabular import ClassificationJob, ForecastingJob, RegressionJob from azure.ai.ml.entities._job.pipeline._io import PipelineInput, _GroupAttrDict @@ -1463,4 +1464,28 @@ def empty_value_pipeline(integer: int, boolean: bool, number: float, assert "integer" in rest_obj.properties.inputs assert "boolean" in rest_obj.properties.inputs assert "number" in rest_obj.properties.inputs - assert "str_param" in rest_obj.properties.inputs \ No newline at end of file + assert "str_param" in rest_obj.properties.inputs + + def test_pipeline_input_as_runsettings_value(self, client: MLClient) -> None: + input_types_func = load_component(source="./tests/test_configs/components/input_types_component.yml") + + @dsl.pipeline( + default_compute="cpu-cluster", + description="Set pipeline input to runsettings", + ) + def empty_value_pipeline(integer: int, boolean: bool, number: float, + str_param: str, shm_size: str): + component = input_types_func(component_in_string=str_param, + component_in_ranged_integer=integer, + component_in_boolean=boolean, + component_in_ranged_number=number) + component.resources = JobResourceConfiguration( + instance_count=integer, + shm_size=shm_size, + ) + + pipeline = empty_value_pipeline(integer=0, boolean=False, number=0, + str_param="str_param", shm_size="20g") + rest_obj = pipeline._to_rest_object() + expect_resource = {'instance_count': '${{parent.inputs.integer}}', 'shm_size': '${{parent.inputs.shm_size}}'} + assert rest_obj.properties.jobs["component"]["resources"] == expect_resource \ No newline at end of file