algod importer: Auto catchpoint label lookup. #65

Merged
merged 17 commits into from
May 4, 2023
8 changes: 0 additions & 8 deletions README.md
@@ -57,11 +57,3 @@ Indexer was built in a way that strongly coupled it to Postgresql, and the defin
Going forward we will continue to maintain the Indexer application, however our main focus will be enabling and optimizing a multitude of use cases through the Conduit pipeline design rather than the singular Indexer pipeline.

For a more detailed look at the differences between Conduit and Indexer, see [our migration guide](./docs/tutorials/IndexerMigration.md).

# Known Issues

## Restarting Follower Nodes Multiple Times in a Row

When a follower node is restarted, the sync round is advanced to the node's ledger round. This causes a chain reaction where the node's ledger round is then advanced by `MaxAcctLookback` rounds. When this happens, the node should temporarily have access to 2 * `MaxAcctLookback` ledger state delta responses because some had been previously persisted to disk. However, if the follower node is restarted a second time before conduit has consumed the temporary ledger state delta objects, the node will become desynchronized from Conduit.

When this happens the follower node must be manually re-synchronized with Conduit. This is done by launching a new follower node or running fast catchup to move to an earlier round.
24 changes: 15 additions & 9 deletions conduit/plugins/importers/algod/README.md
@@ -6,7 +6,9 @@ This plugin imports block data from an algod node. Fetch blocks data from the [a

### Automatic Fast Catchup

If an admin API token and catchpoint are set, the plugin will automatically run fast catchup on startup if the node is behind the current pipeline round.
If an admin API token is set, the plugin will attempt fast catchup when doing so would help the node reach the target round.
A specific catchpoint can be provided; otherwise one is selected automatically by querying the catchpoint URLs
listed in the sample configuration.
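
The automatic selection described above can be sketched as follows. This is a minimal illustration, not the importer's code: it works on an in-memory list instead of fetching a URL, and it assumes labels have the form `<round>#<hash>`, are listed one per line, and are sorted by ascending round. The names `parseRound` and `pickLabel` are hypothetical. The `>=` comparison mirrors the TODO in this change that refers to go-algorand#5352.

```go
package main

import (
	"bufio"
	"fmt"
	"strconv"
	"strings"
)

// parseRound extracts the round from a label assumed to look like "<round>#<hash>".
func parseRound(label string) (uint64, error) {
	parts := strings.SplitN(label, "#", 2)
	if len(parts) != 2 {
		return 0, fmt.Errorf("unable to parse catchpoint label %q", label)
	}
	return strconv.ParseUint(parts[0], 10, 64)
}

// pickLabel returns the last label whose round is strictly below nextRound.
// It assumes the list is sorted by ascending round and stops at the first
// empty line or at the first label that is not below nextRound.
func pickLabel(list string, nextRound uint64) (string, error) {
	var best string
	scanner := bufio.NewScanner(strings.NewReader(list))
	for scanner.Scan() {
		line := strings.TrimSpace(scanner.Text())
		if line == "" {
			break
		}
		round, err := parseRound(line)
		if err != nil {
			return "", err
		}
		if round >= nextRound {
			break
		}
		best = line
	}
	if best == "" {
		return "", fmt.Errorf("no catchpoint label found for round %d", nextRound)
	}
	return best, nil
}

func main() {
	// Made-up labels for illustration only.
	list := "10000#AAAA\n20000#BBBB\n30000#CCCC\n"
	label, err := pickLabel(list, 25000)
	fmt.Println(label, err)
}
```

Keeping the selection separate from the HTTP fetch makes the "best match without going over" rule easy to test with a fixed list.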

### Follower Node Orchestration

@@ -34,12 +36,16 @@ When using a follower node, ledger state delta objects are provided to the proce

# Algod catchpoint catchup arguments
catchup-config:
# The catchpoint to use when running fast catchup. Select an appropriate catchpoint for your deployment.
# They are published in the following locations:
# mainnet: https://algorand-catchpoints.s3.us-east-2.amazonaws.com/consolidated/mainnet_catchpoints.txt
# betanet: https://algorand-catchpoints.s3.us-east-2.amazonaws.com/consolidated/betanet_catchpoints.txt
# testnet: https://algorand-catchpoints.s3.us-east-2.amazonaws.com/consolidated/testnet_catchpoints.txt
catchpoint: ""
# Algod Admin API Token
# Algod Admin API Token. Set the admin token to use fast catchup during
# startup. The importer checks to see if a catchup would help and if so
# the catchpoint label will be used. If no catchpoint is provided, the
# importer will automatically select one.
admin-token: ""
```
# The catchpoint to use when running fast catchup. If this is set it
# overrides 'auto: true'. To select an appropriate catchpoint for your
# deployment, see the list of available catchpoints for each network:
# mainnet: https://algorand-catchpoints.s3.us-east-2.amazonaws.com/consolidated/mainnet_catchpoints.txt
# betanet: https://algorand-catchpoints.s3.us-east-2.amazonaws.com/consolidated/betanet_catchpoints.txt
# testnet: https://algorand-catchpoints.s3.us-east-2.amazonaws.com/consolidated/testnet_catchpoints.txt
catchpoint: ""
```
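
How these two settings interact can be sketched as a small pure function. This is an illustrative reading of the `Init` logic in this change, not the importer's code: `resolveCatchpoint` and `lookupFunc` are hypothetical names, and the lookup is injected so the example runs without network access. The URL template is copied from the `catchpointsURL` constant added in this change.

```go
package main

import "fmt"

// URL template copied from the catchpointsURL constant in this change.
const catchpointsURLTemplate = "https://algorand-catchpoints.s3.us-east-2.amazonaws.com/consolidated/%s_catchpoints.txt"

// lookupFunc is a hypothetical hook standing in for the HTTP label lookup.
type lookupFunc func(url string, nextRound uint64) (string, error)

// resolveCatchpoint decides which catchpoint label, if any, to use:
//   - no admin token: fast catchup is not attempted, so no label is returned;
//   - a configured catchpoint: it is used as-is;
//   - otherwise: a label is looked up from the network's catchpoint list.
func resolveCatchpoint(adminToken, configured, network string, nextRound uint64, lookup lookupFunc) (string, error) {
	if adminToken == "" {
		return "", nil
	}
	if configured != "" {
		return configured, nil
	}
	url := fmt.Sprintf(catchpointsURLTemplate, network)
	label, err := lookup(url, nextRound)
	if err != nil {
		return "", fmt.Errorf("unable to lookup catchpoint: %w", err)
	}
	return label, nil
}

func main() {
	// A fake lookup that returns a made-up label.
	fake := func(url string, nextRound uint64) (string, error) {
		return "20000#BBBB", nil
	}
	label, err := resolveCatchpoint("token", "", "mainnet", 25000, fake)
	fmt.Println(label, err)
}
```

An empty label with a nil error means "skip fast catchup", which matches how an empty `catchpoint` string is handled by `catchupNode` in this change.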
136 changes: 113 additions & 23 deletions conduit/plugins/importers/algod/algod_importer.go
@@ -1,9 +1,12 @@
package algodimporter

import (
"bufio"
"context"
_ "embed" // used to embed config
"fmt"
"io"
"net/http"
"net/url"
"reflect"
"strconv"
@@ -43,6 +46,8 @@ const (
retries = 5
)

const catchpointsURL = "https://algorand-catchpoints.s3.us-east-2.amazonaws.com/consolidated/%s_catchpoints.txt"

type algodImporter struct {
aclient *algod.Client
logger *logrus.Logger
@@ -96,7 +101,7 @@ func parseCatchpointRound(catchpoint string) (round sdk.Round, err error) {
return
}

func (algodImp *algodImporter) startCatchpointCatchup() error {
func (algodImp *algodImporter) startCatchpointCatchup(catchpoint string) error {
// Run catchpoint catchup
client, err := common.MakeClient(algodImp.cfg.NetAddr, "X-Algo-API-Token", algodImp.cfg.CatchupConfig.AdminToken)
if err != nil {
@@ -106,13 +111,13 @@ func (algodImp *algodImporter) startCatchpointCatchup() error {
err = client.Post(
algodImp.ctx,
&resp,
fmt.Sprintf("/v2/catchup/%s", common.EscapeParams(algodImp.cfg.CatchupConfig.Catchpoint)...),
fmt.Sprintf("/v2/catchup/%s", common.EscapeParams(catchpoint)...),
nil,
nil,
nil,
)
if err != nil {
return fmt.Errorf("POST /v2/catchup/%s received unexpected error: %w", algodImp.cfg.CatchupConfig.Catchpoint, err)
return fmt.Errorf("POST /v2/catchup/%s received unexpected error: %w", catchpoint, err)
}
return nil
}
@@ -154,45 +159,115 @@ func (algodImp *algodImporter) monitorCatchpointCatchup() error {
return nil
}

func (algodImp *algodImporter) catchupNode(initProvider data.InitProvider) error {
if algodImp.mode == followerMode {
// Set the sync round to the round provided by initProvider
_, err := algodImp.aclient.SetSyncRound(uint64(initProvider.NextDBRound())).Do(algodImp.ctx)
func getMissingCatchpointLabel(URL string, nextRound uint64) (string, error) {
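resp, err := http.Get(URL)
if err != nil {
return "", err
}
// Close the response body once it has been read so the connection is not leaked.
defer resp.Body.Close()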
body, err := io.ReadAll(resp.Body)
if err != nil {
return "", fmt.Errorf("failed to read catchpoint label response: %w", err)
}

if resp.StatusCode != 200 {
return "", fmt.Errorf("failed to lookup catchpoint label list (%d): %s", resp.StatusCode, string(body))
}

// look for best match without going over
var label string
labels := string(body)
scanner := bufio.NewScanner(strings.NewReader(labels))
for scanner.Scan() && scanner.Text() != "" {
line := scanner.Text()
round, err := parseCatchpointRound(line)
if err != nil {
return fmt.Errorf("received unexpected error setting sync round (%d): %w", initProvider.NextDBRound(), err)
return "", err
}
// TODO: Change >= to > after go-algorand#5352 is fixed.
if uint64(round) >= nextRound {
break
}
label = line
}

// Run Catchpoint Catchup
if algodImp.cfg.CatchupConfig.Catchpoint != "" {
cpRound, err := parseCatchpointRound(algodImp.cfg.CatchupConfig.Catchpoint)
if label == "" {
return "", fmt.Errorf("no catchpoint label found for round %d at: %s", nextRound, URL)
}

return label, nil
}

// checkRounds to see if catchup is needed, an error is returned if a bad state
// is detected.
func checkRounds(logger *logrus.Logger, catchpointRound, nodeRound, targetRound uint64) (bool, error) {
// Make sure catchpoint round is not in the future
// TODO: Change < to <= after go-algorand#5352 is fixed.
canCatchup := catchpointRound < targetRound
mustCatchup := targetRound < nodeRound
shouldCatchup := nodeRound < catchpointRound

msg := fmt.Sprintf("Node round %d, target round %d, catchpoint round %d", nodeRound, targetRound, catchpointRound)

if canCatchup && mustCatchup {
logger.Infof("Catchup required, node round ahead of target round. %s.", msg)
return true, nil
}

if canCatchup && shouldCatchup {
logger.Infof("Catchup requested. %s.", msg)
return true, nil
}

if !canCatchup && mustCatchup {
err := fmt.Errorf("node round %d and catchpoint round %d are ahead of target round %d", nodeRound, catchpointRound, targetRound)
logger.Errorf("Catchup required but no valid catchpoint available, %s.", err.Error())
return false, err
}

logger.Infof("No catchup required. %s.", msg)
return false, nil
}

func (algodImp *algodImporter) catchupNode(catchpoint string, targetRound uint64) error {
if catchpoint != "" {
cpRound, err := parseCatchpointRound(catchpoint)
if err != nil {
return err
}
nStatus, err := algodImp.aclient.Status().Do(algodImp.ctx)
if err != nil {
return fmt.Errorf("received unexpected error failed to get node status: %w", err)
}
if cpRound <= sdk.Round(nStatus.LastRound) {
algodImp.logger.Infof(
"Skipping catchpoint catchup for %s, since it's before node round %d",
algodImp.cfg.CatchupConfig.Catchpoint,
nStatus.LastRound,
)

if runCatchup, err := checkRounds(algodImp.logger, uint64(cpRound), nStatus.LastRound, targetRound); !runCatchup || err != nil {
return err
} else {
err = algodImp.startCatchpointCatchup()
algodImp.logger.Infof("Starting catchpoint catchup with label %s", catchpoint)

err = algodImp.startCatchpointCatchup(catchpoint)
if err != nil {
return err
}

// Wait for algod to catchup
err = algodImp.monitorCatchpointCatchup()
if err != nil {
return err
}
}
// Wait for algod to catchup
err = algodImp.monitorCatchpointCatchup()
}

// Set the sync round after fast-catchup in case the node round is ahead of the target round.
// Trying to set it before would cause an error.
if algodImp.mode == followerMode {
// Set the sync round to the round provided by initProvider
_, err := algodImp.aclient.SetSyncRound(targetRound).Do(algodImp.ctx)
if err != nil {
return err
return fmt.Errorf("received unexpected error setting sync round (%d): %w", targetRound, err)
}
}

_, err := algodImp.aclient.StatusAfterBlock(uint64(initProvider.NextDBRound())).Do(algodImp.ctx)
_, err := algodImp.aclient.StatusAfterBlock(targetRound).Do(algodImp.ctx)
if err != nil {
err = fmt.Errorf("received unexpected error (StatusAfterBlock) waiting for node to catchup: %w", err)
}
@@ -253,7 +328,22 @@ func (algodImp *algodImporter) Init(ctx context.Context, initProvider data.InitP
return nil, fmt.Errorf("unable to fetch genesis file from API at %s", algodImp.cfg.NetAddr)
}

err = algodImp.catchupNode(initProvider)
catchpoint := ""

// If there is an admin token, look for a catchpoint to use.
if algodImp.cfg.CatchupConfig.AdminToken != "" {
if algodImp.cfg.CatchupConfig.Catchpoint != "" {
catchpoint = algodImp.cfg.CatchupConfig.Catchpoint
} else {
URL := fmt.Sprintf(catchpointsURL, genesis.Network)
catchpoint, err = getMissingCatchpointLabel(URL, uint64(initProvider.NextDBRound()))
if err != nil {
return nil, fmt.Errorf("unable to lookup catchpoint: %w", err)
}
}
}

err = algodImp.catchupNode(catchpoint, uint64(initProvider.NextDBRound()))

return &genesis, err
}
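
The three comparisons in `checkRounds` are easier to follow as a decision table. The sketch below restates that logic as a pure function without logging. The `decision` type and the `decide` name are hypothetical and are not part of this change. The strict `<` in `canCatchup` follows the TODO that refers to go-algorand#5352.

```go
package main

import "fmt"

// decision is a hypothetical enum summarizing the outcome of the round checks.
type decision int

const (
	noCatchup  decision = iota // the node can sync normally
	runCatchup                 // fast catchup should be started
	stuck                      // catchup is required but the catchpoint is unusable
)

// decide mirrors the comparisons made by checkRounds in this change.
func decide(catchpointRound, nodeRound, targetRound uint64) decision {
	canCatchup := catchpointRound < targetRound // catchpoint is not past the target
	mustCatchup := targetRound < nodeRound      // node is ahead of the target
	shouldCatchup := nodeRound < catchpointRound // catchpoint would move the node forward

	switch {
	case canCatchup && (mustCatchup || shouldCatchup):
		return runCatchup
	case !canCatchup && mustCatchup:
		return stuck
	default:
		return noCatchup
	}
}

func main() {
	// Node far behind, catchpoint just below the target: catchup helps.
	fmt.Println(decide(900, 100, 1000) == runCatchup)
	// Node ahead of the target and catchpoint also ahead: nothing usable.
	fmt.Println(decide(1500, 2000, 1000) == stuck)
}
```

The `stuck` case corresponds to the branch where `checkRounds` returns an error, since a node that is already past the target round cannot be moved back without a catchpoint earlier than the target.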