Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Algod Importer: Longer Timeout #133

Merged
merged 16 commits into from
Aug 11, 2023
50 changes: 30 additions & 20 deletions conduit/plugins/importers/algod/algod_importer.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
)

var (
waitForRoundTimeout = 5 * time.Second
waitForRoundTimeout = 15 * time.Second
tzaffi marked this conversation as resolved.
Show resolved Hide resolved
)

const catchpointsURL = "https://algorand-catchpoints.s3.us-east-2.amazonaws.com/consolidated/%s_catchpoints.txt"
Expand All @@ -68,14 +68,14 @@
}

func (algodImp *algodImporter) OnComplete(input data.BlockData) error {
if algodImp.mode == followerMode {
syncRound := input.Round() + 1
_, err := algodImp.aclient.SetSyncRound(syncRound).Do(algodImp.ctx)
algodImp.logger.Tracef("importer algod.OnComplete(BlockData) called SetSyncRound(syncRound=%d) err: %v", syncRound, err)
return err

Check warning on line 75 in conduit/plugins/importers/algod/algod_importer.go

View check run for this annotation

Codecov / codecov/patch

conduit/plugins/importers/algod/algod_importer.go#L71-L75

Added lines #L71 - L75 were not covered by tests
}

return nil

Check warning on line 78 in conduit/plugins/importers/algod/algod_importer.go

View check run for this annotation

Codecov / codecov/patch

conduit/plugins/importers/algod/algod_importer.go#L78

Added line #L78 was not covered by tests
}

func (algodImp *algodImporter) Metadata() plugins.Metadata {
Expand Down Expand Up @@ -165,11 +165,11 @@
func getMissingCatchpointLabel(URL string, nextRound uint64) (string, error) {
resp, err := http.Get(URL)
if err != nil {
return "", err
}

Check warning on line 169 in conduit/plugins/importers/algod/algod_importer.go

View check run for this annotation

Codecov / codecov/patch

conduit/plugins/importers/algod/algod_importer.go#L168-L169

Added lines #L168 - L169 were not covered by tests
body, err := io.ReadAll(resp.Body)
if err != nil {
return "", fmt.Errorf("failed to read catchpoint label response: %w", err)

Check warning on line 172 in conduit/plugins/importers/algod/algod_importer.go

View check run for this annotation

Codecov / codecov/patch

conduit/plugins/importers/algod/algod_importer.go#L172

Added line #L172 was not covered by tests
}

if resp.StatusCode != 200 {
Expand All @@ -184,7 +184,7 @@
line := scanner.Text()
round, err := parseCatchpointRound(line)
if err != nil {
return "", err

Check warning on line 187 in conduit/plugins/importers/algod/algod_importer.go

View check run for this annotation

Codecov / codecov/patch

conduit/plugins/importers/algod/algod_importer.go#L187

Added line #L187 was not covered by tests
}
// TODO: Change >= to > after go-algorand#5352 is fixed.
if uint64(round) >= nextRound {
Expand Down Expand Up @@ -370,7 +370,7 @@
algodImp.aclient = client
genesisResponse, err := algodImp.aclient.GetGenesis().Do(algodImp.ctx)
if err != nil {
return err

Check warning on line 373 in conduit/plugins/importers/algod/algod_importer.go

View check run for this annotation

Codecov / codecov/patch

conduit/plugins/importers/algod/algod_importer.go#L373

Added line #L373 was not covered by tests
}

genesis := sdk.Genesis{}
Expand All @@ -378,7 +378,7 @@
// Don't fail on unknown properties here since the go-algorand and SDK genesis types differ slightly
err = json.LenientDecode([]byte(genesisResponse), &genesis)
if err != nil {
return err

Check warning on line 381 in conduit/plugins/importers/algod/algod_importer.go

View check run for this annotation

Codecov / codecov/patch

conduit/plugins/importers/algod/algod_importer.go#L381

Added line #L381 was not covered by tests
}
if reflect.DeepEqual(genesis, sdk.Genesis{}) {
return fmt.Errorf("unable to fetch genesis file from API at %s", algodImp.cfg.NetAddr)
Expand Down Expand Up @@ -418,13 +418,29 @@
}

// SyncError indicates that algod and conduit are not synchronized.
// The retrievedRound is the round returned by an algod status call.
// The expectedRound is the round conduit expected that call to return.
tzaffi marked this conversation as resolved.
Show resolved Hide resolved
type SyncError struct {
rnd uint64
expected uint64
retrievedRound uint64
expectedRound uint64
err error
tzaffi marked this conversation as resolved.
Show resolved Hide resolved
tzaffi marked this conversation as resolved.
Show resolved Hide resolved
}

// NewSyncError allocates a SyncError and fills in the round that was
// retrieved, the round that was expected, and the underlying error err.
// The returned pointer is never nil.
func NewSyncError(retrievedRound, expectedRound uint64, err error) *SyncError {
	syncErr := new(SyncError)
	syncErr.retrievedRound = retrievedRound
	syncErr.expectedRound = expectedRound
	syncErr.err = err
	return syncErr
}

func (e *SyncError) Error() string {
return fmt.Sprintf("wrong round returned from status for round: %d != %d", e.rnd, e.expected)
return fmt.Sprintf("wrong round returned from status for round: retrieved(%d) != expected(%d): %v", e.retrievedRound, e.expectedRound, e.err)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@shiqizng - as you pointed out yesterday, at face value it's hard to tell which round is which, so I've added "retrieved" for the value returned by the endpoint and "expected" for the round the importer expects.

}

func (e *SyncError) Unwrap() error {
tzaffi marked this conversation as resolved.
Show resolved Hide resolved
return e.err

Check warning on line 443 in conduit/plugins/importers/algod/algod_importer.go

View check run for this annotation

Codecov / codecov/patch

conduit/plugins/importers/algod/algod_importer.go#L442-L443

Added lines #L442 - L443 were not covered by tests
}

func waitForRoundWithTimeout(ctx context.Context, l *logrus.Logger, c *algod.Client, rnd uint64, to time.Duration) (uint64, error) {
Expand All @@ -440,10 +456,8 @@
if rnd <= status.LastRound {
return status.LastRound, nil
}
return 0, &SyncError{
rnd: status.LastRound,
expected: rnd,
}
// algod's timeout should not be reached because context.WithTimeout is used
return 0, NewSyncError(status.LastRound, rnd, fmt.Errorf("sync error, likely due to status after block timeout: %w", err))
tzaffi marked this conversation as resolved.
Show resolved Hide resolved
}

// If there was a different error and the node is responsive, call status before returning a SyncError.
Expand All @@ -453,15 +467,12 @@
// If there was an error getting status, return the original error.
return 0, fmt.Errorf("unable to get status after block and status: %w", errors.Join(err, err2))
}
if status2.LastRound < rnd {
return 0, &SyncError{
rnd: status.LastRound,
expected: rnd,
}
return 0, NewSyncError(status2.LastRound, rnd, fmt.Errorf("status2.LastRound mismatch: %w", err))
}

Check warning on line 472 in conduit/plugins/importers/algod/algod_importer.go

View check run for this annotation

Codecov / codecov/patch

conduit/plugins/importers/algod/algod_importer.go#L470-L472

Added lines #L470 - L472 were not covered by tests

// This is probably a connection error, not a SyncError.
return 0, fmt.Errorf("unknown errors: StatusAfterBlock(%w), Status(%w)", err, err2)

Check warning on line 475 in conduit/plugins/importers/algod/algod_importer.go

View check run for this annotation

Codecov / codecov/patch

conduit/plugins/importers/algod/algod_importer.go#L475

Added line #L475 was not covered by tests
}

func (algodImp *algodImporter) getBlockInner(rnd uint64) (data.BlockData, error) {
Expand All @@ -470,11 +481,10 @@

nodeRound, err := waitForRoundWithTimeout(algodImp.ctx, algodImp.logger, algodImp.aclient, rnd, waitForRoundTimeout)
if err != nil {
// If context has expired.
if algodImp.ctx.Err() != nil {
return blk, fmt.Errorf("GetBlock ctx error: %w", err)
return blk, fmt.Errorf("importer algod.GetBlock() ctx cancelled: %w", err)
tzaffi marked this conversation as resolved.
Show resolved Hide resolved
}
algodImp.logger.Errorf(err.Error())
algodImp.logger.Errorf("importer algod.GetBlock() called waitForRoundWithTimeout: %v", err)
return data.BlockData{}, err
}
start := time.Now()
Expand All @@ -483,14 +493,14 @@
dt := time.Since(start)
getAlgodRawBlockTimeSeconds.Observe(dt.Seconds())
if err != nil {
algodImp.logger.Errorf("error getting block for round %d: %s", rnd, err.Error())
algodImp.logger.Errorf("importer algod.GetBlock() error getting block for round %d: %s", rnd, err.Error())
return data.BlockData{}, err
}
tmpBlk := new(models.BlockResponse)
err = msgpack.Decode(blockbytes, tmpBlk)
if err != nil {
return blk, err
}

Check warning on line 503 in conduit/plugins/importers/algod/algod_importer.go

View check run for this annotation

Codecov / codecov/patch

conduit/plugins/importers/algod/algod_importer.go#L502-L503

Added lines #L502 - L503 were not covered by tests

blk.BlockHeader = tmpBlk.Block.BlockHeader
blk.Payset = tmpBlk.Block.Payset
Expand All @@ -503,9 +513,9 @@
delta, err = algodImp.getDelta(rnd)
if err != nil {
if nodeRound < rnd {
err = fmt.Errorf("ledger state delta not found: node round (%d) is behind required round (%d), ensure follower node has its sync round set to the required round: %w", nodeRound, rnd, err)
err = fmt.Errorf("importer algod.GetBlock() ledger state delta not found: node round (%d) is behind required round (%d), ensure follower node has its sync round set to the required round: %w", nodeRound, rnd, err)

Check warning on line 516 in conduit/plugins/importers/algod/algod_importer.go

View check run for this annotation

Codecov / codecov/patch

conduit/plugins/importers/algod/algod_importer.go#L516

Added line #L516 was not covered by tests
} else {
err = fmt.Errorf("ledger state delta not found: node round (%d), required round (%d): verify follower node configuration and ensure follower node has its sync round set to the required round, re-deploying the follower node may be necessary: %w", nodeRound, rnd, err)
err = fmt.Errorf("importer algod.GetBlock() ledger state delta not found: node round (%d), required round (%d): verify follower node configuration and ensure follower node has its sync round set to the required round, re-deploying the follower node may be necessary: %w", nodeRound, rnd, err)
}
algodImp.logger.Error(err.Error())
return data.BlockData{}, err
Expand All @@ -523,10 +533,10 @@
if err != nil {
target := &SyncError{}
if errors.As(err, &target) {
algodImp.logger.Warnf("Sync error detected, attempting to set the sync round to recover the node: %s", err.Error())
algodImp.logger.Warnf("importer algod.GetBlock() sync error detected, attempting to set the sync round to recover the node: %s", err.Error())
_, _ = algodImp.aclient.SetSyncRound(rnd).Do(algodImp.ctx)
} else {
err = fmt.Errorf("error getting block for round %d, check node configuration: %s", rnd, err)
err = fmt.Errorf("importer algod.GetBlock() error getting block for round %d, check node configuration: %s", rnd, err)
algodImp.logger.Errorf(err.Error())
}
return data.BlockData{}, err
Expand Down
18 changes: 10 additions & 8 deletions conduit/plugins/importers/algod/algod_importer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,18 @@ package algodimporter
import (
"context"
"fmt"
"github.com/sirupsen/logrus"
"github.com/sirupsen/logrus/hooks/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gopkg.in/yaml.v3"
"net/http"
"net/http/httptest"
"strings"
"testing"
"time"

"github.com/sirupsen/logrus"
"github.com/sirupsen/logrus/hooks/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gopkg.in/yaml.v3"

"github.com/algorand/go-algorand-sdk/v2/client/v2/algod"
"github.com/algorand/go-algorand-sdk/v2/client/v2/common/models"
sdk "github.com/algorand/go-algorand-sdk/v2/types"
Expand Down Expand Up @@ -699,8 +700,8 @@ func TestGetBlockErrors(t *testing.T) {
blockAfterResponder: MakeBlockAfterResponder(models.NodeStatus{LastRound: 50}),
blockResponder: BlockResponder,
deltaResponder: MakeMsgpStatusResponder("get", "/v2/deltas/", http.StatusNotFound, ""),
err: fmt.Sprintf("wrong round returned from status for round: 50 != 200"),
logs: []string{"wrong round returned from status for round: 50 != 200", "Sync error detected, attempting to set the sync round to recover the node"},
err: fmt.Sprintf("wrong round returned from status for round: retrieved(50) != expected(200)"),
logs: []string{"wrong round returned from status for round: retrieved(50) != expected(200)", "sync error detected, attempting to set the sync round to recover the node"},
},
{
name: "Cannot get delta (caught up)",
Expand Down Expand Up @@ -755,7 +756,8 @@ func TestGetBlockErrors(t *testing.T) {
// Make sure each of the expected log messages are present
for _, log := range tc.logs {
found := false
for _, entry := range hook.AllEntries() {
hookEntries := hook.AllEntries()
for _, entry := range hookEntries {
fmt.Println(strings.Contains(entry.Message, log))
found = found || strings.Contains(entry.Message, log)
}
Expand Down
Loading