Skip to content

Commit

Permalink
forwarder: evict invalid current picked remote (#689)
Browse files Browse the repository at this point in the history
  • Loading branch information
Fullstop000 authored Jul 14, 2020
1 parent 42d474f commit e507c79
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 43 deletions.
43 changes: 3 additions & 40 deletions pkg/tidb/forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package tidb
import (
"context"
"crypto/tls"
"encoding/json"
"fmt"
"net"
"net/http"
Expand All @@ -25,27 +24,18 @@ import (
"github.com/cenkalti/backoff/v4"
"github.com/go-sql-driver/mysql"
"github.com/joomcode/errorx"
"github.com/pingcap/log"
"go.etcd.io/etcd/clientv3"
"go.uber.org/fx"
"go.uber.org/zap"

"github.com/pingcap-incubator/tidb-dashboard/pkg/config"
"github.com/pingcap-incubator/tidb-dashboard/pkg/pd"
"github.com/pingcap-incubator/tidb-dashboard/pkg/utils/topology"
)

var (
ErrPDAccessFailed = ErrNS.NewType("pd_access_failed")
ErrNoAliveTiDB = ErrNS.NewType("no_alive_tidb")
)

type tidbServerInfo struct {
ID string `json:"ddl_id"`
IP string `json:"ip"`
Port int `json:"listening_port"`
StatusPort uint `json:"status_port"`
}

type ForwarderConfig struct {
ClusterTLSConfig *tls.Config
TiDBTLSConfig *tls.Config
Expand Down Expand Up @@ -104,33 +94,6 @@ func (f *Forwarder) Start(ctx context.Context) error {
return nil
}

func (f *Forwarder) getServerInfo() ([]*tidbServerInfo, error) {
ctx, cancel := context.WithTimeout(f.lifecycleCtx, f.config.TiDBRetrieveTimeout)
resp, err := f.etcdClient.Get(ctx, pd.TiDBServerInformationPath, clientv3.WithPrefix())
cancel()

if err != nil {
log.Warn("Fail to get TiDB server info from PD", zap.Error(err))
return nil, ErrPDAccessFailed.WrapWithNoMessage(err)
}

allTiDB := make([]*tidbServerInfo, 0, len(resp.Kvs))
for _, kv := range resp.Kvs {
var info *tidbServerInfo
err = json.Unmarshal(kv.Value, &info)
if err != nil {
continue
}
allTiDB = append(allTiDB, info)
}
if len(allTiDB) == 0 {
log.Warn("No TiDB is alive now")
return nil, backoff.Permanent(ErrNoAliveTiDB.NewWithNoMessage())
}

return allTiDB, nil
}

func (f *Forwarder) createProxy() (*proxy, error) {
l, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
Expand All @@ -146,10 +109,10 @@ func (f *Forwarder) pollingForTiDB() {
bo := backoff.WithContext(ebo, f.lifecycleCtx)

for {
var allTiDB []*tidbServerInfo
var allTiDB []topology.TiDBInfo
err := backoff.Retry(func() error {
var err error
allTiDB, err = f.getServerInfo()
allTiDB, err = topology.FetchTiDBTopology(bo.Context(), f.etcdClient)
return err
}, bo)
if err != nil {
Expand Down
4 changes: 3 additions & 1 deletion pkg/tidb/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func (p *proxy) serve(in net.Conn) {
in.Close()
}

// pick returns an active remote. If there
// pick returns an active remote if there is any
func (p *proxy) pick() *remote {
var picked *remote
if p.current == "" {
Expand All @@ -157,6 +157,8 @@ func (p *proxy) pick() *remote {
r, ok := p.remotes.Load(p.current)
if ok {
picked = r.(*remote)
} else {
p.current = ""
}
}
return picked
Expand Down
22 changes: 20 additions & 2 deletions pkg/tidb/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,13 +110,31 @@ func TestProxyPick(t *testing.T) {
// Shutdown current server to see if we can pick a new one
ps.Close()
client := &http.Client{}
res, err := client.Get("http://" + l.Addr().String())
target := "http://" + l.Addr().String()
assertRespData(t, client, responseData, target)

// Remove current picked from remotes and test out picking
p.remotes.Delete(strconv.Itoa(currentPicked))
ps = servers[currentPicked]
if ps == nil {
t.Fatal("Fail to get current picked server")
}
ps.Close()
// First conn will be dropped as the current picked remote is deleted
_, err = client.Get(target)
assert.NotNil(t, err)
// Then pick a new remote
assertRespData(t, client, responseData, target)
}

func assertRespData(t *testing.T, client *http.Client, expect string, target string) {
res, err := client.Get(target)
if err != nil {
t.Fatal(err)
}
data, err := ioutil.ReadAll(res.Body)
if err != nil {
t.Fatal(err)
}
assert.Equal(t, responseData, string(data))
assert.Equal(t, expect, string(data))
}

0 comments on commit e507c79

Please sign in to comment.