Skip to content

Commit

Permalink
fixup! Implement compat flag for queues to not block on waitUntil'ed …
Browse files Browse the repository at this point in the history
…work
  • Loading branch information
a-robinson committed Feb 24, 2025
1 parent cce4b36 commit f97c4f7
Showing 1 changed file with 22 additions and 16 deletions.
38 changes: 22 additions & 16 deletions src/workerd/api/queue.c++
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,7 @@ namespace {
struct StartQueueEventResponse {
jsg::Ref<QueueEvent> event = nullptr;
kj::Maybe<kj::Promise<void>> exportedHandlerProm;
bool isServiceWorkerHandler = false;
};

StartQueueEventResponse startQueueEvent(EventTarget& globalEventTarget,
Expand All @@ -483,6 +484,7 @@ StartQueueEventResponse startQueueEvent(EventTarget& globalEventTarget,
}

kj::Maybe<kj::Promise<void>> exportedHandlerProm;
bool isServiceWorkerHandler = false;
KJ_IF_SOME(h, exportedHandler) {
auto queueHandler = KJ_ASSERT_NONNULL(handlerHandler.tryUnwrap(lock, h.self.getHandle(lock)));
KJ_IF_SOME(f, queueHandler.queue) {
Expand All @@ -505,30 +507,17 @@ StartQueueEventResponse startQueueEvent(EventTarget& globalEventTarget,
JSG_FAIL_REQUIRE(Error, "Handler does not export a queue() function.");
}
} else {
isServiceWorkerHandler = true;
if (globalEventTarget.getHandlerCount("queue") == 0) {
lock.logWarningOnce("Received a QueueEvent but we lack an event listener for queue events. "
"Did you remember to call addEventListener(\"queue\", ...)?");
JSG_FAIL_REQUIRE(Error, "No event listener registered for queue messages.");
}
globalEventTarget.dispatchEventImpl(lock, event.addRef());
event->setCompletionStatus(QueueEvent::CompletedSuccessfully{});
if (FeatureFlags::get(js).getQueueConsumerNoWaitForWaitUntil()) {
// TODO(now): It doesn't really make sense for event-listener scripts NOT to wait on
// waitUntils, since without waitUntil an event listener callback can't do anything async.
// Should we add a hack in here such that we continue waiting on waitUntil'ed promises even
// with the compat flag for these scripts?
//
// The main alternative here would be to add a new method to QueueEvent that allows a
// specific promise to be registered as the handler, mimicking the event.respondWith()
// function on fetch events. We could even just call it respondWith(), although the behavior
// would admittedly be different given that there's no response to return (unlike for fetch
// handlers). It feels weird adding new functionality for legacy event listener scripts given
// that we had already stopped promoting by the time Queues first shipped, but it's not as if
// it should be hard to add.
}
}

return StartQueueEventResponse{kj::mv(event), kj::mv(exportedHandlerProm)};
return StartQueueEventResponse{kj::mv(event), kj::mv(exportedHandlerProm), isServiceWorkerHandler};
}

} // namespace
Expand All @@ -540,6 +529,9 @@ kj::Promise<WorkerInterface::CustomEvent::Result> QueueCustomEventImpl::run(
kj::TaskSet& waitUntilTasks) {
incomingRequest->delivered();
auto& context = incomingRequest->getContext();
KJ_DEFER({
waitUntilTasks.add(incomingRequest->drain().attach(kj::mv(incomingRequest)));
});

kj::String queueName;
uint32_t batchSize;
Expand Down Expand Up @@ -568,6 +560,7 @@ kj::Promise<WorkerInterface::CustomEvent::Result> QueueCustomEventImpl::run(
struct QueueEventHolder: public kj::Refcounted {
jsg::Ref<QueueEvent> event = nullptr;
kj::Maybe<kj::Promise<void>> exportedHandlerProm;
bool isServiceWorkerHandler = false;
};
auto queueEventHolder = kj::refcounted<QueueEventHolder>();

Expand All @@ -586,6 +579,7 @@ kj::Promise<WorkerInterface::CustomEvent::Result> QueueCustomEventImpl::run(
lock.getExportedHandler(entrypointName, kj::mv(props), context.getActor()), typeHandler);
queueEvent->event = kj::mv(startResp.event);
queueEvent->exportedHandlerProm = kj::mv(startResp.exportedHandlerProm);
queueEvent->isServiceWorkerHandler = startResp.isServiceWorkerHandler;
});

auto compatFlags = context.getWorker().getIsolate().getApi().getFeatureFlags();
Expand All @@ -595,7 +589,7 @@ kj::Promise<WorkerInterface::CustomEvent::Result> QueueCustomEventImpl::run(
auto timeoutPromise = context.getLimitEnforcer().limitScheduled();
auto outcome =
co_await runProm
.then([queueEvent = kj::mv(queueEventHolder)]() mutable -> kj::Promise<EventOutcome> {
.then([queueEvent = kj::addRef(*queueEventHolder)]() mutable -> kj::Promise<EventOutcome> {
KJ_IF_SOME(handlerProm, queueEvent->exportedHandlerProm) {
return handlerProm.then([]() { return EventOutcome::OK; });
}
Expand All @@ -612,6 +606,18 @@ kj::Promise<WorkerInterface::CustomEvent::Result> QueueCustomEventImpl::run(
return EventOutcome::EXCEPTION;
}));

if (outcome == EventOutcome::OK && queueEventHolder->isServiceWorkerHandler) {
// HACK: For service-worker syntax, we effectively ignore the compatibility flag and wait
// for all waitUntil tasks anyway, since otherwise there's no way to do async work from an
// event listener callback.
// It'd be nicer if we could fall through to the code below for the non-compat-flag logic in
// this case, but we don't even know if the worker uses service worker syntax until after
// runProm resolves, so we just copy the bare essentials here.
auto result = co_await incomingRequest->finishScheduled();
bool completed = result == IoContext_IncomingRequest::FinishScheduledResult::COMPLETED;
outcome = completed ? context.waitUntilStatus() : EventOutcome::EXCEEDED_CPU;
}

KJ_IF_SOME(status, context.getLimitEnforcer().getLimitsExceeded()) {
outcome = status;
}
Expand Down

0 comments on commit f97c4f7

Please sign in to comment.