Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[bugfix] Fix temp table deletion causing runaway allocations #3278

Merged
merged 5 commits into from
Sep 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 13 additions & 6 deletions internal/db/bundb/bundb.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ func sqliteConn(ctx context.Context) (*bun.DB, error) {
}

// Build SQLite connection address with prefs.
address = buildSQLiteAddress(address)
address, inMem := buildSQLiteAddress(address)

// Open new DB instance
sqldb, err := sql.Open("sqlite-gts", address)
Expand All @@ -365,7 +365,13 @@ func sqliteConn(ctx context.Context) (*bun.DB, error) {
// - https://www.alexedwards.net/blog/configuring-sqldb
sqldb.SetMaxOpenConns(maxOpenConns()) // x number of conns per CPU
sqldb.SetMaxIdleConns(1) // only keep max 1 idle connection around
sqldb.SetConnMaxLifetime(0) // don't kill connections due to age
if inMem {
log.Warn(nil, "using sqlite in-memory mode; all data will be deleted when gts shuts down; this mode should only be used for debugging or running tests")
// Don't close aged connections as this may wipe the DB.
sqldb.SetConnMaxLifetime(0)
} else {
sqldb.SetConnMaxLifetime(5 * time.Minute)
}

db := bun.NewDB(sqldb, sqlitedialect.New())

Expand Down Expand Up @@ -485,7 +491,8 @@ func deriveBunDBPGOptions() (*pgx.ConnConfig, error) {

// buildSQLiteAddress will build an SQLite address string from given config input,
// appending user defined SQLite connection preferences (e.g. cache_size, journal_mode etc).
func buildSQLiteAddress(addr string) string {
// The returned bool indicates whether this is an in-memory address or not.
func buildSQLiteAddress(addr string) (string, bool) {
// Notes on SQLite preferences:
//
// - SQLite by itself supports setting a subset of its configuration options
Expand Down Expand Up @@ -543,11 +550,11 @@ func buildSQLiteAddress(addr string) string {
// see https://pkg.go.dev/modernc.org/sqlite#Driver.Open
prefs.Add("_txlock", "immediate")

inMem := false
if addr == ":memory:" {
log.Warn(nil, "using sqlite in-memory mode; all data will be deleted when gts shuts down; this mode should only be used for debugging or running tests")

// Use random name for in-memory instead of ':memory:', so
// multiple in-mem databases can be created without conflict.
inMem = true
addr = "/" + uuid.NewString()
prefs.Add("vfs", "memdb")
}
Expand Down Expand Up @@ -581,5 +588,5 @@ func buildSQLiteAddress(addr string) string {
b.WriteString(addr)
b.WriteString("?")
b.WriteString(prefs.Encode())
return b.String()
return b.String(), inMem
}
220 changes: 142 additions & 78 deletions internal/db/bundb/conversation.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"context"
"errors"
"slices"
"time"

"github.com/superseriousbusiness/gotosocial/internal/db"
"github.com/superseriousbusiness/gotosocial/internal/gtscontext"
Expand Down Expand Up @@ -334,39 +335,81 @@ func (c *conversationDB) DeleteConversationsByOwnerAccountID(ctx context.Context
}

func (c *conversationDB) DeleteStatusFromConversations(ctx context.Context, statusID string) error {
// SQL returning the current time.
var nowSQL string
switch c.db.Dialect().Name() {
case dialect.SQLite:
nowSQL = "DATE('now')"
case dialect.PG:
nowSQL = "NOW()"
default:
log.Panicf(nil, "db conn %s was neither pg nor sqlite", c.db)
}
var (
updatedConversationIDs = []string{}
deletedConversationIDs = []string{}

updatedConversationIDs := []string{}
deletedConversationIDs := []string{}
// Method of creating + dropping temp
// tables differs depending on driver.
tmpQ string
)

if c.db.Dialect().Name() == dialect.PG {
// On Postgres, we can instruct PG to clean
// up temp tables on commit, so we can just
// use any connection from the pool without
// caring what happens to it when we're done.
tmpQ = "CREATE TEMPORARY TABLE ? ON COMMIT DROP AS (?)"
} else {
// On SQLite, we can't instruct SQLite to drop
// temp tables on commit, and we can't manually
// drop temp tables without triggering a bug.
// So we leave the temp tables alone, in the
// knowledge they'll be cleaned up when this
// connection gets recycled (in max 5min).
tmpQ = "CREATE TEMPORARY TABLE ? AS ?"
}

if err := c.db.RunInTx(ctx, nil, func(ctx context.Context, tx bun.Tx) error {
// Delete this status from conversation-to-status links.
if _, err := tx.NewDelete().
Model((*gtsmodel.ConversationToStatus)(nil)).
// First delete this status from
// conversation-to-status links.
_, err := tx.
NewDelete().
Table("conversation_to_statuses").
Where("? = ?", bun.Ident("status_id"), statusID).
Exec(ctx); // nocollapse
err != nil {
return gtserror.Newf("error deleting conversation-to-status links while deleting status %s: %w", statusID, err)
Exec(ctx)
if err != nil {
return gtserror.Newf(
"error deleting conversation-to-status links while deleting status %s: %w",
statusID, err,
)
}

// Note: Bun doesn't currently support CREATE TABLE … AS SELECT … so we need to use raw queries here.

// Create a temporary table with all statuses other than the deleted status
// in each conversation for which the deleted status is the last status
// (if there are such statuses).
conversationStatusesTempTable := "conversation_statuses_" + id.NewULID()
if _, err := tx.NewRaw(
"CREATE TEMPORARY TABLE ? AS ?",
bun.Ident(conversationStatusesTempTable),
// Note: Bun doesn't currently support `CREATE TABLE … AS SELECT …`
// so we need to use raw queries to create temporary tables.

// Create a temporary table containing all statuses other than
// the deleted status, in each conversation for which the deleted
// status is the last status, if there are such statuses.
//
// This will produce a query like:
//
// CREATE TEMPORARY TABLE "conversation_statuses_01J78T2AR0YCZ4YR12WSCZ608S"
// AS (
// SELECT
// "conversations"."id" AS "conversation_id",
// "conversation_to_statuses"."status_id" AS "id",
// "statuses"."created_at"
// FROM
// "conversations"
// LEFT JOIN "conversation_to_statuses" ON (
// "conversations"."id" = "conversation_to_statuses"."conversation_id"
// )
// AND (
// "conversation_to_statuses"."status_id" != '01J78T2BQ4TN5S2XSC9VNQ5GBS'
// )
// LEFT JOIN "statuses" ON (
// "conversation_to_statuses"."status_id" = "statuses"."id"
// )
// WHERE
// (
// "conversations"."last_status_id" = '01J78T2BQ4TN5S2XSC9VNQ5GBS'
// )
// )
conversationStatusesTmp := "conversation_statuses_" + id.NewULID()
conversationStatusesTmpQ := tx.NewRaw(
tmpQ,
bun.Ident(conversationStatusesTmp),
tx.NewSelect().
ColumnExpr(
"? AS ?",
Expand Down Expand Up @@ -402,31 +445,54 @@ func (c *conversationDB) DeleteStatusFromConversations(ctx context.Context, stat
bun.Ident("conversations.last_status_id"),
statusID,
),
).
Exec(ctx); // nocollapse
err != nil {
return gtserror.Newf("error creating conversationStatusesTempTable while deleting status %s: %w", statusID, err)
)
_, err = conversationStatusesTmpQ.Exec(ctx)
if err != nil {
return gtserror.Newf(
"error creating temp table %s while deleting status %s: %w",
conversationStatusesTmp, statusID, err,
)
}

// Create a temporary table with the most recently created status in each conversation
// for which the deleted status is the last status (if there is such a status).
latestConversationStatusesTempTable := "latest_conversation_statuses_" + id.NewULID()
if _, err := tx.NewRaw(
"CREATE TEMPORARY TABLE ? AS ?",
bun.Ident(latestConversationStatusesTempTable),
// Create a temporary table with the most recently created
// status in each conversation for which the deleted status
// is the last status, if there is such a status.
//
// This will produce a query like:
//
// CREATE TEMPORARY TABLE "latest_conversation_statuses_01J78T2AR0E46SJSH6C7NRZ7MR"
// AS (
// SELECT
// "conversation_statuses"."conversation_id",
// "conversation_statuses"."id"
// FROM
// "conversation_statuses_01J78T2AR0YCZ4YR12WSCZ608S" AS "conversation_statuses"
// LEFT JOIN "conversation_statuses_01J78T2AR0YCZ4YR12WSCZ608S" AS "later_statuses" ON (
// "conversation_statuses"."conversation_id" = "later_statuses"."conversation_id"
// )
// AND (
// "later_statuses"."created_at" > "conversation_statuses"."created_at"
// )
// WHERE
// ("later_statuses"."id" IS NULL)
// )
latestConversationStatusesTmp := "latest_conversation_statuses_" + id.NewULID()
latestConversationStatusesTmpQ := tx.NewRaw(
tmpQ,
bun.Ident(latestConversationStatusesTmp),
tx.NewSelect().
Column(
"conversation_statuses.conversation_id",
"conversation_statuses.id",
).
TableExpr(
"? AS ?",
bun.Ident(conversationStatusesTempTable),
bun.Ident(conversationStatusesTmp),
bun.Ident("conversation_statuses"),
).
Join(
"LEFT JOIN ? AS ?",
bun.Ident(conversationStatusesTempTable),
bun.Ident(conversationStatusesTmp),
bun.Ident("later_statuses"),
).
JoinOn(
Expand All @@ -440,68 +506,66 @@ func (c *conversationDB) DeleteStatusFromConversations(ctx context.Context, stat
bun.Ident("conversation_statuses.created_at"),
).
Where("? IS NULL", bun.Ident("later_statuses.id")),
).
Exec(ctx); // nocollapse
err != nil {
return gtserror.Newf("error creating latestConversationStatusesTempTable while deleting status %s: %w", statusID, err)
)
_, err = latestConversationStatusesTmpQ.Exec(ctx)
if err != nil {
return gtserror.Newf(
"error creating temp table %s while deleting status %s: %w",
conversationStatusesTmp, statusID, err,
)
}

// For every conversation where the given status was the last one,
// reset its last status to the most recently created in the conversation other than that one,
// if there is such a status.
// reset its last status to the most recently created in the
// conversation other than that one, if there is such a status.
// Return conversation IDs for invalidation.
if err := tx.NewUpdate().
Model((*gtsmodel.Conversation)(nil)).
SetColumn("last_status_id", "?", bun.Ident("latest_conversation_statuses.id")).
SetColumn("updated_at", "?", bun.Safe(nowSQL)).
TableExpr("? AS ?", bun.Ident(latestConversationStatusesTempTable), bun.Ident("latest_conversation_statuses")).
Where("?TableAlias.? = ?", bun.Ident("id"), bun.Ident("latest_conversation_statuses.conversation_id")).
updateQ := tx.NewUpdate().
Table("conversations").
TableExpr("? AS ?", bun.Ident(latestConversationStatusesTmp), bun.Ident("latest_conversation_statuses")).
Set("? = ?", bun.Ident("last_status_id"), bun.Ident("latest_conversation_statuses.id")).
Set("? = ?", bun.Ident("updated_at"), time.Now()).
Where("? = ?", bun.Ident("conversations.id"), bun.Ident("latest_conversation_statuses.conversation_id")).
Where("? IS NOT NULL", bun.Ident("latest_conversation_statuses.id")).
Returning("?TableName.?", bun.Ident("id")).
Scan(ctx, &updatedConversationIDs); // nocollapse
err != nil {
return gtserror.Newf("error rolling back last status for conversation while deleting status %s: %w", statusID, err)
Returning("?", bun.Ident("conversations.id"))
_, err = updateQ.Exec(ctx, &updatedConversationIDs)
if err != nil {
return gtserror.Newf(
"error rolling back last status for conversation while deleting status %s: %w",
statusID, err,
)
}

// If there is no such status, delete the conversation.
// Return conversation IDs for invalidation.
if err := tx.NewDelete().
Model((*gtsmodel.Conversation)(nil)).
// If there is no such status,
// just delete the conversation.
// Return IDs for invalidation.
_, err = tx.
NewDelete().
Table("conversations").
Where(
"? IN (?)",
bun.Ident("id"),
tx.NewSelect().
Table(latestConversationStatusesTempTable).
Table(latestConversationStatusesTmp).
Column("conversation_id").
Where("? IS NULL", bun.Ident("id")),
).
Returning("?", bun.Ident("id")).
Scan(ctx, &deletedConversationIDs); // nocollapse
err != nil {
return gtserror.Newf("error deleting conversation while deleting status %s: %w", statusID, err)
}

// Clean up.
for _, tempTable := range []string{
conversationStatusesTempTable,
latestConversationStatusesTempTable,
} {
if _, err := tx.NewDropTable().Table(tempTable).Exec(ctx); err != nil {
return gtserror.Newf(
"error dropping temporary table %s after deleting status %s: %w",
tempTable,
statusID,
err,
)
}
Exec(ctx, &deletedConversationIDs)
if err != nil {
return gtserror.Newf(
"error deleting conversation while deleting status %s: %w",
statusID, err,
)
}

return nil
}); err != nil {
return err
}

// Invalidate cache entries.
updatedConversationIDs = append(updatedConversationIDs, deletedConversationIDs...)
updatedConversationIDs = util.Deduplicate(updatedConversationIDs)
c.state.Caches.DB.Conversation.InvalidateIDs("ID", updatedConversationIDs)

return nil
Expand Down