
Commit f74d18c
Included basic CSV upload in airflow pipeline so that users can use the demo setup with their own data if they don't have it stored in a DB (such as riders)
JanPeterDatakind committed Mar 10, 2023
1 parent 566d3b2 commit f74d18c
Showing 2 changed files with 74 additions and 0 deletions.
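
For context, a minimal sketch of how a user might feed their own data through this new path (the riders columns below are invented for illustration; dot/local_files is the directory the new script scans, and is assumed to exist):

import pandas as pd

# Drop a CSV into dot/local_files before the DAG runs; the new
# import_local_files task uploads it to Postgres as a table named
# after the file ("riders" here). The columns are made-up demo data.
pd.DataFrame(
    {"rider_id": [1, 2], "name": ["Ada", "Grace"]}
).to_csv("dot/local_files/riders.csv", index=False)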
27 changes: 27 additions & 0 deletions docker/airflow/dags/run_dot_project.py
@@ -19,6 +19,23 @@
from sqlalchemy import create_engine


def get_connection(target_conn_in):
    """Build a Postgres connection string from an Airflow connection;
    the connection ID doubles as the database name."""
    connection = BaseHook.get_connection(target_conn_in)
    connection_string = (
        "postgresql://"
        + str(connection.login)
        + ":"
        + str(connection.password)
        + "@"
        + str(connection.host)
        + ":"
        + str(connection.port)
        + "/"
        + target_conn_in
    )
    return connection_string


def get_object(
object_name_in,
earliest_date_to_sync,
@@ -269,6 +286,7 @@ def sync_object(
        object_name_in, target_conn_in, data, column_list, type_list, source_conn_in
    )


def drop_tables_in_dot_tests_schema(target_conn_in, schema_to_drop_from):
    """
    We are syncing new data where new columns and column types might change.
@@ -303,6 +321,7 @@ def drop_tables_in_dot_tests_schema(target_conn_in, schema_to_drop_from):
        cur.execute(query1)
        cur.execute(query2)


def run_dot_app(project_id_in):
    """
    Method to run the DOT.
@@ -350,6 +369,14 @@ def default_config():

af_tasks = []

af_tasks.append(
    BashOperator(
        task_id="import_local_files",
        dag=dag,
        bash_command=f"cd /app/dot && python upload_local_files.py --connection_string {get_connection(target_conn)}",
    )
)

for project in config["dot_projects"]:

"""
Expand Down
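
To illustrate what get_connection() above hands to the BashOperator, here is a standalone sketch of the URL it assembles; the Airflow BaseHook lookup is stubbed with a namedtuple so the snippet runs anywhere, and every credential value is made up:

from collections import namedtuple

# Stand-in for the object BaseHook.get_connection() returns;
# all values here are hypothetical.
Conn = namedtuple("Conn", "login password host port")
connection = Conn("postgres", "secret", "dot-db", 5432)
target_conn_in = "dot_db"  # Airflow connection ID, reused as the DB name

connection_string = (
    f"postgresql://{connection.login}:{connection.password}"
    f"@{connection.host}:{connection.port}/{target_conn_in}"
)
print(connection_string)  # postgresql://postgres:secret@dot-db:5432/dot_db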
47 changes: 47 additions & 0 deletions dot/upload_local_files.py
@@ -0,0 +1,47 @@
import os
import pandas as pd
import re
from sqlalchemy import create_engine
import argparse

def read_csv_file(file_path):
    print(f"Reading csv {file_path} ...")
    df = pd.read_csv(file_path)
    return df

def get_postgres_engine(connection_string):
    print(connection_string)
    engine = create_engine(connection_string)
    return engine

def get_table_name(file_name):
    # Replace runs of non-alphanumeric characters in the filename with underscores
    table_name = re.sub(r'\W+', '_', os.path.splitext(file_name)[0]).lower()
    return table_name

parser = argparse.ArgumentParser(description="Specify arguments")
parser.add_argument(
    "--connection_string",
    action="store",
    required=True,
    help="Connection string for the airflow target_conn",
)

connection_string = parser.parse_args().connection_string

engine = get_postgres_engine(connection_string)

for file_name in os.listdir("./local_files"):
    # Only process CSV files
    if file_name.endswith(".csv"):
        print(f"Saving {file_name} to the database ...")

        with engine.connect() as conn:
            # Drop the table if it already exists
            table_name = get_table_name(file_name)
            conn.execute(f'DROP TABLE IF EXISTS {table_name} CASCADE;')
            # Read the file and save it to the database
            df = read_csv_file(f"./local_files/{file_name}")
            # ToDo: populate schema from target_conn instead of hard coded!
            df.to_sql(name=table_name, con=engine, schema='public', if_exists='replace', index=False)
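
As a quick check of the table-name sanitization, the same logic as get_table_name() maps arbitrary file names onto Postgres-safe table names (file names are illustrative):

import os
import re

# Mirrors get_table_name() from upload_local_files.py above.
def get_table_name(file_name):
    return re.sub(r'\W+', '_', os.path.splitext(file_name)[0]).lower()

print(get_table_name("riders.csv"))          # -> riders
print(get_table_name("Riders 2023-Q1.csv"))  # -> riders_2023_q1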
