Skip to content

Commit

Permalink
drainer,pump: Add support for enabling gzip grpc compression (#495)
Browse files Browse the repository at this point in the history
  • Loading branch information
suzaku authored and july2993 committed Mar 27, 2019
1 parent 7a4aa61 commit a88a371
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 2 deletions.
3 changes: 3 additions & 0 deletions cmd/drainer/drainer.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ data-dir = "data.drainer"
# a comma separated list of PD endpoints
pd-urls = "http://127.0.0.1:2379"

# Use the specified compressor to compress payload between pump and drainer
compressor = ""

#[security]
# Path of file that contains list of trusted SSL CAs for connection with cluster components.
# ssl-ca = "/path/to/ca.pem"
Expand Down
17 changes: 17 additions & 0 deletions drainer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ const (
var (
maxBinlogItemCount int
defaultBinlogItemCount = 16 << 12
supportedCompressors = [...]string{"gzip"}
)

// SyncerConfig is the Syncer's configuration.
Expand Down Expand Up @@ -68,6 +69,7 @@ type Config struct {
SyncerCfg *SyncerConfig `toml:"syncer" json:"sycner"`
Security security.Config `toml:"security" json:"security"`
SyncedCheckTime int `toml:"synced-check-time" json:"synced-check-time"`
Compressor string `toml:"compressor" json:"compressor"`
EtcdTimeout time.Duration
MetricsAddr string
MetricsInterval int
Expand Down Expand Up @@ -100,6 +102,7 @@ func NewConfig() *Config {
fs.StringVar(&cfg.LogFile, "log-file", "", "log file path")
fs.StringVar(&cfg.LogRotate, "log-rotate", "", "log file rotate type, hour/day")
fs.Int64Var(&cfg.InitialCommitTS, "initial-commit-ts", 0, "if drainer donesn't have checkpoint, use initial commitTS to initial checkpoint")
fs.StringVar(&cfg.Compressor, "compressor", "", "use the specified compressor to compress payload between pump and drainer, only 'gzip' is supported now (default \"\", ie. compression disabled.)")
fs.IntVar(&cfg.SyncerCfg.TxnBatch, "txn-batch", 20, "number of binlog events in a transaction batch")
fs.StringVar(&cfg.SyncerCfg.IgnoreSchemas, "ignore-schemas", "INFORMATION_SCHEMA,PERFORMANCE_SCHEMA,mysql", "disable sync those schemas")
fs.IntVar(&cfg.SyncerCfg.WorkerCount, "c", 16, "parallel worker count")
Expand Down Expand Up @@ -226,6 +229,20 @@ func (cfg *Config) validate() error {
return errors.Errorf("parse EtcdURLs error: %s, %v", cfg.EtcdURLs, err)
}

if cfg.Compressor != "" {
found := false
for _, c := range supportedCompressors {
if cfg.Compressor == c {
found = true
break
}
}
if !found {
return errors.Errorf(
"Invalid compressor: %v, must be one of these: %v", cfg.Compressor, supportedCompressors)
}
}

return nil
}

Expand Down
10 changes: 9 additions & 1 deletion drainer/pump.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
_ "google.golang.org/grpc/encoding/gzip"
"google.golang.org/grpc/status"
)

Expand Down Expand Up @@ -177,7 +178,14 @@ func (p *Pump) createPullBinlogsClient(ctx context.Context, last int64) error {
p.grpcConn.Close()
}

conn, err := grpc.Dial(p.addr, grpc.WithInsecure(), grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxMsgSize)))
callOpts := []grpc.CallOption{grpc.MaxCallRecvMsgSize(maxMsgSize)}

if compressor, ok := ctx.Value(drainerKeyType("compressor")).(string); ok {
log.Infof("[pump %s] grpc compression enabled", p.nodeID)
callOpts = append(callOpts, grpc.UseCompressor(compressor))
}

conn, err := grpc.Dial(p.addr, grpc.WithInsecure(), grpc.WithDefaultCallOptions(callOpts...))
if err != nil {
log.Errorf("[pump %s] create grpc dial error %v", p.nodeID, err)
p.pullCli = nil
Expand Down
4 changes: 4 additions & 0 deletions drainer/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ var (
clusterID uint64
)

type drainerKeyType string

// Server implements the gRPC interface,
// and maintains the runtime status
type Server struct {
Expand Down Expand Up @@ -83,6 +85,8 @@ func NewServer(cfg *Config) (*Server, error) {
}

ctx, cancel := context.WithCancel(context.Background())
ctx = context.WithValue(ctx, drainerKeyType("compressor"), cfg.Compressor)

clusterID = pdCli.GetClusterID(ctx)
// update latestTS and latestTime
latestTS, err := util.GetTSO(pdCli)
Expand Down
3 changes: 2 additions & 1 deletion pump/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
"github.com/gorilla/mux"
"github.com/ngaut/log"
"github.com/pingcap/errors"
"github.com/pingcap/pd/client"
pd "github.com/pingcap/pd/client"
"github.com/pingcap/tidb-binlog/pkg/flags"
"github.com/pingcap/tidb-binlog/pkg/node"
"github.com/pingcap/tidb-binlog/pkg/util"
Expand All @@ -33,6 +33,7 @@ import (
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
_ "google.golang.org/grpc/encoding/gzip"
)

var notifyDrainerTimeout = time.Second * 10
Expand Down

0 comments on commit a88a371

Please sign in to comment.