Skip to content

Commit

Permalink
NETOBSERV-934: Add support for SCTP, ICMPv4/v6 protocols to ebpf agent (
Browse files Browse the repository at this point in the history
#103)

* Add support for SCTP, ICMP amd ICMPv6 protocols to ebpf code

Signed-off-by: Mohamed Mahmoud <mmahmoud@redhat.com>

* update GRPC protobuf definition to include icmp fields

Signed-off-by: msherif1234 <mmahmoud@redhat.com>

* Add ICMP and ICMPv6 ipfix support

Signed-off-by: msherif1234 <mmahmoud@redhat.com>

* Add ICMPv4/6 ebpf agent support

Signed-off-by: msherif1234 <mmahmoud@redhat.com>

* Update unit-test cases

Signed-off-by: msherif1234 <mmahmoud@redhat.com>

* Add verifier error check to catch JIT errors

Signed-off-by: msherif1234 <mmahmoud@redhat.com>

* update flowlogs dump collector tool to include ICMP

Signed-off-by: msherif1234 <mmahmoud@redhat.com>

---------

Signed-off-by: Mohamed Mahmoud <mmahmoud@redhat.com>
Signed-off-by: msherif1234 <mmahmoud@redhat.com>
  • Loading branch information
msherif1234 authored Mar 20, 2023
1 parent 8eb86a9 commit c62173a
Show file tree
Hide file tree
Showing 15 changed files with 287 additions and 89 deletions.
3 changes: 3 additions & 0 deletions bpf/flow.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ typedef struct flow_id_t {
u16 src_port;
u16 dst_port;
u8 transport_protocol;
// ICMP protocol
u8 icmp_type;
u8 icmp_code;
// OS interface index
u32 if_index;
} __attribute__((packed)) flow_id;
Expand Down
137 changes: 93 additions & 44 deletions bpf/flows.c
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
until an entry is available.
4) When hash collision is detected, we send the new entry to userpace via ringbuffer.
*/

#include <linux/bpf.h>
#include <linux/in.h>
#include <linux/if_packet.h>
Expand Down Expand Up @@ -55,6 +54,15 @@
#define FIN_ACK_FLAG 0x200
#define RST_ACK_FLAG 0x400

// SCTP protocol header structure, its defined here because its not
// exported by the kernel headers like other protocols.
struct sctphdr {
__be16 source;
__be16 dest;
__be32 vtag;
__le32 checksum;
};

// Common Ringbuffer as a conduit for ingress/egress flows to userspace
struct {
__uint(type, BPF_MAP_TYPE_RINGBUF);
Expand Down Expand Up @@ -104,71 +112,111 @@ static inline void set_flags(struct tcphdr *th, u16 *flags) {
*flags |= CWR_FLAG;
}
}
// sets flow fields from IPv4 header information
static inline int fill_iphdr(struct iphdr *ip, void *data_end, flow_id *id, u16 *flags) {
if ((void *)ip + sizeof(*ip) > data_end) {
return DISCARD;
}

__builtin_memcpy(id->src_ip, ip4in6, sizeof(ip4in6));
__builtin_memcpy(id->dst_ip, ip4in6, sizeof(ip4in6));
__builtin_memcpy(id->src_ip + sizeof(ip4in6), &ip->saddr, sizeof(ip->saddr));
__builtin_memcpy(id->dst_ip + sizeof(ip4in6), &ip->daddr, sizeof(ip->daddr));
id->transport_protocol = ip->protocol;
id->src_port = 0;
id->dst_port = 0;
switch (ip->protocol) {
// L4_info structure contains L4 headers parsed information.
struct l4_info_t {
// TCP/UDP/SCTP source port in host byte order
u16 src_port;
// TCP/UDP/SCTP destination port in host byte order
u16 dst_port;
// ICMPv4/ICMPv6 type value
u8 icmp_type;
// ICMPv4/ICMPv6 code value
u8 icmp_code;
// TCP flags
u16 flags;
};

// Extract L4 info for the supported protocols
static inline void fill_l4info(void *l4_hdr_start, void *data_end, u8 protocol,
struct l4_info_t *l4_info) {
switch (protocol) {
case IPPROTO_TCP: {
struct tcphdr *tcp = (void *)ip + sizeof(*ip);
struct tcphdr *tcp = l4_hdr_start;
if ((void *)tcp + sizeof(*tcp) <= data_end) {
id->src_port = __bpf_ntohs(tcp->source);
id->dst_port = __bpf_ntohs(tcp->dest);
set_flags(tcp, flags);
l4_info->src_port = __bpf_ntohs(tcp->source);
l4_info->dst_port = __bpf_ntohs(tcp->dest);
set_flags(tcp, &l4_info->flags);
}
} break;
case IPPROTO_UDP: {
struct udphdr *udp = (void *)ip + sizeof(*ip);
struct udphdr *udp = l4_hdr_start;
if ((void *)udp + sizeof(*udp) <= data_end) {
id->src_port = __bpf_ntohs(udp->source);
id->dst_port = __bpf_ntohs(udp->dest);
l4_info->src_port = __bpf_ntohs(udp->source);
l4_info->dst_port = __bpf_ntohs(udp->dest);
}
} break;
case IPPROTO_SCTP: {
struct sctphdr *sctph = l4_hdr_start;
if ((void *)sctph + sizeof(*sctph) <= data_end) {
l4_info->src_port = __bpf_ntohs(sctph->source);
l4_info->dst_port = __bpf_ntohs(sctph->dest);
}
} break;
case IPPROTO_ICMP: {
struct icmphdr *icmph = l4_hdr_start;
if ((void *)icmph + sizeof(*icmph) <= data_end) {
l4_info->icmp_type = icmph->type;
l4_info->icmp_code = icmph->code;
}
} break;
case IPPROTO_ICMPV6: {
struct icmp6hdr *icmp6h = l4_hdr_start;
if ((void *)icmp6h + sizeof(*icmp6h) <= data_end) {
l4_info->icmp_type = icmp6h->icmp6_type;
l4_info->icmp_code = icmp6h->icmp6_code;
}
} break;
default:
break;
}
}

// sets flow fields from IPv4 header information
static inline int fill_iphdr(struct iphdr *ip, void *data_end, flow_id *id, u16 *flags) {
struct l4_info_t l4_info;
void *l4_hdr_start;

l4_hdr_start = (void *)ip + sizeof(*ip);
if (l4_hdr_start > data_end) {
return DISCARD;
}
__builtin_memset(&l4_info, 0, sizeof(l4_info));
__builtin_memcpy(id->src_ip, ip4in6, sizeof(ip4in6));
__builtin_memcpy(id->dst_ip, ip4in6, sizeof(ip4in6));
__builtin_memcpy(id->src_ip + sizeof(ip4in6), &ip->saddr, sizeof(ip->saddr));
__builtin_memcpy(id->dst_ip + sizeof(ip4in6), &ip->daddr, sizeof(ip->daddr));
id->transport_protocol = ip->protocol;
fill_l4info(l4_hdr_start, data_end, ip->protocol, &l4_info);
id->src_port = l4_info.src_port;
id->dst_port = l4_info.dst_port;
id->icmp_type = l4_info.icmp_type;
id->icmp_code = l4_info.icmp_code;
*flags = l4_info.flags;

return SUBMIT;
}

// sets flow fields from IPv6 header information
static inline int fill_ip6hdr(struct ipv6hdr *ip, void *data_end, flow_id *id, u16 *flags) {
if ((void *)ip + sizeof(*ip) > data_end) {
struct l4_info_t l4_info;
void *l4_hdr_start;

l4_hdr_start = (void *)ip + sizeof(*ip);
if (l4_hdr_start > data_end) {
return DISCARD;
}

__builtin_memset(&l4_info, 0, sizeof(l4_info));
__builtin_memcpy(id->src_ip, ip->saddr.in6_u.u6_addr8, 16);
__builtin_memcpy(id->dst_ip, ip->daddr.in6_u.u6_addr8, 16);
id->transport_protocol = ip->nexthdr;
id->src_port = 0;
id->dst_port = 0;
switch (ip->nexthdr) {
case IPPROTO_TCP: {
struct tcphdr *tcp = (void *)ip + sizeof(*ip);
if ((void *)tcp + sizeof(*tcp) <= data_end) {
id->src_port = __bpf_ntohs(tcp->source);
id->dst_port = __bpf_ntohs(tcp->dest);
set_flags(tcp, flags);
}
} break;
case IPPROTO_UDP: {
struct udphdr *udp = (void *)ip + sizeof(*ip);
if ((void *)udp + sizeof(*udp) <= data_end) {
id->src_port = __bpf_ntohs(udp->source);
id->dst_port = __bpf_ntohs(udp->dest);
}
} break;
default:
break;
}
fill_l4info(l4_hdr_start, data_end, ip->nexthdr, &l4_info);
id->src_port = l4_info.src_port;
id->dst_port = l4_info.dst_port;
id->icmp_type = l4_info.icmp_type;
id->icmp_code = l4_info.icmp_code;
*flags = l4_info.flags;

return SUBMIT;
}
// sets flow fields from Ethernet header information
Expand Down Expand Up @@ -207,6 +255,7 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
void *data = (void *)(long)skb->data;

flow_id id;
__builtin_memset(&id, 0, sizeof(id));
u64 current_time = bpf_ktime_get_ns();
struct ethhdr *eth = data;
u16 flags = 0;
Expand Down
8 changes: 6 additions & 2 deletions examples/flowlogs-dump/server/flowlogs-dump-collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func main() {
for records := range receivedRecords {
for _, record := range records.Entries {
if record.EthProtocol == ipv6 {
log.Printf("%s: %v %s IP %s:%d > %s:%d: protocol:%s dir:%d bytes:%d packets:%d flags:%d ends: %v\n",
log.Printf("%s: %v %s IP %s:%d > %s:%d: protocol:%s type: %d code: %d dir:%d bytes:%d packets:%d flags:%d ends: %v\n",
ipProto[record.EthProtocol],
record.TimeFlowStart.AsTime().Local().Format("15:04:05.000000"),
record.Interface,
Expand All @@ -81,14 +81,16 @@ func main() {
net.IP(record.Network.GetDstAddr().GetIpv6()).To16(),
record.Transport.DstPort,
protocolByNumber[record.Transport.Protocol],
record.Icmp.IcmpType,
record.Icmp.IcmpCode,
record.Direction,
record.Bytes,
record.Packets,
record.Flags,
record.TimeFlowEnd.AsTime().Local().Format("15:04:05.000000"),
)
} else {
log.Printf("%s: %v %s IP %s:%d > %s:%d: protocol:%s dir:%d bytes:%d packets:%d flags:%d ends: %v\n",
log.Printf("%s: %v %s IP %s:%d > %s:%d: protocol:%s type: %d code: %d dir:%d bytes:%d packets:%d flags:%d ends: %v\n",
ipProto[record.EthProtocol],
record.TimeFlowStart.AsTime().Local().Format("15:04:05.000000"),
record.Interface,
Expand All @@ -97,6 +99,8 @@ func main() {
ipIntToNetIP(record.Network.GetDstAddr().GetIpv4()).String(),
record.Transport.DstPort,
protocolByNumber[record.Transport.Protocol],
record.Icmp.IcmpType,
record.Icmp.IcmpCode,
record.Direction,
record.Bytes,
record.Packets,
Expand Down
2 changes: 2 additions & 0 deletions pkg/ebpf/bpf_bpfeb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Binary file modified pkg/ebpf/bpf_bpfeb.o
Binary file not shown.
2 changes: 2 additions & 0 deletions pkg/ebpf/bpf_bpfel.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Binary file modified pkg/ebpf/bpf_bpfel.o
Binary file not shown.
7 changes: 7 additions & 0 deletions pkg/ebpf/tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"io/fs"
"strings"

"github.com/cilium/ebpf"
"github.com/cilium/ebpf/ringbuf"
"github.com/cilium/ebpf/rlimit"
"github.com/netobserv/netobserv-ebpf-agent/pkg/ifaces"
Expand Down Expand Up @@ -72,6 +73,12 @@ func NewFlowFetcher(
return nil, fmt.Errorf("rewriting BPF constants definition: %w", err)
}
if err := spec.LoadAndAssign(&objects, nil); err != nil {
var ve *ebpf.VerifierError
if errors.As(err, &ve) {
// Using %+v will print the whole verifier error, not just the last
// few lines.
log.Infof("Verifier error: %+v", ve)
}
return nil, fmt.Errorf("loading and assigning BPF objects: %w", err)
}

Expand Down
20 changes: 20 additions & 0 deletions pkg/exporter/ipfix.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,14 @@ func SendTemplateRecordv4(log *logrus.Entry, exporter *ipfixExporter.ExportingPr
if err != nil {
return 0, nil, err
}
err = addElementToTemplate(log, "icmpTypeIPv4", nil, &elements)
if err != nil {
return 0, nil, err
}
err = addElementToTemplate(log, "icmpCodeIPv4", nil, &elements)
if err != nil {
return 0, nil, err
}
err = AddRecordValuesToTemplate(log, &elements)
if err != nil {
return 0, nil, err
Expand Down Expand Up @@ -183,6 +191,14 @@ func SendTemplateRecordv6(log *logrus.Entry, exporter *ipfixExporter.ExportingPr
if err != nil {
return 0, nil, err
}
err = addElementToTemplate(log, "icmpTypeIPv6", nil, &elements)
if err != nil {
return 0, nil, err
}
err = addElementToTemplate(log, "icmpCodeIPv6", nil, &elements)
if err != nil {
return 0, nil, err
}
err = AddRecordValuesToTemplate(log, &elements)
if err != nil {
return 0, nil, err
Expand Down Expand Up @@ -299,6 +315,10 @@ func setIEValue(record *flow.Record, ieValPtr *entities.InfoElementWithValue) {
ieVal.SetUnsigned16Value(record.Id.SrcPort)
case "destinationTransportPort":
ieVal.SetUnsigned16Value(record.Id.DstPort)
case "icmpTypeIPv4", "icmpTypeIPv6":
ieVal.SetUnsigned8Value(record.Id.IcmpType)
case "icmpCodeIPv4", "icmpCodeIPv6":
ieVal.SetUnsigned8Value(record.Id.IcmpCode)
}
}
func setEntities(record *flow.Record, elements *[]entities.InfoElementWithValue) {
Expand Down
2 changes: 2 additions & 0 deletions pkg/exporter/kafka_proto_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ func TestProtoConversion(t *testing.T) {
record.Id.DstIp = IPAddrFromNetIP(net.ParseIP("127.3.2.1"))
record.Id.SrcPort = 4321
record.Id.DstPort = 1234
record.Id.IcmpType = 8
record.Id.TransportProtocol = 210
record.TimeFlowStart = time.Now().Add(-5 * time.Second)
record.TimeFlowEnd = time.Now()
Expand All @@ -58,6 +59,7 @@ func TestProtoConversion(t *testing.T) {
assert.EqualValues(t, 4321, r.Transport.SrcPort)
assert.EqualValues(t, 1234, r.Transport.DstPort)
assert.EqualValues(t, 210, r.Transport.Protocol)
assert.EqualValues(t, 8, r.Icmp.IcmpType)
assert.Equal(t, record.TimeFlowStart.UnixMilli(), r.TimeFlowStart.AsTime().UnixMilli())
assert.Equal(t, record.TimeFlowEnd.UnixMilli(), r.TimeFlowEnd.AsTime().UnixMilli())
assert.EqualValues(t, 789, r.Bytes)
Expand Down
8 changes: 8 additions & 0 deletions pkg/exporter/proto.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ func v4FlowToPB(fr *flow.Record) *pbflow.Record {
SrcPort: uint32(fr.Id.SrcPort),
DstPort: uint32(fr.Id.DstPort),
},
Icmp: &pbflow.Icmp{
IcmpType: uint32(fr.Id.IcmpType),
IcmpCode: uint32(fr.Id.IcmpCode),
},
Bytes: fr.Metrics.Bytes,
TimeFlowStart: &timestamppb.Timestamp{
Seconds: fr.TimeFlowStart.Unix(),
Expand Down Expand Up @@ -88,6 +92,10 @@ func v6FlowToPB(fr *flow.Record) *pbflow.Record {
SrcPort: uint32(fr.Id.SrcPort),
DstPort: uint32(fr.Id.DstPort),
},
Icmp: &pbflow.Icmp{
IcmpType: uint32(fr.Id.IcmpType),
IcmpCode: uint32(fr.Id.IcmpCode),
},
Bytes: fr.Metrics.Bytes,
TimeFlowStart: &timestamppb.Timestamp{
Seconds: fr.TimeFlowStart.Unix(),
Expand Down
4 changes: 4 additions & 0 deletions pkg/flow/record_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ func TestRecordBinaryEncoding(t *testing.T) {
0x0e, 0x0f, // transport: u16 src_port
0x10, 0x11, // transport: u16 dst_port
0x12, // transport: u8 transport_protocol
0x00, // icmp: u8 icmp_type
0x00, // icmp: u8 icmp_code
0x13, 0x14, 0x15, 0x16, // interface index
0x06, 0x07, 0x08, 0x09, // u32 packets
0x13, 0x14, 0x15, 0x16, 0x17, 0x18, 0x19, 0x1a, // u64 bytes
Expand All @@ -45,6 +47,8 @@ func TestRecordBinaryEncoding(t *testing.T) {
SrcPort: 0x0f0e,
DstPort: 0x1110,
TransportProtocol: 0x12,
IcmpType: 0x00,
IcmpCode: 0x00,
IfIndex: 0x16151413,
},
Metrics: ebpf.BpfFlowMetrics{
Expand Down
8 changes: 8 additions & 0 deletions pkg/grpc/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,10 @@ func BenchmarkIPv4GRPCCommunication(b *testing.B) {
SrcPort: 23000,
DstPort: 443,
},
Icmp: &pbflow.Icmp{
IcmpType: 8,
IcmpCode: 10,
},
}
records := &pbflow.Records{}
for i := 0; i < 100; i++ {
Expand Down Expand Up @@ -211,6 +215,10 @@ func BenchmarkIPv6GRPCCommunication(b *testing.B) {
SrcPort: 23000,
DstPort: 443,
},
Icmp: &pbflow.Icmp{
IcmpType: 8,
IcmpCode: 10,
},
}
records := &pbflow.Records{}
for i := 0; i < 100; i++ {
Expand Down
Loading

0 comments on commit c62173a

Please sign in to comment.