diff --git a/sdk/python/kfp/deprecated/_client.py b/sdk/python/kfp/deprecated/_client.py
index 5d57dfe7825e..097107846883 100644
--- a/sdk/python/kfp/deprecated/_client.py
+++ b/sdk/python/kfp/deprecated/_client.py
@@ -182,7 +182,7 @@ def __init__(self,
                 header_value=self._context_setting.get(
                     'client_authentication_header_value'))
         _add_generated_apis(self, kfp_server_api, api_client)
-        self._job_api = kfp_server_api.api.job_service_api.JobServiceApi(
+        self._recurring_run_api = kfp_server_api.api.recurring_run_service_api.RecurringRunServiceApi(
             api_client)
         self._run_api = kfp_server_api.api.run_service_api.RunServiceApi(
             api_client)
@@ -393,7 +393,7 @@ def set_user_namespace(self, namespace: str):
         with open(Client.LOCAL_KFP_CONTEXT, 'w') as f:
             json.dump(self._context_setting, f)
 
-    def get_kfp_healthz(self) -> kfp_server_api.ApiGetHealthzResponse:
+    def get_kfp_healthz(self) -> kfp_server_api.V2beta1GetHealthzResponse:
         """Gets healthz info of KFP deployment.
 
         Returns:
@@ -431,7 +431,7 @@ def create_experiment(
             self,
             name: str,
             description: str = None,
-            namespace: str = None) -> kfp_server_api.ApiExperiment:
+            namespace: str = None) -> kfp_server_api.V2beta1Experiment:
         """Create a new experiment.
 
         Args:
@@ -515,7 +515,7 @@ def list_experiments(
             sort_by: str = '',
             namespace: Optional[str] = None,
             filter: Optional[str] = None
-    ) -> kfp_server_api.ApiListExperimentsResponse:
+    ) -> kfp_server_api.V2beta1ListExperimentsResponse:
         """List experiments.
 
         Args:
@@ -557,7 +557,7 @@ def list_experiments(
     def get_experiment(self,
                        experiment_id=None,
                        experiment_name=None,
-                       namespace=None) -> kfp_server_api.ApiExperiment:
+                       namespace=None) -> kfp_server_api.V2beta1Experiment:
         """Get details of an experiment.
 
         Either experiment_id or experiment_name is required
@@ -685,7 +685,7 @@ def list_pipelines(
             page_size: int = 10,
             sort_by: str = '',
             filter: Optional[str] = None
-    ) -> kfp_server_api.ApiListPipelinesResponse:
+    ) -> kfp_server_api.V2beta1ListPipelinesResponse:
         """List pipelines.
 
         Args:
@@ -728,7 +728,7 @@ def run_pipeline(
             pipeline_root: Optional[str] = None,
             enable_caching: Optional[str] = None,
             service_account: Optional[str] = None,
-    ) -> kfp_server_api.ApiRun:
+    ) -> kfp_server_api.V2beta1Run:
         """Run a specified pipeline.
 
         Args:
@@ -771,7 +771,7 @@ def run_pipeline(
             version_id=version_id,
             enable_caching=enable_caching,
         )
-        run_body = kfp_server_api.models.ApiRun(
+        run_body = kfp_server_api.models.V2beta1Run(
             pipeline_spec=job_config.spec,
             resource_references=job_config.resource_references,
             name=job_name,
@@ -805,48 +805,52 @@ def create_recurring_run(
             enabled: bool = True,
             enable_caching: Optional[bool] = None,
             service_account: Optional[str] = None,
-    ) -> kfp_server_api.ApiJob:
-        """Create a recurring run.
+    ) -> kfp_server_api.V2beta1RecurringRun:
+        """Creates a recurring run.
 
         Args:
-            experiment_id: The string id of an experiment.
-            job_name: Name of the job.
-            description: An optional job description.
-            start_time: The RFC3339 time string of the time when to start the job.
-            end_time: The RFC3339 time string of the time when to end the job.
-            interval_second: Integer indicating the seconds between two recurring runs in for a periodic schedule.
-            cron_expression: A cron expression representing a set of times, using 6 space-separated fields, e.g. "0 0 9 ? * 2-6".
-                See `here `_ for details of the cron expression format.
-            max_concurrency: Integer indicating how many jobs can be run in parallel.
-            no_catchup: Whether the recurring run should catch up if behind schedule.
-                For example, if the recurring run is paused for a while and re-enabled
-                afterwards. If no_catchup=False, the scheduler will catch up on (backfill) each
-                missed interval. Otherwise, it only schedules the latest interval if more than one interval
-                is ready to be scheduled.
-                Usually, if your pipeline handles backfill internally, you should turn catchup
-                off to avoid duplicate backfill. (default: {False})
-            pipeline_package_path: Local path of the pipeline package(the filename should end with one of the following .tar.gz, .tgz, .zip, .yaml, .yml).
-            params: A dictionary with key (string) as param name and value (string) as param value.
-            pipeline_id: The id of a pipeline.
-            version_id: The id of a pipeline version.
-                If both pipeline_id and version_id are specified, version_id will take precendence.
-                If only pipeline_id is specified, the default version of this pipeline is used to create the run.
-            enabled: A bool indicating whether the recurring run is enabled or disabled.
-            enable_caching: Optional. Whether or not to enable caching for the run.
-                This setting affects v2 compatible mode and v2 mode only.
-                If not set, defaults to the compile time settings, which are True for all
-                tasks by default, while users may specify different caching options for
-                individual tasks.
-                If set, the setting applies to all tasks in the pipeline -- overrides
-                the compile time settings.
-            service_account: Optional. Specifies which Kubernetes service account this
-                recurring run uses.
-
+            experiment_id: ID of the experiment.
+            job_name: Name of the job.
+            description: Description of the job.
+            start_time: RFC3339 time string of the time when to start the
+                job.
+            end_time: RFC3339 time string of the time when to end the job.
+            interval_second: Integer indicating the seconds between two
+                recurring runs in for a periodic schedule.
+            cron_expression: Cron expression representing a set of times,
+                using 6 space-separated fields (e.g., ``'0 0 9 ? * 2-6'``). See `cron format
+                `_.
+            max_concurrency: Integer indicating how many jobs can be run in
+                parallel.
+            no_catchup: Whether the recurring run should catch up if behind
+                schedule. For example, if the recurring run is paused for a
+                while and re-enabled afterwards. If ``no_catchup=False``, the
+                scheduler will catch up on (backfill) each missed interval.
+                Otherwise, it only schedules the latest interval if more than
+                one interval is ready to be scheduled. Usually, if your pipeline
+                handles backfill internally, you should turn catchup off to
+                avoid duplicate backfill.
+            pipeline_package_path: Local path of the pipeline package (the
+                filename should end with one of the following .tar.gz, .tgz,
+                .zip, .json).
+            params: Arguments to the pipeline function provided as a dict.
+            pipeline_id: ID of a pipeline.
+            version_id: ID of a pipeline version.
+                If both ``pipeline_id`` and ``version_id`` are specified, ``version_id``
+                will take precedence.
+                If only ``pipeline_id`` is specified, the default version of this
+                pipeline is used to create the run.
+            enabled: Whether to enable or disable the recurring run.
+            enable_caching: Whether or not to enable caching for the
+                run. If not set, defaults to the compile time settings, which
+                is ``True`` for all tasks by default, while users may specify
+                different caching options for individual tasks. If set, the
+                setting applies to all tasks in the pipeline (overrides the
+                compile time settings).
+            service_account: Specifies which Kubernetes service
+                account this recurring run uses.
         Returns:
-            A Job object. Most important field is id.
-
-        Raises:
-            ValueError: If required parameters are not supplied.
+            ``V2beta1RecurringRun`` object.
         """
 
         job_config = self._create_job_config(
@@ -861,31 +865,37 @@ def create_recurring_run(
         if all([interval_second, cron_expression
                ]) or not any([interval_second, cron_expression]):
             raise ValueError(
-                'Either interval_second or cron_expression is required')
+                'Either interval_second or cron_expression is required.')
         if interval_second is not None:
-            trigger = kfp_server_api.models.ApiTrigger(
-                periodic_schedule=kfp_server_api.models.ApiPeriodicSchedule(
+            trigger = kfp_server_api.V2beta1Trigger(
+                periodic_schedule=kfp_server_api.V2beta1PeriodicSchedule(
                     start_time=start_time,
                     end_time=end_time,
                     interval_second=interval_second))
         if cron_expression is not None:
-            trigger = kfp_server_api.models.ApiTrigger(
-                cron_schedule=kfp_server_api.models.ApiCronSchedule(
+            trigger = kfp_server_api.V2beta1Trigger(
+                cron_schedule=kfp_server_api.V2beta1CronSchedule(
                     start_time=start_time,
                     end_time=end_time,
                     cron=cron_expression))
 
-        job_body = kfp_server_api.models.ApiJob(
-            enabled=enabled,
-            pipeline_spec=job_config.spec,
-            resource_references=job_config.resource_references,
-            name=job_name,
+        mode = kfp_server_api.RecurringRunMode.DISABLE
+        if enabled:
+            mode = kfp_server_api.RecurringRunMode.ENABLE
+
+        job_body = kfp_server_api.V2beta1RecurringRun(
+            experiment_id=experiment_id,
+            mode=mode,
+            pipeline_spec=job_config.pipeline_spec,
+            pipeline_version_reference=job_config.pipeline_version_reference,
+            runtime_config=job_config.runtime_config,
+            display_name=job_name,
             description=description,
             no_catchup=no_catchup,
             trigger=trigger,
             max_concurrency=max_concurrency,
             service_account=service_account)
-        return self._job_api.create_job(body=job_body)
+        return self._recurring_run_api.create_recurring_run(body=job_body)
 
     def _create_job_config(
             self,
@@ -1118,47 +1128,89 @@ def __repr__(self):
         )
         return RunPipelineResult(self, run_info)
 
-    def delete_job(self, job_id: str):
-        """Deletes a job.
+    def delete_job(self, job_id: str) -> dict:
+        """Deletes a job (recurring run).
 
         Args:
-            job_id: id of the job.
+            job_id: ID of the job.
 
         Returns:
-            Object. If the method is called asynchronously, returns the request thread.
+            Empty dictionary.
+        """
+        warnings.warn(
+            '`delete_job` is deprecated. Please use `delete_recurring_run` instead.'
+            f'\nReroute to calling `delete_recurring_run(recurring_run_id="{job_id}")`',
+            category=DeprecationWarning,
+            stacklevel=2)
+        return self.delete_recurring_run(recurring_run_id=job_id)
 
-        Raises:
-            kfp_server_api.ApiException: If the job is not found.
+    def delete_recurring_run(self, recurring_run_id: str) -> dict:
+        """Deletes a recurring run.
+
+        Args:
+            recurring_run_id: ID of the recurring_run.
+
+        Returns:
+            Empty dictionary.
         """
-        return self._job_api.delete_job(id=job_id)
+        return self._recurring_run_api.delete_recurring_run(
+            recurring_run_id=recurring_run_id)
 
-    def disable_job(self, job_id: str):
-        """Disables a job.
+    def disable_job(self, job_id: str) -> dict:
+        """Disables a job (recurring run).
 
         Args:
-            job_id: id of the job.
+            job_id: ID of the job.
 
         Returns:
-            Object. If the method is called asynchronously, returns the request thread.
+            Empty dictionary.
+        """
+        warnings.warn(
+            '`disable_job` is deprecated. Please use `disable_recurring_run` instead.'
+            f'\nReroute to calling `disable_recurring_run(recurring_run_id="{job_id}")`',
+            category=DeprecationWarning,
+            stacklevel=2)
+        return self.disable_recurring_run(recurring_run_id=job_id)
 
-        Raises:
-            ApiException: If the job is not found.
+    def disable_recurring_run(self, recurring_run_id: str) -> dict:
+        """Disables a recurring run.
+
+        Args:
+            recurring_run_id: ID of the recurring_run.
+
+        Returns:
+            Empty dictionary.
         """
-        return self._job_api.disable_job(id=job_id)
+        return self._recurring_run_api.disable_recurring_run(
+            recurring_run_id=recurring_run_id)
 
-    def enable_job(self, job_id: str):
-        """Enables a job.
+    def enable_job(self, job_id: str) -> dict:
+        """Enables a job (recurring run).
 
         Args:
-            job_id: id of the job.
+            job_id: ID of the job.
 
         Returns:
-            Object. If the method is called asynchronously, returns the request thread.
+            Empty dictionary.
+        """
+        warnings.warn(
+            '`enable_job` is deprecated. Please use `enable_recurring_run` instead.'
+            f'\nReroute to calling `enable_recurring_run(recurring_run_id="{job_id}")`',
+            category=DeprecationWarning,
+            stacklevel=2)
+        return self.enable_recurring_run(recurring_run_id=job_id)
 
-        Raises:
-            ApiException: If the job is not found.
+    def enable_recurring_run(self, recurring_run_id: str) -> dict:
+        """Enables a recurring run.
+
+        Args:
+            recurring_run_id: ID of the recurring_run.
+
+        Returns:
+            Empty dictionary.
         """
-        return self._job_api.enable_job(id=job_id)
+        return self._recurring_run_api.enable_recurring_run(
+            recurring_run_id=recurring_run_id)
 
     def list_runs(
             self,
@@ -1167,7 +1219,7 @@ def list_runs(
             sort_by: str = '',
             experiment_id: Optional[str] = None,
             namespace: Optional[str] = None,
-            filter: Optional[str] = None) -> kfp_server_api.ApiListRunsResponse:
+            filter: Optional[str] = None) -> kfp_server_api.V2beta1ListRunsResponse:
         """List runs, optionally can be filtered by experiment or namespace.
 
         Args:
@@ -1229,7 +1281,7 @@ def list_recurring_runs(
             page_size: int = 10,
             sort_by: str = '',
             experiment_id: Optional[str] = None,
-            filter: Optional[str] = None) -> kfp_server_api.ApiListJobsResponse:
+            filter: Optional[str] = None) -> kfp_server_api.V2beta1ListRunsResponse:
         """List recurring runs.
 
         Args:
@@ -1256,23 +1308,21 @@ def list_recurring_runs(
             A response object including a list of recurring_runs and next page token.
         """
         if experiment_id is not None:
-            response = self._job_api.list_jobs(
+            response = self._recurring_run_api.list_recurring_runs(
                 page_token=page_token,
                 page_size=page_size,
                 sort_by=sort_by,
-                resource_reference_key_type=kfp_server_api.models
-                .api_resource_type.ApiResourceType.EXPERIMENT,
-                resource_reference_key_id=experiment_id,
+                experiment_id=experiment_id,
                 filter=filter)
         else:
-            response = self._job_api.list_jobs(
+            response = self._recurring_run_api.list_recurring_runs(
                 page_token=page_token,
                 page_size=page_size,
                 sort_by=sort_by,
                 filter=filter)
         return response
 
-    def get_recurring_run(self, job_id: str) -> kfp_server_api.ApiJob:
+    def get_recurring_run(self, job_id: str) -> kfp_server_api.V2beta1RecurringRun:
         """Get recurring_run details.
 
         Args:
@@ -1284,9 +1334,9 @@ def get_recurring_run(self, job_id: str) -> kfp_server_api.ApiJob:
         Raises:
             kfp_server_api.ApiException: If recurring_run is not found.
         """
-        return self._job_api.get_job(id=job_id)
+        return self._recurring_run_api.get_recurring_run(recurring_run_id=job_id)
 
-    def get_run(self, run_id: str) -> kfp_server_api.ApiRun:
+    def get_run(self, run_id: str) -> kfp_server_api.V2beta1Run:
         """Get run details.
         Args:
@@ -1360,7 +1410,7 @@ def upload_pipeline(
         pipeline_package_path: str = None,
         pipeline_name: str = None,
         description: str = None,
-    ) -> kfp_server_api.ApiPipeline:
+    ) -> kfp_server_api.V2beta1Pipeline:
         """Uploads the pipeline to the Kubeflow Pipelines cluster.
 
         Args:
@@ -1388,7 +1438,7 @@ def upload_pipeline_version(
         pipeline_id: Optional[str] = None,
         pipeline_name: Optional[str] = None,
         description: Optional[str] = None,
-    ) -> kfp_server_api.ApiPipelineVersion:
+    ) -> kfp_server_api.V2beta1PipelineVersion:
         """Uploads a new version of the pipeline to the Kubeflow Pipelines
         cluster.
 
@@ -1439,7 +1489,7 @@ def upload_pipeline_version(
             IPython.display.display(IPython.display.HTML(html))
         return response
 
-    def get_pipeline(self, pipeline_id: str) -> kfp_server_api.ApiPipeline:
+    def get_pipeline(self, pipeline_id: str) -> kfp_server_api.V2beta1Pipeline:
         """Get pipeline details.
 
         Args:
@@ -1474,7 +1524,7 @@ def list_pipeline_versions(
             page_size: int = 10,
             sort_by: str = '',
             filter: Optional[str] = None
-    ) -> kfp_server_api.ApiListPipelineVersionsResponse:
+    ) -> kfp_server_api.V2beta1ListPipelineVersionsResponse:
         """Lists pipeline versions.
 
         Args:
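
Usage sketch (not part of the diff): the following illustrates how the migrated recurring-run methods above might be called from a notebook or script. The host URL, experiment name, pipeline package path, and parameter values are placeholders; the response attributes `experiment_id` and `recurring_run_id` are assumed from the v2beta1 models, and the import path is assumed from this file's location. Only the method names and keyword arguments come from the change itself.

    # Hypothetical usage of the migrated deprecated client; endpoint and paths are placeholders.
    from kfp.deprecated import Client  # import path assumed from sdk/python/kfp/deprecated

    client = Client(host='http://localhost:8080')           # placeholder KFP endpoint
    experiment = client.create_experiment(name='nightly')   # returns a V2beta1Experiment

    # Exactly one of interval_second / cron_expression must be set; otherwise
    # create_recurring_run raises ValueError (see the check in the diff above).
    recurring_run = client.create_recurring_run(
        experiment_id=experiment.experiment_id,              # assumed v2beta1 attribute name
        job_name='nightly-train',
        cron_expression='0 0 9 ? * 2-6',                     # 6-field cron expression
        max_concurrency=1,
        no_catchup=True,
        pipeline_package_path='pipeline.json',               # placeholder package path
        params={'learning_rate': '0.01'},                    # placeholder pipeline arguments
    )

    # The deprecated job_* methods still work; they emit a DeprecationWarning and
    # reroute to the recurring-run methods:
    client.disable_job(job_id=recurring_run.recurring_run_id)
    # New-style equivalents:
    client.enable_recurring_run(recurring_run_id=recurring_run.recurring_run_id)
    client.delete_recurring_run(recurring_run_id=recurring_run.recurring_run_id)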