Skip to content

Commit

Permalink
Merge pull request #441 from SiaFoundation/pj/fix-rev-timeout
Browse files Browse the repository at this point in the history
Fix Revision Timeout
  • Loading branch information
ChrisSchinnerl committed Jun 23, 2023
2 parents 63d6caa + 4ac1b6d commit f98b0fa
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 29 deletions.
4 changes: 1 addition & 3 deletions worker/interactions.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,7 @@ package worker
import (
"context"
"encoding/json"
"errors"
"fmt"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -226,7 +224,7 @@ func isSuccessfulInteraction(err error) bool {
return true
}
// List of errors that are considered successful interactions.
if errors.Is(err, ErrInsufficientFunds) || strings.Contains(err.Error(), ErrInsufficientFunds.Error()) {
if isInsufficientFunds(err) {
return true
}
if isBalanceInsufficient(err) {
Expand Down
45 changes: 29 additions & 16 deletions worker/rhpv3.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,6 @@ const (
)

var (
// errBalanceSufficient occurs when funding an account to a desired balance
// that's lower than its current balance.
errBalanceSufficient = errors.New("ephemeral account balance greater than desired balance")

// errBalanceInsufficient occurs when a withdrawal failed because the
// account balance was insufficient.
errBalanceInsufficient = errors.New("ephemeral account balance was insufficient")
Expand All @@ -59,6 +55,10 @@ var (
// when trying to use a renewed contract.
errMaxRevisionReached = errors.New("contract has reached the maximum number of revisions")

// errWithdrawalsInactive occurs when the host is (perhaps temporarily)
// unsynced and has disabled its account manager.
errWithdrawalsInactive = errors.New("ephemeral account withdrawals are inactive because the host is not synced")

// errTransportClosed is returned when using a transportV3 which was already
// closed.
errTransportClosed = errors.New("transport closed")
Expand Down Expand Up @@ -220,7 +220,7 @@ func (h *host) FetchRevision(ctx context.Context, fetchTimeout time.Duration, bl
ctx, cancel := timeoutCtx()
defer cancel()
rev, err := h.fetchRevisionWithAccount(ctx, h.HostKey(), h.siamuxAddr, blockHeight, h.fcid)
if err != nil && !isBalanceInsufficient(err) {
if err != nil && !(isBalanceInsufficient(err) || isWithdrawalsInactive(err) || isClosedStream(err)) { // TODO: checking for a closed stream here can be removed once the withdrawal timeout on the host side is removed
return types.FileContractRevision{}, fmt.Errorf("unable to fetch revision with account: %v", err)
} else if err == nil {
return rev, nil
Expand All @@ -230,7 +230,7 @@ func (h *host) FetchRevision(ctx context.Context, fetchTimeout time.Duration, bl
ctx, cancel = timeoutCtx()
defer cancel()
rev, err = h.fetchRevisionWithContract(ctx, h.HostKey(), h.siamuxAddr, h.fcid)
if err != nil && !strings.Contains(err.Error(), ErrInsufficientFunds.Error()) {
if err != nil && !isInsufficientFunds(err) {
return types.FileContractRevision{}, fmt.Errorf("unable to fetch revision with contract: %v", err)
} else if err == nil {
return rev, nil
Expand Down Expand Up @@ -315,7 +315,7 @@ func (h *host) FundAccount(ctx context.Context, balance types.Currency, rev *typ
return err
}
if curr.Cmp(balance) >= 0 {
return fmt.Errorf("%w; %v>%v", errBalanceSufficient, curr, balance)
return nil
}
amount := balance.Sub(curr)

Expand Down Expand Up @@ -365,23 +365,36 @@ func (h *host) SyncAccount(ctx context.Context, rev *types.FileContractRevision)
})
}

func isMaxBalanceExceeded(err error) bool {
if err == nil {
return false
}
return strings.Contains(err.Error(), errBalanceMaxExceeded.Error())
func isBalanceInsufficient(err error) bool {
return isError(err, errBalanceInsufficient)
}

func isBalanceInsufficient(err error) bool {
func isBalanceMaxExceeded(err error) bool {
return isError(err, errBalanceMaxExceeded)
}

func isClosedStream(err error) bool {
return isError(err, mux.ErrClosedStream)
}

func isInsufficientFunds(err error) bool {
return isError(err, ErrInsufficientFunds)
}

func isWithdrawalsInactive(err error) bool {
return isError(err, errWithdrawalsInactive)
}

func isError(err error, target error) bool {
if err == nil {
return false
return err == target
}
// compare error first
if errors.Is(err, errBalanceSufficient) {
if errors.Is(err, target) {
return true
}
// then compare the string in case the error was returned by a host
return strings.Contains(err.Error(), errBalanceInsufficient.Error())
return strings.Contains(err.Error(), target.Error())
}

type (
Expand Down
4 changes: 1 addition & 3 deletions worker/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"go.opentelemetry.io/otel/trace"
rhpv2 "go.sia.tech/core/rhp/v2"
"go.sia.tech/core/types"
"go.sia.tech/mux/v1"
"go.sia.tech/renterd/api"
"go.sia.tech/renterd/object"
"go.sia.tech/renterd/tracing"
Expand Down Expand Up @@ -793,9 +792,8 @@ outer:
}

// track the error, ignore gracefully closed streams and canceled overdrives
isErrClosedStream := errors.Is(err, mux.ErrClosedStream)
canceledOverdrive := upload.done() && upload.overdrive && err != nil
if !canceledOverdrive && !isErrClosedStream {
if !canceledOverdrive && !isClosedStream(err) {
u.trackSectorUpload(err, time.Since(start))
}
}
Expand Down
8 changes: 1 addition & 7 deletions worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -572,7 +572,7 @@ func (w *worker) rhpFundHandler(jc jape.Context) {
jc.Check("couldn't fund account", w.withRevision(ctx, defaultRevisionFetchTimeout, rfr.ContractID, rfr.HostKey, rfr.SiamuxAddr, lockingPriorityFunding, gp.ConsensusState.BlockHeight, func(rev types.FileContractRevision) (err error) {
h := w.newHostV3(rev.ParentID, rfr.HostKey, rfr.SiamuxAddr)
err = h.FundAccount(ctx, rfr.Balance, &rev)
if isMaxBalanceExceeded(err) {
if isBalanceMaxExceeded(err) {
// sync the account
err = h.SyncAccount(ctx, &rev)
if err != nil {
Expand All @@ -582,12 +582,6 @@ func (w *worker) rhpFundHandler(jc jape.Context) {

// try funding the account again
err = h.FundAccount(ctx, rfr.Balance, &rev)
if errors.Is(err, errBalanceSufficient) {
w.logger.Debugf("account balance for host %v restored after sync", rfr.HostKey)
return nil
}

// funding failed after syncing the account successfully
if err != nil {
w.logger.Errorw(fmt.Sprintf("failed to fund account after syncing: %v", err), "host", rfr.HostKey, "balance", rfr.Balance)
}
Expand Down

0 comments on commit f98b0fa

Please sign in to comment.