diff --git a/caboose.go b/caboose.go index e14fe94..ffdb096 100644 --- a/caboose.go +++ b/caboose.go @@ -2,18 +2,22 @@ package caboose import ( "context" + "encoding/json" "io" "net/http" "net/url" "os" - "strings" "time" + requestcontext "github.com/willscott/go-requestcontext" + ipfsblockstore "github.com/ipfs/boxo/blockstore" ipath "github.com/ipfs/boxo/coreiface/path" gateway "github.com/ipfs/boxo/gateway" blocks "github.com/ipfs/go-block-format" "github.com/ipfs/go-cid" + + "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" @@ -30,7 +34,7 @@ type Config struct { // OrchestratorClient is the HTTP client to use when communicating with the orchestrator. OrchestratorClient *http.Client // OrchestratorOverride replaces calls to the orchestrator with a fixed response. - OrchestratorOverride []string + OrchestratorOverride []state.NodeInfo // LoggingEndpoint is the URL of the logging endpoint where we submit logs pertaining to retrieval requests. LoggingEndpoint url.URL @@ -55,6 +59,9 @@ type Config struct { // PoolRefresh is the interval at which we refresh the pool of upstreams from the orchestrator. PoolRefresh time.Duration + // PoolTargetSize is a baseline size for the pool - the pool will accept a degradation in performance to maintain at least this size. + PoolTargetSize int + // MirrorFraction is what fraction of requests will be mirrored to another random node in order to track metrics / determine the current best nodes. MirrorFraction float64 @@ -74,6 +81,9 @@ type Config struct { // Harness is an internal test harness that is set during testing. Harness *state.State + + // ComplianceCidPeriod controls how many requests caboose makes on average before requesting a compliance cid. + ComplianceCidPeriod int64 } const DefaultLoggingInterval = 5 * time.Second @@ -88,8 +98,11 @@ const defaultMaxRetries = 3 // default percentage of requests to mirror for tracking how nodes perform unless overridden by MirrorFraction const defaultMirrorFraction = 0.01 -const DefaultOrchestratorEndpoint = "https://orchestrator.strn.pl/nodes/nearby?count=200" +const DefaultOrchestratorEndpoint = "https://orchestrator.strn.pl/nodes?maxNodes=200" const DefaultPoolRefreshInterval = 5 * time.Minute +const DefaultPoolTargetSize = 30 + +const DefaultComplianceCidPeriod = int64(100) // we cool off sending requests for a cid for a certain duration // if we've seen a certain number of failures for it already in a given duration.
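With `OrchestratorOverride` changing from `[]string` to `[]state.NodeInfo`, the backend override (parsed from the environment in the next hunk) now carries a JSON array instead of a comma-separated host list. A minimal sketch of the new payload shape, assuming a local `NodeInfo` that mirrors the `internal/state` declaration later in this diff; all field values here are invented for illustration:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// NodeInfo mirrors internal/state.NodeInfo as declared later in this diff.
type NodeInfo struct {
	ID            string  `json:"id"`
	IP            string  `json:"ip"`
	Distance      float32 `json:"distance"`
	Weight        int     `json:"weight"`
	ComplianceCid string  `json:"complianceCid"`
	Core          bool    `json:"core"`
}

func main() {
	// One entry per backend node; this is the shape the override now takes.
	raw := `[{"id":"node-1","ip":"l1.example.net","distance":0.5,"weight":80,"complianceCid":"bafkexample","core":true}]`

	var nodes []NodeInfo
	if err := json.Unmarshal([]byte(raw), &nodes); err != nil {
		panic(err)
	}
	fmt.Printf("parsed %d override node(s); first: %s (core=%v)\n", len(nodes), nodes[0].IP, nodes[0].Core)
}
```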
@@ -129,7 +142,16 @@ func NewCaboose(config *Config) (*Caboose, error) { config.MirrorFraction = defaultMirrorFraction } if override := os.Getenv(BackendOverrideKey); len(override) > 0 { - config.OrchestratorOverride = strings.Split(override, ",") + var overrideNodes []state.NodeInfo + err := json.Unmarshal([]byte(override), &overrideNodes) + if err != nil { + goLogger.Warnw("error parsing BackendOverrideKey", "err", err) + return nil, err + } + config.OrchestratorOverride = overrideNodes + } + if config.PoolTargetSize == 0 { + config.PoolTargetSize = DefaultPoolTargetSize } logger := newLogger(config) @@ -144,6 +166,9 @@ func NewCaboose(config *Config) (*Caboose, error) { Timeout: DefaultCarRequestTimeout, } } + + c.config.Client.Transport = otelhttp.NewTransport(c.config.Client.Transport) + if c.config.OrchestratorEndpoint == nil { var err error c.config.OrchestratorEndpoint, err = url.Parse(DefaultOrchestratorEndpoint) @@ -152,6 +177,10 @@ } } + if c.config.ComplianceCidPeriod == 0 { + c.config.ComplianceCidPeriod = DefaultComplianceCidPeriod + } + if c.config.PoolRefresh == 0 { c.config.PoolRefresh = DefaultPoolRefreshInterval } @@ -185,17 +214,29 @@ func (c *Caboose) Close() { // Fetch allows fetching car archives by a path of the form `/ipfs/<cid>[/path/to/file]` func (c *Caboose) Fetch(ctx context.Context, path string, cb DataCallback) error { + traceID := requestcontext.IDFromContext(ctx) + tid, err := trace.TraceIDFromHex(traceID) + ctx, span := spanTrace(ctx, "Fetch", trace.WithAttributes(attribute.String("path", path))) defer span.End() - return c.pool.fetchResourceWith(ctx, path, cb, c.getAffinity(ctx)) + if err == nil { + sc := trace.NewSpanContext(trace.SpanContextConfig{ + TraceID: tid, + SpanID: span.SpanContext().SpanID(), + Remote: true, + }) + ctx = trace.ContextWithRemoteSpanContext(ctx, sc) + } + + return c.pool.fetchResourceWith(ctx, path, cb, c.GetAffinity(ctx)) } func (c *Caboose) Has(ctx context.Context, it cid.Cid) (bool, error) { ctx, span := spanTrace(ctx, "Has", trace.WithAttributes(attribute.Stringer("cid", it))) defer span.End() - blk, err := c.pool.fetchBlockWith(ctx, it, c.getAffinity(ctx)) + blk, err := c.pool.fetchBlockWith(ctx, it, c.GetAffinity(ctx)) if err != nil { return false, err } @@ -206,7 +247,7 @@ func (c *Caboose) Get(ctx context.Context, it cid.Cid) (blocks.Block, error) { ctx, span := spanTrace(ctx, "Get", trace.WithAttributes(attribute.Stringer("cid", it))) defer span.End() - blk, err := c.pool.fetchBlockWith(ctx, it, c.getAffinity(ctx)) + blk, err := c.pool.fetchBlockWith(ctx, it, c.GetAffinity(ctx)) if err != nil { return nil, err } @@ -218,14 +259,14 @@ func (c *Caboose) GetSize(ctx context.Context, it cid.Cid) (int, error) { ctx, span := spanTrace(ctx, "GetSize", trace.WithAttributes(attribute.Stringer("cid", it))) defer span.End() - blk, err := c.pool.fetchBlockWith(ctx, it, c.getAffinity(ctx)) + blk, err := c.pool.fetchBlockWith(ctx, it, c.GetAffinity(ctx)) if err != nil { return 0, err } return len(blk.RawData()), nil } -func (c *Caboose) getAffinity(ctx context.Context) string { +func (c *Caboose) GetAffinity(ctx context.Context) string { // https://github.com/ipfs/bifrost-gateway/issues/53#issuecomment-1442732865 if affG := ctx.Value(gateway.ContentPathKey); affG != nil { contentPath := affG.(ipath.Path).String() diff --git a/cmd/caboose/main.go b/cmd/caboose/main.go index dba6e81..22caecc 100644 --- a/cmd/caboose/main.go +++ b/cmd/caboose/main.go @@ -11,9 +11,9 @@ import ( "time"
"github.com/filecoin-saturn/caboose" - carv2 "github.com/ipfs/boxo/ipld/car/v2" - "github.com/ipfs/boxo/ipld/car/v2/blockstore" "github.com/ipfs/go-cid" + carv2 "github.com/ipld/go-car/v2" + "github.com/ipld/go-car/v2/blockstore" cidlink "github.com/ipld/go-ipld-prime/linking/cid" "github.com/ipld/go-ipld-prime/storage/bsadapter" selectorparse "github.com/ipld/go-ipld-prime/traversal/selector/parse" diff --git a/errors.go b/errors.go index 9870103..10934a8 100644 --- a/errors.go +++ b/errors.go @@ -80,4 +80,10 @@ func (epr ErrPartialResponse) Error() string { // ErrInvalidResponse can be returned from a DataCallback to indicate that the data provided for the // requested resource was explicitly 'incorrect' - that blocks not in the requested dag, or non-car-conforming // data was returned. -type ErrInvalidResponse error +type ErrInvalidResponse struct { + Message string +} + +func (e ErrInvalidResponse) Error() string { + return e.Message +} diff --git a/fetcher.go b/fetcher.go index ea1e67b..4a4b371 100644 --- a/fetcher.go +++ b/fetcher.go @@ -7,10 +7,13 @@ import ( "hash/crc32" "io" "net/http" + "net/http/httptrace" "os" "strconv" + "strings" "time" + "go.opentelemetry.io/contrib/instrumentation/net/http/httptrace/otelhttptrace" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" @@ -49,11 +52,14 @@ func (p *pool) fetchResource(ctx context.Context, from *Node, resource string, m } // if the context is already cancelled, there's nothing we can do here. if ce := ctx.Err(); ce != nil { - fetchRequestContextErrorTotalMetric.WithLabelValues(resourceType, fmt.Sprintf("%t", errors.Is(ce, context.Canceled)), "fetchResource-init").Add(1) return ce } - ctx, span := spanTrace(ctx, "Pool.FetchResource", trace.WithAttributes(attribute.String("from", from.URL), attribute.String("of", resource), attribute.String("mime", mime))) + p.ActiveNodes.lk.RLock() + isCore := p.ActiveNodes.IsCore(from) + p.ActiveNodes.lk.RUnlock() + + ctx, span := spanTrace(ctx, "Pool.FetchResource", trace.WithAttributes(attribute.String("from", from.URL), attribute.String("of", resource), attribute.String("mime", mime), attribute.Bool("core", isCore))) defer span.End() requestId := uuid.NewString() @@ -67,12 +73,21 @@ func (p *pool) fetchResource(ctx context.Context, from *Node, resource string, m proto := "unknown" respReq := &http.Request{} received := 0 - reqUrl := fmt.Sprintf("https://%s%s", from.URL, resource) + + reqUrl := "" + if strings.Contains(from.URL, "://") { + reqUrl = fmt.Sprintf("%s%s", from.URL, resource) + } else { + reqUrl = fmt.Sprintf("https://%s%s", from.URL, resource) + } + var respHeader http.Header saturnNodeId := "" saturnTransferId := "" isCacheHit := false networkError := "" + verificationError := "" + otherError := "" isBlockRequest := false if mime == "application/vnd.ipld.raw" { @@ -101,10 +116,6 @@ func (p *pool) fetchResource(ctx context.Context, from *Node, resource string, m if cacheHit == saturnCacheHit { isCacheHit = true } - - for k, v := range respHeader { - received = received + len(k) + len(v) - } } durationSecs := time.Since(start).Seconds() @@ -117,19 +128,14 @@ func (p *pool) fetchResource(ctx context.Context, from *Node, resource string, m ttfbMs = fb.Sub(start).Milliseconds() cacheStatus := getCacheStatus(isCacheHit) - if isBlockRequest { - fetchSizeBlockMetric.Observe(float64(received)) - } else { + if !isBlockRequest { fetchSizeCarMetric.WithLabelValues("success").Observe(float64(received)) } durationMs := response_success_end.Sub(start).Milliseconds() 
fetchSpeedPerPeerSuccessMetric.WithLabelValues(resourceType, cacheStatus).Observe(float64(received) / float64(durationMs)) fetchCacheCountSuccessTotalMetric.WithLabelValues(resourceType, cacheStatus).Add(1) // track individual block metrics separately - if isBlockRequest { - fetchTTFBPerBlockPerPeerSuccessMetric.WithLabelValues(cacheStatus).Observe(float64(ttfbMs)) - fetchDurationPerBlockPerPeerSuccessMetric.WithLabelValues(cacheStatus).Observe(float64(response_success_end.Sub(start).Milliseconds())) - } else { + if !isBlockRequest { ci := 0 for index, value := range carSizes { if float64(received) < value { @@ -142,20 +148,11 @@ func (p *pool) fetchResource(ctx context.Context, from *Node, resource string, m fetchTTFBPerCARPerPeerSuccessMetric.WithLabelValues(cacheStatus, carSizeStr).Observe(float64(ttfbMs)) fetchDurationPerCarPerPeerSuccessMetric.WithLabelValues(cacheStatus).Observe(float64(response_success_end.Sub(start).Milliseconds())) } - - // update L1 server timings - updateSuccessServerTimingMetrics(respHeader.Values(servertiming.HeaderKey), resourceType, isCacheHit, durationMs, ttfbMs, received) } else { - if isBlockRequest { - fetchDurationPerBlockPerPeerFailureMetric.Observe(float64(time.Since(start).Milliseconds())) - } else { + if !isBlockRequest { fetchDurationPerCarPerPeerFailureMetric.Observe(float64(time.Since(start).Milliseconds())) fetchSizeCarMetric.WithLabelValues("failure").Observe(float64(received)) } - - if code == http.StatusBadGateway || code == http.StatusGatewayTimeout { - updateLassie5xxTime(respHeader.Values(servertiming.HeaderKey), resourceType) - } } if err == nil || !errors.Is(err, context.Canceled) { @@ -171,12 +168,14 @@ func (p *pool) fetchResource(ctx context.Context, from *Node, resource string, m HTTPProtocol: proto, TTFBMS: int(ttfbMs), // my address - Range: "", - Referrer: respReq.Referer(), - UserAgent: respReq.UserAgent(), - NodeId: saturnNodeId, - NodeIpAddress: from.URL, - IfNetworkError: networkError, + Range: "", + Referrer: respReq.Referer(), + UserAgent: respReq.UserAgent(), + NodeId: saturnNodeId, + NodeIpAddress: from.URL, + IfNetworkError: networkError, + VerificationError: verificationError, + OtherError: otherError, } } } @@ -193,9 +192,11 @@ func (p *pool) fetchResource(ctx context.Context, from *Node, resource string, m reqCtx, cancel := context.WithTimeout(ctx, requestTimeout) defer cancel() - req, err := http.NewRequestWithContext(reqCtx, http.MethodGet, reqUrl, nil) + clientTrace := otelhttptrace.NewClientTrace(reqCtx) + subReqCtx := httptrace.WithClientTrace(reqCtx, clientTrace) + req, err := http.NewRequestWithContext(subReqCtx, http.MethodGet, reqUrl, nil) if err != nil { - if recordIfContextErr(resourceType, reqCtx, "build-http-request") { + if isCtxError(reqCtx) { return reqCtx.Err() } return err @@ -219,11 +220,10 @@ func (p *pool) fetchResource(ctx context.Context, from *Node, resource string, m var resp *http.Response saturnCallsTotalMetric.WithLabelValues(resourceType).Add(1) - startReq := time.Now() resp, err = p.config.Client.Do(req) if err != nil { - if recordIfContextErr(resourceType, reqCtx, "send-http-request") { + if isCtxError(reqCtx) { if errors.Is(err, context.Canceled) { return reqCtx.Err() } @@ -239,7 +239,6 @@ func (p *pool) fetchResource(ctx context.Context, from *Node, resource string, m } respHeader = resp.Header - headerTTFBPerPeerMetric.WithLabelValues(resourceType, getCacheStatus(respHeader.Get(saturnCacheHitKey) == saturnCacheHit)).Observe(float64(time.Since(startReq).Milliseconds())) defer 
resp.Body.Close() @@ -299,11 +298,9 @@ func (p *pool) fetchResource(ctx context.Context, from *Node, resource string, m wrapped := TrackingReader{resp.Body, time.Time{}, 0} err = cb(resource, &wrapped) received = wrapped.len - // drain body so it can be re-used. - _, _ = io.Copy(io.Discard, resp.Body) if err != nil { - if recordIfContextErr(resourceType, reqCtx, "read-http-response") { + if isCtxError(reqCtx) { if errors.Is(err, context.Canceled) { return reqCtx.Err() } @@ -315,7 +312,15 @@ func (p *pool) fetchResource(ctx context.Context, from *Node, resource string, m saturnCallsFailureTotalMetric.WithLabelValues(resourceType, fmt.Sprintf("failed-response-read-%s", getCacheStatus(isCacheHit)), fmt.Sprintf("%d", code)).Add(1) } - networkError = err.Error() + var target = ErrInvalidResponse{} + if errors.As(err, &target) { + verificationError = err.Error() + goLogger.Errorw("failed to read response; verification error", "err", err.Error()) + } else { + otherError = err.Error() + goLogger.Errorw("failed to read response; no verification error", "err", err.Error()) + } + return err } @@ -324,6 +329,7 @@ func (p *pool) fetchResource(ctx context.Context, from *Node, resource string, m // trace-metrics // request life-cycle metrics + saturnCallsSuccessTotalMetric.WithLabelValues(resourceType, getCacheStatus(isCacheHit)).Add(1) fetchRequestSuccessTimeTraceMetric.WithLabelValues(resourceType, getCacheStatus(isCacheHit), "tcp_connection").Observe(float64(result.TCPConnection.Milliseconds())) fetchRequestSuccessTimeTraceMetric.WithLabelValues(resourceType, getCacheStatus(isCacheHit), "tls_handshake").Observe(float64(result.TLSHandshake.Milliseconds())) @@ -338,78 +344,18 @@ func (p *pool) fetchResource(ctx context.Context, from *Node, resource string, m fetchRequestSuccessTimeTraceMetric.WithLabelValues(resourceType, getCacheStatus(isCacheHit), "start_transfer"). 
Observe(float64(result.StartTransfer.Milliseconds())) - saturnCallsSuccessTotalMetric.WithLabelValues(resourceType, getCacheStatus(isCacheHit)).Add(1) - from.RecordSuccess(start, float64(wrapped.firstByte.Sub(start).Milliseconds()), float64(received)/float64(response_success_end.Sub(start).Milliseconds())) return nil } -func recordIfContextErr(resourceType string, ctx context.Context, requestState string) bool { +func isCtxError(ctx context.Context) bool { if ce := ctx.Err(); ce != nil { - fetchRequestContextErrorTotalMetric.WithLabelValues(resourceType, fmt.Sprintf("%t", errors.Is(ce, context.Canceled)), requestState).Add(1) return true } return false } -func updateLassie5xxTime(timingHeaders []string, resourceType string) { - if len(timingHeaders) == 0 { - goLogger.Debug("no timing headers in request response.") - return - } - - for _, th := range timingHeaders { - if subReqTiming, err := servertiming.ParseHeader(th); err == nil { - for _, m := range subReqTiming.Metrics { - switch m.Name { - case "shim_lassie_headers": - if m.Duration.Milliseconds() != 0 { - lassie5XXTimeMetric.WithLabelValues(resourceType).Observe(float64(m.Duration.Milliseconds())) - } - return - default: - } - } - } - } -} - -// todo: refactor for dryness -func updateSuccessServerTimingMetrics(timingHeaders []string, resourceType string, isCacheHit bool, totalTimeMs, ttfbMs int64, recieved int) { - if len(timingHeaders) == 0 { - goLogger.Debug("no timing headers in request response.") - return - } - - for _, th := range timingHeaders { - if subReqTiming, err := servertiming.ParseHeader(th); err == nil { - for _, m := range subReqTiming.Metrics { - switch m.Name { - case "shim_lassie_headers": - if m.Duration.Milliseconds() != 0 && !isCacheHit { - fetchDurationPerPeerSuccessCacheMissTotalLassieMetric.WithLabelValues(resourceType).Observe(float64(m.Duration.Milliseconds())) - } - - case "nginx": - // sanity checks - if totalTimeMs != 0 && ttfbMs != 0 && m.Duration.Milliseconds() != 0 { - fetchDurationPerPeerSuccessTotalL1NodeMetric.WithLabelValues(resourceType, getCacheStatus(isCacheHit)).Observe(float64(m.Duration.Milliseconds())) - networkTimeMs := totalTimeMs - m.Duration.Milliseconds() - if networkTimeMs > 0 { - s := float64(recieved) / float64(networkTimeMs) - fetchNetworkSpeedPerPeerSuccessMetric.WithLabelValues(resourceType).Observe(s) - } - networkLatencyMs := ttfbMs - m.Duration.Milliseconds() - fetchNetworkLatencyPeerSuccessMetric.WithLabelValues(resourceType).Observe(float64(networkLatencyMs)) - } - default: - } - } - } - } -} - func getCacheStatus(isCacheHit bool) string { if isCacheHit { return "Cache-hit" diff --git a/go.mod b/go.mod index 714a0f4..8bca168 100644 --- a/go.mod +++ b/go.mod @@ -4,12 +4,12 @@ go 1.19 require ( github.com/google/uuid v1.3.0 - github.com/ipfs/boxo v0.10.2 + github.com/ipfs/boxo v0.11.0 github.com/ipfs/go-block-format v0.1.2 github.com/ipfs/go-cid v0.4.1 github.com/ipfs/go-log/v2 v2.5.1 - github.com/ipld/go-car v0.6.1 - github.com/ipld/go-car/v2 v2.10.1 + github.com/ipld/go-car v0.6.2 + github.com/ipld/go-car/v2 v2.10.2-0.20230622090957-499d0c909d33 github.com/ipld/go-ipld-prime v0.20.0 github.com/ipld/go-ipld-prime/storage/bsadapter v0.0.0-20230102063945-1a409dc236dd github.com/mitchellh/go-server-timing v1.0.1 @@ -19,10 +19,13 @@ require ( github.com/stretchr/testify v1.8.4 github.com/tcnksm/go-httpstat v0.2.0 github.com/urfave/cli/v2 v2.24.2 + github.com/willscott/go-requestcontext v0.0.1 github.com/willscott/hashring v0.0.0-20230731155239-15f93a2dfb44 
github.com/zyedidia/generic v1.2.2-0.20230625215236-3404399b19f1 - go.opentelemetry.io/otel v1.14.0 - go.opentelemetry.io/otel/trace v1.14.0 + go.opentelemetry.io/contrib/instrumentation/net/http/httptrace/otelhttptrace v0.43.0 + go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.43.0 + go.opentelemetry.io/otel v1.17.0 + go.opentelemetry.io/otel/trace v1.17.0 ) require ( @@ -34,9 +37,9 @@ require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/decred/dcrd/dcrec/secp256k1/v4 v4.1.0 // indirect github.com/dustin/go-humanize v1.0.0 // indirect - github.com/felixge/httpsnoop v1.0.0 // indirect + github.com/felixge/httpsnoop v1.0.3 // indirect github.com/gabriel-vasile/mimetype v1.4.1 // indirect - github.com/go-logr/logr v1.2.3 // indirect + github.com/go-logr/logr v1.2.4 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/gddo v0.0.0-20180823221919-9d8ff1c67be5 // indirect @@ -45,6 +48,7 @@ require ( github.com/hashicorp/errwrap v1.1.0 // indirect github.com/hashicorp/go-multierror v1.1.1 // indirect github.com/hashicorp/golang-lru v0.5.4 // indirect + github.com/hashicorp/golang-lru/v2 v2.0.1 // indirect github.com/ipfs/bbloom v0.0.4 // indirect github.com/ipfs/go-bitfield v1.1.0 // indirect github.com/ipfs/go-blockservice v0.5.0 // indirect @@ -57,7 +61,6 @@ require ( github.com/ipfs/go-ipld-cbor v0.0.6 // indirect github.com/ipfs/go-ipld-format v0.5.0 // indirect github.com/ipfs/go-ipld-legacy v0.2.1 // indirect - github.com/ipfs/go-ipns v0.3.0 // indirect github.com/ipfs/go-log v1.0.5 // indirect github.com/ipfs/go-merkledag v0.11.0 // indirect github.com/ipfs/go-metrics-interface v0.0.1 // indirect @@ -71,7 +74,7 @@ require ( github.com/libp2p/go-doh-resolver v0.4.0 // indirect github.com/libp2p/go-libp2p v0.26.3 // indirect github.com/libp2p/go-libp2p-asn-util v0.2.0 // indirect - github.com/libp2p/go-libp2p-kad-dht v0.21.1 // indirect + github.com/libp2p/go-libp2p-kad-dht v0.23.0 // indirect github.com/libp2p/go-libp2p-kbucket v0.5.0 // indirect github.com/libp2p/go-libp2p-record v0.2.0 // indirect github.com/libp2p/go-libp2p-routing-helpers v0.7.0 // indirect @@ -86,7 +89,7 @@ require ( github.com/multiformats/go-base36 v0.2.0 // indirect github.com/multiformats/go-multiaddr v0.8.0 // indirect github.com/multiformats/go-multiaddr-dns v0.3.1 // indirect - github.com/multiformats/go-multibase v0.1.1 // indirect + github.com/multiformats/go-multibase v0.2.0 // indirect github.com/multiformats/go-multihash v0.2.3 // indirect github.com/multiformats/go-multistream v0.4.1 // indirect github.com/multiformats/go-varint v0.0.7 // indirect @@ -107,6 +110,7 @@ require ( github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1 // indirect github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect go.opencensus.io v0.24.0 // indirect + go.opentelemetry.io/otel/metric v1.17.0 // indirect go.uber.org/atomic v1.10.0 // indirect go.uber.org/multierr v1.9.0 // indirect go.uber.org/zap v1.24.0 // indirect @@ -118,7 +122,8 @@ require ( golang.org/x/sys v0.6.0 // indirect golang.org/x/tools v0.3.0 // indirect golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect - google.golang.org/protobuf v1.28.1 // indirect + gonum.org/v1/gonum v0.11.0 // indirect + google.golang.org/protobuf v1.30.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect lukechampine.com/blake3 v1.1.7 // indirect ) diff --git a/go.sum b/go.sum index 60a70f2..362284f 100644 --- a/go.sum +++ b/go.sum @@ -79,8 
+79,9 @@ github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymF github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= -github.com/felixge/httpsnoop v1.0.0 h1:gh8fMGz0rlOv/1WmRZm7OgncIOTsAj21iNJot48omJQ= github.com/felixge/httpsnoop v1.0.0/go.mod h1:3+D9sFq0ahK/JeJPhCBUV1xlf4/eIYrUQaxulT0VzX8= +github.com/felixge/httpsnoop v1.0.3 h1:s/nj+GCswXYzN5v2DpNMuMQYe+0DDwt5WVCU6CWBdXk= +github.com/felixge/httpsnoop v1.0.3/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= github.com/flynn/noise v1.0.0 h1:DlTHqmzmvcEiKj+4RYo/imoswx/4r6iBlCMfVtrMXpQ= github.com/francoispqt/gojay v1.2.13 h1:d2m3sFjloqoIUQU3TsHBgj6qg/BVGlTBeHDUmyJnXKk= github.com/frankban/quicktest v1.14.4 h1:g2rn0vABPOOXmZUj+vbmUp0lPoXEMuhTpIluN0XL9UY= @@ -98,8 +99,8 @@ github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A= github.com/go-logfmt/logfmt v0.5.1/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= -github.com/go-logr/logr v1.2.3 h1:2DntVwHkVopvECVRSlL5PSo9eG+cAkDCuckLubN+rq0= -github.com/go-logr/logr v1.2.3/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.2.4 h1:g01GSCwiDw2xSZfjJ2/T9M+S6pFdcNtFYsp+Y43HYDQ= +github.com/go-logr/logr v1.2.4/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= @@ -187,12 +188,14 @@ github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc= github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4= +github.com/hashicorp/golang-lru/v2 v2.0.1 h1:5pv5N1lT1fjLg2VQ5KWc7kmucp2x/kvFOnxuVTqZ6x4= +github.com/hashicorp/golang-lru/v2 v2.0.1/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= github.com/huin/goupnp v1.0.3 h1:N8No57ls+MnjlB+JPiCVSOyy/ot7MJTqlo7rn+NYSqQ= github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/ipfs/bbloom v0.0.4 h1:Gi+8EGJ2y5qiD5FbsbpX/TMNcJw8gSqr7eyjHa4Fhvs= github.com/ipfs/bbloom v0.0.4/go.mod h1:cS9YprKXpoZ9lT0n/Mw/a6/aFV6DTjTLYHeA+gyqMG0= -github.com/ipfs/boxo v0.10.2 h1:kspw9HmMyKzLQxpKk417sF69i6iuf50AXtRjFqCYyL4= -github.com/ipfs/boxo v0.10.2/go.mod h1:1qgKq45mPRCxf4ZPoJV2lnXxyxucigILMJOrQrVivv8= +github.com/ipfs/boxo v0.11.0 h1:urMxhZ3xoF4HssJVD3+0ssGT9pptEfHfbL8DYdoWFlg= +github.com/ipfs/boxo v0.11.0/go.mod h1:8IfDmp+FzFGcF4zjAgHMVPpwYw4AjN9ePEzDfkaYJ1w= github.com/ipfs/go-bitfield v1.1.0 h1:fh7FIo8bSwaJEh6DdTWbCeZ1eqOaOkKFI74SCnsWbGA= github.com/ipfs/go-bitfield v1.1.0/go.mod h1:paqf1wjq/D2BBmzfTVFlJQ9IlFOZpg422HL0HqsGWHU= github.com/ipfs/go-bitswap v0.11.0 h1:j1WVvhDX1yhG32NTC9xfxnqycqYIlhzEzLXG/cU1HyQ= @@ -237,8 +240,6 @@ github.com/ipfs/go-ipld-format v0.5.0 
h1:WyEle9K96MSrvr47zZHKKcDxJ/vlpET6PSiQsAF github.com/ipfs/go-ipld-format v0.5.0/go.mod h1:ImdZqJQaEouMjCvqCe0ORUS+uoBmf7Hf+EO/jh+nk3M= github.com/ipfs/go-ipld-legacy v0.2.1 h1:mDFtrBpmU7b//LzLSypVrXsD8QxkEWxu5qVxN99/+tk= github.com/ipfs/go-ipld-legacy v0.2.1/go.mod h1:782MOUghNzMO2DER0FlBR94mllfdCJCkTtDtPM51otM= -github.com/ipfs/go-ipns v0.3.0 h1:ai791nTgVo+zTuq2bLvEGmWP1M0A6kGTXUsgv/Yq67A= -github.com/ipfs/go-ipns v0.3.0/go.mod h1:3cLT2rbvgPZGkHJoPO1YMJeh6LtkxopCkKFcio/wE24= github.com/ipfs/go-log v1.0.5 h1:2dOuUCB1Z7uoczMWgAyDck5JLb72zHzrMnGnCNNbvY8= github.com/ipfs/go-log v1.0.5/go.mod h1:j0b8ZoR+7+R99LD9jZ6+AJsrzkPbSXbZfGakb5JPtIo= github.com/ipfs/go-log/v2 v2.1.3/go.mod h1:/8d0SH3Su5Ooc31QlL1WysJhvyOTDCjcCZ9Axpmri6g= @@ -254,10 +255,10 @@ github.com/ipfs/go-unixfsnode v1.7.1 h1:RRxO2b6CSr5UQ/kxnGzaChTjp5LWTdf3Y4n8ANZg github.com/ipfs/go-unixfsnode v1.7.1/go.mod h1:PVfoyZkX1B34qzT3vJO4nsLUpRCyhnMuHBznRcXirlk= github.com/ipfs/go-verifcid v0.0.2 h1:XPnUv0XmdH+ZIhLGKg6U2vaPaRDXb9urMyNVCE7uvTs= github.com/ipfs/go-verifcid v0.0.2/go.mod h1:40cD9x1y4OWnFXbLNJYRe7MpNvWlMn3LZAG5Wb4xnPU= -github.com/ipld/go-car v0.6.1 h1:blWbEHf1j62JMWFIqWE//YR0m7k5ZMw0AuUOU5hjrH8= -github.com/ipld/go-car v0.6.1/go.mod h1:oEGXdwp6bmxJCZ+rARSkDliTeYnVzv3++eXajZ+Bmr8= -github.com/ipld/go-car/v2 v2.10.1 h1:MRDqkONNW9WRhB79u+Z3U5b+NoN7lYA5B8n8qI3+BoI= -github.com/ipld/go-car/v2 v2.10.1/go.mod h1:sQEkXVM3csejlb1kCCb+vQ/pWBKX9QtvsrysMQjOgOg= +github.com/ipld/go-car v0.6.2 h1:Hlnl3Awgnq8icK+ze3iRghk805lu8YNq3wlREDTF2qc= +github.com/ipld/go-car v0.6.2/go.mod h1:oEGXdwp6bmxJCZ+rARSkDliTeYnVzv3++eXajZ+Bmr8= +github.com/ipld/go-car/v2 v2.10.2-0.20230622090957-499d0c909d33 h1:0OZwzSYWIuiKEOXd/2vm5cMcEmmGLFn+1h6lHELCm3s= +github.com/ipld/go-car/v2 v2.10.2-0.20230622090957-499d0c909d33/go.mod h1:sQEkXVM3csejlb1kCCb+vQ/pWBKX9QtvsrysMQjOgOg= github.com/ipld/go-codec-dagpb v1.6.0 h1:9nYazfyu9B1p3NAgfVdpRco3Fs2nFC72DqVsMj6rOcc= github.com/ipld/go-codec-dagpb v1.6.0/go.mod h1:ANzFhfP2uMJxRBr8CE+WQWs5UsNa0pYtmKZ+agnUw9s= github.com/ipld/go-ipld-prime v0.20.0 h1:Ud3VwE9ClxpO2LkCYP7vWPc0Fo+dYdYzgxUJZ3uRG4g= @@ -309,8 +310,8 @@ github.com/libp2p/go-libp2p v0.26.3 h1:6g/psubqwdaBqNNoidbRKSTBEYgaOuKBhHl8Q5tO+ github.com/libp2p/go-libp2p v0.26.3/go.mod h1:x75BN32YbwuY0Awm2Uix4d4KOz+/4piInkp4Wr3yOo8= github.com/libp2p/go-libp2p-asn-util v0.2.0 h1:rg3+Os8jbnO5DxkC7K/Utdi+DkY3q/d1/1q+8WeNAsw= github.com/libp2p/go-libp2p-asn-util v0.2.0/go.mod h1:WoaWxbHKBymSN41hWSq/lGKJEca7TNm58+gGJi2WsLI= -github.com/libp2p/go-libp2p-kad-dht v0.21.1 h1:xpfp8/t9+X2ip1l8Umap1/UGNnJ3RHJgKGAEsnRAlTo= -github.com/libp2p/go-libp2p-kad-dht v0.21.1/go.mod h1:Oy8wvbdjpB70eS5AaFaI68tOtrdo3KylTvXDjikxqFo= +github.com/libp2p/go-libp2p-kad-dht v0.23.0 h1:sxE6LxLopp79eLeV695n7+c77V/Vn4AMF28AdM/XFqM= +github.com/libp2p/go-libp2p-kad-dht v0.23.0/go.mod h1:oO5N308VT2msnQI6qi5M61wzPmJYg7Tr9e16m5n7uDU= github.com/libp2p/go-libp2p-kbucket v0.5.0 h1:g/7tVm8ACHDxH29BGrpsQlnNeu+6OF1A9bno/4/U1oA= github.com/libp2p/go-libp2p-kbucket v0.5.0/go.mod h1:zGzGCpQd78b5BNTDGHNDLaTt9aDK/A02xeZp9QeFC4U= github.com/libp2p/go-libp2p-record v0.2.0 h1:oiNUOCWno2BFuxt3my4i1frNrt7PerzB3queqa1NkQ0= @@ -369,8 +370,8 @@ github.com/multiformats/go-multiaddr-dns v0.3.1/go.mod h1:G/245BRQ6FJGmryJCrOuTd github.com/multiformats/go-multiaddr-fmt v0.1.0 h1:WLEFClPycPkp4fnIzoFoV9FVd49/eQsuaL3/CWe167E= github.com/multiformats/go-multibase v0.0.1/go.mod h1:bja2MqRZ3ggyXtZSEDKpl0uO/gviWFaSteVbWT51qgs= github.com/multiformats/go-multibase v0.0.3/go.mod h1:5+1R4eQrT3PkYZ24C3W2Ue2tPwIdYQD509ZjSb5y9Oc= 
-github.com/multiformats/go-multibase v0.1.1 h1:3ASCDsuLX8+j4kx58qnJ4YFq/JWTJpCyDW27ztsVTOI= -github.com/multiformats/go-multibase v0.1.1/go.mod h1:ZEjHE+IsUrgp5mhlEAYjMtZwK1k4haNkcaPg9aoe1a8= +github.com/multiformats/go-multibase v0.2.0 h1:isdYCVLvksgWlMW9OZRYJEa9pZETFivncJHmHnnd87g= +github.com/multiformats/go-multibase v0.2.0/go.mod h1:bFBZX4lKCA/2lyOFSAoKH5SS6oPyjtnzK/XTFDPkNuk= github.com/multiformats/go-multicodec v0.9.0 h1:pb/dlPnzee/Sxv/j4PmkDRxCOi3hXTz3IbPKOXWJkmg= github.com/multiformats/go-multicodec v0.9.0/go.mod h1:L3QTQvMIaVBkXOXXtVmYE+LI16i14xuaojr/H7Ai54k= github.com/multiformats/go-multihash v0.0.1/go.mod h1:w/5tugSrLEbWqlcgJabL3oHFKTwfvkofsjW2Qa1ct4U= @@ -490,6 +491,8 @@ github.com/whyrusleeping/cbor-gen v0.0.0-20230126041949-52956bd4c9aa/go.mod h1:f github.com/whyrusleeping/chunker v0.0.0-20181014151217-fe64bd25879f h1:jQa4QT2UP9WYv2nzyawpKMOCl+Z/jW7djv2/J50lj9E= github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1 h1:EKhdznlJHPMoKr0XTrX+IlJs1LH3lyx2nfr1dOlZ79k= github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1/go.mod h1:8UvriyWtv5Q5EOgjHaSseUEdkQfvwFv1I/In/O2M9gc= +github.com/willscott/go-requestcontext v0.0.1 h1:qHL7y9r4MOO/4MRdTP/JB0f0uEle+qlueTZJQVvT1YU= +github.com/willscott/go-requestcontext v0.0.1/go.mod h1:23J4EoOLguNM3JeGv2AUDtcWnzK6AFieymcLTDqXQfg= github.com/willscott/hashring v0.0.0-20230731155239-15f93a2dfb44 h1:zs4g7LTzNzjl8WC0XPJqJx2lga4/6RSH5QaZ3nXOHCg= github.com/willscott/hashring v0.0.0-20230731155239-15f93a2dfb44/go.mod h1:8ORHm5iheceXLsLvS8Ch8nFWBSjxwajLoKA3a05cjL4= github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 h1:bAn7/zixMGCfxrRTfdpNzjtPYqr8smhKouy9mxVdGPU= @@ -508,10 +511,16 @@ go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0= go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo= -go.opentelemetry.io/otel v1.14.0 h1:/79Huy8wbf5DnIPhemGB+zEPVwnN6fuQybr/SRXa6hM= -go.opentelemetry.io/otel v1.14.0/go.mod h1:o4buv+dJzx8rohcUeRmWUZhqupFvzWis188WlggnNeU= -go.opentelemetry.io/otel/trace v1.14.0 h1:wp2Mmvj41tDsyAJXiWDWpfNsOiIyd38fy85pyKcFq/M= -go.opentelemetry.io/otel/trace v1.14.0/go.mod h1:8avnQLK+CG77yNLUae4ea2JDQ6iT+gozhnZjy/rw9G8= +go.opentelemetry.io/contrib/instrumentation/net/http/httptrace/otelhttptrace v0.43.0 h1:lp9h55W1raxWOkKkasHTnqse5R1YKVNJ5/NPcWXYjRM= +go.opentelemetry.io/contrib/instrumentation/net/http/httptrace/otelhttptrace v0.43.0/go.mod h1:haEjy8B8Upz9+p1zuhvsKm2uPiKeYFHaNB6BddllMBE= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.43.0 h1:HKORGpiOY0R0nAPtKx/ub8/7XoHhRooP8yNRkuPfelI= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.43.0/go.mod h1:e+y1M74SYXo/FcIx3UATwth2+5dDkM8dBi7eXg1tbw8= +go.opentelemetry.io/otel v1.17.0 h1:MW+phZ6WZ5/uk2nd93ANk/6yJ+dVrvNWUjGhnnFU5jM= +go.opentelemetry.io/otel v1.17.0/go.mod h1:I2vmBGtFaODIVMBSTPVDlJSzBDNf93k60E6Ft0nyjo0= +go.opentelemetry.io/otel/metric v1.17.0 h1:iG6LGVz5Gh+IuO0jmgvpTB6YVrCGngi8QGm+pMd8Pdc= +go.opentelemetry.io/otel/metric v1.17.0/go.mod h1:h4skoxdZI17AxwITdmdZjjYJQH5nzijUUjm+wtPph5o= +go.opentelemetry.io/otel/trace v1.17.0 h1:/SWhSRHmDPOImIAetP1QAeMnZYiQXrTy4fMMYOdSKWQ= +go.opentelemetry.io/otel/trace v1.17.0/go.mod h1:I/4vKTgFclIsXRVucpH25X0mpFSczM7aHeaz0ZBLWjY= go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= go.uber.org/atomic v1.7.0/go.mod 
h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/atomic v1.10.0 h1:9qC72Qh0+3MqyJbAn8YU5xVq1frD8bn3JtD2oXtafVQ= @@ -750,6 +759,8 @@ golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8T golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 h1:H2TDz8ibqkAF6YGhCdN3jS9O0/s90v0rJh3X/OLHEUk= golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8= +gonum.org/v1/gonum v0.11.0 h1:f1IJhK4Km5tBJmaiJXtk/PkL4cdVX6J+tGiM187uT5E= +gonum.org/v1/gonum v0.11.0/go.mod h1:fSG4YDCxxUZQJ7rKsQrj0gMOg00Il0Z96/qMA4bVQhA= google.golang.org/api v0.4.0/go.mod h1:8k5glujaEP+g9n7WNsDg8QP6cUVNI86fCNMcbazEtwE= google.golang.org/api v0.7.0/go.mod h1:WtwebWUNSVBH/HAw79HIFXZNqEvBhG+Ra+ax0hx3E3M= google.golang.org/api v0.8.0/go.mod h1:o4eAsZoiT+ibD93RtjEohWalFOjRDx6CVaqeizhEnKg= @@ -826,8 +837,8 @@ google.golang.org/protobuf v1.24.0/go.mod h1:r/3tXBNzIEhYS9I1OUVjXDlt8tc493IdKGj google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= -google.golang.org/protobuf v1.28.1 h1:d0NfwRgPtno5B1Wa6L2DAG+KivqkdutMf1UhdNx175w= -google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= +google.golang.org/protobuf v1.30.0 h1:kPPoIgf3TsEvrm0PFe15JQ+570QVxYzEvvHqChK+cng= +google.golang.org/protobuf v1.30.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/internal/state/state.go b/internal/state/state.go index 9c9e5f0..96b959a 100644 --- a/internal/state/state.go +++ b/internal/state/state.go @@ -9,3 +9,12 @@ type State struct { type PoolController interface { DoRefresh() } + +type NodeInfo struct { + ID string `json:"id"` + IP string `json:"ip"` + Distance float32 `json:"distance"` + Weight int `json:"weight"` + ComplianceCid string `json:"complianceCid"` + Core bool `json:"core"` +} diff --git a/internal/util/harness.go b/internal/util/harness.go index a711745..fe304de 100644 --- a/internal/util/harness.go +++ b/internal/util/harness.go @@ -5,6 +5,7 @@ import ( "crypto/tls" "encoding/json" "errors" + "math/rand" "net/http" "net/http/httptest" "net/url" @@ -26,11 +27,21 @@ func BuildCabooseHarness(t *testing.T, n int, maxRetries int, opts ...HarnessOpt ch := &CabooseHarness{} ch.Endpoints = make([]*Endpoint, n) - purls := make([]string, n) + purls := make([]state.NodeInfo, n) for i := 0; i < len(ch.Endpoints); i++ { ch.Endpoints[i] = &Endpoint{} ch.Endpoints[i].Setup() - purls[i] = strings.TrimPrefix(ch.Endpoints[i].Server.URL, "https://") + ip := strings.TrimPrefix(ch.Endpoints[i].Server.URL, "https://") + + cid, _ := cid.V1Builder{Codec: uint64(multicodec.Raw), MhType: uint64(multicodec.Sha2_256)}.Sum([]byte(testBlock)) + + purls[i] = state.NodeInfo{ + IP: ip, + ID: "node-id", + Weight: rand.Intn(100), + Distance: rand.Float32(), + ComplianceCid: cid.String(), + } } ch.goodOrch = true orch := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r 
*http.Request) { @@ -63,9 +74,11 @@ func BuildCabooseHarness(t *testing.T, n int, maxRetries int, opts ...HarnessOpt Client: saturnClient, DoValidation: false, - PoolRefresh: time.Millisecond * 50, + PoolRefresh: time.Second * 50, MaxRetrievalAttempts: maxRetries, Harness: &state.State{}, + + MirrorFraction: 1.0, } for _, opt := range opts { @@ -78,6 +91,8 @@ func BuildCabooseHarness(t *testing.T, n int, maxRetries int, opts ...HarnessOpt ch.Caboose = bs ch.CabooseActiveNodes = conf.Harness.ActiveNodes.(*caboose.NodeRing) ch.CabooseAllNodes = conf.Harness.AllNodes.(*caboose.NodeHeap) + ch.CaboosePool = conf.Harness.PoolController + ch.Config = conf return ch } @@ -87,11 +102,19 @@ type CabooseHarness struct { CabooseActiveNodes *caboose.NodeRing CabooseAllNodes *caboose.NodeHeap + CaboosePool state.PoolController + Config *caboose.Config gol sync.Mutex goodOrch bool } +type NodeStats struct { + Start time.Time + Latency float64 + Size float64 +} + func (ch *CabooseHarness) RunFetchesForRandCids(n int) { for i := 0; i < n; i++ { randCid, _ := cid.V1Builder{Codec: uint64(multicodec.Raw), MhType: uint64(multicodec.Sha2_256)}.Sum([]byte{uint8(i)}) @@ -122,6 +145,23 @@ func (ch *CabooseHarness) FetchAndAssertSuccess(t *testing.T, ctx context.Contex require.NotEmpty(t, blk) } +func (ch *CabooseHarness) RecordSuccesses(t *testing.T, nodes []*caboose.Node, s NodeStats, n int) { + for _, node := range nodes { + s.Start = time.Now().Add(-time.Second * 5) + for i := 0; i < n; i++ { + node.RecordSuccess(s.Start, s.Latency, s.Size) + } + } +} + +func (ch *CabooseHarness) RecordFailures(t *testing.T, nodes []*caboose.Node, n int) { + for _, node := range nodes { + for i := 0; i < n; i++ { + node.RecordFailure() + } + } +} + func (ch *CabooseHarness) FailNodesWithCode(t *testing.T, selectorF func(ep *Endpoint) bool, code int) { for _, n := range ch.Endpoints { if selectorF(n) { @@ -221,6 +261,18 @@ func WithMaxFailuresBeforeCoolDown(max int) func(config *caboose.Config) { } } +func WithComplianceCidPeriod(n int64) func(config *caboose.Config) { + return func(config *caboose.Config) { + config.ComplianceCidPeriod = n + } +} + +func WithMirrorFraction(n float64) func(config *caboose.Config) { + return func(config *caboose.Config) { + config.MirrorFraction = n + } +} + func WithCidCoolDownDuration(duration time.Duration) func(config *caboose.Config) { return func(config *caboose.Config) { config.FetchKeyCoolDownDuration = duration diff --git a/log.go b/log.go index f43561f..7248558 100644 --- a/log.go +++ b/log.go @@ -104,4 +104,6 @@ type log struct { NodeId string `json:"nodeId"` IfNetworkError string `json:"ifNetworkError"` NodeIpAddress string `json:"nodeIpAddress"` + VerificationError string `json:"verificationError"` + OtherError string `json:"otherError"` } diff --git a/metrics.go b/metrics.go index 46ee23c..4350a8d 100644 --- a/metrics.go +++ b/metrics.go @@ -31,10 +31,6 @@ var ( ) var ( - // size buckets from 256 KiB (default chunk in Kubo) to 4MiB (maxBlockSize), 256 KiB wide each - // histogram buckets will be [256KiB, 512KiB, 768KiB, 1MiB, ... 4MiB] -> total 16 buckets +1 prometheus Inf bucket - blockSizeHistogram = prometheus.LinearBuckets(262144, 262144, 16) - // TODO: Speed max bucket could use some further refinement, // for now we don't expect speed being bigger than transfering 4MiB (max block) in 500ms // histogram buckets will be [1byte/milliseconds, ... 
8387 bytes/milliseconds] -> total 20 buckets +1 prometheus Inf bucket @@ -42,11 +38,6 @@ var ( // ----- Histogram buckets to record fetch duration metrics ----- // The upper bound on the fetch duration buckets are informed by the timeouts per block and per peer request/retry. - - // buckets to record duration in milliseconds to fetch a block, - // histogram buckets will be [50ms,.., 60 seconds] -> total 20 buckets +1 prometheus Inf bucket - durationMsPerBlockHistogram = prometheus.ExponentialBucketsRange(50, 60000, 20) - // buckets to record duration in milliseconds to fetch a CAR, // histogram buckets will be [50ms,.., 30 minutes] -> total 40 buckets +1 prometheus Inf bucket durationMsPerCarHistogram = prometheus.ExponentialBucketsRange(50, 1800000, 40) @@ -55,7 +46,7 @@ var ( var ( fetchResponseCodeMetric = prometheus.NewCounterVec(prometheus.CounterOpts{ Name: prometheus.BuildFQName("ipfs", "caboose", "fetch_response_code"), - Help: "Response codes observed during caboose fetches for a block", + Help: "Response codes observed during caboose fetches", }, []string{"resourceType", "code"}) // success cases @@ -71,46 +62,6 @@ var ( }, []string{"resourceType", "cache_status"}) ) -// block metrics -var ( - fetchDurationPerBlockPerPeerSuccessMetric = prometheus.NewHistogramVec(prometheus.HistogramOpts{ - Name: prometheus.BuildFQName("ipfs", "caboose", "fetch_duration_block_peer_success"), - Help: "Latency observed during successful caboose fetches from a single peer in milliseconds", - Buckets: durationMsPerBlockHistogram, - }, []string{"cache_status"}) - - fetchDurationBlockSuccessMetric = prometheus.NewHistogram(prometheus.HistogramOpts{ - Name: prometheus.BuildFQName("ipfs", "caboose", "fetch_duration_block_success"), - Help: "Latency observed during successful caboose fetches for a block across multiple peers and retries in milliseconds", - Buckets: durationMsPerBlockHistogram, - }) - - fetchTTFBPerBlockPerPeerSuccessMetric = prometheus.NewHistogramVec(prometheus.HistogramOpts{ - Name: prometheus.BuildFQName("ipfs", "caboose", "fetch_ttfb_block_peer_success"), - Help: "TTFB observed during a successful caboose fetch from a single peer in milliseconds", - Buckets: durationMsPerBlockHistogram, - }, []string{"cache_status"}) - - // failures - fetchDurationPerBlockPerPeerFailureMetric = prometheus.NewHistogram(prometheus.HistogramOpts{ - Name: prometheus.BuildFQName("ipfs", "caboose", "fetch_duration_block_peer_failure"), - Help: "Latency observed during failed caboose fetches from a single peer in milliseconds", - Buckets: durationMsPerBlockHistogram, - }) - - fetchDurationBlockFailureMetric = prometheus.NewHistogram(prometheus.HistogramOpts{ - Name: prometheus.BuildFQName("ipfs", "caboose", "fetch_duration_block_failure"), - Help: "Latency observed during failed caboose fetches for a block across multiple peers and retries in milliseconds", - Buckets: durationMsPerBlockHistogram, - }) - - fetchSizeBlockMetric = prometheus.NewHistogram(prometheus.HistogramOpts{ - Name: prometheus.BuildFQName("ipfs", "caboose", "fetch_size_block"), - Help: "Size in bytes of caboose block fetches", - Buckets: blockSizeHistogram, - }) -) - // CAR metrics var ( fetchDurationPerCarPerPeerSuccessMetric = prometheus.NewHistogramVec(prometheus.HistogramOpts{ @@ -131,11 +82,6 @@ var ( Buckets: durationMsPerCarHistogram, }, []string{"cache_status", "car_size"}) - headerTTFBPerPeerMetric = prometheus.NewHistogramVec(prometheus.HistogramOpts{ - Name: prometheus.BuildFQName("ipfs", "caboose", "header_ttfb_peer"), - 
Buckets: durationMsPerCarHistogram, - }, []string{"resourceType", "cache_status"}) - // failure fetchDurationPerCarPerPeerFailureMetric = prometheus.NewHistogram(prometheus.HistogramOpts{ Name: prometheus.BuildFQName("ipfs", "caboose", "fetch_duration_car_peer_failure"), @@ -156,53 +102,11 @@ var ( }, []string{"error_status"}) ) -// Saturn Server-timings -var ( - // ---------------------- For successful fetches ONLY for now---------------------- - // L1 server timings - // nginx + l1 compute + lassie - fetchDurationPerPeerSuccessTotalL1NodeMetric = prometheus.NewHistogramVec(prometheus.HistogramOpts{ - Name: prometheus.BuildFQName("ipfs", "caboose", "fetch_duration_peer_total_saturn_l1"), - Help: "Total time spent on an L1 node for a successful fetch per peer in milliseconds", - Buckets: durationMsPerCarHistogram, - }, []string{"resourceType", "cache_status"}) - - // total only on lassie - fetchDurationPerPeerSuccessCacheMissTotalLassieMetric = prometheus.NewHistogramVec(prometheus.HistogramOpts{ - Name: prometheus.BuildFQName("ipfs", "caboose", "fetch_duration_peer_cache_miss_total_lassie"), - Help: "Time spent in Lassie for a Saturn L1 Nginx cache miss for a successful fetch per peer in milliseconds", - Buckets: durationMsPerCarHistogram, - }, []string{"resourceType"}) - - lassie5XXTimeMetric = prometheus.NewHistogramVec(prometheus.HistogramOpts{ - Name: prometheus.BuildFQName("ipfs", "caboose", "fetch_duration_5xx_total_lassie"), - Help: "Time spent in Lassie for a Saturn L1 Nginx cache miss for a 5xx in milliseconds", - Buckets: durationMsPerCarHistogram, - }, []string{"resourceType"}) - - // network timing - fetchNetworkSpeedPerPeerSuccessMetric = prometheus.NewHistogramVec(prometheus.HistogramOpts{ - Name: prometheus.BuildFQName("ipfs", "caboose", "fetch_network_speed_peer_success"), - Help: "Network speed observed during successful caboose fetches from a single peer in bytes per milliseconds", - Buckets: speedBytesPerMsHistogram, - }, []string{"resourceType"}) - - fetchNetworkLatencyPeerSuccessMetric = prometheus.NewHistogramVec(prometheus.HistogramOpts{ - Name: prometheus.BuildFQName("ipfs", "caboose", "fetch_network_latency_peer_success"), - Help: "Network latency observed during successful caboose fetches from a single peer in milliseconds", - Buckets: durationMsPerCarHistogram, - }, []string{"resourceType"}) -) - var ( fetchCalledTotalMetric = prometheus.NewCounterVec(prometheus.CounterOpts{ Name: prometheus.BuildFQName("ipfs", "caboose", "fetch_called_total"), }, []string{"resourceType"}) - fetchRequestContextErrorTotalMetric = prometheus.NewCounterVec(prometheus.CounterOpts{ - Name: prometheus.BuildFQName("ipfs", "caboose", "fetch_request_context_error_total"), - }, []string{"resourceType", "errorType", "requestStage"}) - fetchRequestSuccessTimeTraceMetric = prometheus.NewHistogramVec(prometheus.HistogramOpts{ Name: prometheus.BuildFQName("ipfs", "caboose", "fetch_request_success_time_trace"), Buckets: durationMsPerCarHistogram, @@ -229,6 +133,10 @@ var ( mirroredTrafficTotalMetric = prometheus.NewCounterVec(prometheus.CounterOpts{ Name: prometheus.BuildFQName("ipfs", "caboose", "mirrored_traffic_total"), }, []string{"error_status"}) + + complianceCidCallsTotalMetric = prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: prometheus.BuildFQName("ipfs", "caboose", "compliance_cids_total"), + }, []string{"error_status"}) ) var CabooseMetrics = prometheus.NewRegistry() @@ -238,53 +146,32 @@ func init() { CabooseMetrics.MustRegister(poolRefreshErrorMetric) 
CabooseMetrics.MustRegister(poolSizeMetric) CabooseMetrics.MustRegister(poolNewMembersMetric) - CabooseMetrics.MustRegister(poolRemovedFailureTotalMetric) - CabooseMetrics.MustRegister(poolRemovedConnFailureTotalMetric) - CabooseMetrics.MustRegister(poolRemovedReadFailureTotalMetric) - CabooseMetrics.MustRegister(poolRemovedNon2xxTotalMetric) - CabooseMetrics.MustRegister(poolMembersNotAddedBecauseRemovedMetric) - CabooseMetrics.MustRegister(poolMembersRemovedAndAddedBackMetric) - CabooseMetrics.MustRegister(poolEnoughObservationsForMainSetDurationMetric) CabooseMetrics.MustRegister(poolTierChangeMetric) CabooseMetrics.MustRegister(fetchResponseCodeMetric) CabooseMetrics.MustRegister(fetchSpeedPerPeerSuccessMetric) - CabooseMetrics.MustRegister(fetchDurationPerBlockPerPeerSuccessMetric) CabooseMetrics.MustRegister(fetchDurationPerCarPerPeerSuccessMetric) - CabooseMetrics.MustRegister(fetchDurationPerBlockPerPeerFailureMetric) CabooseMetrics.MustRegister(fetchDurationPerCarPerPeerFailureMetric) - CabooseMetrics.MustRegister(fetchDurationBlockSuccessMetric) CabooseMetrics.MustRegister(fetchDurationCarSuccessMetric) - CabooseMetrics.MustRegister(fetchDurationBlockFailureMetric) CabooseMetrics.MustRegister(fetchDurationCarFailureMetric) - CabooseMetrics.MustRegister(fetchTTFBPerBlockPerPeerSuccessMetric) CabooseMetrics.MustRegister(fetchTTFBPerCARPerPeerSuccessMetric) - CabooseMetrics.MustRegister(headerTTFBPerPeerMetric) CabooseMetrics.MustRegister(fetchCacheCountSuccessTotalMetric) - CabooseMetrics.MustRegister(fetchDurationPerPeerSuccessTotalL1NodeMetric) - CabooseMetrics.MustRegister(fetchDurationPerPeerSuccessCacheMissTotalLassieMetric) - CabooseMetrics.MustRegister(lassie5XXTimeMetric) - - CabooseMetrics.MustRegister(fetchNetworkSpeedPerPeerSuccessMetric) - CabooseMetrics.MustRegister(fetchNetworkLatencyPeerSuccessMetric) - CabooseMetrics.MustRegister(m_collector{&peerLatencyDistribution}) CabooseMetrics.MustRegister(fetchSizeCarMetric) - CabooseMetrics.MustRegister(fetchSizeBlockMetric) - - CabooseMetrics.MustRegister(fetchRequestContextErrorTotalMetric) CabooseMetrics.MustRegister(fetchCalledTotalMetric) - CabooseMetrics.MustRegister(fetchRequestSuccessTimeTraceMetric) CabooseMetrics.MustRegister(saturnCallsTotalMetric) CabooseMetrics.MustRegister(saturnCallsFailureTotalMetric) CabooseMetrics.MustRegister(saturnConnectionFailureTotalMetric) + CabooseMetrics.MustRegister(complianceCidCallsTotalMetric) CabooseMetrics.MustRegister(saturnCallsSuccessTotalMetric) CabooseMetrics.MustRegister(mirroredTrafficTotalMetric) + + CabooseMetrics.MustRegister(fetchRequestSuccessTimeTraceMetric) } diff --git a/node.go b/node.go index 4fc27ce..f21f1b2 100644 --- a/node.go +++ b/node.go @@ -5,6 +5,7 @@ import ( "sync" "time" + "github.com/filecoin-saturn/caboose/internal/state" "github.com/zyedidia/generic/queue" ) @@ -14,7 +15,9 @@ const ( ) type Node struct { - URL string + URL string + ComplianceCid string + Core bool PredictedLatency float64 PredictedThroughput float64 @@ -25,10 +28,12 @@ type Node struct { lk sync.RWMutex } -func NewNode(url string) *Node { +func NewNode(info state.NodeInfo) *Node { return &Node{ - URL: url, - Samples: queue.New[NodeSample](), + URL: info.IP, + ComplianceCid: info.ComplianceCid, + Core: info.Core, + Samples: queue.New[NodeSample](), } } @@ -131,3 +136,7 @@ func (n *Node) Rate() float64 { last := n.Samples.Peek() return float64(len) / float64(time.Since(last.Start)) } + +func (n *Node) String() string { + return n.URL +} diff --git a/node_heap.go b/node_heap.go 
index 0cce5f6..c71771f 100644 --- a/node_heap.go +++ b/node_heap.go @@ -2,6 +2,7 @@ package caboose import ( "container/heap" + "math/rand" "sync" ) @@ -45,18 +46,33 @@ func (nh *NodeHeap) Best() *Node { func (nh *NodeHeap) PeekRandom() *Node { nh.lk.RLock() defer nh.lk.RUnlock() - // TODO - return nil + + if len(nh.Nodes) == 0 { + return nil + } + + randIdx := rand.Intn(len(nh.Nodes)) + return nh.Nodes[randIdx] } func (nh *NodeHeap) TopN(n int) []*Node { m := make([]*Node, 0, n) - nh.lk.RLock() - defer nh.lk.RUnlock() - for i := 0; i < n && i < len(nh.Nodes); i++ { - node := nh.Nodes[i] + temp := make([]*Node, 0, n) + nh.lk.Lock() + defer nh.lk.Unlock() + + heap.Init(nh) + for i := 0; i < n && nh.Len() > 0; i++ { + item := heap.Pop(nh) + node := item.(*Node) m = append(m, node) + temp = append(temp, node) } + + for _, node := range temp { + heap.Push(nh, node) + } + return m } diff --git a/node_ring.go b/node_ring.go index 329b605..eafbbf3 100644 --- a/node_ring.go +++ b/node_ring.go @@ -1,6 +1,8 @@ package caboose import ( + "fmt" + "strings" "sync" "github.com/willscott/hashring" @@ -8,23 +10,25 @@ import ( // NodeRing represents a set of nodes organized for stable hashing. type NodeRing struct { - nodes map[string]*Node - ring hashring.HashRing + Nodes map[string]*Node + ring hashring.HashRing + targetSize int lk sync.RWMutex } -func NewNodeRing() *NodeRing { +func NewNodeRing(targetSize int) *NodeRing { return &NodeRing{ - nodes: map[string]*Node{}, - ring: *hashring.New([]string{}), + Nodes: map[string]*Node{}, + ring: *hashring.New([]string{}), + targetSize: targetSize, } } func (nr *NodeRing) updateRing() error { // this method expects that the lk is held when called. rs := make(map[string]int) - for _, n := range nr.nodes { + for _, n := range nr.Nodes { // TODO: weight multiples rs[n.URL] = 1 } @@ -32,6 +36,40 @@ func (nr *NodeRing) updateRing() error { return nil } +// A score of '0' ==> overall experience is the same as the current state +// A positive score ==> overall experience is better than the current state +// A negative score ==> overall experience is worse than the current state +func (nr *NodeRing) getScoreForUpdate(candidate string, priority float64, weight int) float64 { + changes := nr.ring.ConsiderUpdateWeightedNode(candidate, weight) + delta := float64(0) + var neighbor *Node + + for n, v := range changes { + neighbor = nr.Nodes[n] + neighborVolume := neighbor.Rate() + if neighborVolume < 1 { + neighborVolume = 1 + } + + amntChanged := v + // for now, add some bounds + if amntChanged < -1 { + amntChanged = -1 + } else if amntChanged > 1 { + amntChanged = 1 + } + // a negative amntChanged means that we're replacing the neighbor with the candidate. + amntChanged *= -1 + + // how much worse is candidate? + diff := priority - neighbor.Priority() + cs := diff * neighborVolume * float64(amntChanged) + delta += cs + // fmt.Printf("+%f (n %s: diff %f=(n %f - candidate %f) * volume %f * v = %f)", cs, neighbor.URL, diff, neighbor.Priority(), priority, neighborVolume, amntChanged) + } + return delta +} + func (nr *NodeRing) MaybeSubstituteOrAdd(candidate *Node, activationThreshold int64) (bool, error) { nr.lk.Lock() defer nr.lk.Unlock() @@ -39,36 +77,44 @@ func (nr *NodeRing) MaybeSubstituteOrAdd(candidate *Node, activationThreshold in _, ok := nr.ring.GetNode(candidate.URL) if !ok { // ring is empty. in this case we always want to add. 
- nr.nodes[candidate.URL] = candidate + nr.Nodes[candidate.URL] = candidate return true, nr.updateRing() } // how much space is being claimed? - overlapEstimate := nr.ring.ConsiderUpdateWeightedNode(candidate.URL, 1) + delta := nr.getScoreForUpdate(candidate.URL, candidate.Priority(), 1) - var neighbor *Node - delta := float64(0) - - for n, v := range overlapEstimate { - neighbor = nr.nodes[n] - neighborVolume := neighbor.Rate() + if delta >= float64(activationThreshold) { + nr.Nodes[candidate.URL] = candidate + return true, nr.updateRing() + } - // how much worse is candidate? - diff := candidate.Priority() - neighbor.Priority() - delta += diff * neighborVolume * float64(v) + // not a clear benefit to add, but maybe acceptable for substitution: + worst := candidate.Priority() + worstN := "" + for _, n := range nr.Nodes { + if n.Priority() < worst { + worst = n.Priority() + worstN = n.URL + } } - if delta > float64(activationThreshold) { - nr.nodes[candidate.URL] = candidate + // todo: the '+1' is an arbitrary threshold to prevent thrashing. it should be configurable. + if worstN != "" && candidate.Priority()-worst > float64(activationThreshold)+1 { + nr.Nodes[candidate.URL] = candidate + delete(nr.Nodes, worstN) return true, nr.updateRing() + } + + // fmt.Printf("did not add - delta %f activation %d, node priority %f\n", delta, activationThreshold, candidate.Priority()) return false, nil } func (nr *NodeRing) Add(n *Node) error { nr.lk.Lock() defer nr.lk.Unlock() - nr.nodes[n.URL] = n + nr.Nodes[n.URL] = n return nr.updateRing() } @@ -76,8 +122,8 @@ func (nr *NodeRing) Remove(n *Node) error { nr.lk.Lock() defer nr.lk.Unlock() - if _, ok := nr.nodes[n.URL]; ok { - delete(nr.nodes, n.URL) + if _, ok := nr.Nodes[n.URL]; ok { + delete(nr.Nodes, n.URL) return nr.updateRing() } return ErrNoBackend @@ -87,10 +133,21 @@ func (nr *NodeRing) Contains(n *Node) bool { nr.lk.RLock() defer nr.lk.RUnlock() - _, ok := nr.nodes[n.URL] + _, ok := nr.Nodes[n.URL] return ok } +func (nr *NodeRing) IsCore(n *Node) bool { + nr.lk.RLock() + defer nr.lk.RUnlock() + + nd, ok := nr.Nodes[n.URL] + if !ok { + return false + } + return nd.Core +} + func (nr *NodeRing) GetNodes(key string, number int) ([]*Node, error) { nr.lk.RLock() defer nr.lk.RUnlock() @@ -104,7 +161,7 @@ func (nr *NodeRing) GetNodes(key string, number int) ([]*Node, error) { } nodes := make([]*Node, 0, len(keys)) for _, k := range keys { - if n, ok := nr.nodes[k]; ok { + if n, ok := nr.Nodes[k]; ok { nodes = append(nodes, n) } } @@ -116,3 +173,15 @@ func (nr *NodeRing) Len() int { defer nr.lk.RUnlock() return nr.ring.Size() } + +func (nr *NodeRing) String() string { + nr.lk.RLock() + defer nr.lk.RUnlock() + + ns := make([]string, 0, len(nr.Nodes)) + for _, n := range nr.Nodes { + ns = append(ns, n.String()) + } + + return fmt.Sprintf("NodeRing[len %d]{%s}", nr.ring.Size(), strings.Join(ns, ",")) +} diff --git a/node_ring_test.go b/node_ring_test.go index a5ff11a..c24e14d 100644 --- a/node_ring_test.go +++ b/node_ring_test.go @@ -8,7 +8,7 @@ import ( ) func TestNodeRing(t *testing.T) { - nr := caboose.NewNodeRing() + nr := caboose.NewNodeRing(30) nodes := make([]*caboose.Node, 0) for i := 0; i < 100; i++ { nodes = append(nodes, &caboose.Node{URL: fmt.Sprintf("node%d", i)}) diff --git a/pool.go b/pool.go index f46f3c5..64dca91 100644 --- a/pool.go +++ b/pool.go @@ -2,15 +2,19 @@ package caboose import ( "context" + cryptoRand "crypto/rand" "encoding/json" "errors" "fmt" "io" + "math/big" "math/rand" "net/url" "sync" "time" + 
"github.com/filecoin-saturn/caboose/internal/state" + "github.com/patrickmn/go-cache" "github.com/ipfs/boxo/path" @@ -19,11 +23,16 @@ import ( "github.com/ipld/go-car" ) -const blockPathPattern = "/ipfs/%s?format=car&dag-scope=block" +const ( + blockPathPattern = "/ipfs/%s?format=car&dag-scope=block" + defaultMirroredConcurrency = 5 +) + +var complianceCidReqTemplate = "/ipfs/%s?format=raw" // loadPool refreshes the set of endpoints in the pool by fetching an updated list of nodes from the // Orchestrator. -func (p *pool) loadPool() ([]string, error) { +func (p *pool) loadPool() ([]state.NodeInfo, error) { if p.config.OrchestratorOverride != nil { return p.config.OrchestratorOverride, nil } @@ -35,12 +44,15 @@ func (p *pool) loadPool() ([]string, error) { } defer resp.Body.Close() - responses := make([]string, 0) + responses := make([]state.NodeInfo, 0) + if err := json.NewDecoder(resp.Body).Decode(&responses); err != nil { goLogger.Warnw("failed to decode backends from orchestrator", "err", err, "endpoint", p.config.OrchestratorEndpoint.String()) return nil, err } + goLogger.Infow("got backends from orchestrators", "cnt", len(responses), "endpoint", p.config.OrchestratorEndpoint.String()) + return responses, nil } @@ -80,7 +92,7 @@ func newPool(c *Config, logger *logger) *pool { fetchKeyCoolDownCache: cache.New(c.FetchKeyCoolDownDuration, 1*time.Minute), fetchKeyFailureCache: cache.New(c.FetchKeyCoolDownDuration, 1*time.Minute), - ActiveNodes: NewNodeRing(), + ActiveNodes: NewNodeRing(c.PoolTargetSize), AllNodes: NewNodeHeap(), } @@ -136,30 +148,67 @@ func (p *pool) refreshPool() { } } +func (p *pool) fetchComplianceCid(node *Node) error { + sc := node.ComplianceCid + if len(node.ComplianceCid) == 0 { + goLogger.Warnw("failed to find compliance cid ", "for node", node) + return fmt.Errorf("compliance cid doesn't exist for node: %s ", node) + } + trialTimeout, cancel := context.WithTimeout(context.Background(), 30*time.Second) + reqUrl := fmt.Sprintf(complianceCidReqTemplate, sc) + goLogger.Debugw("fetching compliance cid", "cid", reqUrl, "from", node) + err := p.fetchResourceAndUpdate(trialTimeout, node, reqUrl, 0, p.mirrorValidator) + cancel() + return err +} + func (p *pool) checkPool() { + sem := make(chan struct{}, defaultMirroredConcurrency) + for { select { case msg := <-p.mirrorSamples: - // see if it is to a main-tier node - if so find appropriate test node to test against. - if !p.ActiveNodes.Contains(msg.node) { - continue - } - testNode := p.AllNodes.PeekRandom() - if testNode == nil { - continue - } - if p.ActiveNodes.Contains(testNode) { - continue - } + sem <- struct{}{} + go func(msg mirroredPoolRequest) { + defer func() { <-sem }() + + // see if it is to a main-tier node - if so find appropriate test node to test against. 
+				if !p.ActiveNodes.Contains(msg.node) {
+					return
+				}
+				testNode := p.AllNodes.PeekRandom()
+				if testNode == nil {
+					return
+				}
+				if p.ActiveNodes.Contains(testNode) {
+					sample := big.NewInt(1)
+					if p.config.ComplianceCidPeriod > 0 {
+						sample, _ = cryptoRand.Int(cryptoRand.Reader, big.NewInt(p.config.ComplianceCidPeriod))
+					}
+
+					if sample.Cmp(big.NewInt(0)) == 0 {
+						err := p.fetchComplianceCid(testNode)
+						if err != nil {
+							goLogger.Warnw("failed to fetch compliance cid", "err", err)
+							complianceCidCallsTotalMetric.WithLabelValues("error").Add(1)
+						} else {
+							complianceCidCallsTotalMetric.WithLabelValues("success").Add(1)
+						}
+					}
+					return
+				}
+
+				trialTimeout, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+				err := p.fetchResourceAndUpdate(trialTimeout, testNode, msg.path, 0, p.mirrorValidator)
+
+				cancel()
+				if err != nil {
+					mirroredTrafficTotalMetric.WithLabelValues("error").Inc()
+				} else {
+					mirroredTrafficTotalMetric.WithLabelValues("no-error").Inc()
+				}
+			}(msg)

-			trialTimeout, cancel := context.WithTimeout(context.Background(), 30*time.Second)
-			err := p.fetchResourceAndUpdate(trialTimeout, testNode, msg.path, 0, p.mirrorValidator)
-			cancel()
-			if err != nil {
-				mirroredTrafficTotalMetric.WithLabelValues("error").Inc()
-			} else {
-				mirroredTrafficTotalMetric.WithLabelValues("no-error").Inc()
-			}
 		case <-p.done:
 			return
 		}
@@ -189,7 +238,7 @@ func (p *pool) mirrorValidator(resource string, reader io.Reader) error {
 		return err
 	}

-	br, err := car.NewCarReader(reader)
+	br, err := car.NewCarReaderWithOptions(reader, car.WithErrorOnEmptyRoots(false))
 	if err != nil {
 		return err
 	}
@@ -275,7 +324,7 @@ func (p *pool) updateFetchKeyCoolDown(key string) {
 func (p *pool) fetchResourceWith(ctx context.Context, path string, cb DataCallback, with string) (err error) {
 	fetchCalledTotalMetric.WithLabelValues(resourceTypeCar).Add(1)
-	if recordIfContextErr(resourceTypeCar, ctx, "fetchResourceWith") {
+	if isCtxError(ctx) {
 		return ctx.Err()
 	}
@@ -312,7 +361,7 @@
 	pq := []string{path}
 	for i := 0; i < len(nodes); i++ {
-		if recordIfContextErr(resourceTypeCar, ctx, "fetchResourceWithLoop") {
+		if isCtxError(ctx) {
 			return ctx.Err()
 		}
@@ -323,6 +372,7 @@
 			default:
 			}
 		}
+		old := pq[0]
 		err = p.fetchResourceAndUpdate(ctx, nodes[i], pq[0], i, cb)
 		if err != nil && errors.Is(err, context.Canceled) {
 			return err
 		}
@@ -337,6 +387,8 @@
 			//fetchSpeedPerBlockMetric.Observe(float64(float64(len(blk.RawData())) / float64(durationMs)))
 			fetchDurationCarSuccessMetric.Observe(float64(durationMs))
 			return
+		} else if pq[0] == old {
+			continue
 		} else {
 			// TODO: potentially worth doing something smarter here based on what the current state
 			// of permanent vs temporary errors is.
@@ -352,11 +404,16 @@
 			}
 			pq = pq[1:]
 			pq = append(pq, epr.StillNeed...)
-			// TODO: potentially worth doing something smarter here based on what the current state
-			// of permanent vs temporary errors is.
-			// for now: reset i on partials so we also give them a chance to retry.
-			i = -1
+			if pq[0] == old {
+				continue
+			} else {
+				// TODO: potentially worth doing something smarter here based on what the current state
+				// of permanent vs temporary errors is.
+
+				// for now: reset i on partials so we also give them a chance to retry.
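+				// (Worked example, paths hypothetical: suppose pq was ["/ipfs/A"], so
+				// old == "/ipfs/A". If the partial's StillNeed leaves pq[0] == "/ipfs/A",
+				// the very same resource just failed here, so the `continue` above moves
+				// on to the next node. If instead pq[0] is now "/ipfs/B", this node made
+				// real progress, so i is reset to -1 below and every node gets a fresh
+				// chance at the remainder.)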
+				i = -1
+			}
 		}
 	}
 }
diff --git a/pool_dynamics_test.go b/pool_dynamics_test.go
index c4a8c71..dc2d4a9 100644
--- a/pool_dynamics_test.go
+++ b/pool_dynamics_test.go
@@ -1,13 +1,335 @@
 package caboose_test

 import (
+	"context"
+	cryptoRand "crypto/rand"
+	"fmt"
+	"math/rand"
+	"net/url"
 	"testing"
+	"time"

+	"github.com/filecoin-saturn/caboose"
 	"github.com/filecoin-saturn/caboose/internal/util"
+	"github.com/ipfs/go-cid"
+	"github.com/multiformats/go-multicodec"
+	"github.com/stretchr/testify/assert"
 )

+const (
+	nodesSize = 6
+)
+const blockPathPattern = "/ipfs/%s?format=car&dag-scope=block"
+
+/*
+This test verifies that the caboose pool converges to the expected set of nodes
+under controlled scenarios. It continuously injects stats into certain nodes and
+simulates the pool refreshing over time, updating its active set of nodes based
+on the injected stats.
+
+The tests are designed such that there are two groups of nodes: "bad" and "good".
+These are picked randomly at the beginning of each test. At the end of each test,
+the pool should always converge to the "good" nodes.
+*/
 func TestPoolDynamics(t *testing.T) {
-	ch := util.BuildCabooseHarness(t, 3, 3)
+	baseStatSize := 100000
+	baseStatLatency := 100
+	poolRefreshNo := 10
+	ctx := context.Background()
+	testCid, _ := cid.V1Builder{Codec: uint64(multicodec.Raw), MhType: uint64(multicodec.Sha2_256)}.Sum(testBlock)
+
+	// This test ensures that when the pool is initialized, it converges to the set
+	// of nodes that have stats over the set of nodes that have none.
+	t.Run("pool converges to good nodes vs nodes with no stats", func(t *testing.T) {
+		ch, controlGroup := getHarnessAndControlGroup(t, nodesSize, nodesSize/2)
+		ch.FetchAndAssertSuccess(t, ctx, testCid)
+
+		goodNodes := make([]*caboose.Node, 0)
+		for _, n := range ch.CabooseAllNodes.Nodes {
+			_, ok := controlGroup[n.URL]
+			if ok {
+				goodNodes = append(goodNodes, n)
+			}
+		}
+
+		for i := 0; i < 1; i++ {
+			goodStats := util.NodeStats{
+				Start:   time.Now().Add(-time.Second * 2),
+				Latency: float64(baseStatLatency) / float64(10),
+				Size:    float64(baseStatSize) * float64(10),
+			}
+
+			ch.RecordSuccesses(t, goodNodes, goodStats, 1000)
+			ch.CaboosePool.DoRefresh()
+		}
+
+		for n := range controlGroup {
+			assert.Contains(t, ch.CabooseActiveNodes.Nodes, n)
+		}
+	})
+
+	t.Run("pool converges to good nodes vs nodes with worse stats", func(t *testing.T) {
+		ch, controlGroup := getHarnessAndControlGroup(t, nodesSize, nodesSize/2)
+		ch.FetchAndAssertSuccess(t, ctx, testCid)
+
+		goodNodes := make([]*caboose.Node, 0)
+		badNodes := make([]*caboose.Node, 0)
+		for _, n := range ch.CabooseAllNodes.Nodes {
+			_, ok := controlGroup[n.URL]
+			if ok {
+				goodNodes = append(goodNodes, n)
+			} else {
+				badNodes = append(badNodes, n)
+			}
+		}
+
+		for i := 0; i < poolRefreshNo; i++ {
+
+			goodStats := util.NodeStats{
+				Start:   time.Now().Add(-time.Second * 2),
+				Latency: float64(baseStatLatency) / float64(10),
+				Size:    float64(baseStatSize) * float64(10),
+			}
+			badStats := util.NodeStats{
+				Start:   time.Now().Add(-time.Second * 2),
+				Latency: float64(baseStatLatency) * float64(10),
+				Size:    float64(baseStatSize) / float64(10),
+			}
+
+			ch.RecordSuccesses(t, goodNodes, goodStats, 1000)
+			ch.RecordSuccesses(t, badNodes, badStats, 1000)
+			ch.CaboosePool.DoRefresh()
+		}
+
+		for n := range controlGroup {
+			assert.Contains(t, ch.CabooseActiveNodes.Nodes, n)
+		}
+	})
+
+	// When new nodes join, if they start consistently performing better than the nodes in the current pool,
+	//
then those nodes should replace the nodes in the current pool.
+	t.Run("pool converges to new nodes that are better than the current pool", func(t *testing.T) {
+		ch, controlGroup := getHarnessAndControlGroup(t, nodesSize, nodesSize/2)
+		ch.FetchAndAssertSuccess(t, ctx, testCid)
+
+		goodNodes := make([]*caboose.Node, 0)
+		badNodes := make([]*caboose.Node, 0)
+
+		for _, n := range ch.CabooseAllNodes.Nodes {
+			_, ok := controlGroup[n.URL]
+			if ok {
+				goodNodes = append(goodNodes, n)
+			} else {
+				badNodes = append(badNodes, n)
+			}
+		}
+
+		// Give the bad nodes some stats; those nodes then become the main active tier.
+		// The good nodes have no stats after this and should not be picked at this point.
+		for i := 0; i < poolRefreshNo; i++ {
+			badStats := util.NodeStats{
+				Start:   time.Now().Add(-time.Second * 2),
+				Latency: float64(baseStatLatency) * float64(10),
+				Size:    float64(baseStatSize) / float64(10),
+			}
+			ch.RecordSuccesses(t, badNodes, badStats, 1000)
+			ch.CaboosePool.DoRefresh()
+		}
+
+		// Add some new "good" nodes that have better stats over a longer period of time.
+		for i := 0; i < poolRefreshNo*2; i++ {
+			goodStats := util.NodeStats{
+				Start:   time.Now().Add(-time.Second * 2),
+				Latency: float64(baseStatLatency) / float64(10),
+				Size:    float64(baseStatSize) * float64(10),
+			}
+			ch.RecordSuccesses(t, goodNodes, goodStats, 2000)
+			ch.CaboosePool.DoRefresh()
+		}
+
+		ch.CaboosePool.DoRefresh()
+		for n := range controlGroup {
+			assert.Contains(t, ch.CabooseActiveNodes.Nodes, n)
+		}
+
+	})
+
+	// If the current active main pool starts failing, the pool should converge
+	// to nodes that are not failing.
+	t.Run("pool converges to other nodes if the current ones start failing", func(t *testing.T) {
+		ch, controlGroup := getHarnessAndControlGroup(t, nodesSize, nodesSize/2)
+		ch.FetchAndAssertSuccess(t, ctx, testCid)
+
+		goodNodes := make([]*caboose.Node, 0)
+		badNodes := make([]*caboose.Node, 0)
+
+		for _, n := range ch.CabooseAllNodes.Nodes {
+			_, ok := controlGroup[n.URL]
+			if ok {
+				goodNodes = append(goodNodes, n)
+			} else {
+				badNodes = append(badNodes, n)
+			}
+		}
+
+		// Start with the bad nodes having better stats than the good nodes.
+		for i := 0; i < poolRefreshNo; i++ {
+			goodStats := util.NodeStats{
+				Start:   time.Now().Add(-time.Second * 2),
+				Latency: float64(baseStatLatency) / float64(10),
+				Size:    float64(baseStatSize) * float64(10),
+			}
+			badStats := util.NodeStats{
+				Start:   time.Now().Add(-time.Second * 2),
+				Latency: float64(baseStatLatency) * float64(10),
+				Size:    float64(baseStatSize) / float64(10),
+			}
+
+			ch.RecordSuccesses(t, goodNodes, badStats, 1000)
+			ch.RecordSuccesses(t, badNodes, goodStats, 1000)
+			ch.CaboosePool.DoRefresh()
+		}
+
+		// Start failing the bad nodes and keep giving the same stats to the good nodes.
+		for i := 0; i < poolRefreshNo*2; i++ {
+			badStats := util.NodeStats{
+				Start:   time.Now().Add(-time.Second * 2),
+				Latency: float64(baseStatLatency) * float64(10),
+				Size:    float64(baseStatSize) / float64(10),
+			}
+
+			ch.RecordSuccesses(t, goodNodes, badStats, 1000)
+			ch.RecordFailures(t, badNodes, 1000)
+			ch.CaboosePool.DoRefresh()
+		}
+
+		ch.CaboosePool.DoRefresh()
+		for n := range controlGroup {
+			assert.Contains(t, ch.CabooseActiveNodes.Nodes, n)
+		}
+
+	})
+
+}
+
+func TestPoolAffinity(t *testing.T) {
+	baseStatSize := 100000
+	baseStatLatency := 100
+	// statVarianceFactor := 0.1
+	poolRefreshNo := 10
+	simReqCount := 10000
+	ctx := context.Background()
+	cidList := generateRandomCIDs(20)
+
+	t.Run("selected nodes remain consistent for same cid reqs", func(t *testing.T) {
+		// 80 nodes will be in the good pool. 20 will be added later with the same stats.
+		// So, 20% of the nodes in the pool will eventually be "new nodes" that have been added later.
+		ch, controlGroup := getHarnessAndControlGroup(t, 100, 80)
+		_, _ = ch.Caboose.Get(ctx, cidList[0])
+
+		existingNodes := make([]*caboose.Node, 0)
+		newNodes := make([]*caboose.Node, 0)
+
+		for _, n := range ch.CabooseAllNodes.Nodes {
+			_, ok := controlGroup[n.URL]
+			if ok {
+				existingNodes = append(existingNodes, n)
+			} else {
+				newNodes = append(newNodes, n)
+			}
+		}
+
+		// Send requests to control group nodes to bump their selection into the pool.
+		for i := 0; i < poolRefreshNo; i++ {
+			baseStats := util.NodeStats{
+				Start:   time.Now().Add(-time.Second * 2),
+				Latency: float64(baseStatLatency) / float64(10),
+				Size:    float64(baseStatSize) * float64(10),
+			}
+
+			ch.RecordSuccesses(t, existingNodes, baseStats, 1000)
+			ch.CaboosePool.DoRefresh()
+		}
+
+		// Make a bunch of requests to the same set of cids to establish a stable hashring.
+		for i := 0; i < simReqCount; i++ {
+			idx := rand.Intn(len(cidList))
+			_, _ = ch.Caboose.Get(ctx, cidList[idx])
+		}
+		ch.CaboosePool.DoRefresh()
+
+		// Introduce new nodes by sending the same stats to those nodes.
+		for i := 0; i < poolRefreshNo/2; i++ {
+			baseStats := util.NodeStats{
+				Start:   time.Now().Add(-time.Second * 2),
+				Latency: float64(baseStatLatency) / float64(10),
+				Size:    float64(baseStatSize) * float64(10),
+			}
+
+			ch.RecordSuccesses(t, existingNodes, baseStats, 100)
+			ch.RecordSuccesses(t, newNodes, baseStats, 10)
+
+			ch.CaboosePool.DoRefresh()
+		}
+
+		rerouteCount := 0
+
+		// Get the candidate nodes for each cid in the cid list to see if it's been rerouted to a new node.
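+		// (Aside, with this test's numbers: the affinity key for a block request
+		// falls back to fmt.Sprintf(blockPathPattern, c), and GetNodes returns the
+		// hashring's candidate nodes for that key in order. Roughly 20% of the pool
+		// members are newly added nodes with identical stats, so even a stable ring
+		// is expected to hand about 20% of the 20 cids to a new node; hence the
+		// 5-cid (25%) tolerance asserted below.)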
+		for _, c := range cidList {
+			aff := ch.Caboose.GetAffinity(ctx)
+			if aff == "" {
+				aff = fmt.Sprintf(blockPathPattern, c)
+			}
+			nodes, _ := ch.CabooseActiveNodes.GetNodes(aff, ch.Config.MaxRetrievalAttempts)
+
+			for _, n := range newNodes {
+				n := n
+				if n.URL == nodes[0].URL {
+					rerouteCount++
+				}
+			}
+		}
+
+		// no more than 5 cids from the cid list of 20 should get re-routed (25%)
+		assert.LessOrEqual(t, rerouteCount, 5)
+	})
+}
+
+func getHarnessAndControlGroup(t *testing.T, nodesSize int, poolSize int) (*util.CabooseHarness, map[string]string) {
+	ch := util.BuildCabooseHarness(t, nodesSize, 3, func(config *caboose.Config) {
+		config.PoolTargetSize = nodesSize / 2
+	})
+
 	ch.StartOrchestrator()
+
+	r := rand.New(rand.NewSource(0))
+	eps := ch.Endpoints
+	controlGroup := make(map[string]string)
+
+	r.Shuffle(len(eps), func(i, j int) {
+		eps[i], eps[j] = eps[j], eps[i]
+	})
+
+	for _, ep := range eps[:poolSize] {
+		url, _ := url.Parse(ep.Server.URL)
+		controlGroup[url.Host] = ep.Server.URL
+	}
+
+	return ch, controlGroup
+}
+
+func generateRandomCIDs(count int) []cid.Cid {
+	var cids []cid.Cid
+	for i := 0; i < count; i++ {
+		block := make([]byte, 32)
+		cryptoRand.Read(block)
+		c, _ := cid.V1Builder{
+			Codec:  uint64(multicodec.Raw),
+			MhType: uint64(multicodec.Sha2_256),
+		}.Sum(block)
+
+		cids = append(cids, c)
+	}
+	return cids
+}
diff --git a/pool_metrics.go b/pool_metrics.go
index 6e6ca69..f61da20 100644
--- a/pool_metrics.go
+++ b/pool_metrics.go
@@ -4,31 +4,6 @@ import "github.com/prometheus/client_golang/prometheus"

 // pool metrics
 var (
-	poolRemovedFailureTotalMetric = prometheus.NewCounterVec(prometheus.CounterOpts{
-		Name: prometheus.BuildFQName("ipfs", "caboose", "pool_removed_failure_total"),
-	}, []string{"tier", "reason"})
-
-	poolRemovedConnFailureTotalMetric = prometheus.NewCounterVec(prometheus.CounterOpts{
-		Name: prometheus.BuildFQName("ipfs", "caboose", "pool_removed_conn_failure_total"),
-	}, []string{"tier"})
-
-	poolRemovedReadFailureTotalMetric = prometheus.NewCounterVec(prometheus.CounterOpts{
-		Name: prometheus.BuildFQName("ipfs", "caboose", "pool_removed_read_failure_total"),
-	}, []string{"tier"})
-
-	poolRemovedNon2xxTotalMetric = prometheus.NewCounterVec(prometheus.CounterOpts{
-		Name: prometheus.BuildFQName("ipfs", "caboose", "pool_removed_non2xx_total"),
-	}, []string{"tier"})
-
-	// The below metrics are only updated periodically on every Caboose pool refresh
-	poolMembersNotAddedBecauseRemovedMetric = prometheus.NewGauge(prometheus.GaugeOpts{
-		Name: prometheus.BuildFQName("ipfs", "caboose", "pool_members_not_added"),
-	})
-
-	poolMembersRemovedAndAddedBackMetric = prometheus.NewGauge(prometheus.GaugeOpts{
-		Name: prometheus.BuildFQName("ipfs", "caboose", "pool_removed_and_added_back"),
-	})
-
 	poolRefreshErrorMetric = prometheus.NewCounter(prometheus.CounterOpts{
 		Name: prometheus.BuildFQName("ipfs", "caboose", "pool_refresh_errors"),
 		Help: "Number of errors refreshing the caboose pool",
@@ -44,10 +19,6 @@
 		Help: "New members added to the Caboose pool",
 	})

-	poolEnoughObservationsForMainSetDurationMetric = prometheus.NewGauge(prometheus.GaugeOpts{
-		Name: prometheus.BuildFQName("ipfs", "caboose", "pool_observations_for_main_set_duration"),
-	})
-
 	poolTierChangeMetric = prometheus.NewGaugeVec(prometheus.GaugeOpts{
 		Name: prometheus.BuildFQName("ipfs", "caboose", "pool_tier_change"),
 	}, []string{"change"})
diff --git a/pool_refresh_test.go b/pool_refresh_test.go
index e6318bf..5ae4ad4 100644
--- a/pool_refresh_test.go
+++ b/pool_refresh_test.go
@@ -1,8 +1,12 @@
 package caboose

 import (
+	"math/rand"
 	"testing"

+	"github.com/filecoin-saturn/caboose/internal/state"
+	"github.com/ipfs/go-cid"
+	"github.com/multiformats/go-multicodec"
 	"github.com/stretchr/testify/require"
 )

@@ -29,8 +33,25 @@
 }

 func addAndAssertPool(t *testing.T, p *pool, nodes []string, expectedTotal int) {
-	for _, n := range nodes {
+	nodeStructs := genNodeStructs(nodes)
+	for _, n := range nodeStructs {
 		p.AllNodes.AddIfNotPresent(NewNode(n))
 	}
 	require.Equal(t, expectedTotal, p.AllNodes.Len())
 }
+
+func genNodeStructs(nodes []string) []state.NodeInfo {
+	var nodeStructs []state.NodeInfo
+
+	for _, node := range nodes {
+		cid, _ := cid.V1Builder{Codec: uint64(multicodec.Raw), MhType: uint64(multicodec.Sha2_256)}.Sum([]byte(node))
+		nodeStructs = append(nodeStructs, state.NodeInfo{
+			IP:            node,
+			ID:            node,
+			Weight:        rand.Intn(100),
+			Distance:      rand.Float32(),
+			ComplianceCid: cid.String(),
+		})
+	}
+	return nodeStructs
+}
diff --git a/pool_test.go b/pool_test.go
index 683dca2..543078b 100644
--- a/pool_test.go
+++ b/pool_test.go
@@ -3,15 +3,10 @@ package caboose_test
 import (
 	"bytes"
 	"context"
-	"crypto/tls"
-	"net/http"
-	"net/url"
-	"strings"
 	"testing"
 	"time"
 	"unsafe"

-	"github.com/filecoin-saturn/caboose"
 	"github.com/filecoin-saturn/caboose/internal/util"
 	"github.com/ipfs/go-cid"
 	"github.com/ipld/go-car/v2"
@@ -28,15 +23,7 @@ func TestPoolMirroring(t *testing.T) {
 	if unsafe.Sizeof(unsafe.Pointer(nil)) <= 4 {
 		t.Skip("skipping for 32bit architectures because too slow")
 	}

-	saturnClient := &http.Client{
-		Transport: &http.Transport{
-			TLSClientConfig: &tls.Config{
-				InsecureSkipVerify: true,
-			},
-		},
-	}
-
-	data := []byte("hello world")
+	data := []byte("hello World")
 	ls := cidlink.DefaultLinkSystem()
 	lsm := memstore.Store{}
 	ls.SetReadStorage(&lsm)
@@ -50,32 +37,5 @@
 		t.Fatal(err)
 	}

-	e := util.Endpoint{}
-	e.Setup()
-	e.SetResp(carBytes.Bytes(), false)
-	eURL := strings.TrimPrefix(e.Server.URL, "https://")
-
-	e2 := util.Endpoint{}
-	e2.Setup()
-	e2.SetResp(carBytes.Bytes(), false)
-	e2URL := strings.TrimPrefix(e2.Server.URL, "https://")
-
-	conf := caboose.Config{
-		OrchestratorEndpoint: &url.URL{},
-		OrchestratorClient:   http.DefaultClient,
-		OrchestratorOverride: []string{eURL, e2URL},
-		LoggingEndpoint:      url.URL{},
-		LoggingClient:        http.DefaultClient,
-		LoggingInterval:      time.Hour,
-
-		Client:               saturnClient,
-		DoValidation:         false,
-		PoolRefresh:          time.Minute,
-		MaxRetrievalAttempts: 1,
-		MirrorFraction:       1.0,
-	}
+	ch := util.BuildCabooseHarness(t, 2, 3)

-	c, err := caboose.NewCaboose(&conf)
-	if err != nil {
-		t.Fatal(err)
-	}
@@ -84,7 +44,7 @@
 	// Make 10 requests, and expect some fraction to trigger a mirror.
 	for i := 0; i < 10; i++ {
-		_, err = c.Get(context.Background(), finalC)
+		_, err = ch.Caboose.Get(context.Background(), finalC)
 		if err != nil {
 			t.Fatal(err)
 		}
@@ -92,11 +55,43 @@
 	}
 	time.Sleep(100 * time.Millisecond)
-	c.Close()
+	ch.Caboose.Close()
+
+	ec := ch.Endpoints[0].Count()

-	ec := e.Count()
-	e2c := e2.Count()
+	e2c := ch.Endpoints[1].Count()
 	if ec+e2c < 10 {
 		t.Fatalf("expected at least 10 fetches, got %d", ec+e2c)
 	}
 }
+
+func TestFetchComplianceCid(t *testing.T) {
+	if unsafe.Sizeof(unsafe.Pointer(nil)) <= 4 {
+		t.Skip("skipping for 32bit architectures because too slow")
+	}
+
+	ch := util.BuildCabooseHarness(t, 1, 1, util.WithComplianceCidPeriod(1), util.WithMirrorFraction(1.0))
+
+	ch.CaboosePool.DoRefresh()
+
+	ls := cidlink.DefaultLinkSystem()
+	lsm := memstore.Store{}
+	ls.SetReadStorage(&lsm)
+	ls.SetWriteStorage(&lsm)
+	finalCL := ls.MustStore(ipld.LinkContext{}, cidlink.LinkPrototype{Prefix: cid.NewPrefixV1(uint64(multicodec.Raw), uint64(multicodec.Sha2_256))}, basicnode.NewBytes(testBlock))
+	finalC := finalCL.(cidlink.Link).Cid
+
+	_, err := ch.Caboose.Get(context.Background(), finalC)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	time.Sleep(100 * time.Millisecond)
+	ch.Caboose.Close()
+
+	e := ch.Endpoints[0]
+
+	if e.Count() != 2 {
+		t.Fatalf("expected 2 fetches (primary + compliance), got %d", e.Count())
+	}
+}
diff --git a/pool_tier_promotion.go b/pool_tier_promotion.go
index 326816b..36cf98a 100644
--- a/pool_tier_promotion.go
+++ b/pool_tier_promotion.go
@@ -1,19 +1,18 @@
 package caboose

-const (
-	poolConsiderationCount = 30
-	activationThreshold    = 0
+var (
+	activationThreshold = 0
 )

 func updateActiveNodes(active *NodeRing, all *NodeHeap) error {
-	candidates := all.TopN(poolConsiderationCount)
+	candidates := all.TopN(active.targetSize)
 	added := 0
 	for _, c := range candidates {
 		if active.Contains(c) {
 			continue
 		}
 		activeSize := active.Len()
-		discount := poolConsiderationCount - activeSize
+		discount := active.targetSize - activeSize
 		if discount < 0 {
 			discount = 0
 		}
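A note on the discount computed above: while the active ring is below PoolTargetSize, each missing slot lowers the bar a candidate must clear, so a sparse pool fills quickly while a full pool stays selective. This hunk does not show how the discount is applied to activationThreshold, so the sketch below assumes it is subtracted directly; effectiveThreshold is a hypothetical name, not part of the codebase.

```go
package main

import "fmt"

// effectiveThreshold sketches the pool-fill discount from updateActiveNodes:
// each unfilled slot below the target size relaxes the activation threshold.
func effectiveThreshold(base, targetSize, activeSize int) int {
	discount := targetSize - activeSize
	if discount < 0 {
		discount = 0 // a full (or over-full) pool gets no discount
	}
	return base - discount
}

func main() {
	fmt.Println(effectiveThreshold(0, 30, 10)) // -20: a sparse pool admits candidates readily
	fmt.Println(effectiveThreshold(0, 30, 30)) // 0: a full pool applies the full bar
}
```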