Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: vacuum status tables if they cross threshold #3434

Merged
merged 16 commits into from
Jun 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion jobsdb/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func (jd *HandleT) backupDSLoop(ctx context.Context) {
// backupDS writes both jobs and job_status tables to JOBS_BACKUP_STORAGE_PROVIDER
func (jd *HandleT) backupDS(ctx context.Context, backupDSRange *dataSetRangeT) error {
if err := jd.WithTx(func(tx *Tx) error {
return jd.cleanStatusTable(ctx, tx, backupDSRange.ds.JobStatusTable)
return jd.cleanStatusTable(ctx, tx, backupDSRange.ds.JobStatusTable, true)
}); err != nil {
return fmt.Errorf("error while cleaning status table: %w", err)
}
Expand Down
4 changes: 4 additions & 0 deletions jobsdb/jobsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -604,6 +604,8 @@ func init() {
var (
maxDSSize, maxMigrateOnce, maxMigrateDSProbe int
maxTableSize int64
vacuumFullStatusTableThreshold int64
vacuumAnalyzeStatusTableThreshold int64
jobDoneMigrateThres, jobStatusMigrateThres float64
jobMinRowsMigrateThres float64
migrateDSLoopSleepDuration time.Duration
Expand Down Expand Up @@ -642,6 +644,8 @@ func loadConfig() {
config.RegisterIntConfigVariable(10, &maxMigrateDSProbe, true, 1, "JobsDB.maxMigrateDSProbe")
config.RegisterInt64ConfigVariable(300, &maxTableSize, true, 1000000, "JobsDB.maxTableSizeInMB")
config.RegisterInt64ConfigVariable(10000, &backupRowsBatchSize, true, 1, "JobsDB.backupRowsBatchSize")
config.RegisterInt64ConfigVariable(500*bytesize.MB, &vacuumFullStatusTableThreshold, true, 1, "JobsDB.vacuumFullStatusTableThreshold")
config.RegisterInt64ConfigVariable(30000, &vacuumAnalyzeStatusTableThreshold, true, 1, "JobsDB.vacuumAnalyzeStatusTableThreshold")
config.RegisterInt64ConfigVariable(64*bytesize.MB, &backupMaxTotalPayloadSize, true, 1, "JobsDB.maxBackupTotalPayloadSize")
config.RegisterDurationConfigVariable(30, &migrateDSLoopSleepDuration, true, time.Second, []string{"JobsDB.migrateDSLoopSleepDuration", "JobsDB.migrateDSLoopSleepDurationInS"}...)
config.RegisterDurationConfigVariable(5, &addNewDSLoopSleepDuration, true, time.Second, []string{"JobsDB.addNewDSLoopSleepDuration", "JobsDB.addNewDSLoopSleepDurationInS"}...)
Expand Down
85 changes: 80 additions & 5 deletions jobsdb/migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,53 @@ func (jd *HandleT) doMigrateDS(ctx context.Context) error {
return err
}

// based on size of given DSs, gives a list of DSs for us to vacuum full status tables
// getVacuumFullCandidates returns the names of the job status tables, among
// the provided DSs, whose physical size (pg_table_size) exceeds
// vacuumFullStatusTableThreshold and are therefore candidates for VACUUM FULL.
func (jd *HandleT) getVacuumFullCandidates(ctx context.Context, dsList []dataSetT) ([]string, error) {
	// Fetch the name and physical size of every job status table that
	// matches this jobsdb's prefix.
	rows, err := jd.dbHandle.QueryContext(
		ctx,
		`SELECT pg_table_size(oid) AS size, relname
		FROM pg_class
		where relname = ANY(
			SELECT tablename
				FROM pg_catalog.pg_tables
				WHERE schemaname NOT IN ('pg_catalog','information_schema')
				AND tablename like $1
		) order by relname;`,
		jd.tablePrefix+"_job_status%",
	)
	if err != nil {
		return nil, err
	}
	defer func() { _ = rows.Close() }()

	tableSizes := map[string]int64{}
	for rows.Next() {
		var (
			tableSize int64
			tableName string
		)
		if err := rows.Scan(&tableSize, &tableName); err != nil {
			return nil, err
		}
		tableSizes[tableName] = tableSize
	}
	if err := rows.Err(); err != nil {
		return nil, err
	}

	// Keep only the status tables that crossed the size threshold.
	var toVacuumFull []string
	for _, ds := range dsList {
		if tableSizes[ds.JobStatusTable] > vacuumFullStatusTableThreshold {
			toVacuumFull = append(toVacuumFull, ds.JobStatusTable)
		}
	}
	return toVacuumFull, nil
}

// based on an estimate of the rows in DSs, gives a list of DSs for us to cleanup status tables
func (jd *HandleT) getCleanUpCandidates(ctx context.Context, dsList []dataSetT) ([]dataSetT, error) {
// get analyzer estimates for the number of rows(jobs, statuses) in each DS
Expand Down Expand Up @@ -209,6 +256,14 @@ func (jd *HandleT) cleanupStatusTables(ctx context.Context, dsList []dataSetT) e
if err != nil {
return err
}

toVacuumFull, err := jd.getVacuumFullCandidates(ctx, dsList)
if err != nil {
return err
}
toVacuumFullMap := lo.Associate(toVacuumFull, func(k string) (string, struct{}) {
return k, struct{}{}
})
start := time.Now()
defer stats.Default.NewTaggedStat(
"jobsdb_compact_status_tables",
Expand All @@ -218,33 +273,53 @@ func (jd *HandleT) cleanupStatusTables(ctx context.Context, dsList []dataSetT) e

return jd.WithTx(func(tx *Tx) error {
for _, statusTable := range toCompact {
table := statusTable.JobStatusTable
// clean up and vacuum if not present in toVacuumFullMap
_, ok := toVacuumFullMap[table]
if err := jd.cleanStatusTable(
ctx,
tx,
statusTable.JobStatusTable,
table,
!ok,
); err != nil {
return err
}
}
// vacuum full if present in toVacuumFull
for _, table := range toVacuumFull {
if _, err := tx.ExecContext(ctx, fmt.Sprintf(`VACUUM FULL %[1]q`, table)); err != nil {
return err
}
}
return nil
})
}

// cleanStatusTable deletes all rows except for the latest status for each job
func (*HandleT) cleanStatusTable(ctx context.Context, tx *Tx, table string) error {
_, err := tx.ExecContext(
func (*HandleT) cleanStatusTable(ctx context.Context, tx *Tx, table string, canBeVacuumed bool) error {
result, err := tx.ExecContext(
ctx,
fmt.Sprintf(`DELETE FROM %[1]q
WHERE NOT id = ANY(
WHERE NOT id = ANY(
SELECT DISTINCT ON (job_id) id from "%[1]s" ORDER BY job_id ASC, id DESC
)`, table),
)
if err != nil {
return err
}

numJobStatusDeleted, err := result.RowsAffected()
if err != nil {
return err
}

query := fmt.Sprintf(`ANALYZE %q`, table)
if numJobStatusDeleted > vacuumAnalyzeStatusTableThreshold && canBeVacuumed {
query = fmt.Sprintf(`VACUUM ANALYZE %q`, table)
}
_, err = tx.ExecContext(
ctx,
fmt.Sprintf(`ANALYZE %q`, table),
query,
)
return err
}
Expand Down
7 changes: 7 additions & 0 deletions jobsdb/migration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/rudderlabs/rudder-go-kit/testhelper/rand"
"github.com/rudderlabs/rudder-server/jobsdb/prebackup"
fileuploader "github.com/rudderlabs/rudder-server/services/fileuploader"
"github.com/rudderlabs/rudder-server/utils/bytesize"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -196,4 +197,10 @@ func TestMigration(t *testing.T) {
).Scan(&count)
require.NoError(t, err)
require.EqualValues(t, 10, count)

vacuumFullStatusTableThreshold = 8 * bytesize.KB // 8KB is common value for toast size in postgres
toVacuum, err := jobDB.getVacuumFullCandidates(context.Background(), dsList)
require.NoError(t, err)
require.EqualValues(t, 4, len(dsList)) // total 4 DSs
require.EqualValues(t, 2, len(toVacuum)) // 2 DSs to vacuum since 2nd and 4th DSs have status tables with no entries
}