Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: vaccum status tables if they cross threshold #3434

Merged
merged 16 commits into from
Jun 6, 2023

Conversation

BonapartePC
Copy link
Contributor

@BonapartePC BonapartePC commented Jun 2, 2023

Description

Vacuum status table if they cross threshold

Notion Ticket

https://www.notion.so/rudderstacks/Vacuum-and-Vacuum-full-during-jobsdb-compaction-6f78b37caea64aef9161c1732fdd76a2

Security

  • The code changed/added as part of this pull request won't create any security issues with how the software is being used.

jobsdb/jobsdb.go Outdated
@@ -642,6 +644,8 @@ func loadConfig() {
config.RegisterIntConfigVariable(10, &maxMigrateDSProbe, true, 1, "JobsDB.maxMigrateDSProbe")
config.RegisterInt64ConfigVariable(300, &maxTableSize, true, 1000000, "JobsDB.maxTableSizeInMB")
config.RegisterInt64ConfigVariable(10000, &backupRowsBatchSize, true, 1, "JobsDB.backupRowsBatchSize")
config.RegisterInt64ConfigVariable(500, &vaccumFullStatusTableThreshold, true, 1000000, "JobsDB.vaccumFullStatusTableThresholdInMB")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Value should be in bytes so we don't need to include the InMB prefix. You may use 500*bytesize.MB for the default value.

return nil
})
}

// cleanStatusTable deletes all rows except for the latest status for each job
func (*HandleT) cleanStatusTable(ctx context.Context, tx *Tx, table string) error {
_, err := tx.ExecContext(
deletedJobsQuery := fmt.Sprintf(`WITH deleted AS (DELETE FROM %[1]q
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is necessary, as a delete statement returns the number of rows affected already

@@ -153,6 +153,52 @@ func (jd *HandleT) doMigrateDS(ctx context.Context) error {
return err
}

// based on size of given DSs, gives a list of DSs for us to vaccum status tables
func (jd *HandleT) getVacuumCandidates(ctx context.Context, dsList []dataSetT) ([]dataSetT, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We would need to have some tests verifying this method's behaviour

@@ -206,6 +252,11 @@ func (jd *HandleT) cleanupStatusTables(ctx context.Context, dsList []dataSetT) e
if err != nil {
return err
}
// vaccum status table if total size exceeds vaccumFullStatusTableThreshold in the toCompact list
toVaccum, err := jd.getVacuumCandidates(ctx, toCompact)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A table that we know is going to be vacuumed full doesn't need to be vacuumed if it is in the cleanup list

@codecov
Copy link

codecov bot commented Jun 5, 2023

Codecov Report

Patch coverage: 67.94% and project coverage change: -0.06 ⚠️

Comparison is base (3d03653) 68.53% compared to head (498da6c) 68.48%.

Additional details and impacted files
@@            Coverage Diff             @@
##           master    #3434      +/-   ##
==========================================
- Coverage   68.53%   68.48%   -0.06%     
==========================================
  Files         330      330              
  Lines       52905    52973      +68     
==========================================
+ Hits        36259    36277      +18     
- Misses      14296    14335      +39     
- Partials     2350     2361      +11     
Impacted Files Coverage Δ
jobsdb/migration.go 74.66% <63.23%> (-3.08%) ⬇️
jobsdb/backup.go 75.00% <100.00%> (ø)
jobsdb/jobsdb.go 73.97% <100.00%> (+0.02%) ⬆️
warehouse/schema.go 100.00% <100.00%> (ø)

... and 12 files with indirect coverage changes

☔ View full report in Codecov by Sentry.
📢 Do you have feedback about the report comment? Let us know in this issue.

return nil, err
}

datasets := lo.Filter(dsList,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can do this comparison above too, eliminating the need for tableSizes map.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

True, but the query gives me all the tables and their sizes. I'm doing the filtering since we are only interested in a few status tables that are going to be cleaned

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I meant something like this:

    .
    .
    .
	defer func() { _ = rows.Close() }()
    toVacuum := map[string]struct{}{}
	for rows.Next() {
		var (
			tableSize int64
			tableName string
		)
		err = rows.Scan(&tableSize, &tableName)
		if err != nil {
			return nil, err
		}
        if tableSize > vacuumFullStatusTableThreshold {
				toVacuum[tableName] = struct{}{}
		}
	}
	err = rows.Err()
	if err != nil {
		return nil, err
	}
    return toVacuum, nil
}

Why do we need to pass dsList to this function..?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just read your comment again and understand now that you were wanting to vacuum only tables that were marked for compaction.
Might as well vacuum all..?

Copy link
Member

@Sidddddarth Sidddddarth Jun 6, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

or if you intend to vacuum only a subset of dsList, you may use it as part of the query.
Maybe the threshold as well.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can vacuum all. WDYT @atzoum ?

@@ -223,29 +274,53 @@ func (jd *HandleT) cleanupStatusTables(ctx context.Context, dsList []dataSetT) e
return err
}
}
for _, statusTable := range toVaccum {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if a status table belongs to both toCompact and toVaccum, it'll undergo vacuum twice(analyze and full).
Is such a situation possible..?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That shouldn't be possible because of the condition here:

if numJobStatusDeleted > vacuumAnalyzeStatusTableThreshold && canBeVacuumed {

WHERE schemaname NOT IN ('pg_catalog','information_schema')
AND tablename like $1
)`,
jd.tablePrefix+"_job%",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we are only interested on job status tables, correct?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, made the change in query

Comment on lines 194 to 202
datasets := lo.Filter(dsList,
func(ds dataSetT, idx int) bool {
tableSize := tableSizes[ds.JobStatusTable]
return tableSize > vacuumFullStatusTableThreshold
})
toVacuum := map[string]struct{}{}
for _, ds := range datasets {
toVacuum[ds.JobStatusTable] = struct{}{}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The code below is less and more efficient :)

Suggested change
datasets := lo.Filter(dsList,
func(ds dataSetT, idx int) bool {
tableSize := tableSizes[ds.JobStatusTable]
return tableSize > vacuumFullStatusTableThreshold
})
toVacuum := map[string]struct{}{}
for _, ds := range datasets {
toVacuum[ds.JobStatusTable] = struct{}{}
}
toVacuum := map[string]struct{}{}
for _, ds := range dsList {
if tableSizes[ds.JobStatusTable] > vacuumFullStatusTableThreshold {
toVacuum[ds.JobStatusTable] = struct{}{}
}
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

@@ -206,6 +256,11 @@ func (jd *HandleT) cleanupStatusTables(ctx context.Context, dsList []dataSetT) e
if err != nil {
return err
}
// vacuum status table if total size exceeds vacuumFullStatusTableThreshold in the toCompact list
toVacuum, err := jd.getVacuumCandidates(ctx, toCompact)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't believe we should only expect to be doing full vacuuming for tables that need to be compacted. A table might reach the full vacuuming threshold and still not need a compaction yet.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct. Should we just vacuum all the tables or just tables in dsList? Tagged you in the above message as well

Copy link
Contributor

@atzoum atzoum Jun 6, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Define all tables. Which additional tables are you proposing to vacuum full and why?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean the tables which the query returns. Do we need to pass dsList? We can just return all the tables that the query returns which cross the threshold right?

Copy link
Contributor

@atzoum atzoum Jun 6, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure what else is going to be included now or in the future if we remove this filter. Limiting the tables to those contained in the dsList can easily guarantee that we are not performing an undesirable full vaccuming (costly operation) to another table.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

makes sense
cc @Sidddddarth

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so a subset of the compacted tables will be vacuumed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, a subset of dsList will be vacuumed. compacted tables and tables that we're going to vacuum are independent at this point

var rows *sql.Rows
rows, err := jd.dbHandle.QueryContext(
ctx,
`SELECT pg_relation_size(oid) AS size, relname
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
`SELECT pg_relation_size(oid) AS size, relname
`SELECT pg_table_size(oid) AS size, relname

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

@BonapartePC BonapartePC requested a review from atzoum June 6, 2023 06:46
@@ -153,6 +153,53 @@ func (jd *HandleT) doMigrateDS(ctx context.Context) error {
return err
}

// based on size of given DSs, gives a list of DSs for us to vacuum status tables
func (jd *HandleT) getVacuumCandidates(ctx context.Context, dsList []dataSetT) (map[string]struct{}, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
func (jd *HandleT) getVacuumCandidates(ctx context.Context, dsList []dataSetT) (map[string]struct{}, error) {
func (jd *HandleT) getVacuumFullCandidates(ctx context.Context, dsList []dataSetT) (map[string]struct{}, error) {

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Comment on lines 329 to 335
func (*HandleT) vacuumFullStatusTable(ctx context.Context, tx *Tx, table string) error {
if _, err := tx.ExecContext(ctx, fmt.Sprintf(`VACUUM FULL %[1]q`, table)); err != nil {
return err
}
return nil
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't believe we need this as a separate method

@BonapartePC BonapartePC requested a review from atzoum June 6, 2023 07:13
Comment on lines 285 to 290
// vacuum full if present in toVacuum
if ok {
if _, err := tx.ExecContext(ctx, fmt.Sprintf(`VACUUM FULL %[1]q`, table)); err != nil {
return err
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Full vacuuming shall be performed on its own loop, following but not coupled with the compaction loop.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, my bad 🤦
Made the changes

@@ -196,4 +196,10 @@ func TestMigration(t *testing.T) {
).Scan(&count)
require.NoError(t, err)
require.EqualValues(t, 10, count)

vacuumFullStatusTableThreshold = 8 * 1024 // 8KB is common value for toast size in postgres
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
vacuumFullStatusTableThreshold = 8 * 1024 // 8KB is common value for toast size in postgres
vacuumFullStatusTableThreshold = 8 * bytesize.KB // 8KB is common value for toast size in postgres

@BonapartePC BonapartePC requested a review from atzoum June 6, 2023 07:29
Comment on lines 286 to 290
for table := range toVacuum {
if _, err := tx.ExecContext(ctx, fmt.Sprintf(`VACUUM FULL %[1]q`, table)); err != nil {
return err
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If more than 1 status tables need full vacuuming, it would be preferable for it to happen in ascending dataset order instead of randomly (map iteration order is random).

That being said, getVacuumFullCandidates could return a slice instead of a map & cleanupStatusTables could prepare a map from it for its own purposes

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

@BonapartePC BonapartePC requested a review from atzoum June 6, 2023 08:14
Comment on lines 264 to 267
toVacuumFullMap := map[string]struct{}{}
for _, table := range toVacuumFull {
toVacuumFullMap[table] = struct{}{}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there is 1 line we can get rid of!

Suggested change
toVacuumFullMap := map[string]struct{}{}
for _, table := range toVacuumFull {
toVacuumFullMap[table] = struct{}{}
}
toVacuumFullMap := lo.Associate(toVacuumFull, func(k string) (string, struct{}) {
return k, struct{}{}
})

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

@BonapartePC BonapartePC requested a review from atzoum June 6, 2023 08:26
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants