Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

checkpoint: improve HTTP error handling #6130

Open
wants to merge 5 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 31 additions & 11 deletions checkpoint/recovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
"net/url"
"path/filepath"
"slices"
"time"

"github.com/spf13/afero"
"go.uber.org/zap"
Expand Down Expand Up @@ -39,22 +40,31 @@

// set to false if atxs are not compatible before and after the checkpoint recovery.
PreserveOwnAtx bool `mapstructure:"preserve-own-atx"`

RetryMax int `mapstructure:"retry-max"`
RetryDelay time.Duration `mapstructure:"retry-delay"`
IgnoreCheckpointReqErrors bool `mapstructure:"ignore-checkpoint-req-errors"`
}

// DefaultConfig returns the recovery configuration used when nothing is
// overridden: own ATXs are preserved and the checkpoint download is allowed
// up to 5 retries with a base delay of 3 seconds.
func DefaultConfig() Config {
	var defaults Config
	defaults.PreserveOwnAtx = true
	defaults.RetryMax = 5
	defaults.RetryDelay = 3 * time.Second
	return defaults
}

// RecoverConfig carries the parameters of one checkpoint recovery run.
//
// Uri and Restore identify the checkpoint to fetch and the layer it is
// restored at. RetryMax and RetryDelay are forwarded to the HTTP download of
// the checkpoint file. When IgnoreCheckpointReqErrors is set, a download that
// fails with ErrCheckpointRequestFailed is logged and recovery is skipped
// instead of the error being returned to the caller.
type RecoverConfig struct {
	GoldenAtx                 types.ATXID
	DataDir                   string
	DbFile                    string
	LocalDbFile               string
	NodeIDs                   []types.NodeID // IDs to preserve own ATXs
	Uri                       string
	Restore                   types.LayerID
	RetryMax                  int
	RetryDelay                time.Duration
	IgnoreCheckpointReqErrors bool
}

func (c *RecoverConfig) DbPath() string {
Expand All @@ -75,6 +85,8 @@
fs afero.Fs,
dataDir, uri string,
restore types.LayerID,
retryMax int,
retryDelay time.Duration,
) (string, error) {
parsed, err := url.Parse(uri)
if err != nil {
Expand All @@ -89,9 +101,10 @@
logger.Info("old recovery data backed up", log.ZContext(ctx), zap.String("dir", bdir))
}
dst := RecoveryFilename(dataDir, filepath.Base(parsed.String()), restore)
if err = httpToLocalFile(ctx, parsed, fs, dst); err != nil {
if err = httpToLocalFile(ctx, parsed, fs, dst, retryMax, retryDelay); err != nil {
return "", err
}

logger.Info("checkpoint data persisted", log.ZContext(ctx), zap.String("file", dst))
return dst, nil
}
Expand Down Expand Up @@ -144,10 +157,14 @@
case errors.Is(err, ErrCheckpointNotFound):
logger.Info("no checkpoint file available. not recovering", zap.String("uri", cfg.Uri))
return nil, nil
case err != nil:
case err == nil:
return preserve, nil
case cfg.IgnoreCheckpointReqErrors && errors.Is(err, ErrCheckpointRequestFailed):
logger.Error("ignoring checkpoint request error", zap.Error(err))
return nil, nil
default:

Check warning on line 165 in checkpoint/recovery.go

View check run for this annotation

Codecov / codecov/patch

checkpoint/recovery.go#L162-L165

Added lines #L162 - L165 were not covered by tests
return nil, err
}
return preserve, nil
}

func RecoverWithDb(
Expand All @@ -170,7 +187,10 @@
return nil, fmt.Errorf("remove old bootstrap data: %w", err)
}
logger.Info("recover from uri", zap.String("uri", cfg.Uri))
cpFile, err := copyToLocalFile(ctx, logger, fs, cfg.DataDir, cfg.Uri, cfg.Restore)
cpFile, err := copyToLocalFile(
ctx, logger, fs, cfg.DataDir, cfg.Uri, cfg.Restore,
cfg.RetryMax, cfg.RetryDelay,
)
fasmat marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil, err
}
Expand Down
42 changes: 35 additions & 7 deletions checkpoint/recovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"net/http"
"net/http/httptest"
"path/filepath"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -119,33 +120,57 @@ func verifyDbContent(tb testing.TB, db *sql.Database) {
require.Empty(tb, extra)
}

func checkpointServer(t testing.TB) string {
func checkpointServerWithFaultInjection(t testing.TB, succeed func() bool) string {
mux := http.NewServeMux()
mux.HandleFunc("GET /snapshot-15", func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte(checkpointData))
if succeed == nil || succeed() {
w.Write([]byte(checkpointData))
} else {
w.WriteHeader(http.StatusServiceUnavailable)
w.Write([]byte("service unavailable"))
}
})
ts := httptest.NewServer(mux)
t.Cleanup(ts.Close)
return ts.URL
}

// checkpointServer starts the checkpoint test server without fault injection
// (a nil succeed callback is passed) and returns its base URL.
func checkpointServer(t testing.TB) string {
return checkpointServerWithFaultInjection(t, nil)
}

func TestRecover(t *testing.T) {
fasmat marked this conversation as resolved.
Show resolved Hide resolved
t.Parallel()
url := checkpointServer(t)
var fail atomic.Bool
url := checkpointServerWithFaultInjection(t, func() bool {
// 2nd attempt will always succeed
return !fail.Swap(false)
})

tt := []struct {
name string
uri string
expErr error
name string
uri string
expErr error
reqFail bool
}{
{
name: "http",
uri: fmt.Sprintf("%s/snapshot-15", url),
},
{
name: "http+retry",
uri: fmt.Sprintf("%s/snapshot-15", url),
reqFail: true,
},
{
name: "not found",
uri: fmt.Sprintf("%s/snapshot-42", url),
expErr: checkpoint.ErrCheckpointNotFound,
},
{
name: "url unreachable",
uri: "http://nowhere/snapshot-15",
expErr: checkpoint.ErrCheckpointNotFound,
expErr: checkpoint.ErrCheckpointRequestFailed,
},
{
name: "ftp",
Expand All @@ -156,6 +181,7 @@ func TestRecover(t *testing.T) {

for _, tc := range tt {
t.Run(tc.name, func(t *testing.T) {
fail.Store(tc.reqFail)
fs := afero.NewMemMapFs()
cfg := &checkpoint.RecoverConfig{
GoldenAtx: goldenAtx,
Expand All @@ -165,6 +191,8 @@ func TestRecover(t *testing.T) {
NodeIDs: []types.NodeID{types.RandomNodeID()},
Uri: tc.uri,
Restore: types.LayerID(recoverLayer),
RetryMax: 5,
RetryDelay: 100 * time.Millisecond,
}
bsdir := filepath.Join(cfg.DataDir, bootstrap.DirName)
require.NoError(t, fs.MkdirAll(bsdir, 0o700))
Expand Down
42 changes: 30 additions & 12 deletions checkpoint/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
"path/filepath"
"time"

"github.com/hashicorp/go-retryablehttp"
"github.com/santhosh-tekuri/jsonschema/v5"
"github.com/spf13/afero"

Expand All @@ -24,8 +25,9 @@
)

var (
	// ErrCheckpointNotFound is returned by the HTTP download when the server
	// answers the checkpoint request with 404 Not Found.
	ErrCheckpointNotFound = errors.New("checkpoint not found")
	// ErrCheckpointRequestFailed wraps failures of the checkpoint download
	// request: the HTTP client returned an error, or the server replied with
	// a status other than 200 or 404.
	ErrCheckpointRequestFailed = errors.New("checkpoint request failed")
	// ErrUrlSchemeNotSupported presumably reports a recovery URI whose scheme
	// cannot be fetched; its use is outside this view (TODO confirm).
	ErrUrlSchemeNotSupported = errors.New("url scheme not supported")
)

type RecoveryFile struct {
Expand Down Expand Up @@ -109,22 +111,38 @@
return rf.Copy(fs, srcf)
}

func httpToLocalFile(ctx context.Context, resource *url.URL, fs afero.Fs, dst string) error {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, resource.String(), nil)
func httpToLocalFile(
ctx context.Context,
resource *url.URL,
fs afero.Fs,
dst string,
retryMax int,
retryDelay time.Duration,
) error {
c := retryablehttp.NewClient()
c.RetryMax = retryMax
c.RetryWaitMin = retryDelay
c.RetryWaitMax = retryDelay * 2
c.Backoff = retryablehttp.LinearJitterBackoff

req, err := retryablehttp.NewRequestWithContext(ctx, http.MethodGet, resource.String(), nil)
if err != nil {
return fmt.Errorf("create http request: %w", err)
}
resp, err := (&http.Client{}).Do(req)
urlErr := &url.Error{}
switch {
case errors.As(err, &urlErr):
return ErrCheckpointNotFound
case err != nil:
return fmt.Errorf("http get recovery file: %w", err)

resp, err := c.Do(req)
if err != nil {
// This shouldn't really happen. According to net/http docs for Do:
// "Any returned error will be of type *url.Error."
return fmt.Errorf("%w: %w", ErrCheckpointRequestFailed, err)
fasmat marked this conversation as resolved.
Show resolved Hide resolved
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
switch resp.StatusCode {
case http.StatusOK:
case http.StatusNotFound:
return ErrCheckpointNotFound
default:
return fmt.Errorf("%w: status code %d", ErrCheckpointRequestFailed, resp.StatusCode)

Check warning on line 145 in checkpoint/util.go

View check run for this annotation

Codecov / codecov/patch

checkpoint/util.go#L144-L145

Added lines #L144 - L145 were not covered by tests
}
rf, err := NewRecoveryFile(fs, dst)
if err != nil {
Expand Down
3 changes: 3 additions & 0 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ func AddFlags(flagSet *pflag.FlagSet, cfg *config.Config) (configPath *string) {
"recovery-uri", cfg.Recovery.Uri, "reset the node state based on the supplied checkpoint file")
flagSet.Uint32Var(&cfg.Recovery.Restore,
"recovery-layer", cfg.Recovery.Restore, "restart the mesh with the checkpoint file at this layer")
flagSet.BoolVar(&cfg.Recovery.IgnoreCheckpointReqErrors,
"ignore-checkpoint-req-errors", cfg.Recovery.IgnoreCheckpointReqErrors,
"ignore checkpoint request errors")

/** ======================== BaseConfig Flags ========================== **/
flagSet.StringVarP(&cfg.BaseConfig.DataDirParent, "data-folder", "d",
Expand Down
17 changes: 10 additions & 7 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,13 +436,16 @@
}
}
cfg := &checkpoint.RecoverConfig{
GoldenAtx: types.ATXID(app.Config.Genesis.GoldenATX()),
DataDir: app.Config.DataDir(),
DbFile: dbFile,
LocalDbFile: localDbFile,
NodeIDs: nodeIDs,
Uri: app.Config.Recovery.Uri,
Restore: types.LayerID(app.Config.Recovery.Restore),
GoldenAtx: types.ATXID(app.Config.Genesis.GoldenATX()),
DataDir: app.Config.DataDir(),
DbFile: dbFile,
LocalDbFile: localDbFile,
NodeIDs: nodeIDs,
Uri: app.Config.Recovery.Uri,
Restore: types.LayerID(app.Config.Recovery.Restore),
RetryMax: app.Config.Recovery.RetryMax,
RetryDelay: app.Config.Recovery.RetryDelay,
IgnoreCheckpointReqErrors: app.Config.Recovery.IgnoreCheckpointReqErrors,

Check warning on line 448 in node/node.go

View check run for this annotation

Codecov / codecov/patch

node/node.go#L439-L448

Added lines #L439 - L448 were not covered by tests
}

return checkpoint.Recover(ctx, app.log.Zap(), afero.NewOsFs(), cfg)
Expand Down
7 changes: 6 additions & 1 deletion systest/cluster/nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -899,7 +899,8 @@ func deployNode(
WithStartupProbe(
corev1.Probe().WithTCPSocket(
corev1.TCPSocketAction().WithPort(intstr.FromInt32(9092)),
).WithInitialDelaySeconds(10).WithPeriodSeconds(10),
).WithInitialDelaySeconds(10).WithPeriodSeconds(10).
WithFailureThreshold(10),
).
WithEnv(
corev1.EnvVar().WithName("GOMAXPROCS").WithValue("4"),
Expand Down Expand Up @@ -1223,6 +1224,10 @@ func CheckpointUrl(endpoint string) DeploymentFlag {
return DeploymentFlag{Name: "--recovery-uri", Value: endpoint}
}

// IgnoreCheckpointReqErrors returns the deployment flag that sets
// --ignore-checkpoint-req-errors to true.
func IgnoreCheckpointReqErrors() DeploymentFlag {
	enabled := strconv.FormatBool(true)
	flag := DeploymentFlag{Name: "--ignore-checkpoint-req-errors"}
	flag.Value = enabled
	return flag
}

// CheckpointLayer returns the deployment flag that sets --recovery-layer to
// restoreLayer, rendered in base 10.
func CheckpointLayer(restoreLayer uint32) DeploymentFlag {
	// Format as an unsigned value: converting through int would wrap layers
	// above math.MaxInt32 on platforms where int is 32 bits wide.
	layer := strconv.FormatUint(uint64(restoreLayer), 10)
	return DeploymentFlag{Name: "--recovery-layer", Value: layer}
}
Expand Down
1 change: 1 addition & 0 deletions systest/tests/checkpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ func reuseCluster(tctx *testcontext.Context, restoreLayer uint32) (*cluster.Clus
cluster.WithBootstrapEpochs([]int{2, 4, 5}),
cluster.WithSmesherFlag(cluster.CheckpointUrl(fmt.Sprintf("%s/checkpoint", cluster.BootstrapperEndpoint(0)))),
cluster.WithSmesherFlag(cluster.CheckpointLayer(restoreLayer)),
cluster.WithSmesherFlag(cluster.IgnoreCheckpointReqErrors()),
)
}

Expand Down
Loading