Add examples from TPC-H #666

Merged
merged 30 commits on May 13, 2024

Commits (30)
27c60d9
Update location of docker image
timsaucer May 6, 2024
ebf3ae9
Initial commit for queries 1-3
timsaucer May 6, 2024
83138ea
Commit queries 4-7 of TPC-H in examples
timsaucer May 7, 2024
1a10c17
Add required license text
timsaucer May 7, 2024
15aa893
Add additional text around why to use a case statement in the example
timsaucer May 8, 2024
0060ae7
add market share example
timsaucer May 9, 2024
8fb4aa7
Add example for product type profit measure
timsaucer May 9, 2024
9ca5136
Initial commit returned item report
timsaucer May 9, 2024
99008bc
Linting
timsaucer May 9, 2024
5a04aee
Initial commit of q11 example
timsaucer May 10, 2024
c26c524
Initial commit of q12 from tpc-h
timsaucer May 10, 2024
11ffced
Initial commit for customer distribution example
timsaucer May 11, 2024
edb848b
Initial commit of promotion effect example
timsaucer May 11, 2024
0762336
Initial commit of q15 in tpc-h, top supplier
timsaucer May 11, 2024
883a61b
Initial commit of q16 in tpc-h, part supplier relationship
timsaucer May 11, 2024
396029e
Initial commit of q17 in tpc-h, small quantity order
timsaucer May 11, 2024
6835f56
Initial commit of q18 in tpc-h, large volume customer
timsaucer May 11, 2024
c9ca2ba
Initial commit of q19 in tpc-h, discounted revenue
timsaucer May 11, 2024
7a20e39
Initial commit of q20 in tpc-h, potential part promotion
timsaucer May 11, 2024
7497829
Initial commit of q21 in tpc-h, supplier who kept order waiting
timsaucer May 12, 2024
33ecff3
Initial commit of q22 in tpc-h, global sales opportunity
timsaucer May 12, 2024
99aac68
Adding readme information and marking text as copyrighted
timsaucer May 12, 2024
ebbe69f
Minimum part cost must be identified per part not across all parts th…
timsaucer May 13, 2024
487b62e
Change ordering of output rows to match spec
timsaucer May 13, 2024
61e027f
Set parameter to match spec
timsaucer May 13, 2024
acffc87
Set parameter to match spec
timsaucer May 13, 2024
fc9f845
setting values to match spec
timsaucer May 13, 2024
0b90a66
Linting
timsaucer May 13, 2024
36cc531
Expand on readme to link to examples within tpch folder
timsaucer May 13, 2024
59327df
Minor typo
timsaucer May 13, 2024
6 changes: 3 additions & 3 deletions benchmarks/tpch/tpch-gen.sh
@@ -29,7 +29,7 @@ FILE=./data/supplier.tbl
if test -f "$FILE"; then
echo "$FILE exists."
else
docker run -v `pwd`/data:/data -it --rm ghcr.io/databloom-ai/tpch-docker:main -vf -s $1
docker run -v `pwd`/data:/data -it --rm ghcr.io/scalytics/tpch-docker:main -vf -s $1

# workaround for https://github.com/apache/arrow-datafusion/issues/6147
mv data/customer.tbl data/customer.csv
@@ -49,5 +49,5 @@ FILE=./data/answers/q1.out
if test -f "$FILE"; then
echo "$FILE exists."
else
docker run -v `pwd`/data:/data -it --entrypoint /bin/bash --rm ghcr.io/databloom-ai/tpch-docker:main -c "cp /opt/tpch/2.18.0_rc2/dbgen/answers/* /data/answers/"
docker run -v `pwd`/data:/data -it --entrypoint /bin/bash --rm ghcr.io/scalytics/tpch-docker:main -c "cp /opt/tpch/2.18.0_rc2/dbgen/answers/* /data/answers/"
fi
64 changes: 64 additions & 0 deletions examples/README.md
@@ -52,3 +52,67 @@ Here is a direct link to the file used in the examples:
- [Executing SQL on Polars](./sql-on-polars.py)
- [Executing SQL on Pandas](./sql-on-pandas.py)
- [Executing SQL on cuDF](./sql-on-cudf.py)

## TPC-H Examples

Within the subdirectory `tpch` there are 22 examples that reproduce the queries in
the TPC-H specification. These examples use realistic data that can be generated at
arbitrary scale and demonstrate a variety of data frame operations.

The list below describes which new operations can be found in each example.
The queries are designed to be of increasing complexity, so it is recommended to
review them in order. For brevity, the list does not repeat operations that appear
in earlier examples. A short standalone sketch combining a couple of these
operations follows the list.

- [Convert CSV to Parquet](./tpch/convert_data_to_parquet.py)
  - Read from a CSV file where the delimiter is something other than a comma
  - Specify the schema during CSV reading
  - Write to a parquet file
- [Pricing Summary Report](./tpch/q01_pricing_summary_report.py)
  - Aggregation computing the maximum value, average, sum, and number of entries
  - Filter data by date and interval
  - Sorting
- [Minimum Cost Supplier](./tpch/q02_minimum_cost_supplier.py)
  - Window operation to find minimum
  - Sorting in descending order
- [Shipping Priority](./tpch/q03_shipping_priority.py)
- [Order Priority Checking](./tpch/q04_order_priority_checking.py)
  - Aggregating multiple times in one data frame
- [Local Supplier Volume](./tpch/q05_local_supplier_volume.py)
- [Forecasting Revenue Change](./tpch/q06_forecasting_revenue_change.py)
  - Using collect and extracting values as Python objects
- [Volume Shipping](./tpch/q07_volume_shipping.py)
  - Finding multiple distinct and mutually exclusive values within one data frame
  - Using `case` and `when` statements
- [Market Share](./tpch/q08_market_share.py)
  - The operations in this query are similar to those in the prior examples, but it is a
    more complex example of using filters, joins, and aggregates
  - Using left outer joins
- [Product Type Profit Measure](./tpch/q09_product_type_profit_measure.py)
  - Extract year from a date
- [Returned Item Reporting](./tpch/q10_returned_item_reporting.py)
- [Important Stock Identification](./tpch/q11_important_stock_identification.py)
- [Shipping Modes and Order](./tpch/q12_ship_mode_order_priority.py)
  - Finding non-null values using a boolean operation in a filter
  - Case statement with default value
- [Customer Distribution](./tpch/q13_customer_distribution.py)
- [Promotion Effect](./tpch/q14_promotion_effect.py)
- [Top Supplier](./tpch/q15_top_supplier.py)
- [Parts/Supplier Relationship](./tpch/q16_part_supplier_relationship.py)
  - Using anti joins
  - Using regular expressions (regex)
  - Creating arrays of literal values
  - Determine if an element exists within an array
- [Small-Quantity-Order Revenue](./tpch/q17_small_quantity_order.py)
- [Large Volume Customer](./tpch/q18_large_volume_customer.py)
- [Discounted Revenue](./tpch/q19_discounted_revenue.py)
  - Creating a user defined function (UDF)
  - Convert a pyarrow Array to Python values
  - Filtering based on a UDF
- [Potential Part Promotion](./tpch/q20_potential_part_promotion.py)
  - Extracting part of a string using substr
- [Suppliers Who Kept Orders Waiting](./tpch/q21_suppliers_kept_orders_waiting.py)
  - Using array aggregation
  - Determining the size of array elements
- [Global Sales Opportunity](./tpch/q22_global_sales_opportunity.py)
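
As a quick orientation, here is a short standalone sketch (not one of the TPC-H queries)
that combines two of the operations highlighted above: a windowed minimum and a
`case`/`when` expression. It is only illustrative: it assumes the parquet data has been
generated as described in `tpch/README.md`, that it is run from the `examples` directory,
and that the `datafusion.functions` helpers `window` and `case` accept the arguments shown.

```python
from datafusion import SessionContext, col, lit, functions as F

ctx = SessionContext()

# Hypothetical input: the partsupp table produced by convert_data_to_parquet.py
df = ctx.read_parquet("tpch/data/partsupp.parquet")

# Window operation: minimum supply cost within each part
df = df.with_column(
    "min_cost",
    F.window("min", [col("ps_supplycost")], partition_by=[col("ps_partkey")]),
)

# Case/when: flag the rows that achieve that minimum
df = df.with_column(
    "is_cheapest",
    F.case(col("ps_supplycost") == col("min_cost"))
    .when(lit(True), lit("yes"))
    .otherwise(lit("no")),
)

df.sort(col("ps_partkey").sort(ascending=True)).show()
```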
2 changes: 2 additions & 0 deletions examples/tpch/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
data

57 changes: 57 additions & 0 deletions examples/tpch/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
<!---
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

# DataFusion Python Examples for TPC-H

These examples reproduce the problems listed in the TPC-H benchmark published by the
Transaction Processing Performance Council (TPC). The purpose of these examples is to
demonstrate how to use different aspects of DataFusion rather than to create the most
performant queries possible. Within each example is a description of the problem. Users
who are familiar with SQL-style commands can compare the approaches in these examples
with those listed in the specification.

- https://www.tpc.org/tpch/

The examples provided are based on version 2.18.0 of the TPC-H specification.

## Data Setup

To run these examples, you must first generate a dataset. The `dbgen` tool
provided by TPC can create datasets of arbitrary scale. For testing, it is
typically sufficient to create a 1 gigabyte dataset. For convenience, this
repository has a script which uses Docker to create the dataset. From the
`benchmarks/tpch` directory, execute the following script.

```bash
./tpch-gen.sh 1
```

The examples provided use parquet files for the tables generated by `dbgen`.
A Python script is provided to convert the text files from `dbgen` into the
parquet files expected by the examples. From the `examples/tpch` directory you
can execute the following command to create the necessary parquet files.

```bash
python convert_data_to_parquet.py
```
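
If you want to verify the conversion, a few lines of Python can spot check one of the
generated files. This is only a sketch and assumes the command above completed and wrote
`data/lineitem.parquet`:

```python
from datafusion import SessionContext

ctx = SessionContext()
df = ctx.read_parquet("data/lineitem.parquet")

print(df.schema())         # column names should be the lowercase TPC-H names
print(df.count(), "rows")  # roughly 6 million rows at scale factor 1
```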

## Description of Examples

For easier access, a description of the techniques demonstrated in each file
is in the README.md file in the `examples` directory.
142 changes: 142 additions & 0 deletions examples/tpch/convert_data_to_parquet.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""
This is a utility script that will consume the data generated by dbgen from TPC-H and convert
it into parquet files with the column names expected by the TPC-H specification. It assumes
the generated data resides in the path ../../benchmarks/tpch/data relative to the current file,
as will be generated by the script provided in this repository.
"""

import os
import pyarrow
import datafusion

ctx = datafusion.SessionContext()

all_schemas = {}

all_schemas["customer"] = [
("C_CUSTKEY", pyarrow.int32()),
("C_NAME", pyarrow.string()),
("C_ADDRESS", pyarrow.string()),
("C_NATIONKEY", pyarrow.int32()),
("C_PHONE", pyarrow.string()),
("C_ACCTBAL", pyarrow.float32()),
Reviewer comment: nit: not important for example usage, but the numeric fields should be decimal not float

("C_MKTSEGMENT", pyarrow.string()),
("C_COMMENT", pyarrow.string()),
]
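
# Reviewer suggestion (a hypothetical alternative, not what this script does): the monetary
# columns such as C_ACCTBAL are declared as float32 above for simplicity; to follow the
# TPC-H spec exactly they could instead be fixed-point decimals, for example
# pyarrow.decimal128(15, 2).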

all_schemas["lineitem"] = [
("L_ORDERKEY", pyarrow.int32()),
("L_PARTKEY", pyarrow.int32()),
("L_SUPPKEY", pyarrow.int32()),
("L_LINENUMBER", pyarrow.int32()),
("L_QUANTITY", pyarrow.float32()),
("L_EXTENDEDPRICE", pyarrow.float32()),
("L_DISCOUNT", pyarrow.float32()),
("L_TAX", pyarrow.float32()),
("L_RETURNFLAG", pyarrow.string()),
("L_LINESTATUS", pyarrow.string()),
("L_SHIPDATE", pyarrow.date32()),
("L_COMMITDATE", pyarrow.date32()),
("L_RECEIPTDATE", pyarrow.date32()),
("L_SHIPINSTRUCT", pyarrow.string()),
("L_SHIPMODE", pyarrow.string()),
("L_COMMENT", pyarrow.string()),
]

all_schemas["nation"] = [
("N_NATIONKEY", pyarrow.int32()),
("N_NAME", pyarrow.string()),
("N_REGIONKEY", pyarrow.int32()),
("N_COMMENT", pyarrow.string()),
]

all_schemas["orders"] = [
("O_ORDERKEY", pyarrow.int32()),
("O_CUSTKEY", pyarrow.int32()),
("O_ORDERSTATUS", pyarrow.string()),
("O_TOTALPRICE", pyarrow.float32()),
("O_ORDERDATE", pyarrow.date32()),
("O_ORDERPRIORITY", pyarrow.string()),
("O_CLERK", pyarrow.string()),
("O_SHIPPRIORITY", pyarrow.int32()),
("O_COMMENT", pyarrow.string()),
]

all_schemas["part"] = [
("P_PARTKEY", pyarrow.int32()),
("P_NAME", pyarrow.string()),
("P_MFGR", pyarrow.string()),
("P_BRAND", pyarrow.string()),
("P_TYPE", pyarrow.string()),
("P_SIZE", pyarrow.int32()),
("P_CONTAINER", pyarrow.string()),
("P_RETAILPRICE", pyarrow.float32()),
("P_COMMENT", pyarrow.string()),
]

all_schemas["partsupp"] = [
("PS_PARTKEY", pyarrow.int32()),
("PS_SUPPKEY", pyarrow.int32()),
("PS_AVAILQTY", pyarrow.int32()),
("PS_SUPPLYCOST", pyarrow.float32()),
("PS_COMMENT", pyarrow.string()),
]

all_schemas["region"] = [
("r_REGIONKEY", pyarrow.int32()),
("r_NAME", pyarrow.string()),
("r_COMMENT", pyarrow.string()),
]

all_schemas["supplier"] = [
("S_SUPPKEY", pyarrow.int32()),
("S_NAME", pyarrow.string()),
("S_ADDRESS", pyarrow.string()),
("S_NATIONKEY", pyarrow.int32()),
("S_PHONE", pyarrow.string()),
("S_ACCTBAL", pyarrow.float32()),
("S_COMMENT", pyarrow.string()),
]

curr_dir = os.path.dirname(os.path.abspath(__file__))
for filename, curr_schema in all_schemas.items():

# For convenience, go ahead and convert the schema column names to lowercase
curr_schema = [(s[0].lower(), s[1]) for s in curr_schema]

# Pre-collect the output columns so we can ignore the null field we add
# in to handle the trailing | in the file
output_cols = [r[0] for r in curr_schema]

# The trailing | requires an extra field during processing
curr_schema.append(("some_null", pyarrow.null()))

schema = pyarrow.schema(curr_schema)

source_file = os.path.abspath(
os.path.join(curr_dir, f"../../benchmarks/tpch/data/{filename}.csv")
)
dest_file = os.path.abspath(os.path.join(curr_dir, f"./data/{filename}.parquet"))

df = ctx.read_csv(source_file, schema=schema, has_header=False, delimiter="|")

df = df.select_columns(*output_cols)

df.write_parquet(dest_file, compression="snappy")
90 changes: 90 additions & 0 deletions examples/tpch/q01_pricing_summary_report.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""
TPC-H Problem Statement Query 1:

The Pricing Summary Report Query provides a summary pricing report for all lineitems shipped as of
a given date. The date is within 60 - 120 days of the greatest ship date contained in the database.
The query lists totals for extended price, discounted extended price, discounted extended price
plus tax, average quantity, average extended price, and average discount. These aggregates are
grouped by RETURNFLAG and LINESTATUS, and listed in ascending order of RETURNFLAG and LINESTATUS.
A count of the number of lineitems in each group is included.

The above problem statement text is copyrighted by the Transaction Processing Performance Council
as part of their TPC Benchmark H Specification revision 2.18.0.
"""

import pyarrow as pa
from datafusion import SessionContext, col, lit, functions as F

ctx = SessionContext()

df = ctx.read_parquet("data/lineitem.parquet")

# It may be that the date could simply be hard coded, based on the examples shown.
# This approach, however, will work with any date range in the provided data set.

greatest_ship_date = df.aggregate(
[], [F.max(col("l_shipdate")).alias("shipdate")]
).collect()[0]["shipdate"][0]

# From the given problem, this is how close to the last date in the database we
# want to report results for. It should be between 60-120 days before the end.
DAYS_BEFORE_FINAL = 68

# Note: this is a workaround for setting the interval value. It should be set differently once
# https://github.com/apache/datafusion-python/issues/665 is resolved.
interval = pa.scalar((0, 0, DAYS_BEFORE_FINAL), type=pa.month_day_nano_interval())

print("Final date in database:", greatest_ship_date)

# Filter data to the dates of interest
df = df.filter(col("l_shipdate") <= lit(greatest_ship_date) - lit(interval))

# Aggregate the results

df = df.aggregate(
[col("l_returnflag"), col("l_linestatus")],
[
F.sum(col("l_quantity")).alias("sum_qty"),
F.sum(col("l_extendedprice")).alias("sum_base_price"),
F.sum(col("l_extendedprice") * (lit(1.0) - col("l_discount"))).alias(
"sum_disc_price"
),
F.sum(
col("l_extendedprice")
* (lit(1.0) - col("l_discount"))
* (lit(1.0) + col("l_tax"))
).alias("sum_charge"),
F.avg(col("l_quantity")).alias("avg_qty"),
F.avg(col("l_extendedprice")).alias("avg_price"),
F.avg(col("l_discount")).alias("avg_disc"),
F.count(col("l_returnflag")).alias(
"count_order"
), # Counting any column should return same result
],
)

# Sort per the expected result

df = df.sort(col("l_returnflag").sort(), col("l_linestatus").sort())

# Note: There appears to be a discrepancy between what is returned here and what is in the generated
# answers file for the case of return flag N and line status O, but I did not investigate further.

df.show()