Skip to content
This repository has been archived by the owner on Jul 3, 2023. It is now read-only.

Demonstrates duckdb adapters + data loaders #195

Merged
merged 1 commit into from
Jan 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions examples/data_loaders/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# Data Loaders
elijahbenizzy marked this conversation as resolved.
Show resolved Hide resolved

Among multiple uses, Hamilton excels at building maintainable, scalable representations of ETLs.
If you've read through the other guides, it should be pretty clear how hamilton enables transformations (the T in ETL/ELT).
In this example, we'll talk about an approach to _Extracting_ data, and how Hamilton enables you to build out extracts,
in a scalable, pluggable way. For example, being able to switch where data is loaded between development and production
is useful, since you might only want a subsample in development, or even load it from a different source.
Here we'll show you how you can achieve this without cluttering your code with `if else`,
which will make your dataflow easier to maintain in the long run.

The goal is to show you two things:

1. How to load data from various sources
2. How to switch the sources of data you're loading from by swapping out modules, using[polymorphism](https://en.wikipedia.org/wiki/Polymorphism_(computer_science))
elijahbenizzy marked this conversation as resolved.
Show resolved Hide resolved

As such, we have three data loaders to use:

1. [load_data_mock.py](load_data_mock.py): generates mock data on the fly. Meant to represent a unit-testing/quick iteration scenario.
2. [load_data_csv.py](load_data_csv.py): Uses CSV data. Meant to represent more ad-hoc research.
3. [load_data_duckdb.py](load_data_duckdb.py) Uses a duckdb database (saved locally). Meant to represent more production-ready dataflows,
as well as demonstrate the ease of working with duckdb.

But this is hardly exclusive, or exhaustive. One can easily imagine loading from snowflake, your custom datawarehouse, hdfs, etc...
All by swapping out the data loaders.

# The data

The data comes pregenerated in (test_data)[test_data], in both `.csv` and `.duckdb` format.

# Loading/Analyzing the data

To load/analyze the data, you can run the script `run.py`

- `python run.py csv` reads from the `.csv` files and runs all the variables
- `python run.py duckdb` reads from the `duckdb` database and runs all the variables
- `python run.py mock` creates mock data and runs the pipeline

Note that you, as the user, have to manually handle connections/whatnot for duckdb.
We are currently designing the ability to do this natively in hamilton: https://github.com/stitchfix/hamilton/issues/197.
15 changes: 15 additions & 0 deletions examples/data_loaders/load_data_csv.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import os

import pandas as pd


def spend(db_path: str) -> pd.DataFrame:
return pd.read_csv(os.path.join(db_path, "marketing_spend.csv"))


def churn(db_path: str) -> pd.DataFrame:
return pd.read_csv(os.path.join(db_path, "marketing_spend.csv"))


def signups(db_path: str) -> pd.DataFrame:
return pd.read_csv(os.path.join(db_path, "marketing_spend.csv"))
18 changes: 18 additions & 0 deletions examples/data_loaders/load_data_duckdb.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import duckdb
import pandas as pd


def connection(db_path: str) -> duckdb.DuckDBPyConnection:
return duckdb.connect(database=db_path)


def spend(connection: duckdb.DuckDBPyConnection) -> pd.DataFrame:
return connection.execute("select * from marketing_spend").fetchdf()
elijahbenizzy marked this conversation as resolved.
Show resolved Hide resolved


def churn(connection: duckdb.DuckDBPyConnection) -> pd.DataFrame:
return connection.execute("select * from churn").fetchdf()


def signups(connection: duckdb.DuckDBPyConnection) -> pd.DataFrame:
return connection.execute("select * from signups").fetchdf()
160 changes: 160 additions & 0 deletions examples/data_loaders/load_data_mock.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
import os

import numpy as np
import pandas as pd


def spend() -> pd.DataFrame:
data = np.array(
[
(
"2022-08-03T00:00:00.000000000",
104052.98074001,
115300.21226012,
69384.46649019,
49474.45580366,
12851.6540992,
1498.5114764,
"2022-08-03T00:00:00.000000000",
),
(
"2022-08-04T00:00:00.000000000",
103234.15793884,
115326.0151612,
71113.31018247,
52513.19734904,
12344.42778548,
1033.79398268,
"2022-08-04T00:00:00.000000000",
),
(
"2022-08-05T00:00:00.000000000",
101816.40188563,
115194.04661767,
71367.20874633,
51795.51413309,
11536.41253561,
2101.46146166,
"2022-08-05T00:00:00.000000000",
),
(
"2022-08-06T00:00:00.000000000",
102263.53043232,
115601.2888751,
71474.76280964,
52861.22158421,
11652.28867968,
1046.83170946,
"2022-08-06T00:00:00.000000000",
),
(
"2022-08-07T00:00:00.000000000",
103271.09660695,
115306.96341012,
71888.99025677,
50742.70043588,
11160.23631976,
2521.31311947,
"2022-08-07T00:00:00.000000000",
),
(
"2022-08-08T00:00:00.000000000",
100775.86701231,
116634.88666304,
71603.50462531,
52361.08798097,
12869.33161266,
3269.57027156,
"2022-08-08T00:00:00.000000000",
),
(
"2022-08-09T00:00:00.000000000",
101527.74726883,
114868.8422755,
70260.81680881,
49647.9754876,
13187.07115589,
2134.71274923,
"2022-08-09T00:00:00.000000000",
),
(
"2022-08-10T00:00:00.000000000",
101150.73295175,
114941.32547639,
68802.02668922,
49590.55466274,
13129.31334755,
3328.0293293,
"2022-08-10T00:00:00.000000000",
),
(
"2022-08-11T00:00:00.000000000",
100317.64365959,
115682.20050942,
67735.95105252,
50621.23723767,
14019.11780391,
2360.4382216,
"2022-08-11T00:00:00.000000000",
),
(
"2022-08-12T00:00:00.000000000",
102024.067597,
116770.81592363,
66244.22984364,
49503.73825509,
14533.2726457,
1868.18205207,
"2022-08-12T00:00:00.000000000",
),
],
dtype=[
("index", "<M8[ns]"),
("facebook", "<f8"),
("twitter", "<f8"),
("tv", "<f8"),
("youtube", "<f8"),
("radio", "<f8"),
("billboards", "<f8"),
("date", "<M8[ns]"),
],
)
return pd.DataFrame.from_records(data)


def churn() -> pd.DataFrame:
data = np.array(
[
("2022-08-03T00:00:00.000000000", 160, 53, "2022-08-03T00:00:00.000000000"),
("2022-08-04T00:00:00.000000000", 162, 54, "2022-08-04T00:00:00.000000000"),
("2022-08-05T00:00:00.000000000", 162, 50, "2022-08-05T00:00:00.000000000"),
("2022-08-06T00:00:00.000000000", 161, 53, "2022-08-06T00:00:00.000000000"),
("2022-08-07T00:00:00.000000000", 160, 49, "2022-08-07T00:00:00.000000000"),
("2022-08-08T00:00:00.000000000", 160, 52, "2022-08-08T00:00:00.000000000"),
("2022-08-09T00:00:00.000000000", 161, 53, "2022-08-09T00:00:00.000000000"),
("2022-08-10T00:00:00.000000000", 160, 57, "2022-08-10T00:00:00.000000000"),
("2022-08-11T00:00:00.000000000", 156, 56, "2022-08-11T00:00:00.000000000"),
("2022-08-12T00:00:00.000000000", 148, 58, "2022-08-12T00:00:00.000000000"),
],
dtype=[("index", "<M8[ns]"), ("womens", "<i8"), ("mens", "<i8"), ("date", "<M8[ns]")],
)
return pd.DataFrame.from_records(data)


def signups() -> pd.DataFrame:
data = np.array(
[
("2022-08-03T00:00:00.000000000", 2184, 429, "2022-08-03T00:00:00.000000000"),
("2022-08-04T00:00:00.000000000", 2164, 461, "2022-08-04T00:00:00.000000000"),
("2022-08-05T00:00:00.000000000", 2159, 454, "2022-08-05T00:00:00.000000000"),
("2022-08-06T00:00:00.000000000", 2157, 449, "2022-08-06T00:00:00.000000000"),
("2022-08-07T00:00:00.000000000", 2121, 478, "2022-08-07T00:00:00.000000000"),
("2022-08-08T00:00:00.000000000", 2151, 517, "2022-08-08T00:00:00.000000000"),
("2022-08-09T00:00:00.000000000", 2133, 541, "2022-08-09T00:00:00.000000000"),
("2022-08-10T00:00:00.000000000", 2160, 565, "2022-08-10T00:00:00.000000000"),
("2022-08-11T00:00:00.000000000", 2135, 609, "2022-08-11T00:00:00.000000000"),
("2022-08-12T00:00:00.000000000", 2116, 633, "2022-08-12T00:00:00.000000000"),
],
dtype=[("index", "<M8[ns]"), ("womens", "<i8"), ("mens", "<i8"), ("date", "<M8[ns]")],
)
return pd.DataFrame.from_records(data)
78 changes: 78 additions & 0 deletions examples/data_loaders/prep_data.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
import pandas as pd

from hamilton.function_modifiers import does, extract_columns, parameterize, source, value


def _sum_series(**series):
return sum(series.values())


@extract_columns(
"facebook_spend",
"twitter_spend",
"tv_spend",
"youtube_spend",
"radio_spend",
"billboards_spend",
"womens_churn",
"mens_churn",
"womens_signups",
"mens_signups",
)
def joined_data(spend: pd.DataFrame, signups: pd.DataFrame, churn: pd.DataFrame) -> pd.DataFrame:
spend = spend.set_index("date").rename(columns=lambda col: col + "_spend")
churn = churn.set_index("date").rename(columns=lambda col: col + "_churn")
signups = signups.set_index("date").rename(columns=lambda col: col + "_signups")
return pd.concat([spend, churn, signups], axis=1)


@does(_sum_series)
def total_marketing_spend(
facebook_spend: pd.Series,
twitter_spend: pd.Series,
tv_spend: pd.Series,
youtube_spend: pd.Series,
radio_spend: pd.Series,
billboards_spend: pd.Series,
) -> pd.Series:
pass


@does(_sum_series)
def total_signups(mens_signups: pd.Series, womens_signups: pd.Series) -> pd.Series:
pass


@does(_sum_series)
def total_churn(mens_churn: pd.Series, womens_churn: pd.Series) -> pd.Series:
pass


def total_customers(total_signups: pd.Series, total_churn: pd.Series) -> pd.Series:
customer_deltas = total_signups + total_churn
return customer_deltas.cumsum()


def acquisition_cost(total_marketing_spend: pd.Series, total_signups: pd.Series) -> pd.Series:
return total_marketing_spend / total_signups


@parameterize(
twitter_spend_smoothed={"lookback_days": value(7), "spend": source("twitter_spend")},
facebook_spend_smoothed={"lookback_days": value(7), "spend": source("facebook_spend")},
radio_spend_smoothed={"lookback_days": value(21), "spend": source("radio_spend")},
tv_spend_smoothed={"lookback_days": value(21), "spend": source("tv_spend")},
billboards_spend_smoothed={"lookback_days": value(7), "spend": source("billboards_spend")},
youtube_spend_smoothed={"lookback_days": value(7), "spend": source("twitter_spend")},
)
def spend_smoothed(lookback_days: int, spend: pd.Series) -> pd.Series:
"""{spend} smoothed by {lookback_days}. Might want to smooth different ad spends differently,
figuring that it takes different amounts of time to get to the customer. A cheap hack at determining
auto-correlation of a series -- this should be a parameter in a model,
but this is to demonstrate the framework

:param lookback_days: Days to smooth over
:param spend: Spend source
:return:
"""
return spend.rolling(window=lookback_days).mean().fillna(0)
2 changes: 2 additions & 0 deletions examples/data_loaders/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
click
duckdb==0.5.0
49 changes: 49 additions & 0 deletions examples/data_loaders/run.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import click
import load_data_mock
import prep_data

import hamilton.driver
from examples.data_loaders import load_data_csv, load_data_duckdb


@click.group()
def main():
pass


VARS = [
"total_signups",
"total_churn",
"total_marketing_spend",
"acquisition_cost",
"twitter_spend_smoothed",
"facebook_spend_smoothed",
"radio_spend_smoothed",
"tv_spend_smoothed",
"billboards_spend_smoothed",
"youtube_spend_smoothed",
]


@main.command()
def duckdb():
driver = hamilton.driver.Driver(
{"db_path": "./test_data/database.duckdb"}, load_data_duckdb, prep_data
)
print(driver.execute(VARS))


@main.command()
def csv():
driver = hamilton.driver.Driver({"db_path": "test_data"}, load_data_csv, prep_data)
print(driver.execute(VARS))


@main.command()
def mock():
driver = hamilton.driver.Driver({}, load_data_mock, prep_data)
print(driver.execute(VARS))


if __name__ == "__main__":
main()
Loading