A boilerplate implementation of Luigi at Groupon

Luigi is a Python package that helps you build complex pipelines of batch jobs. It handles dependency resolution, workflow management, visualization, failure handling, command line integration, and much more.

Luigi-Warehouse adds:

- example workflows (e.g. replicating postgresql tables to redshift)
- more data sources
- variable data sources that do not rely on default luigi behavior/configs (e.g. VariableS3Client)
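For readers new to Luigi, a task is the basic building block these workflows are composed of. The sketch below is a generic, minimal Luigi task (the class name and output path are made up for illustration; it is not a class from this repo):

```python
import luigi

# Minimal illustrative task -- not part of luigi-warehouse itself.
class HelloTask(luigi.Task):
    name = luigi.Parameter(default='world')

    def output(self):
        # Luigi checks this target to decide whether the task still needs to run
        return luigi.LocalTarget('data/hello_{}.txt'.format(self.name))

    def run(self):
        with self.output().open('w') as f:
            f.write('hello {}\n'.format(self.name))

if __name__ == '__main__':
    luigi.run()
```

Real workflows chain tasks like this together via requires() so Luigi can resolve dependencies and handle failures.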
- Install python3 - this repo has been tested against python 3.4+
- Clone this repo
- python setup.py install
- pip3 install -r requirements.txt if you want full functionality of all data sources
- mkdir your-path-to/data
- Put your credentials and settings in luigi.cfg. luigi.cfg-example shows some possible options. You can also $ export LUIGI_CONFIG_PATH=/path/to/your/luigi.cfg && python... (a sketch of reading config values follows this list)
- You're ready to replicate or move data around...
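Tasks pick up these settings through Luigi's configuration API (the same get_config() call used in the Slack example further down). The section and key names below are placeholders, not the repo's actual config schema:

```python
import luigi

# Section/key names here are assumptions for illustration only.
config = luigi.configuration.get_config()
redshift_host = config.get('redshift', 'host')
aws_access_key = config.get('s3', 'aws_access_key_id')
print(redshift_host, aws_access_key)
```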
- Some example workflows are included. Assumptions, args, and comments are documented in each file.
File | Description | Main Class(es) |
---|---|---|
gsheet_to_redshift.py | replicates all data from a google sheet to a redshift table (full copy/replace) | Run |
gsheet_to_hadoop.py | replicates all data from a google sheet to a hadoop hive table via spark (full copy/replace) | main |
postgres_to_redshift.py | replicates postgres tables to redshift (incrementally or full copy/replace) | Run - PerformIncrementalImport, PerformFullImport |
postgres_to_hadoop.py | spark app that replicates postgres tables to hadoop (hive) (incrementally or full copy/replace) | Run - RunIncremental, RunFromScratch |
salesforce_to_redshift.py | replicates a salesforce report or SOQL to a redshift table (full copy/replace) | SOQLtoRedshift, ReporttoRedshift |
teradata_to_redshift.py | replicates given teradata SQL to a redshift table (incrementally or full copy/replace) | Run |
typeform_to_redshift.py | replicates all data from typeform responses to a redshift table (full copy/replace) | Run |
zendesk_to_redshift.py | extracts users, orgs, tickets, ticket_events from zendesk to redshift (partially incremental) | Run |
zendesk_to_hadoop.py | generic class to extract from the zendesk API and load to hadoop hive via spark (incrementally or full copy/replace) | ZendeskSpark |
- Example to start the luigi scheduler daemon
$ ./start_luigi_server.bash
- Example to run a workflow with multiple workers in parallel
$ LUIGI_CONFIG_PATH=/path/to/your/luigi.cfg python3 luigi_warehouse/postgres_to_redshift.py Run --params here --workers 50
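Workflows can also be launched from Python with Luigi's standard luigi.build entry point, which takes the same worker count. The Run class below is just a placeholder standing in for one of the workflow wrapper classes listed above:

```python
import luigi

# Placeholder wrapper task -- in practice you would import a workflow's Run class instead.
class Run(luigi.WrapperTask):
    pass

if __name__ == '__main__':
    # workers=50 mirrors the --workers 50 flag in the shell example above.
    luigi.build([Run()], workers=50, local_scheduler=True)
```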
Data sources and their dependent python packages:

Luigi - Spotify/Luigi
Postgres / Redshift - psycopg2
MySQL - pymysql
Adwords - googleads : API Reference
Googlesheets - gspread : API Reference
Slack - slackclient : API Reference
Five9 - suds : API Reference
Twilio - twilio : API Reference
Livechat - API Reference
Zendesk - zdesk : API Reference
Shiftplanning - API Reference
Kochava - API Reference
Teradata - teradata
- requires some configuring to install. We typically have to do:
$ mv ~/.odbc.ini ~/.odbc.ini.orig
$ cp /opt/teradata/client/15.10/odbc_64/odbcinst.ini ~/.odbcinst.ini
$ cp /opt/teradata/client/15.10/odbc_64/odbc.ini ~/.odbc.ini
OnboardIQ - API Reference
AppBoy - API Reference
Salesforce - simple-salesforce : API Reference
- Props to cghall for the capability to query salesforce reports directly using the analytics API
- Also available are SalesforceBulk and SalesforceBulkJob classes, which use the Salesforce bulk API (a minimal query sketch follows this list)
Braintree - braintree : API Reference
Typeform - API Reference
Checkr - API Reference
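As one concrete example of these client libraries, a SOQL query via simple-salesforce looks roughly like the sketch below (credentials and the query are placeholders; the repo wraps this kind of call in its own Salesforce classes):

```python
from simple_salesforce import Salesforce

# Placeholder credentials for illustration only.
sf = Salesforce(username='you@example.com',
                password='password',
                security_token='token')

# Simple SOQL query; results come back as a dict with a 'records' list.
result = sf.query("SELECT Id, Name FROM Account LIMIT 5")
for record in result['records']:
    print(record['Id'], record['Name'])
```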
We currently use slack or email for job status notifications, which can easily be added:
```python
import luigi
from luigi_slack import SlackBot, notify

slack_channel = 'luigi-status-messages'
...
...
...
if __name__ == '__main__':
    slack_channel = 'luigi-status-messages'
    slacker = SlackBot(token=luigi.configuration.get_config().get('slackbots', 'BOWSER_SLACK_API_KEY'),
                       channels=[slack_channel])
    with notify(slacker):
        luigi.run()
```
```python
import boto3

class Email:
    def __init__(self, region, aws_access_key_id, aws_secret_access_key):
        self.client = boto3.client('ses',
                                   region_name=region,
                                   aws_access_key_id=aws_access_key_id,
                                   aws_secret_access_key=aws_secret_access_key)

    def send(self, from_, to_list, subject, body):
        return self.client.send_email(Source=from_,
                                      Destination={'ToAddresses': to_list},
                                      Message={'Subject': {'Data': subject},
                                               'Body': {'Text': {'Data': body},
                                                        'Html': {'Data': ' '}}})
```
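A quick usage sketch of the Email class above (region, credentials, and addresses are placeholders):

```python
# Placeholder values for illustration only.
email = Email(region='us-east-1',
              aws_access_key_id='XXXX',
              aws_secret_access_key='XXXX')
email.send(from_='luigi@example.com',
           to_list=['data-team@example.com'],
           subject='postgres_to_redshift Run finished',
           body='All tables replicated successfully.')
```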
Validation is targeted towards ensuring successful replication of data to Redshift (see modules/validation.py). It checks:

- whether the csv has the same number of columns as the target table
- whether the columns have the same datatypes in the same order (VARCHAR is acceptable for any python datatype) - uses python_redshift_dtypes to convert
- for load errors for the target:schema:table provided since the provided load_start timestamp (an illustrative query is sketched below)
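For intuition, the load-error check boils down to something like the query below against Redshift's stl_load_errors system table. This sketch uses psycopg2 directly with placeholder connection details; the repo's actual implementation may differ:

```python
import psycopg2

# Placeholder connection details for illustration.
conn = psycopg2.connect(host='my-cluster.example.com', port=5439,
                        dbname='analytics', user='etl', password='...')

table = 'test'
load_start = '2016-10-06 10:15:00'
with conn.cursor() as cur:
    # stl_load_errors records COPY failures; join stv_tbl_perm to filter by table name.
    cur.execute("""
        SELECT le.starttime, le.colname, le.type, le.err_reason
        FROM stl_load_errors le
        JOIN stv_tbl_perm tp ON le.tbl = tp.id
        WHERE TRIM(tp.name) = %s AND le.starttime >= %s
        ORDER BY le.starttime DESC
    """, (table, load_start))
    for row in cur.fetchall():
        print(row)
```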
- Use the wrapper class RunAnywayTarget if you want to make it easier as we make each validation scheme better
- Pass in the taskobj with the following attributes:
  - type = ['LoadError', 'Structure']
  - target = Redshift
  - table =
  - schema =
  - local_file = local csv file path
  - load_start = when you started to copy the records from S3
- Doing RunAnywayTarget(self).done() will not do validation
- Doing RunAnywayTarget(self).validation() will do the validation and, if successful, also mark the task as done (a usage sketch follows this list)
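A minimal sketch of a task using the wrapper, assuming RunAnywayTarget is imported from the validation module in the same way as the classes below; the table, schema, file path, and exact wiring are placeholders:

```python
import datetime
import luigi
from validation import RunAnywayTarget  # import path assumed, as with OrderedDF/StructureDynamic below

class LoadMyTable(luigi.Task):
    # Attributes read by the validation wrapper (placeholder values).
    type = ['LoadError', 'Structure']
    target = 'Redshift'
    table = 'my_table'
    schema = 'public'
    local_file = 'data/my_table.csv'
    load_start = datetime.datetime.now()

    def run(self):
        # ... stage local_file to S3 and COPY it into Redshift here ...
        # Validate the load; on success this also marks the task as done.
        RunAnywayTarget(self).validation()

    def output(self):
        return RunAnywayTarget(self)
```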
- OrderedDF takes the following args:
  - target_cols: a list of columns ordered for how you want your dataframe to be structured
  - df: the dataframe you want restructured
- Example: I want my dataframe to have columns in this order ['one','two','three','four','five','six']
```python
>>> from validation import OrderedDF
>>> import pandas as pd
>>> test = [[None,'',1,7,8],[None,'',2,5,6]]
>>> test = pd.DataFrame(test,columns=['one','two','four','five','three'])
>>> test
    one two  four  five  three
0  None         1     7      8
1  None         2     5      6
>>> result = OrderedDF(['one','two','three','four','five','six'],test)
>>> result.df
    one two  three  four  five   six
0  None         8     1     7  None
1  None         6     2     5  None
```
- StructureDynamic will fix tables for you:
  - Check for copy errors
  - Handle the copy errors
    - Add column(s) if needed
    - Change dtype(s) if needed
  - Get the original table's schema
  - Craft the new table's schema with the changes implied by the errors
  - Make the change, retry the copy, and remove duplicate * records
- While there are copy errors, it will:
  - handle the errors
  - attempt to fix
  - retry the copy
  - remove duplicate * records
- To run, use:

```python
StructureDynamic(target_schema= ,  # redshift schema your table is in
                 target_table=     # your table
                 ).run(add_cols= ,       # True or False for whether you want columns added in attempting to fix
                       change_dtypes= ,  # True or False for whether you want column data types changed in attempting to fix
                       copy= ,           # copy command you attempted
                       load_start=       # when you started the copy command, '%Y-%m-%d %H:%M:%S'
                       )
```
- Example usage:
- sql prep: create the table

```sql
CREATE TABLE public.test(id INT, col VARCHAR);
INSERT INTO test VALUES (1,'2');
INSERT INTO test VALUES (2, 'two');
```

- test.csv: create the csv you want to attempt to copy

```
1,2
two,2
3,4
5,6
ab,test
```

- we attempt to copy normally, but we get load errors because one of the columns isn't right

```sql
COPY public.test FROM 's3://luigi-godata/test.csv'
CREDENTIALS 'aws_access_key_id=XXXX;aws_secret_access_key=XXXX'
CSV DELIMITER ',' COMPUPDATE ON MAXERROR 0;
```
- we run StructureDynamic

```python
from validation import StructureDynamic

copy = '''COPY public.test FROM 's3://luigi-godata/test.csv'
CREDENTIALS 'aws_access_key_id=XXXX;aws_secret_access_key=XXXX'
CSV DELIMITER ',' COMPUPDATE ON MAXERROR 0;'''

StructureDynamic(target_schema='public', target_table='test').run(add_cols=True, change_dtypes=True, copy=copy, load_start='2016-10-6 10:15:00')
```
- our table is fixed and called public.test
- our original table is kept as public.test_orig_backup
- stdout lists the stl_load_errors encountered
- the changes made to the table's ddl are printed to stdout