Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

http parser performance #468

Merged
merged 3 commits into from
Dec 8, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions libbeat/publisher/preprocess.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,8 @@ func updateEventAddresses(publisher *PublisherType, event common.MapStr) bool {

if publisher.GeoLite != nil {
realIP, exists := event["real_ip"]
if exists && len(realIP.(string)) > 0 {
loc := publisher.GeoLite.GetLocationByIP(realIP.(string))
if exists && len(realIP.(common.NetString)) > 0 {
loc := publisher.GeoLite.GetLocationByIP(string(realIP.(common.NetString)))
if loc != nil && loc.Latitude != 0 && loc.Longitude != 0 {
loc := fmt.Sprintf("%f, %f", loc.Latitude, loc.Longitude)
event["client_location"] = loc
Expand Down
83 changes: 60 additions & 23 deletions packetbeat/protos/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,11 @@ type HTTP struct {
results publisher.Client
}

var (
isDebug = false
isDetailed = false
)

func (http *HTTP) initDefaults() {
http.SendRequest = false
http.SendResponse = false
Expand Down Expand Up @@ -138,6 +143,9 @@ func (http *HTTP) Init(testMode bool, results publisher.Client) error {
}
}

isDebug = logp.IsDebug("http")
isDetailed = logp.IsDebug("httpdetailed")

http.results = results

return nil
Expand All @@ -154,14 +162,17 @@ func (http *HTTP) messageGap(s *stream, nbytes int) (ok bool, complete bool) {
// we know we cannot recover from these
return false, false
case stateBody:
debugf("gap in body: %d", nbytes)
if isDebug {
debugf("gap in body: %d", nbytes)
}

if m.IsRequest {
m.Notes = append(m.Notes, "Packet loss while capturing the request")
} else {
m.Notes = append(m.Notes, "Packet loss while capturing the response")
}
if !m.hasContentLength && (m.connection == "close" ||
(isVersion(m.version, 1, 0) && m.connection != "keep-alive")) {
if !m.hasContentLength && (bytes.Equal(m.connection, constClose) ||
(isVersion(m.version, 1, 0) && !bytes.Equal(m.connection, constKeepAlive))) {

s.bodyReceived += nbytes
m.ContentLength += nbytes
Expand Down Expand Up @@ -256,7 +267,9 @@ func (http *HTTP) doParse(
dir uint8,
) *httpConnectionData {

detailedf("Payload received: [%s]", pkt.Payload)
if isDetailed {
detailedf("Payload received: [%s]", pkt.Payload)
}

st := conn.Streams[dir]
if st == nil {
Expand All @@ -266,7 +279,9 @@ func (http *HTTP) doParse(
// concatenate bytes
st.data = append(st.data, pkt.Payload...)
if len(st.data) > tcp.TCP_MAX_DATA_IN_STREAM {
debugf("Stream data too large, dropping TCP stream")
if isDebug {
debugf("Stream data too large, dropping TCP stream")
}
conn.Streams[dir] = nil
return conn
}
Expand Down Expand Up @@ -355,7 +370,9 @@ func (http *HTTP) GapInStream(tcptuple *common.TcpTuple, dir uint8,
}

ok, complete := http.messageGap(stream, nbytes)
detailedf("messageGap returned ok=%v complete=%v", ok, complete)
if isDetailed {
detailedf("messageGap returned ok=%v complete=%v", ok, complete)
}
if !ok {
// on errors, drop stream
conn.Streams[dir] = nil
Expand Down Expand Up @@ -384,10 +401,14 @@ func (http *HTTP) handleHTTP(
http.hideHeaders(m)

if m.IsRequest {
debugf("Received request with tuple: %s", m.TCPTuple)
if isDebug {
debugf("Received request with tuple: %s", m.TCPTuple)
}
conn.requests.append(m)
} else {
debugf("Received response with tuple: %s", m.TCPTuple)
if isDebug {
debugf("Received response with tuple: %s", m.TCPTuple)
}
conn.responses.append(m)
http.correlate(conn)
}
Expand All @@ -409,7 +430,9 @@ func (http *HTTP) correlate(conn *httpConnectionData) {
resp := conn.responses.pop()
trans := http.newTransaction(requ, resp)

debugf("HTTP transaction completed")
if isDebug {
debugf("HTTP transaction completed")
}
http.publishTransaction(trans)
}
}
Expand Down Expand Up @@ -504,7 +527,7 @@ func (http *HTTP) collectHeaders(m *message) interface{} {
hdrs := map[string]interface{}{}
for name, value := range m.Headers {
if name == cookie {
hdrs[name] = splitCookiesHeader(value)
hdrs[name] = splitCookiesHeader(string(value))
} else {
hdrs[name] = value
}
Expand Down Expand Up @@ -546,22 +569,30 @@ func (http *HTTP) cutMessageBody(m *message) []byte {
if len(m.chunkedBody) > 0 {
cutMsg = append(cutMsg, m.chunkedBody...)
} else {
debugf("Body to include: [%s]", m.Raw[m.bodyOffset:])
if isDebug {
debugf("Body to include: [%s]", m.Raw[m.bodyOffset:])
}
cutMsg = append(cutMsg, m.Raw[m.bodyOffset:]...)
}
}

return cutMsg
}

func (http *HTTP) shouldIncludeInBody(contenttype string) bool {
func (http *HTTP) shouldIncludeInBody(contenttype []byte) bool {
includedBodies := config.ConfigSingleton.Protocols.Http.Include_body_for
for _, include := range includedBodies {
if strings.Contains(contenttype, include) {
debugf("Should Include Body = true Content-Type " + contenttype + " include_body " + include)
if bytes.Contains(contenttype, []byte(include)) {
if isDebug {
debugf("Should Include Body = true Content-Type %s include_body %s",
contenttype, include)
}
return true
}
debugf("Should Include Body = false Content-Type" + contenttype + " include_body " + include)
if isDebug {
debugf("Should Include Body = false Content-Type %s include_body %s",
contenttype, include)
}
}
return false
}
Expand All @@ -582,7 +613,10 @@ func (http *HTTP) hideHeaders(m *message) {
authHeaderEndX := m.bodyOffset

for authHeaderStartX < m.bodyOffset {
debugf("looking for authorization from %d to %d", authHeaderStartX, authHeaderEndX)
if isDebug {
debugf("looking for authorization from %d to %d",
authHeaderStartX, authHeaderEndX)
}

startOfHeader := bytes.Index(msg[authHeaderStartX:m.bodyOffset], authText)
if startOfHeader >= 0 {
Expand All @@ -596,7 +630,9 @@ func (http *HTTP) hideHeaders(m *message) {
authHeaderEndX = m.bodyOffset
}

debugf("Redact authorization from %d to %d", authHeaderStartX, authHeaderEndX)
if isDebug {
debugf("Redact authorization from %d to %d", authHeaderStartX, authHeaderEndX)
}

for i := authHeaderStartX + len(authText); i < authHeaderEndX; i++ {
msg[i] = byte('*')
Expand All @@ -608,16 +644,15 @@ func (http *HTTP) hideHeaders(m *message) {
}

for _, header := range redactHeaders {
if m.Headers[header] != "" {
m.Headers[header] = "*"
if len(m.Headers[header]) > 0 {
m.Headers[header] = []byte("*")
}
}

m.Raw = msg
}

func (http *HTTP) hideSecrets(values url.Values) url.Values {

params := url.Values{}
for key, array := range values {
for _, value := range array {
Expand All @@ -637,7 +672,7 @@ func (http *HTTP) hideSecrets(values url.Values) url.Values {
func (http *HTTP) extractParameters(m *message, msg []byte) (path string, params string, err error) {
var values url.Values

u, err := url.Parse(m.RequestURI)
u, err := url.Parse(string(m.RequestURI))
if err != nil {
return
}
Expand All @@ -646,7 +681,7 @@ func (http *HTTP) extractParameters(m *message, msg []byte) (path string, params

paramsMap := http.hideSecrets(values)

if m.ContentLength > 0 && strings.Contains(m.ContentType, "urlencoded") {
if m.ContentLength > 0 && bytes.Contains(m.ContentType, []byte("urlencoded")) {
values, err = url.ParseQuery(string(msg[m.bodyOffset:]))
if err != nil {
return
Expand All @@ -658,7 +693,9 @@ func (http *HTTP) extractParameters(m *message, msg []byte) (path string, params
}
params = paramsMap.Encode()

detailedf("Parameters: %s", params)
if isDetailed {
detailedf("Parameters: %s", params)
}

return
}
Expand Down
Loading