Merge pull request #1219 from airbnb/arthurw_utils_refactor_take2
Refactoring utils to be more sane
artwr committed Mar 29, 2016
2 parents dea40d1 + 773f52f commit 60e22ed
Showing 90 changed files with 1,366 additions and 1,140 deletions.
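
The pattern repeated across this diff: the catch-all airflow.utils module is split into focused submodules, and the exception classes move to a new airflow.exceptions module. A summary sketch of the before-and-after imports, assembled from the hunks below:

# Before this commit, everything was imported from the flat utils module:
#   from airflow.utils import AirflowException, State, LoggingMixin, apply_defaults

# After this commit, the same names come from focused modules:
from airflow.exceptions import AirflowException
from airflow.utils.state import State
from airflow.utils.logging import LoggingMixin
from airflow.utils.decorators import apply_defaults
from airflow.utils import db as db_utils             # initdb, resetdb, upgradedb
from airflow.utils import logging as logging_utils   # S3Log, GCSLog
from airflow.utils.helpers import import_module_attrs, chain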
2 changes: 1 addition & 1 deletion airflow/__init__.py
@@ -31,7 +31,7 @@
from airflow.models import DAG
from flask_admin import BaseView
from importlib import import_module
-from airflow.utils import AirflowException
+from airflow.exceptions import AirflowException

DAGS_FOLDER = os.path.expanduser(conf.get('core', 'DAGS_FOLDER'))
if DAGS_FOLDER not in sys.path:
22 changes: 13 additions & 9 deletions airflow/bin/cli.py
@@ -14,11 +14,14 @@
import json

import airflow
-from airflow import jobs, settings, utils
+from airflow import jobs, settings
from airflow import configuration as conf
from airflow.executors import DEFAULT_EXECUTOR
from airflow.models import DagModel, DagBag, TaskInstance, DagPickle, DagRun
-from airflow.utils import AirflowException, State
+from airflow.utils import db as db_utils
+from airflow.utils import logging as logging_utils
+from airflow.utils.state import State
+from airflow.exceptions import AirflowException

DAGS_FOLDER = os.path.expanduser(conf.get('core', 'DAGS_FOLDER'))

@@ -78,7 +81,8 @@ def backfill(args, dag=None):
mark_success=args.mark_success,
include_adhoc=args.include_adhoc,
local=args.local,
-donot_pickle=(args.donot_pickle or conf.getboolean('core', 'donot_pickle')),
+donot_pickle=(args.donot_pickle or
+              conf.getboolean('core', 'donot_pickle')),
ignore_dependencies=args.ignore_dependencies,
pool=args.pool)

@@ -133,7 +137,7 @@ def set_is_paused(is_paused, args, dag=None):

def run(args, dag=None):

-utils.pessimistic_connection_handling()
+db_utils.pessimistic_connection_handling()
if dag:
args.dag_id = dag.dag_id

@@ -236,10 +240,10 @@ def run(args, dag=None):
remote_log_location = filename.replace(log_base, remote_base)
# S3
if remote_base.startswith('s3:/'):
-utils.S3Log().write(log, remote_log_location)
+logging_utils.S3Log().write(log, remote_log_location)
# GCS
elif remote_base.startswith('gs:/'):
-utils.GCSLog().write(
+logging_utils.GCSLog().write(
log,
remote_log_location,
append=True)
@@ -401,7 +405,7 @@ def worker(args):

def initdb(args): # noqa
print("DB: " + repr(settings.engine.url))
-utils.initdb()
+db_utils.initdb()
print("Done.")


@@ -412,14 +416,14 @@ def resetdb(args):
"Proceed? (y/n)").upper() == "Y":
logging.basicConfig(level=settings.LOGGING_LEVEL,
format=settings.SIMPLE_LOG_FORMAT)
-utils.resetdb()
+db_utils.resetdb()
else:
print("Bail.")


def upgradedb(args): # noqa
print("DB: " + repr(settings.engine.url))
-utils.upgradedb()
+db_utils.upgradedb()


def version(args): # noqa
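
For context, the run() hunk above picks a remote log backend from the URL scheme of the remote base. A condensed sketch of that dispatch as it reads after the rename; the variable values are hypothetical stand-ins for state computed earlier in run():

from airflow.utils import logging as logging_utils

# Hypothetical stand-ins for values computed earlier in cli.run():
log = "task log text"
filename, log_base = "/logs/my_dag/my_task/2016-03-29", "/logs"
remote_base = "s3://my-bucket/airflow/logs"

remote_log_location = filename.replace(log_base, remote_base)
# S3
if remote_base.startswith('s3:/'):
    logging_utils.S3Log().write(log, remote_log_location)
# GCS
elif remote_base.startswith('gs:/'):
    logging_utils.GCSLog().write(log, remote_log_location, append=True)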
3 changes: 3 additions & 0 deletions airflow/configuration.py
@@ -16,6 +16,7 @@
from collections import OrderedDict
from configparser import ConfigParser

+
class AirflowConfigException(Exception):
pass

@@ -612,6 +613,7 @@ def test_mode():
def get(section, key, **kwargs):
return conf.get(section, key, **kwargs)

+
def getboolean(section, key):
return conf.getboolean(section, key)

@@ -644,5 +646,6 @@ def set(section, option, value): # noqa
########################
# convenience method to access config entries

+
def get_dags_folder():
return os.path.expanduser(get('core', 'DAGS_FOLDER'))
4 changes: 2 additions & 2 deletions airflow/contrib/executors/mesos_executor.py
@@ -11,8 +11,8 @@
from airflow import configuration
from airflow.executors.base_executor import BaseExecutor
from airflow.settings import Session
-from airflow.utils import State
-from airflow.utils import AirflowException
+from airflow.utils.state import State
+from airflow.exceptions import AirflowException


DEFAULT_FRAMEWORK_NAME = 'Airflow'
2 changes: 1 addition & 1 deletion airflow/contrib/hooks/__init__.py
@@ -1,6 +1,6 @@
# Imports the hooks dynamically while keeping the package API clean,
# abstracting the underlying modules
-from airflow.utils import import_module_attrs as _import_module_attrs
+from airflow.utils.helpers import import_module_attrs as _import_module_attrs

_hooks = {
'ftp_hook': ['FTPHook'],
2 changes: 1 addition & 1 deletion airflow/contrib/hooks/gc_base_hook.py
@@ -2,7 +2,7 @@
import logging

from airflow.hooks.base_hook import BaseHook
-from airflow.utils import AirflowException
+from airflow.exceptions import AirflowException
from oauth2client.client import SignedJwtAssertionCredentials, GoogleCredentials

class GoogleCloudBaseHook(BaseHook):
4 changes: 2 additions & 2 deletions airflow/contrib/hooks/qubole_hook.py
@@ -3,7 +3,7 @@
import datetime
import logging

-from airflow.utils import AirflowException
+from airflow.exceptions import AirflowException
from airflow.hooks.base_hook import BaseHook
from airflow import configuration

@@ -151,4 +151,4 @@ def create_cmd_args(self):
else:
args += inplace_args.split(' ')

-return args
+return args
2 changes: 1 addition & 1 deletion airflow/contrib/hooks/ssh_hook.py
@@ -20,7 +20,7 @@
from contextlib import contextmanager

from airflow.hooks.base_hook import BaseHook
-from airflow import AirflowException
+from airflow.exceptions import AirflowException

import logging

2 changes: 1 addition & 1 deletion airflow/contrib/operators/__init__.py
@@ -1,6 +1,6 @@
# Imports the operators dynamically while keeping the package API clean,
# abstracting the underlying modules
-from airflow.utils import import_module_attrs as _import_module_attrs
+from airflow.utils.helpers import import_module_attrs as _import_module_attrs

_operators = {
'ssh_execute_operator': ['SSHExecuteOperator'],
2 changes: 1 addition & 1 deletion airflow/contrib/operators/bigquery_check_operator.py
@@ -1,6 +1,6 @@
from airflow.contrib.hooks.bigquery_hook import BigQueryHook
from airflow.operators import CheckOperator, ValueCheckOperator, IntervalCheckOperator
-from airflow.utils import apply_defaults
+from airflow.utils.decorators import apply_defaults


class BigQueryCheckOperator(CheckOperator):
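
Every operator file in this diff gets the same one-line move: apply_defaults now lives in airflow.utils.decorators. It is the decorator operators wrap around __init__ so that arguments the caller omits are filled in from the DAG's default_args. A minimal usage sketch; the operator itself is hypothetical:

from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults

class MyOperator(BaseOperator):
    # Hypothetical operator, for illustration only.
    @apply_defaults
    def __init__(self, my_param=None, *args, **kwargs):
        super(MyOperator, self).__init__(*args, **kwargs)
        self.my_param = my_param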
2 changes: 1 addition & 1 deletion airflow/contrib/operators/bigquery_operator.py
@@ -2,7 +2,7 @@

from airflow.contrib.hooks.bigquery_hook import BigQueryHook
from airflow.models import BaseOperator
-from airflow.utils import apply_defaults
+from airflow.utils.decorators import apply_defaults

class BigQueryOperator(BaseOperator):
"""
2 changes: 1 addition & 1 deletion airflow/contrib/operators/bigquery_to_bigquery.py
@@ -2,7 +2,7 @@

from airflow.contrib.hooks.bigquery_hook import BigQueryHook
from airflow.models import BaseOperator
-from airflow.utils import apply_defaults
+from airflow.utils.decorators import apply_defaults

class BigQueryToBigQueryOperator(BaseOperator):
"""
2 changes: 1 addition & 1 deletion airflow/contrib/operators/bigquery_to_gcs.py
@@ -2,7 +2,7 @@

from airflow.contrib.hooks.bigquery_hook import BigQueryHook
from airflow.models import BaseOperator
-from airflow.utils import apply_defaults
+from airflow.utils.decorators import apply_defaults

class BigQueryToCloudStorageOperator(BaseOperator):
"""
2 changes: 1 addition & 1 deletion airflow/contrib/operators/gcs_download_operator.py
@@ -2,7 +2,7 @@

from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
from airflow.models import BaseOperator
-from airflow.utils import apply_defaults
+from airflow.utils.decorators import apply_defaults

class GoogleCloudStorageDownloadOperator(BaseOperator):
"""
2 changes: 1 addition & 1 deletion airflow/contrib/operators/gcs_to_bq.py
@@ -4,7 +4,7 @@
from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
from airflow.contrib.hooks.bigquery_hook import BigQueryHook
from airflow.models import BaseOperator
-from airflow.utils import apply_defaults
+from airflow.utils.decorators import apply_defaults

class GoogleCloudStorageToBigQueryOperator(BaseOperator):
"""
2 changes: 1 addition & 1 deletion airflow/contrib/operators/mysql_to_gcs.py
@@ -5,7 +5,7 @@
from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
from airflow.hooks import MySqlHook
from airflow.models import BaseOperator
-from airflow.utils import apply_defaults
+from airflow.utils.decorators import apply_defaults
from collections import OrderedDict
from datetime import date, datetime
from decimal import Decimal
2 changes: 1 addition & 1 deletion airflow/contrib/operators/qubole_operator.py
@@ -1,5 +1,5 @@
from airflow.models import BaseOperator
-from airflow.utils import apply_defaults
+from airflow.utils.decorators import apply_defaults
from airflow.contrib.hooks import QuboleHook


4 changes: 2 additions & 2 deletions airflow/contrib/operators/ssh_execute_operator.py
@@ -4,8 +4,8 @@
from subprocess import STDOUT

from airflow.models import BaseOperator
-from airflow.utils import apply_defaults
-from airflow.utils import AirflowException
+from airflow.utils.decorators import apply_defaults
+from airflow.exceptions import AirflowException


class SSHTempFileContent():
2 changes: 1 addition & 1 deletion airflow/contrib/operators/vertica_operator.py
@@ -2,7 +2,7 @@

from airflow.contrib.hooks import VerticaHook
from airflow.models import BaseOperator
-from airflow.utils import apply_defaults
+from airflow.utils.decorators import apply_defaults


class VerticaOperator(BaseOperator):
2 changes: 1 addition & 1 deletion airflow/contrib/operators/vertica_to_hive.py
@@ -7,7 +7,7 @@
from airflow.hooks import HiveCliHook
from airflow.contrib.hooks import VerticaHook
from airflow.models import BaseOperator
-from airflow.utils import apply_defaults
+from airflow.utils.decorators import apply_defaults

class VerticaToHiveTransfer(BaseOperator):
"""
6 changes: 3 additions & 3 deletions airflow/example_dags/example_short_circuit_operator.py
@@ -1,6 +1,6 @@
from airflow.operators import ShortCircuitOperator, DummyOperator
from airflow.models import DAG
-import airflow.utils
+import airflow.utils.helpers
from datetime import datetime, timedelta

seven_days_ago = datetime.combine(datetime.today() - timedelta(7),
@@ -21,5 +21,5 @@
ds_true = [DummyOperator(task_id='true_' + str(i), dag=dag) for i in [1, 2]]
ds_false = [DummyOperator(task_id='false_' + str(i), dag=dag) for i in [1, 2]]

-airflow.utils.chain(cond_true, *ds_true)
-airflow.utils.chain(cond_false, *ds_false)
+airflow.utils.helpers.chain(cond_true, *ds_true)
+airflow.utils.helpers.chain(cond_false, *ds_false)
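
chain likewise moves into airflow.utils.helpers. Assuming the pairwise-linking behavior of the helper at this point in history, chain(cond_true, *ds_true) links cond_true to true_1 and true_1 to true_2:

# A sketch of chain's assumed pairwise linking; not code from this diff.
def chain(*tasks):
    for up_task, down_task in zip(tasks[:-1], tasks[1:]):
        up_task.set_downstream(down_task)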
6 changes: 4 additions & 2 deletions airflow/example_dags/example_trigger_controller_dag.py
@@ -1,3 +1,4 @@
+
"""This example illustrates the use of the TriggerDagRunOperator. There are 2
entities at work in this scenario:
1. The Controller DAG - the DAG that conditionally executes the trigger
@@ -14,6 +15,7 @@
state is then made available to the TargetDag
2. A Target DAG : c.f. example_trigger_target_dag.py
"""

from airflow import DAG
from airflow.operators import TriggerDagRunOperator
from datetime import datetime
@@ -35,8 +37,8 @@ def conditionally_trigger(context, dag_run_obj):

# Define the DAG
dag = DAG(dag_id='example_trigger_controller_dag',
default_args={"owner" : "me",
"start_date":datetime.now()},
default_args={"owner": "me",
"start_date": datetime.now()},
schedule_interval='@once')


2 changes: 1 addition & 1 deletion airflow/example_dags/example_trigger_target_dag.py
@@ -34,7 +34,7 @@


def run_this_func(ds, **kwargs):
print( "Remotely received value of {} for key=message".format(kwargs['dag_run'].conf['message']))
print("Remotely received value of {} for key=message".format(kwargs['dag_run'].conf['message']))

run_this = PythonOperator(
task_id='run_this',
Expand Down
10 changes: 10 additions & 0 deletions airflow/exceptions.py
@@ -0,0 +1,10 @@
+class AirflowException(Exception):
+    pass
+
+
+class AirflowSensorTimeout(Exception):
+    pass
+
+
+class AirflowTaskTimeout(Exception):
+    pass
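
The new module gives the framework exceptions a home outside the utils package; every "from airflow.utils import AirflowException" in this diff becomes an import from here. A minimal usage sketch with a hypothetical helper function:

from airflow.exceptions import AirflowException

def require_setting(value, name):
    # Hypothetical helper, for illustration only.
    if value is None:
        raise AirflowException("missing required setting: {}".format(name))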
2 changes: 1 addition & 1 deletion airflow/executors/__init__.py
@@ -10,7 +10,7 @@
except:
pass

-from airflow.utils import AirflowException
+from airflow.exceptions import AirflowException

_EXECUTOR = configuration.get('core', 'EXECUTOR')

3 changes: 2 additions & 1 deletion airflow/executors/base_executor.py
@@ -1,7 +1,8 @@
from builtins import range

from airflow import configuration
-from airflow.utils import State, LoggingMixin
+from airflow.utils.state import State
+from airflow.utils.logging import LoggingMixin

PARALLELISM = configuration.getint('core', 'PARALLELISM')

2 changes: 1 addition & 1 deletion airflow/executors/celery_executor.py
@@ -6,7 +6,7 @@
from celery import Celery
from celery import states as celery_states

-from airflow.utils import AirflowException
+from airflow.exceptions import AirflowException
from airflow.executors.base_executor import BaseExecutor
from airflow import configuration

3 changes: 2 additions & 1 deletion airflow/executors/local_executor.py
@@ -6,7 +6,8 @@

from airflow import configuration
from airflow.executors.base_executor import BaseExecutor
-from airflow.utils import State, LoggingMixin
+from airflow.utils.state import State
+from airflow.utils.logging import LoggingMixin

PARALLELISM = configuration.get('core', 'PARALLELISM')

3 changes: 1 addition & 2 deletions airflow/executors/sequential_executor.py
@@ -1,9 +1,8 @@
-from builtins import str
import logging
import subprocess

from airflow.executors.base_executor import BaseExecutor
-from airflow.utils import State
+from airflow.utils.state import State


class SequentialExecutor(BaseExecutor):
2 changes: 1 addition & 1 deletion airflow/hooks/S3_hook.py
@@ -15,7 +15,7 @@
boto.set_stream_logger('boto')
logging.getLogger("boto").setLevel(logging.INFO)

-from airflow.utils import AirflowException
+from airflow.exceptions import AirflowException
from airflow.hooks.base_hook import BaseHook


3 changes: 2 additions & 1 deletion airflow/hooks/__init__.py
@@ -1,6 +1,7 @@
# Imports the hooks dynamically while keeping the package API clean,
# abstracting the underlying modules
-from airflow.utils import import_module_attrs as _import_module_attrs
+
+from airflow.utils.helpers import import_module_attrs as _import_module_attrs
from airflow.hooks.base_hook import BaseHook # noqa to expose in package

_hooks = {
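
import_module_attrs, now in airflow.utils.helpers, backs the lazy package API used by airflow.hooks, airflow.operators, and their contrib counterparts: each __init__.py hands it a mapping of module name to attribute names, and those attributes are injected into the package namespace. A sketch of the pattern, assuming the helper is invoked as _import_module_attrs(globals(), _hooks) and skips submodules whose optional dependencies are missing; the real implementation lives in airflow/utils/helpers.py:

from importlib import import_module

def import_module_attrs(parent_module_globals, module_attrs_dict):
    # Assumed behavior: import each submodule and copy the named
    # attributes into the parent package's namespace.
    package = parent_module_globals['__name__']
    for module_name, attributes in module_attrs_dict.items():
        try:
            module = import_module(package + '.' + module_name)
        except ImportError:
            continue  # optional dependency not installed
        for attribute in attributes:
            parent_module_globals[attribute] = getattr(module, attribute)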