Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

checkpoint: improve HTTP error handling #6130

Open
wants to merge 5 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 32 additions & 18 deletions checkpoint/recovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
"net/url"
"path/filepath"
"slices"
"time"

"github.com/spf13/afero"
"go.uber.org/zap"
Expand Down Expand Up @@ -39,22 +40,31 @@

// set to false if atxs are not compatible before and after the checkpoint recovery.
PreserveOwnAtx bool `mapstructure:"preserve-own-atx"`

RetryMax int `mapstructure:"retry-max"`
RetryDelay time.Duration `mapstructure:"retry-delay"`
IgnoreCheckpointReqErrors bool `mapstructure:"ignore-checkpoint-req-errors"`
}

func DefaultConfig() Config {
return Config{
PreserveOwnAtx: true,
RetryMax: 5,
RetryDelay: 3 * time.Second,
}
}

type RecoverConfig struct {
GoldenAtx types.ATXID
DataDir string
DbFile string
LocalDbFile string
NodeIDs []types.NodeID // IDs to preserve own ATXs
Uri string
Restore types.LayerID
GoldenAtx types.ATXID
DataDir string
DbFile string
LocalDbFile string
NodeIDs []types.NodeID // IDs to preserve own ATXs
Uri string
Restore types.LayerID
RetryMax int
RetryDelay time.Duration
IgnoreCheckpointReqErrors bool
}

func (c *RecoverConfig) DbPath() string {
Expand All @@ -73,25 +83,25 @@
ctx context.Context,
logger *zap.Logger,
fs afero.Fs,
dataDir, uri string,
restore types.LayerID,
cfg *RecoverConfig,
) (string, error) {
parsed, err := url.Parse(uri)
parsed, err := url.Parse(cfg.Uri)
if err != nil {
return "", fmt.Errorf("%w: parse recovery URI %v", err, uri)
return "", fmt.Errorf("%w: parse recovery URI %v", err, cfg.Uri)

Check warning on line 90 in checkpoint/recovery.go

View check run for this annotation

Codecov / codecov/patch

checkpoint/recovery.go#L90

Added line #L90 was not covered by tests
}
if parsed.Scheme != "http" && parsed.Scheme != "https" {
return "", fmt.Errorf("%w: %s", ErrUrlSchemeNotSupported, uri)
return "", fmt.Errorf("%w: %s", ErrUrlSchemeNotSupported, cfg.Uri)
}
if bdir, err := backupRecovery(fs, RecoveryDir(dataDir)); err != nil {
if bdir, err := backupRecovery(fs, RecoveryDir(cfg.DataDir)); err != nil {
return "", err
} else if bdir != "" {
logger.Info("old recovery data backed up", log.ZContext(ctx), zap.String("dir", bdir))
}
dst := RecoveryFilename(dataDir, filepath.Base(parsed.String()), restore)
if err = httpToLocalFile(ctx, parsed, fs, dst); err != nil {
dst := RecoveryFilename(cfg.DataDir, filepath.Base(parsed.String()), cfg.Restore)
if err = httpToLocalFile(ctx, parsed, fs, dst, cfg.RetryMax, cfg.RetryDelay); err != nil {
return "", err
}

logger.Info("checkpoint data persisted", log.ZContext(ctx), zap.String("file", dst))
return dst, nil
}
Expand Down Expand Up @@ -144,10 +154,14 @@
case errors.Is(err, ErrCheckpointNotFound):
logger.Info("no checkpoint file available. not recovering", zap.String("uri", cfg.Uri))
return nil, nil
case err != nil:
case err == nil:
return preserve, nil
case cfg.IgnoreCheckpointReqErrors && errors.Is(err, ErrCheckpointRequestFailed):
logger.Error("ignoring checkpoint request error", zap.Error(err))
return nil, nil
default:

Check warning on line 162 in checkpoint/recovery.go

View check run for this annotation

Codecov / codecov/patch

checkpoint/recovery.go#L159-L162

Added lines #L159 - L162 were not covered by tests
return nil, err
}
return preserve, nil
}

func RecoverWithDb(
Expand All @@ -170,7 +184,7 @@
return nil, fmt.Errorf("remove old bootstrap data: %w", err)
}
logger.Info("recover from uri", zap.String("uri", cfg.Uri))
cpFile, err := copyToLocalFile(ctx, logger, fs, cfg.DataDir, cfg.Uri, cfg.Restore)
cpFile, err := copyToLocalFile(ctx, logger, fs, cfg)
if err != nil {
return nil, err
}
Expand Down
160 changes: 107 additions & 53 deletions checkpoint/recovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"net/http"
"net/http/httptest"
"path/filepath"
"strings"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -131,65 +133,117 @@ func checkpointServer(t testing.TB) string {

func TestRecover(t *testing.T) {
fasmat marked this conversation as resolved.
Show resolved Hide resolved
t.Parallel()
url := checkpointServer(t)

tt := []struct {
name string
uri string
expErr error
}{
{
name: "http",
uri: fmt.Sprintf("%s/snapshot-15", url),
},
{
name: "url unreachable",
uri: "http://nowhere/snapshot-15",
expErr: checkpoint.ErrCheckpointNotFound,
},
{
name: "ftp",
uri: "ftp://snapshot-15",
expErr: checkpoint.ErrUrlSchemeNotSupported,
},
setup := func(t *testing.T, handler http.HandlerFunc) (afero.Fs, *checkpoint.RecoverConfig) {
t.Helper()
fs := afero.NewMemMapFs()

mux := http.NewServeMux()
mux.HandleFunc("GET /snapshot-15", handler)
ts := httptest.NewServer(mux)
t.Cleanup(ts.Close)
cfg := &checkpoint.RecoverConfig{
GoldenAtx: goldenAtx,
DataDir: t.TempDir(),
DbFile: "test.sql",
LocalDbFile: "local.sql",
NodeIDs: []types.NodeID{types.RandomNodeID()},
Uri: fmt.Sprintf("%s/snapshot-15", ts.URL),
Restore: types.LayerID(recoverLayer),
RetryMax: 5,
RetryDelay: 100 * time.Millisecond,
}
require.NoError(t, fs.MkdirAll(filepath.Join(cfg.DataDir, bootstrap.DirName), 0o700))
return fs, cfg
}

for _, tc := range tt {
t.Run(tc.name, func(t *testing.T) {
fs := afero.NewMemMapFs()
cfg := &checkpoint.RecoverConfig{
GoldenAtx: goldenAtx,
DataDir: t.TempDir(),
DbFile: "test.sql",
LocalDbFile: "local.sql",
NodeIDs: []types.NodeID{types.RandomNodeID()},
Uri: tc.uri,
Restore: types.LayerID(recoverLayer),
}
bsdir := filepath.Join(cfg.DataDir, bootstrap.DirName)
require.NoError(t, fs.MkdirAll(bsdir, 0o700))
db := sql.InMemory()
localDB := localsql.InMemory()
data, err := checkpoint.RecoverWithDb(context.Background(), zaptest.NewLogger(t), db, localDB, fs, cfg)
if tc.expErr != nil {
require.ErrorIs(t, err, tc.expErr)
t.Run("http", func(t *testing.T) {
db := sql.InMemory()
localDB := localsql.InMemory()
fs, cfg := setup(t, func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte(checkpointData))
})

data, err := checkpoint.RecoverWithDb(context.Background(), zaptest.NewLogger(t), db, localDB, fs, cfg)
require.NoError(t, err)
require.Nil(t, data)

newDB, err := sql.Open("file:" + filepath.Join(cfg.DataDir, cfg.DbFile))
require.NoError(t, err)
require.NotNil(t, newDB)
defer newDB.Close()
verifyDbContent(t, newDB)

restore, err := recovery.CheckpointInfo(newDB)
require.NoError(t, err)
require.EqualValues(t, recoverLayer, restore)
})

t.Run("http+retry", func(t *testing.T) {
t.Parallel()
db := sql.InMemory()
localDB := localsql.InMemory()
var fail atomic.Bool
fail.Store(true)
fs, cfg := setup(t, func(w http.ResponseWriter, r *http.Request) {
if fail.CompareAndSwap(true, false) { // fail on first request
w.WriteHeader(http.StatusServiceUnavailable)
w.Write([]byte("service unavailable"))
return
}
require.NoError(t, err)
require.Nil(t, data)
newDB, err := sql.Open("file:" + filepath.Join(cfg.DataDir, cfg.DbFile))
require.NoError(t, err)
require.NotNil(t, newDB)
defer newDB.Close()
verifyDbContent(t, newDB)
restore, err := recovery.CheckpointInfo(newDB)
require.NoError(t, err)
require.EqualValues(t, recoverLayer, restore)
exist, err := afero.Exists(fs, bsdir)
require.NoError(t, err)
require.False(t, exist)
w.Write([]byte(checkpointData))
})
}

data, err := checkpoint.RecoverWithDb(context.Background(), zaptest.NewLogger(t), db, localDB, fs, cfg)
require.NoError(t, err)
require.Nil(t, data)

newDB, err := sql.Open("file:" + filepath.Join(cfg.DataDir, cfg.DbFile))
require.NoError(t, err)
require.NotNil(t, newDB)
defer newDB.Close()
verifyDbContent(t, newDB)

restore, err := recovery.CheckpointInfo(newDB)
require.NoError(t, err)
require.EqualValues(t, recoverLayer, restore)
})

t.Run("not found", func(t *testing.T) {
t.Parallel()
db := sql.InMemory()
localDB := localsql.InMemory()
fs, cfg := setup(t, func(w http.ResponseWriter, r *http.Request) {})
cfg.Uri = strings.Replace(cfg.Uri, "/snapshot-15", "/snapshot-42", -1) // unavailable snapshot

data, err := checkpoint.RecoverWithDb(context.Background(), zaptest.NewLogger(t), db, localDB, fs, cfg)
require.ErrorIs(t, err, checkpoint.ErrCheckpointNotFound)
require.Nil(t, data)
})

t.Run("url unreachable", func(t *testing.T) {
t.Parallel()
db := sql.InMemory()
localDB := localsql.InMemory()
fs, cfg := setup(t, func(w http.ResponseWriter, r *http.Request) {})
cfg.Uri = "http://nowhere/snapshot-15" // unreachable url

data, err := checkpoint.RecoverWithDb(context.Background(), zaptest.NewLogger(t), db, localDB, fs, cfg)
require.ErrorIs(t, err, checkpoint.ErrCheckpointRequestFailed)
require.Nil(t, data)
})

t.Run("unsupported scheme", func(t *testing.T) {
t.Parallel()
db := sql.InMemory()
localDB := localsql.InMemory()
fs, cfg := setup(t, func(w http.ResponseWriter, r *http.Request) {})
cfg.Uri = "ftp://snapshot-15" // unsupported scheme

data, err := checkpoint.RecoverWithDb(context.Background(), zaptest.NewLogger(t), db, localDB, fs, cfg)
require.ErrorIs(t, err, checkpoint.ErrUrlSchemeNotSupported)
require.Nil(t, data)
})
}

func TestRecover_SameRecoveryInfo(t *testing.T) {
Expand Down
40 changes: 28 additions & 12 deletions checkpoint/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
"path/filepath"
"time"

"github.com/hashicorp/go-retryablehttp"
"github.com/santhosh-tekuri/jsonschema/v5"
"github.com/spf13/afero"

Expand All @@ -24,8 +25,9 @@
)

var (
ErrCheckpointNotFound = errors.New("checkpoint not found")
ErrUrlSchemeNotSupported = errors.New("url scheme not supported")
ErrCheckpointNotFound = errors.New("checkpoint not found")
ErrCheckpointRequestFailed = errors.New("checkpoint request failed")
ErrUrlSchemeNotSupported = errors.New("url scheme not supported")
)

type RecoveryFile struct {
Expand Down Expand Up @@ -109,22 +111,36 @@
return rf.Copy(fs, srcf)
}

func httpToLocalFile(ctx context.Context, resource *url.URL, fs afero.Fs, dst string) error {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, resource.String(), nil)
func httpToLocalFile(
ctx context.Context,
resource *url.URL,
fs afero.Fs,
dst string,
retryMax int,
retryDelay time.Duration,
) error {
c := retryablehttp.NewClient()
c.RetryMax = retryMax
c.RetryWaitMin = retryDelay
c.RetryWaitMax = retryDelay * 2
c.Backoff = retryablehttp.LinearJitterBackoff

req, err := retryablehttp.NewRequestWithContext(ctx, http.MethodGet, resource.String(), nil)
if err != nil {
return fmt.Errorf("create http request: %w", err)
}
resp, err := (&http.Client{}).Do(req)
urlErr := &url.Error{}
switch {
case errors.As(err, &urlErr):
return ErrCheckpointNotFound
case err != nil:
return fmt.Errorf("http get recovery file: %w", err)

resp, err := c.Do(req)
if err != nil {
return fmt.Errorf("%w: %w", ErrCheckpointRequestFailed, err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
switch resp.StatusCode {
case http.StatusOK:
case http.StatusNotFound:
return ErrCheckpointNotFound
default:
return fmt.Errorf("%w: status code %d", ErrCheckpointRequestFailed, resp.StatusCode)

Check warning on line 143 in checkpoint/util.go

View check run for this annotation

Codecov / codecov/patch

checkpoint/util.go#L142-L143

Added lines #L142 - L143 were not covered by tests
}
rf, err := NewRecoveryFile(fs, dst)
if err != nil {
Expand Down
3 changes: 3 additions & 0 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ func AddFlags(flagSet *pflag.FlagSet, cfg *config.Config) (configPath *string) {
"recovery-uri", cfg.Recovery.Uri, "reset the node state based on the supplied checkpoint file")
flagSet.Uint32Var(&cfg.Recovery.Restore,
"recovery-layer", cfg.Recovery.Restore, "restart the mesh with the checkpoint file at this layer")
flagSet.BoolVar(&cfg.Recovery.IgnoreCheckpointReqErrors,
"ignore-checkpoint-req-errors", cfg.Recovery.IgnoreCheckpointReqErrors,
"ignore checkpoint request errors")

/** ======================== BaseConfig Flags ========================== **/
flagSet.StringVarP(&cfg.BaseConfig.DataDirParent, "data-folder", "d",
Expand Down
17 changes: 10 additions & 7 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,13 +436,16 @@
}
}
cfg := &checkpoint.RecoverConfig{
GoldenAtx: types.ATXID(app.Config.Genesis.GoldenATX()),
DataDir: app.Config.DataDir(),
DbFile: dbFile,
LocalDbFile: localDbFile,
NodeIDs: nodeIDs,
Uri: app.Config.Recovery.Uri,
Restore: types.LayerID(app.Config.Recovery.Restore),
GoldenAtx: types.ATXID(app.Config.Genesis.GoldenATX()),
DataDir: app.Config.DataDir(),
DbFile: dbFile,
LocalDbFile: localDbFile,
NodeIDs: nodeIDs,
Uri: app.Config.Recovery.Uri,
Restore: types.LayerID(app.Config.Recovery.Restore),
RetryMax: app.Config.Recovery.RetryMax,
RetryDelay: app.Config.Recovery.RetryDelay,
IgnoreCheckpointReqErrors: app.Config.Recovery.IgnoreCheckpointReqErrors,

Check warning on line 448 in node/node.go

View check run for this annotation

Codecov / codecov/patch

node/node.go#L439-L448

Added lines #L439 - L448 were not covered by tests
}

return checkpoint.Recover(ctx, app.log.Zap(), afero.NewOsFs(), cfg)
Expand Down
Loading
Loading