diff --git a/src/workerd/api/queue.c++ b/src/workerd/api/queue.c++ index c25186c4cfd..5532ddf8e01 100644 --- a/src/workerd/api/queue.c++ +++ b/src/workerd/api/queue.c++ @@ -356,9 +356,7 @@ QueueMessage::QueueMessage( attempts(message.attempts), result(result) {} -jsg::JsValue QueueMessage::getBody(jsg::Lock& js) { - return body.getHandle(js); -} +jsg::JsValue QueueMessage::getBody(jsg::Lock& js) { return body.getHandle(js); } void QueueMessage::retry(jsg::Optional options) { if (result->ackAll) { @@ -550,7 +548,7 @@ kj::Promise QueueCustomEventImpl::run( // It's a little ugly, but the usage of waitUntil (and finishScheduled) down below are here so // that users can write queue handlers in the old addEventListener("queue", ...) syntax (where we // can't just wait on their addEventListener handler to resolve because it can't be async). - context.addWaitUntil(context.run( + auto runProm = context.run( [this, entrypointName = entrypointName, &context, queueEvent = kj::addRef(*queueEventHolder), &metrics = incomingRequest->getMetrics(), props = kj::mv(props)](Worker::Lock& lock) mutable { @@ -560,60 +558,70 @@ kj::Promise QueueCustomEventImpl::run( queueEvent->event = startQueueEvent(lock.getGlobalScope(), kj::mv(params), context.addObject(result), lock, lock.getExportedHandler(entrypointName, kj::mv(props), context.getActor()), typeHandler); - })); - - // TODO(soon): There's a good chance we'll want a different wall-clock timeout for queue handlers - // than for scheduled workers, but it's not at all clear yet to me what it should be, so just - // reuse the scheduled worker logic and timeout for now. - auto result = co_await incomingRequest->finishScheduled(); - bool completed = result == IoContext_IncomingRequest::FinishScheduledResult::COMPLETED; - - // Log some debug info if the request timed out or was aborted. 
- // In particular, detect whether or not the users queue() handler function completed - // and include info about other waitUntil tasks that may have caused the request to timeout. - if (!completed) { - kj::String status; - if (queueEventHolder->event.get() == nullptr) { - status = kj::str("Empty"); - } else { - KJ_SWITCH_ONEOF(queueEventHolder->event->getCompletionStatus()) { - KJ_CASE_ONEOF(i, QueueEvent::Incomplete) { - status = kj::str("Incomplete"); - break; - } - KJ_CASE_ONEOF(s, QueueEvent::CompletedSuccessfully) { - status = kj::str("Completed Succesfully"); - break; - } - KJ_CASE_ONEOF(e, QueueEvent::CompletedWithError) { - status = kj::str("Completed with error:", e.error); - break; + }); + + auto compatFlags = context.getWorker().getIsolate().getApi().getFeatureFlags(); + if (compatFlags.getQueueConsumerNoWaitForWaitUntil()) { + // The user has opted in to only waiting on their event handler rather than all waitUntil'd + // promises. + KJ_UNIMPLEMENTED("TODO(now)"); + } else { + // The user has not opted in to the new waitUntil behavior, so we need to add the queue() + // handler's promise to the waitUntil promises and then wait on them all to finish. + context.addWaitUntil(kj::mv(runProm)); + + // We reuse the finishScheduled() method for convenience, since queues use the same wall clock + // timeout as scheduled workers. + auto result = co_await incomingRequest->finishScheduled(); + bool completed = result == IoContext_IncomingRequest::FinishScheduledResult::COMPLETED; + + // Log some debug info if the request timed out or was aborted. + // In particular, detect whether or not the user's queue() handler function completed + // and include info about other waitUntil tasks that may have caused the request to time out.
+ if (!completed) { + kj::String status; + if (queueEventHolder->event.get() == nullptr) { + status = kj::str("Empty"); + } else { + KJ_SWITCH_ONEOF(queueEventHolder->event->getCompletionStatus()) { + KJ_CASE_ONEOF(i, QueueEvent::Incomplete) { + status = kj::str("Incomplete"); + break; + } + KJ_CASE_ONEOF(s, QueueEvent::CompletedSuccessfully) { + status = kj::str("Completed Succesfully"); + break; + } + KJ_CASE_ONEOF(e, QueueEvent::CompletedWithError) { + status = kj::str("Completed with error:", e.error); + break; + } } } + auto& ioContext = incomingRequest->getContext(); + auto scriptId = ioContext.getWorker().getScript().getId(); + auto tasks = ioContext.getWaitUntilTasks().trace(); + if (result == IoContext_IncomingRequest::FinishScheduledResult::TIMEOUT) { + KJ_LOG(WARNING, "NOSENTRY queue event hit timeout", scriptId, status, tasks); + } else if (result == IoContext_IncomingRequest::FinishScheduledResult::ABORTED) { + // Attempt to grab the error message to understand the reason for the abort. + // Include a timeout just in case for some unexpected reason the onAbort promise hasn't + // already rejected. 
+ kj::String abortError; + co_await ioContext.onAbort() + .then([] {}, [&abortError](kj::Exception&& e) { + abortError = kj::str(e); + }).exclusiveJoin(ioContext.afterLimitTimeout(1 * kj::MICROSECONDS).then([&abortError]() { + abortError = kj::str("onAbort() promise has unexpectedly not yet been rejected"); + })); + KJ_LOG(WARNING, "NOSENTRY queue event aborted", abortError, scriptId, status, tasks); + } } - auto& ioContext = incomingRequest->getContext(); - auto scriptId = ioContext.getWorker().getScript().getId(); - auto tasks = ioContext.getWaitUntilTasks().trace(); - if (result == IoContext_IncomingRequest::FinishScheduledResult::TIMEOUT) { - KJ_LOG(WARNING, "NOSENTRY queue event hit timeout", scriptId, status, tasks); - } else if (result == IoContext_IncomingRequest::FinishScheduledResult::ABORTED) { - // Attempt to grab the error message to understand the reason for the abort. - // Include a timeout just in case for some unexpected reason the onAbort promise hasn't - // already rejected. - kj::String abortError; - co_await ioContext.onAbort() - .then([] {}, [&abortError](kj::Exception&& e) { - abortError = kj::str(e); - }).exclusiveJoin(ioContext.afterLimitTimeout(1 * kj::MICROSECONDS).then([&abortError]() { - abortError = kj::str("onAbort() promise has unexpectedly not yet been rejected"); - })); - KJ_LOG(WARNING, "NOSENTRY queue event aborted", abortError, scriptId, status, tasks); - } - } - co_return WorkerInterface::CustomEvent::Result{ - .outcome = completed ? context.waitUntilStatus() : EventOutcome::EXCEEDED_CPU, - }; + co_return WorkerInterface::CustomEvent::Result{ + .outcome = completed ? 
context.waitUntilStatus() : EventOutcome::EXCEEDED_CPU, + }; + } } kj::Promise QueueCustomEventImpl::sendRpc( diff --git a/src/workerd/io/compatibility-date.capnp b/src/workerd/io/compatibility-date.capnp index 3254693e4cb..f266403fc05 100644 --- a/src/workerd/io/compatibility-date.capnp +++ b/src/workerd/io/compatibility-date.capnp @@ -697,4 +697,13 @@ struct CompatibilityFlags @0x8f8c1b68151b6cef { $compatEnableFlag("memory_cache_delete") $experimental; # Enables delete operations on memory cache if enabled. + + queueConsumerNoWaitForWaitUntil @74 :Bool + $compatEnableFlag("queue_consumer_no_wait_for_wait_until") + $compatDisableFlag("queue_consumer_wait_for_wait_until"); + # If enabled, does not require all waitUntil'ed promises to resolve successfully before reporting + # succeeded/failed messages/batches back from a queue consumer to the Queues service. This + # prevents a slow waitUntil'ed promise from slowing down consumption of messages from a queue, + # which has been a recurring problem for the prior behavior (which did wait for all waitUntil'ed + # tasks to complete). }