Merge pull request #3045 from buildkite/liveness-check
k8s exec: Perform liveness check of clients
DrJosh9000 authored Oct 22, 2024
2 parents 1bccf77 + 821f0d4 commit 3c8709a
Showing 6 changed files with 519 additions and 404 deletions.
11 changes: 6 additions & 5 deletions agent/job_runner.go
@@ -362,11 +362,12 @@ func NewJobRunner(ctx context.Context, l logger.Logger, apiClient APIClient, con
 		if err != nil {
 			return nil, fmt.Errorf("failed to parse BUILDKITE_CONTAINER_COUNT: %w", err)
 		}
-		r.process = kubernetes.New(r.agentLogger, kubernetes.Config{
-			Stdout:      r.jobLogs,
-			Stderr:      r.jobLogs,
-			ClientCount: containerCount,
-			Env:         processEnv,
+		r.process = kubernetes.NewRunner(r.agentLogger, kubernetes.RunnerConfig{
+			Stdout:            r.jobLogs,
+			Stderr:            r.jobLogs,
+			ClientCount:       containerCount,
+			Env:               processEnv,
+			ClientLostTimeout: 30 * time.Second,
 		})
 	} else { // not Kubernetes
 		// The bootstrap-script gets parsed based on the operating system
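The new ClientLostTimeout field is the knob for the liveness check this PR adds: if a connected client stops making RPC calls for that long, the runner can treat it as lost. The runner.go changes that consume this field are not part of this excerpt, so the following is only a minimal sketch of the idea under assumed names (clientRecord, heartbeat, and livenessLoop are illustrative, not the PR's actual identifiers):

	// Hypothetical sketch of a liveness check driven by ClientLostTimeout.
	// The real runner.go implementation in this PR may differ.
	package kubernetes

	import (
		"context"
		"sync"
		"time"
	)

	type clientRecord struct {
		lastSeen time.Time // refreshed by every RPC call from the client
		lost     bool
	}

	type liveness struct {
		mu      sync.Mutex
		clients map[int]*clientRecord
	}

	// heartbeat would be called from each RPC handler (Register, WriteLogs, Status, Exit).
	func (l *liveness) heartbeat(id int) {
		l.mu.Lock()
		defer l.mu.Unlock()
		if c, ok := l.clients[id]; ok {
			c.lastSeen = time.Now()
		}
	}

	// livenessLoop marks a client lost once it has been silent for longer than timeout.
	func (l *liveness) livenessLoop(ctx context.Context, timeout time.Duration) {
		tick := time.NewTicker(time.Second)
		defer tick.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case now := <-tick.C:
				l.mu.Lock()
				for _, c := range l.clients {
					if !c.lost && now.Sub(c.lastSeen) > timeout {
						c.lost = true
					}
				}
				l.mu.Unlock()
			}
		}
	}

Because every RPC handler refreshes the timestamp, the loop only has to compare timestamps against the configured 30-second timeout.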
17 changes: 13 additions & 4 deletions agent/run_job.go
@@ -290,10 +290,19 @@ func (r *JobRunner) runJob(ctx context.Context) core.ProcessExit {
 	// Intended to capture situations where the job-exec (aka bootstrap) container did not
 	// start. Normally such errors are hidden in the Kubernetes events. Let's feed them up
 	// to the user as they may be caused by errors in the pipeline definition.
-	k8sProcess, ok := r.process.(*kubernetes.Runner)
-	if ok && r.cancelled && !r.stopped && k8sProcess.ClientStateUnknown() {
-		fmt.Fprintln(r.jobLogs, "+++ Unknown container exit status")
-		fmt.Fprintln(r.jobLogs, "Some containers had unknown exit statuses. Perhaps the container image specified in your podSpec could not be pulled (ImagePullBackOff)")
+	k8sProcess, isK8s := r.process.(*kubernetes.Runner)
+	if isK8s && !r.stopped {
+		switch {
+		case r.cancelled && k8sProcess.AnyClientIn(kubernetes.StateNotYetConnected):
+			fmt.Fprint(r.jobLogs, `+++ Unknown container exit status
+One or more containers never connected to the agent. Perhaps the container image specified in your podSpec could not be pulled (ImagePullBackOff)?
+`)
+		case k8sProcess.AnyClientIn(kubernetes.StateLost):
+			fmt.Fprint(r.jobLogs, `+++ Unknown container exit status
+One or more containers connected to the agent, but then stopped communicating without exiting normally. Perhaps the container was OOM-killed?
+`)
+		}
+
 	}
 
 	// Collect the finished process' exit status
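The reworked log messages distinguish two failure modes via AnyClientIn: containers that never connected to the agent (StateNotYetConnected, typically an image that could not be pulled) and containers that connected and then went silent (StateLost, e.g. OOM-killed). The Runner's state-tracking changes are not shown in this excerpt; below is a minimal sketch of how such a predicate could look over a per-client state map (the field names and the extra state constants are assumptions, not the PR's actual code):

	// Hypothetical sketch of a per-client state table and the AnyClientIn
	// predicate used above; the real Runner in this PR may differ.
	package kubernetes

	import "sync"

	type ClientState int

	const (
		StateNotYetConnected ClientState = iota // client has never called Register
		StateConnected                          // assumed: client registered and is talking
		StateLost                               // client went silent past ClientLostTimeout
		StateExited                             // assumed: client reported an exit status
	)

	type Runner struct {
		mu           sync.Mutex
		clientStates map[int]ClientState
	}

	// AnyClientIn reports whether at least one client is currently in any of the given states.
	func (r *Runner) AnyClientIn(states ...ClientState) bool {
		r.mu.Lock()
		defer r.mu.Unlock()
		for _, s := range r.clientStates {
			for _, want := range states {
				if s == want {
					return true
				}
			}
		}
		return false
	}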
97 changes: 97 additions & 0 deletions kubernetes/client.go
@@ -0,0 +1,97 @@
package kubernetes

import (
	"context"
	"errors"
	"net/rpc"
	"time"

	"github.com/buildkite/roko"
)

// Client connects a single container to the Runner's RPC server over a Unix
// domain socket, and reports logs and exit statuses back to the agent.
type Client struct {
	ID         int
	SocketPath string

	client *rpc.Client
}

var errNotConnected = errors.New("client not connected")

// Connect dials the runner's socket (retrying for up to 30 seconds) and then
// registers this client's ID with the runner.
func (c *Client) Connect(ctx context.Context) (*RegisterResponse, error) {
	if c.SocketPath == "" {
		c.SocketPath = defaultSocketPath
	}

	// Because k8s might run the containers "out of order", the server socket
	// might not exist yet. Try to connect several times.
	r := roko.NewRetrier(
		roko.WithMaxAttempts(30),
		roko.WithStrategy(roko.Constant(time.Second)),
	)
	client, err := roko.DoFunc(ctx, r, func(*roko.Retrier) (*rpc.Client, error) {
		return rpc.DialHTTP("unix", c.SocketPath)
	})
	if err != nil {
		return nil, err
	}
	c.client = client
	var resp RegisterResponse
	if err := c.client.Call("Runner.Register", c.ID, &resp); err != nil {
		return nil, err
	}
	return &resp, nil
}

// Exit reports the container's exit status to the runner.
func (c *Client) Exit(exitStatus int) error {
	if c.client == nil {
		return errNotConnected
	}
	return c.client.Call("Runner.Exit", ExitCode{
		ID:         c.ID,
		ExitStatus: exitStatus,
	}, nil)
}

// Write implements io.Writer, streaming log output to the runner.
func (c *Client) Write(p []byte) (int, error) {
	if c.client == nil {
		return 0, errNotConnected
	}
	n := len(p)
	err := c.client.Call("Runner.WriteLogs", Logs{
		Data: p,
	}, nil)
	return n, err
}

var ErrInterrupt = errors.New("interrupt signal received")

// Await polls the runner's status for this client once per second until the
// desired state is reached, the context is cancelled, or an interrupt is signalled.
func (c *Client) Await(ctx context.Context, desiredState RunState) error {
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
			var current RunState
			if err := c.client.Call("Runner.Status", c.ID, &current); err != nil {
				return err
			}
			if current == desiredState {
				return nil
			}
			if current == RunStateInterrupt {
				return ErrInterrupt
			}
			select {
			case <-ctx.Done():
				return ctx.Err()
			case <-time.After(time.Second):
			}
		}
	}
}

// Close closes the underlying RPC connection.
func (c *Client) Close() {
	c.client.Close()
}
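For context, this is roughly how a process inside a job container would drive the Client above. It is a sketch only: the import path, the BUILDKITE_CONTAINER_ID environment variable, and the RunStateStart constant are assumptions for illustration; of the run states, only RunStateInterrupt appears in this diff.

	// Hypothetical usage of Client from inside a job container.
	package main

	import (
		"context"
		"log"
		"os"
		"strconv"

		"github.com/buildkite/agent/v3/kubernetes" // import path assumed
	)

	func main() {
		ctx := context.Background()

		id, _ := strconv.Atoi(os.Getenv("BUILDKITE_CONTAINER_ID")) // env var assumed
		client := &kubernetes.Client{ID: id}                       // SocketPath falls back to the default

		if _, err := client.Connect(ctx); err != nil {
			log.Fatalf("could not connect to agent: %v", err)
		}
		defer client.Close()

		// Block until the agent tells this container to start (state name assumed).
		if err := client.Await(ctx, kubernetes.RunStateStart); err != nil {
			log.Fatalf("await failed: %v", err)
		}

		// ... run the actual work, streaming output through client (an io.Writer) ...

		// Report the result back to the agent.
		if err := client.Exit(0); err != nil {
			log.Printf("could not report exit status: %v", err)
		}
	}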