feat(router): support for isolation modes using limiters #3379
Conversation
Codecov Report

```
@@            Coverage Diff             @@
##           master    #3379      +/-   ##
==========================================
- Coverage   68.47%   68.44%   -0.03%
==========================================
  Files         330      327       -3
  Lines       53042    52801     -241
==========================================
- Hits        36319    36140     -179
+ Misses      14358    14312      -46
+ Partials     2365     2349      -16
```

☔ View full report in Codecov by Sentry.
```go
}

// TODO: delete this once we remove the old fair pickup algorithm and move MultiTenantLegacy#GetAllJobs inside JobsDB
```
[note] added some TODOs for cleaning up in the next release
```go
if memStat, err := mem.Get(); err == nil {
	memoryLimit := int64(80 * memStat.Total / 100)
	logger.NewLogger().Infow("Setting memory limit to", "limit", memoryLimit)
	debug.SetMemoryLimit(memoryLimit)
}
```
[note] setting a soft memory limit (aka `GOMEMLIMIT`) to 80% of the container's available memory.
```go
go func() {
	wstart := time.Now()
	w.Stop()
	wg.Done()
	wp.logger.Debugf("worker %s stopped in %s", w.partition, time.Since(wstart))
}()
```
[note] stopping workers in parallel
```diff
@@ -1289,7 +1268,7 @@ func Unique(stringSlice []string) []string {
 }

 func UseFairPickup() bool {
-	return config.GetBool("JobsDB.fairPickup", false) || config.GetBool("EnableMultitenancy", false)
+	return config.GetBool("JobsDB.fairPickup", false) && config.GetString("Router.isolationMode", "default") == "none"
 }
```
[note] fair pickup will be used only if it is enabled and isolation mode is none
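The gating logic from the diff can be shown as a plain predicate, with the two config lookups replaced by parameters for illustration (names here are hypothetical, not the PR's):

```go
package main

import "fmt"

// useFairPickup mirrors the new condition: fair pickup applies only when it
// is explicitly enabled AND router isolation is switched off.
func useFairPickup(fairPickupEnabled bool, isolationMode string) bool {
	return fairPickupEnabled && isolationMode == "none"
}

func main() {
	fmt.Println(useFairPickup(true, "none"))      // true: enabled and no isolation
	fmt.Println(useFairPickup(true, "workspace")) // false: isolation takes over
	fmt.Println(useFairPickup(false, "none"))     // false: fair pickup disabled
}
```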
```go
	status *jobsdb.JobStatusT
}

type reloadableConfig struct {
```
[note] since we will be starting new routers after switching from degraded to normal mode, we are moving all reloadable configuration to a separate struct, so that there is minimal leakage! And we promise to prepare a solution for non-leaking reloadable configuration in rudder-go-kit in the future!
gateway/integration_test.go
```diff
@@ -22,7 +22,7 @@ import (
 	"github.com/ory/dockertest/v3"
 	"github.com/stretchr/testify/require"

-	kitHelper "github.com/rudderlabs/rudder-go-kit/testhelper"
+	kit_helper "github.com/rudderlabs/rudder-go-kit/testhelper"
```
Aren't we adhering to the outcome of the poll? :P
enjoy!
We voted for the same thing but lost :'(
```diff
@@ -289,7 +249,7 @@ func (w *worker) WorkerProcess() {
 		}

 		case <-timeout:
-			timeout = time.After(jobsBatchTimeout)
+			timeout = time.After(w.rt.reloadableConfig.jobsBatchTimeout)
```
Why do we have this both here and at line 60?
Further refactoring is indeed required, but I will refrain from doing it now as the changes are already big enough :)
Description

Introducing isolation strategies in `router`, following the same pattern that we are already using in `processor` & `batchrouter`. Router is one of a kind, since it now uses not 1 but 2 different types of workers:
Isolation modes

- `none` - a single partition worker picks up and processes jobs for all workspaces & destinations.
- `workspace` - using a separate partition worker per active workspace (default mode in multi-tenant deployments).
- `destination` - using a separate partition worker per active destination (default mode in non-multi-tenant deployments).

Limiters
- `pickup` - for picking up jobs from jobsdb and assigning them to workers (limit: 50 goroutines/destType).
- `transform` - for controlling concurrency while transforming jobs (limit: 100 goroutines/destType).
- `batch` - for controlling concurrency while batching jobs (limit: 100 goroutines/destType).
- `process` - for controlling concurrency while processing jobs within workers (limit: 100 goroutines/destType).

Dynamic limiter priorities
Router uses dynamic priorities for executing tasks inside its limiters. Each partition keeps track of its throughput and error rate for every limiter-enabled operation, using `partition.Stats`. These statistics are used to calculate a performance score between 0 and 100, which in turn gets mapped to a limiter priority between 1 and 4. This priority is especially important when there is congestion in the limiter (goroutines requesting more slots than the limiter's total capacity). The limiter itself has a mechanism to avoid starvation of lower-priority jobs, by periodically incrementing their priority (by default every 1 sec, see `WithLimiterDynamicPeriod`).

Future cleanup work
The existing fair pickup algorithm has been retained and will be used only when `JobsDB.fairPickup` is enabled and isolation mode is set to `none`. This is to be able to revert in case an unexpected issue happens during the rollout. However, the final plan is to remove `MultiTenantJobsDB` (`unionQuery.go`) and `MultiTenantI` (`tenantstats.go`) in the next release, after router isolation has been released, tested and proven with production workloads.

Performance
It comes as no surprise that performance is again on par with (and even better than) the previous solution, up to 200 partitions. After that threshold, throughput starts deteriorating. The relevant benchmark can be found inside `router_isolation_test.go`, and a visualisation of one execution of this benchmark is available here.

Other notable changes
- `Init` function from the package.
- `AdaptivePayloadLimiter.enabled` is now `true` by default.
- Setting a soft memory limit (aka `GOMEMLIMIT`) to 80% of the container's available memory.
- `bytesize`, `queue`, `mem`, `profiler` and `sync` packages from `rudder-go-kit`.
- `pipeline_delay_min` & `pipeline_delay_max`.

Notion Ticket
Link
Security