diff --git a/CHANGELOG.md b/CHANGELOG.md index 1849e4b15..85d7f8874 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,8 @@ **Features** +* Empire now includes experimental support for showing attached runs in `emp ps`. This can be enabled with the `--x.showattached` flag, or `EMPIRE_X_SHOW_ATTACHED` [#911](https://github.com/remind101/empire/pull/911) + **Improvements** * The Custom::ECSService custom resource now waits for newly created ECS services to stabilize [#878](https://github.com/remind101/empire/pull/878) diff --git a/cmd/empire/factories.go b/cmd/empire/factories.go index d0d014c4c..660480374 100644 --- a/cmd/empire/factories.go +++ b/cmd/empire/factories.go @@ -23,9 +23,9 @@ import ( "github.com/remind101/empire/pkg/dockerauth" "github.com/remind101/empire/pkg/dockerutil" "github.com/remind101/empire/pkg/ecsutil" - "github.com/remind101/empire/pkg/runner" "github.com/remind101/empire/scheduler" "github.com/remind101/empire/scheduler/cloudformation" + "github.com/remind101/empire/scheduler/docker" "github.com/remind101/empire/scheduler/ecs" "github.com/remind101/pkg/logger" "github.com/remind101/pkg/reporter" @@ -104,12 +104,11 @@ func newEmpire(db *empire.DB, c *cli.Context) (*empire.Empire, error) { // Scheduler ============================ func newScheduler(db *empire.DB, c *cli.Context) (scheduler.Scheduler, error) { - r, err := newDockerRunner(c) - if err != nil { - return nil, err - } + var ( + s scheduler.Scheduler + err error + ) - var s scheduler.Scheduler switch c.String(FlagScheduler) { case "ecs": s, err = newECSScheduler(db, c) @@ -125,10 +124,14 @@ func newScheduler(db *empire.DB, c *cli.Context) (scheduler.Scheduler, error) { return nil, fmt.Errorf("failed to initialize %s scheduler: %v", c.String(FlagScheduler), err) } - return &scheduler.AttachedRunner{ - Scheduler: s, - Runner: r, - }, nil + d, err := newDockerClient(c) + if err != nil { + return nil, err + } + + a := docker.RunAttachedWithDocker(s, d) + a.ShowAttached = c.Bool(FlagXShowAttached) + return a, nil } func newMigrationScheduler(db *empire.DB, c *cli.Context) (*cloudformation.MigrationScheduler, error) { @@ -254,14 +257,6 @@ func newConfigProvider(c *cli.Context) client.ConfigProvider { return p } -func newDockerRunner(c *cli.Context) (*runner.Runner, error) { - client, err := newDockerClient(c) - if err != nil { - return nil, err - } - return runner.NewRunner(client), nil -} - // DockerClient ======================== func newDockerClient(c *cli.Context) (*dockerutil.Client, error) { diff --git a/cmd/empire/main.go b/cmd/empire/main.go index 1e098bc9b..878a9afd5 100644 --- a/cmd/empire/main.go +++ b/cmd/empire/main.go @@ -63,6 +63,9 @@ const ( FlagLogsStreamer = "logs.streamer" FlagEnvironment = "environment" + + // Expiremental flags. + FlagXShowAttached = "x.showattached" ) // Commands are the subcommands that are available. @@ -326,6 +329,11 @@ var EmpireFlags = []cli.Flag{ Usage: "If true, messages will be required for empire actions that emit events.", EnvVar: "EMPIRE_MESSAGES_REQUIRED", }, + cli.BoolFlag{ + Name: FlagXShowAttached, + Usage: "If true, attached runs will be shown in `emp ps` output.", + EnvVar: "EMPIRE_X_SHOW_ATTACHED", + }, } func main() { diff --git a/docker-compose.yml b/docker-compose.yml index 5d78fb59d..4d78ac437 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -14,6 +14,7 @@ server: EMPIRE_DATABASE_URL: postgres://postgres:postgres@db/postgres?sslmode=disable DOCKER_HOST: unix:///var/run/docker.sock EMPIRE_SCHEDULER: cloudformation-migration + EMPIRE_X_SHOW_ATTACHED: 'true' db: image: postgres ports: diff --git a/docs/cloudformation.json b/docs/cloudformation.json index 823196b68..3b9f54eea 100644 --- a/docs/cloudformation.json +++ b/docs/cloudformation.json @@ -776,6 +776,10 @@ { "Name": "EMPIRE_GITHUB_TEAM_ID", "Value": { "Ref": "GitHubTeamId" } + }, + { + "Name": "EMPIRE_X_SHOW_ATTACHED", + "Value": "true" } ], "Command": ["server", "-automigrate=true"], diff --git a/pkg/runner/runner.go b/pkg/runner/runner.go deleted file mode 100644 index 0deb69a63..000000000 --- a/pkg/runner/runner.go +++ /dev/null @@ -1,170 +0,0 @@ -// package runner provides a simple interface for running docker containers. -package runner - -import ( - "bytes" - "fmt" - "io" - - "code.google.com/p/go-uuid/uuid" - - "github.com/fsouza/go-dockerclient" - "github.com/remind101/empire/pkg/dockerutil" - "github.com/remind101/empire/pkg/image" - "golang.org/x/net/context" -) - -// DefaultStopTimeout is the number of seconds to wait when stopping a -// container. -const DefaultStopTimeout = 10 - -// RunOpts is used when running. -type RunOpts struct { - // Image is the image to run. - Image image.Image - - // Command is the command to run. - Command []string - - // Environment variables to set. - Env map[string]string - - // Labels to set - Labels map[string]string - - // Memory/CPUShares. - Memory int64 - CPUShares int64 - - // Streams fo Stdout, Stderr and Stdin. - Input io.Reader - Output io.Writer -} - -// Runner is a service for running containers. -type Runner struct { - client *dockerutil.Client -} - -// NewRunner returns a new Runner instance using the docker.Client as the docker -// client. -func NewRunner(client *dockerutil.Client) *Runner { - return &Runner{client: client} -} - -func (r *Runner) Run(ctx context.Context, opts RunOpts) error { - if err := r.pull(ctx, opts.Image, replaceNL(opts.Output)); err != nil { - return fmt.Errorf("runner: pull: %v", err) - } - - c, err := r.create(ctx, opts) - if err != nil { - return fmt.Errorf("runner: create container: %v", err) - } - defer r.remove(ctx, c.ID) - - if err := r.start(ctx, c.ID); err != nil { - return fmt.Errorf("runner: start container: %v", err) - } - defer tryClose(opts.Output) - - if err := r.attach(ctx, c.ID, opts.Input, opts.Output); err != nil { - return fmt.Errorf("runner: attach: %v", err) - } - - return nil -} - -func (r *Runner) pull(ctx context.Context, img image.Image, out io.Writer) error { - return r.client.PullImage(ctx, docker.PullImageOptions{ - Registry: img.Registry, - Repository: img.Repository, - Tag: img.Tag, - OutputStream: out, - }) -} - -func (r *Runner) create(ctx context.Context, opts RunOpts) (*docker.Container, error) { - return r.client.CreateContainer(ctx, docker.CreateContainerOptions{ - Name: uuid.New(), - Config: &docker.Config{ - Tty: true, - AttachStdin: true, - AttachStdout: true, - AttachStderr: true, - OpenStdin: true, - Memory: opts.Memory, - CPUShares: opts.CPUShares, - Image: opts.Image.String(), - Cmd: opts.Command, - Env: envKeys(opts.Env), - Labels: opts.Labels, - }, - HostConfig: &docker.HostConfig{ - LogConfig: docker.LogConfig{ - Type: "json-file", - }, - }, - }) -} - -func (r *Runner) start(ctx context.Context, id string) error { - return r.client.StartContainer(ctx, id, nil) -} - -func (r *Runner) attach(ctx context.Context, id string, in io.Reader, out io.Writer) error { - return r.client.AttachToContainer(ctx, docker.AttachToContainerOptions{ - Container: id, - InputStream: in, - OutputStream: out, - ErrorStream: out, - Logs: true, - Stream: true, - Stdin: true, - Stdout: true, - Stderr: true, - RawTerminal: true, - }) -} - -func (r *Runner) remove(ctx context.Context, id string) error { - return r.client.RemoveContainer(ctx, docker.RemoveContainerOptions{ - ID: id, - RemoveVolumes: true, - Force: true, - }) -} - -func envKeys(env map[string]string) []string { - var s []string - - for k, v := range env { - s = append(s, fmt.Sprintf("%s=%s", k, v)) - } - - return s -} - -func tryClose(w io.Writer) error { - if w, ok := w.(io.Closer); ok { - return w.Close() - } - - return nil -} - -// replaceNL returns an io.Writer that will replace "\n" with "\r\n" in the -// stream. -var replaceNL = func(w io.Writer) io.Writer { - o, n := []byte("\n"), []byte("\r\n") - return writerFunc(func(p []byte) (int, error) { - return w.Write(bytes.Replace(p, o, n, -1)) - }) -} - -// writerFunc is a function that implements io.Writer. -type writerFunc func([]byte) (int, error) - -func (f writerFunc) Write(p []byte) (int, error) { - return f(p) -} diff --git a/pkg/runner/runner_test.go b/pkg/runner/runner_test.go deleted file mode 100644 index 344560a98..000000000 --- a/pkg/runner/runner_test.go +++ /dev/null @@ -1,40 +0,0 @@ -// +build docker - -package runner - -import ( - "bytes" - "strings" - "testing" - - "golang.org/x/net/context" - - "github.com/remind101/empire/pkg/dockerutil" - "github.com/remind101/empire/pkg/image" -) - -func TestRunner(t *testing.T) { - r := newTestRunner(t) - out := new(bytes.Buffer) - - if err := r.Run(context.Background(), RunOpts{ - Image: image.Image{ - Repository: "ubuntu", - Tag: "14.04", - }, - Command: "/bin/bash 'sleep 60'", - Input: strings.NewReader("ls\nexit\n"), - Output: out, - }); err != nil { - t.Fatal(err) - } -} - -func newTestRunner(t testing.TB) *Runner { - c, err := dockerutil.NewClientFromEnv(nil) - if err != nil { - t.Fatal(err) - } - - return NewRunner(c) -} diff --git a/scheduler/cloudformation/cloudformation.go b/scheduler/cloudformation/cloudformation.go index acaaa564e..ccee9f334 100644 --- a/scheduler/cloudformation/cloudformation.go +++ b/scheduler/cloudformation/cloudformation.go @@ -702,10 +702,10 @@ func (s *Scheduler) Services(appID string) (map[string]string, error) { } // Stop stops the given ECS task. -func (s *Scheduler) Stop(ctx context.Context, instanceID string) error { +func (s *Scheduler) Stop(ctx context.Context, taskID string) error { _, err := s.ecs.StopTask(&ecs.StopTaskInput{ Cluster: aws.String(s.Cluster), - Task: aws.String(instanceID), + Task: aws.String(taskID), }) return err } diff --git a/scheduler/docker/docker.go b/scheduler/docker/docker.go index 46793388a..52f5ce05b 100644 --- a/scheduler/docker/docker.go +++ b/scheduler/docker/docker.go @@ -1,6 +1,339 @@ // Package docker implements the Scheduler interface backed by the Docker API. -// This implementation is not recommended for production use, but can be used in -// development for testing. -// -// TODO: Implement this. package docker + +import ( + "bytes" + "errors" + "fmt" + "io" + "strings" + + "code.google.com/p/go-uuid/uuid" + + "github.com/fsouza/go-dockerclient" + "github.com/remind101/empire/pkg/dockerutil" + "github.com/remind101/empire/scheduler" + "golang.org/x/net/context" +) + +// The amount of time to wait for a container to stop before sending a SIGKILL. +const stopContainerTimeout = 10 // Seconds + +// dockerClient defines the Docker client interface we use. +type dockerClient interface { + InspectContainer(string) (*docker.Container, error) + ListContainers(docker.ListContainersOptions) ([]docker.APIContainers, error) + PullImage(context.Context, docker.PullImageOptions) error + CreateContainer(context.Context, docker.CreateContainerOptions) (*docker.Container, error) + RemoveContainer(context.Context, docker.RemoveContainerOptions) error + StartContainer(context.Context, string, *docker.HostConfig) error + StopContainer(context.Context, string, uint) error + AttachToContainer(context.Context, docker.AttachToContainerOptions) error +} + +const ( + // Label that determines whether the container is from a one-off run or + // not. The value of this label will be `attached` or `detached`. + runLabel = "run" + + // Label that determines what app the run relates to. + appLabel = "empire.app.id" + + // Label that determines what the name of the process is. + processLabel = "empire.app.process" +) + +// Values for `runLabel`. +const ( + Attached = "attached" + Detached = "detached" +) + +// AttachedScheduler wraps a Scheduler to run attached processes using the Docker +// scheduler. +type AttachedScheduler struct { + // If set, attached run instances will be merged in with instances + // returned from the wrapped scheduler. This is currently an + // experimental feature, since it requires that multiple Empire + // processes interact with a single Docker daemon. + ShowAttached bool + + scheduler.Scheduler + dockerScheduler *Scheduler +} + +// RunAttachedWithDocker wraps a Scheduler to run attached Run's using a Docker +// client. +func RunAttachedWithDocker(s scheduler.Scheduler, client *dockerutil.Client) *AttachedScheduler { + return &AttachedScheduler{ + Scheduler: s, + dockerScheduler: NewScheduler(client), + } +} + +// Run runs attached processes using the docker scheduler, and detached +// processes using the wrapped scheduler. +func (s *AttachedScheduler) Run(ctx context.Context, app *scheduler.App, process *scheduler.Process, in io.Reader, out io.Writer) error { + // Attached means stdout, stdin is attached. + attached := out != nil || in != nil + + if attached { + return s.dockerScheduler.Run(ctx, app, process, in, out) + } else { + return s.Scheduler.Run(ctx, app, process, in, out) + } +} + +// Instances returns a combination of instances from the wrapped scheduler, as +// well as instances from attached runs. +func (s *AttachedScheduler) Instances(ctx context.Context, app string) ([]*scheduler.Instance, error) { + if !s.ShowAttached { + return s.Scheduler.Instances(ctx, app) + } + + type instancesResult struct { + instances []*scheduler.Instance + err error + } + + ch := make(chan instancesResult, 1) + go func() { + attachedInstances, err := s.dockerScheduler.InstancesFromAttachedRuns(ctx, app) + ch <- instancesResult{attachedInstances, err} + }() + + instances, err := s.Scheduler.Instances(ctx, app) + if err != nil { + return instances, err + } + + result := <-ch + if err := result.err; err != nil { + return instances, err + } + + return append(instances, result.instances...), nil +} + +// Stop checks if there's an attached run matching the given id, and stops that +// container if there is. Otherwise, it delegates to the wrapped Scheduler. +func (s *AttachedScheduler) Stop(ctx context.Context, maybeContainerID string) error { + if !s.ShowAttached { + return s.Scheduler.Stop(ctx, maybeContainerID) + } + + err := s.dockerScheduler.Stop(ctx, maybeContainerID) + + // If there's no container with this ID, delegate to the wrapped + // scheduler. + if _, ok := err.(*docker.NoSuchContainer); ok { + return s.Scheduler.Stop(ctx, maybeContainerID) + } + + return err +} + +// Scheduler provides an implementation of the scheduler.Scheduler interface +// backed by Docker. +type Scheduler struct { + docker dockerClient +} + +// NewScheduler returns a new Scheduler instance that uses the given client to +// interact with Docker. +func NewScheduler(client *dockerutil.Client) *Scheduler { + return &Scheduler{ + docker: client, + } +} + +func (s *Scheduler) Run(ctx context.Context, app *scheduler.App, p *scheduler.Process, in io.Reader, out io.Writer) error { + attached := out != nil || in != nil + + if !attached { + return errors.New("cannot run detached processes with Docker scheduler") + } + + labels := scheduler.Labels(app, p) + labels[runLabel] = Attached + + if err := s.docker.PullImage(ctx, docker.PullImageOptions{ + Registry: p.Image.Registry, + Repository: p.Image.Repository, + Tag: p.Image.Tag, + OutputStream: replaceNL(out), + }); err != nil { + return fmt.Errorf("error pulling image: %v", err) + } + + container, err := s.docker.CreateContainer(ctx, docker.CreateContainerOptions{ + Name: uuid.New(), + Config: &docker.Config{ + Tty: true, + AttachStdin: true, + AttachStdout: true, + AttachStderr: true, + OpenStdin: true, + Memory: int64(p.MemoryLimit), + CPUShares: int64(p.CPUShares), + Image: p.Image.String(), + Cmd: p.Command, + Env: envKeys(scheduler.Env(app, p)), + Labels: labels, + }, + HostConfig: &docker.HostConfig{ + LogConfig: docker.LogConfig{ + Type: "json-file", + }, + }, + }) + if err != nil { + return fmt.Errorf("error creating container: %v", err) + } + defer s.docker.RemoveContainer(ctx, docker.RemoveContainerOptions{ + ID: container.ID, + RemoveVolumes: true, + Force: true, + }) + + if err := s.docker.StartContainer(ctx, container.ID, nil); err != nil { + return fmt.Errorf("error starting container: %v", err) + } + defer tryClose(out) + + if err := s.docker.AttachToContainer(ctx, docker.AttachToContainerOptions{ + Container: container.ID, + InputStream: in, + OutputStream: out, + ErrorStream: out, + Logs: true, + Stream: true, + Stdin: true, + Stdout: true, + Stderr: true, + RawTerminal: true, + }); err != nil { + return fmt.Errorf("error attaching to container: %v", err) + } + + return nil +} + +func (s *Scheduler) Instances(ctx context.Context, app string) ([]*scheduler.Instance, error) { + return s.InstancesFromAttachedRuns(ctx, app) +} + +// InstancesFromAttachedRuns returns Instances that were started from attached +// runs. +func (s *Scheduler) InstancesFromAttachedRuns(ctx context.Context, app string) ([]*scheduler.Instance, error) { + // Filter only docker containers that were started as an attached run. + attached := fmt.Sprintf("%s=%s", runLabel, Attached) + return s.instances(ctx, app, attached) +} + +// instances returns docker container instances for this app, optionally +// filtered with labels. +func (s *Scheduler) instances(ctx context.Context, app string, labels ...string) ([]*scheduler.Instance, error) { + var instances []*scheduler.Instance + + containers, err := s.docker.ListContainers(docker.ListContainersOptions{ + Filters: map[string][]string{ + "label": append([]string{ + fmt.Sprintf("%s=%s", appLabel, app), + }, labels...), + }, + }) + if err != nil { + return nil, fmt.Errorf("error listing containers from attached runs: %v", err) + } + + for _, apiContainer := range containers { + container, err := s.docker.InspectContainer(apiContainer.ID) + if err != nil { + return instances, fmt.Errorf("error inspecting container %s: %v", apiContainer.ID, err) + } + + state := strings.ToUpper(container.State.StateString()) + + instances = append(instances, &scheduler.Instance{ + ID: container.ID[0:12], + State: state, + UpdatedAt: container.State.StartedAt, + Process: &scheduler.Process{ + Type: container.Config.Labels[processLabel], + Command: container.Config.Cmd, + Env: parseEnv(container.Config.Env), + MemoryLimit: uint(container.HostConfig.Memory), + CPUShares: uint(container.HostConfig.CPUShares), + }, + }) + } + + return instances, nil +} + +// Stop stops the given container. +func (s *Scheduler) Stop(ctx context.Context, containerID string) error { + container, err := s.docker.InspectContainer(containerID) + if err != nil { + return err + } + + // Some extra protection around stopping containers. We don't want to + // allow users to stop containers that may have been started outside of + // Empire. + if _, ok := container.Config.Labels[runLabel]; !ok { + return &docker.NoSuchContainer{ + ID: containerID, + } + } + + if err := s.docker.StopContainer(ctx, containerID, stopContainerTimeout); err != nil { + return err + } + + return nil +} + +func parseEnv(env []string) map[string]string { + m := make(map[string]string) + for _, e := range env { + parts := strings.SplitN(e, "=", 2) + m[parts[0]] = parts[1] + } + return m +} + +func envKeys(env map[string]string) []string { + var s []string + + for k, v := range env { + s = append(s, fmt.Sprintf("%s=%s", k, v)) + } + + return s +} + +func tryClose(w io.Writer) error { + if w, ok := w.(io.Closer); ok { + return w.Close() + } + + return nil +} + +// replaceNL returns an io.Writer that will replace "\n" with "\r\n" in the +// stream. +var replaceNL = func(w io.Writer) io.Writer { + o, n := []byte("\n"), []byte("\r\n") + return writerFunc(func(p []byte) (int, error) { + return w.Write(bytes.Replace(p, o, n, -1)) + }) +} + +// writerFunc is a function that implements io.Writer. +type writerFunc func([]byte) (int, error) + +func (f writerFunc) Write(p []byte) (int, error) { + return f(p) +} diff --git a/scheduler/docker/docker_test.go b/scheduler/docker/docker_test.go new file mode 100644 index 000000000..a7a8f493b --- /dev/null +++ b/scheduler/docker/docker_test.go @@ -0,0 +1,192 @@ +package docker + +import ( + "testing" + "time" + + "golang.org/x/net/context" + + "github.com/fsouza/go-dockerclient" + "github.com/remind101/empire/pkg/bytesize" + "github.com/remind101/empire/scheduler" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" +) + +var ctx = context.Background() + +func TestScheduler_InstancesFromAttachedRuns(t *testing.T) { + d := new(mockDockerClient) + s := Scheduler{ + docker: d, + } + + d.On("ListContainers", docker.ListContainersOptions{ + Filters: map[string][]string{ + "label": []string{ + "empire.app.id=2cdc4941-e36d-4855-a0ec-51525db4a500", + "run=attached", + }, + }, + }).Return([]docker.APIContainers{ + {ID: "65311c2cc20d671d43118b7d42b3f02df6b48a6bb65b1c5939007214e7587b24"}, + }, nil) + + d.On("InspectContainer", "65311c2cc20d671d43118b7d42b3f02df6b48a6bb65b1c5939007214e7587b24").Return(&docker.Container{ + ID: "65311c2cc20d671d43118b7d42b3f02df6b48a6bb65b1c5939007214e7587b24", + State: docker.State{ + Running: true, + StartedAt: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC), + }, + Config: &docker.Config{ + Labels: map[string]string{ + "run": "attached", + "empire.app.id": "2cdc4941-e36d-4855-a0ec-51525db4a500", + "empire.app.process": "run", + }, + Cmd: []string{"/bin/sh"}, + Env: []string{"FOO=bar"}, + }, + HostConfig: &docker.HostConfig{ + Memory: int64(124 * bytesize.MB), + CPUShares: 512, + }, + }, nil) + + instances, err := s.InstancesFromAttachedRuns(ctx, "2cdc4941-e36d-4855-a0ec-51525db4a500") + assert.NoError(t, err) + assert.Equal(t, 1, len(instances)) + assert.Equal(t, &scheduler.Instance{ + Process: &scheduler.Process{ + Type: "run", + Command: []string{"/bin/sh"}, + Env: map[string]string{ + "FOO": "bar", + }, + MemoryLimit: 124 * bytesize.MB, + CPUShares: 512, + }, + ID: "65311c2cc20d", + State: "RUNNING", + UpdatedAt: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC), + }, instances[0]) + + d.AssertExpectations(t) +} + +func TestScheduler_Stop(t *testing.T) { + d := new(mockDockerClient) + s := Scheduler{ + docker: d, + } + + d.On("InspectContainer", "container_id").Return(&docker.Container{ + ID: "container_id", + Config: &docker.Config{ + Labels: map[string]string{ + "run": "attached", + }, + }, + }, nil) + + d.On("StopContainer", "container_id", uint(10)).Return(nil) + + err := s.Stop(ctx, "container_id") + assert.NoError(t, err) + + d.AssertExpectations(t) +} + +func TestScheduler_Stop_ContainerNotStartedByEmpire(t *testing.T) { + d := new(mockDockerClient) + s := Scheduler{ + docker: d, + } + + d.On("InspectContainer", "container_id").Return(&docker.Container{ + ID: "container_id", + Config: &docker.Config{ + Labels: map[string]string{ + // Missing the run label + }, + }, + }, nil) + + err := s.Stop(ctx, "container_id") + assert.Error(t, err) + + d.AssertExpectations(t) +} + +func TestAttachedScheduler_Stop_ContainerNotFound(t *testing.T) { + w := new(mockScheduler) + d := new(mockDockerClient) + ds := &Scheduler{ + docker: d, + } + s := &AttachedScheduler{ + Scheduler: w, + dockerScheduler: ds, + ShowAttached: true, + } + + d.On("InspectContainer", "d9ad8d2f-318d-4abd-9d58-ece9a5ca423c").Return(nil, &docker.NoSuchContainer{ + ID: "d9ad8d2f-318d-4abd-9d58-ece9a5ca423c", + }) + + w.On("Stop", "d9ad8d2f-318d-4abd-9d58-ece9a5ca423c").Return(nil) + + err := s.Stop(ctx, "d9ad8d2f-318d-4abd-9d58-ece9a5ca423c") + assert.NoError(t, err) + + d.AssertExpectations(t) + w.AssertExpectations(t) +} + +func TestParseEnv(t *testing.T) { + tests := []struct { + in []string + out map[string]string + }{ + {[]string{"FOO=bar"}, map[string]string{"FOO": "bar"}}, + } + + for _, tt := range tests { + out := parseEnv(tt.in) + assert.Equal(t, tt.out, out) + } +} + +type mockDockerClient struct { + dockerClient + mock.Mock +} + +func (m *mockDockerClient) ListContainers(opts docker.ListContainersOptions) ([]docker.APIContainers, error) { + args := m.Called(opts) + return args.Get(0).([]docker.APIContainers), args.Error(1) +} + +func (m *mockDockerClient) InspectContainer(id string) (*docker.Container, error) { + args := m.Called(id) + var container *docker.Container + if v := args.Get(0); v != nil { + container = v.(*docker.Container) + } + return container, args.Error(1) +} + +func (m *mockDockerClient) StopContainer(ctx context.Context, id string, timeout uint) error { + args := m.Called(id, timeout) + return args.Error(0) +} + +type mockScheduler struct { + scheduler.Scheduler + mock.Mock +} + +func (m *mockScheduler) Stop(ctx context.Context, id string) error { + args := m.Called(id) + return args.Error(0) +} diff --git a/scheduler/runner.go b/scheduler/runner.go deleted file mode 100644 index aea29bb06..000000000 --- a/scheduler/runner.go +++ /dev/null @@ -1,33 +0,0 @@ -package scheduler - -import ( - "io" - - "github.com/remind101/empire/pkg/runner" - "golang.org/x/net/context" -) - -// AttachedRunner wraps a Manager to run attached processes using docker -// directly to get access to stdin and stdout. -type AttachedRunner struct { - Scheduler - Runner *runner.Runner -} - -func (m *AttachedRunner) Run(ctx context.Context, app *App, p *Process, in io.Reader, out io.Writer) error { - // If an output stream is provided, run using the docker runner. - if out != nil { - return m.Runner.Run(ctx, runner.RunOpts{ - Image: p.Image, - Command: p.Command, - Env: Env(app, p), - Memory: int64(p.MemoryLimit), - CPUShares: int64(p.CPUShares), - Labels: Labels(app, p), - Input: in, - Output: out, - }) - } - - return m.Scheduler.Run(ctx, app, p, in, out) -}