Skip to content

Commit

Permalink
Implement compat flag for queues to not block on waitUntil'ed work
Browse files Browse the repository at this point in the history
  • Loading branch information
a-robinson committed Feb 24, 2025
1 parent 1fc8dfe commit c96dc1e
Showing 1 changed file with 66 additions and 10 deletions.
76 changes: 66 additions & 10 deletions src/workerd/api/queue.c++
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,9 @@ QueueMessage::QueueMessage(
attempts(message.attempts),
result(result) {}

jsg::JsValue QueueMessage::getBody(jsg::Lock& js) { return body.getHandle(js); }
jsg::JsValue QueueMessage::getBody(jsg::Lock& js) {
return body.getHandle(js);
}

void QueueMessage::retry(jsg::Optional<QueueRetryOptions> options) {
if (result->ackAll) {
Expand Down Expand Up @@ -459,15 +461,20 @@ void QueueEvent::ackAll() {
}

namespace {
jsg::Ref<QueueEvent> startQueueEvent(EventTarget& globalEventTarget,

struct StartQueueEventResponse {
jsg::Ref<QueueEvent> event = nullptr;
kj::Maybe<kj::Promise<void>> exportedHandlerProm;
bool isServiceWorkerHandler = false;
};

StartQueueEventResponse startQueueEvent(EventTarget& globalEventTarget,
kj::OneOf<rpc::EventDispatcher::QueueParams::Reader, QueueEvent::Params> params,
IoPtr<QueueEventResult> result,
Worker::Lock& lock,
kj::Maybe<ExportedHandler&> exportedHandler,
const jsg::TypeHandler<QueueExportedHandler>& handlerHandler) {
jsg::Lock& js = lock;
// Start a queue event (called from C++, not JS). Similar to startScheduled(), the caller must
// wait for waitUntil()s to produce the final QueueResult.
jsg::Ref<QueueEvent> event(nullptr);
KJ_SWITCH_ONEOF(params) {
KJ_CASE_ONEOF(p, rpc::EventDispatcher::QueueParams::Reader) {
Expand All @@ -478,23 +485,31 @@ jsg::Ref<QueueEvent> startQueueEvent(EventTarget& globalEventTarget,
}
}

kj::Maybe<kj::Promise<void>> exportedHandlerProm;
bool isServiceWorkerHandler = false;
KJ_IF_SOME(h, exportedHandler) {
auto queueHandler = KJ_ASSERT_NONNULL(handlerHandler.tryUnwrap(lock, h.self.getHandle(lock)));
KJ_IF_SOME(f, queueHandler.queue) {
auto promise = f(lock, jsg::alloc<QueueController>(event.addRef()),
jsg::JsValue(h.env.getHandle(js)).addRef(js), h.getCtx());
event->waitUntil(promise.then([event = event.addRef()]() mutable {
jsg::JsValue(h.env.getHandle(js)).addRef(js), h.getCtx())
.then([event = event.addRef()]() mutable {
event->setCompletionStatus(QueueEvent::CompletedSuccessfully{});
}, [event = event.addRef()](kj::Exception&& e) mutable {
event->setCompletionStatus(QueueEvent::CompletedWithError{kj::cp(e)});
return kj::mv(e);
}));
});
if (FeatureFlags::get(js).getQueueConsumerNoWaitForWaitUntil()) {
exportedHandlerProm = kj::mv(promise);
} else {
event->waitUntil(kj::mv(promise));
}
} else {
lock.logWarningOnce("Received a QueueEvent but we lack a handler for QueueEvents. "
"Did you remember to export a queue() function?");
JSG_FAIL_REQUIRE(Error, "Handler does not export a queue() function.");
}
} else {
isServiceWorkerHandler = true;
if (globalEventTarget.getHandlerCount("queue") == 0) {
lock.logWarningOnce("Received a QueueEvent but we lack an event listener for queue events. "
"Did you remember to call addEventListener(\"queue\", ...)?");
Expand All @@ -504,8 +519,10 @@ jsg::Ref<QueueEvent> startQueueEvent(EventTarget& globalEventTarget,
event->setCompletionStatus(QueueEvent::CompletedSuccessfully{});
}

return event.addRef();
return StartQueueEventResponse{
kj::mv(event), kj::mv(exportedHandlerProm), isServiceWorkerHandler};
}

} // namespace

kj::Promise<WorkerInterface::CustomEvent::Result> QueueCustomEventImpl::run(
Expand All @@ -515,6 +532,7 @@ kj::Promise<WorkerInterface::CustomEvent::Result> QueueCustomEventImpl::run(
kj::TaskSet& waitUntilTasks) {
incomingRequest->delivered();
auto& context = incomingRequest->getContext();
KJ_DEFER({ waitUntilTasks.add(incomingRequest->drain().attach(kj::mv(incomingRequest))); });

kj::String queueName;
uint32_t batchSize;
Expand Down Expand Up @@ -542,6 +560,8 @@ kj::Promise<WorkerInterface::CustomEvent::Result> QueueCustomEventImpl::run(
// waitUntil'ed callback safely without worrying about whether this coroutine gets canceled.
struct QueueEventHolder: public kj::Refcounted {
jsg::Ref<QueueEvent> event = nullptr;
kj::Maybe<kj::Promise<void>> exportedHandlerProm;
bool isServiceWorkerHandler = false;
};
auto queueEventHolder = kj::refcounted<QueueEventHolder>();

Expand All @@ -555,16 +575,52 @@ kj::Promise<WorkerInterface::CustomEvent::Result> QueueCustomEventImpl::run(
jsg::AsyncContextFrame::StorageScope traceScope = context.makeAsyncTraceScope(lock);

auto& typeHandler = lock.getWorker().getIsolate().getApi().getQueueTypeHandler(lock);
queueEvent->event = startQueueEvent(lock.getGlobalScope(), kj::mv(params),
auto startResp = startQueueEvent(lock.getGlobalScope(), kj::mv(params),
context.addObject(result), lock,
lock.getExportedHandler(entrypointName, kj::mv(props), context.getActor()), typeHandler);
queueEvent->event = kj::mv(startResp.event);
queueEvent->exportedHandlerProm = kj::mv(startResp.exportedHandlerProm);
queueEvent->isServiceWorkerHandler = startResp.isServiceWorkerHandler;
});

auto compatFlags = context.getWorker().getIsolate().getApi().getFeatureFlags();
if (compatFlags.getQueueConsumerNoWaitForWaitUntil()) {
// The user has opted in to only waiting on their event handler rather than all waitUntil'd
// promises.
KJ_UNIMPLEMENTED("TODO(now)");
auto timeoutPromise = context.getLimitEnforcer().limitScheduled();
auto outcome = co_await runProm
.then([queueEvent = kj::addRef(
*queueEventHolder)]() mutable -> kj::Promise<EventOutcome> {
KJ_IF_SOME(handlerProm, queueEvent->exportedHandlerProm) {
return handlerProm.then([]() { return EventOutcome::OK; });
}
return EventOutcome::OK;
}).catch_([](kj::Exception&& e) {
return EventOutcome::EXCEPTION;
}).exclusiveJoin(timeoutPromise.then([] {
return EventOutcome::EXCEEDED_CPU;
})).exclusiveJoin(context.onAbort().then([] {
// This is a change from the outcome we returned on abort before the compat flag, but better
// matches the behavior of fetch() handlers and the semantics of what's actually happening.
return EventOutcome::EXCEPTION;
}, [](kj::Exception&&) { return EventOutcome::EXCEPTION; }));

if (outcome == EventOutcome::OK && queueEventHolder->isServiceWorkerHandler) {
// HACK: For service-worker syntax, we effectively ignore the compatibility flag and wait
// for all waitUntil tasks anyway, since otherwise there's no way to do async work from an
// event listener callback.
// It'd be nicer if we could fall through to the code below for the non-compat-flag logic in
// this case, but we don't even know if the worker uses service worker syntax until after
// runProm resolves, so we just copy the bare essentials here.
auto result = co_await incomingRequest->finishScheduled();
bool completed = result == IoContext_IncomingRequest::FinishScheduledResult::COMPLETED;
outcome = completed ? context.waitUntilStatus() : EventOutcome::EXCEEDED_CPU;
}

KJ_IF_SOME(status, context.getLimitEnforcer().getLimitsExceeded()) {
outcome = status;
}
co_return WorkerInterface::CustomEvent::Result{.outcome = outcome};
} else {
// The user has not opted in to the new waitUntil behavior, so we need to add the queue()
// handler's promise to the waitUntil promises and then wait on them all to finish.
Expand Down

0 comments on commit c96dc1e

Please sign in to comment.