Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

URL path support - groundwork for non-jsonrpc HTTP proxying #23

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 14 additions & 5 deletions server/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"fmt"
"io"
"net/http"
"net/url"
"path"
"strconv"
"sync/atomic"
"time"
Expand All @@ -16,7 +18,7 @@ import (

type Node struct {
log *zap.SugaredLogger
URI string
URI string // base Hostname
AddedAt time.Time
jobC chan *SimRequest
numWorkers int32
Expand All @@ -28,7 +30,7 @@ type Node struct {

func (n *Node) HealthCheck() error {
payload := `{"jsonrpc":"2.0","method":"net_version","params":[],"id":123}`
_, _, err := n.ProxyRequest([]byte(payload), 5*time.Second)
_, _, err := n.ProxyRequest("", []byte(payload), 5*time.Second)
return err
}

Expand Down Expand Up @@ -60,7 +62,7 @@ func (n *Node) startProxyWorker(id int32, cancelContext context.Context) {

req.Tries += 1
timeBeforeProxy := time.Now().UTC()
payload, statusCode, err := n.ProxyRequest(req.Payload, ProxyRequestTimeout)
payload, statusCode, err := n.ProxyRequest(req.TargetPath, req.Payload, ProxyRequestTimeout)
requestDuration := time.Since(timeBeforeProxy)
_log = _log.With("requestDurationUS", requestDuration.Microseconds())
if err != nil {
Expand Down Expand Up @@ -117,10 +119,17 @@ func (n *Node) StopWorkersAndWait() {
}
}

func (n *Node) ProxyRequest(payload []byte, timeout time.Duration) (resp []byte, statusCode int, err error) {
func (n *Node) ProxyRequest(targetPath string, payload []byte, timeout time.Duration) (resp []byte, statusCode int, err error) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
httpReq, err := http.NewRequestWithContext(ctx, "POST", n.URI, bytes.NewBuffer(payload))

url, err := url.Parse(n.URI)
if err != nil {
return resp, statusCode, err
}
url.Path = path.Join(url.Path, targetPath)

httpReq, err := http.NewRequestWithContext(ctx, "POST", url.String(), bytes.NewBuffer(payload))
if err != nil {
return resp, statusCode, errors.Wrap(err, "creating proxy request failed")
}
Expand Down
10 changes: 9 additions & 1 deletion server/node_notee.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,15 @@ func NewNode(log *zap.SugaredLogger, uri string, jobC chan *SimRequest, numWorke
AddedAt: time.Now(),
jobC: jobC,
numWorkers: numWorkers,
client: &http.Client{},
client: &http.Client{
Timeout: ProxyRequestTimeout,
Transport: &http.Transport{
MaxIdleConns: 100,
MaxConnsPerHost: 100,
MaxIdleConnsPerHost: 100,
IdleConnTimeout: 90 * time.Second,
},
},
}
return node, nil
}
6 changes: 3 additions & 3 deletions server/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func TestNode(t *testing.T) {
err = node.HealthCheck()
require.Nil(t, err, err)

request := NewSimRequest("1", []byte("foo"), true, false)
request := NewSimRequest("1", []byte("foo"), true, false, "")
node.StartWorkers()
node.jobC <- request
res := <-request.ResponseC
Expand Down Expand Up @@ -55,12 +55,12 @@ func TestNodeError(t *testing.T) {
require.Contains(t, err.Error(), "479")

// Check failing ProxyRequest
_, statusCode, err := node.ProxyRequest([]byte("net_version"), 3*time.Second)
_, statusCode, err := node.ProxyRequest("", []byte("net_version"), 3*time.Second)
require.NotNil(t, err, err)
require.Equal(t, 479, statusCode)

// Check failing SimRequest
request := NewSimRequest("1", []byte("foo"), true, false)
request := NewSimRequest("1", []byte("foo"), true, false, "")
node.StartWorkers()
node.jobC <- request
res := <-request.ResponseC
Expand Down
4 changes: 2 additions & 2 deletions server/nodepool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func TestNodePoolProxy(t *testing.T) {
err := gp.AddNode(rpcBackendServer.URL)
require.Nil(t, err, err)

request := NewSimRequest("1", []byte("foo"), true, false)
request := NewSimRequest("1", []byte("foo"), true, false, "")

gp.JobC <- request
res := <-request.ResponseC
Expand All @@ -84,7 +84,7 @@ func TestNodePoolWithError(t *testing.T) {
http.Error(w, "error", 479)
}

request := NewSimRequest("1", []byte("foo"), true, false)
request := NewSimRequest("1", []byte("foo"), true, false, "")
gp.JobC <- request
res := <-request.ResponseC
require.NotNil(t, res)
Expand Down
14 changes: 7 additions & 7 deletions server/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,15 @@ import (
)

func cloneRequest(req *SimRequest) *SimRequest {
return NewSimRequest("1", req.Payload, req.IsHighPrio, req.IsFastTrack)
return NewSimRequest("1", req.Payload, req.IsHighPrio, req.IsFastTrack, "")
}

func fillQueue(t *testing.T, q *PrioQueue) {
t.Helper()

taskLowPrio := NewSimRequest("1", []byte("taskLowPrio"), false, false)
taskHighPrio := NewSimRequest("1", []byte("taskHighPrio"), true, false)
taskFastTrack := NewSimRequest("1", []byte("tasFastTrack"), false, true)
taskLowPrio := NewSimRequest("1", []byte("taskLowPrio"), false, false, "")
taskHighPrio := NewSimRequest("1", []byte("taskHighPrio"), true, false, "")
taskFastTrack := NewSimRequest("1", []byte("tasFastTrack"), false, true, "")

q.Push(taskLowPrio)
q.Push(taskHighPrio)
Expand All @@ -46,7 +46,7 @@ func fillQueue(t *testing.T, q *PrioQueue) {

func TestQueueBlockingPop(t *testing.T) {
q := NewPrioQueue(0, 0, 0, 2, false)
taskLowPrio := NewSimRequest("1", []byte("taskLowPrio"), false, false)
taskLowPrio := NewSimRequest("1", []byte("taskLowPrio"), false, false, "")

// Ensure queue.Pop is blocking
t1 := time.Now()
Expand Down Expand Up @@ -102,7 +102,7 @@ func TestQueuePopping(t *testing.T) {

func TestPrioQueueMultipleReaders(t *testing.T) {
q := NewPrioQueue(0, 0, 0, 2, false)
taskLowPrio := NewSimRequest("1", []byte("taskLowPrio"), false, false)
taskLowPrio := NewSimRequest("1", []byte("taskLowPrio"), false, false, "")

counts := make(map[int]int)
resultC := make(chan int, 4)
Expand Down Expand Up @@ -155,7 +155,7 @@ func TestPrioQueueVarious(t *testing.T) {
// Test used for benchmark: single reader
func _testPrioQueue1(numWorkers, numItems int) *PrioQueue {
q := NewPrioQueue(0, 0, 0, 2, false)
taskLowPrio := NewSimRequest("1", []byte("taskLowPrio"), false, false)
taskLowPrio := NewSimRequest("1", []byte("taskLowPrio"), false, false, "")

var wg sync.WaitGroup

Expand Down
7 changes: 5 additions & 2 deletions server/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,19 @@ type SimRequest struct {
IsHighPrio bool
IsFastTrack bool

Payload []byte
TargetPath string
Payload []byte

ResponseC chan SimResponse
Cancelled bool
CreatedAt time.Time
Tries int
}

func NewSimRequest(id string, payload []byte, isHighPrio, IsFastTrack bool) *SimRequest {
func NewSimRequest(id string, payload []byte, isHighPrio, IsFastTrack bool, targetPath string) *SimRequest {
return &SimRequest{
ID: id,
TargetPath: targetPath,
Payload: payload,
IsHighPrio: isHighPrio,
IsFastTrack: IsFastTrack,
Expand Down
7 changes: 5 additions & 2 deletions server/webserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,9 @@ func (s *Webserver) HandleQueueRequest(w http.ResponseWriter, req *http.Request)
defer req.Body.Close()

// Allow single `X-Request-ID:...` log field via header
reqID := req.Header.Get("X-Request-ID")
log := s.log

reqID := req.Header.Get("X-Request-ID")
if reqID != "" {
log = s.log.With("reqID", reqID)
}
Expand All @@ -102,7 +103,9 @@ func (s *Webserver) HandleQueueRequest(w http.ResponseWriter, req *http.Request)
// Add new sim request to queue
isFastTrack := req.Header.Get("X-Fast-Track") == "true"
isHighPrio := req.Header.Get("high_prio") == "true" || req.Header.Get("X-High-Priority") == "true"
simReq := NewSimRequest(reqID, body, isHighPrio, isFastTrack)
targetPath := req.Header.Get("X-Target-Path")

simReq := NewSimRequest(reqID, body, isHighPrio, isFastTrack, targetPath)
wasAdded := s.prioQueue.Push(simReq)
if !wasAdded { // queue was full, job not added
log.Error("Couldn't add request, queue is full")
Expand Down
Loading