Skip to content

Commit

Permalink
filters/fields: do a deep copy before filtering
Browse files Browse the repository at this point in the history
[upstream commit 09de1f8]

We need to deep copy the event here to avoid issues caused by filtering out information
that is shared between events through the event cache (e.g. process info). Filtering such
shared data in place can cause segmentation faults and other nasty bugs. Avoid all that by
doing a deep copy here before filtering. This is neither pretty nor great for performance,
but it at least works as a stopgap until we're able to refactor things so that the copy is
no longer necessary.

Signed-off-by: William Findlay <will@isovalent.com>
  • Loading branch information
willfindlay committed Nov 6, 2023
1 parent 9eacb40 commit a5f4235
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 38 deletions.
22 changes: 8 additions & 14 deletions cmd/tetra/getevents/io_reader_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
hubbleFilters "github.com/cilium/tetragon/pkg/oldhubble/filters"
"google.golang.org/grpc"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/proto"
)

// ioReaderClient implements tetragon.FineGuidanceSensors_GetEventsClient.
Expand Down Expand Up @@ -101,28 +100,23 @@ func (i *ioReaderClient) GetVersion(_ context.Context, _ *tetragon.GetVersionReq

func (i *ioReaderClient) Recv() (*tetragon.GetEventsResponse, error) {
for i.scanner.Scan() {
var res tetragon.GetEventsResponse
res := &tetragon.GetEventsResponse{}
line := i.scanner.Bytes()
err := i.unmarshaller.Unmarshal(line, &res)
err := i.unmarshaller.Unmarshal(line, res)
if err != nil && i.debug {
fmt.Fprintf(os.Stderr, "DEBUG: failed unmarshal: %s: %s\n", line, err)
continue
}
if !hubbleFilters.Apply(i.allowlist, nil, &hubbleV1.Event{Event: &res}) {
if !hubbleFilters.Apply(i.allowlist, nil, &hubbleV1.Event{Event: res}) {
continue
}
filterEvent := &res
if len(i.fieldFilters) > 0 && filterEvent.GetProcessExec() != nil { // this is an exec event and we have fieldFilters
// We need a copy of the exec event as modifying the original message
// can cause issues in the process cache (we keep a copy of that message there).
filterEvent = proto.Clone(&res).(*tetragon.GetEventsResponse)
}
for _, filter := range i.fieldFilters {
// we must not change res
// maybe only for exec events
filter.Filter(filterEvent)
res, err = filter.Filter(res)
if err != nil {
return nil, err
}
}
return filterEvent, nil
return res, nil
}
if err := i.scanner.Err(); err != nil {
return nil, err
Expand Down
19 changes: 15 additions & 4 deletions pkg/filters/fields.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/cilium/tetragon/api/v1/tetragon"
"github.com/mennanov/fmutils"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protoreflect"
)

Expand Down Expand Up @@ -144,7 +145,17 @@ func FieldFiltersFromGetEventsRequest(request *tetragon.GetEventsRequest) []*Fie
// Filter filters the fields in the GetEventsResponse, keeping fields specified in the
// inclusion filter and discarding fields specified in the exclusion filter. Exclusion
// takes precedence over inclusion and an empty filter set will keep all remaining fields.
func (f *FieldFilter) Filter(event *tetragon.GetEventsResponse) error {
func (f *FieldFilter) Filter(event *tetragon.GetEventsResponse) (*tetragon.GetEventsResponse, error) {
// We need to deep copy the event here to avoid issues caused by filtering out
// information that is shared between events through the event cache (e.g. process
// info). This can cause segmentation faults and other nasty bugs. Avoid all that by
// doing a deep copy here before filtering.
//
// FIXME: We need to fix this so that it doesn't kill performance by doing a deep
// copy. This will require architectural changes to both the field filters and the
// event cache.
event = proto.Clone(event).(*tetragon.GetEventsResponse)

if len(f.eventSet) > 0 {
// skip filtering by default unless the event set is inverted, in which case we
// want to filter by default and skip only if we have a match
Expand All @@ -170,7 +181,7 @@ func (f *FieldFilter) Filter(event *tetragon.GetEventsResponse) error {
}

if skipFiltering {
return nil
return event, nil
}
}

Expand All @@ -190,8 +201,8 @@ func (f *FieldFilter) Filter(event *tetragon.GetEventsResponse) error {
})

if !rft.IsValid() {
return fmt.Errorf("invalid event after field filter")
return nil, fmt.Errorf("invalid event after field filter")
}

return nil
return event, nil
}
17 changes: 9 additions & 8 deletions pkg/filters/fields_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func TestEventFieldFilters(t *testing.T) {
// Construct the filter
filters := FieldFiltersFromGetEventsRequest(request)
for _, filter := range filters {
filter.Filter(ev)
ev, _ = filter.Filter(ev)
}

// These fields should all have been included and so should not be empty
Expand Down Expand Up @@ -125,12 +125,12 @@ func TestFieldFilterByEventType(t *testing.T) {
}

filter := NewExcludeFieldFilter([]tetragon.EventType{tetragon.EventType_PROCESS_EXIT}, []string{"process.pid"}, false)
filter.Filter(ev)
ev, _ = filter.Filter(ev)

assert.NotEmpty(t, ev.GetProcessExec().Process.Pid)

filter = NewExcludeFieldFilter([]tetragon.EventType{tetragon.EventType_PROCESS_EXEC}, []string{"process.pid"}, false)
filter.Filter(ev)
ev, _ = filter.Filter(ev)

assert.Empty(t, ev.GetProcessExec().Process.Pid)
}
Expand Down Expand Up @@ -225,7 +225,7 @@ func TestEmptyFieldFilter(t *testing.T) {
}

assert.True(t, proto.Equal(ev, expected), "events are equal before filter")
filter.Filter(ev)
ev, _ = filter.Filter(ev)
assert.True(t, proto.Equal(ev, expected), "events are equal after filter")
}

Expand All @@ -250,7 +250,7 @@ func TestFieldFilterInvertedEventSet(t *testing.T) {

filter := NewExcludeFieldFilter([]tetragon.EventType{tetragon.EventType_PROCESS_EXEC}, []string{"process", "parent"}, true)
assert.True(t, proto.Equal(ev, expected), "events are equal before filter")
filter.Filter(ev)
ev, _ = filter.Filter(ev)
assert.True(t, proto.Equal(ev, expected), "events are equal after filter")

ev = &tetragon.GetEventsResponse{
Expand All @@ -270,7 +270,7 @@ func TestFieldFilterInvertedEventSet(t *testing.T) {

filter = NewExcludeFieldFilter([]tetragon.EventType{tetragon.EventType_PROCESS_KPROBE}, []string{"process", "parent"}, true)
assert.False(t, proto.Equal(ev, expected), "events are not equal before filter")
filter.Filter(ev)
ev, _ = filter.Filter(ev)
assert.True(t, proto.Equal(ev, expected), "events are equal after filter")
}

Expand Down Expand Up @@ -599,8 +599,9 @@ func TestSlimExecEventsFieldFilterExample(t *testing.T) {
}

for _, filter := range filters {
for _, ev := range evs {
filter.Filter(ev)
for i, ev := range evs {
ev, _ = filter.Filter(ev)
evs[i] = ev
}
}
for i := range evs {
Expand Down
21 changes: 9 additions & 12 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"github.com/cilium/tetragon/pkg/tracingpolicy"
"github.com/cilium/tetragon/pkg/version"
"github.com/sirupsen/logrus"
"google.golang.org/protobuf/proto"
)

type Listener interface {
Expand Down Expand Up @@ -165,30 +164,28 @@ func (s *Server) GetEventsWG(request *tetragon.GetEventsRequest, server tetragon

// Filter the GetEventsResponse fields
filters := filters.FieldFiltersFromGetEventsRequest(request)
filterEvent := event
if len(filters) > 0 && filterEvent.GetProcessExec() != nil { // this is an exec event and we have fieldFilters
// We need a copy of the exec event as modifying the original message
// can cause issues in the process cache (we keep a copy of that message there).
filterEvent = proto.Clone(event).(*tetragon.GetEventsResponse)
}

for _, filter := range filters {
// we must not change res
// maybe only for exec events
filter.Filter(filterEvent)
ev, err := filter.Filter(event)
if err != nil {
logger.GetLogger().WithField("filter", filter).WithError(err).Warn("Failed to apply field filter")
continue
}
event = ev
}

if aggregator != nil {
// Send event to aggregator.
select {
case aggregator.GetEventChannel() <- filterEvent:
case aggregator.GetEventChannel() <- event:
default:
logger.GetLogger().
WithField("request", request).
Warn("Aggregator buffer is full. Consider increasing AggregatorOptions.channel_buffer_size.")
}
} else {
// No need to aggregate. Directly send out the response.
if err = server.Send(filterEvent); err != nil {
if err = server.Send(event); err != nil {
s.ctxCleanupWG.Done()
return err
}
Expand Down

0 comments on commit a5f4235

Please sign in to comment.