Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Introspection 2.0 changes for Client signalling, events and DHT introspection #11

Closed
wants to merge 14 commits into from
130 changes: 130 additions & 0 deletions conn_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
package introspector

import (
"context"
"sync"
"time"

introspection_pb "github.com/libp2p/go-libp2p-core/introspection/pb"

"github.com/gogo/protobuf/proto"
"github.com/gorilla/websocket"
)

type connectionState int

const (
running connectionState = iota
paused
)

type connHandler struct {
ctx context.Context
cancel context.CancelFunc

ws *WsServer
conn *websocket.Conn

outgoingChan chan []byte

wg sync.WaitGroup
}

func newConnHandler(ctx context.Context, sv *WsServer, conn *websocket.Conn) *connHandler {
ch := &connHandler{ws: sv, conn: conn, outgoingChan: make(chan []byte, maxClientOutgoingBufferSize)}
ch.ctx, ch.cancel = context.WithCancel(ctx)
return ch
}

func (ch *connHandler) run() {
ch.wg.Add(2)
go ch.outgoingLoop()
go ch.handleIncoming()
ch.wg.Wait()
}

func (ch *connHandler) outgoingLoop() {
defer ch.wg.Done()
for {
select {
case bz := <-ch.outgoingChan:
ch.conn.SetWriteDeadline(time.Now().Add(writeDeadline))
if err := ch.conn.WriteMessage(websocket.BinaryMessage, bz); err != nil {
logger.Warnf("failed to send binary message to client with addr %s, err=%s", err)
ch.cancel()
return
}

case <-ch.ctx.Done():
return
}
}
}

func (ch *connHandler) handleIncoming() {
defer ch.wg.Done()
for {
select {
case <-ch.ctx.Done():
return
default:
mt, message, err := ch.conn.ReadMessage()
switch err.(type) {
case nil:
case *websocket.CloseError:
logger.Warnf("connection closed: %s", err)
ch.cancel()
return
default:
logger.Warnf("failed to read message from ws connection, err: %s", err)
ch.cancel()
return
}
logger.Debugf("received message from ws connection, type: %d. recv: %s", mt, message)

// unmarshal
var clMsg introspection_pb.ClientSignal
if err := proto.Unmarshal(message, &clMsg); err != nil {
logger.Errorf("failed to read client message, err=%s", err)
ch.cancel()
return
}

switch clMsg.Signal {
case introspection_pb.ClientSignal_SEND_DATA:
switch clMsg.DataSource {
case introspection_pb.ClientSignal_STATE:
select {
case ch.ws.sendDataCh <- &sendDataReq{ch, introspection_pb.ClientSignal_STATE}:
case <-ch.ctx.Done():
logger.Errorf("context cancelled while waiting to submit state send request")
return
}

case introspection_pb.ClientSignal_RUNTIME:
select {
case ch.ws.sendDataCh <- &sendDataReq{ch, introspection_pb.ClientSignal_RUNTIME}:
case <-ch.ctx.Done():
logger.Errorf("context cancelled while waiting to submit runtime send request")
return
}
}
case introspection_pb.ClientSignal_PAUSE_PUSH_EMITTER:
select {
case ch.ws.connStateChangeReqCh <- &connStateChangeReq{ch, paused}:
case <-ch.ctx.Done():
logger.Errorf("context cancelled while waiting to submit conn pause request")
return
}
case introspection_pb.ClientSignal_UNPAUSE_PUSH_EMITTER:
select {
case ch.ws.connStateChangeReqCh <- &connStateChangeReq{ch, running}:
case <-ch.ctx.Done():
logger.Errorf("context cancelled while waiting to submit conn unpause request")
return
}
}
}
}

}
165 changes: 165 additions & 0 deletions conn_handler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
package introspector

import (
"fmt"
"testing"
"time"

introspection_pb "github.com/libp2p/go-libp2p-core/introspection/pb"

"github.com/gorilla/websocket"
"github.com/libp2p/go-eventbus"
"github.com/stretchr/testify/require"
)

func TestConnHandlerSignalling(t *testing.T) {
// create a ws server
addr := "localhost:9999"
introspector := NewDefaultIntrospector()
config := &WsServerConfig{
ListenAddrs: []string{addr},
}
server, err := NewWsServer(introspector, config)
require.NoError(t, err)

// start the server
require.NoError(t, server.Start(eventbus.NewBus()))
defer func() {
err := server.Close()
require.NoError(t, err)
}()

// connect to it and get a conn
conn, _, err := websocket.DefaultDialer.Dial(fmt.Sprintf("ws://%s/introspect", addr), nil)
require.NoError(t, err)
defer conn.Close()

// assert handler is running
var ch *connHandler
require.Eventually(t, func() bool {
var chlen int
es := paused
done := make(chan struct{}, 1)

server.evalForTest <- func() {
for c, s := range server.connHandlerStates {
ch = c
es = s
}
chlen = len(server.connHandlerStates)
done <- struct{}{}
}
<-done

return chlen == 1 && es == running
}, 10*time.Second, 1*time.Second)

// send a pause message and assert state
cl := &introspection_pb.ClientSignal{Signal: introspection_pb.ClientSignal_PAUSE_PUSH_EMITTER}
sendMessage(t, cl, conn)
require.Eventually(t, func() bool {
es := running
done := make(chan struct{}, 1)
server.evalForTest <- func() {
es = server.connHandlerStates[ch]
done <- struct{}{}
}
<-done

return es == paused
}, 10*time.Second, 1*time.Second)

// send unpause and assert state
cl = &introspection_pb.ClientSignal{Signal: introspection_pb.ClientSignal_UNPAUSE_PUSH_EMITTER}
sendMessage(t, cl, conn)
require.Eventually(t, func() bool {
es := paused
done := make(chan struct{}, 1)
server.evalForTest <- func() {
es = server.connHandlerStates[ch]
done <- struct{}{}
}
<-done

return es == running
}, 10*time.Second, 1*time.Second)

// create one more connection and assert handler
conn2, _, err := websocket.DefaultDialer.Dial(fmt.Sprintf("ws://%s/introspect", addr), nil)
require.NoError(t, err)
defer conn2.Close()

var ch2 *connHandler
require.Eventually(t, func() bool {
var chlen int
done := make(chan struct{}, 1)
es := paused

server.evalForTest <- func() {
for c, s := range server.connHandlerStates {
chc := c
if chc != ch {
ch2 = chc
es = s
}
}

chlen = len(server.connHandlerStates)
done <- struct{}{}
}
<-done

return chlen == 2 && es == running
}, 10*time.Second, 1*time.Second)

// changing state of ch2 does not change state for ch1
cl = &introspection_pb.ClientSignal{Signal: introspection_pb.ClientSignal_PAUSE_PUSH_EMITTER}
sendMessage(t, cl, conn2)
require.Eventually(t, func() bool {
es1 := running
es2 := running
done := make(chan struct{}, 1)
server.evalForTest <- func() {
es1 = server.connHandlerStates[ch]
es2 = server.connHandlerStates[ch2]
done <- struct{}{}
}
<-done

return es1 == running && es2 == paused
}, 10*time.Second, 1*time.Second)

// test send runtime
// first drain the first two messages sent at startup
p1, err := fetchProtocolWrapper(t, conn2)
require.NoError(t, err)
require.NotNil(t, p1.GetRuntime())
p2, err := fetchProtocolWrapper(t, conn2)
require.NoError(t, err)
require.NotNil(t, p2.GetState())

// now send a send_runtime
sendMessage(t, &introspection_pb.ClientSignal{Signal: introspection_pb.ClientSignal_SEND_DATA, DataSource: introspection_pb.ClientSignal_RUNTIME}, conn2)
var fetcherr error
require.Eventually(t, func() bool {
p1, err := fetchProtocolWrapper(t, conn2)
if err != nil {
fetcherr = err
return false
}
return p1.GetRuntime() != nil && p1.GetState() == nil
}, 10*time.Second, 1*time.Second)
require.NoError(t, fetcherr)

// now send a send data
sendMessage(t, &introspection_pb.ClientSignal{Signal: introspection_pb.ClientSignal_SEND_DATA, DataSource: introspection_pb.ClientSignal_STATE}, conn2)
require.Eventually(t, func() bool {
p1, err := fetchProtocolWrapper(t, conn2)
if err != nil {
fetcherr = err
return false
}
return p1.GetState() != nil && p1.GetRuntime() == nil
}, 10*time.Second, 1*time.Second)
require.NoError(t, fetcherr)
}
7 changes: 6 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,13 @@ go 1.13
require (
github.com/gogo/protobuf v1.3.1
github.com/gorilla/websocket v1.4.1
github.com/hashicorp/go-multierror v1.1.0
github.com/imdario/mergo v0.3.8
github.com/ipfs/go-log v1.0.1
github.com/libp2p/go-libp2p-core v0.3.1-0.20200210163958-6d6f8284b841
github.com/libp2p/go-eventbus v0.1.1-0.20200417061519-63254f6c0da4
github.com/libp2p/go-libp2p-core v0.5.2-0.20200424084047-616e011e7e9d
github.com/multiformats/go-multiaddr v0.2.1
github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/stretchr/testify v1.4.0
golang.org/x/net v0.0.0-20190620200207-3b0461eec859
)
Loading