Skip to content

Commit

Permalink
fix: incorrect stream id in http2 protocol data frame
Browse files Browse the repository at this point in the history
  • Loading branch information
yuweizzz committed Feb 15, 2025
1 parent 6f78052 commit d89a60d
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 44 deletions.
52 changes: 30 additions & 22 deletions pkg/event_processor/http2_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,8 @@ func (h2r *HTTP2Request) Display() []byte {
log.Println("[http2 request] Discard HTTP2 Magic error:", err)
return h2r.reader.Bytes()
}
var encoding string
var dataFrameStreamID uint32
dataBuf := bytes.NewBuffer(nil)
encodingMap := make(map[uint32]string)
dataBufMap := make(map[uint32]*bytes.Buffer)
frameBuf := bytes.NewBufferString("")
for {
f, err := h2r.framer.ReadFrame()
Expand All @@ -105,46 +104,55 @@ func (h2r *HTTP2Request) Display() []byte {
}
switch f := f.(type) {
case *http2.MetaHeadersFrame:
frameBuf.WriteString(fmt.Sprintf("\nFrame Type\t=>\tHEADERS\nFrame StreamID\t=>\t%d\n", f.StreamID))
streamID := f.StreamID
dataBufMap[streamID] = bytes.NewBuffer(nil)
frameBuf.WriteString(fmt.Sprintf("\nFrame Type\t=>\tHEADERS\nFrame StreamID\t=>\t%d\n", streamID))
for _, header := range f.Fields {
frameBuf.WriteString(fmt.Sprintf("%s\n", header.String()))
if header.Name == "content-encoding" {
encoding = header.Value
encodingMap[streamID] = header.Value
}
}
case *http2.DataFrame:
dataFrameStreamID = f.StreamID
_, err := dataBuf.Write(f.Data())
if err != nil {
log.Println("[http2 request] Write HTTP2 Data Frame buffuer error:", err)
streamID := f.StreamID
frameBuf.WriteString(fmt.Sprintf("\nFrame Type\t=>\tDATA\nFrame StreamID\t=>\t%d\n", streamID))
payload := f.Data()
switch encodingMap[streamID] {
case "gzip":
h2r.packerType = PacketTypeGzip
frameBuf.WriteString("Partial entity body with gzip encoding ... \n")
_, err := dataBufMap[streamID].Write(payload)
if err != nil {
log.Println("[http2 request] Write HTTP2 Data Frame buffuer error:", err)
}
default:
h2r.packerType = PacketTypeNull
frameBuf.Write(payload)
frameBuf.WriteString("\n")
}
default:
fh := f.Header()
frameBuf.WriteString(fmt.Sprintf("\nFrame Type\t=>\t%s\nFrame StreamID\t=>\t%d\n", fh.Type.String(), fh.StreamID))
}
}
// merge data frame
if dataBuf.Len() > 0 {
frameBuf.WriteString(fmt.Sprintf("\nFrame Type\t=>\tDATA\nFrame StreamID\t=>\t%d\n", dataFrameStreamID))
payload := dataBuf.Bytes()
switch encoding {
case "gzip":
// merge data frame with encoding
for id, buf := range dataBufMap {
if buf.Len() > 0 && encodingMap[id] == "gzip" {
payload := buf.Bytes()
reader, err := gzip.NewReader(bytes.NewReader(payload))
if err != nil {
log.Println("[http2 request] Create gzip reader error:", err)
break
continue
}
defer reader.Close()
payload, err = io.ReadAll(reader)
if err != nil {
log.Println("[http2 request] Uncompress gzip data error:", err)
break
continue
}
h2r.packerType = PacketTypeGzip
defer reader.Close()
default:
h2r.packerType = PacketTypeNull
frameBuf.WriteString(fmt.Sprintf("\nMerged Data Frame, StreamID\t=>\t%d\n", id))
frameBuf.Write(payload)
}
frameBuf.Write(payload)
}
return frameBuf.Bytes()
}
Expand Down
52 changes: 30 additions & 22 deletions pkg/event_processor/http2_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,8 @@ func (h2r *HTTP2Response) IsDone() bool {
}

func (h2r *HTTP2Response) Display() []byte {
var encoding string
var dataFrameStreamID uint32
dataBuf := bytes.NewBuffer(nil)
encodingMap := make(map[uint32]string)
dataBufMap := make(map[uint32]*bytes.Buffer)
frameBuf := bytes.NewBufferString("")
for {
f, err := h2r.framer.ReadFrame()
Expand All @@ -136,46 +135,55 @@ func (h2r *HTTP2Response) Display() []byte {
}
switch f := f.(type) {
case *http2.MetaHeadersFrame:
frameBuf.WriteString(fmt.Sprintf("\nFrame Type\t=>\tHEADERS\nFrame StreamID\t=>\t%d\n", f.StreamID))
streamID := f.StreamID
dataBufMap[streamID] = bytes.NewBuffer(nil)
frameBuf.WriteString(fmt.Sprintf("\nFrame Type\t=>\tHEADERS\nFrame StreamID\t=>\t%d\n", streamID))
for _, header := range f.Fields {
frameBuf.WriteString(fmt.Sprintf("%s\n", header.String()))
if header.Name == "content-encoding" {
encoding = header.Value
encodingMap[streamID] = header.Value
}
}
case *http2.DataFrame:
dataFrameStreamID = f.StreamID
_, err := dataBuf.Write(f.Data())
if err != nil {
log.Println("[http2 response] Write HTTP2 Data Frame buffuer error:", err)
streamID := f.StreamID
frameBuf.WriteString(fmt.Sprintf("\nFrame Type\t=>\tDATA\nFrame StreamID\t=>\t%d\n", streamID))
payload := f.Data()
switch encodingMap[streamID] {
case "gzip":
h2r.packerType = PacketTypeGzip
frameBuf.WriteString("Partial entity body with gzip encoding ... \n")
_, err := dataBufMap[streamID].Write(payload)
if err != nil {
log.Println("[http2 response] Write HTTP2 Data Frame buffuer error:", err)
}
default:
h2r.packerType = PacketTypeNull
frameBuf.Write(payload)
frameBuf.WriteString("\n")
}
default:
fh := f.Header()
frameBuf.WriteString(fmt.Sprintf("\nFrame Type\t=>\t%s\nFrame StreamID\t=>\t%d\n", fh.Type.String(), fh.StreamID))
}
}
// merge data frame
if dataBuf.Len() > 0 {
frameBuf.WriteString(fmt.Sprintf("\nFrame Type\t=>\tDATA\nFrame StreamID\t=>\t%d\n", dataFrameStreamID))
payload := dataBuf.Bytes()
switch encoding {
case "gzip":
// merge data frame with encoding
for id, buf := range dataBufMap {
if buf.Len() > 0 && encodingMap[id] == "gzip" {
payload := buf.Bytes()
reader, err := gzip.NewReader(bytes.NewReader(payload))
if err != nil {
log.Println("[http2 response] Create gzip reader error:", err)
break
continue
}
defer reader.Close()
payload, err = io.ReadAll(reader)
if err != nil {
log.Println("[http2 response] Uncompress gzip data error:", err)
break
continue
}
h2r.packerType = PacketTypeGzip
defer reader.Close()
default:
h2r.packerType = PacketTypeNull
frameBuf.WriteString(fmt.Sprintf("\nMerged Data Frame, StreamID\t=>\t%d\n", id))
frameBuf.Write(payload)
}
frameBuf.Write(payload)
}
return frameBuf.Bytes()
}
Expand Down

0 comments on commit d89a60d

Please sign in to comment.