Skip to content

Commit

Permalink
add container_python_thread_lock_wait_time_seconds metric
Browse files Browse the repository at this point in the history
  • Loading branch information
def committed Jul 11, 2024
1 parent 05c1e51 commit 9009c5c
Show file tree
Hide file tree
Showing 9 changed files with 227 additions and 43 deletions.
12 changes: 9 additions & 3 deletions containers/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,8 @@ type Container struct {
l7Stats L7Stats
dnsStats *L7Metrics

oomKills int
oomKills int
pythonThreadLockWaitTime time.Duration

mounts map[string]proc.MountInfo

Expand Down Expand Up @@ -352,6 +353,10 @@ func (c *Container) Collect(ch chan<- prometheus.Metric) {
for appType := range appTypes {
ch <- gauge(metrics.ApplicationType, 1, appType)
}
if c.pythonThreadLockWaitTime > 0 {
ch <- counter(metrics.PythonThreadLockWaitTime, c.pythonThreadLockWaitTime.Seconds())
}

if c.dnsStats.Requests != nil {
c.dnsStats.Requests.Collect(ch)
}
Expand All @@ -367,15 +372,16 @@ func (c *Container) Collect(ch chan<- prometheus.Metric) {
}
}

func (c *Container) onProcessStart(pid uint32) *Process {
func (c *Container) onProcessStart(pid uint32, trace *ebpftracer.Tracer) *Process {
c.lock.Lock()
defer c.lock.Unlock()
stats, err := TaskstatsPID(pid)
if err != nil {
return nil
}
c.zombieAt = time.Time{}
p := NewProcess(pid, stats)
p := NewProcess(pid, stats, trace)

if p == nil {
return nil
}
Expand Down
10 changes: 8 additions & 2 deletions containers/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,10 @@ var metrics = struct {
JvmGCTime *prometheus.Desc
JvmSafepointTime *prometheus.Desc
JvmSafepointSyncTime *prometheus.Desc
Ip2Fqdn *prometheus.Desc

PythonThreadLockWaitTime *prometheus.Desc

Ip2Fqdn *prometheus.Desc
}{
ContainerInfo: metric("container_info", "Meta information about the container", "image", "systemd_triggered_by"),

Expand Down Expand Up @@ -89,7 +92,10 @@ var metrics = struct {
JvmGCTime: metric("container_jvm_gc_time_seconds", "Time spent in the given JVM garbage collector in seconds", "jvm", "gc"),
JvmSafepointTime: metric("container_jvm_safepoint_time_seconds", "Time the application has been stopped for safepoint operations in seconds", "jvm"),
JvmSafepointSyncTime: metric("container_jvm_safepoint_sync_time_seconds", "Time spent getting to safepoints in seconds", "jvm"),
Ip2Fqdn: metric("ip_to_fqdn", "Mapping IP addresses to FQDNs based on DNS requests initiated by containers", "ip", "fqdn"),

Ip2Fqdn: metric("ip_to_fqdn", "Mapping IP addresses to FQDNs based on DNS requests initiated by containers", "ip", "fqdn"),

PythonThreadLockWaitTime: metric("container_python_thread_lock_wait_time_seconds", "Time spent waiting acquiring GIL in seconds"),
}

var (
Expand Down
32 changes: 26 additions & 6 deletions containers/process.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
package containers

import (
"bytes"
"context"
"os"
"time"

"github.com/jpillora/backoff"

"github.com/cilium/ebpf/link"
"github.com/coroot/coroot-node-agent/ebpftracer"
"github.com/coroot/coroot-node-agent/proc"
"github.com/jpillora/backoff"
"github.com/mdlayher/taskstats"
)

Expand All @@ -27,12 +28,13 @@ type Process struct {
uprobes []link.Link
goTlsUprobesChecked bool
openSslUprobesChecked bool
pythonGilChecked bool
}

func NewProcess(pid uint32, stats *taskstats.Stats) *Process {
func NewProcess(pid uint32, stats *taskstats.Stats, tracer *ebpftracer.Tracer) *Process {
p := &Process{Pid: pid, StartedAt: stats.BeginTime}
p.ctx, p.cancelFunc = context.WithCancel(context.Background())
go p.instrument()
go p.instrument(tracer)
return p
}

Expand All @@ -52,7 +54,7 @@ func (p *Process) isHostNs() bool {
return p.NetNsId() == hostNetNsId
}

func (p *Process) instrument() {
func (p *Process) instrument(tracer *ebpftracer.Tracer) {
b := backoff.Backoff{Factor: 2, Min: time.Second, Max: time.Minute}
for {
select {
Expand All @@ -64,18 +66,36 @@ func (p *Process) instrument() {
return
}
if dest != "/" {
p.instrumentPython(tracer)
if dotNetAppName, err := dotNetApp(p.Pid); err == nil {
if dotNetAppName != "" {
p.dotNetMonitor = NewDotNetMonitor(p.ctx, p.Pid, dotNetAppName)
}
return
}
return
}
time.Sleep(b.Duration())
}
}
}

// instrumentPython attaches the Python thread-lock uprobes to the process if
// its command looks like a Python interpreter.
//
// The check runs at most once per Process: pythonGilChecked is set before any
// work is done, so later calls return immediately. The process is considered
// Python when the first whitespace-separated word of the first NUL-separated
// cmdline element (with a trailing ':' removed) matches pythonCmd. Links
// returned by tracer.AttachPythonThreadLockProbes are appended to p.uprobes.
func (p *Process) instrumentPython(tracer *ebpftracer.Tracer) {
	if p.pythonGilChecked {
		return
	}
	p.pythonGilChecked = true
	cmdline := proc.GetCmdline(p.Pid)
	if len(cmdline) == 0 {
		return
	}
	// Only the first NUL-separated element (argv[0]) is inspected.
	parts := bytes.Split(cmdline, []byte{0})
	fields := bytes.Fields(parts[0])
	if len(fields) == 0 {
		// argv[0] is empty or whitespace-only (e.g. the cmdline starts with a
		// NUL byte). Indexing fields[0] here would panic inside the
		// instrumentation goroutine, so treat it as "not Python".
		return
	}
	cmd := bytes.TrimSuffix(fields[0], []byte{':'})
	if !pythonCmd.Match(cmd) {
		return
	}
	p.uprobes = append(p.uprobes, tracer.AttachPythonThreadLockProbes(p.Pid)...)
}

func (p *Process) Close() {
p.cancelFunc()
for _, u := range p.uprobes {
Expand Down
6 changes: 5 additions & 1 deletion containers/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ func (r *Registry) handleEvents(ch <-chan ebpftracer.Event) {
}
}
if c := r.getOrCreateContainer(e.Pid); c != nil {
p := c.onProcessStart(e.Pid)
p := c.onProcessStart(e.Pid, r.tracer)
if r.processInfoCh != nil && p != nil {
r.processInfoCh <- ProcessInfo{Pid: p.Pid, ContainerId: c.id, StartedAt: p.StartedAt}
}
Expand Down Expand Up @@ -272,6 +272,10 @@ func (r *Registry) handleEvents(ch <-chan ebpftracer.Event) {
}
r.ip2fqdnLock.Unlock()
}
case ebpftracer.EventTypePythonThreadLock:
if c := r.containersByPid[e.Pid]; c != nil {
c.pythonThreadLockWaitTime += e.Duration
}
}
}
}
Expand Down
16 changes: 8 additions & 8 deletions ebpftracer/ebpf.go

Large diffs are not rendered by default.

20 changes: 11 additions & 9 deletions ebpftracer/ebpf/ebpf.c
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,16 @@
#include <bpf/bpf_tracing.h>
#include <bpf/bpf_endian.h>

#define EVENT_TYPE_PROCESS_START 1
#define EVENT_TYPE_PROCESS_EXIT 2
#define EVENT_TYPE_CONNECTION_OPEN 3
#define EVENT_TYPE_CONNECTION_CLOSE 4
#define EVENT_TYPE_CONNECTION_ERROR 5
#define EVENT_TYPE_LISTEN_OPEN 6
#define EVENT_TYPE_LISTEN_CLOSE 7
#define EVENT_TYPE_FILE_OPEN 8
#define EVENT_TYPE_TCP_RETRANSMIT 9
#define EVENT_TYPE_PROCESS_START 1
#define EVENT_TYPE_PROCESS_EXIT 2
#define EVENT_TYPE_CONNECTION_OPEN 3
#define EVENT_TYPE_CONNECTION_CLOSE 4
#define EVENT_TYPE_CONNECTION_ERROR 5
#define EVENT_TYPE_LISTEN_OPEN 6
#define EVENT_TYPE_LISTEN_CLOSE 7
#define EVENT_TYPE_FILE_OPEN 8
#define EVENT_TYPE_TCP_RETRANSMIT 9
#define EVENT_TYPE_PYTHON_THREAD_LOCK 11

#define EVENT_REASON_OOM_KILL 1

Expand All @@ -39,5 +40,6 @@
#include "l7/l7.c"
#include "l7/gotls.c"
#include "l7/openssl.c"
#include "python.c"

char _license[] SEC("license") = "GPL";
42 changes: 42 additions & 0 deletions ebpftracer/ebpf/python.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// python_thread_events is the perf event array through which
// pthread_cond_timedwait_exit emits python_thread_event records
// (via bpf_perf_event_output with BPF_F_CURRENT_CPU).
struct {
__uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY);
__uint(key_size, sizeof(int));
__uint(value_size, sizeof(int));
} python_thread_events SEC(".maps");

// python_thread_locks remembers, per calling thread, when it entered
// pthread_cond_timedwait.
//   key:   the 64-bit value returned by bpf_get_current_pid_tgid()
//   value: the bpf_ktime_get_ns() timestamp taken in the entry probe
// The map holds at most 10240 entries.
struct {
__uint(type, BPF_MAP_TYPE_HASH);
__uint(key_size, sizeof(__u64));
__uint(value_size, sizeof(__u64));
__uint(max_entries, 10240);
} python_thread_locks SEC(".maps");

// Entry probe for pthread_cond_timedwait: stores the current monotonic
// timestamp in python_thread_locks under the calling thread's pid_tgid so the
// matching exit probe can compute how long the call took. BPF_ANY means an
// existing entry for the same thread is overwritten.
SEC("uprobe/pthread_cond_timedwait_enter")
int pthread_cond_timedwait_enter(struct pt_regs *ctx) {
    __u64 thread_key = bpf_get_current_pid_tgid();
    __u64 entered_at = bpf_ktime_get_ns();

    bpf_map_update_elem(&python_thread_locks, &thread_key, &entered_at, BPF_ANY);
    return 0;
}

// python_thread_event is the record written to python_thread_events by
// pthread_cond_timedwait_exit.
struct python_thread_event {
// Always EVENT_TYPE_PYTHON_THREAD_LOCK.
__u32 type;
// Upper 32 bits of bpf_get_current_pid_tgid() for the calling thread.
__u32 pid;
// Nanoseconds between the entry and exit probes (bpf_ktime_get_ns delta).
__u64 duration;
};

// Return probe for pthread_cond_timedwait: looks up the timestamp stored by
// the entry probe for the calling thread, and emits a python_thread_event
// carrying the elapsed time in nanoseconds. Does nothing when no entry
// timestamp is recorded for this thread.
SEC("uprobe/pthread_cond_timedwait_exit")
int pthread_cond_timedwait_exit(struct pt_regs *ctx) {
    __u64 pid_tgid = bpf_get_current_pid_tgid();
    __u64 *timestamp = bpf_map_lookup_elem(&python_thread_locks, &pid_tgid);
    if (!timestamp) {
        return 0;
    }
    // Copy the value out before deleting: the pointer refers to map storage.
    __u64 started_at = *timestamp;
    // Drop the per-thread entry now that the call has returned; otherwise
    // entries of threads that later exit are never removed and the map
    // gradually fills up to max_entries.
    bpf_map_delete_elem(&python_thread_locks, &pid_tgid);
    struct python_thread_event e = {
        .type = EVENT_TYPE_PYTHON_THREAD_LOCK,
        .pid = pid_tgid >> 32,
        .duration = bpf_ktime_get_ns() - started_at,
    };
    bpf_perf_event_output(ctx, &python_thread_events, BPF_F_CURRENT_CPU, &e, sizeof(e));
    return 0;
}
84 changes: 84 additions & 0 deletions ebpftracer/python.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package ebpftracer

import (
"bufio"
"os"
"regexp"
"strings"

"github.com/cilium/ebpf/link"
"github.com/coroot/coroot-node-agent/proc"
"k8s.io/klog/v2"
)

// Patterns used by getPthreadLib to pick, from the mapped file paths of a
// process, the library to attach the pthread_cond_timedwait probes to.
var (
// libcRegexp matches paths containing "libc." or "libc-".
libcRegexp = regexp.MustCompile(`libc[\.-]`)
// muslRegexp matches paths containing "musl." or "musl-".
muslRegexp = regexp.MustCompile(`musl[\.-]`)
)

// AttachPythonThreadLockProbes attaches the pthread_cond_timedwait entry and
// return probes to the pthread-providing library mapped into the given
// process (as located by getPthreadLib).
//
// It returns both links on success. It returns nil when no suitable library
// is found, when the library cannot be opened, or when either probe fails to
// attach; in the last case no probe is left attached. The caller owns the
// returned links and is responsible for closing them.
func (t *Tracer) AttachPythonThreadLockProbes(pid uint32) []link.Link {
	exePath := getPthreadLib(pid)
	if exePath == "" {
		return nil
	}

	// log reports msg at info level when err is nil and at error level
	// otherwise. Errors whose text ends with one of the listed suffixes are
	// dropped silently.
	log := func(msg string, err error) {
		if err != nil {
			for _, s := range []string{"no such file or directory", "no such process", "permission denied"} {
				if strings.HasSuffix(err.Error(), s) {
					return
				}
			}
			klog.ErrorfDepth(1, "pid=%d lib=%s: %s: %s", pid, exePath, msg, err)
			return
		}
		klog.InfofDepth(1, "pid=%d lib=%s: %s", pid, exePath, msg)
	}

	exe, err := link.OpenExecutable(exePath)
	if err != nil {
		log("failed to open executable", err)
		return nil
	}

	uprobe, err := exe.Uprobe("pthread_cond_timedwait", t.uprobes["pthread_cond_timedwait_enter"], nil)
	if err != nil {
		log("failed to attach uprobe", err)
		return nil
	}

	uretprobe, err := exe.Uretprobe("pthread_cond_timedwait", t.uprobes["pthread_cond_timedwait_exit"], nil)
	if err != nil {
		log("failed to attach uretprobe", err)
		// The entry probe is already attached and nobody else holds its link;
		// close it so it is not leaked. The close error is deliberately
		// ignored: the attach failure has already been reported above.
		_ = uprobe.Close()
		return nil
	}

	log("python uprobes attached", nil)
	return []link.Link{uprobe, uretprobe}
}

// getPthreadLib reads the "maps" file of the given process (via proc.Path)
// and returns the path of the library expected to provide
// pthread_cond_timedwait, rewritten through proc.Path(pid, "root", ...).
//
// The sixth whitespace-separated field of each line is taken as the mapped
// file path; lines with fewer fields are ignored. A path matching libcRegexp
// is only remembered as a fallback, whereas the first path that matches
// muslRegexp or contains "libpthread" is returned right away. If the scan
// finishes without such a match, the last remembered libc path is returned
// ("" when there was none or the file could not be opened).
func getPthreadLib(pid uint32) string {
	maps, err := os.Open(proc.Path(pid, "maps"))
	if err != nil {
		return ""
	}
	defer maps.Close()

	var fallback string
	// bufio.Scanner splits on lines by default.
	lines := bufio.NewScanner(maps)
	for lines.Scan() {
		fields := strings.Fields(lines.Text())
		if len(fields) < 6 {
			continue
		}
		mapped := fields[5]
		if libcRegexp.MatchString(mapped) {
			fallback = proc.Path(pid, "root", mapped)
			continue
		}
		if muslRegexp.MatchString(mapped) || strings.Contains(mapped, "libpthread") {
			return proc.Path(pid, "root", mapped)
		}
	}
	return fallback
}
Loading

0 comments on commit 9009c5c

Please sign in to comment.