Skip to content

Commit

Permalink
implement faster method of adding fields
Browse files Browse the repository at this point in the history
  • Loading branch information
satta committed Jun 15, 2021
1 parent 3afa90f commit ad5ecac
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 17 deletions.
46 changes: 29 additions & 17 deletions processing/forward_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@ package processing

import (
"crypto/tls"
"fmt"
"net"
"sync"
"time"

"github.com/DCSO/fever/types"
"github.com/DCSO/fever/util"
"github.com/buger/jsonparser"

log "github.com/sirupsen/logrus"
)
Expand All @@ -29,7 +29,7 @@ type ForwardHandler struct {
Logger *log.Entry
DoRDNS bool
RDNSHandler *RDNSHandler
AddedFields map[string]string
AddedFields string
ContextCollector *ContextCollector
StenosisIface string
StenosisConnector *StenosisConnector
Expand Down Expand Up @@ -210,19 +210,13 @@ func (fh *ForwardHandler) Consume(e *types.Entry) error {
return err
}
}
for k, v := range fh.AddedFields {
val, err := util.EscapeJSON(v)
if err != nil {
fh.Logger.Warningf("cannot escape value: %s", v)
continue
}
newJSON, err := jsonparser.Set([]byte(e.JSONLine), val, k)
if err != nil {
fh.Logger.Warningf("cannot set %s: %s", k, v)
continue
} else {
e.JSONLine = string(newJSON)
}
// if length is 1 then there are no added fields, only a '}'
if len(fh.AddedFields) > 1 {
j := e.JSONLine
l := len(j)
j = j[:l-1]
j += fh.AddedFields
e.JSONLine = j
}
// if we use Stenosis, the Stenosis connector will take ownership of
// alerts
Expand Down Expand Up @@ -263,8 +257,26 @@ func (fh *ForwardHandler) EnableRDNS(expiryPeriod time.Duration) {

// AddFields enables the addition of a custom set of top-level fields to the
// forwarded JSON.
func (fh *ForwardHandler) AddFields(fields map[string]string) {
fh.AddedFields = fields
func (fh *ForwardHandler) AddFields(fields map[string]string) error {
j := ""
// We preprocess the JSON to be able to only use fast string operations
// later.
for k, v := range fields {
kval, err := util.EscapeJSON(k)
if err != nil {
fh.Logger.Warningf("cannot escape value: %s", v)
return err
}
vval, err := util.EscapeJSON(v)
if err != nil {
fh.Logger.Warningf("cannot escape value: %s", v)
return err
}
j += fmt.Sprintf(", %s: %s", kval, vval)
}
j += "}"
fh.AddedFields = j
return nil
}

// EnableStenosis ...
Expand Down
70 changes: 70 additions & 0 deletions processing/forward_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,3 +340,73 @@ func TestForwardAllHandler(t *testing.T) {
t.Fatalf("invalid event data, expected 'foo3', got %s", eve.DNS.Rrname)
}
}

type TestAddedFieldsInfo struct {
EventType string `json:"event_type,omitempty"`
FooBar string `json:"foobar,omitempty"`
}

func TestForwardAddedFields(t *testing.T) {
util.PrepareEventFilter([]string{}, true)

dir, err := ioutil.TempDir("", "test")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(dir)
tmpfn := filepath.Join(dir, fmt.Sprintf("t%d", rand.Int63()))

inputListener, err := net.Listen("unix", tmpfn)
if err != nil {
t.Fatal("error opening input socket:", err)
}
defer inputListener.Close()

// prepare slice to hold collected strings
coll := make([]string, 0)

// setup comms channels
clCh := make(chan bool)
cldCh := make(chan bool)

// start socket consumer
var wg sync.WaitGroup
wg.Add(1)
go consumeSocket(inputListener, clCh, cldCh, t, &coll, &wg)

// start forwarding handler
fh := MakeForwardHandler(5, tmpfn)
fh.AddFields(map[string]string{
"foobar": "baz",
})
fh.Run()

e := makeEvent("alert", "foo1")
fh.Consume(&e)

wg.Wait()

// stop forwarding handler
scChan := make(chan bool)
fh.Stop(scChan)
<-scChan

// stop socket consumer
inputListener.Close()
close(clCh)

if len(coll) != 1 {
t.Fatalf("unexpected number of alerts: %d != 1", len(coll))
}
var tfi TestAddedFieldsInfo
err = json.Unmarshal([]byte(coll[0]), &tfi)
if err != nil {
t.Fatal(err)
}
if tfi.EventType != "alert" {
t.Fatalf("event type is not alert, got %s", tfi.EventType)
}
if tfi.FooBar != "baz" {
t.Fatalf("added field incorrect")
}
}

0 comments on commit ad5ecac

Please sign in to comment.