Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate ResetConsensusSubscription to raw SQL #1348

Merged
merged 2 commits into from
Jun 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 15 additions & 34 deletions stores/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ func NewSQLStore(cfg Config) (*SQLStore, modules.ConsensusChangeID, error) {
}

// Get latest consensus change ID or init db.
ci, ccid, err := initConsensusInfo(db)
ci, ccid, err := initConsensusInfo(ctx, dbMain)
if err != nil {
return nil, modules.ConsensusChangeID{}, err
}
Expand Down Expand Up @@ -249,7 +249,7 @@ func NewSQLStore(cfg Config) (*SQLStore, modules.ConsensusChangeID, error) {
walletAddress: cfg.WalletAddress,
chainIndex: types.ChainIndex{
Height: ci.Height,
ID: types.BlockID(ci.BlockID),
ID: types.BlockID(ci.ID),
},

lastPrunedAt: time.Now(),
Expand Down Expand Up @@ -510,47 +510,28 @@ func (s *SQLStore) retryTransaction(ctx context.Context, fc func(tx *gorm.DB) er
return fmt.Errorf("retryTransaction failed: %w", err)
}

func initConsensusInfo(db *gorm.DB) (dbConsensusInfo, modules.ConsensusChangeID, error) {
var ci dbConsensusInfo
if err := db.
Where(&dbConsensusInfo{Model: Model{ID: consensusInfoID}}).
Attrs(dbConsensusInfo{
Model: Model{ID: consensusInfoID},
CCID: modules.ConsensusChangeBeginning[:],
}).
FirstOrCreate(&ci).
Error; err != nil {
return dbConsensusInfo{}, modules.ConsensusChangeID{}, err
}
var ccid modules.ConsensusChangeID
copy(ccid[:], ci.CCID)
return ci, ccid, nil
func initConsensusInfo(ctx context.Context, db sql.Database) (ci types.ChainIndex, ccid modules.ConsensusChangeID, err error) {
err = db.Transaction(ctx, func(tx sql.DatabaseTx) error {
ci, ccid, err = tx.InitConsensusInfo(ctx)
return err
})
return
}

func (s *SQLStore) ResetConsensusSubscription(ctx context.Context) error {
// empty tables and reinit consensus_infos
var ci dbConsensusInfo
err := s.retryTransaction(ctx, func(tx *gorm.DB) error {
if err := s.db.Exec("DELETE FROM consensus_infos").Error; err != nil {
return err
} else if err := s.db.Exec("DELETE FROM siacoin_elements").Error; err != nil {
return err
} else if err := s.db.Exec("DELETE FROM transactions").Error; err != nil {
return err
} else if ci, _, err = initConsensusInfo(tx); err != nil {
return err
}
return nil
// reset db
var ci types.ChainIndex
var err error
err = s.bMain.Transaction(ctx, func(tx sql.DatabaseTx) error {
ci, err = tx.ResetConsensusSubscription(ctx)
return err
})
if err != nil {
return err
}
// reset in-memory state.
s.persistMu.Lock()
s.chainIndex = types.ChainIndex{
Height: ci.Height,
ID: types.BlockID(ci.BlockID),
}
s.chainIndex = ci
s.persistMu.Unlock()
return nil
}
9 changes: 9 additions & 0 deletions stores/sql/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"go.sia.tech/renterd/api"
"go.sia.tech/renterd/object"
"go.sia.tech/renterd/webhooks"
"go.sia.tech/siad/modules"
)

// The database interfaces define all methods that a SQL database must implement
Expand Down Expand Up @@ -141,6 +142,10 @@ type (
// HostBlocklist returns the list of host addresses on the blocklist.
HostBlocklist(ctx context.Context) ([]string, error)

// InitConsensusInfo initializes the consensus info in the database or
// returns the latest one.
InitConsensusInfo(ctx context.Context) (types.ChainIndex, modules.ConsensusChangeID, error)

// InsertObject inserts a new object into the database.
InsertObject(ctx context.Context, bucket, key, contractSet string, dirID int64, o object.Object, mimeType, eTag string, md api.ObjectUserMetadata) error

Expand Down Expand Up @@ -211,6 +216,10 @@ type (
// returned.
RenameObjects(ctx context.Context, bucket, prefixOld, prefixNew string, dirID int64, force bool) error

// ResetConsenusSubscription resets the consensus subscription in the
// database.
ResetConsensusSubscription(ctx context.Context) (types.ChainIndex, error)

// ResetLostSectors resets the lost sector count for the given host.
ResetLostSectors(ctx context.Context, hk types.PublicKey) error

Expand Down
36 changes: 36 additions & 0 deletions stores/sql/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,12 @@ import (
"go.sia.tech/renterd/internal/sql"
"go.sia.tech/renterd/object"
"go.sia.tech/renterd/webhooks"
"go.sia.tech/siad/modules"
"lukechampine.com/frand"
)

const consensuInfoID = 1

var ErrNegativeOffset = errors.New("offset can not be negative")

func AbortMultipartUpload(ctx context.Context, tx sql.Tx, bucket, key string, uploadID string) error {
Expand Down Expand Up @@ -1294,6 +1297,39 @@ func RemoveOfflineHosts(ctx context.Context, tx sql.Tx, minRecentFailures uint64
return res.RowsAffected()
}

func InitConsensusInfo(ctx context.Context, tx sql.Tx) (types.ChainIndex, modules.ConsensusChangeID, error) {
// try fetch existing
var ccid modules.ConsensusChangeID
var ci types.ChainIndex
err := tx.QueryRow(ctx, "SELECT cc_id, height, block_id FROM consensus_infos WHERE id = ?", consensuInfoID).
Scan((*CCID)(&ccid), &ci.Height, (*Hash256)(&ci.ID))
if err != nil && !errors.Is(err, dsql.ErrNoRows) {
return types.ChainIndex{}, modules.ConsensusChangeID{}, fmt.Errorf("failed to fetch consensus info: %w", err)
} else if err == nil {
return ci, ccid, nil
}
// otherwise init
ci = types.ChainIndex{}
if _, err := tx.Exec(ctx, "INSERT INTO consensus_infos (id, created_at, cc_id, height, block_id) VALUES (?, ?, ?, ?, ?)",
consensuInfoID, time.Now(), (CCID)(modules.ConsensusChangeBeginning), ci.Height, (Hash256)(ci.ID)); err != nil {
return types.ChainIndex{}, modules.ConsensusChangeID{}, fmt.Errorf("failed to init consensus infos: %w", err)
}
return types.ChainIndex{}, modules.ConsensusChangeBeginning, nil
}

func ResetConsensusSubscription(ctx context.Context, tx sql.Tx) (ci types.ChainIndex, err error) {
if _, err := tx.Exec(ctx, "DELETE FROM consensus_infos"); err != nil {
return types.ChainIndex{}, fmt.Errorf("failed to delete consensus infos: %w", err)
} else if _, err := tx.Exec(ctx, "DELETE FROM siacoin_elements"); err != nil {
return types.ChainIndex{}, fmt.Errorf("failed to delete siacoin elements: %w", err)
} else if _, err := tx.Exec(ctx, "DELETE FROM transactions"); err != nil {
return types.ChainIndex{}, fmt.Errorf("failed to delete transactions: %w", err)
} else if ci, _, err = InitConsensusInfo(ctx, tx); err != nil {
return types.ChainIndex{}, fmt.Errorf("failed to initialize consensus info: %w", err)
}
return ci, nil
}

func ResetLostSectors(ctx context.Context, tx sql.Tx, hk types.PublicKey) error {
_, err := tx.Exec(ctx, "UPDATE hosts SET lost_sectors = 0 WHERE public_key = ?", PublicKey(hk))
if err != nil {
Expand Down
9 changes: 9 additions & 0 deletions stores/sql/mysql/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"go.sia.tech/renterd/object"
ssql "go.sia.tech/renterd/stores/sql"
"go.sia.tech/renterd/webhooks"
"go.sia.tech/siad/modules"
"lukechampine.com/frand"

"go.sia.tech/renterd/internal/sql"
Expand Down Expand Up @@ -355,6 +356,10 @@ func (tx *MainDatabaseTx) HostsForScanning(ctx context.Context, maxLastScan time
return ssql.HostsForScanning(ctx, tx, maxLastScan, offset, limit)
}

func (tx *MainDatabaseTx) InitConsensusInfo(ctx context.Context) (types.ChainIndex, modules.ConsensusChangeID, error) {
return ssql.InitConsensusInfo(ctx, tx)
}

func (tx *MainDatabaseTx) InsertObject(ctx context.Context, bucket, key, contractSet string, dirID int64, o object.Object, mimeType, eTag string, md api.ObjectUserMetadata) error {
// get bucket id
var bucketID int64
Expand Down Expand Up @@ -604,6 +609,10 @@ func (tx *MainDatabaseTx) RenameObjects(ctx context.Context, bucket, prefixOld,
return nil
}

func (tx *MainDatabaseTx) ResetConsensusSubscription(ctx context.Context) (types.ChainIndex, error) {
return ssql.ResetConsensusSubscription(ctx, tx)
}

func (tx *MainDatabaseTx) ResetLostSectors(ctx context.Context, hk types.PublicKey) error {
return ssql.ResetLostSectors(ctx, tx, hk)
}
Expand Down
9 changes: 9 additions & 0 deletions stores/sql/sqlite/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"go.sia.tech/renterd/object"
ssql "go.sia.tech/renterd/stores/sql"
"go.sia.tech/renterd/webhooks"
"go.sia.tech/siad/modules"
"lukechampine.com/frand"

"go.uber.org/zap"
Expand Down Expand Up @@ -344,6 +345,10 @@ func (tx *MainDatabaseTx) HostsForScanning(ctx context.Context, maxLastScan time
return ssql.HostsForScanning(ctx, tx, maxLastScan, offset, limit)
}

func (tx *MainDatabaseTx) InitConsensusInfo(ctx context.Context) (types.ChainIndex, modules.ConsensusChangeID, error) {
return ssql.InitConsensusInfo(ctx, tx)
}

func (tx *MainDatabaseTx) InsertObject(ctx context.Context, bucket, key, contractSet string, dirID int64, o object.Object, mimeType, eTag string, md api.ObjectUserMetadata) error {
// get bucket id
var bucketID int64
Expand Down Expand Up @@ -602,6 +607,10 @@ func (tx *MainDatabaseTx) RenameObjects(ctx context.Context, bucket, prefixOld,
return nil
}

func (tx *MainDatabaseTx) ResetConsensusSubscription(ctx context.Context) (types.ChainIndex, error) {
return ssql.ResetConsensusSubscription(ctx, tx)
}

func (tx *MainDatabaseTx) ResetLostSectors(ctx context.Context, hk types.PublicKey) error {
return ssql.ResetLostSectors(ctx, tx, hk)
}
Expand Down
19 changes: 19 additions & 0 deletions stores/sql/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
rhpv3 "go.sia.tech/core/rhp/v3"
"go.sia.tech/core/types"
"go.sia.tech/renterd/api"
"go.sia.tech/siad/modules"
)

const (
Expand All @@ -24,6 +25,7 @@ const (
type (
AutopilotConfig api.AutopilotConfig
BigInt big.Int
CCID modules.ConsensusChangeID
Currency types.Currency
FileContractID types.FileContractID
Hash256 types.Hash256
Expand All @@ -46,6 +48,7 @@ var (
_ scannerValuer = (*AutopilotConfig)(nil)
_ scannerValuer = (*BigInt)(nil)
_ scannerValuer = (*BusSetting)(nil)
_ scannerValuer = (*CCID)(nil)
_ scannerValuer = (*Currency)(nil)
_ scannerValuer = (*FileContractID)(nil)
_ scannerValuer = (*Hash256)(nil)
Expand Down Expand Up @@ -99,6 +102,22 @@ func (b BigInt) Value() (driver.Value, error) {
return (*big.Int)(&b).String(), nil
}

// Scan scan value into CCID, implements sql.Scanner interface.
func (c *CCID) Scan(value interface{}) error {
switch value := value.(type) {
case []byte:
copy(c[:], value)
default:
return fmt.Errorf("failed to unmarshal CCID value: %v %t", value, value)
}
return nil
}

// Value returns a publicKey value, implements driver.Valuer interface.
func (c CCID) Value() (driver.Value, error) {
return c[:], nil
}

// Scan scan value into Currency, implements sql.Scanner interface.
func (c *Currency) Scan(value interface{}) error {
var s string
Expand Down
Loading