From 4149032f6b0540f3db94397295b0914941c819ec Mon Sep 17 00:00:00 2001 From: tobi Date: Sun, 8 Sep 2024 14:49:35 +0200 Subject: [PATCH 1/5] [bugfix] Fix temp table deletion causing runaway allocations --- internal/db/bundb/conversation.go | 415 +++++++++++++++++++----------- 1 file changed, 264 insertions(+), 151 deletions(-) diff --git a/internal/db/bundb/conversation.go b/internal/db/bundb/conversation.go index d8245dc581..3b5f3fdd3f 100644 --- a/internal/db/bundb/conversation.go +++ b/internal/db/bundb/conversation.go @@ -21,6 +21,7 @@ import ( "context" "errors" "slices" + "time" "github.com/superseriousbusiness/gotosocial/internal/db" "github.com/superseriousbusiness/gotosocial/internal/gtscontext" @@ -334,174 +335,286 @@ func (c *conversationDB) DeleteConversationsByOwnerAccountID(ctx context.Context } func (c *conversationDB) DeleteStatusFromConversations(ctx context.Context, statusID string) error { - // SQL returning the current time. - var nowSQL string - switch c.db.Dialect().Name() { - case dialect.SQLite: - nowSQL = "DATE('now')" - case dialect.PG: - nowSQL = "NOW()" - default: - log.Panicf(nil, "db conn %s was neither pg nor sqlite", c.db) - } - - updatedConversationIDs := []string{} - deletedConversationIDs := []string{} + var ( + updatedConversationIDs = []string{} + deletedConversationIDs = []string{} + conversationStatusesTmp = "conversation_statuses_" + id.NewULID() + latestConversationStatusesTmp = "latest_conversation_statuses_" + id.NewULID() + + // Method of creating + dropping temp + // tables differs depending on driver. + tmpQ string + tx bun.Tx + err error + ) - if err := c.db.RunInTx(ctx, nil, func(ctx context.Context, tx bun.Tx) error { - // Delete this status from conversation-to-status links. - if _, err := tx.NewDelete(). - Model((*gtsmodel.ConversationToStatus)(nil)). - Where("? = ?", bun.Ident("status_id"), statusID). - Exec(ctx); // nocollapse - err != nil { - return gtserror.Newf("error deleting conversation-to-status links while deleting status %s: %w", statusID, err) + if c.db.Dialect().Name() == dialect.PG { + // On Postgres, we can instruct PG to clean + // up temp tables on commit, so we can just + // use any connection from the pool without + // caring what happens to it when we're done. + tmpQ = "CREATE TEMPORARY TABLE ? ON COMMIT DROP AS (?)" + + // Instantiate tx from the pool. + tx, err = c.db.BeginTx(ctx, nil) + } else { + // On SQLite, we can't instruct SQLite to drop + // temp tables on commit, and we can't manually + // drop temp tables without triggering a bug, + // so work around this by obtaining a new conn + // from the pool, and closing it when finished + // (thereby also cleaning up any temp tables). + tmpQ = "CREATE TEMPORARY TABLE ? AS (?)" + + conn, cErr := c.db.Conn(ctx) + if cErr != nil { + return gtserror.Newf( + "error getting conn while deleting status %s: %w", + statusID, cErr, + ) } + defer func() { + if err := conn.Close(); err != nil { + log.Errorf(ctx, "error closing conn: %v", err) + } + }() - // Note: Bun doesn't currently support CREATE TABLE … AS SELECT … so we need to use raw queries here. + // Instantiate tx from the new conn. + tx, err = conn.BeginTx(ctx, nil) + } - // Create a temporary table with all statuses other than the deleted status - // in each conversation for which the deleted status is the last status - // (if there are such statuses). - conversationStatusesTempTable := "conversation_statuses_" + id.NewULID() - if _, err := tx.NewRaw( - "CREATE TEMPORARY TABLE ? AS ?", - bun.Ident(conversationStatusesTempTable), - tx.NewSelect(). - ColumnExpr( - "? AS ?", - bun.Ident("conversations.id"), - bun.Ident("conversation_id"), - ). - ColumnExpr( - "? AS ?", - bun.Ident("conversation_to_statuses.status_id"), - bun.Ident("id"), - ). - Column("statuses.created_at"). - Table("conversations"). - Join("LEFT JOIN ?", bun.Ident("conversation_to_statuses")). - JoinOn( - "? = ?", - bun.Ident("conversations.id"), - bun.Ident("conversation_to_statuses.conversation_id"), - ). - JoinOn( - "? != ?", - bun.Ident("conversation_to_statuses.status_id"), - statusID, - ). - Join("LEFT JOIN ?", bun.Ident("statuses")). - JoinOn( - "? = ?", - bun.Ident("conversation_to_statuses.status_id"), - bun.Ident("statuses.id"), - ). - Where( - "? = ?", - bun.Ident("conversations.last_status_id"), - statusID, - ), - ). - Exec(ctx); // nocollapse - err != nil { - return gtserror.Newf("error creating conversationStatusesTempTable while deleting status %s: %w", statusID, err) - } + if err != nil { + return gtserror.Newf( + "error starting transaction while deleting status %s: %w", + statusID, err, + ) + } - // Create a temporary table with the most recently created status in each conversation - // for which the deleted status is the last status (if there is such a status). - latestConversationStatusesTempTable := "latest_conversation_statuses_" + id.NewULID() - if _, err := tx.NewRaw( - "CREATE TEMPORARY TABLE ? AS ?", - bun.Ident(latestConversationStatusesTempTable), - tx.NewSelect(). - Column( - "conversation_statuses.conversation_id", - "conversation_statuses.id", - ). - TableExpr( - "? AS ?", - bun.Ident(conversationStatusesTempTable), - bun.Ident("conversation_statuses"), - ). - Join( - "LEFT JOIN ? AS ?", - bun.Ident(conversationStatusesTempTable), - bun.Ident("later_statuses"), - ). - JoinOn( - "? = ?", - bun.Ident("conversation_statuses.conversation_id"), - bun.Ident("later_statuses.conversation_id"), - ). - JoinOn( - "? > ?", - bun.Ident("later_statuses.created_at"), - bun.Ident("conversation_statuses.created_at"), - ). - Where("? IS NULL", bun.Ident("later_statuses.id")), - ). - Exec(ctx); // nocollapse - err != nil { - return gtserror.Newf("error creating latestConversationStatusesTempTable while deleting status %s: %w", statusID, err) - } + // From this point on, we *must* trigger + // tx.Rollback before returning on error. - // For every conversation where the given status was the last one, - // reset its last status to the most recently created in the conversation other than that one, - // if there is such a status. - // Return conversation IDs for invalidation. - if err := tx.NewUpdate(). - Model((*gtsmodel.Conversation)(nil)). - SetColumn("last_status_id", "?", bun.Ident("latest_conversation_statuses.id")). - SetColumn("updated_at", "?", bun.Safe(nowSQL)). - TableExpr("? AS ?", bun.Ident(latestConversationStatusesTempTable), bun.Ident("latest_conversation_statuses")). - Where("?TableAlias.? = ?", bun.Ident("id"), bun.Ident("latest_conversation_statuses.conversation_id")). - Where("? IS NOT NULL", bun.Ident("latest_conversation_statuses.id")). - Returning("?TableName.?", bun.Ident("id")). - Scan(ctx, &updatedConversationIDs); // nocollapse - err != nil { - return gtserror.Newf("error rolling back last status for conversation while deleting status %s: %w", statusID, err) + // First delete this status from + // conversation-to-status links. + _, err = tx. + NewDelete(). + Table("conversation_to_statuses"). + Where("? = ?", bun.Ident("status_id"), statusID). + Exec(ctx) + if err != nil { + if err := tx.Rollback(); err != nil { + log.Errorf(ctx, "error rolling back: %v", err) } + return gtserror.Newf( + "error deleting conversation-to-status links while deleting status %s: %w", + statusID, err, + ) + } - // If there is no such status, delete the conversation. - // Return conversation IDs for invalidation. - if err := tx.NewDelete(). - Model((*gtsmodel.Conversation)(nil)). - Where( - "? IN (?)", + // Note: Bun doesn't currently support `CREATE TABLE … AS SELECT …` + // so we need to use raw queries to create temporary tables. + + // Create a temporary table containing all statuses other than + // the deleted status, in each conversation for which the deleted + // status is the last status, if there are such statuses. + // + // This will produce a query like: + // + // CREATE TEMPORARY TABLE "conversation_statuses_01J78T2AR0YCZ4YR12WSCZ608S" + // AS ( + // SELECT + // "conversations"."id" AS "conversation_id", + // "conversation_to_statuses"."status_id" AS "id", + // "statuses"."created_at" + // FROM + // "conversations" + // LEFT JOIN "conversation_to_statuses" ON ( + // "conversations"."id" = "conversation_to_statuses"."conversation_id" + // ) + // AND ( + // "conversation_to_statuses"."status_id" != '01J78T2BQ4TN5S2XSC9VNQ5GBS' + // ) + // LEFT JOIN "statuses" ON ( + // "conversation_to_statuses"."status_id" = "statuses"."id" + // ) + // WHERE + // ( + // "conversations"."last_status_id" = '01J78T2BQ4TN5S2XSC9VNQ5GBS' + // ) + // ) + conversationStatusesTmpQ := tx.NewRaw( + tmpQ, + bun.Ident(conversationStatusesTmp), + tx.NewSelect(). + ColumnExpr( + "? AS ?", + bun.Ident("conversations.id"), + bun.Ident("conversation_id"), + ). + ColumnExpr( + "? AS ?", + bun.Ident("conversation_to_statuses.status_id"), bun.Ident("id"), - tx.NewSelect(). - Table(latestConversationStatusesTempTable). - Column("conversation_id"). - Where("? IS NULL", bun.Ident("id")), ). - Returning("?", bun.Ident("id")). - Scan(ctx, &deletedConversationIDs); // nocollapse - err != nil { - return gtserror.Newf("error deleting conversation while deleting status %s: %w", statusID, err) + Column("statuses.created_at"). + Table("conversations"). + Join("LEFT JOIN ?", bun.Ident("conversation_to_statuses")). + JoinOn( + "? = ?", + bun.Ident("conversations.id"), + bun.Ident("conversation_to_statuses.conversation_id"), + ). + JoinOn( + "? != ?", + bun.Ident("conversation_to_statuses.status_id"), + statusID, + ). + Join("LEFT JOIN ?", bun.Ident("statuses")). + JoinOn( + "? = ?", + bun.Ident("conversation_to_statuses.status_id"), + bun.Ident("statuses.id"), + ). + Where( + "? = ?", + bun.Ident("conversations.last_status_id"), + statusID, + ), + ) + _, err = conversationStatusesTmpQ.Exec(ctx) + if err != nil { + if err := tx.Rollback(); err != nil { + log.Errorf(ctx, "error rolling back: %v", err) } + return gtserror.Newf( + "error creating temp table %s while deleting status %s: %w", + conversationStatusesTmp, statusID, err, + ) + } - // Clean up. - for _, tempTable := range []string{ - conversationStatusesTempTable, - latestConversationStatusesTempTable, - } { - if _, err := tx.NewDropTable().Table(tempTable).Exec(ctx); err != nil { - return gtserror.Newf( - "error dropping temporary table %s after deleting status %s: %w", - tempTable, - statusID, - err, - ) - } + // Create a temporary table with the most recently created + // status in each conversation for which the deleted status + // is the last status, if there is such a status. + // + // This will produce a query like: + // + // CREATE TEMPORARY TABLE "latest_conversation_statuses_01J78T2AR0E46SJSH6C7NRZ7MR" + // AS ( + // SELECT + // "conversation_statuses"."conversation_id", + // "conversation_statuses"."id" + // FROM + // "conversation_statuses_01J78T2AR0YCZ4YR12WSCZ608S" AS "conversation_statuses" + // LEFT JOIN "conversation_statuses_01J78T2AR0YCZ4YR12WSCZ608S" AS "later_statuses" ON ( + // "conversation_statuses"."conversation_id" = "later_statuses"."conversation_id" + // ) + // AND ( + // "later_statuses"."created_at" > "conversation_statuses"."created_at" + // ) + // WHERE + // ("later_statuses"."id" IS NULL) + // ) + + latestConversationStatusesTmpQ := tx.NewRaw( + tmpQ, + bun.Ident(latestConversationStatusesTmp), + tx.NewSelect(). + Column( + "conversation_statuses.conversation_id", + "conversation_statuses.id", + ). + TableExpr( + "? AS ?", + bun.Ident(conversationStatusesTmp), + bun.Ident("conversation_statuses"), + ). + Join( + "LEFT JOIN ? AS ?", + bun.Ident(conversationStatusesTmp), + bun.Ident("later_statuses"), + ). + JoinOn( + "? = ?", + bun.Ident("conversation_statuses.conversation_id"), + bun.Ident("later_statuses.conversation_id"), + ). + JoinOn( + "? > ?", + bun.Ident("later_statuses.created_at"), + bun.Ident("conversation_statuses.created_at"), + ). + Where("? IS NULL", bun.Ident("later_statuses.id")), + ) + _, err = latestConversationStatusesTmpQ.Exec(ctx) + if err != nil { + if err := tx.Rollback(); err != nil { + log.Errorf(ctx, "error rolling back: %v", err) } + return gtserror.Newf( + "error creating temp table %s while deleting status %s: %w", + conversationStatusesTmp, statusID, err, + ) + } - return nil - }); err != nil { - return err + // For every conversation where the given status was the last one, + // reset its last status to the most recently created in the + // conversation other than that one, if there is such a status. + // Return conversation IDs for invalidation. + updateQ := tx.NewUpdate(). + TableExpr("? AS ?", bun.Ident("conversations"), bun.Ident("conversation")). + TableExpr("? AS ?", bun.Ident(latestConversationStatusesTmp), bun.Ident("latest_conversation_statuses")). + Set("last_status_id = ?", bun.Ident("latest_conversation_statuses.id")). + Set("updated_at = ?", time.Now()). + Where("? = ?", bun.Ident("conversation.id"), bun.Ident("latest_conversation_statuses.conversation_id")). + Where("? IS NOT NULL", bun.Ident("latest_conversation_statuses.id")). + Returning("?", bun.Ident("conversation.id")) + _, err = updateQ.Exec(ctx, &updatedConversationIDs) + if err != nil { + if err := tx.Rollback(); err != nil { + log.Errorf(ctx, "error rolling back: %v", err) + } + return gtserror.Newf( + "error rolling back last status for conversation while deleting status %s: %w", + statusID, err, + ) + } + + // If there is no such status, + // just delete the conversation. + // Return IDs for invalidation. + _, err = tx. + NewDelete(). + Table("conversations"). + Where( + "? IN (?)", + bun.Ident("id"), + tx.NewSelect(). + Table(latestConversationStatusesTmp). + Column("conversation_id"). + Where("? IS NULL", bun.Ident("id")), + ). + Returning("?", bun.Ident("id")). + Exec(ctx, &deletedConversationIDs) + if err != nil { + if err := tx.Rollback(); err != nil { + log.Errorf(ctx, "error rolling back: %v", err) + } + return gtserror.Newf( + "error deleting conversation while deleting status %s: %w", + statusID, err, + ) + } + + // We're done, commit everything. + if err := tx.Commit(); err != nil { + return gtserror.Newf( + "error committing transaction while deleting status %s: %w", + statusID, err, + ) } + // Invalidate cache entries. updatedConversationIDs = append(updatedConversationIDs, deletedConversationIDs...) + updatedConversationIDs = util.Deduplicate(updatedConversationIDs) c.state.Caches.DB.Conversation.InvalidateIDs("ID", updatedConversationIDs) return nil From d6bfdefd9f8e99d536d9457ea63a7b06ddc08286 Mon Sep 17 00:00:00 2001 From: tobi Date: Sun, 8 Sep 2024 14:53:57 +0200 Subject: [PATCH 2/5] move some vars around --- internal/db/bundb/conversation.go | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/internal/db/bundb/conversation.go b/internal/db/bundb/conversation.go index 3b5f3fdd3f..c849756c2a 100644 --- a/internal/db/bundb/conversation.go +++ b/internal/db/bundb/conversation.go @@ -336,10 +336,8 @@ func (c *conversationDB) DeleteConversationsByOwnerAccountID(ctx context.Context func (c *conversationDB) DeleteStatusFromConversations(ctx context.Context, statusID string) error { var ( - updatedConversationIDs = []string{} - deletedConversationIDs = []string{} - conversationStatusesTmp = "conversation_statuses_" + id.NewULID() - latestConversationStatusesTmp = "latest_conversation_statuses_" + id.NewULID() + updatedConversationIDs = []string{} + deletedConversationIDs = []string{} // Method of creating + dropping temp // tables differs depending on driver. @@ -441,6 +439,7 @@ func (c *conversationDB) DeleteStatusFromConversations(ctx context.Context, stat // "conversations"."last_status_id" = '01J78T2BQ4TN5S2XSC9VNQ5GBS' // ) // ) + conversationStatusesTmp := "conversation_statuses_" + id.NewULID() conversationStatusesTmpQ := tx.NewRaw( tmpQ, bun.Ident(conversationStatusesTmp), @@ -513,7 +512,7 @@ func (c *conversationDB) DeleteStatusFromConversations(ctx context.Context, stat // WHERE // ("later_statuses"."id" IS NULL) // ) - + latestConversationStatusesTmp := "latest_conversation_statuses_" + id.NewULID() latestConversationStatusesTmpQ := tx.NewRaw( tmpQ, bun.Ident(latestConversationStatusesTmp), From 1347690e5a542eef7fa600ff21bafb9f98a2af92 Mon Sep 17 00:00:00 2001 From: tobi Date: Sun, 8 Sep 2024 15:28:04 +0200 Subject: [PATCH 3/5] small fixes --- internal/db/bundb/conversation.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/internal/db/bundb/conversation.go b/internal/db/bundb/conversation.go index c849756c2a..a49d7d940e 100644 --- a/internal/db/bundb/conversation.go +++ b/internal/db/bundb/conversation.go @@ -362,7 +362,7 @@ func (c *conversationDB) DeleteStatusFromConversations(ctx context.Context, stat // so work around this by obtaining a new conn // from the pool, and closing it when finished // (thereby also cleaning up any temp tables). - tmpQ = "CREATE TEMPORARY TABLE ? AS (?)" + tmpQ = "CREATE TEMPORARY TABLE ? AS ?" conn, cErr := c.db.Conn(ctx) if cErr != nil { @@ -559,13 +559,13 @@ func (c *conversationDB) DeleteStatusFromConversations(ctx context.Context, stat // conversation other than that one, if there is such a status. // Return conversation IDs for invalidation. updateQ := tx.NewUpdate(). - TableExpr("? AS ?", bun.Ident("conversations"), bun.Ident("conversation")). + Table("conversations"). TableExpr("? AS ?", bun.Ident(latestConversationStatusesTmp), bun.Ident("latest_conversation_statuses")). - Set("last_status_id = ?", bun.Ident("latest_conversation_statuses.id")). - Set("updated_at = ?", time.Now()). - Where("? = ?", bun.Ident("conversation.id"), bun.Ident("latest_conversation_statuses.conversation_id")). + Set("? = ?", bun.Ident("last_status_id"), bun.Ident("latest_conversation_statuses.id")). + Set("? = ?", bun.Ident("updated_at"), time.Now()). + Where("? = ?", bun.Ident("conversations.id"), bun.Ident("latest_conversation_statuses.conversation_id")). Where("? IS NOT NULL", bun.Ident("latest_conversation_statuses.id")). - Returning("?", bun.Ident("conversation.id")) + Returning("?", bun.Ident("conversations.id")) _, err = updateQ.Exec(ctx, &updatedConversationIDs) if err != nil { if err := tx.Rollback(); err != nil { From e26257085b4bf149781ae0e3130b9b19ee630bbc Mon Sep 17 00:00:00 2001 From: tobi Date: Sun, 8 Sep 2024 16:00:39 +0200 Subject: [PATCH 4/5] rely on conn max age to recycle temp tables --- internal/db/bundb/bundb.go | 19 +- internal/db/bundb/conversation.go | 432 +++++++++++++----------------- 2 files changed, 204 insertions(+), 247 deletions(-) diff --git a/internal/db/bundb/bundb.go b/internal/db/bundb/bundb.go index 0e58cb7fb2..6ecd43cbcc 100644 --- a/internal/db/bundb/bundb.go +++ b/internal/db/bundb/bundb.go @@ -352,7 +352,7 @@ func sqliteConn(ctx context.Context) (*bun.DB, error) { } // Build SQLite connection address with prefs. - address = buildSQLiteAddress(address) + address, inMem := buildSQLiteAddress(address) // Open new DB instance sqldb, err := sql.Open("sqlite-gts", address) @@ -365,7 +365,13 @@ func sqliteConn(ctx context.Context) (*bun.DB, error) { // - https://www.alexedwards.net/blog/configuring-sqldb sqldb.SetMaxOpenConns(maxOpenConns()) // x number of conns per CPU sqldb.SetMaxIdleConns(1) // only keep max 1 idle connection around - sqldb.SetConnMaxLifetime(0) // don't kill connections due to age + if inMem { + log.Warn(nil, "using sqlite in-memory mode; all data will be deleted when gts shuts down; this mode should only be used for debugging or running tests") + // Don't close aged connections as this may wipe the DB. + sqldb.SetConnMaxLifetime(0) + } else { + sqldb.SetConnMaxLifetime(5 * time.Minute) + } db := bun.NewDB(sqldb, sqlitedialect.New()) @@ -485,7 +491,8 @@ func deriveBunDBPGOptions() (*pgx.ConnConfig, error) { // buildSQLiteAddress will build an SQLite address string from given config input, // appending user defined SQLite connection preferences (e.g. cache_size, journal_mode etc). -func buildSQLiteAddress(addr string) string { +// The returned bool indicates whether this is an in-memory address or not. +func buildSQLiteAddress(addr string) (string, bool) { // Notes on SQLite preferences: // // - SQLite by itself supports setting a subset of its configuration options @@ -543,11 +550,11 @@ func buildSQLiteAddress(addr string) string { // see https://pkg.go.dev/modernc.org/sqlite#Driver.Open prefs.Add("_txlock", "immediate") + inMem := false if addr == ":memory:" { - log.Warn(nil, "using sqlite in-memory mode; all data will be deleted when gts shuts down; this mode should only be used for debugging or running tests") - // Use random name for in-memory instead of ':memory:', so // multiple in-mem databases can be created without conflict. + inMem = true addr = "/" + uuid.NewString() prefs.Add("vfs", "memdb") } @@ -581,5 +588,5 @@ func buildSQLiteAddress(addr string) string { b.WriteString(addr) b.WriteString("?") b.WriteString(prefs.Encode()) - return b.String() + return b.String(), inMem } diff --git a/internal/db/bundb/conversation.go b/internal/db/bundb/conversation.go index a49d7d940e..a196713700 100644 --- a/internal/db/bundb/conversation.go +++ b/internal/db/bundb/conversation.go @@ -342,8 +342,6 @@ func (c *conversationDB) DeleteStatusFromConversations(ctx context.Context, stat // Method of creating + dropping temp // tables differs depending on driver. tmpQ string - tx bun.Tx - err error ) if c.db.Dialect().Name() == dialect.PG { @@ -352,264 +350,216 @@ func (c *conversationDB) DeleteStatusFromConversations(ctx context.Context, stat // use any connection from the pool without // caring what happens to it when we're done. tmpQ = "CREATE TEMPORARY TABLE ? ON COMMIT DROP AS (?)" - - // Instantiate tx from the pool. - tx, err = c.db.BeginTx(ctx, nil) } else { // On SQLite, we can't instruct SQLite to drop // temp tables on commit, and we can't manually - // drop temp tables without triggering a bug, - // so work around this by obtaining a new conn - // from the pool, and closing it when finished - // (thereby also cleaning up any temp tables). + // drop temp tables without triggering a bug. + // So we leave the temp tables alone, in the + // knowledge they'll be cleaned up when this + // connection gets recycled (in max 5min). tmpQ = "CREATE TEMPORARY TABLE ? AS ?" + } - conn, cErr := c.db.Conn(ctx) - if cErr != nil { + c.db.RunInTx(ctx, nil, func(ctx context.Context, tx bun.Tx) error { + // First delete this status from + // conversation-to-status links. + _, err := tx. + NewDelete(). + Table("conversation_to_statuses"). + Where("? = ?", bun.Ident("status_id"), statusID). + Exec(ctx) + if err != nil { return gtserror.Newf( - "error getting conn while deleting status %s: %w", - statusID, cErr, + "error deleting conversation-to-status links while deleting status %s: %w", + statusID, err, ) } - defer func() { - if err := conn.Close(); err != nil { - log.Errorf(ctx, "error closing conn: %v", err) - } - }() - - // Instantiate tx from the new conn. - tx, err = conn.BeginTx(ctx, nil) - } - if err != nil { - return gtserror.Newf( - "error starting transaction while deleting status %s: %w", - statusID, err, + // Note: Bun doesn't currently support `CREATE TABLE … AS SELECT …` + // so we need to use raw queries to create temporary tables. + + // Create a temporary table containing all statuses other than + // the deleted status, in each conversation for which the deleted + // status is the last status, if there are such statuses. + // + // This will produce a query like: + // + // CREATE TEMPORARY TABLE "conversation_statuses_01J78T2AR0YCZ4YR12WSCZ608S" + // AS ( + // SELECT + // "conversations"."id" AS "conversation_id", + // "conversation_to_statuses"."status_id" AS "id", + // "statuses"."created_at" + // FROM + // "conversations" + // LEFT JOIN "conversation_to_statuses" ON ( + // "conversations"."id" = "conversation_to_statuses"."conversation_id" + // ) + // AND ( + // "conversation_to_statuses"."status_id" != '01J78T2BQ4TN5S2XSC9VNQ5GBS' + // ) + // LEFT JOIN "statuses" ON ( + // "conversation_to_statuses"."status_id" = "statuses"."id" + // ) + // WHERE + // ( + // "conversations"."last_status_id" = '01J78T2BQ4TN5S2XSC9VNQ5GBS' + // ) + // ) + conversationStatusesTmp := "conversation_statuses_" + id.NewULID() + conversationStatusesTmpQ := tx.NewRaw( + tmpQ, + bun.Ident(conversationStatusesTmp), + tx.NewSelect(). + ColumnExpr( + "? AS ?", + bun.Ident("conversations.id"), + bun.Ident("conversation_id"), + ). + ColumnExpr( + "? AS ?", + bun.Ident("conversation_to_statuses.status_id"), + bun.Ident("id"), + ). + Column("statuses.created_at"). + Table("conversations"). + Join("LEFT JOIN ?", bun.Ident("conversation_to_statuses")). + JoinOn( + "? = ?", + bun.Ident("conversations.id"), + bun.Ident("conversation_to_statuses.conversation_id"), + ). + JoinOn( + "? != ?", + bun.Ident("conversation_to_statuses.status_id"), + statusID, + ). + Join("LEFT JOIN ?", bun.Ident("statuses")). + JoinOn( + "? = ?", + bun.Ident("conversation_to_statuses.status_id"), + bun.Ident("statuses.id"), + ). + Where( + "? = ?", + bun.Ident("conversations.last_status_id"), + statusID, + ), ) - } - - // From this point on, we *must* trigger - // tx.Rollback before returning on error. - - // First delete this status from - // conversation-to-status links. - _, err = tx. - NewDelete(). - Table("conversation_to_statuses"). - Where("? = ?", bun.Ident("status_id"), statusID). - Exec(ctx) - if err != nil { - if err := tx.Rollback(); err != nil { - log.Errorf(ctx, "error rolling back: %v", err) + _, err = conversationStatusesTmpQ.Exec(ctx) + if err != nil { + return gtserror.Newf( + "error creating temp table %s while deleting status %s: %w", + conversationStatusesTmp, statusID, err, + ) } - return gtserror.Newf( - "error deleting conversation-to-status links while deleting status %s: %w", - statusID, err, - ) - } - // Note: Bun doesn't currently support `CREATE TABLE … AS SELECT …` - // so we need to use raw queries to create temporary tables. - - // Create a temporary table containing all statuses other than - // the deleted status, in each conversation for which the deleted - // status is the last status, if there are such statuses. - // - // This will produce a query like: - // - // CREATE TEMPORARY TABLE "conversation_statuses_01J78T2AR0YCZ4YR12WSCZ608S" - // AS ( - // SELECT - // "conversations"."id" AS "conversation_id", - // "conversation_to_statuses"."status_id" AS "id", - // "statuses"."created_at" - // FROM - // "conversations" - // LEFT JOIN "conversation_to_statuses" ON ( - // "conversations"."id" = "conversation_to_statuses"."conversation_id" - // ) - // AND ( - // "conversation_to_statuses"."status_id" != '01J78T2BQ4TN5S2XSC9VNQ5GBS' - // ) - // LEFT JOIN "statuses" ON ( - // "conversation_to_statuses"."status_id" = "statuses"."id" - // ) - // WHERE - // ( - // "conversations"."last_status_id" = '01J78T2BQ4TN5S2XSC9VNQ5GBS' - // ) - // ) - conversationStatusesTmp := "conversation_statuses_" + id.NewULID() - conversationStatusesTmpQ := tx.NewRaw( - tmpQ, - bun.Ident(conversationStatusesTmp), - tx.NewSelect(). - ColumnExpr( - "? AS ?", - bun.Ident("conversations.id"), - bun.Ident("conversation_id"), - ). - ColumnExpr( - "? AS ?", - bun.Ident("conversation_to_statuses.status_id"), - bun.Ident("id"), - ). - Column("statuses.created_at"). - Table("conversations"). - Join("LEFT JOIN ?", bun.Ident("conversation_to_statuses")). - JoinOn( - "? = ?", - bun.Ident("conversations.id"), - bun.Ident("conversation_to_statuses.conversation_id"), - ). - JoinOn( - "? != ?", - bun.Ident("conversation_to_statuses.status_id"), - statusID, - ). - Join("LEFT JOIN ?", bun.Ident("statuses")). - JoinOn( - "? = ?", - bun.Ident("conversation_to_statuses.status_id"), - bun.Ident("statuses.id"), - ). - Where( - "? = ?", - bun.Ident("conversations.last_status_id"), - statusID, - ), - ) - _, err = conversationStatusesTmpQ.Exec(ctx) - if err != nil { - if err := tx.Rollback(); err != nil { - log.Errorf(ctx, "error rolling back: %v", err) - } - return gtserror.Newf( - "error creating temp table %s while deleting status %s: %w", - conversationStatusesTmp, statusID, err, + // Create a temporary table with the most recently created + // status in each conversation for which the deleted status + // is the last status, if there is such a status. + // + // This will produce a query like: + // + // CREATE TEMPORARY TABLE "latest_conversation_statuses_01J78T2AR0E46SJSH6C7NRZ7MR" + // AS ( + // SELECT + // "conversation_statuses"."conversation_id", + // "conversation_statuses"."id" + // FROM + // "conversation_statuses_01J78T2AR0YCZ4YR12WSCZ608S" AS "conversation_statuses" + // LEFT JOIN "conversation_statuses_01J78T2AR0YCZ4YR12WSCZ608S" AS "later_statuses" ON ( + // "conversation_statuses"."conversation_id" = "later_statuses"."conversation_id" + // ) + // AND ( + // "later_statuses"."created_at" > "conversation_statuses"."created_at" + // ) + // WHERE + // ("later_statuses"."id" IS NULL) + // ) + latestConversationStatusesTmp := "latest_conversation_statuses_" + id.NewULID() + latestConversationStatusesTmpQ := tx.NewRaw( + tmpQ, + bun.Ident(latestConversationStatusesTmp), + tx.NewSelect(). + Column( + "conversation_statuses.conversation_id", + "conversation_statuses.id", + ). + TableExpr( + "? AS ?", + bun.Ident(conversationStatusesTmp), + bun.Ident("conversation_statuses"), + ). + Join( + "LEFT JOIN ? AS ?", + bun.Ident(conversationStatusesTmp), + bun.Ident("later_statuses"), + ). + JoinOn( + "? = ?", + bun.Ident("conversation_statuses.conversation_id"), + bun.Ident("later_statuses.conversation_id"), + ). + JoinOn( + "? > ?", + bun.Ident("later_statuses.created_at"), + bun.Ident("conversation_statuses.created_at"), + ). + Where("? IS NULL", bun.Ident("later_statuses.id")), ) - } - - // Create a temporary table with the most recently created - // status in each conversation for which the deleted status - // is the last status, if there is such a status. - // - // This will produce a query like: - // - // CREATE TEMPORARY TABLE "latest_conversation_statuses_01J78T2AR0E46SJSH6C7NRZ7MR" - // AS ( - // SELECT - // "conversation_statuses"."conversation_id", - // "conversation_statuses"."id" - // FROM - // "conversation_statuses_01J78T2AR0YCZ4YR12WSCZ608S" AS "conversation_statuses" - // LEFT JOIN "conversation_statuses_01J78T2AR0YCZ4YR12WSCZ608S" AS "later_statuses" ON ( - // "conversation_statuses"."conversation_id" = "later_statuses"."conversation_id" - // ) - // AND ( - // "later_statuses"."created_at" > "conversation_statuses"."created_at" - // ) - // WHERE - // ("later_statuses"."id" IS NULL) - // ) - latestConversationStatusesTmp := "latest_conversation_statuses_" + id.NewULID() - latestConversationStatusesTmpQ := tx.NewRaw( - tmpQ, - bun.Ident(latestConversationStatusesTmp), - tx.NewSelect(). - Column( - "conversation_statuses.conversation_id", - "conversation_statuses.id", - ). - TableExpr( - "? AS ?", - bun.Ident(conversationStatusesTmp), - bun.Ident("conversation_statuses"), - ). - Join( - "LEFT JOIN ? AS ?", - bun.Ident(conversationStatusesTmp), - bun.Ident("later_statuses"), - ). - JoinOn( - "? = ?", - bun.Ident("conversation_statuses.conversation_id"), - bun.Ident("later_statuses.conversation_id"), - ). - JoinOn( - "? > ?", - bun.Ident("later_statuses.created_at"), - bun.Ident("conversation_statuses.created_at"), - ). - Where("? IS NULL", bun.Ident("later_statuses.id")), - ) - _, err = latestConversationStatusesTmpQ.Exec(ctx) - if err != nil { - if err := tx.Rollback(); err != nil { - log.Errorf(ctx, "error rolling back: %v", err) + _, err = latestConversationStatusesTmpQ.Exec(ctx) + if err != nil { + return gtserror.Newf( + "error creating temp table %s while deleting status %s: %w", + conversationStatusesTmp, statusID, err, + ) } - return gtserror.Newf( - "error creating temp table %s while deleting status %s: %w", - conversationStatusesTmp, statusID, err, - ) - } - // For every conversation where the given status was the last one, - // reset its last status to the most recently created in the - // conversation other than that one, if there is such a status. - // Return conversation IDs for invalidation. - updateQ := tx.NewUpdate(). - Table("conversations"). - TableExpr("? AS ?", bun.Ident(latestConversationStatusesTmp), bun.Ident("latest_conversation_statuses")). - Set("? = ?", bun.Ident("last_status_id"), bun.Ident("latest_conversation_statuses.id")). - Set("? = ?", bun.Ident("updated_at"), time.Now()). - Where("? = ?", bun.Ident("conversations.id"), bun.Ident("latest_conversation_statuses.conversation_id")). - Where("? IS NOT NULL", bun.Ident("latest_conversation_statuses.id")). - Returning("?", bun.Ident("conversations.id")) - _, err = updateQ.Exec(ctx, &updatedConversationIDs) - if err != nil { - if err := tx.Rollback(); err != nil { - log.Errorf(ctx, "error rolling back: %v", err) + // For every conversation where the given status was the last one, + // reset its last status to the most recently created in the + // conversation other than that one, if there is such a status. + // Return conversation IDs for invalidation. + updateQ := tx.NewUpdate(). + Table("conversations"). + TableExpr("? AS ?", bun.Ident(latestConversationStatusesTmp), bun.Ident("latest_conversation_statuses")). + Set("? = ?", bun.Ident("last_status_id"), bun.Ident("latest_conversation_statuses.id")). + Set("? = ?", bun.Ident("updated_at"), time.Now()). + Where("? = ?", bun.Ident("conversations.id"), bun.Ident("latest_conversation_statuses.conversation_id")). + Where("? IS NOT NULL", bun.Ident("latest_conversation_statuses.id")). + Returning("?", bun.Ident("conversations.id")) + _, err = updateQ.Exec(ctx, &updatedConversationIDs) + if err != nil { + return gtserror.Newf( + "error rolling back last status for conversation while deleting status %s: %w", + statusID, err, + ) } - return gtserror.Newf( - "error rolling back last status for conversation while deleting status %s: %w", - statusID, err, - ) - } - // If there is no such status, - // just delete the conversation. - // Return IDs for invalidation. - _, err = tx. - NewDelete(). - Table("conversations"). - Where( - "? IN (?)", - bun.Ident("id"), - tx.NewSelect(). - Table(latestConversationStatusesTmp). - Column("conversation_id"). - Where("? IS NULL", bun.Ident("id")), - ). - Returning("?", bun.Ident("id")). - Exec(ctx, &deletedConversationIDs) - if err != nil { - if err := tx.Rollback(); err != nil { - log.Errorf(ctx, "error rolling back: %v", err) + // If there is no such status, + // just delete the conversation. + // Return IDs for invalidation. + _, err = tx. + NewDelete(). + Table("conversations"). + Where( + "? IN (?)", + bun.Ident("id"), + tx.NewSelect(). + Table(latestConversationStatusesTmp). + Column("conversation_id"). + Where("? IS NULL", bun.Ident("id")), + ). + Returning("?", bun.Ident("id")). + Exec(ctx, &deletedConversationIDs) + if err != nil { + return gtserror.Newf( + "error deleting conversation while deleting status %s: %w", + statusID, err, + ) } - return gtserror.Newf( - "error deleting conversation while deleting status %s: %w", - statusID, err, - ) - } - // We're done, commit everything. - if err := tx.Commit(); err != nil { - return gtserror.Newf( - "error committing transaction while deleting status %s: %w", - statusID, err, - ) - } + return nil + }) // Invalidate cache entries. updatedConversationIDs = append(updatedConversationIDs, deletedConversationIDs...) From 869ac888b035f2a37856b4a69dbbe06e5d1921b3 Mon Sep 17 00:00:00 2001 From: tobi Date: Sun, 8 Sep 2024 16:02:29 +0200 Subject: [PATCH 5/5] fackin' ell m8 --- internal/db/bundb/conversation.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/internal/db/bundb/conversation.go b/internal/db/bundb/conversation.go index a196713700..053b23e319 100644 --- a/internal/db/bundb/conversation.go +++ b/internal/db/bundb/conversation.go @@ -360,7 +360,7 @@ func (c *conversationDB) DeleteStatusFromConversations(ctx context.Context, stat tmpQ = "CREATE TEMPORARY TABLE ? AS ?" } - c.db.RunInTx(ctx, nil, func(ctx context.Context, tx bun.Tx) error { + if err := c.db.RunInTx(ctx, nil, func(ctx context.Context, tx bun.Tx) error { // First delete this status from // conversation-to-status links. _, err := tx. @@ -559,7 +559,9 @@ func (c *conversationDB) DeleteStatusFromConversations(ctx context.Context, stat } return nil - }) + }); err != nil { + return err + } // Invalidate cache entries. updatedConversationIDs = append(updatedConversationIDs, deletedConversationIDs...)