Skip to content

Commit

Permalink
Merge pull request #372 from Security-Onion-Solutions/jertel/det
Browse files Browse the repository at this point in the history
suri pcap improvements
  • Loading branch information
jertel authored Mar 7, 2024
2 parents 5419f23 + 270edea commit f438394
Show file tree
Hide file tree
Showing 8 changed files with 116 additions and 10 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ ARG ELASTIC_VERSION=0.0.0
ARG WAZUH_VERSION=0.0.0

RUN apt update -y
RUN apt install -y bash tzdata ca-certificates wget curl tcpdump unzip tshark
RUN apt install -y bash tzdata ca-certificates wget curl tcpdump unzip
RUN update-ca-certificates
RUN addgroup --gid "$GID" socore
RUN adduser --disabled-password --uid "$UID" --ingroup socore --gecos '' socore
Expand Down
4 changes: 3 additions & 1 deletion agent/jobmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@ package agent

import (
"errors"
"fmt"
"io"
"os"
"strconv"
"strings"
"sync"
"syscall"
"time"
Expand Down Expand Up @@ -103,7 +105,7 @@ func (mgr *JobManager) ProcessJob(job *model.Job) (io.ReadCloser, error) {
var err error
for _, processor := range mgr.jobProcessors {
reader, err = processor.ProcessJob(job, reader)
if err != nil {
if err != nil && !strings.Contains(fmt.Sprint(err), "No data available") {
log.WithError(err).WithFields(log.Fields{
"jobId": job.Id,
}).Error("Failed to process job; job processing aborted")
Expand Down
30 changes: 27 additions & 3 deletions agent/jobmanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,14 @@ func (jp *idJobProcessor) GetDataEpoch() time.Time {
}

// panicProcessor is a JobProcessor that always returns an error.
type panicProcessor struct{}
type panicProcessor struct{
processCount int
errorString string
}

func (jp *panicProcessor) ProcessJob(job *model.Job, reader io.ReadCloser) (io.ReadCloser, error) {
return reader, errors.New("panic")
jp.processCount++
return reader, errors.New(jp.errorString)
}

func (jp *panicProcessor) CleanupJob(*model.Job) {}
Expand All @@ -69,7 +73,7 @@ func TestProcessJob(t *testing.T) {
jm := &JobManager{}

jm.AddJobProcessor(&idJobProcessor{})
jm.AddJobProcessor(&panicProcessor{})
jm.AddJobProcessor(&panicProcessor{ errorString: "panic" })

// prep model
job := &model.Job{
Expand All @@ -87,6 +91,26 @@ func TestProcessJob(t *testing.T) {
assert.ErrorContains(t, err, "panic")
}

func TestProcessJobContinuesIfNoDataAvailable(t *testing.T) {
// prep test object
jm := &JobManager{}

proc := panicProcessor{ errorString: "No data available"}
jm.AddJobProcessor(&proc)
jm.AddJobProcessor(&proc)

// prep model
job := &model.Job{
Id: 101,
}

// test
_, err := jm.ProcessJob(job)

assert.Equal(t, 2, proc.processCount)
assert.ErrorContains(t, err, "No data available")
}

func TestUpdateDataEpoch(t *testing.T) {
// prep test object
jm := &JobManager{
Expand Down
58 changes: 55 additions & 3 deletions agent/modules/suriquery/suriquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package suriquery

import (
"errors"
"fmt"
"io"
"os"
"path"
Expand All @@ -19,6 +20,7 @@ import (

"github.com/apex/log"
"github.com/google/gopacket"
"github.com/pierrec/lz4/v4"
"github.com/security-onion-solutions/securityonion-soc/agent"
"github.com/security-onion-solutions/securityonion-soc/model"
"github.com/security-onion-solutions/securityonion-soc/module"
Expand All @@ -30,6 +32,7 @@ const DEFAULT_EPOCH_REFRESH_MS = 120000
const DEFAULT_DATA_LAG_MS = 120000
const DEFAULT_PCAP_MAX_COUNT = 999999

const SURI_LZ4_SUFFIX = ".lz4"
const SURI_PCAP_PREFIX = "so-pcap."

type SuriQuery struct {
Expand Down Expand Up @@ -126,17 +129,65 @@ func (suri *SuriQuery) CleanupJob(job *model.Job) {
// Noop
}

func (suri *SuriQuery) decompress(path string) (string, error) {
decompressedPath := strings.TrimSuffix(path, SURI_LZ4_SUFFIX)
if decompressedPath != path {
inputReader, oerr := os.Open(path)
if oerr != nil {
return "", oerr
}
defer inputReader.Close()

outputWriter, cerr := os.Create(decompressedPath)
if cerr != nil {
return "", cerr
}
defer outputWriter.Close()

lz4Reader := lz4.NewReader(inputReader)
count, copyErr := io.Copy(outputWriter, lz4Reader)
if copyErr != nil {
if strings.Contains(fmt.Sprint(copyErr), "unexpected EOF") {
log.WithFields(log.Fields {
"decompressedPath": decompressedPath,
}).Debug("ignoring EOF error since the filestream is likely still active")
} else {
return "", copyErr
}
}
log.WithFields(log.Fields {
"pcapPath": path,
"decompressedPath": decompressedPath,
"decompressedBytes": count,
}).Debug("Decompressed lz4 PCAP file")
}
return decompressedPath, nil
}

func (suri *SuriQuery) streamPacketsInPcaps(paths []string, filter *model.Filter) (io.ReadCloser, error) {
allPackets := make([]gopacket.Packet, 0, 0)

for _, path := range paths {
packets, perr := packet.ParseRawPcap(path, suri.pcapMaxCount, filter)
decompressedPath, derr := suri.decompress(path)
if derr != nil {
log.WithError(derr).WithField("pcapPath", path).Error("Failed to decompress PCAP file")
continue
}

packets, perr := packet.ParseRawPcap(decompressedPath, suri.pcapMaxCount, filter)
if perr != nil {
log.WithError(perr).WithField("pcapPath", path).Error("Failed to parse PCAP file")
log.WithError(perr).WithField("pcapPath", decompressedPath).Error("Failed to parse PCAP file")
}
if packets != nil && len(packets) > 0 {
allPackets = append(allPackets, packets...)
}

if path != decompressedPath {
rerr := os.Remove(decompressedPath)
if rerr != nil {
log.WithError(rerr).WithField("pcapPath", decompressedPath).Error("Failed to remove decompressed PCAP file")
}
}
}

slices.SortFunc(allPackets, func(a, b gopacket.Packet) int {
Expand All @@ -155,7 +206,8 @@ func (suri *SuriQuery) getPcapCreateTime(filepath string) (time.Time, error) {
if !strings.HasPrefix(filename, SURI_PCAP_PREFIX) {
err = errors.New("unsupported pcap file")
} else {
secondsStr := strings.TrimLeft(filename, SURI_PCAP_PREFIX)
secondsStr := strings.TrimRight(filename, SURI_LZ4_SUFFIX)
secondsStr = strings.TrimLeft(secondsStr, SURI_PCAP_PREFIX)
var seconds int64
seconds, err = strconv.ParseInt(secondsStr, 10, 64)
if err == nil {
Expand Down
26 changes: 24 additions & 2 deletions agent/modules/suriquery/suriquery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
package suriquery

import (
"os"
"testing"
"time"

Expand Down Expand Up @@ -43,8 +44,29 @@ func TestFindFilesExcludesMalformedNamesAndImpossibleStartTimes(tester *testing.
start, _ := time.Parse(time.RFC3339, "2024-02-05T00:00:00Z")
stop, _ := time.Parse(time.RFC3339, "2099-02-06T00:00:00Z")
files := sq.findFilesInTimeRange(start, stop)
assert.Len(tester, files, 1)
assert.Equal(tester, files[0], "test_resources/3/so-pcap.1575817346")
assert.Len(tester, files, 2)
assert.Equal(tester, files[0], "test_resources/1/so-pcap.1575817346.lz4")
assert.Equal(tester, files[1], "test_resources/3/so-pcap.1575817346")
}

func TestDecompress(tester *testing.T) {
decompressedFilename := "test_resources/1/so-pcap.1575817346"
compressedFilename := decompressedFilename + SURI_LZ4_SUFFIX

sq := initTest()
defer os.Remove(decompressedFilename)

// Ensure decompressed file does not exist
_, statErrBefore := os.Stat(decompressedFilename)
assert.Error(tester, statErrBefore, os.ErrNotExist)
newPath, err := sq.decompress(compressedFilename)
assert.Nil(tester, err)
assert.Equal(tester, decompressedFilename, newPath)

// Ensure decompressed file does exist
stats, statErrAfter := os.Stat(decompressedFilename)
assert.Nil(tester, err, statErrAfter)
assert.Equal(tester, int64(14918), stats.Size())
}

func TestGetPcapCreateTime(tester *testing.T) {
Expand Down
Binary file not shown.
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ require (

require (
github.com/go-git/go-git/v5 v5.11.0
github.com/pierrec/lz4/v4 v4.1.21
github.com/pkg/errors v0.9.1
github.com/samber/lo v1.39.0
github.com/tj/assert v0.0.3
Expand All @@ -44,6 +45,7 @@ require (
github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99 // indirect
github.com/kevinburke/ssh_config v1.2.0 // indirect
github.com/oapi-codegen/runtime v1.0.0 // indirect
github.com/pierrec/lz4 v2.6.1+incompatible // indirect
github.com/pjbgf/sha1cd v0.3.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/sergi/go-diff v1.1.0 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,10 @@ github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+W
github.com/onsi/gomega v1.5.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/onsi/gomega v1.27.10 h1:naR28SdDFlqrG6kScpT8VWpu1xWY5nJRCF3XaYyBjhI=
github.com/onsi/gomega v1.27.10/go.mod h1:RsS8tutOdbdgzbPtzzATp12yT7kM5I5aElG3evPbQ0M=
github.com/pierrec/lz4 v2.6.1+incompatible h1:9UY3+iC23yxF0UfGaYrGplQ+79Rg+h/q9FV9ix19jjM=
github.com/pierrec/lz4 v2.6.1+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ=
github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pjbgf/sha1cd v0.3.0 h1:4D5XXmUUBUl/xQ6IjCkEAbqXskkq/4O7LmGn0AqMDs4=
github.com/pjbgf/sha1cd v0.3.0/go.mod h1:nZ1rrWOcGJ5uZgEEVL1VUM9iRQiZvWdbZjkKyFzPPsI=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
Expand Down

0 comments on commit f438394

Please sign in to comment.