diff --git a/conn_handler.go b/conn_handler.go new file mode 100644 index 0000000..219cd72 --- /dev/null +++ b/conn_handler.go @@ -0,0 +1,130 @@ +package introspector + +import ( + "context" + "sync" + "time" + + introspection_pb "github.com/libp2p/go-libp2p-core/introspection/pb" + + "github.com/gogo/protobuf/proto" + "github.com/gorilla/websocket" +) + +type connectionState int + +const ( + running connectionState = iota + paused +) + +type connHandler struct { + ctx context.Context + cancel context.CancelFunc + + ws *WsServer + conn *websocket.Conn + + outgoingChan chan []byte + + wg sync.WaitGroup +} + +func newConnHandler(ctx context.Context, sv *WsServer, conn *websocket.Conn) *connHandler { + ch := &connHandler{ws: sv, conn: conn, outgoingChan: make(chan []byte, maxClientOutgoingBufferSize)} + ch.ctx, ch.cancel = context.WithCancel(ctx) + return ch +} + +func (ch *connHandler) run() { + ch.wg.Add(2) + go ch.outgoingLoop() + go ch.handleIncoming() + ch.wg.Wait() +} + +func (ch *connHandler) outgoingLoop() { + defer ch.wg.Done() + for { + select { + case bz := <-ch.outgoingChan: + ch.conn.SetWriteDeadline(time.Now().Add(writeDeadline)) + if err := ch.conn.WriteMessage(websocket.BinaryMessage, bz); err != nil { + logger.Warnf("failed to send binary message to client with addr %s, err=%s", err) + ch.cancel() + return + } + + case <-ch.ctx.Done(): + return + } + } +} + +func (ch *connHandler) handleIncoming() { + defer ch.wg.Done() + for { + select { + case <-ch.ctx.Done(): + return + default: + mt, message, err := ch.conn.ReadMessage() + switch err.(type) { + case nil: + case *websocket.CloseError: + logger.Warnf("connection closed: %s", err) + ch.cancel() + return + default: + logger.Warnf("failed to read message from ws connection, err: %s", err) + ch.cancel() + return + } + logger.Debugf("received message from ws connection, type: %d. recv: %s", mt, message) + + // unmarshal + var clMsg introspection_pb.ClientSignal + if err := proto.Unmarshal(message, &clMsg); err != nil { + logger.Errorf("failed to read client message, err=%s", err) + ch.cancel() + return + } + + switch clMsg.Signal { + case introspection_pb.ClientSignal_SEND_DATA: + switch clMsg.DataSource { + case introspection_pb.ClientSignal_STATE: + select { + case ch.ws.sendDataCh <- &sendDataReq{ch, introspection_pb.ClientSignal_STATE}: + case <-ch.ctx.Done(): + logger.Errorf("context cancelled while waiting to submit state send request") + return + } + + case introspection_pb.ClientSignal_RUNTIME: + select { + case ch.ws.sendDataCh <- &sendDataReq{ch, introspection_pb.ClientSignal_RUNTIME}: + case <-ch.ctx.Done(): + logger.Errorf("context cancelled while waiting to submit runtime send request") + return + } + } + case introspection_pb.ClientSignal_PAUSE_PUSH_EMITTER: + select { + case ch.ws.connStateChangeReqCh <- &connStateChangeReq{ch, paused}: + case <-ch.ctx.Done(): + logger.Errorf("context cancelled while waiting to submit conn pause request") + return + } + case introspection_pb.ClientSignal_UNPAUSE_PUSH_EMITTER: + select { + case ch.ws.connStateChangeReqCh <- &connStateChangeReq{ch, running}: + case <-ch.ctx.Done(): + logger.Errorf("context cancelled while waiting to submit conn unpause request") + return + } + } + } + } + +} diff --git a/conn_handler_test.go b/conn_handler_test.go new file mode 100644 index 0000000..77d401a --- /dev/null +++ b/conn_handler_test.go @@ -0,0 +1,165 @@ +package introspector + +import ( + "fmt" + "testing" + "time" + + introspection_pb "github.com/libp2p/go-libp2p-core/introspection/pb" + + "github.com/gorilla/websocket" + "github.com/libp2p/go-eventbus" + "github.com/stretchr/testify/require" +) + +func TestConnHandlerSignalling(t *testing.T) { + // create a ws server + addr := "localhost:9999" + introspector := NewDefaultIntrospector() + config := &WsServerConfig{ + ListenAddrs: []string{addr}, + } + server, err := NewWsServer(introspector, config) + require.NoError(t, err) + + // start the server + require.NoError(t, server.Start(eventbus.NewBus())) + defer func() { + err := server.Close() + require.NoError(t, err) + }() + + // connect to it and get a conn + conn, _, err := websocket.DefaultDialer.Dial(fmt.Sprintf("ws://%s/introspect", addr), nil) + require.NoError(t, err) + defer conn.Close() + + // assert handler is running + var ch *connHandler + require.Eventually(t, func() bool { + var chlen int + es := paused + done := make(chan struct{}, 1) + + server.evalForTest <- func() { + for c, s := range server.connHandlerStates { + ch = c + es = s + } + chlen = len(server.connHandlerStates) + done <- struct{}{} + } + <-done + + return chlen == 1 && es == running + }, 10*time.Second, 1*time.Second) + + // send a pause message and assert state + cl := &introspection_pb.ClientSignal{Signal: introspection_pb.ClientSignal_PAUSE_PUSH_EMITTER} + sendMessage(t, cl, conn) + require.Eventually(t, func() bool { + es := running + done := make(chan struct{}, 1) + server.evalForTest <- func() { + es = server.connHandlerStates[ch] + done <- struct{}{} + } + <-done + + return es == paused + }, 10*time.Second, 1*time.Second) + + // send unpause and assert state + cl = &introspection_pb.ClientSignal{Signal: introspection_pb.ClientSignal_UNPAUSE_PUSH_EMITTER} + sendMessage(t, cl, conn) + require.Eventually(t, func() bool { + es := paused + done := make(chan struct{}, 1) + server.evalForTest <- func() { + es = server.connHandlerStates[ch] + done <- struct{}{} + } + <-done + + return es == running + }, 10*time.Second, 1*time.Second) + + // create one more connection and assert handler + conn2, _, err := websocket.DefaultDialer.Dial(fmt.Sprintf("ws://%s/introspect", addr), nil) + require.NoError(t, err) + defer conn2.Close() + + var ch2 *connHandler + require.Eventually(t, func() bool { + var chlen int + done := make(chan struct{}, 1) + es := paused + + server.evalForTest <- func() { + for c, s := range server.connHandlerStates { + chc := c + if chc != ch { + ch2 = chc + es = s + } + } + + chlen = len(server.connHandlerStates) + done <- struct{}{} + } + <-done + + return chlen == 2 && es == running + }, 10*time.Second, 1*time.Second) + + // changing state of ch2 does not change state for ch1 + cl = &introspection_pb.ClientSignal{Signal: introspection_pb.ClientSignal_PAUSE_PUSH_EMITTER} + sendMessage(t, cl, conn2) + require.Eventually(t, func() bool { + es1 := running + es2 := running + done := make(chan struct{}, 1) + server.evalForTest <- func() { + es1 = server.connHandlerStates[ch] + es2 = server.connHandlerStates[ch2] + done <- struct{}{} + } + <-done + + return es1 == running && es2 == paused + }, 10*time.Second, 1*time.Second) + + // test send runtime + // first drain the first two messages sent at startup + p1, err := fetchProtocolWrapper(t, conn2) + require.NoError(t, err) + require.NotNil(t, p1.GetRuntime()) + p2, err := fetchProtocolWrapper(t, conn2) + require.NoError(t, err) + require.NotNil(t, p2.GetState()) + + // now send a send_runtime + sendMessage(t, &introspection_pb.ClientSignal{Signal: introspection_pb.ClientSignal_SEND_DATA, DataSource: introspection_pb.ClientSignal_RUNTIME}, conn2) + var fetcherr error + require.Eventually(t, func() bool { + p1, err := fetchProtocolWrapper(t, conn2) + if err != nil { + fetcherr = err + return false + } + return p1.GetRuntime() != nil && p1.GetState() == nil + }, 10*time.Second, 1*time.Second) + require.NoError(t, fetcherr) + + // now send a send data + sendMessage(t, &introspection_pb.ClientSignal{Signal: introspection_pb.ClientSignal_SEND_DATA, DataSource: introspection_pb.ClientSignal_STATE}, conn2) + require.Eventually(t, func() bool { + p1, err := fetchProtocolWrapper(t, conn2) + if err != nil { + fetcherr = err + return false + } + return p1.GetState() != nil && p1.GetRuntime() == nil + }, 10*time.Second, 1*time.Second) + require.NoError(t, fetcherr) +} diff --git a/go.mod b/go.mod index f3641d2..1bdaecd 100644 --- a/go.mod +++ b/go.mod @@ -5,8 +5,13 @@ go 1.13 require ( github.com/gogo/protobuf v1.3.1 github.com/gorilla/websocket v1.4.1 + github.com/hashicorp/go-multierror v1.1.0 github.com/imdario/mergo v0.3.8 github.com/ipfs/go-log v1.0.1 - github.com/libp2p/go-libp2p-core v0.3.1-0.20200210163958-6d6f8284b841 + github.com/libp2p/go-eventbus v0.1.1-0.20200417061519-63254f6c0da4 + github.com/libp2p/go-libp2p-core v0.5.2-0.20200424084047-616e011e7e9d + github.com/multiformats/go-multiaddr v0.2.1 + github.com/patrickmn/go-cache v2.1.0+incompatible github.com/stretchr/testify v1.4.0 + golang.org/x/net v0.0.0-20190620200207-3b0461eec859 ) diff --git a/go.sum b/go.sum index 7de3b99..7da05cd 100644 --- a/go.sum +++ b/go.sum @@ -1,6 +1,8 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII= +github.com/btcsuite/btcd v0.0.0-20190523000118-16327141da8c/go.mod h1:3J08xEfcugPacsc34/LKRU2yO7YmuT8yt28J8k2+rrI= +github.com/btcsuite/btcd v0.20.1-beta h1:Ik4hyJqN8Jfyv3S4AGBOmyouMsYE3EdYODkMbQjwPGw= github.com/btcsuite/btcd v0.20.1-beta/go.mod h1:wVuoA8VJLEcwgqHBwHmzLRazpKxTv13Px/pDuV7OomQ= github.com/btcsuite/btclog v0.0.0-20170628155309-84c8d2346e9f/go.mod h1:TdznJufoqS23FtqVCzL0ZqgP5MqXbb4fg/WgDys70nA= github.com/btcsuite/btcutil v0.0.0-20190425235716-9e5f4b9a998d/go.mod h1:+5NJ2+qvTyV9exUAL/rxXi3DcLg2Ts+ymUAY5y4NvMg= @@ -23,20 +25,36 @@ github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfU github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/gorilla/websocket v1.4.1 h1:q7AeDBpnBk8AogcD4DSag/Ukw/KV+YhzLj2bP5HvKCM= github.com/gorilla/websocket v1.4.1/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/gxed/hashland/keccakpg v0.0.1/go.mod h1:kRzw3HkwxFU1mpmPP8v1WyQzwdGfmKFJ6tItnhQ67kU= +github.com/gxed/hashland/murmur3 v0.0.1/go.mod h1:KjXop02n4/ckmZSnY2+HKcLud/tcmvhST0bie/0lS48= +github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA= +github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= +github.com/hashicorp/go-multierror v1.1.0 h1:B9UzwGQJehnUY1yNrnwREHc3fGbC2xefo8g4TbElacI= +github.com/hashicorp/go-multierror v1.1.0/go.mod h1:spPvp8C1qA32ftKqdAHm4hHTbPw+vmowP0z+KUhOZdA= +github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/imdario/mergo v0.3.8 h1:CGgOkSJeqMRmt0D9XLWExdT4m4F1vd3FV3VPt+0VxkQ= github.com/imdario/mergo v0.3.8/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA= +github.com/ipfs/go-cid v0.0.2/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM= +github.com/ipfs/go-cid v0.0.4 h1:UlfXKrZx1DjZoBhQHmNHLC1fK1dUJDN20Y28A7s+gJ8= github.com/ipfs/go-cid v0.0.4/go.mod h1:4LLaPOQwmk5z9LBgQnpkivrx8BJjUyGwTXCd5Xfj6+M= +github.com/ipfs/go-cid v0.0.5 h1:o0Ix8e/ql7Zb5UVUJEUfjsWCIY8t48++9lR8qi6oiJU= +github.com/ipfs/go-cid v0.0.5/go.mod h1:plgt+Y5MnOey4vO4UlUazGqdbEXuFYitED67FexhXog= github.com/ipfs/go-log v1.0.1 h1:5lIEEOQTk/vd1WuPFBRqz2mcp+5G1fMVcW+Ib/H5Hfo= github.com/ipfs/go-log v1.0.1/go.mod h1:HuWlQttfN6FWNHRhlY5yMk/lW7evQC0HHGOxEwMRR8I= github.com/ipfs/go-log/v2 v2.0.1 h1:mnR9XFltezAtO8A6tj5U7nKkRzhEQNEw/wT11U2HhPM= github.com/ipfs/go-log/v2 v2.0.1/go.mod h1:O7P1lJt27vWHhOwQmcFEvlmo49ry2VY2+JfBWFaa9+0= github.com/jbenet/go-cienv v0.1.0/go.mod h1:TqNnHUmJgXau0nCzC7kXWeotg3J9W34CUv5Djy1+FlA= +github.com/jbenet/goprocess v0.1.3 h1:YKyIEECS/XvcfHtBzxtjBBbWK+MbvA6dG8ASiqwvr10= github.com/jbenet/goprocess v0.1.3/go.mod h1:5yspPrukOVuOLORacaBi858NqyClJPQxYZlqdZVfqY4= +github.com/jbenet/goprocess v0.1.4 h1:DRGOFReOMqqDNXwW70QkacFW0YN9QnwLV0Vqk+3oU0o= +github.com/jbenet/goprocess v0.1.4/go.mod h1:5yspPrukOVuOLORacaBi858NqyClJPQxYZlqdZVfqY4= github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= github.com/jrick/logrotate v1.0.0/go.mod h1:LNinyqDIJnpAur+b8yyulnQw/wDuN1+BYKlTRt3OuAQ= @@ -51,40 +69,96 @@ github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORN github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/libp2p/go-buffer-pool v0.0.1/go.mod h1:xtyIz9PMobb13WaxR6Zo1Pd1zXJKYg0a8KiIvDp3TzQ= +github.com/libp2p/go-buffer-pool v0.0.2 h1:QNK2iAFa8gjAe1SPz6mHSMuCcjs+X1wlHzeOSqcmlfs= +github.com/libp2p/go-buffer-pool v0.0.2/go.mod h1:MvaB6xw5vOrDl8rYZGLFdKAuk/hRoRZd1Vi32+RXyFM= +github.com/libp2p/go-eventbus v0.1.1-0.20200415071930-80de910bbe5e h1:6CsnSF+yGe0dxchs2jZX7RGdvnhuF9q7Mu5XdOWmUVI= +github.com/libp2p/go-eventbus v0.1.1-0.20200415071930-80de910bbe5e/go.mod h1:As+8aTYrf6s1JrCScK7U0rb25lLQImluhjbTLG6EBoc= +github.com/libp2p/go-eventbus v0.1.1-0.20200415160459-a324940d7088 h1:aCsq3/Ves5BqbxNybTW70OmSCgF7TUn15pXycjaHk4U= +github.com/libp2p/go-eventbus v0.1.1-0.20200415160459-a324940d7088/go.mod h1:6lTxKMU4/ggnyAgFJFN1PnMepsD4VC6eakw626cFocI= +github.com/libp2p/go-eventbus v0.1.1-0.20200416170853-60bdfbaef67b h1:cYwsEFu9D8lUNyWCgzq0RUDTr4iHlwJFcwXjHXus9qM= +github.com/libp2p/go-eventbus v0.1.1-0.20200416170853-60bdfbaef67b/go.mod h1:Hqak/t7SFu+kL4dS4NBJx2uLqZSHJRApP2KPEglh4vM= +github.com/libp2p/go-eventbus v0.1.1-0.20200417061519-63254f6c0da4 h1:m6ax5Jpc3XCVSs5sPRKsNqI7Zo03dz7u6ndUOFiNqnA= +github.com/libp2p/go-eventbus v0.1.1-0.20200417061519-63254f6c0da4/go.mod h1:6I+OKIbCq7cIh7X7wDxGGRcEYnsTJfcqg2i0HEIetqc= +github.com/libp2p/go-flow-metrics v0.0.1/go.mod h1:Iv1GH0sG8DtYN3SVJ2eG221wMiNpZxBdp967ls1g+k8= github.com/libp2p/go-flow-metrics v0.0.3/go.mod h1:HeoSNUrOJVK1jEpDqVEiUOIXqhbnS27omG0uWU5slZs= +github.com/libp2p/go-libp2p-core v0.2.0/go.mod h1:X0eyB0Gy93v0DZtSYbEM7RnMChm9Uv3j7yRXjO77xSI= github.com/libp2p/go-libp2p-core v0.3.1-0.20200210163958-6d6f8284b841 h1:NtLkfbsEXfR1mKM901b34Pw4VpEKQmCrtV7oQE6rIPQ= github.com/libp2p/go-libp2p-core v0.3.1-0.20200210163958-6d6f8284b841/go.mod h1:thvWy0hvaSBhnVBaW37BvzgVV68OUhgJJLAa6almrII= +github.com/libp2p/go-libp2p-core v0.3.2-0.20200414114438-d47936597d5b h1:/q9bdQKGXGhjQolx/d3N0mlU4m47W7fyo7Vsc9qwbyU= +github.com/libp2p/go-libp2p-core v0.3.2-0.20200414114438-d47936597d5b/go.mod h1:thvWy0hvaSBhnVBaW37BvzgVV68OUhgJJLAa6almrII= +github.com/libp2p/go-libp2p-core v0.3.2-0.20200415062127-fcc117eae71a h1:ijScFHS86L0LYjaagrxxinbsd1HIniHSKB2vFobmJ5s= +github.com/libp2p/go-libp2p-core v0.3.2-0.20200415062127-fcc117eae71a/go.mod h1:thvWy0hvaSBhnVBaW37BvzgVV68OUhgJJLAa6almrII= +github.com/libp2p/go-libp2p-core v0.3.2-0.20200415154218-9126769533e2/go.mod h1:thvWy0hvaSBhnVBaW37BvzgVV68OUhgJJLAa6almrII= +github.com/libp2p/go-libp2p-core v0.3.2-0.20200416170247-0a7a9856715c h1:IYaFZg3eDAGtXS32hN8qZ/QFbal9+ASqZuNOnyRkV3g= +github.com/libp2p/go-libp2p-core v0.3.2-0.20200416170247-0a7a9856715c/go.mod h1:thvWy0hvaSBhnVBaW37BvzgVV68OUhgJJLAa6almrII= +github.com/libp2p/go-libp2p-core v0.3.2-0.20200416180009-086efa616a18 h1:ZNi+KgvFvYwc2MY4tQeVu49Wjk3or/ahnKDURZGl2V4= +github.com/libp2p/go-libp2p-core v0.3.2-0.20200416180009-086efa616a18/go.mod h1:thvWy0hvaSBhnVBaW37BvzgVV68OUhgJJLAa6almrII= +github.com/libp2p/go-libp2p-core v0.5.2-0.20200417060929-6957bf8a421d h1:ig8zIKt/shtRBOZqaEsJ/GX+LEpxqJ3CfIk0dKciZXI= +github.com/libp2p/go-libp2p-core v0.5.2-0.20200417060929-6957bf8a421d/go.mod h1:uN7L2D4EvPCvzSH5SrhR72UWbnSGpt5/a35Sm4upn4Y= +github.com/libp2p/go-libp2p-core v0.5.2-0.20200417081628-b224967ed43f h1:gOu3iEE2lxXDOucfRkn6dx0S8T9zpYAedSZuO0f81Jc= +github.com/libp2p/go-libp2p-core v0.5.2-0.20200417081628-b224967ed43f/go.mod h1:uN7L2D4EvPCvzSH5SrhR72UWbnSGpt5/a35Sm4upn4Y= +github.com/libp2p/go-libp2p-core v0.5.2-0.20200424084047-616e011e7e9d h1:Iwq/F910sPQYfKl8npFFoSxKSzVBNTJyPVTLTcRQX6U= +github.com/libp2p/go-libp2p-core v0.5.2-0.20200424084047-616e011e7e9d/go.mod h1:uN7L2D4EvPCvzSH5SrhR72UWbnSGpt5/a35Sm4upn4Y= +github.com/libp2p/go-libp2p-testing v0.1.1/go.mod h1:xaZWMJrPUM5GlDBxCeGUi7kI4eqnjVyavGroI2nxEM0= +github.com/libp2p/go-msgio v0.0.4/go.mod h1:63lBBgOTDKQL6EWazRMCwXsEeEeK9O2Cd+0+6OOuipQ= +github.com/libp2p/go-openssl v0.0.4 h1:d27YZvLoTyMhIN4njrkr8zMDOM4lfpHIp6A+TK9fovg= github.com/libp2p/go-openssl v0.0.4/go.mod h1:unDrJpgy3oFr+rqXsarWifmJuNnJR4chtO1HmaZjggc= +github.com/mailru/easyjson v0.0.0-20180823135443-60711f1a8329 h1:2gxZ0XQIU/5z3Z3bUBu+FXuk2pFbkN6tcwi/pjyaDic= github.com/mailru/easyjson v0.0.0-20180823135443-60711f1a8329/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU= github.com/mattn/go-isatty v0.0.4/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4= github.com/mgutz/ansi v0.0.0-20170206155736-9520e82c474b/go.mod h1:01TrycV0kFyexm33Z7vhZRXopbI8J3TDReVlkTgMUxE= +github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1 h1:lYpkrQH5ajf0OXOcUbGjvZxxijuBwbbmlSxLiuofa+g= github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1/go.mod h1:pD8RvIylQ358TN4wwqatJ8rNavkEINozVn9DtGI3dfQ= +github.com/minio/sha256-simd v0.0.0-20190131020904-2d45a736cd16/go.mod h1:2FMWW+8GMoPweT6+pI63m9YE3Lmw4J71hV56Chs1E/U= +github.com/minio/sha256-simd v0.0.0-20190328051042-05b4dd3047e5/go.mod h1:2FMWW+8GMoPweT6+pI63m9YE3Lmw4J71hV56Chs1E/U= +github.com/minio/sha256-simd v0.1.0/go.mod h1:2FMWW+8GMoPweT6+pI63m9YE3Lmw4J71hV56Chs1E/U= github.com/minio/sha256-simd v0.1.1-0.20190913151208-6de447530771/go.mod h1:B5e1o+1/KgNmWrSQK08Y6Z1Vb5pwIktudl0J58iy0KM= +github.com/minio/sha256-simd v0.1.1 h1:5QHSlgo3nt5yKOJrC7W8w7X+NFl8cMPZm96iu8kKUJU= github.com/minio/sha256-simd v0.1.1/go.mod h1:B5e1o+1/KgNmWrSQK08Y6Z1Vb5pwIktudl0J58iy0KM= github.com/mr-tron/base58 v1.1.0/go.mod h1:xcD2VGqlgYjBdcBLw+TuYLr8afG+Hj8g2eTVqeSzSU8= github.com/mr-tron/base58 v1.1.2/go.mod h1:BinMc/sQntlIE1frQmRFPUoPA1Zkr8VRgBdjWI2mNwc= +github.com/mr-tron/base58 v1.1.3 h1:v+sk57XuaCKGXpWtVBX8YJzO7hMGx4Aajh4TQbdEFdc= github.com/mr-tron/base58 v1.1.3/go.mod h1:BinMc/sQntlIE1frQmRFPUoPA1Zkr8VRgBdjWI2mNwc= +github.com/multiformats/go-base32 v0.0.3 h1:tw5+NhuwaOjJCC5Pp82QuXbrmLzWg7uxlMFp8Nq/kkI= github.com/multiformats/go-base32 v0.0.3/go.mod h1:pLiuGC8y0QR3Ue4Zug5UzK9LjgbkL8NSQj0zQ5Nz/AA= +github.com/multiformats/go-multiaddr v0.0.4/go.mod h1:xKVEak1K9cS1VdmPZW3LSIb6lgmoS58qz/pzqmAxV44= +github.com/multiformats/go-multiaddr v0.2.0 h1:lR52sFwcTCuQb6bTfnXF6zA2XfyYvyd+5a9qECv/J90= github.com/multiformats/go-multiaddr v0.2.0/go.mod h1:0nO36NvPpyV4QzvTLi/lafl2y95ncPj0vFwVF6k6wJ4= +github.com/multiformats/go-multiaddr v0.2.1 h1:SgG/cw5vqyB5QQe5FPe2TqggU9WtrA9X4nZw7LlVqOI= +github.com/multiformats/go-multiaddr v0.2.1/go.mod h1:s/Apk6IyxfvMjDafnhJgJ3/46z7tZ04iMk5wP4QMGGE= +github.com/multiformats/go-multibase v0.0.1 h1:PN9/v21eLywrFWdFNsFKaU04kLJzuYzmrJR+ubhT9qA= github.com/multiformats/go-multibase v0.0.1/go.mod h1:bja2MqRZ3ggyXtZSEDKpl0uO/gviWFaSteVbWT51qgs= +github.com/multiformats/go-multihash v0.0.1/go.mod h1:w/5tugSrLEbWqlcgJabL3oHFKTwfvkofsjW2Qa1ct4U= +github.com/multiformats/go-multihash v0.0.5/go.mod h1:lt/HCbqlQwlPBz7lv0sQCdtfcMtlJvakRUn/0Ual8po= github.com/multiformats/go-multihash v0.0.8/go.mod h1:YSLudS+Pi8NHE7o6tb3D8vrpKa63epEDmG8nTduyAew= github.com/multiformats/go-multihash v0.0.10/go.mod h1:YSLudS+Pi8NHE7o6tb3D8vrpKa63epEDmG8nTduyAew= +github.com/multiformats/go-multihash v0.0.13 h1:06x+mk/zj1FoMsgNejLpy6QTvJqlSt/BhLEy87zidlc= github.com/multiformats/go-multihash v0.0.13/go.mod h1:VdAWLKTwram9oKAatUcLxBNUjdtcVwxObEQBtRfuyjc= github.com/multiformats/go-varint v0.0.1/go.mod h1:3Ls8CIEsrijN6+B7PbrXRPxHRPuXSrVKRY101jdMZYE= +github.com/multiformats/go-varint v0.0.2/go.mod h1:3Ls8CIEsrijN6+B7PbrXRPxHRPuXSrVKRY101jdMZYE= +github.com/multiformats/go-varint v0.0.5 h1:XVZwSo04Cs3j/jS0uAEPpT3JY6DzMcVLLoWOSnCxOjg= github.com/multiformats/go-varint v0.0.5/go.mod h1:3Ls8CIEsrijN6+B7PbrXRPxHRPuXSrVKRY101jdMZYE= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/opentracing/opentracing-go v1.1.0 h1:pWlfV3Bxv7k65HYwkikxat0+s3pV4bsqf19k25Ur8rU= github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= +github.com/patrickmn/go-cache v1.0.0 h1:3gD5McaYs9CxjyK5AXGcq8gdeCARtd/9gJDUvVeaZ0Y= +github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc= +github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ= github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= +github.com/smola/gocompat v0.2.0 h1:6b1oIMlUXIpz//VKEDzPVBK8KG7beVwmHIUEBIs/Pns= github.com/smola/gocompat v0.2.0/go.mod h1:1B0MlxbmoZNo3h8guHp8HztB3BSYR5itql9qtVc0ypY= +github.com/spacemonkeygo/openssl v0.0.0-20181017203307-c2dcc5cca94a/go.mod h1:7AyxJNCJ7SBZ1MfVQCWD6Uqo2oubI2Eq2y2eqf+A5r0= +github.com/spacemonkeygo/spacelog v0.0.0-20180420211403-2296661a0572 h1:RC6RW7j+1+HkWaX/Yh71Ee5ZHaHYt7ZP4sQgUrm6cDU= github.com/spacemonkeygo/spacelog v0.0.0-20180420211403-2296661a0572/go.mod h1:w0SWMsp6j9O/dk4/ZpIhL+3CkG8ofA2vuv7k+ltqUMc= +github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI= github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/src-d/envconfig v1.0.0/go.mod h1:Q9YQZ7BKITldTBnoxsE5gOeB5y66RyPXeue/R4aaNBc= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= @@ -93,7 +167,9 @@ github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXf github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/x-cray/logrus-prefixed-formatter v0.5.2/go.mod h1:2duySbKsL6M18s5GU7VPsoEPHyzalCE06qoARUCeBBE= +go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU= go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= +go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.uber.org/atomic v1.4.0 h1:cxzIVoETapQEqDhQu3QfnvXAV4AlzcvUCxkVUFw3+EU= go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI= @@ -102,8 +178,13 @@ go.uber.org/zap v1.10.0 h1:ORx85nbTijNz8ljznvCMR1ZBIPKFn3jQrag10X2AsuM= go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= golang.org/x/crypto v0.0.0-20170930174604-9419663f5a44/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= +golang.org/x/crypto v0.0.0-20190211182817-74369b46fc67/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20190426145343-a29dc8fdc734/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20190611184440-5c40567a22f8 h1:1wopBVtVdWnn03fZelqdXTqk7U7zPQCb+T4rbU9ZEoU= golang.org/x/crypto v0.0.0-20190611184440-5c40567a22f8/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20190618222545-ea8f1a30c443 h1:IcSOAf4PyMp3U3XbIEj1/xJ2BjNN2jWv7JoyOsMxXUU= +golang.org/x/crypto v0.0.0-20190618222545-ea8f1a30c443/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= @@ -114,6 +195,7 @@ golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73r golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859 h1:R/3boaszxrf1GEUWTVDzSKVwLmSJpwZ1yqXm8j0v2QI= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -123,8 +205,10 @@ golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190219092855-153ac476189d/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190502145724-3ef323f4f1fd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190626221950-04f50cda93cb h1:fgwFCsaw9buMuxNd6+DQfAuSFqbNiQZpcgJQAgJsK6k= golang.org/x/sys v0.0.0-20190626221950-04f50cda93cb/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= @@ -134,10 +218,12 @@ golang.org/x/tools v0.0.0-20181030221726-6c7e314b6563/go.mod h1:n7NCudcB/nEzxVGm golang.org/x/tools v0.0.0-20181130052023-1c3d964395ce/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= +golang.org/x/tools v0.0.0-20190311212946-11955173bddd h1:/e+gpKk9r3dJobndpTytxS2gOy6m5uvpg+ISQoEcusQ= golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= +google.golang.org/genproto v0.0.0-20190307195333-5fe7a883aa19/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE= google.golang.org/genproto v0.0.0-20190425155659-357c62f0e4bb/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38= diff --git a/introspector.go b/introspector.go index 044acad..0b6de46 100644 --- a/introspector.go +++ b/introspector.go @@ -8,7 +8,6 @@ import ( "github.com/libp2p/go-libp2p-core/introspection" introspection_pb "github.com/libp2p/go-libp2p-core/introspection/pb" - "github.com/gogo/protobuf/types" "github.com/imdario/mergo" ) @@ -19,10 +18,12 @@ var _ introspection.Introspector = (*DefaultIntrospector)(nil) type DefaultIntrospector struct { treeMu sync.RWMutex tree *introspection.DataProviders + + snapshotStartTime time.Time } func NewDefaultIntrospector() *DefaultIntrospector { - return &DefaultIntrospector{tree: &introspection.DataProviders{}} + return &DefaultIntrospector{tree: &introspection.DataProviders{}, snapshotStartTime: time.Now()} } func (d *DefaultIntrospector) RegisterDataProviders(provs *introspection.DataProviders) error { @@ -36,36 +37,36 @@ func (d *DefaultIntrospector) RegisterDataProviders(provs *introspection.DataPro return nil } +func (d *DefaultIntrospector) FetchRuntime() (*introspection_pb.Runtime, error) { + var err error + r := &introspection_pb.Runtime{} + if d.tree.Runtime != nil { + if r, err = d.tree.Runtime(); err != nil { + return nil, fmt.Errorf("failed to fetch runtime info, err=%s", err) + } + } + return r, err +} + func (d *DefaultIntrospector) FetchFullState() (*introspection_pb.State, error) { d.treeMu.RLock() defer d.treeMu.RUnlock() s := &introspection_pb.State{} + // timestamps + s.StartTs = timeToUnixMillis(d.snapshotStartTime) + s.InstantTs = timeToUnixMillis(time.Now()) + d.snapshotStartTime = time.Now() + // subsystems s.Subsystems = &introspection_pb.Subsystems{} - // version - s.Version = &introspection_pb.Version{Number: introspection.ProtoVersion} - - // runtime - if d.tree.Runtime != nil { - r, err := d.tree.Runtime() - if err != nil { - return nil, fmt.Errorf("failed to fetch runtime info: %w", err) - } - s.Runtime = r - } - - // timestamps - s.InstantTs = &types.Timestamp{Seconds: time.Now().Unix()} - // TODO Figure out the other two timestamp fields - // connections if d.tree.Connection != nil { conns, err := d.tree.Connection(introspection.ConnectionQueryParams{Output: introspection.QueryOutputFull}) if err != nil { - return nil, fmt.Errorf("failed to fetch connections: %w", err) + return nil, fmt.Errorf("failed to fetch connections: %wsvc", err) } // resolve streams on connection if d.tree.Stream != nil { @@ -75,9 +76,9 @@ func (d *DefaultIntrospector) FetchFullState() (*introspection_pb.State, error) sids = append(sids, introspection.StreamID(s)) } - sl, err := d.tree.Stream(introspection.StreamQueryParams{introspection.QueryOutputFull, sids}) + sl, err := d.tree.Stream(introspection.StreamQueryParams{Output: introspection.QueryOutputFull, Include: sids}) if err != nil { - return nil, fmt.Errorf("failed to fetch streams for connection: %w", err) + return nil, fmt.Errorf("failed to fetch streams for connection: %wsvc", err) } c.Streams = sl } @@ -89,10 +90,14 @@ func (d *DefaultIntrospector) FetchFullState() (*introspection_pb.State, error) if d.tree.Traffic != nil { tr, err := d.tree.Traffic() if err != nil { - return nil, fmt.Errorf("failed to fetch traffic: %w", err) + return nil, fmt.Errorf("failed to fetch traffic: %wsvc", err) } s.Traffic = tr } return s, nil } + +func timeToUnixMillis(t time.Time) uint64 { + return uint64(t.UnixNano() / 1000000) +} diff --git a/ws_server.go b/ws_server.go index 9dddc7c..0cdbe0b 100644 --- a/ws_server.go +++ b/ws_server.go @@ -1,31 +1,97 @@ package introspector import ( + "context" + "encoding/binary" + "encoding/json" "errors" "fmt" + "hash/fnv" "net" "net/http" + "reflect" "sync" "time" + "github.com/libp2p/go-libp2p-core/event" "github.com/libp2p/go-libp2p-core/introspection" - - logging "github.com/ipfs/go-log" + introspection_pb "github.com/libp2p/go-libp2p-core/introspection/pb" + "github.com/libp2p/go-libp2p-core/peer" "github.com/gogo/protobuf/proto" "github.com/gorilla/websocket" + logging "github.com/ipfs/go-log" + "github.com/libp2p/go-eventbus" + "github.com/multiformats/go-multiaddr" +) + +var ( + jsType = reflect.TypeOf(new(event.JSString)).Elem() + peerIdType = reflect.TypeOf(new(peer.ID)).Elem() + timeType = reflect.TypeOf(new(time.Time)).Elem() + maddrType = reflect.TypeOf(new(multiaddr.Multiaddr)).Elem() ) var logger = logging.Logger("introspection-server") var upgrader = websocket.Upgrader{} -type WsServer struct { - sync.RWMutex +const ( + writeDeadline = 10 * time.Second + stateMsgPeriod = 2 * time.Second + keepStaleData = 120 * time.Second + + maxClientOutgoingBufferSize = 64 +) + +type newConnReq struct { + conn *websocket.Conn + done chan struct{} +} + +type closeConnReq struct { + ch *connHandler + done chan struct{} +} + +type getEventTypesReq struct { + resp chan []*introspection_pb.EventType +} + +type connStateChangeReq struct { + ch *connHandler + newState connectionState +} - config *WsServerConfig +type sendDataReq struct { + ch *connHandler + dataType introspection_pb.ClientSignal_DataSource +} + +type WsServer struct { + // state initialized by constructor + ctx context.Context + cancel context.CancelFunc + closeWg sync.WaitGroup + introspector introspection.Introspector + config *WsServerConfig + server *http.Server + eventSub event.Subscription + + // state managed in the eventloop + newConnCh chan *newConnReq + closeConnCh chan *closeConnReq + connHandlerStates map[*connHandler]connectionState + connStateChangeReqCh chan *connStateChangeReq + sendDataCh chan *sendDataReq + + knownEvtProps map[reflect.Type]introspection_pb.EventType + + evalForTest chan func() + + // state managed by locking + lk sync.RWMutex listeners []net.Listener - server *http.Server - closeWg sync.WaitGroup + isClosed bool } var _ introspection.Endpoint = (*WsServer)(nil) @@ -45,21 +111,36 @@ func WsServerWithConfig(config *WsServerConfig) func(i introspection.Introspecto // NewWsServer creates a WebSockets server to serve introspection data. func NewWsServer(introspector introspection.Introspector, config *WsServerConfig) (*WsServer, error) { + if introspector == nil || config == nil { + return nil, errors.New("none of introspector, event-bus OR config can be nil") + } mux := http.NewServeMux() - // introspection handler - mux.HandleFunc("/introspect", wsUpgrader(introspector)) srv := &WsServer{ server: &http.Server{Handler: mux}, config: config, + + introspector: introspector, + + connHandlerStates: make(map[*connHandler]connectionState), + newConnCh: make(chan *newConnReq), + closeConnCh: make(chan *closeConnReq), + connStateChangeReqCh: make(chan *connStateChangeReq), + sendDataCh: make(chan *sendDataReq), + knownEvtProps: make(map[reflect.Type]introspection_pb.EventType), + evalForTest: make(chan func()), } + srv.ctx, srv.cancel = context.WithCancel(context.Background()) + + // register introspection handler + mux.HandleFunc("/introspect", srv.wsUpgrader()) return srv, nil } // Start starts this WS server. -func (s *WsServer) Start() error { - s.Lock() - defer s.Unlock() +func (s *WsServer) Start(bus event.Bus) error { + s.lk.Lock() + defer s.lk.Unlock() if len(s.listeners) > 0 { return errors.New("failed to start WS server: already started") @@ -69,51 +150,71 @@ func (s *WsServer) Start() error { return errors.New("failed to start WS server: no listen addresses supplied") } + sub, err := bus.Subscribe(event.WildcardSubscriptionType, eventbus.BufSize(256)) + if err != nil { + return fmt.Errorf("failed to susbcribe for events with WildcardSubscriptionType") + } + s.eventSub = sub + logger.Infof("WS introspection server starting, listening on %s", s.config.ListenAddrs) for _, addr := range s.config.ListenAddrs { l, err := net.Listen("tcp", addr) if err != nil { - return fmt.Errorf("failed to start WS server: %w", err) + return fmt.Errorf("failed to start WS server: %wsvc", err) } s.closeWg.Add(1) go func() { + defer s.closeWg.Done() if err := s.server.Serve(l); err != http.ErrServerClosed { logger.Errorf("failed to start WS server, err: %s", err) } - s.closeWg.Done() }() s.listeners = append(s.listeners, l) } + // start the periodic event outgoingLoop + s.closeWg.Add(1) + go s.eventLoop() + return nil } // Close closes a WS introspection server. func (s *WsServer) Close() error { - s.Lock() - defer s.Unlock() + s.lk.Lock() + defer s.lk.Unlock() - if len(s.listeners) == 0 { - // nothing to do. + if s.isClosed { return nil } - // Close the server, which in turn closes all listenerse. + // Close the server, which in turn closes all listeners. if err := s.server.Close(); err != nil { return err } + // cancel the context and wait for all go-routines to shut down + s.cancel() + s.closeWg.Wait() + + // close our subscription to the events + if err := s.eventSub.Close(); err != nil { + logger.Errorf("error while trying to close eventbus subscription, err=%s", err) + } + s.listeners = nil + s.connHandlerStates = nil + s.isClosed = true return nil } // ListenAddrs returns the actual listen addresses of this server. func (s *WsServer) ListenAddrs() []string { - s.RLock() - defer s.RUnlock() + s.lk.RLock() + defer s.lk.RUnlock() res := make([]string, 0, len(s.listeners)) for _, l := range s.listeners { @@ -122,7 +223,7 @@ func (s *WsServer) ListenAddrs() []string { return res } -func wsUpgrader(introspector introspection.Introspector) http.HandlerFunc { +func (s *WsServer) wsUpgrader() http.HandlerFunc { return func(w http.ResponseWriter, rq *http.Request) { upgrader.CheckOrigin = func(rq *http.Request) bool { return true } wsConn, err := upgrader.Upgrade(w, rq, nil) @@ -130,42 +231,328 @@ func wsUpgrader(introspector introspection.Introspector) http.HandlerFunc { logger.Errorf("upgrade to websocket failed, err: %s", err) return } - defer wsConn.Close() - - for { - // wait for client to ask for the state - mt, message, err := wsConn.ReadMessage() - switch err.(type) { - case nil: - case *websocket.CloseError: - logger.Warnf("connection closed: %s", err) - return - default: - logger.Errorf("failed to read message from ws connection, err: %s", err) - return - } - logger.Debugf("received message from ws connection, type: %d. recv: %s", mt, message) + done := make(chan struct{}, 1) + select { + case s.newConnCh <- &newConnReq{wsConn, done}: + case <-s.ctx.Done(): + wsConn.Close() + return + } + + select { + case <-done: + case <-s.ctx.Done(): + wsConn.Close() + return + } + } +} + +func (s *WsServer) eventLoop() { + var stateMessageTimer *time.Timer + var stateTimerCh <-chan time.Time + stMsgTmrRunning := false + + eventSubCh := s.eventSub.Out() - // fetch the current state & marshal to bytes - state, err := introspector.FetchFullState() + defer func() { + if stMsgTmrRunning && !stateMessageTimer.Stop() { + <-stateMessageTimer.C + } + s.closeWg.Done() + }() + + for { + select { + case <-stateTimerCh: + stateBz, err := s.fetchStateBinary() if err != nil { - logger.Errorf("failed to fetch current state in introspector, err: %s", err) - return + logger.Errorf("failed to fetch state bytes for periodic push, err=%s", err) + continue } + s.broadcast(stateBz) + stateMessageTimer.Reset(stateMsgPeriod) - bz, err := proto.Marshal(state) - if err != nil { - logger.Errorf("failed to marshal introspector state, err: %s", err) - return + case req := <-s.connStateChangeReqCh: + currState := s.connHandlerStates[req.ch] + s.connHandlerStates[req.ch] = req.newState + + if currState == paused && req.newState == running { + s.sendRuntimeMessage(req.ch) + s.sendStateMessage(req.ch) + } + + case sendDataReq := <-s.sendDataCh: + switch sendDataReq.dataType { + case introspection_pb.ClientSignal_STATE: + s.sendStateMessage(sendDataReq.ch) + case introspection_pb.ClientSignal_RUNTIME: + s.sendRuntimeMessage(sendDataReq.ch) + } + + case newConnReq := <-s.newConnCh: + handler := newConnHandler(s.ctx, s, newConnReq.conn) + s.connHandlerStates[handler] = running + // if this was the first connection, start the periodic state push + if len(s.connHandlerStates) == 1 { + stateMessageTimer = time.NewTimer(stateMsgPeriod) + stateTimerCh = stateMessageTimer.C + stMsgTmrRunning = true } - // send the response - wsConn.SetWriteDeadline(time.Now().Add(10 * time.Second)) - if err = wsConn.WriteMessage(websocket.BinaryMessage, bz); err != nil { - logger.Errorf("failed to write response to ws connection, err: %s", err) - return + // schedule runtime followed by state message on the connection + s.sendRuntimeMessage(handler) + s.sendStateMessage(handler) + + s.closeWg.Add(1) + go func() { + defer s.closeWg.Done() + handler.run() + select { + case s.closeConnCh <- &closeConnReq{handler, newConnReq.done}: + case <-s.ctx.Done(): + return + } + }() + + case closeReq := <-s.closeConnCh: + closeReq.ch.conn.Close() + delete(s.connHandlerStates, closeReq.ch) + + // shut down the periodic state push + if len(s.connHandlerStates) == 0 { + // stop periodic state push + if stMsgTmrRunning && !stateMessageTimer.Stop() { + <-stateMessageTimer.C + } + stMsgTmrRunning = false + stateTimerCh = nil + stateMessageTimer = nil + } + + closeReq.done <- struct{}{} + + case evt, more := <-eventSubCh: + if !more { + eventSubCh = nil + continue + } + if len(s.connHandlerStates) == 0 { + continue + } + + // will only send the name property if we've seen the event before, otherwise + // will send props for all fields + if err := s.broadcastEvent(evt); err != nil { + logger.Errorf("error while broadcasting event, err=%s", err) + } + + case fnc := <-s.evalForTest: + fnc() + + case <-s.ctx.Done(): + return + } + } + +} + +func (s *WsServer) sendMessageToConn(ch *connHandler, msg []byte) { + select { + case ch.outgoingChan <- msg: + default: + logger.Warnf("dropping outgoing message to %s as buffer is full", ch.conn.RemoteAddr()) + } +} + +func (s *WsServer) sendRuntimeMessage(ch *connHandler) { + rt, err := s.fetchRuntimeBinary() + if err != nil { + logger.Errorf("failed to fetch runtime message, err=%s", err) + } else { + s.sendMessageToConn(ch, rt) + } +} + +func (s *WsServer) sendStateMessage(ch *connHandler) { + st, err := s.fetchStateBinary() + if err != nil { + logger.Errorf("failed to fetch full state, err=%s", err) + } else { + s.sendMessageToConn(ch, st) + } +} + +func (s *WsServer) broadcastEvent(evt interface{}) error { + js, err := json.Marshal(evt) + if err != nil { + return fmt.Errorf("failed to marshal event to json, err=%s", err) + } + + key := reflect.TypeOf(evt) + var evtType *introspection_pb.EventType + _, ok := s.knownEvtProps[key] + if ok { + // just send the name if we've already seen the event before + evtType = &introspection_pb.EventType{Name: s.knownEvtProps[key].Name} + } else { + // compute props, cache and send + evtType, err = getEventProperties(evt) + if err != nil { + return fmt.Errorf("failed to get properties of event, err=%s", err) + } + s.knownEvtProps[key] = *evtType + } + + evtMessage := &introspection_pb.ProtocolDataPacket{ + Version: introspection.ProtoVersionPb, + Message: &introspection_pb.ProtocolDataPacket_Event{ + Event: &introspection_pb.Event{ + Type: evtType, + Ts: timeToUnixMillis(time.Now()), + Content: string(js), + }, + }, + } + + bz, err := proto.Marshal(evtMessage) + if err != nil { + return fmt.Errorf("failed to marshal event proto, err=%s", err) + } + + cbz, err := checkSumedMessageForClient(bz) + if err != nil { + return fmt.Errorf("failed to generate checksumed event message, err=%s", err) + } + + s.broadcast(cbz) + return nil +} + +func (s *WsServer) broadcast(msg []byte) { + for c, _ := range s.connHandlerStates { + ch := c + if s.connHandlerStates[ch] != running { + continue + } + + s.sendMessageToConn(ch, msg) + } +} + +func (s *WsServer) fetchStateBinary() ([]byte, error) { + st, err := s.introspector.FetchFullState() + if err != nil { + return nil, fmt.Errorf("failed to fetch state, err=%s", err) + } + + stMsg := &introspection_pb.ProtocolDataPacket{ + Version: introspection.ProtoVersionPb, + Message: &introspection_pb.ProtocolDataPacket_State{State: st}, + } + + bz, err := proto.Marshal(stMsg) + if err != nil { + return nil, fmt.Errorf("failed to marshal state proto to bianry, err=%s", err) + } + return checkSumedMessageForClient(bz) +} + +func (s *WsServer) fetchRuntimeBinary() ([]byte, error) { + rt, err := s.introspector.FetchRuntime() + if err != nil { + return nil, fmt.Errorf("failed to fetch runtime mesage, err=%s", err) + } + rt.SendStateIntervalMs = uint32(stateMsgPeriod.Milliseconds()) + rt.KeepStaleDataMs = uint32(keepStaleData.Milliseconds()) + + evtProps := make([]*introspection_pb.EventType, 0, len(s.knownEvtProps)) + for k := range s.knownEvtProps { + v := s.knownEvtProps[k] + evtProps = append(evtProps, &v) + } + rt.EventTypes = evtProps + + rtMsg := &introspection_pb.ProtocolDataPacket{ + Version: introspection.ProtoVersionPb, + Message: &introspection_pb.ProtocolDataPacket_Runtime{Runtime: rt}, + } + + bz, err := proto.Marshal(rtMsg) + if err != nil { + return nil, fmt.Errorf("failed to marshal runtime proto to binary, err=%s", err) + } + return checkSumedMessageForClient(bz) +} + +func checkSumedMessageForClient(protoMsg []byte) ([]byte, error) { + bz := make([]byte, 12+len(protoMsg)) + + f := fnv.New32a() + n, err := f.Write(protoMsg) + if err != nil { + return nil, fmt.Errorf("failed creating fnc hash digest, err=%s", err) + } + if n != len(protoMsg) { + return nil, fmt.Errorf("failed to write all bytes of the proto message to the hash digest") + } + + binary.LittleEndian.PutUint32(bz[0:4], introspection.ProtoVersion) + binary.LittleEndian.PutUint32(bz[4:8], f.Sum32()) + binary.LittleEndian.PutUint32(bz[8:12], uint32(len(protoMsg))) + copy(bz[12:], protoMsg) + + return bz, nil +} + +func getEventProperties(evt interface{}) (*introspection_pb.EventType, error) { + eventType := reflect.TypeOf(evt) + + if eventType.Kind() != reflect.Struct { + return nil, errors.New("event type must be a struct") + } + + re := &introspection_pb.EventType{} + re.Name = eventType.Name() + re.PropertyTypes = make([]*introspection_pb.EventType_EventProperty, 0, eventType.NumField()) + + for i := 0; i < eventType.NumField(); i++ { + prop := &introspection_pb.EventType_EventProperty{} + fld := eventType.Field(i) + prop.Name = fld.Name + + fldType := fld.Type + + if fldType.Kind() == reflect.Array || fldType.Kind() == reflect.Slice { + prop.HasMultiple = true + fldType = fld.Type.Elem() + } + + switch fldType { + case jsType: + prop.Type = introspection_pb.EventType_EventProperty_JSON + case peerIdType: + prop.Type = introspection_pb.EventType_EventProperty_PEERID + case maddrType: + prop.Type = introspection_pb.EventType_EventProperty_MULTIADDR + case timeType: + prop.Type = introspection_pb.EventType_EventProperty_TIME + default: + switch fldType.Kind() { + case reflect.String: + prop.Type = introspection_pb.EventType_EventProperty_STRING + case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64, + reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, + reflect.Uint64, reflect.Float32, reflect.Float64: + prop.Type = introspection_pb.EventType_EventProperty_NUMBER + default: + prop.Type = introspection_pb.EventType_EventProperty_JSON } } + + re.PropertyTypes = append(re.PropertyTypes, prop) } + + return re, nil } diff --git a/ws_server_test.go b/ws_server_test.go index 65f9011..a7fc2c1 100644 --- a/ws_server_test.go +++ b/ws_server_test.go @@ -1,15 +1,49 @@ package introspector import ( + "encoding/binary" + "encoding/hex" + "encoding/json" + "errors" "fmt" + "hash/fnv" "net" + "reflect" "strconv" + "sync" "testing" + "time" + "golang.org/x/net/context" + + "github.com/libp2p/go-libp2p-core/event" + "github.com/libp2p/go-libp2p-core/introspection" + introspection_pb "github.com/libp2p/go-libp2p-core/introspection/pb" + "github.com/libp2p/go-libp2p-core/peer" + "github.com/libp2p/go-libp2p-core/protocol" + "github.com/libp2p/go-libp2p-core/test" + + "github.com/gogo/protobuf/proto" "github.com/gorilla/websocket" + "github.com/libp2p/go-eventbus" + "github.com/multiformats/go-multiaddr" "github.com/stretchr/testify/require" ) +type TestEvent1 struct { + a int + b int +} + +type TestEvent2 struct { + m int +} + +type TestEvent3 struct { + a string + b int +} + func TestIntrospectionServer(t *testing.T) { require := require.New(t) introspector := NewDefaultIntrospector() @@ -24,11 +58,11 @@ func TestIntrospectionServer(t *testing.T) { t.Fatalf("failed to construct ws server: %s", err) } - if err := server.Start(); err != nil { + if err := server.Start(eventbus.NewBus()); err != nil { t.Fatalf("failed to start ws server: %s", err) } - if err := server.Start(); err == nil { + if err := server.Start(eventbus.NewBus()); err == nil { t.Fatalf("expected to fail when starting server twice") } @@ -55,8 +89,7 @@ func TestIntrospectionServer(t *testing.T) { require.Equal(conn.RemoteAddr().String(), addr) - err = conn.WriteMessage(websocket.BinaryMessage, []byte("foo")) - require.NoError(err) + sendMessage(t, &introspection_pb.ClientSignal{}, conn) _, _, err = conn.ReadMessage() require.NoError(err) @@ -67,3 +100,623 @@ func TestIntrospectionServer(t *testing.T) { t.Run("single address", test([]string{"localhost:0"})) t.Run("multiple address", test([]string{"localhost:0", "localhost:9999"})) } + +func TestBroadcast(t *testing.T) { + nConns := 3 + addr := "localhost:9999" + // create a ws server + + introspector := NewDefaultIntrospector() + config := &WsServerConfig{ + ListenAddrs: []string{addr}, + } + server, err := NewWsServer(introspector, config) + require.NoError(t, err) + + // start the server + require.NoError(t, server.Start(eventbus.NewBus())) + defer func() { + err := server.Close() + require.NoError(t, err) + }() + + var conns []*websocket.Conn + + for i := 0; i < nConns; i++ { + conn, _, err := websocket.DefaultDialer.Dial(fmt.Sprintf("ws://%s/introspect", addr), nil) + require.NoError(t, err) + defer conn.Close() + conns = append(conns, conn) + } + + // assert conn handlers + require.Eventually(t, func() bool { + return getNConns(server) == nConns + }, 10*time.Second, 1*time.Second) + + // should get runtime followed by state message on all 3 conns + for i := 0; i < nConns; i++ { + pd1, err := fetchProtocolWrapper(t, conns[i]) + require.NoError(t, err) + require.NotNil(t, pd1.GetRuntime()) + require.Nil(t, pd1.GetState()) + + pd2, err := fetchProtocolWrapper(t, conns[i]) + require.NoError(t, err) + require.NotNil(t, pd2.GetState()) + require.Nil(t, pd2.GetRuntime()) + } + + // get periodic state messages on all connections -> atleast 3 + doneCh := make(chan struct{}, nConns) + + var lk sync.Mutex + var fetchError error + + for i := 0; i < nConns; i++ { + go func(i int) { + count := 0 + for { + pd1, err := fetchProtocolWrapper(t, conns[i]) + if err != nil { + lk.Lock() + fetchError = err + lk.Unlock() + return + } + if pd1.GetState() != nil && pd1.GetRuntime() == nil { + count++ + } + if count == 3 { + doneCh <- struct{}{} + return + } + } + }(i) + } + + // should get at least 3 state broadcasts on each connection in 12 seconds. + ctx, cancel := context.WithTimeout(context.Background(), 12*time.Second) + defer cancel() + + count := 0 + select { + case <-ctx.Done(): + t.Fatal("failed to get atleast 3 state broadcasts on all three connections") + case <-doneCh: + lk.Lock() + require.NoError(t, fetchError) + lk.Unlock() + count++ + if count == nConns { + return + } + } +} + +func TestEventsBroadcast(t *testing.T) { + bus := eventbus.NewBus() + + e1, err := bus.Emitter(new(event.EvtPeerProtocolsUpdated)) + require.NoError(t, err) + + e2, err := bus.Emitter(new(event.EvtPeerIdentificationCompleted)) + require.NoError(t, err) + + nConns := 3 + addr := "localhost:9999" + // create a ws server + introspector := NewDefaultIntrospector() + config := &WsServerConfig{ + ListenAddrs: []string{addr}, + } + server, err := NewWsServer(introspector, config) + require.NoError(t, err) + + // start the server + require.NoError(t, server.Start(bus)) + defer func() { + err := server.Close() + require.NoError(t, err) + }() + + // Test events broadcast + var conns []*websocket.Conn + + for i := 0; i < nConns; i++ { + conn, _, err := websocket.DefaultDialer.Dial(fmt.Sprintf("ws://%s/introspect", addr), nil) + require.NoError(t, err) + defer conn.Close() + conns = append(conns, conn) + } + + // assert conn handlers + require.Eventually(t, func() bool { + return getNConns(server) == nConns + }, 10*time.Second, 1*time.Second) + + // drain initial runtime & state messages from them + for i := 0; i < nConns; i++ { + pd1, err := fetchProtocolWrapper(t, conns[i]) + require.NoError(t, err) + require.NotNil(t, pd1.GetRuntime()) + require.Nil(t, pd1.GetState()) + + pd2, err := fetchProtocolWrapper(t, conns[i]) + require.NoError(t, err) + require.NotNil(t, pd2.GetState()) + require.Nil(t, pd2.GetRuntime()) + } + + // emit two event and see them on all handlers + pid := test.RandPeerIDFatal(t) + ev1 := event.EvtPeerProtocolsUpdated{ + Peer: pid, + Added: []protocol.ID{"P1"}, + Removed: []protocol.ID{"P2"}, + } + ev2 := event.EvtPeerIdentificationCompleted{Peer: pid} + require.NoError(t, e1.Emit(ev1)) + require.NoError(t, e2.Emit(ev2)) + + var lk sync.Mutex + var fetchError error + doneCh := make(chan struct{}, nConns) + + for i := 0; i < nConns; i++ { + go func(i int) { + gotev1 := false + gotev2 := false + for { + pd1, err := fetchProtocolWrapper(t, conns[i]) + if err != nil { + lk.Lock() + fetchError = err + lk.Unlock() + return + } + if pd1.GetState() == nil && pd1.GetRuntime() == nil && pd1.GetEvent() != nil { + ev := pd1.GetEvent() + switch ev.Type.Name { + case reflect.TypeOf(new(event.EvtPeerProtocolsUpdated)).Elem().Name(): + gotev1 = true + + evt := &event.EvtPeerProtocolsUpdated{} + require.NoError(t, json.Unmarshal([]byte(ev.Content), evt)) + require.Equal(t, ev1, *evt) + + case reflect.TypeOf(new(event.EvtPeerIdentificationCompleted)).Elem().Name(): + gotev2 = true + evt := &event.EvtPeerIdentificationCompleted{} + require.NoError(t, json.Unmarshal([]byte(ev.Content), evt)) + require.Equal(t, ev2, *evt) + default: + lk.Lock() + fetchError = errors.New("invalid event type") + lk.Unlock() + return + } + + } + if gotev1 && gotev2 { + doneCh <- struct{}{} + return + } + } + }(i) + } + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + count := 0 + select { + case <-ctx.Done(): + t.Fatal("failed to get events") + case <-doneCh: + lk.Lock() + require.NoError(t, fetchError) + lk.Unlock() + count++ + if count == nConns { + return + } + } + +} + +func TestRuntimeAndEvent(t *testing.T) { + bus := eventbus.NewBus() + em1, err := bus.Emitter(new(TestEvent1)) + require.NoError(t, err) + + addr := "localhost:9999" + // create a ws server + introspector := NewDefaultIntrospector() + config := &WsServerConfig{ + ListenAddrs: []string{addr}, + } + server, err := NewWsServer(introspector, config) + require.NoError(t, err) + + // start the server + require.NoError(t, server.Start(bus)) + defer func() { + err := server.Close() + require.NoError(t, err) + }() + + // make a conn + conn, _, err := websocket.DefaultDialer.Dial(fmt.Sprintf("ws://%s/introspect", addr), nil) + require.NoError(t, err) + defer conn.Close() + + // assert conn handlers are created + require.Eventually(t, func() bool { + return getNConns(server) == 1 + }, 10*time.Second, 1*time.Second) + + // drain state and runtime + pd1, err := fetchProtocolWrapper(t, conn) + require.NoError(t, err) + require.NotNil(t, pd1.GetRuntime()) + require.Nil(t, pd1.GetState()) + + pd2, err := fetchProtocolWrapper(t, conn) + require.NoError(t, err) + require.NotNil(t, pd2.GetState()) + require.Nil(t, pd2.GetRuntime()) + + // emit event + require.NoError(t, em1.Emit(TestEvent1{})) + + // assert event state + require.Eventually(t, func() bool { + done := make(chan struct{}) + var tk map[reflect.Type]introspection_pb.EventType + + server.evalForTest <- func() { + tk = server.knownEvtProps + done <- struct{}{} + } + <-done + + _, ok := tk[reflect.TypeOf(new(TestEvent1)).Elem()] + return len(tk) == 1 && ok + + }, 10*time.Second, 1*time.Second) + + // now make a second conn so we get info about the first event in the runtime + conn2, _, err := websocket.DefaultDialer.Dial(fmt.Sprintf("ws://%s/introspect", addr), nil) + require.NoError(t, err) + defer conn2.Close() + + var rt *introspection_pb.Runtime + require.Eventually(t, func() bool { + pd1, err := fetchProtocolWrapper(t, conn2) + rt = pd1.GetRuntime() + return err == nil && pd1.GetRuntime() != nil + }, 10*time.Second, 1*time.Second) + + require.Len(t, rt.EventTypes, 1) + require.Equal(t, reflect.TypeOf(new(TestEvent1)).Elem().Name(), rt.EventTypes[0].Name) + + // and emitting a new event gets us the actual information + var evt2 *introspection_pb.Event + em2, err := bus.Emitter(new(TestEvent2)) + require.NoError(t, err) + require.NoError(t, em2.Emit(TestEvent2{})) + require.Eventually(t, func() bool { + pd1, err := fetchProtocolWrapper(t, conn2) + evt2 = pd1.GetEvent() + return err == nil && evt2 != nil + }, 10*time.Second, 1*time.Second) + + require.Len(t, evt2.Type.PropertyTypes, 1) + require.Equal(t, reflect.TypeOf(new(TestEvent2)).Elem().Name(), evt2.Type.Name) + + // emit another event and wait for it + em3, err := bus.Emitter(new(TestEvent3)) + require.NoError(t, err) + require.NoError(t, em3.Emit(TestEvent3{})) + require.Eventually(t, func() bool { + pd1, err := fetchProtocolWrapper(t, conn2) + evt2 = pd1.GetEvent() + return err == nil && evt2 != nil + }, 10*time.Second, 1*time.Second) + + // now make another connection so we get runtime message with info about all three events + conn3, _, err := websocket.DefaultDialer.Dial(fmt.Sprintf("ws://%s/introspect", addr), nil) + require.NoError(t, err) + defer conn3.Close() + require.Eventually(t, func() bool { + pd1, err := fetchProtocolWrapper(t, conn3) + rt = pd1.GetRuntime() + return err == nil && pd1.GetRuntime() != nil + }, 10*time.Second, 1*time.Second) + + m := make(map[string]*introspection_pb.EventType) + require.Len(t, rt.EventTypes, 3) + for _, e := range rt.EventTypes { + ec := e + m[ec.Name] = ec + } + require.Len(t, m, 3) +} + +func TestEventMessageHasProperties(t *testing.T) { + bus := eventbus.NewBus() + + // create a ws server + addr := "localhost:9999" + introspector := NewDefaultIntrospector() + config := &WsServerConfig{ + ListenAddrs: []string{addr}, + } + server, err := NewWsServer(introspector, config) + require.NoError(t, err) + + // start the server + require.NoError(t, server.Start(bus)) + defer func() { + err := server.Close() + require.NoError(t, err) + }() + + // make a conn + conn, _, err := websocket.DefaultDialer.Dial(fmt.Sprintf("ws://%s/introspect", addr), nil) + require.NoError(t, err) + defer conn.Close() + + // drain state and runtime + pd1, err := fetchProtocolWrapper(t, conn) + require.NoError(t, err) + require.NotNil(t, pd1.GetRuntime()) + require.Nil(t, pd1.GetState()) + + pd2, err := fetchProtocolWrapper(t, conn) + require.NoError(t, err) + require.NotNil(t, pd2.GetState()) + require.Nil(t, pd2.GetRuntime()) + + // first event has eventype + e1, err := bus.Emitter(new(TestEvent1)) + require.NoError(t, err) + require.NoError(t, e1.Emit(TestEvent1{})) + + pd, err := fetchProtocolWrapper(t, conn) + require.NoError(t, err) + require.Nil(t, pd.GetRuntime()) + require.Nil(t, pd.GetState()) + + evt := pd.GetEvent() + require.NotNil(t, evt) + require.Equal(t, reflect.TypeOf(new(TestEvent1)).Elem().Name(), evt.Type.Name) + require.NotEmpty(t, evt.Type.PropertyTypes) + require.Len(t, evt.Type.PropertyTypes, 2) + + // second ONLY has name + require.NoError(t, e1.Emit(TestEvent1{})) + pd, err = fetchProtocolWrapper(t, conn) + require.NoError(t, err) + require.Nil(t, pd.GetRuntime()) + require.Nil(t, pd.GetState()) + + evt = pd.GetEvent() + require.NotNil(t, evt) + require.Equal(t, reflect.TypeOf(new(TestEvent1)).Elem().Name(), evt.Type.Name) + require.Empty(t, evt.Type.PropertyTypes) + + // first event has eventype + e2, err := bus.Emitter(new(TestEvent2)) + require.NoError(t, err) + require.NoError(t, e2.Emit(TestEvent2{})) + + pd, err = fetchProtocolWrapper(t, conn) + require.NoError(t, err) + require.Nil(t, pd.GetRuntime()) + require.Nil(t, pd.GetState()) + + evt = pd.GetEvent() + require.NotNil(t, evt) + require.Equal(t, reflect.TypeOf(new(TestEvent2)).Elem().Name(), evt.Type.Name) + require.NotEmpty(t, evt.Type.PropertyTypes) + require.Len(t, evt.Type.PropertyTypes, 1) + + // second ONLY has name + require.NoError(t, e2.Emit(TestEvent2{})) + pd, err = fetchProtocolWrapper(t, conn) + require.NoError(t, err) + require.Nil(t, pd.GetRuntime()) + require.Nil(t, pd.GetState()) + + evt = pd.GetEvent() + require.NotNil(t, evt) + require.Equal(t, reflect.TypeOf(new(TestEvent2)).Elem().Name(), evt.Type.Name) + require.Empty(t, evt.Type.PropertyTypes) + + // assert internal state + done := make(chan struct{}, 1) + var tmap map[reflect.Type]introspection_pb.EventType + server.evalForTest <- func() { + tmap = server.knownEvtProps + done <- struct{}{} + } + <-done + + require.Len(t, tmap, 2) + t1 := tmap[reflect.TypeOf(new(TestEvent1)).Elem()] + require.Len(t, t1.PropertyTypes, 2) + t2 := tmap[reflect.TypeOf(new(TestEvent2)).Elem()] + require.Len(t, t2.PropertyTypes, 1) +} + +func getNConns(server *WsServer) int { + var chsLen int + doneChan := make(chan struct{}, 1) + + server.evalForTest <- func() { + chsLen = len(server.connHandlerStates) + doneChan <- struct{}{} + } + + <-doneChan + return chsLen +} + +func sendMessage(t *testing.T, cl *introspection_pb.ClientSignal, conn *websocket.Conn) { + bz, err := proto.Marshal(cl) + require.NoError(t, err) + + err = conn.WriteMessage(websocket.BinaryMessage, bz) + require.NoError(t, err) +} + +func fetchProtocolWrapper(t *testing.T, conn *websocket.Conn) (*introspection_pb.ProtocolDataPacket, error) { + _, msg, err := conn.ReadMessage() + if err != nil { + return nil, err + } + + var ( + // get the message + version = msg[0:4] + checksum = msg[4:8] + length = msg[8:12] + payload = msg[12:] + ) + + require.EqualValues(t, len(payload), binary.LittleEndian.Uint32(length)) + require.EqualValues(t, introspection.ProtoVersion, binary.LittleEndian.Uint32(version)) + + h := fnv.New32a() + _, err = h.Write(payload) + require.NoError(t, err) + require.EqualValues(t, h.Sum32(), binary.LittleEndian.Uint32(checksum)) + + pd := &introspection_pb.ProtocolDataPacket{} + + // read the protocol message directly + if err := proto.Unmarshal(payload, pd); err != nil { + return nil, err + } + + require.NotNil(t, pd.Message, "nil message received from server") + require.Equal(t, introspection.ProtoVersion, pd.Version.Version, "incorrect proto version receieved from client") + + return pd, nil +} + +func TestGetEventProps(t *testing.T) { + type EventA struct { + pid peer.ID + pids []peer.ID + + maddr multiaddr.Multiaddr + maddrs []multiaddr.Multiaddr + + ti time.Time + tis []time.Time + + s string + ss []string + + num int32 + nums []int64 + + fallback connHandler + fallbacks []connHandler + + x event.JSString + } + + prop, err := getEventProperties(EventA{}) + require.NoError(t, err) + + // name + require.Equal(t, reflect.TypeOf(new(EventA)).Elem().Name(), prop.Name) + + // num props + nameType := make(map[string]introspection_pb.EventType_EventProperty) + + for _, p := range prop.PropertyTypes { + p2 := p + nameType[p.Name] = *p2 + } + + require.Len(t, nameType, 13) + + pidN := nameType["pid"] + require.Equal(t, introspection_pb.EventType_EventProperty_PEERID, pidN.Type) + require.False(t, pidN.HasMultiple) + + pidsN := nameType["pids"] + require.Equal(t, introspection_pb.EventType_EventProperty_PEERID, pidsN.Type) + require.True(t, pidsN.HasMultiple) + + maddrN := nameType["maddr"] + require.Equal(t, introspection_pb.EventType_EventProperty_MULTIADDR, maddrN.Type) + require.False(t, maddrN.HasMultiple) + + maddrsN := nameType["maddrs"] + require.Equal(t, introspection_pb.EventType_EventProperty_MULTIADDR, maddrsN.Type) + require.True(t, maddrsN.HasMultiple) + + tiN := nameType["ti"] + require.Equal(t, introspection_pb.EventType_EventProperty_TIME, tiN.Type) + require.False(t, tiN.HasMultiple) + + tisN := nameType["tis"] + require.Equal(t, introspection_pb.EventType_EventProperty_TIME, tisN.Type) + require.True(t, tisN.HasMultiple) + + sN := nameType["s"] + require.Equal(t, introspection_pb.EventType_EventProperty_STRING, sN.Type) + require.False(t, sN.HasMultiple) + + sNs := nameType["ss"] + require.Equal(t, introspection_pb.EventType_EventProperty_STRING, sNs.Type) + require.True(t, sNs.HasMultiple) + + cn := nameType["fallback"] + require.Equal(t, introspection_pb.EventType_EventProperty_JSON, cn.Type) + require.False(t, cn.HasMultiple) + + cns := nameType["fallbacks"] + require.Equal(t, introspection_pb.EventType_EventProperty_JSON, cns.Type) + require.True(t, cns.HasMultiple) + + js := nameType["x"] + require.Equal(t, introspection_pb.EventType_EventProperty_JSON, js.Type) +} + +func TestCheckedSumMessage(t *testing.T) { + cl := &introspection_pb.ClientSignal{Signal: introspection_pb.ClientSignal_PAUSE_PUSH_EMITTER} + bz, err := proto.Marshal(cl) + require.NoError(t, err) + + cbz, err := checkSumedMessageForClient(bz) + require.NoError(t, err) + require.Len(t, cbz, 12+len(bz)) + + fmt.Println(hex.EncodeToString(cbz)) + + // version + require.EqualValues(t, introspection.ProtoVersion, binary.LittleEndian.Uint32(cbz[0:4])) + + // message + messageBz := cbz[12:] + require.Len(t, messageBz, len(bz)) + cl2 := &introspection_pb.ClientSignal{} + require.NoError(t, proto.Unmarshal(messageBz, cl2)) + require.Equal(t, cl, cl2) + + // checksum + h := fnv.New32a() + _, err = h.Write(messageBz) + require.NoError(t, err) + require.EqualValues(t, h.Sum32(), binary.LittleEndian.Uint32(cbz[4:8])) + + // length + require.EqualValues(t, len(bz), binary.LittleEndian.Uint32(cbz[8:12])) +}