Skip to content

Commit

Permalink
Merge pull request #435 from mrjana/allocator
Browse files Browse the repository at this point in the history
Honor task desired state in allocator
  • Loading branch information
mrjana committed Apr 26, 2016
2 parents 2bfa0db + 5fcc3f4 commit 22b3519
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 19 deletions.
8 changes: 6 additions & 2 deletions manager/allocator/allocator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,8 @@ func TestAllocator(t *testing.T) {
Status: &api.TaskStatus{
State: api.TaskStateNew,
},
ServiceID: "testServiceID2",
ServiceID: "testServiceID2",
DesiredState: api.TaskStateRunning,
Spec: &api.TaskSpec{
Runtime: &api.TaskSpec_Container{
Container: &api.Container{
Expand Down Expand Up @@ -214,6 +215,7 @@ func TestAllocator(t *testing.T) {
Status: &api.TaskStatus{
State: api.TaskStateNew,
},
DesiredState: api.TaskStateRunning,
Spec: &api.TaskSpec{
Runtime: &api.TaskSpec_Container{
Container: &api.Container{
Expand Down Expand Up @@ -284,7 +286,8 @@ func TestAllocator(t *testing.T) {
Status: &api.TaskStatus{
State: api.TaskStateNew,
},
ServiceID: "testServiceID2",
DesiredState: api.TaskStateRunning,
ServiceID: "testServiceID2",
Spec: &api.TaskSpec{
Runtime: &api.TaskSpec_Container{
Container: &api.Container{},
Expand Down Expand Up @@ -320,6 +323,7 @@ func TestAllocator(t *testing.T) {
Status: &api.TaskStatus{
State: api.TaskStateNew,
},
DesiredState: api.TaskStateRunning,
Spec: &api.TaskSpec{
Runtime: &api.TaskSpec_Container{
Container: &api.Container{},
Expand Down
66 changes: 49 additions & 17 deletions manager/allocator/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,10 @@ func (a *Allocator) doNetworkInit(ctx context.Context) error {

if _, err := a.store.Batch(func(batch state.Batch) error {
for _, t := range tasks {
if taskDead(t) {
continue
}

// No container or network configured. Not interested.
if t.Spec.GetContainer() == nil {
continue
Expand All @@ -115,6 +119,10 @@ func (a *Allocator) doNetworkInit(ctx context.Context) error {
continue
}

if nc.nwkAllocator.IsTaskAllocated(t) {
continue
}

err := batch.Update(func(tx state.Tx) error {
return a.allocateTask(ctx, nc, tx, t)
})
Expand Down Expand Up @@ -218,6 +226,17 @@ func (a *Allocator) doNetworkAlloc(ctx context.Context, ev events.Event) {
}
}

// taskRunning checks whether a task is either actively running, or in the
// process of starting up.
func taskRunning(t *api.Task) bool {
return t.DesiredState == api.TaskStateRunning && t.Status != nil && t.Status.State <= api.TaskStateRunning
}

// taskDead checks whether a task is not actively running as far as allocator purposes are concerned.
func taskDead(t *api.Task) bool {
return t.DesiredState == api.TaskStateDead && t.Status != nil && t.Status.State > api.TaskStateRunning
}

func (a *Allocator) doTaskAlloc(ctx context.Context, nc *networkContext, ev events.Event) {
var (
isDelete bool
Expand All @@ -237,6 +256,21 @@ func (a *Allocator) doTaskAlloc(ctx context.Context, nc *networkContext, ev even
return
}

// If the task has stopped running or it's being deleted then
// we should free the network resources associated with the
// task right away.
if taskDead(t) || isDelete {
if nc.nwkAllocator.IsTaskAllocated(t) {
if err := nc.nwkAllocator.DeallocateTask(t); err != nil {
log.G(ctx).Errorf("Failed freeing network resources for task %s: %v", t.ID, err)
}
}

// Cleanup any task references that might exist in unallocatedTasks
delete(nc.unallocatedTasks, t.ID)
return
}

// No container or network configured. Not interested.
if t.Spec.GetContainer() == nil {
return
Expand All @@ -251,8 +285,18 @@ func (a *Allocator) doTaskAlloc(ctx context.Context, nc *networkContext, ev even
}
return nil
}); err != nil {
log.G(ctx).Errorf("Failed to get service %s for task %s: %v", t.ServiceID, t.ID, err)
return
// If the task is running it is not normal to
// not be able to find the associated
// service. If the task is not running (task
// is either dead or the desired state is set
// to dead) then the service may not be
// available in store. But we still need to
// cleanup network resources associated with
// the task.
if taskRunning(t) && !isDelete {
log.G(ctx).Errorf("Event %T: Failed to get service %s for task %s state %s: %v", ev, t.ServiceID, t.ID, t.Status.State, err)
return
}
}
}

Expand Down Expand Up @@ -303,22 +347,8 @@ func (a *Allocator) doTaskAlloc(ctx context.Context, nc *networkContext, ev even

if !nc.nwkAllocator.IsTaskAllocated(t) ||
(s != nil && s.Spec.Endpoint != nil && !nc.nwkAllocator.IsServiceAllocated(s)) {
if isDelete {
delete(nc.unallocatedTasks, t.ID)
return
}

nc.unallocatedTasks[t.ID] = t
return
}

// If the task has stopped running or it's being deleted then
// we should free the network resources associated with the
// task.
if t.Status.State > api.TaskStateRunning || isDelete {
if err := nc.nwkAllocator.DeallocateTask(t); err != nil {
log.G(ctx).Errorf("Failed freeing network resources for task %s: %v", t.ID, err)
}
}
}

Expand Down Expand Up @@ -413,7 +443,9 @@ func (a *Allocator) allocateTask(ctx context.Context, nc *networkContext, tx sta
// Update the network allocations and moving to
// ALLOCATED state on top of the latest store state.
if a.taskAllocateVote(networkVoter, t.ID) {
storeT.Status.State = api.TaskStateAllocated
if storeT.Status.State < api.TaskStateAllocated {
storeT.Status.State = api.TaskStateAllocated
}
}

storeT.Networks = t.Networks
Expand Down

0 comments on commit 22b3519

Please sign in to comment.