From 470a2fa4765af99ecc18ecde9f9f0fa7d70878ea Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Sun, 18 Feb 2024 14:39:57 +0530 Subject: [PATCH] Fix KPO task hanging when pod fails to start within specified timeout I am observing an issue wrt to the recent deferrable KPO changes in PR https://github.com/apache/airflow/pull/37279 and https://github.com/apache/airflow/pull/37454, where when the pod fails to start within a specified timeout value, the KPO task is hanging forever whereas it is expected to fail after the timeout. This PR fixes the issue by correcting a logical error for detecting if elapsed timeout has occured for raising the timeout trigger event. --- airflow/providers/cncf/kubernetes/operators/pod.py | 10 ++++++++-- airflow/providers/cncf/kubernetes/triggers/pod.py | 2 ++ 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/airflow/providers/cncf/kubernetes/operators/pod.py b/airflow/providers/cncf/kubernetes/operators/pod.py index 61442a6014ebcf..93fc2b59e73d8b 100644 --- a/airflow/providers/cncf/kubernetes/operators/pod.py +++ b/airflow/providers/cncf/kubernetes/operators/pod.py @@ -704,6 +704,10 @@ def trigger_reentry(self, context: Context, event: dict[str, Any]) -> Any: ) if event["status"] in ("error", "failed", "timeout"): + # fetch some logs when pod is failed + if self.get_logs: + self.write_logs(self.pod) + if self.do_xcom_push: _ = self.extract_xcom(pod=self.pod) @@ -729,6 +733,10 @@ def trigger_reentry(self, context: Context, event: dict[str, Any]) -> Any: self.invoke_defer_method() elif event["status"] == "success": + # fetch some logs when pod is executed successfully + if self.get_logs: + self.write_logs(self.pod) + if self.do_xcom_push: xcom_sidecar_output = self.extract_xcom(pod=self.pod) return xcom_sidecar_output @@ -741,8 +749,6 @@ def trigger_reentry(self, context: Context, event: dict[str, Any]) -> Any: def _clean(self, event: dict[str, Any]): if event["status"] == "running": return - if self.get_logs: - self.write_logs(self.pod) istio_enabled = self.is_istio_enabled(self.pod) # Skip await_pod_completion when the event is 'timeout' due to the pod can hang # on the ErrImagePull or ContainerCreating step and it will never complete diff --git a/airflow/providers/cncf/kubernetes/triggers/pod.py b/airflow/providers/cncf/kubernetes/triggers/pod.py index 8c411e4e4a3450..7cfb34a97eee16 100644 --- a/airflow/providers/cncf/kubernetes/triggers/pod.py +++ b/airflow/providers/cncf/kubernetes/triggers/pod.py @@ -190,6 +190,7 @@ async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override] "message": message, } ) + return except Exception as e: yield TriggerEvent( { @@ -223,6 +224,7 @@ async def _wait_for_pod_start(self) -> ContainerState: return self.define_container_state(pod) self.log.info("Still waiting for pod to start. The pod state is %s", pod.status.phase) await asyncio.sleep(self.poll_interval) + delta = datetime.datetime.now(tz=datetime.timezone.utc) - self.trigger_start_time raise PodLaunchTimeoutException("Pod did not leave 'Pending' phase within specified timeout") async def _wait_for_container_completion(self) -> TriggerEvent: