-
Notifications
You must be signed in to change notification settings - Fork 314
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
chore: vacuum status tables if they cross threshold #3434
Changes from 2 commits
47e3925
860a081
28d0291
2d34a67
16b5c34
8e9c939
41ecaae
600d599
586a016
0334c7f
bf94b54
a3d93c3
9e486df
65f1f3e
fb9a90a
498da6c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||
---|---|---|---|---|
|
@@ -153,6 +153,52 @@ func (jd *HandleT) doMigrateDS(ctx context.Context) error { | |||
return err | ||||
} | ||||
|
||||
// getVacuumCandidates returns the subset of the given DSs whose job status table size exceeds vaccumFullStatusTableThreshold, i.e. the DSs whose status tables should be vacuumed | ||||
func (jd *HandleT) getVacuumCandidates(ctx context.Context, dsList []dataSetT) ([]dataSetT, error) { | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We would need to have some tests verifying this method's behaviour |
||||
// get the name and total size of every matching table | ||||
var rows *sql.Rows | ||||
rows, err := jd.dbHandle.QueryContext( | ||||
ctx, | ||||
`SELECT pg_total_relation_size(oid) AS size, relname | ||||
FROM pg_class | ||||
where relname = ANY( | ||||
SELECT tablename | ||||
FROM pg_catalog.pg_tables | ||||
WHERE schemaname NOT IN ('pg_catalog','information_schema') | ||||
AND tablename like $1 | ||||
)`, | ||||
jd.tablePrefix+"_job%", | ||||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. We are only interested in job status tables, correct? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Yes, made the change in the query |
||||
) | ||||
if err != nil { | ||||
return nil, err | ||||
} | ||||
defer func() { _ = rows.Close() }() | ||||
|
||||
tableSizes := map[string]int64{} | ||||
for rows.Next() { | ||||
var ( | ||||
tableSize int64 | ||||
tableName string | ||||
) | ||||
err = rows.Scan(&tableSize, &tableName) | ||||
if err != nil { | ||||
return nil, err | ||||
} | ||||
tableSizes[tableName] = tableSize | ||||
} | ||||
err = rows.Err() | ||||
if err != nil { | ||||
return nil, err | ||||
} | ||||
|
||||
datasets := lo.Filter(dsList, | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can do this comparison above too, eliminating the need for There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. True, but the query gives me all the tables and their sizes. I'm doing the filtering since we are only interested in a few status tables that are going to be cleaned There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I meant something like this:
Why do we need to pass There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I just read your comment again and understand now that you were wanting to vacuum only tables that were marked for compaction. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. or if you intend to vacuum only a subset of There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we can vacuum all. WDYT @atzoum ? |
||||
func(ds dataSetT, idx int) bool { | ||||
tableSize := tableSizes[ds.JobStatusTable] | ||||
return tableSize > vaccumFullStatusTableThreshold | ||||
}) | ||||
return datasets, nil | ||||
} | ||||
|
||||
// based on an estimate of the rows in the DSs, gives a list of DSs whose status tables we should clean up | ||||
func (jd *HandleT) getCleanUpCandidates(ctx context.Context, dsList []dataSetT) ([]dataSetT, error) { | ||||
// get analyzer estimates for the number of rows(jobs, statuses) in each DS | ||||
|
@@ -206,6 +252,11 @@ func (jd *HandleT) cleanupStatusTables(ctx context.Context, dsList []dataSetT) e | |||
if err != nil { | ||||
return err | ||||
} | ||||
// vacuum the status tables in the toCompact list whose total size exceeds vaccumFullStatusTableThreshold | ||||
toVaccum, err := jd.getVacuumCandidates(ctx, toCompact) | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. A table that we know is going to be vacuumed full doesn't need to be vacuumed if it is in the cleanup list |
||||
if err != nil { | ||||
return err | ||||
} | ||||
start := time.Now() | ||||
defer stats.Default.NewTaggedStat( | ||||
"jobsdb_compact_status_tables", | ||||
|
@@ -223,29 +274,53 @@ func (jd *HandleT) cleanupStatusTables(ctx context.Context, dsList []dataSetT) e | |||
return err | ||||
} | ||||
} | ||||
for _, statusTable := range toVaccum { | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. if a status table belongs to both There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That shouldn't be possible because of the condition here: rudder-server/jobsdb/migration.go Line 319 in 16b5c34
|
||||
if err := jd.vaccumStatusTable( | ||||
ctx, | ||||
tx, | ||||
statusTable.JobStatusTable, | ||||
); err != nil { | ||||
return err | ||||
} | ||||
} | ||||
return nil | ||||
}) | ||||
} | ||||
|
||||
// cleanStatusTable deletes all rows except for the latest status for each job | ||||
func (*HandleT) cleanStatusTable(ctx context.Context, tx *Tx, table string) error { | ||||
_, err := tx.ExecContext( | ||||
deletedJobsQuery := fmt.Sprintf(`WITH deleted AS (DELETE FROM %[1]q | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think this is necessary, as a delete statement returns the number of rows affected already |
||||
WHERE NOT id = ANY( | ||||
SELECT DISTINCT ON (job_id) id from "%[1]s" ORDER BY job_id ASC, id DESC | ||||
) RETURNING *) SELECT count(*) FROM deleted;`, table) | ||||
|
||||
var numJobStatusDeleted int64 | ||||
var err error | ||||
if err = tx.QueryRowContext( | ||||
ctx, | ||||
fmt.Sprintf(`DELETE FROM %[1]q | ||||
WHERE NOT id = ANY( | ||||
SELECT DISTINCT ON (job_id) id from "%[1]s" ORDER BY job_id ASC, id DESC | ||||
)`, table), | ||||
) | ||||
if err != nil { | ||||
deletedJobsQuery, | ||||
).Scan(&numJobStatusDeleted); err != nil { | ||||
return err | ||||
} | ||||
|
||||
query := fmt.Sprintf(`ANALYZE %q`, table) | ||||
if numJobStatusDeleted > vaccumAnalyzeStatusTableThreshold { | ||||
query = fmt.Sprintf(`VACCUM ANALYZE %q`, table) | ||||
} | ||||
_, err = tx.ExecContext( | ||||
ctx, | ||||
fmt.Sprintf(`ANALYZE %q`, table), | ||||
query, | ||||
) | ||||
return err | ||||
} | ||||
|
||||
func (*HandleT) vaccumStatusTable(ctx context.Context, tx *Tx, table string) error { | ||||
if _, err := tx.ExecContext(ctx, fmt.Sprintf(`VACCUM FULL %[1]q`, table)); err != nil { | ||||
return err | ||||
} | ||||
return nil | ||||
} | ||||
|
||||
// getMigrationList returns the list of datasets to migrate from, | ||||
// the number of unfinished jobs contained in these datasets | ||||
// and the dataset before which the new (migrated) dataset that will hold these jobs needs to be created | ||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Value should be in bytes so we don't need to include the InMB prefix. You may use
500*bytesize.MB
for the default value.