Skip to content

Commit

Permalink
Drop the dependency to allocation Extension APIServer from agones-all…
Browse files Browse the repository at this point in the history
…ocator service
  • Loading branch information
pooneh-m committed Oct 18, 2019
1 parent b22140a commit dcfaa1f
Show file tree
Hide file tree
Showing 4 changed files with 172 additions and 36 deletions.
87 changes: 72 additions & 15 deletions cmd/allocator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,20 +24,30 @@ import (
"os"
"path/filepath"
"strings"
"time"

"agones.dev/agones/pkg"
"agones.dev/agones/pkg/allocation/converters"
pb "agones.dev/agones/pkg/allocation/go/v1alpha1"
allocationv1 "agones.dev/agones/pkg/apis/allocation/v1"
"agones.dev/agones/pkg/client/clientset/versioned"
"agones.dev/agones/pkg/client/informers/externalversions"
"agones.dev/agones/pkg/gameserverallocations"
"agones.dev/agones/pkg/gameservers"
"agones.dev/agones/pkg/metrics"
"agones.dev/agones/pkg/util/runtime"
"agones.dev/agones/pkg/util/signals"
"github.com/heptiolabs/healthcheck"
prom "github.com/prometheus/client_golang/prometheus"
"github.com/spf13/pflag"
"github.com/spf13/viper"
"go.opencensus.io/plugin/ochttp"
"go.opencensus.io/stats/view"
k8serror "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
k8sruntime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
)

Expand Down Expand Up @@ -75,10 +85,11 @@ func main() {
// http.DefaultServerMux is used for http connection, not for https
http.Handle("/", health)

agonesClient, err := getAgonesClient()
kubeClient, agonesClient, err := getClients()
if err != nil {
logger.WithError(err).Fatal("could not create agones client")
logger.WithError(err).Fatal("could not create clients")
}

// This will test the connection to agones on each readiness probe
// so if one of the allocator pod can't reach Kubernetes it will be removed
// from the Kubernetes service.
Expand All @@ -87,9 +98,7 @@ func main() {
return err
})

h := httpHandler{
agonesClient: agonesClient,
}
h := newServiceHandler(kubeClient, agonesClient, health)

// mux for https server to serve gameserver allocations
httpsMux := http.NewServeMux()
Expand Down Expand Up @@ -126,20 +135,54 @@ func main() {
logger.WithError(err).Fatal("allocation service crashed")
}

func newServiceHandler(kubeClient kubernetes.Interface, agonesClient versioned.Interface, health healthcheck.Handler) *httpHandler {
defaultResync := 30 * time.Second
agonesInformerFactory := externalversions.NewSharedInformerFactory(agonesClient, defaultResync)
kubeInformerFactory := informers.NewSharedInformerFactory(kubeClient, defaultResync)
gsCounter := gameservers.NewPerNodeCounter(kubeInformerFactory, agonesInformerFactory)

allocator := gameserverallocations.NewAllocator(
agonesInformerFactory.Multicluster().V1alpha1().GameServerAllocationPolicies(),
kubeInformerFactory.Core().V1().Secrets(),
kubeClient,
gameserverallocations.NewReadyGameServerCache(agonesInformerFactory.Agones().V1().GameServers(), agonesClient.AgonesV1(), gsCounter, health))

stop := signals.NewStopChannel()
h := httpHandler{
allocationCallback: func(gsa *allocationv1.GameServerAllocation) (k8sruntime.Object, error) {
return allocator.Allocate(gsa, stop)
},
}

kubeInformerFactory.Start(stop)
agonesInformerFactory.Start(stop)
if err := allocator.Start(stop); err != nil {
logger.WithError(err).Fatal("starting allocator failed.")
}

return &h
}

// Set up our client which we will use to call the API
func getAgonesClient() (*versioned.Clientset, error) {
func getClients() (*kubernetes.Clientset, *versioned.Clientset, error) {
// Create the in-cluster config
config, err := rest.InClusterConfig()
if err != nil {
return nil, errors.New("Could not create in cluster config")
return nil, nil, errors.New("Could not create in cluster config")
}

// Access to the Agones resources through the Agones Clientset
kubeClient, err := kubernetes.NewForConfig(config)
if err != nil {
return nil, nil, errors.New("Could not create the kubernetes api clientset")
}

// Access to the Agones resources through the Agones Clientset
agonesClient, err := versioned.NewForConfig(config)
if err != nil {
return nil, errors.New("Could not create the agones api clientset")
return nil, nil, errors.New("Could not create the agones api clientset")
}
return agonesClient, nil
return kubeClient, agonesClient, nil
}

func getCACertPool(path string) (*x509.CertPool, error) {
Expand Down Expand Up @@ -180,7 +223,7 @@ func (h *httpHandler) postOnly(in handler) handler {
}

type httpHandler struct {
agonesClient versioned.Interface
allocationCallback func(*allocationv1.GameServerAllocation) (k8sruntime.Object, error)
}

func (h *httpHandler) allocateHandler(w http.ResponseWriter, r *http.Request) {
Expand All @@ -193,22 +236,36 @@ func (h *httpHandler) allocateHandler(w http.ResponseWriter, r *http.Request) {
logger.WithField("request", request).Infof("allocation request received")

gsa := converters.ConvertAllocationRequestV1Alpha1ToGSAV1(&request)
allocation := h.agonesClient.AllocationV1().GameServerAllocations(gsa.ObjectMeta.Namespace)
allocatedGsa, err := allocation.Create(gsa)
resultObj, err := h.allocationCallback(gsa)
if err != nil {
http.Error(w, err.Error(), httpCode(err))
logger.WithField("gsa", gsa).WithError(err).Info("calling allocation extension API failed")
logger.WithField("gsa", gsa).WithError(err).Info("allocation failed")
return
}

w.Header().Set("Content-Type", "application/json")
if status, ok := resultObj.(*metav1.Status); ok {
w.WriteHeader(int(status.Code))
err = json.NewEncoder(w).Encode(status)
if err != nil {
http.Error(w, "internal server error", http.StatusInternalServerError)
logger.WithError(err).Error("Unable to encode status in json")
return
}
}
allocatedGsa, ok := resultObj.(*allocationv1.GameServerAllocation)
if !ok {
http.Error(w, "internal server error", http.StatusInternalServerError)
logger.Errorf("internal server error - Bad GSA format %v", resultObj)
return
}
response := converters.ConvertGSAV1ToAllocationResponseV1Alpha1(allocatedGsa)
logger.WithField("response", response).Infof("allocation response is being sent")

w.Header().Set("Content-Type", "application/json")
err = json.NewEncoder(w).Encode(response)
if err != nil {
http.Error(w, "internal server error", http.StatusInternalServerError)
logger.Error(err)
logger.WithError(err).Error("Unable to encode status in json")
return
}
}
Expand Down
91 changes: 70 additions & 21 deletions cmd/allocator/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,33 +24,29 @@ import (

pb "agones.dev/agones/pkg/allocation/go/v1alpha1"
allocationv1 "agones.dev/agones/pkg/apis/allocation/v1"
agonesfake "agones.dev/agones/pkg/client/clientset/versioned/fake"
"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
k8serror "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
k8sruntime "k8s.io/apimachinery/pkg/runtime"
k8stesting "k8s.io/client-go/testing"
)

func TestAllocateHandler(t *testing.T) {
t.Parallel()

fakeAgones := &agonesfake.Clientset{}
h := httpHandler{
agonesClient: fakeAgones,
allocationCallback: func(gsa *allocationv1.GameServerAllocation) (k8sruntime.Object, error) {
return &allocationv1.GameServerAllocation{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
},
Status: allocationv1.GameServerAllocationStatus{
State: allocationv1.GameServerAllocationContention,
},
}, nil
},
}

fakeAgones.AddReactor("create", "gameserverallocations", func(action k8stesting.Action) (bool, k8sruntime.Object, error) {
return true, &allocationv1.GameServerAllocation{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
},
Status: allocationv1.GameServerAllocationStatus{
State: allocationv1.GameServerAllocationContention,
},
}, nil
})

request := &pb.AllocationRequest{
Namespace: "ns",
MultiClusterSetting: &pb.MultiClusterSetting{
Expand Down Expand Up @@ -78,15 +74,12 @@ func TestAllocateHandler(t *testing.T) {
func TestAllocateHandlerReturnsError(t *testing.T) {
t.Parallel()

fakeAgones := &agonesfake.Clientset{}
h := httpHandler{
agonesClient: fakeAgones,
allocationCallback: func(gsa *allocationv1.GameServerAllocation) (k8sruntime.Object, error) {
return nil, k8serror.NewBadRequest("error")
},
}

fakeAgones.AddReactor("create", "gameserverallocations", func(action k8stesting.Action) (bool, k8sruntime.Object, error) {
return true, nil, k8serror.NewBadRequest("error")
})

request := &pb.AllocationRequest{}
body, _ := json.Marshal(request)
buf := bytes.NewBuffer(body)
Expand All @@ -101,6 +94,62 @@ func TestAllocateHandlerReturnsError(t *testing.T) {
assert.Contains(t, rec.Body.String(), "error")
}

func TestHandlingStatus(t *testing.T) {
t.Parallel()

errorMessage := "GameServerAllocation is invalid"
h := httpHandler{
allocationCallback: func(gsa *allocationv1.GameServerAllocation) (k8sruntime.Object, error) {
return &metav1.Status{
Status: metav1.StatusFailure,
Message: errorMessage,
Reason: metav1.StatusReasonInvalid,
Details: &metav1.StatusDetails{
Kind: "GameServerAllocation",
Group: allocationv1.SchemeGroupVersion.Group,
},
Code: http.StatusUnprocessableEntity,
}, nil
},
}

request := &pb.AllocationRequest{}
body, _ := json.Marshal(request)
buf := bytes.NewBuffer(body)
req, err := http.NewRequest(http.MethodPost, "/", buf)
if !assert.Nil(t, err) {
return
}

rec := httptest.NewRecorder()
h.allocateHandler(rec, req)
assert.Equal(t, rec.Code, 422)
assert.Contains(t, rec.Body.String(), errorMessage)
}

func TestBadReturnType(t *testing.T) {
t.Parallel()

h := httpHandler{
allocationCallback: func(gsa *allocationv1.GameServerAllocation) (k8sruntime.Object, error) {
return &corev1.Secret{}, nil
},
}

request := &pb.AllocationRequest{}
body, _ := json.Marshal(request)
buf := bytes.NewBuffer(body)
req, err := http.NewRequest(http.MethodPost, "/", buf)
if !assert.Nil(t, err) {
return
}

rec := httptest.NewRecorder()
h.allocateHandler(rec, req)
assert.Equal(t, rec.Code, 500)
assert.Contains(t, rec.Body.String(), "internal server error")
}

func TestGettingCaCert(t *testing.T) {
t.Parallel()

Expand Down
15 changes: 15 additions & 0 deletions install/helm/agones/templates/service/allocation.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -136,9 +136,24 @@ metadata:
release: {{ $.Release.Name }}
heritage: {{ $.Release.Service }}
rules:
- apiGroups: [""]
resources: ["events"]
verbs: ["create", "patch"]
- apiGroups: ["allocation.agones.dev"]
resources: ["gameserverallocations"]
verbs: ["create"]
- apiGroups: [""]
resources: ["nodes", "secrets"]
verbs: ["get", "list", "watch"]
- apiGroups: ["agones.dev"]
resources: ["gameservers", "gameserversets"]
verbs: ["get", "list", "update", "watch"]
- apiGroups: ["agones.dev"]
resources: ["gameservers"]
verbs: ["patch"]
- apiGroups: ["multicluster.agones.dev"]
resources: ["gameserverallocationpolicies"]
verbs: ["get", "list", "watch"]

---
# Create a ServiceAccount that will be bound to the above role
Expand Down
15 changes: 15 additions & 0 deletions install/yaml/install.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1174,9 +1174,24 @@ metadata:
release: agones-manual
heritage: Tiller
rules:
- apiGroups: [""]
resources: ["events"]
verbs: ["create", "patch"]
- apiGroups: ["allocation.agones.dev"]
resources: ["gameserverallocations"]
verbs: ["create"]
- apiGroups: [""]
resources: ["nodes", "secrets"]
verbs: ["get", "list", "watch"]
- apiGroups: ["agones.dev"]
resources: ["gameservers", "gameserversets"]
verbs: ["get", "list", "update", "watch"]
- apiGroups: ["agones.dev"]
resources: ["gameservers"]
verbs: ["patch"]
- apiGroups: ["multicluster.agones.dev"]
resources: ["gameserverallocationpolicies"]
verbs: ["get", "list", "watch"]

---
# Create a ServiceAccount that will be bound to the above role
Expand Down

0 comments on commit dcfaa1f

Please sign in to comment.