From 31da8ce827cbb5d269def799a4a38a476ae7223e Mon Sep 17 00:00:00 2001 From: Johan Brandhorst Date: Mon, 16 Oct 2017 21:10:52 +0100 Subject: [PATCH] Update to gRPC 1.7 functions. Changes: - ServerTransport.Write now takes 1 header slice and 1 payload slice. - metadata.FromContext was removed. Replace with metadata.FromIncomingContext. --- test/server/main.go | 6 +++--- wsproxy/wsproxy.go | 16 +++++++++++----- 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/test/server/main.go b/test/server/main.go index b1aecc31..19f8413e 100644 --- a/test/server/main.go +++ b/test/server/main.go @@ -125,7 +125,7 @@ func (s *testSrv) PingEmpty(ctx context.Context, _ *empty.Empty) (*testproto.Pin func (s *testSrv) Ping(ctx context.Context, ping *testproto.PingRequest) (*testproto.PingResponse, error) { if ping.GetCheckMetadata() { - md, ok := metadata.FromContext(ctx) + md, ok := metadata.FromIncomingContext(ctx) if !ok || len(md[strings.ToLower(shared.ClientMDTestKey)]) == 0 || md[strings.ToLower(shared.ClientMDTestKey)][0] != shared.ClientMDTestValue { return nil, status.Errorf(codes.InvalidArgument, "Metadata was invalid") @@ -174,7 +174,7 @@ func (s *testSrv) PingError(ctx context.Context, ping *testproto.PingRequest) (* func (s *testSrv) PingList(ping *testproto.PingRequest, stream testproto.TestService_PingListServer) error { if ping.GetCheckMetadata() { - md, ok := metadata.FromContext(stream.Context()) + md, ok := metadata.FromIncomingContext(stream.Context()) if !ok || len(md[strings.ToLower(shared.ClientMDTestKey)]) == 0 || md[strings.ToLower(shared.ClientMDTestKey)][0] != shared.ClientMDTestValue { return status.Errorf(codes.InvalidArgument, "Metadata was invalid") @@ -208,7 +208,7 @@ func (s *testSrv) PingList(ping *testproto.PingRequest, stream testproto.TestSer if !ok { return status.Errorf(codes.Internal, "lowLevelServerStream does not exist in context") } - lowLevelServerStream.ServerTransport().Write(lowLevelServerStream, make([]byte, 0), &transport.Options{ + lowLevelServerStream.ServerTransport().Write(lowLevelServerStream, nil, nil, &transport.Options{ Delay: false, }) } diff --git a/wsproxy/wsproxy.go b/wsproxy/wsproxy.go index 2605eec7..fe454366 100644 --- a/wsproxy/wsproxy.go +++ b/wsproxy/wsproxy.go @@ -8,6 +8,7 @@ import ( "net" "net/http" "strings" + "time" "github.com/gorilla/websocket" "google.golang.org/grpc/codes" @@ -18,6 +19,8 @@ import ( "github.com/johanbrandhorst/protobuf/internal" ) +const headerSize = 5 + // Logger is the interface used by the proxy to log events type Logger interface { Debugln(...interface{}) @@ -115,11 +118,14 @@ func (p *proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) { host := withPort(r.Host) p.logger.Debugln("Creating new transport with addr:", host) - t, err := transport.NewClientTransport(ctx, + t, err := transport.NewClientTransport( + ctx, transport.TargetInfo{Addr: host}, transport.ConnectOptions{ TransportCredentials: p.creds, - }) + }, + time.Second*20, + ) if err != nil { closeMsg := formatCloseMessage(websocket.CloseInternalServerErr, err.Error()) _ = conn.WriteMessage(websocket.CloseMessage, closeMsg) @@ -182,14 +188,14 @@ func (p *proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) { } p.logger.Debugln("[READ] Read payload:", payload) if internal.IsCloseMessage(payload) { - err = t.Write(s, nil, &transport.Options{Last: true}) + err = t.Write(s, nil, nil, &transport.Options{Last: true}) if err == io.EOF || err == nil { // Do not want to cancel context here, want // Writer to read io.EOF then exit. return } } else { - err = t.Write(s, payload, &transport.Options{Last: false}) + err = t.Write(s, payload[:headerSize], payload[headerSize:], &transport.Options{Last: false}) } if err != nil { @@ -204,7 +210,7 @@ func (p *proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) { }() // Write loop -- take messages from stream and write to websocket - var header [5]byte + var header [headerSize]byte var msg []byte for { // Read header