Skip to content

Commit

Permalink
fix: use io.Copy instead of ReadLine for http streaming (#11)
Browse files Browse the repository at this point in the history
Current behavior may cause unintended response stream since it reads
line and concatenate them with newline.

This commit fixes the issue by using `io.Copy` to stream response body
instead of using `ReadLine`.

---------

Signed-off-by: Sunghoon Kang <hoon@akuity.io>
  • Loading branch information
Sunghoon Kang authored Sep 12, 2024
1 parent e933104 commit 55a48e8
Show file tree
Hide file tree
Showing 10 changed files with 534 additions and 96 deletions.
9 changes: 9 additions & 0 deletions api/proto/testv1/test.proto
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ service TestService {
get: "/download-invitations"
};
}
rpc DownloadLargeFile(DownloadLargeFileRequest) returns (stream google.api.HttpBody) {
option (google.api.http) = {
get: "/download-large-file"
};
}
}

message InvitationMetadata {
Expand Down Expand Up @@ -78,3 +83,7 @@ message TrackInvitationResponse {
message DownloadInvitationsRequest {
optional EventType type = 1;
}

message DownloadLargeFileRequest {
// Explicitly empty
}
8 changes: 8 additions & 0 deletions internal/assets/assets.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package assets

import _ "embed"

var (
//go:embed large_file.txt
LargeFile string
)
211 changes: 211 additions & 0 deletions internal/assets/large_file.txt

Large diffs are not rendered by default.

6 changes: 6 additions & 0 deletions internal/test/gen/testv1/test.gw.client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

202 changes: 132 additions & 70 deletions internal/test/gen/testv1/test.pb.go

Large diffs are not rendered by default.

50 changes: 50 additions & 0 deletions internal/test/gen/testv1/test.pb.gw.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

64 changes: 64 additions & 0 deletions internal/test/gen/testv1/test_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 13 additions & 0 deletions internal/test/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@ import (
"github.com/bufbuild/protoyaml-go"
"google.golang.org/genproto/googleapis/api/httpbody"

"github.com/akuity/grpc-gateway-client/internal/assets"
"github.com/akuity/grpc-gateway-client/internal/test/gen/testv1"

_ "embed"
)

type testServiceServer struct {
Expand Down Expand Up @@ -76,3 +79,13 @@ func (s *testServiceServer) DownloadInvitations(req *testv1.DownloadInvitationsR
}
return nil
}

func (s *testServiceServer) DownloadLargeFile(
req *testv1.DownloadLargeFileRequest,
srv testv1.TestService_DownloadLargeFileServer,
) error {
return srv.Send(&httpbody.HttpBody{
ContentType: "text/plain",
Data: []byte(assets.LargeFile),
})
}
36 changes: 10 additions & 26 deletions pkg/grpc/gateway/request.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package gateway

import (
"bufio"
"bytes"
"context"
"encoding/json"
Expand Down Expand Up @@ -164,32 +163,17 @@ func doHTTPStreamingRequest(ctx context.Context, c Client, req *resty.Request) (
contentType := res.Header().Get("Content-Type")
body := res.RawBody()
defer func() { _ = body.Close() }()
r := bufio.NewReader(body)
for {
var data bytes.Buffer
for {
line, isPrefix, err := r.ReadLine()
if err != nil {
if errors.Is(err, io.EOF) {
close(resCh)
return
}
errCh <- fmt.Errorf("read line: %w", err)
return
}
if _, err := data.Write(append(line, '\n')); err != nil {
errCh <- fmt.Errorf("write line: %w", err)
return
}
if !isPrefix {
break
}
}
resCh <- &httpbody.HttpBody{
ContentType: contentType,
Data: data.Bytes(),
}

var data bytes.Buffer
if _, err := io.Copy(&data, body); err != nil {
errCh <- fmt.Errorf("copy body: %w", err)
return
}
resCh <- &httpbody.HttpBody{
ContentType: contentType,
Data: data.Bytes(),
}
close(resCh)
}()
return resCh, errCh, nil
}
Expand Down
31 changes: 31 additions & 0 deletions pkg/grpc/gateway/request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/bufbuild/protoyaml-go"
"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"google.golang.org/genproto/googleapis/api/httpbody"
"google.golang.org/grpc"
Expand All @@ -21,9 +22,12 @@ import (
"google.golang.org/grpc/test/bufconn"
"google.golang.org/protobuf/proto"

"github.com/akuity/grpc-gateway-client/internal/assets"
"github.com/akuity/grpc-gateway-client/internal/test/gen/testv1"
"github.com/akuity/grpc-gateway-client/internal/test/server"
"github.com/akuity/grpc-gateway-client/pkg/grpc/gateway"

_ "embed"
)

type RequestTestSuite struct {
Expand Down Expand Up @@ -141,6 +145,33 @@ read:
}
}

func (s *RequestTestSuite) TestDownloadLargeFileRequest() {
ctx, cancel := context.WithTimeout(context.TODO(), 100*time.Millisecond)
defer cancel()
req := s.client.NewRequest(http.MethodGet, "/download-large-file")
resCh, errCh, err := gateway.DoStreamingRequest[httpbody.HttpBody](ctx, s.client, req)
s.Require().NoError(err)
var buf bytes.Buffer

read:
for {
select {
case <-ctx.Done():
break read
case err := <-errCh:
s.Require().NoError(err)
case data, ok := <-resCh:
if !ok {
break read
}
buf.Write(data.GetData())
}
}

require.NoError(s.T(), ctx.Err())
require.Equal(s.T(), strings.TrimSpace(assets.LargeFile), strings.TrimSpace(buf.String()))
}

func (s *RequestTestSuite) TestDownloadRequest_Error() {
ctx, cancel := context.WithTimeout(context.TODO(), 500*time.Millisecond)
defer cancel()
Expand Down

0 comments on commit 55a48e8

Please sign in to comment.