Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Show attached runs in emp ps. #911

Merged
merged 10 commits into from
Jul 11, 2016
29 changes: 11 additions & 18 deletions cmd/empire/factories.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ import (
"github.com/remind101/empire/pkg/dockerauth"
"github.com/remind101/empire/pkg/dockerutil"
"github.com/remind101/empire/pkg/ecsutil"
"github.com/remind101/empire/pkg/runner"
"github.com/remind101/empire/scheduler"
"github.com/remind101/empire/scheduler/cloudformation"
"github.com/remind101/empire/scheduler/docker"
"github.com/remind101/empire/scheduler/ecs"
"github.com/remind101/pkg/logger"
"github.com/remind101/pkg/reporter"
Expand Down Expand Up @@ -104,12 +104,11 @@ func newEmpire(db *empire.DB, c *cli.Context) (*empire.Empire, error) {
// Scheduler ============================

func newScheduler(db *empire.DB, c *cli.Context) (scheduler.Scheduler, error) {
r, err := newDockerRunner(c)
if err != nil {
return nil, err
}
var (
s scheduler.Scheduler
err error
)

var s scheduler.Scheduler
switch c.String(FlagScheduler) {
case "ecs":
s, err = newECSScheduler(db, c)
Expand All @@ -125,10 +124,12 @@ func newScheduler(db *empire.DB, c *cli.Context) (scheduler.Scheduler, error) {
return nil, fmt.Errorf("failed to initialize %s scheduler: %v", c.String(FlagScheduler), err)
}

return &scheduler.AttachedRunner{
Scheduler: s,
Runner: r,
}, nil
d, err := newDockerClient(c)
if err != nil {
return nil, err
}

return docker.RunAttachedWithDocker(s, d), nil
}

func newMigrationScheduler(db *empire.DB, c *cli.Context) (*cloudformation.MigrationScheduler, error) {
Expand Down Expand Up @@ -254,14 +255,6 @@ func newConfigProvider(c *cli.Context) client.ConfigProvider {
return p
}

func newDockerRunner(c *cli.Context) (*runner.Runner, error) {
client, err := newDockerClient(c)
if err != nil {
return nil, err
}
return runner.NewRunner(client), nil
}

// DockerClient ========================

func newDockerClient(c *cli.Context) (*dockerutil.Client, error) {
Expand Down
169 changes: 167 additions & 2 deletions scheduler/docker/docker.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,171 @@
// Package docker implements the Scheduler interface backed by the Docker API.
// This implementation is not recommended for production use, but can be used in
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this be updated?

// development for testing.
//
// TODO: Implement this.
package docker

import (
"errors"
"fmt"
"io"
"strings"

"github.com/fsouza/go-dockerclient"
"github.com/remind101/empire/pkg/dockerutil"
"github.com/remind101/empire/pkg/runner"
"github.com/remind101/empire/scheduler"
"golang.org/x/net/context"
)

type dockerClient interface {
InspectContainer(string) (*docker.Container, error)
ListContainers(docker.ListContainersOptions) ([]docker.APIContainers, error)
}

const (
// Label that determines whether the container is from an attached run
// or not. The value of this label will be the app id.
attachedRunLabel = "attached-run"

// Label that determines what app the run relates to.
appLabel = "empire.app.id"

// Label that determines what the name of the process is.
processLabel = "empire.app.process"
)

// attachedScheduler wraps a Scheduler to run attached processes using the Docker
// scheduler.
type attachedScheduler struct {
scheduler.Scheduler
dockerScheduler *Scheduler
}

// RunAttachedWithDocker wraps a Scheduler to run attached Run's using a Docker
// client.
func RunAttachedWithDocker(s scheduler.Scheduler, client *dockerutil.Client) scheduler.Scheduler {
return &attachedScheduler{
Scheduler: s,
dockerScheduler: NewScheduler(client),
}
}

// Run runs attached processes using the docker scheduler, and detached
// processes using the wrapped scheduler.
func (s *attachedScheduler) Run(ctx context.Context, app *scheduler.App, process *scheduler.Process, in io.Reader, out io.Writer) error {
// Attached means stdout, stdin is attached.
attached := out != nil || in != nil

if attached {
return s.dockerScheduler.Run(ctx, app, process, in, out)
} else {
return s.Scheduler.Run(ctx, app, process, in, out)
}
}

// Instances returns a combination of instances from the wrapped scheduler, as
// well as instances from attached runs.
func (s *attachedScheduler) Instances(ctx context.Context, app string) ([]*scheduler.Instance, error) {
instances, err := s.Scheduler.Instances(ctx, app)
if err != nil {
return instances, err
}

attachedInstances, err := s.dockerScheduler.InstancesFromAttachedRuns(ctx, app)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should get kicked off in a goroutine.

if err != nil {
return instances, err
}

return append(instances, attachedInstances...), nil
}

// Scheduler provides an implementation of the scheduler.Scheduler interface
// backed by Docker.
type Scheduler struct {
runner *runner.Runner
docker dockerClient
}

// NewScheduler returns a new Scheduler instance that uses the given client to
// interact with Docker.
func NewScheduler(client *dockerutil.Client) *Scheduler {
return &Scheduler{
runner: runner.NewRunner(client),
docker: client,
}
}

func (s *Scheduler) Run(ctx context.Context, app *scheduler.App, p *scheduler.Process, in io.Reader, out io.Writer) error {
attached := out != nil || in != nil

if !attached {
return errors.New("cannot run detached processes with Docker scheduler")
}

labels := scheduler.Labels(app, p)
labels[attachedRunLabel] = "true"
return s.runner.Run(ctx, runner.RunOpts{
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might as well just pull pkg/runner in here, since it's only used inside scheduler/docker.

Image: p.Image,
Command: p.Command,
Env: scheduler.Env(app, p),
Memory: int64(p.MemoryLimit),
CPUShares: int64(p.CPUShares),
Labels: labels,
Input: in,
Output: out,
})
}

func (s *Scheduler) Instances(ctx context.Context, app string) ([]*scheduler.Instance, error) {
return s.InstancesFromAttachedRuns(ctx, app)
}

// InstancesFromAttachedRuns returns Instances that were started from attached
// runs.
func (s *Scheduler) InstancesFromAttachedRuns(ctx context.Context, app string) ([]*scheduler.Instance, error) {
var instances []*scheduler.Instance

containers, err := s.docker.ListContainers(docker.ListContainersOptions{
Filters: map[string][]string{
"label": []string{
fmt.Sprintf("%s=true", attachedRunLabel),
fmt.Sprintf("%s=%s", appLabel, app),
},
},
})
if err != nil {
return nil, fmt.Errorf("error listing containers from attached runs: %v", err)
}

for _, apiContainer := range containers {
container, err := s.docker.InspectContainer(apiContainer.ID)
if err != nil {
return instances, fmt.Errorf("error inspecting container %s: %v", apiContainer.ID, err)
}

state := strings.ToUpper(container.State.StateString())

instances = append(instances, &scheduler.Instance{
ID: container.ID[0:12],
State: state,
UpdatedAt: container.State.StartedAt,
Process: &scheduler.Process{
Type: container.Config.Labels[processLabel],
Command: container.Config.Cmd,
Env: parseEnv(container.Config.Env),
MemoryLimit: uint(container.HostConfig.Memory),
CPUShares: uint(container.HostConfig.CPUShares),
},
})
}

return instances, nil
}

func parseEnv(env []string) map[string]string {
m := make(map[string]string)
for _, e := range env {
parts := strings.SplitN(e, "=", 2)
m[parts[0]] = parts[1]
}
return m
}
102 changes: 102 additions & 0 deletions scheduler/docker/docker_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package docker

import (
"testing"
"time"

"golang.org/x/net/context"

"github.com/fsouza/go-dockerclient"
"github.com/remind101/empire/pkg/bytesize"
"github.com/remind101/empire/scheduler"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
)

var ctx = context.Background()

func TestScheduler_InstancesFromAttachedRuns(t *testing.T) {
d := new(mockDockerClient)
s := Scheduler{
docker: d,
}

d.On("ListContainers", docker.ListContainersOptions{
Filters: map[string][]string{
"label": []string{
"attached-run=true",
"empire.app.id=2cdc4941-e36d-4855-a0ec-51525db4a500",
},
},
}).Return([]docker.APIContainers{
{ID: "65311c2cc20d671d43118b7d42b3f02df6b48a6bb65b1c5939007214e7587b24"},
}, nil)

d.On("InspectContainer", "65311c2cc20d671d43118b7d42b3f02df6b48a6bb65b1c5939007214e7587b24").Return(&docker.Container{
ID: "65311c2cc20d671d43118b7d42b3f02df6b48a6bb65b1c5939007214e7587b24",
State: docker.State{
Running: true,
StartedAt: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC),
},
Config: &docker.Config{
Labels: map[string]string{
"attached-run": "true",
"empire.app.id": "2cdc4941-e36d-4855-a0ec-51525db4a500",
"empire.app.process": "run",
},
Cmd: []string{"/bin/sh"},
Env: []string{"FOO=bar"},
},
HostConfig: &docker.HostConfig{
Memory: int64(124 * bytesize.MB),
CPUShares: 512,
},
}, nil)

instances, err := s.InstancesFromAttachedRuns(ctx, "2cdc4941-e36d-4855-a0ec-51525db4a500")
assert.NoError(t, err)
assert.Equal(t, 1, len(instances))
assert.Equal(t, &scheduler.Instance{
Process: &scheduler.Process{
Type: "run",
Command: []string{"/bin/sh"},
Env: map[string]string{
"FOO": "bar",
},
MemoryLimit: 124 * bytesize.MB,
CPUShares: 512,
},
ID: "65311c2cc20d",
State: "RUNNING",
UpdatedAt: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC),
}, instances[0])
}

func TestParseEnv(t *testing.T) {
tests := []struct {
in []string
out map[string]string
}{
{[]string{"FOO=bar"}, map[string]string{"FOO": "bar"}},
}

for _, tt := range tests {
out := parseEnv(tt.in)
assert.Equal(t, tt.out, out)
}
}

type mockDockerClient struct {
dockerClient
mock.Mock
}

func (m *mockDockerClient) ListContainers(opts docker.ListContainersOptions) ([]docker.APIContainers, error) {
args := m.Called(opts)
return args.Get(0).([]docker.APIContainers), args.Error(1)
}

func (m *mockDockerClient) InspectContainer(id string) (*docker.Container, error) {
args := m.Called(id)
return args.Get(0).(*docker.Container), args.Error(1)
}
33 changes: 0 additions & 33 deletions scheduler/runner.go

This file was deleted.