Skip to content

Commit

Permalink
Honor task desired state in allocator
Browse files Browse the repository at this point in the history
With the task lifecycle changes and the introduction of DesiredState in
task object, the allocator needs to honor that and make
allocation/deallocation decisions based on that to properly work in sync
with the other stages of the manager pipeline. Made changes to make
allocation/deallocation decisions based on a combination of task
DesiredState and the current state.

Signed-off-by: Jana Radhakrishnan <mrjana@docker.com>
  • Loading branch information
mrjana committed Apr 26, 2016
1 parent 2b28c7a commit 5fcc3f4
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 19 deletions.
8 changes: 6 additions & 2 deletions manager/allocator/allocator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,8 @@ func TestAllocator(t *testing.T) {
Status: &api.TaskStatus{
State: api.TaskStateNew,
},
ServiceID: "testServiceID2",
ServiceID: "testServiceID2",
DesiredState: api.TaskStateRunning,
Spec: &api.TaskSpec{
Runtime: &api.TaskSpec_Container{
Container: &api.Container{
Expand Down Expand Up @@ -214,6 +215,7 @@ func TestAllocator(t *testing.T) {
Status: &api.TaskStatus{
State: api.TaskStateNew,
},
DesiredState: api.TaskStateRunning,
Spec: &api.TaskSpec{
Runtime: &api.TaskSpec_Container{
Container: &api.Container{
Expand Down Expand Up @@ -284,7 +286,8 @@ func TestAllocator(t *testing.T) {
Status: &api.TaskStatus{
State: api.TaskStateNew,
},
ServiceID: "testServiceID2",
DesiredState: api.TaskStateRunning,
ServiceID: "testServiceID2",
Spec: &api.TaskSpec{
Runtime: &api.TaskSpec_Container{
Container: &api.Container{},
Expand Down Expand Up @@ -320,6 +323,7 @@ func TestAllocator(t *testing.T) {
Status: &api.TaskStatus{
State: api.TaskStateNew,
},
DesiredState: api.TaskStateRunning,
Spec: &api.TaskSpec{
Runtime: &api.TaskSpec_Container{
Container: &api.Container{},
Expand Down
66 changes: 49 additions & 17 deletions manager/allocator/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,10 @@ func (a *Allocator) doNetworkInit(ctx context.Context) error {

if _, err := a.store.Batch(func(batch state.Batch) error {
for _, t := range tasks {
if taskDead(t) {
continue
}

// No container or network configured. Not interested.
if t.Spec.GetContainer() == nil {
continue
Expand All @@ -115,6 +119,10 @@ func (a *Allocator) doNetworkInit(ctx context.Context) error {
continue
}

if nc.nwkAllocator.IsTaskAllocated(t) {
continue
}

err := batch.Update(func(tx state.Tx) error {
return a.allocateTask(ctx, nc, tx, t)
})
Expand Down Expand Up @@ -218,6 +226,17 @@ func (a *Allocator) doNetworkAlloc(ctx context.Context, ev events.Event) {
}
}

// taskRunning reports whether a task is either actively running, or in the
// process of starting up (its desired state is running and its observed
// state has not yet passed running).
func taskRunning(t *api.Task) bool {
	if t.DesiredState != api.TaskStateRunning {
		return false
	}
	if t.Status == nil {
		return false
	}
	return t.Status.State <= api.TaskStateRunning
}

// taskDead reports whether a task is no longer running as far as the
// allocator is concerned: both the desired state is dead and the observed
// state has already progressed past running.
func taskDead(t *api.Task) bool {
	if t.DesiredState != api.TaskStateDead {
		return false
	}
	if t.Status == nil {
		return false
	}
	return t.Status.State > api.TaskStateRunning
}

func (a *Allocator) doTaskAlloc(ctx context.Context, nc *networkContext, ev events.Event) {
var (
isDelete bool
Expand All @@ -237,6 +256,21 @@ func (a *Allocator) doTaskAlloc(ctx context.Context, nc *networkContext, ev even
return
}

// If the task has stopped running or it's being deleted then
// we should free the network resources associated with the
// task right away.
if taskDead(t) || isDelete {
if nc.nwkAllocator.IsTaskAllocated(t) {
if err := nc.nwkAllocator.DeallocateTask(t); err != nil {
log.G(ctx).Errorf("Failed freeing network resources for task %s: %v", t.ID, err)
}
}

// Cleanup any task references that might exist in unallocatedTasks
delete(nc.unallocatedTasks, t.ID)
return
}

// No container or network configured. Not interested.
if t.Spec.GetContainer() == nil {
return
Expand All @@ -251,8 +285,18 @@ func (a *Allocator) doTaskAlloc(ctx context.Context, nc *networkContext, ev even
}
return nil
}); err != nil {
log.G(ctx).Errorf("Failed to get service %s for task %s: %v", t.ServiceID, t.ID, err)
return
// If the task is running it is not normal to
// not be able to find the associated
// service. If the task is not running (task
// is either dead or the desired state is set
// to dead) then the service may not be
// available in store. But we still need to
// cleanup network resources associated with
// the task.
if taskRunning(t) && !isDelete {
log.G(ctx).Errorf("Event %T: Failed to get service %s for task %s state %s: %v", ev, t.ServiceID, t.ID, t.Status.State, err)
return
}
}
}

Expand Down Expand Up @@ -303,22 +347,8 @@ func (a *Allocator) doTaskAlloc(ctx context.Context, nc *networkContext, ev even

if !nc.nwkAllocator.IsTaskAllocated(t) ||
(s != nil && s.Spec.Endpoint != nil && !nc.nwkAllocator.IsServiceAllocated(s)) {
if isDelete {
delete(nc.unallocatedTasks, t.ID)
return
}

nc.unallocatedTasks[t.ID] = t
return
}

// If the task has stopped running or it's being deleted then
// we should free the network resources associated with the
// task.
if t.Status.State > api.TaskStateRunning || isDelete {
if err := nc.nwkAllocator.DeallocateTask(t); err != nil {
log.G(ctx).Errorf("Failed freeing network resources for task %s: %v", t.ID, err)
}
}
}

Expand Down Expand Up @@ -413,7 +443,9 @@ func (a *Allocator) allocateTask(ctx context.Context, nc *networkContext, tx sta
// Update the network allocations and moving to
// ALLOCATED state on top of the latest store state.
if a.taskAllocateVote(networkVoter, t.ID) {
storeT.Status.State = api.TaskStateAllocated
if storeT.Status.State < api.TaskStateAllocated {
storeT.Status.State = api.TaskStateAllocated
}
}

storeT.Networks = t.Networks
Expand Down

0 comments on commit 5fcc3f4

Please sign in to comment.