From 5fcc3f4b0cff8833cb383d904836f257e2e20091 Mon Sep 17 00:00:00 2001 From: Jana Radhakrishnan Date: Mon, 25 Apr 2016 12:06:32 -0700 Subject: [PATCH] Honor task desired state in allocator With the task lifecycle changes and the introduction of DesiredState in the task object, the allocator needs to honor that and make allocation/deallocation decisions based on that to properly work in sync with the other stages of the manager pipeline. Made changes to make allocation/deallocation decisions based on a combination of task DesiredState and the current state. Signed-off-by: Jana Radhakrishnan --- manager/allocator/allocator_test.go | 8 +++- manager/allocator/network.go | 66 +++++++++++++++++++++-------- 2 files changed, 55 insertions(+), 19 deletions(-) diff --git a/manager/allocator/allocator_test.go b/manager/allocator/allocator_test.go index ff98155552..9b0479df69 100644 --- a/manager/allocator/allocator_test.go +++ b/manager/allocator/allocator_test.go @@ -178,7 +178,8 @@ func TestAllocator(t *testing.T) { Status: &api.TaskStatus{ State: api.TaskStateNew, }, - ServiceID: "testServiceID2", + ServiceID: "testServiceID2", + DesiredState: api.TaskStateRunning, Spec: &api.TaskSpec{ Runtime: &api.TaskSpec_Container{ Container: &api.Container{ @@ -214,6 +215,7 @@ func TestAllocator(t *testing.T) { Status: &api.TaskStatus{ State: api.TaskStateNew, }, + DesiredState: api.TaskStateRunning, Spec: &api.TaskSpec{ Runtime: &api.TaskSpec_Container{ Container: &api.Container{ @@ -284,7 +286,8 @@ func TestAllocator(t *testing.T) { Status: &api.TaskStatus{ State: api.TaskStateNew, }, - ServiceID: "testServiceID2", + DesiredState: api.TaskStateRunning, + ServiceID: "testServiceID2", Spec: &api.TaskSpec{ Runtime: &api.TaskSpec_Container{ Container: &api.Container{}, @@ -320,6 +323,7 @@ func TestAllocator(t *testing.T) { Status: &api.TaskStatus{ State: api.TaskStateNew, }, + DesiredState: api.TaskStateRunning, Spec: &api.TaskSpec{ Runtime: &api.TaskSpec_Container{ Container: 
&api.Container{}, diff --git a/manager/allocator/network.go b/manager/allocator/network.go index cf2e77ea48..09d126f465 100644 --- a/manager/allocator/network.go +++ b/manager/allocator/network.go @@ -106,6 +106,10 @@ func (a *Allocator) doNetworkInit(ctx context.Context) error { if _, err := a.store.Batch(func(batch state.Batch) error { for _, t := range tasks { + if taskDead(t) { + continue + } + // No container or network configured. Not interested. if t.Spec.GetContainer() == nil { continue @@ -115,6 +119,10 @@ func (a *Allocator) doNetworkInit(ctx context.Context) error { continue } + if nc.nwkAllocator.IsTaskAllocated(t) { + continue + } + err := batch.Update(func(tx state.Tx) error { return a.allocateTask(ctx, nc, tx, t) }) @@ -218,6 +226,17 @@ func (a *Allocator) doNetworkAlloc(ctx context.Context, ev events.Event) { } } +// taskRunning checks whether a task is either actively running, or in the +// process of starting up. +func taskRunning(t *api.Task) bool { + return t.DesiredState == api.TaskStateRunning && t.Status != nil && t.Status.State <= api.TaskStateRunning +} + +// taskDead checks whether a task is not actively running as far as allocator purposes are concerned. +func taskDead(t *api.Task) bool { + return t.DesiredState == api.TaskStateDead && t.Status != nil && t.Status.State > api.TaskStateRunning +} + func (a *Allocator) doTaskAlloc(ctx context.Context, nc *networkContext, ev events.Event) { var ( isDelete bool @@ -237,6 +256,21 @@ func (a *Allocator) doTaskAlloc(ctx context.Context, nc *networkContext, ev even return } + // If the task has stopped running or it's being deleted then + // we should free the network resources associated with the + // task right away. 
+ if taskDead(t) || isDelete { + if nc.nwkAllocator.IsTaskAllocated(t) { + if err := nc.nwkAllocator.DeallocateTask(t); err != nil { + log.G(ctx).Errorf("Failed freeing network resources for task %s: %v", t.ID, err) + } + } + + // Cleanup any task references that might exist in unallocatedTasks + delete(nc.unallocatedTasks, t.ID) + return + } + // No container or network configured. Not interested. if t.Spec.GetContainer() == nil { return @@ -251,8 +285,18 @@ func (a *Allocator) doTaskAlloc(ctx context.Context, nc *networkContext, ev even } return nil }); err != nil { - log.G(ctx).Errorf("Failed to get service %s for task %s: %v", t.ServiceID, t.ID, err) - return + // If the task is running it is not normal to + // not be able to find the associated + // service. If the task is not running (task + // is either dead or the desired state is set + // to dead) then the service may not be + // available in store. But we still need to + // cleanup network resources associated with + // the task. + if taskRunning(t) && !isDelete { + log.G(ctx).Errorf("Event %T: Failed to get service %s for task %s state %s: %v", ev, t.ServiceID, t.ID, t.Status.State, err) + return + } } } @@ -303,22 +347,8 @@ func (a *Allocator) doTaskAlloc(ctx context.Context, nc *networkContext, ev even if !nc.nwkAllocator.IsTaskAllocated(t) || (s != nil && s.Spec.Endpoint != nil && !nc.nwkAllocator.IsServiceAllocated(s)) { - if isDelete { - delete(nc.unallocatedTasks, t.ID) - return - } nc.unallocatedTasks[t.ID] = t - return - } - - // If the task has stopped running or it's being deleted then - // we should free the network resources associated with the - // task. 
- if t.Status.State > api.TaskStateRunning || isDelete { - if err := nc.nwkAllocator.DeallocateTask(t); err != nil { - log.G(ctx).Errorf("Failed freeing network resources for task %s: %v", t.ID, err) - } } } @@ -413,7 +443,9 @@ func (a *Allocator) allocateTask(ctx context.Context, nc *networkContext, tx sta // Update the network allocations and moving to // ALLOCATED state on top of the latest store state. if a.taskAllocateVote(networkVoter, t.ID) { - storeT.Status.State = api.TaskStateAllocated + if storeT.Status.State < api.TaskStateAllocated { + storeT.Status.State = api.TaskStateAllocated + } } storeT.Networks = t.Networks