diff --git a/conduit/plugins/importers/algod/algod_importer.go b/conduit/plugins/importers/algod/algod_importer.go index f705428a..c26cb4d1 100644 --- a/conduit/plugins/importers/algod/algod_importer.go +++ b/conduit/plugins/importers/algod/algod_importer.go @@ -42,7 +42,7 @@ const ( ) var ( - waitForRoundTimeout = 5 * time.Second + waitForRoundTimeout = 30 * time.Second ) const catchpointsURL = "https://algorand-catchpoints.s3.us-east-2.amazonaws.com/consolidated/%s_catchpoints.txt" @@ -419,12 +419,31 @@ func (algodImp *algodImporter) getDelta(rnd uint64) (sdk.LedgerStateDelta, error // SyncError is used to indicate algod and conduit are not synchronized. type SyncError struct { - rnd uint64 - expected uint64 + // retrievedRound is the round returned from an algod status call. + retrievedRound uint64 + + // expectedRound is the round conduit expected to have gotten back. + expectedRound uint64 + + // err is the error that was received from the endpoint caller. + err error +} + +// NewSyncError creates a new SyncError. +func NewSyncError(retrievedRound, expectedRound uint64, err error) *SyncError { + return &SyncError{ + retrievedRound: retrievedRound, + expectedRound: expectedRound, + err: err, + } } func (e *SyncError) Error() string { - return fmt.Sprintf("wrong round returned from status for round: %d != %d", e.rnd, e.expected) + return fmt.Sprintf("wrong round returned from status for round: retrieved(%d) != expected(%d): %v", e.retrievedRound, e.expectedRound, e.err) +} + +func (e *SyncError) Unwrap() error { + return e.err } func waitForRoundWithTimeout(ctx context.Context, l *logrus.Logger, c *algod.Client, rnd uint64, to time.Duration) (uint64, error) { @@ -440,10 +459,8 @@ func waitForRoundWithTimeout(ctx context.Context, l *logrus.Logger, c *algod.Cli if rnd <= status.LastRound { return status.LastRound, nil } - return 0, &SyncError{ - rnd: status.LastRound, - expected: rnd, - } + // algod's timeout should not be reached because context.WithTimeout is used + return 0, NewSyncError(status.LastRound, rnd, fmt.Errorf("sync error, likely due to status after block timeout")) } // If there was a different error and the node is responsive, call status before returning a SyncError. @@ -454,10 +471,7 @@ func waitForRoundWithTimeout(ctx context.Context, l *logrus.Logger, c *algod.Cli return 0, fmt.Errorf("unable to get status after block and status: %w", errors.Join(err, err2)) } if status2.LastRound < rnd { - return 0, &SyncError{ - rnd: status.LastRound, - expected: rnd, - } + return 0, NewSyncError(status2.LastRound, rnd, fmt.Errorf("status2.LastRound mismatch: %w", err)) } // This is probably a connection error, not a SyncError. @@ -470,10 +484,7 @@ func (algodImp *algodImporter) getBlockInner(rnd uint64) (data.BlockData, error) nodeRound, err := waitForRoundWithTimeout(algodImp.ctx, algodImp.logger, algodImp.aclient, rnd, waitForRoundTimeout) if err != nil { - // If context has expired. - if algodImp.ctx.Err() != nil { - return blk, fmt.Errorf("GetBlock ctx error: %w", err) - } + err = fmt.Errorf("called waitForRoundWithTimeout: %w", err) algodImp.logger.Errorf(err.Error()) return data.BlockData{}, err } @@ -483,13 +494,14 @@ func (algodImp *algodImporter) getBlockInner(rnd uint64) (data.BlockData, error) dt := time.Since(start) getAlgodRawBlockTimeSeconds.Observe(dt.Seconds()) if err != nil { - algodImp.logger.Errorf("error getting block for round %d: %s", rnd, err.Error()) + err = fmt.Errorf("error getting block for round %d: %w", rnd, err) + algodImp.logger.Errorf(err.Error()) return data.BlockData{}, err } tmpBlk := new(models.BlockResponse) err = msgpack.Decode(blockbytes, tmpBlk) if err != nil { - return blk, err + return blk, fmt.Errorf("error decoding block for round %d: %w", rnd, err) } blk.BlockHeader = tmpBlk.Block.BlockHeader @@ -523,10 +535,10 @@ func (algodImp *algodImporter) GetBlock(rnd uint64) (data.BlockData, error) { if err != nil { target := &SyncError{} if errors.As(err, &target) { - algodImp.logger.Warnf("Sync error detected, attempting to set the sync round to recover the node: %s", err.Error()) + algodImp.logger.Warnf("importer algod.GetBlock() sync error detected, attempting to set the sync round to recover the node: %s", err.Error()) _, _ = algodImp.aclient.SetSyncRound(rnd).Do(algodImp.ctx) } else { - err = fmt.Errorf("error getting block for round %d, check node configuration: %s", rnd, err) + err = fmt.Errorf("importer algod.GetBlock() error getting block for round %d, check node configuration: %s", rnd, err) algodImp.logger.Errorf(err.Error()) } return data.BlockData{}, err diff --git a/conduit/plugins/importers/algod/algod_importer_test.go b/conduit/plugins/importers/algod/algod_importer_test.go index 49dc3fd7..00c07a9e 100644 --- a/conduit/plugins/importers/algod/algod_importer_test.go +++ b/conduit/plugins/importers/algod/algod_importer_test.go @@ -3,17 +3,18 @@ package algodimporter import ( "context" "fmt" - "github.com/sirupsen/logrus" - "github.com/sirupsen/logrus/hooks/test" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "gopkg.in/yaml.v3" "net/http" "net/http/httptest" "strings" "testing" "time" + "github.com/sirupsen/logrus" + "github.com/sirupsen/logrus/hooks/test" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "gopkg.in/yaml.v3" + "github.com/algorand/go-algorand-sdk/v2/client/v2/algod" "github.com/algorand/go-algorand-sdk/v2/client/v2/common/models" sdk "github.com/algorand/go-algorand-sdk/v2/types" @@ -681,34 +682,39 @@ func TestGetBlockErrors(t *testing.T) { name: "Cannot wait for block", rnd: 123, blockAfterResponder: MakeJsonResponderSeries("/wait-for-block-after", []int{http.StatusOK, http.StatusNotFound}, []interface{}{models.NodeStatus{LastRound: 1}}), - err: fmt.Sprintf("error getting block for round 123"), + blockResponder: nil, + deltaResponder: nil, + err: "error getting block for round 123", logs: []string{"error getting block for round 123"}, }, { name: "Cannot get block", rnd: 123, blockAfterResponder: BlockAfterResponder, - deltaResponder: MakeMsgpStatusResponder("get", "/v2/deltas/", http.StatusNotFound, sdk.LedgerStateDelta{}), blockResponder: MakeMsgpStatusResponder("get", "/v2/blocks/", http.StatusNotFound, ""), - err: fmt.Sprintf("error getting block for round 123"), + deltaResponder: MakeMsgpStatusResponder("get", "/v2/deltas/", http.StatusNotFound, sdk.LedgerStateDelta{}), + err: "error getting block for round 123", logs: []string{"error getting block for round 123"}, }, { - name: "Cannot get delta (node behind, re-send sync)", + name: "Cannot get delta - node behind, re-send sync", rnd: 200, blockAfterResponder: MakeBlockAfterResponder(models.NodeStatus{LastRound: 50}), blockResponder: BlockResponder, deltaResponder: MakeMsgpStatusResponder("get", "/v2/deltas/", http.StatusNotFound, ""), - err: fmt.Sprintf("wrong round returned from status for round: 50 != 200"), - logs: []string{"wrong round returned from status for round: 50 != 200", "Sync error detected, attempting to set the sync round to recover the node"}, + err: "wrong round returned from status for round: retrieved(50) != expected(200)", + logs: []string{ + "wrong round returned from status for round: retrieved(50) != expected(200)", + "sync error detected, attempting to set the sync round to recover the node", + }, }, { - name: "Cannot get delta (caught up)", + name: "Cannot get delta - caught up", rnd: 200, blockAfterResponder: MakeBlockAfterResponder(models.NodeStatus{LastRound: 200}), blockResponder: BlockResponder, deltaResponder: MakeMsgpStatusResponder("get", "/v2/deltas/", http.StatusNotFound, ""), - err: fmt.Sprintf("ledger state delta not found: node round (200), required round (200)"), + err: "ledger state delta not found: node round (200), required round (200)", logs: []string{"ledger state delta not found: node round (200), required round (200)"}, }, } @@ -752,21 +758,26 @@ func TestGetBlockErrors(t *testing.T) { _, err = testImporter.GetBlock(tc.rnd) noError := assert.ErrorContains(t, err, tc.err) - // Make sure each of the expected log messages are present + // Make sure each of the expected log messages are present in the hookEntries + hookEntries := hook.AllEntries() for _, log := range tc.logs { found := false - for _, entry := range hook.AllEntries() { - fmt.Println(strings.Contains(entry.Message, log)) - found = found || strings.Contains(entry.Message, log) + for _, entry := range hookEntries { + logIsSubstring := strings.Contains(entry.Message, log) + found = found || logIsSubstring + fmt.Printf("logIsSubstring=%t, found=%t:\n\t%s\n", logIsSubstring, found, entry.Message) } - noError = noError && assert.True(t, found, "Expected log was not found: '%s'", log) + if !found { + fmt.Printf(">>>>>>WE HAVE A PROBLEM<<<<<<\n") + } + noError = noError && assert.True(t, found, "(%s) Expected log was not found: '%s'", tc.name, log) } // Print logs if there was an error. if !noError { - fmt.Println("An error was detected, printing logs") + fmt.Printf("An error was detected, printing logs (%s)\n", tc.name) fmt.Println("------------------------------------") - for _, entry := range hook.AllEntries() { + for _, entry := range hookEntries { fmt.Printf(" %s\n", entry.Message) } fmt.Println("------------------------------------") diff --git a/conduit/plugins/importers/algod/mock_algod_test.go b/conduit/plugins/importers/algod/mock_algod_test.go index f5b5bed5..deb61c44 100644 --- a/conduit/plugins/importers/algod/mock_algod_test.go +++ b/conduit/plugins/importers/algod/mock_algod_test.go @@ -33,7 +33,11 @@ func NewAlgodHandler(responders ...algodCustomHandler) *AlgodHandler { // ServeHTTP implements the http.Handler interface for AlgodHandler func (handler *AlgodHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { - for _, responder := range handler.responders { + for i, responder := range handler.responders { + _ = i + if responder == nil { + continue + } if responder(req, w) { return }