Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Algod Importer: Longer Timeout #133

Merged
merged 16 commits into from
Aug 11, 2023
49 changes: 29 additions & 20 deletions conduit/plugins/importers/algod/algod_importer.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
)

var (
waitForRoundTimeout = 5 * time.Second
waitForRoundTimeout = 30 * time.Second
)

const catchpointsURL = "https://algorand-catchpoints.s3.us-east-2.amazonaws.com/consolidated/%s_catchpoints.txt"
Expand All @@ -68,14 +68,14 @@
}

func (algodImp *algodImporter) OnComplete(input data.BlockData) error {
if algodImp.mode == followerMode {
syncRound := input.Round() + 1
_, err := algodImp.aclient.SetSyncRound(syncRound).Do(algodImp.ctx)
algodImp.logger.Tracef("importer algod.OnComplete(BlockData) called SetSyncRound(syncRound=%d) err: %v", syncRound, err)
return err

Check warning on line 75 in conduit/plugins/importers/algod/algod_importer.go

View check run for this annotation

Codecov / codecov/patch

conduit/plugins/importers/algod/algod_importer.go#L71-L75

Added lines #L71 - L75 were not covered by tests
}

return nil

Check warning on line 78 in conduit/plugins/importers/algod/algod_importer.go

View check run for this annotation

Codecov / codecov/patch

conduit/plugins/importers/algod/algod_importer.go#L78

Added line #L78 was not covered by tests
}

func (algodImp *algodImporter) Metadata() plugins.Metadata {
Expand Down Expand Up @@ -165,11 +165,11 @@
func getMissingCatchpointLabel(URL string, nextRound uint64) (string, error) {
resp, err := http.Get(URL)
if err != nil {
return "", err
}

Check warning on line 169 in conduit/plugins/importers/algod/algod_importer.go

View check run for this annotation

Codecov / codecov/patch

conduit/plugins/importers/algod/algod_importer.go#L168-L169

Added lines #L168 - L169 were not covered by tests
body, err := io.ReadAll(resp.Body)
if err != nil {
return "", fmt.Errorf("failed to read catchpoint label response: %w", err)

Check warning on line 172 in conduit/plugins/importers/algod/algod_importer.go

View check run for this annotation

Codecov / codecov/patch

conduit/plugins/importers/algod/algod_importer.go#L172

Added line #L172 was not covered by tests
}

if resp.StatusCode != 200 {
Expand All @@ -184,7 +184,7 @@
line := scanner.Text()
round, err := parseCatchpointRound(line)
if err != nil {
return "", err

Check warning on line 187 in conduit/plugins/importers/algod/algod_importer.go

View check run for this annotation

Codecov / codecov/patch

conduit/plugins/importers/algod/algod_importer.go#L187

Added line #L187 was not covered by tests
}
// TODO: Change >= to > after go-algorand#5352 is fixed.
if uint64(round) >= nextRound {
Expand Down Expand Up @@ -370,7 +370,7 @@
algodImp.aclient = client
genesisResponse, err := algodImp.aclient.GetGenesis().Do(algodImp.ctx)
if err != nil {
return err

Check warning on line 373 in conduit/plugins/importers/algod/algod_importer.go

View check run for this annotation

Codecov / codecov/patch

conduit/plugins/importers/algod/algod_importer.go#L373

Added line #L373 was not covered by tests
}

genesis := sdk.Genesis{}
Expand All @@ -378,7 +378,7 @@
// Don't fail on unknown properties here since the go-algorand and SDK genesis types differ slightly
err = json.LenientDecode([]byte(genesisResponse), &genesis)
if err != nil {
return err

Check warning on line 381 in conduit/plugins/importers/algod/algod_importer.go

View check run for this annotation

Codecov / codecov/patch

conduit/plugins/importers/algod/algod_importer.go#L381

Added line #L381 was not covered by tests
}
if reflect.DeepEqual(genesis, sdk.Genesis{}) {
return fmt.Errorf("unable to fetch genesis file from API at %s", algodImp.cfg.NetAddr)
Expand Down Expand Up @@ -418,13 +418,29 @@
}

// SyncError is used to indicate algod and conduit are not synchronized.
// The retrievedRound is the round returned from an algod status call.
// The expectedRound is the round conduit expected to have gotten back.
tzaffi marked this conversation as resolved.
Show resolved Hide resolved
type SyncError struct {
rnd uint64
expected uint64
retrievedRound uint64
expectedRound uint64
err error
tzaffi marked this conversation as resolved.
Show resolved Hide resolved
tzaffi marked this conversation as resolved.
Show resolved Hide resolved
}

// NewSyncError returns a pointer to a freshly allocated SyncError populated
// with the given values: retrievedRound is stored as the round that was
// actually retrieved, expectedRound as the round that was expected, and err
// as the underlying error carried by the SyncError.
func NewSyncError(retrievedRound, expectedRound uint64, err error) *SyncError {
	syncErr := new(SyncError)
	syncErr.retrievedRound = retrievedRound
	syncErr.expectedRound = expectedRound
	syncErr.err = err
	return syncErr
}

func (e *SyncError) Error() string {
return fmt.Sprintf("wrong round returned from status for round: %d != %d", e.rnd, e.expected)
return fmt.Sprintf("wrong round returned from status for round: retrieved(%d) != expected(%d): %v", e.retrievedRound, e.expectedRound, e.err)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@shiqizng - as you pointed out yesterday, at face value it's hard to tell which round was which so I've added "retrieved" for the value gotten back from the endpoint vs. "expected" which is the importer's expectation.

}

func (e *SyncError) Unwrap() error {
tzaffi marked this conversation as resolved.
Show resolved Hide resolved
return e.err

Check warning on line 443 in conduit/plugins/importers/algod/algod_importer.go

View check run for this annotation

Codecov / codecov/patch

conduit/plugins/importers/algod/algod_importer.go#L442-L443

Added lines #L442 - L443 were not covered by tests
}

func waitForRoundWithTimeout(ctx context.Context, l *logrus.Logger, c *algod.Client, rnd uint64, to time.Duration) (uint64, error) {
Expand All @@ -440,10 +456,8 @@
if rnd <= status.LastRound {
return status.LastRound, nil
}
return 0, &SyncError{
rnd: status.LastRound,
expected: rnd,
}
// algod's timeout should not be reached because context.WithTimeout is used
return 0, NewSyncError(status.LastRound, rnd, fmt.Errorf("sync error, likely due to status after block timeout"))
}

// If there was a different error and the node is responsive, call status before returning a SyncError.
Expand All @@ -453,15 +467,12 @@
// If there was an error getting status, return the original error.
return 0, fmt.Errorf("unable to get status after block and status: %w", errors.Join(err, err2))
}
if status2.LastRound < rnd {
return 0, &SyncError{
rnd: status.LastRound,
expected: rnd,
}
return 0, NewSyncError(status2.LastRound, rnd, fmt.Errorf("status2.LastRound mismatch: %w", err))
}

Check warning on line 472 in conduit/plugins/importers/algod/algod_importer.go

View check run for this annotation

Codecov / codecov/patch

conduit/plugins/importers/algod/algod_importer.go#L470-L472

Added lines #L470 - L472 were not covered by tests

// This is probably a connection error, not a SyncError.
return 0, fmt.Errorf("unknown errors: StatusAfterBlock(%w), Status(%w)", err, err2)

Check warning on line 475 in conduit/plugins/importers/algod/algod_importer.go

View check run for this annotation

Codecov / codecov/patch

conduit/plugins/importers/algod/algod_importer.go#L475

Added line #L475 was not covered by tests
}

func (algodImp *algodImporter) getBlockInner(rnd uint64) (data.BlockData, error) {
Expand All @@ -470,10 +481,7 @@

nodeRound, err := waitForRoundWithTimeout(algodImp.ctx, algodImp.logger, algodImp.aclient, rnd, waitForRoundTimeout)
if err != nil {
// If context has expired.
if algodImp.ctx.Err() != nil {
return blk, fmt.Errorf("GetBlock ctx error: %w", err)
}
err = fmt.Errorf("called waitForRoundWithTimeout: %w", err)
algodImp.logger.Errorf(err.Error())
return data.BlockData{}, err
}
Expand All @@ -483,14 +491,15 @@
dt := time.Since(start)
getAlgodRawBlockTimeSeconds.Observe(dt.Seconds())
if err != nil {
algodImp.logger.Errorf("error getting block for round %d: %s", rnd, err.Error())
err = fmt.Errorf("error getting block for round %d: %w", rnd, err)
algodImp.logger.Errorf(err.Error())
return data.BlockData{}, err
}
tmpBlk := new(models.BlockResponse)
err = msgpack.Decode(blockbytes, tmpBlk)
if err != nil {
return blk, err
return blk, fmt.Errorf("error decoding block for round %d: %w", rnd, err)
}

Check warning on line 502 in conduit/plugins/importers/algod/algod_importer.go

View check run for this annotation

Codecov / codecov/patch

conduit/plugins/importers/algod/algod_importer.go#L501-L502

Added lines #L501 - L502 were not covered by tests

blk.BlockHeader = tmpBlk.Block.BlockHeader
blk.Payset = tmpBlk.Block.Payset
Expand All @@ -503,7 +512,7 @@
delta, err = algodImp.getDelta(rnd)
if err != nil {
if nodeRound < rnd {
err = fmt.Errorf("ledger state delta not found: node round (%d) is behind required round (%d), ensure follower node has its sync round set to the required round: %w", nodeRound, rnd, err)

Check warning on line 515 in conduit/plugins/importers/algod/algod_importer.go

View check run for this annotation

Codecov / codecov/patch

conduit/plugins/importers/algod/algod_importer.go#L515

Added line #L515 was not covered by tests
} else {
err = fmt.Errorf("ledger state delta not found: node round (%d), required round (%d): verify follower node configuration and ensure follower node has its sync round set to the required round, re-deploying the follower node may be necessary: %w", nodeRound, rnd, err)
}
Expand All @@ -523,10 +532,10 @@
if err != nil {
target := &SyncError{}
if errors.As(err, &target) {
algodImp.logger.Warnf("Sync error detected, attempting to set the sync round to recover the node: %s", err.Error())
algodImp.logger.Warnf("importer algod.GetBlock() sync error detected, attempting to set the sync round to recover the node: %s", err.Error())
_, _ = algodImp.aclient.SetSyncRound(rnd).Do(algodImp.ctx)
} else {
err = fmt.Errorf("error getting block for round %d, check node configuration: %s", rnd, err)
err = fmt.Errorf("importer algod.GetBlock() error getting block for round %d, check node configuration: %s", rnd, err)
algodImp.logger.Errorf(err.Error())
}
return data.BlockData{}, err
Expand Down
52 changes: 32 additions & 20 deletions conduit/plugins/importers/algod/algod_importer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,18 @@ package algodimporter
import (
"context"
"fmt"
"github.com/sirupsen/logrus"
"github.com/sirupsen/logrus/hooks/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gopkg.in/yaml.v3"
"net/http"
"net/http/httptest"
"strings"
"testing"
"time"

"github.com/sirupsen/logrus"
"github.com/sirupsen/logrus/hooks/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gopkg.in/yaml.v3"

"github.com/algorand/go-algorand-sdk/v2/client/v2/algod"
"github.com/algorand/go-algorand-sdk/v2/client/v2/common/models"
sdk "github.com/algorand/go-algorand-sdk/v2/types"
Expand Down Expand Up @@ -681,34 +682,39 @@ func TestGetBlockErrors(t *testing.T) {
name: "Cannot wait for block",
rnd: 123,
blockAfterResponder: MakeJsonResponderSeries("/wait-for-block-after", []int{http.StatusOK, http.StatusNotFound}, []interface{}{models.NodeStatus{LastRound: 1}}),
err: fmt.Sprintf("error getting block for round 123"),
blockResponder: nil,
deltaResponder: nil,
err: "error getting block for round 123",
logs: []string{"error getting block for round 123"},
},
{
name: "Cannot get block",
rnd: 123,
blockAfterResponder: BlockAfterResponder,
deltaResponder: MakeMsgpStatusResponder("get", "/v2/deltas/", http.StatusNotFound, sdk.LedgerStateDelta{}),
blockResponder: MakeMsgpStatusResponder("get", "/v2/blocks/", http.StatusNotFound, ""),
err: fmt.Sprintf("error getting block for round 123"),
deltaResponder: MakeMsgpStatusResponder("get", "/v2/deltas/", http.StatusNotFound, sdk.LedgerStateDelta{}),
tzaffi marked this conversation as resolved.
Show resolved Hide resolved
err: "error getting block for round 123",
logs: []string{"error getting block for round 123"},
},
{
name: "Cannot get delta (node behind, re-send sync)",
name: "Cannot get delta - node behind, re-send sync",
tzaffi marked this conversation as resolved.
Show resolved Hide resolved
rnd: 200,
blockAfterResponder: MakeBlockAfterResponder(models.NodeStatus{LastRound: 50}),
blockResponder: BlockResponder,
deltaResponder: MakeMsgpStatusResponder("get", "/v2/deltas/", http.StatusNotFound, ""),
err: fmt.Sprintf("wrong round returned from status for round: 50 != 200"),
logs: []string{"wrong round returned from status for round: 50 != 200", "Sync error detected, attempting to set the sync round to recover the node"},
err: "wrong round returned from status for round: retrieved(50) != expected(200)",
tzaffi marked this conversation as resolved.
Show resolved Hide resolved
logs: []string{
"wrong round returned from status for round: retrieved(50) != expected(200)",
"sync error detected, attempting to set the sync round to recover the node",
},
},
{
name: "Cannot get delta (caught up)",
name: "Cannot get delta - caught up",
rnd: 200,
blockAfterResponder: MakeBlockAfterResponder(models.NodeStatus{LastRound: 200}),
blockResponder: BlockResponder,
deltaResponder: MakeMsgpStatusResponder("get", "/v2/deltas/", http.StatusNotFound, ""),
err: fmt.Sprintf("ledger state delta not found: node round (200), required round (200)"),
err: "ledger state delta not found: node round (200), required round (200)",
logs: []string{"ledger state delta not found: node round (200), required round (200)"},
},
}
Expand Down Expand Up @@ -752,21 +758,27 @@ func TestGetBlockErrors(t *testing.T) {
_, err = testImporter.GetBlock(tc.rnd)
noError := assert.ErrorContains(t, err, tc.err)

// Make sure each of the expected log messages are present
// Make sure each of the expected log messages is present in the hookEntries
hookEntries := hook.AllEntries()
for _, log := range tc.logs {
found := false
for _, entry := range hook.AllEntries() {
fmt.Println(strings.Contains(entry.Message, log))
found = found || strings.Contains(entry.Message, log)
fmt.Println("~~~\nCurrent log: ", log)
tzaffi marked this conversation as resolved.
Show resolved Hide resolved
for _, entry := range hookEntries {
logIsSubstring := strings.Contains(entry.Message, log)
found = found || logIsSubstring
fmt.Printf("expectedMessageInLog=%t, found=%t:\n\t%s\n", logIsSubstring, found, entry.Message)
tzaffi marked this conversation as resolved.
Show resolved Hide resolved
}
noError = noError && assert.True(t, found, "(%s) Expected log was not found: '%s'", tc.name, log)
if !noError {
fmt.Printf(">>>>>WE HAVE A PROBLEM<<<<<<\n")
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make it a little easier to find the failing needle in the printouts haystack since we aren't using require assertions.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you should use !found as the condition; otherwise, in tests that provide multiple log entries, this would print WE HAVE A PROBLEM for all checks after the first failing one.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

}
noError = noError && assert.True(t, found, "Expected log was not found: '%s'", log)
}

// Print logs if there was an error.
if !noError {
fmt.Println("An error was detected, printing logs")
fmt.Printf("An error was detected, printing logs (%s)\n", tc.name)
fmt.Println("------------------------------------")
for _, entry := range hook.AllEntries() {
for _, entry := range hookEntries {
fmt.Printf(" %s\n", entry.Message)
}
fmt.Println("------------------------------------")
Expand Down
6 changes: 5 additions & 1 deletion conduit/plugins/importers/algod/mock_algod_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,11 @@ func NewAlgodHandler(responders ...algodCustomHandler) *AlgodHandler {

// ServeHTTP implements the http.Handler interface for AlgodHandler
func (handler *AlgodHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
for _, responder := range handler.responders {
for i, responder := range handler.responders {
_ = i
if responder == nil {
tzaffi marked this conversation as resolved.
Show resolved Hide resolved
continue
}
if responder(req, w) {
return
}
Expand Down
Loading