chore: vaccum status tables if they cross threshold #3434
9cc9b70 to 47e3925
jobsdb/jobsdb.go
Outdated
@@ -642,6 +644,8 @@ func loadConfig() {
	config.RegisterIntConfigVariable(10, &maxMigrateDSProbe, true, 1, "JobsDB.maxMigrateDSProbe")
	config.RegisterInt64ConfigVariable(300, &maxTableSize, true, 1000000, "JobsDB.maxTableSizeInMB")
	config.RegisterInt64ConfigVariable(10000, &backupRowsBatchSize, true, 1, "JobsDB.backupRowsBatchSize")
	config.RegisterInt64ConfigVariable(500, &vaccumFullStatusTableThreshold, true, 1000000, "JobsDB.vaccumFullStatusTableThresholdInMB")
Value should be in bytes so we don't need to include the InMB suffix. You may use 500*bytesize.MB for the default value.
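A minimal sketch of what a byte-based threshold could look like. The `KB` and `MB` constants below are local stand-ins for the `bytesize` constants the comment refers to, and their binary values are an assumption; `exceedsThreshold` is a hypothetical helper added only for illustration.

```go
package main

import "fmt"

// Local stand-ins for byte-size constants such as bytesize.KB and bytesize.MB.
// Binary units are an assumption made for this sketch.
const (
	KB int64 = 1 << 10
	MB int64 = 1 << 20
)

// The threshold is stored in bytes, so neither the variable nor the config key
// needs a unit suffix.
var vacuumFullStatusTableThreshold = 500 * MB

// exceedsThreshold (hypothetical helper) reports whether a table of the given
// size in bytes crosses the threshold.
func exceedsThreshold(sizeBytes int64) bool {
	return sizeBytes > vacuumFullStatusTableThreshold
}

func main() {
	fmt.Println(vacuumFullStatusTableThreshold)
	fmt.Println(exceedsThreshold(600 * MB))
}
```

Keeping the unit out of the name means the comparison against a size reported in bytes needs no conversion.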
jobsdb/migration.go
Outdated
		return nil
	})
}

// cleanStatusTable deletes all rows except for the latest status for each job
func (*HandleT) cleanStatusTable(ctx context.Context, tx *Tx, table string) error {
	_, err := tx.ExecContext(
	deletedJobsQuery := fmt.Sprintf(`WITH deleted AS (DELETE FROM %[1]q
I don't think this is necessary, as a delete statement returns the number of rows affected already
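As a rough illustration of the point above, the count of deleted rows can be read from the `sql.Result` returned by the DELETE itself. The query text and the column names (`id`, `job_id`) are assumptions, since the real statement is truncated in the diff; `execer`, `deleteOldStatuses` and the fake types are hypothetical names used so the sketch runs without a database.

```go
package main

import (
	"context"
	"database/sql"
	"fmt"
)

// execer is satisfied by both *sql.DB and *sql.Tx.
type execer interface {
	ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error)
}

// deleteOldStatuses deletes all but the latest status row per job and returns
// the number of deleted rows. Column names id and job_id are assumptions.
func deleteOldStatuses(ctx context.Context, db execer, table string) (int64, error) {
	query := fmt.Sprintf(
		`DELETE FROM %[1]q WHERE id NOT IN (SELECT MAX(id) FROM %[1]q GROUP BY job_id)`,
		table,
	)
	res, err := db.ExecContext(ctx, query)
	if err != nil {
		return 0, err
	}
	// No CTE or RETURNING clause is needed just to count the deleted rows.
	return res.RowsAffected()
}

// fakeResult and fakeExecer stand in for a real database in this sketch.
type fakeResult struct{ rows int64 }

func (r fakeResult) LastInsertId() (int64, error) { return 0, nil }
func (r fakeResult) RowsAffected() (int64, error) { return r.rows, nil }

type fakeExecer struct{ rows int64 }

func (f *fakeExecer) ExecContext(_ context.Context, _ string, _ ...any) (sql.Result, error) {
	return fakeResult{rows: f.rows}, nil
}

// runDemo exercises deleteOldStatuses against the fake.
func runDemo() (int64, error) {
	return deleteOldStatuses(context.Background(), &fakeExecer{rows: 7}, "rt_job_status_1")
}

func main() {
	fmt.Println(runDemo())
}
```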
jobsdb/migration.go
Outdated
@@ -153,6 +153,52 @@ func (jd *HandleT) doMigrateDS(ctx context.Context) error {
	return err
}

// based on size of given DSs, gives a list of DSs for us to vaccum status tables
func (jd *HandleT) getVacuumCandidates(ctx context.Context, dsList []dataSetT) ([]dataSetT, error) {
We would need to have some tests verifying this method's behaviour
jobsdb/migration.go
Outdated
@@ -206,6 +252,11 @@ func (jd *HandleT) cleanupStatusTables(ctx context.Context, dsList []dataSetT) e
	if err != nil {
		return err
	}
	// vaccum status table if total size exceeds vaccumFullStatusTableThreshold in the toCompact list
	toVaccum, err := jd.getVacuumCandidates(ctx, toCompact)
A table that we already know is going to get a full vacuum doesn't also need a regular vacuum if it is in the cleanup list.
4582bf2 to 28d0291
Codecov Report
Patch coverage:
Additional details and impacted files
@@            Coverage Diff             @@
##           master    #3434      +/-   ##
==========================================
- Coverage   68.53%   68.48%   -0.06%
==========================================
  Files         330      330
  Lines       52905    52973      +68
==========================================
+ Hits        36259    36277      +18
- Misses      14296    14335      +39
- Partials     2350     2361      +11
jobsdb/migration.go
Outdated
		return nil, err
	}

	datasets := lo.Filter(dsList,
You can do this comparison above too, eliminating the need for the tableSizes map.
True, but the query gives me all the tables and their sizes. I'm doing the filtering since we are only interested in a few status tables that are going to be cleaned
I meant something like this:
...
	defer func() { _ = rows.Close() }()
	toVacuum := map[string]struct{}{}
	for rows.Next() {
		var (
			tableSize int64
			tableName string
		)
		err = rows.Scan(&tableSize, &tableName)
		if err != nil {
			return nil, err
		}
		if tableSize > vacuumFullStatusTableThreshold {
			toVacuum[tableName] = struct{}{}
		}
	}
	err = rows.Err()
	if err != nil {
		return nil, err
	}
	return toVacuum, nil
}
Why do we need to pass dsList to this function?
I just read your comment again and understand now that you wanted to vacuum only tables that were marked for compaction.
Might as well vacuum all?
Or, if you intend to vacuum only a subset of dsList, you may use it as part of the query.
Maybe the threshold as well.
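One way to read this suggestion is to push both the table list and the threshold into the SQL, so the database returns only the candidates. The sketch below only builds the query text and arguments; `buildCandidateQuery` is a hypothetical helper, and the exact shape of the query (selecting from `pg_class` and matching on `relname` without schema qualification) is an assumption based on the fragments visible in this review.

```go
package main

import (
	"fmt"
	"strings"
)

// buildCandidateQuery (hypothetical) returns a parameterised query that selects
// only those of the given tables whose pg_table_size exceeds thresholdBytes.
// It returns an empty query when there are no tables, so callers can skip it.
func buildCandidateQuery(tables []string, thresholdBytes int64) (string, []any) {
	if len(tables) == 0 {
		return "", nil
	}
	placeholders := make([]string, len(tables))
	args := make([]any, 0, len(tables)+1)
	for i, t := range tables {
		placeholders[i] = fmt.Sprintf("$%d", i+1)
		args = append(args, t)
	}
	// The threshold is passed as the last positional parameter.
	args = append(args, thresholdBytes)
	query := fmt.Sprintf(
		`SELECT relname FROM pg_class WHERE relname IN (%s) AND pg_table_size(oid) > $%d`,
		strings.Join(placeholders, ", "), len(tables)+1,
	)
	return query, args
}

func main() {
	q, args := buildCandidateQuery([]string{"rt_job_status_1", "rt_job_status_2"}, 500<<20)
	fmt.Println(q)
	fmt.Println(args...)
}
```

With this shape the Go side no longer needs a size map; it only collects the returned table names.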
I think we can vacuum all. WDYT @atzoum ?
jobsdb/migration.go
Outdated
@@ -223,29 +274,53 @@ func (jd *HandleT) cleanupStatusTables(ctx context.Context, dsList []dataSetT) e
			return err
		}
	}
	for _, statusTable := range toVaccum {
If a status table belongs to both toCompact and toVaccum, it'll undergo vacuum twice (analyze and full).
Is such a situation possible?
That shouldn't be possible because of the condition here:
rudder-server/jobsdb/migration.go, line 319 in 16b5c34:
if numJobStatusDeleted > vacuumAnalyzeStatusTableThreshold && canBeVacuumed {
jobsdb/migration.go
Outdated
WHERE schemaname NOT IN ('pg_catalog','information_schema')
AND tablename like $1
)`,
jd.tablePrefix+"_job%",
We are only interested in job status tables, correct?
Yes, made the change in query
jobsdb/migration.go
Outdated
	datasets := lo.Filter(dsList,
		func(ds dataSetT, idx int) bool {
			tableSize := tableSizes[ds.JobStatusTable]
			return tableSize > vacuumFullStatusTableThreshold
		})
	toVacuum := map[string]struct{}{}
	for _, ds := range datasets {
		toVacuum[ds.JobStatusTable] = struct{}{}
	}
The code below is shorter and more efficient :)
- datasets := lo.Filter(dsList,
- 	func(ds dataSetT, idx int) bool {
- 		tableSize := tableSizes[ds.JobStatusTable]
- 		return tableSize > vacuumFullStatusTableThreshold
- 	})
- toVacuum := map[string]struct{}{}
- for _, ds := range datasets {
- 	toVacuum[ds.JobStatusTable] = struct{}{}
- }
+ toVacuum := map[string]struct{}{}
+ for _, ds := range dsList {
+ 	if tableSizes[ds.JobStatusTable] > vacuumFullStatusTableThreshold {
+ 		toVacuum[ds.JobStatusTable] = struct{}{}
+ 	}
+ }
Done
jobsdb/migration.go
Outdated
@@ -206,6 +256,11 @@ func (jd *HandleT) cleanupStatusTables(ctx context.Context, dsList []dataSetT) e
	if err != nil {
		return err
	}
	// vacuum status table if total size exceeds vacuumFullStatusTableThreshold in the toCompact list
	toVacuum, err := jd.getVacuumCandidates(ctx, toCompact)
I don't believe we should only expect to be doing full vacuuming for tables that need to be compacted. A table might reach the full vacuuming threshold and still not need a compaction yet.
Correct. Should we just vacuum all the tables or just tables in dsList? Tagged you in the above message as well
Define "all tables". Which additional tables are you proposing to vacuum full, and why?
I mean the tables which the query returns. Do we need to pass dsList? We can just return all the tables that the query returns which cross the threshold, right?
Not sure what else is going to be included now or in the future if we remove this filter. Limiting the tables to those contained in the dsList can easily guarantee that we are not performing an undesirable full vacuuming (a costly operation) on another table.
makes sense
cc @Sidddddarth
so a subset of the compacted tables will be vacuumed?
No, a subset of dsList will be vacuumed. Compacted tables and tables that we're going to vacuum are independent at this point.
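The outcome of this thread can be sketched as a small pure function: sizes may be known for many tables, but only status tables that belong to dsList are ever considered, independently of which tables are compacted. `dataSet` is a trimmed-down stand-in for `dataSetT`, and `vacuumFullCandidates` and the table names are hypothetical. A function in this form is also straightforward to cover with table-driven tests.

```go
package main

import "fmt"

// dataSet is a trimmed-down stand-in for dataSetT with only the field used here.
type dataSet struct {
	JobStatusTable string
}

// vacuumFullCandidates (hypothetical) returns, in dsList order, the status
// tables from dsList whose size exceeds threshold. Tables that appear in
// tableSizes but not in dsList are never selected.
func vacuumFullCandidates(dsList []dataSet, tableSizes map[string]int64, threshold int64) []string {
	var out []string
	for _, ds := range dsList {
		if tableSizes[ds.JobStatusTable] > threshold {
			out = append(out, ds.JobStatusTable)
		}
	}
	return out
}

// demoCandidates runs the filter on made-up sizes.
func demoCandidates() []string {
	sizes := map[string]int64{
		"rt_job_status_1":    900,
		"rt_job_status_2":    100,
		"rt_job_status_3":    700,
		"other_job_status_9": 5000, // large, but not part of dsList
	}
	dsList := []dataSet{{"rt_job_status_1"}, {"rt_job_status_2"}, {"rt_job_status_3"}}
	return vacuumFullCandidates(dsList, sizes, 500)
}

func main() {
	fmt.Println(demoCandidates())
}
```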
jobsdb/migration.go
Outdated
	var rows *sql.Rows
	rows, err := jd.dbHandle.QueryContext(
		ctx,
		`SELECT pg_relation_size(oid) AS size, relname
- `SELECT pg_relation_size(oid) AS size, relname
+ `SELECT pg_table_size(oid) AS size, relname
Done
…er-server into chore.vaccumStatusTables
95bbbe1 to 41ecaae
jobsdb/migration.go
Outdated
@@ -153,6 +153,53 @@ func (jd *HandleT) doMigrateDS(ctx context.Context) error {
	return err
}

// based on size of given DSs, gives a list of DSs for us to vacuum status tables
func (jd *HandleT) getVacuumCandidates(ctx context.Context, dsList []dataSetT) (map[string]struct{}, error) {
- func (jd *HandleT) getVacuumCandidates(ctx context.Context, dsList []dataSetT) (map[string]struct{}, error) {
+ func (jd *HandleT) getVacuumFullCandidates(ctx context.Context, dsList []dataSetT) (map[string]struct{}, error) {
Done
jobsdb/migration.go
Outdated
func (*HandleT) vacuumFullStatusTable(ctx context.Context, tx *Tx, table string) error {
	if _, err := tx.ExecContext(ctx, fmt.Sprintf(`VACUUM FULL %[1]q`, table)); err != nil {
		return err
	}
	return nil
}
I don't believe we need this as a separate method
…er-server into chore.vaccumStatusTables
jobsdb/migration.go
Outdated
	// vacuum full if present in toVacuum
	if ok {
		if _, err := tx.ExecContext(ctx, fmt.Sprintf(`VACUUM FULL %[1]q`, table)); err != nil {
			return err
		}
	}
Full vacuuming shall be performed in its own loop, following but not coupled with the compaction loop.
Yeah, my bad 🤦
Made the changes
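A sketch of the structure requested above, with compaction and full vacuuming as two consecutive, independent loops. The actual compaction and vacuum statements are abstracted behind callbacks because they are not fully visible in this review; `tableOp`, `runCleanup` and the table names are hypothetical.

```go
package main

import (
	"context"
	"fmt"
)

// tableOp is a hypothetical abstraction over "do something to this table".
type tableOp func(ctx context.Context, table string) error

// runCleanup compacts every table in toCompact first and only afterwards runs
// a full vacuum on every table in toVacuumFull. The two lists are independent.
func runCleanup(ctx context.Context, compact, vacuumFull tableOp, toCompact, toVacuumFull []string) error {
	// Loop 1: compaction.
	for _, table := range toCompact {
		if err := compact(ctx, table); err != nil {
			return err
		}
	}
	// Loop 2: full vacuum, not nested in or conditioned on loop 1.
	for _, table := range toVacuumFull {
		if err := vacuumFull(ctx, table); err != nil {
			return err
		}
	}
	return nil
}

// demoOrder records the order in which the operations are invoked.
func demoOrder() []string {
	var calls []string
	record := func(prefix string) tableOp {
		return func(_ context.Context, table string) error {
			calls = append(calls, prefix+" "+table)
			return nil
		}
	}
	_ = runCleanup(context.Background(), record("compact"), record("vacuum full"),
		[]string{"status_1"}, []string{"status_2", "status_3"})
	return calls
}

func main() {
	fmt.Println(demoOrder())
}
```

Note that PostgreSQL rejects VACUUM inside a transaction block, so in practice the full-vacuum callback would have to run on a handle that is not inside a transaction.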
jobsdb/migration_test.go
Outdated
@@ -196,4 +196,10 @@ func TestMigration(t *testing.T) {
	).Scan(&count)
	require.NoError(t, err)
	require.EqualValues(t, 10, count)

	vacuumFullStatusTableThreshold = 8 * 1024 // 8KB is common value for toast size in postgres
- vacuumFullStatusTableThreshold = 8 * 1024 // 8KB is common value for toast size in postgres
+ vacuumFullStatusTableThreshold = 8 * bytesize.KB // 8KB is common value for toast size in postgres
jobsdb/migration.go
Outdated
	for table := range toVacuum {
		if _, err := tx.ExecContext(ctx, fmt.Sprintf(`VACUUM FULL %[1]q`, table)); err != nil {
			return err
		}
	}
If more than one status table needs full vacuuming, it would be preferable for it to happen in ascending dataset order instead of randomly (map iteration order is random).
That being said, getVacuumFullCandidates could return a slice instead of a map, and cleanupStatusTables could prepare a map from it for its own purposes.
Done
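The idea in this thread, sketched with the standard library only: keep the candidates in a slice so that statements are issued in a fixed order, and derive a set from that slice wherever membership checks are needed. `toSet` and `vacuumFullStatements` are hypothetical helpers, and the table names are made up.

```go
package main

import "fmt"

// toSet builds a membership set from a slice.
func toSet[T comparable](items []T) map[T]struct{} {
	set := make(map[T]struct{}, len(items))
	for _, it := range items {
		set[it] = struct{}{}
	}
	return set
}

// vacuumFullStatements returns one VACUUM FULL statement per table, in the
// order of the input slice. Ranging over a map here would give no such guarantee.
func vacuumFullStatements(tables []string) []string {
	stmts := make([]string, 0, len(tables))
	for _, t := range tables {
		stmts = append(stmts, fmt.Sprintf(`VACUUM FULL %[1]q`, t))
	}
	return stmts
}

func main() {
	tables := []string{"rt_job_status_1", "rt_job_status_2"}
	for _, s := range vacuumFullStatements(tables) {
		fmt.Println(s)
	}
	_, ok := toSet(tables)["rt_job_status_2"]
	fmt.Println(ok)
}
```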
jobsdb/migration.go
Outdated
	toVacuumFullMap := map[string]struct{}{}
	for _, table := range toVacuumFull {
		toVacuumFullMap[table] = struct{}{}
	}
there is 1 line we can get rid of!
- toVacuumFullMap := map[string]struct{}{}
- for _, table := range toVacuumFull {
- 	toVacuumFullMap[table] = struct{}{}
- }
+ toVacuumFullMap := lo.Associate(toVacuumFull, func(k string) (string, struct{}) {
+ 	return k, struct{}{}
+ })
Done
Description
Vacuum status tables if they cross the threshold
Notion Ticket
https://www.notion.so/rudderstacks/Vacuum-and-Vacuum-full-during-jobsdb-compaction-6f78b37caea64aef9161c1732fdd76a2
Security