Skip to content

Commit

Permalink
[performance] processing media and scheduled jobs improvements (#1482)
Browse files Browse the repository at this point in the history
* replace media workers with just runners.WorkerPool, move to state structure, use go-sched for global task scheduling

* improved code comment

* fix worker tryUntil function, update go-runners/go-sched

* make preprocess functions package public, use these where possible to stop doubled up processing

* remove separate emoji worker pool

* limit calls to time.Now() during media preprocessing

* use Processor{} to manage singular runtime of processing media

* ensure workers get started when media manager is used

* improved error setting in processing media, fix media test

* port changes from processingmedia to processing emoji

* finish code commenting

* finish code commenting and comment-out client API + federator worker pools until concurrency worker pools replaced

* linterrrrrrrrrrrrrrrr

---------

Signed-off-by: kim <grufwub@gmail.com>
  • Loading branch information
NyaaaWhatsUpDoc authored Feb 13, 2023
1 parent 76d1b48 commit acc9592
Show file tree
Hide file tree
Showing 54 changed files with 1,844 additions and 2,671 deletions.
6 changes: 6 additions & 0 deletions cmd/gotosocial/action/admin/account/account.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
var Create action.GTSAction = func(ctx context.Context) error {
var state state.State
state.Caches.Init()
state.Workers.Start()

dbConn, err := bundb.NewBunDBService(ctx, &state)
if err != nil {
Expand Down Expand Up @@ -97,6 +98,7 @@ var Create action.GTSAction = func(ctx context.Context) error {
var Confirm action.GTSAction = func(ctx context.Context) error {
var state state.State
state.Caches.Init()
state.Workers.Start()

dbConn, err := bundb.NewBunDBService(ctx, &state)
if err != nil {
Expand Down Expand Up @@ -140,6 +142,7 @@ var Confirm action.GTSAction = func(ctx context.Context) error {
var Promote action.GTSAction = func(ctx context.Context) error {
var state state.State
state.Caches.Init()
state.Workers.Start()

dbConn, err := bundb.NewBunDBService(ctx, &state)
if err != nil {
Expand Down Expand Up @@ -180,6 +183,7 @@ var Promote action.GTSAction = func(ctx context.Context) error {
var Demote action.GTSAction = func(ctx context.Context) error {
var state state.State
state.Caches.Init()
state.Workers.Start()

dbConn, err := bundb.NewBunDBService(ctx, &state)
if err != nil {
Expand Down Expand Up @@ -220,6 +224,7 @@ var Demote action.GTSAction = func(ctx context.Context) error {
var Disable action.GTSAction = func(ctx context.Context) error {
var state state.State
state.Caches.Init()
state.Workers.Start()

dbConn, err := bundb.NewBunDBService(ctx, &state)
if err != nil {
Expand Down Expand Up @@ -260,6 +265,7 @@ var Disable action.GTSAction = func(ctx context.Context) error {
var Password action.GTSAction = func(ctx context.Context) error {
var state state.State
state.Caches.Init()
state.Workers.Start()

dbConn, err := bundb.NewBunDBService(ctx, &state)
if err != nil {
Expand Down
17 changes: 8 additions & 9 deletions cmd/gotosocial/action/admin/media/prune/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,21 +38,24 @@ type prune struct {
func setupPrune(ctx context.Context) (*prune, error) {
var state state.State
state.Caches.Init()
state.Workers.Start()

dbService, err := bundb.NewBunDBService(ctx, &state)
if err != nil {
return nil, fmt.Errorf("error creating dbservice: %w", err)
}

storage, err := gtsstorage.AutoConfig() //nolint:contextcheck
//nolint:contextcheck
storage, err := gtsstorage.AutoConfig()
if err != nil {
return nil, fmt.Errorf("error creating storage backend: %w", err)
}

manager, err := media.NewManager(dbService, storage) //nolint:contextcheck
if err != nil {
return nil, fmt.Errorf("error instantiating mediamanager: %w", err)
}
state.DB = dbService
state.Storage = storage

//nolint:contextcheck
manager := media.NewManager(&state)

return &prune{
dbService: dbService,
Expand All @@ -70,9 +73,5 @@ func (p *prune) shutdown(ctx context.Context) error {
return fmt.Errorf("error closing dbservice: %w", err)
}

if err := p.manager.Stop(); err != nil {
return fmt.Errorf("error closing media manager: %w", err)
}

return nil
}
34 changes: 18 additions & 16 deletions cmd/gotosocial/action/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,33 +91,35 @@ var Start action.GTSAction = func(ctx context.Context) error {
return fmt.Errorf("error creating instance instance: %s", err)
}

// Create the client API and federator worker pools
// NOTE: these MUST NOT be used until they are passed to the
// processor and it is started. The reason being that the processor
// sets the Worker process functions and start the underlying pools
clientWorker := concurrency.NewWorkerPool[messages.FromClientAPI](-1, -1)
fedWorker := concurrency.NewWorkerPool[messages.FromFederator](-1, -1)

federatingDB := federatingdb.New(dbService, fedWorker)

// build converters and util
typeConverter := typeutils.NewConverter(dbService)

// Open the storage backend
storage, err := gtsstorage.AutoConfig()
if err != nil {
return fmt.Errorf("error creating storage backend: %w", err)
}

// Set the state storage driver
state.Storage = storage

// Build HTTP client (TODO: add configurables here)
client := httpclient.New(httpclient.Config{})

// Initialize workers.
state.Workers.Start()
defer state.Workers.Stop()

// Create the client API and federator worker pools
// NOTE: these MUST NOT be used until they are passed to the
// processor and it is started. The reason being that the processor
// sets the Worker process functions and start the underlying pools
// TODO: move these into state.Workers (and maybe reformat worker pools).
clientWorker := concurrency.NewWorkerPool[messages.FromClientAPI](-1, -1)
fedWorker := concurrency.NewWorkerPool[messages.FromFederator](-1, -1)

// build backend handlers
mediaManager, err := media.NewManager(dbService, storage)
if err != nil {
return fmt.Errorf("error creating media manager: %s", err)
}
mediaManager := media.NewManager(&state)
oauthServer := oauth.New(ctx, dbService)
typeConverter := typeutils.NewConverter(dbService)
federatingDB := federatingdb.New(dbService, fedWorker, typeConverter)
transportController := transport.NewController(dbService, federatingDB, &federation.Clock{}, client)
federator := federation.NewFederator(dbService, federatingDB, transportController, typeConverter, mediaManager)

Expand Down
10 changes: 7 additions & 3 deletions cmd/gotosocial/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@ package main

import (
"log"
"runtime/debug"
godebug "runtime/debug"
"strings"

"codeberg.org/gruf/go-debug"
"github.com/spf13/cobra"

_ "github.com/superseriousbusiness/gotosocial/docs"
Expand Down Expand Up @@ -60,9 +61,12 @@ func main() {

// add subcommands
rootCmd.AddCommand(serverCommands())
rootCmd.AddCommand(testrigCommands())
rootCmd.AddCommand(debugCommands())
rootCmd.AddCommand(adminCommands())
if debug.DEBUG {
// only add testrig if debug enabled.
rootCmd.AddCommand(testrigCommands())
}

// run
if err := rootCmd.Execute(); err != nil {
Expand All @@ -73,7 +77,7 @@ func main() {
// version will build a version string from binary's stored build information.
func version() string {
// Read build information from binary
build, ok := debug.ReadBuildInfo()
build, ok := godebug.ReadBuildInfo()
if !ok {
return ""
}
Expand Down
9 changes: 4 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,14 @@ require (
codeberg.org/gruf/go-bytesize v1.0.2
codeberg.org/gruf/go-byteutil v1.0.2
codeberg.org/gruf/go-cache/v3 v3.2.2
codeberg.org/gruf/go-debug v1.2.0
codeberg.org/gruf/go-debug v1.3.0
codeberg.org/gruf/go-errors/v2 v2.1.1
codeberg.org/gruf/go-fastcopy v1.1.2
codeberg.org/gruf/go-kv v1.5.2
codeberg.org/gruf/go-logger/v2 v2.2.1
codeberg.org/gruf/go-mutexes v1.1.5
codeberg.org/gruf/go-runners v1.5.1
codeberg.org/gruf/go-runners v1.6.0
codeberg.org/gruf/go-sched v1.2.3
codeberg.org/gruf/go-store/v2 v2.2.1
github.com/KimMachineGun/automemlimit v0.2.4
github.com/abema/go-mp4 v0.10.0
Expand All @@ -37,7 +38,6 @@ require (
github.com/minio/minio-go/v7 v7.0.48
github.com/mitchellh/mapstructure v1.5.0
github.com/oklog/ulid v1.3.1
github.com/robfig/cron/v3 v3.0.1
github.com/spf13/cobra v1.6.1
github.com/spf13/viper v1.15.0
github.com/stretchr/testify v1.8.1
Expand Down Expand Up @@ -66,7 +66,7 @@ require (

require (
codeberg.org/gruf/go-atomics v1.1.0 // indirect
codeberg.org/gruf/go-bitutil v1.0.1 // indirect
codeberg.org/gruf/go-bitutil v1.1.0 // indirect
codeberg.org/gruf/go-bytes v1.0.2 // indirect
codeberg.org/gruf/go-fastpath v1.0.3 // indirect
codeberg.org/gruf/go-fastpath/v2 v2.0.0 // indirect
Expand All @@ -75,7 +75,6 @@ require (
codeberg.org/gruf/go-mangler v1.2.2 // indirect
codeberg.org/gruf/go-maps v1.0.3 // indirect
codeberg.org/gruf/go-pools v1.1.0 // indirect
codeberg.org/gruf/go-sched v1.2.0 // indirect
github.com/aymerick/douceur v0.2.0 // indirect
github.com/cilium/ebpf v0.4.0 // indirect
github.com/containerd/cgroups v1.0.4 // indirect
Expand Down
18 changes: 8 additions & 10 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ cloud.google.com/go/storage v1.14.0/go.mod h1:GrKmX003DSIwi9o29oFT7YDnHYwZoctc3f
codeberg.org/gruf/go-atomics v1.1.0 h1:ni9QXYoRUFYQMXE3akWaUb1wMcPBDc05Md6Rgml7W58=
codeberg.org/gruf/go-atomics v1.1.0/go.mod h1:a/4/y/LgvjxjQVnpoy1VVkOSzLS1W9i1g4SJ0nflAa4=
codeberg.org/gruf/go-bitutil v1.0.0/go.mod h1:sb8IjlDnjVTz8zPK/8lmHesKxY0Yb3iqHWjUM/SkphA=
codeberg.org/gruf/go-bitutil v1.0.1 h1:l8z9nOvCpHhicU2LZyJ6jLK03UNzCF6bxVCwu+VEenQ=
codeberg.org/gruf/go-bitutil v1.0.1/go.mod h1:3ezHnADoiRJs9jgn65AEZ3HY7dsabAYLmmnIvseCGJI=
codeberg.org/gruf/go-bitutil v1.1.0 h1:U1Q+A1mtnPk+npqYrlRBc9ar2C5hYiBd17l1Wrp2Bt8=
codeberg.org/gruf/go-bitutil v1.1.0/go.mod h1:rGibFevYTQfYKcPv0Df5KpG8n5xC3AfD4d/UgYeoNy0=
codeberg.org/gruf/go-bytes v1.0.0/go.mod h1:1v/ibfaosfXSZtRdW2rWaVrDXMc9E3bsi/M9Ekx39cg=
codeberg.org/gruf/go-bytes v1.0.2 h1:malqE42Ni+h1nnYWBUAJaDDtEzF4aeN4uPN8DfMNNvo=
codeberg.org/gruf/go-bytes v1.0.2/go.mod h1:1v/ibfaosfXSZtRdW2rWaVrDXMc9E3bsi/M9Ekx39cg=
Expand All @@ -51,8 +51,8 @@ codeberg.org/gruf/go-byteutil v1.0.2 h1:OesVyK5VKWeWdeDR00zRJ+Oy8hjXx1pBhn7WVvcZ
codeberg.org/gruf/go-byteutil v1.0.2/go.mod h1:cWM3tgMCroSzqoBXUXMhvxTxYJp+TbCr6ioISRY5vSU=
codeberg.org/gruf/go-cache/v3 v3.2.2 h1:hq6/RITgpcArjzbYSyo3uFxfIw7wW3KqAQjEaN7dj58=
codeberg.org/gruf/go-cache/v3 v3.2.2/go.mod h1:+Eje6nCvN8QF71VyYjMWMnkdv6t1kHnCO/SvyC4K12Q=
codeberg.org/gruf/go-debug v1.2.0 h1:WBbTMnK1ArFKUmgv04aO2JiC/daTOB8zQGi521qb7OU=
codeberg.org/gruf/go-debug v1.2.0/go.mod h1:N+vSy9uJBQgpQcJUqjctvqFz7tBHJf+S/PIjLILzpLg=
codeberg.org/gruf/go-debug v1.3.0 h1:PIRxQiWUFKtGOGZFdZ3Y0pqyfI0Xr87j224IYe2snZs=
codeberg.org/gruf/go-debug v1.3.0/go.mod h1:N+vSy9uJBQgpQcJUqjctvqFz7tBHJf+S/PIjLILzpLg=
codeberg.org/gruf/go-errors/v2 v2.0.0/go.mod h1:ZRhbdhvgoUA3Yw6e56kd9Ox984RrvbEFC2pOXyHDJP4=
codeberg.org/gruf/go-errors/v2 v2.1.1 h1:oj7JUIvUBafF60HrwN74JrCMol1Ouh3gq1ggrH5hGTw=
codeberg.org/gruf/go-errors/v2 v2.1.1/go.mod h1:LfzD9nkAAJpEDbkUqOZQ2jdaQ8VrK0pnR36zLOMFq6Y=
Expand All @@ -79,10 +79,10 @@ codeberg.org/gruf/go-mutexes v1.1.5 h1:8Y8DwCGf24MyzOSaPvLrtk/B4ecVx4z+fppL6dY+P
codeberg.org/gruf/go-mutexes v1.1.5/go.mod h1:1j/6/MBeBQUedAtAtysLLnBKogfOZAxdym0E3wlaBD8=
codeberg.org/gruf/go-pools v1.1.0 h1:LbYP24eQLl/YI1fSU2pafiwhGol1Z1zPjRrMsXpF88s=
codeberg.org/gruf/go-pools v1.1.0/go.mod h1:ZMYpt/DjQWYC3zFD3T97QWSFKs62zAUGJ/tzvgB9D68=
codeberg.org/gruf/go-runners v1.5.1 h1:ekhhxKvO6D/VC7nS/xpv71/iRX01JSqcBEbahqPUghg=
codeberg.org/gruf/go-runners v1.5.1/go.mod h1:kUM6GYL7dC+f9Sc/XuwdvB/mB4FuI4fJFb150ADMsmw=
codeberg.org/gruf/go-sched v1.2.0 h1:utZl/7srVcbh30rFw42LC2/cMtak4UZRxtIOt/5riNA=
codeberg.org/gruf/go-sched v1.2.0/go.mod h1:v4ueWq+fAtAw9JYt4aFXvadI1YoOqofgHQgszRYuslA=
codeberg.org/gruf/go-runners v1.6.0 h1:cAHKxMgtkb3v6it4qZZs4fo+yYgICNCrYvFlayuvSdk=
codeberg.org/gruf/go-runners v1.6.0/go.mod h1:QRcSExqXX8DM0rm8Xs6qX7baOzyvw0JIe4mu3TsQT+Y=
codeberg.org/gruf/go-sched v1.2.3 h1:H5ViDxxzOBR3uIyGBCf0eH8b1L8wMybOXcdtUUTXZHk=
codeberg.org/gruf/go-sched v1.2.3/go.mod h1:vT9uB6KWFIIwnG9vcPY2a0alYNoqdL1mSzRM8I+PK7A=
codeberg.org/gruf/go-store/v2 v2.2.1 h1:lbvMjhMLebefiaPNLtWvPySKSYM5xN1aztSxxz+vCzU=
codeberg.org/gruf/go-store/v2 v2.2.1/go.mod h1:pxdyfSzau8fFs1TfZlyRzhDYvZWLaj1sXpcjXpzBB6k=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
Expand Down Expand Up @@ -478,8 +478,6 @@ github.com/quasoft/memstore v0.0.0-20191010062613-2bce066d2b0b h1:aUNXCGgukb4gtY
github.com/quasoft/memstore v0.0.0-20191010062613-2bce066d2b0b/go.mod h1:wTPjTepVu7uJBYgZ0SdWHQlIas582j6cn2jgk4DDdlg=
github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 h1:OdAsTTz6OkFY5QxjkYwrChwuRruF69c169dPK26NUlk=
github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc=
github.com/rogpeppe/go-internal v1.8.0/go.mod h1:WmiCO8CzOY8rg0OYDC4/i/2WRWAB6poM+XZ2dLUbcbE=
Expand Down
4 changes: 2 additions & 2 deletions internal/federation/dereferencing/account.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ func (d *deref) fetchRemoteAccountAvatar(ctx context.Context, tsport transport.T
}

// Create new media processing request from the media manager instance.
processing, err := d.mediaManager.ProcessMedia(ctx, data, nil, accountID, &media.AdditionalMediaInfo{
processing, err := d.mediaManager.PreProcessMedia(ctx, data, nil, accountID, &media.AdditionalMediaInfo{
Avatar: func() *bool { v := false; return &v }(),
RemoteURL: &avatarURL,
})
Expand Down Expand Up @@ -407,7 +407,7 @@ func (d *deref) fetchRemoteAccountHeader(ctx context.Context, tsport transport.T
}

// Create new media processing request from the media manager instance.
processing, err := d.mediaManager.ProcessMedia(ctx, data, nil, accountID, &media.AdditionalMediaInfo{
processing, err := d.mediaManager.PreProcessMedia(ctx, data, nil, accountID, &media.AdditionalMediaInfo{
Header: func() *bool { v := true; return &v }(),
RemoteURL: &headerURL,
})
Expand Down
4 changes: 1 addition & 3 deletions internal/federation/dereferencing/emoji.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (d *deref) GetRemoteEmoji(ctx context.Context, requestingUsername string, r
return t.DereferenceMedia(innerCtx, derefURI)
}

newProcessing, err := d.mediaManager.ProcessEmoji(ctx, dataFunc, nil, shortcode, id, emojiURI, ai, refresh)
newProcessing, err := d.mediaManager.PreProcessEmoji(ctx, dataFunc, nil, shortcode, id, emojiURI, ai, refresh)
if err != nil {
return nil, fmt.Errorf("GetRemoteEmoji: error processing emoji %s: %s", shortcodeDomain, err)
}
Expand Down Expand Up @@ -146,7 +146,6 @@ func (d *deref) populateEmojis(ctx context.Context, rawEmojis []*gtsmodel.Emoji,
Disabled: gotEmoji.Disabled,
VisibleInPicker: gotEmoji.VisibleInPicker,
}, refresh)

if err != nil {
log.Errorf("populateEmojis: couldn't refresh remote emoji %s: %s", shortcodeDomain, err)
continue
Expand All @@ -172,7 +171,6 @@ func (d *deref) populateEmojis(ctx context.Context, rawEmojis []*gtsmodel.Emoji,
Disabled: e.Disabled,
VisibleInPicker: e.VisibleInPicker,
}, refresh)

if err != nil {
log.Errorf("populateEmojis: couldn't get remote emoji %s: %s", shortcodeDomain, err)
continue
Expand Down
4 changes: 2 additions & 2 deletions internal/federation/federatingdb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,12 @@ type federatingDB struct {
}

// New returns a DB interface using the given database and config
func New(db db.DB, fedWorker *concurrency.WorkerPool[messages.FromFederator]) DB {
func New(db db.DB, fedWorker *concurrency.WorkerPool[messages.FromFederator], tc typeutils.TypeConverter) DB {
fdb := federatingDB{
locks: mutexes.NewMap(-1, -1), // use defaults
db: db,
fedWorker: fedWorker,
typeConverter: typeutils.NewConverter(db),
typeConverter: tc,
}
return &fdb
}
3 changes: 0 additions & 3 deletions internal/gotosocial/gotosocial.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,6 @@ func (gts *gotosocial) Stop(ctx context.Context) error {
if err := gts.apiRouter.Stop(ctx); err != nil {
return err
}
if err := gts.mediaManager.Stop(); err != nil {
return err
}
if err := gts.db.Stop(ctx); err != nil {
return err
}
Expand Down
Loading

0 comments on commit acc9592

Please sign in to comment.