-
Notifications
You must be signed in to change notification settings - Fork 159
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Show attached runs in emp ps. #911
Changes from 1 commit
82e128d
06c5594
472ffeb
c2037dc
8f45a93
54d4641
3e5cad2
a166c38
ed73c70
e805896
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,171 @@ | ||
// Package docker implements the Scheduler interface backed by the Docker API. | ||
// This implementation is not recommended for production use, but can be used in | ||
// development for testing. | ||
// | ||
// TODO: Implement this. | ||
package docker | ||
|
||
import ( | ||
"errors" | ||
"fmt" | ||
"io" | ||
"strings" | ||
|
||
"github.com/fsouza/go-dockerclient" | ||
"github.com/remind101/empire/pkg/dockerutil" | ||
"github.com/remind101/empire/pkg/runner" | ||
"github.com/remind101/empire/scheduler" | ||
"golang.org/x/net/context" | ||
) | ||
|
||
type dockerClient interface { | ||
InspectContainer(string) (*docker.Container, error) | ||
ListContainers(docker.ListContainersOptions) ([]docker.APIContainers, error) | ||
} | ||
|
||
const ( | ||
// Label that determines whether the container is from an attached run | ||
// or not. The value of this label will be the app id. | ||
attachedRunLabel = "attached-run" | ||
|
||
// Label that determines what app the run relates to. | ||
appLabel = "empire.app.id" | ||
|
||
// Label that determines what the name of the process is. | ||
processLabel = "empire.app.process" | ||
) | ||
|
||
// attachedScheduler wraps a Scheduler to run attached processes using the Docker | ||
// scheduler. | ||
type attachedScheduler struct { | ||
scheduler.Scheduler | ||
dockerScheduler *Scheduler | ||
} | ||
|
||
// RunAttachedWithDocker wraps a Scheduler to run attached Run's using a Docker | ||
// client. | ||
func RunAttachedWithDocker(s scheduler.Scheduler, client *dockerutil.Client) scheduler.Scheduler { | ||
return &attachedScheduler{ | ||
Scheduler: s, | ||
dockerScheduler: NewScheduler(client), | ||
} | ||
} | ||
|
||
// Run runs attached processes using the docker scheduler, and detached | ||
// processes using the wrapped scheduler. | ||
func (s *attachedScheduler) Run(ctx context.Context, app *scheduler.App, process *scheduler.Process, in io.Reader, out io.Writer) error { | ||
// Attached means stdout, stdin is attached. | ||
attached := out != nil || in != nil | ||
|
||
if attached { | ||
return s.dockerScheduler.Run(ctx, app, process, in, out) | ||
} else { | ||
return s.Scheduler.Run(ctx, app, process, in, out) | ||
} | ||
} | ||
|
||
// Instances returns a combination of instances from the wrapped scheduler, as | ||
// well as instances from attached runs. | ||
func (s *attachedScheduler) Instances(ctx context.Context, app string) ([]*scheduler.Instance, error) { | ||
instances, err := s.Scheduler.Instances(ctx, app) | ||
if err != nil { | ||
return instances, err | ||
} | ||
|
||
attachedInstances, err := s.dockerScheduler.InstancesFromAttachedRuns(ctx, app) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This should get kicked off in a goroutine. |
||
if err != nil { | ||
return instances, err | ||
} | ||
|
||
return append(instances, attachedInstances...), nil | ||
} | ||
|
||
// Scheduler provides an implementation of the scheduler.Scheduler interface | ||
// backed by Docker. | ||
type Scheduler struct { | ||
runner *runner.Runner | ||
docker dockerClient | ||
} | ||
|
||
// NewScheduler returns a new Scheduler instance that uses the given client to | ||
// interact with Docker. | ||
func NewScheduler(client *dockerutil.Client) *Scheduler { | ||
return &Scheduler{ | ||
runner: runner.NewRunner(client), | ||
docker: client, | ||
} | ||
} | ||
|
||
func (s *Scheduler) Run(ctx context.Context, app *scheduler.App, p *scheduler.Process, in io.Reader, out io.Writer) error { | ||
attached := out != nil || in != nil | ||
|
||
if !attached { | ||
return errors.New("cannot run detached processes with Docker scheduler") | ||
} | ||
|
||
labels := scheduler.Labels(app, p) | ||
labels[attachedRunLabel] = "true" | ||
return s.runner.Run(ctx, runner.RunOpts{ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Might as well just pull |
||
Image: p.Image, | ||
Command: p.Command, | ||
Env: scheduler.Env(app, p), | ||
Memory: int64(p.MemoryLimit), | ||
CPUShares: int64(p.CPUShares), | ||
Labels: labels, | ||
Input: in, | ||
Output: out, | ||
}) | ||
} | ||
|
||
func (s *Scheduler) Instances(ctx context.Context, app string) ([]*scheduler.Instance, error) { | ||
return s.InstancesFromAttachedRuns(ctx, app) | ||
} | ||
|
||
// InstancesFromAttachedRuns returns Instances that were started from attached | ||
// runs. | ||
func (s *Scheduler) InstancesFromAttachedRuns(ctx context.Context, app string) ([]*scheduler.Instance, error) { | ||
var instances []*scheduler.Instance | ||
|
||
containers, err := s.docker.ListContainers(docker.ListContainersOptions{ | ||
Filters: map[string][]string{ | ||
"label": []string{ | ||
fmt.Sprintf("%s=true", attachedRunLabel), | ||
fmt.Sprintf("%s=%s", appLabel, app), | ||
}, | ||
}, | ||
}) | ||
if err != nil { | ||
return nil, fmt.Errorf("error listing containers from attached runs: %v", err) | ||
} | ||
|
||
for _, apiContainer := range containers { | ||
container, err := s.docker.InspectContainer(apiContainer.ID) | ||
if err != nil { | ||
return instances, fmt.Errorf("error inspecting container %s: %v", apiContainer.ID, err) | ||
} | ||
|
||
state := strings.ToUpper(container.State.StateString()) | ||
|
||
instances = append(instances, &scheduler.Instance{ | ||
ID: container.ID[0:12], | ||
State: state, | ||
UpdatedAt: container.State.StartedAt, | ||
Process: &scheduler.Process{ | ||
Type: container.Config.Labels[processLabel], | ||
Command: container.Config.Cmd, | ||
Env: parseEnv(container.Config.Env), | ||
MemoryLimit: uint(container.HostConfig.Memory), | ||
CPUShares: uint(container.HostConfig.CPUShares), | ||
}, | ||
}) | ||
} | ||
|
||
return instances, nil | ||
} | ||
|
||
func parseEnv(env []string) map[string]string { | ||
m := make(map[string]string) | ||
for _, e := range env { | ||
parts := strings.SplitN(e, "=", 2) | ||
m[parts[0]] = parts[1] | ||
} | ||
return m | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,102 @@ | ||
package docker | ||
|
||
import ( | ||
"testing" | ||
"time" | ||
|
||
"golang.org/x/net/context" | ||
|
||
"github.com/fsouza/go-dockerclient" | ||
"github.com/remind101/empire/pkg/bytesize" | ||
"github.com/remind101/empire/scheduler" | ||
"github.com/stretchr/testify/assert" | ||
"github.com/stretchr/testify/mock" | ||
) | ||
|
||
var ctx = context.Background() | ||
|
||
func TestScheduler_InstancesFromAttachedRuns(t *testing.T) { | ||
d := new(mockDockerClient) | ||
s := Scheduler{ | ||
docker: d, | ||
} | ||
|
||
d.On("ListContainers", docker.ListContainersOptions{ | ||
Filters: map[string][]string{ | ||
"label": []string{ | ||
"attached-run=true", | ||
"empire.app.id=2cdc4941-e36d-4855-a0ec-51525db4a500", | ||
}, | ||
}, | ||
}).Return([]docker.APIContainers{ | ||
{ID: "65311c2cc20d671d43118b7d42b3f02df6b48a6bb65b1c5939007214e7587b24"}, | ||
}, nil) | ||
|
||
d.On("InspectContainer", "65311c2cc20d671d43118b7d42b3f02df6b48a6bb65b1c5939007214e7587b24").Return(&docker.Container{ | ||
ID: "65311c2cc20d671d43118b7d42b3f02df6b48a6bb65b1c5939007214e7587b24", | ||
State: docker.State{ | ||
Running: true, | ||
StartedAt: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC), | ||
}, | ||
Config: &docker.Config{ | ||
Labels: map[string]string{ | ||
"attached-run": "true", | ||
"empire.app.id": "2cdc4941-e36d-4855-a0ec-51525db4a500", | ||
"empire.app.process": "run", | ||
}, | ||
Cmd: []string{"/bin/sh"}, | ||
Env: []string{"FOO=bar"}, | ||
}, | ||
HostConfig: &docker.HostConfig{ | ||
Memory: int64(124 * bytesize.MB), | ||
CPUShares: 512, | ||
}, | ||
}, nil) | ||
|
||
instances, err := s.InstancesFromAttachedRuns(ctx, "2cdc4941-e36d-4855-a0ec-51525db4a500") | ||
assert.NoError(t, err) | ||
assert.Equal(t, 1, len(instances)) | ||
assert.Equal(t, &scheduler.Instance{ | ||
Process: &scheduler.Process{ | ||
Type: "run", | ||
Command: []string{"/bin/sh"}, | ||
Env: map[string]string{ | ||
"FOO": "bar", | ||
}, | ||
MemoryLimit: 124 * bytesize.MB, | ||
CPUShares: 512, | ||
}, | ||
ID: "65311c2cc20d", | ||
State: "RUNNING", | ||
UpdatedAt: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC), | ||
}, instances[0]) | ||
} | ||
|
||
func TestParseEnv(t *testing.T) { | ||
tests := []struct { | ||
in []string | ||
out map[string]string | ||
}{ | ||
{[]string{"FOO=bar"}, map[string]string{"FOO": "bar"}}, | ||
} | ||
|
||
for _, tt := range tests { | ||
out := parseEnv(tt.in) | ||
assert.Equal(t, tt.out, out) | ||
} | ||
} | ||
|
||
type mockDockerClient struct { | ||
dockerClient | ||
mock.Mock | ||
} | ||
|
||
func (m *mockDockerClient) ListContainers(opts docker.ListContainersOptions) ([]docker.APIContainers, error) { | ||
args := m.Called(opts) | ||
return args.Get(0).([]docker.APIContainers), args.Error(1) | ||
} | ||
|
||
func (m *mockDockerClient) InspectContainer(id string) (*docker.Container, error) { | ||
args := m.Called(id) | ||
return args.Get(0).(*docker.Container), args.Error(1) | ||
} |
This file was deleted.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should this be updated?