Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Deprecate *args and **kwargs in BaseOperator #1285

Merged
merged 1 commit into from
Apr 5, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 14 additions & 5 deletions UPDATING.md
Original file line number Diff line number Diff line change
@@ -1,15 +1,24 @@
# Updating Airflow

This file aims to document the backwards-incompatible changes in Airflow and
assist people with migrating to a new version.
This file documents any backwards-incompatible changes in Airflow and
assists people when migrating to a new version.

## 1.7 to 1.8

### DAGs now don't start automatically when created
## Airflow 1.8

To retain the old behavior, add this to your configuration:
### Changes to Behavior

#### New DAGs are paused by default

Previously, new DAGs would be scheduled immediately. To retain the old behavior, add this to airflow.cfg:

```
[core]
dags_are_paused_at_creation = False
```

### Deprecated Features
These features are marked for deprecation. They may still work (and raise a `DeprecationWarning`), but are no longer supported and will be removed entirely in Airflow 2.0

#### Operators no longer accept arbitrary arguments
Previously, `Operator.__init__()` accepted any arguments (either positional `*args` or keyword `**kwargs`) without complaint. Now, invalid arguments will be rejected.
4 changes: 4 additions & 0 deletions airflow/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import logging
import os
import subprocess
import warnings

from future import standard_library
standard_library.install_aliases()
Expand All @@ -16,6 +17,9 @@
from collections import OrderedDict
from configparser import ConfigParser

# show DeprecationWarning and PendingDeprecationWarning
warnings.simplefilter('default', DeprecationWarning)
warnings.simplefilter('default', PendingDeprecationWarning)

class AirflowConfigException(Exception):
pass
Expand Down
2 changes: 1 addition & 1 deletion airflow/example_dags/example_http_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@

sensor = HttpSensor(
task_id='http_sensor_check',
conn_id='http_default',
http_conn_id='http_default',
endpoint='',
params={},
response_check=lambda response: True if "Google" in response.content else False,
Expand Down
12 changes: 12 additions & 0 deletions airflow/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import socket
import sys
import traceback
import warnings
from urllib.parse import urlparse

from sqlalchemy import (
Expand Down Expand Up @@ -1597,6 +1598,17 @@ def __init__(
*args,
**kwargs):

if args or kwargs:
# TODO remove *args and **kwargs in Airflow 2.0
warnings.warn(
'Invalid arguments were passed to {c}. Support for '
'passing such arguments will be dropped in Airflow 2.0.'
'Invalid arguments were:'
'\n*args: {a}\n**kwargs: {k}'.format(
c=self.__class__.__name__, a=args, k=kwargs),
category=PendingDeprecationWarning
)

validate_key(task_id)
self.dag_id = dag.dag_id if dag else 'adhoc_' + owner
self.task_id = task_id
Expand Down
24 changes: 17 additions & 7 deletions airflow/utils/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,17 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
import inspect
import os

# inspect.signature is only available in Python 3. funcsigs.signature is
# a backport.
try:
import inspect
signature = inspect.signature
except AttributeError:
import funcsigs
signature = funcsigs.signature

from copy import copy
from functools import wraps

Expand Down Expand Up @@ -57,12 +65,14 @@ def wrapper(*args, **kwargs):

dag_args.update(default_args)
default_args = dag_args
arg_spec = inspect.getargspec(func)
num_defaults = len(arg_spec.defaults) if arg_spec.defaults else 0
non_optional_args = arg_spec.args[:-num_defaults]
if 'self' in non_optional_args:
non_optional_args.remove('self')
for arg in func.__code__.co_varnames:

sig = signature(func)
non_optional_args = [
name for (name, param) in sig.parameters.items()
if param.default == param.empty and
param.name != 'self' and
param.kind not in (param.VAR_POSITIONAL, param.VAR_KEYWORD)]
for arg in sig.parameters:
if arg in default_args and arg not in kwargs:
kwargs[arg] = default_args[arg]
missing_args = list(set(non_optional_args) - set(kwargs))
Expand Down
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ def run(self):
'flask-cache>=0.13.1, <0.14',
'flask-login==0.2.11',
'future>=0.15.0, <0.16',
'funcsigs>=0.4, <1'
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This missed a "," @jlowin

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

setup.py is my kryptonite... Fixed in 6581858

'gunicorn>=19.3.0, <19.4.0', # 19.4.? seemed to have issues
'jinja2>=2.7.3, <3.0',
'markdown>=2.5.2, <3.0',
Expand Down
17 changes: 17 additions & 0 deletions tests/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from email.mime.application import MIMEApplication
import signal
from time import sleep
import warnings

from dateutil.relativedelta import relativedelta

Expand Down Expand Up @@ -320,6 +321,22 @@ def test_clear_api(self):
ti = models.TaskInstance(task=task, execution_date=DEFAULT_DATE)
ti.are_dependents_done()

def test_illegal_args(self):
"""
Tests that Operators reject illegal arguments
"""
with warnings.catch_warnings(record=True) as w:
t = operators.BashOperator(
task_id='test_illegal_args',
bash_command='echo success',
dag=self.dag,
illegal_argument_1234='hello?')
self.assertTrue(
issubclass(w[0].category, PendingDeprecationWarning))
self.assertIn(
'Invalid arguments were passed to BashOperator.',
w[0].message.args[0])

def test_bash_operator(self):
t = operators.BashOperator(
task_id='time_sensor_check',
Expand Down