Skip to content
This repository has been archived by the owner on Dec 8, 2021. It is now read-only.

Commit

Permalink
Set session var for every new conn (#280)
Browse files Browse the repository at this point in the history
Note the previous `setSessionConcurrencyVars` will set one connection
from the db connection pool, so we can't make sure the session will take
affect later using the `sql.DB`.
  • Loading branch information
july2993 authored Mar 10, 2020
1 parent 619c2dc commit 093ce9f
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 39 deletions.
10 changes: 9 additions & 1 deletion lightning/common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"io/ioutil"
"net"
"net/http"
"net/url"
"os"
"reflect"
"regexp"
Expand Down Expand Up @@ -55,12 +56,19 @@ type MySQLConnectParam struct {
SQLMode string
MaxAllowedPacket uint64
TLS string
Vars map[string]string
}

func (param *MySQLConnectParam) ToDSN() string {
return fmt.Sprintf("%s:%s@tcp(%s:%d)/?charset=utf8mb4&sql_mode='%s'&maxAllowedPacket=%d&tls=%s",
dsn := fmt.Sprintf("%s:%s@tcp(%s:%d)/?charset=utf8mb4&sql_mode='%s'&maxAllowedPacket=%d&tls=%s",
param.User, param.Password, param.Host, param.Port,
param.SQLMode, param.MaxAllowedPacket, param.TLS)

for k, v := range param.Vars {
dsn += fmt.Sprintf("&%s=%s", k, url.QueryEscape(v))
}

return dsn
}

func (param *MySQLConnectParam) Connect() (*sql.DB, error) {
Expand Down
5 changes: 4 additions & 1 deletion lightning/common/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,11 @@ func (s *utilSuite) TestToDSN(c *C) {
SQLMode: "strict",
MaxAllowedPacket: 1234,
TLS: "cluster",
Vars: map[string]string{
"tidb_distsql_scan_concurrency": "1",
},
}
c.Assert(param.ToDSN(), Equals, "root:123456@tcp(127.0.0.1:4000)/?charset=utf8mb4&sql_mode='strict'&maxAllowedPacket=1234&tls=cluster")
c.Assert(param.ToDSN(), Equals, "root:123456@tcp(127.0.0.1:4000)/?charset=utf8mb4&sql_mode='strict'&maxAllowedPacket=1234&tls=cluster&tidb_distsql_scan_concurrency=1")
}

func (s *utilSuite) TestIsContextCanceledError(c *C) {
Expand Down
11 changes: 0 additions & 11 deletions lightning/restore/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -1002,8 +1002,6 @@ func (t *TableRestore) postProcess(ctx context.Context, rc *RestoreController, c
return nil
}

setSessionConcurrencyVars(ctx, rc.tidbMgr.db, rc.cfg.TiDB)

// 3. alter table set auto_increment
if cp.Status < CheckpointStatusAlteredAutoInc {
rc.alterTableLock.Lock()
Expand Down Expand Up @@ -1466,15 +1464,6 @@ type RemoteChecksum struct {
TotalBytes uint64
}

func setSessionConcurrencyVars(ctx context.Context, db *sql.DB, dsn config.DBStore) {
common.SQLWithRetry{DB: db, Logger: log.L()}.Exec(ctx, "set session concurrency variables", `SET
SESSION tidb_build_stats_concurrency = ?,
SESSION tidb_distsql_scan_concurrency = ?,
SESSION tidb_index_serial_scan_concurrency = ?,
SESSION tidb_checksum_table_concurrency = ?;
`, dsn.BuildStatsConcurrency, dsn.DistSQLScanConcurrency, dsn.IndexSerialScanConcurrency, dsn.ChecksumTableConcurrency)
}

// DoChecksum do checksum for tables.
// table should be in <db>.<table>, format. e.g. foo.bar
func DoChecksum(ctx context.Context, db *sql.DB, table string) (*RemoteChecksum, error) {
Expand Down
26 changes: 0 additions & 26 deletions lightning/restore/restore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,32 +264,6 @@ func (s *restoreSuite) TestDoChecksumWithErrorAndLongOriginalLifetime(c *C) {
c.Assert(mock.ExpectationsWereMet(), IsNil)
}

func (s *restoreSuite) TestSetSessionConcurrencyVars(c *C) {
db, mock, err := sqlmock.New()
c.Assert(err, IsNil)

mock.ExpectExec(
`SET\s+`+
`SESSION tidb_build_stats_concurrency = \?,\s+`+
`SESSION tidb_distsql_scan_concurrency = \?,\s+`+
`SESSION tidb_index_serial_scan_concurrency = \?,\s+`+
`SESSION tidb_checksum_table_concurrency = \?`).
WithArgs(123, 456, 789, 543).
WillReturnResult(sqlmock.NewResult(1, 4))
mock.ExpectClose()

ctx := context.Background()
setSessionConcurrencyVars(ctx, db, config.DBStore{
BuildStatsConcurrency: 123,
DistSQLScanConcurrency: 456,
IndexSerialScanConcurrency: 789,
ChecksumTableConcurrency: 543,
})

c.Assert(db.Close(), IsNil)
c.Assert(mock.ExpectationsWereMet(), IsNil)
}

var _ = Suite(&tableRestoreSuite{})

type tableRestoreSuite struct {
Expand Down
7 changes: 7 additions & 0 deletions lightning/restore/tidb.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"context"
"database/sql"
"fmt"
"strconv"
"strings"

"github.com/pingcap/errors"
Expand Down Expand Up @@ -49,6 +50,12 @@ func NewTiDBManager(dsn config.DBStore, tls *common.TLS) (*TiDBManager, error) {
SQLMode: dsn.StrSQLMode,
MaxAllowedPacket: dsn.MaxAllowedPacket,
TLS: dsn.TLS,
Vars: map[string]string{
"tidb_build_stats_concurrency": strconv.Itoa(dsn.BuildStatsConcurrency),
"tidb_distsql_scan_concurrency": strconv.Itoa(dsn.DistSQLScanConcurrency),
"tidb_index_serial_scan_concurrency": strconv.Itoa(dsn.IndexSerialScanConcurrency),
"tidb_checksum_table_concurrency": strconv.Itoa(dsn.ChecksumTableConcurrency),
},
}
db, err := param.Connect()
if err != nil {
Expand Down

0 comments on commit 093ce9f

Please sign in to comment.