From 0d47a5e85cd37ba45901ea8b76b3e8ef8fcecdd1 Mon Sep 17 00:00:00 2001
From: dsdashun
Date: Tue, 31 Jan 2023 13:45:55 +0800
Subject: [PATCH 1/5] lightning: enable setting conflict max-error (#40874)

ref pingcap/tidb#40743
---
 br/pkg/lightning/config/config.go             |  60 +++++++--
 br/pkg/lightning/config/config_test.go        | 121 ++++++++++++++++++
 br/pkg/lightning/errormanager/errormanager.go |  31 +++--
 .../data/mytest.testtbl-schema.sql            |   5 +
 .../data/mytest.testtbl.csv                   |  16 +++
 .../err_config.toml                           |   8 ++
 .../normal_config.toml                        |   8 ++
 .../normal_config_old_style.toml              |   8 ++
 br/tests/lightning_config_max_error/run.sh    |  81 ++++++++++++
 9 files changed, 319 insertions(+), 19 deletions(-)
 create mode 100644 br/tests/lightning_config_max_error/data/mytest.testtbl-schema.sql
 create mode 100644 br/tests/lightning_config_max_error/data/mytest.testtbl.csv
 create mode 100644 br/tests/lightning_config_max_error/err_config.toml
 create mode 100644 br/tests/lightning_config_max_error/normal_config.toml
 create mode 100644 br/tests/lightning_config_max_error/normal_config_old_style.toml
 create mode 100755 br/tests/lightning_config_max_error/run.sh

diff --git a/br/pkg/lightning/config/config.go b/br/pkg/lightning/config/config.go
index b5236127f5ee1..d14f12066c0f4 100644
--- a/br/pkg/lightning/config/config.go
+++ b/br/pkg/lightning/config/config.go
@@ -343,32 +343,64 @@ type MaxError struct {
 	// In TiDB backend, this also includes all possible SQL errors raised from INSERT,
 	// such as unique key conflict when `on-duplicate` is set to `error`.
 	// When tolerated, the row causing the error will be skipped, and adds 1 to the counter.
+	// The default value is zero, which means that such errors are not tolerated.
 	Type atomic.Int64 `toml:"type" json:"type"`
 
 	// Conflict is the maximum number of unique key conflicts in local backend accepted.
 	// When tolerated, every pair of conflict adds 1 to the counter.
 	// Those pairs will NOT be deleted from the target. Conflict resolution is performed separately.
-	// TODO Currently this is hard-coded to infinity.
-	Conflict atomic.Int64 `toml:"conflict" json:"-"`
+	// The default value is max int64, which means that as many conflict errors as possible will be recorded.
+	// Sometimes the actual number of conflict records logged will be greater than the value configured here,
+	// because conflict error data are recorded batch by batch.
+	// If the limit is reached in a single batch, the entire batch of records will be persisted before an error is reported.
+	Conflict atomic.Int64 `toml:"conflict" json:"conflict"`
 }
 
 func (cfg *MaxError) UnmarshalTOML(v interface{}) error {
+	defaultValMap := map[string]int64{
+		"syntax":   0,
+		"charset":  math.MaxInt64,
+		"type":     0,
+		"conflict": math.MaxInt64,
+	}
+	// set default values first
+	cfg.Syntax.Store(defaultValMap["syntax"])
+	cfg.Charset.Store(defaultValMap["charset"])
+	cfg.Type.Store(defaultValMap["type"])
+	cfg.Conflict.Store(defaultValMap["conflict"])
 	switch val := v.(type) {
 	case int64:
 		// ignore val that is smaller than 0
-		if val < 0 {
-			val = 0
+		if val >= 0 {
+			// only set type error
+			cfg.Type.Store(val)
 		}
-		cfg.Syntax.Store(0)
-		cfg.Charset.Store(math.MaxInt64)
-		cfg.Type.Store(val)
-		cfg.Conflict.Store(math.MaxInt64)
 		return nil
 	case map[string]interface{}:
-		// TODO support stuff like `max-error = { charset = 1000, type = 1000 }` if proved useful.
+		// support stuff like `max-error = { charset = 1000, type = 1000 }`.
+ getVal := func(k string, v interface{}) int64 { + defaultVal, ok := defaultValMap[k] + if !ok { + return 0 + } + iVal, ok := v.(int64) + if !ok || iVal < 0 { + return defaultVal + } + return iVal + } + for k, v := range val { + switch k { + case "type": + cfg.Type.Store(getVal(k, v)) + case "conflict": + cfg.Conflict.Store(getVal(k, v)) + } + } + return nil default: + return errors.Errorf("invalid max-error '%v', should be an integer or a map of string:int64", v) } - return errors.Errorf("invalid max-error '%v', should be an integer", v) } // DuplicateResolutionAlgorithm is the config type of how to resolve duplicates. @@ -805,8 +837,16 @@ func (cfg *Config) LoadFromTOML(data []byte) error { unusedGlobalKeyStrs[key.String()] = struct{}{} } +iterateUnusedKeys: for _, key := range unusedConfigKeys { keyStr := key.String() + switch keyStr { + // these keys are not counted as decoded by toml decoder, but actually they are decoded, + // because the corresponding unmarshal logic handles these key's decoding in a custom way + case "lightning.max-error.type", + "lightning.max-error.conflict": + continue iterateUnusedKeys + } if _, found := unusedGlobalKeyStrs[keyStr]; found { bothUnused = append(bothUnused, keyStr) } else { diff --git a/br/pkg/lightning/config/config_test.go b/br/pkg/lightning/config/config_test.go index 16db98845e80c..f590391740ec4 100644 --- a/br/pkg/lightning/config/config_test.go +++ b/br/pkg/lightning/config/config_test.go @@ -19,6 +19,7 @@ import ( "context" "flag" "fmt" + "math" "net" "net/http" "net/http/httptest" @@ -561,6 +562,126 @@ func TestDurationUnmarshal(t *testing.T) { require.Regexp(t, "time: unknown unit .?x.? in duration .?13x20s.?", err.Error()) } +func TestMaxErrorUnmarshal(t *testing.T) { + type testCase struct { + TOMLStr string + ExpectedValues map[string]int64 + ExpectErrStr string + CaseName string + } + for _, tc := range []*testCase{ + { + TOMLStr: `max-error = 123`, + ExpectedValues: map[string]int64{ + "syntax": 0, + "charset": math.MaxInt64, + "type": 123, + "conflict": math.MaxInt64, + }, + CaseName: "Normal_Int", + }, + { + TOMLStr: `max-error = -123`, + ExpectedValues: map[string]int64{ + "syntax": 0, + "charset": math.MaxInt64, + "type": 0, + "conflict": math.MaxInt64, + }, + CaseName: "Abnormal_Negative_Int", + }, + { + TOMLStr: `max-error = "abcde"`, + ExpectErrStr: "invalid max-error 'abcde', should be an integer or a map of string:int64", + CaseName: "Abnormal_String", + }, + { + TOMLStr: `[max-error] +syntax = 1 +charset = 2 +type = 3 +conflict = 4 +`, + ExpectedValues: map[string]int64{ + "syntax": 0, + "charset": math.MaxInt64, + "type": 3, + "conflict": 4, + }, + CaseName: "Normal_Map_All_Set", + }, + { + TOMLStr: `[max-error] +conflict = 1000 +`, + ExpectedValues: map[string]int64{ + "syntax": 0, + "charset": math.MaxInt64, + "type": 0, + "conflict": 1000, + }, + CaseName: "Normal_Map_Partial_Set", + }, + { + TOMLStr: `max-error = { conflict = 1000, type = 123 }`, + ExpectedValues: map[string]int64{ + "syntax": 0, + "charset": math.MaxInt64, + "type": 123, + "conflict": 1000, + }, + CaseName: "Normal_OneLineMap_Partial_Set", + }, + { + TOMLStr: `[max-error] +conflict = 1000 +not_exist = 123 +`, + ExpectedValues: map[string]int64{ + "syntax": 0, + "charset": math.MaxInt64, + "type": 0, + "conflict": 1000, + }, + CaseName: "Normal_Map_Partial_Set_Invalid_Key", + }, + { + TOMLStr: `[max-error] +conflict = 1000 +type = -123 +`, + ExpectedValues: map[string]int64{ + "syntax": 0, + "charset": math.MaxInt64, + "type": 0, + "conflict": 1000, 
+			},
+			CaseName: "Normal_Map_Partial_Set_Invalid_Value",
+		},
+		{
+			TOMLStr: `[max-error]
+conflict = 1000
+type = abc
+`,
+			ExpectErrStr: `toml: line 3 (last key "max-error.type"): expected value but found "abc" instead`,
+			CaseName:     "Normal_Map_Partial_Set_Invalid_ValueType",
+		},
+	} {
+		targetLightningCfg := new(config.Lightning)
+		err := toml.Unmarshal([]byte(tc.TOMLStr), targetLightningCfg)
+		if len(tc.ExpectErrStr) > 0 {
+			require.Errorf(t, err, "test case: %s", tc.CaseName)
+			require.Equalf(t, tc.ExpectErrStr, err.Error(), "test case: %s", tc.CaseName)
+		} else {
+			require.NoErrorf(t, err, "test case: %s", tc.CaseName)
+			require.Equalf(t, tc.ExpectedValues["syntax"], targetLightningCfg.MaxError.Syntax.Load(), "test case: %s", tc.CaseName)
+			require.Equalf(t, tc.ExpectedValues["charset"], targetLightningCfg.MaxError.Charset.Load(), "test case: %s", tc.CaseName)
+			require.Equalf(t, tc.ExpectedValues["type"], targetLightningCfg.MaxError.Type.Load(), "test case: %s", tc.CaseName)
+			require.Equalf(t, tc.ExpectedValues["conflict"], targetLightningCfg.MaxError.Conflict.Load(), "test case: %s", tc.CaseName)
+		}
+	}
+}
+
 func TestDurationMarshalJSON(t *testing.T) {
 	duration := config.Duration{}
 	err := duration.UnmarshalText([]byte("13m20s"))
diff --git a/br/pkg/lightning/errormanager/errormanager.go b/br/pkg/lightning/errormanager/errormanager.go
index 373ba572779d4..4085226063d38 100644
--- a/br/pkg/lightning/errormanager/errormanager.go
+++ b/br/pkg/lightning/errormanager/errormanager.go
@@ -194,7 +194,8 @@ func (em *ErrorManager) RecordTypeError(
 	if em.remainingError.Type.Dec() < 0 {
 		threshold := em.configError.Type.Load()
 		if threshold > 0 {
-			encodeErr = errors.Annotatef(encodeErr, "meet errors exceed the max-error.type threshold '%d'",
+			encodeErr = errors.Annotatef(encodeErr,
+				"The number of type errors exceeds the threshold configured by `max-error.type`: '%d'",
 				em.configError.Type.Load())
 		}
 		return encodeErr
@@ -241,17 +242,20 @@ func (em *ErrorManager) RecordDataConflictError(
 	tableName string,
 	conflictInfos []DataConflictInfo,
 ) error {
+	var gerr error
 	if len(conflictInfos) == 0 {
 		return nil
 	}
 
 	if em.remainingError.Conflict.Sub(int64(len(conflictInfos))) < 0 {
 		threshold := em.configError.Conflict.Load()
-		return errors.Errorf(" meet errors exceed the max-error.conflict threshold '%d'", threshold)
+		// Still need to record this batch of conflict records, and then return this error at the end.
+		// Otherwise, if max-error.conflict is set to a very small value, none of the conflict errors will be recorded.
+		gerr = errors.Errorf("The number of conflict errors exceeds the threshold configured by `max-error.conflict`: '%d'", threshold)
 	}
 
 	if em.db == nil {
-		return nil
+		return gerr
 	}
 
 	exec := common.SQLWithRetry{
@@ -259,7 +263,7 @@ func (em *ErrorManager) RecordDataConflictError(
 		Logger:       logger,
 		HideQueryLog: redact.NeedRedact(),
 	}
-	return exec.Transact(ctx, "insert data conflict error record", func(c context.Context, txn *sql.Tx) error {
+	if err := exec.Transact(ctx, "insert data conflict error record", func(c context.Context, txn *sql.Tx) error {
 		sb := &strings.Builder{}
 		fmt.Fprintf(sb, insertIntoConflictErrorData, em.schemaEscaped)
 		var sqlArgs []interface{}
@@ -279,7 +283,10 @@ func (em *ErrorManager) RecordDataConflictError(
 		}
 		_, err := txn.ExecContext(c, sb.String(), sqlArgs...)
 		return err
-	})
+	}); err != nil {
+		gerr = err
+	}
+	return gerr
 }
 
 func (em *ErrorManager) RecordIndexConflictError(
@@ -290,17 +297,20 @@ func (em *ErrorManager) RecordIndexConflictError(
 	conflictInfos []DataConflictInfo,
 	rawHandles, rawRows [][]byte,
 ) error {
+	var gerr error
 	if len(conflictInfos) == 0 {
 		return nil
 	}
 
 	if em.remainingError.Conflict.Sub(int64(len(conflictInfos))) < 0 {
 		threshold := em.configError.Conflict.Load()
-		return errors.Errorf(" meet errors exceed the max-error.conflict threshold %d", threshold)
+		// Still need to record this batch of conflict records, and then return this error at the end.
+		// Otherwise, if max-error.conflict is set to a very small value, none of the conflict errors will be recorded.
+		gerr = errors.Errorf("The number of conflict errors exceeds the threshold configured by `max-error.conflict`: '%d'", threshold)
 	}
 
 	if em.db == nil {
-		return nil
+		return gerr
 	}
 
 	exec := common.SQLWithRetry{
@@ -308,7 +318,7 @@ func (em *ErrorManager) RecordIndexConflictError(
 		Logger:       logger,
 		HideQueryLog: redact.NeedRedact(),
 	}
-	return exec.Transact(ctx, "insert index conflict error record", func(c context.Context, txn *sql.Tx) error {
+	if err := exec.Transact(ctx, "insert index conflict error record", func(c context.Context, txn *sql.Tx) error {
 		sb := &strings.Builder{}
 		fmt.Fprintf(sb, insertIntoConflictErrorIndex, em.schemaEscaped)
 		var sqlArgs []interface{}
@@ -331,7 +341,10 @@ func (em *ErrorManager) RecordIndexConflictError(
 		}
 		_, err := txn.ExecContext(c, sb.String(), sqlArgs...)
 		return err
-	})
+	}); err != nil {
+		gerr = err
+	}
+	return gerr
 }
 
 // ResolveAllConflictKeys query all conflicting rows (handle and their
diff --git a/br/tests/lightning_config_max_error/data/mytest.testtbl-schema.sql b/br/tests/lightning_config_max_error/data/mytest.testtbl-schema.sql
new file mode 100644
index 0000000000000..93582d5178139
--- /dev/null
+++ b/br/tests/lightning_config_max_error/data/mytest.testtbl-schema.sql
@@ -0,0 +1,5 @@
+CREATE TABLE testtbl (
+    id INTEGER PRIMARY KEY,
+    val1 VARCHAR(40) NOT NULL,
+    INDEX `idx_val1` (`val1`)
+);
diff --git a/br/tests/lightning_config_max_error/data/mytest.testtbl.csv b/br/tests/lightning_config_max_error/data/mytest.testtbl.csv
new file mode 100644
index 0000000000000..021f6bbf7be1c
--- /dev/null
+++ b/br/tests/lightning_config_max_error/data/mytest.testtbl.csv
@@ -0,0 +1,16 @@
+id,val1
+1,"aaa01"
+2,"aaa01"
+3,"aaa02"
+4,"aaa02"
+5,"aaa05"
+6,"aaa06"
+7,"aaa07"
+8,"aaa08"
+9,"aaa09"
+10,"aaa10"
+1,"bbb01"
+2,"bbb02"
+3,"bbb03"
+4,"bbb04"
+5,"bbb05"
diff --git a/br/tests/lightning_config_max_error/err_config.toml b/br/tests/lightning_config_max_error/err_config.toml
new file mode 100644
index 0000000000000..79447e685a8f5
--- /dev/null
+++ b/br/tests/lightning_config_max_error/err_config.toml
@@ -0,0 +1,8 @@
+[lightning.max-error]
+conflict = 4
+
+[mydumper.csv]
+header = true
+
+[tikv-importer]
+duplicate-resolution = 'remove'
diff --git a/br/tests/lightning_config_max_error/normal_config.toml b/br/tests/lightning_config_max_error/normal_config.toml
new file mode 100644
index 0000000000000..92e08739fe04a
--- /dev/null
+++ b/br/tests/lightning_config_max_error/normal_config.toml
@@ -0,0 +1,8 @@
+[lightning.max-error]
+conflict = 20
+
+[mydumper.csv]
+header = true
+
+[tikv-importer]
+duplicate-resolution = 'remove'
diff --git a/br/tests/lightning_config_max_error/normal_config_old_style.toml b/br/tests/lightning_config_max_error/normal_config_old_style.toml
new file mode 100644
index 0000000000000..fe402d071f5e0
--- /dev/null
+++ b/br/tests/lightning_config_max_error/normal_config_old_style.toml
@@ -0,0 +1,8 @@
+[lightning]
+max-error = 0 # this actually sets the type error
+
+[mydumper.csv]
+header = true
+
+[tikv-importer]
+duplicate-resolution = 'remove'
diff --git a/br/tests/lightning_config_max_error/run.sh b/br/tests/lightning_config_max_error/run.sh
new file mode 100755
index 0000000000000..1d850ae55f0d8
--- /dev/null
+++ b/br/tests/lightning_config_max_error/run.sh
@@ -0,0 +1,81 @@
+#!/bin/sh
+#
+# Copyright 2023 PingCAP, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+set -eux
+
+check_cluster_version 4 0 0 'local backend' || exit 0
+
+mydir=$(dirname "${BASH_SOURCE[0]}")
+
+data_file="${mydir}/data/mytest.testtbl.csv"
+
+total_row_count=$( sed '1d' "${data_file}" | wc -l | xargs echo )
+uniq_row_count=$( sed '1d' "${data_file}" | awk -F, '{print $1}' | sort | uniq -c | awk '{print $1}' | grep -c '1' | xargs echo )
+duplicated_row_count=$(( ${total_row_count} - ${uniq_row_count} ))
+
+run_sql 'DROP TABLE IF EXISTS mytest.testtbl'
+run_sql 'DROP TABLE IF EXISTS lightning_task_info.conflict_error_v1'
+
+stderr_file="/tmp/${TEST_NAME}.stderr"
+
+set +e
+if run_lightning --backend local --config "${mydir}/err_config.toml" 2> "${stderr_file}"; then
+  echo "The lightning import doesn't fail as expected" >&2
+  exit 1
+fi
+set -e
+
+err_msg=$( cat << EOF
+tidb lightning encountered error: collect local duplicate rows failed: The number of conflict errors exceeds the threshold configured by \`max-error.conflict\`: '4'
EOF
+)
+cat "${stderr_file}"
+grep -q "${err_msg}" "${stderr_file}"
+
+run_sql 'SELECT COUNT(*) FROM lightning_task_info.conflict_error_v1'
+# Although the number of conflict errors exceeds the max-error limit,
+# all the conflict errors are still recorded,
+# because conflict errors are recorded batch by batch (batch size 1024)
+# and this batch of conflict errors is recorded in full.
+check_contains "COUNT(*): ${duplicated_row_count}"
+
+# import a second time
+
+run_sql 'DROP TABLE IF EXISTS mytest.testtbl'
+run_sql 'DROP TABLE IF EXISTS lightning_task_info.conflict_error_v1'
+
+run_lightning --backend local --config "${mydir}/normal_config.toml"
+
+run_sql 'SELECT COUNT(*) FROM lightning_task_info.conflict_error_v1'
+check_contains "COUNT(*): ${duplicated_row_count}"
+
+# Check remaining records in the target table
+run_sql 'SELECT COUNT(*) FROM mytest.testtbl'
+check_contains "COUNT(*): ${uniq_row_count}"
+
+# import a third time
+
+run_sql 'DROP TABLE IF EXISTS mytest.testtbl'
+run_sql 'DROP TABLE IF EXISTS lightning_task_info.conflict_error_v1'
+
+run_lightning --backend local --config "${mydir}/normal_config_old_style.toml"
+
+run_sql 'SELECT COUNT(*) FROM lightning_task_info.conflict_error_v1'
+check_contains "COUNT(*): ${duplicated_row_count}"
+
+# Check remaining records in the target table
+run_sql 'SELECT COUNT(*) FROM mytest.testtbl'
+check_contains "COUNT(*): ${uniq_row_count}"
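For reference, the two shapes of `max-error` exercised by the configs and tests above can be sketched as follows; this is an illustrative TOML fragment (the numeric values are arbitrary examples, not recommendations), using the keys introduced in config.go:

[lightning.max-error]
# Maximum number of type errors tolerated; 0 (the default) tolerates none.
type = 0
# Maximum number of conflict records tolerated; the default is max int64,
# and a batch that crosses the limit is still recorded in full before the import aborts.
conflict = 1000

The old single-integer style, `max-error = 0` under `[lightning]`, is still accepted and only sets the type-error limit, which is what normal_config_old_style.toml relies on.
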
From c8bffd42c244e41c4e438e3e2f2b55a9ec3d5697 Mon Sep 17 00:00:00 2001
From: hehechen
Date: Tue, 31 Jan 2023 14:31:55 +0800
Subject: [PATCH 2/5] DDL: Skip collecting TiFlash status when TiFlash is down (#40872)

close pingcap/tidb#38484
---
 ddl/ddl_tiflash_api.go              |  8 ++++++++
 ddl/tiflashtest/ddl_tiflash_test.go | 20 ++++++++++++++++++++
 domain/infosync/tiflash_manager.go  | 12 +++++++++++-
 3 files changed, 39 insertions(+), 1 deletion(-)

diff --git a/ddl/ddl_tiflash_api.go b/ddl/ddl_tiflash_api.go
index 4b8fca2a91c0f..1ade909b93ee9 100644
--- a/ddl/ddl_tiflash_api.go
+++ b/ddl/ddl_tiflash_api.go
@@ -424,6 +424,14 @@ func (d *ddl) refreshTiFlashTicker(ctx sessionctx.Context, pollTiFlashContext *T
 			return err
 		}
 	}
+
+	failpoint.Inject("OneTiFlashStoreDown", func() {
+		for storeID, store := range pollTiFlashContext.TiFlashStores {
+			store.Store.StateName = "Down"
+			pollTiFlashContext.TiFlashStores[storeID] = store
+			break
+		}
+	})
 	pollTiFlashContext.PollCounter++
 
 	// Start to process every table.
diff --git a/ddl/tiflashtest/ddl_tiflash_test.go b/ddl/tiflashtest/ddl_tiflash_test.go
index d1d0368138b18..c3ec3a1d2b0fb 100644
--- a/ddl/tiflashtest/ddl_tiflash_test.go
+++ b/ddl/tiflashtest/ddl_tiflash_test.go
@@ -1334,3 +1334,23 @@ func TestTiFlashAvailableAfterAddPartition(t *testing.T) {
 	require.NotNil(t, pi)
 	require.Equal(t, len(pi.Definitions), 2)
 }
+
+func TestTiFlashAvailableAfterDownOneStore(t *testing.T) {
+	s, teardown := createTiFlashContext(t)
+	defer teardown()
+	tk := testkit.NewTestKit(t, s.store)
+
+	tk.MustExec("use test")
+	tk.MustExec("drop table if exists ddltiflash")
+	tk.MustExec("create table ddltiflash(z int) PARTITION BY RANGE(z) (PARTITION p0 VALUES LESS THAN (10))")
+	require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/OneTiFlashStoreDown", `return`))
+	require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/domain/infosync/OneTiFlashStoreDown", `return`))
+	defer func() {
+		require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/OneTiFlashStoreDown"))
+		require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/domain/infosync/OneTiFlashStoreDown"))
+	}()
+
+	tk.MustExec("alter table ddltiflash set tiflash replica 1")
+	time.Sleep(ddl.PollTiFlashInterval * RoundToBeAvailable * 3)
+	CheckTableAvailable(s.dom, t, 1, []string{})
+}
diff --git a/domain/infosync/tiflash_manager.go b/domain/infosync/tiflash_manager.go
index 4d01c64de002d..d5cc46f95db95 100644
--- a/domain/infosync/tiflash_manager.go
+++ b/domain/infosync/tiflash_manager.go
@@ -31,6 +31,7 @@ import (
 
 	"github.com/gorilla/mux"
 	"github.com/pingcap/errors"
+	"github.com/pingcap/failpoint"
 	"github.com/pingcap/tidb/ddl/placement"
 	"github.com/pingcap/tidb/store/helper"
 	"github.com/pingcap/tidb/tablecodec"
@@ -89,10 +90,19 @@ func getTiFlashPeerWithoutLagCount(tiFlashStores map[int64]helper.StoreStat, tab
 	for _, store := range tiFlashStores {
 		regionReplica := make(map[int64]int)
 		err := helper.CollectTiFlashStatus(store.Store.StatusAddress, tableID, &regionReplica)
+		failpoint.Inject("OneTiFlashStoreDown", func() {
+			if store.Store.StateName == "Down" {
+				err = errors.New("mock TiFlash down")
+			}
+		})
 		if err != nil {
 			logutil.BgLogger().Error("Fail to get peer status from TiFlash.",
 				zap.Int64("tableID", tableID))
-			return 0, err
+			// Just skip down, offline, or tombstone stores, because PD will migrate regions from these stores.
+ if store.Store.StateName == "Up" || store.Store.StateName == "Disconnected" { + return 0, err + } + continue } flashPeerCount += len(regionReplica) } From 6a55f3ea97701ea02fd515d796d7a0aa2468c367 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20van=20Eeden?= Date: Tue, 31 Jan 2023 07:57:54 +0100 Subject: [PATCH 3/5] server: Return error when no password is specified for caching_sha2_password account with password (#40858) close pingcap/tidb#40831 --- server/conn.go | 25 ++++++++++++++++++++----- server/conn_test.go | 45 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 65 insertions(+), 5 deletions(-) diff --git a/server/conn.go b/server/conn.go index 4d4300c099ecb..4cd2aedcb42c1 100644 --- a/server/conn.go +++ b/server/conn.go @@ -668,12 +668,12 @@ func (cc *clientConn) readOptionalSSLRequestAndHandshakeResponse(ctx context.Con switch resp.AuthPlugin { case mysql.AuthCachingSha2Password: - resp.Auth, err = cc.authSha(ctx) + resp.Auth, err = cc.authSha(ctx, resp) if err != nil { return err } case mysql.AuthTiDBSM3Password: - resp.Auth, err = cc.authSM3(ctx) + resp.Auth, err = cc.authSM3(ctx, resp) if err != nil { return err } @@ -727,7 +727,7 @@ func (cc *clientConn) handleAuthPlugin(ctx context.Context, resp *handshakeRespo } // authSha implements the caching_sha2_password specific part of the protocol. -func (cc *clientConn) authSha(ctx context.Context) ([]byte, error) { +func (cc *clientConn) authSha(ctx context.Context, resp handshakeResponse41) ([]byte, error) { const ( shaCommand = 1 requestRsaPubKey = 2 // Not supported yet, only TLS is supported as secure channel. @@ -735,6 +735,13 @@ func (cc *clientConn) authSha(ctx context.Context) ([]byte, error) { fastAuthFail = 4 ) + // If no password is specified, we don't send the FastAuthFail to do the full authentication + // as that doesn't make sense without a password and confuses the client. + // https://github.com/pingcap/tidb/issues/40831 + if len(resp.Auth) == 0 { + return []byte{}, nil + } + // Currently we always send a "FastAuthFail" as the cached part of the protocol isn't implemented yet. // This triggers the client to send the full response. err := cc.writePacket([]byte{0, 0, 0, 0, shaCommand, fastAuthFail}) @@ -757,8 +764,16 @@ func (cc *clientConn) authSha(ctx context.Context) ([]byte, error) { } // authSM3 implements the tidb_sm3_password specific part of the protocol. -func (cc *clientConn) authSM3(ctx context.Context) ([]byte, error) { - err := cc.writePacket([]byte{0, 0, 0, 0, 1, 4}) +// tidb_sm3_password is very similar to caching_sha2_password. +func (cc *clientConn) authSM3(ctx context.Context, resp handshakeResponse41) ([]byte, error) { + // If no password is specified, we don't send the FastAuthFail to do the full authentication + // as that doesn't make sense without a password and confuses the client. 
+ // https://github.com/pingcap/tidb/issues/40831 + if len(resp.Auth) == 0 { + return []byte{}, nil + } + + err := cc.writePacket([]byte{0, 0, 0, 0, 1, 4}) // fastAuthFail if err != nil { logutil.Logger(ctx).Error("authSM3 packet write failed", zap.Error(err)) return nil, err diff --git a/server/conn_test.go b/server/conn_test.go index 9f8033cd1f98d..fb1cd15102129 100644 --- a/server/conn_test.go +++ b/server/conn_test.go @@ -1806,3 +1806,48 @@ func TestExtensionChangeUser(t *testing.T) { require.Equal(t, expectedConnInfo.Error, logInfo.Error) require.Equal(t, *(expectedConnInfo.ConnectionInfo), *(logInfo.ConnectionInfo)) } + +func TestAuthSha(t *testing.T) { + store := testkit.CreateMockStore(t) + + var outBuffer bytes.Buffer + tidbdrv := NewTiDBDriver(store) + cfg := newTestConfig() + cfg.Port, cfg.Status.StatusPort = 0, 0 + cfg.Status.ReportStatus = false + server, err := NewServer(cfg, tidbdrv) + require.NoError(t, err) + defer server.Close() + + cc := &clientConn{ + connectionID: 1, + salt: []byte{0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F, 0x10, 0x11, 0x12, 0x13, 0x14}, + server: server, + pkt: &packetIO{ + bufWriter: bufio.NewWriter(&outBuffer), + }, + collation: mysql.DefaultCollationID, + peerHost: "localhost", + alloc: arena.NewAllocator(512), + chunkAlloc: chunk.NewAllocator(), + capability: mysql.ClientProtocol41, + } + + tk := testkit.NewTestKit(t, store) + ctx := &TiDBContext{Session: tk.Session()} + cc.setCtx(ctx) + + resp := handshakeResponse41{ + Capability: mysql.ClientProtocol41 | mysql.ClientPluginAuth, + AuthPlugin: mysql.AuthCachingSha2Password, + Auth: []byte{}, // No password + } + + authData, err := cc.authSha(context.Background(), resp) + require.NoError(t, err) + + // If no password is specified authSha() should return an empty byte slice + // which differs from when a password is specified as that should trigger + // fastAuthFail and the rest of the auth process. 
+ require.Equal(t, authData, []byte{}) +} From eb53aa878de312f7729630bc08b54c953ad1a3ab Mon Sep 17 00:00:00 2001 From: MoCuishle28 <32541204+MoCuishle28@users.noreply.github.com> Date: Tue, 31 Jan 2023 15:27:54 +0800 Subject: [PATCH 4/5] br: fix br debug encode panic (#40880) close pingcap/tidb#40878 --- br/pkg/utils/json.go | 4 ++++ br/pkg/utils/json_test.go | 38 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 42 insertions(+) diff --git a/br/pkg/utils/json.go b/br/pkg/utils/json.go index 736032b4beedc..a29725db6b1b3 100644 --- a/br/pkg/utils/json.go +++ b/br/pkg/utils/json.go @@ -114,6 +114,10 @@ func makeJSONSchema(schema *backuppb.Schema) (*jsonSchema, error) { func fromJSONSchema(jSchema *jsonSchema) (*backuppb.Schema, error) { schema := jSchema.Schema + if schema == nil { + schema = &backuppb.Schema{} + } + var err error schema.Db, err = json.Marshal(jSchema.DB) if err != nil { diff --git a/br/pkg/utils/json_test.go b/br/pkg/utils/json_test.go index 8d8eeb6457332..3f03f287d92f1 100644 --- a/br/pkg/utils/json_test.go +++ b/br/pkg/utils/json_test.go @@ -204,6 +204,44 @@ var testMetaJSONs = [][]byte{ "is_raw_kv": true, "br_version": "BR\nRelease Version: v5.0.0-master\nGit Commit Hash: c0d60dae4998cf9ac40f02e5444731c15f0b2522\nGit Branch: HEAD\nGo Version: go1.13.4\nUTC Build Time: 2021-03-25 08:10:08\nRace Enabled: false" }`), + []byte(`{ + "files": [ + { + "sha256": "3ae857ef9b379d498ae913434f1d47c3e90a55f3a4cd9074950bfbd163d5e5fc", + "start_key": "7480000000000000115f720000000000000000", + "end_key": "7480000000000000115f72ffffffffffffffff00", + "name": "1_20_9_36adb8cedcd7af34708edff520499e712e2cfdcb202f5707dc9305a031d55a98_1675066275424_write.sst", + "end_version": 439108573623222300, + "crc64xor": 16261462091570213000, + "total_kvs": 15, + "total_bytes": 1679, + "cf": "write", + "size": 2514, + "cipher_iv": "56MTbxA4CaNILpirKnBxUw==" + } + ], + "schemas": [ + { + "db": { + "charset": "utf8mb4", + "collate": "utf8mb4_bin", + "db_name": { + "L": "test", + "O": "test" + }, + "id": 1, + "policy_ref_info": null, + "state": 5 + } + } + ], + "ddls": [], + "cluster_id": 7194351714070942000, + "cluster_version": "\"6.1.0\"\n", + "br_version": "BR\nRelease Version: v6.1.0\nGit Commit Hash: 1a89decdb192cbdce6a7b0020d71128bc964d30f\nGit Branch: heads/refs/tags/v6.1.0\nGo Version: go1.18.2\nUTC Build Time: 2022-06-05 05:09:12\nRace Enabled: false", + "end_version": 439108573623222300, + "new_collations_enabled": "True" + }`), } func TestEncodeAndDecode(t *testing.T) { From 05edfd4a7dbcbc34558d3dcbc1359bfc4a2ceed8 Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Tue, 31 Jan 2023 16:17:54 +0800 Subject: [PATCH 5/5] planner: fix HashAgg cannot pushdown to tiflash_compute (#40828) close pingcap/tidb#40717 --- executor/tiflashtest/tiflash_test.go | 5 +- planner/core/BUILD.bazel | 1 - planner/core/find_best_task.go | 13 ++- planner/core/logical_plan_builder.go | 19 ----- planner/core/physical_plan_test.go | 73 ++++++++++++++++ planner/core/planbuilder.go | 24 +----- planner/core/rule_aggregation_push_down.go | 6 ++ planner/core/task.go | 6 ++ planner/core/testdata/plan_suite_in.json | 8 ++ planner/core/testdata/plan_suite_out.json | 99 ++++++++++++++++++++++ 10 files changed, 204 insertions(+), 50 deletions(-) diff --git a/executor/tiflashtest/tiflash_test.go b/executor/tiflashtest/tiflash_test.go index e8cd94d889188..fec246a5c5057 100644 --- a/executor/tiflashtest/tiflash_test.go +++ b/executor/tiflashtest/tiflash_test.go @@ -1270,7 +1270,7 @@ func TestDisaggregatedTiFlash(t *testing.T) { 
tk.MustExec("set @@session.tidb_isolation_read_engines=\"tiflash\"") err = tk.ExecToErr("select * from t;") - require.Contains(t, err.Error(), "Please check tiflash_compute node is available") + require.Contains(t, err.Error(), "tiflash_compute node is unavailable") config.UpdateGlobal(func(conf *config.Config) { conf.DisaggregatedTiFlash = false @@ -1304,9 +1304,6 @@ func TestDisaggregatedTiFlashQuery(t *testing.T) { require.NoError(t, err) tk.MustExec("set @@session.tidb_isolation_read_engines=\"tiflash\"") - needCheckTiFlashComputeNode := "false" - failpoint.Enable("github.com/pingcap/tidb/planner/core/testDisaggregatedTiFlashQuery", fmt.Sprintf("return(%s)", needCheckTiFlashComputeNode)) - defer failpoint.Disable("github.com/pingcap/tidb/planner/core/testDisaggregatedTiFlashQuery") tk.MustExec("explain select max( tbl_1.col_1 ) as r0 , sum( tbl_1.col_1 ) as r1 , sum( tbl_1.col_8 ) as r2 from tbl_1 where tbl_1.col_8 != 68 or tbl_1.col_3 between null and 939 order by r0,r1,r2;") tk.MustExec("set @@tidb_partition_prune_mode = 'static';") diff --git a/planner/core/BUILD.bazel b/planner/core/BUILD.bazel index 3afbdf3b8a0bc..12c2730c59364 100644 --- a/planner/core/BUILD.bazel +++ b/planner/core/BUILD.bazel @@ -107,7 +107,6 @@ go_library( "//sessiontxn/staleread", "//statistics", "//statistics/handle", - "//store/driver/backoff", "//table", "//table/tables", "//table/temptable", diff --git a/planner/core/find_best_task.go b/planner/core/find_best_task.go index 6c5d2c49fddf0..1ecc9f995243c 100644 --- a/planner/core/find_best_task.go +++ b/planner/core/find_best_task.go @@ -2015,15 +2015,16 @@ func (ds *DataSource) convertToTableScan(prop *property.PhysicalProperty, candid } // In disaggregated tiflash mode, only MPP is allowed, cop and batchCop is deprecated. // So if prop.TaskTp is RootTaskType, have to use mppTask then convert to rootTask. - isDisaggregatedTiFlashPath := config.GetGlobalConfig().DisaggregatedTiFlash && ts.StoreType == kv.TiFlash + isDisaggregatedTiFlash := config.GetGlobalConfig().DisaggregatedTiFlash + isDisaggregatedTiFlashPath := isDisaggregatedTiFlash && ts.StoreType == kv.TiFlash canMppConvertToRootForDisaggregatedTiFlash := isDisaggregatedTiFlashPath && prop.TaskTp == property.RootTaskType && ds.SCtx().GetSessionVars().IsMPPAllowed() if prop.TaskTp == property.MppTaskType || canMppConvertToRootForDisaggregatedTiFlash { if ts.KeepOrder { return invalidTask, nil } - if prop.MPPPartitionTp != property.AnyType || (ts.isPartition && !canMppConvertToRootForDisaggregatedTiFlash) { + if prop.MPPPartitionTp != property.AnyType || (ts.isPartition && !isDisaggregatedTiFlash) { // If ts is a single partition, then this partition table is in static-only prune, then we should not choose mpp execution. - // But in disaggregated tiflash mode, we can only use mpp, so we add ExchangeSender and ExchangeReceiver above TableScan for static pruning partition table. + // But in disaggregated tiflash mode, we enable using mpp for static pruning partition table, because cop and batchCop is deprecated. ds.SCtx().GetSessionVars().RaiseWarningWhenMPPEnforced("MPP mode may be blocked because table `" + ds.tableInfo.Name.O + "`is a partition table which is not supported when `@@tidb_partition_prune_mode=static`.") return invalidTask, nil } @@ -2052,7 +2053,11 @@ func (ds *DataSource) convertToTableScan(prop *property.PhysicalProperty, candid // So have to return a rootTask, but prop requires mppTask, cannot meet this requirement. 
task = invalidTask } else if prop.TaskTp == property.RootTaskType { - // when got here, canMppConvertToRootForDisaggregatedTiFlash is true. + // When got here, canMppConvertToRootForDisaggregatedTiFlash is true. + // This is for situations like cannot generate mppTask for some operators. + // Such as when the build side of HashJoin is Projection, + // which cannot pushdown to tiflash(because TiFlash doesn't support some expr in Proj) + // So HashJoin cannot pushdown to tiflash. But we still want TableScan to run on tiflash. task = mppTask task = task.convertToRootTask(ds.ctx) if !task.invalid() { diff --git a/planner/core/logical_plan_builder.go b/planner/core/logical_plan_builder.go index 2d73534fc2e1e..f6e566bac43a3 100644 --- a/planner/core/logical_plan_builder.go +++ b/planner/core/logical_plan_builder.go @@ -27,7 +27,6 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/failpoint" - "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/expression/aggregation" @@ -50,7 +49,6 @@ import ( "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/statistics" "github.com/pingcap/tidb/statistics/handle" - "github.com/pingcap/tidb/store/driver/backoff" "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/table/tables" "github.com/pingcap/tidb/table/temptable" @@ -67,7 +65,6 @@ import ( "github.com/pingcap/tidb/util/plancodec" "github.com/pingcap/tidb/util/set" "github.com/pingcap/tidb/util/size" - "github.com/tikv/client-go/v2/tikv" ) const ( @@ -692,13 +689,6 @@ func (ds *DataSource) setPreferredStoreType(hintInfo *tableHintInfo) { ds.preferStoreType = 0 return } - if config.GetGlobalConfig().DisaggregatedTiFlash && !isTiFlashComputeNodeAvailable(ds.ctx) { - // TiFlash is in disaggregated mode, need to make sure tiflash_compute node is available. 
- errMsg := "No available tiflash_compute node" - warning := ErrInternal.GenWithStack(errMsg) - ds.ctx.GetSessionVars().StmtCtx.AppendWarning(warning) - return - } for _, path := range ds.possibleAccessPaths { if path.StoreType == kv.TiFlash { ds.preferStoreType |= preferTiFlash @@ -716,15 +706,6 @@ func (ds *DataSource) setPreferredStoreType(hintInfo *tableHintInfo) { } } -func isTiFlashComputeNodeAvailable(ctx sessionctx.Context) bool { - bo := backoff.NewBackofferWithVars(context.Background(), 5000, nil) - stores, err := ctx.GetStore().(tikv.Storage).GetRegionCache().GetTiFlashComputeStores(bo.TiKVBackoffer()) - if err != nil || len(stores) == 0 { - return false - } - return true -} - func resetNotNullFlag(schema *expression.Schema, start, end int) { for i := start; i < end; i++ { col := *schema.Columns[i] diff --git a/planner/core/physical_plan_test.go b/planner/core/physical_plan_test.go index 640c0f04630c1..a8a7a1918cfc0 100644 --- a/planner/core/physical_plan_test.go +++ b/planner/core/physical_plan_test.go @@ -23,6 +23,7 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/metapb" + "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/executor" "github.com/pingcap/tidb/infoschema" @@ -2626,3 +2627,75 @@ func TestCountStarForTiFlash(t *testing.T) { tk.MustQuery("explain format = 'brief' " + ts).Check(testkit.Rows(output[i].Plan...)) } } + +func TestHashAggPushdownToTiFlashCompute(t *testing.T) { + var ( + input []string + output []struct { + SQL string + Plan []string + Warning []string + } + ) + planSuiteData := core.GetPlanSuiteData() + planSuiteData.LoadTestCases(t, &input, &output) + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + + tk.MustExec("use test") + tk.MustExec("drop table if exists tbl_15;") + tk.MustExec(`create table tbl_15 (col_89 text (473) collate utf8mb4_bin , + col_90 timestamp default '1976-04-03' , + col_91 tinyint unsigned not null , + col_92 tinyint , + col_93 double not null , + col_94 datetime not null default '1970-06-08' , + col_95 datetime default '2028-02-13' , + col_96 int unsigned not null default 2532480521 , + col_97 char (168) default '') partition by hash (col_91) partitions 4;`) + + tk.MustExec("drop table if exists tbl_16;") + tk.MustExec(`create table tbl_16 (col_98 text (246) not null , + col_99 decimal (30 ,19) , + col_100 mediumint unsigned , + col_101 text (410) collate utf8mb4_bin , + col_102 date not null , + col_103 timestamp not null default '2003-08-27' , + col_104 text (391) not null , + col_105 date default '2010-10-24' , + col_106 text (9) not null,primary key (col_100, col_98(5), col_103), + unique key idx_23 (col_100, col_106 (3), col_101 (3))) partition by hash (col_100) partitions 2;`) + + config.UpdateGlobal(func(conf *config.Config) { + conf.DisaggregatedTiFlash = true + }) + defer config.UpdateGlobal(func(conf *config.Config) { + conf.DisaggregatedTiFlash = false + }) + + dom := domain.GetDomain(tk.Session()) + is := dom.InfoSchema() + db, exists := is.SchemaByName(model.NewCIStr("test")) + require.True(t, exists) + for _, tblInfo := range db.Tables { + tableName := tblInfo.Name.L + if tableName == "tbl_15" || tableName == "tbl_16" { + tblInfo.TiFlashReplica = &model.TiFlashReplicaInfo{ + Count: 1, + Available: true, + } + } + } + + tk.MustExec("set @@tidb_allow_mpp=1; set @@tidb_enforce_mpp=1;") + tk.MustExec("set @@tidb_partition_prune_mode = 'static';") + tk.MustExec("set @@tidb_isolation_read_engines = 'tiflash';") + + for i, ts := 
range input { + testdata.OnRecord(func() { + output[i].SQL = ts + output[i].Plan = testdata.ConvertRowsToStrings(tk.MustQuery("explain format = 'brief' " + ts).Rows()) + }) + tk.MustQuery("explain format = 'brief' " + ts).Check(testkit.Rows(output[i].Plan...)) + } +} diff --git a/planner/core/planbuilder.go b/planner/core/planbuilder.go index d14a6bf51ea49..5535faa97ab92 100644 --- a/planner/core/planbuilder.go +++ b/planner/core/planbuilder.go @@ -26,7 +26,6 @@ import ( "unsafe" "github.com/pingcap/errors" - "github.com/pingcap/failpoint" "github.com/pingcap/tidb/bindinfo" "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/domain" @@ -1453,8 +1452,6 @@ func filterPathByIsolationRead(ctx sessionctx.Context, paths []*util.AccessPath, isolationReadEngines := ctx.GetSessionVars().GetIsolationReadEngines() availableEngine := map[kv.StoreType]struct{}{} var availableEngineStr string - var outputComputeNodeErrMsg bool - noTiFlashComputeNode := config.GetGlobalConfig().DisaggregatedTiFlash && !isTiFlashComputeNodeAvailable(ctx) for i := len(paths) - 1; i >= 0; i-- { // availableEngineStr is for warning message. if _, ok := availableEngine[paths[i].StoreType]; !ok { @@ -1464,20 +1461,7 @@ func filterPathByIsolationRead(ctx sessionctx.Context, paths []*util.AccessPath, } availableEngineStr += paths[i].StoreType.Name() } - _, exists := isolationReadEngines[paths[i].StoreType] - // Prune this path if: - // 1. path.StoreType doesn't exists in isolationReadEngines or - // 2. TiFlash is disaggregated and the number of tiflash_compute node is zero. - shouldPruneTiFlashCompute := noTiFlashComputeNode && exists && paths[i].StoreType == kv.TiFlash - failpoint.Inject("testDisaggregatedTiFlashQuery", func(val failpoint.Value) { - // Ignore check if tiflash_compute node number. - // After we support disaggregated tiflash in test framework, can delete this failpoint. - shouldPruneTiFlashCompute = val.(bool) - }) - if shouldPruneTiFlashCompute { - outputComputeNodeErrMsg = true - } - if (!exists && paths[i].StoreType != kv.TiDB) || shouldPruneTiFlashCompute { + if _, ok := isolationReadEngines[paths[i].StoreType]; !ok && paths[i].StoreType != kv.TiDB { paths = append(paths[:i], paths[i+1:]...) } } @@ -1486,11 +1470,7 @@ func filterPathByIsolationRead(ctx sessionctx.Context, paths []*util.AccessPath, if len(paths) == 0 { helpMsg := "" if engineVals == "tiflash" { - if outputComputeNodeErrMsg { - helpMsg = ". Please check tiflash_compute node is available" - } else { - helpMsg = ". Please check tiflash replica or ensure the query is readonly" - } + helpMsg = ". Please check tiflash replica or ensure the query is readonly" } err = ErrInternal.GenWithStackByArgs(fmt.Sprintf("No access path for table '%s' is found with '%v' = '%v', valid values can be '%s'%s.", tblName.String(), variable.TiDBIsolationReadEngines, engineVals, availableEngineStr, helpMsg)) diff --git a/planner/core/rule_aggregation_push_down.go b/planner/core/rule_aggregation_push_down.go index c9326929b550f..24aef4161a8ec 100644 --- a/planner/core/rule_aggregation_push_down.go +++ b/planner/core/rule_aggregation_push_down.go @@ -405,6 +405,12 @@ func (a *aggregationPushDownSolver) pushAggCrossUnion(agg *LogicalAggregation, u if err != nil { return nil, err } + // Update mode of new generated firstRow as other agg funcs. 
+ if len(agg.AggFuncs) != 0 { + firstRow.Mode = agg.AggFuncs[0].Mode + } else { + firstRow.Mode = aggregation.Partial1Mode + } newAgg.AggFuncs = append(newAgg.AggFuncs, firstRow) } tmpSchema := expression.NewSchema(newAgg.GetGroupByCols()...) diff --git a/planner/core/task.go b/planner/core/task.go index 5d7ca6e5fd424..ff4e22756f15a 100644 --- a/planner/core/task.go +++ b/planner/core/task.go @@ -1163,6 +1163,12 @@ func (p *PhysicalUnionAll) attach2MppTasks(tasks ...task) task { func (p *PhysicalUnionAll) attach2Task(tasks ...task) task { for _, t := range tasks { if _, ok := t.(*mppTask); ok { + if p.TP() == plancodec.TypePartitionUnion { + // In attach2MppTasks(), will attach PhysicalUnion to mppTask directly. + // But PartitionUnion cannot pushdown to tiflash, so here disable PartitionUnion pushdown to tiflash explicitly. + // For now, return invalidTask immediately, we can refine this by letting childTask of PartitionUnion convert to rootTask. + return invalidTask + } return p.attach2MppTasks(tasks...) } } diff --git a/planner/core/testdata/plan_suite_in.json b/planner/core/testdata/plan_suite_in.json index d433f5dd88dbe..6f6e74fac3cfa 100644 --- a/planner/core/testdata/plan_suite_in.json +++ b/planner/core/testdata/plan_suite_in.json @@ -1186,5 +1186,13 @@ "select a, count(*) from t group by a -- shouldn't be rewritten", "select sum(a) from t -- sum shouldn't be rewritten" ] + }, + { + "name": "TestHashAggPushdownToTiFlashCompute", + "cases": [ + "select /*+ agg_to_cop() hash_agg() */ avg( distinct tbl_15.col_96 ) as r0 , min( tbl_15.col_92 ) as r1 , sum( distinct tbl_15.col_91 ) as r2 , max( tbl_15.col_92 ) as r3 from tbl_15 where tbl_15.col_94 != '2033-01-09' and tbl_15.col_93 > 7623.679908049186 order by r0,r1,r2,r3 limit 79 ;", + "select /*+ agg_to_cop() hash_agg() */ count(1) from tbl_15 ;", + "select /*+ agg_to_cop() stream_agg() */ avg( tbl_16.col_100 ) as r0 from tbl_16 where tbl_16.col_100 in ( 10672141 ) or tbl_16.col_104 in ( 'yfEG1t!*b' ,'C1*bqx_qyO' ,'vQ^yUpKHr&j#~' ) group by tbl_16.col_100 order by r0 limit 20 ;" + ] } ] diff --git a/planner/core/testdata/plan_suite_out.json b/planner/core/testdata/plan_suite_out.json index 31964823e95f2..14213e2223dab 100644 --- a/planner/core/testdata/plan_suite_out.json +++ b/planner/core/testdata/plan_suite_out.json @@ -7501,5 +7501,104 @@ "Warning": null } ] + }, + { + "Name": "TestHashAggPushdownToTiFlashCompute", + "Cases": [ + { + "SQL": "select /*+ agg_to_cop() hash_agg() */ avg( distinct tbl_15.col_96 ) as r0 , min( tbl_15.col_92 ) as r1 , sum( distinct tbl_15.col_91 ) as r2 , max( tbl_15.col_92 ) as r3 from tbl_15 where tbl_15.col_94 != '2033-01-09' and tbl_15.col_93 > 7623.679908049186 order by r0,r1,r2,r3 limit 79 ;", + "Plan": [ + "Limit 1.00 root offset:0, count:79", + "└─Sort 1.00 root Column#11, Column#12, Column#13, Column#14", + " └─HashAgg 1.00 root funcs:avg(distinct Column#89)->Column#11, funcs:min(Column#90)->Column#12, funcs:sum(distinct Column#91)->Column#13, funcs:max(Column#92)->Column#14", + " └─Projection 7100.44 root cast(test.tbl_15.col_96, decimal(10,0) UNSIGNED BINARY)->Column#89, Column#15, cast(test.tbl_15.col_91, decimal(3,0) UNSIGNED BINARY)->Column#91, Column#16", + " └─PartitionUnion 7100.44 root ", + " ├─TableReader 1775.11 root data:ExchangeSender", + " │ └─ExchangeSender 1775.11 mpp[tiflash] ExchangeType: PassThrough", + " │ └─HashAgg 1775.11 mpp[tiflash] group by:test.tbl_15.col_91, test.tbl_15.col_96, funcs:firstrow(test.tbl_15.col_96)->test.tbl_15.col_96, funcs:min(Column#18)->Column#15, 
funcs:firstrow(test.tbl_15.col_91)->test.tbl_15.col_91, funcs:max(Column#20)->Column#16, funcs:firstrow(test.tbl_15.col_96)->test.tbl_15.col_96, funcs:firstrow(test.tbl_15.col_91)->test.tbl_15.col_91", + " │ └─ExchangeReceiver 1775.11 mpp[tiflash] ", + " │ └─ExchangeSender 1775.11 mpp[tiflash] ExchangeType: HashPartition, Hash Cols: [name: test.tbl_15.col_96, collate: binary], [name: test.tbl_15.col_91, collate: binary]", + " │ └─HashAgg 1775.11 mpp[tiflash] group by:test.tbl_15.col_91, test.tbl_15.col_96, funcs:min(test.tbl_15.col_92)->Column#18, funcs:max(test.tbl_15.col_92)->Column#20", + " │ └─Selection 2218.89 mpp[tiflash] gt(test.tbl_15.col_93, 7623.679908049186), ne(test.tbl_15.col_94, 2033-01-09 00:00:00.000000)", + " │ └─TableFullScan 10000.00 mpp[tiflash] table:tbl_15, partition:p0 keep order:false, stats:pseudo", + " ├─TableReader 1775.11 root data:ExchangeSender", + " │ └─ExchangeSender 1775.11 mpp[tiflash] ExchangeType: PassThrough", + " │ └─HashAgg 1775.11 mpp[tiflash] group by:test.tbl_15.col_91, test.tbl_15.col_96, funcs:firstrow(test.tbl_15.col_96)->test.tbl_15.col_96, funcs:min(Column#30)->Column#15, funcs:firstrow(test.tbl_15.col_91)->test.tbl_15.col_91, funcs:max(Column#32)->Column#16, funcs:firstrow(test.tbl_15.col_96)->test.tbl_15.col_96, funcs:firstrow(test.tbl_15.col_91)->test.tbl_15.col_91", + " │ └─ExchangeReceiver 1775.11 mpp[tiflash] ", + " │ └─ExchangeSender 1775.11 mpp[tiflash] ExchangeType: HashPartition, Hash Cols: [name: test.tbl_15.col_96, collate: binary], [name: test.tbl_15.col_91, collate: binary]", + " │ └─HashAgg 1775.11 mpp[tiflash] group by:test.tbl_15.col_91, test.tbl_15.col_96, funcs:min(test.tbl_15.col_92)->Column#30, funcs:max(test.tbl_15.col_92)->Column#32", + " │ └─Selection 2218.89 mpp[tiflash] gt(test.tbl_15.col_93, 7623.679908049186), ne(test.tbl_15.col_94, 2033-01-09 00:00:00.000000)", + " │ └─TableFullScan 10000.00 mpp[tiflash] table:tbl_15, partition:p1 keep order:false, stats:pseudo", + " ├─TableReader 1775.11 root data:ExchangeSender", + " │ └─ExchangeSender 1775.11 mpp[tiflash] ExchangeType: PassThrough", + " │ └─HashAgg 1775.11 mpp[tiflash] group by:test.tbl_15.col_91, test.tbl_15.col_96, funcs:firstrow(test.tbl_15.col_96)->test.tbl_15.col_96, funcs:min(Column#42)->Column#15, funcs:firstrow(test.tbl_15.col_91)->test.tbl_15.col_91, funcs:max(Column#44)->Column#16, funcs:firstrow(test.tbl_15.col_96)->test.tbl_15.col_96, funcs:firstrow(test.tbl_15.col_91)->test.tbl_15.col_91", + " │ └─ExchangeReceiver 1775.11 mpp[tiflash] ", + " │ └─ExchangeSender 1775.11 mpp[tiflash] ExchangeType: HashPartition, Hash Cols: [name: test.tbl_15.col_96, collate: binary], [name: test.tbl_15.col_91, collate: binary]", + " │ └─HashAgg 1775.11 mpp[tiflash] group by:test.tbl_15.col_91, test.tbl_15.col_96, funcs:min(test.tbl_15.col_92)->Column#42, funcs:max(test.tbl_15.col_92)->Column#44", + " │ └─Selection 2218.89 mpp[tiflash] gt(test.tbl_15.col_93, 7623.679908049186), ne(test.tbl_15.col_94, 2033-01-09 00:00:00.000000)", + " │ └─TableFullScan 10000.00 mpp[tiflash] table:tbl_15, partition:p2 keep order:false, stats:pseudo", + " └─TableReader 1775.11 root data:ExchangeSender", + " └─ExchangeSender 1775.11 mpp[tiflash] ExchangeType: PassThrough", + " └─HashAgg 1775.11 mpp[tiflash] group by:test.tbl_15.col_91, test.tbl_15.col_96, funcs:firstrow(test.tbl_15.col_96)->test.tbl_15.col_96, funcs:min(Column#54)->Column#15, funcs:firstrow(test.tbl_15.col_91)->test.tbl_15.col_91, funcs:max(Column#56)->Column#16, funcs:firstrow(test.tbl_15.col_96)->test.tbl_15.col_96, 
funcs:firstrow(test.tbl_15.col_91)->test.tbl_15.col_91", + " └─ExchangeReceiver 1775.11 mpp[tiflash] ", + " └─ExchangeSender 1775.11 mpp[tiflash] ExchangeType: HashPartition, Hash Cols: [name: test.tbl_15.col_96, collate: binary], [name: test.tbl_15.col_91, collate: binary]", + " └─HashAgg 1775.11 mpp[tiflash] group by:test.tbl_15.col_91, test.tbl_15.col_96, funcs:min(test.tbl_15.col_92)->Column#54, funcs:max(test.tbl_15.col_92)->Column#56", + " └─Selection 2218.89 mpp[tiflash] gt(test.tbl_15.col_93, 7623.679908049186), ne(test.tbl_15.col_94, 2033-01-09 00:00:00.000000)", + " └─TableFullScan 10000.00 mpp[tiflash] table:tbl_15, partition:p3 keep order:false, stats:pseudo" + ], + "Warning": null + }, + { + "SQL": "select /*+ agg_to_cop() hash_agg() */ count(1) from tbl_15 ;", + "Plan": [ + "HashAgg 1.00 root funcs:count(Column#12)->Column#11", + "└─PartitionUnion 4.00 root ", + " ├─HashAgg 1.00 root funcs:count(Column#13)->Column#12", + " │ └─TableReader 1.00 root data:ExchangeSender", + " │ └─ExchangeSender 1.00 mpp[tiflash] ExchangeType: PassThrough", + " │ └─HashAgg 1.00 mpp[tiflash] funcs:count(test.tbl_15.col_91)->Column#13", + " │ └─TableFullScan 10000.00 mpp[tiflash] table:tbl_15, partition:p0 keep order:false, stats:pseudo", + " ├─HashAgg 1.00 root funcs:count(Column#14)->Column#12", + " │ └─TableReader 1.00 root data:ExchangeSender", + " │ └─ExchangeSender 1.00 mpp[tiflash] ExchangeType: PassThrough", + " │ └─HashAgg 1.00 mpp[tiflash] funcs:count(test.tbl_15.col_91)->Column#14", + " │ └─TableFullScan 10000.00 mpp[tiflash] table:tbl_15, partition:p1 keep order:false, stats:pseudo", + " ├─HashAgg 1.00 root funcs:count(Column#15)->Column#12", + " │ └─TableReader 1.00 root data:ExchangeSender", + " │ └─ExchangeSender 1.00 mpp[tiflash] ExchangeType: PassThrough", + " │ └─HashAgg 1.00 mpp[tiflash] funcs:count(test.tbl_15.col_91)->Column#15", + " │ └─TableFullScan 10000.00 mpp[tiflash] table:tbl_15, partition:p2 keep order:false, stats:pseudo", + " └─HashAgg 1.00 root funcs:count(Column#16)->Column#12", + " └─TableReader 1.00 root data:ExchangeSender", + " └─ExchangeSender 1.00 mpp[tiflash] ExchangeType: PassThrough", + " └─HashAgg 1.00 mpp[tiflash] funcs:count(test.tbl_15.col_91)->Column#16", + " └─TableFullScan 10000.00 mpp[tiflash] table:tbl_15, partition:p3 keep order:false, stats:pseudo" + ], + "Warning": null + }, + { + "SQL": "select /*+ agg_to_cop() stream_agg() */ avg( tbl_16.col_100 ) as r0 from tbl_16 where tbl_16.col_100 in ( 10672141 ) or tbl_16.col_104 in ( 'yfEG1t!*b' ,'C1*bqx_qyO' ,'vQ^yUpKHr&j#~' ) group by tbl_16.col_100 order by r0 limit 20 ;", + "Plan": [ + "TopN 20.00 root Column#10, offset:0, count:20", + "└─HashAgg 63.95 root group by:test.tbl_16.col_100, funcs:avg(Column#11, Column#12)->Column#10", + " └─PartitionUnion 63.95 root ", + " ├─StreamAgg 31.98 root group by:Column#22, funcs:count(Column#19)->Column#11, funcs:sum(Column#20)->Column#12, funcs:firstrow(Column#21)->test.tbl_16.col_100", + " │ └─Projection 39.97 root test.tbl_16.col_100, cast(test.tbl_16.col_100, decimal(8,0) UNSIGNED BINARY)->Column#20, test.tbl_16.col_100, test.tbl_16.col_100", + " │ └─Sort 39.97 root test.tbl_16.col_100", + " │ └─TableReader 39.97 root data:ExchangeSender", + " │ └─ExchangeSender 39.97 mpp[tiflash] ExchangeType: PassThrough", + " │ └─Selection 39.97 mpp[tiflash] or(eq(test.tbl_16.col_100, 10672141), in(test.tbl_16.col_104, \"yfEG1t!*b\", \"C1*bqx_qyO\", \"vQ^yUpKHr&j#~\"))", + " │ └─TableFullScan 10000.00 mpp[tiflash] table:tbl_16, partition:p0 keep order:false, 
stats:pseudo", + " └─StreamAgg 31.98 root group by:Column#26, funcs:count(Column#23)->Column#11, funcs:sum(Column#24)->Column#12, funcs:firstrow(Column#25)->test.tbl_16.col_100", + " └─Projection 39.97 root test.tbl_16.col_100, cast(test.tbl_16.col_100, decimal(8,0) UNSIGNED BINARY)->Column#24, test.tbl_16.col_100, test.tbl_16.col_100", + " └─Sort 39.97 root test.tbl_16.col_100", + " └─TableReader 39.97 root data:ExchangeSender", + " └─ExchangeSender 39.97 mpp[tiflash] ExchangeType: PassThrough", + " └─Selection 39.97 mpp[tiflash] or(eq(test.tbl_16.col_100, 10672141), in(test.tbl_16.col_104, \"yfEG1t!*b\", \"C1*bqx_qyO\", \"vQ^yUpKHr&j#~\"))", + " └─TableFullScan 10000.00 mpp[tiflash] table:tbl_16, partition:p1 keep order:false, stats:pseudo" + ], + "Warning": null + } + ] } ]