Skip to content

Commit

Permalink
lightning: use gRPC prepared message to avoid duplicated encoding (#4…
Browse files Browse the repository at this point in the history
  • Loading branch information
lance6716 authored Jul 28, 2023
1 parent 79f0001 commit 50e4911
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 14 deletions.
1 change: 1 addition & 0 deletions br/pkg/lightning/backend/local/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ go_test(
"@com_github_tikv_pd_client//errs",
"@org_golang_google_grpc//:grpc",
"@org_golang_google_grpc//codes",
"@org_golang_google_grpc//encoding",
"@org_golang_google_grpc//status",
"@org_uber_go_atomic//:atomic",
],
Expand Down
29 changes: 29 additions & 0 deletions br/pkg/lightning/backend/local/local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"sync/atomic"
"testing"
"time"
_ "unsafe"

"github.com/cockroachdb/pebble"
"github.com/docker/go-units"
Expand Down Expand Up @@ -59,6 +60,7 @@ import (
pd "github.com/tikv/pd/client"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/encoding"
"google.golang.org/grpc/status"
)

Expand Down Expand Up @@ -742,6 +744,33 @@ func (m mockWriteClient) CloseAndRecv() (*sst.WriteResponse, error) {
return m.writeResp, nil
}

type baseCodec interface {
Marshal(v interface{}) ([]byte, error)
Unmarshal(data []byte, v interface{}) error
}

//go:linkname newContextWithRPCInfo google.golang.org/grpc.newContextWithRPCInfo
func newContextWithRPCInfo(ctx context.Context, failfast bool, codec baseCodec, cp grpc.Compressor, comp encoding.Compressor) context.Context

type mockCodec struct{}

func (m mockCodec) Marshal(v interface{}) ([]byte, error) {
return nil, nil
}

func (m mockCodec) Unmarshal(data []byte, v interface{}) error {
return nil
}

func (m mockWriteClient) Context() context.Context {
ctx := context.Background()
return newContextWithRPCInfo(ctx, false, mockCodec{}, nil, nil)
}

func (m mockWriteClient) SendMsg(_ interface{}) error {
return nil
}

func (c *mockImportClient) Write(ctx context.Context, opts ...grpc.CallOption) (sst.ImportSST_WriteClient, error) {
if c.apiInvokeRecorder != nil {
c.apiInvokeRecorder["Write"] = append(c.apiInvokeRecorder["Write"], c.store.GetId())
Expand Down
34 changes: 20 additions & 14 deletions br/pkg/lightning/backend/local/region_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/mathutil"
"go.uber.org/zap"
"google.golang.org/grpc"
)

type jobStageTp string
Expand Down Expand Up @@ -225,7 +226,11 @@ func (local *Backend) writeToTiKV(ctx context.Context, j *regionJob) error {
leaderID := j.region.Leader.GetId()
clients := make([]sst.ImportSST_WriteClient, 0, len(region.GetPeers()))
allPeers := make([]*metapb.Peer, 0, len(region.GetPeers()))
requests := make([]*sst.WriteRequest, 0, len(region.GetPeers()))
req := &sst.WriteRequest{
Chunk: &sst.WriteRequest_Meta{
Meta: meta,
},
}
for _, peer := range region.GetPeers() {
cli, err := clientFactory.Create(ctx, peer.StoreId)
if err != nil {
Expand All @@ -238,23 +243,17 @@ func (local *Backend) writeToTiKV(ctx context.Context, j *regionJob) error {
}

// Bind uuid for this write request
req := &sst.WriteRequest{
Chunk: &sst.WriteRequest_Meta{
Meta: meta,
},
}
if err = wstream.Send(req); err != nil {
return annotateErr(err, peer)
}
req.Chunk = &sst.WriteRequest_Batch{
Batch: &sst.WriteBatch{
CommitTs: j.engine.TS,
},
}
clients = append(clients, wstream)
requests = append(requests, req)
allPeers = append(allPeers, peer)
}
req.Chunk = &sst.WriteRequest_Batch{
Batch: &sst.WriteBatch{
CommitTs: j.engine.TS,
},
}

bytesBuf := bufferPool.NewBuffer()
defer bytesBuf.Destroy()
Expand All @@ -271,12 +270,19 @@ func (local *Backend) writeToTiKV(ctx context.Context, j *regionJob) error {
}

flushKVs := func() error {
req.Chunk.(*sst.WriteRequest_Batch).Batch.Pairs = pairs[:count]
preparedMsg := &grpc.PreparedMsg{}
// by reading the source code, Encode need to find codec and compression from the stream
// because all stream has the same codec and compression, we can use any one of them
if err := preparedMsg.Encode(clients[0], req); err != nil {
return err
}

for i := range clients {
if err := writeLimiter.WaitN(ctx, allPeers[i].StoreId, int(size)); err != nil {
return errors.Trace(err)
}
requests[i].Chunk.(*sst.WriteRequest_Batch).Batch.Pairs = pairs[:count]
if err := clients[i].Send(requests[i]); err != nil {
if err := clients[i].SendMsg(preparedMsg); err != nil {
return annotateErr(err, allPeers[i])
}
}
Expand Down

0 comments on commit 50e4911

Please sign in to comment.