Skip to content

Commit

Permalink
Stream unary if Content-Length is set
Browse files Browse the repository at this point in the history
  • Loading branch information
emcfarlane committed Jul 3, 2023
1 parent 574b6cb commit 08625c5
Showing 1 changed file with 39 additions and 10 deletions.
49 changes: 39 additions & 10 deletions grpcadapter/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,22 @@ func (r *bufferedEnvelopeReader) fillBuffer() error {
return nil
}

type envelopeReader struct {
io.ReadCloser

head [5]byte
index int
}

func (r *envelopeReader) Read(data []byte) (int, error) {
if r.index < len(r.head) {
n := copy(data, r.head[r.index:])
r.index += n
return n, nil
}
return r.ReadCloser.Read(data)
}

func translateConnectCommonToGRPC(request *http.Request) {
request.ProtoMajor = 2
request.ProtoMinor = 0
Expand Down Expand Up @@ -409,18 +425,32 @@ func translateConnectUnaryToGRPC(request *http.Request, contentType string, read
isCompressed: isCompressed,
maxBytes: readMaxBytes,
}
} else {
isCompressed := false
if contentEncoding := getHeaderCanonical(header, connectUnaryHeaderCompression); len(contentEncoding) > 0 {
isCompressed = contentEncoding != "identity"
delHeaderCanonical(header, connectUnaryHeaderCompression)
setHeaderCanonical(header, grpcHeaderCompression, contentEncoding)
}
return
}
isCompressed := false
if contentEncoding := getHeaderCanonical(header, connectUnaryHeaderCompression); len(contentEncoding) > 0 {
isCompressed = contentEncoding != "identity"
delHeaderCanonical(header, connectUnaryHeaderCompression)
setHeaderCanonical(header, grpcHeaderCompression, contentEncoding)
}
size := uint32(request.ContentLength)
if size <= 0 {
// no content length, buffer message
request.Body = &bufferedEnvelopeReader{
ReadCloser: request.Body,
isCompressed: isCompressed,
maxBytes: readMaxBytes,
}
return
}
var head [5]byte
if isCompressed {
head[0] = 1
}
binary.BigEndian.PutUint32(head[1:], size)
request.Body = &envelopeReader{
ReadCloser: request.Body,
head: head,
}
}

Expand Down Expand Up @@ -491,9 +521,7 @@ func (w *connectStreamingResponseWriter) writeHeader(statusCode int) {
}
header[key] = values
}
if statusCode != http.StatusOK {
w.ResponseWriter.WriteHeader(statusCode)
}
w.ResponseWriter.WriteHeader(statusCode)
w.wroteHeader = true
}

Expand Down Expand Up @@ -613,6 +641,7 @@ func (w *connectUnaryResponseWriter) writeHeader(statusCode int) {
}
header[key] = values
}
// Don't send headers to caputre trailers, unless there is an error.
if statusCode != http.StatusOK {
w.ResponseWriter.WriteHeader(statusCode)
}
Expand Down

0 comments on commit 08625c5

Please sign in to comment.