diff --git a/Dockerfile b/Dockerfile index e42f46681b89..ef0dab44d24e 100644 --- a/Dockerfile +++ b/Dockerfile @@ -216,6 +216,7 @@ RUN --mount=target=/root/.cache,type=cache \ FROM buildkit-export AS buildkit-linux COPY --link --from=binaries / /usr/bin/ +ENV BUILDKIT_SETUP_CGROUPV2_ROOT=1 ENTRYPOINT ["buildkitd"] FROM binaries AS buildkit-darwin @@ -255,6 +256,7 @@ ENTRYPOINT ["/docker-entrypoint.sh"] # musl is needed to directly use the registry binary that is built on alpine ENV BUILDKIT_INTEGRATION_CONTAINERD_EXTRA="containerd-1.6=/opt/containerd-alt-16/bin" ENV BUILDKIT_INTEGRATION_SNAPSHOTTER=stargz +ENV BUILDKIT_SETUP_CGROUPV2_ROOT=1 ENV CGO_ENABLED=0 ENV GOTESTSUM_FORMAT=standard-verbose COPY --link --from=gotestsum /out/gotestsum /usr/bin/ diff --git a/executor/containerdexecutor/executor.go b/executor/containerdexecutor/executor.go index 6a6033df5de0..fa578c6d48fa 100644 --- a/executor/containerdexecutor/executor.go +++ b/executor/containerdexecutor/executor.go @@ -21,6 +21,7 @@ import ( "github.com/docker/docker/pkg/idtools" "github.com/moby/buildkit/executor" "github.com/moby/buildkit/executor/oci" + resourcestypes "github.com/moby/buildkit/executor/resources/types" gatewayapi "github.com/moby/buildkit/frontend/gateway/pb" "github.com/moby/buildkit/identity" "github.com/moby/buildkit/snapshot" @@ -78,7 +79,7 @@ func New(client *containerd.Client, root, cgroup string, networkProviders map[pb } } -func (w *containerdExecutor) Run(ctx context.Context, id string, root executor.Mount, mounts []executor.Mount, process executor.ProcessInfo, started chan<- struct{}) (err error) { +func (w *containerdExecutor) Run(ctx context.Context, id string, root executor.Mount, mounts []executor.Mount, process executor.ProcessInfo, started chan<- struct{}) (rec resourcestypes.Recorder, err error) { if id == "" { id = identity.NewID() } @@ -105,12 +106,12 @@ func (w *containerdExecutor) Run(ctx context.Context, id string, root executor.M resolvConf, err := 
oci.GetResolvConf(ctx, w.root, nil, w.dnsConfig) if err != nil { - return err + return nil, err } hostsFile, clean, err := oci.GetHostsFile(ctx, w.root, meta.ExtraHosts, nil, meta.Hostname) if err != nil { - return err + return nil, err } if clean != nil { defer clean() @@ -118,12 +119,12 @@ func (w *containerdExecutor) Run(ctx context.Context, id string, root executor.M mountable, err := root.Src.Mount(ctx, false) if err != nil { - return err + return nil, err } rootMounts, release, err := mountable.Mount() if err != nil { - return err + return nil, err } if release != nil { defer release() @@ -132,14 +133,14 @@ func (w *containerdExecutor) Run(ctx context.Context, id string, root executor.M lm := snapshot.LocalMounterWithMounts(rootMounts) rootfsPath, err := lm.Mount() if err != nil { - return err + return nil, err } defer lm.Unmount() defer executor.MountStubsCleaner(ctx, rootfsPath, mounts, meta.RemoveMountStubsRecursive)() uid, gid, sgids, err := oci.GetUser(rootfsPath, meta.User) if err != nil { - return err + return nil, err } identity := idtools.Identity{ @@ -149,21 +150,21 @@ func (w *containerdExecutor) Run(ctx context.Context, id string, root executor.M newp, err := fs.RootPath(rootfsPath, meta.Cwd) if err != nil { - return errors.Wrapf(err, "working dir %s points to invalid target", newp) + return nil, errors.Wrapf(err, "working dir %s points to invalid target", newp) } if _, err := os.Stat(newp); err != nil { if err := idtools.MkdirAllAndChown(newp, 0755, identity); err != nil { - return errors.Wrapf(err, "failed to create working directory %s", newp) + return nil, errors.Wrapf(err, "failed to create working directory %s", newp) } } provider, ok := w.networkProviders[meta.NetMode] if !ok { - return errors.Errorf("unknown network mode %s", meta.NetMode) + return nil, errors.Errorf("unknown network mode %s", meta.NetMode) } namespace, err := provider.New(ctx, meta.Hostname) if err != nil { - return err + return nil, err } defer namespace.Close() @@ 
-179,13 +180,13 @@ func (w *containerdExecutor) Run(ctx context.Context, id string, root executor.M processMode := oci.ProcessSandbox // FIXME(AkihiroSuda) spec, cleanup, err := oci.GenerateSpec(ctx, meta, mounts, id, resolvConf, hostsFile, namespace, w.cgroupParent, processMode, nil, w.apparmorProfile, w.selinux, w.traceSocket, opts...) if err != nil { - return err + return nil, err } defer cleanup() spec.Process.Terminal = meta.Tty if w.rootless { if err := rootlessspecconv.ToRootless(spec); err != nil { - return err + return nil, err } } @@ -193,7 +194,7 @@ func (w *containerdExecutor) Run(ctx context.Context, id string, root executor.M containerd.WithSpec(spec), ) if err != nil { - return err + return nil, err } defer func() { @@ -214,7 +215,7 @@ func (w *containerdExecutor) Run(ctx context.Context, id string, root executor.M Options: []string{"rbind"}, }})) if err != nil { - return err + return nil, err } defer func() { @@ -225,7 +226,7 @@ func (w *containerdExecutor) Run(ctx context.Context, id string, root executor.M if nn, ok := namespace.(OnCreateRuntimer); ok { if err := nn.OnCreateRuntime(task.Pid()); err != nil { - return err + return nil, err } } @@ -238,7 +239,7 @@ func (w *containerdExecutor) Run(ctx context.Context, id string, root executor.M } }) }) - return err + return nil, err } func (w *containerdExecutor) Exec(ctx context.Context, id string, process executor.ProcessInfo) (err error) { diff --git a/executor/executor.go b/executor/executor.go index a323bcc9cc94..741f347cd9ca 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -6,6 +6,7 @@ import ( "net" "syscall" + resourcestypes "github.com/moby/buildkit/executor/resources/types" "github.com/moby/buildkit/snapshot" "github.com/moby/buildkit/solver/pb" ) @@ -55,7 +56,7 @@ type Executor interface { // Run will start a container for the given process with rootfs, mounts. // `id` is an optional name for the container so it can be referenced later via Exec. 
// `started` is an optional channel that will be closed when the container setup completes and has started running. - Run(ctx context.Context, id string, rootfs Mount, mounts []Mount, process ProcessInfo, started chan<- struct{}) error + Run(ctx context.Context, id string, rootfs Mount, mounts []Mount, process ProcessInfo, started chan<- struct{}) (resourcestypes.Recorder, error) // Exec will start a process in container matching `id`. An error will be returned // if the container failed to start (via Run) or has exited before Exec is called. Exec(ctx context.Context, id string, process ProcessInfo) error diff --git a/executor/resources/cpu.go b/executor/resources/cpu.go new file mode 100644 index 000000000000..c51ebe4d6bf3 --- /dev/null +++ b/executor/resources/cpu.go @@ -0,0 +1,140 @@ +package resources + +import ( + "bufio" + "os" + "path/filepath" + "strconv" + "strings" + + "github.com/moby/buildkit/executor/resources/types" + "github.com/pkg/errors" +) + +const ( + cpuUsageUsec = "usage_usec" + cpuUserUsec = "user_usec" + cpuSystemUsec = "system_usec" + cpuNrPeriods = "nr_periods" + cpuNrThrottled = "nr_throttled" + cpuThrottledUsec = "throttled_usec" +) + +func getCgroupCPUStat(cgroupPath string) (*types.CPUStat, error) { + cpuStat := &types.CPUStat{} + + // Read cpu.stat file + cpuStatFile, err := os.Open(filepath.Join(cgroupPath, "cpu.stat")) + if err != nil { + if errors.Is(err, os.ErrNotExist) { + return nil, nil + } + return nil, err + } + defer cpuStatFile.Close() + + scanner := bufio.NewScanner(cpuStatFile) + for scanner.Scan() { + line := scanner.Text() + fields := strings.Fields(line) + + if len(fields) < 2 { + continue + } + + key := fields[0] + value, err := strconv.ParseUint(fields[1], 10, 64) + if err != nil { + continue + } + + switch key { + case cpuUsageUsec: + cpuStat.UsageNanos = uint64Ptr(value * 1000) + case cpuUserUsec: + cpuStat.UserNanos = uint64Ptr(value * 1000) + case cpuSystemUsec: + cpuStat.SystemNanos = uint64Ptr(value * 1000) + 
case cpuNrPeriods: + cpuStat.NrPeriods = new(uint32) + *cpuStat.NrPeriods = uint32(value) + case cpuNrThrottled: + cpuStat.NrThrottled = new(uint32) + *cpuStat.NrThrottled = uint32(value) + case cpuThrottledUsec: + cpuStat.ThrottledNanos = uint64Ptr(value * 1000) + } + } + + if err := scanner.Err(); err != nil { + return nil, err + } + + // Read cpu.pressure file + pressure, err := parsePressureFile(filepath.Join(cgroupPath, "cpu.pressure")) + if err == nil { + cpuStat.Pressure = pressure + } + + return cpuStat, nil +} +func parsePressureFile(filename string) (*types.Pressure, error) { + content, err := os.ReadFile(filename) + if err != nil { + if errors.Is(err, os.ErrNotExist) { // pressure file requires CONFIG_PSI + return nil, nil + } + return nil, err + } + + lines := strings.Split(string(content), "\n") + + pressure := &types.Pressure{} + for _, line := range lines { + // Skip empty lines + if len(strings.TrimSpace(line)) == 0 { + continue + } + + fields := strings.Fields(line) + prefix := fields[0] + pressureValues := &types.PressureValues{} + + for i := 1; i < len(fields); i++ { + keyValue := strings.Split(fields[i], "=") + key := keyValue[0] + valueStr := keyValue[1] + + if key == "total" { + totalValue, err := strconv.ParseUint(valueStr, 10, 64) + if err != nil { + return nil, err + } + pressureValues.Total = &totalValue + } else { + value, err := strconv.ParseFloat(valueStr, 64) + if err != nil { + return nil, err + } + + switch key { + case "avg10": + pressureValues.Avg10 = &value + case "avg60": + pressureValues.Avg60 = &value + case "avg300": + pressureValues.Avg300 = &value + } + } + } + + switch prefix { + case "some": + pressure.Some = pressureValues + case "full": + pressure.Full = pressureValues + } + } + + return pressure, nil +} diff --git a/executor/resources/cpu_test.go b/executor/resources/cpu_test.go new file mode 100644 index 000000000000..e85e2545cf3f --- /dev/null +++ b/executor/resources/cpu_test.go @@ -0,0 +1,91 @@ +package resources + 
+import ( + "os" + "path/filepath" + "testing" + + "github.com/moby/buildkit/executor/resources/types" + "github.com/stretchr/testify/require" +) + +func createDummyCgroupFS(t *testing.T, cpuStatContents string) (string, error) { + tmpDir := t.TempDir() + + err := os.WriteFile(filepath.Join(tmpDir, "cpu.stat"), []byte(cpuStatContents), 0644) + if err != nil { + return "", err + } + + return tmpDir, nil +} + +func TestGetCgroupCPUStat(t *testing.T) { + cpuStatContents := `usage_usec 1234567 +user_usec 123456 +system_usec 123456 +nr_periods 123 +nr_throttled 12 +throttled_usec 123456` + + tmpDir, err := createDummyCgroupFS(t, cpuStatContents) + require.NoError(t, err) + + cpuStat, err := getCgroupCPUStat(tmpDir) + require.NoError(t, err) + + require.NotNil(t, cpuStat.UsageNanos) + require.Equal(t, uint64(1234567000), *cpuStat.UsageNanos) + + require.NotNil(t, cpuStat.UserNanos) + require.Equal(t, uint64(123456000), *cpuStat.UserNanos) + + require.NotNil(t, cpuStat.SystemNanos) + require.Equal(t, uint64(123456000), *cpuStat.SystemNanos) + + require.NotNil(t, cpuStat.NrPeriods) + require.Equal(t, uint32(123), *cpuStat.NrPeriods) + + require.NotNil(t, cpuStat.NrThrottled) + require.Equal(t, uint32(12), *cpuStat.NrThrottled) + + require.NotNil(t, cpuStat.ThrottledNanos) + require.Equal(t, uint64(123456000), *cpuStat.ThrottledNanos) +} +func TestReadPressureFile(t *testing.T) { + pressureContents := `some avg10=1.23 avg60=4.56 avg300=7.89 total=3031 +full avg10=0.12 avg60=0.34 avg300=0.56 total=9876` + + tmpFile := filepath.Join(t.TempDir(), "pressure_test") + err := os.WriteFile(tmpFile, []byte(pressureContents), os.ModePerm) + require.NoError(t, err) + + pressure, err := parsePressureFile(tmpFile) + require.NoError(t, err) + + some123 := 1.23 + some456 := 4.56 + some789 := 7.89 + some3031 := uint64(3031) + full12 := 0.12 + full34 := 0.34 + full56 := 0.56 + full9876 := uint64(9876) + + expected := &types.Pressure{ + Some: &types.PressureValues{ + Avg10: &some123, + 
Avg60: &some456, + Avg300: &some789, + Total: &some3031, + }, + Full: &types.PressureValues{ + Avg10: &full12, + Avg60: &full34, + Avg300: &full56, + Total: &full9876, + }, + } + + require.Equal(t, expected, pressure) +} diff --git a/executor/resources/io.go b/executor/resources/io.go new file mode 100644 index 000000000000..be56d7637535 --- /dev/null +++ b/executor/resources/io.go @@ -0,0 +1,117 @@ +package resources + +import ( + "os" + "path/filepath" + "strconv" + "strings" + + "github.com/moby/buildkit/executor/resources/types" + "github.com/pkg/errors" +) + +const ( + ioStatFile = "io.stat" + ioPressureFile = "io.pressure" +) + +const ( + ioReadBytes = "rbytes" + ioWriteBytes = "wbytes" + ioDiscardBytes = "dbytes" + ioReadIOs = "rios" + ioWriteIOs = "wios" + ioDiscardIOs = "dios" +) + +func getCgroupIOStat(cgroupPath string) (*types.IOStat, error) { + ioStatPath := filepath.Join(cgroupPath, ioStatFile) + data, err := os.ReadFile(ioStatPath) + if err != nil { + if errors.Is(err, os.ErrNotExist) { + return nil, nil + } + return nil, errors.Wrapf(err, "failed to read %s", ioStatPath) + } + + ioStat := &types.IOStat{} + lines := strings.Split(string(data), "\n") + for _, line := range lines { + parts := strings.Fields(line) + if len(parts) < 2 { + continue + } + + for _, part := range parts[1:] { + key, value := parseKeyValue(part) + if key == "" { + continue + } + + switch key { + case ioReadBytes: + if ioStat.ReadBytes != nil { + *ioStat.ReadBytes += value + } else { + ioStat.ReadBytes = uint64Ptr(value) + } + case ioWriteBytes: + if ioStat.WriteBytes != nil { + *ioStat.WriteBytes += value + } else { + ioStat.WriteBytes = uint64Ptr(value) + } + case ioDiscardBytes: + if ioStat.DiscardBytes != nil { + *ioStat.DiscardBytes += value + } else { + ioStat.DiscardBytes = uint64Ptr(value) + } + case ioReadIOs: + if ioStat.ReadIOs != nil { + *ioStat.ReadIOs += value + } else { + ioStat.ReadIOs = uint64Ptr(value) + } + case ioWriteIOs: + if ioStat.WriteIOs != nil { + 
*ioStat.WriteIOs += value + } else { + ioStat.WriteIOs = uint64Ptr(value) + } + case ioDiscardIOs: + if ioStat.DiscardIOs != nil { + *ioStat.DiscardIOs += value + } else { + ioStat.DiscardIOs = uint64Ptr(value) + } + } + } + } + + // Parse the pressure + pressure, err := parsePressureFile(filepath.Join(cgroupPath, ioPressureFile)) + if err != nil { + return nil, err + } + ioStat.Pressure = pressure + + return ioStat, nil +} + +func parseKeyValue(kv string) (key string, value uint64) { + parts := strings.SplitN(kv, "=", 2) + if len(parts) != 2 { + return "", 0 + } + key = parts[0] + value, err := strconv.ParseUint(parts[1], 10, 64) + if err != nil { + return "", 0 + } + return key, value +} + +func uint64Ptr(v uint64) *uint64 { + return &v +} diff --git a/executor/resources/io_test.go b/executor/resources/io_test.go new file mode 100644 index 000000000000..ef7989493503 --- /dev/null +++ b/executor/resources/io_test.go @@ -0,0 +1,53 @@ +package resources + +import ( + "os" + "path/filepath" + "testing" + + "github.com/moby/buildkit/executor/resources/types" + "github.com/stretchr/testify/assert" +) + +func TestParseIOStat(t *testing.T) { + testDir := t.TempDir() + + ioStatContents := `8:0 rbytes=1024 wbytes=2048 dbytes=4096 rios=16 wios=32 dios=64 +8:1 rbytes=512 wbytes=1024 dbytes=2048 rios=8 wios=16 dios=32` + err := os.WriteFile(filepath.Join(testDir, "io.stat"), []byte(ioStatContents), 0644) + assert.NoError(t, err) + + ioPressureContents := `some avg10=1.23 avg60=4.56 avg300=7.89 total=3031 +full avg10=0.12 avg60=0.34 avg300=0.56 total=9876` + err = os.WriteFile(filepath.Join(testDir, "io.pressure"), []byte(ioPressureContents), 0644) + assert.NoError(t, err) + + ioStat, err := getCgroupIOStat(testDir) + assert.NoError(t, err) + + var expectedPressure = &types.Pressure{ + Some: &types.PressureValues{ + Avg10: float64Ptr(1.23), + Avg60: float64Ptr(4.56), + Avg300: float64Ptr(7.89), + Total: uint64Ptr(3031), + }, + Full: &types.PressureValues{ + Avg10: 
float64Ptr(0.12), + Avg60: float64Ptr(0.34), + Avg300: float64Ptr(0.56), + Total: uint64Ptr(9876), + }, + } + + expectedIOStat := &types.IOStat{ + ReadBytes: uint64Ptr(1024 + 512), + WriteBytes: uint64Ptr(2048 + 1024), + DiscardBytes: uint64Ptr(4096 + 2048), + ReadIOs: uint64Ptr(16 + 8), + WriteIOs: uint64Ptr(32 + 16), + DiscardIOs: uint64Ptr(64 + 32), + Pressure: expectedPressure, + } + assert.Equal(t, expectedIOStat, ioStat) +} diff --git a/executor/resources/memory.go b/executor/resources/memory.go new file mode 100644 index 000000000000..775f0f8dae61 --- /dev/null +++ b/executor/resources/memory.go @@ -0,0 +1,159 @@ +package resources + +import ( + "os" + "path/filepath" + "strconv" + "strings" + + "github.com/moby/buildkit/executor/resources/types" + "github.com/pkg/errors" +) + +const ( + memoryStatFile = "memory.stat" + memoryPressureFile = "memory.pressure" + memoryPeakFile = "memory.peak" + memorySwapCurrentFile = "memory.swap.current" + memoryEventsFile = "memory.events" +) + +const ( + memoryAnon = "anon" + memoryFile = "file" + memoryKernelStack = "kernel_stack" + memoryPageTables = "pagetables" + memorySock = "sock" + memoryShmem = "shmem" + memoryFileMapped = "file_mapped" + memoryFileDirty = "file_dirty" + memoryFileWriteback = "file_writeback" + memorySlab = "slab" + memoryPgscan = "pgscan" + memoryPgsteal = "pgsteal" + memoryPgfault = "pgfault" + memoryPgmajfault = "pgmajfault" + + memoryLow = "low" + memoryHigh = "high" + memoryMax = "max" + memoryOom = "oom" + memoryOomKill = "oom_kill" +) + +func getCgroupMemoryStat(path string) (*types.MemoryStat, error) { + memoryStat := &types.MemoryStat{} + + // Parse memory.stat + err := parseKeyValueFile(filepath.Join(path, memoryStatFile), func(key string, value uint64) { + switch key { + case memoryAnon: + memoryStat.Anon = &value + case memoryFile: + memoryStat.File = &value + case memoryKernelStack: + memoryStat.KernelStack = &value + case memoryPageTables: + memoryStat.PageTables = &value + case 
memorySock: + memoryStat.Sock = &value + case memoryShmem: + memoryStat.Shmem = &value + case memoryFileMapped: + memoryStat.FileMapped = &value + case memoryFileDirty: + memoryStat.FileDirty = &value + case memoryFileWriteback: + memoryStat.FileWriteback = &value + case memorySlab: + memoryStat.Slab = &value + case memoryPgscan: + memoryStat.Pgscan = &value + case memoryPgsteal: + memoryStat.Pgsteal = &value + case memoryPgfault: + memoryStat.Pgfault = &value + case memoryPgmajfault: + memoryStat.Pgmajfault = &value + } + }) + if err != nil { + if errors.Is(err, os.ErrNotExist) { + return nil, nil + } + return nil, err + } + + pressure, err := parsePressureFile(filepath.Join(path, memoryPressureFile)) + if err != nil { + return nil, err + } + if pressure != nil { + memoryStat.Pressure = pressure + } + + err = parseKeyValueFile(filepath.Join(path, memoryEventsFile), func(key string, value uint64) { + switch key { + case memoryLow: + memoryStat.LowEvents = value + case memoryHigh: + memoryStat.HighEvents = value + case memoryMax: + memoryStat.MaxEvents = value + case memoryOom: + memoryStat.OomEvents = value + case memoryOomKill: + memoryStat.OomKillEvents = value + } + }) + + if err != nil { + return nil, err + } + + peak, err := parseSingleValueFile(filepath.Join(path, memoryPeakFile)) + if err != nil { + if !errors.Is(err, os.ErrNotExist) { + return nil, err + } + } else { + memoryStat.Peak = &peak + } + + swap, err := parseSingleValueFile(filepath.Join(path, memorySwapCurrentFile)) + if err != nil { + if !errors.Is(err, os.ErrNotExist) { + return nil, err + } + } else { + memoryStat.SwapBytes = &swap + } + + return memoryStat, nil +} + +func parseKeyValueFile(filePath string, callback func(key string, value uint64)) error { + content, err := os.ReadFile(filePath) + if err != nil { + return errors.Wrapf(err, "failed to read %s", filePath) + } + + lines := strings.Split(string(content), "\n") + for _, line := range lines { + if len(strings.TrimSpace(line)) == 0 { 
+ continue + } + + fields := strings.Fields(line) + key := fields[0] + valueStr := fields[1] + value, err := strconv.ParseUint(valueStr, 10, 64) + if err != nil { + return errors.Wrapf(err, "failed to parse value for %s", key) + } + + callback(key, value) + } + + return nil +} diff --git a/executor/resources/memory_test.go b/executor/resources/memory_test.go new file mode 100644 index 000000000000..2b68d66bb26d --- /dev/null +++ b/executor/resources/memory_test.go @@ -0,0 +1,98 @@ +package resources + +import ( + "os" + "path/filepath" + "testing" + + "github.com/moby/buildkit/executor/resources/types" + "github.com/stretchr/testify/assert" +) + +func TestParseMemoryStat(t *testing.T) { + testDir := t.TempDir() + + memoryStatContents := `anon 24576 +file 12791808 +kernel_stack 8192 +pagetables 4096 +sock 2048 +shmem 16384 +file_mapped 8192 +file_dirty 32768 +file_writeback 16384 +slab 1503104 +pgscan 100 +pgsteal 99 +pgfault 32711 +pgmajfault 12` + err := os.WriteFile(filepath.Join(testDir, memoryStatFile), []byte(memoryStatContents), 0644) + assert.NoError(t, err) + + memoryPressureContents := `some avg10=1.23 avg60=4.56 avg300=7.89 total=3031 +full avg10=0.12 avg60=0.34 avg300=0.56 total=9876` + err = os.WriteFile(filepath.Join(testDir, memoryPressureFile), []byte(memoryPressureContents), 0644) + assert.NoError(t, err) + + memoryEventsContents := `low 4 +high 3 +max 2 +oom 1 +oom_kill 5` + err = os.WriteFile(filepath.Join(testDir, memoryEventsFile), []byte(memoryEventsContents), 0644) + assert.NoError(t, err) + + err = os.WriteFile(filepath.Join(testDir, memoryPeakFile), []byte("123456"), 0644) + assert.NoError(t, err) + + err = os.WriteFile(filepath.Join(testDir, memorySwapCurrentFile), []byte("987654"), 0644) + assert.NoError(t, err) + + memoryStat, err := getCgroupMemoryStat(testDir) + assert.NoError(t, err) + + var expectedPressure = &types.Pressure{ + Some: &types.PressureValues{ + Avg10: float64Ptr(1.23), + Avg60: float64Ptr(4.56), + Avg300: 
// cgroupRecord collects resource samples for a single cgroup namespace that
// was registered with a Monitor.
type cgroupRecord struct {
	// once makes sure the shutdown logic in close runs a single time.
	once sync.Once
	// ns is the key of this record in Monitor.records; sample joins it with
	// defaultMountpoint to locate the cgroup directory.
	ns string
	// sampler is the active subscription; it stays nil until Start is called.
	sampler *Sub[*types.Sample]
	// closeSampler stops the Sampler created in Start.
	closeSampler func() error
	// samples and err hold the outcome of closing the subscription.
	samples []*types.Sample
	err     error
	// done is closed once close has finished.
	done       chan struct{}
	monitor    *Monitor
	netSampler NetworkSampler
	// startCPUStat is the CPU total read from procfs in Start; sysCPUStat is
	// the difference to the total read again in close.
	startCPUStat *procfs.CPUStat
	sysCPUStat   *types.SysCPUStat
}

// Wait triggers the shutdown of the record in the background, blocks until it
// has completed and returns the error stored while closing the subscription.
func (r *cgroupRecord) Wait() error {
	go r.close()
	<-r.done
	return r.err
}

// Start records the current CPU totals, if procfs can be read, and begins
// sampling with a 2 second minimum interval and a maxSamples value of 10.
func (r *cgroupRecord) Start() {
	if stat, err := r.monitor.proc.Stat(); err == nil {
		r.startCPUStat = &stat.CPUTotal
	}
	s := NewSampler(2*time.Second, 10, r.sample)
	r.sampler = s.Record()
	r.closeSampler = s.Close
}

// CloseAsync shuts the record down in a new goroutine and calls next
// afterwards. It always returns nil; the error returned by next is discarded.
func (r *cgroupRecord) CloseAsync(next func(context.Context) error) error {
	go func() {
		r.close()
		next(context.TODO())
	}()
	return nil
}

// close performs the one-time shutdown: it unregisters the record from the
// monitor, collects the samples including a final one, stops the sampler and
// computes the CPU delta since Start. done is closed when it returns.
func (r *cgroupRecord) close() {
	r.once.Do(func() {
		defer close(r.done)
		// Unregister in a separate goroutine so close never waits for the
		// monitor mutex itself; Monitor.Close invokes close on every record
		// while holding that mutex.
		go func() {
			r.monitor.mu.Lock()
			delete(r.monitor.records, r.ns)
			r.monitor.mu.Unlock()
		}()
		// Start was never called, so there is nothing to collect.
		if r.sampler == nil {
			return
		}
		s, err := r.sampler.Close(true)
		if err != nil {
			r.err = err
		} else {
			r.samples = s
		}
		r.closeSampler()

		// Only report a CPU delta when both the start and the end reading
		// succeeded.
		if r.startCPUStat != nil {
			stat, err := r.monitor.proc.Stat()
			if err == nil {
				cpu := &types.SysCPUStat{
					User:      stat.CPUTotal.User - r.startCPUStat.User,
					Nice:      stat.CPUTotal.Nice - r.startCPUStat.Nice,
					System:    stat.CPUTotal.System - r.startCPUStat.System,
					Idle:      stat.CPUTotal.Idle - r.startCPUStat.Idle,
					Iowait:    stat.CPUTotal.Iowait - r.startCPUStat.Iowait,
					IRQ:       stat.CPUTotal.IRQ - r.startCPUStat.IRQ,
					SoftIRQ:   stat.CPUTotal.SoftIRQ - r.startCPUStat.SoftIRQ,
					Steal:     stat.CPUTotal.Steal - r.startCPUStat.Steal,
					Guest:     stat.CPUTotal.Guest - r.startCPUStat.Guest,
					GuestNice: stat.CPUTotal.GuestNice - r.startCPUStat.GuestNice,
				}
				r.sysCPUStat = cpu
			}
		}
	})
}
Timestamp_: tm, + CPUStat: cpu, + MemoryStat: memory, + IOStat: io, + PIDsStat: pids, + } + if r.netSampler != nil { + net, err := r.netSampler.Sample() + if err != nil { + return nil, err + } + sample.NetStat = net + } + return sample, nil +} + +func (r *cgroupRecord) Samples() (*types.Samples, error) { + <-r.done + if r.err != nil { + return nil, r.err + } + return &types.Samples{ + Samples: r.samples, + SysCPUStat: r.sysCPUStat, + }, nil +} + +type nopRecord struct { +} + +func (r *nopRecord) Wait() error { + return nil +} + +func (r *nopRecord) Samples() (*types.Samples, error) { + return nil, nil +} + +func (r *nopRecord) CloseAsync(next func(context.Context) error) error { + return next(context.TODO()) +} + +func (r *nopRecord) Start() { +} + +type Monitor struct { + mu sync.Mutex + closed chan struct{} + records map[string]*cgroupRecord + proc procfs.FS +} + +type NetworkSampler interface { + Sample() (*network.Sample, error) +} + +type RecordOpt struct { + NetworkSampler NetworkSampler +} + +func (m *Monitor) RecordNamespace(ns string, opt RecordOpt) (types.Recorder, error) { + isClosed := false + select { + case <-m.closed: + isClosed = true + default: + } + if !isCgroupV2 || isClosed { + return &nopRecord{}, nil + } + r := &cgroupRecord{ + ns: ns, + done: make(chan struct{}), + monitor: m, + netSampler: opt.NetworkSampler, + } + m.mu.Lock() + m.records[ns] = r + m.mu.Unlock() + return r, nil +} + +func (m *Monitor) Close() error { + close(m.closed) + m.mu.Lock() + defer m.mu.Unlock() + + for _, r := range m.records { + r.close() + } + return nil +} + +func NewMonitor() (*Monitor, error) { + initOnce.Do(func() { + isCgroupV2 = isCgroup2() + if !isCgroupV2 { + return + } + if err := prepareCgroupControllers(); err != nil { + logrus.Warnf("failed to prepare cgroup controllers: %+v", err) + } + }) + + fs, err := procfs.NewDefaultFS() + if err != nil { + return nil, err + } + + return &Monitor{ + closed: make(chan struct{}), + records: 
make(map[string]*cgroupRecord), + proc: fs, + }, nil +} + +func prepareCgroupControllers() error { + v, ok := os.LookupEnv("BUILDKIT_SETUP_CGROUPV2_ROOT") + if !ok { + return nil + } + if b, _ := strconv.ParseBool(v); !b { + return nil + } + // move current process to init cgroup + if err := os.MkdirAll(filepath.Join(defaultMountpoint, initGroup), 0755); err != nil { + return err + } + f, err := os.OpenFile(filepath.Join(defaultMountpoint, cgroupProcsFile), os.O_RDONLY, 0) + if err != nil { + return err + } + s := bufio.NewScanner(f) + for s.Scan() { + if err := os.WriteFile(filepath.Join(defaultMountpoint, initGroup, cgroupProcsFile), s.Bytes(), 0); err != nil { + return err + } + } + if err := f.Close(); err != nil { + return err + } + dt, err := os.ReadFile(filepath.Join(defaultMountpoint, cgroupControllersFile)) + if err != nil { + return err + } + for _, c := range strings.Split(string(dt), " ") { + if c == "" { + continue + } + if err := os.WriteFile(filepath.Join(defaultMountpoint, cgroupSubtreeFile), []byte("+"+c), 0); err != nil { + // ignore error + logrus.Warnf("failed to enable cgroup controller %q: %+v", c, err) + } + } + return nil +} diff --git a/executor/resources/monitor_linux.go b/executor/resources/monitor_linux.go new file mode 100644 index 000000000000..aefc2adce7d5 --- /dev/null +++ b/executor/resources/monitor_linux.go @@ -0,0 +1,15 @@ +//go:build linux +// +build linux + +package resources + +import "golang.org/x/sys/unix" + +func isCgroup2() bool { + var st unix.Statfs_t + err := unix.Statfs(defaultMountpoint, &st) + if err != nil { + return false + } + return st.Type == unix.CGROUP2_SUPER_MAGIC +} diff --git a/executor/resources/monitor_nolinux.go b/executor/resources/monitor_nolinux.go new file mode 100644 index 000000000000..20a50a648cb6 --- /dev/null +++ b/executor/resources/monitor_nolinux.go @@ -0,0 +1,8 @@ +//go:build !linux +// +build !linux + +package resources + +func isCgroup2() bool { + return false +} diff --git 
a/executor/resources/pids.go b/executor/resources/pids.go new file mode 100644 index 000000000000..88493d805eb3 --- /dev/null +++ b/executor/resources/pids.go @@ -0,0 +1,45 @@ +package resources + +import ( + "os" + "path/filepath" + "strconv" + "strings" + + "github.com/moby/buildkit/executor/resources/types" + "github.com/pkg/errors" +) + +const ( + pidsCurrentFile = "pids.current" +) + +func getCgroupPIDsStat(path string) (*types.PIDsStat, error) { + pidsStat := &types.PIDsStat{} + + v, err := parseSingleValueFile(filepath.Join(path, pidsCurrentFile)) + if err != nil { + if !errors.Is(err, os.ErrNotExist) { + return nil, err + } + } else { + pidsStat.Current = &v + } + + return pidsStat, nil +} + +func parseSingleValueFile(filePath string) (uint64, error) { + content, err := os.ReadFile(filePath) + if err != nil { + return 0, errors.Wrapf(err, "failed to read %s", filePath) + } + + valueStr := strings.TrimSpace(string(content)) + value, err := strconv.ParseUint(valueStr, 10, 64) + if err != nil { + return 0, errors.Wrapf(err, "failed to parse value: %s", valueStr) + } + + return value, nil +} diff --git a/executor/resources/pids_test.go b/executor/resources/pids_test.go new file mode 100644 index 000000000000..928be80798e1 --- /dev/null +++ b/executor/resources/pids_test.go @@ -0,0 +1,25 @@ +package resources + +import ( + "os" + "path/filepath" + "testing" + + "github.com/moby/buildkit/executor/resources/types" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestParsePidsStat(t *testing.T) { + testDir := t.TempDir() + + err := os.WriteFile(filepath.Join(testDir, "pids.current"), []byte("123"), 0644) + assert.NoError(t, err) + + expectedPidsStat := &types.PIDsStat{ + Current: uint64Ptr(123), + } + stats, err := getCgroupPIDsStat(filepath.Join(testDir)) + require.NoError(t, err) + assert.Equal(t, expectedPidsStat, stats) +} diff --git a/executor/resources/sampler.go b/executor/resources/sampler.go new file mode 100644 
// WithTimestamp is implemented by sample types that can report the time they
// were taken at.
type WithTimestamp interface {
	Timestamp() time.Time
}

// Sampler periodically invokes a callback and hands the resulting sample to
// every subscription created with Record.
type Sampler[T WithTimestamp] struct {
	// mu guards running, subs and the mutable fields of every Sub that is
	// registered in subs.
	mu          sync.RWMutex
	minInterval time.Duration
	maxSamples  int
	callback    func(ts time.Time) (T, error)
	doneOnce    sync.Once
	done        chan struct{}
	running     bool
	subs        map[*Sub[T]]struct{}
}

// Sub is a single subscription to a Sampler. Its fields are written by the
// sampler goroutine while the subscription is registered and must only be
// accessed with the sampler mutex held until it has been removed.
type Sub[T WithTimestamp] struct {
	sampler  *Sampler[T]
	interval time.Duration
	first    time.Time
	last     time.Time
	samples  []T
	err      error
}

// Close ends the subscription and returns the collected samples.
//
// Samples that are closer together than the subscription's final interval are
// dropped, the first sample is always kept. When captureLast is true, one
// more sample is taken synchronously and appended. If the most recent
// periodic sampling attempt failed, or the final sample cannot be taken, the
// error is returned instead.
func (s *Sub[T]) Close(captureLast bool) ([]T, error) {
	// Unregister and snapshot under the lock. Once the subscription is gone
	// from subs the sampler goroutine no longer touches it, so the snapshot
	// can be processed without holding the lock.
	s.sampler.mu.Lock()
	delete(s.sampler.subs, s)
	subErr := s.err
	interval := s.interval
	samples := s.samples
	s.sampler.mu.Unlock()

	if subErr != nil {
		return nil, subErr
	}
	current := s.first
	out := make([]T, 0, len(samples)+1)
	for i, v := range samples {
		ts := v.Timestamp()
		if i == 0 || ts.Sub(current) >= interval {
			out = append(out, v)
			current = ts
		}
	}

	if captureLast {
		v, err := s.sampler.callback(time.Now())
		if err != nil {
			return nil, err
		}
		out = append(out, v)
	}

	return out, nil
}

// NewSampler creates a sampler that calls cb at most once per minInterval.
// maxSamples controls how quickly a subscription's own interval is doubled:
// whenever a subscription has been running for at least maxSamples times its
// current interval, that interval is doubled. Sampling starts with the first
// call to Record.
func NewSampler[T WithTimestamp](minInterval time.Duration, maxSamples int, cb func(time.Time) (T, error)) *Sampler[T] {
	s := &Sampler[T]{
		minInterval: minInterval,
		maxSamples:  maxSamples,
		callback:    cb,
		done:        make(chan struct{}),
		subs:        make(map[*Sub[T]]struct{}),
	}
	return s
}

// Record registers a new subscription and starts the sampling goroutine if
// it is not running yet.
func (s *Sampler[T]) Record() *Sub[T] {
	ss := &Sub[T]{
		interval: s.minInterval,
		first:    time.Now(),
		sampler:  s,
	}
	s.mu.Lock()
	s.subs[ss] = struct{}{}
	if !s.running {
		s.running = true
		go s.run()
	}
	s.mu.Unlock()
	return ss
}

// run is the sampling loop. It exits when the sampler is closed.
func (s *Sampler[T]) run() {
	timer := time.NewTimer(s.minInterval)
	defer timer.Stop()
	for {
		select {
		case <-s.done:
			return
		case <-timer.C:
		}

		tm := time.Now()

		// Select the subscriptions that are due. This writes ss.last and
		// reads the subs map, so it needs the write lock.
		s.mu.Lock()
		active := make([]*Sub[T], 0, len(s.subs))
		for ss := range s.subs {
			if tm.Sub(ss.last) < ss.interval {
				continue
			}
			ss.last = tm
			active = append(active, ss)
		}
		s.mu.Unlock()

		// The timer has fired and its channel was drained above, so it can
		// be re-armed directly.
		timer.Reset(s.minInterval)
		if len(active) == 0 {
			continue
		}

		// The callback may be slow; run it without holding the lock.
		value, err := s.callback(tm)

		s.mu.Lock()
		for _, ss := range active {
			if _, ok := s.subs[ss]; !ok {
				// Closed while the callback was running; its owner may be
				// reading the fields now.
				continue
			}
			if err != nil {
				ss.err = err
			} else {
				ss.samples = append(ss.samples, value)
				ss.err = nil
			}
			dur := ss.last.Sub(ss.first)
			if time.Duration(ss.interval)*time.Duration(s.maxSamples) <= dur {
				ss.interval *= 2
			}
		}
		s.mu.Unlock()
	}
}

// Close stops the sampling goroutine. It is safe to call more than once and
// always returns nil.
func (s *Sampler[T]) Close() error {
	s.doneOnce.Do(func() {
		close(s.done)
	})
	return nil
}
Total: mem.MemTotal, + Free: mem.MemFree, + Buffers: mem.Buffers, + Cached: mem.Cached, + Active: mem.Active, + Inactive: mem.Inactive, + Swap: mem.SwapTotal, + Available: mem.MemAvailable, + Dirty: mem.Dirty, + Writeback: mem.Writeback, + Slab: mem.Slab, + } + + if _, err := os.Lstat("/proc/pressure"); err != nil { + return s, nil + } + + cp, err := parsePressureFile("/proc/pressure/cpu") + if err != nil { + return nil, err + } + s.CPUPressure = cp + + mp, err := parsePressureFile("/proc/pressure/memory") + if err != nil { + return nil, err + } + s.MemoryPressure = mp + + ip, err := parsePressureFile("/proc/pressure/io") + if err != nil { + return nil, err + } + s.IOPressure = ip + + return s, nil +} diff --git a/executor/resources/types/systypes.go b/executor/resources/types/systypes.go new file mode 100644 index 000000000000..56db46945b23 --- /dev/null +++ b/executor/resources/types/systypes.go @@ -0,0 +1,72 @@ +package types + +import ( + "encoding/json" + "math" + "time" +) + +type SysCPUStat struct { + User float64 `json:"user"` + Nice float64 `json:"nice"` + System float64 `json:"system"` + Idle float64 `json:"idle"` + Iowait float64 `json:"iowait"` + IRQ float64 `json:"irq"` + SoftIRQ float64 `json:"softirq"` + Steal float64 `json:"steal"` + Guest float64 `json:"guest"` + GuestNice float64 `json:"guestNice"` +} + +type sysCPUStatAlias SysCPUStat // avoid recursion of MarshalJSON + +func (s SysCPUStat) MarshalJSON() ([]byte, error) { + return json.Marshal(sysCPUStatAlias{ + User: math.Round(s.User*1000) / 1000, + Nice: math.Round(s.Nice*1000) / 1000, + System: math.Round(s.System*1000) / 1000, + Idle: math.Round(s.Idle*1000) / 1000, + Iowait: math.Round(s.Iowait*1000) / 1000, + IRQ: math.Round(s.IRQ*1000) / 1000, + SoftIRQ: math.Round(s.SoftIRQ*1000) / 1000, + Steal: math.Round(s.Steal*1000) / 1000, + Guest: math.Round(s.Guest*1000) / 1000, + GuestNice: math.Round(s.GuestNice*1000) / 1000, + }) +} + +type ProcStat struct { + ContextSwitches uint64 
`json:"contextSwitches"` + ProcessCreated uint64 `json:"processCreated"` + ProcessesRunning uint64 `json:"processesRunning"` +} + +type SysMemoryStat struct { + Total *uint64 `json:"total"` + Free *uint64 `json:"free"` + Available *uint64 `json:"available"` + Buffers *uint64 `json:"buffers"` + Cached *uint64 `json:"cached"` + Active *uint64 `json:"active"` + Inactive *uint64 `json:"inactive"` + Swap *uint64 `json:"swap"` + Dirty *uint64 `json:"dirty"` + Writeback *uint64 `json:"writeback"` + Slab *uint64 `json:"slab"` +} + +type SysSample struct { + //nolint + Timestamp_ time.Time `json:"timestamp"` + CPUStat *SysCPUStat `json:"cpuStat,omitempty"` + ProcStat *ProcStat `json:"procStat,omitempty"` + MemoryStat *SysMemoryStat `json:"memoryStat,omitempty"` + CPUPressure *Pressure `json:"cpuPressure,omitempty"` + MemoryPressure *Pressure `json:"memoryPressure,omitempty"` + IOPressure *Pressure `json:"ioPressure,omitempty"` +} + +func (s *SysSample) Timestamp() time.Time { + return s.Timestamp_ +} diff --git a/executor/resources/types/types.go b/executor/resources/types/types.go new file mode 100644 index 000000000000..8ef316a167e3 --- /dev/null +++ b/executor/resources/types/types.go @@ -0,0 +1,103 @@ +package types + +import ( + "context" + "time" + + "github.com/moby/buildkit/util/network" +) + +type Recorder interface { + Start() + CloseAsync(func(context.Context) error) error + Wait() error + Samples() (*Samples, error) +} + +type Samples struct { + Samples []*Sample `json:"samples,omitempty"` + SysCPUStat *SysCPUStat `json:"sysCPUStat,omitempty"` +} + +// Sample represents a wrapper for sampled data of cgroupv2 controllers +type Sample struct { + //nolint + Timestamp_ time.Time `json:"timestamp"` + CPUStat *CPUStat `json:"cpuStat,omitempty"` + MemoryStat *MemoryStat `json:"memoryStat,omitempty"` + IOStat *IOStat `json:"ioStat,omitempty"` + PIDsStat *PIDsStat `json:"pidsStat,omitempty"` + NetStat *network.Sample `json:"netStat,omitempty"` +} + +func (s *Sample) 
Timestamp() time.Time { + return s.Timestamp_ +} + +// CPUStat represents the sampling state of the cgroupv2 CPU controller +type CPUStat struct { + UsageNanos *uint64 `json:"usageNanos,omitempty"` + UserNanos *uint64 `json:"userNanos,omitempty"` + SystemNanos *uint64 `json:"systemNanos,omitempty"` + NrPeriods *uint32 `json:"nrPeriods,omitempty"` + NrThrottled *uint32 `json:"nrThrottled,omitempty"` + ThrottledNanos *uint64 `json:"throttledNanos,omitempty"` + Pressure *Pressure `json:"pressure,omitempty"` +} + +// MemoryStat represents the sampling state of the cgroupv2 memory controller +type MemoryStat struct { + SwapBytes *uint64 `json:"swapBytes,omitempty"` + Anon *uint64 `json:"anon,omitempty"` + File *uint64 `json:"file,omitempty"` + Kernel *uint64 `json:"kernel,omitempty"` + KernelStack *uint64 `json:"kernelStack,omitempty"` + PageTables *uint64 `json:"pageTables,omitempty"` + Sock *uint64 `json:"sock,omitempty"` + Vmalloc *uint64 `json:"vmalloc,omitempty"` + Shmem *uint64 `json:"shmem,omitempty"` + FileMapped *uint64 `json:"fileMapped,omitempty"` + FileDirty *uint64 `json:"fileDirty,omitempty"` + FileWriteback *uint64 `json:"fileWriteback,omitempty"` + Slab *uint64 `json:"slab,omitempty"` + Pgscan *uint64 `json:"pgscan,omitempty"` + Pgsteal *uint64 `json:"pgsteal,omitempty"` + Pgfault *uint64 `json:"pgfault,omitempty"` + Pgmajfault *uint64 `json:"pgmajfault,omitempty"` + Peak *uint64 `json:"peak,omitempty"` + LowEvents uint64 `json:"lowEvents,omitempty"` + HighEvents uint64 `json:"highEvents,omitempty"` + MaxEvents uint64 `json:"maxEvents,omitempty"` + OomEvents uint64 `json:"oomEvents,omitempty"` + OomKillEvents uint64 `json:"oomKillEvents,omitempty"` + Pressure *Pressure `json:"pressure,omitempty"` +} + +// IOStat represents the sampling state of the cgroupv2 IO controller +type IOStat struct { + ReadBytes *uint64 `json:"readBytes,omitempty"` + WriteBytes *uint64 `json:"writeBytes,omitempty"` + DiscardBytes *uint64 `json:"discardBytes,omitempty"` + ReadIOs 
*uint64 `json:"readIOs,omitempty"` + WriteIOs *uint64 `json:"writeIOs,omitempty"` + DiscardIOs *uint64 `json:"discardIOs,omitempty"` + Pressure *Pressure `json:"pressure,omitempty"` +} + +// PIDsStat represents the sampling state of the cgroupv2 PIDs controller +type PIDsStat struct { + Current *uint64 `json:"current,omitempty"` +} + +// Pressure represents the sampling state of pressure files +type Pressure struct { + Some *PressureValues `json:"some"` + Full *PressureValues `json:"full"` +} + +type PressureValues struct { + Avg10 *float64 `json:"avg10"` + Avg60 *float64 `json:"avg60"` + Avg300 *float64 `json:"avg300"` + Total *uint64 `json:"total"` +} diff --git a/executor/runcexecutor/executor.go b/executor/runcexecutor/executor.go index c2587b33bd7c..9c12996e53da 100644 --- a/executor/runcexecutor/executor.go +++ b/executor/runcexecutor/executor.go @@ -23,6 +23,8 @@ import ( "github.com/docker/docker/pkg/idtools" "github.com/moby/buildkit/executor" "github.com/moby/buildkit/executor/oci" + "github.com/moby/buildkit/executor/resources" + resourcestypes "github.com/moby/buildkit/executor/resources/types" gatewayapi "github.com/moby/buildkit/frontend/gateway/pb" "github.com/moby/buildkit/identity" "github.com/moby/buildkit/solver/pb" @@ -51,6 +53,7 @@ type Opt struct { ApparmorProfile string SELinux bool TracingSocket string + ResourceMonitor *resources.Monitor } var defaultCommandCandidates = []string{"buildkit-runc", "runc"} @@ -71,6 +74,7 @@ type runcExecutor struct { apparmorProfile string selinux bool tracingSocket string + resmon *resources.Monitor } func New(opt Opt, networkProviders map[pb.NetMode]network.Provider) (executor.Executor, error) { @@ -136,11 +140,12 @@ func New(opt Opt, networkProviders map[pb.NetMode]network.Provider) (executor.Ex apparmorProfile: opt.ApparmorProfile, selinux: opt.SELinux, tracingSocket: opt.TracingSocket, + resmon: opt.ResourceMonitor, } return w, nil } -func (w *runcExecutor) Run(ctx context.Context, id string, root 
executor.Mount, mounts []executor.Mount, process executor.ProcessInfo, started chan<- struct{}) (err error) { +func (w *runcExecutor) Run(ctx context.Context, id string, root executor.Mount, mounts []executor.Mount, process executor.ProcessInfo, started chan<- struct{}) (rec resourcestypes.Recorder, err error) { meta := process.Meta startedOnce := sync.Once{} @@ -163,13 +168,18 @@ func (w *runcExecutor) Run(ctx context.Context, id string, root executor.Mount, provider, ok := w.networkProviders[meta.NetMode] if !ok { - return errors.Errorf("unknown network mode %s", meta.NetMode) + return nil, errors.Errorf("unknown network mode %s", meta.NetMode) } namespace, err := provider.New(ctx, meta.Hostname) if err != nil { - return err + return nil, err } - defer namespace.Close() + doReleaseNetwork := true + defer func() { + if doReleaseNetwork { + namespace.Close() + } + }() if meta.NetMode == pb.NetMode_HOST { bklog.G(ctx).Info("enabling HostNetworking") @@ -177,12 +187,12 @@ func (w *runcExecutor) Run(ctx context.Context, id string, root executor.Mount, resolvConf, err := oci.GetResolvConf(ctx, w.root, w.idmap, w.dns) if err != nil { - return err + return nil, err } hostsFile, clean, err := oci.GetHostsFile(ctx, w.root, meta.ExtraHosts, w.idmap, meta.Hostname) if err != nil { - return err + return nil, err } if clean != nil { defer clean() @@ -190,12 +200,12 @@ func (w *runcExecutor) Run(ctx context.Context, id string, root executor.Mount, mountable, err := root.Src.Mount(ctx, false) if err != nil { - return err + return nil, err } rootMount, release, err := mountable.Mount() if err != nil { - return err + return nil, err } if release != nil { defer release() @@ -207,7 +217,7 @@ func (w *runcExecutor) Run(ctx context.Context, id string, root executor.Mount, bundle := filepath.Join(w.root, id) if err := os.Mkdir(bundle, 0o711); err != nil { - return err + return nil, err } defer os.RemoveAll(bundle) @@ -218,10 +228,10 @@ func (w *runcExecutor) Run(ctx context.Context, id 
string, root executor.Mount, rootFSPath := filepath.Join(bundle, "rootfs") if err := idtools.MkdirAllAndChown(rootFSPath, 0o700, identity); err != nil { - return err + return nil, err } if err := mount.All(rootMount, rootFSPath); err != nil { - return err + return nil, err } defer mount.Unmount(rootFSPath, 0) @@ -229,12 +239,12 @@ func (w *runcExecutor) Run(ctx context.Context, id string, root executor.Mount, uid, gid, sgids, err := oci.GetUser(rootFSPath, meta.User) if err != nil { - return err + return nil, err } f, err := os.Create(filepath.Join(bundle, "config.json")) if err != nil { - return err + return nil, err } defer f.Close() @@ -251,13 +261,13 @@ func (w *runcExecutor) Run(ctx context.Context, id string, root executor.Mount, if w.idmap != nil { identity, err = w.idmap.ToHost(identity) if err != nil { - return err + return nil, err } } spec, cleanup, err := oci.GenerateSpec(ctx, meta, mounts, id, resolvConf, hostsFile, namespace, w.cgroupParent, w.processMode, w.idmap, w.apparmorProfile, w.selinux, w.tracingSocket, opts...) 
if err != nil { - return err + return nil, err } defer cleanup() @@ -268,11 +278,11 @@ func (w *runcExecutor) Run(ctx context.Context, id string, root executor.Mount, newp, err := fs.RootPath(rootFSPath, meta.Cwd) if err != nil { - return errors.Wrapf(err, "working dir %s points to invalid target", newp) + return nil, errors.Wrapf(err, "working dir %s points to invalid target", newp) } if _, err := os.Stat(newp); err != nil { if err := idtools.MkdirAllAndChown(newp, 0o755, identity); err != nil { - return errors.Wrapf(err, "failed to create working directory %s", newp) + return nil, errors.Wrapf(err, "failed to create working directory %s", newp) } } @@ -280,16 +290,26 @@ func (w *runcExecutor) Run(ctx context.Context, id string, root executor.Mount, spec.Process.OOMScoreAdj = w.oomScoreAdj if w.rootless { if err := rootlessspecconv.ToRootless(spec); err != nil { - return err + return nil, err } } if err := json.NewEncoder(f).Encode(spec); err != nil { - return err + return nil, err } bklog.G(ctx).Debugf("> creating %s %v", id, meta.Args) + cgroupPath := spec.Linux.CgroupsPath + if cgroupPath != "" { + rec, err = w.resmon.RecordNamespace(cgroupPath, resources.RecordOpt{ + NetworkSampler: namespace, + }) + if err != nil { + return nil, err + } + } + trace.SpanFromContext(ctx).AddEvent("Container created") err = w.run(ctx, id, bundle, process, func() { startedOnce.Do(func() { @@ -297,9 +317,33 @@ func (w *runcExecutor) Run(ctx context.Context, id string, root executor.Mount, if started != nil { close(started) } + if rec != nil { + rec.Start() + } }) - }) - return exitError(ctx, err) + }, true) + + releaseContainer := func(ctx context.Context) error { + err := w.runc.Delete(ctx, id, &runc.DeleteOpts{}) + err1 := namespace.Close() + if err == nil { + err = err1 + } + return err + } + doReleaseNetwork = false + + err = exitError(ctx, err) + if err != nil { + releaseContainer(context.TODO()) + return nil, err + } + + if rec == nil { + return nil, 
releaseContainer(context.TODO()) + } + + return rec, rec.CloseAsync(releaseContainer) } func exitError(ctx context.Context, err error) error { diff --git a/executor/runcexecutor/executor_common.go b/executor/runcexecutor/executor_common.go index 52f52ca39728..28955f9a4596 100644 --- a/executor/runcexecutor/executor_common.go +++ b/executor/runcexecutor/executor_common.go @@ -18,16 +18,21 @@ var unsupportedConsoleError = errors.New("tty for runc is only supported on linu func updateRuncFieldsForHostOS(runtime *runc.Runc) {} -func (w *runcExecutor) run(ctx context.Context, id, bundle string, process executor.ProcessInfo, started func()) error { +func (w *runcExecutor) run(ctx context.Context, id, bundle string, process executor.ProcessInfo, started func(), keep bool) error { if process.Meta.Tty { return unsupportedConsoleError } + extraArgs := []string{} + if keep { + extraArgs = append(extraArgs, "--keep") + } killer := newRunProcKiller(w.runc, id) return w.commonCall(ctx, id, bundle, process, started, killer, func(ctx context.Context, started chan<- int, io runc.IO, pidfile string) error { _, err := w.runc.Run(ctx, id, bundle, &runc.CreateOpts{ - NoPivot: w.noPivot, - Started: started, - IO: io, + NoPivot: w.noPivot, + Started: started, + IO: io, + ExtraArgs: extraArgs, }) return err }) diff --git a/executor/runcexecutor/executor_linux.go b/executor/runcexecutor/executor_linux.go index a9474a4e9129..e2c14950f0c0 100644 --- a/executor/runcexecutor/executor_linux.go +++ b/executor/runcexecutor/executor_linux.go @@ -21,13 +21,18 @@ func updateRuncFieldsForHostOS(runtime *runc.Runc) { runtime.PdeathSignal = syscall.SIGKILL // this can still leak the process } -func (w *runcExecutor) run(ctx context.Context, id, bundle string, process executor.ProcessInfo, started func()) error { +func (w *runcExecutor) run(ctx context.Context, id, bundle string, process executor.ProcessInfo, started func(), keep bool) error { killer := newRunProcKiller(w.runc, id) return 
w.callWithIO(ctx, id, bundle, process, started, killer, func(ctx context.Context, started chan<- int, io runc.IO, pidfile string) error { + extraArgs := []string{} + if keep { + extraArgs = append(extraArgs, "--keep") + } _, err := w.runc.Run(ctx, id, bundle, &runc.CreateOpts{ - NoPivot: w.noPivot, - Started: started, - IO: io, + NoPivot: w.noPivot, + Started: started, + IO: io, + ExtraArgs: extraArgs, }) return err }) diff --git a/frontend/gateway/container.go b/frontend/gateway/container.go index c7a8158796c7..a68f32e9d4fd 100644 --- a/frontend/gateway/container.go +++ b/frontend/gateway/container.go @@ -345,7 +345,7 @@ func (gwCtr *gatewayContainer) Start(_ context.Context, req client.StartRequest) startedCh := make(chan struct{}) gwProc.errGroup.Go(func() error { bklog.G(gwCtr.ctx).Debugf("Starting new container for %s with args: %q", gwCtr.id, procInfo.Meta.Args) - err := gwCtr.executor.Run(ctx, gwCtr.id, gwCtr.rootFS, gwCtr.mounts, procInfo, startedCh) + _, err := gwCtr.executor.Run(ctx, gwCtr.id, gwCtr.rootFS, gwCtr.mounts, procInfo, startedCh) return stack.Enable(err) }) select { diff --git a/frontend/gateway/gateway.go b/frontend/gateway/gateway.go index 5f42edee55e1..9d4fa496de2f 100644 --- a/frontend/gateway/gateway.go +++ b/frontend/gateway/gateway.go @@ -275,7 +275,7 @@ func (gf *gatewayFrontend) Solve(ctx context.Context, llbBridge frontend.Fronten mnts = append(mnts, *mdmnt) } - err = w.Executor().Run(ctx, "", mountWithSession(rootFS, session.NewGroup(sid)), mnts, executor.ProcessInfo{Meta: meta, Stdin: lbf.Stdin, Stdout: lbf.Stdout, Stderr: os.Stderr}, nil) + _, err = w.Executor().Run(ctx, "", mountWithSession(rootFS, session.NewGroup(sid)), mnts, executor.ProcessInfo{Meta: meta, Stdin: lbf.Stdin, Stdout: lbf.Stdout, Stderr: os.Stderr}, nil) if err != nil { if errdefs.IsCanceled(ctx, err) && lbf.isErrServerClosed { diff --git a/go.mod b/go.mod index f320e74f119f..aba31870f00f 100644 --- a/go.mod +++ b/go.mod @@ -57,6 +57,7 @@ require ( 
github.com/pelletier/go-toml v1.9.5 github.com/pkg/errors v0.9.1 github.com/pkg/profile v1.5.0 + github.com/prometheus/procfs v0.9.0 github.com/serialx/hashring v0.0.0-20190422032157-8b2912629002 github.com/sirupsen/logrus v1.9.0 github.com/spdx/tools-golang v0.5.1 @@ -146,7 +147,6 @@ require ( github.com/prometheus/client_golang v1.14.0 // indirect github.com/prometheus/client_model v0.3.0 // indirect github.com/prometheus/common v0.42.0 // indirect - github.com/prometheus/procfs v0.9.0 // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect github.com/secure-systems-lab/go-securesystemslib v0.4.0 // indirect github.com/shibumi/go-pathspec v1.3.0 // indirect diff --git a/solver/llbsolver/bridge.go b/solver/llbsolver/bridge.go index 8c3fbed51c43..2ba05d724e70 100644 --- a/solver/llbsolver/bridge.go +++ b/solver/llbsolver/bridge.go @@ -177,6 +177,9 @@ func (rp *resultProxy) Definition() *pb.Definition { } func (rp *resultProxy) Provenance() interface{} { + if rp.provenance == nil { + return nil + } return rp.provenance } @@ -270,17 +273,18 @@ func (rp *resultProxy) Result(ctx context.Context) (res solver.CachedResult, err rp.mu.Unlock() return nil, errors.Errorf("evaluating released result") } - rp.v = v - rp.err = err if err == nil { - capture, err := captureProvenance(ctx, v) - if err != nil && rp.err != nil { - rp.err = errors.Wrapf(rp.err, "failed to capture provenance: %v", err) + var capture *provenance.Capture + capture, err = captureProvenance(ctx, v) + if err != nil { + err = errors.Errorf("failed to capture provenance: %v", err) v.Release(context.TODO()) - rp.v = nil + v = nil } rp.provenance = capture } + rp.v = v + rp.err = err rp.mu.Unlock() return v, err }) diff --git a/solver/llbsolver/ops/exec.go b/solver/llbsolver/ops/exec.go index 2bee1283b436..73481534a7e5 100644 --- a/solver/llbsolver/ops/exec.go +++ b/solver/llbsolver/ops/exec.go @@ -13,6 +13,7 @@ import ( "github.com/containerd/containerd/platforms" "github.com/moby/buildkit/cache" 
"github.com/moby/buildkit/executor" + resourcestypes "github.com/moby/buildkit/executor/resources/types" "github.com/moby/buildkit/frontend/gateway" "github.com/moby/buildkit/session" "github.com/moby/buildkit/session/secrets" @@ -43,6 +44,8 @@ type ExecOp struct { platform *pb.Platform numInputs int parallelism *semaphore.Weighted + rec resourcestypes.Recorder + digest digest.Digest } var _ solver.Op = &ExecOp{} @@ -62,9 +65,14 @@ func NewExecOp(v solver.Vertex, op *pb.Op_Exec, platform *pb.Platform, cm cache. w: w, platform: platform, parallelism: parallelism, + digest: v.Digest(), }, nil } +func (e *ExecOp) Digest() digest.Digest { + return e.digest +} + func (e *ExecOp) Proto() *pb.ExecOp { return e.op } @@ -357,7 +365,7 @@ func (e *ExecOp) Exec(ctx context.Context, g session.Group, inputs []solver.Resu } }() - execErr := e.exec.Run(ctx, "", p.Root, p.Mounts, executor.ProcessInfo{ + rec, execErr := e.exec.Run(ctx, "", p.Root, p.Mounts, executor.ProcessInfo{ Meta: meta, Stdin: nil, Stdout: stdout, @@ -377,6 +385,7 @@ func (e *ExecOp) Exec(ctx context.Context, g session.Group, inputs []solver.Resu // Prevent the result from being released. 
p.OutputRefs[i].Ref = nil } + e.rec = rec return results, errors.Wrapf(execErr, "process %q did not complete successfully", strings.Join(e.op.Meta.Args, " ")) } @@ -446,3 +455,10 @@ func (e *ExecOp) loadSecretEnv(ctx context.Context, g session.Group) ([]string, func (e *ExecOp) IsProvenanceProvider() { } + +func (e *ExecOp) Samples() (*resourcestypes.Samples, error) { + if e.rec == nil { + return nil, nil + } + return e.rec.Samples() +} diff --git a/solver/llbsolver/proc/provenance.go b/solver/llbsolver/proc/provenance.go index 1af3af196028..ee29cceb05b0 100644 --- a/solver/llbsolver/proc/provenance.go +++ b/solver/llbsolver/proc/provenance.go @@ -6,6 +6,7 @@ import ( "strconv" slsa02 "github.com/in-toto/in-toto-golang/in_toto/slsa_provenance/v0.2" + "github.com/moby/buildkit/executor/resources" "github.com/moby/buildkit/exporter/containerimage/exptypes" gatewaypb "github.com/moby/buildkit/frontend/gateway/pb" "github.com/moby/buildkit/solver" @@ -15,7 +16,7 @@ import ( ) func ProvenanceProcessor(attrs map[string]string) llbsolver.Processor { - return func(ctx context.Context, res *llbsolver.Result, s *llbsolver.Solver, j *solver.Job) (*llbsolver.Result, error) { + return func(ctx context.Context, res *llbsolver.Result, s *llbsolver.Solver, j *solver.Job, usage *resources.SysSampler) (*llbsolver.Result, error) { ps, err := exptypes.ParsePlatforms(res.Metadata) if err != nil { return nil, err @@ -41,7 +42,7 @@ func ProvenanceProcessor(attrs map[string]string) llbsolver.Processor { return nil, errors.Errorf("could not find ref %s", p.ID) } - pc, err := llbsolver.NewProvenanceCreator(ctx, cp, ref, attrs, j) + pc, err := llbsolver.NewProvenanceCreator(ctx, cp, ref, attrs, j, usage) if err != nil { return nil, err } diff --git a/solver/llbsolver/proc/sbom.go b/solver/llbsolver/proc/sbom.go index 37de1c4c3e78..1a13a0dcd717 100644 --- a/solver/llbsolver/proc/sbom.go +++ b/solver/llbsolver/proc/sbom.go @@ -4,6 +4,7 @@ import ( "context" 
"github.com/moby/buildkit/client/llb" + "github.com/moby/buildkit/executor/resources" "github.com/moby/buildkit/exporter/containerimage/exptypes" "github.com/moby/buildkit/frontend" "github.com/moby/buildkit/frontend/attestations/sbom" @@ -14,7 +15,7 @@ import ( ) func SBOMProcessor(scannerRef string, useCache bool) llbsolver.Processor { - return func(ctx context.Context, res *llbsolver.Result, s *llbsolver.Solver, j *solver.Job) (*llbsolver.Result, error) { + return func(ctx context.Context, res *llbsolver.Result, s *llbsolver.Solver, j *solver.Job, usage *resources.SysSampler) (*llbsolver.Result, error) { // skip sbom generation if we already have an sbom if sbom.HasSBOM(res.Result) { return res, nil diff --git a/solver/llbsolver/provenance.go b/solver/llbsolver/provenance.go index d288b3fa0894..9f2fb9e29965 100644 --- a/solver/llbsolver/provenance.go +++ b/solver/llbsolver/provenance.go @@ -12,6 +12,7 @@ import ( "github.com/moby/buildkit/cache" "github.com/moby/buildkit/cache/config" "github.com/moby/buildkit/client/llb" + "github.com/moby/buildkit/executor/resources" "github.com/moby/buildkit/exporter/containerimage" "github.com/moby/buildkit/exporter/containerimage/exptypes" "github.com/moby/buildkit/frontend" @@ -357,6 +358,13 @@ func captureProvenance(ctx context.Context, res solver.CachedResultWithProvenanc if pr.Network != pb.NetMode_NONE { c.NetworkAccess = true } + samples, err := op.Samples() + if err != nil { + return err + } + if samples != nil { + c.AddSamples(op.Digest(), samples) + } case *ops.BuildOp: c.IncompleteMaterials = true // not supported yet } @@ -371,10 +379,11 @@ func captureProvenance(ctx context.Context, res solver.CachedResultWithProvenanc type ProvenanceCreator struct { pr *provenance.ProvenancePredicate j *solver.Job + sampler *resources.SysSampler addLayers func() error } -func NewProvenanceCreator(ctx context.Context, cp *provenance.Capture, res solver.ResultProxy, attrs map[string]string, j *solver.Job) (*ProvenanceCreator, 
error) { +func NewProvenanceCreator(ctx context.Context, cp *provenance.Capture, res solver.ResultProxy, attrs map[string]string, j *solver.Job, usage *resources.SysSampler) (*ProvenanceCreator, error) { var reproducible bool if v, ok := attrs["reproducible"]; ok { b, err := strconv.ParseBool(v) @@ -396,6 +405,12 @@ func NewProvenanceCreator(ctx context.Context, cp *provenance.Capture, res solve } } + withUsage := false + if v, ok := attrs["capture-usage"]; ok { + b, err := strconv.ParseBool(v) + withUsage = err == nil && b + } + pr, err := provenance.NewPredicate(cp) if err != nil { return nil, err @@ -425,7 +440,7 @@ func NewProvenanceCreator(ctx context.Context, cp *provenance.Capture, res solve pr.Invocation.Parameters.Secrets = nil pr.Invocation.Parameters.SSH = nil case "max": - dgsts, err := AddBuildConfig(ctx, pr, res) + dgsts, err := AddBuildConfig(ctx, pr, cp, res, withUsage) if err != nil { return nil, err } @@ -480,11 +495,15 @@ func NewProvenanceCreator(ctx context.Context, cp *provenance.Capture, res solve return nil, errors.Errorf("invalid mode %q", mode) } - return &ProvenanceCreator{ + pc := &ProvenanceCreator{ pr: pr, j: j, addLayers: addLayers, - }, nil + } + if withUsage { + pc.sampler = usage + } + return pc, nil } func (p *ProvenanceCreator) Predicate() (*provenance.ProvenancePredicate, error) { @@ -497,6 +516,14 @@ func (p *ProvenanceCreator) Predicate() (*provenance.ProvenancePredicate, error) } } + if p.sampler != nil { + sysSamples, err := p.sampler.Close(true) + if err != nil { + return nil, err + } + p.pr.Metadata.BuildKitMetadata.SysUsage = sysSamples + } + return p.pr, nil } @@ -572,9 +599,9 @@ func resolveRemotes(ctx context.Context, res solver.Result) ([]*solver.Remote, e return remotes, nil } -func AddBuildConfig(ctx context.Context, p *provenance.ProvenancePredicate, rp solver.ResultProxy) (map[digest.Digest]int, error) { +func AddBuildConfig(ctx context.Context, p *provenance.ProvenancePredicate, c *provenance.Capture, rp 
solver.ResultProxy, withUsage bool) (map[digest.Digest]int, error) { def := rp.Definition() - steps, indexes, err := toBuildSteps(def) + steps, indexes, err := toBuildSteps(def, c, withUsage) if err != nil { return nil, err } @@ -589,7 +616,7 @@ func AddBuildConfig(ctx context.Context, p *provenance.ProvenancePredicate, rp s if def.Source != nil { sis := make([]provenance.SourceInfo, len(def.Source.Infos)) for i, si := range def.Source.Infos { - steps, indexes, err := toBuildSteps(si.Definition) + steps, indexes, err := toBuildSteps(si.Definition, c, withUsage) if err != nil { return nil, err } @@ -634,7 +661,7 @@ func digestMap(idx map[digest.Digest]int) map[digest.Digest]string { return m } -func toBuildSteps(def *pb.Definition) ([]provenance.BuildStep, map[digest.Digest]int, error) { +func toBuildSteps(def *pb.Definition, c *provenance.Capture, withUsage bool) ([]provenance.BuildStep, map[digest.Digest]int, error) { if def == nil || len(def.Def) == 0 { return nil, nil, nil } @@ -694,11 +721,15 @@ func toBuildSteps(def *pb.Definition) ([]provenance.BuildStep, map[digest.Digest inputs[i] = fmt.Sprintf("step%d:%d", indexes[inp.Digest], inp.Index) } op.Inputs = nil - out = append(out, provenance.BuildStep{ + s := provenance.BuildStep{ ID: fmt.Sprintf("step%d", i), Inputs: inputs, Op: op, - }) + } + if withUsage { + s.ResourceUsage = c.Samples[dgst] + } + out = append(out, s) } return out, indexes, nil } diff --git a/solver/llbsolver/provenance/buildconfig.go b/solver/llbsolver/provenance/buildconfig.go index 6fb061d9c918..8f903585be5c 100644 --- a/solver/llbsolver/provenance/buildconfig.go +++ b/solver/llbsolver/provenance/buildconfig.go @@ -1,6 +1,7 @@ package provenance import ( + resourcestypes "github.com/moby/buildkit/executor/resources/types" "github.com/moby/buildkit/solver/pb" digest "github.com/opencontainers/go-digest" ) @@ -11,9 +12,10 @@ type BuildConfig struct { } type BuildStep struct { - ID string `json:"id,omitempty"` - Op interface{} 
`json:"op,omitempty"` - Inputs []string `json:"inputs,omitempty"` + ID string `json:"id,omitempty"` + Op interface{} `json:"op,omitempty"` + Inputs []string `json:"inputs,omitempty"` + ResourceUsage *resourcestypes.Samples `json:"resourceUsage,omitempty"` } type Source struct { diff --git a/solver/llbsolver/provenance/capture.go b/solver/llbsolver/provenance/capture.go index 874f3987cd55..f4d43fba4c6f 100644 --- a/solver/llbsolver/provenance/capture.go +++ b/solver/llbsolver/provenance/capture.go @@ -4,6 +4,7 @@ import ( "sort" distreference "github.com/docker/distribution/reference" + resourcestypes "github.com/moby/buildkit/executor/resources/types" "github.com/moby/buildkit/solver/result" "github.com/moby/buildkit/util/urlutil" digest "github.com/opencontainers/go-digest" @@ -58,6 +59,7 @@ type Capture struct { SSH []SSH NetworkAccess bool IncompleteMaterials bool + Samples map[digest.Digest]*resourcestypes.Samples } func (c *Capture) Merge(c2 *Capture) error { @@ -215,6 +217,13 @@ func (c *Capture) AddSSH(s SSH) { c.SSH = append(c.SSH, s) } +func (c *Capture) AddSamples(dgst digest.Digest, samples *resourcestypes.Samples) { + if c.Samples == nil { + c.Samples = map[digest.Digest]*resourcestypes.Samples{} + } + c.Samples[dgst] = samples +} + func parseRefName(s string) (distreference.Named, string, error) { ref, err := distreference.ParseNormalizedNamed(s) if err != nil { diff --git a/solver/llbsolver/provenance/predicate.go b/solver/llbsolver/provenance/predicate.go index 05564912bebb..f07ce879d7d6 100644 --- a/solver/llbsolver/provenance/predicate.go +++ b/solver/llbsolver/provenance/predicate.go @@ -6,6 +6,7 @@ import ( "github.com/containerd/containerd/platforms" slsa "github.com/in-toto/in-toto-golang/in_toto/slsa_provenance/common" slsa02 "github.com/in-toto/in-toto-golang/in_toto/slsa_provenance/v0.2" + resourcetypes "github.com/moby/buildkit/executor/resources/types" "github.com/moby/buildkit/util/purl" "github.com/moby/buildkit/util/urlutil" ocispecs 
"github.com/opencontainers/image-spec/specs-go/v1" @@ -50,9 +51,10 @@ type ProvenanceMetadata struct { } type BuildKitMetadata struct { - VCS map[string]string `json:"vcs,omitempty"` - Source *Source `json:"source,omitempty"` - Layers map[string][][]ocispecs.Descriptor `json:"layers,omitempty"` + VCS map[string]string `json:"vcs,omitempty"` + Source *Source `json:"source,omitempty"` + Layers map[string][][]ocispecs.Descriptor `json:"layers,omitempty"` + SysUsage []*resourcetypes.SysSample `json:"sysUsage,omitempty"` } func slsaMaterials(srcs Sources) ([]slsa.ProvenanceMaterial, error) { diff --git a/solver/llbsolver/solver.go b/solver/llbsolver/solver.go index 3252982b0645..4788486b1993 100644 --- a/solver/llbsolver/solver.go +++ b/solver/llbsolver/solver.go @@ -17,6 +17,8 @@ import ( "github.com/moby/buildkit/cache/remotecache" "github.com/moby/buildkit/client" controlgateway "github.com/moby/buildkit/control/gateway" + "github.com/moby/buildkit/executor/resources" + resourcetypes "github.com/moby/buildkit/executor/resources/types" "github.com/moby/buildkit/exporter" "github.com/moby/buildkit/exporter/containerimage/exptypes" "github.com/moby/buildkit/frontend" @@ -74,6 +76,7 @@ type Opt struct { SessionManager *session.Manager WorkerController *worker.Controller HistoryQueue *HistoryQueue + ResourceMonitor *resources.Monitor } type Solver struct { @@ -87,11 +90,12 @@ type Solver struct { sm *session.Manager entitlements []string history *HistoryQueue + sysSampler *resources.Sampler[*resourcetypes.SysSample] } // Processor defines a processing function to be applied after solving, but // before exporting -type Processor func(ctx context.Context, result *Result, s *Solver, j *solver.Job) (*Result, error) +type Processor func(ctx context.Context, result *Result, s *Solver, j *solver.Job, usage *resources.SysSampler) (*Result, error) func New(opt Opt) (*Solver, error) { s := &Solver{ @@ -106,6 +110,12 @@ func New(opt Opt) (*Solver, error) { history: opt.HistoryQueue, 
} + sampler, err := resources.NewSysSampler() + if err != nil { + return nil, err + } + s.sysSampler = sampler + s.solver = solver.NewSolver(solver.SolverOpt{ ResolveOpFunc: s.resolver(), DefaultCache: opt.CacheManager, @@ -139,7 +149,7 @@ func (s *Solver) Bridge(b solver.Builder) frontend.FrontendLLBBridge { return s.bridge(b) } -func (s *Solver) recordBuildHistory(ctx context.Context, id string, req frontend.SolveRequest, exp ExporterRequest, j *solver.Job) (func(*Result, exporter.DescriptorReference, error) error, error) { +func (s *Solver) recordBuildHistory(ctx context.Context, id string, req frontend.SolveRequest, exp ExporterRequest, j *solver.Job, usage *resources.SysSampler) (func(*Result, exporter.DescriptorReference, error) error, error) { var stopTrace func() []tracetest.SpanStub if s := trace.SpanFromContext(ctx); s.SpanContext().IsValid() { @@ -194,11 +204,12 @@ func (s *Solver) recordBuildHistory(ctx context.Context, id string, req frontend var releasers []func() attrs := map[string]string{ - "mode": "max", + "mode": "max", + "capture-usage": "true", } makeProvenance := func(res solver.ResultProxy, cap *provenance.Capture) (*controlapi.Descriptor, func(), error) { - prc, err := NewProvenanceCreator(ctx2, cap, res, attrs, j) + prc, err := NewProvenanceCreator(ctx2, cap, res, attrs, j, usage) if err != nil { return nil, nil, err } @@ -405,6 +416,9 @@ func (s *Solver) Solve(ctx context.Context, id string, sessionID string, req fro defer j.Discard() + usage := s.sysSampler.Record() + defer usage.Close(false) + var res *frontend.Result var resProv *Result var descref exporter.DescriptorReference @@ -451,7 +465,7 @@ func (s *Solver) Solve(ctx context.Context, id string, sessionID string, req fro } if !internal { - rec, err1 := s.recordBuildHistory(ctx, id, req, exp, j) + rec, err1 := s.recordBuildHistory(ctx, id, req, exp, j, usage) if err1 != nil { defer j.CloseProgress() return nil, err1 @@ -508,7 +522,7 @@ func (s *Solver) Solve(ctx context.Context, id 
string, sessionID string, req fro } for _, post := range post { - res2, err := post(ctx, resProv, s, j) + res2, err := post(ctx, resProv, s, j, usage) if err != nil { return nil, err } @@ -735,8 +749,12 @@ func getRefProvenance(ref solver.ResultProxy, br *provenanceBridge) (*provenance } p := ref.Provenance() if p == nil { - return nil, errors.Errorf("missing provenance for %s", ref.ID()) + if br.req != nil { + return nil, errors.Errorf("missing provenance for %s", ref.ID()) + } + return nil, nil } + pr, ok := p.(*provenance.Capture) if !ok { return nil, errors.Errorf("invalid provenance type %T", p) diff --git a/util/network/cniprovider/cni.go b/util/network/cniprovider/cni.go index 2bebfd638ee0..2d37aa94a18c 100644 --- a/util/network/cniprovider/cni.go +++ b/util/network/cniprovider/cni.go @@ -242,28 +242,56 @@ func (c *cniProvider) newNS(ctx context.Context, hostname string) (*cniNS, error cni.WithArgs("IgnoreUnknown", "1")) } - if _, err := c.CNI.Setup(context.TODO(), id, nativeID, nsOpts...); err != nil { + cniRes, err := c.CNI.Setup(context.TODO(), id, nativeID, nsOpts...) 
+ if err != nil { deleteNetNS(nativeID) return nil, errors.Wrap(err, "CNI setup error") } trace.SpanFromContext(ctx).AddEvent("finished setting up network namespace") bklog.G(ctx).Debugf("finished setting up network namespace %s", id) - return &cniNS{ + vethName := "" + for k := range cniRes.Interfaces { + if strings.HasPrefix(k, "veth") { + if vethName != "" { + // invalid config + vethName = "" + break + } + vethName = k + } + } + + ns := &cniNS{ nativeID: nativeID, id: id, handle: c.CNI, opts: nsOpts, - }, nil + vethName: vethName, + } + + if ns.vethName != "" { + sample, err := ns.sample() + if err == nil && sample != nil { + ns.canSample = true + ns.offsetSample = sample + } + } + + return ns, nil } type cniNS struct { - pool *cniPool - handle cni.CNI - id string - nativeID string - opts []cni.NamespaceOpts - lastUsed time.Time + pool *cniPool + handle cni.CNI + id string + nativeID string + opts []cni.NamespaceOpts + lastUsed time.Time + vethName string + canSample bool + offsetSample *network.Sample + prevSample *network.Sample } func (ns *cniNS) Set(s *specs.Spec) error { @@ -271,6 +299,9 @@ func (ns *cniNS) Set(s *specs.Spec) error { } func (ns *cniNS) Close() error { + if ns.prevSample != nil { + ns.offsetSample = ns.prevSample + } if ns.pool == nil { return ns.release() } @@ -278,6 +309,30 @@ func (ns *cniNS) Close() error { return nil } +func (ns *cniNS) Sample() (*network.Sample, error) { + if !ns.canSample { + return nil, nil + } + s, err := ns.sample() + if err != nil { + return nil, err + } + if s == nil { + return nil, nil + } + if ns.offsetSample != nil { + s.TxBytes -= ns.offsetSample.TxBytes + s.RxBytes -= ns.offsetSample.RxBytes + s.TxPackets -= ns.offsetSample.TxPackets + s.RxPackets -= ns.offsetSample.RxPackets + s.TxErrors -= ns.offsetSample.TxErrors + s.RxErrors -= ns.offsetSample.RxErrors + s.TxDropped -= ns.offsetSample.TxDropped + s.RxDropped -= ns.offsetSample.RxDropped + } + return s, nil +} + func (ns *cniNS) release() error { 
bklog.L.Debugf("releasing cni network namespace %s", ns.id) err := ns.handle.Remove(context.TODO(), ns.id, ns.nativeID, ns.opts...) diff --git a/util/network/cniprovider/cni_linux.go b/util/network/cniprovider/cni_linux.go new file mode 100644 index 000000000000..8c4ac437e1c3 --- /dev/null +++ b/util/network/cniprovider/cni_linux.go @@ -0,0 +1,70 @@ +package cniprovider + +import ( + "path/filepath" + "strconv" + "strings" + "syscall" + + "github.com/moby/buildkit/util/network" + "github.com/pkg/errors" +) + +func (ns *cniNS) sample() (*network.Sample, error) { + dirfd, err := syscall.Open(filepath.Join("/sys/class/net", ns.vethName, "statistics"), syscall.O_RDONLY, 0) + if err != nil { + if errors.Is(err, syscall.ENOENT) || errors.Is(err, syscall.ENOTDIR) { + return nil, nil + } + return nil, err + } + defer syscall.Close(dirfd) + + buf := make([]byte, 32) + stat := &network.Sample{} + + for _, name := range []string{"tx_bytes", "rx_bytes", "tx_packets", "rx_packets", "tx_errors", "rx_errors", "tx_dropped", "rx_dropped"} { + n, err := readFileAt(dirfd, name, buf) + if err != nil { + return nil, errors.Wrapf(err, "failed to read %s", name) + } + switch name { + case "tx_bytes": + stat.TxBytes = n + case "rx_bytes": + stat.RxBytes = n + case "tx_packets": + stat.TxPackets = n + case "rx_packets": + stat.RxPackets = n + case "tx_errors": + stat.TxErrors = n + case "rx_errors": + stat.RxErrors = n + case "tx_dropped": + stat.TxDropped = n + case "rx_dropped": + stat.RxDropped = n + } + } + ns.prevSample = stat + return stat, nil +} + +func readFileAt(dirfd int, filename string, buf []byte) (int64, error) { + fd, err := syscall.Openat(dirfd, filename, syscall.O_RDONLY, 0) + if err != nil { + return 0, err + } + defer syscall.Close(fd) + + n, err := syscall.Read(fd, buf[:]) + if err != nil { + return 0, err + } + nn, err := strconv.ParseInt(strings.TrimSpace(string(buf[:n])), 10, 64) + if err != nil { + return 0, err + } + return nn, nil +} diff --git 
a/util/network/cniprovider/cni_nolinux.go b/util/network/cniprovider/cni_nolinux.go new file mode 100644 index 000000000000..383798b9624b --- /dev/null +++ b/util/network/cniprovider/cni_nolinux.go @@ -0,0 +1,12 @@ +//go:build !linux +// +build !linux + +package cniprovider + +import ( + "github.com/moby/buildkit/util/network" +) + +func (ns *cniNS) sample() (*network.Sample, error) { + return nil, nil +} diff --git a/util/network/host.go b/util/network/host.go index fbd6747d002f..d1725dd22a4c 100644 --- a/util/network/host.go +++ b/util/network/host.go @@ -35,3 +35,7 @@ func (h *hostNS) Set(s *specs.Spec) error { func (h *hostNS) Close() error { return nil } + +func (h *hostNS) Sample() (*Sample, error) { + return nil, nil +} diff --git a/util/network/network.go b/util/network/network.go index c48f1984f030..4ff1bb81c366 100644 --- a/util/network/network.go +++ b/util/network/network.go @@ -7,6 +7,17 @@ import ( specs "github.com/opencontainers/runtime-spec/specs-go" ) +type Sample struct { + RxBytes int64 `json:"rxBytes,omitempty"` + RxPackets int64 `json:"rxPackets,omitempty"` + RxErrors int64 `json:"rxErrors,omitempty"` + RxDropped int64 `json:"rxDropped,omitempty"` + TxBytes int64 `json:"txBytes,omitempty"` + TxPackets int64 `json:"txPackets,omitempty"` + TxErrors int64 `json:"txErrors,omitempty"` + TxDropped int64 `json:"txDropped,omitempty"` +} + // Provider interface for Network type Provider interface { io.Closer @@ -18,4 +29,6 @@ type Namespace interface { io.Closer // Set the namespace on the spec Set(*specs.Spec) error + + Sample() (*Sample, error) } diff --git a/util/network/none.go b/util/network/none.go index e2b9d122d64c..954229b05964 100644 --- a/util/network/none.go +++ b/util/network/none.go @@ -31,3 +31,7 @@ func (h *noneNS) Set(s *specs.Spec) error { func (h *noneNS) Close() error { return nil } + +func (h *noneNS) Sample() (*Sample, error) { + return nil, nil +} diff --git a/worker/base/worker.go b/worker/base/worker.go index 
4fb67499c541..13d3ac714e08 100644 --- a/worker/base/worker.go +++ b/worker/base/worker.go @@ -20,6 +20,7 @@ import ( "github.com/moby/buildkit/client" "github.com/moby/buildkit/client/llb" "github.com/moby/buildkit/executor" + "github.com/moby/buildkit/executor/resources" "github.com/moby/buildkit/exporter" imageexporter "github.com/moby/buildkit/exporter/containerimage" localexporter "github.com/moby/buildkit/exporter/local" @@ -79,6 +80,7 @@ type WorkerOpt struct { ParallelismSem *semaphore.Weighted MetadataStore *metadata.Store MountPoolRoot string + ResourceMonitor *resources.Monitor } // Worker is a local worker instance with dedicated snapshotter, cache, and so on. @@ -214,6 +216,11 @@ func (w *Worker) Close() error { rerr = multierror.Append(rerr, err) } } + if w.ResourceMonitor != nil { + if err := w.ResourceMonitor.Close(); err != nil { + rerr = multierror.Append(rerr, err) + } + } return rerr } diff --git a/worker/runc/runc.go b/worker/runc/runc.go index d4e428d9a04e..974dfab3a05d 100644 --- a/worker/runc/runc.go +++ b/worker/runc/runc.go @@ -16,6 +16,7 @@ import ( "github.com/moby/buildkit/cache" "github.com/moby/buildkit/cache/metadata" "github.com/moby/buildkit/executor/oci" + "github.com/moby/buildkit/executor/resources" "github.com/moby/buildkit/executor/runcexecutor" containerdsnapshot "github.com/moby/buildkit/snapshot/containerd" "github.com/moby/buildkit/util/leaseutil" @@ -54,6 +55,11 @@ func NewWorkerOpt(root string, snFactory SnapshotterFactory, rootless bool, proc cmds = append(cmds, binary) } + rm, err := resources.NewMonitor() + if err != nil { + return opt, err + } + exe, err := runcexecutor.New(runcexecutor.Opt{ // Root directory Root: filepath.Join(root, "executor"), @@ -69,6 +75,7 @@ func NewWorkerOpt(root string, snFactory SnapshotterFactory, rootless bool, proc SELinux: selinux, TracingSocket: traceSocket, DefaultCgroupParent: defaultCgroupParent, + ResourceMonitor: rm, }, np) if err != nil { return opt, err @@ -155,6 +162,7 @@ func 
NewWorkerOpt(root string, snFactory SnapshotterFactory, rootless bool, proc GarbageCollect: mdb.GarbageCollect, ParallelismSem: parallelismSem, MountPoolRoot: filepath.Join(root, "cachemounts"), + ResourceMonitor: rm, } return opt, nil } diff --git a/worker/runc/runc_test.go b/worker/runc/runc_test.go index 62870d548f98..3ca4a68090ae 100644 --- a/worker/runc/runc_test.go +++ b/worker/runc/runc_test.go @@ -107,7 +107,7 @@ func TestRuncWorker(t *testing.T) { } stderr := bytes.NewBuffer(nil) - err = w.WorkerOpt.Executor.Run(ctx, "", execMount(snap, true), nil, executor.ProcessInfo{Meta: meta, Stderr: &nopCloser{stderr}}, nil) + _, err = w.WorkerOpt.Executor.Run(ctx, "", execMount(snap, true), nil, executor.ProcessInfo{Meta: meta, Stderr: &nopCloser{stderr}}, nil) require.Error(t, err) // Read-only root // typical error is like `mkdir /.../rootfs/proc: read-only file system`. // make sure the error is caused before running `echo foo > /bar`. @@ -116,7 +116,7 @@ func TestRuncWorker(t *testing.T) { root, err := w.CacheMgr.New(ctx, snap, nil, cache.CachePolicyRetain) require.NoError(t, err) - err = w.WorkerOpt.Executor.Run(ctx, "", execMount(root, false), nil, executor.ProcessInfo{Meta: meta, Stderr: &nopCloser{stderr}}, nil) + _, err = w.WorkerOpt.Executor.Run(ctx, "", execMount(root, false), nil, executor.ProcessInfo{Meta: meta, Stderr: &nopCloser{stderr}}, nil) require.NoError(t, err) meta = executor.Meta{ @@ -124,7 +124,7 @@ func TestRuncWorker(t *testing.T) { Cwd: "/", } - err = w.WorkerOpt.Executor.Run(ctx, "", execMount(root, false), nil, executor.ProcessInfo{Meta: meta, Stderr: &nopCloser{stderr}}, nil) + _, err = w.WorkerOpt.Executor.Run(ctx, "", execMount(root, false), nil, executor.ProcessInfo{Meta: meta, Stderr: &nopCloser{stderr}}, nil) require.NoError(t, err) rf, err := root.Commit(ctx) @@ -202,7 +202,7 @@ func TestRuncWorkerNoProcessSandbox(t *testing.T) { } stdout := bytes.NewBuffer(nil) stderr := bytes.NewBuffer(nil) - err = w.WorkerOpt.Executor.Run(ctx, 
"", execMount(root, false), nil, executor.ProcessInfo{Meta: meta, Stdout: &nopCloser{stdout}, Stderr: &nopCloser{stderr}}, nil) + _, err = w.WorkerOpt.Executor.Run(ctx, "", execMount(root, false), nil, executor.ProcessInfo{Meta: meta, Stdout: &nopCloser{stdout}, Stderr: &nopCloser{stderr}}, nil) require.NoError(t, err, fmt.Sprintf("stdout=%q, stderr=%q", stdout.String(), stderr.String())) require.Equal(t, string(selfCmdline), stdout.String()) } diff --git a/worker/tests/common.go b/worker/tests/common.go index 8ee295a41a50..de2c7ed08d8f 100644 --- a/worker/tests/common.go +++ b/worker/tests/common.go @@ -63,7 +63,7 @@ func TestWorkerExec(t *testing.T, w *base.Worker) { }() stdout := bytes.NewBuffer(nil) stderr := bytes.NewBuffer(nil) - err = w.WorkerOpt.Executor.Run(ctxTimeout, id, execMount(root), nil, executor.ProcessInfo{ + _, err = w.WorkerOpt.Executor.Run(ctxTimeout, id, execMount(root), nil, executor.ProcessInfo{ Meta: executor.Meta{ Args: []string{"cat"}, Cwd: "/", @@ -84,13 +84,14 @@ func TestWorkerExec(t *testing.T, w *base.Worker) { eg := errgroup.Group{} started = make(chan struct{}) eg.Go(func() error { - return w.WorkerOpt.Executor.Run(ctx, id, execMount(root), nil, executor.ProcessInfo{ + _, err := w.WorkerOpt.Executor.Run(ctx, id, execMount(root), nil, executor.ProcessInfo{ Meta: executor.Meta{ Args: []string{"sleep", "10"}, Cwd: "/", Env: []string{"PATH=/bin:/usr/bin:/sbin:/usr/sbin"}, }, }, started) + return err }) select { @@ -175,12 +176,13 @@ func TestWorkerExecFailures(t *testing.T, w *base.Worker) { eg := errgroup.Group{} started := make(chan struct{}) eg.Go(func() error { - return w.WorkerOpt.Executor.Run(ctx, id, execMount(root), nil, executor.ProcessInfo{ + _, err := w.WorkerOpt.Executor.Run(ctx, id, execMount(root), nil, executor.ProcessInfo{ Meta: executor.Meta{ Args: []string{"/bin/false"}, Cwd: "/", }, }, started) + return err }) select { @@ -204,11 +206,12 @@ func TestWorkerExecFailures(t *testing.T, w *base.Worker) { eg = 
errgroup.Group{} started = make(chan struct{}) eg.Go(func() error { - return w.WorkerOpt.Executor.Run(ctx, id, execMount(root), nil, executor.ProcessInfo{ + _, err := w.WorkerOpt.Executor.Run(ctx, id, execMount(root), nil, executor.ProcessInfo{ Meta: executor.Meta{ Args: []string{"bogus"}, }, }, started) + return err }) select { @@ -256,7 +259,7 @@ func TestWorkerCancel(t *testing.T, w *base.Worker) { go func() { defer close(pid1Done) - pid1Err = w.WorkerOpt.Executor.Run(pid1Ctx, id, execMount(root), nil, executor.ProcessInfo{ + _, pid1Err = w.WorkerOpt.Executor.Run(pid1Ctx, id, execMount(root), nil, executor.ProcessInfo{ Meta: executor.Meta{ Args: []string{"/bin/sleep", "10"}, Cwd: "/",