chore: vaccum status tables if they cross threshold #3434

Merged · 16 commits · Jun 6, 2023

Changes from 2 commits
4 changes: 4 additions & 0 deletions jobsdb/jobsdb.go
@@ -604,6 +604,8 @@ func init() {
var (
maxDSSize, maxMigrateOnce, maxMigrateDSProbe int
maxTableSize int64
vaccumFullStatusTableThreshold int64
vaccumAnalyzeStatusTableThreshold int64
jobDoneMigrateThres, jobStatusMigrateThres float64
jobMinRowsMigrateThres float64
migrateDSLoopSleepDuration time.Duration
@@ -642,6 +644,8 @@ func loadConfig() {
config.RegisterIntConfigVariable(10, &maxMigrateDSProbe, true, 1, "JobsDB.maxMigrateDSProbe")
config.RegisterInt64ConfigVariable(300, &maxTableSize, true, 1000000, "JobsDB.maxTableSizeInMB")
config.RegisterInt64ConfigVariable(10000, &backupRowsBatchSize, true, 1, "JobsDB.backupRowsBatchSize")
config.RegisterInt64ConfigVariable(500, &vaccumFullStatusTableThreshold, true, 1000000, "JobsDB.vaccumFullStatusTableThresholdInMB")
Contributor: Value should be in bytes so we don't need to include the InMB prefix. You may use 500*bytesize.MB for the default value.
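Presumably the reviewer means mirroring the backupMaxTotalPayloadSize registration two lines below; a sketch of the suggested call (the key name without the InMB suffix is an assumption, not the committed change):

	// default of 500 MB, expressed in bytes via the bytesize helpers
	config.RegisterInt64ConfigVariable(500*bytesize.MB, &vaccumFullStatusTableThreshold, true, 1, "JobsDB.vaccumFullStatusTableThreshold")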

config.RegisterInt64ConfigVariable(30000, &vaccumAnalyzeStatusTableThreshold, true, 1, "JobsDB.vaccumAnalyzeStatusTableThreshold")
config.RegisterInt64ConfigVariable(64*bytesize.MB, &backupMaxTotalPayloadSize, true, 1, "JobsDB.maxBackupTotalPayloadSize")
config.RegisterDurationConfigVariable(30, &migrateDSLoopSleepDuration, true, time.Second, []string{"JobsDB.migrateDSLoopSleepDuration", "JobsDB.migrateDSLoopSleepDurationInS"}...)
config.RegisterDurationConfigVariable(5, &addNewDSLoopSleepDuration, true, time.Second, []string{"JobsDB.addNewDSLoopSleepDuration", "JobsDB.addNewDSLoopSleepDurationInS"}...)
91 changes: 83 additions & 8 deletions jobsdb/migration.go
@@ -153,6 +153,52 @@ func (jd *HandleT) doMigrateDS(ctx context.Context) error {
return err
}

// based on the size of the given DSs, returns a list of DSs whose status tables we should vacuum
func (jd *HandleT) getVacuumCandidates(ctx context.Context, dsList []dataSetT) ([]dataSetT, error) {
Contributor: We would need to have some tests verifying this method's behaviour.
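A test along the lines the reviewer asks for might look roughly like this; startTestPostgres is a hypothetical helper and the data-setup step is only sketched, so this is a shape, not the repository's actual test code:

	func TestGetVacuumCandidates(t *testing.T) {
		db := startTestPostgres(t) // hypothetical: throwaway postgres, returns *sql.DB
		jd := &HandleT{dbHandle: db, tablePrefix: "test"}
		// create test_job_status_1 and grow it past vaccumFullStatusTableThreshold,
		// plus a small test_job_status_2 that should not be selected
		dsList := []dataSetT{
			{JobTable: "test_jobs_1", JobStatusTable: "test_job_status_1"},
			{JobTable: "test_jobs_2", JobStatusTable: "test_job_status_2"},
		}
		candidates, err := jd.getVacuumCandidates(context.Background(), dsList)
		require.NoError(t, err)
		require.Len(t, candidates, 1)
		require.Equal(t, "test_job_status_1", candidates[0].JobStatusTable)
	}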

// get the name and size of every table
var rows *sql.Rows
rows, err := jd.dbHandle.QueryContext(
ctx,
`SELECT pg_total_relation_size(oid) AS size, relname
FROM pg_class
WHERE relname = ANY(
SELECT tablename
FROM pg_catalog.pg_tables
WHERE schemaname NOT IN ('pg_catalog','information_schema')
AND tablename like $1
)`,
jd.tablePrefix+"_job%",
Contributor: we are only interested in job status tables, correct?

Contributor Author: Yes, made the change in the query.
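Presumably the fix narrows the LIKE argument from all job tables to status tables only, something like the following (the exact committed pattern isn't shown at this revision):

	jd.tablePrefix+"_job_status%",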
)
if err != nil {
return nil, err
}
defer func() { _ = rows.Close() }()

tableSizes := map[string]int64{}
for rows.Next() {
var (
tableSize int64
tableName string
)
err = rows.Scan(&tableSize, &tableName)
if err != nil {
return nil, err
}
tableSizes[tableName] = tableSize
}
err = rows.Err()
if err != nil {
return nil, err
}

datasets := lo.Filter(dsList,
Member: can do this comparison above too, eliminating the need for the tableSizes map.

Contributor Author: True, but the query gives me all the tables and their sizes. I'm doing the filtering since we are only interested in a few status tables that are going to be cleaned.

Member: I meant something like this:

	// ...
	defer func() { _ = rows.Close() }()
	toVacuum := map[string]struct{}{}
	for rows.Next() {
		var (
			tableSize int64
			tableName string
		)
		err = rows.Scan(&tableSize, &tableName)
		if err != nil {
			return nil, err
		}
		if tableSize > vacuumFullStatusTableThreshold {
			toVacuum[tableName] = struct{}{}
		}
	}
	err = rows.Err()
	if err != nil {
		return nil, err
	}
	return toVacuum, nil
}

Why do we need to pass dsList to this function..?

Member: I just read your comment again and understand now that you wanted to vacuum only tables that were marked for compaction.
Might as well vacuum all..?

Member (@Sidddddarth, Jun 6, 2023): or if you intend to vacuum only a subset of dsList, you may use it as part of the query. Maybe the threshold as well.

Contributor Author: I think we can vacuum all. WDYT @atzoum?

func(ds dataSetT, idx int) bool {
tableSize := tableSizes[ds.JobStatusTable]
return tableSize > vaccumFullStatusTableThreshold
})
return datasets, nil
}
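For reference, lo.Filter from github.com/samber/lo keeps the elements for which the predicate returns true; a minimal, self-contained illustration:

	package main

	import (
		"fmt"

		"github.com/samber/lo"
	)

	func main() {
		// keep only the even numbers; the second parameter is the element's index
		even := lo.Filter([]int{1, 2, 3, 4}, func(n, _ int) bool { return n%2 == 0 })
		fmt.Println(even) // [2 4]
	}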

// based on an estimate of the rows in DSs, gives a list of DSs for us to clean up status tables
func (jd *HandleT) getCleanUpCandidates(ctx context.Context, dsList []dataSetT) ([]dataSetT, error) {
// get analyzer estimates for the number of rows(jobs, statuses) in each DS
@@ -206,6 +252,11 @@ func (jd *HandleT) cleanupStatusTables(ctx context.Context, dsList []dataSetT) error {
if err != nil {
return err
}
// vacuum status tables from the toCompact list whose size exceeds vaccumFullStatusTableThreshold
toVaccum, err := jd.getVacuumCandidates(ctx, toCompact)
Contributor: A table that we know is going to be vacuumed full doesn't need to be vacuumed if it is in the cleanup list
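One way to act on this suggestion, sketched with the samber/lo helpers already used in this file (keying by JobStatusTable is an assumption, not the committed fix):

	// skip full-vacuum candidates during compaction so no table is processed twice
	vacuumSet := lo.SliceToMap(toVaccum, func(ds dataSetT) (string, struct{}) {
		return ds.JobStatusTable, struct{}{}
	})
	toCompact = lo.Filter(toCompact, func(ds dataSetT, _ int) bool {
		_, skip := vacuumSet[ds.JobStatusTable]
		return !skip
	})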

if err != nil {
return err
}
start := time.Now()
defer stats.Default.NewTaggedStat(
"jobsdb_compact_status_tables",
@@ -223,29 +274,53 @@ func (jd *HandleT) cleanupStatusTables(ctx context.Context, dsList []dataSetT) error {
return err
}
}
for _, statusTable := range toVaccum {
Member: if a status table belongs to both toCompact and toVaccum, it'll undergo vacuum twice (analyze and full).
Is such a situation possible..?

Contributor Author: That shouldn't be possible because of the condition here:

if numJobStatusDeleted > vacuumAnalyzeStatusTableThreshold && canBeVacuumed {

if err := jd.vaccumStatusTable(
ctx,
tx,
statusTable.JobStatusTable,
); err != nil {
return err
}
}
return nil
})
}

// cleanStatusTable deletes all rows except for the latest status for each job
func (*HandleT) cleanStatusTable(ctx context.Context, tx *Tx, table string) error {
-	_, err := tx.ExecContext(
+	deletedJobsQuery := fmt.Sprintf(`WITH deleted AS (DELETE FROM %[1]q
Contributor: I don't think this is necessary, as a delete statement returns the number of rows affected already.
+		WHERE NOT id = ANY(
+			SELECT DISTINCT ON (job_id) id from "%[1]s" ORDER BY job_id ASC, id DESC
+		) RETURNING *) SELECT count(*) FROM deleted;`, table)
+
+	var numJobStatusDeleted int64
+	var err error
+	if err = tx.QueryRowContext(
		ctx,
-		fmt.Sprintf(`DELETE FROM %[1]q
-		WHERE NOT id = ANY(
-			SELECT DISTINCT ON (job_id) id from "%[1]s" ORDER BY job_id ASC, id DESC
-		)`, table),
-	)
-	if err != nil {
+		deletedJobsQuery,
+	).Scan(&numJobStatusDeleted); err != nil {
		return err
	}

+	query := fmt.Sprintf(`ANALYZE %q`, table)
+	if numJobStatusDeleted > vaccumAnalyzeStatusTableThreshold {
+		query = fmt.Sprintf(`VACUUM ANALYZE %q`, table)
+	}
	_, err = tx.ExecContext(
		ctx,
-		fmt.Sprintf(`ANALYZE %q`, table),
+		query,
	)
	return err
}
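The reviewer's alternative would rely on the row count that database/sql already returns for a DELETE, avoiding the CTE entirely; a sketch using the standard sql.Result.RowsAffected API (not the committed code):

	res, err := tx.ExecContext(
		ctx,
		fmt.Sprintf(`DELETE FROM %[1]q
		WHERE NOT id = ANY(
			SELECT DISTINCT ON (job_id) id from "%[1]s" ORDER BY job_id ASC, id DESC
		)`, table),
	)
	if err != nil {
		return err
	}
	// lib/pq reports the number of rows deleted without needing a RETURNING clause
	numJobStatusDeleted, err := res.RowsAffected()
	if err != nil {
		return err
	}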

func (*HandleT) vaccumStatusTable(ctx context.Context, tx *Tx, table string) error {
if _, err := tx.ExecContext(ctx, fmt.Sprintf(`VACUUM FULL %[1]q`, table)); err != nil {
return err
}
return nil
}

// getMigrationList returns the list of datasets to migrate from,
// the number of unfinished jobs contained in these datasets
// and the dataset before which the new (migrated) dataset that will hold these jobs needs to be created