diff --git a/Sources/Queues/QueueWorker.swift b/Sources/Queues/QueueWorker.swift index fe76246..c916b4e 100644 --- a/Sources/Queues/QueueWorker.swift +++ b/Sources/Queues/QueueWorker.swift @@ -12,22 +12,31 @@ extension Queue { public struct QueueWorker: Sendable { let queue: any Queue - /// Actually run the queue. This is a thin wrapper for ELF-style callers. + /// Run the queue until there is no more work to be done. + /// This is a thin wrapper for ELF-style callers. public func run() -> EventLoopFuture { self.queue.eventLoop.makeFutureWithTask { - try await self.run() + try await run() } } - - /// Pop a job off the queue and try to run it. If no jobs are available, do nothing. + + /// Run the queue until there is no more work to be done. + /// This is the main async entrypoint for a queue worker. public func run() async throws { + while try await self.runOneJob() {} + } + + /// Pop a job off the queue and try to run it. If no jobs are available, do + /// nothing. Returns whether a job was run. + private func runOneJob() async throws -> Bool { var logger = self.queue.logger logger[metadataKey: "queue"] = "\(self.queue.queueName.string)" logger.trace("Popping job from queue") guard let id = try await self.queue.pop().get() else { // No job found, go around again. - return logger.trace("No pending jobs") + logger.trace("No pending jobs") + return false } logger[metadataKey: "job-id"] = "\(id.string)" @@ -39,23 +48,26 @@ public struct QueueWorker: Sendable { guard let job = self.queue.configuration.jobs[data.jobName] else { logger.warning("No job with the desired name is registered, discarding") - return try await self.queue.clear(id).get() + try await self.queue.clear(id).get() + return false } // If the job has a delay that isn't up yet, requeue it. guard (data.delayUntil ?? .distantPast) < Date() else { logger.trace("Job is delayed, requeueing for later execution", metadata: ["delayed-until": "\(data.delayUntil ?? .distantPast)"]) - return try await self.queue.push(id).get() + try await self.queue.push(id).get() + return false } await self.queue.sendNotification(of: "dequeue", logger: logger) { try await $0.didDequeue(jobId: id.string, eventLoop: self.queue.eventLoop).get() } - try await self.run(id: id, job: job, jobData: data, logger: logger) + try await self.runOneJob(id: id, job: job, jobData: data, logger: logger) + return true } - private func run(id: JobIdentifier, job: any AnyJob, jobData: JobData, logger: Logger) async throws { + private func runOneJob(id: JobIdentifier, job: any AnyJob, jobData: JobData, logger: Logger) async throws { logger.info("Dequeing and running job", metadata: ["attempt": "\(jobData.currentAttempt)", "retries-left": "\(jobData.remainingAttempts)"]) do { try await job._dequeue(self.queue.context, id: id.string, payload: jobData.payload).get() diff --git a/Tests/QueuesTests/AsyncQueueTests.swift b/Tests/QueuesTests/AsyncQueueTests.swift index d77cd01..52a2452 100644 --- a/Tests/QueuesTests/AsyncQueueTests.swift +++ b/Tests/QueuesTests/AsyncQueueTests.swift @@ -37,6 +37,8 @@ final class AsyncQueueTests: XCTestCase { app.get("foo") { req in try await req.queue.dispatch(MyAsyncJob.self, .init(foo: "bar")) + try await req.queue.dispatch(MyAsyncJob.self, .init(foo: "baz")) + try await req.queue.dispatch(MyAsyncJob.self, .init(foo: "quux")) return "done" } @@ -45,8 +47,8 @@ final class AsyncQueueTests: XCTestCase { XCTAssertEqual(res.body.string, "done") } - XCTAssertEqual(app.queues.test.queue.count, 1) - XCTAssertEqual(app.queues.test.jobs.count, 1) + XCTAssertEqual(app.queues.test.queue.count, 3) + XCTAssertEqual(app.queues.test.jobs.count, 3) let job = app.queues.test.first(MyAsyncJob.self) XCTAssert(app.queues.test.contains(MyAsyncJob.self)) XCTAssertNotNil(job) @@ -67,6 +69,8 @@ final class AsyncQueueTests: XCTestCase { app.get("foo") { req in try await req.queue.dispatch(MyAsyncJob.self, .init(foo: "bar")) + try await req.queue.dispatch(MyAsyncJob.self, .init(foo: "baz")) + try await req.queue.dispatch(MyAsyncJob.self, .init(foo: "quux")) return "done" } @@ -75,8 +79,8 @@ final class AsyncQueueTests: XCTestCase { XCTAssertEqual(res.body.string, "done") } - XCTAssertEqual(app.queues.asyncTest.queue.count, 1) - XCTAssertEqual(app.queues.asyncTest.jobs.count, 1) + XCTAssertEqual(app.queues.asyncTest.queue.count, 3) + XCTAssertEqual(app.queues.asyncTest.jobs.count, 3) let job = app.queues.asyncTest.first(MyAsyncJob.self) XCTAssert(app.queues.asyncTest.contains(MyAsyncJob.self)) XCTAssertNotNil(job) diff --git a/Tests/QueuesTests/QueueTests.swift b/Tests/QueuesTests/QueueTests.swift index cd30736..cf90318 100644 --- a/Tests/QueuesTests/QueueTests.swift +++ b/Tests/QueuesTests/QueueTests.swift @@ -119,6 +119,34 @@ final class QueueTests: XCTestCase { await XCTAssertEqualAsync(try await promise.futureResult.get(), "bar") } + func testRunUntilEmpty() async throws { + let promise1 = self.app.eventLoopGroup.any().makePromise(of: String.self) + self.app.queues.add(Foo1(promise: promise1)) + let promise2 = self.app.eventLoopGroup.any().makePromise(of: String.self) + self.app.queues.add(Foo2(promise: promise2)) + + self.app.get("foo") { req in + try await req.queue.dispatch(Foo1.self, .init(foo: "bar")) + try await req.queue.dispatch(Foo1.self, .init(foo: "quux")) + try await req.queue.dispatch(Foo2.self, .init(foo: "baz")) + return "done" + } + + try await self.app.testable().test(.GET, "foo") { res async in + XCTAssertEqual(res.status, .ok) + XCTAssertEqual(res.body.string, "done") + } + + XCTAssertEqual(self.app.queues.test.queue.count, 3) + XCTAssertEqual(self.app.queues.test.jobs.count, 3) + try await self.app.queues.queue.worker.run() + XCTAssertEqual(self.app.queues.test.queue.count, 0) + XCTAssertEqual(self.app.queues.test.jobs.count, 0) + + await XCTAssertEqualAsync(try await promise1.futureResult.get(), "quux") + await XCTAssertEqualAsync(try await promise2.futureResult.get(), "baz") + } + func testSettingCustomId() async throws { let promise = self.app.eventLoopGroup.any().makePromise(of: String.self) self.app.queues.add(Foo1(promise: promise))