Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow signals to be sent to gateway exec containers #2590

Merged
merged 1 commit into from
Feb 3, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 100 additions & 0 deletions client/build_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"path/filepath"
"strconv"
"strings"
"syscall"
"testing"
"time"

Expand Down Expand Up @@ -52,6 +53,7 @@ func TestClientGatewayIntegration(t *testing.T) {
testClientGatewaySlowCacheExecError,
testClientGatewayExecFileActionError,
testClientGatewayContainerExtraHosts,
testClientGatewayContainerSignal,
testWarnings,
), integration.WithMirroredImages(integration.OfficialImages("busybox:latest")))

Expand Down Expand Up @@ -1894,6 +1896,104 @@ func testClientGatewayContainerHostNetworking(t *testing.T, sb integration.Sandb
checkAllReleasable(t, c, sb, true)
}

// testClientGatewayContainerSignal verifies that signals can be delivered to
// processes started through the gateway container API: first to the initial
// process of a container, then to a second process started in a container
// that already has a process running.
func testClientGatewayContainerSignal(t *testing.T, sb integration.Sandbox) {
	requiresLinux(t)
	ctx := sb.Context()

	c, err := New(ctx, sb.Address())
	require.NoError(t, err)
	defer c.Close()

	product := "buildkit_test"

	buildFn := func(ctx context.Context, c client.Client) (*client.Result, error) {
		ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
		defer cancel()

		img := llb.Image("busybox:latest")

		def, err := img.Marshal(ctx)
		if err != nil {
			return nil, errors.Wrap(err, "failed to marshal state")
		}

		res, err := c.Solve(ctx, client.SolveRequest{Definition: def.ToPB()})
		if err != nil {
			return nil, errors.Wrap(err, "failed to solve")
		}

		// Both containers mount the solved image at "/"; build a fresh
		// request for each so nothing is shared between them.
		rootfsRequest := func() client.NewContainerRequest {
			return client.NewContainerRequest{
				Mounts: []client.Mount{{
					Dest:      "/",
					MountType: pb.MountType_BIND,
					Ref:       res.Ref,
				}},
			}
		}

		var exitErr *gatewayapi.ExitError

		// Part one: signal the first process started in a container.
		firstCtr, err := c.NewContainer(ctx, rootfsRequest())
		require.NoError(t, err)
		defer firstCtr.Release(ctx)

		trapHUP, err := firstCtr.Start(ctx, client.StartRequest{
			Args: []string{"sh", "-c", `trap 'kill $(jobs -p); exit 99' HUP; sleep 10 & wait`},
		})
		require.NoError(t, err)

		// Give the shell time to install its trap before signalling it.
		time.Sleep(time.Second)

		require.NoError(t, trapHUP.Signal(ctx, syscall.SIGHUP))

		err = trapHUP.Wait()
		require.ErrorAs(t, err, &exitErr)
		require.Equal(t, uint32(99), exitErr.ExitCode)

		// Part two: signal a process started after the container's first one.
		secondCtr, err := c.NewContainer(ctx, rootfsRequest())
		require.NoError(t, err)
		defer secondCtr.Release(ctx)

		sleeper, err := secondCtr.Start(ctx, client.StartRequest{
			Args: []string{"sleep", "10"},
		})
		require.NoError(t, err)

		trapINT, err := secondCtr.Start(ctx, client.StartRequest{
			Args: []string{"sh", "-c", `trap 'kill $(jobs -p); exit 111' INT; sleep 10 & wait`},
		})
		require.NoError(t, err)

		// Give the shell time to install its trap before signalling it.
		time.Sleep(time.Second)

		require.NoError(t, trapINT.Signal(ctx, syscall.SIGINT))

		err = trapINT.Wait()
		require.ErrorAs(t, err, &exitErr)
		require.Equal(t, uint32(111), exitErr.ExitCode)

		// Best-effort teardown of the background sleeper; results are ignored.
		sleeper.Signal(ctx, syscall.SIGKILL)
		sleeper.Wait()

		// err still holds the exit error from the trapped shell, so the
		// build is reported as failed; the caller below asserts on that.
		return &client.Result{}, err
	}

	_, err = c.Build(ctx, SolveOpt{}, product, buildFn, nil)
	require.Error(t, err)

	checkAllReleasable(t, c, sb, true)
}

// nopCloser embeds an io.Writer, promoting its Write method.
// NOTE(review): the name suggests a no-op Close method is defined elsewhere
// in the file; it is not visible in this view — confirm.
type nopCloser struct {
io.Writer
}
Expand Down
36 changes: 26 additions & 10 deletions executor/containerdexecutor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ func (w *containerdExecutor) Run(ctx context.Context, id string, root executor.M
}
}()

err = w.runProcess(ctx, task, process.Resize, func() {
err = w.runProcess(ctx, task, process.Resize, process.Signal, func() {
startedOnce.Do(func() {
if started != nil {
close(started)
Expand Down Expand Up @@ -293,7 +293,7 @@ func (w *containerdExecutor) Exec(ctx context.Context, id string, process execut
return errors.WithStack(err)
}

err = w.runProcess(ctx, taskProcess, process.Resize, nil)
err = w.runProcess(ctx, taskProcess, process.Resize, process.Signal, nil)
return err
}

Expand All @@ -310,7 +310,7 @@ func fixProcessOutput(process *executor.ProcessInfo) {
}
}

func (w *containerdExecutor) runProcess(ctx context.Context, p containerd.Process, resize <-chan executor.WinSize, started func()) error {
func (w *containerdExecutor) runProcess(ctx context.Context, p containerd.Process, resize <-chan executor.WinSize, signal <-chan syscall.Signal, started func()) error {
// Not using `ctx` here because the context passed only affects the statusCh which we
// don't want cancelled when ctx.Done is sent. We want to process statusCh on cancel.
statusCh, err := p.Wait(context.Background())
Expand All @@ -335,22 +335,38 @@ func (w *containerdExecutor) runProcess(ctx context.Context, p containerd.Proces

p.CloseIO(ctx, containerd.WithStdinCloser)

// resize in separate go loop so it does not potentially block
// the container cancel/exit status loop below.
resizeCtx, resizeCancel := context.WithCancel(ctx)
defer resizeCancel()
// handle signals (and resize) in separate go loop so it does not
// potentially block the container cancel/exit status loop below.
eventCtx, eventCancel := context.WithCancel(ctx)
defer eventCancel()
go func() {
for {
select {
case <-resizeCtx.Done():
case <-eventCtx.Done():
return
case size, ok := <-resize:
if !ok {
return // chan closed
}
err = p.Resize(resizeCtx, size.Cols, size.Rows)
err = p.Resize(eventCtx, size.Cols, size.Rows)
if err != nil {
bklog.G(resizeCtx).Warnf("Failed to resize %s: %s", p.ID(), err)
bklog.G(eventCtx).Warnf("Failed to resize %s: %s", p.ID(), err)
}
}
}
}()
go func() {
for {
select {
case <-eventCtx.Done():
return
case sig, ok := <-signal:
if !ok {
return // chan closed
}
err = p.Kill(eventCtx, sig)
if err != nil {
bklog.G(eventCtx).Warnf("Failed to signal %s: %s", p.ID(), err)
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"io"
"net"
"syscall"

"github.com/moby/buildkit/snapshot"
"github.com/moby/buildkit/solver/pb"
Expand Down Expand Up @@ -45,6 +46,7 @@ type ProcessInfo struct {
Stdin io.ReadCloser
Stdout, Stderr io.WriteCloser
Resize <-chan WinSize
Signal <-chan syscall.Signal
}

type Executor interface {
Expand Down
89 changes: 82 additions & 7 deletions executor/runcexecutor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,14 +315,14 @@ func (w *runcExecutor) Run(ctx context.Context, id string, root executor.Mount,
}()

bklog.G(ctx).Debugf("> creating %s %v", id, meta.Args)
// this is a cheat, we have not actually started, but as close as we can get with runc for now
if started != nil {

err = w.run(runCtx, id, bundle, process, func() {
startedOnce.Do(func() {
close(started)
if started != nil {
close(started)
}
})
}

err = w.run(runCtx, id, bundle, process)
})
close(ended)
return exitError(ctx, err)
}
Expand Down Expand Up @@ -414,7 +414,7 @@ func (w *runcExecutor) Exec(ctx context.Context, id string, process executor.Pro
spec.Process.Env = process.Meta.Env
}

err = w.exec(ctx, id, state.Bundle, spec.Process, process)
err = w.exec(ctx, id, state.Bundle, spec.Process, process, nil)
return exitError(ctx, err)
}

Expand Down Expand Up @@ -444,3 +444,78 @@ func (s *forwardIO) Stdout() io.ReadCloser {
// Stderr always returns nil.
// NOTE(review): presumably forwardIO hands its stderr writer to the command
// directly, so there is no pipe to read from — confirm against the rest of
// the type.
func (s *forwardIO) Stderr() io.ReadCloser {
return nil
}

// startingProcess tracks the os process of a runc invocation that is being
// started, so that signals can be sent to it once its pid is known.
type startingProcess struct {
// Process is populated by WaitForStart after the pid has been received.
// Callers should use WaitForReady before reading it.
Process *os.Process
// ready is closed by WaitForStart once Process has been populated.
ready chan struct{}
}

// Release frees the resources associated with the tracked process. It is a
// no-op when no process has been recorded yet. Any error reported by the
// underlying os.Process.Release call is discarded.
func (p *startingProcess) Release() {
	if p.Process == nil {
		return
	}
	p.Process.Release()
}

// WaitForReady blocks until the Process field has been populated (the ready
// channel is closed) or ctx is done, whichever happens first. It returns
// ctx.Err() in the latter case and nil otherwise. Call it before reading
// the Process field.
func (p *startingProcess) WaitForReady(ctx context.Context) error {
	select {
	case <-p.ready:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// WaitForStart receives the pid reported on startedCh and records the
// matching os.Process in p.Process, then closes the ready channel.
//
// It waits at most 10 seconds (or until ctx is done) for the pid to arrive.
// If started is non-nil it is invoked as soon as a pid has been received,
// before the process lookup. An error is returned when no pid arrives in
// time, when startedCh is closed without a pid, or when the process lookup
// fails; in those cases the ready channel is left open.
func (p *startingProcess) WaitForStart(ctx context.Context, startedCh <-chan int, started func()) error {
	waitCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()

	var (
		pid      int
		received bool
	)
	select {
	case <-waitCtx.Done():
		return errors.New("runc started message never received")
	case pid, received = <-startedCh:
	}
	if !received {
		return errors.New("runc process failed to send pid")
	}

	if started != nil {
		started()
	}

	// The field is assigned even on lookup failure, matching what a direct
	// multi-value assignment into p.Process would do.
	proc, err := os.FindProcess(pid)
	p.Process = proc
	if err != nil {
		return errors.Wrapf(err, "unable to find runc process for pid %d", pid)
	}

	close(p.ready)
	return nil
}

// handleSignals waits until runcProcess is ready and then forwards every
// signal received on signals to that process.
//
// It returns nil immediately when signals is nil, and nil when ctx is done
// or the signals channel is closed. It returns the error from WaitForReady
// if the process never becomes ready, and the error from the first failed
// signal delivery (which is also logged).
func handleSignals(ctx context.Context, runcProcess *startingProcess, signals <-chan syscall.Signal) error {
	if signals == nil {
		return nil
	}
	if err := runcProcess.WaitForReady(ctx); err != nil {
		return err
	}
	for {
		select {
		case <-ctx.Done():
			return nil
		case sig, ok := <-signals:
			if !ok {
				// A closed channel yields the zero signal forever; without
				// this check the loop would spin sending signal 0.
				return nil
			}
			if err := runcProcess.Process.Signal(sig); err != nil {
				bklog.G(ctx).Errorf("failed to signal %s to process: %s", sig, err)
				return err
			}
		}
	}
}
51 changes: 43 additions & 8 deletions executor/runcexecutor/executor_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,28 +10,63 @@ import (
"github.com/moby/buildkit/executor"
"github.com/opencontainers/runtime-spec/specs-go"
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"
)

var unsupportedConsoleError = errors.New("tty for runc is only supported on linux")

// updateRuncFieldsForHostOS is a no-op in this file: the body is empty and
// the runtime argument is left untouched.
// NOTE(review): presumably a platform-specific counterpart adjusts fields on
// the runc client — confirm against the other build variants.
func updateRuncFieldsForHostOS(runtime *runc.Runc) {}

func (w *runcExecutor) run(ctx context.Context, id, bundle string, process executor.ProcessInfo) error {
func (w *runcExecutor) run(ctx context.Context, id, bundle string, process executor.ProcessInfo, started func()) error {
if process.Meta.Tty {
return unsupportedConsoleError
}
_, err := w.runc.Run(ctx, id, bundle, &runc.CreateOpts{
IO: &forwardIO{stdin: process.Stdin, stdout: process.Stdout, stderr: process.Stderr},
NoPivot: w.noPivot,
return w.commonCall(ctx, id, bundle, process, started, func(ctx context.Context, started chan<- int, io runc.IO) error {
_, err := w.runc.Run(ctx, id, bundle, &runc.CreateOpts{
NoPivot: w.noPivot,
Started: started,
IO: io,
})
return err
})
return err
}

func (w *runcExecutor) exec(ctx context.Context, id, bundle string, specsProcess *specs.Process, process executor.ProcessInfo) error {
func (w *runcExecutor) exec(ctx context.Context, id, bundle string, specsProcess *specs.Process, process executor.ProcessInfo, started func()) error {
if process.Meta.Tty {
return unsupportedConsoleError
}
return w.runc.Exec(ctx, id, *specsProcess, &runc.ExecOpts{
IO: &forwardIO{stdin: process.Stdin, stdout: process.Stdout, stderr: process.Stderr},
return w.commonCall(ctx, id, bundle, process, started, func(ctx context.Context, started chan<- int, io runc.IO) error {
return w.runc.Exec(ctx, id, *specsProcess, &runc.ExecOpts{
Started: started,
IO: io,
})
})
}

// runcCall is the callback signature accepted by commonCall. The callback
// receives the context, a send-only channel of int (passed as the Started
// option to runc.Run / runc.Exec by the callers in this file), and the
// runc.IO to attach.
type runcCall func(ctx context.Context, started chan<- int, io runc.IO) error

// commonCall holds the run/exec logic shared by the non-linux runtimes. A
// tty is only supported on linux, so all this does is propagate signals to
// the runc process once it has started.
//
// Two helper goroutines run for the duration of call: one records the pid
// reported on the started channel (invoking started if non-nil), the other
// forwards signals from process.Signal. On return the helpers are cancelled
// and waited for, then the tracked process is released. Errors from the
// helpers are not surfaced; only the result of call is returned.
func (w *runcExecutor) commonCall(ctx context.Context, id, bundle string, process executor.ProcessInfo, started func(), call runcCall) error {
	proc := &startingProcess{ready: make(chan struct{})}
	defer proc.Release()

	var helpers errgroup.Group
	helperCtx, stopHelpers := context.WithCancel(ctx)
	defer func() {
		// Cancel first so the helpers unblock, then wait for them to exit.
		stopHelpers()
		helpers.Wait()
	}()

	// Buffered so the pid can be delivered without a receiver waiting.
	pidCh := make(chan int, 1)
	helpers.Go(func() error {
		return proc.WaitForStart(helperCtx, pidCh, started)
	})
	helpers.Go(func() error {
		return handleSignals(helperCtx, proc, process.Signal)
	})

	fwd := &forwardIO{stdin: process.Stdin, stdout: process.Stdout, stderr: process.Stderr}
	return call(ctx, pidCh, fwd)
}
Loading