Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[performance] massively improved ActivityPub delivery worker efficiency #2812

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
aa01437
add delivery worker type that pulls from queue to httpclient package
NyaaaWhatsUpDoc Apr 2, 2024
8aede54
finish up some code commenting, bodge a vendored activity library cha…
NyaaaWhatsUpDoc Apr 3, 2024
8fdd5b1
hook up queue deletion logic
NyaaaWhatsUpDoc Apr 3, 2024
d7e14d5
support deleting queued http requests by target ID
NyaaaWhatsUpDoc Apr 3, 2024
033b2e7
don't index APRequest by hostname in the queue
NyaaaWhatsUpDoc Apr 3, 2024
73ef636
use gorun
NyaaaWhatsUpDoc Apr 3, 2024
3439ce7
use the original context's values when wrapping msg type as delivery{}
NyaaaWhatsUpDoc Apr 3, 2024
a8ea1fe
actually log in the AP delivery worker ...
NyaaaWhatsUpDoc Apr 3, 2024
3c9451b
add uncommitted changes
NyaaaWhatsUpDoc Apr 3, 2024
175c870
use errors.AsV2()
NyaaaWhatsUpDoc Apr 4, 2024
d899239
use errorsv2.AsV2()
NyaaaWhatsUpDoc Apr 4, 2024
f70ef72
finish adding some code comments, add bad host handling to delivery w…
NyaaaWhatsUpDoc Apr 4, 2024
7080930
slightly tweak deliveryworkerpool API, use advanced sender multiplier
NyaaaWhatsUpDoc Apr 4, 2024
ba60397
remove PopCtx() method, let others instead rely on Wait()
NyaaaWhatsUpDoc Apr 4, 2024
a3bd01d
shuffle things around to move delivery stuff into transport/ subpkg
NyaaaWhatsUpDoc Apr 5, 2024
5d6ffd7
remove dead code
NyaaaWhatsUpDoc Apr 5, 2024
9ce0ffc
formatting
NyaaaWhatsUpDoc Apr 5, 2024
0f4c425
validate request before queueing for delivery
NyaaaWhatsUpDoc Apr 5, 2024
9264a82
finish adding code comments, fix up backoff code
NyaaaWhatsUpDoc Apr 5, 2024
fd7c958
finish adding more code comments
NyaaaWhatsUpDoc Apr 5, 2024
e06617d
clamp minimum no. senders to 1
NyaaaWhatsUpDoc Apr 5, 2024
e17474f
add start/stop logging to delivery worker, some slight changes
NyaaaWhatsUpDoc Apr 5, 2024
500e755
remove double logging
NyaaaWhatsUpDoc Apr 5, 2024
f17ef91
use worker ptrs
NyaaaWhatsUpDoc Apr 5, 2024
4811ded
expose the embedded log fields in httpclient.Request{}
NyaaaWhatsUpDoc Apr 5, 2024
59ec876
ensure request context values are preserved when updating ctx
NyaaaWhatsUpDoc Apr 5, 2024
1b9308e
add delivery worker tests
NyaaaWhatsUpDoc Apr 6, 2024
51f6fd5
fix linter issues
NyaaaWhatsUpDoc Apr 6, 2024
39e6d05
ensure delivery worker gets inited in testrig
NyaaaWhatsUpDoc Apr 7, 2024
3880020
fix tests to delivering messages to check worker delivery queue
NyaaaWhatsUpDoc Apr 8, 2024
a90d51b
update error type to use ptr instead of value receiver
NyaaaWhatsUpDoc Apr 8, 2024
46130df
fix test calling Workers{}.Start() instead of testrig.StartWorkers()
NyaaaWhatsUpDoc Apr 8, 2024
58d5bd5
update docs for advanced-sender-multiplier
NyaaaWhatsUpDoc Apr 8, 2024
ad6cfc0
update to the latest activity library version
NyaaaWhatsUpDoc Apr 8, 2024
65a54b4
add comment about not using httptest.Server{}
NyaaaWhatsUpDoc Apr 8, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions cmd/gotosocial/action/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,9 @@ var Start action.GTSAction = func(ctx context.Context) error {
TLSInsecureSkipVerify: config.GetHTTPClientTLSInsecureSkipVerify(),
})

// Initialize delivery worker with http client.
state.Workers.Delivery.Init(client)

// Initialize workers.
state.Workers.Start()
defer state.Workers.Stop()
Expand Down
13 changes: 4 additions & 9 deletions docs/configuration/advanced.md
Original file line number Diff line number Diff line change
Expand Up @@ -119,15 +119,10 @@ advanced-throttling-multiplier: 8
# Default: "30s"
advanced-throttling-retry-after: "30s"

# Int. CPU multiplier for the amount of goroutines to spawn in order to send messages via ActivityPub.
# Messages will be batched so that at most multiplier * CPU count messages will be sent out at once.
# This can be tuned to limit concurrent POSTing to remote inboxes, preventing your instance CPU
# usage from skyrocketing when an account with many followers posts a new status.
#
# Messages are split among available senders, and each sender processes its assigned messages in serial.
# For example, say a user with 1000 followers is on an instance with 2 CPUs. With the default multiplier
# of 2, this means 4 senders would be in process at once on this instance. When the user creates a new post,
# each sender would end up iterating through about 250 Create messages + delivering them to remote instances.
# Int. CPU multiplier for the fixed number of goroutines to spawn in order to send messages via ActivityPub.
# Messages will be batched and pushed to a singular queue, from which multiplier * CPU count goroutines will
# pull and attempt deliveries. This can be tuned to limit concurrent posting to remote inboxes, preventing
# your instance CPU usage skyrocketing when accounts with many followers post statuses.
#
# If you set this to 0 or less, only 1 sender will be used regardless of CPU count. This may be
# useful in cases where you are working with very tight network or CPU constraints.
Expand Down
13 changes: 4 additions & 9 deletions example/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1042,15 +1042,10 @@ advanced-throttling-multiplier: 8
# Default: "30s"
advanced-throttling-retry-after: "30s"

# Int. CPU multiplier for the amount of goroutines to spawn in order to send messages via ActivityPub.
# Messages will be batched so that at most multiplier * CPU count messages will be sent out at once.
# This can be tuned to limit concurrent POSTing to remote inboxes, preventing your instance CPU
# usage from skyrocketing when an account with many followers posts a new status.
#
# Messages are split among available senders, and each sender processes its assigned messages in serial.
# For example, say a user with 1000 followers is on an instance with 2 CPUs. With the default multiplier
# of 2, this means 4 senders would be in process at once on this instance. When the user creates a new post,
# each sender would end up iterating through about 250 Create messages + delivering them to remote instances.
# Int. CPU multiplier for the fixed number of goroutines to spawn in order to send messages via ActivityPub.
# Messages will be batched and pushed to a singular queue, from which multiplier * CPU count goroutines will
# pull and attempt deliveries. This can be tuned to limit concurrent posting to remote inboxes, preventing
# your instance CPU usage skyrocketing when accounts with many followers post statuses.
#
# If you set this to 0 or less, only 1 sender will be used regardless of CPU count. This may be
# useful in cases where you are working with very tight network or CPU constraints.
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ require (
github.com/spf13/cobra v1.8.0
github.com/spf13/viper v1.18.2
github.com/stretchr/testify v1.9.0
github.com/superseriousbusiness/activity v1.6.0-gts.0.20240221151241-5d56c04088d4
github.com/superseriousbusiness/activity v1.6.0-gts.0.20240408131430-247f7f7110f0
tsmethurst marked this conversation as resolved.
Show resolved Hide resolved
github.com/superseriousbusiness/httpsig v1.2.0-SSB
github.com/superseriousbusiness/oauth2/v4 v4.3.2-SSB.0.20230227143000-f4900831d6c8
github.com/tdewolff/minify/v2 v2.20.19
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -623,8 +623,8 @@ github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8
github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8=
github.com/subosito/gotenv v1.6.0/go.mod h1:Dk4QP5c2W3ibzajGcXpNraDfq2IrhjMIvMSWPKKo0FU=
github.com/sunfish-shogi/bufseekio v0.0.0-20210207115823-a4185644b365/go.mod h1:dEzdXgvImkQ3WLI+0KQpmEx8T/C/ma9KeS3AfmU899I=
github.com/superseriousbusiness/activity v1.6.0-gts.0.20240221151241-5d56c04088d4 h1:kPjQR/hVZtROTzkxptp/EIR7Wm58O8jppwpCFrZ7sVU=
github.com/superseriousbusiness/activity v1.6.0-gts.0.20240221151241-5d56c04088d4/go.mod h1:AZw0Xb4Oju8rmaJCZ21gc5CPg47MmNgyac+Hx5jo8VM=
github.com/superseriousbusiness/activity v1.6.0-gts.0.20240408131430-247f7f7110f0 h1:zPdbgwbjPxrJqme2sFTMQoML5ukNWRhChOnilR47rss=
github.com/superseriousbusiness/activity v1.6.0-gts.0.20240408131430-247f7f7110f0/go.mod h1:AZw0Xb4Oju8rmaJCZ21gc5CPg47MmNgyac+Hx5jo8VM=
github.com/superseriousbusiness/go-jpeg-image-structure/v2 v2.0.0-20220321154430-d89a106fdabe h1:ksl2oCx/Qo8sNDc3Grb8WGKBM9nkvhCm25uvlT86azE=
github.com/superseriousbusiness/go-jpeg-image-structure/v2 v2.0.0-20220321154430-d89a106fdabe/go.mod h1:gH4P6gN1V+wmIw5o97KGaa1RgXB/tVpC2UNzijhg3E4=
github.com/superseriousbusiness/go-png-image-structure/v2 v2.0.1-SSB h1:8psprYSK1KdOSH7yQ4PbJq0YYaGQY+gzdW/B0ExDb/8=
Expand Down
11 changes: 6 additions & 5 deletions internal/api/activitypub/users/inboxpost.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,32 +18,33 @@
package users

import (
"errors"
"net/http"

"github.com/gin-gonic/gin"
apiutil "github.com/superseriousbusiness/gotosocial/internal/api/util"
"github.com/superseriousbusiness/gotosocial/internal/gtserror"
"github.com/superseriousbusiness/gotosocial/internal/log"

errorsv2 "codeberg.org/gruf/go-errors/v2"
)

// InboxPOSTHandler deals with incoming POST requests to an actor's inbox.
// Eg., POST to https://example.org/users/whatever/inbox.
func (m *Module) InboxPOSTHandler(c *gin.Context) {
_, err := m.processor.Fedi().InboxPost(c.Request.Context(), c.Writer, c.Request)
if err != nil {
errWithCode := new(gtserror.WithCode)
errWithCode := errorsv2.AsV2[gtserror.WithCode](err)

if !errors.As(err, errWithCode) {
if errWithCode == nil {
// Something else went wrong, and someone forgot to return
// an errWithCode! It's chill though. Log the error but don't
// return it as-is to the caller, to avoid leaking internals.
log.Errorf(c.Request.Context(), "returning Bad Request to caller, err was: %q", err)
*errWithCode = gtserror.NewErrorBadRequest(err)
errWithCode = gtserror.NewErrorBadRequest(err)
}

// Pass along confirmed error with code to the main error handler
apiutil.ErrorHandler(c, *errWithCode, m.processor.InstanceGetV1)
apiutil.ErrorHandler(c, errWithCode, m.processor.InstanceGetV1)
return
}

Expand Down
1 change: 1 addition & 0 deletions internal/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
)

type Caches struct {

// GTS provides access to the collection of
// gtsmodel object caches. (used by the database).
GTS GTSCaches
Expand Down
6 changes: 3 additions & 3 deletions internal/federation/federatingactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func (f *federatingActor) PostInboxScheme(ctx context.Context, w http.ResponseWr
// so we specifically have to check for already wrapped with code.
//
ctx, authenticated, err := f.sideEffectActor.AuthenticatePostInbox(ctx, w, r)
if errors.As(err, new(gtserror.WithCode)) {
if errorsv2.AsV2[gtserror.WithCode](err) != nil {
// If it was already wrapped with an
// HTTP code then don't bother rewrapping
// it, just return it as-is for caller to
Expand Down Expand Up @@ -131,15 +131,15 @@ func (f *federatingActor) PostInboxScheme(ctx context.Context, w http.ResponseWr
// Check authorization of the activity; this will include blocks.
authorized, err := f.sideEffectActor.AuthorizePostInbox(ctx, w, activity)
if err != nil {
if errors.As(err, new(errOtherIRIBlocked)) {
if errorsv2.AsV2[*errOtherIRIBlocked](err) != nil {
// There's no direct block between requester(s) and
// receiver. However, one or more of the other IRIs
// involved in the request (account replied to, note
// boosted, etc) is blocked either at domain level or
// by the receiver. We don't need to return 403 here,
// instead, just return 202 accepted but don't do any
// further processing of the activity.
return true, nil
return true, nil //nolint
}

// Real error has occurred.
Expand Down
25 changes: 15 additions & 10 deletions internal/federation/federatingactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"bytes"
"context"
"encoding/json"
"io"
"net/url"
"testing"
"time"
Expand Down Expand Up @@ -129,23 +130,27 @@ func (suite *FederatingActorTestSuite) TestSendRemoteFollower() {
suite.NotNil(activity)

// because we added 1 remote follower for zork, there should be a url in sentMessage
var sent [][]byte
var sent []byte
if !testrig.WaitFor(func() bool {
sentI, ok := httpClient.SentMessages.Load(*testRemoteAccount.SharedInboxURI)
if ok {
sent, ok = sentI.([][]byte)
if !ok {
panic("SentMessages entry was not []byte")
}
return true
delivery, ok := suite.state.Workers.Delivery.Queue.Pop()
if !ok {
return false
}
return false
if !testrig.EqualRequestURIs(delivery.Request.URL, *testRemoteAccount.SharedInboxURI) {
panic("differing request uris")
}
sent, err = io.ReadAll(delivery.Request.Body)
if err != nil {
panic("error reading body: " + err.Error())
}
return true

}) {
suite.FailNow("timed out waiting for message")
}

dst := new(bytes.Buffer)
err = json.Indent(dst, sent[0], "", " ")
err = json.Indent(dst, sent, "", " ")
suite.NoError(err)
suite.Equal(`{
"@context": "https://www.w3.org/ns/activitystreams",
Expand Down
4 changes: 2 additions & 2 deletions internal/federation/federatingdb/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func (f *federatingDB) Delete(ctx context.Context, id *url.URL) error {
// in a delete we only get the URI, we can't know if we have a status or a profile or something else,
// so we have to try a few different things...
if s, err := f.state.DB.GetStatusByURI(ctx, id.String()); err == nil && requestingAcct.ID == s.AccountID {
l.Debugf("uri is for STATUS with id: %s", s.ID)
l.Debugf("deleting status: %s", s.ID)
f.state.Workers.EnqueueFediAPI(ctx, messages.FromFediAPI{
APObjectType: ap.ObjectNote,
APActivityType: ap.ActivityDelete,
Expand All @@ -61,7 +61,7 @@ func (f *federatingDB) Delete(ctx context.Context, id *url.URL) error {
}

if a, err := f.state.DB.GetAccountByURI(ctx, id.String()); err == nil && requestingAcct.ID == a.ID {
l.Debugf("uri is for ACCOUNT with id %s", a.ID)
l.Debugf("deleting account: %s", a.ID)
f.state.Workers.EnqueueFediAPI(ctx, messages.FromFediAPI{
APObjectType: ap.ObjectProfile,
APActivityType: ap.ActivityDelete,
Expand Down
4 changes: 2 additions & 2 deletions internal/federation/federatingprotocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ type errOtherIRIBlocked struct {
iriStrs []string
}

func (e errOtherIRIBlocked) Error() string {
func (e *errOtherIRIBlocked) Error() string {
iriStrsNice := "[" + strings.Join(e.iriStrs, ", ") + "]"
if e.domainBlock {
return "domain block exists for one or more of " + iriStrsNice
Expand All @@ -67,7 +67,7 @@ func newErrOtherIRIBlocked(
e.iriStrs = append(e.iriStrs, iri.String())
}

return e
return &e
}

/*
Expand Down
6 changes: 3 additions & 3 deletions internal/federation/federatingprotocol_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,13 @@ import (
"bytes"
"context"
"encoding/json"
"errors"
"io"
"net/http"
"net/http/httptest"
"net/url"
"testing"

errorsv2 "codeberg.org/gruf/go-errors/v2"
"github.com/stretchr/testify/suite"
"github.com/superseriousbusiness/gotosocial/internal/ap"
"github.com/superseriousbusiness/gotosocial/internal/gtscontext"
Expand Down Expand Up @@ -101,8 +101,8 @@ func (suite *FederatingProtocolTestSuite) authenticatePostInbox(

recorder := httptest.NewRecorder()
newContext, authed, err := suite.federator.AuthenticatePostInbox(ctx, recorder, request)
if withCode := new(gtserror.WithCode); (errors.As(err, withCode) &&
(*withCode).Code() >= 500) || (err != nil && (*withCode) == nil) {
if withCode := errorsv2.AsV2[gtserror.WithCode](err); // nocollapse
(withCode != nil && withCode.Code() >= 500) || (err != nil && withCode == nil) {
// NOTE: the behaviour here is a little strange as we have
// the competing code styles of the go-fed interface expecting
// that any err is a no-go, but authed bool is intended to be
Expand Down
Loading