Skip to content

Commit

Permalink
refactor pt 2 (#6)
Browse files Browse the repository at this point in the history
* refactor pt 2

* clear eda.ipynb
  • Loading branch information
lostmygithubaccount authored Aug 16, 2024
1 parent 2574447 commit 6e1fd3b
Show file tree
Hide file tree
Showing 21 changed files with 412 additions and 674 deletions.
4 changes: 0 additions & 4 deletions .dagster/dagster.yaml

This file was deleted.

37 changes: 29 additions & 8 deletions apps/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,26 +8,26 @@

from datetime import datetime, timedelta

from icarus.investments.dag.resources import Catalog
from icarus.catalog import Catalog

catalog = Catalog()
# px.defaults.template = "plotly_dark"

buy_sell_t = catalog.table("gold_buy_sell")
social_media_t = catalog.table("gold_social_media")
# load data
catalog = Catalog()

# dark themes
# px.defaults.template = "plotly_dark"
ui.page_opts(theme=theme.sketchy)
buy_sell_t = catalog.table("buy_sell")
social_media_t = catalog.table("social_media")

# page options
ui.page_opts(
title="Icarus Investments",
# theme=theme.sketchy(),
fillable=False,
full_width=True,
)

# add page title and sidebar
with ui.sidebar(open="desktop"):
with ui.sidebar(open="always"):
ui.input_date_range(
"date_range",
"Date range",
Expand All @@ -50,6 +50,27 @@
def total_rows():
f"{buy_sell_t.count().to_pyarrow().as_py():,}"

with ui.card(full_screen=True):
"Rows by ticker"

@render_plotly
def bar_tickers():
t = (
buy_sell_data()
.group_by("ticker")
.agg(count=ibis._.count())
.order_by(ibis.desc("count"))
)

c = px.bar(
t,
x="ticker",
y="count",
color="ticker",
)

return c

with ui.card(full_screen=True):
"Some data"

Expand Down
242 changes: 34 additions & 208 deletions eda.ipynb

Large diffs are not rendered by default.

1 change: 0 additions & 1 deletion justfile
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
set dotenv-load

# variables
module := "icarus.investments.dag"
package := "icarus-cds"

# aliases
Expand Down
5 changes: 1 addition & 4 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ packages = ["src/icarus"]

[project]
name = "icarus-cds"
version = "0.1.0"
version = "0.2.0"
authors = [{ name = "Cody", email = "cody@dkdc.dev" }]
description = "Python composable data stack solution accelerator"
readme = "readme.md"
Expand All @@ -30,9 +30,6 @@ dependencies = [
# data
'ibis-framework[duckdb,polars,deltalake]',
'Faker',
# orchestration
'dagster',
'dagster-webserver',
# visualization
'plotly',
'great-tables',
Expand Down
58 changes: 45 additions & 13 deletions src/icarus/catalog.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,57 @@
import os
import ibis

from icarus.config import DATA_DIR
from icarus.config import CLOUD, BUCKET, DATA_DIR


# define data catalog
class Catalog:
def __init__(self, data_dir=DATA_DIR):
self.data_dir = data_dir
# functions
def delta_table_path(table_name: str) -> str:
return os.path.join(DATA_DIR, f"{table_name}.delta")


def read_table(table_name: str) -> ibis.Table:
if CLOUD:
import gcsfs

fs = gcsfs.GCSFileSystem()
ibis.get_backend().register_filesystem(fs)

table_path = f"gs://{BUCKET}/{delta_table_path(table_name)}"
else:
table_path = delta_table_path(table_name)

return ibis.read_delta(table_path)


def list_groups(self):
def write_table(t: ibis.Table, table_name: str) -> None:
if CLOUD:
import gcsfs

fs = gcsfs.GCSFileSystem()
ibis.get_backend().register_filesystem(fs)

table_path = f"gs://{BUCKET}/{delta_table_path(table_name)}"
else:
table_path = delta_table_path(table_name)

t.to_delta(
table_path,
mode="overwrite",
partition_by=["extracted_at"],
)


# classes
class Catalog:
def list_tables(self):
return [
d
for d in os.listdir(self.data_dir)
for d in os.listdir(DATA_DIR)
if not (d.startswith("_") or d.startswith("."))
]

def list_tables(self, group):
return [d.split(".")[0] for d in os.listdir(os.path.join(self.data_dir, group))]
def table(self, table_name):
return read_table(table_name)

def table(self, table_name, group_name=None):
if group_name is None:
group_name = table_name.split("_")[0].upper()
return ibis.read_delta(f"{DATA_DIR}/{group_name}/{table_name}.delta")
def write_table(self, t, table_name):
write_table(t, table_name)
45 changes: 15 additions & 30 deletions src/icarus/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,16 @@
import typer
import subprocess

from icarus.synthetic_data import (
gen_buy_sell_batch,
gen_social_media_batch,
)
from icarus.config import (
DAG_MODULE,
DATA_DIR,
RAW_DATA_DIR,
BRONZE,
SILVER,
GOLD,
BUY_SELL_TABLE,
SOCIAL_MEDIA_TABLE,
)
from icarus.investments.run import main as run_main
from icarus.synthetic_data.investments import (
gen_buy_sell_batch,
gen_social_media_batch,
)

TYPER_KWARGS = {
Expand Down Expand Up @@ -41,8 +40,8 @@ def check_raw_data_exists() -> bool:

def check_data_lake_exists() -> bool:
# check that the data lake exists
for metal in [BRONZE, SILVER, GOLD]:
if not os.path.exists(os.path.join(DATA_DIR, metal)):
for table in [BUY_SELL_TABLE, SOCIAL_MEDIA_TABLE]:
if not os.path.exists(os.path.join(DATA_DIR, table)):
typer.echo("run `icarus run` first or use `--override`!")
return False
return True
Expand All @@ -65,21 +64,8 @@ def gen():


@app.command()
def gui():
"""Start the dagster webserver/GUI."""
cmd = f"dagster dev -m {DAG_MODULE}"
subprocess.call(cmd, shell=True)


@app.command()
@app.command("etl", hidden=True)
def run(
job_name: str = typer.Option(
"all_assets",
"--job-name",
"-j",
help="Name of the job to run",
show_default=True,
),
override: bool = typer.Option(
False, "--override", "-o", help="Override checks", show_default=True
),
Expand All @@ -90,9 +76,8 @@ def run(
if not override and not check_raw_data_exists():
return

# materialize all assets
cmd = f"dagster job execute -j {job_name} -m {DAG_MODULE}"
subprocess.call(cmd, shell=True)
# run the ETL job
run_main()


@app.command("app")
Expand All @@ -113,10 +98,10 @@ def clean_lake(
if not override and not check_data_lake_exists():
return

medals = [BRONZE, SILVER, GOLD]
tables = [BUY_SELL_TABLE, SOCIAL_MEDIA_TABLE]

for metal in medals:
cmd = f"rm -rf {os.path.join(DATA_DIR, metal)}/"
for table in tables:
cmd = f"rm -rf {os.path.join(DATA_DIR, table)}/"
typer.echo(f"running: {cmd}...")
subprocess.call(cmd, shell=True)

Expand Down
13 changes: 3 additions & 10 deletions src/icarus/config.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,7 @@
# configuration file for the data DAG
DAG_MODULE = "icarus.investments.dag"

CLOUD = False
BUCKET = "icarus"
BUCKET = "icarus-datalake"

DATA_DIR = "datalake"
RAW_DATA_DIR = "_raw"
RAW_BUY_SELL_TABLE = "buy_sell"
RAW_SOCIAL_MEDIA_TABLE = "social_media"

BRONZE = "bronze"
SILVER = "silver"
GOLD = "gold"
BUY_SELL_TABLE = "buy_sell"
SOCIAL_MEDIA_TABLE = "social_media"
13 changes: 0 additions & 13 deletions src/icarus/investments/dag/__init__.py

This file was deleted.

18 changes: 0 additions & 18 deletions src/icarus/investments/dag/assets/__init__.py

This file was deleted.

55 changes: 0 additions & 55 deletions src/icarus/investments/dag/assets/bronze.py

This file was deleted.

14 changes: 0 additions & 14 deletions src/icarus/investments/dag/assets/gold.py

This file was deleted.

Loading

0 comments on commit 6e1fd3b

Please sign in to comment.