Skip to content

Commit

Permalink
Merge branch 'master' into fix_index_merge_in_trans
Browse files Browse the repository at this point in the history
  • Loading branch information
guo-shaoge authored Dec 20, 2021
2 parents 4e8a87a + e1fb2f5 commit 97a24b1
Show file tree
Hide file tree
Showing 16 changed files with 368 additions and 139 deletions.
1 change: 1 addition & 0 deletions br/pkg/restore/pipeline_items.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,7 @@ func (b *tikvSender) restoreWorker(ctx context.Context, ranges <-chan drainResul
eg.Go(func() error {
e := b.client.RestoreFiles(ectx, files, r.result.RewriteRules, b.updateCh)
if e != nil {
r.done()
return e
}
log.Info("restore batch done", rtree.ZapRanges(r.result.Ranges))
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/storage/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
)

const (
localDirPerm os.FileMode = 0o755
localDirPerm os.FileMode = 0o777
localFilePerm os.FileMode = 0o644
// LocalURIPrefix represents the local storage prefix.
LocalURIPrefix = "file://"
Expand Down
41 changes: 9 additions & 32 deletions docs/design/2021-08-18-charsets.md
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,10 @@ After receiving the non-utf-8 character set request, this solution will convert
### Collation

Add gbk_chinese_ci and gbk_bin collations. In addition, considering the performance, we can add the collation of utf8mb4 (gbk_utf8mb4_bin).
- To support gbk_chinese_ci and gbk_bin collations, it needs to turn on the `new_collations_enabled_on_first_bootstrap` switch.
- If `new_collations_enabled_on_first_bootstrap` is off, it only supports gbk_utf8mb4_bin which does not need to be converted to gbk charset before processing.
- Implement the Collator and WildcardPattern interface functions for each collation.
- gbk_chinese_ci and gbk_bin need to convert utf-8 to gbk encoding and then generate a sort key. gbk_utf8mb4_bin does not need to be converted to gbk code for processing.
- gbk_chinese_ci and gbk_bin need to convert utf-8 to gbk encoding and then generate a sort key.
- Implement the corresponding functions in the Coprocessor.

### DDL
Expand All @@ -119,43 +121,18 @@ Other behaviors that need to be dealt with:
#### Compatibility between TiDB versions

- Upgrade compatibility:
- Upgrades from versions below 4.0 do not support gbk or any character sets other than the original five (binary, ascii, latin1, utf8, utf8mb4).
- Upgrade from version 4.0 or higher
- There may be compatibility issues when performing non-utf-8-related operations during the rolling upgrade.
- The new version of the cluster is expected to have no compatibility issues when reading old data.
- There may be compatibility issues when performing operations during the rolling upgrade.
- The new version of the cluster is expected to have no compatibility issues when reading old data.
- Downgrade compatibility:
- Downgrade is not compatible. The index key uses the table of gbk_bin/gbk_chinese_ci. The lower version of TiDB will have problems when decoding, and it needs to be transcoded before downgrading.

#### Compatibility with MySQL

Illegal character related issue:
- Illegal character related issue:
- Due to the internal conversion of non-utf-8-related encoding to utf8 for processing, it is not fully compatible with MySQL in some cases in terms of illegal character processing. TiDB controls its behavior through sql_mode.

```sql
create table t3(a char(10) charset gbk);
insert into t3 values ('a');
// 0xcee5 is a valid gbk hex literal but an invalid utf8mb4 hex literal.
select hex(concat(a, 0xcee5)) from t3;
-- mysql 61cee5
// 0xe4b880 is an invalid gbk hex literal but valid utf8mb4 hex literal.
select hex(concat(a, 0xe4b880)) from t3;
-- mysql 61e4b880 (test on mysql 5.7 and 8.0.22)
-- mysql returns "Cannot convert string '\x80' from binary to gbk" (test on mysql 8.0.25 and 8.0.26). TiDB will be compatible with this behavior.
// 0x80 is a hex literal that is invalid for both gbk and utf8mb4.
select hex(concat(a, 0x80)) from t3;
-- mysql 6180 (test on mysql 5.7 and 8.0.22)
-- mysql returns "Cannot convert string '\x80' from binary to gbk" (test on mysql 8.0.25 and 8.0.26). TiDB will be compatible with this behavior.
set @@sql_mode = '';
insert into t3 values (0x80);
-- mysql gets a warning and inserts null values (warning: "Incorrect string value: '\x80' for column 'a' at row 1")
set @@sql_mode = 'STRICT_TRANS_TABLES';
insert into t3 values (0x80);
-- mysql returns "Incorrect string value: '\x80' for column 'a' at row 1"
```
- Collation
- Fully support `gbk_bin` and `gbk_chinese_ci` only when the config `new_collations_enabled_on_first_bootstrap` is enabled. Otherwise, it only supports gbk_utf8mb4_bin.

#### Compatibility with other components

Expand Down
28 changes: 27 additions & 1 deletion executor/set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (s *testSerialSuite1) TestSetVar(c *C) {
tk.MustQuery(`select @@global.low_priority_updates;`).Check(testkit.Rows("0"))
tk.MustExec(`set @@global.low_priority_updates="ON";`)
tk.MustQuery(`select @@global.low_priority_updates;`).Check(testkit.Rows("1"))
tk.MustExec(`set @@global.low_priority_updates=DEFAULT;`) // It will be set to compiled-in default value.
tk.MustExec(`set @@global.low_priority_updates=DEFAULT;`) // It will be set to default var value.
tk.MustQuery(`select @@global.low_priority_updates;`).Check(testkit.Rows("0"))
// For session
tk.MustQuery(`select @@session.low_priority_updates;`).Check(testkit.Rows("0"))
Expand Down Expand Up @@ -1387,6 +1387,32 @@ func (s *testSuite5) TestEnableNoopFunctionsVar(c *C) {

}

// https://github.com/pingcap/tidb/issues/29670
// TestDefaultBehavior verifies `SET ... = DEFAULT` semantics:
// a session-scoped `SET var = DEFAULT` restores the *current GLOBAL* value,
// while `SET GLOBAL var = DEFAULT` restores the compiled-in default.
// It also checks that the quoted string 'DEFAULT' is rejected for a
// sysvar with validation (sql_mode), whereas the bare keyword DEFAULT works.
// NOTE: statement order matters throughout — each assertion depends on the
// global value set by the statements before it.
func (s *testSuite5) TestDefaultBehavior(c *C) {
tk := testkit.NewTestKit(c, s.store)

// Compiled-in default for default_storage_engine is InnoDB.
tk.MustQuery("SELECT @@default_storage_engine").Check(testkit.Rows("InnoDB"))
tk.MustExec("SET GLOBAL default_storage_engine = 'somethingweird'")
tk.MustExec("SET default_storage_engine = 'MyISAM'")
tk.MustQuery("SELECT @@default_storage_engine").Check(testkit.Rows("MyISAM"))
tk.MustExec("SET default_storage_engine = DEFAULT") // reads from global value
tk.MustQuery("SELECT @@default_storage_engine").Check(testkit.Rows("somethingweird"))
tk.MustExec("SET @@SESSION.default_storage_engine = @@GLOBAL.default_storage_engine") // example from MySQL manual
tk.MustQuery("SELECT @@default_storage_engine").Check(testkit.Rows("somethingweird"))
tk.MustExec("SET GLOBAL default_storage_engine = 'somethingweird2'")
tk.MustExec("SET default_storage_engine = @@GLOBAL.default_storage_engine") // variation of example
tk.MustQuery("SELECT @@default_storage_engine").Check(testkit.Rows("somethingweird2"))
tk.MustExec("SET default_storage_engine = DEFAULT") // restore default again for session global
tk.MustExec("SET GLOBAL default_storage_engine = DEFAULT") // restore default for global
// Session DEFAULT was applied while the global was still 'somethingweird2';
// the global DEFAULT afterwards restores the compiled-in InnoDB.
tk.MustQuery("SELECT @@SESSION.default_storage_engine, @@GLOBAL.default_storage_engine").Check(testkit.Rows("somethingweird2 InnoDB"))

// Try sql_mode option which has validation
err := tk.ExecToErr("SET GLOBAL sql_mode = 'DEFAULT'") // illegal now
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, `ERROR 1231 (42000): Variable 'sql_mode' can't be set to the value of 'DEFAULT'`)
tk.MustExec("SET GLOBAL sql_mode = DEFAULT")
}

func (s *testSuite5) TestRemovedSysVars(c *C) {
tk := testkit.NewTestKit(c, s.store)

Expand Down
2 changes: 1 addition & 1 deletion expression/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func TestGetTimeValue(t *testing.T) {
require.Equal(t, "2012-12-12 00:00:00", timeValue.String())

sessionVars := ctx.GetSessionVars()
err = variable.SetSessionSystemVar(sessionVars, "timestamp", "default")
err = variable.SetSessionSystemVar(sessionVars, "timestamp", "0")
require.NoError(t, err)
v, err = GetTimeValue(ctx, "2012-12-12 00:00:00", mysql.TypeTimestamp, types.MinFsp)
require.NoError(t, err)
Expand Down
8 changes: 7 additions & 1 deletion server/tidb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1536,7 +1536,13 @@ func TestTopSQLAgent(t *testing.T) {
dbt.MustExec("set @@global.tidb_top_sql_report_interval_seconds=2;")
dbt.MustExec("set @@global.tidb_top_sql_max_statement_count=5;")

r := reporter.NewRemoteTopSQLReporter(reporter.NewSingleTargetDataSink(), plancodec.DecodeNormalizedPlan)
r := reporter.NewRemoteTopSQLReporter(plancodec.DecodeNormalizedPlan)
s := reporter.NewSingleTargetDataSink(r)
defer func() {
r.Close()
s.Close()
}()

tracecpu.GlobalSQLCPUProfiler.SetCollector(&collectorWrapper{r})

// TODO: change to ensure that the right sql statements are reported, not just counts
Expand Down
4 changes: 2 additions & 2 deletions session/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -1764,8 +1764,8 @@ func doDMLWorks(s Session) {
// Init global system variables table.
values := make([]string, 0, len(variable.GetSysVars()))
for k, v := range variable.GetSysVars() {
// Session only variable should not be inserted.
if v.Scope != variable.ScopeSession {
// Only global variables should be inserted.
if v.HasGlobalScope() {
vVal := v.Value
if v.Name == variable.TiDBTxnMode && config.GetGlobalConfig().Store == "tikv" {
vVal = "pessimistic"
Expand Down
2 changes: 1 addition & 1 deletion session/bootstrap_serial_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func TestBootstrap(t *testing.T) {
func globalVarsCount() int64 {
var count int64
for _, v := range variable.GetSysVars() {
if v.Scope != variable.ScopeSession {
if v.HasGlobalScope() {
count++
}
}
Expand Down
6 changes: 0 additions & 6 deletions sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,12 +120,6 @@ var defaultSysVars = []*SysVar{
}
timestamp := s.StmtCtx.GetOrStoreStmtCache(stmtctx.StmtNowTsCacheKey, time.Now()).(time.Time)
return types.ToString(float64(timestamp.UnixNano()) / float64(time.Second))
}, GetGlobal: func(s *SessionVars) (string, error) {
// The Timestamp sysvar will have GetGlobal func even though it does not have global scope.
// It's GetGlobal func will only be called when "set timestamp = default".
// Setting timestamp to DEFAULT causes its value to be the current date and time as of the time it is accessed.
// See https://dev.mysql.com/doc/refman/8.0/en/server-system-variables.html#sysvar_timestamp
return DefTimestamp, nil
}},
{Scope: ScopeGlobal | ScopeSession, Name: CollationDatabase, Value: mysql.DefaultCollationName, skipInit: true, Validation: func(vars *SessionVars, normalizedValue string, originalValue string, scope ScopeFlag) (string, error) {
return checkCollation(vars, normalizedValue, originalValue, scope)
Expand Down
5 changes: 0 additions & 5 deletions sessionctx/variable/variable.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,11 +261,6 @@ func (sv *SysVar) Validate(vars *SessionVars, value string, scope ScopeFlag) (st

// ValidateFromType provides automatic validation based on the SysVar's type
func (sv *SysVar) ValidateFromType(vars *SessionVars, value string, scope ScopeFlag) (string, error) {
// The string "DEFAULT" is a special keyword in MySQL, which restores
// the compiled sysvar value. In which case we can skip further validation.
if strings.EqualFold(value, "DEFAULT") {
return sv.Value, nil
}
// Some sysvars in TiDB have a special behavior where the empty string means
// "use the config file value". This needs to be cleaned up once the behavior
// for instance variables is determined.
Expand Down
12 changes: 2 additions & 10 deletions util/topsql/reporter/datasink.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,6 @@ type DataSink interface {
// the specified deadline, or the sink is closed, an error will be returned.
TrySend(data *ReportData, deadline time.Time) error

// IsPaused indicates that the DataSink is not expecting to receive records for now
// and may resume in the future.
IsPaused() bool

// IsDown indicates that the DataSink has been down and can be cleared.
// Note that: once a DataSink is down, it cannot go back to be up.
IsDown() bool

// Close cleans up resources owned by this DataSink
Close()
// OnReporterClosing notifies DataSink that the reporter is closing.
OnReporterClosing()
}
90 changes: 80 additions & 10 deletions util/topsql/reporter/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package reporter
import (
"bytes"
"context"
"errors"
"sort"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -53,6 +54,12 @@ type TopSQLReporter interface {
Close()
}

// DataSinkRegisterer is for registering DataSink
type DataSinkRegisterer interface {
	// Register adds dataSink so that it starts receiving reports.
	// NOTE(review): the RemoteTopSQLReporter implementation returns an error
	// when the reporter is closed or when too many sinks are registered —
	// confirm other implementations follow the same contract.
	Register(dataSink DataSink) error
	// Deregister removes a previously registered dataSink.
	Deregister(dataSink DataSink)
}

type cpuData struct {
timestamp uint64
records []tracecpu.SQLCPUTimeRecord
Expand Down Expand Up @@ -118,9 +125,11 @@ type planBinaryDecodeFunc func(string) (string, error)
// RemoteTopSQLReporter implements a TopSQL reporter that sends data to a remote agent
// This should be called periodically to collect TopSQL resource usage metrics
type RemoteTopSQLReporter struct {
ctx context.Context
cancel context.CancelFunc
dataSink DataSink
ctx context.Context
cancel context.CancelFunc

dataSinkMu sync.Mutex
dataSinks map[DataSink]struct{}

// normalizedSQLMap is an map, whose keys are SQL digest strings and values are SQLMeta.
normalizedSQLMap atomic.Value // sync.Map
Expand Down Expand Up @@ -148,12 +157,14 @@ type SQLMeta struct {
//
// planBinaryDecoder is a decoding function which will be called asynchronously to decode the plan binary to string
// MaxStatementsNum is the maximum SQL and plan number, which will restrict the memory usage of the internal LFU cache
func NewRemoteTopSQLReporter(dataSink DataSink, decodePlan planBinaryDecodeFunc) *RemoteTopSQLReporter {
func NewRemoteTopSQLReporter(decodePlan planBinaryDecodeFunc) *RemoteTopSQLReporter {
ctx, cancel := context.WithCancel(context.Background())
tsr := &RemoteTopSQLReporter{
ctx: ctx,
cancel: cancel,
dataSink: dataSink,
ctx: ctx,
cancel: cancel,

dataSinks: make(map[DataSink]struct{}, 10),

collectCPUDataChan: make(chan cpuData, 1),
reportCollectedDataChan: make(chan collectedData, 1),
decodePlan: decodePlan,
Expand Down Expand Up @@ -222,6 +233,47 @@ func (tsr *RemoteTopSQLReporter) RegisterPlan(planDigest []byte, normalizedBinar
}
}

var _ DataSinkRegisterer = &RemoteTopSQLReporter{}

// Register implements DataSinkRegisterer interface.
//
// It adds dataSink to the reporter's sink set and turns the Top SQL
// feature on. It returns an error if the reporter has already been
// closed or if the maximum number of sinks is registered.
func (tsr *RemoteTopSQLReporter) Register(dataSink DataSink) error {
	tsr.dataSinkMu.Lock()
	defer tsr.dataSinkMu.Unlock()

	// Reject registrations once Close has cancelled the context.
	select {
	case <-tsr.ctx.Done():
		return errors.New("reporter is closed")
	default:
	}

	// 10 matches the initial capacity chosen in NewRemoteTopSQLReporter.
	if len(tsr.dataSinks) >= 10 {
		return errors.New("too many datasinks")
	}
	tsr.dataSinks[dataSink] = struct{}{}

	// The map is necessarily non-empty right after the insert above, so the
	// previous `if len(tsr.dataSinks) > 0` guard was always true; enable
	// Top SQL unconditionally.
	variable.TopSQLVariable.Enable.Store(true)
	return nil
}

// Deregister implements DataSinkRegisterer interface.
//
// It removes dataSink from the reporter and disables the Top SQL feature
// once no sinks remain. Calling it after the reporter is closed is a no-op.
func (tsr *RemoteTopSQLReporter) Deregister(dataSink DataSink) {
	tsr.dataSinkMu.Lock()
	defer tsr.dataSinkMu.Unlock()

	// After Close, the sink set has been handed off; nothing to do.
	select {
	case <-tsr.ctx.Done():
		return
	default:
	}

	delete(tsr.dataSinks, dataSink)
	if len(tsr.dataSinks) == 0 {
		variable.TopSQLVariable.Enable.Store(false)
	}
}

// Collect receives CPU time records for processing. WARN: It will drop the records if the processing is not in time.
// This function is thread-safe and efficient.
func (tsr *RemoteTopSQLReporter) Collect(timestamp uint64, records []tracecpu.SQLCPUTimeRecord) {
Expand All @@ -242,7 +294,15 @@ func (tsr *RemoteTopSQLReporter) Collect(timestamp uint64, records []tracecpu.SQ
// Close releases the reporter's resources and notifies every registered
// DataSink that the reporter is closing.
func (tsr *RemoteTopSQLReporter) Close() {
	tsr.cancel()

	// Swap the sink set out under the lock; the callbacks below are
	// invoked without holding the mutex.
	tsr.dataSinkMu.Lock()
	sinks := tsr.dataSinks
	tsr.dataSinks = make(map[DataSink]struct{})
	tsr.dataSinkMu.Unlock()

	for sink := range sinks {
		sink.OnReporterClosing()
	}
}

func addEvictedCPUTime(collectTarget map[string]*dataPoints, timestamp uint64, totalCPUTimeMs uint32) {
Expand Down Expand Up @@ -585,7 +645,17 @@ func (tsr *RemoteTopSQLReporter) doReport(data *ReportData) {
}
})
deadline := time.Now().Add(timeout)
if err := tsr.dataSink.TrySend(data, deadline); err != nil {
logutil.BgLogger().Warn("[top-sql] failed to send data to datasink", zap.Error(err))

tsr.dataSinkMu.Lock()
dataSinks := make([]DataSink, 0, len(tsr.dataSinks))
for ds := range tsr.dataSinks {
dataSinks = append(dataSinks, ds)
}
tsr.dataSinkMu.Unlock()

for _, ds := range dataSinks {
if err := ds.TrySend(data, deadline); err != nil {
logutil.BgLogger().Warn("[top-sql] failed to send data to datasink", zap.Error(err))
}
}
}
Loading

0 comments on commit 97a24b1

Please sign in to comment.