Skip to content

Commit

Permalink
[performance] update remaining worker pools to use queues (#2865)
Browse files Browse the repository at this point in the history
* start replacing client + federator + media workers with new worker + queue types

* refactor federatingDB.Delete(), drop queued messages when deleting account / status

* move all queue purging to the processor workers

* undo toolchain updates

* code comments, ensure dereferencer worker pool gets started

* update gruf libraries in readme

* start the job scheduler separately to the worker pools

* reshuffle ordering or server.go + remove duplicate worker start / stop

* update go-list version

* fix vendoring

* move queue invalidation to before wipeing / deletion, to ensure queued work not dropped

* add logging to worker processing functions in testrig, don't start workers in unexpected places

* update go-structr to add (+then rely on) QueueCtx{} type

* ensure more worker pools get started properly in tests

* fix remaining broken tests relying on worker queue logic

* fix account test suite queue popping logic, ensure noop workers do not pull from queue

* move back accidentally shuffled account deletion order

* ensure error (non nil!!) gets passed in refactored federatingDB{}.Delete()

* silently drop deletes from accounts not permitted to

* don't warn log on forwarded deletes

* make if else clauses easier to parse

* use getFederatorMsg()

* improved code comment

* improved code comment re: requesting account delete checks

* remove boolean result from worker start / stop since false = already running or already stopped

* remove optional passed-in http.client

* remove worker starting from the admin CLI commands (we don't need to handle side-effects)

* update prune cli to start scheduler but not all of the workers

* fix rebase issues

* remove redundant return statements

* i'm sorry sir linter
  • Loading branch information
NyaaaWhatsUpDoc authored Apr 26, 2024
1 parent ba4f51c commit c9c0773
Show file tree
Hide file tree
Showing 79 changed files with 1,894 additions and 836 deletions.
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -277,10 +277,11 @@ The following open source libraries, frameworks, and tools are used by GoToSocia
- [gruf/go-cache](https://codeberg.org/gruf/go-cache); LRU and TTL caches. [MIT License](https://spdx.org/licenses/MIT.html).
- [gruf/go-debug](https://codeberg.org/gruf/go-debug); debug build tag. [MIT License](https://spdx.org/licenses/MIT.html).
- [gruf/go-errors](https://codeberg.org/gruf/go-errors); context-like error w/ value wrapping [MIT License](https://spdx.org/licenses/MIT.html).
- [gruf/go-fastcopy](https://codeberg.org/gruf/go-fastcopy); performant pooled I/O copying [MIT License](https://spdx.org/licenses/MIT.html).
- [gruf/go-fastcopy](https://codeberg.org/gruf/go-fastcopy); performant (buffer pooled) I/O copying [MIT License](https://spdx.org/licenses/MIT.html).
- [gruf/go-kv](https://codeberg.org/gruf/go-kv); log field formatting. [MIT License](https://spdx.org/licenses/MIT.html).
- [gruf/go-list](https://codeberg.org/gruf/go-list); generic doubly linked list. [MIT License](https://spdx.org/licenses/MIT.html).
- [gruf/go-mutexes](https://codeberg.org/gruf/go-mutexes); safemutex & mutex map. [MIT License](https://spdx.org/licenses/MIT.html).
- [gruf/go-runners](https://codeberg.org/gruf/go-runners); workerpools and synchronization. [MIT License](https://spdx.org/licenses/MIT.html).
- [gruf/go-runners](https://codeberg.org/gruf/go-runners); synchronization utilities. [MIT License](https://spdx.org/licenses/MIT.html).
- [gruf/go-sched](https://codeberg.org/gruf/go-sched); task scheduler. [MIT License](https://spdx.org/licenses/MIT.html).
- [gruf/go-store](https://codeberg.org/gruf/go-store); file storage backend (local & s3). [MIT License](https://spdx.org/licenses/MIT.html).
- [gruf/go-structr](https://codeberg.org/gruf/go-structr); struct caching + queueing with automated indexing by field. [MIT License](https://spdx.org/licenses/MIT.html).
Expand Down
1 change: 0 additions & 1 deletion cmd/gotosocial/action/admin/account/account.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ func initState(ctx context.Context) (*state.State, error) {
var state state.State
state.Caches.Init()
state.Caches.Start()
state.Workers.Start()

// Set the state DB connection
dbConn, err := bundb.NewBunDBService(ctx, &state)
Expand Down
5 changes: 4 additions & 1 deletion cmd/gotosocial/action/admin/media/prune/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,10 @@ func setupPrune(ctx context.Context) (*prune, error) {
state.Caches.Init()
state.Caches.Start()

state.Workers.Start()
// Scheduler is required for the
// claner, but no other workers
// are needed for this CLI action.
state.Workers.StartScheduler()

dbService, err := bundb.NewBunDBService(ctx, &state)
if err != nil {
Expand Down
56 changes: 29 additions & 27 deletions cmd/gotosocial/action/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/superseriousbusiness/gotosocial/internal/filter/spam"
"github.com/superseriousbusiness/gotosocial/internal/filter/visibility"
"github.com/superseriousbusiness/gotosocial/internal/gtserror"
"github.com/superseriousbusiness/gotosocial/internal/messages"
"github.com/superseriousbusiness/gotosocial/internal/metrics"
"github.com/superseriousbusiness/gotosocial/internal/middleware"
tlprocessor "github.com/superseriousbusiness/gotosocial/internal/processing/timeline"
Expand Down Expand Up @@ -128,25 +129,6 @@ var Start action.GTSAction = func(ctx context.Context) error {
TLSInsecureSkipVerify: config.GetHTTPClientTLSInsecureSkipVerify(),
})

// Initialize delivery worker with http client.
state.Workers.Delivery.Init(client)

// Initialize workers.
state.Workers.Start()
defer state.Workers.Stop()

// Add a task to the scheduler to sweep caches.
// Frequency = 1 * minute
// Threshold = 80% capacity
_ = state.Workers.Scheduler.AddRecurring(
"@cachesweep", // id
time.Time{}, // start
time.Minute, // freq
func(context.Context, time.Time) {
state.Caches.Sweep(60)
},
)

// Build handlers used in later initializations.
mediaManager := media.NewManager(&state)
oauthServer := oauth.New(ctx, dbService)
Expand Down Expand Up @@ -195,10 +177,27 @@ var Start action.GTSAction = func(ctx context.Context) error {
return fmt.Errorf("error starting list timeline: %s", err)
}

// Create a media cleaner using the given state.
// Start the job scheduler
// (this is required for cleaner).
state.Workers.StartScheduler()

// Add a task to the scheduler to sweep caches.
// Frequency = 1 * minute
// Threshold = 60% capacity
_ = state.Workers.Scheduler.AddRecurring(
"@cachesweep", // id
time.Time{}, // start
time.Minute, // freq
func(context.Context, time.Time) {
state.Caches.Sweep(60)
},
)

// Create background cleaner.
cleaner := cleaner.New(&state)

// Create the processor using all the other services we've created so far.
// Create the processor using all the
// other services we've created so far.
processor := processing.NewProcessor(
cleaner,
typeConverter,
Expand All @@ -209,13 +208,16 @@ var Start action.GTSAction = func(ctx context.Context) error {
emailSender,
)

// Set state client / federator asynchronous worker enqueue functions
state.Workers.EnqueueClientAPI = processor.Workers().EnqueueClientAPI
state.Workers.EnqueueFediAPI = processor.Workers().EnqueueFediAPI
// Initialize the specialized workers.
state.Workers.Client.Init(messages.ClientMsgIndices())
state.Workers.Federator.Init(messages.FederatorMsgIndices())
state.Workers.Delivery.Init(client)
state.Workers.Client.Process = processor.Workers().ProcessFromClientAPI
state.Workers.Federator.Process = processor.Workers().ProcessFromFediAPI

// Set state client / federator synchronous processing functions.
state.Workers.ProcessFromClientAPI = processor.Workers().ProcessFromClientAPI
state.Workers.ProcessFromFediAPI = processor.Workers().ProcessFromFediAPI
// Initialize workers.
state.Workers.Start()
defer state.Workers.Stop()

// Schedule tasks for all existing poll expiries.
if err := processor.Polls().ScheduleAll(ctx); err != nil {
Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,13 @@ require (
codeberg.org/gruf/go-fastcopy v1.1.2
codeberg.org/gruf/go-iotools v0.0.0-20230811115124-5d4223615a7f
codeberg.org/gruf/go-kv v1.6.4
codeberg.org/gruf/go-list v0.0.0-20240425093752-494db03d641f
codeberg.org/gruf/go-logger/v2 v2.2.1
codeberg.org/gruf/go-mutexes v1.4.1
codeberg.org/gruf/go-runners v1.6.2
codeberg.org/gruf/go-sched v1.2.3
codeberg.org/gruf/go-store/v2 v2.2.4
codeberg.org/gruf/go-structr v0.6.2
codeberg.org/gruf/go-structr v0.7.0
codeberg.org/superseriousbusiness/exif-terminator v0.7.0
github.com/DmitriyVTitov/size v1.5.0
github.com/KimMachineGun/automemlimit v0.6.0
Expand Down
6 changes: 4 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ codeberg.org/gruf/go-iotools v0.0.0-20230811115124-5d4223615a7f h1:Kazm/PInN2m1S
codeberg.org/gruf/go-iotools v0.0.0-20230811115124-5d4223615a7f/go.mod h1:B8uq4yHtIcKXhBZT9C/SYisz25lldLHMVpwZPz4ADLQ=
codeberg.org/gruf/go-kv v1.6.4 h1:3NZiW8HVdBM3kpOiLb7XfRiihnzZWMAixdCznguhILk=
codeberg.org/gruf/go-kv v1.6.4/go.mod h1:O/YkSvKiS9XsRolM3rqCd9YJmND7dAXu9z+PrlYO4bc=
codeberg.org/gruf/go-list v0.0.0-20240425093752-494db03d641f h1:Ss6Z+vygy+jOGhj96d/GwsYYDd22QmIcH74zM7/nQkw=
codeberg.org/gruf/go-list v0.0.0-20240425093752-494db03d641f/go.mod h1:F9pl4h34iuVN7kucKam9fLwsItTc+9mmaKt7pNXRd/4=
codeberg.org/gruf/go-logger/v2 v2.2.1 h1:RP2u059EQKTBFV3cN8X6xDxNk2RkzqdgXGKflKqB7Oc=
codeberg.org/gruf/go-logger/v2 v2.2.1/go.mod h1:m/vBfG5jNUmYXI8Hg9aVSk7Pn8YgEBITQB/B/CzdRss=
codeberg.org/gruf/go-loosy v0.0.0-20231007123304-bb910d1ab5c4 h1:IXwfoU7f2whT6+JKIKskNl/hBlmWmnF1vZd84Eb3cyA=
Expand All @@ -72,8 +74,8 @@ codeberg.org/gruf/go-sched v1.2.3 h1:H5ViDxxzOBR3uIyGBCf0eH8b1L8wMybOXcdtUUTXZHk
codeberg.org/gruf/go-sched v1.2.3/go.mod h1:vT9uB6KWFIIwnG9vcPY2a0alYNoqdL1mSzRM8I+PK7A=
codeberg.org/gruf/go-store/v2 v2.2.4 h1:8HO1Jh2gg7boQKA3hsDAIXd9zwieu5uXwDXEcTOD9js=
codeberg.org/gruf/go-store/v2 v2.2.4/go.mod h1:zI4VWe5CpXAktYMtaBMrgA5QmO0sQH53LBRvfn1huys=
codeberg.org/gruf/go-structr v0.6.2 h1:1zs7UkPBsRGRDMHhrfFL7GrwAyPHxFXCchu8ADv/zuM=
codeberg.org/gruf/go-structr v0.6.2/go.mod h1:K1FXkUyO6N/JKt8aWqyQ8rtW7Z9ZmXKWP8mFAQ2OJjE=
codeberg.org/gruf/go-structr v0.7.0 h1:gy0/wD7718HwJDoBMeMumk4+7veLrkumgCEOnCyzS8w=
codeberg.org/gruf/go-structr v0.7.0/go.mod h1:K1FXkUyO6N/JKt8aWqyQ8rtW7Z9ZmXKWP8mFAQ2OJjE=
codeberg.org/superseriousbusiness/exif-terminator v0.7.0 h1:Y6VApSXhKqExG0H2hZ2JelRK4xmWdjDQjn13CpEfzko=
codeberg.org/superseriousbusiness/exif-terminator v0.7.0/go.mod h1:gCWKduudUWFzsnixoMzu0FYVdxHWG+AbXnZ50DqxsUE=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
Expand Down
3 changes: 2 additions & 1 deletion internal/api/client/statuses/status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ func (suite *StatusStandardTestSuite) SetupSuite() {

func (suite *StatusStandardTestSuite) SetupTest() {
suite.state.Caches.Init()
testrig.StartNoopWorkers(&suite.state)

testrig.InitTestConfig()
testrig.InitTestLog()
Expand All @@ -98,6 +97,8 @@ func (suite *StatusStandardTestSuite) SetupTest() {
suite.emailSender = testrig.NewEmailSender("../../../../web/template/", nil)
suite.processor = testrig.NewTestProcessor(&suite.state, suite.federator, suite.emailSender, suite.mediaManager)
suite.statusModule = statuses.New(suite.processor)

testrig.StartWorkers(&suite.state, suite.processor.Workers())
}

func (suite *StatusStandardTestSuite) TearDownTest() {
Expand Down
4 changes: 2 additions & 2 deletions internal/api/client/statuses/statusdelete_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"io"
"net/http"
"net/http/httptest"
"strings"
Expand Down Expand Up @@ -69,7 +69,7 @@ func (suite *StatusDeleteTestSuite) TestPostDelete() {

result := recorder.Result()
defer result.Body.Close()
b, err := ioutil.ReadAll(result.Body)
b, err := io.ReadAll(result.Body)
suite.NoError(err)

statusReply := &apimodel.Status{}
Expand Down
8 changes: 4 additions & 4 deletions internal/federation/dereferencing/account.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func (d *Dereferencer) GetAccountByURI(ctx context.Context, requestUser string,

if accountable != nil {
// This account was updated, enqueue re-dereference featured posts.
d.state.Workers.Federator.MustEnqueueCtx(ctx, func(ctx context.Context) {
d.state.Workers.Dereference.Queue.Push(func(ctx context.Context) {
if err := d.dereferenceAccountFeatured(ctx, requestUser, account); err != nil {
log.Errorf(ctx, "error fetching account featured collection: %v", err)
}
Expand Down Expand Up @@ -201,7 +201,7 @@ func (d *Dereferencer) GetAccountByUsernameDomain(ctx context.Context, requestUs

if accountable != nil {
// This account was updated, enqueue re-dereference featured posts.
d.state.Workers.Federator.MustEnqueueCtx(ctx, func(ctx context.Context) {
d.state.Workers.Dereference.Queue.Push(func(ctx context.Context) {
if err := d.dereferenceAccountFeatured(ctx, requestUser, account); err != nil {
log.Errorf(ctx, "error fetching account featured collection: %v", err)
}
Expand Down Expand Up @@ -322,7 +322,7 @@ func (d *Dereferencer) RefreshAccount(

if accountable != nil {
// This account was updated, enqueue re-dereference featured posts.
d.state.Workers.Federator.MustEnqueueCtx(ctx, func(ctx context.Context) {
d.state.Workers.Dereference.Queue.Push(func(ctx context.Context) {
if err := d.dereferenceAccountFeatured(ctx, requestUser, latest); err != nil {
log.Errorf(ctx, "error fetching account featured collection: %v", err)
}
Expand Down Expand Up @@ -362,7 +362,7 @@ func (d *Dereferencer) RefreshAccountAsync(
}

// Enqueue a worker function to enrich this account async.
d.state.Workers.Federator.MustEnqueueCtx(ctx, func(ctx context.Context) {
d.state.Workers.Dereference.Queue.Push(func(ctx context.Context) {
latest, accountable, err := d.enrichAccountSafely(ctx, requestUser, uri, account, accountable)
if err != nil {
log.Errorf(ctx, "error enriching remote account: %v", err)
Expand Down
2 changes: 1 addition & 1 deletion internal/federation/dereferencing/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ func (d *Dereferencer) RefreshStatusAsync(
}

// Enqueue a worker function to re-fetch this status entirely async.
d.state.Workers.Federator.MustEnqueueCtx(ctx, func(ctx context.Context) {
d.state.Workers.Dereference.Queue.Push(func(ctx context.Context) {
latest, statusable, _, err := d.enrichStatusSafely(ctx,
requestUser,
uri,
Expand Down
4 changes: 2 additions & 2 deletions internal/federation/dereferencing/thread.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,14 @@ func (d *Dereferencer) dereferenceThread(
}

// Enqueue dereferencing remaining status thread, (children), asychronously .
d.state.Workers.Federator.MustEnqueueCtx(ctx, func(ctx context.Context) {
d.state.Workers.Dereference.Queue.Push(func(ctx context.Context) {
if err := d.DereferenceStatusDescendants(ctx, requestUser, uri, statusable); err != nil {
log.Error(ctx, err)
}
})
} else {
// This is an existing status, dereference the WHOLE thread asynchronously.
d.state.Workers.Federator.MustEnqueueCtx(ctx, func(ctx context.Context) {
d.state.Workers.Dereference.Queue.Push(func(ctx context.Context) {
if err := d.DereferenceStatusAncestors(ctx, requestUser, status); err != nil {
log.Error(ctx, err)
}
Expand Down
26 changes: 12 additions & 14 deletions internal/federation/federatingdb/accept.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,13 +89,12 @@ func (f *federatingDB) Accept(ctx context.Context, accept vocab.ActivityStreamsA
return err
}

// Process side effects asynchronously.
f.state.Workers.EnqueueFediAPI(ctx, messages.FromFediAPI{
APObjectType: ap.ActivityFollow,
APActivityType: ap.ActivityAccept,
GTSModel: follow,
ReceivingAccount: receivingAcct,
RequestingAccount: requestingAcct,
f.state.Workers.Federator.Queue.Push(&messages.FromFediAPI{
APObjectType: ap.ActivityFollow,
APActivityType: ap.ActivityAccept,
GTSModel: follow,
Receiving: receivingAcct,
Requesting: requestingAcct,
})
}

Expand Down Expand Up @@ -138,13 +137,12 @@ func (f *federatingDB) Accept(ctx context.Context, accept vocab.ActivityStreamsA
return err
}

// Process side effects asynchronously.
f.state.Workers.EnqueueFediAPI(ctx, messages.FromFediAPI{
APObjectType: ap.ActivityFollow,
APActivityType: ap.ActivityAccept,
GTSModel: follow,
ReceivingAccount: receivingAcct,
RequestingAccount: requestingAcct,
f.state.Workers.Federator.Queue.Push(&messages.FromFediAPI{
APObjectType: ap.ActivityFollow,
APActivityType: ap.ActivityAccept,
GTSModel: follow,
Receiving: receivingAcct,
Requesting: requestingAcct,
})

continue
Expand Down
12 changes: 6 additions & 6 deletions internal/federation/federatingdb/announce.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,12 @@ func (f *federatingDB) Announce(ctx context.Context, announce vocab.ActivityStre
}

// This is a new boost. Process side effects asynchronously.
f.state.Workers.EnqueueFediAPI(ctx, messages.FromFediAPI{
APObjectType: ap.ActivityAnnounce,
APActivityType: ap.ActivityCreate,
GTSModel: boost,
ReceivingAccount: receivingAcct,
RequestingAccount: requestingAcct,
f.state.Workers.Federator.Queue.Push(&messages.FromFediAPI{
APObjectType: ap.ActivityAnnounce,
APActivityType: ap.ActivityCreate,
GTSModel: boost,
Receiving: receivingAcct,
Requesting: requestingAcct,
})

return nil
Expand Down
8 changes: 5 additions & 3 deletions internal/federation/federatingdb/announce_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package federatingdb_test

import (
"testing"
"time"

"github.com/stretchr/testify/suite"
"github.com/superseriousbusiness/activity/streams/vocab"
Expand All @@ -42,7 +43,7 @@ func (suite *AnnounceTestSuite) TestNewAnnounce() {
suite.NoError(err)

// should be a message heading to the processor now, which we can intercept here
msg := <-suite.fromFederator
msg, _ := suite.getFederatorMsg(5 * time.Second)
suite.Equal(ap.ActivityAnnounce, msg.APObjectType)
suite.Equal(ap.ActivityCreate, msg.APActivityType)

Expand All @@ -69,7 +70,7 @@ func (suite *AnnounceTestSuite) TestAnnounceTwice() {
suite.NoError(err)

// should be a message heading to the processor now, which we can intercept here
msg := <-suite.fromFederator
msg, _ := suite.getFederatorMsg(5 * time.Second)
suite.Equal(ap.ActivityAnnounce, msg.APObjectType)
suite.Equal(ap.ActivityCreate, msg.APActivityType)
boost, ok := msg.GTSModel.(*gtsmodel.Status)
Expand All @@ -94,7 +95,8 @@ func (suite *AnnounceTestSuite) TestAnnounceTwice() {

// since this is a repeat announce with the same URI, just delivered to a different inbox,
// we should have nothing in the messages channel...
suite.Empty(suite.fromFederator)
_, ok = suite.getFederatorMsg(time.Second)
suite.False(ok)
}

func TestAnnounceTestSuite(t *testing.T) {
Expand Down
Loading

0 comments on commit c9c0773

Please sign in to comment.