Expand the functionality of Local Runtime Filter #7891

Open
6 tasks
yibin87 opened this issue Aug 3, 2023 · 10 comments
Labels
type/enhancement The issue or PR belongs to an enhancement.

Comments

@yibin87
Contributor

yibin87 commented Aug 3, 2023

Enhancement

The current Local Runtime Filter design can be found here. However, it brings no noticeable performance improvement on the TPC-H 100 benchmark. After some quick experiments, we confirmed that expanding the current local runtime filter can achieve a 10%+ performance improvement:

  1. Expand the scope of the current local runtime filter to include broadcast hash joins that span multiple tasks. This can reduce some exchange cost.
    For example, in A Join (Agg_on_B), A is chosen as the build table and broadcast to all TiFlash probe nodes. B first goes through a two-phase aggregation, and the aggregation output is used as the probe side. This case doesn't match the current local runtime filter pattern, so no runtime filter is generated. By expanding the scope, we can push the filter down to the TableScan of B, reducing the exchange cost introduced by the two-phase aggregation.

  2. In TiFlash, push runtime filters down to the storage layer using late materialization techniques.
    In TiFlash, the current runtime filters take effect as RS operators in the storage layer. An RS operator uses the min-max index to implement the RS-IN filter, and the filtering effect of RS-IN is poor in TPC-H cases even when the Real-IN filter filters very well. For example, suppose a column pack contains 1024 distinct integer values from 1 to 1024, and the pushed-down IN filter contains {3, 10, 1023}. With the RS-IN filter, the whole pack passes the filter and no data is filtered; with the Real-IN filter, only 3 values pass.
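
The gap between the two filters in the pack example above can be sketched as follows. This is a minimal illustration, assuming an RS-IN check that only consults a pack's min/max values, as described above; `rs_in_may_match` and `real_in_filter` are hypothetical helpers, not TiFlash APIs.

```python
from typing import Iterable, List, Set


def rs_in_may_match(pack_min: int, pack_max: int, in_values: Set[int]) -> bool:
    """Rough-set IN check using only the pack's min/max statistics.

    True means the pack *may* contain a matching row, so the whole pack
    has to be read. Hypothetical helper for illustration.
    """
    return any(pack_min <= v <= pack_max for v in in_values)


def real_in_filter(values: Iterable[int], in_values: Set[int]) -> List[int]:
    """Exact IN check, evaluated row by row after the pack is read."""
    return [v for v in values if v in in_values]


if __name__ == "__main__":
    pack = list(range(1, 1025))  # 1024 distinct values, 1..1024
    in_values = {3, 10, 1023}

    # RS-IN: 3, 10 and 1023 all fall inside [1, 1024], so the whole pack passes.
    print(rs_in_may_match(min(pack), max(pack), in_values))  # True
    # Real-IN: only the three matching rows survive.
    print(len(real_in_filter(pack, in_values)))  # 3
```

The min/max check can only skip a pack when every IN value falls outside the pack's range, which rarely happens for sparse key sets spread across a wide range.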

TiDB & TiPB

  • Update the Runtime Filter tipb protocol to add an 'apply_late_materialization' flag.
  • Recognize broadcast hash joins across fragments as Runtime Filter sources, and set the 'apply_late_materialization' flag when exchange cost can be reduced.
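
A minimal sketch of the flag decision, assuming the rule discussed later in this thread: treat any exchange between the probe-side table scan and the join as the cost worth saving. `PlanNode`, `probe_path_to_scan` and `should_apply_late_materialization` are hypothetical; the operator names are borrowed from TiDB EXPLAIN output. The sketch also assumes the probe side is a single-child chain and that the join key maps to a scan column (for example, a group-by key of the aggregation), which a real planner would have to verify.

```python
from dataclasses import dataclass, field
from typing import List, Optional

# Operator names borrowed from TiDB EXPLAIN output; the plan model is hypothetical.
EXCHANGE_KINDS = {"ExchangeReceiver", "ExchangeSender"}
SCAN_KIND = "TableFullScan"


@dataclass
class PlanNode:
    kind: str
    children: List["PlanNode"] = field(default_factory=list)


def probe_path_to_scan(probe_root: PlanNode) -> Optional[List[PlanNode]]:
    """Collect the operators between the join's probe child and the table scan.

    Returns None when the probe side is not a simple single-child chain,
    in which case this sketch generates no pushed-down runtime filter.
    """
    path: List[PlanNode] = []
    node = probe_root
    while node.kind != SCAN_KIND:
        if len(node.children) != 1:
            return None
        path.append(node)
        node = node.children[0]
    return path


def should_apply_late_materialization(probe_root: PlanNode) -> bool:
    """Set the flag only if filtering at the scan saves exchange cost."""
    path = probe_path_to_scan(probe_root)
    return path is not None and any(n.kind in EXCHANGE_KINDS for n in path)


if __name__ == "__main__":
    # Probe side of A Join (Agg_on_B), with a two-phase aggregation on B.
    agg_on_b = PlanNode("HashAgg", [  # final aggregation
        PlanNode("ExchangeReceiver", [
            PlanNode("ExchangeSender", [
                PlanNode("HashAgg", [PlanNode(SCAN_KIND)]),  # partial aggregation
            ]),
        ]),
    ])
    # Probe side with no exchange below the join.
    local_probe = PlanNode("Selection", [PlanNode(SCAN_KIND)])

    print(should_apply_late_materialization(agg_on_b))     # True
    print(should_apply_late_materialization(local_probe))  # False
```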

TiFlash

  • Change the Runtime Filter Manager from MPPTask level to query level
  • Introduce a new function used as a placeholder at compile time: it does nothing when the runtime filter fails, and applies an IN filter when the runtime filter succeeds
  • Build pushed-down filters when the 'apply_late_materialization' flag is set in the runtime filter.
  • Endless/FullStack tests
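
The first two TiFlash items can be sketched together. A query-level registry lets the task that builds the hash table and the task that scans the probe table (in different fragments) share one filter object, and the placeholder predicate is compiled into the scan up front and only starts filtering once the filter is ready. All names here are hypothetical. This single-threaded sketch omits locking, wait timeouts and per-query cleanup, and it treats a still-pending filter like a failed one (pass everything through), which is an assumption.

```python
from enum import Enum
from typing import Dict, Iterable, List, Set, Tuple


class RFState(Enum):
    PENDING = "pending"
    READY = "ready"
    FAILED = "failed"


class RuntimeFilter:
    def __init__(self) -> None:
        self.state = RFState.PENDING
        self.in_values: Set[int] = set()

    def publish(self, values: Iterable[int]) -> None:
        """Called by the build side once its hash table is complete."""
        self.in_values = set(values)
        self.state = RFState.READY

    def fail(self) -> None:
        """Called when the filter cannot be built."""
        self.state = RFState.FAILED


class QueryRuntimeFilterManager:
    """Hypothetical query-level registry shared by all MPP tasks of a query."""

    def __init__(self) -> None:
        self._filters: Dict[Tuple[str, int], RuntimeFilter] = {}

    def get(self, query_id: str, rf_id: int) -> RuntimeFilter:
        # Build and probe tasks look up the same (query_id, rf_id) key.
        return self._filters.setdefault((query_id, rf_id), RuntimeFilter())


def placeholder_in_filter(rf: RuntimeFilter, column: List[int]) -> List[bool]:
    """Placeholder predicate compiled into the scan before the filter exists.

    Keeps every row unless the runtime filter succeeded, in which case it
    behaves like an ordinary IN filter.
    """
    if rf.state is not RFState.READY:
        return [True] * len(column)
    return [v in rf.in_values for v in column]


if __name__ == "__main__":
    manager = QueryRuntimeFilterManager()
    probe_rf = manager.get("q1", 0)  # scan task registers first
    build_rf = manager.get("q1", 0)  # join task in another fragment
    column = [1, 5, 8, 13]

    print(placeholder_in_filter(probe_rf, column))  # [True, True, True, True]
    build_rf.publish([5, 13])
    print(placeholder_in_filter(probe_rf, column))  # [False, True, False, True]
```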
yibin87 added the type/enhancement label Aug 3, 2023
yibin87 self-assigned this Aug 3, 2023
@Lloyd-Pottiger
Contributor

> For example, suppose a column pack contains 1024 distinct integer values from 1 to 1024, and the pushed-down IN filter contains {3, 10, 1023}. With the RS-IN filter, the whole pack passes the filter and no data is filtered; with the Real-IN filter, only 3 values pass.

But even with late materialization, we still need to read this pack, which means it cannot help reduce IO.

@yibin87
Contributor Author

yibin87 commented Aug 3, 2023

> But even with late materialization, we still need to read this pack, which means it cannot help reduce IO.

It might help reduce IO on other columns: if no row in a filter-column pack passes the filter condition, the corresponding packs of the other columns need not be read. Besides, the major improvement will be the reduction of network communication for exchange.

@Lloyd-Pottiger
Contributor

> It might help reduce IO on other columns: if no row in a filter-column pack passes the filter condition, the corresponding packs of the other columns need not be read. Besides, the major improvement will be the reduction of network communication for exchange.

Makes sense. Then why do we need apply_late_materialization? Why not always apply it?

@yibin87
Contributor Author

yibin87 commented Aug 3, 2023

> Makes sense. Then why do we need apply_late_materialization? Why not always apply it?

For TPC-H 100, since the major improvement comes from reducing network communication for exchange, I identified that case with 'apply_late_materialization'. If there is no exchange on the probe side, there seems to be little performance improvement.

@Lloyd-Pottiger
Contributor

> For TPC-H 100, since the major improvement comes from reducing network communication for exchange, I identified that case with 'apply_late_materialization'. If there is no exchange on the probe side, there seems to be little performance improvement.

If there is a filter like regexp_like on the table scan, can it help reduce the cost of computing regexp_like?

@yibin87
Contributor Author

yibin87 commented Aug 3, 2023

I don't quite get your point. Using late materialization improves performance in two ways:

  1. Reduce disk IO
  2. Reduce exchange overhead, since the filter is pushed from the join build node to the table scan node, across exchange nodes. Its effect is equivalent to adding a new normal filter in the compute layer before the exchange node.

For TPC-H 100, the major improvement comes from the second one, and late materialization might degrade performance when no exchange cost is reduced. Thus I introduced the "apply_late_materialization" flag to indicate whether exchange cost can be reduced.
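
The second point can be illustrated with the A Join (Agg_on_B) example from the issue description. Pushing the build-side key set below the two-phase aggregation is only safe when the join key is a group-by key of the aggregation, which this sketch assumes; the helper names and the COUNT(*) aggregate are hypothetical choices for illustration. It counts the partial-aggregation rows each node would send through the exchange, with and without the runtime filter, and checks that the join result is unchanged.

```python
from collections import Counter
from typing import Dict, List, Optional, Set


def partial_agg(rows: List[int], rf_keys: Optional[Set[int]] = None) -> Dict[int, int]:
    """First-phase COUNT(*) GROUP BY key on one node.

    If rf_keys is given, the runtime IN filter is applied at the table scan,
    before aggregation.
    """
    if rf_keys is not None:
        rows = [k for k in rows if k in rf_keys]
    return dict(Counter(rows))


def exchanged_rows(nodes: List[List[int]], rf_keys: Optional[Set[int]] = None) -> int:
    """Number of partial-aggregation rows sent through the exchange."""
    return sum(len(partial_agg(rows, rf_keys)) for rows in nodes)


def agg_join(nodes: List[List[int]], build_keys: Set[int],
             rf_keys: Optional[Set[int]] = None) -> Dict[int, int]:
    """Final aggregation after the exchange, then the join with A's keys."""
    merged: Counter = Counter()
    for rows in nodes:
        merged.update(partial_agg(rows, rf_keys))  # adds partial counts
    return {k: c for k, c in merged.items() if k in build_keys}


if __name__ == "__main__":
    b_nodes = [[1, 2, 3, 4, 5, 1], [2, 6, 7, 8, 8], [9, 10, 1]]  # B rows per node
    a_keys = {1, 8}  # join keys of the broadcast build side A

    print(exchanged_rows(b_nodes))          # 12
    print(exchanged_rows(b_nodes, a_keys))  # 3
    assert agg_join(b_nodes, a_keys) == agg_join(b_nodes, a_keys, a_keys)
```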

@Lloyd-Pottiger
Contributor

mysql> explain analyze select * from a join b on a.id = b.id where regexp_like(a.p, '.*') and  regexp_like(b.p, '.*');
+--------------------------------------+----------+---------+--------------+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------+-----------+------+
| id                                   | estRows  | actRows | task         | access object | execution info                                                                                                                                                                                                                                                                                                 | operator info                                            | memory    | disk |
+--------------------------------------+----------+---------+--------------+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------+-----------+------+
| TableReader_37                       | 9990.00  | 0       | root         |               | time:15.1ms, loops:1, RU:0.000000, cop_task: {num: 1, max: 0s, proc_keys: 0, copr_cache_hit_ratio: 0.00}                                                                                                                                                                                                       | MppVersion: 2, data:ExchangeSender_36                    | 595 Bytes | N/A  |
| └─ExchangeSender_36                  | 9990.00  | 0       | mpp[tiflash] |               | tiflash_task:{time:13ms, loops:0, threads:36}                                                                                                                                                                                                                                                                  | ExchangeType: PassThrough                                | N/A       | N/A  |
|   └─HashJoin_35                      | 9990.00  | 0       | mpp[tiflash] |               | tiflash_task:{time:12ms, loops:0, threads:36}                                                                                                                                                                                                                                                                  | inner join, equal:[eq(test.a.id, test.b.id)]             | N/A       | N/A  |
|     ├─ExchangeReceiver_16(Build)     | 7992.00  | 0       | mpp[tiflash] |               | tiflash_task:{time:10ms, loops:0, threads:36}                                                                                                                                                                                                                                                                  |                                                          | N/A       | N/A  |
|     │ └─ExchangeSender_15            | 7992.00  | 0       | mpp[tiflash] |               | tiflash_task:{time:7.56ms, loops:0, threads:36}                                                                                                                                                                                                                                                                | ExchangeType: Broadcast, Compression: FAST               | N/A       | N/A  |
|     │   └─Selection_14               | 7992.00  | 0       | mpp[tiflash] |               | tiflash_task:{time:6.56ms, loops:0, threads:36}                                                                                                                                                                                                                                                                | not(isnull(test.a.id)), regexp_like(test.a.p, ".*")      | N/A       | N/A  |
|     │     └─TableFullScan_13         | 10000.00 | 0       | mpp[tiflash] | table:a       | tiflash_task:{time:6.56ms, loops:0, threads:36}, tiflash_scan:{dtfile:{total_scanned_packs:0, total_skipped_packs:0, total_scanned_rows:0, total_skipped_rows:0, total_rs_index_load_time: 0ms, total_read_time: 0ms}, total_create_snapshot_time: 0ms, total_local_region_num: 1, total_remote_region_num: 0} | pushed down filter:empty, keep order:false, stats:pseudo | N/A       | N/A  |
|     └─Selection_18(Probe)            | 7992.00  | 0       | mpp[tiflash] |               | tiflash_task:{time:10ms, loops:0, threads:36}                                                                                                                                                                                                                                                                  | not(isnull(test.b.id)), regexp_like(test.b.p, ".*")      | N/A       | N/A  |
|       └─TableFullScan_17             | 10000.00 | 0       | mpp[tiflash] | table:b       | tiflash_task:{time:10ms, loops:0, threads:36}, tiflash_scan:{dtfile:{total_scanned_packs:0, total_skipped_packs:0, total_scanned_rows:0, total_skipped_rows:0, total_rs_index_load_time: 0ms, total_read_time: 0ms}, total_create_snapshot_time: 0ms, total_local_region_num: 1, total_remote_region_num: 0}   | pushed down filter:empty, keep order:false, stats:pseudo | N/A       | N/A  |
+--------------------------------------+----------+---------+--------------+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------+-----------+------+
9 rows in set (0.02 sec)

Late materialization may help reduce the cost of Selection_18, since regexp_like would be evaluated on fewer rows.
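
A rough sketch of why this helps, assuming a simple two-column layout (`id`, `p`) like table `b` above. With late materialization, the scan reads the filter column first, applies the runtime IN filter, and only then reads `p` and evaluates the expensive predicate on the surviving rows. Python's `re.search` stands in for `regexp_like`; `eager_scan` and `late_materialized_scan` are hypothetical names.

```python
import re
from typing import List, Set, Tuple

Row = Tuple[int, str]


def eager_scan(ids: List[int], ps: List[str], rf_keys: Set[int],
               pattern: str) -> Tuple[List[Row], int]:
    """Read both columns, run the regex on every row, let the join filter later."""
    regex = re.compile(pattern)
    passed = [(i, p) for i, p in zip(ids, ps) if regex.search(p)]
    result = [(i, p) for i, p in passed if i in rf_keys]
    return result, len(ids)  # number of regex evaluations


def late_materialized_scan(ids: List[int], ps: List[str], rf_keys: Set[int],
                           pattern: str) -> Tuple[List[Row], int]:
    """Filter on the id column first; materialize p only for surviving rows."""
    regex = re.compile(pattern)
    selected = [row for row, i in enumerate(ids) if i in rf_keys]
    p_values = [ps[row] for row in selected]  # the only reads of column p
    result = [(ids[row], p) for row, p in zip(selected, p_values) if regex.search(p)]
    return result, len(p_values)  # number of regex evaluations


if __name__ == "__main__":
    ids = list(range(10_000))
    ps = [f"value-{i}" for i in ids]
    rf_keys = {3, 10, 1023}  # keys that survive the runtime filter

    eager, eager_evals = eager_scan(ids, ps, rf_keys, ".*")
    late, late_evals = late_materialized_scan(ids, ps, rf_keys, ".*")
    assert eager == late
    print(eager_evals, late_evals)  # 10000 3
```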

@yibin87
Contributor Author

yibin87 commented Aug 3, 2023

I see, you're right: it acts more like adding a filter above the TableScan operator.

@Lloyd-Pottiger
Contributor

Yes, and since a runtime filter can usually filter out many rows and an IN filter is lightweight, always applying late materialization may be acceptable.

@yibin87
Contributor Author

yibin87 commented Aug 3, 2023

> Yes, and since a runtime filter can usually filter out many rows and an IN filter is lightweight, always applying late materialization may be acceptable.

If there are no heavy operators between the table scan and the join, late materialization seems to degrade performance a little. We can simply treat every exchange node as a heavy operator, and take "heavy filter functions" into consideration later if we have the bandwidth :).
