Skip to content
This repository has been archived by the owner on May 12, 2021. It is now read-only.

Commit

Permalink
Merge pull request #214 from bergwolf/reconnect
Browse files Browse the repository at this point in the history
support agent grpc reconnection over serial port
  • Loading branch information
Eric Ernst authored Apr 16, 2018
2 parents 7aa11d8 + 134d5d5 commit bdc70d4
Show file tree
Hide file tree
Showing 4 changed files with 166 additions and 36 deletions.
90 changes: 54 additions & 36 deletions agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ package main
import (
"flag"
"fmt"
"io"
"io/ioutil"
"net"
"os"
Expand Down Expand Up @@ -59,17 +60,17 @@ type container struct {
type sandbox struct {
sync.RWMutex

id string
running bool
noPivotRoot bool
containers map[string]*container
channel channel
network network
wg sync.WaitGroup
grpcListener net.Listener
sharedPidNs namespace
mounts []string
subreaper reaper
id string
running bool
noPivotRoot bool
containers map[string]*container
channel channel
network network
wg sync.WaitGroup
sharedPidNs namespace
mounts []string
subreaper reaper
server *grpc.Server
}

type namespace struct {
Expand Down Expand Up @@ -437,17 +438,10 @@ func (s *sandbox) initChannel() error {

s.channel = c

return s.channel.setup()
return nil
}

func (s *sandbox) startGRPC() error {
l, err := s.channel.listen()
if err != nil {
return err
}

s.grpcListener = l

func (s *sandbox) startGRPC() {
grpcImpl := &agentGRPC{
sandbox: s,
version: version,
Expand All @@ -456,23 +450,54 @@ func (s *sandbox) startGRPC() error {
grpcServer := grpc.NewServer()
pb.RegisterAgentServiceServer(grpcServer, grpcImpl)
pb.RegisterHealthServer(grpcServer, grpcImpl)
s.server = grpcServer

s.wg.Add(1)
go func() {
defer s.wg.Done()

grpcServer.Serve(l)
}()
var err error
for err == nil || err == io.EOF {
agentLog.Info("agent grpc server starts")

return nil
err = s.channel.setup()
if err != nil {
agentLog.WithError(err).Warn("Failed to setup agent grpc channel")
return
}

err = s.channel.wait()
if err != nil {
agentLog.WithError(err).Warn("Failed to wait agent grpc channel ready")
return
}

var l net.Listener
l, err = s.channel.listen()
if err != nil {
agentLog.WithError(err).Warn("Failed to create agent grpc listener")
return
}

// l is closed when Serve() returns
err = grpcServer.Serve(l)
if err != nil {
agentLog.WithError(err).Warn("agent grpc server quits")
}

errT := s.channel.teardown()
if errT != nil {
agentLog.WithError(errT).Warn("agent grpc channel teardown failed")
}
}
}()
}

func (s *sandbox) teardown() error {
if err := s.grpcListener.Close(); err != nil {
return err
func (s *sandbox) stopGRPC() {
if s.server != nil {
s.server.Stop()
s.server = nil
}

return s.channel.teardown()
}

type initMount struct {
Expand Down Expand Up @@ -604,14 +629,7 @@ func main() {
}

// Start gRPC server.
if err = s.startGRPC(); err != nil {
return
}
s.startGRPC()

s.wg.Wait()

// Tear down properly.
if err = s.teardown(); err != nil {
return
}
}
16 changes: 16 additions & 0 deletions agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,3 +252,19 @@ func TestGetProcessFromSandbox(t *testing.T) {
"Process structures should be identical: got %+v, expecting %+v",
proc, p)
}

func TestStartStopGRPCServer(t *testing.T) {
_, out, err := os.Pipe()
assert.Nil(t, err, "%v", err)

s := &sandbox{
containers: make(map[string]*container),
channel: &serialChannel{serialConn: out},
}

s.startGRPC()
assert.NotNil(t, s.server, "failed starting grpc server")

s.stopGRPC()
assert.Nil(t, s.server, "failed stopping grpc server")
}
58 changes: 58 additions & 0 deletions channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@
package main

import (
"fmt"
"io/ioutil"
"net"
"os"
"path/filepath"
"strings"
"syscall"

"github.com/hashicorp/yamux"
"github.com/mdlayher/vsock"
Expand All @@ -22,6 +24,7 @@ import (

type channel interface {
setup() error
wait() error
listen() (net.Listener, error)
teardown() error
}
Expand Down Expand Up @@ -57,6 +60,10 @@ func (c *vSockChannel) setup() error {
return nil
}

func (c *vSockChannel) wait() error {
return nil
}

func (c *vSockChannel) listen() (net.Listener, error) {
l, err := vsock.Listen(vSockPort)
if err != nil {
Expand Down Expand Up @@ -86,6 +93,57 @@ func (c *serialChannel) setup() error {
return nil
}

func (c *serialChannel) wait() error {
var event syscall.EpollEvent
var events [1]syscall.EpollEvent

fd := c.serialConn.Fd()
if fd <= 0 {
return fmt.Errorf("serial port IO closed")
}

epfd, err := syscall.EpollCreate1(syscall.EPOLL_CLOEXEC)
if err != nil {
return err
}
defer syscall.Close(epfd)

// EPOLLOUT: Writable when there is a connection
// EPOLLET: Edge trigger as EPOLLHUP is always on when there is no connection
// 0xffffffff: EPOLLET is negative and cannot fit in uint32 in golang
event.Events = syscall.EPOLLOUT | syscall.EPOLLET&0xffffffff
event.Fd = int32(fd)
if err = syscall.EpollCtl(epfd, syscall.EPOLL_CTL_ADD, int(fd), &event); err != nil {
return err
}
defer syscall.EpollCtl(epfd, syscall.EPOLL_CTL_DEL, int(fd), nil)

for {
nev, err := syscall.EpollWait(epfd, events[:], -1)
if err != nil {
return err
}

for i := 0; i < nev; i++ {
ev := events[i]
if ev.Fd == int32(fd) {
agentLog.WithField("events", ev.Events).Debug("New serial channel event")
if ev.Events&syscall.EPOLLOUT != 0 {
return nil
}
if ev.Events&syscall.EPOLLERR != 0 {
return fmt.Errorf("serial port IO failure")
}
if ev.Events&syscall.EPOLLHUP != 0 {
continue
}
}
}
}

// Never reach here
}

func (c *serialChannel) listen() (net.Listener, error) {
// Initialize Yamux server.
session, err := yamux.Server(c.serialConn, nil)
Expand Down
38 changes: 38 additions & 0 deletions channel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,3 +59,41 @@ func TestTeardownVSockChannel(t *testing.T) {
err := c.teardown()
assert.Nil(t, err, "%v", err)
}

func TestWaitVSockChannel(t *testing.T) {
c := &vSockChannel{}

err := c.wait()
assert.Nil(t, err, "%v", err)
}

func TestWaitSerialChannel(t *testing.T) {
_, f, err := os.Pipe()
assert.Nil(t, err, "%v", err)
defer f.Close()

c := &serialChannel{serialConn: f}

err = c.wait()
assert.Nil(t, err, "%v", err)
}

func TestListenSerialChannel(t *testing.T) {
_, f, err := os.Pipe()
assert.Nil(t, err, "%v", err)

c := &serialChannel{serialConn: f}

_, err = c.listen()
assert.Nil(t, err, "%v", err)
}

func TestTeardownSerialChannel(t *testing.T) {
_, f, err := os.Pipe()
assert.Nil(t, err, "%v", err)

c := &serialChannel{serialConn: f}

err = c.teardown()
assert.Nil(t, err, "%v", err)
}

0 comments on commit bdc70d4

Please sign in to comment.