-
Notifications
You must be signed in to change notification settings - Fork 0
/
airbnb_postgres_dbt_cosmos.py
73 lines (66 loc) · 2.13 KB
/
airbnb_postgres_dbt_cosmos.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
import os
import logging
import requests
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from psycopg2.extras import execute_values
from airflow import AirflowException
from airflow import DAG
from airflow.decorators import dag
from airflow.models import Variable
from airflow.operators.python_operator import PythonOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from cosmos import DbtTaskGroup, DbtDag, ProjectConfig, ProfileConfig, ExecutionConfig
from cosmos.profiles import PostgresUserPasswordProfileMapping
from cosmos.constants import ExecutionMode
from airflow.settings import AIRFLOW_HOME
dag_default_args = {
'owner': 'airflow',
'start_date': datetime.now() - timedelta(days=2),
'email': [],
'email_on_failure': True,
'email_on_retry': False,
'retries': 2,
'retry_delay': timedelta(minutes=5),
'depends_on_past': False,
'wait_for_downstream': False,
}
DBT_PROJECT_PATH = f"{AIRFLOW_HOME}/dags/dbt/airbnb_postgres"
# The path where Cosmos will find the dbt executable
# in the virtual environment created in the Dockerfile
DBT_EXECUTABLE_PATH = f"{AIRFLOW_HOME}/dbt_venv/bin/dbt"
POSTGRES_CONN_ID = "postgres"
profile_config = ProfileConfig(
profile_name="bde",
target_name="dev",
profile_mapping=PostgresUserPasswordProfileMapping(
conn_id=POSTGRES_CONN_ID,
profile_args={
"schema": "airbnb_raw",
"host": "localhost",
"dbname": "airflow",
"user": "airflow",
"password": "airflow",
"port":5432
},
),
)
execution_config = ExecutionConfig(
dbt_executable_path=DBT_EXECUTABLE_PATH,
)
my_cosmos_dag = DbtDag(
project_config=ProjectConfig(
DBT_PROJECT_PATH
),
profile_config=profile_config,
execution_config=ExecutionConfig(
dbt_executable_path=DBT_EXECUTABLE_PATH,
execution_mode=ExecutionMode.VIRTUALENV
),
schedule_interval=None,
start_date=datetime(2023, 1, 1),
catchup=False,
dag_id="airbnb_postgres_dbt_cosmos",
)