Skip to content

Commit

Permalink
minor updates
Browse files Browse the repository at this point in the history
  • Loading branch information
lostmygithubaccount committed Aug 25, 2024
1 parent 5815658 commit 0b981ba
Show file tree
Hide file tree
Showing 14 changed files with 430 additions and 55 deletions.
10 changes: 7 additions & 3 deletions .github/workflows/cicd.yaml → .github/workflows/docs.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
name: cicd
name: docs

permissions:
contents: write
Expand All @@ -11,8 +11,9 @@ on:
paths:
- 'justfile'
- 'website/**'
- 'pyproject.toml'
- 'dev-requirements.txt'
- '.github/workflows/cicd.yaml'
- '.github/workflows/docs.yaml'

jobs:
deploy:
Expand All @@ -31,8 +32,11 @@ jobs:

- uses: quarto-dev/quarto-actions/setup@v2

- name: setup uv
run: curl -LsSf https://astral.sh/uv/install.sh | sh

- name: install requirements
run: pip install uv && just setup
run: just setup

- name: build site
run: |
Expand Down
130 changes: 126 additions & 4 deletions eda.ipynb

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ packages = ["src/icarus"]

[project]
name = "icarus-cds"
version = "0.2.0"
version = "0.3.0"
authors = [{ name = "Cody", email = "cody@dkdc.dev" }]
description = "Python composable data stack solution accelerator"
readme = "readme.md"
Expand All @@ -28,7 +28,7 @@ dependencies = [
# cloud
'gcsfs',
# data
'ibis-framework[duckdb,polars,deltalake]',
'ibis-framework[duckdb,polars,deltalake,examples]',
'Faker',
# visualization
'plotly',
Expand Down
6 changes: 6 additions & 0 deletions src/icarus/catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ def delta_table_path(table_name: str) -> str:
def read_table(table_name: str) -> ibis.Table:
if CLOUD:
import gcsfs
import warnings

warnings.filterwarnings("ignore")

fs = gcsfs.GCSFileSystem()
ibis.get_backend().register_filesystem(fs)
Expand All @@ -31,6 +34,9 @@ def read_table(table_name: str) -> ibis.Table:
def write_table(t: ibis.Table, table_name: str) -> None:
if CLOUD:
import gcsfs
import warnings

warnings.filterwarnings("ignore")

fs = gcsfs.GCSFileSystem()
ibis.get_backend().register_filesystem(fs)
Expand Down
33 changes: 27 additions & 6 deletions src/icarus/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@
from icarus.config import (
DATA_DIR,
RAW_DATA_DIR,
PENGUINS_TABLE,
BUY_SELL_TABLE,
SOCIAL_MEDIA_TABLE,
)
from icarus.catalog import delta_table_filename
from icarus.investments.run import main as run_main
from icarus.penguins.run import main as penguins_run_main
from icarus.investments.run import main as investments_run_main
from icarus.synthetic_data.investments import (
gen_buy_sell_batch,
gen_social_media_batch,
Expand All @@ -25,13 +27,16 @@

# typer apps
app = typer.Typer(help="Icarus: soaring beyond limits.", **TYPER_KWARGS)
run_app = typer.Typer(help="Run the ETL job.", **TYPER_KWARGS)
clean_app = typer.Typer(help="Clean the data lake.", **TYPER_KWARGS)

# add subcommands
app.add_typer(clean_app, name="clean")
app.add_typer(run_app, name="run")

# add subcommand aliases
app.add_typer(clean_app, name="c", hidden=True)
app.add_typer(run_app, name="r", hidden=True)


# helper functions
Expand Down Expand Up @@ -71,9 +76,25 @@ def gen():
typer.echo(f"error: {e}")


@app.command()
@app.command("etl", hidden=True)
def run(
@run_app.command()
def penguins(
override: bool = typer.Option(
False, "--override", "-o", help="Override checks", show_default=True
),
):
"""Run ETL."""

# ensure raw data exists
if not override and not check_raw_data_exists():
return

# run the ETL job
typer.echo("running ETL job...")
penguins_run_main()


@run_app.command()
def investments(
override: bool = typer.Option(
False, "--override", "-o", help="Override checks", show_default=True
),
Expand All @@ -86,7 +107,7 @@ def run(

# run the ETL job
typer.echo("running ETL job...")
run_main()
investments_run_main()


@app.command("app")
Expand All @@ -108,7 +129,7 @@ def clean_lake(
if not override and not check_data_lake_exists():
return

tables = [BUY_SELL_TABLE, SOCIAL_MEDIA_TABLE]
tables = [PENGUINS_TABLE, BUY_SELL_TABLE, SOCIAL_MEDIA_TABLE]
tables = [delta_table_filename(table) for table in tables]

for table in tables:
Expand Down
1 change: 1 addition & 0 deletions src/icarus/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,6 @@

DATA_DIR = "datalake"
RAW_DATA_DIR = "_raw"
PENGUINS_TABLE = "penguins"
BUY_SELL_TABLE = "buy_sell"
SOCIAL_MEDIA_TABLE = "social_media"
16 changes: 0 additions & 16 deletions src/icarus/investments/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,26 +20,10 @@ def main():
extract_buy_sell_t = extract_buy_sell()
extract_social_media_t = extract_social_media()

# data validation
assert (
extract_buy_sell_t.count().to_pyarrow().as_py() > 0
), "No extracted buy/sell data"
assert (
extract_social_media_t.count().to_pyarrow().as_py() > 0
), "No extracted social media data"

# transform
transform_buy_sell_t = transform_buy_sell(extract_buy_sell_t)
transform_social_media_t = transform_social_media(extract_social_media_t)

# data validation
assert (
transform_buy_sell_t.count().to_pyarrow().as_py() > 0
), "No transformed buy/sell data"
assert (
transform_social_media_t.count().to_pyarrow().as_py() > 0
), "No transformed social media data"

# load
catalog.write_table(transform_buy_sell_t, BUY_SELL_TABLE)
catalog.write_table(transform_social_media_t, SOCIAL_MEDIA_TABLE)
10 changes: 4 additions & 6 deletions src/icarus/investments/transform.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,25 +23,23 @@ def postprocess(t: ibis.Table) -> ibis.Table:


# data assets
def buy_sell(bronze_buy_sell):
def buy_sell(t: ibis.Table) -> ibis.Table:
"""Transform ticker buy/sell data."""

def transform(t):
t = t.mutate(t["buy_sell"].unnest()).unpack("buy_sell")
return t

buy_sell = bronze_buy_sell.pipe(preprocess).pipe(transform).pipe(postprocess)
buy_sell = t.pipe(preprocess).pipe(transform).pipe(postprocess)
return buy_sell


def social_media(bronze_social_media):
def social_media(t: ibis.Table) -> ibis.Table:
"""Transform ticker social media data."""

def transform(t):
t = t.unpack("social_media_post")
return t

social_media = (
bronze_social_media.pipe(preprocess).pipe(transform).pipe(postprocess)
)
social_media = t.pipe(preprocess).pipe(transform).pipe(postprocess)
return social_media
Empty file added src/icarus/penguins/__init__.py
Empty file.
32 changes: 32 additions & 0 deletions src/icarus/penguins/extract.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# imports
import ibis

from datetime import datetime

# set extracted_at timestamp
# note we don't use ibis.now() to ensure it's the same...
# ...for all tables/rows on a given run
extracted_at = datetime.utcnow().isoformat()


# functions
def add_extracted_at(t: ibis.Table) -> ibis.Table:
"""Add extracted_at column to table"""

# add extracted_at column and relocate it to the first position
t = t.mutate(extracted_at=ibis.literal(extracted_at)).relocate("extracted_at")

return t


# data assets
def penguins() -> ibis.Table:
"""Extract penguins data"""

# read in raw data
penguins = ibis.examples.penguins.fetch()

# add extracted_at column
penguins = penguins.pipe(add_extracted_at)

return penguins
22 changes: 22 additions & 0 deletions src/icarus/penguins/run.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# imports
from icarus.config import PENGUINS_TABLE
from icarus.catalog import Catalog
from icarus.penguins.extract import (
penguins as extract_penguins,
)
from icarus.penguins.transform import penguins as transform_penguins


# functions
def main():
# instantiate catalog
catalog = Catalog()

# extract
extract_penguins_t = extract_penguins()

# transform
transform_penguins_t = transform_penguins(extract_penguins_t)

# load
catalog.write_table(transform_penguins_t, PENGUINS_TABLE)
33 changes: 33 additions & 0 deletions src/icarus/penguins/transform.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# imports
import ibis
import ibis.selectors as s


# functions
def preprocess(t: ibis.Table) -> ibis.Table:
"""Common preprocessing steps"""

# ensure unique records
t = t.distinct(on=~s.c("extracted_at"), keep="first").order_by("extracted_at")

return t


def postprocess(t: ibis.Table) -> ibis.Table:
"""Common postprocessing steps"""

# ensure consistent column casing
t = t.rename("snake_case")

return t


# data assets
def penguins(t: ibis.Table) -> ibis.Table:
"""Transform penguins data."""

def transform(t):
return t

penguins = t.pipe(preprocess).pipe(transform).pipe(postprocess)
return penguins
Loading

0 comments on commit 0b981ba

Please sign in to comment.