Skip to content

Commit

Permalink
runtime: marshal custom stream delimiters
Browse files Browse the repository at this point in the history
  • Loading branch information
emcfarlane authored and achew22 committed Nov 26, 2017
1 parent f33cdd4 commit e4b8a93
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 0 deletions.
9 changes: 9 additions & 0 deletions runtime/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ func ForwardResponseStream(ctx context.Context, mux *ServeMux, marshaler Marshal
return
}

var delimiter []byte
if d, ok := marshaler.(Delimited); ok {
delimiter = d.Delimiter()
}

var wroteHeader bool
for {
resp, err := recv()
Expand All @@ -64,6 +69,10 @@ func ForwardResponseStream(ctx context.Context, mux *ServeMux, marshaler Marshal
return
}
wroteHeader = true
if _, err = w.Write(delimiter); err != nil {
grpclog.Printf("Failed to send delimiter chunk: %v", err)
return
}
f.Flush()
}
}
Expand Down
1 change: 1 addition & 0 deletions runtime/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ func TestForwardResponseStream(t *testing.T) {
t.Errorf("marshaler.Marshal() failed %v", err)
}
want = append(want, b...)
want = append(want, marshaler.Delimiter()...)
}

if string(body) != string(want) {
Expand Down
5 changes: 5 additions & 0 deletions runtime/marshal_json.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,8 @@ func (j *JSONBuiltin) NewDecoder(r io.Reader) Decoder {
func (j *JSONBuiltin) NewEncoder(w io.Writer) Encoder {
return json.NewEncoder(w)
}

// Delimiter for newline encoded JSON streams.
func (j *JSONBuiltin) Delimiter() []byte {
return []byte("\n")
}
5 changes: 5 additions & 0 deletions runtime/marshal_jsonpb.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,3 +182,8 @@ type protoEnum interface {
}

var typeProtoMessage = reflect.TypeOf((*proto.Message)(nil)).Elem()

// Delimiter for newline encoded JSON streams.
func (j *JSONPb) Delimiter() []byte {
return []byte("\n")
}
6 changes: 6 additions & 0 deletions runtime/marshaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,9 @@ type EncoderFunc func(v interface{}) error

// Encode delegates invocations to the underlying function itself.
func (f EncoderFunc) Encode(v interface{}) error { return f(v) }

// Delimited defines the streaming delimiter.
type Delimited interface {
// Delimiter returns the record seperator for the stream.
Delimiter() []byte
}

0 comments on commit e4b8a93

Please sign in to comment.