Skip to content

Commit

Permalink
[bugfix] Lock when checking/creating notifs to avoid race (superserio…
Browse files Browse the repository at this point in the history
…usbusiness#2890)

* [bugfix] Lock when checking/creating notifs to avoid race

* test notif spam
  • Loading branch information
tsmethurst authored and nyarla committed Jun 19, 2024
1 parent 32d7de9 commit de315fa
Show file tree
Hide file tree
Showing 15 changed files with 290 additions and 133 deletions.
2 changes: 1 addition & 1 deletion internal/processing/account/move.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func (p *Processor) MoveSelf(
// in quick succession, so get a lock on
// this account.
lockKey := originAcct.URI
unlock := p.state.AccountLocks.Lock(lockKey)
unlock := p.state.ProcessingLocks.Lock(lockKey)
defer unlock()

// Ensure we have a valid, up-to-date representation of the target account.
Expand Down
2 changes: 1 addition & 1 deletion internal/processing/admin/accountapprove.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func (p *Processor) AccountApprove(
// Get a lock on the account URI,
// to ensure it's not also being
// rejected at the same time!
unlock := p.state.AccountLocks.Lock(user.Account.URI)
unlock := p.state.ProcessingLocks.Lock(user.Account.URI)
defer unlock()

if !*user.Approved {
Expand Down
2 changes: 1 addition & 1 deletion internal/processing/admin/accountreject.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func (p *Processor) AccountReject(
// Get a lock on the account URI,
// since we're going to be deleting
// it and its associated user.
unlock := p.state.AccountLocks.Lock(user.Account.URI)
unlock := p.state.ProcessingLocks.Lock(user.Account.URI)
defer unlock()

// Can't reject an account with a
Expand Down
4 changes: 2 additions & 2 deletions internal/processing/status/pin.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (p *Processor) PinCreate(ctx context.Context, requestingAccount *gtsmodel.A
}

// Get a lock on this account.
unlock := p.state.AccountLocks.Lock(requestingAccount.URI)
unlock := p.state.ProcessingLocks.Lock(requestingAccount.URI)
defer unlock()

if !targetStatus.PinnedAt.IsZero() {
Expand Down Expand Up @@ -148,7 +148,7 @@ func (p *Processor) PinRemove(ctx context.Context, requestingAccount *gtsmodel.A
}

// Get a lock on this account.
unlock := p.state.AccountLocks.Lock(requestingAccount.URI)
unlock := p.state.ProcessingLocks.Lock(requestingAccount.URI)
defer unlock()

if targetStatus.PinnedAt.IsZero() {
Expand Down
2 changes: 1 addition & 1 deletion internal/processing/workers/fromclientapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ import (
type clientAPI struct {
state *state.State
converter *typeutils.Converter
surface *surface
surface *Surface
federate *federate
account *account.Processor
utils *utils
Expand Down
2 changes: 1 addition & 1 deletion internal/processing/workers/fromfediapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ import (
// from the federation/ActivityPub API.
type fediAPI struct {
state *state.State
surface *surface
surface *Surface
federate *federate
account *account.Processor
utils *utils
Expand Down
14 changes: 7 additions & 7 deletions internal/processing/workers/surface.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,16 @@ import (
"github.com/superseriousbusiness/gotosocial/internal/typeutils"
)

// surface wraps functions for 'surfacing' the result
// Surface wraps functions for 'surfacing' the result
// of processing a message, eg:
// - timelining a status
// - removing a status from timelines
// - sending a notification to a user
// - sending an email
type surface struct {
state *state.State
converter *typeutils.Converter
stream *stream.Processor
filter *visibility.Filter
emailSender email.Sender
type Surface struct {
State *state.State
Converter *typeutils.Converter
Stream *stream.Processor
Filter *visibility.Filter
EmailSender email.Sender
}
52 changes: 26 additions & 26 deletions internal/processing/workers/surfaceemail.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ import (

// emailUserReportClosed emails the user who created the
// given report, to inform them the report has been closed.
func (s *surface) emailUserReportClosed(ctx context.Context, report *gtsmodel.Report) error {
user, err := s.state.DB.GetUserByAccountID(ctx, report.Account.ID)
func (s *Surface) emailUserReportClosed(ctx context.Context, report *gtsmodel.Report) error {
user, err := s.State.DB.GetUserByAccountID(ctx, report.Account.ID)
if err != nil {
return gtserror.Newf("db error getting user: %w", err)
}
Expand All @@ -51,12 +51,12 @@ func (s *surface) emailUserReportClosed(ctx context.Context, report *gtsmodel.Re
return nil
}

instance, err := s.state.DB.GetInstance(ctx, config.GetHost())
instance, err := s.State.DB.GetInstance(ctx, config.GetHost())
if err != nil {
return gtserror.Newf("db error getting instance: %w", err)
}

if err := s.state.DB.PopulateReport(ctx, report); err != nil {
if err := s.State.DB.PopulateReport(ctx, report); err != nil {
return gtserror.Newf("error populating report: %w", err)
}

Expand All @@ -69,20 +69,20 @@ func (s *surface) emailUserReportClosed(ctx context.Context, report *gtsmodel.Re
ActionTakenComment: report.ActionTaken,
}

return s.emailSender.SendReportClosedEmail(user.Email, reportClosedData)
return s.EmailSender.SendReportClosedEmail(user.Email, reportClosedData)
}

// emailUserPleaseConfirm emails the given user
// to ask them to confirm their email address.
func (s *surface) emailUserPleaseConfirm(ctx context.Context, user *gtsmodel.User) error {
func (s *Surface) emailUserPleaseConfirm(ctx context.Context, user *gtsmodel.User) error {
if user.UnconfirmedEmail == "" ||
user.UnconfirmedEmail == user.Email {
// User has already confirmed this
// email address; nothing to do.
return nil
}

instance, err := s.state.DB.GetInstance(ctx, config.GetHost())
instance, err := s.State.DB.GetInstance(ctx, config.GetHost())
if err != nil {
return gtserror.Newf("db error getting instance: %w", err)
}
Expand All @@ -97,7 +97,7 @@ func (s *surface) emailUserPleaseConfirm(ctx context.Context, user *gtsmodel.Use
)

// Assemble email contents and send the email.
if err := s.emailSender.SendConfirmEmail(
if err := s.EmailSender.SendConfirmEmail(
user.UnconfirmedEmail,
email.ConfirmData{
Username: user.Account.Username,
Expand All @@ -116,7 +116,7 @@ func (s *surface) emailUserPleaseConfirm(ctx context.Context, user *gtsmodel.Use
user.ConfirmationSentAt = now
user.LastEmailedAt = now

if err := s.state.DB.UpdateUser(
if err := s.State.DB.UpdateUser(
ctx,
user,
"confirmation_token",
Expand All @@ -131,7 +131,7 @@ func (s *surface) emailUserPleaseConfirm(ctx context.Context, user *gtsmodel.Use

// emailUserSignupApproved emails the given user
// to inform them their sign-up has been approved.
func (s *surface) emailUserSignupApproved(ctx context.Context, user *gtsmodel.User) error {
func (s *Surface) emailUserSignupApproved(ctx context.Context, user *gtsmodel.User) error {
// User may have been approved without
// their email address being confirmed
// yet. Just send to whatever we have.
Expand All @@ -140,13 +140,13 @@ func (s *surface) emailUserSignupApproved(ctx context.Context, user *gtsmodel.Us
emailAddr = user.UnconfirmedEmail
}

instance, err := s.state.DB.GetInstance(ctx, config.GetHost())
instance, err := s.State.DB.GetInstance(ctx, config.GetHost())
if err != nil {
return gtserror.Newf("db error getting instance: %w", err)
}

// Assemble email contents and send the email.
if err := s.emailSender.SendSignupApprovedEmail(
if err := s.EmailSender.SendSignupApprovedEmail(
emailAddr,
email.SignupApprovedData{
Username: user.Account.Username,
Expand All @@ -162,7 +162,7 @@ func (s *surface) emailUserSignupApproved(ctx context.Context, user *gtsmodel.Us
now := time.Now()
user.LastEmailedAt = now

if err := s.state.DB.UpdateUser(
if err := s.State.DB.UpdateUser(
ctx,
user,
"last_emailed_at",
Expand All @@ -175,14 +175,14 @@ func (s *surface) emailUserSignupApproved(ctx context.Context, user *gtsmodel.Us

// emailUserSignupApproved emails the given user
// to inform them their sign-up has been approved.
func (s *surface) emailUserSignupRejected(ctx context.Context, deniedUser *gtsmodel.DeniedUser) error {
instance, err := s.state.DB.GetInstance(ctx, config.GetHost())
func (s *Surface) emailUserSignupRejected(ctx context.Context, deniedUser *gtsmodel.DeniedUser) error {
instance, err := s.State.DB.GetInstance(ctx, config.GetHost())
if err != nil {
return gtserror.Newf("db error getting instance: %w", err)
}

// Assemble email contents and send the email.
return s.emailSender.SendSignupRejectedEmail(
return s.EmailSender.SendSignupRejectedEmail(
deniedUser.Email,
email.SignupRejectedData{
Message: deniedUser.Message,
Expand All @@ -194,13 +194,13 @@ func (s *surface) emailUserSignupRejected(ctx context.Context, deniedUser *gtsmo

// emailAdminReportOpened emails all active moderators/admins
// of this instance that a new report has been created.
func (s *surface) emailAdminReportOpened(ctx context.Context, report *gtsmodel.Report) error {
instance, err := s.state.DB.GetInstance(ctx, config.GetHost())
func (s *Surface) emailAdminReportOpened(ctx context.Context, report *gtsmodel.Report) error {
instance, err := s.State.DB.GetInstance(ctx, config.GetHost())
if err != nil {
return gtserror.Newf("error getting instance: %w", err)
}

toAddresses, err := s.state.DB.GetInstanceModeratorAddresses(ctx)
toAddresses, err := s.State.DB.GetInstanceModeratorAddresses(ctx)
if err != nil {
if errors.Is(err, db.ErrNoEntries) {
// No registered moderator addresses.
Expand All @@ -209,7 +209,7 @@ func (s *surface) emailAdminReportOpened(ctx context.Context, report *gtsmodel.R
return gtserror.Newf("error getting instance moderator addresses: %w", err)
}

if err := s.state.DB.PopulateReport(ctx, report); err != nil {
if err := s.State.DB.PopulateReport(ctx, report); err != nil {
return gtserror.Newf("error populating report: %w", err)
}

Expand All @@ -221,7 +221,7 @@ func (s *surface) emailAdminReportOpened(ctx context.Context, report *gtsmodel.R
ReportTargetDomain: report.TargetAccount.Domain,
}

if err := s.emailSender.SendNewReportEmail(toAddresses, reportData); err != nil {
if err := s.EmailSender.SendNewReportEmail(toAddresses, reportData); err != nil {
return gtserror.Newf("error emailing instance moderators: %w", err)
}

Expand All @@ -230,13 +230,13 @@ func (s *surface) emailAdminReportOpened(ctx context.Context, report *gtsmodel.R

// emailAdminNewSignup emails all active moderators/admins of this
// instance that a new account sign-up has been submitted to the instance.
func (s *surface) emailAdminNewSignup(ctx context.Context, newUser *gtsmodel.User) error {
instance, err := s.state.DB.GetInstance(ctx, config.GetHost())
func (s *Surface) emailAdminNewSignup(ctx context.Context, newUser *gtsmodel.User) error {
instance, err := s.State.DB.GetInstance(ctx, config.GetHost())
if err != nil {
return gtserror.Newf("error getting instance: %w", err)
}

toAddresses, err := s.state.DB.GetInstanceModeratorAddresses(ctx)
toAddresses, err := s.State.DB.GetInstanceModeratorAddresses(ctx)
if err != nil {
if errors.Is(err, db.ErrNoEntries) {
// No registered moderator addresses.
Expand All @@ -246,7 +246,7 @@ func (s *surface) emailAdminNewSignup(ctx context.Context, newUser *gtsmodel.Use
}

// Ensure user populated.
if err := s.state.DB.PopulateUser(ctx, newUser); err != nil {
if err := s.State.DB.PopulateUser(ctx, newUser); err != nil {
return gtserror.Newf("error populating user: %w", err)
}

Expand All @@ -259,7 +259,7 @@ func (s *surface) emailAdminNewSignup(ctx context.Context, newUser *gtsmodel.Use
SignupURL: instance.URI + "/settings/admin/accounts/" + newUser.AccountID,
}

if err := s.emailSender.SendNewSignupEmail(toAddresses, newSignupData); err != nil {
if err := s.EmailSender.SendNewSignupEmail(toAddresses, newSignupData); err != nil {
return gtserror.Newf("error emailing instance moderators: %w", err)
}

Expand Down
Loading

0 comments on commit de315fa

Please sign in to comment.