Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Convey env vars between k8s containers #2851

Merged
merged 4 commits into from
Jun 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion agent/job_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,10 +347,10 @@ func NewJobRunner(ctx context.Context, l logger.Logger, apiClient APIClient, con
return nil, fmt.Errorf("failed to parse BUILDKITE_CONTAINER_COUNT: %w", err)
}
r.process = kubernetes.New(r.agentLogger, kubernetes.Config{
AccessToken: r.apiClient.Config().Token,
Stdout: r.jobLogs,
Stderr: r.jobLogs,
ClientCount: containerCount,
Env: processEnv,
})
} else { // not Kubernetes
// The bootstrap-script gets parsed based on the operating system
Expand Down
71 changes: 70 additions & 1 deletion clicommand/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,11 @@ import (
"syscall"
"time"

"github.com/buildkite/agent/v3/env"
"github.com/buildkite/agent/v3/internal/job"
"github.com/buildkite/agent/v3/kubernetes"
"github.com/buildkite/agent/v3/process"
"github.com/buildkite/roko"
"github.com/urfave/cli"
)

Expand Down Expand Up @@ -98,6 +101,7 @@ type BootstrapConfig struct {
NoJobAPI bool `cli:"no-job-api"`
DisableWarningsFor []string `cli:"disable-warnings-for" normalize:"list"`
KubernetesExec bool `cli:"kubernetes-exec"`
KubernetesContainerID int `cli:"kubernetes-container-id"`
}

var BootstrapCommand = cli.Command{
Expand Down Expand Up @@ -362,6 +366,13 @@ var BootstrapCommand = cli.Command{
Usage: "A list of warning IDs to disable",
EnvVar: "BUILDKITE_AGENT_DISABLE_WARNINGS_FOR",
},
cli.IntFlag{
Name: "kubernetes-container-id",
Usage: "This is intended to be used only by the Buildkite k8s stack " +
"(github.com/buildkite/agent-stack-k8s); it sets an ID number " +
"used to identify this container within the pod",
EnvVar: "BUILDKITE_CONTAINER_ID",
},
cancelSignalFlag,
signalGracePeriodSecondsFlag,

Expand All @@ -376,6 +387,64 @@ var BootstrapCommand = cli.Command{
},
Action: func(c *cli.Context) error {
ctx := context.Background()

// Surprise! Before doing anything else, even loading the other
// flags/config, we connect the socket to get env vars. These are
// normally present in the environment if the bootstrap was forked,
// directly, but because in k8s land we use containers to separate the
// agent and its executors, such vars won't be available unless we do
// this.
var k8sAgentSocket *kubernetes.Client
if c.Bool(KubernetesExecFlag.Name) {
k8sAgentSocket = &kubernetes.Client{ID: c.Int("kubernetes-container-id")}

rtr := roko.NewRetrier(
roko.WithMaxAttempts(7),
roko.WithStrategy(roko.Exponential(2*time.Second, 0)),
)
regResp, err := roko.DoFunc(ctx, rtr, func(rtr *roko.Retrier) (*kubernetes.RegisterResponse, error) {
return k8sAgentSocket.Connect(ctx)
})
if err != nil {
return fmt.Errorf("error connecting to kubernetes runner: %w", err)
}

// Set our environment vars based on the registration response.
// But note that the k8s stack interprets the job definition itself,
// and sets a variety of env vars (e.g. BUILDKITE_COMMAND) that
// *could* be different to the ones the agent normally supplies.
// Examples:
// * The command container could be passed a specific
// BUILDKITE_COMMAND that is computed from the command+args
// podSpec attributes (in the kubernetes "plugin"), instead of the
// "command" attribute of the step.
// * BUILDKITE_PLUGINS is pre-processed by the k8s stack to remove
// the kubernetes "plugin". If we used the agent's default
// BUILDKITE_PLUGINS, we'd be trying to find a kubernetes plugin
// that doesn't exist.
// So we should skip setting any vars that are already set, and
// specifically any that could be deliberately *unset* by the
// k8s stack (BUILDKITE_PLUGINS could be unset if kubernetes is
// the only "plugin" in the step).
// (Maybe we could move some of the k8s stack processing in here?)
for n, v := range env.FromSlice(regResp.Env).Dump() {
// Skip these ones specifically.
// See agent-stack-k8s/internal/controller/scheduler/scheduler.go#(*jobWrapper).Build
switch n {
case "BUILDKITE_COMMAND", "BUILDKITE_ARTIFACT_PATHS", "BUILDKITE_PLUGINS":
continue
}
// Skip any that are already set.
if _, set := os.LookupEnv(n); set {
continue
}
// Set it!
if err := os.Setenv(n, v); err != nil {
return err
}
}
}

ctx, cfg, l, _, done := setupLoggerAndConfig[BootstrapConfig](ctx, c)
defer done()

Expand Down Expand Up @@ -454,7 +523,7 @@ var BootstrapCommand = cli.Command{
TracingServiceName: cfg.TracingServiceName,
JobAPI: !cfg.NoJobAPI,
DisabledWarnings: cfg.DisableWarningsFor,
KubernetesExec: cfg.KubernetesExec,
K8sAgentSocket: k8sAgentSocket,
})

cctx, cancel := context.WithCancel(ctx)
Expand Down
5 changes: 3 additions & 2 deletions internal/job/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/buildkite/agent/v3/env"
"github.com/buildkite/agent/v3/kubernetes"
"github.com/buildkite/agent/v3/process"
)

Expand Down Expand Up @@ -168,8 +169,8 @@ type ExecutorConfig struct {
// Whether to start the JobAPI
JobAPI bool

// Whether to connect to the Kubernetes socket
KubernetesExec bool
// The connected Kubernetes socket, if needed
K8sAgentSocket *kubernetes.Client

// The warnings that have been disabled by the user
DisabledWarnings []string
Expand Down
47 changes: 14 additions & 33 deletions internal/job/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,14 +100,14 @@ func (e *Executor) Run(ctx context.Context) (exitCode int) {
e.shell.InterruptSignal = e.ExecutorConfig.CancelSignal
e.shell.SignalGracePeriod = e.ExecutorConfig.SignalGracePeriod
}
if e.KubernetesExec {
kubernetesClient := &kubernetes.Client{}
if err := e.startKubernetesClient(ctx, kubernetesClient); err != nil {
e.shell.Errorf("Failed to start kubernetes client: %v", err)

if e.K8sAgentSocket != nil {
if err := e.kubernetesSetup(ctx, e.K8sAgentSocket); err != nil {
e.shell.Errorf("Failed to start kubernetes socket client: %v", err)
return 1
}
defer func() {
_ = kubernetesClient.Exit(exitCode)
_ = e.K8sAgentSocket.Exit(exitCode)
}()
}

Expand Down Expand Up @@ -1182,35 +1182,16 @@ func (e *Executor) setupRedactors() {
e.redactors.Add(valuesToRedact...)
}

func (e *Executor) startKubernetesClient(ctx context.Context, kubernetesClient *kubernetes.Client) error {
e.shell.Commentf("Using experimental Kubernetes support")
err := roko.NewRetrier(
roko.WithMaxAttempts(7),
roko.WithStrategy(roko.Exponential(2*time.Second, 0)),
).Do(func(rtr *roko.Retrier) error {
id, err := strconv.Atoi(os.Getenv("BUILDKITE_CONTAINER_ID"))
if err != nil {
return fmt.Errorf("failed to parse BUILDKITE_CONTAINER_ID %q", os.Getenv("BUILDKITE_CONTAINER_ID"))
}
kubernetesClient.ID = id
connect, err := kubernetesClient.Connect(ctx)
if err != nil {
return err
}
os.Setenv("BUILDKITE_AGENT_ACCESS_TOKEN", connect.AccessToken)
e.shell.Env.Set("BUILDKITE_AGENT_ACCESS_TOKEN", connect.AccessToken)
writer := io.MultiWriter(os.Stdout, kubernetesClient)
e.shell.Writer = writer
e.shell.Logger = shell.NewWriterLogger(writer, true, e.DisabledWarnings)
func (e *Executor) kubernetesSetup(ctx context.Context, k8sAgentSocket *kubernetes.Client) error {
e.shell.Commentf("Using Kubernetes support")

return nil
})

if err != nil {
return fmt.Errorf("error connecting to kubernetes runner: %w", err)
}
// Attach the log stream to the k8s client
writer := io.MultiWriter(os.Stdout, k8sAgentSocket)
e.shell.Writer = writer
e.shell.Logger = shell.NewWriterLogger(writer, true, e.DisabledWarnings)

if err := kubernetesClient.Await(ctx, kubernetes.RunStateStart); err != nil {
// Proceed when ready
if err := k8sAgentSocket.Await(ctx, kubernetes.RunStateStart); err != nil {
return fmt.Errorf("error waiting for client to become ready: %w", err)
}

Expand All @@ -1220,7 +1201,7 @@ func (e *Executor) startKubernetesClient(ctx context.Context, kubernetesClient *
// If the k8s client is interrupted because our own ctx was cancelled,
// then the job is already stopping, so there's no point logging an
// error.
if err := kubernetesClient.Await(ctx, kubernetes.RunStateInterrupt); err != nil && !errors.Is(err, context.Canceled) {
if err := k8sAgentSocket.Await(ctx, kubernetes.RunStateInterrupt); err != nil && !errors.Is(err, context.Canceled) {
e.shell.Errorf("Error waiting for client interrupt: %v", err)
}
e.Cancel()
Expand Down
28 changes: 13 additions & 15 deletions kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,10 @@ type Runner struct {
conf Config
mu sync.Mutex
listener net.Listener
started,
done,
interrupt chan struct{}
startedOnce,
closedOnce,
interruptOnce sync.Once

started, done, interrupt chan struct{}
startedOnce, closedOnce, interruptOnce sync.Once

server *rpc.Server
mux *http.ServeMux
clients map[int]*clientResult
Expand All @@ -79,7 +77,7 @@ type Config struct {
SocketPath string
ClientCount int
Stdout, Stderr io.Writer
AccessToken string
Env []string
}

func (r *Runner) Run(ctx context.Context) error {
Expand Down Expand Up @@ -196,12 +194,11 @@ type ExitCode struct {
}

type Status struct {
Ready bool
AccessToken string
Ready bool
}

type RegisterResponse struct {
AccessToken string
Env []string
}

func (r *Runner) WriteLogs(args Logs, reply *Empty) error {
Expand Down Expand Up @@ -256,7 +253,8 @@ func (r *Runner) Register(id int, reply *RegisterResponse) error {
}
r.logger.Info("client %d connected", id)
client.State = stateConnected
reply.AccessToken = r.conf.AccessToken

reply.Env = r.conf.Env
return nil
}

Expand Down Expand Up @@ -288,7 +286,7 @@ type Client struct {

var errNotConnected = errors.New("client not connected")

func (c *Client) Connect(ctx context.Context) (RegisterResponse, error) {
func (c *Client) Connect(ctx context.Context) (*RegisterResponse, error) {
if c.SocketPath == "" {
c.SocketPath = defaultSocketPath
}
Expand All @@ -303,14 +301,14 @@ func (c *Client) Connect(ctx context.Context) (RegisterResponse, error) {
return rpc.DialHTTP("unix", c.SocketPath)
})
if err != nil {
return RegisterResponse{}, err
return nil, err
}
c.client = client
var resp RegisterResponse
if err := c.client.Call("Runner.Register", c.ID, &resp); err != nil {
return RegisterResponse{}, err
return nil, err
}
return resp, nil
return &resp, nil
}

func (c *Client) Exit(exitStatus int) error {
Expand Down