From cbe402b696853f1470230ad6ba1e346c16d3ae67 Mon Sep 17 00:00:00 2001 From: Chris Hager Date: Thu, 4 Apr 2024 22:40:02 +0200 Subject: [PATCH 1/2] URL path support - groundwork for non-jsonrpc HTTP proxying --- server/node.go | 19 ++++++++++++++----- server/node_test.go | 6 +++--- server/nodepool_test.go | 4 ++-- server/queue_test.go | 14 +++++++------- server/types.go | 7 +++++-- server/webserver.go | 7 +++++-- 6 files changed, 36 insertions(+), 21 deletions(-) diff --git a/server/node.go b/server/node.go index 106695d..94880b8 100644 --- a/server/node.go +++ b/server/node.go @@ -6,6 +6,8 @@ import ( "fmt" "io" "net/http" + "net/url" + "path" "strconv" "sync/atomic" "time" @@ -16,7 +18,7 @@ import ( type Node struct { log *zap.SugaredLogger - URI string + URI string // base Hostname AddedAt time.Time jobC chan *SimRequest numWorkers int32 @@ -28,7 +30,7 @@ type Node struct { func (n *Node) HealthCheck() error { payload := `{"jsonrpc":"2.0","method":"net_version","params":[],"id":123}` - _, _, err := n.ProxyRequest([]byte(payload), 5*time.Second) + _, _, err := n.ProxyRequest("", []byte(payload), 5*time.Second) return err } @@ -60,7 +62,7 @@ func (n *Node) startProxyWorker(id int32, cancelContext context.Context) { req.Tries += 1 timeBeforeProxy := time.Now().UTC() - payload, statusCode, err := n.ProxyRequest(req.Payload, ProxyRequestTimeout) + payload, statusCode, err := n.ProxyRequest(req.TargetPath, req.Payload, ProxyRequestTimeout) requestDuration := time.Since(timeBeforeProxy) _log = _log.With("requestDurationUS", requestDuration.Microseconds()) if err != nil { @@ -117,10 +119,17 @@ func (n *Node) StopWorkersAndWait() { } } -func (n *Node) ProxyRequest(payload []byte, timeout time.Duration) (resp []byte, statusCode int, err error) { +func (n *Node) ProxyRequest(targetPath string, payload []byte, timeout time.Duration) (resp []byte, statusCode int, err error) { ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() - httpReq, err := http.NewRequestWithContext(ctx, "POST", n.URI, bytes.NewBuffer(payload)) + + url, err := url.Parse(n.URI) + if err != nil { + return resp, statusCode, err + } + url.Path = path.Join(url.Path, targetPath) + + httpReq, err := http.NewRequestWithContext(ctx, "POST", url.String(), bytes.NewBuffer(payload)) if err != nil { return resp, statusCode, errors.Wrap(err, "creating proxy request failed") } diff --git a/server/node_test.go b/server/node_test.go index 20efdd5..cb64434 100644 --- a/server/node_test.go +++ b/server/node_test.go @@ -21,7 +21,7 @@ func TestNode(t *testing.T) { err = node.HealthCheck() require.Nil(t, err, err) - request := NewSimRequest("1", []byte("foo"), true, false) + request := NewSimRequest("1", []byte("foo"), true, false, "") node.StartWorkers() node.jobC <- request res := <-request.ResponseC @@ -55,12 +55,12 @@ func TestNodeError(t *testing.T) { require.Contains(t, err.Error(), "479") // Check failing ProxyRequest - _, statusCode, err := node.ProxyRequest([]byte("net_version"), 3*time.Second) + _, statusCode, err := node.ProxyRequest("", []byte("net_version"), 3*time.Second) require.NotNil(t, err, err) require.Equal(t, 479, statusCode) // Check failing SimRequest - request := NewSimRequest("1", []byte("foo"), true, false) + request := NewSimRequest("1", []byte("foo"), true, false, "") node.StartWorkers() node.jobC <- request res := <-request.ResponseC diff --git a/server/nodepool_test.go b/server/nodepool_test.go index 9c0b1e3..89e2585 100644 --- a/server/nodepool_test.go +++ b/server/nodepool_test.go @@ -63,7 +63,7 @@ func TestNodePoolProxy(t *testing.T) { err := gp.AddNode(rpcBackendServer.URL) require.Nil(t, err, err) - request := NewSimRequest("1", []byte("foo"), true, false) + request := NewSimRequest("1", []byte("foo"), true, false, "") gp.JobC <- request res := <-request.ResponseC @@ -84,7 +84,7 @@ func TestNodePoolWithError(t *testing.T) { http.Error(w, "error", 479) } - request := NewSimRequest("1", []byte("foo"), true, false) + request := NewSimRequest("1", []byte("foo"), true, false, "") gp.JobC <- request res := <-request.ResponseC require.NotNil(t, res) diff --git a/server/queue_test.go b/server/queue_test.go index c7f947b..b5a33a8 100644 --- a/server/queue_test.go +++ b/server/queue_test.go @@ -11,15 +11,15 @@ import ( ) func cloneRequest(req *SimRequest) *SimRequest { - return NewSimRequest("1", req.Payload, req.IsHighPrio, req.IsFastTrack) + return NewSimRequest("1", req.Payload, req.IsHighPrio, req.IsFastTrack, "") } func fillQueue(t *testing.T, q *PrioQueue) { t.Helper() - taskLowPrio := NewSimRequest("1", []byte("taskLowPrio"), false, false) - taskHighPrio := NewSimRequest("1", []byte("taskHighPrio"), true, false) - taskFastTrack := NewSimRequest("1", []byte("tasFastTrack"), false, true) + taskLowPrio := NewSimRequest("1", []byte("taskLowPrio"), false, false, "") + taskHighPrio := NewSimRequest("1", []byte("taskHighPrio"), true, false, "") + taskFastTrack := NewSimRequest("1", []byte("tasFastTrack"), false, true, "") q.Push(taskLowPrio) q.Push(taskHighPrio) @@ -46,7 +46,7 @@ func fillQueue(t *testing.T, q *PrioQueue) { func TestQueueBlockingPop(t *testing.T) { q := NewPrioQueue(0, 0, 0, 2, false) - taskLowPrio := NewSimRequest("1", []byte("taskLowPrio"), false, false) + taskLowPrio := NewSimRequest("1", []byte("taskLowPrio"), false, false, "") // Ensure queue.Pop is blocking t1 := time.Now() @@ -102,7 +102,7 @@ func TestQueuePopping(t *testing.T) { func TestPrioQueueMultipleReaders(t *testing.T) { q := NewPrioQueue(0, 0, 0, 2, false) - taskLowPrio := NewSimRequest("1", []byte("taskLowPrio"), false, false) + taskLowPrio := NewSimRequest("1", []byte("taskLowPrio"), false, false, "") counts := make(map[int]int) resultC := make(chan int, 4) @@ -155,7 +155,7 @@ func TestPrioQueueVarious(t *testing.T) { // Test used for benchmark: single reader func _testPrioQueue1(numWorkers, numItems int) *PrioQueue { q := NewPrioQueue(0, 0, 0, 2, false) - taskLowPrio := NewSimRequest("1", []byte("taskLowPrio"), false, false) + taskLowPrio := NewSimRequest("1", []byte("taskLowPrio"), false, false, "") var wg sync.WaitGroup diff --git a/server/types.go b/server/types.go index 542ff55..38f4fe1 100644 --- a/server/types.go +++ b/server/types.go @@ -8,16 +8,19 @@ type SimRequest struct { IsHighPrio bool IsFastTrack bool - Payload []byte + TargetPath string + Payload []byte + ResponseC chan SimResponse Cancelled bool CreatedAt time.Time Tries int } -func NewSimRequest(id string, payload []byte, isHighPrio, IsFastTrack bool) *SimRequest { +func NewSimRequest(id string, payload []byte, isHighPrio, IsFastTrack bool, targetPath string) *SimRequest { return &SimRequest{ ID: id, + TargetPath: targetPath, Payload: payload, IsHighPrio: isHighPrio, IsFastTrack: IsFastTrack, diff --git a/server/webserver.go b/server/webserver.go index 77b7e23..483dba8 100644 --- a/server/webserver.go +++ b/server/webserver.go @@ -75,8 +75,9 @@ func (s *Webserver) HandleQueueRequest(w http.ResponseWriter, req *http.Request) defer req.Body.Close() // Allow single `X-Request-ID:...` log field via header - reqID := req.Header.Get("X-Request-ID") log := s.log + + reqID := req.Header.Get("X-Request-ID") if reqID != "" { log = s.log.With("reqID", reqID) } @@ -102,7 +103,9 @@ func (s *Webserver) HandleQueueRequest(w http.ResponseWriter, req *http.Request) // Add new sim request to queue isFastTrack := req.Header.Get("X-Fast-Track") == "true" isHighPrio := req.Header.Get("high_prio") == "true" || req.Header.Get("X-High-Priority") == "true" - simReq := NewSimRequest(reqID, body, isHighPrio, isFastTrack) + targetPath := req.Header.Get("X-Target-Path") + + simReq := NewSimRequest(reqID, body, isHighPrio, isFastTrack, targetPath) wasAdded := s.prioQueue.Push(simReq) if !wasAdded { // queue was full, job not added log.Error("Couldn't add request, queue is full") From 74a1e916cf1b4f390b8f08757492ba3c907730ef Mon Sep 17 00:00:00 2001 From: Chris Hager Date: Fri, 5 Apr 2024 11:21:53 +0200 Subject: [PATCH 2/2] improved proxy http client setup --- server/node_notee.go | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/server/node_notee.go b/server/node_notee.go index eaa2c38..4755f7b 100644 --- a/server/node_notee.go +++ b/server/node_notee.go @@ -36,7 +36,15 @@ func NewNode(log *zap.SugaredLogger, uri string, jobC chan *SimRequest, numWorke AddedAt: time.Now(), jobC: jobC, numWorkers: numWorkers, - client: &http.Client{}, + client: &http.Client{ + Timeout: ProxyRequestTimeout, + Transport: &http.Transport{ + MaxIdleConns: 100, + MaxConnsPerHost: 100, + MaxIdleConnsPerHost: 100, + IdleConnTimeout: 90 * time.Second, + }, + }, } return node, nil }