Events refactor (#30) - cherry-pick #33

Merged
merged 2 commits on Aug 28, 2023
14 changes: 13 additions & 1 deletion Dockerfile
@@ -15,8 +15,19 @@ RUN go mod download

RUN make build-linux

# Vmlinux
FROM public.ecr.aws/amazonlinux/amazonlinux:2023 as vmlinuxbuilder
WORKDIR /vmlinuxbuilder
RUN yum update -y && \
yum install -y iproute procps-ng && \
yum install -y llvm clang make gcc && \
yum install -y kernel-devel elfutils-libelf-devel zlib-devel libbpf-devel bpftool && \
yum clean all
COPY . ./
RUN make vmlinuxh

# Build BPF
FROM public.ecr.aws/eks-distro-build-tooling/eks-distro-base:latest-al23 as bpfbuilder
FROM public.ecr.aws/amazonlinux/amazonlinux:2 as bpfbuilder
WORKDIR /bpfbuilder
RUN yum update -y && \
yum install -y iproute procps-ng && \
@@ -25,6 +36,7 @@ RUN yum update -y && \
yum clean all

COPY . ./
COPY --from=vmlinuxbuilder /vmlinuxbuilder/pkg/ebpf/c/vmlinux.h .
RUN make build-bpf

FROM public.ecr.aws/eks-distro-build-tooling/eks-distro-base:latest.2
4 changes: 2 additions & 2 deletions pkg/ebpf/bpf_client.go
@@ -536,7 +536,7 @@ func (l *bpfClient) detachIngressBPFProbe(hostVethName string) error {
l.logger.Info("Attempting to do an Ingress Detach")
var err error
err = goebpf.TCEgressDetach(hostVethName)
if err != nil && !utils.IsInvalidFilterListError(err.Error()) &&
if err != nil &&
!utils.IsMissingFilterError(err.Error()) {
l.logger.Info("Ingress Detach failed:", "error", err)
return err
@@ -548,7 +548,7 @@ func (l *bpfClient) detachEgressBPFProbe(hostVethName string) error {
l.logger.Info("Attempting to do an Egress Detach")
var err error
err = goebpf.TCIngressDetach(hostVethName)
if err != nil && !utils.IsInvalidFilterListError(err.Error()) &&
if err != nil &&
!utils.IsMissingFilterError(err.Error()) {
l.logger.Info("Ingress Detach failed:", "error", err)
return err
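For context on the two-line change in each detach path above: after this PR a TC detach error is returned unless it only indicates a missing filter, and the previously tolerated "invalid filter list" case is no longer special-cased. A minimal self-contained sketch of that pattern (detach and isMissingFilter below are stand-in parameters for goebpf.TCEgressDetach/TCIngressDetach and utils.IsMissingFilterError, not the repository's actual wiring):

package main

import (
	"errors"
	"fmt"
	"strings"
)

// detachProbe mirrors the post-refactor check: the detach error is returned
// unless it merely reports that the TC filter is already gone.
func detachProbe(hostVeth string, detach func(string) error, isMissingFilter func(string) bool) error {
	if err := detach(hostVeth); err != nil && !isMissingFilter(err.Error()) {
		return err
	}
	return nil // a missing filter is treated as success
}

func main() {
	// Hypothetical detach and classifier functions for illustration only.
	detach := func(string) error { return errors.New("no such filter on veth") }
	isMissing := func(msg string) bool { return strings.Contains(msg, "no such filter") }
	fmt.Println(detachProbe("eni1a2b3c4d5e6", detach, isMissing)) // <nil>: tolerated
}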
252 changes: 92 additions & 160 deletions pkg/ebpf/events/events.go
@@ -6,7 +6,6 @@ import (
"fmt"
"os"
"strconv"
"sync"
"time"

"github.com/aws/aws-network-policy-agent/pkg/aws"
@@ -32,7 +31,7 @@ var (
NON_EKS_CW_PATH = "/aws/"
)

type Event_t struct {
type ringBufferDataV4_t struct {
SourceIP uint32
SourcePort uint32
DestIP uint32
@@ -41,7 +40,7 @@
Verdict uint32
}

type EventV6_t struct {
type ringBufferDataV6_t struct {
SourceIP [16]byte
SourcePort uint32
DestIP [16]byte
@@ -50,10 +49,6 @@
Verdict uint32
}

type EvProgram struct {
wg sync.WaitGroup
}

func ConfigurePolicyEventsLogging(logger logr.Logger, enableCloudWatchLogs bool, mapFD int, enableIPv6 bool) error {
// Enable logging and setup ring buffer
if mapFD <= 0 {
@@ -69,8 +64,7 @@ func ConfigurePolicyEventsLogging(logger logr.Logger, enableCloudWatchLogs bool,
return err
} else {
logger.Info("Configure Event loop ... ")
p := EvProgram{wg: sync.WaitGroup{}}
p.capturePolicyEvents(eventChanList[mapFD], logger, enableCloudWatchLogs, enableIPv6)
capturePolicyEvents(eventChanList[mapFD], logger, enableCloudWatchLogs, enableIPv6)
if enableCloudWatchLogs {
logger.Info("Cloudwatch log support is enabled")
err = setupCW(logger)
@@ -112,182 +106,120 @@ func setupCW(logger logr.Logger) error {
return nil
}

func (p *EvProgram) capturePolicyV6Events(events <-chan []byte, log logr.Logger, enableCloudWatchLogs bool) {
nodeName := os.Getenv("MY_NODE_NAME")
go func(events <-chan []byte) {
defer p.wg.Done()

for {
if b, ok := <-events; ok {
var logQueue []*cloudwatchlogs.InputLogEvent
func getProtocol(protocolNum int) string {
protocolStr := "UNKNOWN"
if protocolNum == utils.TCP_PROTOCOL_NUMBER {
protocolStr = "TCP"
} else if protocolNum == utils.UDP_PROTOCOL_NUMBER {
protocolStr = "UDP"
} else if protocolNum == utils.SCTP_PROTOCOL_NUMBER {
protocolStr = "SCTP"
} else if protocolNum == utils.ICMP_PROTOCOL_NUMBER {
protocolStr = "ICMP"
}
return protocolStr
}

var ev EventV6_t
buf := bytes.NewBuffer(b)
if err := binary.Read(buf, binary.LittleEndian, &ev); err != nil {
log.Info("Read Ring buf", "Failed ", err)
continue
}
func getVerdict(verdict int) string {
verdictStr := "DENY"
if verdict == utils.ACCEPT.Index() {
verdictStr = "ACCEPT"
} else if verdict == utils.EXPIRED_DELETED.Index() {
verdictStr = "EXPIRED/DELETED"
}
return verdictStr
}

protocol := "UNKNOWN"
if int(ev.Protocol) == utils.TCP_PROTOCOL_NUMBER {
protocol = "TCP"
} else if int(ev.Protocol) == utils.UDP_PROTOCOL_NUMBER {
protocol = "UDP"
} else if int(ev.Protocol) == utils.SCTP_PROTOCOL_NUMBER {
protocol = "SCTP"
} else if int(ev.Protocol) == utils.ICMP_PROTOCOL_NUMBER {
protocol = "ICMP"
}
func publishDataToCloudwatch(logQueue []*cloudwatchlogs.InputLogEvent, message string, log logr.Logger) bool {
logQueue = append(logQueue, &cloudwatchlogs.InputLogEvent{
Message: &message,
Timestamp: awssdk.Int64(time.Now().UnixNano() / int64(time.Millisecond)),
})
if len(logQueue) > 0 {
log.Info("Sending logs to CW")
input := cloudwatchlogs.PutLogEventsInput{
LogEvents: logQueue,
LogGroupName: &logGroupName,
}

verdict := "DENY"
if ev.Verdict == 1 {
verdict = "ACCEPT"
} else if ev.Verdict == 2 {
verdict = "EXPIRED/DELETED"
}
if sequenceToken == "" {
err := createLogStream()
if err != nil {
log.Info("Failed to create log stream")
panic(err)
}
} else {
input = *input.SetSequenceToken(sequenceToken)
}

log.Info("Flow Info: ", "Src IP", utils.ConvByteToIPv6(ev.SourceIP).String(), "Src Port", ev.SourcePort,
"Dest IP", utils.ConvByteToIPv6(ev.DestIP).String(), "Dest Port", ev.DestPort,
"Proto", protocol, "Verdict", verdict)
input = *input.SetLogStreamName(logStreamName)

message := "Node: " + nodeName + ";" + "SIP: " + utils.ConvByteToIPv6(ev.SourceIP).String() + ";" + "SPORT: " + strconv.Itoa(int(ev.SourcePort)) + ";" + "DIP: " + utils.ConvByteToIPv6(ev.DestIP).String() + ";" + "DPORT: " + strconv.Itoa(int(ev.DestPort)) + ";" + "PROTOCOL: " + protocol + ";" + "PolicyVerdict: " + verdict
resp, err := cwl.PutLogEvents(&input)
if err != nil {
log.Info("Push log events", "Failed ", err)
}

if enableCloudWatchLogs {
logQueue = append(logQueue, &cloudwatchlogs.InputLogEvent{
Message: &message,
Timestamp: awssdk.Int64(time.Now().UnixNano() / int64(time.Millisecond)),
})
if len(logQueue) > 0 {
log.Info("Sending CW")
input := cloudwatchlogs.PutLogEventsInput{
LogEvents: logQueue,
LogGroupName: &logGroupName,
}

if sequenceToken == "" {
err := createLogStream()
if err != nil {
log.Info("Failed to create log stream")
panic(err)
}
} else {
input = *input.SetSequenceToken(sequenceToken)
}

input = *input.SetLogStreamName(logStreamName)

resp, err := cwl.PutLogEvents(&input)
if err != nil {
log.Info("Kprobe", "Failed ", err)
}

if resp != nil {
sequenceToken = *resp.NextSequenceToken
}

logQueue = []*cloudwatchlogs.InputLogEvent{}
} else {
break
}
}
}
if resp != nil {
sequenceToken = *resp.NextSequenceToken
}
}(events)

logQueue = []*cloudwatchlogs.InputLogEvent{}
return false
}
return true
}

func (p *EvProgram) capturePolicyV4Events(events <-chan []byte, log logr.Logger, enableCloudWatchLogs bool) {
func capturePolicyEvents(ringbufferdata <-chan []byte, log logr.Logger, enableCloudWatchLogs bool, enableIPv6 bool) {
nodeName := os.Getenv("MY_NODE_NAME")
go func(events <-chan []byte) {
defer p.wg.Done()

// Read from ringbuffer channel, perf buffer support is not there and 5.10 kernel is needed.
go func(ringbufferdata <-chan []byte) {
done := false
for {
if b, ok := <-events; ok {
if record, ok := <-ringbufferdata; ok {
var logQueue []*cloudwatchlogs.InputLogEvent
var message string
if enableIPv6 {
var rb ringBufferDataV6_t
buf := bytes.NewBuffer(record)
if err := binary.Read(buf, binary.LittleEndian, &rb); err != nil {
log.Info("Failed to read from Ring buf", err)
continue
}

var ev Event_t
buf := bytes.NewBuffer(b)
if err := binary.Read(buf, binary.LittleEndian, &ev); err != nil {
log.Info("Read Ring buf", "Failed ", err)
continue
}
protocol := getProtocol(int(rb.Protocol))
verdict := getVerdict(int(rb.Verdict))

protocol := "UNKNOWN"
if int(ev.Protocol) == utils.TCP_PROTOCOL_NUMBER {
protocol = "TCP"
} else if int(ev.Protocol) == utils.UDP_PROTOCOL_NUMBER {
protocol = "UDP"
} else if int(ev.Protocol) == utils.SCTP_PROTOCOL_NUMBER {
protocol = "SCTP"
} else if int(ev.Protocol) == utils.ICMP_PROTOCOL_NUMBER {
protocol = "ICMP"
}
log.Info("Flow Info: ", "Src IP", utils.ConvByteToIPv6(rb.SourceIP).String(), "Src Port", rb.SourcePort,
"Dest IP", utils.ConvByteToIPv6(rb.DestIP).String(), "Dest Port", rb.DestPort,
"Proto", protocol, "Verdict", verdict)

verdict := "DENY"
if ev.Verdict == 1 {
verdict = "ACCEPT"
} else if ev.Verdict == 2 {
verdict = "EXPIRED/DELETED"
}
message = "Node: " + nodeName + ";" + "SIP: " + utils.ConvByteToIPv6(rb.SourceIP).String() + ";" + "SPORT: " + strconv.Itoa(int(rb.SourcePort)) + ";" + "DIP: " + utils.ConvByteToIPv6(rb.DestIP).String() + ";" + "DPORT: " + strconv.Itoa(int(rb.DestPort)) + ";" + "PROTOCOL: " + protocol + ";" + "PolicyVerdict: " + verdict
} else {
var rb ringBufferDataV4_t
buf := bytes.NewBuffer(record)
if err := binary.Read(buf, binary.LittleEndian, &rb); err != nil {
log.Info("Failed to read from Ring buf", err)
continue
}
protocol := getProtocol(int(rb.Protocol))
verdict := getVerdict(int(rb.Verdict))

log.Info("Flow Info: ", "Src IP", utils.ConvByteArrayToIP(ev.SourceIP), "Src Port", ev.SourcePort,
"Dest IP", utils.ConvByteArrayToIP(ev.DestIP), "Dest Port", ev.DestPort,
"Proto", protocol, "Verdict", verdict)
log.Info("Flow Info: ", "Src IP", utils.ConvByteArrayToIP(rb.SourceIP), "Src Port", rb.SourcePort,
"Dest IP", utils.ConvByteArrayToIP(rb.DestIP), "Dest Port", rb.DestPort,
"Proto", protocol, "Verdict", verdict)

message := "Node: " + nodeName + ";" + "SIP: " + utils.ConvByteArrayToIP(ev.SourceIP) + ";" + "SPORT: " + strconv.Itoa(int(ev.SourcePort)) + ";" + "DIP: " + utils.ConvByteArrayToIP(ev.DestIP) + ";" + "DPORT: " + strconv.Itoa(int(ev.DestPort)) + ";" + "PROTOCOL: " + protocol + ";" + "PolicyVerdict: " + verdict
message = "Node: " + nodeName + ";" + "SIP: " + utils.ConvByteArrayToIP(rb.SourceIP) + ";" + "SPORT: " + strconv.Itoa(int(rb.SourcePort)) + ";" + "DIP: " + utils.ConvByteArrayToIP(rb.DestIP) + ";" + "DPORT: " + strconv.Itoa(int(rb.DestPort)) + ";" + "PROTOCOL: " + protocol + ";" + "PolicyVerdict: " + verdict
}

if enableCloudWatchLogs {
logQueue = append(logQueue, &cloudwatchlogs.InputLogEvent{
Message: &message,
Timestamp: awssdk.Int64(time.Now().UnixNano() / int64(time.Millisecond)),
})
if len(logQueue) > 0 {
log.Info("Sending CW")
input := cloudwatchlogs.PutLogEventsInput{
LogEvents: logQueue,
LogGroupName: &logGroupName,
}

if sequenceToken == "" {
err := createLogStream()
if err != nil {
log.Info("Failed to create log stream")
panic(err)
}
} else {
input = *input.SetSequenceToken(sequenceToken)
}

input = *input.SetLogStreamName(logStreamName)

resp, err := cwl.PutLogEvents(&input)
if err != nil {
log.Info("Kprobe", "Failed ", err)
}

if resp != nil {
sequenceToken = *resp.NextSequenceToken
}

logQueue = []*cloudwatchlogs.InputLogEvent{}
} else {
done = publishDataToCloudwatch(logQueue, message, log)
if done {
break
}
}
}
}
}(events)
}

func (p *EvProgram) capturePolicyEvents(events <-chan []byte, log logr.Logger, enableCloudWatchLogs bool,
enableIPv6 bool) {
p.wg.Add(1)

if enableIPv6 {
p.capturePolicyV6Events(events, log, enableCloudWatchLogs)
} else {
p.capturePolicyV4Events(events, log, enableCloudWatchLogs)
}
}(ringbufferdata)
}

func ensureLogGroupExists(name string) error {
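To make the refactored event loop above easier to follow: capturePolicyEvents now reads raw records from the ring-buffer channel and decodes each one with binary.Read (little-endian) into ringBufferDataV4_t or ringBufferDataV6_t, then formats the flow using the shared getProtocol/getVerdict helpers. A compilable sketch of just that decode step, assuming the two fields hidden by the collapsed hunk (DestPort, Protocol) are uint32 like the visible ones:

package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

// Mirrors ringBufferDataV4_t from the diff; DestPort and Protocol are not
// visible in the collapsed hunk and are assumed to be uint32 here.
type ringBufferDataV4 struct {
	SourceIP   uint32
	SourcePort uint32
	DestIP     uint32
	DestPort   uint32
	Protocol   uint32
	Verdict    uint32
}

func main() {
	// Fabricate one little-endian record, standing in for a ring-buffer read.
	sample := ringBufferDataV4{SourceIP: 0x0100000a, SourcePort: 443,
		DestIP: 0x0200000a, DestPort: 8080, Protocol: 6, Verdict: 1}
	var raw bytes.Buffer
	if err := binary.Write(&raw, binary.LittleEndian, &sample); err != nil {
		panic(err)
	}

	// Decode it the same way capturePolicyEvents does.
	var rb ringBufferDataV4
	if err := binary.Read(bytes.NewBuffer(raw.Bytes()), binary.LittleEndian, &rb); err != nil {
		panic(err)
	}
	fmt.Printf("sport=%d dport=%d proto=%d verdict=%d\n",
		rb.SourcePort, rb.DestPort, rb.Protocol, rb.Verdict)
}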
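The CloudWatch side is likewise consolidated: when enableCloudWatchLogs is set, each formatted flow message is handed to publishDataToCloudwatch, which creates the log stream on first use and threads the NextSequenceToken from each PutLogEvents response into the next call. A sketch of that token-threading flow against the aws-sdk-go v1 cloudwatchlogs client (pushEvent and the group/stream names below are illustrative, not the agent's actual configuration):

package main

import (
	"time"

	awssdk "github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/cloudwatchlogs"
)

// pushEvent sends one message and returns the sequence token to use for the
// next call, mirroring how publishDataToCloudwatch reuses NextSequenceToken.
func pushEvent(cwl *cloudwatchlogs.CloudWatchLogs, group, stream, token, message string) (string, error) {
	input := &cloudwatchlogs.PutLogEventsInput{
		LogEvents: []*cloudwatchlogs.InputLogEvent{{
			Message:   awssdk.String(message),
			Timestamp: awssdk.Int64(time.Now().UnixNano() / int64(time.Millisecond)),
		}},
		LogGroupName:  awssdk.String(group),
		LogStreamName: awssdk.String(stream),
	}
	if token != "" {
		input.SequenceToken = awssdk.String(token)
	}
	resp, err := cwl.PutLogEvents(input)
	if err != nil {
		return token, err
	}
	return awssdk.StringValue(resp.NextSequenceToken), nil
}

func main() {
	sess := session.Must(session.NewSession())
	cwl := cloudwatchlogs.New(sess)
	// Hypothetical log group and stream names for illustration only.
	token, _ := pushEvent(cwl, "/aws/example-group", "example-stream", "", "PolicyVerdict: ACCEPT")
	_, _ = pushEvent(cwl, "/aws/example-group", "example-stream", token, "PolicyVerdict: DENY")
}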