diff --git a/README.md b/README.md index 59051a6c..99abc928 100644 --- a/README.md +++ b/README.md @@ -57,11 +57,3 @@ Indexer was built in a way that strongly coupled it to Postgresql, and the defin Going forward we will continue to maintain the Indexer application, however our main focus will be enabling and optimizing a multitude of use cases through the Conduit pipeline design rather the singular Indexer pipeline. For a more detailed look at the differences between Conduit and Indexer, see [our migration guide](./docs/tutorials/IndexerMigration.md). - -# Known Issues - -## Restarting Follower Nodes Multiple Times in a Row - -When a follower node is restarted, the sync round is advanced to the node's ledger round. This causes a chain reaction where the node's ledger round is then advanced by `MaxAcctLookback` rounds. When this happens, the node should temporarily have access to 2 * `MaxAcctLookback` ledger state delta responses because some had been previously persisted to disk. However, if the follower node is restarted a second time before conduit has consumed the temporary ledger state delta objects, the node will become desynchronized from Conduit. - -When this happens the follower node must be manually re-synchronized with Conduit. This is done by launching a new follower node or running fast catchup to move to an earlier round. diff --git a/conduit/plugins/importers/algod/README.md b/conduit/plugins/importers/algod/README.md index cff60c27..4937dc51 100644 --- a/conduit/plugins/importers/algod/README.md +++ b/conduit/plugins/importers/algod/README.md @@ -6,7 +6,9 @@ This plugin imports block data from an algod node. Fetch blocks data from the [a ### Automatic Fast Catchup -If an admin API token and catchpoint are set, the plugin will automatically run fast catchup on startup if the node is behind the current pipeline round. +If an admin API token is set, the plugin will attempt to use a fast catchup when it would help reach the target round. +A specific catchpoint can be provided, otherwise one will be selected automatically by querying the catchpoint URLs +listed in the sample. ### Follower Node Orchestration @@ -34,12 +36,16 @@ When using a follower node, ledger state delta objects are provided to the proce # Algod catchpoint catchup arguments catchup-config: - # The catchpoint to use when running fast catchup. Select an appropriate catchpoint for your deployment. - # They are published in the following locations: - # mainnet: https://algorand-catchpoints.s3.us-east-2.amazonaws.com/consolidated/mainnet_catchpoints.txt - # betanet: https://algorand-catchpoints.s3.us-east-2.amazonaws.com/consolidated/betanet_catchpoints.txt - # testnet: https://algorand-catchpoints.s3.us-east-2.amazonaws.com/consolidated/testnet_catchpoints.txt - catchpoint: "" - # Algod Admin API Token + # Algod Admin API Token. Set the admin token to use fast catchup during + # startup. The importer checks to see if a catchup would help and if so + # the catchpoint label will be used. If no catchpoint is provided, the + # importer will automatically select one. admin-token: "" -``` \ No newline at end of file + # The catchpoint to use when running fast catchup. If this is set it + # overrides 'auto: true'. To select an appropriate catchpoint for your + # deployment, see the list of available catchpoints for each network: + # mainnet: https://algorand-catchpoints.s3.us-east-2.amazonaws.com/consolidated/mainnet_catchpoints.txt + # betanet: https://algorand-catchpoints.s3.us-east-2.amazonaws.com/consolidated/betanet_catchpoints.txt + # testnet: https://algorand-catchpoints.s3.us-east-2.amazonaws.com/consolidated/testnet_catchpoints.txt + catchpoint: "" +``` diff --git a/conduit/plugins/importers/algod/algod_importer.go b/conduit/plugins/importers/algod/algod_importer.go index 2935f5e7..273ee097 100644 --- a/conduit/plugins/importers/algod/algod_importer.go +++ b/conduit/plugins/importers/algod/algod_importer.go @@ -1,9 +1,12 @@ package algodimporter import ( + "bufio" "context" _ "embed" // used to embed config "fmt" + "io" + "net/http" "net/url" "reflect" "strconv" @@ -43,6 +46,8 @@ const ( retries = 5 ) +const catchpointsURL = "https://algorand-catchpoints.s3.us-east-2.amazonaws.com/consolidated/%s_catchpoints.txt" + type algodImporter struct { aclient *algod.Client logger *logrus.Logger @@ -96,7 +101,7 @@ func parseCatchpointRound(catchpoint string) (round sdk.Round, err error) { return } -func (algodImp *algodImporter) startCatchpointCatchup() error { +func (algodImp *algodImporter) startCatchpointCatchup(catchpoint string) error { // Run catchpoint catchup client, err := common.MakeClient(algodImp.cfg.NetAddr, "X-Algo-API-Token", algodImp.cfg.CatchupConfig.AdminToken) if err != nil { @@ -106,13 +111,13 @@ func (algodImp *algodImporter) startCatchpointCatchup() error { err = client.Post( algodImp.ctx, &resp, - fmt.Sprintf("/v2/catchup/%s", common.EscapeParams(algodImp.cfg.CatchupConfig.Catchpoint)...), + fmt.Sprintf("/v2/catchup/%s", common.EscapeParams(catchpoint)...), nil, nil, nil, ) if err != nil { - return fmt.Errorf("POST /v2/catchup/%s received unexpected error: %w", algodImp.cfg.CatchupConfig.Catchpoint, err) + return fmt.Errorf("POST /v2/catchup/%s received unexpected error: %w", catchpoint, err) } return nil } @@ -154,18 +159,78 @@ func (algodImp *algodImporter) monitorCatchpointCatchup() error { return nil } -func (algodImp *algodImporter) catchupNode(initProvider data.InitProvider) error { - if algodImp.mode == followerMode { - // Set the sync round to the round provided by initProvider - _, err := algodImp.aclient.SetSyncRound(uint64(initProvider.NextDBRound())).Do(algodImp.ctx) +func getMissingCatchpointLabel(URL string, nextRound uint64) (string, error) { + resp, err := http.Get(URL) + if err != nil { + return "", err + } + body, err := io.ReadAll(resp.Body) + if err != nil { + return "", fmt.Errorf("failed to read catchpoint label response: %w", err) + } + + if resp.StatusCode != 200 { + return "", fmt.Errorf("failed to lookup catchpoint label list (%d): %s", resp.StatusCode, string(body)) + } + + // look for best match without going over + var label string + labels := string(body) + scanner := bufio.NewScanner(strings.NewReader(labels)) + for scanner.Scan() && scanner.Text() != "" { + line := scanner.Text() + round, err := parseCatchpointRound(line) if err != nil { - return fmt.Errorf("received unexpected error setting sync round (%d): %w", initProvider.NextDBRound(), err) + return "", err + } + // TODO: Change >= to > after go-algorand#5352 is fixed. + if uint64(round) >= nextRound { + break } + label = line } - // Run Catchpoint Catchup - if algodImp.cfg.CatchupConfig.Catchpoint != "" { - cpRound, err := parseCatchpointRound(algodImp.cfg.CatchupConfig.Catchpoint) + if label == "" { + return "", fmt.Errorf("no catchpoint label found for round %d at: %s", nextRound, URL) + } + + return label, nil +} + +// checkRounds to see if catchup is needed, an error is returned if a bad state +// is detected. +func checkRounds(logger *logrus.Logger, catchpointRound, nodeRound, targetRound uint64) (bool, error) { + // Make sure catchpoint round is not in the future + // TODO: Change < to <= after go-algorand#5352 is fixed. + canCatchup := catchpointRound < targetRound + mustCatchup := targetRound < nodeRound + shouldCatchup := nodeRound < catchpointRound + + msg := fmt.Sprintf("Node round %d, target round %d, catchpoint round %d", nodeRound, targetRound, catchpointRound) + + if canCatchup && mustCatchup { + logger.Infof("Catchup required, node round ahead of target round. %s.", msg) + return true, nil + } + + if canCatchup && shouldCatchup { + logger.Infof("Catchup requested. %s.", msg) + return true, nil + } + + if !canCatchup && mustCatchup { + err := fmt.Errorf("node round %d and catchpoint round %d are ahead of target round %d", nodeRound, catchpointRound, targetRound) + logger.Errorf("Catchup required but no valid catchpoint available, %s.", err.Error()) + return false, err + } + + logger.Infof("No catchup required. %s.", msg) + return false, nil +} + +func (algodImp *algodImporter) catchupNode(catchpoint string, targetRound uint64) error { + if catchpoint != "" { + cpRound, err := parseCatchpointRound(catchpoint) if err != nil { return err } @@ -173,26 +238,36 @@ func (algodImp *algodImporter) catchupNode(initProvider data.InitProvider) error if err != nil { return fmt.Errorf("received unexpected error failed to get node status: %w", err) } - if cpRound <= sdk.Round(nStatus.LastRound) { - algodImp.logger.Infof( - "Skipping catchpoint catchup for %s, since it's before node round %d", - algodImp.cfg.CatchupConfig.Catchpoint, - nStatus.LastRound, - ) + + if runCatchup, err := checkRounds(algodImp.logger, uint64(cpRound), nStatus.LastRound, targetRound); !runCatchup || err != nil { + return err } else { - err = algodImp.startCatchpointCatchup() + algodImp.logger.Infof("Starting catchpoint catchup with label %s", catchpoint) + + err = algodImp.startCatchpointCatchup(catchpoint) + if err != nil { + return err + } + + // Wait for algod to catchup + err = algodImp.monitorCatchpointCatchup() if err != nil { return err } } - // Wait for algod to catchup - err = algodImp.monitorCatchpointCatchup() + } + + // Set the sync round after fast-catchup in case the node round is ahead of the target round. + // Trying to set it before would cause an error. + if algodImp.mode == followerMode { + // Set the sync round to the round provided by initProvider + _, err := algodImp.aclient.SetSyncRound(targetRound).Do(algodImp.ctx) if err != nil { - return err + return fmt.Errorf("received unexpected error setting sync round (%d): %w", targetRound, err) } } - _, err := algodImp.aclient.StatusAfterBlock(uint64(initProvider.NextDBRound())).Do(algodImp.ctx) + _, err := algodImp.aclient.StatusAfterBlock(targetRound).Do(algodImp.ctx) if err != nil { err = fmt.Errorf("received unexpected error (StatusAfterBlock) waiting for node to catchup: %w", err) } @@ -253,7 +328,22 @@ func (algodImp *algodImporter) Init(ctx context.Context, initProvider data.InitP return nil, fmt.Errorf("unable to fetch genesis file from API at %s", algodImp.cfg.NetAddr) } - err = algodImp.catchupNode(initProvider) + catchpoint := "" + + // If there is an admin token, look for a catchpoint to use. + if algodImp.cfg.CatchupConfig.AdminToken != "" { + if algodImp.cfg.CatchupConfig.Catchpoint != "" { + catchpoint = algodImp.cfg.CatchupConfig.Catchpoint + } else { + URL := fmt.Sprintf(catchpointsURL, genesis.Network) + catchpoint, err = getMissingCatchpointLabel(URL, uint64(initProvider.NextDBRound())) + if err != nil { + return nil, fmt.Errorf("unable to lookup catchpoint: %w", err) + } + } + } + + err = algodImp.catchupNode(catchpoint, uint64(initProvider.NextDBRound())) return &genesis, err } diff --git a/conduit/plugins/importers/algod/algod_importer_test.go b/conduit/plugins/importers/algod/algod_importer_test.go index 5531cab1..d7ca29ef 100644 --- a/conduit/plugins/importers/algod/algod_importer_test.go +++ b/conduit/plugins/importers/algod/algod_importer_test.go @@ -95,104 +95,211 @@ netaddr: %s } } +func Test_checkRounds(t *testing.T) { + type args struct { + catchpointRound uint64 + nodeRound uint64 + targetRound uint64 + } + tests := []struct { + name string + args args + want bool + wantErr assert.ErrorAssertionFunc + wantLogLevel logrus.Level + wantLogMsg string + }{ + { + name: "Skip catchpoint", + args: args{ + catchpointRound: 1000, + nodeRound: 1001, + targetRound: 1002, + }, + want: false, + wantErr: assert.NoError, + wantLogLevel: logrus.InfoLevel, + wantLogMsg: "No catchup required. Node round 1001, target round 1002, catchpoint round 1000.", + }, + { + name: "Catchup requested.", + args: args{ + catchpointRound: 1002, + nodeRound: 1001, + targetRound: 1003, + }, + want: true, + wantErr: assert.NoError, + wantLogLevel: logrus.InfoLevel, + wantLogMsg: "Catchup requested. Node round 1001, target round 1003, catchpoint round 1002.", + }, + { + name: "Catchup required. Success.", + args: args{ + catchpointRound: 1000, + nodeRound: 5000, + targetRound: 1002, + }, + want: true, + wantErr: assert.NoError, + wantLogLevel: logrus.InfoLevel, + wantLogMsg: "Catchup required, node round ahead of target round. Node round 5000, target round 1002, catchpoint round 1000.", + }, + { + name: "Catchup required. Error.", + args: args{ + catchpointRound: 6000, + nodeRound: 5000, + targetRound: 1002, + }, + want: false, + wantErr: assert.Error, + wantLogLevel: logrus.ErrorLevel, + wantLogMsg: "Catchup required but no valid catchpoint available, node round 5000 and catchpoint round 6000 are ahead of target round 1002.", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + testLogger, hook := test.NewNullLogger() + got, err := checkRounds(testLogger, tt.args.catchpointRound, tt.args.nodeRound, tt.args.targetRound) + + // Write 1 line to the log. + require.Len(t, hook.Entries, 1) + require.Equal(t, tt.wantLogLevel, hook.Entries[0].Level) + require.Equal(t, tt.wantLogMsg, hook.Entries[0].Message) + + // Check the error + if !tt.wantErr(t, err, fmt.Sprintf("checkRounds(-, %v, %v, %v)", tt.args.catchpointRound, tt.args.nodeRound, tt.args.targetRound)) { + return + } + + // Check return values + assert.Equalf(t, tt.want, got, "checkRounds(-, %v, %v, %v)", tt.args.catchpointRound, tt.args.nodeRound, tt.args.targetRound) + + }) + } +} + func TestInitCatchup(t *testing.T) { tests := []struct { name string catchpoint string + targetRound sdk.Round + adminToken string // to trigger fast-catchup algodServer *httptest.Server err string logs []string }{ - {"sync round failure", "", - NewAlgodServer( + { + name: "sync round failure", + targetRound: 1, + algodServer: NewAlgodServer( GenesisResponder, MakeSyncRoundResponder(http.StatusBadRequest)), - "received unexpected error setting sync round (1): HTTP 400", - []string{}}, - {"catchpoint parse failure", "notvalid", - NewAlgodServer( + err: "received unexpected error setting sync round (1): HTTP 400", + logs: []string{}}, + { + name: "catchpoint parse failure", + adminToken: "admin", + catchpoint: "notvalid", + algodServer: NewAlgodServer( GenesisResponder, MakeSyncRoundResponder(http.StatusOK)), - "unable to parse catchpoint, invalid format", - []string{}}, - {"invalid catchpoint round uint parsing error", "abcd#abcd", - NewAlgodServer( + err: "unable to parse catchpoint, invalid format", + logs: []string{}}, + { + name: "invalid catchpoint round uint parsing error", + adminToken: "admin", + catchpoint: "abcd#abcd", + algodServer: NewAlgodServer( GenesisResponder, MakeSyncRoundResponder(http.StatusOK)), - "invalid syntax", - []string{}}, - {"node status failure", "1234#abcd", - NewAlgodServer( + err: "invalid syntax", + logs: []string{}}, + { + name: "node status failure", + adminToken: "admin", + catchpoint: "1234#abcd", + algodServer: NewAlgodServer( GenesisResponder, MakeSyncRoundResponder(http.StatusOK), MakeStatusResponder("/v2/status", http.StatusBadRequest, "")), - "received unexpected error failed to get node status: HTTP 400", - []string{}}, - {"catchpoint round before node round skips fast catchup", "1234#abcd", - NewAlgodServer( + err: "received unexpected error failed to get node status: HTTP 400", + logs: []string{}}, + { + name: "catchpoint round before node round skips fast catchup", + adminToken: "admin", + catchpoint: "1234#abcd", + targetRound: 1235, + algodServer: NewAlgodServer( GenesisResponder, MakeSyncRoundResponder(http.StatusOK), MakeNodeStatusResponder(models.NodeStatus{LastRound: 1235})), - "", - []string{"Skipping catchpoint catchup for 1234#abcd, since it's before node round 1235"}}, - {"start catchpoint catchup failure", "1236#abcd", - NewAlgodServer( + logs: []string{"No catchup required. Node round 1235, target round 1235, catchpoint round 1234."}, + }, { + name: "start catchpoint catchup failure", + adminToken: "admin", + catchpoint: "1236#abcd", + targetRound: 1240, + algodServer: NewAlgodServer( GenesisResponder, MakeSyncRoundResponder(http.StatusOK), MakeNodeStatusResponder(models.NodeStatus{LastRound: 1235}), MakeStatusResponder("/v2/catchup/", http.StatusBadRequest, "")), - "POST /v2/catchup/1236#abcd received unexpected error: HTTP 400", - []string{}}, - {"monitor catchup node status failure", "1236#abcd", - NewAlgodServer( + err: "POST /v2/catchup/1236#abcd received unexpected error: HTTP 400", + logs: []string{}, + }, + { + name: "monitor catchup node status failure", + adminToken: "admin", + catchpoint: "1236#abcd", + targetRound: 1239, + algodServer: NewAlgodServer( GenesisResponder, MakeSyncRoundResponder(http.StatusOK), MakeJsonResponderSeries("/v2/status", []int{http.StatusOK, http.StatusBadRequest}, []interface{}{models.NodeStatus{LastRound: 1235}}), MakeStatusResponder("/v2/catchup/", http.StatusOK, "")), - "received unexpected error getting node status: HTTP 400", - []string{}}, - {"monitor catchup success", "1236#abcd", - NewAlgodServer( + err: "received unexpected error getting node status: HTTP 400", + logs: []string{}, + }, { + name: "auto catchup used (even if the mocking isn't setup for it)", + adminToken: "admin", + catchpoint: "", + algodServer: NewAlgodServer( GenesisResponder, - MakeSyncRoundResponder(http.StatusOK), - MakeJsonResponderSeries("/v2/status", []int{http.StatusOK}, []interface{}{ - models.NodeStatus{LastRound: 1235}, - models.NodeStatus{Catchpoint: "1236#abcd", CatchpointProcessedAccounts: 1, CatchpointTotalAccounts: 1}, - models.NodeStatus{Catchpoint: "1236#abcd", CatchpointVerifiedAccounts: 1, CatchpointTotalAccounts: 1}, - models.NodeStatus{Catchpoint: "1236#abcd", CatchpointAcquiredBlocks: 1, CatchpointTotalBlocks: 1}, - models.NodeStatus{Catchpoint: "1236#abcd"}, - models.NodeStatus{LastRound: 1236}, - }), - MakeStatusResponder("/v2/catchup/", http.StatusOK, "")), - "", - []string{ - "catchup phase Processed Accounts: 1 / 1", - "catchup phase Verified Accounts: 1 / 1", - "catchup phase Acquired Blocks: 1 / 1", - "catchup phase Verified Blocks", - }}, - {"wait for node to catchup error", "1236#abcd", - NewAlgodServer( + ), + err: "failed to lookup catchpoint label list", + }, { + name: "wait for node to catchup error", + adminToken: "admin", + targetRound: 1240, + catchpoint: "1236#abcd", + algodServer: NewAlgodServer( GenesisResponder, MakeSyncRoundResponder(http.StatusOK), MakeJsonResponderSeries("/v2/status", []int{http.StatusOK, http.StatusOK, http.StatusBadRequest}, []interface{}{models.NodeStatus{LastRound: 1235}}), MakeStatusResponder("/v2/catchup/", http.StatusOK, "")), - "received unexpected error (StatusAfterBlock) waiting for node to catchup: HTTP 400", - []string{}}, + err: "received unexpected error (StatusAfterBlock) waiting for node to catchup: HTTP 400", + logs: []string{}, + }, } for _, ttest := range tests { ttest := ttest t.Run(ttest.name, func(t *testing.T) { t.Parallel() testLogger, hook := test.NewNullLogger() - testImporter := New() - cfgStr := fmt.Sprintf(`--- -mode: %s -netaddr: %s -catchup-config: - catchpoint: %s -`, "follower", ttest.algodServer.URL, ttest.catchpoint) - _, err := testImporter.Init(ctx, conduit.MakePipelineInitProvider(&pRound, nil), plugins.MakePluginConfig(cfgStr), testLogger) + testImporter := &algodImporter{} + cfg := Config{ + Mode: "follower", + NetAddr: ttest.algodServer.URL, + CatchupConfig: CatchupParams{ + Catchpoint: ttest.catchpoint, + AdminToken: ttest.adminToken, + }, + } + cfgStr, err := yaml.Marshal(cfg) + require.NoError(t, err) + _, err = testImporter.Init(ctx, conduit.MakePipelineInitProvider(&ttest.targetRound, nil), plugins.MakePluginConfig(string(cfgStr)), testLogger) if ttest.err != "" { require.ErrorContains(t, err, ttest.err) } else { @@ -520,3 +627,23 @@ func TestGetBlockErrors(t *testing.T) { }) } } + +func TestGetMissingCatchpointLabel(t *testing.T) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + fmt.Fprintln(w, "1000#abcd\n1100#abcd\n1200#abcd") + })) + defer ts.Close() + label, err := getMissingCatchpointLabel(ts.URL, 1101) + require.NoError(t, err) + // closest without going over + require.Equal(t, "1100#abcd", label) +} + +func TestGetMissingCatchpointLabelError(t *testing.T) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + fmt.Fprintln(w, "") + })) + defer ts.Close() + _, err := getMissingCatchpointLabel(ts.URL, 1100) + require.ErrorContains(t, err, "no catchpoint label found for round 1100 at:") +} diff --git a/conduit/plugins/importers/algod/mock_algod_test.go b/conduit/plugins/importers/algod/mock_algod_test.go index 0a7fdf75..dfe233b1 100644 --- a/conduit/plugins/importers/algod/mock_algod_test.go +++ b/conduit/plugins/importers/algod/mock_algod_test.go @@ -70,6 +70,7 @@ func MakeGenesisResponder(genesis types.Genesis) func(reqPath string, w http.Res // GenesisResponder handles /v2/genesis requests and returns an empty Genesis object var GenesisResponder = MakeGenesisResponder(types.Genesis{ Comment: "", + Network: "FAKE", DevMode: true, }) diff --git a/conduit/plugins/importers/algod/sample.yaml b/conduit/plugins/importers/algod/sample.yaml index 9637f6e0..573b6cf2 100644 --- a/conduit/plugins/importers/algod/sample.yaml +++ b/conduit/plugins/importers/algod/sample.yaml @@ -16,11 +16,15 @@ # Algod catchpoint catchup arguments catchup-config: - # The catchpoint to use when running fast catchup. Select an appropriate catchpoint for your deployment. - # They are published in the following locations: - # mainnet: https://algorand-catchpoints.s3.us-east-2.amazonaws.com/consolidated/mainnet_catchpoints.txt - # betanet: https://algorand-catchpoints.s3.us-east-2.amazonaws.com/consolidated/betanet_catchpoints.txt - # testnet: https://algorand-catchpoints.s3.us-east-2.amazonaws.com/consolidated/testnet_catchpoints.txt - catchpoint: "" - # Algod Admin API Token + # Algod Admin API Token. Set the admin token to use fast catchup during + # startup. The importer checks to see if a catchup would help and if so + # the catchpoint label will be used. If no catchpoint is provided, the + # importer will automatically select one. admin-token: "" + # The catchpoint to use when running fast catchup. If this is set it + # overrides 'auto: true'. To select an appropriate catchpoint for your + # deployment, see the list of available catchpoints for each network: + # mainnet: https://algorand-catchpoints.s3.us-east-2.amazonaws.com/consolidated/mainnet_catchpoints.txt + # betanet: https://algorand-catchpoints.s3.us-east-2.amazonaws.com/consolidated/betanet_catchpoints.txt + # testnet: https://algorand-catchpoints.s3.us-east-2.amazonaws.com/consolidated/testnet_catchpoints.txt + catchpoint: "" diff --git a/docs/tutorials/IndexerMigration.md b/docs/tutorials/IndexerMigration.md index 45e87bae..6200a4a6 100644 --- a/docs/tutorials/IndexerMigration.md +++ b/docs/tutorials/IndexerMigration.md @@ -26,7 +26,7 @@ graph LR; ledger["Local Ledger"] db["PostgreSQL DB"] restapi["REST API"] - + algod-->index; subgraph "Data Pipeline" index-->ledger; @@ -82,7 +82,6 @@ graph LR; ro2---psql; lb---ro3; ro3---psql; - ``` Because the database connection can only tolerate a single writer without having race conditions and/or deadlocks, Indexer offers a read-only mode which does not run the data pipeline and has no write access to the database. It's @@ -120,7 +119,6 @@ graph LR; ro2---psql; lb---ro3; ro3---psql; - ``` With this architecture you're free to do things like use filter processors to limit the size of your database--though @@ -133,7 +131,7 @@ it could fetch any block it needed, Conduit's algod importer is not required to what is called Follower mode in algod. You can take a look [here](https://github.com/algorand/go-algorand/blob/master/node/follower_node.go) if you're interested in looking at the source code for the follower node. -To run algod in Follower mode, it must be launched with `EnableFollowMode` set to `true` in the algod config. +To run algod in Follower mode, it must be launched with `EnableFollowMode` set to `true` in the algod config. Doing this will provide the new features listed below, but will remove the node's ability to participate in consensus (propose new blocks), or broadcast transactions to be included in new blocks. It will only, as hinted by its name, follow the chain. @@ -142,7 +140,7 @@ Follower mode algod provides 2 key features required by Conduit. ### Sync Rounds In order to remove the requirement of algod being Archival, we needed a way to ensure that algod would have the data available for the particular round that our Conduit pipeline was importing at any given time. We've done this through -the concept of a `sync` round. +the concept of a `sync` round. The sync round is a parameter which Conduit provides to algod. It specifies the particular round that we want algod to ensure is kept in the cache until we've finished running it through our Conduit pipeline. When a sync round is set on a @@ -183,7 +181,7 @@ fetch all required data directly from the algod node, and ensure that the node i ## Altering Architecture of Indexer Applications Let's take a look at a theoretical application more closely and see what in particular we need to do to alter it to use -Conduit. +Conduit. Here's a diagram of a containerized application (leaving out some details such as load balancers, and scaled out API nodes). @@ -196,7 +194,7 @@ graph LR; subgraph "Persistent Volume" ll["Local Ledger"]; end - + indexer-->algod; indexer-->psql; indexer-->ll; @@ -231,7 +229,7 @@ Follower nodes provide the same methods of catchup as regular algod nodes, but w recent ledger round upon startup. Thaht scenario will be covered in step 3. ### Step 2: Remove the Local Ledger -Because our Conduit pipeline will use the Follower node's state delta API, we no longer need our local ledger persistent +Because our Conduit pipeline will use the Follower node's state delta API, we no longer need our local ledger persistent volume. It can be removed. ### Step 3: Refactor our Indexer Writer to Conduit @@ -255,11 +253,12 @@ metrics: log-level: "INFO" importer: name: "algod", - config: + config: "netaddr": $ALGOD_ADDR, "token": $ALGOD_TOKEN, "mode": "follower", catchup-config: + auto: false catchpoint: "" admin-token: "" exporter: @@ -268,27 +267,23 @@ exporter: "connection-string": $PGSQL_CONNECTION_STRING, ``` -If your algod node needs to run fast catchup, you can fill in the catchup-config section. You'll need to first look up your Indexer round from the postgres database. The Indexer stores the latest round in the database, and you can read it via the `/health` endpoint. The result is formatted in json -so you can use jq to more easily see your Indexer's round (if your Indexer is listening locally on port 8980). -```bash -curl http://localhost:8980/health | jq '.round' -``` +If your algod node needs to run fast catchup, you can fill in the catchup-config section. You'll need to first look up your Admin API token from algod. It is in a file named `algod.admin.token`. To automatically lookup the appropriate catchpoint label, set `auto: true`. -Now that you can look up a catchpoint, conduit will run fast catchup on your node if a catchpoint is provided. Look up the closest catchpoint prior to the desired sync round. -For a list of catchpoints, you can reference the following: +Note: to manually lookup the catchpoint label, see the complete listings here: * [Mainnet](https://algorand-catchpoints.s3.us-east-2.amazonaws.com/consolidated/mainnet_catchpoints.txt) * [Testnet](https://algorand-catchpoints.s3.us-east-2.amazonaws.com/consolidated/testnet_catchpoints.txt) * [Betanet](https://algorand-catchpoints.s3.us-east-2.amazonaws.com/consolidated/betanet_catchpoints.txt) -For example, if your postgres database is on round 25001234, use the following configuration: +Your `catchup-config` section should be updated to the following: ```yaml catchup-config: - catchpoint: "25000000#EOX5UYQV4IXTGYQCIDR7ZLUK6WZGDC5EG6PYQGBG6PBYNAQUPN2Q" + auto: true admin-token: "$ALGOD_ADMIN_TOKEN" ``` -Then run Conduit, `conduit -d $CONDUIT_DATA_DIR`! +With configuration complete, run Conduit: `conduit -d $CONDUIT_DATA_DIR` You can separately run your Indexer with `--no-algod` to connect your API to the database. If you configured a catchpoint, Conduit will facilitate a fast catchup during initialization. Once the catchpoint has been reached the node will resume normal catchup to advance from the catchpoint round to target round defined in postgres. The fast-catchup and catchup process may take anywhere from 30 minutes to over an hour depending on hardware and disk configurations. + diff --git a/docs/tutorials/IndexerWriter.md b/docs/tutorials/IndexerWriter.md index ecd1cb5e..c463bdd7 100644 --- a/docs/tutorials/IndexerWriter.md +++ b/docs/tutorials/IndexerWriter.md @@ -118,9 +118,10 @@ Configuration: * `importer.config.token`: the contents of `algod_data/algod.token` * `exporter.config.connection-string`: `host=localhost port=5555 user=algorand password=pgpass dbname=conduit` -If you are connecting to an existing PostgreSQL database, you can also set a -catchpoint and the admin token. If those are set Conduit will automatically -initialize the node using fast catchup. +If you are connecting to an existing PostgreSQL database, you can also set +the admin token and enable `auto`. You can optionally +specify the catchup label directly. If those are configured, Conduit will +coordinate initializing the node by using fast catchup. Review the inline documentation in `conduit.yml` and decide if there are any other settings you would like to update.