Skip to content

Commit

Permalink
perf(rpc): use gogoproto codec (#236)
Browse files Browse the repository at this point in the history
### What this PR does

Official grpc-go has not been compatible with gogoproto since
grpc/grpc-go#3958. To use gogoproto's marshaler and unmarshaler, a custom
codec should be registered.

A simple benchmark show it worth:

```
name          old time/op    new time/op    delta
StreamRPC-16    7.49µs ± 3%    6.98µs ± 5%  -6.85%  (p=0.000 n=20+20)

name          old alloc/op   new alloc/op   delta
StreamRPC-16    16.4kB ± 0%    16.4kB ± 0%  -0.24%  (p=0.000 n=20+20)

name          old allocs/op  new allocs/op  delta
StreamRPC-16      51.0 ± 0%      49.0 ± 0%  -3.92%  (p=0.000 n=19+17)
```

### Which issue(s) this PR resolves

Resolves #235

### Anything else

Links:

- grpc/grpc-go#3958
- grpc/grpc-go#4466
  • Loading branch information
ijsong authored Aug 15, 2023
2 parents 719a85d + aae2495 commit c4224c0
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 0 deletions.
2 changes: 2 additions & 0 deletions internal/storagenode/replication_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ func (rs *replicationServer) SyncReplicateStream(stream snpb.Replicator_SyncRepl
var err error
req := new(snpb.SyncReplicateRequest)
for {
req.Reset()
err = stream.RecvMsg(req)
if err != nil {
if err == io.EOF {
Expand Down Expand Up @@ -114,6 +115,7 @@ func (rs *replicationServer) recv(ctx context.Context, stream snpb.Replicator_Re
defer close(c)
req := &snpb.ReplicateRequest{}
for {
req.Reset()
err := stream.RecvMsg(req)
rst := newReplicationServerTask(*req, err)
select {
Expand Down
3 changes: 3 additions & 0 deletions internal/storagenode/reportcommit_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ func (rcs reportCommitServer) GetReport(stream snpb.LogStreamReporter_GetReportS
}
ctx := stream.Context()
for {
req.Reset()
err = stream.RecvMsg(req)
if err == io.EOF {
return nil
Expand Down Expand Up @@ -68,6 +69,7 @@ func (rcs reportCommitServer) Commit(stream snpb.LogStreamReporter_CommitServer)
req := &snpb.CommitRequest{}
ctx := stream.Context()
for {
req.Reset()
err = stream.RecvMsg(req)
if err == io.EOF {
return nil
Expand Down Expand Up @@ -98,6 +100,7 @@ func (rcs reportCommitServer) CommitBatch(stream snpb.LogStreamReporter_CommitBa
req := &snpb.CommitBatchRequest{}
ctx := stream.Context()
for {
req.Reset()
err = stream.RecvMsg(req)
if err == io.EOF {
return nil
Expand Down
35 changes: 35 additions & 0 deletions pkg/rpc/codec.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package rpc

import (
gogoproto "github.com/gogo/protobuf/proto"
"github.com/golang/protobuf/proto" //nolint:staticcheck
"google.golang.org/grpc/encoding"
)

const name = "proto"

type codec struct{}

var _ encoding.Codec = codec{}

func init() {
encoding.RegisterCodec(codec{})
}

func (codec) Marshal(v interface{}) ([]byte, error) {
if m, ok := v.(gogoproto.Marshaler); ok {
return m.Marshal()
}
return proto.Marshal(v.(proto.Message))
}

func (codec) Unmarshal(data []byte, v interface{}) error {
if m, ok := v.(gogoproto.Unmarshaler); ok {
return m.Unmarshal(data)
}
return proto.Unmarshal(data, v.(proto.Message))
}

func (codec) Name() string {
return name
}

0 comments on commit c4224c0

Please sign in to comment.