[performance] update remaining worker pools to use queues #2865
Conversation
Ooh this is exciting 👀
// Create a media cleaner using the given state.
// Start the job scheduler
// (this is required for cleaner).
state.Workers.StartScheduler()
this all had to get shuffled and the scheduler start separated out, as the workers now panic if they're started with a nil processing function (since they rely on a message queue instead of a function queue). and the scheduler has to get started before the workers, as it's needed to initialize the cleaner, which is needed to initialize the processor, which in turn is needed to start the workers :p
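For readers following along, here's a self-contained toy showing why the ordering matters. These are illustrative stand-ins, not the actual GtS types:

```go
package main

import "fmt"

// Toy stand-ins for the real types; the point is only the
// ordering described above:
// scheduler -> cleaner -> processor -> workers.
type scheduler struct{}
type cleaner struct{ sched *scheduler }
type processor struct{ clean *cleaner }
type workers struct{ process func(string) }

func (w *workers) start() {
	if w.process == nil {
		// Mirrors the new panic-on-nil behaviour described above.
		panic("workers started with nil processing function")
	}
	fmt.Println("workers running")
}

func main() {
	sched := &scheduler{}                 // 1. scheduler first,
	clean := &cleaner{sched: sched}       // 2. cleaner needs the scheduler,
	proc := &processor{clean: clean}      // 3. processor needs the cleaner,
	w := &workers{process: func(msg string) {
		_ = proc // 4. the processing fn closes over the processor,
		fmt.Println("handled", msg)
	}}
	w.start() // ...so only now can the workers start safely.
}
```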
Don't forget to change this as needed in the other CLI commands too, re: #2843
good point!
this should also fix #2843
internal/httpclient/client.go
Outdated
@@ -116,6 +123,11 @@ func New(cfg Config) *Client {
	var c Client
	c.retries = 5

	if cfg.HTTPClient != nil {
		// Copy over existing client.
		c.client = *cfg.HTTPClient
Could you explain what changed here? Not in comments I mean, just here in the review :P
i removed it here: c71099c
basically i was messing around at some point with being able to pass in an existing client so we could more easily use the httptest package, but that's out of scope for this PR :)
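For context, here's a hedged, self-contained sketch of what that removed code was going for. The `Config`/`New` shapes mirror the diff above; the httptest wiring is illustrative, not anything from this PR:

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
)

// Config optionally accepts a pre-built client, so tests
// can inject one (e.g. from httptest).
type Config struct {
	HTTPClient *http.Client // nil means "build our own"
}

type Client struct{ client http.Client }

func New(cfg Config) *Client {
	var c Client
	if cfg.HTTPClient != nil {
		// Copy over existing client (as in the diff above).
		c.client = *cfg.HTTPClient
	}
	return &c
}

func main() {
	// Spin up a test server and inject its client.
	srv := httptest.NewServer(http.HandlerFunc(
		func(w http.ResponseWriter, r *http.Request) {
			fmt.Fprint(w, "hello")
		}))
	defer srv.Close()

	c := New(Config{HTTPClient: srv.Client()})
	resp, err := c.client.Get(srv.URL)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	body, _ := io.ReadAll(resp.Body)
	fmt.Println(string(body)) // hello
}
```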
internal/workers/worker_fn.go
Outdated
// Start will attempt to start 'n' FnWorker{}s.
func (p *FnWorkerPool) Start(n int) (ok bool) {
	if ok = (len(p.workers) == 0); ok {
This is quite confusing to read because it's not really clear what work `ok` is doing in here. It seems like you're using it twice for two different things but I'm not really following. Also the last time you use it in the `&& ok` bit, it must be true by that point right? 🤔 Could you refactor this a bit?
yeah i'll refactor to add some comments and explain what's going on :)
see: 92b2f7a
internal/workers/worker_fn.go
Outdated
func (p *FnWorkerPool) Stop() (ok bool) {
	if ok = (len(p.workers) > 0); ok {
		for i := range p.workers {
			ok = p.workers[i].Stop() && ok
Same comment here re: this `ok` variable.
yeah i'll refactor to add some comments and explain what's going on :)
see: 92b2f7a
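For readers skimming later, here's a commented toy reading of the `ok` pattern under discussion. This is a sketch, not the real `FnWorkerPool` (and note the PR ultimately dropped these boolean results entirely, per the commit list below):

```go
package main

import "fmt"

type worker struct{ running bool }

// Stop reports whether the worker was actually running.
func (w *worker) Stop() bool {
	wasRunning := w.running
	w.running = false
	return wasRunning
}

type pool struct{ workers []worker }

// Stop reports whether the pool had workers AND every one
// of them stopped cleanly.
func (p *pool) Stop() (ok bool) {
	// First use of ok: "is there anything to stop?"
	if ok = len(p.workers) > 0; ok {
		for i := range p.workers {
			// Second use: accumulate per-worker results.
			// '&& ok' latches ok to false once any Stop()
			// returns false, so it is NOT guaranteed true
			// by the end of the loop.
			ok = p.workers[i].Stop() && ok
		}
		p.workers = p.workers[:0]
	}
	return ok
}

func main() {
	p := &pool{workers: make([]worker, 4)} // workers never started
	fmt.Println(p.Stop())                  // false: none were running
}
```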
Good changes! A necessary refactor! Just got some questions about code readability because there's some stuff my pea brain can't follow properly :')
Oof, sorry, you'll have to merge main into this because of #2862. Should be a painless merge though, I think (fingers crossed).
Feel free to squerge once everything is cleaned up btw :) The code itself looked ready to me.
…usbusiness#2865)

* start replacing client + federator + media workers with new worker + queue types
* refactor federatingDB.Delete(), drop queued messages when deleting account / status
* move all queue purging to the processor workers
* undo toolchain updates
* code comments, ensure dereferencer worker pool gets started
* update gruf libraries in readme
* start the job scheduler separately to the worker pools
* reshuffle ordering of server.go + remove duplicate worker start / stop
* update go-list version
* fix vendoring
* move queue invalidation to before wiping / deletion, to ensure queued work not dropped
* add logging to worker processing functions in testrig, don't start workers in unexpected places
* update go-structr to add (+then rely on) QueueCtx{} type
* ensure more worker pools get started properly in tests
* fix remaining broken tests relying on worker queue logic
* fix account test suite queue popping logic, ensure noop workers do not pull from queue
* move back accidentally shuffled account deletion order
* ensure error (non nil!!) gets passed in refactored federatingDB{}.Delete()
* silently drop deletes from accounts not permitted to
* don't warn log on forwarded deletes
* make if else clauses easier to parse
* use getFederatorMsg()
* improved code comment
* improved code comment re: requesting account delete checks
* remove boolean result from worker start / stop since false = already running or already stopped
* remove optional passed-in http.client
* remove worker starting from the admin CLI commands (we don't need to handle side-effects)
* update prune cli to start scheduler but not all of the workers
* fix rebase issues
* remove redundant return statements
* i'm sorry sir linter
Description
Updates the remaining worker pools (media, client, federator) to use the new queueing system, allowing jobs to be purged where possible (e.g. when an incoming delete renders other queued jobs unnecessary). This will make it easier in the future to drop + serialize all queued jobs to disk, and then resume those jobs on startup.
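As a rough illustration of the model (not the actual gruf/go-structr API; all names here are hypothetical), keeping pending work as messages in a queue, rather than closures in a function queue, makes it inspectable and purgeable:

```go
package main

import (
	"fmt"
	"sync"
)

// msg is a queued unit of work that can be inspected before it runs.
type msg struct {
	accountID string
	kind      string
}

type queue struct {
	mu   sync.Mutex
	msgs []msg
}

func (q *queue) push(m msg) {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.msgs = append(q.msgs, m)
}

// pop hands the next message to a worker, if any.
func (q *queue) pop() (msg, bool) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if len(q.msgs) == 0 {
		return msg{}, false
	}
	m := q.msgs[0]
	q.msgs = q.msgs[1:]
	return m, true
}

// purge drops queued messages matching pred, e.g. all pending
// work for an account we just received a delete for.
func (q *queue) purge(pred func(msg) bool) {
	q.mu.Lock()
	defer q.mu.Unlock()
	kept := q.msgs[:0]
	for _, m := range q.msgs {
		if !pred(m) {
			kept = append(kept, m)
		}
	}
	q.msgs = kept
}

func main() {
	q := &queue{}
	q.push(msg{"acct1", "process media"})
	q.push(msg{"acct2", "deliver status"})

	// acct1 was deleted: its pending work is now unnecessary.
	q.purge(func(m msg) bool { return m.accountID == "acct1" })

	for m, ok := q.pop(); ok; m, ok = q.pop() {
		fmt.Println(m.kind, "for", m.accountID) // only acct2's job runs
	}
}
```

A serializable message type like this is also what would let queued jobs be written to disk and resumed on startup later on.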
Checklist
I/we have run `go fmt ./...` and `golangci-lint run`.