Skip to content

Commit

Permalink
Implement graceful server shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
jkongie committed Aug 24, 2021
1 parent 7442db8 commit f488901
Showing 1 changed file with 17 additions and 7 deletions.
24 changes: 17 additions & 7 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"crypto/ed25519"
"crypto/x509"
"errors"
"log"
"net"
"net/http"
"sync"
Expand Down Expand Up @@ -45,6 +46,8 @@ type Server struct {
quit *wsrpcsync.Event
// Signals a done event once the server has finished shutting down
done *wsrpcsync.Event

serveWG sync.WaitGroup
}

func NewServer(opt ...ServerOption) *Server {
Expand All @@ -63,6 +66,7 @@ func NewServer(opt ...ServerOption) *Server {
methodCalls: map[string]chan<- *message.Response{},
quit: wsrpcsync.NewEvent(),
done: wsrpcsync.NewEvent(),
serveWG: sync.WaitGroup{},
}

return s
Expand Down Expand Up @@ -111,28 +115,34 @@ func (s *Server) wshandler(w http.ResponseWriter, r *http.Request) {

config := &transport.ServerConfig{}
onClose := func() {
s.connMgr.removeConnection(pubKey)
// There is only no connection maanger when we are shutting down, so
// we can ignore removing the connection.
if s.connMgr != nil {
s.connMgr.removeConnection(pubKey)
}
s.serveWG.Done()
close(done)
}

// Initialize the transport
tr, err := transport.NewServerTransport(conn, config, onClose)
if err != nil {
// log.Println("Could not initialize server transport")
return
}

// Register the transport against the public key
s.connMgr.registerConnection(pubKey, tr)

s.serveWG.Add(1)

// Start the reader handler
go s.handleRead(pubKey, done)

select {
case <-done:
// log.Println("Closing Handler: Connection dropped")
log.Println("[wsrpc] Connection dropped")
case <-s.quit.Done():
// log.Println("Closing Handler: Shutdown")
log.Println("[wsrpc] Connection closed due to shutdown")
}
}

Expand Down Expand Up @@ -328,7 +338,6 @@ func (s *Server) GetConnectedPeerPublicKeys() []credentials.StaticSizedPublicKey
// Stop stops the gRPC server. It immediately closes all open
// connections and listeners.
func (s *Server) Stop() {
// log.Println("[Server] Stopping Server")
s.quit.Fire()
defer func() {
s.done.Fire()
Expand All @@ -339,9 +348,10 @@ func (s *Server) Stop() {
s.connMgr = nil
s.mu.Unlock()

// TODO - Wait for the connections to close cleanly so we can perform a
// graceful shutdown.
connMgr.close()

// Wait for all the connections to close
s.serveWG.Wait()
}

// Ensure there is only a single connection per public key by checking the
Expand Down

0 comments on commit f488901

Please sign in to comment.