From 08625c5134a1436aca095fbfbce6a7b1a3b7a5d2 Mon Sep 17 00:00:00 2001 From: Edward McFarlane Date: Mon, 3 Jul 2023 16:17:18 +0100 Subject: [PATCH] Stream unary if Content-Length is set --- grpcadapter/adapter.go | 49 +++++++++++++++++++++++++++++++++--------- 1 file changed, 39 insertions(+), 10 deletions(-) diff --git a/grpcadapter/adapter.go b/grpcadapter/adapter.go index f6322d1b..cc1fc085 100644 --- a/grpcadapter/adapter.go +++ b/grpcadapter/adapter.go @@ -346,6 +346,22 @@ func (r *bufferedEnvelopeReader) fillBuffer() error { return nil } +type envelopeReader struct { + io.ReadCloser + + head [5]byte + index int +} + +func (r *envelopeReader) Read(data []byte) (int, error) { + if r.index < len(r.head) { + n := copy(data, r.head[r.index:]) + r.index += n + return n, nil + } + return r.ReadCloser.Read(data) +} + func translateConnectCommonToGRPC(request *http.Request) { request.ProtoMajor = 2 request.ProtoMinor = 0 @@ -409,18 +425,32 @@ func translateConnectUnaryToGRPC(request *http.Request, contentType string, read isCompressed: isCompressed, maxBytes: readMaxBytes, } - } else { - isCompressed := false - if contentEncoding := getHeaderCanonical(header, connectUnaryHeaderCompression); len(contentEncoding) > 0 { - isCompressed = contentEncoding != "identity" - delHeaderCanonical(header, connectUnaryHeaderCompression) - setHeaderCanonical(header, grpcHeaderCompression, contentEncoding) - } + return + } + isCompressed := false + if contentEncoding := getHeaderCanonical(header, connectUnaryHeaderCompression); len(contentEncoding) > 0 { + isCompressed = contentEncoding != "identity" + delHeaderCanonical(header, connectUnaryHeaderCompression) + setHeaderCanonical(header, grpcHeaderCompression, contentEncoding) + } + size := uint32(request.ContentLength) + if size <= 0 { + // no content length, buffer message request.Body = &bufferedEnvelopeReader{ ReadCloser: request.Body, isCompressed: isCompressed, maxBytes: readMaxBytes, } + return + } + var head [5]byte + if isCompressed { + head[0] = 1 + } + binary.BigEndian.PutUint32(head[1:], size) + request.Body = &envelopeReader{ + ReadCloser: request.Body, + head: head, } } @@ -491,9 +521,7 @@ func (w *connectStreamingResponseWriter) writeHeader(statusCode int) { } header[key] = values } - if statusCode != http.StatusOK { - w.ResponseWriter.WriteHeader(statusCode) - } + w.ResponseWriter.WriteHeader(statusCode) w.wroteHeader = true } @@ -613,6 +641,7 @@ func (w *connectUnaryResponseWriter) writeHeader(statusCode int) { } header[key] = values } + // Don't send headers to caputre trailers, unless there is an error. if statusCode != http.StatusOK { w.ResponseWriter.WriteHeader(statusCode) }