Skip to content

Commit

Permalink
Improve sector upload estimate tracking (#1193)
Browse files Browse the repository at this point in the history
It turns out that Chris's hunch was right and that we are not tracking
sector upload estimates at times when we should be; even worse, we currently
don't track anything at times when we should be penalising the host for being
slow. I added explicit logging for when we penalise a host for being
(super) slow, so I am going to follow this up on my node to be sure.

Overall I thought it was a little tricky to come up with a good way to
fix it and to write a solid unit and/or integration test for it. If we
really wanted to check the exact errors, we would need an integration test,
but that would involve a level of orchestration we are currently not
capable of doing from an integration test. The unit test I came up with
is not exactly what I was hoping for, but it does cover all branches.

Closes #1167
  • Loading branch information
ChrisSchinnerl committed May 3, 2024
2 parents b15f1ae + 51c31b7 commit 8aadd52
Show file tree
Hide file tree
Showing 5 changed files with 149 additions and 44 deletions.
10 changes: 7 additions & 3 deletions worker/rhpv2.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
rhpv2 "go.sia.tech/core/rhp/v2"
"go.sia.tech/core/types"
"go.sia.tech/renterd/api"
"go.sia.tech/renterd/internal/utils"
"go.sia.tech/siad/build"
"go.sia.tech/siad/crypto"
"lukechampine.com/frand"
Expand Down Expand Up @@ -85,9 +86,12 @@ func (hes HostErrorSet) Error() string {
return "\n" + strings.Join(strs, "\n")
}

func wrapErr(err *error, fnName string) {
// wrapErr prefixes *err with fnName and, if the context carries a
// cancellation cause that is not already part of the error chain,
// wraps that cause into the error as well. It is a no-op when *err is nil.
func wrapErr(ctx context.Context, fnName string, err *error) {
	if *err == nil {
		return
	}
	*err = fmt.Errorf("%s: %w", fnName, *err)
	cause := context.Cause(ctx)
	if cause == nil || utils.IsErr(*err, cause) {
		return
	}
	*err = fmt.Errorf("%w; %w", cause, *err)
}

Expand Down Expand Up @@ -133,7 +137,7 @@ func updateRevisionOutputs(rev *types.FileContractRevision, cost, collateral typ

// RPCSettings calls the Settings RPC, returning the host's reported settings.
func RPCSettings(ctx context.Context, t *rhpv2.Transport) (settings rhpv2.HostSettings, err error) {
defer wrapErr(&err, "Settings")
defer wrapErr(ctx, "Settings", &err)

var resp rhpv2.RPCSettingsResponse
if err := t.Call(rhpv2.RPCSettingsID, nil, &resp); err != nil {
Expand All @@ -147,7 +151,7 @@ func RPCSettings(ctx context.Context, t *rhpv2.Transport) (settings rhpv2.HostSe

// RPCFormContract forms a contract with a host.
func RPCFormContract(ctx context.Context, t *rhpv2.Transport, renterKey types.PrivateKey, txnSet []types.Transaction) (_ rhpv2.ContractRevision, _ []types.Transaction, err error) {
defer wrapErr(&err, "FormContract")
defer wrapErr(ctx, "FormContract", &err)

// strip our signatures before sending
parents, txn := txnSet[:len(txnSet)-1], txnSet[len(txnSet)-1]
Expand Down
20 changes: 12 additions & 8 deletions worker/rhpv3.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ var (
// errTransport is used to wrap rpc errors caused by the transport.
errTransport = errors.New("transport error")

// errDialTransport is returned when the worker could not dial the host.
errDialTransport = errors.New("could not dial transport")

// errBalanceInsufficient occurs when a withdrawal failed because the
// account balance was insufficient.
errBalanceInsufficient = errors.New("ephemeral account balance was insufficient")
Expand Down Expand Up @@ -175,7 +178,7 @@ func (t *transportV3) DialStream(ctx context.Context) (*streamV3, error) {
newTransport, err := dialTransport(ctx, t.siamuxAddr, t.hostKey)
if err != nil {
t.mu.Unlock()
return nil, fmt.Errorf("DialStream: could not dial transport: %w (%v)", err, time.Since(start))
return nil, fmt.Errorf("DialStream: %w: %w (%v)", errDialTransport, err, time.Since(start))
}
t.t = newTransport
}
Expand Down Expand Up @@ -623,7 +626,7 @@ type PriceTablePaymentFunc func(pt rhpv3.HostPriceTable) (rhpv3.PaymentMethod, e

// RPCPriceTable calls the UpdatePriceTable RPC.
func RPCPriceTable(ctx context.Context, t *transportV3, paymentFunc PriceTablePaymentFunc) (_ api.HostPriceTable, err error) {
defer wrapErr(&err, "PriceTable")
defer wrapErr(ctx, "PriceTable", &err)

s, err := t.DialStream(ctx)
if err != nil {
Expand Down Expand Up @@ -660,7 +663,7 @@ func RPCPriceTable(ctx context.Context, t *transportV3, paymentFunc PriceTablePa

// RPCAccountBalance calls the AccountBalance RPC.
func RPCAccountBalance(ctx context.Context, t *transportV3, payment rhpv3.PaymentMethod, account rhpv3.Account, settingsID rhpv3.SettingsID) (bal types.Currency, err error) {
defer wrapErr(&err, "AccountBalance")
defer wrapErr(ctx, "AccountBalance", &err)
s, err := t.DialStream(ctx)
if err != nil {
return types.ZeroCurrency, err
Expand All @@ -685,7 +688,7 @@ func RPCAccountBalance(ctx context.Context, t *transportV3, payment rhpv3.Paymen

// RPCFundAccount calls the FundAccount RPC.
func RPCFundAccount(ctx context.Context, t *transportV3, payment rhpv3.PaymentMethod, account rhpv3.Account, settingsID rhpv3.SettingsID) (err error) {
defer wrapErr(&err, "FundAccount")
defer wrapErr(ctx, "FundAccount", &err)
s, err := t.DialStream(ctx)
if err != nil {
return err
Expand All @@ -712,7 +715,7 @@ func RPCFundAccount(ctx context.Context, t *transportV3, payment rhpv3.PaymentMe
// fetching a pricetable using the fetched revision to pay for it. If
// paymentFunc returns 'nil' as payment, the host is not paid.
func RPCLatestRevision(ctx context.Context, t *transportV3, contractID types.FileContractID, paymentFunc func(rev *types.FileContractRevision) (rhpv3.HostPriceTable, rhpv3.PaymentMethod, error)) (_ types.FileContractRevision, err error) {
defer wrapErr(&err, "LatestRevision")
defer wrapErr(ctx, "LatestRevision", &err)
s, err := t.DialStream(ctx)
if err != nil {
return types.FileContractRevision{}, err
Expand All @@ -738,7 +741,7 @@ func RPCLatestRevision(ctx context.Context, t *transportV3, contractID types.Fil

// RPCReadSector calls the ExecuteProgram RPC with a ReadSector instruction.
func RPCReadSector(ctx context.Context, t *transportV3, w io.Writer, pt rhpv3.HostPriceTable, payment rhpv3.PaymentMethod, offset, length uint32, merkleRoot types.Hash256) (cost, refund types.Currency, err error) {
defer wrapErr(&err, "ReadSector")
defer wrapErr(ctx, "ReadSector", &err)
s, err := t.DialStream(ctx)
if err != nil {
return types.ZeroCurrency, types.ZeroCurrency, err
Expand Down Expand Up @@ -803,7 +806,7 @@ func RPCReadSector(ctx context.Context, t *transportV3, w io.Writer, pt rhpv3.Ho
}

func RPCAppendSector(ctx context.Context, t *transportV3, renterKey types.PrivateKey, pt rhpv3.HostPriceTable, rev *types.FileContractRevision, payment rhpv3.PaymentMethod, sectorRoot types.Hash256, sector *[rhpv2.SectorSize]byte) (cost types.Currency, err error) {
defer wrapErr(&err, "AppendSector")
defer wrapErr(ctx, "AppendSector", &err)

// sanity check revision first
if rev.RevisionNumber == math.MaxUint64 {
Expand Down Expand Up @@ -941,7 +944,8 @@ func RPCAppendSector(ctx context.Context, t *transportV3, renterKey types.Privat
}

func RPCRenew(ctx context.Context, rrr api.RHPRenewRequest, bus Bus, t *transportV3, pt *rhpv3.HostPriceTable, rev types.FileContractRevision, renterKey types.PrivateKey, l *zap.SugaredLogger) (_ rhpv2.ContractRevision, _ []types.Transaction, _ types.Currency, err error) {
defer wrapErr(&err, "RPCRenew")
defer wrapErr(ctx, "RPCRenew", &err)

s, err := t.DialStream(ctx)
if err != nil {
return rhpv2.ContractRevision{}, nil, types.ZeroCurrency, fmt.Errorf("failed to dial stream: %w", err)
Expand Down
15 changes: 8 additions & 7 deletions worker/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,11 @@ const (
)

var (
errContractExpired = errors.New("contract expired")
errNoCandidateUploader = errors.New("no candidate uploader found")
errNotEnoughContracts = errors.New("not enough contracts to support requested redundancy")
errUploadInterrupted = errors.New("upload was interrupted")
errContractExpired = errors.New("contract expired")
errNoCandidateUploader = errors.New("no candidate uploader found")
errNotEnoughContracts = errors.New("not enough contracts to support requested redundancy")
errUploadInterrupted = errors.New("upload was interrupted")
errSectorUploadFinished = errors.New("sector upload already finished")
)

type (
Expand Down Expand Up @@ -117,7 +118,7 @@ type (
root types.Hash256

ctx context.Context
cancel context.CancelFunc
cancel context.CancelCauseFunc

mu sync.Mutex
uploaded object.Sector
Expand Down Expand Up @@ -750,7 +751,7 @@ func (u *upload) newSlabUpload(ctx context.Context, shards [][]byte, uploaders [
wg.Add(1)
go func(idx int) {
// create the ctx
sCtx, sCancel := context.WithCancel(ctx)
sCtx, sCancel := context.WithCancelCause(ctx)

// create the sector
// NOTE: we are computing the sector root here and pass it all the
Expand Down Expand Up @@ -1087,7 +1088,7 @@ func (s *sectorUpload) finish(sector object.Sector) {
s.mu.Lock()
defer s.mu.Unlock()

s.cancel()
s.cancel(errSectorUploadFinished)
s.uploaded = sector
s.data = nil
}
Expand Down
89 changes: 63 additions & 26 deletions worker/uploader.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,16 +116,20 @@ outer:
}

// execute it
elapsed, err := u.execute(req)

// the uploader's contract got renewed, requeue the request
start := time.Now()
duration, err := u.execute(req)
if errors.Is(err, errMaxRevisionReached) {
if u.tryRefresh(req.sector.ctx) {
u.enqueue(req)
continue outer
}
}

// track stats
success, failure, uploadEstimateMS, uploadSpeedBytesPerMS := handleSectorUpload(err, duration, time.Since(start), req.overdrive, u.logger)
u.trackSectorUploadStats(uploadEstimateMS, uploadSpeedBytesPerMS)
u.trackConsecutiveFailures(success, failure)

// send the response
select {
case <-req.sector.ctx.Done():
Expand All @@ -134,16 +138,44 @@ outer:
err: err,
}:
}
}
}
}

// track the error, ignore gracefully closed streams and canceled overdrives
canceledOverdrive := req.done() && req.overdrive && err != nil
if !canceledOverdrive && !isClosedStream(err) {
u.trackSectorUpload(err, elapsed)
} else {
u.logger.Debugw("not tracking sector upload metric", zap.Error(err))
}
// handleSectorUpload translates the outcome of a sector upload into tracking
// metrics. It reports whether the attempt counts as a success or a failure
// for the host's consecutive-failure counter, together with the estimate (in
// ms) and speed (in bytes/ms) to fold into the host's upload stats; a zero
// estimate or speed means "do not track".
func handleSectorUpload(uploadErr error, uploadDuration, totalDuration time.Duration, overdrive bool, logger *zap.SugaredLogger) (success bool, failure bool, uploadEstimateMS float64, uploadSpeedBytesPerMS float64) {
	switch {
	case errors.Is(uploadErr, errMaxRevisionReached):
		// special case, the uploader will refresh its contract and requeue
		// the request, so this attempt counts as neither success nor failure
		logger.Debugw("sector upload failure was ignored", "uploadError", uploadErr, "uploadDuration", uploadDuration, "totalDuration", totalDuration, "overdrive", overdrive)
		return false, false, 0, 0

	case uploadErr == nil:
		// happy case, derive estimate and speed from the measured upload
		// duration
		ms := uploadDuration.Milliseconds()
		if ms == 0 {
			ms = 1 // avoid division by zero
		}
		return true, false, float64(ms), float64(rhpv2.SectorSize / ms)

	case errors.Is(uploadErr, errSectorUploadFinished):
		// another host finished the sector first; punish this host for being
		// too slow, but only when we weren't overdriving or when dialing the
		// host took suspiciously long
		slowDial := errors.Is(uploadErr, errDialTransport) && totalDuration > time.Second
		if overdrive && !slowDial {
			logger.Debugw("sector upload failure was ignored", "uploadError", uploadErr, "uploadDuration", uploadDuration, "totalDuration", totalDuration, "overdrive", overdrive)
			return false, false, 0, 0
		}
		failure = overdrive
		uploadEstimateMS = float64(totalDuration.Milliseconds() * 10)
		logger.Debugw("sector upload failure was penalised", "uploadError", uploadErr, "uploadDuration", uploadDuration, "totalDuration", totalDuration, "overdrive", overdrive, "penalty", totalDuration.Milliseconds()*10)
		return false, failure, uploadEstimateMS, 0

	default:
		// in all other cases we want to punish the host for failing the
		// upload with the maximum penalty
		logger.Debugw("sector upload failure was penalised", "uploadError", uploadErr, "uploadDuration", uploadDuration, "totalDuration", totalDuration, "overdrive", overdrive, "penalty", time.Hour)
		return false, true, float64(time.Hour.Milliseconds()), 0
	}
}

func (u *uploader) Stop(err error) {
Expand Down Expand Up @@ -198,6 +230,8 @@ func (u *uploader) estimate() float64 {
return numSectors * estimateP90
}

// execute executes the sector upload request, if the upload was successful it
// returns the time it took to upload the sector to the host
func (u *uploader) execute(req *sectorUploadReq) (time.Duration, error) {
// grab fields
u.mu.Lock()
Expand Down Expand Up @@ -233,19 +267,17 @@ func (u *uploader) execute(req *sectorUploadReq) (time.Duration, error) {

// update the bus
if err := u.os.AddUploadingSector(ctx, req.uploadID, fcid, req.sector.root); err != nil {
return 0, fmt.Errorf("failed to add uploading sector to contract %v, err: %v", fcid, err)
return 0, fmt.Errorf("failed to add uploading sector to contract %v; %w", fcid, err)
}

// upload the sector
start := time.Now()
err = host.UploadSector(ctx, req.sector.root, req.sector.sectorData(), rev)
if err != nil {
return 0, fmt.Errorf("failed to upload sector to contract %v, err: %v", fcid, err)
return 0, fmt.Errorf("failed to upload sector to contract %v; %w", fcid, err)
}

// calculate elapsed time
elapsed := time.Since(start)
return elapsed, nil
return time.Since(start), nil
}

func (u *uploader) pop() *sectorUploadReq {
Expand All @@ -268,21 +300,26 @@ func (u *uploader) signalWork() {
}
}

func (u *uploader) trackSectorUpload(err error, d time.Duration) {
func (u *uploader) trackConsecutiveFailures(success, failure bool) {
u.mu.Lock()
defer u.mu.Unlock()
if err != nil {
u.consecutiveFailures++
u.statsSectorUploadEstimateInMS.Track(float64(time.Hour.Milliseconds()))
} else {
ms := d.Milliseconds()
if ms == 0 {
ms = 1 // avoid division by zero
}

if success {
u.consecutiveFailures = 0
u.statsSectorUploadEstimateInMS.Track(float64(ms)) // duration in ms
u.statsSectorUploadSpeedBytesPerMS.Track(float64(rhpv2.SectorSize / ms)) // bytes per ms
} else if failure {
u.consecutiveFailures++
}
}

// trackSectorUploadStats folds the given upload estimate (in ms) and upload
// speed (in bytes/ms) into the uploader's stats trackers. A value of zero is
// skipped, allowing callers to track either metric independently.
func (u *uploader) trackSectorUploadStats(uploadEstimateMS, uploadSpeedBytesPerMS float64) {
	u.mu.Lock()
	defer u.mu.Unlock()

	if est := uploadEstimateMS; est > 0 {
		u.statsSectorUploadEstimateInMS.Track(est)
	}
	if speed := uploadSpeedBytesPerMS; speed > 0 {
		u.statsSectorUploadSpeedBytesPerMS.Track(speed)
	}
}

Expand Down
59 changes: 59 additions & 0 deletions worker/uploader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,12 @@ package worker
import (
"context"
"errors"
"fmt"
"testing"
"time"

rhpv2 "go.sia.tech/core/rhp/v2"
"go.uber.org/zap"
)

func TestUploaderStopped(t *testing.T) {
Expand Down Expand Up @@ -32,3 +36,58 @@ func TestUploaderStopped(t *testing.T) {
t.Fatal("no response")
}
}

// TestHandleSectorUpload exercises every branch of handleSectorUpload: a
// successful upload, a requeue after the contract's max revision was reached,
// a sector that another host finished first (with and without overdrive and
// with a slow dial), and a plain host failure.
func TestHandleSectorUpload(t *testing.T) {
	ms := time.Millisecond
	ss := float64(rhpv2.SectorSize)
	overdrive := true
	regular := false

	errHostError := errors.New("some host error")
	errSectorUploadFinishedAndDial := fmt.Errorf("%w;%w", errDialTransport, errSectorUploadFinished)

	cases := []struct {
		// input
		uploadErr error
		uploadDur time.Duration
		totalDur  time.Duration
		overdrive bool

		// expected output
		success               bool
		failure               bool
		uploadEstimateMS      float64
		uploadSpeedBytesPerMS float64
	}{
		// happy case
		{nil, ms, ms, regular, true, false, 1, ss},
		{nil, ms, ms, overdrive, true, false, 1, ss},

		// renewed contract case
		{errMaxRevisionReached, 0, ms, regular, false, false, 0, 0},
		{errMaxRevisionReached, 0, ms, overdrive, false, false, 0, 0},

		// sector already uploaded case
		{errSectorUploadFinished, ms, ms, regular, false, false, 10, 0},
		{errSectorUploadFinished, ms, ms, overdrive, false, false, 0, 0},
		{errSectorUploadFinishedAndDial, ms, ms, overdrive, false, false, 0, 0},
		{errSectorUploadFinishedAndDial, ms, 1001 * ms, overdrive, false, true, 10010, 0},

		// host failure
		{errHostError, ms, ms, regular, false, true, 3600000, 0},
		{errHostError, ms, ms, overdrive, false, true, 3600000, 0},
	}

	for idx, test := range cases {
		gotSuccess, gotFailure, gotEstimate, gotSpeed := handleSectorUpload(test.uploadErr, test.uploadDur, test.totalDur, test.overdrive, zap.NewNop().Sugar())
		if gotSuccess != test.success {
			t.Fatalf("case %d failed: expected success %v, got %v", idx+1, test.success, gotSuccess)
		}
		if gotFailure != test.failure {
			t.Fatalf("case %d failed: expected failure %v, got %v", idx+1, test.failure, gotFailure)
		}
		if gotEstimate != test.uploadEstimateMS {
			t.Fatalf("case %d failed: expected uploadEstimateMS %v, got %v", idx+1, test.uploadEstimateMS, gotEstimate)
		}
		if gotSpeed != test.uploadSpeedBytesPerMS {
			t.Fatalf("case %d failed: expected uploadSpeedBytesPerMS %v, got %v", idx+1, test.uploadSpeedBytesPerMS, gotSpeed)
		}
	}
}

0 comments on commit 8aadd52

Please sign in to comment.