This repository has been archived by the owner on Apr 18, 2024. It is now read-only.

feat: modify pool tests #153

Merged
merged 44 commits on Oct 10, 2023
Commits (44)
17bb795
feat: modify pool tests
AmeanAsad Aug 31, 2023
00a4344
fix: surface caboose pool methods for testing
AmeanAsad Sep 1, 2023
0cd98cf
fix: top n node selection from heap
AmeanAsad Sep 1, 2023
b6b03f9
feat: add more comprehensive tests
AmeanAsad Sep 4, 2023
5537699
enhancement: add refresh no to tests
AmeanAsad Sep 4, 2023
778cd45
go fmt
AmeanAsad Sep 4, 2023
35646a8
remove unused metrics
aarshkshah1992 Sep 4, 2023
fa59f39
put back trace
aarshkshah1992 Sep 4, 2023
a23963d
Merge pull request #154 from filecoin-saturn/feat/remove-metrics
aarshkshah1992 Sep 4, 2023
d2c669e
response size does not include header
aarshkshah1992 Sep 4, 2023
ce46ce5
reset retry counter only if progress is made
aarshkshah1992 Sep 4, 2023
ca5522d
update go-car
aarshkshah1992 Sep 4, 2023
27b62be
dont drain response body
aarshkshah1992 Sep 4, 2023
296eaec
send verification errors to Saturn
aarshkshah1992 Sep 4, 2023
eb1e8b8
pool tier promotion
aarshkshah1992 Sep 4, 2023
2713f51
otel and send trace id to Saturn
aarshkshah1992 Sep 4, 2023
7375178
stabilize dynamics tests
willscott Sep 4, 2023
93135a7
mirroring parallel
aarshkshah1992 Sep 5, 2023
bda8d0d
Merge remote-tracking branch 'origin/aa/test-simulator' into feat/por…
aarshkshah1992 Sep 5, 2023
c8be27d
pool-target-size through config to better test dynamics
willscott Sep 5, 2023
d52ef6e
down to flakiness
willscott Sep 6, 2023
61c82da
add substitution (rough)
willscott Sep 10, 2023
550cf5b
Merge remote-tracking branch 'origin/aa/test-simulator' into feat/por…
aarshkshah1992 Sep 18, 2023
c0ea85c
use new orchestrator API
aarshkshah1992 Sep 18, 2023
608a668
Merge pull request #161 from filecoin-saturn/feat/integrate-new-endpoint
aarshkshah1992 Sep 18, 2023
ea1d62b
fix: top N selection
AmeanAsad Sep 18, 2023
05c2b37
Merge branch 'aa/test-simulator' into feat/port-Caboose-main
AmeanAsad Sep 18, 2023
c1ab0e9
enhancement: increase test size
AmeanAsad Sep 19, 2023
1975f49
feat: Add tests for affinity
AmeanAsad Sep 19, 2023
78f3490
test cache affinity
aarshkshah1992 Sep 19, 2023
5e02c7f
test cache affinity
aarshkshah1992 Sep 19, 2023
d8ae01e
remove assert
aarshkshah1992 Sep 19, 2023
b647fab
fix test
aarshkshah1992 Sep 19, 2023
0cf6c94
address review
aarshkshah1992 Sep 19, 2023
552ea1b
Merge pull request #163 from filecoin-saturn/feat/cache-aff-test
aarshkshah1992 Sep 19, 2023
9eb9c18
feat: port compliance cids
AmeanAsad Sep 19, 2023
af17595
fix: remove unused code
AmeanAsad Sep 19, 2023
310c079
modify harness
AmeanAsad Sep 19, 2023
8804f45
feat: add core attr to trace span
AmeanAsad Sep 19, 2023
3f63a01
Merge pull request #164 from filecoin-saturn/feat/port-compliance-cids
aarshkshah1992 Sep 20, 2023
da9ad17
Merge branch 'aa/test-simulator' into feat/port-Caboose-main
aarshkshah1992 Sep 20, 2023
46b5374
fix CI
aarshkshah1992 Sep 20, 2023
ad399fb
Merge pull request #155 from filecoin-saturn/feat/port-Caboose-main
aarshkshah1992 Sep 20, 2023
1015a7f
improve error classification (#165)
AmeanAsad Oct 3, 2023
17 changes: 12 additions & 5 deletions caboose.go
@@ -55,6 +55,9 @@ type Config struct {
// PoolRefresh is the interval at which we refresh the pool of upstreams from the orchestrator.
PoolRefresh time.Duration

// PoolTargetSize is a baseline size for the pool - the pool will accept decrements in performance to maintain at least this size.
PoolTargetSize int

// MirrorFraction is what fraction of requests will be mirrored to another random node in order to track metrics / determine the current best nodes.
MirrorFraction float64

@@ -90,6 +93,7 @@ const defaultMirrorFraction = 0.01

const DefaultOrchestratorEndpoint = "https://orchestrator.strn.pl/nodes/nearby?count=200"
const DefaultPoolRefreshInterval = 5 * time.Minute
const DefaultPoolTargetSize = 30

// we cool off sending requests for a cid for a certain duration
// if we've seen a certain number of failures for it already in a given duration.
@@ -131,6 +135,9 @@ func NewCaboose(config *Config) (*Caboose, error) {
if override := os.Getenv(BackendOverrideKey); len(override) > 0 {
config.OrchestratorOverride = strings.Split(override, ",")
}
if config.PoolTargetSize == 0 {
config.PoolTargetSize = DefaultPoolTargetSize
}

logger := newLogger(config)
c := Caboose{
@@ -188,14 +195,14 @@ func (c *Caboose) Fetch(ctx context.Context, path string, cb DataCallback) error
ctx, span := spanTrace(ctx, "Fetch", trace.WithAttributes(attribute.String("path", path)))
defer span.End()

return c.pool.fetchResourceWith(ctx, path, cb, c.getAffinity(ctx))
return c.pool.fetchResourceWith(ctx, path, cb, c.GetAffinity(ctx))
}

func (c *Caboose) Has(ctx context.Context, it cid.Cid) (bool, error) {
ctx, span := spanTrace(ctx, "Has", trace.WithAttributes(attribute.Stringer("cid", it)))
defer span.End()

blk, err := c.pool.fetchBlockWith(ctx, it, c.getAffinity(ctx))
blk, err := c.pool.fetchBlockWith(ctx, it, c.GetAffinity(ctx))
if err != nil {
return false, err
}
@@ -206,7 +213,7 @@ func (c *Caboose) Get(ctx context.Context, it cid.Cid) (blocks.Block, error) {
ctx, span := spanTrace(ctx, "Get", trace.WithAttributes(attribute.Stringer("cid", it)))
defer span.End()

blk, err := c.pool.fetchBlockWith(ctx, it, c.getAffinity(ctx))
blk, err := c.pool.fetchBlockWith(ctx, it, c.GetAffinity(ctx))
if err != nil {
return nil, err
}
@@ -218,14 +225,14 @@ func (c *Caboose) GetSize(ctx context.Context, it cid.Cid) (int, error) {
ctx, span := spanTrace(ctx, "GetSize", trace.WithAttributes(attribute.Stringer("cid", it)))
defer span.End()

blk, err := c.pool.fetchBlockWith(ctx, it, c.getAffinity(ctx))
blk, err := c.pool.fetchBlockWith(ctx, it, c.GetAffinity(ctx))
if err != nil {
return 0, err
}
return len(blk.RawData()), nil
}

func (c *Caboose) getAffinity(ctx context.Context) string {
func (c *Caboose) GetAffinity(ctx context.Context) string {
// https://github.com/ipfs/bifrost-gateway/issues/53#issuecomment-1442732865
if affG := ctx.Value(gateway.ContentPathKey); affG != nil {
contentPath := affG.(ipath.Path).String()
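The caboose.go hunks above add a PoolTargetSize knob (falling back to DefaultPoolTargetSize, 30, when unset) and export the previously private getAffinity as GetAffinity so tests can inspect the affinity key for a request context. A minimal usage sketch under those assumptions; other Config fields (HTTP client, orchestrator endpoint, validation) are omitted here and would normally be set as well:

package main

import (
	"context"
	"log"

	"github.com/filecoin-saturn/caboose"
)

func main() {
	// Only fields touched by this PR are shown; a real configuration also
	// needs an HTTP client and orchestrator settings.
	conf := &caboose.Config{
		PoolTargetSize: 50,                                 // keep at least 50 upstreams even when performance dips
		PoolRefresh:    caboose.DefaultPoolRefreshInterval, // 5 minutes
		MirrorFraction: 0.02,                               // mirror 2% of requests to benchmark candidate nodes
	}
	// Leaving PoolTargetSize at 0 would fall back to DefaultPoolTargetSize (30),
	// per the NewCaboose hunk above.
	c, err := caboose.NewCaboose(conf)
	if err != nil {
		log.Fatal(err)
	}

	// GetAffinity is now exported, so callers and tests can see which
	// affinity key a given request context maps to.
	affinity := c.GetAffinity(context.Background())
	log.Printf("affinity key: %q", affinity)
}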
92 changes: 8 additions & 84 deletions fetcher.go
@@ -49,7 +49,6 @@ func (p *pool) fetchResource(ctx context.Context, from *Node, resource string, m
}
// if the context is already cancelled, there's nothing we can do here.
if ce := ctx.Err(); ce != nil {
fetchRequestContextErrorTotalMetric.WithLabelValues(resourceType, fmt.Sprintf("%t", errors.Is(ce, context.Canceled)), "fetchResource-init").Add(1)
return ce
}

@@ -117,19 +116,14 @@ func (p *pool) fetchResource(ctx context.Context, from *Node, resource string, m
ttfbMs = fb.Sub(start).Milliseconds()

cacheStatus := getCacheStatus(isCacheHit)
if isBlockRequest {
fetchSizeBlockMetric.Observe(float64(received))
} else {
if !isBlockRequest {
fetchSizeCarMetric.WithLabelValues("success").Observe(float64(received))
}
durationMs := response_success_end.Sub(start).Milliseconds()
fetchSpeedPerPeerSuccessMetric.WithLabelValues(resourceType, cacheStatus).Observe(float64(received) / float64(durationMs))
fetchCacheCountSuccessTotalMetric.WithLabelValues(resourceType, cacheStatus).Add(1)
// track individual block metrics separately
if isBlockRequest {
fetchTTFBPerBlockPerPeerSuccessMetric.WithLabelValues(cacheStatus).Observe(float64(ttfbMs))
fetchDurationPerBlockPerPeerSuccessMetric.WithLabelValues(cacheStatus).Observe(float64(response_success_end.Sub(start).Milliseconds()))
} else {
if !isBlockRequest {
ci := 0
for index, value := range carSizes {
if float64(received) < value {
@@ -142,20 +136,11 @@ func (p *pool) fetchResource(ctx context.Context, from *Node, resource string, m
fetchTTFBPerCARPerPeerSuccessMetric.WithLabelValues(cacheStatus, carSizeStr).Observe(float64(ttfbMs))
fetchDurationPerCarPerPeerSuccessMetric.WithLabelValues(cacheStatus).Observe(float64(response_success_end.Sub(start).Milliseconds()))
}

// update L1 server timings
updateSuccessServerTimingMetrics(respHeader.Values(servertiming.HeaderKey), resourceType, isCacheHit, durationMs, ttfbMs, received)
} else {
if isBlockRequest {
fetchDurationPerBlockPerPeerFailureMetric.Observe(float64(time.Since(start).Milliseconds()))
} else {
if !isBlockRequest {
fetchDurationPerCarPerPeerFailureMetric.Observe(float64(time.Since(start).Milliseconds()))
fetchSizeCarMetric.WithLabelValues("failure").Observe(float64(received))
}

if code == http.StatusBadGateway || code == http.StatusGatewayTimeout {
updateLassie5xxTime(respHeader.Values(servertiming.HeaderKey), resourceType)
}
}

if err == nil || !errors.Is(err, context.Canceled) {
@@ -195,7 +180,7 @@ func (p *pool) fetchResource(ctx context.Context, from *Node, resource string, m
defer cancel()
req, err := http.NewRequestWithContext(reqCtx, http.MethodGet, reqUrl, nil)
if err != nil {
if recordIfContextErr(resourceType, reqCtx, "build-http-request") {
if isCtxError(reqCtx) {
return reqCtx.Err()
}
return err
@@ -219,11 +204,10 @@ func (p *pool) fetchResource(ctx context.Context, from *Node, resource string, m

var resp *http.Response
saturnCallsTotalMetric.WithLabelValues(resourceType).Add(1)
startReq := time.Now()

resp, err = p.config.Client.Do(req)
if err != nil {
if recordIfContextErr(resourceType, reqCtx, "send-http-request") {
if isCtxError(reqCtx) {
if errors.Is(err, context.Canceled) {
return reqCtx.Err()
}
@@ -239,7 +223,6 @@ func (p *pool) fetchResource(ctx context.Context, from *Node, resource string, m
}

respHeader = resp.Header
headerTTFBPerPeerMetric.WithLabelValues(resourceType, getCacheStatus(respHeader.Get(saturnCacheHitKey) == saturnCacheHit)).Observe(float64(time.Since(startReq).Milliseconds()))

defer resp.Body.Close()

@@ -303,7 +286,7 @@ func (p *pool) fetchResource(ctx context.Context, from *Node, resource string, m
_, _ = io.Copy(io.Discard, resp.Body)

if err != nil {
if recordIfContextErr(resourceType, reqCtx, "read-http-response") {
if isCtxError(reqCtx) {
if errors.Is(err, context.Canceled) {
return reqCtx.Err()
}
@@ -324,6 +307,7 @@ func (p *pool) fetchResource(ctx context.Context, from *Node, resource string, m

// trace-metrics
// request life-cycle metrics
saturnCallsSuccessTotalMetric.WithLabelValues(resourceType, getCacheStatus(isCacheHit)).Add(1)

fetchRequestSuccessTimeTraceMetric.WithLabelValues(resourceType, getCacheStatus(isCacheHit), "tcp_connection").Observe(float64(result.TCPConnection.Milliseconds()))
fetchRequestSuccessTimeTraceMetric.WithLabelValues(resourceType, getCacheStatus(isCacheHit), "tls_handshake").Observe(float64(result.TLSHandshake.Milliseconds()))
@@ -338,78 +322,18 @@ func (p *pool) fetchResource(ctx context.Context, from *Node, resource string, m
fetchRequestSuccessTimeTraceMetric.WithLabelValues(resourceType, getCacheStatus(isCacheHit), "start_transfer").
Observe(float64(result.StartTransfer.Milliseconds()))

saturnCallsSuccessTotalMetric.WithLabelValues(resourceType, getCacheStatus(isCacheHit)).Add(1)

from.RecordSuccess(start, float64(wrapped.firstByte.Sub(start).Milliseconds()), float64(received)/float64(response_success_end.Sub(start).Milliseconds()))

return nil
}

func recordIfContextErr(resourceType string, ctx context.Context, requestState string) bool {
func isCtxError(ctx context.Context) bool {
if ce := ctx.Err(); ce != nil {
fetchRequestContextErrorTotalMetric.WithLabelValues(resourceType, fmt.Sprintf("%t", errors.Is(ce, context.Canceled)), requestState).Add(1)
return true
}
return false
}

func updateLassie5xxTime(timingHeaders []string, resourceType string) {
if len(timingHeaders) == 0 {
goLogger.Debug("no timing headers in request response.")
return
}

for _, th := range timingHeaders {
if subReqTiming, err := servertiming.ParseHeader(th); err == nil {
for _, m := range subReqTiming.Metrics {
switch m.Name {
case "shim_lassie_headers":
if m.Duration.Milliseconds() != 0 {
lassie5XXTimeMetric.WithLabelValues(resourceType).Observe(float64(m.Duration.Milliseconds()))
}
return
default:
}
}
}
}
}

// todo: refactor for dryness
func updateSuccessServerTimingMetrics(timingHeaders []string, resourceType string, isCacheHit bool, totalTimeMs, ttfbMs int64, recieved int) {
if len(timingHeaders) == 0 {
goLogger.Debug("no timing headers in request response.")
return
}

for _, th := range timingHeaders {
if subReqTiming, err := servertiming.ParseHeader(th); err == nil {
for _, m := range subReqTiming.Metrics {
switch m.Name {
case "shim_lassie_headers":
if m.Duration.Milliseconds() != 0 && !isCacheHit {
fetchDurationPerPeerSuccessCacheMissTotalLassieMetric.WithLabelValues(resourceType).Observe(float64(m.Duration.Milliseconds()))
}

case "nginx":
// sanity checks
if totalTimeMs != 0 && ttfbMs != 0 && m.Duration.Milliseconds() != 0 {
fetchDurationPerPeerSuccessTotalL1NodeMetric.WithLabelValues(resourceType, getCacheStatus(isCacheHit)).Observe(float64(m.Duration.Milliseconds()))
networkTimeMs := totalTimeMs - m.Duration.Milliseconds()
if networkTimeMs > 0 {
s := float64(recieved) / float64(networkTimeMs)
fetchNetworkSpeedPerPeerSuccessMetric.WithLabelValues(resourceType).Observe(s)
}
networkLatencyMs := ttfbMs - m.Duration.Milliseconds()
fetchNetworkLatencyPeerSuccessMetric.WithLabelValues(resourceType).Observe(float64(networkLatencyMs))
}
default:
}
}
}
}
}

func getCacheStatus(isCacheHit bool) string {
if isCacheHit {
return "Cache-hit"
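In fetcher.go, the metric-recording recordIfContextErr helper is slimmed down to a plain isCtxError check, and the header-TTFB, Lassie 5xx, and server-timing metrics are removed. What remains is the usual "prefer the context error over the transport error" pattern. A self-contained sketch of that pattern; the URL, timeout, and function names are illustrative, not from this diff:

package main

import (
	"context"
	"errors"
	"fmt"
	"net/http"
	"time"
)

// isCtxError mirrors the helper introduced above: it only reports whether the
// request context has been cancelled or timed out; the old helper also
// recorded a metric, which this PR removes.
func isCtxError(ctx context.Context) bool {
	return ctx.Err() != nil
}

// fetchOnce is an illustrative reduction of the error handling in
// fetchResource: when the transport fails because the caller cancelled,
// surface the context error rather than the wrapped HTTP error.
func fetchOnce(ctx context.Context, client *http.Client, url string) error {
	reqCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
	defer cancel()

	req, err := http.NewRequestWithContext(reqCtx, http.MethodGet, url, nil)
	if err != nil {
		if isCtxError(reqCtx) {
			return reqCtx.Err()
		}
		return err
	}

	resp, err := client.Do(req)
	if err != nil {
		if isCtxError(reqCtx) && errors.Is(err, context.Canceled) {
			return reqCtx.Err()
		}
		return err
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("unexpected status %s from %s", resp.Status, url)
	}
	return nil
}

func main() {
	if err := fetchOnce(context.Background(), http.DefaultClient, "https://example.com/"); err != nil {
		fmt.Println("fetch failed:", err)
	}
}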
29 changes: 28 additions & 1 deletion internal/util/harness.go
@@ -63,7 +63,7 @@ func BuildCabooseHarness(t *testing.T, n int, maxRetries int, opts ...HarnessOpt

Client: saturnClient,
DoValidation: false,
PoolRefresh: time.Millisecond * 50,
PoolRefresh: time.Second * 50,
MaxRetrievalAttempts: maxRetries,
Harness: &state.State{},
}
@@ -78,6 +78,8 @@
ch.Caboose = bs
ch.CabooseActiveNodes = conf.Harness.ActiveNodes.(*caboose.NodeRing)
ch.CabooseAllNodes = conf.Harness.AllNodes.(*caboose.NodeHeap)
ch.CaboosePool = conf.Harness.PoolController
ch.Config = conf
return ch
}

@@ -87,11 +89,19 @@ type CabooseHarness struct {

CabooseActiveNodes *caboose.NodeRing
CabooseAllNodes *caboose.NodeHeap
CaboosePool state.PoolController
Config *caboose.Config

gol sync.Mutex
goodOrch bool
}

type NodeStats struct {
Start time.Time
Latency float64
Size float64
}

func (ch *CabooseHarness) RunFetchesForRandCids(n int) {
for i := 0; i < n; i++ {
randCid, _ := cid.V1Builder{Codec: uint64(multicodec.Raw), MhType: uint64(multicodec.Sha2_256)}.Sum([]byte{uint8(i)})
@@ -122,6 +132,23 @@ func (ch *CabooseHarness) FetchAndAssertSuccess(t *testing.T, ctx context.Contex
require.NotEmpty(t, blk)
}

func (ch *CabooseHarness) RecordSuccesses(t *testing.T, nodes []*caboose.Node, s NodeStats, n int) {
for _, node := range nodes {
s.Start = time.Now().Add(-time.Second * 5)
for i := 0; i < n; i++ {
node.RecordSuccess(s.Start, s.Latency, s.Size)
}
}
}

func (ch *CabooseHarness) RecordFailures(t *testing.T, nodes []*caboose.Node, n int) {
for _, node := range nodes {
for i := 0; i < n; i++ {
node.RecordFailure()
}
}
}

func (ch *CabooseHarness) FailNodesWithCode(t *testing.T, selectorF func(ep *Endpoint) bool, code int) {
for _, n := range ch.Endpoints {
if selectorF(n) {
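The harness now exposes the pool controller and config on CabooseHarness and adds a NodeStats struct plus RecordSuccesses / RecordFailures helpers, so tests can feed synthetic performance samples to nodes. A sketch of how a test helper might use them; how the *caboose.Node slices are obtained (e.g. from ch.CabooseAllNodes) is left out, and the latency, size, and sample counts are made-up values, not taken from the PR:

package util_test

import (
	"testing"

	"github.com/filecoin-saturn/caboose"
	"github.com/filecoin-saturn/caboose/internal/util"
)

// simulateTraffic biases the pool by recording synthetic samples against two
// sets of nodes, using only the helpers added in the hunks above.
func simulateTraffic(t *testing.T, ch *util.CabooseHarness, fast, slow []*caboose.Node) {
	stats := util.NodeStats{
		Latency: 40,      // milliseconds per fetch (illustrative)
		Size:    1 << 20, // bytes per fetch (illustrative)
	}

	// 100 good samples for the fast nodes; per the diff, RecordSuccesses
	// back-dates the start time by five seconds before recording.
	ch.RecordSuccesses(t, fast, stats, 100)

	// 20 failures for the slow nodes, pushing them toward demotion.
	ch.RecordFailures(t, slow, 20)
}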