-
Notifications
You must be signed in to change notification settings - Fork 1
/
dag_strategy_dynamic.py
67 lines (63 loc) · 2.38 KB
/
dag_strategy_dynamic.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
#https://bigdata-etl.com/apache-airflow-create-dynamic-dag/
from airflow.models import DAG
from datetime import datetime, timedelta
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.bash_operator import BashOperator
from datetime import date, timedelta, datetime
import json
import pandas as pd
import boto3
def create_dag(dag_id,
schedule,
default_args,
conf):
dag = DAG(dag_id, default_args=default_args, schedule_interval=schedule)
with dag:
init = BashOperator(
bash_command='echo START' ,
task_id='Init',
dag=dag
)
clear = BashOperator(
bash_command='echo STOPPING',
task_id='clear',
dag=dag
)
for i,row in df.iterrows():
command={}
command['--strat_name']=row['Strategy']
command['--mode']=str(row['Mode'])
command['--tickers']=row['Securities']
command['--broker_token']=row['Token']
command['--broker_account']=row['Account']
if row['Model ID']!="" or row["Strategy Parameters"]!="":
command['--strat_param']=("model_uri="+row['Model ID']+","+row["Strategy Parameters"]) if row["Strategy Parameters"]!="" else ("model_uri="+row['Model ID'])
final_commmand='python /usr/local/airflow/dags/q_pack/q_run/run_BT.py '+' '.join([(k+"="+v) for k, v in command.items() if v!=''])
tab = BashOperator(
bash_command=final_commmand,
task_id=(str(i)+"_"+row['Strategy']),
dag=dag
)
init >> tab >> clear
return dag
schedule = None #"@daily"
dag_id = "strategy_dynamic_DAG"
s3 = boto3.client('s3',endpoint_url="http://minio-image:9000",aws_access_key_id="minio-image",aws_secret_access_key="minio-image-pass")
Bucket="airflow-files"
Key="strategy.csv"
read_file = s3.get_object(Bucket=Bucket, Key=Key)
df = pd.read_csv(read_file['Body'],sep=',')
df.fillna('', inplace=True)
args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date':datetime(2019,1,1),
# 'start_date': datetime.now(),
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=1),
'concurrency': 1,
'max_active_runs': 1
}
globals()[dag_id] = create_dag(dag_id, schedule, args, df)