-
Notifications
You must be signed in to change notification settings - Fork 146
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Reintroduce slot supplier & add many tests #2143
Reintroduce slot supplier & add many tests #2143
Conversation
permit = | ||
timeLimiter.callWithTimeout( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the actual fix. The source of the bug was not assigning the permit when reservation was called behind this callWithTimeout
function.
testWorkflowRule.getTestEnvironment().close(); | ||
assertEquals( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is some dupe among these tests with this bit, and asserting on the metrics, etc.
Not sure if it's actually worth it or not to dedupe it, though. Welcome any thoughts on that.
@@ -49,9 +54,16 @@ public class WorkflowSlotTests { | |||
private final int MAX_CONCURRENT_WORKFLOW_TASK_EXECUTION_SIZE = 100; | |||
private final int MAX_CONCURRENT_ACTIVITY_EXECUTION_SIZE = 1000; | |||
private final int MAX_CONCURRENT_LOCAL_ACTIVITY_EXECUTION_SIZE = 10000; | |||
private final CountingSlotSupplier<WorkflowSlotInfo> workflowTaskSlotSupplier = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure how I feel about this because now you are not longer testing the default worker setting.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's a fairly thin wrapper and the only reasonable way I had to actually test the slot mechanism directly instead of via metrics without exposing a bunch more stuff publicly.
If you have another suggestion I'm open for sure, but, this was seemingly the best way
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah I was thinking you could have a few tests that schedule more activities/workflows then you have slots so you know some had to be freed. Thoughts on a test like that? You can also keep a static count in the activity to cross check with.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure I can add something like that
temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowWorker.java
Outdated
Show resolved
Hide resolved
temporal-sdk/src/main/java/io/temporal/internal/worker/LocalActivityWorker.java
Outdated
Show resolved
Hide resolved
temporal-sdk/src/main/java/io/temporal/internal/worker/TrackingSlotSupplier.java
Outdated
Show resolved
Hide resolved
temporal-sdk/src/main/java/io/temporal/internal/worker/LocalActivityWorker.java
Outdated
Show resolved
Hide resolved
temporal-sdk/src/test/java/io/temporal/worker/ResourceBasedTunerTests.java
Show resolved
Hide resolved
temporal-sdk/src/main/java/io/temporal/internal/worker/TrackingSlotSupplier.java
Outdated
Show resolved
Hide resolved
9d52c23
to
747dce7
Compare
@@ -135,6 +135,8 @@ private MetricsType() {} | |||
// gauge | |||
public static final String WORKER_TASK_SLOTS_AVAILABLE = | |||
TEMPORAL_METRICS_PREFIX + "worker_task_slots_available"; | |||
public static final String WORKER_TASK_SLOTS_USED = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: these are public, which is weird but it is what it is, so should be marked experimental.
temporal-sdk/src/main/java/io/temporal/internal/worker/TrackingSlotSupplier.java
Outdated
Show resolved
Hide resolved
@@ -416,14 +437,13 @@ private AttemptTaskHandlerImpl(ActivityTaskHandler handler) { | |||
|
|||
@Override | |||
public void handle(LocalActivityAttemptTask attemptTask) throws Exception { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would need to double check the Java SDK, but is handle
here called per attempt of a local activity? Is there a risk we markSlotUsed
multiple times for the same local activity?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good call. Added a test & fixed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looking at the original code it seems like before the local activity slot metric was not considered "used" while the local activity was backing off. Now I think the slot will be considered "used" if the activity is executing or not. Do you think I am correct? If so I think that is fine since the slot is not available.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually looking at the code more it appears there is a more significant change here. Before the local activity concurrency was limited by the number of threads in the thread pool, and more then the max local activity task slot could be scheduled. Now we are limiting the max number of local activities being scheduled to the number of local activity slots. That could have implications to users who use local activities for certain use cases like polling.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Discussed on call:
- Will keep the ratelimiter which is 2x max size, or for implementations w/o max size, 2x number of currently running activities (only to be triggered if tryReserve fails).
- Release slots for LAs which are backing off, prioritize retries and re-acquire then
747dce7
to
765dfca
Compare
@@ -154,6 +161,14 @@ public boolean callback(LocalActivityResult result) { | |||
if (scheduleToCloseFuture != null) { | |||
scheduleToCloseFuture.cancel(false); | |||
} | |||
SlotReleaseReason reason = SlotReleaseReason.taskComplete(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it possible that the slot was never used?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No - it's used in the sense that we tried to run the LA even if it timed out, which counts as taskComplete
in this case. We might need finer-grained reasons at some point if people ask for them
temporal-sdk/src/main/java/io/temporal/internal/worker/TrackingSlotSupplier.java
Outdated
Show resolved
Hide resolved
TrackingSlotSupplier<LocalActivitySlotInfo> slotSupplier, | ||
Functions.Proc1<LocalActivityAttemptTask> afterReservedCallback) { | ||
this.afterReservedCallback = afterReservedCallback; | ||
// TODO: See if I can adjust this for dynamic ones based on current rate |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is kinda the last thing. I don't think there's really much sensible that can be done here, but maybe we want to make it an option?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah I would open an issue to revisit the backoff logic. My main goal was to just maintain the current behavior for a fixed slot suppliers without some more thought.
} | ||
// Permit can be null in the event of a timeout while waiting on a permit | ||
if (permit != null) { | ||
slotSupplier.releaseSlot(reason, permit); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Didn't we say we would move the release out of here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, good point. And move it instead to handle
, which is more like the normal task types?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess the main issue is that this gets referenced from like a bajillion places, so it's not obvious that centralizing it is going to catch all those... but I can see what happens.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It also looks like your release the slot again in handleResult
? Maybe it is being release twice. The slot acquire/release. handle
is the right spot IMO, currently the SDK takes and releases slots after handle.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
At a high level the slot acquire/release should be tied to the processing of the task, not the reporting of the result.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍 done
@@ -584,6 +554,8 @@ private void handleResult( | |||
executionContext, activityTask, activityHandlerResult.getTaskFailed().getFailure()); | |||
|
|||
if (retryDecision.doNextAttempt()) { | |||
// Release slot before scheduling the next attempt | |||
slotSupplier.releaseSlot(SlotReleaseReason.willRetry(), executionContext.getPermit()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't you pass the failure here like you do for normal activities?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Normal activities don't actually set an error - it's more like when processing failed rather than the task itself failed that the error
variant is used. So I think this is consistent.
This reverts commit 46b239d.
…y when accept timeout exists
…aking test take too long
3cd6cd6
to
061d066
Compare
// where scheduleToStart is already fired, but didn't report a completion yet. | ||
boolean shouldDiscardTheAttempt = scheduleToStartFired || executionContext.isCompleted(); | ||
if (shouldDiscardTheAttempt) { | ||
return; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here shouldn't the task slot release reason be not used?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not quite. This is a timeout, and the timeouts right now are also taskComplete. Not used is right now just "didn't even get a task"
temporal-sdk/src/main/java/io/temporal/internal/worker/LocalActivityWorker.java
Outdated
Show resolved
Hide resolved
Did we add a test that shows retrying local activities do not hold the slot while in a back-off stage? |
publishSlotsMetric(); | ||
} | ||
|
||
public void releaseSlot(SlotReleaseReason reason, SlotPermit permit) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What happens on a double release? I think right now it will silently pass?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It will. There's not a great way to avoid that that I see, at least not without making some kind of publicly accessible isReleased
field on the permit, since that's in worker rather than internal. Could be done though. Alternatively it can just be tested for with CountingSlotSupplier
in the tests which is in essence doing it by making sure the acquire/releases always line up.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah I was thinking it was easier to check then maybe it actually is since it probably comes down to the route slot supplier. Maybe just testing CountingSlotSupplier
is enough
This is implicitly covered in |
2b1f325
to
ceb3455
Compare
ceb3455
to
0f8e0b6
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, I know @cretz reviewed the original, may want to double check if he has any thoughts, not required though.
Chad mentioned he's good w/ your review |
What was changed
Re-introduce slot supplier framework reverted in #2134
Why?
Bring back feature with fix for leaking local activity slots
Checklist
Closes
How was this tested:
Added many tests
Any docs updates needed?