Skip to content

Commit

Permalink
x-pack/filebeat/input/http_endpoint: add support for request trace lo…
Browse files Browse the repository at this point in the history
…gging (elastic#36957)
  • Loading branch information
efd6 authored and Scholar-Li committed Feb 5, 2024
1 parent 4158d11 commit 6653b6c
Show file tree
Hide file tree
Showing 9 changed files with 298 additions and 122 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ Setting environmental variable ELASTIC_NETINFO:false in Elastic Agent pod will d
- Add network processor in addition to interface based direction resolution. {pull}37023[37023]
- Add setup option `--force-enable-module-filesets`, that will act as if all filesets have been enabled in a module during setup. {issue}30915[30915] {pull}99999[99999]
- Make CEL input log current transaction ID when request tracing is turned on. {pull}37065[37065]
- Add request trace logging to http_endpoint input. {issue}36951[36951] {pull}36957[36957]

*Auditbeat*

Expand Down
40 changes: 40 additions & 0 deletions x-pack/filebeat/docs/inputs/input-http-endpoint.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,46 @@ The secret token provided by the webhook owner for the CRC validation. It is req
The HTTP method handled by the endpoint. If specified, `method` must be `POST`, `PUT` or `PATCH`.
The default method is `POST`. If `PUT` or `PATCH` are specified, requests using those method types are accepted, but are treated as `POST` requests and are expected to have a request body containing the request data.

[float]
==== `tracer.filename`

It is possible to log HTTP requests to a local file-system for debugging configurations.
This option is enabled by setting the `tracer.filename` value. Additional options are available to
tune log rotation behavior.

To differentiate the trace files generated from different input instances, a placeholder `*` can be added to the filename and will be replaced with the input instance id.
For Example, `http-request-trace-*.ndjson`.

Enabling this option compromises security and should only be used for debugging.

[float]
==== `tracer.maxsize`

This value sets the maximum size, in megabytes, the log file will reach before it is rotated. By default
logs are allowed to reach 1MB before rotation.

[float]
==== `tracer.maxage`

This specifies the number days to retain rotated log files. If it is not set, log files are retained
indefinitely.

[float]
==== `tracer.maxbackups`

The number of old logs to retain. If it is not set all old logs are retained subject to the `tracer.maxage`
setting.

[float]
==== `tracer.localtime`

Whether to use the host's local time rather that UTC for timestamping rotated log file names.

[float]
==== `tracer.compress`

This determines whether rotated logs should be gzip compressed.

[float]
=== Metrics

Expand Down
1 change: 1 addition & 0 deletions x-pack/filebeat/input/http_endpoint/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
trace_logs
3 changes: 3 additions & 0 deletions x-pack/filebeat/input/http_endpoint/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
"net/textproto"
"strings"

"gopkg.in/natefinch/lumberjack.v2"

"github.com/elastic/elastic-agent-libs/transport/tlscommon"
)

Expand Down Expand Up @@ -45,6 +47,7 @@ type config struct {
CRCSecret string `config:"crc.secret"`
IncludeHeaders []string `config:"include_headers"`
PreserveOriginalEvent bool `config:"preserve_original_event"`
Tracer *lumberjack.Logger `config:"tracer"`
}

func defaultConfig() config {
Expand Down
109 changes: 97 additions & 12 deletions x-pack/filebeat/input/http_endpoint/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,18 @@ import (
"errors"
"fmt"
"io"
"net"
"net/http"
"time"

"go.uber.org/zap"
"go.uber.org/zap/zapcore"

stateless "github.com/elastic/beats/v7/filebeat/input/v2/input-stateless"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/jsontransform"
"github.com/elastic/beats/v7/x-pack/filebeat/input/internal/httplog"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"
)
Expand All @@ -29,10 +34,14 @@ var (
errNotCRC = errors.New("event not processed as CRC request")
)

type httpHandler struct {
log *logp.Logger
publisher stateless.Publisher
type handler struct {
metrics *inputMetrics
publisher stateless.Publisher
log *logp.Logger
validator apiValidator

reqLogger *zap.Logger
host, scheme string

messageField string
responseCode int
Expand All @@ -42,28 +51,44 @@ type httpHandler struct {
crc *crcValidator
}

// Triggers if middleware validation returns successful
func (h *httpHandler) apiResponse(w http.ResponseWriter, r *http.Request) {
func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
status, err := h.validator.validateRequest(r)
if err != nil {
h.sendAPIErrorResponse(w, r, h.log, status, err)
return
}

start := time.Now()
h.metrics.batchesReceived.Add(1)
h.metrics.contentLength.Update(r.ContentLength)
body, status, err := getBodyReader(r)
if err != nil {
sendAPIErrorResponse(w, r, h.log, status, err)
h.sendAPIErrorResponse(w, r, h.log, status, err)
h.metrics.apiErrors.Add(1)
return
}
defer body.Close()

if h.reqLogger != nil {
// If we are logging, keep a copy of the body for the logger.
// This is stashed in the r.Body field. This is only safe
// because we are closing the original body in a defer and
// r.Body is not otherwise referenced by the non-logging logic
// after the call to getBodyReader above.
var buf bytes.Buffer
body = io.NopCloser(io.TeeReader(body, &buf))
r.Body = io.NopCloser(&buf)
}

objs, _, status, err := httpReadJSON(body)
if err != nil {
sendAPIErrorResponse(w, r, h.log, status, err)
h.sendAPIErrorResponse(w, r, h.log, status, err)
h.metrics.apiErrors.Add(1)
return
}

var headers map[string]interface{}
if len(h.includeHeaders) > 0 {
if len(h.includeHeaders) != 0 {
headers = getIncludedHeaders(r, h.includeHeaders)
}

Expand All @@ -82,34 +107,94 @@ func (h *httpHandler) apiResponse(w http.ResponseWriter, r *http.Request) {
break
} else if !errors.Is(err, errNotCRC) {
h.metrics.apiErrors.Add(1)
sendAPIErrorResponse(w, r, h.log, http.StatusBadRequest, err)
h.sendAPIErrorResponse(w, r, h.log, http.StatusBadRequest, err)
return
}
}

if err = h.publishEvent(obj, headers); err != nil {
h.metrics.apiErrors.Add(1)
sendAPIErrorResponse(w, r, h.log, http.StatusInternalServerError, err)
h.sendAPIErrorResponse(w, r, h.log, http.StatusInternalServerError, err)
return
}
h.metrics.eventsPublished.Add(1)
respCode, respBody = h.responseCode, h.responseBody
}

h.sendResponse(w, respCode, respBody)
if h.reqLogger != nil {
h.logRequest(r, respCode, nil)
}
h.metrics.batchProcessingTime.Update(time.Since(start).Nanoseconds())
h.metrics.batchesPublished.Add(1)
}

func (h *httpHandler) sendResponse(w http.ResponseWriter, status int, message string) {
func (h *handler) sendAPIErrorResponse(w http.ResponseWriter, r *http.Request, log *logp.Logger, status int, apiError error) {
w.Header().Add("Content-Type", "application/json")
w.WriteHeader(status)

var (
mw io.Writer = w
buf bytes.Buffer
)
if h.reqLogger != nil {
mw = io.MultiWriter(mw, &buf)
}
enc := json.NewEncoder(mw)
enc.SetEscapeHTML(false)
err := enc.Encode(map[string]interface{}{"message": apiError.Error()})
if err != nil {
log.Debugw("Failed to write HTTP response.", "error", err, "client.address", r.RemoteAddr)
}
if h.reqLogger != nil {
h.logRequest(r, status, buf.Bytes())
}
}

func (h *handler) logRequest(r *http.Request, status int, respBody []byte) {
// Populate and preserve scheme and host if they are missing;
// they are required for httputil.DumpRequestOut.
var scheme, host string
if r.URL.Scheme == "" {
scheme = r.URL.Scheme
r.URL.Scheme = h.scheme
}
if r.URL.Host == "" {
host = r.URL.Host
r.URL.Host = h.host
}
extra := make([]zapcore.Field, 1, 4)
extra[0] = zap.Int("status", status)
addr, port, err := net.SplitHostPort(r.RemoteAddr)
if err == nil {
extra = append(extra,
zap.String("source.ip", addr),
zap.String("source.port", port),
)
}
if len(respBody) != 0 {
extra = append(extra,
zap.ByteString("http.response.body.content", respBody),
)
}
httplog.LogRequest(h.reqLogger, r, extra...)
if scheme != "" {
r.URL.Scheme = scheme
}
if host != "" {
r.URL.Host = host
}
}

func (h *handler) sendResponse(w http.ResponseWriter, status int, message string) {
w.Header().Add("Content-Type", "application/json")
w.WriteHeader(status)
if _, err := io.WriteString(w, message); err != nil {
h.log.Debugw("Failed writing response to client.", "error", err)
}
}

func (h *httpHandler) publishEvent(obj, headers mapstr.M) error {
func (h *handler) publishEvent(obj, headers mapstr.M) error {
event := beat.Event{
Timestamp: time.Now().UTC(),
Fields: mapstr.M{},
Expand Down
46 changes: 39 additions & 7 deletions x-pack/filebeat/input/http_endpoint/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,22 +7,33 @@ package http_endpoint
import (
"bytes"
"compress/gzip"
"context"
"encoding/json"
"errors"
"flag"
"io"
"io/fs"
"net/http"
"net/http/httptest"
"os"
"path/filepath"
"strings"
"sync"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gopkg.in/natefinch/lumberjack.v2"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"
)

var withTraces = flag.Bool("log-traces", false, "specify logging request traces during tests")

const traceLogsDir = "trace_logs"

func Test_httpReadJSON(t *testing.T) {
tests := []struct {
name string
Expand Down Expand Up @@ -158,6 +169,16 @@ func (p *publisher) Publish(e beat.Event) {
}

func Test_apiResponse(t *testing.T) {
if *withTraces {
err := os.RemoveAll(traceLogsDir)
if err != nil && errors.Is(err, fs.ErrExist) {
t.Fatalf("failed to remove trace logs directory: %v", err)
}
err = os.Mkdir(traceLogsDir, 0o750)
if err != nil {
t.Fatalf("failed to make trace logs directory: %v", err)
}
}
testCases := []struct {
name string // Sub-test name.
conf config // Load configuration.
Expand All @@ -167,7 +188,7 @@ func Test_apiResponse(t *testing.T) {
wantResponse string // Expected response message.
}{
{
name: "single event",
name: "single_event",
conf: defaultConfig(),
request: func() *http.Request {
req := httptest.NewRequest(http.MethodPost, "/", bytes.NewBufferString(`{"id":0}`))
Expand All @@ -185,7 +206,7 @@ func Test_apiResponse(t *testing.T) {
wantResponse: `{"message": "success"}`,
},
{
name: "single event gzip",
name: "single_event_gzip",
conf: defaultConfig(),
request: func() *http.Request {
buf := new(bytes.Buffer)
Expand All @@ -209,7 +230,7 @@ func Test_apiResponse(t *testing.T) {
wantResponse: `{"message": "success"}`,
},
{
name: "multiple events gzip",
name: "multiple_events_gzip",
conf: defaultConfig(),
request: func() *http.Request {
events := []string{
Expand Down Expand Up @@ -243,7 +264,7 @@ func Test_apiResponse(t *testing.T) {
wantResponse: `{"message": "success"}`,
},
{
name: "validate CRC request",
name: "validate_CRC_request",
conf: config{
CRCProvider: "Zoom",
CRCSecret: "secretValueTest",
Expand All @@ -267,7 +288,7 @@ func Test_apiResponse(t *testing.T) {
wantResponse: `{"encryptedToken":"70c1f2e2e6ca2d39297490d1f9142c7d701415ea8e6151f6562a08fa657a40ff","plainToken":"qgg8vlvZRS6UYooatFL8Aw"}`,
},
{
name: "malformed CRC request",
name: "malformed_CRC_request",
conf: config{
CRCProvider: "Zoom",
CRCSecret: "secretValueTest",
Expand All @@ -291,7 +312,7 @@ func Test_apiResponse(t *testing.T) {
wantResponse: `{"message":"malformed JSON object at stream position 0: invalid character '\\n' in string literal"}`,
},
{
name: "empty CRC challenge",
name: "empty_CRC_challenge",
conf: config{
CRCProvider: "Zoom",
CRCSecret: "secretValueTest",
Expand All @@ -316,13 +337,14 @@ func Test_apiResponse(t *testing.T) {
},
}

ctx := context.Background()
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
// Setup
pub := new(publisher)
metrics := newInputMetrics("")
defer metrics.Close()
apiHandler := newHandler(tc.conf, pub, logp.NewLogger("http_endpoint.test"), metrics)
apiHandler := newHandler(ctx, tracerConfig(tc.name, tc.conf, *withTraces), pub, logp.NewLogger("http_endpoint.test"), metrics)

// Execute handler.
respRec := httptest.NewRecorder()
Expand All @@ -339,3 +361,13 @@ func Test_apiResponse(t *testing.T) {
})
}
}

func tracerConfig(name string, cfg config, withTrace bool) config {
if !withTrace {
return cfg
}
cfg.Tracer = &lumberjack.Logger{
Filename: filepath.Join(traceLogsDir, name+".ndjson"),
}
return cfg
}
Loading

0 comments on commit 6653b6c

Please sign in to comment.