Skip to content

Commit

Permalink
bench2 updates and other stuff
Browse files Browse the repository at this point in the history
  • Loading branch information
lostmygithubaccount committed Aug 24, 2024
1 parent 9cd6ad2 commit 0ce9a04
Show file tree
Hide file tree
Showing 3 changed files with 339 additions and 11 deletions.
319 changes: 319 additions & 0 deletions eda.qmd
Original file line number Diff line number Diff line change
@@ -0,0 +1,319 @@
::: {.column-screen-inset}

# August 2024 -- ad hoc analysis

Work in progress. DataFusion and Polars have both had new releases. DataFusion via Ibis now completes on all queries.

::: {.callout-important title="Versions used"}

Versions used in this analysis:

- `ibis-framework @ git+https://github.com/ibis-project/ibis`
- `duckdb==1.0.0`
- `datafusion==40.1.0`
- `polars==1.5.0`

:::

::: {.callout-tip title="Show me the analysis code" collapse="true"}

```{python}
import ibis
import gcsfs
import ibis.selectors as s
import plotly.express as px
ibis.options.interactive = True
ibis.options.repr.interactive.max_rows = 40
ibis.options.repr.interactive.max_length = 22
ibis.options.repr.interactive.max_columns = None
px.defaults.template = "plotly_dark"
# fs = gcsfs.GCSFileSystem()
# ibis.get_backend().register_filesystem(fs)
```

```{python}
def get_t(floats=False):
t = (
ibis.read_json("bench_logs_v2/raw_json/file_id=*.json")
.mutate(
timestamp=ibis._["timestamp"].cast("timestamp"),
instance_type=ibis.literal("MacBook Pro (2023 Apple M2 Max 96GB)"),
)
.filter(ibis._["floats"] == floats)
.distinct()
)
return t
```

```{python}
def get_sfs(t):
sfs = sorted(t.distinct(on="sf")["sf"].to_pyarrow().to_pylist())
return sfs
```

```{python}
def get_systems(t):
systems = sorted(t.distinct(on="system")["system"].to_pyarrow().to_pylist())
return systems
```

```{python}
def get_instance_types(t):
instance_types = sorted(
t.distinct(on="instance_type")["instance_type"].to_pyarrow().to_pylist(),
key=lambda x: (x.split("-")[0], int(x.split("-")[-1])) if "-" in x else (x, 0),
)
return instance_types
```

```{python}
def get_query_numbers(t):
query_numbers = sorted(
t.distinct(on="query_number")["query_number"].to_pyarrow().to_pylist()
)
return query_numbers
```

```{python}
def get_failing_queries(t):
fail = t.group_by("system", "sf", "floats").agg(
present_queries=ibis._["query_number"].collect().unique().sort()
)
fail = (
fail.mutate(
failing_queries=t.distinct(on="query_number")["query_number"]
.collect()
.filter(lambda x: ~fail["present_queries"].contains(x))
.sort()
)
.mutate(num_failing_queries=ibis._["failing_queries"].length())
.drop("present_queries")
.order_by(ibis.desc("sf"), "system")
)
return fail
```

```{python}
def get_agg(t):
agg = (
t.filter(t["sf"] >= 1)
# .filter((t["system"].contains("duckdb")) | (t["system"].contains("datafusion")))
.group_by("instance_type", "system", "sf", "n_partitions", "query_number")
.agg(
mean_execution_seconds=t["execution_seconds"].mean(),
)
.order_by(
ibis.asc("instance_type"),
ibis.desc("sf"),
ibis.asc("n_partitions"),
ibis.asc("query_number"),
ibis.desc("system"),
ibis.asc("mean_execution_seconds"),
)
)
return agg
```

```{python}
def get_totals(t):
totals = (
agg.filter(agg["sf"] >= 1)
.group_by("system", "sf")
.agg(total_execution_seconds=agg["mean_execution_seconds"].sum())
.order_by(
ibis.desc("sf"), ibis.desc("system"), ibis.desc("total_execution_seconds")
)
)
return totals
```

```{python}
def get_category_orders(t):
category_orders = {
"query_number": sorted(
agg.select("query_number").distinct().to_pandas()["query_number"].tolist()
),
"system": sorted(
agg.select("system").distinct().to_pandas()["system"].tolist()
),
"instance_type": sorted(
agg.select("instance_type")
.distinct()
.to_pandas()["instance_type"]
.tolist(),
key=lambda x: (x.split("-")[0], int(x.split("-")[-1]))
if "-" in x
else (x, 0),
),
}
return category_orders
```

```{python}
def totals_line(totals, log_y=False):
px.line(
totals.mutate(sf=ibis._["sf"].log2()),
x="sf",
y="total_execution_seconds",
color="system",
hover_name="system",
markers=True,
log_y=log_y,
title="total execution time by scale factor",
labels={"sf": "log2(sf)"},
category_orders=category_orders,
).show()
```

```{python}
def queries_bar(agg, sfs, category_orders, log_y=True):
for sf in sorted(sfs):
c = px.bar(
agg.filter(agg["sf"] == sf).filter(
agg["instance_type"].lower().contains("macbook")
),
x="query_number",
y="mean_execution_seconds",
color="system",
barmode="group",
log_y=log_y,
# pattern_shape="instance_type",
category_orders=category_orders,
title=f"TPC-H scale factor {sf} (~{sf} GB in memory; ~{sf*2//5}GB on disk in Parquet) on MacBook Pro (2021 Apple M1 Max 32GB)",
)
c.update_layout(
legend=dict(orientation="h", yanchor="top", y=1.02, xanchor="right", x=1)
)
c.show()
print()
```

:::

## Decimals (original TPC-H data)

::: {.callout-warning}
Polars fails on 9/22 queries because [you can't round on decimals yet](https://github.com/pola-rs/polars/issues/15151).

[We also run with decimals converted to floats to account for this and get a comparison](#floats-tpc-h-data-with-decimals-casted-to-floats).
:::


```{python}
t = get_t()
t.head()
```

```{python}
sfs = get_sfs(t)
sfs
```

```{python}
systems = get_systems(t)
systems
```

```{python}
instance_types = get_instance_types(t)
instance_types
```

```{python}
query_numbers = get_query_numbers(t)
query_numbers
```

```{python}
fail = get_failing_queries(t)
fail
```

```{python}
agg = get_agg(t)
agg
```

```{python}
totals = get_totals(t)
totals
```

```{python}
category_orders = get_category_orders(t)
```

```{python}
totals_line(totals)
```

```{python}
totals_line(totals, log_y=True)
```

```{python}
queries_bar(agg, sfs, category_orders, log_y=False)
```

## Floats (TPC-H data with decimals casted to floats)

```{python}
t = get_t(floats=True)
t.head()
```

```{python}
sfs = get_sfs(t)
sfs
```

```{python}
systems = get_systems(t)
systems
```

```{python}
instance_types = get_instance_types(t)
instance_types
```

```{python}
query_numbers = get_query_numbers(t)
query_numbers
```

```{python}
fail = get_failing_queries(t)
fail
```

```{python}
agg = get_agg(t)
agg
```

```{python}
totals = get_totals(t)
totals
```

```{python}
category_orders = get_category_orders(t)
```

```{python}
totals_line(totals)
```

```{python}
totals_line(totals, log_y=True)
```

```{python}
queries_bar(agg, sfs, category_orders)
```

:::
8 changes: 6 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ allow-direct-references = true

[project]
name = "ibis-bench"
version = "0.2.0"
version = "0.3.0"
authors = [{ name = "Cody", email = "cody@dkdc.dev" }]
description = "benchmarking with Ibis"
readme = "README.md"
Expand All @@ -19,7 +19,11 @@ classifiers = [
]
dependencies = [
# data stuff
'ibis-framework[duckdb,datafusion,polars] == 9.3.0', # @ git+https://github.com/ibis-project/ibis',
#'ibis-framework[duckdb,datafusion,polars] == 9.3.0', # @ git+https://github.com/ibis-project/ibis',
'ibis-framework @ git+https://github.com/ibis-project/ibis',
'duckdb==1.0.0',
'datafusion==40.1.0',
'polars==1.5.0',
# other stuff
'typer',
'python-dotenv',
Expand Down
23 changes: 14 additions & 9 deletions src/ibis_bench/cli2.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
64,
128,
]
DEFAULT_N_PARTITIONS = [1]
DEFAULT_QS = range(1, 23)

TYPER_KWARGS = {
Expand All @@ -43,6 +44,9 @@ def run(
scale_factors: list[int] = typer.Option(
DEFAULT_SFS, "--scale-factor", "-s", help="scale factors"
),
n_partitions: list[int] = typer.Option(
DEFAULT_N_PARTITIONS, "--n-partitions", "-n", help="number of partitions"
),
queries: list[int] = typer.Option(
DEFAULT_QS, "--query-number", "-q", help="query numbers"
),
Expand All @@ -61,17 +65,18 @@ def run(
"""
for _ in range(repeat):
for sf in scale_factors:
for system in systems:
for q in queries:
cmd = f"bench run {system} -s {sf} -q {q} -i '{instance_type}'"
cmd += " --csv" if use_csv else ""
for n in n_partitions:
for system in systems:
for q in queries:
cmd = f"bench tpch run {system} -s {sf} -n {n} -q {q} -i '{instance_type}'"
cmd += " --csv" if use_csv else ""

log.info(f"running: {cmd}")
res = subprocess.run(cmd, shell=True)
if res.returncode != 0:
log.info(f"failed to run: {cmd}")
log.info(f"running: {cmd}")
res = subprocess.run(cmd, shell=True)
if res.returncode != 0:
log.info(f"failed to run: {cmd}")

log.info(f"finished running: {cmd}")
log.info(f"finished running: {cmd}")


@app.command()
Expand Down

0 comments on commit 0ce9a04

Please sign in to comment.