remove lazy load from TaskInstance.DagModel #6
Conversation
Walkthrough
The overall change in this update resolves an issue where multiple tasks from the same DAG entering zombie state could block the lazy load of the DAG object for a subsequent task.
Changes
Review Status
Actionable comments generated: 0
Configuration used: CodeRabbit UI
Commits
Files that changed from the base of the PR and between f763058 and 80f3f21a67b7b3c0eaa5ed0fcdd35d0a77fa71a7.
Files selected for processing (1)
- airflow/models/taskinstance.py (1 hunks)
Additional comments: 1
airflow/models/taskinstance.py (1)
- 507-507: The `lazy="immediate"` parameter was added to the `dag_model` relationship definition in the `TaskInstance` class. This change aims to fix the scheduler crash that occurs when multiple tasks in one DAG enter zombie state. With "immediate", the DAG object is loaded as soon as the `TaskInstance` is loaded, which avoids the lazy-load operation that fails in the described scenario. This is a targeted fix for a specific problem and should be tested thoroughly in a real environment to make sure it does not introduce new performance problems or other unexpected behavior.
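For context, the effect of `lazy="immediate"` can be reproduced in a standalone SQLAlchemy sketch. The toy models below are hypothetical, not the real Airflow `TaskInstance`/`DagModel` classes; only the relationship strategy mirrors the change.

```python
from sqlalchemy import Column, ForeignKey, Integer, String, create_engine
from sqlalchemy.orm import Session, declarative_base, relationship

Base = declarative_base()

# Hypothetical minimal models for illustration only; NOT the real
# Airflow DagModel/TaskInstance definitions.
class DagModel(Base):
    __tablename__ = "dag"
    dag_id = Column(String, primary_key=True)
    processor_subdir = Column(String)

class TaskInstance(Base):
    __tablename__ = "task_instance"
    id = Column(Integer, primary_key=True)
    dag_id = Column(String, ForeignKey("dag.dag_id"))
    # lazy="immediate" loads the related row right after the parent loads,
    # instead of deferring to first attribute access (the default "select").
    dag_model = relationship(DagModel, lazy="immediate")

engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add(DagModel(dag_id="d1", processor_subdir="/opt/airflow/dags"))
    session.add(TaskInstance(id=1, dag_id="d1"))
    session.commit()

with Session(engine) as session:
    ti = session.query(TaskInstance).one()

# The session is closed, so ti is detached. With the default lazy="select"
# this access would raise DetachedInstanceError; with "immediate" the
# attribute was already populated while the session was still open.
subdir = ti.dag_model.processor_subdir
```

This is exactly the detached-access pattern that fails in the traceback below: `_find_zombies` touches `ti.dag_model` after the task instance has left its session.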
…ss (apache#28198)

```
[2022-12-06T14:20:21.622+0000] {base_job.py:229} DEBUG - [heartbeat]
[2022-12-06T14:20:21.623+0000] {scheduler_job.py:1495} DEBUG - Finding 'running' jobs without a recent heartbeat
[2022-12-06T14:20:21.637+0000] {scheduler_job.py:1515} WARNING - Failing (2) jobs without heartbeat after 2022-12-06 14:15:21.623199+00:00
[2022-12-06T14:20:21.641+0000] {scheduler_job.py:1526} ERROR - Detected zombie job: {'full_filepath': '/opt/airflow/dags/xxx_dag.py', 'processor_subdir': '/opt/airflow/dags', 'msg': "{'DAG Id': 'xxx', 'Task Id': 'xxx', 'Run Id': 'scheduled__2022-12-05T00:15:00+00:00', 'Hostname': 'airflow-worker-0.airflow-worker.airflow2.svc.cluster.local', 'External Executor Id': '9520cb9f-3245-497a-8e17-e9dec29d4549'}", 'simple_task_instance': <airflow.models.taskinstance.SimpleTaskInstance object at 0x7f1cd4de4130>, 'is_failure_callback': True}
[2022-12-06T14:20:21.645+0000] {scheduler_job.py:763} ERROR - Exception when executing SchedulerJob._run_scheduler_loop
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job.py", line 746, in _execute
    self._run_scheduler_loop()
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job.py", line 878, in _run_scheduler_loop
    next_event = timers.run(blocking=False)
  File "/usr/local/lib/python3.10/sched.py", line 151, in run
    action(*argument, **kwargs)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/event_scheduler.py", line 37, in repeat
    action(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/session.py", line 75, in wrapper
    return func(*args, session=session, **kwargs)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job.py", line 1522, in _find_zombies
    processor_subdir=ti.dag_model.processor_subdir,
  File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/attributes.py", line 481, in __get__
    return self.impl.get(state, dict_)
  File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/attributes.py", line 926, in get
    value = self._fire_loader_callables(state, key, passive)
  File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/attributes.py", line 962, in _fire_loader_callables
    return self.callable_(state, passive)
  File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/strategies.py", line 861, in _load_for_state
    raise orm_exc.DetachedInstanceError(
sqlalchemy.orm.exc.DetachedInstanceError: Parent instance <TaskInstance at 0x7f1ccc3e8520> is not bound to a Session; lazy load operation of attribute 'dag_model' cannot proceed
(Background on this error at: https://sqlalche.me/e/14/bhk3)
[2022-12-06T14:20:21.647+0000] {celery_executor.py:443} DEBUG - Inquiring about 5 celery task(s)
[2022-12-06T14:20:21.669+0000] {celery_executor.py:602} DEBUG - Fetched 5 state(s) for 5 task(s)
[2022-12-06T14:20:21.669+0000] {celery_executor.py:446} DEBUG - Inquiries completed.
[2022-12-06T14:20:21.669+0000] {scheduler_job.py:775} INFO - Exited execute loop
[2022-12-06T14:20:21.674+0000] {cli_action_loggers.py:83} DEBUG - Calling callbacks: []
Traceback (most recent call last):
  File "/home/airflow/.local/bin/airflow", line 8, in <module>
    sys.exit(main())
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/__main__.py", line 39, in main
    args.func(args)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/cli/cli_parser.py", line 52, in command
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/cli.py", line 103, in wrapper
    return f(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/cli/commands/scheduler_command.py", line 85, in scheduler
    _run_scheduler_job(args=args)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/cli/commands/scheduler_command.py", line 50, in _run_scheduler_job
    job.run()
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/base_job.py", line 247, in run
    self._execute()
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job.py", line 746, in _execute
    self._run_scheduler_loop()
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job.py", line 878, in _run_scheduler_loop
    next_event = timers.run(blocking=False)
  File "/usr/local/lib/python3.10/sched.py", line 151, in run
    action(*argument, **kwargs)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/event_scheduler.py", line 37, in repeat
    action(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/session.py", line 75, in wrapper
    return func(*args, session=session, **kwargs)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job.py", line 1522, in _find_zombies
    processor_subdir=ti.dag_model.processor_subdir,
  File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/attributes.py", line 481, in __get__
    return self.impl.get(state, dict_)
  File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/attributes.py", line 926, in get
    value = self._fire_loader_callables(state, key, passive)
  File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/attributes.py", line 962, in _fire_loader_callables
    return self.callable_(state, passive)
  File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/strategies.py", line 861, in _load_for_state
    raise orm_exc.DetachedInstanceError(
sqlalchemy.orm.exc.DetachedInstanceError: Parent instance <TaskInstance at 0x7f1ccc3e8520> is not bound to a Session; lazy load operation of attribute 'dag_model' cannot proceed
(Background on this error at: https://sqlalche.me/e/14/bhk3)
```

When running in standalone dag processor mode, `DatabaseCallbackSink` is used. The `_find_zombies` func calls `self.executor.send_callback(request)`, but the ORM `session` is not propagated; `provide_session` opens a new session again inside the `send` func.

```
class DatabaseCallbackSink(BaseCallbackSink):
    """Sends callbacks to database."""

    @provide_session
    def send(self, callback: CallbackRequest, session: Session = NEW_SESSION) -> None:
        """Sends callback for execution."""
        db_callback = DbCallbackRequest(callback=callback, priority_weight=10)
        session.add(db_callback)
```

Signed-off-by: BobDu <i@bobdu.cc>
(cherry picked from commit 4b340b7)
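The `provide_session` behavior described above can be sketched in isolation. The `FakeSession` below is a hypothetical stand-in for an ORM session, used only to show the decorator's session lifecycle.

```python
import functools

NEW_SESSION = None  # sentinel: "no session supplied by the caller"

class FakeSession:
    """Hypothetical stand-in for an ORM session, only to show the pattern."""
    def __init__(self):
        self.added = []
        self.closed = False

    def add(self, obj):
        self.added.append(obj)

    def close(self):
        self.closed = True

def provide_session(func):
    """Sketch of the provide_session idea: if the caller passes no session,
    open a fresh one for the call and close it afterwards."""
    @functools.wraps(func)
    def wrapper(*args, session=NEW_SESSION, **kwargs):
        if session is not None:
            # Caller supplied a session: reuse it, don't manage lifecycle.
            return func(*args, session=session, **kwargs)
        own = FakeSession()
        try:
            return func(*args, session=own, **kwargs)
        finally:
            own.close()
    return wrapper

@provide_session
def send(callback, session=NEW_SESSION):
    session.add(callback)
    return session

# Called without an explicit session, the decorator supplies (and closes)
# its own, independent of any session the caller may already hold open.
result = send("my-callback")
```

This is why `send` works even though `_find_zombies` does not pass its session along: the decorator transparently opens a new one.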
…e#28544) apache#28198 accidentally dropped a join in a query, leading to this:

airflow/jobs/scheduler_job.py:1547 SAWarning: SELECT statement has a cartesian product between FROM element(s) "dag_run_1", "task_instance", "job" and FROM element "dag". Apply join condition(s) between each element to resolve.

(cherry picked from commit a24d18a)
80f3f21 to a9e9b3f
Actionable comments posted: 0
Review Status
Configuration used: CodeRabbit UI
Files selected for processing (2)
- airflow/jobs/scheduler_job.py (2 hunks)
- tests/jobs/test_scheduler_job.py (4 hunks)
Additional Context Used
Additional comments not posted (9)
airflow/jobs/scheduler_job.py (1)
1482-1512: > 📝 NOTE: This review was outside the diff hunks and was mapped to the diff hunk with the greatest overlap. Original lines [1474-1509]
In the `_find_zombies` method, the `@provide_session` decorator was removed in favor of `create_session()` for session management. This is good practice: it gives finer-grained control and ensures the session is closed properly after use. The query structure was also adjusted and variable assignments were updated, which improves readability and maintainability. Overall, these changes improve the method's efficiency and reliability.
tests/jobs/test_scheduler_job.py (8)
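The `create_session()` pattern the review refers to can be sketched like this. A `RecordingSession` stand-in is used instead of a real SQLAlchemy session so the lifecycle is observable; Airflow's actual helper wraps a real session.

```python
from contextlib import contextmanager

class RecordingSession:
    """Minimal stand-in recording lifecycle calls; not a real ORM session."""
    def __init__(self):
        self.events = []

    def commit(self):
        self.events.append("commit")

    def rollback(self):
        self.events.append("rollback")

    def close(self):
        self.events.append("close")

@contextmanager
def create_session():
    """Sketch of the create_session pattern: commit on success, roll back
    on error, and always close, with the scope explicit at the call site."""
    session = RecordingSession()
    try:
        yield session
        session.commit()
    except Exception:
        session.rollback()
        raise
    finally:
        session.close()

with create_session() as s:
    pass  # queries would run here, inside an explicit session scope
```

The explicit `with` block makes it obvious where the session ends, which is exactly what the decorator-injected session obscured in the zombie-detection bug.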
4054-4074: > 📝 NOTE: This review was outside the diff hunks and was mapped to the diff hunk with the greatest overlap. Original lines [4008-4071]
The test_find_zombies test ensures zombie tasks are correctly identified and handled. It simulates a scenario with two task instances marked as running, one of which is considered a zombie because its heartbeat timed out. The test verifies that only the timed-out task instance is sent to the executor's callback sink, which is the expected behavior.
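The heartbeat-timeout check this test exercises can be sketched as follows; the 300-second threshold here is an assumed default standing in for the scheduler's zombie-task threshold setting.

```python
from datetime import datetime, timedelta, timezone

def is_zombie(latest_heartbeat, now, threshold_secs=300):
    """A running job whose last heartbeat is older than the threshold is
    treated as a zombie (300s is an assumed value, not read from config)."""
    return now - latest_heartbeat > timedelta(seconds=threshold_secs)

# Timestamps mirror the log excerpt above (2022-12-06 14:20:21 UTC).
now = datetime(2022, 12, 6, 14, 20, 21, tzinfo=timezone.utc)
stale = is_zombie(now - timedelta(minutes=10), now)  # timed out -> zombie
fresh = is_zombie(now - timedelta(minutes=1), now)   # recent heartbeat -> fine
```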
4054-4074: > 📝 NOTE: This review was outside the diff hunks and was mapped to the diff hunk with the greatest overlap. Original lines [4071-4145]
The test_zombie_message test checks that the generated zombie message matches the expected format. It modifies different attributes of the task instance and verifies the zombie message correctly reflects those changes. This helps ensure that when a task instance becomes a zombie, the relevant error information is accurately recorded and reported.
4143-4153: > 📝 NOTE: This review was outside the diff hunks and was mapped to the diff hunk with the greatest overlap. Original lines [4146-4178]
The test_find_zombies_handle_failure_callbacks_are_correctly_passed_to_dag_processor test ensures that when zombie tasks are found, their failure callbacks are correctly passed to the DAG processor. It simulates a task instance becoming a zombie due to timeout and verifies the corresponding failure callback is sent to the DAG file processor. This matters because it ensures the appropriate failure-handling logic can be triggered when a task fails.
4176-4182: > 📝 NOTE: This review was outside the diff hunks and was mapped to the diff hunk with the greatest overlap. Original lines [4179-4208]
The test_cleanup_stale_dags test checks that the logic for cleaning up stale DAGs works as expected. It first creates a DAG and marks it as stale, then runs the cleanup logic and verifies the stale DAG is correctly removed from the database. This helps keep the database tidy by preventing useless DAG records from accumulating.
4005-4017: > 📝 NOTE: This review was outside the diff hunks, and no overlapping diff hunk was found. Original lines [4209-4271]
The test_should_mark_empty_task_as_success test verifies that empty tasks (tasks with no command to execute) are correctly marked as successful. It creates a DAG containing empty tasks, runs the scheduler to process them, and checks the task instance states to confirm the empty tasks are marked as success without executing anything. This is important because it ensures even empty tasks pass correctly through the scheduler's processing flow.
4005-4017: > 📝 NOTE: This review was outside the diff hunks, and no overlapping diff hunk was found. Original lines [4272-4372]
The test_catchup_works_correctly test verifies the scheduler's catchup logic works as expected. It creates a DAG that needs to catch up, runs the scheduler over past execution dates, and checks that DAG runs are created and that the scheduler correctly updates the next scheduled DAG run time. This helps ensure the scheduler handles DAGs requiring catchup without missing any scheduled executions.
4005-4017: > 📝 NOTE: This review was outside the diff hunks, and no overlapping diff hunk was found. Original lines [4373-4445]
The test_schedule_dag_run_with_upstream_skip test verifies the scheduler correctly handles the case where a task's upstream task is skipped. It creates a DAG with dependent tasks, marks one upstream task as skipped, and checks the state of the downstream tasks that depend on it to confirm the scheduler also marks them as skipped. This is important because it ensures task dependencies are handled correctly during scheduling.
4005-4017: > 📝 NOTE: This review was outside the diff hunks, and no overlapping diff hunk was found. Original lines [4446-4545]
The TestSchedulerJobQueriesCount class contains a series of tests that detect changes in the number of database queries the scheduler issues when processing different DAG files. The tests simulate varying numbers of DAGs and tasks, as well as different schedule intervals and structures, to evaluate scheduler performance. They are useful for catching code changes that could degrade scheduler performance early.
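Such query-count tests typically wrap the code under test in a counting helper. Below is a minimal sketch of that idea; the `count_queries` helper is hypothetical, not Airflow's actual test utility, and uses SQLAlchemy's `before_cursor_execute` event to count statements.

```python
from contextlib import contextmanager

from sqlalchemy import create_engine, event, text

@contextmanager
def count_queries(engine):
    """Count SQL statements executed on `engine` inside the block
    (a sketch of the helper such query-count tests rely on)."""
    counter = {"n": 0}

    def on_execute(conn, cursor, statement, parameters, context, executemany):
        counter["n"] += 1

    event.listen(engine, "before_cursor_execute", on_execute)
    try:
        yield counter
    finally:
        event.remove(engine, "before_cursor_execute", on_execute)

engine = create_engine("sqlite://")
with engine.connect():
    pass  # warm up so any dialect initialization queries are not counted

with count_queries(engine) as c, engine.connect() as conn:
    conn.execute(text("SELECT 1"))
    conn.execute(text("SELECT 2"))
```

An assertion on `c["n"]` after the block then pins the expected query count, so an accidentally dropped join or N+1 pattern shows up as a test failure.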
Actionable comments posted: 0
Review Status
Configuration used: CodeRabbit UI
Commits
Files that changed from the base of the PR and between a9e9b3f and a46302b5496ac478284b253cd53685765c4ec029.
Files selected for processing (1)
- airflow/dag_processing/manager.py (1 hunks)
Additional Context Used
Additional comments not posted (1)
airflow/dag_processing/manager.py (1)
541-551: In the `_run_parsing_loop` method, the conditional assignment of `poll_time` based on `_async_mode` was removed; `poll_time` is now set directly to `None`. This means a blocking wait is used regardless of whether async mode is enabled. The change could affect the scheduler's performance and responsiveness, especially when processing a large number of DAG files. It is recommended to evaluate the performance impact of this change in detail, and to consider whether another approach could solve the original problem while preserving async processing.
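The blocking-vs-timed poll semantics at issue can be demonstrated with the stdlib `multiprocessing.connection.wait`. This is a standalone sketch of the behavior; that the parsing loop uses exactly this call is an assumption here.

```python
import time
from multiprocessing import Pipe
from multiprocessing.connection import wait

recv_conn, send_conn = Pipe(duplex=False)

# Finite timeout: returns an empty list after ~0.1s when nothing is ready,
# letting the loop do other work between polls.
start = time.monotonic()
ready_timed = wait([recv_conn], timeout=0.1)
elapsed = time.monotonic() - start

# timeout=None (the behavior after this change): blocks until something is
# ready. We send first so this snippet terminates on its own.
send_conn.send("ping")
ready_blocking = wait([recv_conn], timeout=None)
msg = recv_conn.recv()
```

A `None` timeout wakes the loop only on actual events, which saves CPU but means nothing else in the loop body runs until an event arrives.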
a46302b to a9e9b3f
Background
When a single DAG contains multiple tasks and several of those tasks fail and enter zombie state, the scheduler crashes.
Related upstream issue:
Solution
Cherry-pick the upstream changes that fix the SQL query problem.
Logs