Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

x-pack/filebeat/input/cel: allow users to redact state fields in logs #34302

Merged
merged 1 commit into from
Jan 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff]
- Add metrics for unix socket packet processing. {pull}34335[34335]
- Add beta `take over` mode for `filestream` for simple migration from `log` inputs {pull}34292[34292]
- Add pagination support for Salesforce module. {issue}34057[34057] {pull}34065[34065]
- Allow users to redact sensitive data from CEL input debug logs. {pull}34302[34302]

*Auditbeat*

Expand Down
10 changes: 10 additions & 0 deletions x-pack/filebeat/docs/inputs/input-cel.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -557,6 +557,16 @@ filebeat.inputs:
})
----

[float]
==== `redact.fields`

This specifies fields in the `state` to be redacted prior to debug logging. Fields listed in this array will be either replaced with a `*` or deleted entirely from messages sent to debug logs.

[float]
==== `redact.delete`

This specifies whether fields should be replaced with a `*` or deleted entirely from messages sent to debug logs. If delete is `true`, fields will be deleted rather than replaced.

[id="{beatname_lc}-input-{type}-common-options"]
include::../../../../filebeat/docs/inputs/input-common-options.asciidoc[]

Expand Down
11 changes: 11 additions & 0 deletions x-pack/filebeat/input/cel/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ type config struct {
// be overwritten by any stored cursor, but will be
// available if no stored cursor exists.
State map[string]interface{} `config:"state"`
// Redact is the debug log state redaction configuration.
Redact redact `config:"redact"`

// Auth is the authentication config for connection to an HTTP
// API endpoint.
Expand All @@ -43,6 +45,15 @@ type config struct {
Resource *ResourceConfig `config:"resource" validate:"required"`
}

type redact struct {
// Fields indicates which fields to apply redaction to prior
// to logging.
Fields []string `config:"fields"`
// Delete indicates that fields should be completely deleted
// before logging rather than redaction with a "*".
Delete bool `config:"delete"`
}

func (c config) Validate() error {
if c.Interval <= 0 {
return errors.New("interval must be greater than 0")
Expand Down
105 changes: 103 additions & 2 deletions x-pack/filebeat/input/cel/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,11 +195,11 @@ func (input) run(env v2.Context, src *source, cursor map[string]interface{}, pub
}

// Process a set of event requests.
log.Debugw("request state", logp.Namespace("cel"), "state", state)
log.Debugw("request state", logp.Namespace("cel"), "state", redactor{state: state, mask: cfg.Redact.Fields, delete: cfg.Redact.Delete})
metrics.executions.Add(1)
start := time.Now()
state, err = evalWith(ctx, prg, state)
log.Debugw("response state", logp.Namespace("cel"), "state", state)
log.Debugw("response state", logp.Namespace("cel"), "state", redactor{state: state, mask: cfg.Redact.Fields, delete: cfg.Redact.Delete})
if err != nil {
switch {
case errors.Is(err, context.Canceled), errors.Is(err, context.DeadlineExceeded):
Expand Down Expand Up @@ -950,3 +950,104 @@ func newInputMetrics(id string) *inputMetrics {
func (m *inputMetrics) Close() {
m.unregister()
}

// redactor implements lazy field redaction of sets of a mapstr.M.
type redactor struct {
state mapstr.M
mask []string // mask is the set of dotted paths to redact from state.
delete bool // if delete is true, delete redacted fields instead of showing a redaction.
}

// String renders the JSON corresponding to r.state after applying redaction
// operations.
func (r redactor) String() string {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you wanted the data to remain in a structured format and do it lazily you could implement https://pkg.go.dev/go.uber.org/zap@v1.24.0/zapcore#ObjectMarshalerFunc.MarshalLogObject. I don't have any preference.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The recommended approach for doing that does not really simplify things from what I can see. This is from zap issue 750.

if len(r.mask) == 0 {
return r.state.String()
}
c := make(mapstr.M, len(r.state))
cloneMap(c, r.state)
for _, mask := range r.mask {
if r.delete {
walkMap(c, mask, func(parent mapstr.M, key string) {
delete(parent, key)
})
continue
}
walkMap(c, mask, func(parent mapstr.M, key string) {
parent[key] = "*"
})
}
return c.String()
}

// cloneMap is an enhanced version of mapstr.M.Clone that handles cloning arrays
// within objects. Nested arrays are not handled.
func cloneMap(dst, src mapstr.M) {
for k, v := range src {
switch v := v.(type) {
case mapstr.M:
d := make(mapstr.M, len(v))
dst[k] = d
cloneMap(d, v)
case map[string]interface{}:
d := make(map[string]interface{}, len(v))
dst[k] = d
cloneMap(d, v)
case []mapstr.M:
a := make([]mapstr.M, 0, len(v))
for _, m := range v {
d := make(mapstr.M, len(m))
cloneMap(d, m)
a = append(a, d)
}
dst[k] = a
case []map[string]interface{}:
a := make([]map[string]interface{}, 0, len(v))
for _, m := range v {
d := make(map[string]interface{}, len(m))
cloneMap(d, m)
a = append(a, d)
}
dst[k] = a
default:
dst[k] = v
}
}
}

// walkMap walks to all ends of the provided path in m and applies fn to the
// final element of each walk. Nested arrays are not handled.
func walkMap(m mapstr.M, path string, fn func(parent mapstr.M, key string)) {
key, rest, more := strings.Cut(path, ".")
v, ok := m[key]
if !ok {
return
}
if !more {
fn(m, key)
return
}
switch v := v.(type) {
case mapstr.M:
walkMap(v, rest, fn)
case map[string]interface{}:
walkMap(v, rest, fn)
case []mapstr.M:
for _, m := range v {
walkMap(m, rest, fn)
}
case []map[string]interface{}:
for _, m := range v {
walkMap(m, rest, fn)
}
case []interface{}:
for _, v := range v {
switch m := v.(type) {
case mapstr.M:
walkMap(m, rest, fn)
case map[string]interface{}:
walkMap(m, rest, fn)
}
}
}
}
110 changes: 110 additions & 0 deletions x-pack/filebeat/input/cel/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1358,3 +1358,113 @@ func paginationArrayHandler() http.HandlerFunc {
count++
}
}

var redactorTests = []struct {
name string
state mapstr.M
mask []string
delete bool

wantOrig string
wantRedact string
}{
{
name: "auth_no_delete",
state: mapstr.M{
"auth": mapstr.M{
"user": "fred",
"pass": "top_secret",
},
"other": "data",
},
mask: []string{"auth"},
delete: false,
wantOrig: `{"auth":{"pass":"top_secret","user":"fred"},"other":"data"}`,
wantRedact: `{"auth":"*","other":"data"}`,
},
{
name: "auth_delete",
state: mapstr.M{
"auth": mapstr.M{
"user": "fred",
"pass": "top_secret",
},
"other": "data",
},
mask: []string{"auth"},
delete: true,
wantOrig: `{"auth":{"pass":"top_secret","user":"fred"},"other":"data"}`,
wantRedact: `{"other":"data"}`,
},
{
name: "pass_no_delete",
state: mapstr.M{
"auth": mapstr.M{
"user": "fred",
"pass": "top_secret",
},
"other": "data",
},
mask: []string{"auth.pass"},
delete: false,
wantOrig: `{"auth":{"pass":"top_secret","user":"fred"},"other":"data"}`,
wantRedact: `{"auth":{"pass":"*","user":"fred"},"other":"data"}`,
},
{
name: "pass_delete",
state: mapstr.M{
"auth": mapstr.M{
"user": "fred",
"pass": "top_secret",
},
"other": "data",
},
mask: []string{"auth.pass"},
delete: true,
wantOrig: `{"auth":{"pass":"top_secret","user":"fred"},"other":"data"}`,
wantRedact: `{"auth":{"user":"fred"},"other":"data"}`,
},
{
name: "multi_cursor_no_delete",
state: mapstr.M{
"cursor": []mapstr.M{
{"key": "val_one", "other": "data"},
{"key": "val_two", "other": "data"},
},
"other": "data",
},
mask: []string{"cursor.key"},
delete: false,
wantOrig: `{"cursor":[{"key":"val_one","other":"data"},{"key":"val_two","other":"data"}],"other":"data"}`,
wantRedact: `{"cursor":[{"key":"*","other":"data"},{"key":"*","other":"data"}],"other":"data"}`,
},
{
name: "multi_cursor_delete",
state: mapstr.M{
"cursor": []mapstr.M{
{"key": "val_one", "other": "data"},
{"key": "val_two", "other": "data"},
},
"other": "data",
},
mask: []string{"cursor.key"},
delete: true,
wantOrig: `{"cursor":[{"key":"val_one","other":"data"},{"key":"val_two","other":"data"}],"other":"data"}`,
wantRedact: `{"cursor":[{"other":"data"},{"other":"data"}],"other":"data"}`,
},
}

func TestRedactor(t *testing.T) {
for _, test := range redactorTests {
t.Run(test.name, func(t *testing.T) {
got := fmt.Sprint(redactor{state: test.state, mask: test.mask, delete: test.delete})
orig := fmt.Sprint(test.state)
if orig != test.wantOrig {
t.Errorf("unexpected original state after redaction:\n--- got\n--- want\n%s", cmp.Diff(orig, test.wantOrig))
}
if got != test.wantRedact {
t.Errorf("unexpected redaction:\n--- got\n--- want\n%s", cmp.Diff(got, test.wantRedact))
}
})
}
}