Skip to content

Commit

Permalink
[Auditbeat] system/socket: Monitor all online CPUs (elastic#22827)
Browse files Browse the repository at this point in the history
Auditbeat's system/socket dataset needs to install kprobes on all
online CPUs.

Previously, it was using runtime.NumCPU() to determine the CPUs in the
system, and monitoring CPUs 0 to NumCPU. This was a mistake that lead
to startup failures or loss of events in any of the following scenarios:
- When Auditbeat is started with a CPU affinity mask that excludes some CPUs
- When there are offline or isolated CPUs in the system.

This patch updates the tracing library in Auditbeat to fetch the list of
online CPUs from /sys/devices/system/cpu/online so that it can install
kprobes in all of them regardless of its own affinity mask, and correctly
skipping offline CPUs.

Related elastic#18755

(cherry picked from commit 6356887)
  • Loading branch information
adriansr committed Dec 2, 2020
1 parent c6914e4 commit 87c73f4
Show file tree
Hide file tree
Showing 4 changed files with 310 additions and 6 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- system/socket: Fixed tracking of long-running connections. {pull}19033[19033]
- auditd: Fix an error condition causing a lot of `audit_send_reply` kernel threads being created. {pull}22673[22673]
- system/socket: Fixed start failure when run under config reloader. {issue}20851[20851] {pull}21693[21693]
- system/socket: Having some CPUs unavailable to Auditbeat could cause startup errors or event loss. {pull}22827[22827]

*Filebeat*

Expand Down
139 changes: 139 additions & 0 deletions x-pack/auditbeat/tracing/cpu.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

// +build linux

package tracing

import (
"bytes"
"io/ioutil"
"strconv"
"strings"

"github.com/pkg/errors"
)

const (
// OnlineCPUsPath is the path to the system file listing the online CPUs.
OnlineCPUsPath = "/sys/devices/system/cpu/online"

// OfflineCPUsPath is the path to the system file listing the offline CPUs.
OfflineCPUsPath = "/sys/devices/system/cpu/offline"

// PossibleCPUsPath is the path to the system file listing the CPUs that can be brought online.
PossibleCPUsPath = "/sys/devices/system/cpu/possible"

// PresentCPUsPath is the path to the system file listing the CPUs that are identified as present.
PresentCPUsPath = "/sys/devices/system/cpu/present"

// See `Documentation/admin-guide/cputopology.rst` in the Linux kernel docs for more information
// on the above files.

// IsolatedCPUsPath is only present when CPU isolation is active, for example using the `isolcpus`
// kernel argument.
IsolatedCPUsPath = "/sys/devices/system/cpu/isolated"
)

// CPUSet represents a group of CPUs.
type CPUSet struct {
mask []bool
count int
}

// Mask returns a bitmask where each bit is set if the given CPU is present in the set.
func (s CPUSet) Mask() []bool {
return s.mask
}

// NumCPU returns the number of CPUs in the set.
func (s CPUSet) NumCPU() int {
return s.count
}

// Contains allows to query if a given CPU exists in the set.
func (s CPUSet) Contains(cpu int) bool {
if cpu < 0 || cpu >= len(s.mask) {
return false
}
return s.mask[cpu]
}

// AsList returns the list of CPUs in the set.
func (s CPUSet) AsList() []int {
list := make([]int, 0, s.count)
for num, bit := range s.mask {
if bit {
list = append(list, num)
}
}
return list
}

// NewCPUSetFromFile creates a new CPUSet from the contents of a file.
func NewCPUSetFromFile(path string) (cpus CPUSet, err error) {
contents, err := ioutil.ReadFile(path)
if err != nil {
return cpus, err
}
return NewCPUSetFromExpression(string(bytes.TrimRight(contents, "\n\r")))
}

// NewCPUSetFromExpression creates a new CPUSet from a range expression.
// Expression: RANGE ( ',' RANGE )*
// Where:
// RANGE := <NUMBER> | <NUMBER>-<NUMBER>
func NewCPUSetFromExpression(contents string) (CPUSet, error) {
var ranges [][]int
var max, count int
for _, expr := range strings.Split(contents, ",") {
if len(expr) == 0 {
continue
}
parts := strings.Split(expr, "-")
r := make([]int, 0, len(parts))
for _, numStr := range parts {
num16, err := strconv.ParseInt(numStr, 10, 16)
if err != nil || num16 < 0 {
return CPUSet{}, errors.Errorf("failed to parse integer '%s' from range '%s' at '%s'", numStr, expr, contents)
}
num := int(num16)
r = append(r, num)
if num+1 > max {
max = num + 1
}
}
ranges = append(ranges, r)
}
if max == 0 {
return CPUSet{}, nil
}
mask := make([]bool, max)
for _, r := range ranges {
from, to := -1, -1
switch len(r) {
case 0:
continue // Ignore empty range.
case 1:
from = r[0]
to = r[0]
case 2:
from = r[0]
to = r[1]
}
if from == -1 || to < from {
return CPUSet{}, errors.Errorf("invalid cpu range %v in '%s'", r, contents)
}
for i := from; i <= to; i++ {
if !mask[i] {
count++
mask[i] = true
}
}
}
return CPUSet{
mask: mask,
count: count,
}, nil
}
152 changes: 152 additions & 0 deletions x-pack/auditbeat/tracing/cpu_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

// +build linux

package tracing

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestNewCPUSetFromExpression(t *testing.T) {
for _, testCase := range []struct {
content string
result CPUSet
fail bool
}{
{
content: "0",
result: CPUSet{
mask: []bool{true},
count: 1,
},
},
{
content: "0-3",
result: CPUSet{
mask: []bool{true, true, true, true},
count: 4,
},
},
{
content: "5-0",
fail: true,
},
{
content: "5-2147483648",
fail: true,
},
{
content: "0,2-2",
result: CPUSet{
mask: []bool{true, false, true},
count: 2,
},
},
{
content: "7",
result: CPUSet{
mask: []bool{false, false, false, false, false, false, false, true},
count: 1,
},
},
{
content: "-1",
fail: true,
},
{
content: "",
},
{
content: ",",
},
{
content: "-",
fail: true,
},
{
content: "3,-",
fail: true,
},
{
content: "3-4-5",
fail: true,
},
{
content: "0-4,5,6-6,,,,15",
result: CPUSet{
mask: []bool{
true, true, true, true, true, true, true, false,
false, false, false, false, false, false, false, true,
},
count: 8,
},
},
} {
mask, err := NewCPUSetFromExpression(testCase.content)
if !assert.Equal(t, testCase.fail, err != nil, testCase.content) {
t.Fatal(err)
}
assert.Equal(t, testCase.result, mask, testCase.content)
}
}

func TestCPUSet(t *testing.T) {
for _, test := range []struct {
expr string
num int
isSet func(int) bool
list []int
}{
{
expr: "0-2,5",
num: 4,
isSet: func(i int) bool { return i == 5 || (i >= 0 && i < 3) },
list: []int{0, 1, 2, 5},
},
{
expr: "0",
num: 1,
isSet: func(i int) bool { return i == 0 },
list: []int{0},
},
{
expr: "2",
num: 1,
isSet: func(i int) bool { return i == 2 },
list: []int{2},
},
{
expr: "0-7",
num: 8,
isSet: func(i int) bool { return i >= 0 && i < 8 },
list: []int{0, 1, 2, 3, 4, 5, 6, 7},
},
{
expr: "",
num: 0,
isSet: func(i int) bool { return false },
list: []int{},
},
{
expr: "1-2,0,2,0-0,0-1",
num: 3,
isSet: func(i int) bool { return i >= 0 && i < 3 },
list: []int{0, 1, 2},
},
} {
set, err := NewCPUSetFromExpression(test.expr)
if !assert.NoError(t, err, test.expr) {
t.Fatal(err)
}
assert.Equal(t, test.num, set.NumCPU(), test.expr)
for i := -1; i < 10; i++ {
assert.Equal(t, test.isSet(i), set.Contains(i), test.expr)
}
assert.Equal(t, test.list, set.AsList(), test.expr)
}
}
24 changes: 18 additions & 6 deletions x-pack/auditbeat/tracing/perfevent.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"context"
"fmt"
"os"
"runtime"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -55,7 +54,7 @@ type PerfChannel struct {

running uintptr
wg sync.WaitGroup
numCPUs int
cpus CPUSet

// Settings
attr perf.Attr
Expand Down Expand Up @@ -98,7 +97,6 @@ func NewPerfChannel(cfg ...PerfChannelConf) (channel *PerfChannel, err error) {
done: make(chan struct{}, 0),
streams: make(map[uint64]stream),
pid: perf.AllThreads,
numCPUs: runtime.NumCPU(),
attr: perf.Attr{
Type: perf.TracepointEvent,
SampleFormat: perf.SampleFormat{
Expand All @@ -112,6 +110,19 @@ func NewPerfChannel(cfg ...PerfChannelConf) (channel *PerfChannel, err error) {
channel.attr.SetSamplePeriod(1)
channel.attr.SetWakeupEvents(1)

// Load the list of online CPUs from /sys/devices/system/cpu/online.
// This is necessary in order to to install each kprobe on all online CPUs.
//
// Note:
// There's currently no mechanism to adapt to CPUs being added or removed
// at runtime (CPU hotplug).
channel.cpus, err = NewCPUSetFromFile(OnlineCPUsPath)
if err != nil {
return nil, errors.Wrap(err, "error listing online CPUs")
}
if channel.cpus.NumCPU() < 1 {
return nil, errors.New("couldn't list online CPUs")
}
// Set configuration
for _, fun := range cfg {
if err = fun(channel); err != nil {
Expand Down Expand Up @@ -210,14 +221,15 @@ func WithPollTimeout(timeout time.Duration) PerfChannelConf {
func (c *PerfChannel) MonitorProbe(format ProbeFormat, decoder Decoder) error {
c.attr.Config = uint64(format.ID)
doGroup := len(c.events) > 0
for idx := 0; idx < c.numCPUs; idx++ {
cpuList := c.cpus.AsList()
for idx, cpu := range cpuList {
var group *perf.Event
var flags int
if doGroup {
group = c.events[idx]
flags = unix.PERF_FLAG_FD_NO_GROUP | unix.PERF_FLAG_FD_OUTPUT
}
ev, err := perf.OpenWithFlags(&c.attr, c.pid, idx, group, flags)
ev, err := perf.OpenWithFlags(&c.attr, c.pid, cpu, group, flags)
if err != nil {
return err
}
Expand Down Expand Up @@ -352,7 +364,7 @@ func makeMetadata(eventID int, record *perf.SampleRecord) Metadata {
func (c *PerfChannel) channelLoop() {
defer c.wg.Done()
ctx := doneWrapperContext(c.done)
merger := newRecordMerger(c.events[:c.numCPUs], c, c.pollTimeout)
merger := newRecordMerger(c.events[:c.cpus.NumCPU()], c, c.pollTimeout)
for {
// Read the available event from all the monitored ring-buffers that
// has the smallest timestamp.
Expand Down

0 comments on commit 87c73f4

Please sign in to comment.