Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Convey env vars between k8s containers #2851

Merged
merged 4 commits into from
Jun 26, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion agent/job_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,10 +347,10 @@ func NewJobRunner(ctx context.Context, l logger.Logger, apiClient APIClient, con
return nil, fmt.Errorf("failed to parse BUILDKITE_CONTAINER_COUNT: %w", err)
}
r.process = kubernetes.New(r.agentLogger, kubernetes.Config{
AccessToken: r.apiClient.Config().Token,
Stdout: r.jobLogs,
Stderr: r.jobLogs,
ClientCount: containerCount,
Env: processEnv,
})
} else { // not Kubernetes
// The bootstrap-script gets parsed based on the operating system
Expand Down
41 changes: 40 additions & 1 deletion clicommand/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,11 @@ import (
"syscall"
"time"

"github.com/buildkite/agent/v3/env"
"github.com/buildkite/agent/v3/internal/job"
"github.com/buildkite/agent/v3/kubernetes"
"github.com/buildkite/agent/v3/process"
"github.com/buildkite/roko"
"github.com/urfave/cli"
)

Expand Down Expand Up @@ -98,6 +101,7 @@ type BootstrapConfig struct {
NoJobAPI bool `cli:"no-job-api"`
DisableWarningsFor []string `cli:"disable-warnings-for" normalize:"list"`
KubernetesExec bool `cli:"kubernetes-exec"`
KubernetesContainerID int `cli:"kubernetes-container-id"`
}

var BootstrapCommand = cli.Command{
Expand Down Expand Up @@ -362,6 +366,13 @@ var BootstrapCommand = cli.Command{
Usage: "A list of warning IDs to disable",
EnvVar: "BUILDKITE_AGENT_DISABLE_WARNINGS_FOR",
},
cli.IntFlag{
Name: "kubernetes-container-id",
Usage: "This is intended to be used only by the Buildkite k8s stack " +
"(github.com/buildkite/agent-stack-k8s); it sets an ID number " +
"used to identify this container within the pod",
EnvVar: "BUILDKITE_CONTAINER_ID",
},
cancelSignalFlag,
signalGracePeriodSecondsFlag,

Expand All @@ -376,6 +387,34 @@ var BootstrapCommand = cli.Command{
},
Action: func(c *cli.Context) error {
ctx := context.Background()

// Surprise! Before doing anything else, even loading the other
// flags/config, we connect the socket to get env vars. These are
// normally present in the environment if the bootstrap was forked,
// directly, but because in k8s land we use containers to separate the
// agent and its executors, such vars won't be available unless we do
// this.
var k8sClient *kubernetes.Client
if c.Bool(KubernetesExecFlag.Name) {
k8sClient = &kubernetes.Client{ID: c.Int("kubernetes-container-id")}

rtr := roko.NewRetrier(
roko.WithMaxAttempts(7),
roko.WithStrategy(roko.Exponential(2*time.Second, 0)),
)
regResp, err := roko.DoFunc(ctx, rtr, func(rtr *roko.Retrier) (*kubernetes.RegisterResponse, error) {
return k8sClient.Connect(ctx)
})
if err != nil {
return fmt.Errorf("error connecting to kubernetes runner: %w", err)
}
for n, v := range env.FromSlice(regResp.Env).Dump() {
if err := os.Setenv(n, v); err != nil {
return err
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The only issues I can think of for this are:

  1. now many environment variables set by our k8s controller as part of the pod spec will be entirely useless. We might want to eliminate those logic in k8s stack.
  2. Is there any environment variables that agent container have access but bootstrap containers don't/shouldn't?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. Probably!
  2. In theory, everything the agent container passes this way to the bootstrap container would, if not using the k8s stack, be set specifically on the bootstrap subprocess forked by the agent. So I don't imagine there's anything the bootstrap shouldn't have access to. Definitely something to think about.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Point 2 was a good callout - after thinking on it for a bit, some of the vars normally set by the agent would interfere with the variables set by the k8s stack if they were set after the bootstrap is forked.

}

ctx, cfg, l, _, done := setupLoggerAndConfig[BootstrapConfig](ctx, c)
defer done()

Expand Down Expand Up @@ -454,7 +493,7 @@ var BootstrapCommand = cli.Command{
TracingServiceName: cfg.TracingServiceName,
JobAPI: !cfg.NoJobAPI,
DisabledWarnings: cfg.DisableWarningsFor,
KubernetesExec: cfg.KubernetesExec,
KubernetesClient: k8sClient,
DrJosh9000 marked this conversation as resolved.
Show resolved Hide resolved
})

cctx, cancel := context.WithCancel(ctx)
Expand Down
5 changes: 3 additions & 2 deletions internal/job/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/buildkite/agent/v3/env"
"github.com/buildkite/agent/v3/kubernetes"
"github.com/buildkite/agent/v3/process"
)

Expand Down Expand Up @@ -168,8 +169,8 @@ type ExecutorConfig struct {
// Whether to start the JobAPI
JobAPI bool

// Whether to connect to the Kubernetes socket
KubernetesExec bool
// The connected Kubernetes socket, if needed
KubernetesClient *kubernetes.Client

// The warnings that have been disabled by the user
DisabledWarnings []string
Expand Down
41 changes: 11 additions & 30 deletions internal/job/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,14 +100,14 @@ func (e *Executor) Run(ctx context.Context) (exitCode int) {
e.shell.InterruptSignal = e.ExecutorConfig.CancelSignal
e.shell.SignalGracePeriod = e.ExecutorConfig.SignalGracePeriod
}
if e.KubernetesExec {
kubernetesClient := &kubernetes.Client{}
if err := e.startKubernetesClient(ctx, kubernetesClient); err != nil {

if e.KubernetesClient != nil {
if err := e.kubernetesSetup(ctx, e.KubernetesClient); err != nil {
e.shell.Errorf("Failed to start kubernetes client: %v", err)
return 1
}
defer func() {
_ = kubernetesClient.Exit(exitCode)
_ = e.KubernetesClient.Exit(exitCode)
}()
}

Expand Down Expand Up @@ -1182,34 +1182,15 @@ func (e *Executor) setupRedactors() {
e.redactors.Add(valuesToRedact...)
}

func (e *Executor) startKubernetesClient(ctx context.Context, kubernetesClient *kubernetes.Client) error {
e.shell.Commentf("Using experimental Kubernetes support")
err := roko.NewRetrier(
roko.WithMaxAttempts(7),
roko.WithStrategy(roko.Exponential(2*time.Second, 0)),
).Do(func(rtr *roko.Retrier) error {
id, err := strconv.Atoi(os.Getenv("BUILDKITE_CONTAINER_ID"))
if err != nil {
return fmt.Errorf("failed to parse BUILDKITE_CONTAINER_ID %q", os.Getenv("BUILDKITE_CONTAINER_ID"))
}
kubernetesClient.ID = id
connect, err := kubernetesClient.Connect(ctx)
if err != nil {
return err
}
os.Setenv("BUILDKITE_AGENT_ACCESS_TOKEN", connect.AccessToken)
e.shell.Env.Set("BUILDKITE_AGENT_ACCESS_TOKEN", connect.AccessToken)
writer := io.MultiWriter(os.Stdout, kubernetesClient)
e.shell.Writer = writer
e.shell.Logger = shell.NewWriterLogger(writer, true, e.DisabledWarnings)
func (e *Executor) kubernetesSetup(ctx context.Context, kubernetesClient *kubernetes.Client) error {
e.shell.Commentf("Using Kubernetes support")

return nil
})

if err != nil {
return fmt.Errorf("error connecting to kubernetes runner: %w", err)
}
// Attach the log stream to the k8s client
writer := io.MultiWriter(os.Stdout, kubernetesClient)
e.shell.Writer = writer
e.shell.Logger = shell.NewWriterLogger(writer, true, e.DisabledWarnings)

// Proceed when ready
if err := kubernetesClient.Await(ctx, kubernetes.RunStateStart); err != nil {
return fmt.Errorf("error waiting for client to become ready: %w", err)
}
Expand Down
28 changes: 13 additions & 15 deletions kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,10 @@ type Runner struct {
conf Config
mu sync.Mutex
listener net.Listener
started,
done,
interrupt chan struct{}
startedOnce,
closedOnce,
interruptOnce sync.Once

started, done, interrupt chan struct{}
startedOnce, closedOnce, interruptOnce sync.Once

server *rpc.Server
mux *http.ServeMux
clients map[int]*clientResult
Expand All @@ -79,7 +77,7 @@ type Config struct {
SocketPath string
ClientCount int
Stdout, Stderr io.Writer
AccessToken string
Env []string
}

func (r *Runner) Run(ctx context.Context) error {
Expand Down Expand Up @@ -196,12 +194,11 @@ type ExitCode struct {
}

type Status struct {
Ready bool
AccessToken string
Ready bool
}

type RegisterResponse struct {
AccessToken string
Env []string
}

func (r *Runner) WriteLogs(args Logs, reply *Empty) error {
Expand Down Expand Up @@ -256,7 +253,8 @@ func (r *Runner) Register(id int, reply *RegisterResponse) error {
}
r.logger.Info("client %d connected", id)
client.State = stateConnected
reply.AccessToken = r.conf.AccessToken

reply.Env = r.conf.Env
return nil
}

Expand Down Expand Up @@ -288,7 +286,7 @@ type Client struct {

var errNotConnected = errors.New("client not connected")

func (c *Client) Connect(ctx context.Context) (RegisterResponse, error) {
func (c *Client) Connect(ctx context.Context) (*RegisterResponse, error) {
if c.SocketPath == "" {
c.SocketPath = defaultSocketPath
}
Expand All @@ -303,14 +301,14 @@ func (c *Client) Connect(ctx context.Context) (RegisterResponse, error) {
return rpc.DialHTTP("unix", c.SocketPath)
})
if err != nil {
return RegisterResponse{}, err
return nil, err
}
c.client = client
var resp RegisterResponse
if err := c.client.Call("Runner.Register", c.ID, &resp); err != nil {
return RegisterResponse{}, err
return nil, err
}
return resp, nil
return &resp, nil
}

func (c *Client) Exit(exitStatus int) error {
Expand Down