Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add compat flag for queue consumers to not block on waitUntil'ed work #3596

Merged
merged 2 commits into from
Feb 25, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
191 changes: 132 additions & 59 deletions src/workerd/api/queue.c++
Original file line number Diff line number Diff line change
Expand Up @@ -461,15 +461,20 @@ void QueueEvent::ackAll() {
}

namespace {
jsg::Ref<QueueEvent> startQueueEvent(EventTarget& globalEventTarget,

struct StartQueueEventResponse {
jsg::Ref<QueueEvent> event = nullptr;
kj::Maybe<kj::Promise<void>> exportedHandlerProm;
bool isServiceWorkerHandler = false;
};

StartQueueEventResponse startQueueEvent(EventTarget& globalEventTarget,
kj::OneOf<rpc::EventDispatcher::QueueParams::Reader, QueueEvent::Params> params,
IoPtr<QueueEventResult> result,
Worker::Lock& lock,
kj::Maybe<ExportedHandler&> exportedHandler,
const jsg::TypeHandler<QueueExportedHandler>& handlerHandler) {
jsg::Lock& js = lock;
// Start a queue event (called from C++, not JS). Similar to startScheduled(), the caller must
// wait for waitUntil()s to produce the final QueueResult.
jsg::Ref<QueueEvent> event(nullptr);
KJ_SWITCH_ONEOF(params) {
KJ_CASE_ONEOF(p, rpc::EventDispatcher::QueueParams::Reader) {
Expand All @@ -480,23 +485,31 @@ jsg::Ref<QueueEvent> startQueueEvent(EventTarget& globalEventTarget,
}
}

kj::Maybe<kj::Promise<void>> exportedHandlerProm;
bool isServiceWorkerHandler = false;
KJ_IF_SOME(h, exportedHandler) {
auto queueHandler = KJ_ASSERT_NONNULL(handlerHandler.tryUnwrap(lock, h.self.getHandle(lock)));
KJ_IF_SOME(f, queueHandler.queue) {
auto promise = f(lock, jsg::alloc<QueueController>(event.addRef()),
jsg::JsValue(h.env.getHandle(js)).addRef(js), h.getCtx());
event->waitUntil(promise.then([event = event.addRef()]() mutable {
jsg::JsValue(h.env.getHandle(js)).addRef(js), h.getCtx())
.then([event = event.addRef()]() mutable {
event->setCompletionStatus(QueueEvent::CompletedSuccessfully{});
}, [event = event.addRef()](kj::Exception&& e) mutable {
event->setCompletionStatus(QueueEvent::CompletedWithError{kj::cp(e)});
return kj::mv(e);
}));
});
if (FeatureFlags::get(js).getQueueConsumerNoWaitForWaitUntil()) {
exportedHandlerProm = kj::mv(promise);
} else {
event->waitUntil(kj::mv(promise));
}
} else {
lock.logWarningOnce("Received a QueueEvent but we lack a handler for QueueEvents. "
"Did you remember to export a queue() function?");
JSG_FAIL_REQUIRE(Error, "Handler does not export a queue() function.");
}
} else {
isServiceWorkerHandler = true;
if (globalEventTarget.getHandlerCount("queue") == 0) {
lock.logWarningOnce("Received a QueueEvent but we lack an event listener for queue events. "
"Did you remember to call addEventListener(\"queue\", ...)?");
Expand All @@ -506,8 +519,10 @@ jsg::Ref<QueueEvent> startQueueEvent(EventTarget& globalEventTarget,
event->setCompletionStatus(QueueEvent::CompletedSuccessfully{});
}

return event.addRef();
return StartQueueEventResponse{
kj::mv(event), kj::mv(exportedHandlerProm), isServiceWorkerHandler};
}

} // namespace

kj::Promise<WorkerInterface::CustomEvent::Result> QueueCustomEventImpl::run(
Expand All @@ -517,6 +532,7 @@ kj::Promise<WorkerInterface::CustomEvent::Result> QueueCustomEventImpl::run(
kj::TaskSet& waitUntilTasks) {
incomingRequest->delivered();
auto& context = incomingRequest->getContext();
KJ_DEFER({ waitUntilTasks.add(incomingRequest->drain().attach(kj::mv(incomingRequest))); });

kj::String queueName;
uint32_t batchSize;
Expand Down Expand Up @@ -544,76 +560,133 @@ kj::Promise<WorkerInterface::CustomEvent::Result> QueueCustomEventImpl::run(
// waitUntil'ed callback safely without worrying about whether this coroutine gets canceled.
struct QueueEventHolder: public kj::Refcounted {
jsg::Ref<QueueEvent> event = nullptr;
kj::Maybe<kj::Promise<void>> exportedHandlerProm;
bool isServiceWorkerHandler = false;
};
auto queueEventHolder = kj::refcounted<QueueEventHolder>();

// It's a little ugly, but the usage of waitUntil (and finishScheduled) down below are here so
// that users can write queue handlers in the old addEventListener("queue", ...) syntax (where we
// can't just wait on their addEventListener handler to resolve because it can't be async).
context.addWaitUntil(context.run(
auto runProm = context.run(
[this, entrypointName = entrypointName, &context, queueEvent = kj::addRef(*queueEventHolder),
&metrics = incomingRequest->getMetrics(),
props = kj::mv(props)](Worker::Lock& lock) mutable {
jsg::AsyncContextFrame::StorageScope traceScope = context.makeAsyncTraceScope(lock);

auto& typeHandler = lock.getWorker().getIsolate().getApi().getQueueTypeHandler(lock);
queueEvent->event = startQueueEvent(lock.getGlobalScope(), kj::mv(params),
auto startResp = startQueueEvent(lock.getGlobalScope(), kj::mv(params),
context.addObject(result), lock,
lock.getExportedHandler(entrypointName, kj::mv(props), context.getActor()), typeHandler);
}));

// TODO(soon): There's a good chance we'll want a different wall-clock timeout for queue handlers
// than for scheduled workers, but it's not at all clear yet to me what it should be, so just
// reuse the scheduled worker logic and timeout for now.
auto result = co_await incomingRequest->finishScheduled();
bool completed = result == IoContext_IncomingRequest::FinishScheduledResult::COMPLETED;

// Log some debug info if the request timed out or was aborted.
// In particular, detect whether or not the users queue() handler function completed
// and include info about other waitUntil tasks that may have caused the request to timeout.
if (!completed) {
kj::String status;
if (queueEventHolder->event.get() == nullptr) {
status = kj::str("Empty");
} else {
KJ_SWITCH_ONEOF(queueEventHolder->event->getCompletionStatus()) {
KJ_CASE_ONEOF(i, QueueEvent::Incomplete) {
status = kj::str("Incomplete");
break;
}
KJ_CASE_ONEOF(s, QueueEvent::CompletedSuccessfully) {
status = kj::str("Completed Succesfully");
break;
}
KJ_CASE_ONEOF(e, QueueEvent::CompletedWithError) {
status = kj::str("Completed with error:", e.error);
break;
}
queueEvent->event = kj::mv(startResp.event);
queueEvent->exportedHandlerProm = kj::mv(startResp.exportedHandlerProm);
queueEvent->isServiceWorkerHandler = startResp.isServiceWorkerHandler;
});

auto compatFlags = context.getWorker().getIsolate().getApi().getFeatureFlags();
if (compatFlags.getQueueConsumerNoWaitForWaitUntil()) {
// The user has opted in to only waiting on their event handler rather than all waitUntil'd
// promises.
auto timeoutPromise = context.getLimitEnforcer().limitScheduled();
// Start invoking the queue handler. The promise chain here is intended to mimic the behavior of
// finishScheduled, but only waiting on the promise returned by the event handler rather than on
// all waitUntil'ed promises.
auto outcome = co_await runProm
.then([queueEvent = kj::addRef(
*queueEventHolder)]() mutable -> kj::Promise<EventOutcome> {
// If it returned a promise, wait on the promise.
KJ_IF_SOME(handlerProm, queueEvent->exportedHandlerProm) {
return handlerProm.then([]() { return EventOutcome::OK; });
}
return EventOutcome::OK;
})
.catch_([](kj::Exception&& e) {
// If any exceptions were thrown, mark the outcome accordingly.
return EventOutcome::EXCEPTION;
})
.exclusiveJoin(timeoutPromise.then([] {
// Join everything against a timeout to ensure queue handlers can't run forever.
return EventOutcome::EXCEEDED_CPU;
})).exclusiveJoin(context.onAbort().then([] {
// Also handle anything that might cause the worker to get aborted.
// This is a change from the outcome we returned on abort before the compat flag, but better
// matches the behavior of fetch() handlers and the semantics of what's actually happening.
return EventOutcome::EXCEPTION;
}, [](kj::Exception&&) { return EventOutcome::EXCEPTION; }));

if (outcome == EventOutcome::OK && queueEventHolder->isServiceWorkerHandler) {
// HACK: For service-worker syntax, we effectively ignore the compatibility flag and wait
// for all waitUntil tasks anyway, since otherwise there's no way to do async work from an
// event listener callback.
// It'd be nicer if we could fall through to the code below for the non-compat-flag logic in
// this case, but we don't even know if the worker uses service worker syntax until after
// runProm resolves, so we just copy the bare essentials here.
auto result = co_await incomingRequest->finishScheduled();
bool completed = result == IoContext_IncomingRequest::FinishScheduledResult::COMPLETED;
outcome = completed ? context.waitUntilStatus() : EventOutcome::EXCEEDED_CPU;
}
auto& ioContext = incomingRequest->getContext();
auto scriptId = ioContext.getWorker().getScript().getId();
auto tasks = ioContext.getWaitUntilTasks().trace();
if (result == IoContext_IncomingRequest::FinishScheduledResult::TIMEOUT) {
KJ_LOG(WARNING, "NOSENTRY queue event hit timeout", scriptId, status, tasks);
} else if (result == IoContext_IncomingRequest::FinishScheduledResult::ABORTED) {
// Attempt to grab the error message to understand the reason for the abort.
// Include a timeout just in case for some unexpected reason the onAbort promise hasn't
// already rejected.
kj::String abortError;
co_await ioContext.onAbort()
.then([] {}, [&abortError](kj::Exception&& e) {
abortError = kj::str(e);
}).exclusiveJoin(ioContext.afterLimitTimeout(1 * kj::MICROSECONDS).then([&abortError]() {
abortError = kj::str("onAbort() promise has unexpectedly not yet been rejected");
}));
KJ_LOG(WARNING, "NOSENTRY queue event aborted", abortError, scriptId, status, tasks);

KJ_IF_SOME(status, context.getLimitEnforcer().getLimitsExceeded()) {
outcome = status;
}
co_return WorkerInterface::CustomEvent::Result{.outcome = outcome};
} else {
// The user has not opted in to the new waitUntil behavior, so we need to add the queue()
// handler's promise to the waitUntil promises and then wait on them all to finish.
context.addWaitUntil(kj::mv(runProm));

// We reuse the finishScheduled() method for convenience, since queues use the same wall clock
// timeout as scheduled workers.
auto result = co_await incomingRequest->finishScheduled();
bool completed = result == IoContext_IncomingRequest::FinishScheduledResult::COMPLETED;

// Log some debug info if the request timed out or was aborted.
// In particular, detect whether or not the users queue() handler function completed
// and include info about other waitUntil tasks that may have caused the request to timeout.
if (!completed) {
kj::String status;
if (queueEventHolder->event.get() == nullptr) {
status = kj::str("Empty");
} else {
KJ_SWITCH_ONEOF(queueEventHolder->event->getCompletionStatus()) {
KJ_CASE_ONEOF(i, QueueEvent::Incomplete) {
status = kj::str("Incomplete");
break;
}
KJ_CASE_ONEOF(s, QueueEvent::CompletedSuccessfully) {
status = kj::str("Completed Succesfully");
break;
}
KJ_CASE_ONEOF(e, QueueEvent::CompletedWithError) {
status = kj::str("Completed with error:", e.error);
break;
}
}
}
auto& ioContext = incomingRequest->getContext();
auto scriptId = ioContext.getWorker().getScript().getId();
auto tasks = ioContext.getWaitUntilTasks().trace();
if (result == IoContext_IncomingRequest::FinishScheduledResult::TIMEOUT) {
KJ_LOG(WARNING, "NOSENTRY queue event hit timeout", scriptId, status, tasks);
} else if (result == IoContext_IncomingRequest::FinishScheduledResult::ABORTED) {
// Attempt to grab the error message to understand the reason for the abort.
// Include a timeout just in case for some unexpected reason the onAbort promise hasn't
// already rejected.
kj::String abortError;
co_await ioContext.onAbort()
.then([] {}, [&abortError](kj::Exception&& e) {
abortError = kj::str(e);
}).exclusiveJoin(ioContext.afterLimitTimeout(1 * kj::MICROSECONDS).then([&abortError]() {
abortError = kj::str("onAbort() promise has unexpectedly not yet been rejected");
}));
KJ_LOG(WARNING, "NOSENTRY queue event aborted", abortError, scriptId, status, tasks);
}
}
}

co_return WorkerInterface::CustomEvent::Result{
.outcome = completed ? context.waitUntilStatus() : EventOutcome::EXCEEDED_CPU,
};
co_return WorkerInterface::CustomEvent::Result{
.outcome = completed ? context.waitUntilStatus() : EventOutcome::EXCEEDED_CPU,
};
}
}

kj::Promise<WorkerInterface::CustomEvent::Result> QueueCustomEventImpl::sendRpc(
Expand Down
11 changes: 11 additions & 0 deletions src/workerd/io/compatibility-date.capnp
Original file line number Diff line number Diff line change
Expand Up @@ -703,4 +703,15 @@ struct CompatibilityFlags @0x8f8c1b68151b6cef {
$compatDisableFlag("nonclass_entrypoint_reuses_ctx_across_invocations")
$compatEnableDate("2025-03-10");
# Creates a unique ExportedHandler for each call to `export default` thus allowing a unique ctx per invocation

queueConsumerNoWaitForWaitUntil @75 :Bool
$compatEnableFlag("queue_consumer_no_wait_for_wait_until")
$compatDisableFlag("queue_consumer_wait_for_wait_until");
# If enabled, does not require all waitUntil'ed promises to resolve successfully before reporting
# succeeded/failed messages/batches back from a queue consumer to the Queues service. This
# prevents a slow waitUntil'ed promise from slowing down consumption of messages from a queue,
# which has been a recurring problem for the prior behavior (which did wait for all waitUntil'ed
# tasks to complete.
# This intentionally doesn't have a compatEnableDate yet until so we can let some users opt-in to
# try it before enabling it for all new scripts, but will eventually need one.
}