From e507c79cc9eef6910588d8d2935013c0acef25c1 Mon Sep 17 00:00:00 2001 From: Fullstop000 Date: Wed, 15 Jul 2020 00:31:20 +0800 Subject: [PATCH] forwarder: evict invalid current picked remote (#689) --- pkg/tidb/forwarder.go | 43 +++--------------------------------------- pkg/tidb/proxy.go | 4 +++- pkg/tidb/proxy_test.go | 22 +++++++++++++++++++-- 3 files changed, 26 insertions(+), 43 deletions(-) diff --git a/pkg/tidb/forwarder.go b/pkg/tidb/forwarder.go index 78fed2532b..acbe5cf12d 100644 --- a/pkg/tidb/forwarder.go +++ b/pkg/tidb/forwarder.go @@ -16,7 +16,6 @@ package tidb import ( "context" "crypto/tls" - "encoding/json" "fmt" "net" "net/http" @@ -25,13 +24,11 @@ import ( "github.com/cenkalti/backoff/v4" "github.com/go-sql-driver/mysql" "github.com/joomcode/errorx" - "github.com/pingcap/log" "go.etcd.io/etcd/clientv3" "go.uber.org/fx" - "go.uber.org/zap" "github.com/pingcap-incubator/tidb-dashboard/pkg/config" - "github.com/pingcap-incubator/tidb-dashboard/pkg/pd" + "github.com/pingcap-incubator/tidb-dashboard/pkg/utils/topology" ) var ( @@ -39,13 +36,6 @@ var ( ErrNoAliveTiDB = ErrNS.NewType("no_alive_tidb") ) -type tidbServerInfo struct { - ID string `json:"ddl_id"` - IP string `json:"ip"` - Port int `json:"listening_port"` - StatusPort uint `json:"status_port"` -} - type ForwarderConfig struct { ClusterTLSConfig *tls.Config TiDBTLSConfig *tls.Config @@ -104,33 +94,6 @@ func (f *Forwarder) Start(ctx context.Context) error { return nil } -func (f *Forwarder) getServerInfo() ([]*tidbServerInfo, error) { - ctx, cancel := context.WithTimeout(f.lifecycleCtx, f.config.TiDBRetrieveTimeout) - resp, err := f.etcdClient.Get(ctx, pd.TiDBServerInformationPath, clientv3.WithPrefix()) - cancel() - - if err != nil { - log.Warn("Fail to get TiDB server info from PD", zap.Error(err)) - return nil, ErrPDAccessFailed.WrapWithNoMessage(err) - } - - allTiDB := make([]*tidbServerInfo, 0, len(resp.Kvs)) - for _, kv := range resp.Kvs { - var info *tidbServerInfo - err = json.Unmarshal(kv.Value, &info) - if err != nil { - continue - } - allTiDB = append(allTiDB, info) - } - if len(allTiDB) == 0 { - log.Warn("No TiDB is alive now") - return nil, backoff.Permanent(ErrNoAliveTiDB.NewWithNoMessage()) - } - - return allTiDB, nil -} - func (f *Forwarder) createProxy() (*proxy, error) { l, err := net.Listen("tcp", "127.0.0.1:0") if err != nil { @@ -146,10 +109,10 @@ func (f *Forwarder) pollingForTiDB() { bo := backoff.WithContext(ebo, f.lifecycleCtx) for { - var allTiDB []*tidbServerInfo + var allTiDB []topology.TiDBInfo err := backoff.Retry(func() error { var err error - allTiDB, err = f.getServerInfo() + allTiDB, err = topology.FetchTiDBTopology(bo.Context(), f.etcdClient) return err }, bo) if err != nil { diff --git a/pkg/tidb/proxy.go b/pkg/tidb/proxy.go index 18a199da63..d086b7fbfb 100644 --- a/pkg/tidb/proxy.go +++ b/pkg/tidb/proxy.go @@ -135,7 +135,7 @@ func (p *proxy) serve(in net.Conn) { in.Close() } -// pick returns an active remote. If there +// pick returns an active remote if there is any func (p *proxy) pick() *remote { var picked *remote if p.current == "" { @@ -157,6 +157,8 @@ func (p *proxy) pick() *remote { r, ok := p.remotes.Load(p.current) if ok { picked = r.(*remote) + } else { + p.current = "" } } return picked diff --git a/pkg/tidb/proxy_test.go b/pkg/tidb/proxy_test.go index a7208629ae..25dcce099a 100644 --- a/pkg/tidb/proxy_test.go +++ b/pkg/tidb/proxy_test.go @@ -110,7 +110,25 @@ func TestProxyPick(t *testing.T) { // Shutdown current server to see if we can pick a new one ps.Close() client := &http.Client{} - res, err := client.Get("http://" + l.Addr().String()) + target := "http://" + l.Addr().String() + assertRespData(t, client, responseData, target) + + // Remove current picked from remotes and test out picking + p.remotes.Delete(strconv.Itoa(currentPicked)) + ps = servers[currentPicked] + if ps == nil { + t.Fatal("Fail to get current picked server") + } + ps.Close() + // First conn will be dropped as the current picked remote is deleted + _, err = client.Get(target) + assert.NotNil(t, err) + // Then pick a new remote + assertRespData(t, client, responseData, target) +} + +func assertRespData(t *testing.T, client *http.Client, expect string, target string) { + res, err := client.Get(target) if err != nil { t.Fatal(err) } @@ -118,5 +136,5 @@ func TestProxyPick(t *testing.T) { if err != nil { t.Fatal(err) } - assert.Equal(t, responseData, string(data)) + assert.Equal(t, expect, string(data)) }