Skip to content

Commit

Permalink
Merge pull request #336 from Security-Onion-Solutions/jertel/suri
Browse files Browse the repository at this point in the history
redo the suri module with native pcap extraction; improve local dev
  • Loading branch information
jertel authored Feb 8, 2024
2 parents e8f09a7 + 2fe8931 commit 984ab77
Show file tree
Hide file tree
Showing 8 changed files with 288 additions and 143 deletions.
164 changes: 93 additions & 71 deletions agent/modules/suriquery/suriquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,41 +7,41 @@
package suriquery

import (
"context"
"errors"
"fmt"
"io"
"os"
"os/exec"
"path"
"path/filepath"
"slices"
"strconv"
"strings"
"time"

"github.com/apex/log"
"github.com/kennygrant/sanitize"
"github.com/google/gopacket"
"github.com/security-onion-solutions/securityonion-soc/agent"
"github.com/security-onion-solutions/securityonion-soc/model"
"github.com/security-onion-solutions/securityonion-soc/module"
"github.com/security-onion-solutions/securityonion-soc/packet"
)

// Default configuration values for the Suricata PCAP query module.
const DEFAULT_PCAP_INPUT_PATH = "/nsm/suripcap" // root directory walked for Suricata PCAP files
const DEFAULT_EPOCH_REFRESH_MS = 120000         // how often the data epoch is recomputed
const DEFAULT_TIMEOUT_MS = 1200000              // job processing timeout
const DEFAULT_DATA_LAG_MS = 120000              // assumed lag between capture and availability
const DEFAULT_PCAP_MAX_COUNT = 999999           // max packets parsed from a single PCAP file

// Suricata names its capture files <prefix><unix-seconds>, e.g. so-pcap.1575817346.
const SURI_PCAP_PREFIX = "so-pcap."

// SuriQuery is an agent module that extracts PCAP data natively from
// Suricata capture files, filtering and streaming matching packets for jobs.
type SuriQuery struct {
	config           module.ModuleConfig
	pcapInputPath    string    // root directory containing Suricata PCAP files
	agent            *agent.Agent
	epochTimeTmp     time.Time // working value while a walk recomputes the epoch
	epochTime        time.Time // earliest available capture time (the data epoch)
	epochRefreshTime time.Time // when the epoch was last recomputed
	epochRefreshMs   int
	timeoutMs        int
	dataLagMs        int
	pcapMaxCount     int // cap on packets parsed per PCAP file
}

func NewSuriQuery(agt *agent.Agent) *SuriQuery {
Expand All @@ -57,12 +57,10 @@ func (lag *SuriQuery) PrerequisiteModules() []string {
func (suri *SuriQuery) Init(cfg module.ModuleConfig) error {
var err error
suri.config = cfg
suri.executablePath = module.GetStringDefault(cfg, "executablePath", DEFAULT_EXECUTABLE_PATH)
suri.pcapOutputPath = module.GetStringDefault(cfg, "pcapOutputPath", DEFAULT_PCAP_OUTPUT_PATH)
suri.pcapInputPath = module.GetStringDefault(cfg, "pcapInputPath", DEFAULT_PCAP_INPUT_PATH)
suri.epochRefreshMs = module.GetIntDefault(cfg, "epochRefreshMs", DEFAULT_EPOCH_REFRESH_MS)
suri.timeoutMs = module.GetIntDefault(cfg, "timeoutMs", DEFAULT_TIMEOUT_MS)
suri.dataLagMs = module.GetIntDefault(cfg, "dataLagMs", DEFAULT_DATA_LAG_MS)
suri.pcapMaxCount = module.GetIntDefault(cfg, "pcapMaxCount", DEFAULT_PCAP_MAX_COUNT)
if suri.agent == nil {
err = errors.New("Unable to invoke JobMgr.AddJobProcessor due to nil agent")
} else {
Expand Down Expand Up @@ -91,8 +89,8 @@ func (suri *SuriQuery) ProcessJob(job *model.Job, reader io.ReadCloser) (io.Read
var err error
if job.GetKind() != "pcap" {
log.WithFields(log.Fields{
"jobId": job.Id,
"kind": job.GetKind(),
"jobId": job.Id,
"jobKind": job.GetKind(),
}).Debug("Skipping suri processor due to unsupported job")
return reader, nil
}
Expand All @@ -112,73 +110,90 @@ func (suri *SuriQuery) ProcessJob(job *model.Job, reader io.ReadCloser) (io.Read
}).Info("Skipping suri processor due to date range conflict")
err = errors.New("No data available for the requested dates")
} else {
job.FileExtension = "pcap"

query := suri.CreateQuery(job)

pcapFilepath := fmt.Sprintf("%s/%d.%s", suri.pcapOutputPath, job.Id, job.FileExtension)

log.WithField("jobId", job.Id).Info("Processing pcap export for job")

ctx, cancel := context.WithTimeout(context.Background(), time.Duration(suri.timeoutMs)*time.Millisecond)
defer cancel()
beginTime := job.Filter.BeginTime.Format(time.RFC3339)
endTime := job.Filter.EndTime.Format(time.RFC3339)

cmd := exec.CommandContext(ctx, suri.executablePath, pcapFilepath, beginTime, endTime, query)
var output []byte
output, err = cmd.CombinedOutput()
log.WithFields(log.Fields{
"executablePath": suri.executablePath,
"query": query,
"output": string(output),
"pcapFilepath": pcapFilepath,
"err": err,
}).Debug("Executed suriread")
if err == nil {
var file *os.File
file, err = os.Open(pcapFilepath)
if err == nil {
reader = file
}
}
"jobId": job.Id,
}).Debug("Starting to process new Suricata PCAP job")
pcapFiles := suri.findFilesInTimeRange(job.Filter.BeginTime, job.Filter.EndTime)
reader, err = suri.streamPacketsInPcaps(pcapFiles, job.Filter)
log.WithFields(log.Fields{
"err": err,
}).Debug("Finished processing PCAP")
}
return reader, err
}

// CleanupJob is a no-op for this module: packets are streamed directly to the
// caller and no intermediate output files are written, so there is nothing to remove.
func (suri *SuriQuery) CleanupJob(job *model.Job) {
	// Noop
}

// add appends a clause to a BPF-style filter expression, joining it with
// "and" when the expression already contains content.
func add(query string, added string) string {
	if query == "" {
		return added
	}
	return query + " and " + added
}
func (suri *SuriQuery) streamPacketsInPcaps(paths []string, filter *model.Filter) (io.ReadCloser, error) {
allPackets := make([]gopacket.Packet, 0, 0)

func (suri *SuriQuery) CreateQuery(job *model.Job) string {
for _, path := range paths {
packets, perr := packet.ParseRawPcap(path, suri.pcapMaxCount, filter)
if perr != nil {
log.WithError(perr).WithField("pcapPath", path).Error("Failed to parse PCAP file")
}
if packets != nil && len(packets) > 0 {
allPackets = append(allPackets, packets...)
}
}

query := ""
slices.SortFunc(allPackets, func(a, b gopacket.Packet) int {
return a.Metadata().Timestamp.Compare(b.Metadata().Timestamp)
})

if len(job.Filter.SrcIp) > 0 {
query = add(query, fmt.Sprintf("host %s", job.Filter.SrcIp))
}
log.WithField("matchedCount", len(allPackets)).Debug("Finished filtering eligible packets")

if len(job.Filter.DstIp) > 0 {
query = add(query, fmt.Sprintf("host %s", job.Filter.DstIp))
}
return packet.ToStream(allPackets)
}

if job.Filter.SrcPort > 0 {
query = add(query, fmt.Sprintf("port %d", job.Filter.SrcPort))
// getPcapCreateTime derives a PCAP file's creation time from its filename,
// which Suricata writes as <SURI_PCAP_PREFIX><unix-seconds>. Returns an error
// for filenames that do not conform to that pattern.
func (suri *SuriQuery) getPcapCreateTime(filepath string) (time.Time, error) {
	var createTime time.Time
	var err error
	filename := path.Base(filepath)
	if !strings.HasPrefix(filename, SURI_PCAP_PREFIX) {
		err = errors.New("unsupported pcap file")
	} else {
		// TrimPrefix, not TrimLeft: TrimLeft treats its argument as a cutset
		// of characters and would strip any leading run of "so-pcap." chars,
		// not the literal prefix.
		secondsStr := strings.TrimPrefix(filename, SURI_PCAP_PREFIX)
		var seconds int64
		seconds, err = strconv.ParseInt(secondsStr, 10, 64)
		if err == nil {
			createTime = time.Unix(seconds, 0).UTC()
		}
	}
	return createTime, err
}

if job.Filter.DstPort > 0 {
query = add(query, fmt.Sprintf("port %d", job.Filter.DstPort))
// findFilesInTimeRange walks the PCAP input directory and returns the paths of
// capture files whose lifetime (creation time through last modification)
// overlaps the [start, stop] range. Non-conforming and inaccessible entries
// are logged and skipped.
func (suri *SuriQuery) findFilesInTimeRange(start time.Time, stop time.Time) []string {
	eligibleFiles := make([]string, 0)
	err := filepath.Walk(suri.pcapInputPath, func(filepath string, fileinfo os.FileInfo, err error) error {
		if err != nil {
			// fileinfo may be nil when Walk reports an access error; bail out
			// of this entry before touching it, but keep walking.
			log.WithError(err).WithField("pcapPath", filepath).Warn("Unable to access path while locating PCAP files")
			return nil
		}
		createTime, perr := suri.getPcapCreateTime(filepath)
		if perr != nil {
			log.WithField("pcapPath", filepath).WithError(perr).Warn("PCAP file does not conform to expected format")
			return nil
		}
		modTime := fileinfo.ModTime()
		log.WithFields(log.Fields{
			"pcapPath":   filepath,
			"createTime": createTime,
			"modTime":    modTime,
		}).Debug("Reviewing eligibility for PCAP file")

		// file was created before the time range but was still open when the time range started.
		if (createTime.Before(start) && modTime.After(start)) ||
			// file was created and finished in between time range start and stop times
			(createTime.After(start) && createTime.Before(modTime) && modTime.Before(stop)) ||
			// file was created before the end of the time range but was still being written to after the time range stop time
			(createTime.Before(stop) && modTime.After(stop)) {
			eligibleFiles = append(eligibleFiles, filepath)
		}
		return nil
	})
	if err != nil {
		log.WithError(err).WithField("pcapInputPath", suri.pcapInputPath).Error("Unable to access path while locating PCAP files in time range")
	}

	return eligibleFiles
}

func (suri *SuriQuery) GetDataEpoch() time.Time {
Expand All @@ -188,7 +203,7 @@ func (suri *SuriQuery) GetDataEpoch() time.Time {
suri.epochTimeTmp = now
err := filepath.Walk(suri.pcapInputPath, suri.updateEpochTimeTmp)
if err != nil {
log.WithError(err).WithField("pcapInputPath", suri.pcapInputPath)
log.WithError(err).WithField("pcapPath", suri.pcapInputPath)
} else {
suri.epochTime = suri.epochTimeTmp
}
Expand All @@ -199,11 +214,18 @@ func (suri *SuriQuery) GetDataEpoch() time.Time {

func (suri *SuriQuery) updateEpochTimeTmp(path string, info os.FileInfo, err error) error {
if err != nil {
log.WithError(err).WithField("path", path).Error("Unable to access path while updating epoch")
log.WithError(err).WithField("pcapPath", path).Error("Unable to access path while updating epoch")
return err
}
if !info.IsDir() && info.Size() > 0 && info.ModTime().Before(suri.epochTimeTmp) {
suri.epochTimeTmp = info.ModTime()
if !info.IsDir() && info.Size() > 0 {
createTime, err := suri.getPcapCreateTime(path)
if err != nil {
return err
}

if createTime.Before(suri.epochTimeTmp) {
suri.epochTimeTmp = createTime
}
}
return nil
}
95 changes: 62 additions & 33 deletions agent/modules/suriquery/suriquery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,60 +7,89 @@
package suriquery

import (
"strconv"
"testing"
"time"

"github.com/security-onion-solutions/securityonion-soc/model"
"github.com/stretchr/testify/assert"
)

// initTest builds a SuriQuery wired to read capture files from the local
// test_resources directory.
func initTest() *SuriQuery {
	config := map[string]interface{}{
		"pcapInputPath": "test_resources",
	}
	suri := NewSuriQuery(nil)
	suri.Init(config)
	return suri
}
// TestInitSuriQuery verifies that Init errors without an agent but still
// applies every configuration default.
func TestInitSuriQuery(tester *testing.T) {
	cfg := make(map[string]interface{})
	sq := NewSuriQuery(nil)
	err := sq.Init(cfg)
	assert.Error(tester, err)
	assert.Equal(tester, DEFAULT_PCAP_INPUT_PATH, sq.pcapInputPath)
	assert.Equal(tester, DEFAULT_TIMEOUT_MS, sq.timeoutMs)
	assert.Equal(tester, DEFAULT_EPOCH_REFRESH_MS, sq.epochRefreshMs)
	assert.Equal(tester, DEFAULT_DATA_LAG_MS, sq.dataLagMs)
	assert.Equal(tester, DEFAULT_PCAP_MAX_COUNT, sq.pcapMaxCount)
}

func TestDataLag(tester *testing.T) {
cfg := make(map[string]interface{})
sq := NewSuriQuery(nil)
sq.Init(cfg)
sq := initTest()
lagDate := sq.getDataLagDate()
assert.False(tester, lagDate.After(time.Now()), "expected data lag datetime to be before current datetime")
}

func TestCreateQuery(tester *testing.T) {
sq := NewSuriQuery(nil)
func TestFindFilesExcludesMalformedNamesAndImpossibleStartTimes(tester *testing.T) {
sq := initTest()

start, _ := time.Parse(time.RFC3339, "2024-02-05T00:00:00Z")
stop, _ := time.Parse(time.RFC3339, "2099-02-06T00:00:00Z")
files := sq.findFilesInTimeRange(start, stop)
assert.Len(tester, files, 1)
assert.Equal(tester, files[0], "test_resources/3/so-pcap.1575817346")
}

// TestGetPcapCreateTime verifies filename parsing: non-conforming names are
// rejected, a non-numeric suffix fails to parse, and a valid name yields the
// embedded unix timestamp.
func TestGetPcapCreateTime(tester *testing.T) {
	sq := initTest()

	_, err := sq.getPcapCreateTime("/some/path/nonconforming.file")
	assert.ErrorContains(tester, err, "unsupported pcap file")

	_, err = sq.getPcapCreateTime("/some/path/so-pcap.file")
	assert.ErrorContains(tester, err, "invalid syntax")

	created, err := sq.getPcapCreateTime("/some/path/so-pcap.1575817346")
	assert.Nil(tester, err)
	expectedTime, _ := time.Parse(time.RFC3339, "2019-12-08T15:02:26Z")
	assert.Equal(tester, expectedTime, created)
}

// TestGetDataEpoch verifies the epoch matches the timestamp encoded in the
// oldest test resource PCAP filename.
func TestGetDataEpoch(tester *testing.T) {
	sq := initTest()

	expectedTime, _ := time.Parse(time.RFC3339, "2019-12-08T15:02:26Z")
	assert.Equal(tester, expectedTime, sq.GetDataEpoch())
}

// TestStreamPacketsInPcaps verifies that filtering the test PCAP by the known
// flow's addresses/ports yields a stream of the expected byte length.
func TestStreamPacketsInPcaps(tester *testing.T) {
	sq := initTest()

	paths := []string{"test_resources/3/so-pcap.1575817346"}
	filter := model.NewFilter()
	startTime, _ := time.Parse(time.RFC3339, "2019-12-08T00:00:00Z")
	filter.BeginTime = startTime
	endTime, _ := time.Parse(time.RFC3339, "2019-12-08T23:59:59Z")
	filter.EndTime = endTime
	filter.SrcIp = "185.47.63.113"
	filter.SrcPort = 19
	filter.DstIp = "176.126.243.198"
	filter.DstPort = 34515

	reader, err := sq.streamPacketsInPcaps(paths, filter)
	assert.Nil(tester, err)
	pcapLength := 14122 // correlates to so-pcap test file
	buffer := make([]byte, 32768)
	count, err := reader.Read(buffer)
	assert.Nil(tester, err)
	assert.Equal(tester, pcapLength, count)
}
Loading

0 comments on commit 984ab77

Please sign in to comment.