
PythonVirtualenvOperator crashes if any python_callable function is defined in the same source as DAG #35529

Closed
1 of 2 tasks
Felix-neko opened this issue Nov 8, 2023 · 13 comments · Fixed by #37165
Assignees
Labels
affected_version:2.7 Issues Reported for 2.7 area:core area:core-operators Operators, Sensors and hooks within Core Airflow good first issue kind:bug This is a clearly a bug

Comments


Felix-neko commented Nov 8, 2023

Apache Airflow version

2.7.3

What happened

Hi folks!

I need to use the PythonVirtualenvOperator and pass it {{ dag_run }}, {{ task_instance }} and other Airflow context variables. Sometimes it crashes with the following error:

[2023-11-08, 11:30:20 UTC] {process_utils.py:190} INFO - Traceback (most recent call last):
[2023-11-08, 11:30:20 UTC] {process_utils.py:190} INFO -   File "/tmp/venvqbspm8nx/script.py", line 17, in <module>
[2023-11-08, 11:30:20 UTC] {process_utils.py:190} INFO -     arg_dict = dill.load(file)
[2023-11-08, 11:30:20 UTC] {process_utils.py:190} INFO -   File "/tmp/venvqbspm8nx/lib/python3.10/site-packages/dill/_dill.py", line 373, in load
[2023-11-08, 11:30:20 UTC] {process_utils.py:190} INFO -     return Unpickler(file, ignore=ignore, **kwds).load()
[2023-11-08, 11:30:20 UTC] {process_utils.py:190} INFO -   File "/tmp/venvqbspm8nx/lib/python3.10/site-packages/dill/_dill.py", line 646, in load
[2023-11-08, 11:30:20 UTC] {process_utils.py:190} INFO -     obj = StockUnpickler.load(self)
[2023-11-08, 11:30:20 UTC] {process_utils.py:190} INFO -   File "/tmp/venvqbspm8nx/lib/python3.10/site-packages/dill/_dill.py", line 636, in find_class
[2023-11-08, 11:30:20 UTC] {process_utils.py:190} INFO -     return StockUnpickler.find_class(self, module, name)
[2023-11-08, 11:30:20 UTC] {process_utils.py:190} INFO - ModuleNotFoundError: No module named 'unusual_prefix_7da5b81975a8caeba2f4e2b91b352e55493c2e25_dag'
[2023-11-08, 11:30:20 UTC] {taskinstance.py:1937} ERROR - Task failed with exception

After some testing I found that this error occurs if any operator in the DAG (not necessarily a PythonVirtualenvOperator) takes a function as its python_callable argument and that function is defined in the same Python source file as the DAG object.
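The root cause can be reproduced outside Airflow with the standard pickle module (dill behaves the same way for functions it pickles by reference): a function is serialized as a (module name, qualified name) pair, so deserialization requires importing the module under the exact name recorded at dump time. A minimal sketch, where the illustrative name unusual_prefix_deadbeef_dag stands in for Airflow's real hashed module name:

```python
import pickle
import sys
import types

# Simulate Airflow's DAG parser, which imports the DAG file under a
# mangled module name such as unusual_prefix_<sha1>_<filename>.
mangled = "unusual_prefix_deadbeef_dag"
mod = types.ModuleType(mangled)
exec("def make_foo():\n    return 'foo'", mod.__dict__)
sys.modules[mangled] = mod

# Functions are pickled *by reference*: only the module name and the
# qualified name are stored, not the function body.
payload = pickle.dumps(mod.make_foo)

# The virtualenv subprocess never imported the mangled module, so the
# reference cannot be resolved there.
del sys.modules[mangled]
try:
    pickle.loads(payload)
except ModuleNotFoundError as exc:
    print(exc)  # No module named 'unusual_prefix_deadbeef_dag'
```

This is exactly the ModuleNotFoundError seen in the task log above, just raised by stock pickle instead of dill.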

What you think should happen instead

I think Airflow should check its DAGs before running (or before serialization) and give an informative error message in the following case: there is a PythonVirtualenvOperator in the DAG, and its python_callable function is declared in the same Python module as the DAG itself.

And, looking further ahead, it would be really cool if Airflow migrated to cloudpickle so that such functions were deserialized correctly.

How to reproduce

Here's a minimal example that reproduces the error (test it with airflow standalone using the SequentialExecutor or KubernetesExecutor; it does not happen with the DebugExecutor):

import datetime
import pendulum
import airflow
from airflow import DAG
from airflow.operators.python import PythonOperator, PythonVirtualenvOperator
import dill


dag = DAG(
    dag_id='strange_pickling_error_dag',
    schedule_interval='0 5 * * 1',
    start_date=datetime.datetime(2020, 1, 1),
    catchup=False,
    render_template_as_native_obj=True,
)


context = {"ts": "{{ ts }}", "dag_run": "{{ dag_run }}"}


def make_foo(*args, **kwargs):
    print("---> making foo!")
    print("make foo(...): args")
    print(args)
    print("make foo(...): kwargs")
    print(kwargs)


make_foo_task = PythonVirtualenvOperator(
    task_id='make_foo',
    python_callable=make_foo,
    use_dill=True,
    system_site_packages=False,
    op_args=[context],
    requirements=[f"dill=={dill.__version__}", f"apache-airflow=={airflow.__version__}", "psycopg2-binary >= 2.9, < 3",
                  f"pendulum=={pendulum.__version__}", "lazy-object-proxy"],
    dag=dag)

And here's my workaround code:

  • dags/strange_pickling_error/dag.py:
import datetime
import pendulum
import airflow
from airflow import DAG
from airflow.operators.python import PythonOperator, PythonVirtualenvOperator
import dill

from strange_pickling_error.some_moar_code import make_foo


dag = DAG(
    dag_id='strange_pickling_error_dag',
    schedule_interval='0 5 * * 1',
    start_date=datetime.datetime(2020, 1, 1),
    catchup=False,
    render_template_as_native_obj=True,
)

context = {"ts": "{{ ts }}", "dag_run": "{{ dag_run }}"}


make_foo_task = PythonVirtualenvOperator(
    task_id='make_foo',
    python_callable=make_foo,
    use_dill=True,
    system_site_packages=False,
    op_args=[context],
    requirements=[f"dill=={dill.__version__}", f"apache-airflow=={airflow.__version__}", "psycopg2-binary >= 2.9, < 3",
                  f"pendulum=={pendulum.__version__}", "lazy-object-proxy"],
    dag=dag)
  • dags/strange_pickling_error/some_moar_code.py:
def make_foo(*args, **kwargs):
    print("---> making foo!")
    print("make foo(...): args")
    print(args)
    print("make foo(...): kwargs")
    print(kwargs)

Operating System

Ubuntu 22.04

Versions of Apache Airflow Providers

apache-airflow-providers-common-sql==1.8.0
apache-airflow-providers-ftp==3.6.0
apache-airflow-providers-google==10.11.0
apache-airflow-providers-http==4.6.0
apache-airflow-providers-imap==3.4.0
apache-airflow-providers-sqlite==3.5.0

Deployment

Virtualenv installation

Deployment details

Python 3.10
airflow==2.7.3
dill==0.3.5.1

Anything else

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

@Felix-neko Felix-neko added area:core kind:bug This is a clearly a bug needs-triage label for new issues that we didn't triage yet labels Nov 8, 2023

potiuk commented Nov 8, 2023

Marked it as a good first issue for someone to pick up.

I think that airflow should check its DAGs befor running (or before serialization) and give an informative error message in following case: if there is a PythonVirtualenvOperator in the DAG and if there is a python_callable function who is declared in the same Python module as the DAG itself.

If we can detect that condition, cool, that would be a nice check to have. I also encourage you to contribute it - it seems you already have enough involvement to be able to provide a PR.


rawwar commented Nov 9, 2023

I will be working on this. Excited :)

Edit: Unless the OP wants to contribute it. I'll work on this anyway as a learning experience.

@Pavel-Malyutin

We also faced a similar problem. We will be glad to see a fix in the next releases :)


rawwar commented Nov 13, 2023

Update: I was able to replicate the issue locally.

@josh-fell josh-fell removed the needs-triage label for new issues that we didn't triage yet label Nov 27, 2023

rawwar commented Jan 31, 2024

I was looking into this issue. One thing I noticed is that when the scheduler processes the DAGs and checks whether they are valid, there is no check that actually validates any prerequisites w.r.t. an operator. I see checks for validating whether the DAG can be scheduled, and for parameters, the timetable, and setup/teardown.

Until now, I've been going through the scheduler code that processes DAGs. I just started looking into task instance execution.

EDIT: I just realized that when the DAG is being loaded, PythonVirtualenvOperator does check whether the passed function is a normal function or a lambda. Something can be done there. I'm still looking into it.
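For reference, here is a minimal sketch (hypothetical code, not Airflow's actual implementation) of the kind of load-time check mentioned above: rejecting lambdas, which have no importable name the generated virtualenv script could call.

```python
# Hypothetical validation helper, sketching the kind of check the
# operator performs when the DAG file is loaded: the callable must be
# a real, named function, since a lambda cannot be rendered by name
# into the generated virtualenv script.
def validate_python_callable(python_callable):
    if not callable(python_callable):
        raise TypeError("python_callable must be a callable object")
    if python_callable.__name__ == "<lambda>":
        raise ValueError("python_callable may not be a lambda function")


def make_foo():
    return "foo"


validate_python_callable(make_foo)  # passes silently
```

A similar check could in principle also compare the callable's __module__ against the DAG file's mangled module name, which is the direction discussed below.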

@eladkal eladkal added affected_version:2.7 Issues Reported for 2.7 area:core-operators Operators, Sensors and hooks within Core Airflow labels Feb 1, 2024

rawwar commented Feb 2, 2024

In _execute_python_callable_in_subprocess, when calling self._write_args(input_path), it seems some object in op_kwargs holds a reference to the modified DAG module name, which is generated as f"unusual_prefix_{path_hash}_{org_mod_name}".

Hence, during dill.loads, the attempt to import the DAG module under the modified name fails: the modified module exists only in memory and is never written to a file. I am still trying to locate the exact object that is causing this.

I am still debugging this and will post more info here.


rawwar commented Feb 2, 2024

@potiuk , I think there are two options for this issue.

  1. Directly raise an error when the callable is defined in the same module as the DAG.
  2. Make the original DAG module available by copying it into the temporary directory under the modified module name (unusual_prefix_*). This temporary directory is created by the PythonVirtualenvOperator, which already copies the script, pickled args, kwargs, etc.

I am more inclined towards option 2, as it makes the DAG work. A better approach may still be possible, as I am still exploring this. But we could also update the documentation and go with option 1.


rawwar commented Feb 4, 2024

I was able to identify which objects are causing the import issue with dill.

We use dill to dump the args and kwargs given to the PythonVirtualenvOperator. When doing this, three objects in the kwargs (dag_run, task and dag) fail to import with dill, because they are recorded as coming from unusual_prefix_{sha1 of file path}_{original module name}. I am still trying to figure out which exact object inside those three refers to the modified module name. Not sure how useful this information is, but if I can identify it, then during dill.dump we could probably copy the original DAG module into the venv folder under the modified file name.
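A hypothetical debugging helper for this kind of hunt (illustrative names, not Airflow code): scan the kwargs for values whose class claims to live in a mangled module, since those are the references dill would fail to re-import.

```python
# Hypothetical helper: flag kwargs whose value's class (or the value
# itself, for functions) reports a mangled "unusual_prefix_*" module,
# i.e. a reference that dill cannot resolve in the subprocess.
def find_mangled_refs(kwargs, prefix="unusual_prefix_"):
    suspects = []
    for key, value in kwargs.items():
        cls_module = getattr(type(value), "__module__", "") or ""
        obj_module = getattr(value, "__module__", "") or ""
        if cls_module.startswith(prefix) or obj_module.startswith(prefix):
            suspects.append(key)
    return suspects


# Simulate a context object whose class was defined in the mangled module:
class FakeDagRun:
    pass


FakeDagRun.__module__ = "unusual_prefix_deadbeef_dag"  # illustrative hash

print(find_mangled_refs({"dag_run": FakeDagRun(), "ts": "2023-11-08"}))
# ['dag_run']
```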


rawwar commented Feb 5, 2024

  1. Make the original DAG module available by copying it into the temporary directory under the modified module name (unusual_prefix_*). This temporary directory is created by the PythonVirtualenvOperator, which already copies the script, pickled args, kwargs, etc.

I don't think this is an optimal approach, but I think I understand the problem now and can explain it here.

If we want to allow the python callable to be defined in the DAG module, we should make the original module available whenever we serialize and deserialize objects. But since we modify the module's name when re-importing it, we should try to re-import it under the modified name in python_virtualenv_script.jinja2.

So, the condition to detect whether python_callable is part of the DAG module can be checked with check_callable_in_dag_module as below:

import hashlib
from pathlib import Path
import inspect

MODIFIED_DAG_MODULE_NAME = "unusual_prefix_{path_hash}_{module_name}"
def get_unique_dag_module_name(file_path: str) -> str:
    """Return a unique module name in the format unusual_prefix_{sha1 of module's file path}_{original module name}."""
    if isinstance(file_path, str):
        path_hash = hashlib.sha1(file_path.encode("utf-8")).hexdigest()
        org_mod_name = Path(file_path).stem
        return MODIFIED_DAG_MODULE_NAME.format(path_hash=path_hash, module_name=org_mod_name)
    raise ValueError("file_path should be a string to generate a unique module name")

def check_callable_in_dag_module(python_callable) -> bool:
    """Return True if the callable is defined in the same source file as the DAG module."""
    return get_unique_dag_module_name(inspect.getfile(python_callable)) == python_callable.__module__

If the above condition is met, we should copy the original module's source code to the temp directory where the PythonVirtualenvOperator executes the code.


potiuk commented Feb 5, 2024

Maybe there is an easier solution: you already have the source of the callable:

{{ python_callable_source }}

and name

{{ python_callable }}(

and assuming you pass "unique_module_name" starting with unusual_prefix_... you can do this:

{{ python_callable_source }}

import types
{{ unique_module_name }} = types.ModuleType("{{ unique_module_name }}")

{{ unique_module_name }}.{{ python_callable }} = {{ python_callable }}

All of that before loading op_args/kwargs from dill. That should do the trick:

  • first you define the callable in current module
  • then you create an empty "unique" module
  • then you add the callable that you created locally to be present in the uniquely named module you just created

There is no need to copy the sources to a separate file - it would also be harmful, as we really need the callable only - not all the DAG or other methods defined in there.
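The trick can be verified in plain Python; a sketch (with stock pickle standing in for dill, and an illustrative mangled name): recreate the callable locally, attach it to an empty shim module, and register the shim in sys.modules so the unpickler's module lookup succeeds.

```python
import pickle
import sys
import types

# The rendered {{ python_callable_source }} defines the callable
# locally in the generated script:
def make_foo():
    return "foo"

# Create an empty module under the mangled name and attach the locally
# defined callable to it. Registering the shim in sys.modules lets the
# unpickler's module lookup resolve the reference.
mangled = "unusual_prefix_deadbeef_dag"  # illustrative name
shim = types.ModuleType(mangled)
shim.make_foo = make_foo
sys.modules[mangled] = shim

# Pretend the args were pickled on the scheduler side, where the
# callable's module was the mangled DAG module:
make_foo.__module__ = mangled
payload = pickle.dumps(make_foo)

restored = pickle.loads(payload)
print(restored())  # foo
```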


rawwar commented Feb 6, 2024

@potiuk , how do we want to handle cases where the python_callable uses multiple helper methods defined in the same module?

Edit: I guess this could be a feature request? But we do mention that global objects are not imported here: https://airflow.apache.org/docs/apache-airflow/stable/howto/operator/python.html#id1

I am going ahead and implementing what you suggested, @potiuk. Thanks.


potiuk commented Feb 6, 2024

Edit: I guess this could be a feature request? But we do mention that global objects are not imported here: https://airflow.apache.org/docs/apache-airflow/stable/howto/operator/python.html#id1

Yeah. This is how it currently works - and not a big limitation.


rawwar commented Feb 6, 2024

@potiuk , I have made the changes. It's still failing tests and I am looking into it, but if you have any feedback, I can incorporate it before marking the PR ready for review: #37165
