Skip to content

Commit

Permalink
Ensure ECS compliant logging when enabled. (elastic#3829)
Browse files Browse the repository at this point in the history
If `logging.ecs` is set log data in ECS compliant way.

closes elastic#3796
  • Loading branch information
simitt committed Dec 15, 2020
1 parent 73d40c6 commit 6f1dc86
Show file tree
Hide file tree
Showing 13 changed files with 149 additions and 176 deletions.
11 changes: 4 additions & 7 deletions _meta/beat.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1054,13 +1054,10 @@ output.elasticsearch:
# Unix epoch. Defaults to disabled.
#interval: 0

# Set to true to log messages in json format.
#logging.json: false

# Set to true to log with minimal Elastic Common Schema (ECS) fields set.
# It is recommended to set `logging.json=true` when enabling ECS logging.
# Defaults to false.
#logging.ecs: false
# Set to true, to log messages with minimal required Elastic Common Schema (ECS)
# information. Recommended to use in combination with `logging.json=true`
# Defaults to true.
#logging.ecs: true


#=============================== HTTP Endpoint ===============================
Expand Down
9 changes: 4 additions & 5 deletions apm-server.docker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1057,11 +1057,10 @@ output.elasticsearch:
# Set to true to log messages in json format.
#logging.json: false

# Set to true to log with minimal Elastic Common Schema (ECS) fields set.
# It is recommended to set `logging.json=true` when enabling ECS logging.
# Defaults to false.
#logging.ecs: false

# Set to true, to log messages with minimal required Elastic Common Schema (ECS)
# information. Recommended to use in combination with `logging.json=true`
# Defaults to true.
#logging.ecs: true

#=============================== HTTP Endpoint ===============================

Expand Down
9 changes: 4 additions & 5 deletions apm-server.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1057,11 +1057,10 @@ output.elasticsearch:
# Set to true to log messages in json format.
#logging.json: false

# Set to true to log with minimal Elastic Common Schema (ECS) fields set.
# It is recommended to set `logging.json=true` when enabling ECS logging.
# Defaults to false.
#logging.ecs: false

# Set to true, to log messages with minimal required Elastic Common Schema (ECS)
# information. Recommended to use in combination with `logging.json=true`
# Defaults to true.
#logging.ecs: true

#=============================== HTTP Endpoint ===============================

Expand Down
111 changes: 57 additions & 54 deletions beater/middleware/log_middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,73 +34,76 @@ import (

// LogMiddleware returns a middleware taking care of logging processing a request in the middleware and the request handler
func LogMiddleware() Middleware {
logger := logp.NewLogger(logs.Request)
return func(h request.Handler) (request.Handler, error) {

return func(c *request.Context) {
var reqID, transactionID, traceID string
start := time.Now()
tx := apm.TransactionFromContext(c.Request.Context())
if tx != nil {
// This request is being traced, grab its IDs to add to logs.
traceContext := tx.TraceContext()
transactionID = traceContext.Span.String()
traceID = traceContext.Trace.String()
reqID = transactionID
} else {
uuid, err := uuid.NewV4()
if err != nil {
id := request.IDResponseErrorsInternal
logger.Errorw(request.MapResultIDToStatus[id].Keyword, "error", err)
c.Result.SetWithError(id, err)
c.Write()
return
}
reqID = uuid.String()
}

reqLogger := logger.With(
"request_id", reqID,
"method", c.Request.Method,
"URL", c.Request.URL,
"content_length", c.Request.ContentLength,
"remote_address", utility.RemoteAddr(c.Request),
"user-agent", c.Request.Header.Get(headers.UserAgent))

if traceID != "" {
reqLogger = reqLogger.With(
"trace.id", traceID,
"transaction.id", transactionID,
)
c.Logger = loggerWithRequestContext(c)
var err error
if c.Logger, err = loggerWithTraceContext(c); err != nil {
id := request.IDResponseErrorsInternal
c.Logger.Error(request.MapResultIDToStatus[id].Keyword, logp.Error(err))
c.Result.SetWithError(id, err)
c.Write()
return
}

c.Logger = reqLogger
h(c)
reqLogger = reqLogger.With("event.duration", time.Since(start))

c.Logger = c.Logger.With("event.duration", time.Since(start))
if c.MultipleWriteAttempts() {
reqLogger.Warn("multiple write attempts")
c.Logger.Warn("multiple write attempts")
}

keyword := c.Result.Keyword
if keyword == "" {
keyword = "handled request"
}

keysAndValues := []interface{}{"response_code", c.Result.StatusCode}
if c.Result.Err != nil {
keysAndValues = append(keysAndValues, "error", c.Result.Err.Error())
}
if c.Result.Stacktrace != "" {
keysAndValues = append(keysAndValues, "stacktrace", c.Result.Stacktrace)
}

c.Logger = loggerWithResult(c)
if c.Result.Failure() {
reqLogger.Errorw(keyword, keysAndValues...)
} else {
reqLogger.Infow(keyword, keysAndValues...)
c.Logger.Error(keyword)
return
}

c.Logger.Info(keyword)
}, nil
}
}

func loggerWithRequestContext(c *request.Context) *logp.Logger {
logger := logp.NewLogger(logs.Request).With(
"url.original", c.Request.URL.String(),
"http.request.method", c.Request.Method,
"user_agent.original", c.Request.Header.Get(headers.UserAgent),
"source.address", utility.RemoteAddr(c.Request))
if c.Request.ContentLength != -1 {
logger = logger.With("http.request.body.bytes", c.Request.ContentLength)
}
return logger
}

func loggerWithTraceContext(c *request.Context) (*logp.Logger, error) {
tx := apm.TransactionFromContext(c.Request.Context())
if tx == nil {
uuid, err := uuid.NewV4()
if err != nil {
return c.Logger, err
}
return c.Logger.With("http.request.id", uuid.String()), nil
}
// This request is being traced, grab its IDs to add to logs.
traceContext := tx.TraceContext()
transactionID := traceContext.Span.String()
return c.Logger.With(
"trace.id", traceContext.Trace.String(),
"transaction.id", transactionID,
"http.request.id", transactionID,
), nil
}

func loggerWithResult(c *request.Context) *logp.Logger {
logger := c.Logger.With(
"http.response.status_code", c.Result.StatusCode)
if c.Result.Err != nil {
logger = logger.With("error.message", c.Result.Err.Error())
}
if c.Result.Stacktrace != "" {
logger = logger.With("error.stack_trace", c.Result.Stacktrace)
}
return logger
}
84 changes: 39 additions & 45 deletions beater/middleware/log_middleware_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,16 @@ import (
"net/http"
"testing"

"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zapcore"

"go.elastic.co/apm"
"go.elastic.co/apm/apmtest"

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/logp/configure"

"github.com/elastic/apm-server/beater/beatertest"
"github.com/elastic/apm-server/beater/headers"
Expand All @@ -38,31 +39,30 @@ import (
)

func TestLogMiddleware(t *testing.T) {
err := logp.DevelopmentSetup(logp.ToObserverOutput())
require.NoError(t, err)

testCases := []struct {
name, message string
level zapcore.Level
handler request.Handler
code int
error error
stacktrace bool
traced bool
ecsKeys []string
}{
{
name: "Accepted",
message: "request accepted",
level: zapcore.InfoLevel,
handler: beatertest.Handler202,
code: http.StatusAccepted,
ecsKeys: []string{"url.original"},
},
{
name: "Traced",
message: "request accepted",
level: zapcore.InfoLevel,
handler: beatertest.Handler202,
code: http.StatusAccepted,
ecsKeys: []string{"url.original", "trace.id", "transaction.id"},
traced: true,
},
{
Expand All @@ -71,16 +71,15 @@ func TestLogMiddleware(t *testing.T) {
level: zapcore.ErrorLevel,
handler: beatertest.Handler403,
code: http.StatusForbidden,
error: errors.New("forbidden request"),
ecsKeys: []string{"url.original", "error.message"},
},
{
name: "Panic",
message: "internal error",
level: zapcore.ErrorLevel,
handler: Apply(RecoverPanicMiddleware(), beatertest.HandlerPanic),
code: http.StatusInternalServerError,
error: errors.New("panic on Handle"),
stacktrace: true,
name: "Panic",
message: "internal error",
level: zapcore.ErrorLevel,
handler: Apply(RecoverPanicMiddleware(), beatertest.HandlerPanic),
code: http.StatusInternalServerError,
ecsKeys: []string{"url.original", "error.message", "error.stack_trace"},
},
{
name: "Error without keyword",
Expand All @@ -90,12 +89,19 @@ func TestLogMiddleware(t *testing.T) {
c.Result.StatusCode = http.StatusForbidden
c.Write()
},
code: http.StatusForbidden,
code: http.StatusForbidden,
ecsKeys: []string{"url.original"},
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
// log setup
configure.Logging("APM Server test",
common.MustNewConfigFrom(`{"ecs":true}`))
require.NoError(t, logp.DevelopmentSetup(logp.ToObserverOutput()))

// prepare and record request
c, rec := beatertest.DefaultContextWithResponseRecorder()
c.Request.Header.Set(headers.UserAgent, tc.name)
if tc.traced {
Expand All @@ -105,39 +111,27 @@ func TestLogMiddleware(t *testing.T) {
}
Apply(LogMiddleware(), tc.handler)(c)

// check log lines
assert.Equal(t, tc.code, rec.Code)
for i, entry := range logp.ObserverLogs().TakeAll() {
// expect only one log entry per request
assert.Equal(t, i, 0)
assert.Equal(t, logs.Request, entry.LoggerName)
assert.Equal(t, tc.level, entry.Level)
assert.Equal(t, tc.message, entry.Message)
entries := logp.ObserverLogs().TakeAll()
require.Equal(t, 1, len(entries))
entry := entries[0]
assert.Equal(t, logs.Request, entry.LoggerName)
assert.Equal(t, tc.level, entry.Level)
assert.Equal(t, tc.message, entry.Message)

ec := entry.ContextMap()
t.Logf("context map: %v", ec)

assert.NotEmpty(t, ec["request_id"])
assert.NotEmpty(t, ec["method"])
assert.Equal(t, c.Request.URL.String(), ec["URL"])
assert.NotEmpty(t, ec["remote_address"])
assert.Contains(t, ec, "event.duration")
assert.Equal(t, c.Request.Header.Get(headers.UserAgent), ec["user-agent"])
// zap encoded type
assert.Equal(t, tc.code, int(ec["response_code"].(int64)))
if tc.error != nil {
assert.Equal(t, tc.error.Error(), ec["error"])
}
if tc.stacktrace {
assert.NotZero(t, ec["stacktrace"])
}
if tc.traced {
assert.NotEmpty(t, ec, "trace.id")
assert.NotEmpty(t, ec, "transaction.id")
assert.Equal(t, ec["request_id"], ec["transaction.id"])
} else {
assert.NotContains(t, ec, "trace.id")
assert.NotContains(t, ec, "transaction.id")
}
encoder := zapcore.NewMapObjectEncoder()
ec := common.MapStr{}
for _, f := range entry.Context {
f.AddTo(encoder)
ec.DeepUpdate(encoder.Fields)
}
keys := []string{"http.request.id", "http.request.method", "http.request.body.bytes",
"source.address", "user_agent.original", "http.response.status_code", "event.duration"}
keys = append(keys, tc.ecsKeys...)
for _, key := range keys {
ok, _ := ec.HasKey(key)
assert.True(t, ok, key)
}
})
}
Expand Down
1 change: 1 addition & 0 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ var libbeatConfigOverrides = []cfgfile.ConditionalOverride{{
"metrics": map[string]interface{}{
"enabled": false,
},
"ecs": true,
},
}),
}}
Expand Down
3 changes: 1 addition & 2 deletions docs/copied-from-beats/docs/loggingconfig.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -249,8 +249,7 @@ When true, logs messages in JSON format. The default is false.
[float]
==== `logging.ecs`

When true, logs messages with minimal required Elastic Common Schema (ECS)
information.
When true, logs messages in Elastic Common Schema (ECS) compliant format.

ifndef::serverless[]
[float]
Expand Down
2 changes: 2 additions & 0 deletions systemtest/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ func TestExportConfigDefaults(t *testing.T) {

expectedConfig := strings.ReplaceAll(`
logging:
ecs: true
metrics:
enabled: false
path:
Expand All @@ -69,6 +70,7 @@ func TestExportConfigOverrideDefaults(t *testing.T) {

expectedConfig := strings.ReplaceAll(`
logging:
ecs: true
metrics:
enabled: true
path:
Expand Down
8 changes: 4 additions & 4 deletions systemtest/logging_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ func TestAPMServerRequestLoggingValid(t *testing.T) {

srv.Close()
for _, entry := range srv.Logs.All() {
if entry.Logger == "request" && entry.Fields["URL"] == "/intake/v2/events" {
statusCode, _ := entry.Fields["response_code"].(float64)
if entry.Logger == "request" && entry.Fields["url.original"] == "/intake/v2/events" {
statusCode, _ := entry.Fields["http.response.status_code"].(float64)
logEntries = append(logEntries, entry)
requestEntries = append(requestEntries, requestEntry{
level: entry.Level,
Expand All @@ -95,8 +95,8 @@ func TestAPMServerRequestLoggingValid(t *testing.T) {
}}, requestEntries)

assert.NotContains(t, logEntries[0].Fields, "error")
assert.Regexp(t, "validation error: 'transaction' required", logEntries[1].Fields["error"])
assert.Equal(t, "event exceeded the permitted size.", logEntries[2].Fields["error"])
assert.Regexp(t, "validation error: 'transaction' required", logEntries[1].Fields["error.message"])
assert.Equal(t, "event exceeded the permitted size.", logEntries[2].Fields["error.message"])
}

// validMetadataJSON returns valid JSON-encoded metadata,
Expand Down
Loading

0 comments on commit 6f1dc86

Please sign in to comment.