Skip to content

Commit

Permalink
Implements SDK callback for GameServer updates
Browse files Browse the repository at this point in the history
This implements the gRPC system to send messages up to a unidirectional
stream from the sdk-server to the SDK.

This has been implemented in the Go, C++ and REST SDK.

Closes #277
  • Loading branch information
markmandel committed Aug 10, 2018
1 parent 84b60d5 commit 6b7c090
Show file tree
Hide file tree
Showing 31 changed files with 1,001 additions and 292 deletions.
2 changes: 1 addition & 1 deletion cmd/sdk-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func main() {
defer cancel()

if ctlConf.IsLocal {
sdk.RegisterSDKServer(grpcServer, &gameservers.LocalSDKServer{})
sdk.RegisterSDKServer(grpcServer, gameservers.NewLocalSDKServer())
} else {
var config *rest.Config
config, err = rest.InClusterConfig()
Expand Down
21 changes: 21 additions & 0 deletions docs/sdk_rest_api.md
Original file line number Diff line number Diff line change
Expand Up @@ -102,3 +102,24 @@ Response:
}
}
```

### Watch GameServer

⚠️⚠️⚠️ **/watch/gameserver is currently a development feature and has not been released** ⚠️⚠️⚠️

Call this when you want to get updates of when the backing `GameServer` configuration is updated.

These updates will come as newline delimited JSON, send on each update. To that end, you will
want to keep the http connection open, and read lines from the result stream and and process as they
come in.

```bash
$ curl -H "Content-Type: application/json" -X GET http://localhost:59358/watch/gameserver
```

Response:
```json
{"result":{"object_meta":{"name":"local","namespace":"default","uid":"1234","resource_version":"v1","generation":"1","creation_timestamp":"1533766607","annotations":{"annotation":"true"},"labels":{"islocal":"true"}},"status":{"state":"Ready","address":"127.0.0.1","ports":[{"name":"default","port":7777}]}}}
{"result":{"object_meta":{"name":"local","namespace":"default","uid":"1234","resource_version":"v1","generation":"1","creation_timestamp":"1533766607","annotations":{"annotation":"true"},"labels":{"islocal":"true"}},"status":{"state":"Ready","address":"127.0.0.1","ports":[{"name":"default","port":7777}]}}}
{"result":{"object_meta":{"name":"local","namespace":"default","uid":"1234","resource_version":"v1","generation":"1","creation_timestamp":"1533766607","annotations":{"annotation":"true"},"labels":{"islocal":"true"}},"status":{"state":"Ready","address":"127.0.0.1","ports":[{"name":"default","port":7777}]}}}
```
2 changes: 1 addition & 1 deletion examples/cpp-simple/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ REPOSITORY = gcr.io/agones-images
# Directory that this Makefile is in.
mkfile_path := $(abspath $(lastword $(MAKEFILE_LIST)))
project_path := $(dir $(mkfile_path))
server_tag = $(REPOSITORY)/cpp-simple-server:0.2
server_tag = $(REPOSITORY)/cpp-simple-server:0.3

# _____ _
# |_ _|_ _ _ __ __ _ ___| |_ ___
Expand Down
2 changes: 1 addition & 1 deletion examples/cpp-simple/fleet.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,5 @@ spec:
spec:
containers:
- name: cpp-simple
image: gcr.io/agones-images/cpp-simple-server:0.2
image: gcr.io/agones-images/cpp-simple-server:0.3
# imagePullPolicy: Always # add for development
2 changes: 1 addition & 1 deletion examples/cpp-simple/gameserver.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,5 @@ spec:
spec:
containers:
- name: cpp-simple
image: gcr.io/agones-images/cpp-simple-server:0.2
image: gcr.io/agones-images/cpp-simple-server:0.3
# imagePullPolicy: Always # add for development
10 changes: 10 additions & 0 deletions examples/cpp-simple/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,15 @@ void doHealth(agones::SDK *sdk) {
}
}

// watch GameServer Updates
void watchUpdates(agones::SDK *sdk) {
std::cout << "Starting to watch GameServer updates..." << std::endl;
sdk->WatchGameServer([](stable::agones::dev::sdk::GameServer gameserver){
std::cout << "GameServer Update, name: " << gameserver.object_meta().name() << std::endl;
std::cout << "GameServer Update, state: " << gameserver.status().state() << std::endl;
});
}

int main() {
std::cout << "C++ Game Server has started!" << std::endl;

Expand All @@ -48,6 +57,7 @@ int main() {
std::cout << "...handshake complete." << std::endl;

std::thread health (doHealth, sdk);
std::thread watch (watchUpdates, sdk);

std::cout << "Marking server as ready..." << std::endl;
grpc::Status status = sdk->Ready();
Expand Down
2 changes: 1 addition & 1 deletion examples/simple-udp/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ REPOSITORY = gcr.io/agones-images

mkfile_path := $(abspath $(lastword $(MAKEFILE_LIST)))
project_path := $(dir $(mkfile_path))
server_tag = $(REPOSITORY)/udp-server:0.2
server_tag = $(REPOSITORY)/udp-server:0.3
package = agones.dev/agones/examples/simple-udp

# _____ _
Expand Down
2 changes: 1 addition & 1 deletion examples/simple-udp/server/fleet.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,4 @@ spec:
spec:
containers:
- name: simple-udp
image: gcr.io/agones-images/udp-server:0.2
image: gcr.io/agones-images/udp-server:0.3
2 changes: 1 addition & 1 deletion examples/simple-udp/server/gameserver-legacy.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,4 @@ spec:
spec:
containers:
- name: simple-udp
image: gcr.io/agones-images/udp-server:0.2
image: gcr.io/agones-images/udp-server:0.3
2 changes: 1 addition & 1 deletion examples/simple-udp/server/gameserver.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,4 @@ spec:
spec:
containers:
- name: simple-udp
image: gcr.io/agones-images/udp-server:0.2
image: gcr.io/agones-images/udp-server:0.3
18 changes: 18 additions & 0 deletions examples/simple-udp/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,9 @@ func readWriteLoop(conn net.PacketConn, stop chan struct{}, s *sdk.SDK) {

case "GAMESERVER":
writeGameServerName(s, conn, sender)

case "WATCH":
watchGameServerEvents(s)
}

// echo it back
Expand Down Expand Up @@ -120,6 +123,21 @@ func writeGameServerName(s *sdk.SDK, conn net.PacketConn, sender net.Addr) {
}
}

// watchGameServerEvents creates a callback to log when
// gameserver events occur
func watchGameServerEvents(s *sdk.SDK) {
err := s.WatchGameServer(func(gs *coresdk.GameServer) {
j, err := json.Marshal(gs)
if err != nil {
log.Fatalf("error mashalling GameServer to JSON: %v", err)
}
log.Printf("GameServer Event: %s \n", string(j))
})
if err != nil {
log.Fatalf("Could not watch Game Server events, %v", err)
}
}

// doHealth sends the regular Health Pings
func doHealth(sdk *sdk.SDK, stop <-chan struct{}) {
tick := time.Tick(2 * time.Second)
Expand Down
103 changes: 72 additions & 31 deletions pkg/gameservers/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"agones.dev/agones/pkg/sdk"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
netcontext "golang.org/x/net/context"
"google.golang.org/grpc/metadata"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/wait"
Expand All @@ -42,70 +43,110 @@ func newSingleContainerSpec() v1alpha1.GameServerSpec {
}
}

// mockStream is the mock of the SDK_HealthServer for streaming
type mockStream struct {
func testHTTPHealth(t *testing.T, url string, expectedResponse string, expectedStatus int) {
// do a poll, because this code could run before the health check becomes live
err := wait.PollImmediate(time.Second, 20*time.Second, func() (done bool, err error) {
resp, err := http.Get(url)
if err != nil {
logrus.WithError(err).Error("Error connecting to ", url)
return false, nil
}

assert.NotNil(t, resp)
if resp != nil {
defer resp.Body.Close() // nolint: errcheck
body, err := ioutil.ReadAll(resp.Body)
assert.Nil(t, err, "(%s) read response error should be nil: %v", url, err)
assert.Equal(t, expectedStatus, resp.StatusCode, "url: %s", url)
assert.Equal(t, []byte(expectedResponse), body, "(%s) response body should be '%s'", url, expectedResponse)
}

return true, nil
})
assert.Nil(t, err, "Timeout on %s health check, %v", url, err)
}

// emptyMockStream is the mock of the SDK_HealthServer for streaming
type emptyMockStream struct {
msgs chan *sdk.Empty
}

func newMockStream() *mockStream {
return &mockStream{msgs: make(chan *sdk.Empty)}
func newEmptyMockStream() *emptyMockStream {
return &emptyMockStream{msgs: make(chan *sdk.Empty)}
}

func (m *mockStream) SendAndClose(*sdk.Empty) error {
func (m *emptyMockStream) SendAndClose(*sdk.Empty) error {
return nil
}

func (m *mockStream) Recv() (*sdk.Empty, error) {
func (m *emptyMockStream) Recv() (*sdk.Empty, error) {
empty, ok := <-m.msgs
if ok {
return empty, nil
}
return empty, io.EOF
}

func (m *mockStream) SetHeader(metadata.MD) error {
func (m *emptyMockStream) SetHeader(metadata.MD) error {
panic("implement me")
}

func (m *mockStream) SendHeader(metadata.MD) error {
func (m *emptyMockStream) SendHeader(metadata.MD) error {
panic("implement me")
}

func (m *mockStream) SetTrailer(metadata.MD) {
func (m *emptyMockStream) SetTrailer(metadata.MD) {
panic("implement me")
}

func (m *mockStream) Context() context.Context {
func (m *emptyMockStream) Context() context.Context {
panic("implement me")
}

func (m *mockStream) SendMsg(msg interface{}) error {
func (m *emptyMockStream) SendMsg(msg interface{}) error {
panic("implement me")
}

func (m *mockStream) RecvMsg(msg interface{}) error {
func (m *emptyMockStream) RecvMsg(msg interface{}) error {
panic("implement me")
}

func testHTTPHealth(t *testing.T, url string, expectedResponse string, expectedStatus int) {
// do a poll, because this code could run before the health check becomes live
err := wait.PollImmediate(time.Second, 20*time.Second, func() (done bool, err error) {
resp, err := http.Get(url)
if err != nil {
logrus.WithError(err).Error("Error connecting to ", url)
return false, nil
}
type gameServerMockStream struct {
msgs chan *sdk.GameServer
}

assert.NotNil(t, resp)
if resp != nil {
defer resp.Body.Close() // nolint: errcheck
body, err := ioutil.ReadAll(resp.Body)
assert.Nil(t, err, "(%s) read response error should be nil: %v", url, err)
assert.Equal(t, expectedStatus, resp.StatusCode, "url: %s", url)
assert.Equal(t, []byte(expectedResponse), body, "(%s) response body should be '%s'", url, expectedResponse)
}
// newGameServerMockStream implements SDK_WatchGameServerServer for testing
func newGameServerMockStream() *gameServerMockStream {
return &gameServerMockStream{
msgs: make(chan *sdk.GameServer, 10),
}
}

return true, nil
})
assert.Nil(t, err, "Timeout on %s health check, %v", url, err)
func (m *gameServerMockStream) Send(gs *sdk.GameServer) error {
m.msgs <- gs
return nil
}

func (*gameServerMockStream) SetHeader(metadata.MD) error {
panic("implement me")
}

func (*gameServerMockStream) SendHeader(metadata.MD) error {
panic("implement me")
}

func (*gameServerMockStream) SetTrailer(metadata.MD) {
panic("implement me")
}

func (*gameServerMockStream) Context() netcontext.Context {
panic("implement me")
}

func (*gameServerMockStream) SendMsg(m interface{}) error {
panic("implement me")
}

func (*gameServerMockStream) RecvMsg(m interface{}) error {
panic("implement me")
}
66 changes: 47 additions & 19 deletions pkg/gameservers/localsdk.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package gameservers

import (
"io"

"time"

"agones.dev/agones/pkg/sdk"
Expand All @@ -25,12 +24,40 @@ import (
"golang.org/x/net/context"
)

var _ sdk.SDKServer = &LocalSDKServer{}
var (
_ sdk.SDKServer = &LocalSDKServer{}

fixture = &sdk.GameServer{
ObjectMeta: &sdk.GameServer_ObjectMeta{
Name: "local",
Namespace: "default",
Uid: "1234",
Generation: 1,
ResourceVersion: "v1",
CreationTimestamp: time.Now().Unix(),
Labels: map[string]string{"islocal": "true"},
Annotations: map[string]string{"annotation": "true"},
},
Status: &sdk.GameServer_Status{
State: "Ready",
Address: "127.0.0.1",
Ports: []*sdk.GameServer_Status_Port{{Name: "default", Port: 7777}},
},
}
)

// LocalSDKServer type is the SDKServer implementation for when the sidecar
// is being run for local development, and doesn't connect to the
// Kubernetes cluster
type LocalSDKServer struct {
watchPeriod time.Duration
}

// NewLocalSDKServer returns the default LocalSDKServer
func NewLocalSDKServer() *LocalSDKServer {
return &LocalSDKServer{
watchPeriod: 5 * time.Second,
}
}

// Ready logs that the Ready request has been received
Expand Down Expand Up @@ -63,23 +90,24 @@ func (l *LocalSDKServer) Health(stream sdk.SDK_HealthServer) error {
// GetGameServer returns a dummy game server.
func (l *LocalSDKServer) GetGameServer(context.Context, *sdk.Empty) (*sdk.GameServer, error) {
logrus.Info("getting GameServer details")
gs := &sdk.GameServer{
ObjectMeta: &sdk.GameServer_ObjectMeta{
Name: "local",
Namespace: "default",
Uid: "1234",
Generation: 1,
ResourceVersion: "v1",
CreationTimestamp: time.Now().Unix(),
Labels: map[string]string{"islocal": "true"},
Annotations: map[string]string{"annotation": "true"},
},
Status: &sdk.GameServer_Status{
State: "Ready",
Address: "127.0.0.1",
Ports: []*sdk.GameServer_Status_Port{{Name: "default", Port: 7777}},
},
return fixture, nil
}

// WatchGameServer will return a dummy GameServer (with no changes), 3 times, every 5 seconds
func (l *LocalSDKServer) WatchGameServer(_ *sdk.Empty, stream sdk.SDK_WatchGameServerServer) error {
logrus.Info("connected to watch GameServer...")
times := 3

for i := 0; i < times; i++ {
logrus.Info("Sending watched GameServer!")
err := stream.Send(fixture)
if err != nil {
logrus.WithError(err).Error("error sending gameserver")
return err
}

time.Sleep(l.watchPeriod)
}

return gs, nil
return nil
}
Loading

0 comments on commit 6b7c090

Please sign in to comment.