Skip to content

Commit

Permalink
Merge pull request #1651 from aaronlehmann/allocator-crash
Browse files Browse the repository at this point in the history
allocator: Fix panic when allocations happen at init time
  • Loading branch information
aluzzardi committed Oct 18, 2016
2 parents b79d6cd + a8066c1 commit f8ec492
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 29 deletions.
4 changes: 3 additions & 1 deletion manager/allocator/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,8 @@ func (a *Allocator) Run(ctx context.Context) error {
aaCopy := aa
actor := func() error {
wg.Add(1)
defer wg.Done()

// init might return an allocator-specific context,
// which is a child of the passed-in context, to hold
// allocator-specific state.
Expand All @@ -133,10 +135,10 @@ func (a *Allocator) Run(ctx context.Context) error {
// if we are failing in the init of
// this allocator.
aa.cancel()
wg.Done()
return err
}

wg.Add(1)
go func() {
defer wg.Done()
a.run(ctx, aaCopy)
Expand Down
77 changes: 49 additions & 28 deletions manager/allocator/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ type networkContext struct {
unallocatedNetworks map[string]*api.Network
}

func (a *Allocator) doNetworkInit(ctx context.Context) error {
func (a *Allocator) doNetworkInit(ctx context.Context) (err error) {
na, err := networkallocator.New()
if err != nil {
return err
Expand All @@ -81,6 +81,13 @@ func (a *Allocator) doNetworkInit(ctx context.Context) error {
unallocatedNetworks: make(map[string]*api.Network),
ingressNetwork: newIngressNetwork(),
}
a.netCtx = nc
defer func() {
// Clear a.netCtx if initialization was unsuccessful.
if err != nil {
a.netCtx = nil
}
}()

// Check if we have the ingress network. If not found create
// it before reading all network objects for allocation.
Expand Down Expand Up @@ -125,7 +132,7 @@ func (a *Allocator) doNetworkInit(ctx context.Context) error {
// that we can get the preferred subnet for ingress
// network.
if !na.IsAllocated(nc.ingressNetwork) {
if err := a.allocateNetwork(ctx, nc, nc.ingressNetwork); err != nil {
if err := a.allocateNetwork(ctx, nc.ingressNetwork); err != nil {
log.G(ctx).WithError(err).Error("failed allocating ingress network during init")
}

Expand Down Expand Up @@ -155,7 +162,7 @@ func (a *Allocator) doNetworkInit(ctx context.Context) error {
continue
}

if err := a.allocateNetwork(ctx, nc, n); err != nil {
if err := a.allocateNetwork(ctx, n); err != nil {
log.G(ctx).WithError(err).Errorf("failed allocating network %s during init", n.ID)
}
}
Expand All @@ -179,7 +186,7 @@ func (a *Allocator) doNetworkInit(ctx context.Context) error {
}

node.Attachment.Network = nc.ingressNetwork.Copy()
if err := a.allocateNode(ctx, nc, node); err != nil {
if err := a.allocateNode(ctx, node); err != nil {
log.G(ctx).WithError(err).Errorf("Failed to allocate network resources for node %s during init", node.ID)
}
}
Expand All @@ -198,7 +205,7 @@ func (a *Allocator) doNetworkInit(ctx context.Context) error {
continue
}

if err := a.allocateService(ctx, nc, s); err != nil {
if err := a.allocateService(ctx, s); err != nil {
log.G(ctx).WithError(err).Errorf("failed allocating service %s during init", s.ID)
}
}
Expand Down Expand Up @@ -260,7 +267,7 @@ func (a *Allocator) doNetworkInit(ctx context.Context) error {
}

err := batch.Update(func(tx store.Tx) error {
_, err := a.allocateTask(ctx, nc, tx, t)
_, err := a.allocateTask(ctx, tx, t)
return err
})
if err != nil {
Expand All @@ -274,7 +281,6 @@ func (a *Allocator) doNetworkInit(ctx context.Context) error {
return err
}

a.netCtx = nc
return nil
}

Expand All @@ -288,7 +294,7 @@ func (a *Allocator) doNetworkAlloc(ctx context.Context, ev events.Event) {
break
}

if err := a.allocateNetwork(ctx, nc, n); err != nil {
if err := a.allocateNetwork(ctx, n); err != nil {
log.G(ctx).WithError(err).Errorf("Failed allocation for network %s", n.ID)
break
}
Expand All @@ -309,7 +315,7 @@ func (a *Allocator) doNetworkAlloc(ctx context.Context, ev events.Event) {
break
}

if err := a.allocateService(ctx, nc, s); err != nil {
if err := a.allocateService(ctx, s); err != nil {
log.G(ctx).WithError(err).Errorf("Failed allocation for service %s", s.ID)
break
}
Expand All @@ -320,7 +326,7 @@ func (a *Allocator) doNetworkAlloc(ctx context.Context, ev events.Event) {
break
}

if err := a.allocateService(ctx, nc, s); err != nil {
if err := a.allocateService(ctx, s); err != nil {
log.G(ctx).WithError(err).Errorf("Failed allocation during update of service %s", s.ID)
break
}
Expand All @@ -335,18 +341,18 @@ func (a *Allocator) doNetworkAlloc(ctx context.Context, ev events.Event) {
// it's still there.
delete(nc.unallocatedServices, s.ID)
case state.EventCreateNode, state.EventUpdateNode, state.EventDeleteNode:
a.doNodeAlloc(ctx, nc, ev)
a.doNodeAlloc(ctx, ev)
case state.EventCreateTask, state.EventUpdateTask, state.EventDeleteTask:
a.doTaskAlloc(ctx, nc, ev)
a.doTaskAlloc(ctx, ev)
case state.EventCommit:
a.procUnallocatedNetworks(ctx, nc)
a.procUnallocatedServices(ctx, nc)
a.procUnallocatedTasksNetwork(ctx, nc)
a.procUnallocatedNetworks(ctx)
a.procUnallocatedServices(ctx)
a.procUnallocatedTasksNetwork(ctx)
return
}
}

func (a *Allocator) doNodeAlloc(ctx context.Context, nc *networkContext, ev events.Event) {
func (a *Allocator) doNodeAlloc(ctx context.Context, ev events.Event) {
var (
isDelete bool
node *api.Node
Expand All @@ -362,6 +368,8 @@ func (a *Allocator) doNodeAlloc(ctx context.Context, nc *networkContext, ev even
node = v.Node.Copy()
}

nc := a.netCtx

if isDelete {
if nc.nwkAllocator.IsNodeAllocated(node) {
if err := nc.nwkAllocator.DeallocateNode(node); err != nil {
Expand All @@ -377,7 +385,7 @@ func (a *Allocator) doNodeAlloc(ctx context.Context, nc *networkContext, ev even
}

node.Attachment.Network = nc.ingressNetwork.Copy()
if err := a.allocateNode(ctx, nc, node); err != nil {
if err := a.allocateNode(ctx, node); err != nil {
log.G(ctx).WithError(err).Errorf("Failed to allocate network resources for node %s", node.ID)
}
}
Expand Down Expand Up @@ -460,7 +468,7 @@ func (a *Allocator) taskCreateNetworkAttachments(t *api.Task, s *api.Service) {
taskUpdateNetworks(t, networks)
}

func (a *Allocator) doTaskAlloc(ctx context.Context, nc *networkContext, ev events.Event) {
func (a *Allocator) doTaskAlloc(ctx context.Context, ev events.Event) {
var (
isDelete bool
t *api.Task
Expand All @@ -476,6 +484,8 @@ func (a *Allocator) doTaskAlloc(ctx context.Context, nc *networkContext, ev even
t = v.Task.Copy()
}

nc := a.netCtx

// If the task has stopped running or it's being deleted then
// we should free the network resources associated with the
// task right away.
Expand Down Expand Up @@ -526,7 +536,9 @@ func (a *Allocator) doTaskAlloc(ctx context.Context, nc *networkContext, ev even
nc.unallocatedTasks[t.ID] = t
}

func (a *Allocator) allocateNode(ctx context.Context, nc *networkContext, node *api.Node) error {
func (a *Allocator) allocateNode(ctx context.Context, node *api.Node) error {
nc := a.netCtx

if err := nc.nwkAllocator.AllocateNode(node); err != nil {
return err
}
Expand Down Expand Up @@ -559,7 +571,9 @@ func (a *Allocator) allocateNode(ctx context.Context, nc *networkContext, node *
return nil
}

func (a *Allocator) allocateService(ctx context.Context, nc *networkContext, s *api.Service) error {
func (a *Allocator) allocateService(ctx context.Context, s *api.Service) error {
nc := a.netCtx

if s.Spec.Endpoint != nil {
// service has user-defined endpoint
if s.Endpoint == nil {
Expand Down Expand Up @@ -644,7 +658,9 @@ func (a *Allocator) allocateService(ctx context.Context, nc *networkContext, s *
return nil
}

func (a *Allocator) allocateNetwork(ctx context.Context, nc *networkContext, n *api.Network) error {
func (a *Allocator) allocateNetwork(ctx context.Context, n *api.Network) error {
nc := a.netCtx

if err := nc.nwkAllocator.Allocate(n); err != nil {
nc.unallocatedNetworks[n.ID] = n
return errors.Wrapf(err, "failed during network allocation for network %s", n.ID)
Expand All @@ -666,7 +682,7 @@ func (a *Allocator) allocateNetwork(ctx context.Context, nc *networkContext, n *
return nil
}

func (a *Allocator) allocateTask(ctx context.Context, nc *networkContext, tx store.Tx, t *api.Task) (*api.Task, error) {
func (a *Allocator) allocateTask(ctx context.Context, tx store.Tx, t *api.Task) (*api.Task, error) {
taskUpdated := false

// Get the latest task state from the store before updating.
Expand All @@ -675,6 +691,8 @@ func (a *Allocator) allocateTask(ctx context.Context, nc *networkContext, tx sto
return nil, fmt.Errorf("could not find task %s while trying to update network allocation", t.ID)
}

nc := a.netCtx

// We might be here even if a task allocation has already
// happened but wasn't successfully committed to store. In such
// cases skip allocation and go straight ahead to updating the
Expand Down Expand Up @@ -734,10 +752,11 @@ func (a *Allocator) allocateTask(ctx context.Context, nc *networkContext, tx sto
return storeT, nil
}

func (a *Allocator) procUnallocatedNetworks(ctx context.Context, nc *networkContext) {
func (a *Allocator) procUnallocatedNetworks(ctx context.Context) {
nc := a.netCtx
for _, n := range nc.unallocatedNetworks {
if !nc.nwkAllocator.IsAllocated(n) {
if err := a.allocateNetwork(ctx, nc, n); err != nil {
if err := a.allocateNetwork(ctx, n); err != nil {
log.G(ctx).Debugf("Failed allocation of unallocated network %s: %v", n.ID, err)
continue
}
Expand All @@ -747,10 +766,11 @@ func (a *Allocator) procUnallocatedNetworks(ctx context.Context, nc *networkCont
}
}

func (a *Allocator) procUnallocatedServices(ctx context.Context, nc *networkContext) {
func (a *Allocator) procUnallocatedServices(ctx context.Context) {
nc := a.netCtx
for _, s := range nc.unallocatedServices {
if !nc.nwkAllocator.IsServiceAllocated(s) {
if err := a.allocateService(ctx, nc, s); err != nil {
if err := a.allocateService(ctx, s); err != nil {
log.G(ctx).Debugf("Failed allocation of unallocated service %s: %v", s.ID, err)
continue
}
Expand All @@ -760,15 +780,16 @@ func (a *Allocator) procUnallocatedServices(ctx context.Context, nc *networkCont
}
}

func (a *Allocator) procUnallocatedTasksNetwork(ctx context.Context, nc *networkContext) {
func (a *Allocator) procUnallocatedTasksNetwork(ctx context.Context) {
nc := a.netCtx
tasks := make([]*api.Task, 0, len(nc.unallocatedTasks))

committed, err := a.store.Batch(func(batch *store.Batch) error {
for _, t := range nc.unallocatedTasks {
var allocatedT *api.Task
err := batch.Update(func(tx store.Tx) error {
var err error
allocatedT, err = a.allocateTask(ctx, nc, tx, t)
allocatedT, err = a.allocateTask(ctx, tx, t)
return err
})

Expand Down

0 comments on commit f8ec492

Please sign in to comment.