algod importer: Add catchup functionality to algod importer (#32)
* Add catchup functionality to algod importer
Eric-Warehime committed Mar 31, 2023
1 parent 37b9b6a commit 442791a
Showing 34 changed files with 563 additions and 321 deletions.
8 changes: 4 additions & 4 deletions cmd/conduit/internal/list/list.go
@@ -9,8 +9,8 @@ import (

"github.com/spf13/cobra"

"github.com/algorand/conduit/conduit"
"github.com/algorand/conduit/conduit/pipeline"
"github.com/algorand/conduit/conduit/plugins"
)

// Command is the list command to embed in a root cobra command.
@@ -34,7 +34,7 @@ Example:
SilenceErrors: true,
}

func makeDetailsCommand(pluginType string, data func() []conduit.Metadata) *cobra.Command {
func makeDetailsCommand(pluginType string, data func() []plugins.Metadata) *cobra.Command {
return &cobra.Command{
Use: pluginType + "s",
Aliases: []string{pluginType},
@@ -57,7 +57,7 @@ func init() {
Command.AddCommand(makeDetailsCommand("exporter", pipeline.ExporterMetadata))
}

func printDetails(name string, plugins []conduit.Metadata) {
func printDetails(name string, plugins []plugins.Metadata) {
for _, data := range plugins {
if data.Name == name {
fmt.Println(data.SampleConfig)
@@ -68,7 +68,7 @@ func printDetails(name string, plugins []conduit.Metadata) {
fmt.Printf("Plugin not found: %s\n", name)
}

func printMetadata(w io.Writer, plugins []conduit.Metadata, leftIndent int) {
func printMetadata(w io.Writer, plugins []plugins.Metadata, leftIndent int) {
sort.Slice(plugins, func(i, j int) bool {
return plugins[i].Name < plugins[j].Name
})
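The only substantive change in this file is the switch from conduit.Metadata to plugins.Metadata. A minimal standalone sketch (not part of this commit) of the same lookup the list command performs with the relocated type; the blank import of the importers registry mirrors the one in metadata_test.go further down:

package main

import (
	"fmt"

	"github.com/algorand/conduit/conduit/pipeline"
	_ "github.com/algorand/conduit/conduit/plugins/importers/all"
)

func main() {
	// ImporterMetadata returns []plugins.Metadata; Name and SampleConfig
	// are the fields the list command prints.
	for _, meta := range pipeline.ImporterMetadata() {
		fmt.Println(meta.Name)
		fmt.Println(meta.SampleConfig)
	}
}
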
1 change: 1 addition & 0 deletions conduit/data/block_export_data.go
@@ -15,6 +15,7 @@ type RoundProvider interface {
// variables
type InitProvider interface {
GetGenesis() *sdk.Genesis
SetGenesis(*sdk.Genesis)
NextDBRound() sdk.Round
}

5 changes: 5 additions & 0 deletions conduit/init_provider.go
@@ -18,6 +18,11 @@ func MakePipelineInitProvider(currentRound *sdk.Round, genesis *sdk.Genesis) *Pi
}
}

// SetGenesis updates the genesis block in the init provider
func (a *PipelineInitProvider) SetGenesis(genesis *sdk.Genesis) {
a.genesis = genesis
}

// GetGenesis produces genesis pointer
func (a *PipelineInitProvider) GetGenesis() *sdk.Genesis {
return a.genesis
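Together with the InitProvider change above, this gives an importer that runs catchup a way to hand the genesis it fetched from algod back to the pipeline. A minimal sketch (not from this commit) of that flow; the go-algorand-sdk types import path and alias are assumptions:

package main

import (
	"fmt"

	sdk "github.com/algorand/go-algorand-sdk/v2/types"

	"github.com/algorand/conduit/conduit"
)

func main() {
	// Construct the init provider as the pipeline would; passing a nil
	// genesis here is only for illustration.
	round := sdk.Round(0)
	initProvider := conduit.MakePipelineInitProvider(&round, nil)

	// Once the importer has connected to algod (and, with this commit,
	// optionally run catchup), it can report the genesis it serves blocks for.
	genesis := &sdk.Genesis{}
	initProvider.SetGenesis(genesis)

	fmt.Println(initProvider.GetGenesis() == genesis) // true
}
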
15 changes: 0 additions & 15 deletions conduit/metadata.go

This file was deleted.

83 changes: 76 additions & 7 deletions conduit/pipeline/metadata.go
@@ -1,24 +1,93 @@
package pipeline

// TODO: move this to plugins package after reorganizing plugin packages.

import (
"github.com/algorand/conduit/conduit"
"encoding/json"
"errors"
"fmt"
"os"
"path"

"github.com/algorand/conduit/conduit/plugins"
"github.com/algorand/conduit/conduit/plugins/exporters"
"github.com/algorand/conduit/conduit/plugins/importers"
"github.com/algorand/conduit/conduit/plugins/processors"
)

// state contains the pipeline state.
type state struct {
GenesisHash string `json:"genesis-hash"`
Network string `json:"network"`
NextRound uint64 `json:"next-round"`
}

// encodeToFile writes the state object to the dataDir
func (s *state) encodeToFile(dataDir string) error {
pipelineMetadataFilePath := metadataPath(dataDir)
tempFilename := fmt.Sprintf("%s.temp", pipelineMetadataFilePath)
file, err := os.Create(tempFilename)
if err != nil {
return fmt.Errorf("encodeMetadataToFile(): failed to create temp metadata file: %w", err)
}
defer file.Close()
err = json.NewEncoder(file).Encode(s)
if err != nil {
return fmt.Errorf("encodeMetadataToFile(): failed to write temp metadata: %w", err)
}

err = os.Rename(tempFilename, pipelineMetadataFilePath)
if err != nil {
return fmt.Errorf("encodeMetadataToFile(): failed to replace metadata file: %w", err)
}
return nil
}

// readBlockMetadata attempts to deserialize state from the provided directory
func readBlockMetadata(dataDir string) (state, error) {
var metadata state
pipelineMetadataFilePath := metadataPath(dataDir)
populated, err := isFilePopulated(pipelineMetadataFilePath)
if err != nil || !populated {
return metadata, err
}
var data []byte
data, err = os.ReadFile(pipelineMetadataFilePath)
if err != nil {
return metadata, fmt.Errorf("error reading metadata: %w", err)
}
err = json.Unmarshal(data, &metadata)
if err != nil {
return metadata, fmt.Errorf("error reading metadata: %w", err)
}
return metadata, err
}

// isFilePopulated returns a bool denoting whether the file at path exists w/ non-zero size
func isFilePopulated(path string) (bool, error) {
stat, err := os.Stat(path)
if errors.Is(err, os.ErrNotExist) || (stat != nil && stat.Size() == 0) {
return false, nil
}
if err != nil {
return false, err
}
return true, err
}

// metadataPath returns the canonical path for the serialized pipeline state
func metadataPath(dataDir string) string {
return path.Join(dataDir, "metadata.json")
}

// AllMetadata gets a slice with metadata from all registered plugins.
func AllMetadata() (results []conduit.Metadata) {
func AllMetadata() (results []plugins.Metadata) {
results = append(results, ImporterMetadata()...)
results = append(results, ProcessorMetadata()...)
results = append(results, ExporterMetadata()...)
return
}

// ImporterMetadata gets a slice with metadata for all importers.Importer plugins.
func ImporterMetadata() (results []conduit.Metadata) {
func ImporterMetadata() (results []plugins.Metadata) {
for _, constructor := range importers.Importers {
plugin := constructor.New()
results = append(results, plugin.Metadata())
@@ -27,7 +96,7 @@ func ImporterMetadata() (results []conduit.Metadata) {
}

// ProcessorMetadata gets a slice with metadata for all processors.Processor plugins.
func ProcessorMetadata() (results []conduit.Metadata) {
func ProcessorMetadata() (results []plugins.Metadata) {
for _, constructor := range processors.Processors {
plugin := constructor.New()
results = append(results, plugin.Metadata())
@@ -36,7 +105,7 @@ func ProcessorMetadata() (results []conduit.Metadata) {
}

// ExporterMetadata gets a slice with metadata for all exporters.Exporter plugins.
func ExporterMetadata() (results []conduit.Metadata) {
func ExporterMetadata() (results []plugins.Metadata) {
for _, constructor := range exporters.Exporters {
plugin := constructor.New()
results = append(results, plugin.Metadata())
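The new state type persists pipeline progress to <dataDir>/metadata.json, written atomically via a temp file and rename; on disk it is a single JSON object such as {"genesis-hash":"...","network":"mainnet","next-round":3}. A short illustrative sketch of the intended round trip; the helpers are unexported, so code like this would live inside the conduit/pipeline package itself, and the function names here are hypothetical:

package pipeline

// persistProgress records how far the pipeline has gotten so a restart can resume.
func persistProgress(dataDir, genesisHash, network string, nextRound uint64) error {
	s := state{GenesisHash: genesisHash, Network: network, NextRound: nextRound}
	// encodeToFile writes to a temp file and renames it over metadata.json.
	return s.encodeToFile(dataDir)
}

// resumeRound recovers the next round to import after a restart. A missing or
// empty metadata.json yields a zero-value state, so this falls back to round 0.
func resumeRound(dataDir string) (uint64, error) {
	blockMetadata, err := readBlockMetadata(dataDir)
	if err != nil {
		return 0, err
	}
	return blockMetadata.NextRound, nil
}
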
46 changes: 42 additions & 4 deletions conduit/pipeline/metadata_test.go
@@ -4,15 +4,13 @@ import (
"testing"

"github.com/stretchr/testify/assert"

"gopkg.in/yaml.v3"
)

import (
_ "github.com/algorand/conduit/conduit/plugins/exporters/all"
_ "github.com/algorand/conduit/conduit/plugins/exporters/example"
_ "github.com/algorand/conduit/conduit/plugins/importers/all"
_ "github.com/algorand/conduit/conduit/plugins/processors/all"
"github.com/algorand/indexer/conduit/pipeline"
)

// TestSamples ensures that all plugins contain a sample file with valid yaml.
@@ -22,9 +20,49 @@ func TestSamples(t *testing.T) {
data := data
t.Run(data.Name, func(t *testing.T) {
t.Parallel()
var config NameConfigPair
var config pipeline.NameConfigPair
assert.NoError(t, yaml.Unmarshal([]byte(data.SampleConfig), &config))
assert.Equal(t, data.Name, config.Name)
})
}
}

// TestBlockMetaDataFile tests that the metadata.json file is created as expected
func TestBlockMetaDataFile(t *testing.T) {
datadir := t.TempDir()
pipelineMetadata := state{
NextRound: 3,
}

// Test the file is not created yet
populated, err := isFilePopulated(metadataPath(datadir))
assert.NoError(t, err)
assert.False(t, populated)

// Write the file
err = pipelineMetadata.encodeToFile(datadir)
assert.NoError(t, err)

// Test that file is created
populated, err = isFilePopulated(metadataPath(datadir))
assert.NoError(t, err)
assert.True(t, populated)

// Test that file loads correctly
metaData, err := readBlockMetadata(datadir)
assert.NoError(t, err)
assert.Equal(t, pipelineMetadata.GenesisHash, metaData.GenesisHash)
assert.Equal(t, pipelineMetadata.NextRound, metaData.NextRound)
assert.Equal(t, pipelineMetadata.Network, metaData.Network)

// Test that file encodes correctly
pipelineMetadata.GenesisHash = "HASH"
pipelineMetadata.NextRound = 7
err = pipelineMetadata.encodeToFile(datadir)
assert.NoError(t, err)
metaData, err = readBlockMetadata(datadir)
assert.NoError(t, err)
assert.Equal(t, "HASH", metaData.GenesisHash)
assert.Equal(t, uint64(7), metaData.NextRound)
assert.Equal(t, pipelineMetadata.Network, metaData.Network)
}