diff --git a/catalog/cataloger.go b/catalog/cataloger.go
index b314573ae83..a4fdc7fb4eb 100644
--- a/catalog/cataloger.go
+++ b/catalog/cataloger.go
@@ -168,6 +168,20 @@ type ExportConfigurator interface {
 	PutExportConfiguration(repository string, branch string, conf *ExportConfiguration) error
 }
 
+// ExportStateHandler marks the start of branch exports and deletes their stored state.
+type ExportStateHandler interface {
+	// ExportMarkStart starts an export operation on branch of repo and returns the ref of
+	// the previous export. If the previous export failed it returns ErrExportFailed. If
+	// another export is running return state ExportStatusInProgress -- and caller should
+	// clean it up by removing and adding the "next export" within this transaction. If
+	// another transaction concurrently runs ExportMarkStart on branchID, one blocks until
+	// the other is done.
+	ExportMarkStart(tx db.Tx, repo string, branch string, newRef string) (string, CatalogBranchExportStatus, error)
+	// ExportStateDelete deletes any export state for branch of repo. Mostly useful in
+	// tests: in a living system the export state is part of the state of the world.
+	ExportStateDelete(tx db.Tx, repo string, branch string) error
+}
+
 type Cataloger interface {
 	RepositoryCataloger
 	BranchCataloger
@@ -178,6 +192,7 @@ type Cataloger interface {
 	Merger
 	Hookser
 	ExportConfigurator
+	ExportStateHandler
 	io.Closer
 }
 
diff --git a/catalog/cataloger_export.go b/catalog/cataloger_export.go
index be3819e10e2..bcfaec4cbcf 100644
--- a/catalog/cataloger_export.go
+++ b/catalog/cataloger_export.go
@@ -8,6 +8,7 @@ import (
 	"strings"
 
 	"github.com/georgysavva/scany/pgxscan"
+	"github.com/jackc/pgx/v4"
 	"github.com/lib/pq"
 	"github.com/treeverse/lakefs/db"
 )
@@ -101,7 +102,7 @@ func (c *cataloger) GetExportConfigurations() ([]ExportConfigurationForBranch, e
 			e.export_path export_path, e.export_status_path export_status_path,
 			e.last_keys_in_prefix_regexp last_keys_in_prefix_regexp
 		FROM catalog_branches_export e JOIN catalog_branches b ON e.branch_id = b.id
-		    JOIN catalog_repositories r ON b.repository_id = r.id`)
+			JOIN catalog_repositories r ON b.repository_id = r.id`)
 	if err != nil {
 		return nil, err
 	}
@@ -133,3 +134,92 @@ func (c *cataloger) PutExportConfiguration(repository string, branch string, con
 	})
 	return err
 }
+
+// errExportFailed is the error returned when the previous export of a branch failed.
+type errExportFailed struct {
+	Message string // Error string reported in database
+}
+
+func (e errExportFailed) Error() string {
+	return e.Message
+}
+
+// Is makes errors.Is(err, ErrExportFailed) match any errExportFailed, whatever its
+// Message: plain == comparison with the empty sentinel would never match a real failure.
+func (e errExportFailed) Is(target error) bool {
+	_, ok := target.(errExportFailed)
+	return ok
+}
+
+// ErrExportFailed is the sentinel to test for (with errors.Is) when the previous export failed.
+var ErrExportFailed = errExportFailed{}
+
+// ExportMarkStart records newRef as the in-progress export of branch on repo and returns
+// the ref and state of the previous export; oldRef and state are zero when branch has no
+// export state yet. If the previous export failed it also returns an error matching
+// ErrExportFailed that carries the stored error message.
+func (c *cataloger) ExportMarkStart(tx db.Tx, repo string, branch string, newRef string) (oldRef string, state CatalogBranchExportStatus, err error) {
+	var res struct {
+		CurrentRef string
+		State      CatalogBranchExportStatus
+		// error_message is NULL unless the export failed, so scan into a pointer.
+		ErrorMessage *string
+	}
+	branchID, err := c.getBranchIDCache(tx, repo, branch)
+	if err != nil {
+		return
+	}
+	err = tx.Get(&res, `
+		SELECT current_ref, state, error_message
+		FROM catalog_branches_export_state
+		WHERE branch_id=$1 FOR UPDATE`,
+		branchID)
+	if err != nil && !errors.Is(err, ErrEntryNotFound) {
+		err = fmt.Errorf("ExportMarkStart: failed to get existing state: %w", err)
+		return
+	}
+	oldRef = res.CurrentRef
+	state = res.State
+
+	// Upsert into the export state table: the first export of a branch has no row to
+	// update. NOTE(review): ON CONFLICT relies on branch_id being unique in
+	// catalog_branches_export_state -- confirm against the DDL.
+	tag, err := tx.Exec(`
+		INSERT INTO catalog_branches_export_state (branch_id, current_ref, state, error_message)
+		VALUES ($1, $2, 'in-progress', NULL)
+		ON CONFLICT (branch_id) DO UPDATE
+		SET current_ref=EXCLUDED.current_ref, state=EXCLUDED.state, error_message=EXCLUDED.error_message`,
+		branchID, newRef)
+	if err != nil {
+		return
+	}
+	if tag.RowsAffected() != 1 {
+		err = fmt.Errorf("[I] ExportMarkStart: Updated %d rows instead of just 1: %w", tag.RowsAffected(), pgx.ErrNoRows)
+		return
+	}
+	if state == ExportStatusFailed {
+		failure := errExportFailed{}
+		if res.ErrorMessage != nil {
+			failure.Message = *res.ErrorMessage
+		}
+		err = failure
+	}
+	return
+}
+
+// ExportStateDelete deletes the export state row of branch on repo. It returns an error
+// wrapping pgx.ErrNoRows when it did not delete exactly one row.
+func (c *cataloger) ExportStateDelete(tx db.Tx, repo string, branch string) error {
+	branchID, err := c.getBranchIDCache(tx, repo, branch)
+	if err != nil {
+		return err
+	}
+	tag, err := tx.Exec(`DELETE FROM catalog_branches_export_state WHERE branch_id=$1`, branchID)
+	if err != nil {
+		return err
+	}
+	if tag.RowsAffected() != 1 {
+		return fmt.Errorf("[I] ExportStateDelete: deleted %d rows instead of just 1: %w", tag.RowsAffected(), pgx.ErrNoRows)
+	}
+	return nil
+}
diff --git a/catalog/cataloger_export_test.go b/catalog/cataloger_export_test.go
index dca1e303941..0bfab05d096 100644
--- a/catalog/cataloger_export_test.go
+++ b/catalog/cataloger_export_test.go
@@ -3,12 +3,16 @@ package catalog
 import (
 	"context"
 	"errors"
+	"fmt"
 	"regexp/syntax"
 	"sort"
 	"testing"
 
 	"github.com/go-test/deep"
+	"github.com/jackc/pgx/v4"
+	"github.com/jackc/pgx/v4/pgxpool"
 	"github.com/lib/pq"
+	"github.com/treeverse/lakefs/db"
 )
 
 const (
@@ -37,10 +41,6 @@ func (s configForBranchSlice) Swap(i int, j int) {
 }
 
 func TestExportConfiguration(t *testing.T) {
-	const (
-		branchID        = 17
-		anotherBranchID = 29
-	)
 	ctx := context.Background()
 	c := testCataloger(t)
 	repo := testCatalogerRepo(t, ctx, c, prefix, defaultBranch)
@@ -148,3 +148,93 @@ func TestExportConfiguration(t *testing.T) {
 		}
 	})
 }
+
+func TestExportState(t *testing.T) {
+	const (
+		ref1 = "this commit"
+		ref2 = "that commit"
+	)
+	ctx := context.Background()
+	c := testCataloger(t)
+	repo := testCatalogerRepo(t, ctx, c, prefix, defaultBranch)
+
+	cases := []struct {
+		name        string
+		startRef    string // start with this ref (and state) if set, otherwise start with no row
+		startState  CatalogBranchExportStatus
+		setRef      string
+		expectState CatalogBranchExportStatus
+		expectErr   func(t *testing.T, err error)
+	}{
+		{
+			name:        "clean",
+			setRef:      ref2,
+			expectState: ExportStatusInProgress,
+		}, {
+			name:        "reset",
+			startRef:    ref1,
+			setRef:      ref2,
+			expectState: ExportStatusInProgress,
+		}, {
+			// NOTE(review): setup below only calls ExportMarkStart, which leaves the row
+			// in-progress; nothing here moves it to startState, so this case cannot
+			// observe a failed export until a way to mark failure exists.
+			name:        "previousFailed",
+			startRef:    ref1,
+			startState:  ExportStatusFailed,
+			setRef:      ref2,
+			expectState: ExportStatusInProgress,
+			expectErr: func(t *testing.T, err error) {
+				if !errors.Is(err, ErrExportFailed) {
+					t.Errorf("expected ErrExportFailed but got %s", err)
+				}
+			},
+		},
+	}
+
+	// One pool serves every case; close it when the test is done.
+	pool, err := pgxpool.Connect(ctx, c.DbConnURI)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer pool.Close()
+	if err := db.Ping(ctx, pool); err != nil {
+		t.Fatal(err)
+	}
+	d := db.NewPgxDatabase(pool)
+
+	for _, tt := range cases {
+		t.Run(tt.name, func(t *testing.T) {
+			_, err := d.Transact(func(tx db.Tx) (interface{}, error) {
+				if tt.startRef != "" {
+					// This also ends up testing ExportMarkStart in the same way
+					// each time.
+					if _, _, err := c.ExportMarkStart(tx, repo, defaultBranch, tt.startRef); err != nil {
+						return nil, fmt.Errorf("setup (mark previous): %w", err)
+					}
+				} else {
+					if err := c.ExportStateDelete(tx, repo, defaultBranch); err != nil && !errors.Is(err, pgx.ErrNoRows) {
+						return nil, fmt.Errorf("setup (delete): %w", err)
+					}
+				}
+
+				gotRef, gotState, err := c.ExportMarkStart(tx, repo, defaultBranch, tt.setRef)
+				if tt.expectErr != nil {
+					tt.expectErr(t, err)
+				} else if err != nil {
+					t.Errorf("unexpected error: %s", err)
+				}
+				if gotRef != tt.startRef {
+					t.Errorf("expected to old ref %s but got %s", tt.startRef, gotRef)
+				}
+				if tt.startState != "" && gotState != tt.startState {
+					t.Errorf("expected previous state %s but got %s", tt.startState, gotState)
+				}
+				return nil, nil
+			})
+			if err != nil {
+				t.Error(err)
+			}
+		})
+	}
+}
diff --git a/db/database.go b/db/database.go
index 5998a68fde9..e9202480c4f 100644
--- a/db/database.go
+++ b/db/database.go
@@ -302,9 +302,8 @@ func (d *PgxDatabase) Stats() sql.DBStats {
 		// waiting. The two are close enough (but race each other).
 		WaitCount: stat.EmptyAcquireCount(),
 		// Time to acquire is close enough to time spent waiting; fudge.
-		WaitDuration:      stat.AcquireDuration(),
-		MaxIdleClosed:     0, // TODO(ariels): Does pgx have this anywhere?
-		MaxIdleTimeClosed: 0, // TODO(ariels): Could generate this with post-connect/release hooks
-		MaxLifetimeClosed: 0, // TODO(ariels): Is this even possible in pgx?
+		WaitDuration: stat.AcquireDuration(),
+		// Not clear that pgx can report MaxIdleClosed, MaxIdleTimeClosed,
+		// MaxLifetimeClosed.
 	}
 }
diff --git a/ddl/000009_export_current.up.sql b/ddl/000009_export_current.up.sql
index e29f10df534..9a6a9346584 100644
--- a/ddl/000009_export_current.up.sql
+++ b/ddl/000009_export_current.up.sql
@@ -16,10 +16,14 @@ CREATE TABLE IF NOT EXISTS catalog_branches_export_state (
 ALTER TABLE catalog_branches_export_state
     ADD CONSTRAINT branches_export_state_branches_fk
     FOREIGN KEY (branch_id) REFERENCES catalog_branches(id)
+    -- Does *not* reference catalog_branches_export - state is independent of configuration,
+    -- e.g. when configuration is changed.
     ON DELETE CASCADE;
 
 ALTER TABLE catalog_branches_export_state
     ADD CONSTRAINT catalog_branches_export_error_on_failure
     CHECK ((state = 'export-failed') = (error_message IS NOT NULL));
 
+-- BUG(ariels): reset export state when catalog_branches_export changes its physical address.
+
 END;