Add compat flag for queue consumers to not block on waitUntil'ed work
The intent is that the event can be considered done when the
queue() handler's returned promise resolves, allowing acked/nacked
message info to be returned to the queues infrastructure sooner,
rather than requiring all waitUntil'ed work to finish first. This has
two important benefits:
1. It keeps consumer invocations from being slowed down by
   slow or hung waitUntil'ed tasks.
2. It keeps consumer invocations from failing due to failing
   waitUntil'ed tasks.

This commit does not implement the new functionality, just adds the flag
and adds the logic to read the flag where needed. The actual
implementation will come in the next commit for easier review.
a-robinson committed Feb 24, 2025
1 parent f7bcdbb commit 1fc8dfe
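
To make the behavior change concrete, here is a minimal consumer-side sketch in
TypeScript (the handler shape is the standard Workers queue consumer API, assuming
the ambient types from @cloudflare/workers-types; processMessage() and
flushAnalytics() are hypothetical stand-ins for user code):

  // Hypothetical helpers standing in for user code.
  async function processMessage(msg: Message): Promise<void> { /* ... */ }
  async function flushAnalytics(batch: MessageBatch): Promise<void> { /* ... */ }

  export default {
    async queue(batch: MessageBatch, env: unknown, ctx: ExecutionContext): Promise<void> {
      for (const msg of batch.messages) {
        await processMessage(msg);
        msg.ack(); // explicitly ack each message as it succeeds
      }
      // Fire-and-forget background work. Previously the runtime also waited for
      // this promise before reporting the acks above, so a hung flushAnalytics()
      // delayed the whole batch and a rejected one could fail it. With the new
      // flag, settlement of the queue() handler's returned promise is what gets
      // reported back to the Queues service.
      ctx.waitUntil(flushAnalytics(batch));
    },
  };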
Showing 2 changed files with 71 additions and 54 deletions.
116 changes: 62 additions & 54 deletions src/workerd/api/queue.c++

@@ -356,9 +356,7 @@ QueueMessage::QueueMessage(
     attempts(message.attempts),
     result(result) {}

-jsg::JsValue QueueMessage::getBody(jsg::Lock& js) {
-  return body.getHandle(js);
-}
+jsg::JsValue QueueMessage::getBody(jsg::Lock& js) { return body.getHandle(js); }

 void QueueMessage::retry(jsg::Optional<QueueRetryOptions> options) {
   if (result->ackAll) {
@@ -550,7 +548,7 @@ kj::Promise<WorkerInterface::CustomEvent::Result> QueueCustomEventImpl::run(
   // It's a little ugly, but the usage of waitUntil (and finishScheduled) down below are here so
   // that users can write queue handlers in the old addEventListener("queue", ...) syntax (where we
   // can't just wait on their addEventListener handler to resolve because it can't be async).
-  context.addWaitUntil(context.run(
+  auto runProm = context.run(
       [this, entrypointName = entrypointName, &context, queueEvent = kj::addRef(*queueEventHolder),
           &metrics = incomingRequest->getMetrics(),
           props = kj::mv(props)](Worker::Lock& lock) mutable {
@@ -560,60 +558,70 @@
         queueEvent->event = startQueueEvent(lock.getGlobalScope(), kj::mv(params),
             context.addObject(result), lock,
             lock.getExportedHandler(entrypointName, kj::mv(props), context.getActor()), typeHandler);
-  }));
+  });

-  // TODO(soon): There's a good chance we'll want a different wall-clock timeout for queue handlers
-  // than for scheduled workers, but it's not at all clear yet to me what it should be, so just
-  // reuse the scheduled worker logic and timeout for now.
-  auto result = co_await incomingRequest->finishScheduled();
-  bool completed = result == IoContext_IncomingRequest::FinishScheduledResult::COMPLETED;
+  auto compatFlags = context.getWorker().getIsolate().getApi().getFeatureFlags();
+  if (compatFlags.getQueueConsumerNoWaitForWaitUntil()) {
+    // The user has opted in to only waiting on their event handler rather than all waitUntil'd
+    // promises.
+    KJ_UNIMPLEMENTED("TODO(now)");
+  } else {
+    // The user has not opted in to the new waitUntil behavior, so we need to add the queue()
+    // handler's promise to the waitUntil promises and then wait on them all to finish.
+    context.addWaitUntil(kj::mv(runProm));
+
+    // We reuse the finishScheduled() method for convenience, since queues use the same wall clock
+    // timeout as scheduled workers.
+    auto result = co_await incomingRequest->finishScheduled();
+    bool completed = result == IoContext_IncomingRequest::FinishScheduledResult::COMPLETED;

-  // Log some debug info if the request timed out or was aborted.
-  // In particular, detect whether or not the user's queue() handler function completed
-  // and include info about other waitUntil tasks that may have caused the request to timeout.
-  if (!completed) {
-    kj::String status;
-    if (queueEventHolder->event.get() == nullptr) {
-      status = kj::str("Empty");
-    } else {
-      KJ_SWITCH_ONEOF(queueEventHolder->event->getCompletionStatus()) {
-        KJ_CASE_ONEOF(i, QueueEvent::Incomplete) {
-          status = kj::str("Incomplete");
-          break;
-        }
-        KJ_CASE_ONEOF(s, QueueEvent::CompletedSuccessfully) {
-          status = kj::str("Completed Successfully");
-          break;
-        }
-        KJ_CASE_ONEOF(e, QueueEvent::CompletedWithError) {
-          status = kj::str("Completed with error:", e.error);
-          break;
-        }
-      }
-    }
-    auto& ioContext = incomingRequest->getContext();
-    auto scriptId = ioContext.getWorker().getScript().getId();
-    auto tasks = ioContext.getWaitUntilTasks().trace();
-    if (result == IoContext_IncomingRequest::FinishScheduledResult::TIMEOUT) {
-      KJ_LOG(WARNING, "NOSENTRY queue event hit timeout", scriptId, status, tasks);
-    } else if (result == IoContext_IncomingRequest::FinishScheduledResult::ABORTED) {
-      // Attempt to grab the error message to understand the reason for the abort.
-      // Include a timeout just in case for some unexpected reason the onAbort promise hasn't
-      // already rejected.
-      kj::String abortError;
-      co_await ioContext.onAbort()
-          .then([] {}, [&abortError](kj::Exception&& e) {
-        abortError = kj::str(e);
-      }).exclusiveJoin(ioContext.afterLimitTimeout(1 * kj::MICROSECONDS).then([&abortError]() {
-        abortError = kj::str("onAbort() promise has unexpectedly not yet been rejected");
-      }));
-      KJ_LOG(WARNING, "NOSENTRY queue event aborted", abortError, scriptId, status, tasks);
-    }
-  }
+    // Log some debug info if the request timed out or was aborted.
+    // In particular, detect whether or not the user's queue() handler function completed
+    // and include info about other waitUntil tasks that may have caused the request to timeout.
+    if (!completed) {
+      kj::String status;
+      if (queueEventHolder->event.get() == nullptr) {
+        status = kj::str("Empty");
+      } else {
+        KJ_SWITCH_ONEOF(queueEventHolder->event->getCompletionStatus()) {
+          KJ_CASE_ONEOF(i, QueueEvent::Incomplete) {
+            status = kj::str("Incomplete");
+            break;
+          }
+          KJ_CASE_ONEOF(s, QueueEvent::CompletedSuccessfully) {
+            status = kj::str("Completed Successfully");
+            break;
+          }
+          KJ_CASE_ONEOF(e, QueueEvent::CompletedWithError) {
+            status = kj::str("Completed with error:", e.error);
+            break;
+          }
+        }
+      }
+      auto& ioContext = incomingRequest->getContext();
+      auto scriptId = ioContext.getWorker().getScript().getId();
+      auto tasks = ioContext.getWaitUntilTasks().trace();
+      if (result == IoContext_IncomingRequest::FinishScheduledResult::TIMEOUT) {
+        KJ_LOG(WARNING, "NOSENTRY queue event hit timeout", scriptId, status, tasks);
+      } else if (result == IoContext_IncomingRequest::FinishScheduledResult::ABORTED) {
+        // Attempt to grab the error message to understand the reason for the abort.
+        // Include a timeout just in case for some unexpected reason the onAbort promise hasn't
+        // already rejected.
+        kj::String abortError;
+        co_await ioContext.onAbort()
+            .then([] {}, [&abortError](kj::Exception&& e) {
+          abortError = kj::str(e);
+        }).exclusiveJoin(ioContext.afterLimitTimeout(1 * kj::MICROSECONDS).then([&abortError]() {
+          abortError = kj::str("onAbort() promise has unexpectedly not yet been rejected");
+        }));
+        KJ_LOG(WARNING, "NOSENTRY queue event aborted", abortError, scriptId, status, tasks);
+      }
+    }

-  co_return WorkerInterface::CustomEvent::Result{
-      .outcome = completed ? context.waitUntilStatus() : EventOutcome::EXCEEDED_CPU,
-  };
+    co_return WorkerInterface::CustomEvent::Result{
+        .outcome = completed ? context.waitUntilStatus() : EventOutcome::EXCEEDED_CPU,
+    };
+  }
 }

 kj::Promise<WorkerInterface::CustomEvent::Result> QueueCustomEventImpl::sendRpc(
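
The branch structure above boils down to which promise the runtime treats as the
event's completion signal. A rough TypeScript model of the two modes (a sketch,
not the runtime's actual code; the flag is modeled as a plain boolean):

  declare const queueConsumerNoWaitForWaitUntil: boolean;

  async function eventDone(
      handlerDone: Promise<void>,       // promise returned by the queue() handler
      waitUntilTasks: Promise<void>[],  // promises registered via ctx.waitUntil()
  ): Promise<void> {
    if (queueConsumerNoWaitForWaitUntil) {
      // New mode: report ack/nack results as soon as the handler settles;
      // waitUntil'ed tasks keep running but can't delay or fail the event.
      await handlerDone;
    } else {
      // Old mode: the handler is effectively one more waitUntil task; everything
      // must settle (or hit the wall-clock timeout) before results are reported.
      await Promise.all([handlerDone, ...waitUntilTasks]);
    }
  }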
9 changes: 9 additions & 0 deletions src/workerd/io/compatibility-date.capnp

@@ -697,4 +697,13 @@ struct CompatibilityFlags @0x8f8c1b68151b6cef {
       $compatEnableFlag("memory_cache_delete")
       $experimental;
   # Enables delete operations on memory cache if enabled.
+
+  queueConsumerNoWaitForWaitUntil @74 :Bool
+      $compatEnableFlag("queue_consumer_no_wait_for_wait_until")
+      $compatDisableFlag("queue_consumer_wait_for_wait_until");
+  # If enabled, does not require all waitUntil'ed promises to resolve successfully before reporting
+  # succeeded/failed messages/batches back from a queue consumer to the Queues service. This
+  # prevents a slow waitUntil'ed promise from slowing down consumption of messages from a queue,
+  # which has been a recurring problem for the prior behavior (which did wait for all waitUntil'ed
+  # tasks to complete).
 }
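
Once the implementation lands, a Worker would presumably opt in by adding
queue_consumer_no_wait_for_wait_until to the compatibility_flags list in its
wrangler.toml (or via a compatibility date, if the flag is later given a
default-on date), while queue_consumer_wait_for_wait_until names the disable
side, keeping the old semantics for consumers that depend on waitUntil'ed work
completing before results are reported.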
