Skip to content

Commit

Permalink
Speed up creation/deletion of game servers in a set by:
Browse files Browse the repository at this point in the history
- parallelizing create/delete calls in GSS controller
- creating dedicated worker queues for creation/deletion requests
- replacing default rate limiters with ones that don't have global QPS limit
  • Loading branch information
jkowalski committed Feb 5, 2019
1 parent bc94c9a commit caf5cda
Show file tree
Hide file tree
Showing 7 changed files with 805 additions and 182 deletions.
49 changes: 45 additions & 4 deletions pkg/gameservers/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"encoding/json"
"fmt"
"sync"
"time"

"agones.dev/agones/pkg/apis/stable"
"agones.dev/agones/pkg/apis/stable/v1alpha1"
Expand Down Expand Up @@ -49,6 +50,7 @@ import (
corelisterv1 "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
)

var (
Expand All @@ -73,6 +75,8 @@ type Controller struct {
portAllocator *PortAllocator
healthController *HealthController
workerqueue *workerqueue.WorkerQueue
creationWorkerQueue *workerqueue.WorkerQueue // handles creation only
deletionWorkerQueue *workerqueue.WorkerQueue // handles deletion only
allocationMutex *sync.Mutex
stop <-chan struct {
}
Expand Down Expand Up @@ -124,20 +128,25 @@ func NewController(
eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
c.recorder = eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: "gameserver-controller"})

c.workerqueue = workerqueue.NewWorkerQueue(c.syncGameServer, c.logger, stable.GroupName+".GameServerController")
c.workerqueue = workerqueue.NewWorkerQueueWithRateLimiter(c.syncGameServer, c.logger, stable.GroupName+".GameServerController",
workqueue.NewItemFastSlowRateLimiter(20*time.Millisecond, 500*time.Millisecond, 5))
c.creationWorkerQueue = workerqueue.NewWorkerQueueWithRateLimiter(c.syncGameServer, c.logger, stable.GroupName+".GameServerControllerCreation",
workqueue.NewItemFastSlowRateLimiter(20*time.Millisecond, 500*time.Millisecond, 5))
c.deletionWorkerQueue = workerqueue.NewWorkerQueueWithRateLimiter(c.syncGameServer, c.logger, stable.GroupName+".GameServerControllerDeletion",
workqueue.NewItemFastSlowRateLimiter(20*time.Millisecond, 500*time.Millisecond, 5))
health.AddLivenessCheck("gameserver-workerqueue", healthcheck.Check(c.workerqueue.Healthy))

wh.AddHandler("/mutate", v1alpha1.Kind("GameServer"), admv1beta1.Create, c.creationMutationHandler)
wh.AddHandler("/validate", v1alpha1.Kind("GameServer"), admv1beta1.Create, c.creationValidationHandler)

gsInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: c.workerqueue.Enqueue,
AddFunc: c.enqueueGameServerBasedOnState,
UpdateFunc: func(oldObj, newObj interface{}) {
// no point in processing unless there is a State change
oldGs := oldObj.(*v1alpha1.GameServer)
newGs := newObj.(*v1alpha1.GameServer)
if oldGs.Status.State != newGs.Status.State || oldGs.ObjectMeta.DeletionTimestamp != newGs.ObjectMeta.DeletionTimestamp {
c.workerqueue.Enqueue(newGs)
c.enqueueGameServerBasedOnState(newGs)
}
},
})
Expand Down Expand Up @@ -168,6 +177,24 @@ func NewController(
return c
}

func (c *Controller) enqueueGameServerBasedOnState(item interface{}) {
gs := item.(*v1alpha1.GameServer)

switch gs.Status.State {
case v1alpha1.GameServerStatePortAllocation,
v1alpha1.GameServerStateCreating,
v1alpha1.GameServerStateStarting:
c.creationWorkerQueue.Enqueue(gs)

case v1alpha1.GameServerStateUnhealthy,
v1alpha1.GameServerStateError:
c.deletionWorkerQueue.Enqueue(gs)

default:
c.workerqueue.Enqueue(gs)
}
}

// creationMutationHandler is the handler for the mutating webhook that sets the
// the default values on the GameServer
// Should only be called on gameserver create operations.
Expand Down Expand Up @@ -268,7 +295,21 @@ func (c *Controller) Run(workers int, stop <-chan struct{}) error {
// Run the Health Controller
go c.healthController.Run(stop)

c.workerqueue.Run(workers, stop)
// start work queues
var wg sync.WaitGroup

startWorkQueue := func(wq *workerqueue.WorkerQueue) {
wg.Add(1)
go func() {
defer wg.Done()
wq.Run(workers, stop)
}()
}

startWorkQueue(c.workerqueue)
startWorkQueue(c.creationWorkerQueue)
startWorkQueue(c.deletionWorkerQueue)
wg.Wait()
return nil
}

Expand Down
6 changes: 5 additions & 1 deletion pkg/gameservers/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,12 +172,16 @@ func TestControllerWatchGameServers(t *testing.T) {
received := make(chan string)
defer close(received)

c.workerqueue.SyncHandler = func(name string) error {
h := func(name string) error {
assert.Equal(t, "default/test", name)
received <- name
return nil
}

c.workerqueue.SyncHandler = h
c.creationWorkerQueue.SyncHandler = h
c.deletionWorkerQueue.SyncHandler = h

stop, cancel := agtesting.StartInformers(m, c.gameServerSynced)
defer cancel()

Expand Down
Loading

0 comments on commit caf5cda

Please sign in to comment.