global orchestrator: reconcile more than one service at a time
This can avoid doing extra raft writes and reading extra data from the store.

Signed-off-by: Aaron Lehmann <aaron.lehmann@docker.com>
aaronlehmann committed Sep 27, 2016
1 parent 5ba2a5f commit bc7b79e
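
The gain is transactional batching: instead of one read transaction and one write batch per service, the orchestrator now reads every service's tasks under a single store.View and queues all task changes in a single store.Batch. Below is a minimal sketch of that effect, using a hypothetical toyStore with call counters in place of swarmkit's real raft-backed store API:

    package main

    import "fmt"

    // toyStore is a hypothetical stand-in for swarmkit's raft-backed store,
    // not the real API: assume each View opens one read transaction and
    // each Batch costs at least one raft write.
    type toyStore struct{ views, batches int }

    func (s *toyStore) View(f func())  { s.views++; f() }
    func (s *toyStore) Batch(f func()) { s.batches++; f() }

    func main() {
        services := []string{"svc-a", "svc-b", "svc-c"}

        // Old shape: reconcileOneService meant one View and one Batch per service.
        perService := &toyStore{}
        for range services {
            perService.View(func() { /* read this service's tasks */ })
            perService.Batch(func() { /* write this service's task changes */ })
        }
        fmt.Println(perService.views, perService.batches) // 3 3

        // New shape: reconcileServices covers all services with one View and one Batch.
        batched := &toyStore{}
        batched.View(func() {
            for range services {
                // read each service's tasks inside the same transaction
            }
        })
        batched.Batch(func() {
            for range services {
                // queue each service's task changes in the same batch
            }
        })
        fmt.Println(batched.views, batched.batches) // 1 1
    }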
Showing 1 changed file with 143 additions and 131 deletions.
274 changes: 143 additions & 131 deletions manager/orchestrator/global.go
@@ -95,12 +95,15 @@ func (g *GlobalOrchestrator) Run(ctx context.Context) error {
     if err != nil {
         return err
     }
+
+    var reconcileServiceIDs []string
     for _, s := range existingServices {
         if isGlobalService(s) {
             g.updateService(s)
-            g.reconcileOneService(ctx, s.ID)
+            reconcileServiceIDs = append(reconcileServiceIDs, s.ID)
         }
     }
+    g.reconcileServices(ctx, reconcileServiceIDs)
 
     for {
         select {
@@ -114,13 +117,13 @@ func (g *GlobalOrchestrator) Run(ctx context.Context) error {
                     continue
                 }
                 g.updateService(v.Service)
-                g.reconcileOneService(ctx, v.Service.ID)
+                g.reconcileServices(ctx, []string{v.Service.ID})
             case state.EventUpdateService:
                 if !isGlobalService(v.Service) {
                     continue
                 }
                 g.updateService(v.Service)
-                g.reconcileOneService(ctx, v.Service.ID)
+                g.reconcileServices(ctx, []string{v.Service.ID})
             case state.EventDeleteService:
                 if !isGlobalService(v.Service) {
                     continue
@@ -160,7 +163,7 @@ func (g *GlobalOrchestrator) Run(ctx context.Context) error {
                 if _, exists := g.globalServices[v.Task.ServiceID]; !exists {
                     continue
                 }
-                g.reconcileServiceOneNode(ctx, v.Task.ServiceID, v.Task.NodeID)
+                g.reconcileServicesOneNode(ctx, []string{v.Task.ServiceID}, v.Task.NodeID)
             }
         case <-g.stopChan:
             return nil
@@ -203,81 +206,86 @@ func (g *GlobalOrchestrator) removeTasksFromNode(ctx context.Context, node *api.Node) {
     }
 }
 
-func (g *GlobalOrchestrator) reconcileOneService(ctx context.Context, serviceID string) {
-    service, exists := g.globalServices[serviceID]
-    if !exists {
-        return
-    }
+func (g *GlobalOrchestrator) reconcileServices(ctx context.Context, serviceIDs []string) {
+    nodeCompleted := make(map[string]map[string]struct{})
+    nodeTasks := make(map[string]map[string][]*api.Task)
 
-    var (
-        tasks []*api.Task
-        err   error
-    )
     g.store.View(func(tx store.ReadTx) {
-        tasks, err = store.FindTasks(tx, store.ByServiceID(service.ID))
-    })
-    if err != nil {
-        log.G(ctx).WithError(err).Errorf("global orchestrator: reconcileOneService failed finding tasks")
-        return
-    }
-    // a node may have completed this service
-    nodeCompleted := make(map[string]struct{})
-    // nodeID -> task list
-    nodeTasks := make(map[string][]*api.Task)
-
-    for _, t := range tasks {
-        if isTaskRunning(t) {
-            // Collect all running instances of this service
-            nodeTasks[t.NodeID] = append(nodeTasks[t.NodeID], t)
-        } else {
-            // for finished tasks, check restartPolicy
-            if isTaskCompleted(t, restartCondition(t)) {
-                nodeCompleted[t.NodeID] = struct{}{}
-            }
-        }
-    }
+        for _, serviceID := range serviceIDs {
+            tasks, err := store.FindTasks(tx, store.ByServiceID(serviceID))
+            if err != nil {
+                log.G(ctx).WithError(err).Errorf("global orchestrator: reconcileServices failed finding tasks for service %s", serviceID)
+                continue
+            }
+
+            // a node may have completed this service
+            nodeCompleted[serviceID] = make(map[string]struct{})
+            // nodeID -> task list
+            nodeTasks[serviceID] = make(map[string][]*api.Task)
+
+            for _, t := range tasks {
+                if isTaskRunning(t) {
+                    // Collect all running instances of this service
+                    nodeTasks[serviceID][t.NodeID] = append(nodeTasks[serviceID][t.NodeID], t)
+                } else {
+                    // for finished tasks, check restartPolicy
+                    if isTaskCompleted(t, restartCondition(t)) {
+                        nodeCompleted[serviceID][t.NodeID] = struct{}{}
+                    }
+                }
+            }
+        }
+    })
 
-    _, err = g.store.Batch(func(batch *store.Batch) error {
+    _, err := g.store.Batch(func(batch *store.Batch) error {
         var updateTasks []slot
-        for nodeID, node := range g.nodes {
-            meetsConstraints := constraint.NodeMatches(service.constraints, node)
-            ntasks := nodeTasks[nodeID]
-            delete(nodeTasks, nodeID)
-
-            // if restart policy considers this node has finished its task
-            // it should remove all running tasks
-            if _, exists := nodeCompleted[nodeID]; exists || !meetsConstraints {
-                g.removeTasks(ctx, batch, ntasks)
-                continue
-            }
-
-            if node.Spec.Availability == api.NodeAvailabilityPause {
-                // the node is paused, so we won't add or update
-                // any tasks
-                continue
-            }
-
-            // this node needs to run 1 copy of the task
-            if len(ntasks) == 0 {
-                g.addTask(ctx, batch, service.Service, nodeID)
-            } else {
-                updateTasks = append(updateTasks, ntasks)
-            }
-        }
-        if len(updateTasks) > 0 {
-            g.updater.Update(ctx, g.cluster, service.Service, updateTasks)
-        }
-
-        // Remove any tasks assigned to nodes not found in g.nodes.
-        // These must be associated with nodes that are drained, or
-        // nodes that no longer exist.
-        for _, ntasks := range nodeTasks {
-            g.removeTasks(ctx, batch, ntasks)
-        }
+        for _, serviceID := range serviceIDs {
+            if _, exists := nodeTasks[serviceID]; !exists {
+                continue
+            }
+
+            service := g.globalServices[serviceID]
+
+            for nodeID, node := range g.nodes {
+                meetsConstraints := constraint.NodeMatches(service.constraints, node)
+                ntasks := nodeTasks[serviceID][nodeID]
+                delete(nodeTasks[serviceID], nodeID)
+
+                // if restart policy considers this node has finished its task
+                // it should remove all running tasks
+                if _, exists := nodeCompleted[serviceID][nodeID]; exists || !meetsConstraints {
+                    g.removeTasks(ctx, batch, ntasks)
+                    continue
+                }
+
+                if node.Spec.Availability == api.NodeAvailabilityPause {
+                    // the node is paused, so we won't add or update
+                    // any tasks
+                    continue
+                }
+
+                // this node needs to run 1 copy of the task
+                if len(ntasks) == 0 {
+                    g.addTask(ctx, batch, service.Service, nodeID)
+                } else {
+                    updateTasks = append(updateTasks, ntasks)
+                }
+            }
+            if len(updateTasks) > 0 {
+                g.updater.Update(ctx, g.cluster, service.Service, updateTasks)
+            }
+
+            // Remove any tasks assigned to nodes not found in g.nodes.
+            // These must be associated with nodes that are drained, or
+            // nodes that no longer exist.
+            for _, ntasks := range nodeTasks[serviceID] {
+                g.removeTasks(ctx, batch, ntasks)
+            }
+        }
         return nil
     })
     if err != nil {
-        log.G(ctx).WithError(err).Errorf("global orchestrator: reconcileOneService transaction failed")
+        log.G(ctx).WithError(err).Errorf("global orchestrator: reconcileServices transaction failed")
     }
 }
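
The reconcileServices version replaces the per-service flat maps with two-level maps keyed by service ID first, then node ID. A small self-contained sketch of that bookkeeping, with a toy task type standing in for api.Task (names here are illustrative, not from the repo):

    package main

    import "fmt"

    type task struct{ id string }

    func main() {
        // serviceID -> nodeID -> running tasks
        nodeTasks := make(map[string]map[string][]*task)
        // serviceID -> set of nodes where the service's task ran to completion
        nodeCompleted := make(map[string]map[string]struct{})

        for _, serviceID := range []string{"svc-a", "svc-b"} {
            nodeTasks[serviceID] = make(map[string][]*task)
            nodeCompleted[serviceID] = make(map[string]struct{})
        }

        // Record a running task and a completion, as the collection loop does.
        nodeTasks["svc-a"]["node-1"] = append(nodeTasks["svc-a"]["node-1"], &task{id: "t1"})
        nodeCompleted["svc-b"]["node-1"] = struct{}{}

        // Reads mirror the reconcile loop: a missing inner key yields the
        // zero value, so lookups need no existence check.
        fmt.Println(len(nodeTasks["svc-a"]["node-1"])) // 1
        _, done := nodeCompleted["svc-b"]["node-1"]
        fmt.Println(done) // true
    }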

@@ -312,107 +320,111 @@ func (g *GlobalOrchestrator) reconcileOneNode(ctx context.Context, node *api.Node) {
         return
     }
 
-    // typically there are only a few global services on a node
-    // iterate through all of them one by one. If raft store visits become a concern,
-    // it can be optimized.
-    for _, service := range g.globalServices {
-        g.reconcileServiceOneNode(ctx, service.ID, node.ID)
+    var serviceIDs []string
+    for id := range g.globalServices {
+        serviceIDs = append(serviceIDs, id)
     }
+    g.reconcileServicesOneNode(ctx, serviceIDs, node.ID)
 }
 
-// reconcileServiceOneNode checks one service on one node
-func (g *GlobalOrchestrator) reconcileServiceOneNode(ctx context.Context, serviceID string, nodeID string) {
+// reconcileServicesOneNode checks the specified services on one node
+func (g *GlobalOrchestrator) reconcileServicesOneNode(ctx context.Context, serviceIDs []string, nodeID string) {
     node, exists := g.nodes[nodeID]
     if !exists {
        return
     }
-    service, exists := g.globalServices[serviceID]
-    if !exists {
-        return
-    }
-    // the node has completed this service
-    completed := false
-    // tasks for this node and service
+
+    // whether each service has completed on the node
+    completed := make(map[string]bool)
+    // tasks by service
+    tasks := make(map[string][]*api.Task)
+
     var (
-        tasks []*api.Task
-        err   error
+        tasksOnNode []*api.Task
+        err         error
     )
+
     g.store.View(func(tx store.ReadTx) {
-        var tasksOnNode []*api.Task
         tasksOnNode, err = store.FindTasks(tx, store.ByNodeID(nodeID))
-        if err != nil {
-            return
-        }
+    })
+    if err != nil {
+        log.G(ctx).WithError(err).Errorf("global orchestrator: reconcile failed finding tasks on node %s", nodeID)
+        return
+    }
+
+    for _, serviceID := range serviceIDs {
         for _, t := range tasksOnNode {
-            // only interested in one service
             if t.ServiceID != serviceID {
                 continue
             }
             if isTaskRunning(t) {
-                tasks = append(tasks, t)
+                tasks[serviceID] = append(tasks[serviceID], t)
             } else {
                 if isTaskCompleted(t, restartCondition(t)) {
-                    completed = true
+                    completed[serviceID] = true
                 }
             }
         }
-    })
-    if err != nil {
-        log.G(ctx).WithError(err).Errorf("global orchestrator: reconcile failed finding tasks")
-        return
     }
 
     _, err = g.store.Batch(func(batch *store.Batch) error {
-        meetsConstraints := constraint.NodeMatches(service.constraints, node)
-
-        // if restart policy considers this node has finished its task
-        // it should remove all running tasks
-        if completed || !meetsConstraints {
-            g.removeTasks(ctx, batch, tasks)
-            return nil
-        }
-
-        if node.Spec.Availability == api.NodeAvailabilityPause {
-            // the node is paused, so we won't add or update tasks
-            return nil
-        }
-
-        if len(tasks) == 0 {
-            g.addTask(ctx, batch, service.Service, nodeID)
-        } else {
-            // If task is out of date, update it. This can happen
-            // on node reconciliation if, for example, we drain a
-            // node, update the service, and then activate the node
-            // later.
-
-            // We don't use g.updater here for two reasons:
-            // - This is not a rolling update. Since it was not
-            //   triggered directly by updating the service, it
-            //   should not observe the rolling update parameters
-            //   or show status in UpdateStatus.
-            // - Calling Update cancels any current rolling updates
-            //   for the service, such as one triggered by service
-            //   reconciliation.
-
-            var (
-                dirtyTasks []*api.Task
-                cleanTasks []*api.Task
-            )
-
-            for _, t := range tasks {
-                if isTaskDirty(service.Service, t) {
-                    dirtyTasks = append(dirtyTasks, t)
-                } else {
-                    cleanTasks = append(cleanTasks, t)
-                }
-            }
-
-            if len(cleanTasks) == 0 {
-                g.addTask(ctx, batch, service.Service, nodeID)
-            } else {
-                dirtyTasks = append(dirtyTasks, cleanTasks[1:]...)
-            }
-            g.removeTasks(ctx, batch, dirtyTasks)
-        }
+        for _, serviceID := range serviceIDs {
+            service, exists := g.globalServices[serviceID]
+            if !exists {
+                continue
+            }
+
+            meetsConstraints := constraint.NodeMatches(service.constraints, node)
+
+            // if restart policy considers this node has finished its task
+            // it should remove all running tasks
+            if completed[serviceID] || !meetsConstraints {
+                g.removeTasks(ctx, batch, tasks[serviceID])
+                continue
+            }
+
+            if node.Spec.Availability == api.NodeAvailabilityPause {
+                // the node is paused, so we won't add or update tasks
+                continue
+            }
+
+            if len(tasks) == 0 {
+                g.addTask(ctx, batch, service.Service, nodeID)
+            } else {
+                // If task is out of date, update it. This can happen
+                // on node reconciliation if, for example, we pause a
+                // node, update the service, and then activate the node
+                // later.
+
+                // We don't use g.updater here for two reasons:
+                // - This is not a rolling update. Since it was not
+                //   triggered directly by updating the service, it
+                //   should not observe the rolling update parameters
+                //   or show status in UpdateStatus.
+                // - Calling Update cancels any current rolling updates
+                //   for the service, such as one triggered by service
+                //   reconciliation.
+
+                var (
+                    dirtyTasks []*api.Task
+                    cleanTasks []*api.Task
+                )
+
+                for _, t := range tasks[serviceID] {
+                    if isTaskDirty(service.Service, t) {
+                        dirtyTasks = append(dirtyTasks, t)
+                    } else {
+                        cleanTasks = append(cleanTasks, t)
+                    }
+                }
+
+                if len(cleanTasks) == 0 {
+                    g.addTask(ctx, batch, service.Service, nodeID)
+                } else {
+                    dirtyTasks = append(dirtyTasks, cleanTasks[1:]...)
+                }
+                g.removeTasks(ctx, batch, dirtyTasks)
+            }
+        }
         return nil
     })
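
The tail of reconcileServicesOneNode keeps the earlier dirty/clean rule: tasks whose spec no longer matches the service are dirty; of the clean tasks exactly one is kept, and the surplus is removed along with the dirty ones. A toy sketch of that selection rule, assuming a simplified task type in place of api.Task and a dirty flag in place of isTaskDirty:

    package main

    import "fmt"

    type task struct {
        id    string
        dirty bool // stands in for isTaskDirty(service, task)
    }

    func main() {
        tasks := []task{{"t1", true}, {"t2", false}, {"t3", false}}

        var dirtyTasks, cleanTasks []task
        for _, t := range tasks {
            if t.dirty {
                dirtyTasks = append(dirtyTasks, t)
            } else {
                cleanTasks = append(cleanTasks, t)
            }
        }

        if len(cleanTasks) == 0 {
            fmt.Println("no clean task: create a fresh one for this node")
        } else {
            // Keep exactly one clean task; surplus clean tasks are removed
            // together with the out-of-date ones.
            dirtyTasks = append(dirtyTasks, cleanTasks[1:]...)
        }
        fmt.Println("remove:", dirtyTasks) // remove: [{t1 true} {t3 false}]
    }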
