Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

x-pack/filebeat/input/http_endpoint: add support for request trace logging #36957

Merged
merged 4 commits into from
Nov 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ Setting environmental variable ELASTIC_NETINFO:false in Elastic Agent pod will d
- Add network processor in addition to interface based direction resolution. {pull}37023[37023]
- Add setup option `--force-enable-module-filesets`, that will act as if all filesets have been enabled in a module during setup. {issue}30915[30915] {pull}99999[99999]
- Make CEL input log current transaction ID when request tracing is turned on. {pull}37065[37065]
- Add request trace logging to http_endpoint input. {issue}36951[36951] {pull}36957[36957]

*Auditbeat*

Expand Down
40 changes: 40 additions & 0 deletions x-pack/filebeat/docs/inputs/input-http-endpoint.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,46 @@ The secret token provided by the webhook owner for the CRC validation. It is req
The HTTP method handled by the endpoint. If specified, `method` must be `POST`, `PUT` or `PATCH`.
The default method is `POST`. If `PUT` or `PATCH` are specified, requests using those method types are accepted, but are treated as `POST` requests and are expected to have a request body containing the request data.

[float]
==== `tracer.filename`

It is possible to log HTTP requests to a local file-system for debugging configurations.
This option is enabled by setting the `tracer.filename` value. Additional options are available to
tune log rotation behavior.

To differentiate the trace files generated from different input instances, a placeholder `*` can be added to the filename and will be replaced with the input instance id.
For Example, `http-request-trace-*.ndjson`.

Enabling this option compromises security and should only be used for debugging.

[float]
==== `tracer.maxsize`

This value sets the maximum size, in megabytes, the log file will reach before it is rotated. By default
logs are allowed to reach 1MB before rotation.

[float]
==== `tracer.maxage`

This specifies the number days to retain rotated log files. If it is not set, log files are retained
indefinitely.

[float]
==== `tracer.maxbackups`

The number of old logs to retain. If it is not set all old logs are retained subject to the `tracer.maxage`
setting.

[float]
==== `tracer.localtime`

Whether to use the host's local time rather that UTC for timestamping rotated log file names.

[float]
==== `tracer.compress`

This determines whether rotated logs should be gzip compressed.

[float]
=== Metrics

Expand Down
1 change: 1 addition & 0 deletions x-pack/filebeat/input/http_endpoint/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
trace_logs
3 changes: 3 additions & 0 deletions x-pack/filebeat/input/http_endpoint/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
"net/textproto"
"strings"

"gopkg.in/natefinch/lumberjack.v2"

"github.com/elastic/elastic-agent-libs/transport/tlscommon"
)

Expand Down Expand Up @@ -45,6 +47,7 @@ type config struct {
CRCSecret string `config:"crc.secret"`
IncludeHeaders []string `config:"include_headers"`
PreserveOriginalEvent bool `config:"preserve_original_event"`
Tracer *lumberjack.Logger `config:"tracer"`
}

func defaultConfig() config {
Expand Down
109 changes: 97 additions & 12 deletions x-pack/filebeat/input/http_endpoint/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,18 @@ import (
"errors"
"fmt"
"io"
"net"
"net/http"
"time"

"go.uber.org/zap"
"go.uber.org/zap/zapcore"

stateless "github.com/elastic/beats/v7/filebeat/input/v2/input-stateless"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/jsontransform"
"github.com/elastic/beats/v7/x-pack/filebeat/input/internal/httplog"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"
)
Expand All @@ -29,10 +34,14 @@ var (
errNotCRC = errors.New("event not processed as CRC request")
)

type httpHandler struct {
log *logp.Logger
publisher stateless.Publisher
type handler struct {
metrics *inputMetrics
publisher stateless.Publisher
log *logp.Logger
validator apiValidator

reqLogger *zap.Logger
host, scheme string

messageField string
responseCode int
Expand All @@ -42,28 +51,44 @@ type httpHandler struct {
crc *crcValidator
}

// Triggers if middleware validation returns successful
func (h *httpHandler) apiResponse(w http.ResponseWriter, r *http.Request) {
func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
status, err := h.validator.validateRequest(r)
if err != nil {
h.sendAPIErrorResponse(w, r, h.log, status, err)
return
}

start := time.Now()
h.metrics.batchesReceived.Add(1)
h.metrics.contentLength.Update(r.ContentLength)
body, status, err := getBodyReader(r)
if err != nil {
sendAPIErrorResponse(w, r, h.log, status, err)
h.sendAPIErrorResponse(w, r, h.log, status, err)
h.metrics.apiErrors.Add(1)
return
}
defer body.Close()

if h.reqLogger != nil {
// If we are logging, keep a copy of the body for the logger.
// This is stashed in the r.Body field. This is only safe
// because we are closing the original body in a defer and
// r.Body is not otherwise referenced by the non-logging logic
// after the call to getBodyReader above.
Comment on lines +73 to +77
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment is not strictly speaking true. The current handling of gzip content breaks the invariant that request bodies are closed. This is a bug in the current code, so I won't fix it here, but the change is

diff --git a/x-pack/filebeat/input/http_endpoint/gzip.go b/x-pack/filebeat/input/http_endpoint/gzip.go
index 332b482497..5b3e7111f1 100644
--- a/x-pack/filebeat/input/http_endpoint/gzip.go
+++ b/x-pack/filebeat/input/http_endpoint/gzip.go
@@ -18,15 +18,16 @@ var gzipDecoderPool = sync.Pool{

 type pooledGzipReader struct {
        Reader *gzip.Reader
+       closer io.Closer
 }

-func newPooledGzipReader(r io.Reader) (*pooledGzipReader, error) {
+func newPooledGzipReader(r io.ReadCloser) (*pooledGzipReader, error) {
        gzipReader := gzipDecoderPool.Get().(*gzip.Reader)
        if err := gzipReader.Reset(r); err != nil {
                gzipDecoderPool.Put(gzipReader)
                return nil, err
        }
-       return &pooledGzipReader{Reader: gzipReader}, nil
+       return &pooledGzipReader{Reader: gzipReader, closer: r}, nil
 }

 // Read implements io.Reader, reading uncompressed bytes from its underlying Reader.
@@ -34,13 +35,17 @@ func (r *pooledGzipReader) Read(b []byte) (int, error) {
        return r.Reader.Read(b)
 }

-// Close closes the Reader. It does not close the underlying io.Reader.
+// Close closes the Reader and the underlying source.
 // In order for the GZIP checksum to be verified, the reader must be
 // fully consumed until the io.EOF.
 //
 // After this call the reader should not be reused because it is returned to the pool.
 func (r *pooledGzipReader) Close() error {
        err := r.Reader.Close()
+       _err := r.closer.Close()
+       if err == nil {
+               err = _err
+       }
        gzipDecoderPool.Put(r.Reader)
        r.Reader = nil
        return err

var buf bytes.Buffer
body = io.NopCloser(io.TeeReader(body, &buf))
r.Body = io.NopCloser(&buf)
}

objs, _, status, err := httpReadJSON(body)
if err != nil {
sendAPIErrorResponse(w, r, h.log, status, err)
h.sendAPIErrorResponse(w, r, h.log, status, err)
h.metrics.apiErrors.Add(1)
return
}

var headers map[string]interface{}
if len(h.includeHeaders) > 0 {
if len(h.includeHeaders) != 0 {
headers = getIncludedHeaders(r, h.includeHeaders)
}

Expand All @@ -82,34 +107,94 @@ func (h *httpHandler) apiResponse(w http.ResponseWriter, r *http.Request) {
break
} else if !errors.Is(err, errNotCRC) {
h.metrics.apiErrors.Add(1)
sendAPIErrorResponse(w, r, h.log, http.StatusBadRequest, err)
h.sendAPIErrorResponse(w, r, h.log, http.StatusBadRequest, err)
return
}
}

if err = h.publishEvent(obj, headers); err != nil {
h.metrics.apiErrors.Add(1)
sendAPIErrorResponse(w, r, h.log, http.StatusInternalServerError, err)
h.sendAPIErrorResponse(w, r, h.log, http.StatusInternalServerError, err)
return
}
h.metrics.eventsPublished.Add(1)
respCode, respBody = h.responseCode, h.responseBody
}

h.sendResponse(w, respCode, respBody)
if h.reqLogger != nil {
h.logRequest(r, respCode, nil)
}
h.metrics.batchProcessingTime.Update(time.Since(start).Nanoseconds())
h.metrics.batchesPublished.Add(1)
}

func (h *httpHandler) sendResponse(w http.ResponseWriter, status int, message string) {
func (h *handler) sendAPIErrorResponse(w http.ResponseWriter, r *http.Request, log *logp.Logger, status int, apiError error) {
w.Header().Add("Content-Type", "application/json")
w.WriteHeader(status)

var (
mw io.Writer = w
buf bytes.Buffer
)
if h.reqLogger != nil {
mw = io.MultiWriter(mw, &buf)
}
enc := json.NewEncoder(mw)
enc.SetEscapeHTML(false)
err := enc.Encode(map[string]interface{}{"message": apiError.Error()})
if err != nil {
log.Debugw("Failed to write HTTP response.", "error", err, "client.address", r.RemoteAddr)
}
if h.reqLogger != nil {
h.logRequest(r, status, buf.Bytes())
}
}

func (h *handler) logRequest(r *http.Request, status int, respBody []byte) {
// Populate and preserve scheme and host if they are missing;
// they are required for httputil.DumpRequestOut.
var scheme, host string
if r.URL.Scheme == "" {
scheme = r.URL.Scheme
r.URL.Scheme = h.scheme
}
if r.URL.Host == "" {
host = r.URL.Host
r.URL.Host = h.host
}
extra := make([]zapcore.Field, 1, 4)
extra[0] = zap.Int("status", status)
addr, port, err := net.SplitHostPort(r.RemoteAddr)
if err == nil {
extra = append(extra,
zap.String("source.ip", addr),
zap.String("source.port", port),
)
}
if len(respBody) != 0 {
extra = append(extra,
zap.ByteString("http.response.body.content", respBody),
)
}
httplog.LogRequest(h.reqLogger, r, extra...)
if scheme != "" {
r.URL.Scheme = scheme
}
if host != "" {
r.URL.Host = host
}
}

func (h *handler) sendResponse(w http.ResponseWriter, status int, message string) {
w.Header().Add("Content-Type", "application/json")
w.WriteHeader(status)
if _, err := io.WriteString(w, message); err != nil {
h.log.Debugw("Failed writing response to client.", "error", err)
}
}

func (h *httpHandler) publishEvent(obj, headers mapstr.M) error {
func (h *handler) publishEvent(obj, headers mapstr.M) error {
event := beat.Event{
Timestamp: time.Now().UTC(),
Fields: mapstr.M{},
Expand Down
46 changes: 39 additions & 7 deletions x-pack/filebeat/input/http_endpoint/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,22 +7,33 @@ package http_endpoint
import (
"bytes"
"compress/gzip"
"context"
"encoding/json"
"errors"
"flag"
"io"
"io/fs"
"net/http"
"net/http/httptest"
"os"
"path/filepath"
"strings"
"sync"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gopkg.in/natefinch/lumberjack.v2"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"
)

var withTraces = flag.Bool("log-traces", false, "specify logging request traces during tests")

const traceLogsDir = "trace_logs"

func Test_httpReadJSON(t *testing.T) {
tests := []struct {
name string
Expand Down Expand Up @@ -158,6 +169,16 @@ func (p *publisher) Publish(e beat.Event) {
}

func Test_apiResponse(t *testing.T) {
if *withTraces {
err := os.RemoveAll(traceLogsDir)
if err != nil && errors.Is(err, fs.ErrExist) {
t.Fatalf("failed to remove trace logs directory: %v", err)
}
err = os.Mkdir(traceLogsDir, 0o750)
if err != nil {
t.Fatalf("failed to make trace logs directory: %v", err)
}
}
testCases := []struct {
name string // Sub-test name.
conf config // Load configuration.
Expand All @@ -167,7 +188,7 @@ func Test_apiResponse(t *testing.T) {
wantResponse string // Expected response message.
}{
{
name: "single event",
name: "single_event",
conf: defaultConfig(),
request: func() *http.Request {
req := httptest.NewRequest(http.MethodPost, "/", bytes.NewBufferString(`{"id":0}`))
Expand All @@ -185,7 +206,7 @@ func Test_apiResponse(t *testing.T) {
wantResponse: `{"message": "success"}`,
},
{
name: "single event gzip",
name: "single_event_gzip",
conf: defaultConfig(),
request: func() *http.Request {
buf := new(bytes.Buffer)
Expand All @@ -209,7 +230,7 @@ func Test_apiResponse(t *testing.T) {
wantResponse: `{"message": "success"}`,
},
{
name: "multiple events gzip",
name: "multiple_events_gzip",
conf: defaultConfig(),
request: func() *http.Request {
events := []string{
Expand Down Expand Up @@ -243,7 +264,7 @@ func Test_apiResponse(t *testing.T) {
wantResponse: `{"message": "success"}`,
},
{
name: "validate CRC request",
name: "validate_CRC_request",
conf: config{
CRCProvider: "Zoom",
CRCSecret: "secretValueTest",
Expand All @@ -267,7 +288,7 @@ func Test_apiResponse(t *testing.T) {
wantResponse: `{"encryptedToken":"70c1f2e2e6ca2d39297490d1f9142c7d701415ea8e6151f6562a08fa657a40ff","plainToken":"qgg8vlvZRS6UYooatFL8Aw"}`,
},
{
name: "malformed CRC request",
name: "malformed_CRC_request",
conf: config{
CRCProvider: "Zoom",
CRCSecret: "secretValueTest",
Expand All @@ -291,7 +312,7 @@ func Test_apiResponse(t *testing.T) {
wantResponse: `{"message":"malformed JSON object at stream position 0: invalid character '\\n' in string literal"}`,
},
{
name: "empty CRC challenge",
name: "empty_CRC_challenge",
conf: config{
CRCProvider: "Zoom",
CRCSecret: "secretValueTest",
Expand All @@ -316,13 +337,14 @@ func Test_apiResponse(t *testing.T) {
},
}

ctx := context.Background()
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
// Setup
pub := new(publisher)
metrics := newInputMetrics("")
defer metrics.Close()
apiHandler := newHandler(tc.conf, pub, logp.NewLogger("http_endpoint.test"), metrics)
apiHandler := newHandler(ctx, tracerConfig(tc.name, tc.conf, *withTraces), pub, logp.NewLogger("http_endpoint.test"), metrics)

// Execute handler.
respRec := httptest.NewRecorder()
Expand All @@ -339,3 +361,13 @@ func Test_apiResponse(t *testing.T) {
})
}
}

func tracerConfig(name string, cfg config, withTrace bool) config {
if !withTrace {
return cfg
}
cfg.Tracer = &lumberjack.Logger{
Filename: filepath.Join(traceLogsDir, name+".ndjson"),
}
return cfg
}
Loading
Loading