Spark SQL metrics is incorrect when running a modified version of TPC-H Query 10 #1003

Open
Kontinuation opened this issue Oct 8, 2024 · 0 comments
Labels
bug Something isn't working
Describe the bug

I am running a modified version of TPC-H query 10. I've removed the filters to stress Comet and see how it behaves when processing large amounts of data:

-- SQLBench-H query 10 derived from TPC-H query 10 under the terms of the TPC Fair Use Policy.
-- TPC-H queries are Copyright 1993-2022 Transaction Processing Performance Council.
select
	c_custkey,
	c_name,
	sum(l_extendedprice * (1 - l_discount)) as revenue,
	c_acctbal,
	n_name,
	c_address,
	c_phone,
	c_comment
from
	customer,
	orders,
	lineitem,
	nation
where
	c_custkey = o_custkey
	and l_orderkey = o_orderkey
	and c_nationkey = n_nationkey
group by
	c_custkey,
	c_name,
	c_acctbal,
	c_phone,
	n_name,
	c_address,
	c_comment
order by
	revenue desc limit 20;

The Spark SQL metrics page showed that the CometSortExec operators returned 100 rows and 0 rows, and that no spill was triggered. The SMJ node returned 0 rows, which is certainly not the case.

(screenshot: Spark SQL metrics page showing the incorrect operator metrics)

I enabled spark.comet.explain.native.enabled and saw that the native execution plan with metrics showed somewhat reasonable numbers:

24/10/08 13:34:59 INFO core/src/execution/jni_api.rs: Comet native query plan with metrics (stage: 13 task: 42):
AggregateExec: mode=Partial, gby=[col_0@0 as col_0, col_1@1 as col_1, col_4@4 as col_2, col_3@3 as col_3, col_8@8 as col_4, col_2@2 as col_5, col_5@5 as col_6], aggr=[sum], metrics=[output_rows=2553986, elapsed_compute=3.777709066s]
  ProjectionExec: expr=[col_0@0 as col_0, col_1@1 as col_1, col_2@2 as col_2, col_4@4 as col_3, col_5@5 as col_4, col_6@6 as col_5, col_7@7 as col_6, col_8@8 as col_7, col_1@10 as col_8], metrics=[output_rows=14996536, elapsed_compute=2.170829ms]
    ProjectionExec: expr=[col_0@2 as col_0, col_1@3 as col_1, col_2@4 as col_2, col_3@5 as col_3, col_4@6 as col_4, col_5@7 as col_5, col_6@8 as col_6, col_7@9 as col_7, col_8@10 as col_8, col_0@0 as col_0, col_1@1 as col_1], metrics=[output_rows=14996536, elapsed_compute=2.538593ms]
      HashJoinExec: mode=Partitioned, join_type=Inner, on=[(col_0@0, col_3@3)], metrics=[output_rows=14996536, input_batches=1831, build_input_rows=25, build_input_batches=1, output_batches=1831, input_rows=14996536, build_mem_used=1392, build_time=53.125µs, join_time=856.613437ms]
        CopyExec [UnpackOrDeepCopy], metrics=[output_rows=25, elapsed_compute=5.209µs]
          ScanExec: source=[BroadcastExchange (unknown)], schema=[col_0: Int64, col_1: Utf8], metrics=[output_rows=25, elapsed_compute=1.168µs, cast_time=1ns]
        CopyExec [UnpackOrClone], metrics=[output_rows=14996536, elapsed_compute=2.237373ms]
          ProjectionExec: expr=[col_0@0 as col_0, col_1@1 as col_1, col_2@2 as col_2, col_3@3 as col_3, col_4@4 as col_4, col_5@5 as col_5, col_6@6 as col_6, col_1@9 as col_7, col_2@10 as col_8], metrics=[output_rows=14996536, elapsed_compute=2.037619ms]
            SortMergeJoin: join_type=Inner, on=[(col_7@7, col_0@0)], metrics=[output_rows=14996536, spill_count=0, spilled_bytes=0, spilled_rows=0, input_batches=2289, input_rows=18746334, output_batches=1831, peak_mem_used=918320, join_time=4.79088405s]
              SortExec: expr=[col_7@7 ASC], preserve_partitioning=[false], metrics=[output_rows=3749798, elapsed_compute=1.841784352s, spill_count=3, spilled_bytes=586947488, spilled_rows=3144844]
                CopyExec [UnpackOrDeepCopy], metrics=[output_rows=3749798, elapsed_compute=66.035032ms]
                  ScanExec: source=[Exchange (unknown)], schema=[col_0: Int64, col_1: Utf8, col_2: Utf8, col_3: Int64, col_4: Utf8, col_5: Decimal128(12, 2), col_6: Utf8, col_7: Int64], metrics=[output_rows=3749798, elapsed_compute=411.397µs, cast_time=1ns]
              SortExec: expr=[col_0@0 ASC], preserve_partitioning=[false], metrics=[output_rows=14996536, elapsed_compute=2.318524006s, spill_count=4, spilled_bytes=590603456, spilled_rows=14752112]
                CopyExec [UnpackOrDeepCopy], metrics=[output_rows=14996536, elapsed_compute=32.209479ms]
                  ScanExec: source=[Exchange (unknown)], schema=[col_0: Int64, col_1: Decimal128(12, 2), col_2: Decimal128(12, 2)], metrics=[output_rows=14996536, elapsed_compute=472.861µs, cast_time=1ns]

The SortExec operators in one of the tasks produced up to 15 million rows and spilled 3 to 4 times.
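For reference, the native plan dump above can be obtained by adding the following setting to the Spark configuration (shown here in spark-defaults.conf syntax, alongside the configurations listed under Additional context):

```
spark.comet.explain.native.enabled true
```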

Steps to reproduce

Run the SQL query mentioned above on TPC-H data with scale factor = 10.

Expected behavior

The metrics shown on the SQL page should be consistent with the native DataFusion metrics.

Additional context

This issue was reproduced using Spark 3.5 with master=local[4]. The Comet version is a slightly modified build of 3413397.

Here are the relevant Spark configurations:

spark.sql.extensions   org.apache.comet.CometSparkSessionExtensions
spark.comet.enabled  true
spark.comet.exec.enabled true
spark.comet.exec.all.enabled true
spark.comet.exec.shuffle.enabled true
spark.shuffle.manager  org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager
spark.comet.exec.shuffle.mode native
spark.sql.adaptive.enabled false
spark.sql.adaptive.coalescePartitions.enabled false
spark.sql.shuffle.partitions 4
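For completeness, the same settings can be supplied on the command line when launching Spark. This is a sketch; the Comet jar path is an assumption, not taken from the report:

```shell
# Hypothetical launch command; adjust the jar path to your Comet build.
$SPARK_HOME/bin/spark-shell \
  --master 'local[4]' \
  --jars /path/to/comet-spark.jar \
  --conf spark.sql.extensions=org.apache.comet.CometSparkSessionExtensions \
  --conf spark.comet.enabled=true \
  --conf spark.comet.exec.enabled=true \
  --conf spark.comet.exec.all.enabled=true \
  --conf spark.comet.exec.shuffle.enabled=true \
  --conf spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager \
  --conf spark.comet.exec.shuffle.mode=native \
  --conf spark.sql.adaptive.enabled=false \
  --conf spark.sql.adaptive.coalescePartitions.enabled=false \
  --conf spark.sql.shuffle.partitions=4
```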
@Kontinuation Kontinuation added the bug Something isn't working label Oct 8, 2024
@Kontinuation Kontinuation changed the title Spark SQL metrics is incorrect when running some particular queries Spark SQL metrics is incorrect when running a modified version of TPC-H Query 10 Oct 8, 2024
@andygrove andygrove added this to the 0.4.0 milestone Oct 8, 2024