From 8a4a8459c2d847fc59b603f1f93a6768d81e5214 Mon Sep 17 00:00:00 2001 From: Josh Deprez Date: Wed, 16 Oct 2024 16:25:20 +1100 Subject: [PATCH 1/3] Reorganise kubernetes subpackage --- agent/job_runner.go | 2 +- kubernetes/client.go | 97 ++++++++++ kubernetes/kubernetes_test.go | 2 +- kubernetes/{kubernetes.go => runner.go} | 230 +++++++++--------------- 4 files changed, 183 insertions(+), 148 deletions(-) create mode 100644 kubernetes/client.go rename kubernetes/{kubernetes.go => runner.go} (68%) diff --git a/agent/job_runner.go b/agent/job_runner.go index a5e5d6ca8e..4d09b9afb3 100644 --- a/agent/job_runner.go +++ b/agent/job_runner.go @@ -362,7 +362,7 @@ func NewJobRunner(ctx context.Context, l logger.Logger, apiClient APIClient, con if err != nil { return nil, fmt.Errorf("failed to parse BUILDKITE_CONTAINER_COUNT: %w", err) } - r.process = kubernetes.New(r.agentLogger, kubernetes.Config{ + r.process = kubernetes.NewRunner(r.agentLogger, kubernetes.RunnerConfig{ Stdout: r.jobLogs, Stderr: r.jobLogs, ClientCount: containerCount, diff --git a/kubernetes/client.go b/kubernetes/client.go new file mode 100644 index 0000000000..48f144234f --- /dev/null +++ b/kubernetes/client.go @@ -0,0 +1,97 @@ +package kubernetes + +import ( + "context" + "errors" + "net/rpc" + "time" + + "github.com/buildkite/roko" +) + +type Client struct { + ID int + SocketPath string + + client *rpc.Client +} + +var errNotConnected = errors.New("client not connected") + +func (c *Client) Connect(ctx context.Context) (*RegisterResponse, error) { + if c.SocketPath == "" { + c.SocketPath = defaultSocketPath + } + + // Because k8s might run the containers "out of order", the server socket + // might not exist yet. Try to connect several times. + r := roko.NewRetrier( + roko.WithMaxAttempts(30), + roko.WithStrategy(roko.Constant(time.Second)), + ) + client, err := roko.DoFunc(ctx, r, func(*roko.Retrier) (*rpc.Client, error) { + return rpc.DialHTTP("unix", c.SocketPath) + }) + if err != nil { + return nil, err + } + c.client = client + var resp RegisterResponse + if err := c.client.Call("Runner.Register", c.ID, &resp); err != nil { + return nil, err + } + return &resp, nil +} + +func (c *Client) Exit(exitStatus int) error { + if c.client == nil { + return errNotConnected + } + return c.client.Call("Runner.Exit", ExitCode{ + ID: c.ID, + ExitStatus: exitStatus, + }, nil) +} + +// Write implements io.Writer +func (c *Client) Write(p []byte) (int, error) { + if c.client == nil { + return 0, errNotConnected + } + n := len(p) + err := c.client.Call("Runner.WriteLogs", Logs{ + Data: p, + }, nil) + return n, err +} + +var ErrInterrupt = errors.New("interrupt signal received") + +func (c *Client) Await(ctx context.Context, desiredState RunState) error { + for { + select { + case <-ctx.Done(): + return ctx.Err() + default: + var current RunState + if err := c.client.Call("Runner.Status", c.ID, ¤t); err != nil { + return err + } + if current == desiredState { + return nil + } + if current == RunStateInterrupt { + return ErrInterrupt + } + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(time.Second): + } + } + } +} + +func (c *Client) Close() { + c.client.Close() +} diff --git a/kubernetes/kubernetes_test.go b/kubernetes/kubernetes_test.go index 193042e30e..a15d11b2bf 100644 --- a/kubernetes/kubernetes_test.go +++ b/kubernetes/kubernetes_test.go @@ -135,7 +135,7 @@ func newRunner(t *testing.T, clientCount int) *Runner { t.Cleanup(func() { os.RemoveAll(tempDir) }) - runner := New(logger.Discard, Config{ + runner := 
NewRunner(logger.Discard, RunnerConfig{ SocketPath: socketPath, ClientCount: clientCount, }) diff --git a/kubernetes/kubernetes.go b/kubernetes/runner.go similarity index 68% rename from kubernetes/kubernetes.go rename to kubernetes/runner.go index 460bfdae4e..a043fee5b3 100644 --- a/kubernetes/kubernetes.go +++ b/kubernetes/runner.go @@ -4,7 +4,6 @@ import ( "bytes" "context" "encoding/gob" - "errors" "fmt" "io" "net" @@ -13,11 +12,9 @@ import ( "os" "sync" "syscall" - "time" "github.com/buildkite/agent/v3/logger" "github.com/buildkite/agent/v3/process" - "github.com/buildkite/roko" ) func init() { @@ -26,7 +23,15 @@ func init() { const defaultSocketPath = "/workspace/buildkite.sock" -func New(l logger.Logger, c Config) *Runner { +type RunnerConfig struct { + SocketPath string + ClientCount int + Stdout, Stderr io.Writer + Env []string +} + +// NewRunner returns a runner, implementing the agent's jobRunner interface. +func NewRunner(l logger.Logger, c RunnerConfig) *Runner { if c.SocketPath == "" { c.SocketPath = defaultSocketPath } @@ -46,9 +51,12 @@ func New(l logger.Logger, c Config) *Runner { } } +// Runner implements the agent's jobRunner interface, but instead of directly +// managing a subprocess, it runs a socket server that is connected to from +// another container. type Runner struct { logger logger.Logger - conf Config + conf RunnerConfig mu sync.Mutex listener net.Listener @@ -60,26 +68,7 @@ type Runner struct { clients map[int]*clientResult } -type clientResult struct { - ExitStatus int - State clientState -} - -type clientState int - -const ( - stateUnknown clientState = iota - stateConnected - stateExited -) - -type Config struct { - SocketPath string - ClientCount int - Stdout, Stderr io.Writer - Env []string -} - +// Run runs the socket server. func (r *Runner) Run(ctx context.Context) error { r.server.Register(r) r.mux.Handle(rpc.DefaultRPCPath, r.server) @@ -103,6 +92,7 @@ func (r *Runner) Run(ctx context.Context) error { return nil } +// Started returns a channel that is closed when the job has started running. func (r *Runner) Started() <-chan struct{} { r.mu.Lock() defer r.mu.Unlock() @@ -110,6 +100,7 @@ func (r *Runner) Started() <-chan struct{} { return r.started } +// Done returns a channel that is closed when the job is completed. func (r *Runner) Done() <-chan struct{} { r.mu.Lock() defer r.mu.Unlock() @@ -117,7 +108,7 @@ func (r *Runner) Done() <-chan struct{} { return r.done } -// Interrupts all clients, triggering graceful shutdown +// Interrupts all clients, triggering graceful shutdown. func (r *Runner) Interrupt() error { r.mu.Lock() defer r.mu.Unlock() @@ -128,7 +119,7 @@ func (r *Runner) Interrupt() error { return nil } -// Stops the RPC server, allowing Run to return immediately +// Terminate stops the RPC server, allowing Run to return immediately. func (r *Runner) Terminate() error { r.mu.Lock() defer r.mu.Unlock() @@ -139,24 +130,7 @@ func (r *Runner) Terminate() error { return nil } -type waitStatus struct { - Code int - SignalCode *int -} - -func (w waitStatus) ExitStatus() int { - return w.Code -} - -func (w waitStatus) Signal() syscall.Signal { - var signal syscall.Signal - return signal -} - -func (w waitStatus) Signaled() bool { - return false -} - +// WaitStatus returns a wait status that represents all the clients. 
func (r *Runner) WaitStatus() process.WaitStatus { ws := waitStatus{} for _, client := range r.clients { @@ -172,6 +146,7 @@ func (r *Runner) WaitStatus() process.WaitStatus { return ws } +// ClientStateUnknown reports whether anhy of the client states is stateUnknown. func (r *Runner) ClientStateUnknown() bool { for _, client := range r.clients { if client.State == stateUnknown { @@ -183,24 +158,10 @@ func (r *Runner) ClientStateUnknown() bool { // ==== sidecar api ==== +// Empty is an empty RPC message. type Empty struct{} -type Logs struct { - Data []byte -} - -type ExitCode struct { - ID int - ExitStatus int -} - -type Status struct { - Ready bool -} - -type RegisterResponse struct { - Env []string -} +// WriteLogs is called to pass logs on to Buildkite. func (r *Runner) WriteLogs(args Logs, reply *Empty) error { r.startedOnce.Do(func() { close(r.started) @@ -209,6 +170,12 @@ func (r *Runner) WriteLogs(args Logs, reply *Empty) error { return err } +// Logs is an RPC message that contains log data. +type Logs struct { + Data []byte +} + +// Exit is called when the client exits. func (r *Runner) Exit(args ExitCode, reply *Empty) error { r.mu.Lock() defer r.mu.Unlock() @@ -238,6 +205,15 @@ func (r *Runner) Exit(args ExitCode, reply *Empty) error { return nil } +// ExitCode is an RPC message that specifies an exit status for a client ID. +type ExitCode struct { + ID int + ExitStatus int +} + +// Register is called when the client registers with the runner. The reply +// contains the env vars that would normally be in the environment of the +// bootstrap subcommand, particularly, the agent session token. func (r *Runner) Register(id int, reply *RegisterResponse) error { r.mu.Lock() defer r.mu.Unlock() @@ -258,6 +234,14 @@ func (r *Runner) Register(id int, reply *RegisterResponse) error { return nil } +// RegisterResponse is an RPC message to registering clients containing info +// needed to run. +type RegisterResponse struct { + Env []string +} + +// Status is called by the client to check the status of the job, so that it can +// pack things up if the job is cancelled. func (r *Runner) Status(id int, reply *RunState) error { r.mu.Lock() defer r.mu.Unlock() @@ -265,114 +249,68 @@ func (r *Runner) Status(id int, reply *RunState) error { select { case <-r.done: return rpc.ErrShutdown + case <-r.interrupt: *reply = RunStateInterrupt return nil + default: if id == 0 { *reply = RunStateStart - } else if client, found := r.clients[id-1]; found && client.State == stateExited { + return nil + } + if client, found := r.clients[id-1]; found && client.State == stateExited { *reply = RunStateStart } return nil } } -type Client struct { - ID int - SocketPath string - client *rpc.Client -} - -var errNotConnected = errors.New("client not connected") +// RunState is an RPC message that describes to a client whether the job should +// continue waiting before running, start running, or stop running. +type RunState int -func (c *Client) Connect(ctx context.Context) (*RegisterResponse, error) { - if c.SocketPath == "" { - c.SocketPath = defaultSocketPath - } +const ( + // RunStateWait means the job is not ready to start executing yet. + RunStateWait RunState = iota - // Because k8s might run the containers "out of order", the server socket - // might not exist yet. Try to connect several times. 
- r := roko.NewRetrier( - roko.WithMaxAttempts(30), - roko.WithStrategy(roko.Constant(time.Second)), - ) - client, err := roko.DoFunc(ctx, r, func(*roko.Retrier) (*rpc.Client, error) { - return rpc.DialHTTP("unix", c.SocketPath) - }) - if err != nil { - return nil, err - } - c.client = client - var resp RegisterResponse - if err := c.client.Call("Runner.Register", c.ID, &resp); err != nil { - return nil, err - } - return &resp, nil -} + // RunStateStart means the job can begin. + RunStateStart -func (c *Client) Exit(exitStatus int) error { - if c.client == nil { - return errNotConnected - } - return c.client.Call("Runner.Exit", ExitCode{ - ID: c.ID, - ExitStatus: exitStatus, - }, nil) -} + // RunStateInterrupt means the job is cancelled or should be terminated for + // some other reason. + RunStateInterrupt +) -// Write implements io.Writer -func (c *Client) Write(p []byte) (int, error) { - if c.client == nil { - return 0, errNotConnected - } - n := len(p) - err := c.client.Call("Runner.WriteLogs", Logs{ - Data: p, - }, nil) - return n, err -} +// ==== related types and consts ==== -type WaitReadyResponse struct { - Err error - Status +type clientResult struct { + ExitStatus int + State clientState } -type RunState int +type clientState int const ( - RunStateWait RunState = iota - RunStateStart - RunStateInterrupt + stateUnknown clientState = iota + stateConnected + stateExited ) -var ErrInterrupt = errors.New("interrupt signal received") - -func (c *Client) Await(ctx context.Context, desiredState RunState) error { - for { - select { - case <-ctx.Done(): - return ctx.Err() - default: - var current RunState - if err := c.client.Call("Runner.Status", c.ID, ¤t); err != nil { - return err - } - if current == desiredState { - return nil - } - if current == RunStateInterrupt { - return ErrInterrupt - } - select { - case <-ctx.Done(): - return ctx.Err() - case <-time.After(time.Second): - } - } - } +type waitStatus struct { + Code int + SignalCode *int +} + +func (w waitStatus) ExitStatus() int { + return w.Code } -func (c *Client) Close() { - c.client.Close() +func (w waitStatus) Signal() syscall.Signal { + var signal syscall.Signal + return signal +} + +func (w waitStatus) Signaled() bool { + return false } From d27729188c5c607ff0455d55fe5493d6f8665944 Mon Sep 17 00:00:00 2001 From: Josh Deprez Date: Thu, 17 Oct 2024 10:13:17 +1100 Subject: [PATCH 2/3] Move mutex from Runner to clientResult --- agent/run_job.go | 4 +- kubernetes/runner.go | 123 +++++++++++++++++++++---------------------- 2 files changed, 63 insertions(+), 64 deletions(-) diff --git a/agent/run_job.go b/agent/run_job.go index a78fc659ea..31a8dd6a33 100644 --- a/agent/run_job.go +++ b/agent/run_job.go @@ -291,9 +291,9 @@ func (r *JobRunner) runJob(ctx context.Context) core.ProcessExit { // start. Normally such errors are hidden in the Kubernetes events. Let's feed them up // to the user as they may be the caused by errors in the pipeline definition. k8sProcess, ok := r.process.(*kubernetes.Runner) - if ok && r.cancelled && !r.stopped && k8sProcess.ClientStateUnknown() { + if ok && r.cancelled && !r.stopped && k8sProcess.AnyClientNotConnectedYet() { fmt.Fprintln(r.jobLogs, "+++ Unknown container exit status") - fmt.Fprintln(r.jobLogs, "Some containers had unknown exit statuses. Perhaps the container image specified in your podSpec could not be pulled (ImagePullBackOff)") + fmt.Fprintln(r.jobLogs, "Some containers never connected to the agent. 
Perhaps the container image specified in your podSpec could not be pulled (ImagePullBackOff)") } // Collect the finished process' exit status diff --git a/kubernetes/runner.go b/kubernetes/runner.go index a043fee5b3..affe8c8100 100644 --- a/kubernetes/runner.go +++ b/kubernetes/runner.go @@ -35,7 +35,7 @@ func NewRunner(l logger.Logger, c RunnerConfig) *Runner { if c.SocketPath == "" { c.SocketPath = defaultSocketPath } - clients := make(map[int]*clientResult, c.ClientCount) + clients := make([]*clientResult, c.ClientCount) for i := range c.ClientCount { clients[i] = &clientResult{} } @@ -57,15 +57,17 @@ func NewRunner(l logger.Logger, c RunnerConfig) *Runner { type Runner struct { logger logger.Logger conf RunnerConfig - mu sync.Mutex listener net.Listener - started, done, interrupt chan struct{} - startedOnce, closedOnce, interruptOnce sync.Once + // Channels that are closed at certain points in the job lifecycle + started, done, interrupt chan struct{} + + // Guards the closing of the channels to ensure they are only closed once + startedOnce, doneOnce, interruptOnce sync.Once server *rpc.Server mux *http.ServeMux - clients map[int]*clientResult + clients []*clientResult } // Run runs the socket server. @@ -93,40 +95,22 @@ func (r *Runner) Run(ctx context.Context) error { } // Started returns a channel that is closed when the job has started running. -func (r *Runner) Started() <-chan struct{} { - r.mu.Lock() - defer r.mu.Unlock() +func (r *Runner) Started() <-chan struct{} { return r.started } - return r.started -} +func (r *Runner) markStarted() { r.startedOnce.Do(func() { close(r.started) }) } // Done returns a channel that is closed when the job is completed. -func (r *Runner) Done() <-chan struct{} { - r.mu.Lock() - defer r.mu.Unlock() - - return r.done -} +func (r *Runner) Done() <-chan struct{} { return r.done } // Interrupts all clients, triggering graceful shutdown. func (r *Runner) Interrupt() error { - r.mu.Lock() - defer r.mu.Unlock() - - r.interruptOnce.Do(func() { - close(r.interrupt) - }) + r.interruptOnce.Do(func() { close(r.interrupt) }) return nil } // Terminate stops the RPC server, allowing Run to return immediately. func (r *Runner) Terminate() error { - r.mu.Lock() - defer r.mu.Unlock() - - r.closedOnce.Do(func() { - close(r.done) - }) + r.doneOnce.Do(func() { close(r.done) }) return nil } @@ -134,22 +118,31 @@ func (r *Runner) Terminate() error { func (r *Runner) WaitStatus() process.WaitStatus { ws := waitStatus{} for _, client := range r.clients { - if client.ExitStatus != 0 { - return waitStatus{Code: client.ExitStatus} + client.mu.Lock() + exitStatus, state := client.ExitStatus, client.State + client.mu.Unlock() + + if exitStatus != 0 { + return waitStatus{Code: exitStatus} } // use an unusual status code to distinguish this unusual state - if client.State == stateUnknown { + if state == stateNotYetConnected { ws.Code -= 10 } } return ws } -// ClientStateUnknown reports whether anhy of the client states is stateUnknown. -func (r *Runner) ClientStateUnknown() bool { +// AnyClientNotConnectedYet reports whether any of the clients have not yet +// connected. +func (r *Runner) AnyClientNotConnectedYet() bool { for _, client := range r.clients { - if client.State == stateUnknown { + client.mu.Lock() + state := client.State + client.mu.Unlock() + + if state == stateNotYetConnected { return true } } @@ -163,9 +156,7 @@ type Empty struct{} // WriteLogs is called to pass logs on to Buildkite. 
func (r *Runner) WriteLogs(args Logs, reply *Empty) error { - r.startedOnce.Do(func() { - close(r.started) - }) + r.markStarted() _, err := io.Copy(r.conf.Stdout, bytes.NewReader(args.Data)) return err } @@ -177,30 +168,29 @@ type Logs struct { // Exit is called when the client exits. func (r *Runner) Exit(args ExitCode, reply *Empty) error { - r.mu.Lock() - defer r.mu.Unlock() - - client, found := r.clients[args.ID] - if !found { + if args.ID < 0 || args.ID >= len(r.clients) { return fmt.Errorf("unrecognized client id: %d", args.ID) } + client := r.clients[args.ID] r.logger.Info("client %d exited with code %d", args.ID, args.ExitStatus) + + client.mu.Lock() client.ExitStatus = args.ExitStatus client.State = stateExited - if client.ExitStatus != 0 { - r.closedOnce.Do(func() { - close(r.done) - }) + client.mu.Unlock() + + if args.ExitStatus != 0 { + r.Terminate() } allExited := true for _, client := range r.clients { + client.mu.Lock() allExited = client.State == stateExited && allExited + client.mu.Unlock() } if allExited { - r.closedOnce.Do(func() { - close(r.done) - }) + r.Terminate() } return nil } @@ -215,16 +205,16 @@ type ExitCode struct { // contains the env vars that would normally be in the environment of the // bootstrap subcommand, particularly, the agent session token. func (r *Runner) Register(id int, reply *RegisterResponse) error { - r.mu.Lock() - defer r.mu.Unlock() - r.startedOnce.Do(func() { - close(r.started) - }) - client, found := r.clients[id] - if !found { - return fmt.Errorf("client id %d not found", id) + if id < 0 || id >= len(r.clients) { + return fmt.Errorf("unrecognized client id: %d", id) } - if client.State != stateUnknown { + + r.markStarted() + + client := r.clients[id] + client.mu.Lock() + defer client.mu.Unlock() + if client.State != stateNotYetConnected { return fmt.Errorf("client id %d already registered", id) } r.logger.Info("client %d connected", id) @@ -242,9 +232,11 @@ type RegisterResponse struct { // Status is called by the client to check the status of the job, so that it can // pack things up if the job is cancelled. +// If the client stops calling Status before calling Exit, we assume it is lost. func (r *Runner) Status(id int, reply *RunState) error { - r.mu.Lock() - defer r.mu.Unlock() + if id < 0 || id >= len(r.clients) { + return fmt.Errorf("unrecognized client id: %d", id) + } select { case <-r.done: @@ -255,11 +247,17 @@ func (r *Runner) Status(id int, reply *RunState) error { return nil default: + // First client should start first. if id == 0 { *reply = RunStateStart return nil } - if client, found := r.clients[id-1]; found && client.State == stateExited { + + // Client N can start after Client N-1 has exited. 
+ client := r.clients[id-1] + client.mu.Lock() + defer client.mu.Unlock() + if client.State == stateExited { *reply = RunStateStart } return nil @@ -285,6 +283,7 @@ const ( // ==== related types and consts ==== type clientResult struct { + mu sync.Mutex ExitStatus int State clientState } @@ -292,7 +291,7 @@ type clientResult struct { type clientState int const ( - stateUnknown clientState = iota + stateNotYetConnected clientState = iota stateConnected stateExited ) From 821f0d40b4a2d89508bde0e7b585e432f0aaecac Mon Sep 17 00:00:00 2001 From: Josh Deprez Date: Mon, 21 Oct 2024 16:35:56 +1100 Subject: [PATCH 3/3] Perform liveness check of clients --- agent/job_runner.go | 9 +-- agent/run_job.go | 17 +++-- kubernetes/kubernetes_test.go | 46 +++++++++----- kubernetes/runner.go | 113 +++++++++++++++++++++++++--------- 4 files changed, 133 insertions(+), 52 deletions(-) diff --git a/agent/job_runner.go b/agent/job_runner.go index 4d09b9afb3..5359a57e2e 100644 --- a/agent/job_runner.go +++ b/agent/job_runner.go @@ -363,10 +363,11 @@ func NewJobRunner(ctx context.Context, l logger.Logger, apiClient APIClient, con return nil, fmt.Errorf("failed to parse BUILDKITE_CONTAINER_COUNT: %w", err) } r.process = kubernetes.NewRunner(r.agentLogger, kubernetes.RunnerConfig{ - Stdout: r.jobLogs, - Stderr: r.jobLogs, - ClientCount: containerCount, - Env: processEnv, + Stdout: r.jobLogs, + Stderr: r.jobLogs, + ClientCount: containerCount, + Env: processEnv, + ClientLostTimeout: 30 * time.Second, }) } else { // not Kubernetes // The bootstrap-script gets parsed based on the operating system diff --git a/agent/run_job.go b/agent/run_job.go index 31a8dd6a33..a2f916d57e 100644 --- a/agent/run_job.go +++ b/agent/run_job.go @@ -290,10 +290,19 @@ func (r *JobRunner) runJob(ctx context.Context) core.ProcessExit { // Intended to capture situations where the job-exec (aka bootstrap) container did not // start. Normally such errors are hidden in the Kubernetes events. Let's feed them up // to the user as they may be the caused by errors in the pipeline definition. - k8sProcess, ok := r.process.(*kubernetes.Runner) - if ok && r.cancelled && !r.stopped && k8sProcess.AnyClientNotConnectedYet() { - fmt.Fprintln(r.jobLogs, "+++ Unknown container exit status") - fmt.Fprintln(r.jobLogs, "Some containers never connected to the agent. Perhaps the container image specified in your podSpec could not be pulled (ImagePullBackOff)") + k8sProcess, isK8s := r.process.(*kubernetes.Runner) + if isK8s && !r.stopped { + switch { + case r.cancelled && k8sProcess.AnyClientIn(kubernetes.StateNotYetConnected): + fmt.Fprint(r.jobLogs, `+++ Unknown container exit status +One or more containers never connected to the agent. Perhaps the container image specified in your podSpec could not be pulled (ImagePullBackOff)? +`) + case k8sProcess.AnyClientIn(kubernetes.StateLost): + fmt.Fprint(r.jobLogs, `+++ Unknown container exit status +One or more containers connected to the agent, but then stopped communicating without exiting normally. Perhaps the container was OOM-killed? 
+`) + } + } // Collect the finished process' exit status diff --git a/kubernetes/kubernetes_test.go b/kubernetes/kubernetes_test.go index a15d11b2bf..6ace69cffe 100644 --- a/kubernetes/kubernetes_test.go +++ b/kubernetes/kubernetes_test.go @@ -24,13 +24,6 @@ func TestOrderedClients(t *testing.T) { client2 := &Client{ID: 2} clients := []*Client{client0, client1, client2} - // wait for runner to listen - require.Eventually(t, func() bool { - _, err := os.Lstat(socketPath) - return err == nil - - }, time.Second*10, time.Millisecond, "expected socket file to exist") - for _, client := range clients { client.SocketPath = socketPath require.NoError(t, connect(client)) @@ -60,6 +53,33 @@ func TestOrderedClients(t *testing.T) { } } +func TestLivenessCheck(t *testing.T) { + runner := newRunner(t, 2) + socketPath := runner.conf.SocketPath + + client0 := &Client{ID: 0} + client1 := &Client{ID: 1} + clients := []*Client{client0, client1} + + for _, client := range clients { + client.SocketPath = socketPath + require.NoError(t, connect(client)) + t.Cleanup(client.Close) + } + ctx := context.Background() + require.NoError(t, client0.Await(ctx, RunStateStart)) + require.NoError(t, client1.Await(ctx, RunStateWait)) + + ctx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + select { + case <-runner.Done(): + break + case <-ctx.Done(): + t.Fatalf("timed out waiting for client0 to be declared lost and job terminated") + } +} + func TestDuplicateClients(t *testing.T) { runner := newRunner(t, 2) socketPath := runner.conf.SocketPath @@ -67,13 +87,6 @@ func TestDuplicateClients(t *testing.T) { client0 := &Client{ID: 0, SocketPath: socketPath} client1 := &Client{ID: 0, SocketPath: socketPath} - // wait for runner to listen - require.Eventually(t, func() bool { - _, err := os.Lstat(socketPath) - return err == nil - - }, time.Second*10, time.Millisecond, "expected socket file to exist") - require.NoError(t, connect(client0)) require.Error(t, connect(client1), "expected an error when connecting a client with a duplicate ID") } @@ -136,8 +149,9 @@ func newRunner(t *testing.T, clientCount int) *Runner { os.RemoveAll(tempDir) }) runner := NewRunner(logger.Discard, RunnerConfig{ - SocketPath: socketPath, - ClientCount: clientCount, + SocketPath: socketPath, + ClientCount: clientCount, + ClientLostTimeout: 2 * time.Second, }) runnerCtx, cancelRunner := context.WithCancel(context.Background()) go runner.Run(runnerCtx) diff --git a/kubernetes/runner.go b/kubernetes/runner.go index affe8c8100..4c532ba697 100644 --- a/kubernetes/runner.go +++ b/kubernetes/runner.go @@ -12,6 +12,7 @@ import ( "os" "sync" "syscall" + "time" "github.com/buildkite/agent/v3/logger" "github.com/buildkite/agent/v3/process" @@ -24,10 +25,11 @@ func init() { const defaultSocketPath = "/workspace/buildkite.sock" type RunnerConfig struct { - SocketPath string - ClientCount int - Stdout, Stderr io.Writer - Env []string + SocketPath string + ClientCount int + Stdout, Stderr io.Writer + Env []string + ClientLostTimeout time.Duration } // NewRunner returns a runner, implementing the agent's jobRunner interface. @@ -90,10 +92,50 @@ func (r *Runner) Run(ctx context.Context) error { r.listener = l go http.Serve(l, r.mux) + if r.conf.ClientLostTimeout > 0 { + go r.livenessCheck(ctx) + } + <-r.done return nil } +func (r *Runner) livenessCheck(ctx context.Context) { + // 100ms chosen for snappiness; we should easily be able to scan all + // clients quickly. 
+ tick := time.NewTicker(100 * time.Millisecond) + + for { + select { + case <-ctx.Done(): + return + + case <-r.done: + return + + case <-tick.C: + // Scan through the clients to see if any have become lost. + for id, client := range r.clients { + client.mu.Lock() + + // If the client has connected, and was last heard from too + // long ago, it's lost. + // This usually happens if k8s has OOM-killed the container. + // The next client in the sequence won't have started yet, so + // we can just terminate. + lhf := time.Since(client.LastHeardFrom) + if client.State == StateConnected && lhf > r.conf.ClientLostTimeout { + r.logger.Error("Container (ID %d) was last heard from %v ago; marking lost and self-terminating...", id, lhf) + client.State = StateLost + r.Terminate() + } + client.mu.Unlock() + } + + } + } +} + // Started returns a channel that is closed when the job has started running. func (r *Runner) Started() <-chan struct{} { return r.started } @@ -116,7 +158,6 @@ func (r *Runner) Terminate() error { // WaitStatus returns a wait status that represents all the clients. func (r *Runner) WaitStatus() process.WaitStatus { - ws := waitStatus{} for _, client := range r.clients { client.mu.Lock() exitStatus, state := client.ExitStatus, client.State @@ -126,23 +167,25 @@ func (r *Runner) WaitStatus() process.WaitStatus { return waitStatus{Code: exitStatus} } - // use an unusual status code to distinguish this unusual state - if state == stateNotYetConnected { - ws.Code -= 10 + // use an unusual status code to distinguish unusual states + switch state { + case StateLost: + return waitStatus{Code: -7} + case StateNotYetConnected: + return waitStatus{Code: -10} } } - return ws + return waitStatus{} } -// AnyClientNotConnectedYet reports whether any of the clients have not yet -// connected. -func (r *Runner) AnyClientNotConnectedYet() bool { +// AnyClientIn reports whether any of the clients are in a particular state. 
+func (r *Runner) AnyClientIn(state ClientState) bool { for _, client := range r.clients { client.mu.Lock() - state := client.State + s := client.State client.mu.Unlock() - if state == stateNotYetConnected { + if s == state { return true } } @@ -176,20 +219,25 @@ func (r *Runner) Exit(args ExitCode, reply *Empty) error { client.mu.Lock() client.ExitStatus = args.ExitStatus - client.State = stateExited + client.State = StateExited client.mu.Unlock() if args.ExitStatus != 0 { r.Terminate() } - allExited := true + allTerminal := true for _, client := range r.clients { client.mu.Lock() - allExited = client.State == stateExited && allExited + if client.State == StateNotYetConnected || client.State == StateConnected { + allTerminal = false + } client.mu.Unlock() + if !allTerminal { + break + } } - if allExited { + if allTerminal { r.Terminate() } return nil @@ -214,11 +262,13 @@ func (r *Runner) Register(id int, reply *RegisterResponse) error { client := r.clients[id] client.mu.Lock() defer client.mu.Unlock() - if client.State != stateNotYetConnected { + + if client.State != StateNotYetConnected { return fmt.Errorf("client id %d already registered", id) } r.logger.Info("client %d connected", id) - client.State = stateConnected + client.LastHeardFrom = time.Now() + client.State = StateConnected reply.Env = r.conf.Env return nil @@ -238,6 +288,11 @@ func (r *Runner) Status(id int, reply *RunState) error { return fmt.Errorf("unrecognized client id: %d", id) } + client := r.clients[id] + client.mu.Lock() + client.LastHeardFrom = time.Now() + client.mu.Unlock() + select { case <-r.done: return rpc.ErrShutdown @@ -257,7 +312,7 @@ func (r *Runner) Status(id int, reply *RunState) error { client := r.clients[id-1] client.mu.Lock() defer client.mu.Unlock() - if client.State == stateExited { + if client.State == StateExited { *reply = RunStateStart } return nil @@ -283,17 +338,19 @@ const ( // ==== related types and consts ==== type clientResult struct { - mu sync.Mutex - ExitStatus int - State clientState + mu sync.Mutex + ExitStatus int + State ClientState + LastHeardFrom time.Time } -type clientState int +type ClientState int const ( - stateNotYetConnected clientState = iota - stateConnected - stateExited + StateNotYetConnected ClientState = iota + StateConnected + StateExited + StateLost ) type waitStatus struct {
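Reviewer note (not part of the patch series above): taken together, the three patches define an RPC handshake between the Runner (the socket server inside the agent) and the Client (the sidecar container): Connect/Register, Await(RunStateStart), stream logs via Write, then Exit, with Status doubling as a liveness heartbeat after PATCH 3/3. Below is a minimal sketch of a sidecar driving that API. It only uses the exported API shown in kubernetes/client.go and kubernetes/runner.go; the "buildkite-agent bootstrap" invocation, the env handling, and the error handling are illustrative assumptions, not code from this series.

package main

import (
	"context"
	"errors"
	"log"
	"os"
	"os/exec"

	"github.com/buildkite/agent/v3/kubernetes"
)

func main() {
	ctx := context.Background()

	// ID and SocketPath would normally be injected by the controller via env
	// vars (an assumption here); the patches only show the default socket path.
	client := &kubernetes.Client{ID: 0, SocketPath: "/workspace/buildkite.sock"}

	// Connect retries for ~30s because k8s may start containers out of order,
	// then calls Runner.Register to fetch the env (incl. the agent session token).
	reg, err := client.Connect(ctx)
	if err != nil {
		log.Fatalf("connect: %v", err)
	}
	defer client.Close()

	// Block until the runner releases this container. Container 0 starts
	// immediately; container N starts once container N-1 has exited.
	if err := client.Await(ctx, kubernetes.RunStateStart); err != nil {
		log.Fatalf("await start: %v", err)
	}

	workCtx, cancel := context.WithCancel(ctx)
	defer cancel()
	go func() {
		// Await polls Runner.Status roughly once a second; after PATCH 3/3
		// those calls also serve as the liveness heartbeat. It returns when
		// the job is interrupted (or the server shuts down), cancelling the
		// work below.
		_ = client.Await(workCtx, kubernetes.RunStateInterrupt)
		cancel()
	}()

	// Run the real work with the env handed back at registration, streaming
	// output over the socket (Client implements io.Writer via Runner.WriteLogs).
	cmd := exec.CommandContext(workCtx, "buildkite-agent", "bootstrap")
	cmd.Env = append(os.Environ(), reg.Env...)
	cmd.Stdout = client
	cmd.Stderr = client

	exitStatus := 0
	if err := cmd.Run(); err != nil {
		exitStatus = 1
		var ee *exec.ExitError
		if errors.As(err, &ee) {
			exitStatus = ee.ExitCode()
		}
	}

	// Report the exit status; the runner then releases the next container,
	// or terminates the whole job if the status is non-zero.
	if err := client.Exit(exitStatus); err != nil {
		log.Printf("exit: %v", err)
	}
}

A sidecar written this way also exercises the failure paths the later patches surface: if it never manages to Connect it is reported as StateNotYetConnected (the ImagePullBackOff hint in run_job.go), and if it stops calling Status for longer than ClientLostTimeout it is marked StateLost and the job self-terminates.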