Skip to content

Commit

Permalink
Change tunnel to ws (#35)
Browse files Browse the repository at this point in the history
* change http tunnel to ws

* wp

* updated diagnostics to use websocket to connect to erigon

---------

Co-authored-by: Mark Holt <markholt@Marks-MacBook-Air.local>
  • Loading branch information
dvovk and Mark Holt authored Nov 15, 2023
1 parent 9616b74 commit 94edab2
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 34 deletions.
58 changes: 44 additions & 14 deletions api/bridge_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@ import (
"log"
"net/http"
"sync"
"time"

"github.com/go-chi/chi/v5"
"github.com/gorilla/websocket"
"github.com/ledgerwatch/diagnostics"
"github.com/ledgerwatch/diagnostics/api/internal"
"github.com/ledgerwatch/diagnostics/internal/erigon_node"
Expand All @@ -23,35 +25,62 @@ type BridgeHandler struct {
cache sessions.CacheService
}

const (
wsReadBuffer = 1024
wsWriteBuffer = 1024
wsPingInterval = 60 * time.Second
wsPingWriteTimeout = 5 * time.Second
wsMessageSizeLimit = 32 * 1024 * 1024
)

var wsBufferPool = new(sync.Pool)

func (h BridgeHandler) Bridge(w http.ResponseWriter, r *http.Request) {

//Sends a success Message to the Node client, to receive more information
flusher, _ := w.(http.Flusher)
ctx, cancel := context.WithCancel(r.Context())
defer cancel()
defer r.Body.Close()

upgrader := websocket.Upgrader{
EnableCompression: true,
ReadBufferSize: wsReadBuffer,
WriteBufferSize: wsWriteBuffer,
WriteBufferPool: wsBufferPool,
}

// Update the request context with the connection context.
// If the connection is closed by the server, it will also notify everything that waits on the request context.
*r = *r.WithContext(ctx)

w.WriteHeader(http.StatusOK)
flusher.Flush()
conn, err := upgrader.Upgrade(w, r, nil)

if err != nil {
internal.EncodeError(w, r, diagnostics.AsBadRequestErr(errors.Errorf("Error upgrading websocket: %v", err)))
return
}

connectionInfo := struct {
Version uint64 `json:"version"`
Sessions []string `json:"sessions"`
Nodes []*sessions.NodeInfo `json:"nodes"`
}{}

err := json.NewDecoder(r.Body).Decode(&connectionInfo)
_, message, err := conn.ReadMessage()

if err != nil {
log.Printf("Error reading connection info: %v\n", err)
internal.EncodeError(w, r, diagnostics.AsBadRequestErr(errors.Errorf("Error reading connection info: %v", err)))
return
}

err = json.Unmarshal(message, &connectionInfo)

if err != nil {
log.Printf("Error reading connection info: %v\n", err)
internal.EncodeError(w, r, diagnostics.AsBadRequestErr(errors.Errorf("Error unmarshaling connection info: %v", err)))
return
}

requestMap := map[string]*erigon_node.NodeRequest{}
requestMutex := sync.Mutex{}

Expand Down Expand Up @@ -97,12 +126,10 @@ func (h BridgeHandler) Bridge(w http.ResponseWriter, r *http.Request) {
requestMap[rpcRequest.Id] = request
requestMutex.Unlock()

if _, err := w.Write(bytes); err != nil {
if err := conn.WriteMessage(websocket.TextMessage, bytes); err != nil {
requestMutex.Lock()
delete(requestMap, rpcRequest.Id)
requestMutex.Unlock()

fmt.Println(request.Retries, err)
request.Retries++
if request.Retries < 15 {
select {
Expand All @@ -119,18 +146,21 @@ func (h BridgeHandler) Bridge(w http.ResponseWriter, r *http.Request) {
}
continue
}

flusher.Flush()
}
}()
}

decoder := json.NewDecoder(r.Body)

for {
var response erigon_node.Response

if err = decoder.Decode(&response); err != nil {
_, message, err := conn.ReadMessage()

if err != nil {
fmt.Printf("can't read response: %v\n", err)
continue
}

if err = json.Unmarshal(message, &response); err != nil {
fmt.Printf("can't read response: %v\n", err)
continue
}
Expand Down Expand Up @@ -163,7 +193,7 @@ func NewBridgeHandler(cacheSvc sessions.CacheService) BridgeHandler {
cache: cacheSvc,
}

r.Post("/", r.Bridge)
r.Get("/", r.Bridge)

return *r
}
11 changes: 3 additions & 8 deletions cmd/diagnostics/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package main

import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"log"
Expand Down Expand Up @@ -50,11 +49,6 @@ func main() {
certPool.AppendCertsFromPEM(caCert)
}

tlsConfig := &tls.Config{
RootCAs: certPool,
MinVersion: tls.VersionTLS12,
}

// Passing in the services to REST layer
handlers := api.NewHandler(
api.APIServices{
Expand All @@ -65,12 +59,13 @@ func main() {
Addr: fmt.Sprintf("%s:%d", listenAddr, listenPort),
Handler: handlers,
MaxHeaderBytes: 1 << 20,
TLSConfig: tlsConfig,
ReadHeaderTimeout: 1 * time.Minute,
}

go func() {
if err := srv.ListenAndServeTLS(serverCertFile, serverKeyFile); err != http.ErrServerClosed {
err := srv.ListenAndServe()

if err != nil {
log.Fatal(err)
}
}()
Expand Down
15 changes: 3 additions & 12 deletions internal/bridge/middleware.go
Original file line number Diff line number Diff line change
@@ -1,22 +1,13 @@
package bridge

import "net/http"
import (
"net/http"
)

var ErrHTTP2NotSupported = "HTTP2 not supported"

func Middleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if !r.ProtoAtLeast(2, 0) {
http.Error(w, ErrHTTP2NotSupported, http.StatusHTTPVersionNotSupported)
return
}

_, ok := w.(http.Flusher)
if !ok {
http.Error(w, ErrHTTP2NotSupported, http.StatusHTTPVersionNotSupported)
return
}

next.ServeHTTP(w, r)
})
}

0 comments on commit 94edab2

Please sign in to comment.