diff --git a/docs/book/component-guide/orchestrators/sagemaker.md b/docs/book/component-guide/orchestrators/sagemaker.md index 64643339347..869c988f763 100644 --- a/docs/book/component-guide/orchestrators/sagemaker.md +++ b/docs/book/component-guide/orchestrators/sagemaker.md @@ -59,7 +59,7 @@ There are three ways you can authenticate your orchestrator and link it to the I {% tabs %} {% tab title="Authentication via Service Connector" %} -The recommended way to authenticate your SageMaker orchestrator is by registering an [AWS Service Connector](../../how-to/infrastructure-deployment/auth-management/aws-service-connector.md) and connecting it to your SageMaker orchestrator: +The recommended way to authenticate your SageMaker orchestrator is by registering an [AWS Service Connector](../../how-to/infrastructure-deployment/auth-management/aws-service-connector.md) and connecting it to your SageMaker orchestrator. If you plan to use scheduled pipelines, ensure the credentials used by the service connector have the necessary EventBridge and IAM permissions listed in the [Required IAM Permissions](#required-iam-permissions) section: ```shell zenml service-connector register --type aws -i @@ -72,7 +72,7 @@ zenml stack register -o ... --set {% endtab %} {% tab title="Explicit Authentication" %} -Instead of creating a service connector, you can also configure your AWS authentication credentials directly in the orchestrator: +Instead of creating a service connector, you can also configure your AWS authentication credentials directly in the orchestrator. If you plan to use scheduled pipelines, ensure these credentials have the necessary EventBridge and IAM permissions listed in the [Required IAM Permissions](#required-iam-permissions) section: ```shell zenml orchestrator register \ @@ -88,7 +88,7 @@ See the [`SagemakerOrchestratorConfig` SDK Docs](https://sdkdocs.zenml.io/latest {% endtab %} {% tab title="Implicit Authentication" %} -If you neither connect your orchestrator to a service connector nor configure credentials explicitly, ZenML will try to implicitly authenticate to AWS via the `default` profile in your local [AWS configuration file](https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-files.html). +If you neither connect your orchestrator to a service connector nor configure credentials explicitly, ZenML will try to implicitly authenticate to AWS via the `default` profile in your local [AWS configuration file](https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-files.html). If you plan to use scheduled pipelines, ensure this profile has the necessary EventBridge and IAM permissions listed in the [Required IAM Permissions](#required-iam-permissions) section: ```shell zenml orchestrator register \ @@ -153,10 +153,6 @@ Alternatively, for a more detailed view of log messages during SageMaker pipelin ![SageMaker CloudWatch Logs](../../.gitbook/assets/sagemaker-cloudwatch-logs.png) -### Run pipelines on a schedule - -The ZenML Sagemaker orchestrator doesn't currently support running pipelines on a schedule. We maintain a public roadmap for ZenML, which you can find [here](https://zenml.io/roadmap). We welcome community contributions (see more [here](https://github.com/zenml-io/zenml/blob/main/CONTRIBUTING.md)) so if you want to enable scheduling for Sagemaker, please [do let us know](https://zenml.io/slack)! 
- ### Configuration at pipeline or step level When running your ZenML pipeline with the Sagemaker orchestrator, the configuration set when configuring the orchestrator as a ZenML component will be used by default. However, it is possible to provide additional configuration at the pipeline or step level. This allows you to run whole pipelines or individual steps with alternative configurations. For example, this allows you to run the training process with a heavier, GPU-enabled instance type, while running other steps with lighter instances. @@ -339,4 +335,119 @@ This approach allows for more granular tagging, giving you flexibility in how yo Note that if you wish to use this orchestrator to run steps on a GPU, you will need to follow [the instructions on this page](../../how-to/pipeline-development/training-with-gpus/README.md) to ensure that it works. It requires adding some extra settings customization and is essential to enable CUDA for the GPU to give its full acceleration. -
ZenML Scarf
+### Scheduling Pipelines
+
+The SageMaker orchestrator supports running pipelines on a schedule using AWS EventBridge. You can configure schedules in three ways:
+
+* Using a cron expression (in AWS EventBridge's cron syntax)
+* Using a fixed interval
+* Running once at a specific time
+
+```python
+from datetime import datetime, timedelta
+
+from zenml import pipeline
+from zenml.config.schedule import Schedule
+
+
+@pipeline
+def my_pipeline():
+    # Your pipeline steps here
+    pass
+
+
+# Using a cron expression in EventBridge syntax (runs daily at 2 AM UTC)
+my_pipeline.with_options(
+    schedule=Schedule(cron_expression="0 2 * * ? *")
+)()
+
+# Using a fixed interval (runs every 2 hours)
+my_pipeline.with_options(
+    schedule=Schedule(
+        start_time=datetime.now(), interval_second=timedelta(hours=2)
+    )
+)()
+
+# Running once at a specific time
+my_pipeline.with_options(
+    schedule=Schedule(run_once_start_time=datetime(2024, 12, 31, 23, 59))
+)()
+```
+
+Cron expressions are forwarded to EventBridge, so they must use EventBridge's six-field `cron()` syntax (for example `0 2 * * ? *`), and schedules are always evaluated in UTC. Intervals are converted to an EventBridge `rate(...)` expression with whole-minute granularity.
+
+When you deploy a scheduled pipeline, ZenML will:
+
+1. Create an EventBridge rule with the specified schedule
+2. Attach an IAM policy to your execution role that allows EventBridge to start the pipeline (a warning is logged if your credentials lack the permissions to do so)
+3. Set up the SageMaker pipeline as the rule's target
+
+{% hint style="info" %}
+If you run the same pipeline with a schedule multiple times, the existing schedule will be updated with the new settings rather than creating a new schedule. This allows you to modify a schedule by simply running the pipeline again with new schedule parameters.
+{% endhint %}
+
+#### Required IAM Permissions
+
+When using scheduled pipelines, you need to ensure your IAM role has the correct permissions and trust relationships. Here's a detailed breakdown of why each permission is needed:
+
+1. **Trust Relationships**
+Your execution role needs to trust both SageMaker and EventBridge so that they can assume the role:
+
+```json
+{
+    "Version": "2012-10-17",
+    "Statement": [
+        {
+            "Effect": "Allow",
+            "Principal": {
+                "Service": [
+                    "sagemaker.amazonaws.com",  // Required for SageMaker execution
+                    "events.amazonaws.com"      // Required for EventBridge to trigger pipelines
+                ]
+            },
+            "Action": "sts:AssumeRole"
+        }
+    ]
+}
+```
+
+2. 
**Required IAM Policies** +In addition to the basic SageMaker permissions, the AWS credentials used by the service connector (or provided directly to the orchestrator) need the following permissions to create and manage scheduled pipelines: + +```json +{ + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Action": [ + "events:PutRule", // Required to create schedule rules + "events:PutTargets", // Required to set pipeline as target + "events:DeleteRule", // Required for cleanup + "events:RemoveTargets", // Required for cleanup + "events:DescribeRule", // Required to verify rule creation + "events:ListTargetsByRule" // Required to verify target setup + ], + "Resource": "arn:aws:events:*:*:rule/zenml-*" + } + ] +} +``` + +The following IAM permissions are optional but recommended to allow automatic policy updates for the execution role: +```json +{ + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Action": [ + "iam:GetRole", // For verifying role exists + "iam:GetRolePolicy", // For checking existing policies + "iam:PutRolePolicy", // For adding new policies + "iam:UpdateAssumeRolePolicy" // For updating trust relationships + ], + "Resource": "arn:aws:iam::*:role/*" + } + ] +} +``` + +These permissions enable: +* Creation and management of EventBridge rules for scheduling +* Setting up trust relationships between services +* Managing IAM policies required for the scheduled execution +* Cleanup of resources when schedules are removed + +Without the EventBridge permissions, the scheduling functionality will fail. Without the IAM permissions, you'll need to manually ensure your execution role has the necessary permissions to start pipeline executions. + +
ZenML Scarf
\ No newline at end of file diff --git a/docs/book/how-to/pipeline-development/build-pipelines/schedule-a-pipeline.md b/docs/book/how-to/pipeline-development/build-pipelines/schedule-a-pipeline.md index be725e386fc..f922339393e 100644 --- a/docs/book/how-to/pipeline-development/build-pipelines/schedule-a-pipeline.md +++ b/docs/book/how-to/pipeline-development/build-pipelines/schedule-a-pipeline.md @@ -18,7 +18,7 @@ Schedules don't work for all orchestrators. Here is a list of all supported orch | [KubernetesOrchestrator](../../../component-guide/orchestrators/kubernetes.md) | ✅ | | [LocalOrchestrator](../../../component-guide/orchestrators/local.md) | ⛔️ | | [LocalDockerOrchestrator](../../../component-guide/orchestrators/local-docker.md) | ⛔️ | -| [SagemakerOrchestrator](../../../component-guide/orchestrators/sagemaker.md) | ⛔️ | +| [SagemakerOrchestrator](../../../component-guide/orchestrators/sagemaker.md) | ✅ | | [SkypilotAWSOrchestrator](../../../component-guide/orchestrators/skypilot-vm.md) | ⛔️ | | [SkypilotAzureOrchestrator](../../../component-guide/orchestrators/skypilot-vm.md) | ⛔️ | | [SkypilotGCPOrchestrator](../../../component-guide/orchestrators/skypilot-vm.md) | ⛔️ | diff --git a/src/zenml/integrations/aws/flavors/sagemaker_orchestrator_flavor.py b/src/zenml/integrations/aws/flavors/sagemaker_orchestrator_flavor.py index 2898f24cc67..ea17571d778 100644 --- a/src/zenml/integrations/aws/flavors/sagemaker_orchestrator_flavor.py +++ b/src/zenml/integrations/aws/flavors/sagemaker_orchestrator_flavor.py @@ -132,6 +132,15 @@ class SagemakerOrchestratorSettings(BaseSettings): ("processor_role", "execution_role"), ("processor_tags", "tags") ) + @property + def is_schedulable(self) -> bool: + """Whether the orchestrator is schedulable or not. + + Returns: + Whether the orchestrator is schedulable or not. + """ + return True + @model_validator(mode="before") def validate_model(cls, data: Dict[str, Any]) -> Dict[str, Any]: """Check if model is configured correctly. diff --git a/src/zenml/integrations/aws/orchestrators/sagemaker_orchestrator.py b/src/zenml/integrations/aws/orchestrators/sagemaker_orchestrator.py index f832647a97e..72066fae642 100644 --- a/src/zenml/integrations/aws/orchestrators/sagemaker_orchestrator.py +++ b/src/zenml/integrations/aws/orchestrators/sagemaker_orchestrator.py @@ -13,8 +13,10 @@ # permissions and limitations under the License. """Implementation of the SageMaker orchestrator.""" +import json import os import re +from datetime import datetime, timezone from typing import ( TYPE_CHECKING, Any, @@ -29,7 +31,7 @@ import boto3 import sagemaker -from botocore.exceptions import WaiterError +from botocore.exceptions import BotoCoreError, ClientError, WaiterError from sagemaker.network import NetworkConfig from sagemaker.processing import ProcessingInput, ProcessingOutput from sagemaker.workflow.execution_variables import ExecutionVariables @@ -238,20 +240,14 @@ def prepare_or_run_pipeline( Raises: RuntimeError: If a connector is used that does not return a - `boto3.Session` object. + `boto3.Session` object, or if there are insufficient permissions + to create EventBridge rules. TypeError: If the network_config passed is not compatible with the AWS SageMaker NetworkConfig class. Yields: A dictionary of metadata related to the pipeline run. """ - if deployment.schedule: - logger.warning( - "The Sagemaker Orchestrator currently does not support the " - "use of schedules. The `schedule` will be ignored " - "and the pipeline will be run immediately." 
- ) - # sagemaker requires pipelineName to use alphanum and hyphens only unsanitized_orchestrator_run_name = get_orchestrator_run_name( pipeline_name=deployment.pipeline_configuration.name @@ -459,7 +455,7 @@ def prepare_or_run_pipeline( sagemaker_steps.append(sagemaker_step) - # construct the pipeline from the sagemaker_steps + # Create the pipeline pipeline = Pipeline( name=orchestrator_run_name, steps=sagemaker_steps, @@ -479,38 +475,232 @@ def prepare_or_run_pipeline( if settings.pipeline_tags else None, ) - execution = pipeline.start() - logger.warning( - "Steps can take 5-15 minutes to start running " - "when using the Sagemaker Orchestrator." - ) - # Yield metadata based on the generated execution object - yield from self.compute_metadata( - execution=execution, settings=settings - ) + # Handle scheduling if specified + if deployment.schedule: + if settings.synchronous: + logger.warning( + "The 'synchronous' setting is ignored for scheduled pipelines since " + "they run independently of the deployment process." + ) + + events_client = session.boto_session.client("events") + rule_name = f"zenml-{deployment.pipeline_configuration.name}" + + # Determine first execution time based on schedule type + if deployment.schedule.cron_expression: + # AWS EventBridge requires cron expressions in format: cron(0 12 * * ? *) + # Strip any "cron(" prefix if it exists + cron_exp = deployment.schedule.cron_expression.replace( + "cron(", "" + ).replace(")", "") + schedule_expr = f"cron({cron_exp})" + next_execution = None + elif deployment.schedule.interval_second: + minutes = max( + 1, + int( + deployment.schedule.interval_second.total_seconds() + / 60 + ), + ) + schedule_expr = f"rate({minutes} minutes)" + next_execution = ( + datetime.utcnow() + deployment.schedule.interval_second + ) + elif deployment.schedule.run_once_start_time: + # Convert local time to UTC for EventBridge + dt = deployment.schedule.run_once_start_time.astimezone( + timezone.utc + ) + schedule_expr = f"cron({dt.minute} {dt.hour} {dt.day} {dt.month} ? {dt.year})" + next_execution = deployment.schedule.run_once_start_time + + # Create IAM policy for EventBridge + iam_client = session.boto_session.client("iam") + role_name = self.config.execution_role.split("/")[ + -1 + ] # Extract role name from ARN + + # Create the policy document (existing) + policy_document = { + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Action": ["sagemaker:StartPipelineExecution"], + "Resource": f"arn:aws:sagemaker:{session.boto_region_name}:{session.boto_session.client('sts').get_caller_identity()['Account']}:pipeline/{orchestrator_run_name}", + } + ], + } + + try: + # Update the role policy (existing) + policy_name = f"zenml-eventbridge-{orchestrator_run_name}" + iam_client.put_role_policy( + RoleName=role_name, + PolicyName=policy_name, + PolicyDocument=json.dumps(policy_document), + ) + logger.info(f"Created/Updated IAM policy: {policy_name}") + + except (ClientError, BotoCoreError) as e: + logger.warning( + f"Failed to update IAM policy: {e}. " + f"Please ensure your execution role has sufficient permissions " + f"to start pipeline executions." + ) + except KeyError as e: + logger.warning( + f"Missing required field for IAM policy creation: {e}. " + f"Please ensure your execution role has sufficient permissions " + f"to start pipeline executions." 
+ ) + + # Create the EventBridge rule + events_client = session.boto_session.client("events") + rule_name = f"zenml-{deployment.pipeline_configuration.name}" + + # Determine first execution time based on schedule type + if deployment.schedule.cron_expression: + # AWS EventBridge requires cron expressions in format: cron(0 12 * * ? *) + # Strip any "cron(" prefix if it exists + cron_exp = deployment.schedule.cron_expression.replace( + "cron(", "" + ).replace(")", "") + schedule_expr = f"cron({cron_exp})" + next_execution = None + elif deployment.schedule.interval_second: + minutes = max( + 1, + int( + deployment.schedule.interval_second.total_seconds() + / 60 + ), + ) + schedule_expr = f"rate({minutes} minutes)" + next_execution = ( + datetime.utcnow() + deployment.schedule.interval_second + ) + elif deployment.schedule.run_once_start_time: + # Convert local time to UTC for EventBridge + dt = deployment.schedule.run_once_start_time.astimezone( + timezone.utc + ) + schedule_expr = f"cron({dt.minute} {dt.hour} {dt.day} {dt.month} ? {dt.year})" + next_execution = deployment.schedule.run_once_start_time - # mainly for testing purposes, we wait for the pipeline to finish - if settings.synchronous: logger.info( - "Executing synchronously. Waiting for pipeline to finish... \n" - "At this point you can `Ctrl-C` out without cancelling the " - "execution." + f"Creating EventBridge rule with schedule expression: {schedule_expr}\n" + f"Note: AWS EventBridge schedules are always executed in UTC timezone.\n" + + ( + f"First execution will occur at: {next_execution.strftime('%Y-%m-%d %H:%M:%S')} " + f"({next_execution.astimezone().tzinfo}) / " + f"{next_execution.astimezone(timezone.utc).strftime('%Y-%m-%d %H:%M:%S')} (UTC)" + if next_execution + else f"Using cron expression: {deployment.schedule.cron_expression}" + ) + + ( + f" (and every {int(minutes)} minutes after)" + if deployment.schedule.interval_second + else "" + ) ) + try: - execution.wait( - delay=POLLING_DELAY, max_attempts=MAX_POLLING_ATTEMPTS + events_client.put_rule( + Name=rule_name, + ScheduleExpression=schedule_expr, + State="ENABLED", + ) + # Add the SageMaker pipeline as target with the role + events_client.put_targets( + Rule=rule_name, + Targets=[ + { + "Id": f"zenml-target-{deployment.pipeline_configuration.name}", + "Arn": f"arn:aws:sagemaker:{session.boto_region_name}:{session.boto_session.client('sts').get_caller_identity()['Account']}:pipeline/{orchestrator_run_name}", + "RoleArn": self.config.execution_role, + } + ], ) - logger.info("Pipeline completed successfully.") - except WaiterError: + except (ClientError, BotoCoreError) as e: raise RuntimeError( - "Timed out while waiting for pipeline execution to " - "finish. For long-running pipelines we recommend " - "configuring your orchestrator for asynchronous execution. " - "The following command does this for you: \n" - f"`zenml orchestrator update {self.name} " - f"--synchronous=False`" + f"Failed to create EventBridge rule or target. 
Please ensure you have " + f"sufficient permissions to create and manage EventBridge rules and targets: {str(e)}" + ) from e + + logger.info( + f"Successfully scheduled pipeline with rule: {rule_name}\n" + f"Schedule type: {schedule_expr}\n" + + ( + f"First execution will occur at: {next_execution.strftime('%Y-%m-%d %H:%M:%S')} " + f"({next_execution.astimezone().tzinfo})" + if next_execution + else f"Using cron expression: {deployment.schedule.cron_expression}" + ) + + ( + f" (and every {int(minutes)} minutes after)" + if deployment.schedule.interval_second + else "" ) + ) + + # Yield metadata about the schedule + schedule_type = ( + "cron" + if deployment.schedule.cron_expression + else "rate" + if deployment.schedule.interval_second + else "one-time" + ) + + schedule_metadata = { + "rule_name": rule_name, + "schedule_type": schedule_type, + "schedule_expr": schedule_expr, + "pipeline_name": orchestrator_run_name, + "next_execution": next_execution, + } + + yield from self.compute_metadata( + execution=schedule_metadata, + settings=settings, + ) + else: + # Execute the pipeline immediately if no schedule is specified + execution = pipeline.start() + logger.warning( + "Steps can take 5-15 minutes to start running " + "when using the Sagemaker Orchestrator." + ) + + # Yield metadata based on the generated execution object + yield from self.compute_metadata( + execution=execution, settings=settings + ) + + # mainly for testing purposes, we wait for the pipeline to finish + if settings.synchronous: + logger.info( + "Executing synchronously. Waiting for pipeline to finish... \n" + "At this point you can `Ctrl-C` out without cancelling the " + "execution." + ) + try: + execution.wait( + delay=POLLING_DELAY, max_attempts=MAX_POLLING_ATTEMPTS + ) + logger.info("Pipeline completed successfully.") + except WaiterError: + raise RuntimeError( + "Timed out while waiting for pipeline execution to " + "finish. For long-running pipelines we recommend " + "configuring your orchestrator for asynchronous execution. " + "The following command does this for you: \n" + f"`zenml orchestrator update {self.name} " + f"--synchronous=False`" + ) def get_pipeline_run_metadata( self, run_id: UUID @@ -594,7 +784,7 @@ def compute_metadata( """Generate run metadata based on the generated Sagemaker Execution. Args: - execution: The corresponding _PipelineExecution object. + execution: The corresponding _PipelineExecution object or schedule metadata dict. settings: The Sagemaker orchestrator settings. 
Yields: @@ -603,19 +793,31 @@ def compute_metadata( # Metadata metadata: Dict[str, MetadataType] = {} - # Orchestrator Run ID - if run_id := self._compute_orchestrator_run_id(execution): - metadata[METADATA_ORCHESTRATOR_RUN_ID] = run_id + # Handle schedule metadata if execution is a dict + if isinstance(execution, dict): + metadata.update( + { + "schedule_rule_name": execution["rule_name"], + "schedule_type": execution["schedule_type"], + "schedule_expression": execution["schedule_expr"], + "pipeline_name": execution["pipeline_name"], + } + ) + + if next_execution := execution.get("next_execution"): + metadata["next_execution_time"] = next_execution.isoformat() + else: + # Handle execution metadata + if run_id := self._compute_orchestrator_run_id(execution): + metadata[METADATA_ORCHESTRATOR_RUN_ID] = run_id - # URL to the Sagemaker's pipeline view - if orchestrator_url := self._compute_orchestrator_url(execution): - metadata[METADATA_ORCHESTRATOR_URL] = Uri(orchestrator_url) + if orchestrator_url := self._compute_orchestrator_url(execution): + metadata[METADATA_ORCHESTRATOR_URL] = Uri(orchestrator_url) - # URL to the corresponding CloudWatch page - if logs_url := self._compute_orchestrator_logs_url( - execution, settings - ): - metadata[METADATA_ORCHESTRATOR_LOGS_URL] = Uri(logs_url) + if logs_url := self._compute_orchestrator_logs_url( + execution, settings + ): + metadata[METADATA_ORCHESTRATOR_LOGS_URL] = Uri(logs_url) yield metadata diff --git a/tests/integration/integrations/aws/orchestrators/test_sagemaker_orchestrator.py b/tests/integration/integrations/aws/orchestrators/test_sagemaker_orchestrator.py index 788dcb05063..9d4299b9fcc 100644 --- a/tests/integration/integrations/aws/orchestrators/test_sagemaker_orchestrator.py +++ b/tests/integration/integrations/aws/orchestrators/test_sagemaker_orchestrator.py @@ -13,8 +13,17 @@ # permissions and limitations under the License. 
+from datetime import datetime, timedelta, timezone +from unittest.mock import MagicMock, patch + from zenml.enums import StackComponentType from zenml.integrations.aws.flavors import SagemakerOrchestratorFlavor +from zenml.integrations.aws.flavors.sagemaker_orchestrator_flavor import ( + SagemakerOrchestratorSettings, +) +from zenml.integrations.aws.orchestrators.sagemaker_orchestrator import ( + SagemakerOrchestrator, +) def test_sagemaker_orchestrator_flavor_attributes(): @@ -23,3 +32,97 @@ def test_sagemaker_orchestrator_flavor_attributes(): flavor = SagemakerOrchestratorFlavor() assert flavor.type == StackComponentType.ORCHESTRATOR assert flavor.name == "sagemaker" + + +def test_compute_schedule_metadata(): + """Tests that schedule metadata is computed correctly.""" + # Setup + orchestrator = SagemakerOrchestrator( + name="test_orchestrator", + id="test-id", + config={}, + flavor="sagemaker", + type="orchestrator", + user="test-user", + workspace="test-workspace", + created="2023-01-01", + updated="2023-01-01", + ) + settings = SagemakerOrchestratorSettings() + + # Mock schedule info with timezone-aware datetime in UTC + next_execution = datetime.now(timezone.utc) + timedelta(hours=1) + schedule_info = { + "rule_name": "test-rule", + "schedule_type": "rate", + "schedule_expr": "rate(1 hour)", + "pipeline_name": "test-pipeline", + "next_execution": next_execution, + "region": "us-west-2", + "account_id": "123456789012", + } + + # Mock boto3 session and SageMaker client + mock_sagemaker_client = MagicMock() + mock_sagemaker_client.list_domains.return_value = { + "Domains": [{"DomainId": "d-test123"}] + } + + with patch("boto3.Session") as mock_session: + mock_session.return_value.client.return_value = mock_sagemaker_client + + # Get metadata + metadata = next( + orchestrator.compute_metadata( + execution=schedule_info, + settings=settings, + ) + ) + + # Verify schedule-specific metadata + assert metadata["schedule_rule_name"] == "test-rule" + assert metadata["schedule_type"] == "rate" + assert metadata["schedule_expression"] == "rate(1 hour)" + assert metadata["pipeline_name"] == "test-pipeline" + assert metadata["next_execution_time"] == next_execution.isoformat() + + +def test_compute_schedule_metadata_error_handling(): + """Tests error handling in schedule metadata computation.""" + orchestrator = SagemakerOrchestrator( + name="test_orchestrator", + id="test-id", + config={}, + flavor="sagemaker", + type="orchestrator", + user="test-user", + workspace="test-workspace", + created="2023-01-01", + updated="2023-01-01", + ) + settings = SagemakerOrchestratorSettings() + + # Invalid schedule info missing required fields + schedule_info = { + "rule_name": "test-rule", + "schedule_type": "rate", # Add minimum required fields + "schedule_expr": "rate(1 hour)", + "pipeline_name": "test-pipeline", + } + + with patch("boto3.Session") as mock_session: + mock_session.side_effect = Exception("Failed to create session") + + # Get metadata - should not raise exception + metadata = next( + orchestrator.compute_metadata( + execution=schedule_info, + settings=settings, + ) + ) + + # Basic metadata should still be present + assert metadata["schedule_rule_name"] == "test-rule" + assert metadata["schedule_type"] == "rate" + assert metadata["schedule_expression"] == "rate(1 hour)" + assert metadata["pipeline_name"] == "test-pipeline"
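
As a quick way to sanity-check the new scheduling path end to end, the sketch below shows how the EventBridge rule created by `prepare_or_run_pipeline` could be inspected with `boto3`. It is not part of this diff: the `zenml-<pipeline name>` rule name pattern comes from the orchestrator code above, while the region and pipeline name are illustrative assumptions.

```python
# Hedged verification sketch (not part of this PR): inspect the EventBridge
# rule that the SageMaker orchestrator creates for a scheduled pipeline.
# Assumptions: region "us-east-1" and a pipeline named "my_pipeline"; the
# "zenml-<pipeline name>" rule name pattern comes from the orchestrator code.
import boto3

events = boto3.client("events", region_name="us-east-1")
rule_name = "zenml-my_pipeline"

# Confirm the rule exists and check its schedule expression and state.
rule = events.describe_rule(Name=rule_name)
print(rule["ScheduleExpression"], rule["State"])

# Confirm the SageMaker pipeline is registered as the rule's target.
targets = events.list_targets_by_rule(Rule=rule_name)["Targets"]
for target in targets:
    print(target["Id"], target["Arn"], target.get("RoleArn"))
```

Cleaning up a schedule by hand is the reverse: `events.remove_targets(Rule=rule_name, Ids=[...])` followed by `events.delete_rule(Name=rule_name)`, which is also why the documented policy asks for `events:RemoveTargets` and `events:DeleteRule`.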