Skip to content

Commit

Permalink
Support TLS; Reduce the need of config.toml in integration tests (pin…
Browse files Browse the repository at this point in the history
…gcap#270)

* *: go fmt

* *: support TLS

* tests: enable TLS for all components in the integration test

* tests: specify TLS and most default arguments via command line

refactored the tests so only essential settings remained in config.toml

* config: the default csv.null should be a capital \N not small \n

* security: clone the http.DefaultTransport rather than shallow copy

* tests: break PD retry loop

* backend: fix unit test failure

* tests: replace curl by wget

The `curl` on CI is too old to handle ECC keys. But `wget` somehow works.

* tests: fix test failure

* *: fix comments
  • Loading branch information
kennytm authored Feb 27, 2020
1 parent 4348bb1 commit 9068e31
Show file tree
Hide file tree
Showing 107 changed files with 866 additions and 1,453 deletions.
51 changes: 29 additions & 22 deletions cmd/tidb-lightning-ctl/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,19 @@ import (
"context"
"flag"
"fmt"
"net/http"
"os"
"path/filepath"
"strconv"
"strings"

"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/import_sstpb"
uuid "github.com/satori/go.uuid"

kv "github.com/pingcap/tidb-lightning/lightning/backend"
"github.com/pingcap/tidb-lightning/lightning/common"
"github.com/pingcap/tidb-lightning/lightning/config"
"github.com/pingcap/tidb-lightning/lightning/restore"
uuid "github.com/satori/go.uuid"
)

func main() {
Expand Down Expand Up @@ -70,19 +71,27 @@ func run() error {
return err
}

tls, err := cfg.ToTLS()
if err != nil {
return err
}
if err = cfg.TiDB.Security.RegisterMySQL(); err != nil {
return err
}

ctx := context.Background()

if *compact {
return errors.Trace(compactCluster(ctx, cfg))
return errors.Trace(compactCluster(ctx, cfg, tls))
}
if len(*mode) != 0 {
return errors.Trace(switchMode(ctx, cfg, *mode))
return errors.Trace(switchMode(ctx, cfg, tls, *mode))
}
if len(*flagImportEngine) != 0 {
return errors.Trace(importEngine(ctx, cfg, *flagImportEngine))
return errors.Trace(importEngine(ctx, cfg, tls, *flagImportEngine))
}
if len(*flagCleanupEngine) != 0 {
return errors.Trace(cleanupEngine(ctx, cfg, *flagCleanupEngine))
return errors.Trace(cleanupEngine(ctx, cfg, tls, *flagCleanupEngine))
}

if len(*cpRemove) != 0 {
Expand All @@ -92,7 +101,7 @@ func run() error {
return errors.Trace(checkpointErrorIgnore(ctx, cfg, *cpErrIgnore))
}
if len(*cpErrDestroy) != 0 {
return errors.Trace(checkpointErrorDestroy(ctx, cfg, *cpErrDestroy))
return errors.Trace(checkpointErrorDestroy(ctx, cfg, tls, *cpErrDestroy))
}
if len(*cpDump) != 0 {
return errors.Trace(checkpointDump(ctx, cfg, *cpDump))
Expand All @@ -102,19 +111,18 @@ func run() error {
return nil
}

func compactCluster(ctx context.Context, cfg *config.Config) error {
func compactCluster(ctx context.Context, cfg *config.Config, tls *common.TLS) error {
return kv.ForAllStores(
ctx,
&http.Client{},
cfg.TiDB.PdAddr,
tls.WithHost(cfg.TiDB.PdAddr),
kv.StoreStateDisconnected,
func(c context.Context, store *kv.Store) error {
return kv.Compact(c, store.Address, restore.FullLevelCompact)
return kv.Compact(c, tls, store.Address, restore.FullLevelCompact)
},
)
}

func switchMode(ctx context.Context, cfg *config.Config, mode string) error {
func switchMode(ctx context.Context, cfg *config.Config, tls *common.TLS, mode string) error {
var m import_sstpb.SwitchMode
switch mode {
case config.ImportMode:
Expand All @@ -127,11 +135,10 @@ func switchMode(ctx context.Context, cfg *config.Config, mode string) error {

return kv.ForAllStores(
ctx,
&http.Client{},
cfg.TiDB.PdAddr,
tls.WithHost(cfg.TiDB.PdAddr),
kv.StoreStateDisconnected,
func(c context.Context, store *kv.Store) error {
return kv.SwitchMode(c, store.Address, m)
return kv.SwitchMode(c, tls, store.Address, m)
},
)
}
Expand All @@ -156,20 +163,20 @@ func checkpointErrorIgnore(ctx context.Context, cfg *config.Config, tableName st
return errors.Trace(cpdb.IgnoreErrorCheckpoint(ctx, tableName))
}

func checkpointErrorDestroy(ctx context.Context, cfg *config.Config, tableName string) error {
func checkpointErrorDestroy(ctx context.Context, cfg *config.Config, tls *common.TLS, tableName string) error {
cpdb, err := restore.OpenCheckpointsDB(ctx, cfg)
if err != nil {
return errors.Trace(err)
}
defer cpdb.Close()

target, err := restore.NewTiDBManager(cfg.TiDB)
target, err := restore.NewTiDBManager(cfg.TiDB, tls)
if err != nil {
return errors.Trace(err)
}
defer target.Close()

importer, err := kv.NewImporter(ctx, cfg.TikvImporter.Addr, cfg.TiDB.PdAddr)
importer, err := kv.NewImporter(ctx, tls, cfg.TikvImporter.Addr, cfg.TiDB.PdAddr)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -271,8 +278,8 @@ func unsafeCloseEngine(ctx context.Context, importer kv.Backend, engine string)
return ce, errors.Trace(err)
}

func importEngine(ctx context.Context, cfg *config.Config, engine string) error {
importer, err := kv.NewImporter(ctx, cfg.TikvImporter.Addr, cfg.TiDB.PdAddr)
func importEngine(ctx context.Context, cfg *config.Config, tls *common.TLS, engine string) error {
importer, err := kv.NewImporter(ctx, tls, cfg.TikvImporter.Addr, cfg.TiDB.PdAddr)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -285,8 +292,8 @@ func importEngine(ctx context.Context, cfg *config.Config, engine string) error
return errors.Trace(ce.Import(ctx))
}

func cleanupEngine(ctx context.Context, cfg *config.Config, engine string) error {
importer, err := kv.NewImporter(ctx, cfg.TikvImporter.Addr, cfg.TiDB.PdAddr)
func cleanupEngine(ctx context.Context, cfg *config.Config, tls *common.TLS, engine string) error {
importer, err := kv.NewImporter(ctx, tls, cfg.TikvImporter.Addr, cfg.TiDB.PdAddr)
if err != nil {
return errors.Trace(err)
}
Expand Down
5 changes: 3 additions & 2 deletions lightning/backend/importer.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"go.uber.org/zap"
"google.golang.org/grpc"

"github.com/pingcap/tidb-lightning/lightning/common"
"github.com/pingcap/tidb-lightning/lightning/log"
)

Expand All @@ -42,8 +43,8 @@ type importer struct {

// NewImporter creates a new connection to tikv-importer. A single connection
// per tidb-lightning instance is enough.
func NewImporter(ctx context.Context, importServerAddr string, pdAddr string) (Backend, error) {
conn, err := grpc.DialContext(ctx, importServerAddr, grpc.WithInsecure())
func NewImporter(ctx context.Context, tls *common.TLS, importServerAddr string, pdAddr string) (Backend, error) {
conn, err := grpc.DialContext(ctx, importServerAddr, tls.ToGRPCDialOption())
if err != nil {
return MakeBackend(nil), errors.Trace(err)
}
Expand Down
2 changes: 1 addition & 1 deletion lightning/backend/tidb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (s *mysqlSuite) TestWriteRowsReplaceOnDup(c *C) {
types.NewUintDatum(18446744073709551615),
types.NewIntDatum(-9223372036854775808),
types.NewUintDatum(0),
types.Datum{},
{},
types.NewFloat32Datum(7.5),
types.NewFloat64Datum(5e-324),
types.NewFloat64Datum(1.7976931348623157e+308),
Expand Down
24 changes: 9 additions & 15 deletions lightning/backend/tikv.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@ package backend

import (
"context"
"fmt"
"net/http"

"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/import_sstpb"
Expand Down Expand Up @@ -76,11 +74,11 @@ type Store struct {
State StoreState `json:"state_name"`
}

func withTiKVConnection(ctx context.Context, tikvAddr string, action func(import_sstpb.ImportSSTClient) error) error {
func withTiKVConnection(ctx context.Context, tls *common.TLS, tikvAddr string, action func(import_sstpb.ImportSSTClient) error) error {
// Connect to the ImportSST service on the given TiKV node.
// The connection is needed for executing `action` and will be tear down
// when this function exits.
conn, err := grpc.DialContext(ctx, tikvAddr, grpc.WithInsecure())
conn, err := grpc.DialContext(ctx, tikvAddr, tls.ToGRPCDialOption())
if err != nil {
return errors.Trace(err)
}
Expand All @@ -91,7 +89,7 @@ func withTiKVConnection(ctx context.Context, tikvAddr string, action func(import
}

// ForAllStores executes `action` in parallel for all TiKV stores connected to
// the given PD server.
// a PD server given by the HTTPS client `tls`.
//
// Returns the first non-nil error returned in all `action` calls. If all
// `action` returns nil, this method would return nil as well.
Expand All @@ -100,22 +98,18 @@ func withTiKVConnection(ctx context.Context, tikvAddr string, action func(import
// result (Tombstone < Offline < Down < Disconnected < Up).
func ForAllStores(
ctx context.Context,
client *http.Client,
pdAddr string,
tls *common.TLS,
minState StoreState,
action func(c context.Context, store *Store) error,
) error {
// Go through the HTTP interface instead of gRPC so we don't need to keep
// track of the cluster ID.
url := fmt.Sprintf("http://%s/pd/api/v1/stores", pdAddr)

var stores struct {
Stores []struct {
Store Store
}
}

err := common.GetJSON(client, url, &stores)
err := tls.GetJSON("/pd/api/v1/stores", &stores)
if err != nil {
return err
}
Expand All @@ -131,9 +125,9 @@ func ForAllStores(
}

// SwitchMode changes the TiKV node at the given address to a particular mode.
func SwitchMode(ctx context.Context, tikvAddr string, mode import_sstpb.SwitchMode) error {
func SwitchMode(ctx context.Context, tls *common.TLS, tikvAddr string, mode import_sstpb.SwitchMode) error {
task := log.With(zap.Stringer("mode", mode)).Begin(zap.DebugLevel, "switch mode")
err := withTiKVConnection(ctx, tikvAddr, func(client import_sstpb.ImportSSTClient) error {
err := withTiKVConnection(ctx, tls, tikvAddr, func(client import_sstpb.ImportSSTClient) error {
_, err := client.SwitchMode(ctx, &import_sstpb.SwitchModeRequest{
Mode: mode,
})
Expand All @@ -144,9 +138,9 @@ func SwitchMode(ctx context.Context, tikvAddr string, mode import_sstpb.SwitchMo
}

// Compact performs a leveled compaction with the given minimum level.
func Compact(ctx context.Context, tikvAddr string, level int32) error {
func Compact(ctx context.Context, tls *common.TLS, tikvAddr string, level int32) error {
task := log.With(zap.Int32("level", level)).Begin(zap.InfoLevel, "compact cluster")
err := withTiKVConnection(ctx, tikvAddr, func(client import_sstpb.ImportSSTClient) error {
err := withTiKVConnection(ctx, tls, tikvAddr, func(client import_sstpb.ImportSSTClient) error {
_, err := client.Compact(ctx, &import_sstpb.CompactRequest{
OutputLevel: level,
})
Expand Down
10 changes: 4 additions & 6 deletions lightning/backend/tikv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,21 @@ import (
"context"
"net/http"
"net/http/httptest"
"net/url"
"sort"
"sync"

. "github.com/pingcap/check"

kv "github.com/pingcap/tidb-lightning/lightning/backend"
"github.com/pingcap/tidb-lightning/lightning/common"
)

type tikvSuite struct{}

var _ = Suite(&tikvSuite{})

func (s *tikvSuite) TestForAllStores(c *C) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
server := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
w.Write([]byte(`
{
"count": 5,
Expand Down Expand Up @@ -74,15 +74,13 @@ func (s *tikvSuite) TestForAllStores(c *C) {
}))
defer server.Close()

serverURL, err := url.Parse(server.URL)
c.Assert(err, IsNil)

ctx := context.Background()
var (
allStoresLock sync.Mutex
allStores []*kv.Store
)
err = kv.ForAllStores(ctx, server.Client(), serverURL.Host, kv.StoreStateDown, func(c2 context.Context, store *kv.Store) error {
tls := common.NewTLSFromMockServer(server)
err := kv.ForAllStores(ctx, tls, kv.StoreStateDown, func(c2 context.Context, store *kv.Store) error {
allStoresLock.Lock()
allStores = append(allStores, store)
allStoresLock.Unlock()
Expand Down
2 changes: 1 addition & 1 deletion lightning/checkpoints/checkpoints_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,4 +302,4 @@ func (s *checkpointSuite) TestCheckpointMarshallUnmarshall(c *C) {
fileChkp2 := NewFileCheckpointsDB(path)
// if not recover empty map explicitly, it will become nil
c.Assert(fileChkp2.checkpoints.Checkpoints["a"].Engines, NotNil)
}
}
Loading

0 comments on commit 9068e31

Please sign in to comment.