Skip to content

Commit

Permalink
*: parse the data source directly into data and skip the KV encoder (p…
Browse files Browse the repository at this point in the history
…ingcap#145)

* *: parse the data source directly into data and skip the KV encoder

This skips the more complex pingcap/parser and speeds up parsing by
50%.

We have also refactored the KV delivery mechanism to use channels
directly, and revamped metrics:

- Make the metrics about engines into its own `engines` counter. The
  `tables` counter is exclusively about tables now.
- Removed `block_read_seconds`, `block_read_bytes`, `block_encode_seconds`
  since the concept of "block" no longer applies. Replaced by the
  equivalents named `row_***`.
- Removed `chunk_parser_read_row_seconds` because it overlaps with
  `row_read_seconds`.
- Changed `block_deliver_bytes` into a histogram vec, with kind=index
  or kind=data. Introduced `block_deliver_kv_pairs`.

* tests,restore: prevent spurious error in checkpoint_chunks test

Only kill Lightning if the whole chunk is imported exactly. The chunk
checkpoint may be recorded before a chunk is fully written, and this will
hit the failpoint more than 5 times.

* kv: use composed interface to simplify some types

* kv: properly handle the SQL mode

* common: disable IsContextCanceledError() when log level = debug

This helps debugging some mysterious cancellation where the log is
inhibited.

Added IsReallyContextCanceledError() for code logic affected by error
type.

* restore: made some logs more detailed

* restore: made the SlowDownImport failpoint apply to index engines too

* restore: do not open a write stream when there are no KV pairs to send

* tests: ensure we drop the checkpoints DB before re-run

* mydump: fixed various off-by-one errors in the CSV parser

* *: rename `!IsContextCanceledError` to `ShouldLogError`

* *: addressed comments

* restore: zero the checksums and column permutations on initialization

* *: addressed comments

* tests: add back a missing license header

* tests: improve a comment.
  • Loading branch information
kennytm authored Apr 3, 2019
1 parent 82ed51b commit 7bae12e
Show file tree
Hide file tree
Showing 42 changed files with 4,715 additions and 3,091 deletions.
7 changes: 6 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,13 @@ require (
github.com/coreos/go-semver v0.2.0
github.com/coreos/go-systemd v0.0.0-20181031085051-9002847aa142 // indirect
github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f // indirect
github.com/cznic/golex v0.0.0-20181122101858-9c343928389c // indirect
github.com/cznic/mathutil v0.0.0-20181021201202-eba54fb065b7
github.com/cznic/parser v0.0.0-20181122101858-d773202d5b1f
github.com/cznic/sortutil v0.0.0-20150617083342-4c7342852e65
github.com/cznic/strutil v0.0.0-20181122101858-275e90344537
github.com/cznic/y v0.0.0-20181122101901-b05e8c2e8d7b
github.com/dustin/go-humanize v1.0.0 // indirect
github.com/etcd-io/gofail v0.0.0-20180808172546-51ce9a71510a
github.com/go-sql-driver/mysql v1.4.1
github.com/gogo/protobuf v1.2.0
github.com/gorilla/context v1.1.1 // indirect
Expand All @@ -23,6 +27,7 @@ require (
github.com/pingcap/parser v0.0.0-20190305073013-4f60445a0550
github.com/pingcap/tidb v0.0.0-20190309032432-ea9970968c73
github.com/pingcap/tidb-tools v2.1.3-0.20190116051332-34c808eef588+incompatible
github.com/pingcap/tipb v0.0.0-20180910045846-371b48b15d93
github.com/prometheus/client_golang v0.9.2
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910
github.com/remyoudompheng/bigfft v0.0.0-20170806203942-52369c62f446 // indirect
Expand Down
44 changes: 44 additions & 0 deletions go.sum

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion lightning/common/once_error.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func (oe *OnceError) Set(tag string, e error) {
oe.err = e
}
oe.lock.Unlock()
if !IsContextCanceledError(e) {
if ShouldLogError(e) {
AppLogger.Errorf("[%s] error %v", tag, e)
}
}
Expand Down
32 changes: 28 additions & 4 deletions lightning/common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/go-sql-driver/mysql"
"github.com/pingcap/errors"
tmysql "github.com/pingcap/parser/mysql"
"github.com/sirupsen/logrus"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
Expand Down Expand Up @@ -102,7 +103,7 @@ func TransactWithRetry(ctx context.Context, db *sql.DB, purpose string, action f
if IsRetryableError(err) {
continue
}
if !IsContextCanceledError(err) {
if ShouldLogError(err) {
AppLogger.Errorf("transaction %s [error] %v", purpose, err)
}
return errors.Trace(err)
Expand All @@ -125,7 +126,7 @@ func transactImpl(ctx context.Context, db *sql.DB, purpose string, action func(c
AppLogger.Warnf("transaction %s [error]%v", purpose, err)
rerr := txn.Rollback()
if rerr != nil {
if !IsContextCanceledError(rerr) {
if ShouldLogError(rerr) {
AppLogger.Errorf("transaction %s [error] %v", purpose, rerr)
}
}
Expand Down Expand Up @@ -181,9 +182,32 @@ func IsRetryableError(err error) bool {
}
}

// ShouldLogError reports whether err is worth writing to the log.
//
// It should only be used for inhibiting logs related to canceling, where the
// log is usually just noise.
//
// The result is `false` in exactly two situations:
//
//   - err is nil; or
//   - err satisfies IsContextCanceledError AND the "debug" log level is not
//     enabled on AppLogger.
//
// When the "debug" level is enabled, every non-nil error is reported,
// including cancellation errors.
func ShouldLogError(err error) bool {
	switch {
	case err == nil:
		return false
	case AppLogger.IsLevelEnabled(logrus.DebugLevel):
		// Debug logging overrides the cancellation filter.
		return true
	default:
		return !IsContextCanceledError(err)
	}
}

// IsContextCanceledError returns whether the error is caused by context
// cancellation. This function returns `false` (not a context-canceled error) if
// `err == nil`.
// cancellation. This function should only be used when the code logic is
// affected by whether the error is canceling or not. Normally, you should
// simply use ShouldLogError.
//
// This function returns `false` (not a context-canceled error) if `err == nil`.
func IsContextCanceledError(err error) bool {
err = errors.Cause(err)
return err == context.Canceled || status.Code(err) == codes.Canceled
Expand Down
12 changes: 10 additions & 2 deletions lightning/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"github.com/BurntSushi/toml"
"github.com/pingcap/errors"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb-lightning/lightning/common"
"github.com/pingcap/tidb-tools/pkg/filter"
)
Expand All @@ -41,9 +42,11 @@ type DBStore struct {
Psw string `toml:"password" json:"-"`
StatusPort int `toml:"status-port" json:"status-port"`
PdAddr string `toml:"pd-addr" json:"pd-addr"`
SQLMode string `toml:"sql-mode" json:"sql-mode"`
StrSQLMode string `toml:"sql-mode" json:"sql-mode"`
LogLevel string `toml:"log-level" json:"log-level"`

SQLMode mysql.SQLMode `toml:"-" json:"-"`

DistSQLScanConcurrency int `toml:"distsql-scan-concurrency" json:"distsql-scan-concurrency"`
BuildStatsConcurrency int `toml:"build-stats-concurrency" json:"build-stats-concurrency"`
IndexSerialScanConcurrency int `toml:"index-serial-scan-concurrency" json:"index-serial-scan-concurrency"`
Expand Down Expand Up @@ -161,7 +164,7 @@ func NewConfig() *Config {
CheckRequirements: true,
},
TiDB: DBStore{
SQLMode: "STRICT_TRANS_TABLES,NO_ENGINE_SUBSTITUTION",
StrSQLMode: mysql.DefaultSQLMode,
BuildStatsConcurrency: 20,
DistSQLScanConcurrency: 100,
IndexSerialScanConcurrency: 20,
Expand Down Expand Up @@ -226,6 +229,11 @@ func (cfg *Config) Load() error {
return errors.New("invalid config: `mydumper.csv.delimiter` must be one byte long or empty")
}

cfg.TiDB.SQLMode, err = mysql.GetSQLMode(cfg.TiDB.StrSQLMode)
if err != nil {
return errors.Annotate(err, "invalid config: `mydumper.tidb.sql_mode` must be a valid SQL_MODE")
}

// handle mydumper
if cfg.Mydumper.BatchSize <= 0 {
cfg.Mydumper.BatchSize = 100 * _G
Expand Down
26 changes: 9 additions & 17 deletions lightning/kv/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,25 +13,24 @@

package kv

import "sync/atomic"
import (
"sync/atomic"

"github.com/pingcap/tidb/meta/autoid"
)

// PanickingAllocator is an ID allocator which panics on all operations except
// Rebase and Base.
type PanickingAllocator struct {
// Allocator is embedded so that the type carries the full method set of
// autoid.Allocator. NewPanickingAllocator leaves it nil, so any method that
// is not explicitly defined on *PanickingAllocator panics when called.
autoid.Allocator
// base is the current base value. Base reads it with atomic.LoadInt64.
base int64
}

// NewPanickingAllocator returns a freshly allocated PanickingAllocator whose
// base starts at the given value. All other fields keep their zero value.
func NewPanickingAllocator(base int64) *PanickingAllocator {
	alloc := new(PanickingAllocator)
	alloc.base = base
	return alloc
}

// Alloc always panics: allocating IDs is not supported by PanickingAllocator.
func (alloc *PanickingAllocator) Alloc(int64) (int64, error) {
panic("unexpected Alloc() call")
}

// Reset always panics: resetting the base is not supported by
// PanickingAllocator. The newBase argument is ignored.
func (alloc *PanickingAllocator) Reset(newBase int64) {
panic("unexpected Reset() call")
}

// Rebase implements the autoid.Allocator interface
func (alloc *PanickingAllocator) Rebase(tableID, newBase int64, allocIDs bool) error {
// CAS
for {
Expand All @@ -46,14 +45,7 @@ func (alloc *PanickingAllocator) Rebase(tableID, newBase int64, allocIDs bool) e
return nil
}

// Base implements the autoid.Allocator interface. It atomically loads the
// allocator's current base value and returns it.
func (alloc *PanickingAllocator) Base() int64 {
	current := atomic.LoadInt64(&alloc.base)
	return current
}

// End always panics: querying the end of the allocated range is not supported
// by PanickingAllocator.
func (alloc *PanickingAllocator) End() int64 {
panic("unexpected End() call")
}

// NextGlobalAutoID always panics: PanickingAllocator does not support this
// operation. The tableID argument is ignored.
func (alloc *PanickingAllocator) NextGlobalAutoID(tableID int64) (int64, error) {
panic("unexpected NextGlobalAutoID() call")
}
15 changes: 10 additions & 5 deletions lightning/kv/importer.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,13 +178,13 @@ func (importer *Importer) OpenEngine(
return nil, errors.Trace(err)
}

openCounter := metric.EngineCounter.WithLabelValues("open")
openCounter := metric.ImporterEngineCounter.WithLabelValues("open")
openCounter.Inc()
common.AppLogger.Infof("[%s] open engine %s", tag, engineUUID)

// gofail: var FailIfEngineCountExceeds int
// {
// closedCounter := metric.EngineCounter.WithLabelValues("closed")
// closedCounter := metric.ImporterEngineCounter.WithLabelValues("closed")
// openCount := metric.ReadCounter(openCounter)
// closedCount := metric.ReadCounter(closedCounter)
// if openCount - closedCount > float64(FailIfEngineCountExceeds) {
Expand Down Expand Up @@ -276,7 +276,7 @@ func (stream *WriteStream) Put(kvs []kvec.KvPair) error {
// Close the write stream.
func (stream *WriteStream) Close() error {
if _, err := stream.wstream.CloseAndRecv(); err != nil {
if !common.IsContextCanceledError(err) {
if common.ShouldLogError(err) {
common.AppLogger.Errorf("[%s] close write stream cause failed : %v", stream.engine.tag, err)
}
return errors.Trace(err)
Expand All @@ -303,7 +303,7 @@ func (engine *OpenedEngine) Close(ctx context.Context) (*ClosedEngine, error) {
return nil, errors.Trace(err)
}
common.AppLogger.Infof("[%s] [%s] engine close takes %v", engine.tag, engine.uuid, time.Since(timer))
metric.EngineCounter.WithLabelValues("closed").Inc()
metric.ImporterEngineCounter.WithLabelValues("closed").Inc()
return closedEngine, nil
}

Expand Down Expand Up @@ -350,7 +350,7 @@ func (engine *ClosedEngine) Import(ctx context.Context) error {
if !common.IsRetryableError(err) {
if err == nil {
common.AppLogger.Infof("[%s] [%s] import takes %v", engine.tag, engine.uuid, time.Since(timer))
} else if !common.IsContextCanceledError(err) {
} else if common.ShouldLogError(err) {
common.AppLogger.Errorf("[%s] [%s] import failed and cannot retry, err %v", engine.tag, engine.uuid, err)
}
return errors.Trace(err)
Expand All @@ -373,3 +373,8 @@ func (engine *ClosedEngine) Cleanup(ctx context.Context) error {
common.AppLogger.Infof("[%s] [%s] cleanup takes %v", engine.tag, engine.uuid, time.Since(timer))
return errors.Trace(err)
}

// Tag gets an identification string of this engine for logging.
func (engine *ClosedEngine) Tag() string {
return engine.tag
}
86 changes: 86 additions & 0 deletions lightning/kv/session.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package kv

import (
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
kvec "github.com/pingcap/tidb/util/kvencoder"
)

// transaction is a trimmed down Transaction type which only supports adding a
// new KV pair.
//
// The kv.Transaction interface is embedded only to supply the full method set;
// newSession never assigns it, so calling any kv.Transaction method that is
// not explicitly defined on *transaction dereferences a nil interface and
// panics.
type transaction struct {
kv.Transaction
// kvPairs collects every pair passed to Set, in call order, until it is
// handed over by session.takeKvPairs.
kvPairs []kvec.KvPair
}

// Set implements the kv.Transaction interface. Instead of writing to a store,
// it appends the pair to t.kvPairs. The key is cloned and the value is copied
// into a fresh slice, so the recorded pair does not alias the caller's
// buffers. The returned error is always nil.
func (t *transaction) Set(k kv.Key, v []byte) error {
	key := k.Clone()
	val := append([]byte{}, v...)
	t.kvPairs = append(t.kvPairs, kvec.KvPair{Key: key, Val: val})
	return nil
}

// SetOption implements the kv.Transaction interface. It is a no-op: both the
// option and its value are ignored.
func (t *transaction) SetOption(opt kv.Option, val interface{}) {}

// DelOption implements the kv.Transaction interface. It is a no-op: the option
// is ignored.
func (t *transaction) DelOption(kv.Option) {}

// session is a trimmed down Session type which only wraps our own trimmed-down
// transaction type and provides the session variables to the TiDB library
// optimized for Lightning.
//
// The sessionctx.Context interface is embedded only to supply the full method
// set; newSession never assigns it, so calling any method that is not
// explicitly defined on *session dereferences a nil interface and panics.
type session struct {
sessionctx.Context
// txn is the transaction returned by Txn; it accumulates the KV pairs.
txn transaction
// vars holds the session variables returned by GetSessionVars.
vars *variable.SessionVars
}

// newSession creates a session whose session variables are derived from the
// given SQL mode.
//
// LightningMode and SkipUTF8Check are switched on, and the statement context
// is marked as being inside an INSERT statement. When sqlMode is not strict,
// bad NULLs, truncations and overflows are downgraded to warnings. Invalid
// dates are allowed when sqlMode has the allow-invalid-dates mode, and zero
// in dates is ignored when sqlMode is either non-strict or allows invalid
// dates. The statement context's time zone is taken from the session
// variables' location.
func newSession(sqlMode mysql.SQLMode) *session {
	sessVars := variable.NewSessionVars()
	sessVars.LightningMode = true
	sessVars.SkipUTF8Check = true

	// Evaluate the SQL mode predicates once; they are reused below.
	lenient := !sqlMode.HasStrictMode()
	allowInvalidDates := sqlMode.HasAllowInvalidDatesMode()

	sessVars.StmtCtx.InInsertStmt = true
	sessVars.StmtCtx.BadNullAsWarning = lenient
	sessVars.StmtCtx.TruncateAsWarning = lenient
	sessVars.StmtCtx.OverflowAsWarning = lenient
	sessVars.StmtCtx.AllowInvalidDate = allowInvalidDates
	sessVars.StmtCtx.IgnoreZeroInDate = lenient || allowInvalidDates
	sessVars.StmtCtx.TimeZone = sessVars.Location()

	se := new(session)
	se.vars = sessVars
	return se
}

// takeKvPairs hands over all KV pairs accumulated in the session's transaction
// and replaces the transaction's buffer with a new empty slice whose capacity
// equals the number of pairs just taken.
func (se *session) takeKvPairs() []kvec.KvPair {
	taken := se.txn.kvPairs
	replacement := make([]kvec.KvPair, 0, len(taken))
	se.txn.kvPairs = replacement
	return taken
}

// Txn implements the sessionctx.Context interface. It always returns a pointer
// to the session's own transaction together with a nil error; the active
// argument is ignored.
func (se *session) Txn(active bool) (kv.Transaction, error) {
return &se.txn, nil
}

// GetSessionVars implements the sessionctx.Context interface. It returns the
// session variables stored in the session by newSession.
func (se *session) GetSessionVars() *variable.SessionVars {
return se.vars
}
Loading

0 comments on commit 7bae12e

Please sign in to comment.