Skip to content

Commit

Permalink
Revert "change aggregation flow map to hashmap instead perCPU hashmap (
Browse files Browse the repository at this point in the history
…netobserv#118)"

This reverts commit b6e2b87.
  • Loading branch information
msherif1234 committed Aug 31, 2023
1 parent 6d9d2e7 commit 5518398
Show file tree
Hide file tree
Showing 15 changed files with 164 additions and 46 deletions.
2 changes: 1 addition & 1 deletion bpf/maps_definition.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ struct {

// Key: the flow identifier. Value: the flow metrics for that identifier.
struct {
__uint(type, BPF_MAP_TYPE_HASH);
__uint(type, BPF_MAP_TYPE_PERCPU_HASH);
__type(key, flow_id);
__type(value, flow_metrics);
__uint(max_entries, 1 << 24);
Expand Down
3 changes: 2 additions & 1 deletion bpf/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -253,12 +253,13 @@ static inline long pkt_drop_lookup_and_update_flow(struct sk_buff *skb, flow_id
enum skb_drop_reason reason) {
flow_metrics *aggregate_flow = bpf_map_lookup_elem(&aggregated_flows, id);
if (aggregate_flow != NULL) {
aggregate_flow->end_mono_time_ts = bpf_ktime_get_ns();
aggregate_flow->pkt_drops.packets += 1;
aggregate_flow->pkt_drops.bytes += skb->len;
aggregate_flow->pkt_drops.latest_state = state;
aggregate_flow->pkt_drops.latest_flags = flags;
aggregate_flow->pkt_drops.latest_drop_cause = reason;
long ret = bpf_map_update_elem(&aggregated_flows, id, aggregate_flow, BPF_ANY);
long ret = bpf_map_update_elem(&aggregated_flows, id, aggregate_flow, BPF_EXIST);
if (trace_messages && ret != 0) {
bpf_printk("error packet drop updating flow %d\n", ret);
}
Expand Down
2 changes: 1 addition & 1 deletion docs/architecture.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ flowchart TD
E(ebpf.FlowFetcher) --> |"pushes via<br/>RingBuffer"| RB(flow.RingBufTracer)
style E fill:#990
E --> |"polls<br/>HashMap"| M(flow.MapTracer)
E --> |"polls<br/>PerCPUHashMap"| M(flow.MapTracer)
RB --> |chan *flow.Record| ACC(flow.Accounter)
RB -.-> |flushes| M
ACC --> |"chan []*flow.Record"| DD(flow.Deduper)
Expand Down
10 changes: 8 additions & 2 deletions examples/flowlogs-dump/server/flowlogs-dump-collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func main() {
for records := range receivedRecords {
for _, record := range records.Entries {
if record.EthProtocol == ipv6 {
log.Printf("%s: %v %s IP %s:%d > %s:%d: protocol:%s type: %d code: %d dir:%d bytes:%d packets:%d flags:%d ends: %v dnsId: %d dnsFlags: 0x%04x dnsLatency(ms): %v rtt(ns) %v\n",
log.Printf("%s: %v %s IP %s:%d > %s:%d: protocol:%s type: %d code: %d dir:%d bytes:%d packets:%d flags:%d ends: %v dnsId: %d dnsFlags: 0x%04x dnsLatency(ms): %v rtt(ns) %v DropPkts: %d DropBytes: %d DropCause %d\n",
ipProto[record.EthProtocol],
record.TimeFlowStart.AsTime().Local().Format("15:04:05.000000"),
record.Interface,
Expand All @@ -92,9 +92,12 @@ func main() {
record.GetDnsFlags(),
record.DnsLatency.AsDuration().Milliseconds(),
record.TimeFlowRtt.AsDuration().Nanoseconds(),
record.GetPktDropPackets(),
record.GetPktDropBytes(),
record.GetPktDropLatestDropCause(),
)
} else {
log.Printf("%s: %v %s IP %s:%d > %s:%d: protocol:%s type: %d code: %d dir:%d bytes:%d packets:%d flags:%d ends: %v dnsId: %d dnsFlags: 0x%04x dnsLatency(ms): %v rtt(ns) %v\n",
log.Printf("%s: %v %s IP %s:%d > %s:%d: protocol:%s type: %d code: %d dir:%d bytes:%d packets:%d flags:%d ends: %v dnsId: %d dnsFlags: 0x%04x dnsLatency(ms): %v rtt(ns) %v DropPkts: %d DropBytes: %d DropCause %d\n",
ipProto[record.EthProtocol],
record.TimeFlowStart.AsTime().Local().Format("15:04:05.000000"),
record.Interface,
Expand All @@ -114,6 +117,9 @@ func main() {
record.GetDnsFlags(),
record.DnsLatency.AsDuration().Milliseconds(),
record.TimeFlowRtt.AsDuration().Nanoseconds(),
record.GetPktDropPackets(),
record.GetPktDropBytes(),
record.GetPktDropLatestDropCause(),
)
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ type ebpfFlowFetcher interface {
io.Closer
Register(iface ifaces.Interface) error

LookupAndDeleteMap() map[ebpf.BpfFlowId]*ebpf.BpfFlowMetrics
LookupAndDeleteMap() map[ebpf.BpfFlowId][]*ebpf.BpfFlowMetrics
DeleteMapsStaleEntries(timeOut time.Duration)
ReadRingBuf() (ringbuf.Record, error)
}
Expand Down
57 changes: 48 additions & 9 deletions pkg/agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ var (
DstPort: 456,
IfIndex: 3,
}
key1Dupe = ebpf.BpfFlowId{
SrcPort: 123,
DstPort: 456,
IfIndex: 4,
}

key2 = ebpf.BpfFlowId{
SrcPort: 333,
Expand Down Expand Up @@ -76,11 +81,21 @@ func TestFlowsAgent_Deduplication(t *testing.T) {
receivedKeys[f.Id] = struct{}{}
switch f.Id {
case key1:
assert.EqualValues(t, 3, f.Metrics.Packets)
assert.EqualValues(t, 44, f.Metrics.Bytes)
assert.EqualValues(t, 4, f.Metrics.Packets)
assert.EqualValues(t, 66, f.Metrics.Bytes)
assert.False(t, f.Duplicate)
assert.Equal(t, "foo", f.Interface)
key1Flows = append(key1Flows, f)
case key1Dupe:
assert.EqualValues(t, 4, f.Metrics.Packets)
assert.EqualValues(t, 66, f.Metrics.Bytes)
assert.False(t, f.Duplicate)
assert.Equal(t, "bar", f.Interface)
key1Flows = append(key1Flows, f)
case key2:
assert.EqualValues(t, 7, f.Metrics.Packets)
assert.EqualValues(t, 33, f.Metrics.Bytes)
assert.False(t, f.Duplicate)
}
}
assert.Lenf(t, key1Flows, 1, "only one flow should have been forwarded: %#v", key1Flows)
Expand All @@ -104,12 +119,23 @@ func TestFlowsAgent_DeduplicationJustMark(t *testing.T) {
receivedKeys[f.Id] = struct{}{}
switch f.Id {
case key1:
assert.EqualValues(t, 3, f.Metrics.Packets)
assert.EqualValues(t, 44, f.Metrics.Bytes)
assert.EqualValues(t, 4, f.Metrics.Packets)
assert.EqualValues(t, 66, f.Metrics.Bytes)
if f.Duplicate {
duplicates++
}
assert.Equal(t, "foo", f.Interface)
case key1Dupe:
assert.EqualValues(t, 4, f.Metrics.Packets)
assert.EqualValues(t, 66, f.Metrics.Bytes)
if f.Duplicate {
duplicates++
}
assert.Equal(t, "bar", f.Interface)
case key2:
assert.EqualValues(t, 7, f.Metrics.Packets)
assert.EqualValues(t, 33, f.Metrics.Bytes)
assert.False(t, f.Duplicate)
}
}
assert.Equalf(t, 0, duplicates, "exported flows should have only one duplicate: %#v", exported)
Expand All @@ -132,11 +158,21 @@ func TestFlowsAgent_Deduplication_None(t *testing.T) {
receivedKeys[f.Id] = struct{}{}
switch f.Id {
case key1:
assert.EqualValues(t, 3, f.Metrics.Packets)
assert.EqualValues(t, 44, f.Metrics.Bytes)
assert.EqualValues(t, 4, f.Metrics.Packets)
assert.EqualValues(t, 66, f.Metrics.Bytes)
assert.False(t, f.Duplicate)
assert.Equal(t, "foo", f.Interface)
key1Flows = append(key1Flows, f)
case key1Dupe:
assert.EqualValues(t, 4, f.Metrics.Packets)
assert.EqualValues(t, 66, f.Metrics.Bytes)
assert.False(t, f.Duplicate)
assert.Equal(t, "bar", f.Interface)
key1Flows = append(key1Flows, f)
case key2:
assert.EqualValues(t, 7, f.Metrics.Packets)
assert.EqualValues(t, 33, f.Metrics.Bytes)
assert.False(t, f.Duplicate)
}
}
assert.Lenf(t, key1Flows, 1, "both key1 flows should have been forwarded: %#v", key1Flows)
Expand Down Expand Up @@ -183,10 +219,13 @@ func testAgent(t *testing.T, cfg *Config) *test.ExporterFake {
})

now := uint64(monotime.Now())
key1Metrics := ebpf.BpfFlowMetrics{Packets: 3, Bytes: 44, StartMonoTimeTs: now + 1000, EndMonoTimeTs: now + 1_000_000_000}
key1Metrics := []*ebpf.BpfFlowMetrics{
{Packets: 3, Bytes: 44, StartMonoTimeTs: now + 1000, EndMonoTimeTs: now + 1_000_000_000},
{Packets: 1, Bytes: 22, StartMonoTimeTs: now, EndMonoTimeTs: now + 3000},
}

ebpfTracer.AppendLookupResults(map[ebpf.BpfFlowId]*ebpf.BpfFlowMetrics{
key1: &key1Metrics,
ebpfTracer.AppendLookupResults(map[ebpf.BpfFlowId][]*ebpf.BpfFlowMetrics{
key1: key1Metrics,
})
return export
}
Binary file modified pkg/ebpf/bpf_bpfeb.o
Binary file not shown.
Binary file modified pkg/ebpf/bpf_bpfel.o
Binary file not shown.
23 changes: 15 additions & 8 deletions pkg/ebpf/tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,27 +371,34 @@ func (m *FlowFetcher) ReadRingBuf() (ringbuf.Record, error) {
// TODO: detect whether BatchLookupAndDelete is supported (Kernel>=5.6) and use it selectively
// Supported Lookup/Delete operations by kernel: https://github.com/iovisor/bcc/blob/master/docs/kernel-versions.md
// Race conditions here cause some flows to be lost in high-load scenarios
func (m *FlowFetcher) LookupAndDeleteMap() map[BpfFlowId]*BpfFlowMetrics {
func (m *FlowFetcher) LookupAndDeleteMap() map[BpfFlowId][]*BpfFlowMetrics {
flowMap := m.objects.AggregatedFlows

iterator := flowMap.Iterate()
var flow = make(map[BpfFlowId]*BpfFlowMetrics, m.cacheMaxSize)
var flows = make(map[BpfFlowId][]*BpfFlowMetrics, m.cacheMaxSize)
var id BpfFlowId
var metric BpfFlowMetrics
var metrics []BpfFlowMetrics

// Changing Iterate+Delete by LookupAndDelete would prevent some possible race conditions
// TODO: detect whether LookupAndDelete is supported (Kernel>=4.20) and use it selectively
for iterator.Next(&id, &metric) {
for iterator.Next(&id, &metrics) {
if err := flowMap.Delete(id); err != nil {
log.WithError(err).WithField("flowId", id).
Warnf("couldn't delete flow entry")
}
metricPtr := new(BpfFlowMetrics)
*metricPtr = metric
flow[id] = metricPtr
metricsPtr := make([]*BpfFlowMetrics, 0)
for _, m := range metrics {
metricPtr := new(BpfFlowMetrics)
*metricPtr = m
metricsPtr = append(metricsPtr, metricPtr)
}
// We observed that an eBPF PerCPU map might insert the same key multiple times in the map
// (probably due to race conditions), so we need to re-join the metrics again in userspace
// TODO: instrument how many times the keys are repeated in the same eviction
flows[id] = append(flows[id], metricsPtr...)
}

return flow
return flows
}

// DeleteMapsStaleEntries Look for any stale entries in the features maps and delete them
Expand Down
4 changes: 3 additions & 1 deletion pkg/flow/account.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,9 @@ func (c *Accounter) Account(in <-chan *RawRecord, out chan<- []*Record) {
alog.Debug("exiting account routine")
return
}
if _, ok := c.entries[record.Id]; !ok {
if stored, ok := c.entries[record.Id]; ok {
Accumulate(stored, &record.Metrics)
} else {
if len(c.entries) >= c.maxEntries {
evictingEntries := c.entries
c.entries = map[ebpf.BpfFlowId]*ebpf.BpfFlowMetrics{}
Expand Down
20 changes: 10 additions & 10 deletions pkg/flow/account_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,11 +104,11 @@ func TestEvict_MaxEntries(t *testing.T) {
RawRecord: RawRecord{
Id: k1,
Metrics: ebpf.BpfFlowMetrics{
Bytes: 123, Packets: 1, StartMonoTimeTs: 123, EndMonoTimeTs: 123, Flags: 1,
Bytes: 444, Packets: 2, StartMonoTimeTs: 123, EndMonoTimeTs: 789, Flags: 1,
},
},
TimeFlowStart: now.Add(-(1000 - 123) * time.Nanosecond),
TimeFlowEnd: now.Add(-(1000 - 123) * time.Nanosecond),
TimeFlowEnd: now.Add(-(1000 - 789) * time.Nanosecond),
},
k2: {
RawRecord: RawRecord{
Expand Down Expand Up @@ -178,31 +178,31 @@ func TestEvict_Period(t *testing.T) {
RawRecord: RawRecord{
Id: k1,
Metrics: ebpf.BpfFlowMetrics{
Bytes: 10,
Packets: 1,
Bytes: 30,
Packets: 3,
StartMonoTimeTs: 123,
EndMonoTimeTs: 123,
EndMonoTimeTs: 789,
Flags: 1,
},
},
TimeFlowStart: now.Add(-1000 + 123),
TimeFlowEnd: now.Add(-1000 + 123),
TimeFlowEnd: now.Add(-1000 + 789),
}, *records[0])
records = receiveTimeout(t, evictor)
require.Len(t, records, 1)
assert.Equal(t, Record{
RawRecord: RawRecord{
Id: k1,
Metrics: ebpf.BpfFlowMetrics{
Bytes: 10,
Packets: 1,
Bytes: 20,
Packets: 2,
StartMonoTimeTs: 1123,
EndMonoTimeTs: 1123,
EndMonoTimeTs: 1456,
Flags: 1,
},
},
TimeFlowStart: now.Add(-1000 + 1123),
TimeFlowEnd: now.Add(-1000 + 1123),
TimeFlowEnd: now.Add(-1000 + 1456),
}, *records[0])

// no more flows are evicted
Expand Down
32 changes: 32 additions & 0 deletions pkg/flow/record.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,38 @@ func NewRecord(
return &record
}

// Accumulate merges the metrics in src into r, which acts as the running
// aggregate for a single flow. r is modified in place; src is only read.
//
// Merge rules:
//   - StartMonoTimeTs keeps the earliest value that has been set and
//     EndMonoTimeTs keeps the latest one; a zero timestamp means "not set yet".
//   - Bytes, Packets and the packet-drop byte/packet counters are summed.
//   - Flags, PktDrops.LatestFlags and DnsRecord.Flags are OR-ed together.
//   - PktDrops.LatestDropCause and DnsRecord.Id are overwritten by src only
//     when src carries a non-zero value.
//   - DnsRecord.Latency and FlowRtt keep the maximum of both values.
func Accumulate(r *ebpf.BpfFlowMetrics, src *ebpf.BpfFlowMetrics) {
	// time == 0 if the value has not been set yet. An unset (zero) start time
	// in src must never replace a valid start time already stored in r,
	// otherwise the aggregated flow would appear to start at time 0.
	if r.StartMonoTimeTs == 0 ||
		(src.StartMonoTimeTs != 0 && src.StartMonoTimeTs < r.StartMonoTimeTs) {
		r.StartMonoTimeTs = src.StartMonoTimeTs
	}
	// A zero end time in src can never win this comparison, so no extra guard
	// is needed here.
	if r.EndMonoTimeTs == 0 || r.EndMonoTimeTs < src.EndMonoTimeTs {
		r.EndMonoTimeTs = src.EndMonoTimeTs
	}
	r.Bytes += src.Bytes
	r.Packets += src.Packets
	r.Flags |= src.Flags
	// Accumulate Drop statistics
	r.PktDrops.Bytes += src.PktDrops.Bytes
	r.PktDrops.Packets += src.PktDrops.Packets
	r.PktDrops.LatestFlags |= src.PktDrops.LatestFlags
	if src.PktDrops.LatestDropCause != 0 {
		r.PktDrops.LatestDropCause = src.PktDrops.LatestDropCause
	}
	// Accumulate DNS
	r.DnsRecord.Flags |= src.DnsRecord.Flags
	if src.DnsRecord.Id != 0 {
		r.DnsRecord.Id = src.DnsRecord.Id
	}
	if r.DnsRecord.Latency < src.DnsRecord.Latency {
		r.DnsRecord.Latency = src.DnsRecord.Latency
	}
	// Accumulate RTT
	if r.FlowRtt < src.FlowRtt {
		r.FlowRtt = src.FlowRtt
	}
}

// IP returns the net.IP equivalent object
func IP(ia IPAddr) net.IP {
return ia[:]
Expand Down
24 changes: 22 additions & 2 deletions pkg/flow/tracer_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type MapTracer struct {
}

type mapFetcher interface {
LookupAndDeleteMap() map[ebpf.BpfFlowId]*ebpf.BpfFlowMetrics
LookupAndDeleteMap() map[ebpf.BpfFlowId][]*ebpf.BpfFlowMetrics
DeleteMapsStaleEntries(timeOut time.Duration)
}

Expand Down Expand Up @@ -96,7 +96,7 @@ func (m *MapTracer) evictFlows(ctx context.Context, enableGC bool, forwardFlows
var forwardingFlows []*Record
laterFlowNs := uint64(0)
for flowKey, flowMetrics := range m.mapFetcher.LookupAndDeleteMap() {
aggregatedMetrics := flowMetrics
aggregatedMetrics := m.aggregate(flowMetrics)
// we ignore metrics that haven't been aggregated (e.g. all the mapped values are ignored)
if aggregatedMetrics.EndMonoTimeTs == 0 {
continue
Expand Down Expand Up @@ -126,3 +126,23 @@ func (m *MapTracer) evictFlows(ctx context.Context, enableGC bool, forwardFlows
}
mtlog.Debugf("%d flows evicted", len(forwardingFlows))
}

// aggregate folds the per-CPU metrics collected for one flow key into a single
// BpfFlowMetrics value. Nil entries, and entries whose start or end timestamp
// is not newer than m.lastEvictionNs, are left out. When nothing is
// accumulated the returned value is the zero BpfFlowMetrics.
func (m *MapTracer) aggregate(metrics []*ebpf.BpfFlowMetrics) *ebpf.BpfFlowMetrics {
	total := &ebpf.BpfFlowMetrics{}
	if len(metrics) == 0 {
		mtlog.Warn("invoked aggregate with no values")
		return total
	}
	for _, entry := range metrics {
		if entry == nil {
			continue
		}
		// Values in an eBPF hashmap are not zeroed when their entry is removed,
		// so a slot may still carry data from an earlier collect-eviction
		// timeslot. Only entries that both started and ended after the last
		// eviction are accumulated.
		if entry.StartMonoTimeTs > m.lastEvictionNs && entry.EndMonoTimeTs > m.lastEvictionNs {
			Accumulate(total, entry)
		}
	}
	return total
}
Loading

0 comments on commit 5518398

Please sign in to comment.