Daglib is a lightweight, embeddable parallel task execution library used for turning pure Python functions into executable task graphs.
Core
pip install daglib
With visualizations enabled
pip install 'daglib[graphviz]' # static visualizations
# or
pip install 'daglib[ipycytoscape]' # interactive visualizations
import daglib

dag = daglib.Dag()

@dag.task()
def task_1a():
    return "Hello"

@dag.task()
def task_1b():
    return "world!"

# task_2's parameters are named after task_1a and task_1b, so it receives their results
@dag.task()
def task_2(task_1a, task_1b):
    return f"{task_1a}, {task_1b}"

dag.run()
'Hello, world!'
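The example suggests that dependencies are wired by parameter name: `task_2` takes arguments called `task_1a` and `task_1b`, and receives those tasks' results. As a rough mental model only, here is a minimal sketch of how such a decorator-based graph could be executed, assuming name-based wiring. `ToyDag` is hypothetical and is not daglib's implementation; it uses the standard library's `graphlib.TopologicalSorter` (Python 3.9+) to order tasks and a thread pool to run independent tasks concurrently.

```python
import inspect
from concurrent.futures import ThreadPoolExecutor
from graphlib import TopologicalSorter  # Python 3.9+


class ToyDag:
    """Hypothetical toy task graph; not daglib's actual implementation."""

    def __init__(self):
        self.tasks = {}  # task name -> function

    def task(self):
        # Used as @dag.task(), mirroring the call style in the example above.
        def register(fn):
            self.tasks[fn.__name__] = fn
            return fn
        return register

    def run(self):
        # Assumption: a task depends on every task named by one of its parameters.
        deps = {
            name: set(inspect.signature(fn).parameters)
            for name, fn in self.tasks.items()
        }
        for name, params in deps.items():
            unknown = params - set(self.tasks)
            if unknown:
                raise ValueError(f"{name} depends on unknown task(s): {sorted(unknown)}")

        sorter = TopologicalSorter(deps)
        sorter.prepare()  # raises graphlib.CycleError if the graph has a cycle
        results = {}
        with ThreadPoolExecutor() as pool:
            while sorter.is_active():
                # Every task whose dependencies have finished is submitted at once.
                futures = {
                    name: pool.submit(
                        self.tasks[name],
                        **{dep: results[dep] for dep in deps[name]},
                    )
                    for name in sorter.get_ready()
                }
                for name, future in futures.items():
                    results[name] = future.result()
                    sorter.done(name)
        return results  # every task's result, keyed by task name


dag = ToyDag()


@dag.task()
def task_1a():
    return "Hello"


@dag.task()
def task_1b():
    return "world!"


@dag.task()
def task_2(task_1a, task_1b):
    return f"{task_1a}, {task_1b}"


print(dag.run()["task_2"])  # Hello, world!
```

This sketch schedules one "wave" of ready tasks at a time and returns all results as a dict; a real scheduler could start each task as soon as its own inputs are done. Threads only help when tasks release the GIL (I/O, many NumPy/pandas operations); CPU-bound pure-Python tasks would need processes instead.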
For a more involved example, we will create a small pipeline that takes data from four source tables and combines them into a single reporting table. The data is driver-level information from the 2022 Formula 1 season, as of the time of writing. The output is a summary table of team-level metrics, aggregated from the following driver-level source tables:
- Team - The team each driver races for
- Points - Each driver's current total of World Drivers' Championship points for the season
- Wins - Current number of wins for each driver for the season
- Podiums - Current number of times the driver finished in the top 3 for the season
import pandas as pd
import daglib

# Ignore. Used to render the DataFrame correctly in the README
pd.set_option("display.notebook_repr_html", False)

dag = daglib.Dag()

@dag.task()
def team():
    return pd.DataFrame(dict(
        driver=["Max", "Charles", "Lewis", "Sergio", "Carlos", "George"],
        team=["Red Bull", "Ferrari", "Mercedes", "Red Bull", "Ferrari", "Mercedes"],
    )).set_index("driver")

@dag.task()
def points():
    return pd.DataFrame(dict(
        driver=["Max", "Charles", "Lewis", "Sergio", "Carlos", "George"],
        points=[258, 178, 146, 173, 156, 158],
    )).set_index("driver")

@dag.task()
def wins():
    return pd.DataFrame(dict(
        driver=["Max", "Charles", "Lewis", "Sergio", "Carlos", "George"],
        wins=[8, 3, 0, 1, 1, 0],
    )).set_index("driver")

@dag.task()
def podiums():
    return pd.DataFrame(dict(
        driver=["Max", "Charles", "Lewis", "Sergio", "Carlos", "George"],
        podiums=[10, 5, 6, 6, 6, 5],
    )).set_index("driver")

@dag.task()
def driver_metrics(team, points, wins, podiums):
    # Join the four source tables on their shared "driver" index
    return team.join(points).join(wins).join(podiums)

@dag.task()
def team_metrics(driver_metrics):
    # Sum each metric per team, then rank teams by total points
    return driver_metrics.groupby("team").sum().sort_values("points", ascending=False)

dag.run()
          points  wins  podiums
team
Red Bull     431     9       16
Ferrari      334     4       11
Mercedes     304     0       11
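To make the aggregation step concrete, the sketch below recomputes the same team totals with plain Python dictionaries instead of pandas. It mirrors the `join` followed by `groupby("team").sum()` and `sort_values("points", ascending=False)`, and assumes (as holds for the data above) that all four source tables list the drivers in the same order, so rows can be paired by position rather than by the `driver` index.

```python
from collections import defaultdict

# Driver-level columns copied from the example above (same driver order in each).
team = ["Red Bull", "Ferrari", "Mercedes", "Red Bull", "Ferrari", "Mercedes"]
points = [258, 178, 146, 173, 156, 158]
wins = [8, 3, 0, 1, 1, 0]
podiums = [10, 5, 6, 6, 6, 5]

# Equivalent of driver_metrics.groupby("team").sum(): add up each metric per team.
totals = defaultdict(lambda: {"points": 0, "wins": 0, "podiums": 0})
for t, p, w, pod in zip(team, points, wins, podiums):
    totals[t]["points"] += p
    totals[t]["wins"] += w
    totals[t]["podiums"] += pod

# Equivalent of .sort_values("points", ascending=False).
for t, m in sorted(totals.items(), key=lambda item: item[1]["points"], reverse=True):
    print(t, m["points"], m["wins"], m["podiums"])
```

For example, Red Bull's 431 points are Max's 258 plus Sergio's 173.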
The DAG created above produces a task graph that looks like the following:
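The edges of this graph can be read off the task signatures: each of `team`, `points`, `wins`, and `podiums` feeds `driver_metrics`, which in turn feeds `team_metrics`. The sketch below builds Graphviz DOT source for those edges using only the standard library. `to_dot` is a hypothetical helper, not part of daglib, and it assumes (as the examples suggest) that an edge exists wherever a parameter name matches another task's name. The stand-in functions share the pipeline's names and parameters but have no bodies.

```python
import inspect


def to_dot(tasks):
    """Hypothetical helper: return Graphviz DOT source for a list of task functions.

    An edge dep -> task is emitted for each parameter of `task` that names another task.
    """
    names = {fn.__name__ for fn in tasks}
    lines = ["digraph tasks {"]
    for fn in tasks:
        lines.append(f'    "{fn.__name__}";')
        for dep in inspect.signature(fn).parameters:
            if dep in names:
                lines.append(f'    "{dep}" -> "{fn.__name__}";')
    lines.append("}")
    return "\n".join(lines)


# Stand-ins with the same names and parameters as the pipeline above.
def team(): ...
def points(): ...
def wins(): ...
def podiums(): ...
def driver_metrics(team, points, wins, podiums): ...
def team_metrics(driver_metrics): ...


dot = to_dot([team, points, wins, podiums, driver_metrics, team_metrics])
print(dot)
```

The resulting text can be rendered with the Graphviz `dot` command-line tool (for example `dot -Tpng`).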