Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Run until no more work is pending #134

Merged
merged 2 commits into from
Aug 22, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 12 additions & 5 deletions Sources/Queues/QueueWorker.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,23 @@ public struct QueueWorker: Sendable {
/// Actually run the queue. This is a thin wrapper for ELF-style callers.
public func run() -> EventLoopFuture<Void> {
self.queue.eventLoop.makeFutureWithTask {
try await self.run()
var hasWork = true
while hasWork {
hasWork = try await self.run()
}
}
}

/// Pop a job off the queue and try to run it. If no jobs are available, do nothing.
public func run() async throws {
public func run() async throws -> Bool {
danpalmer marked this conversation as resolved.
Show resolved Hide resolved
var logger = self.queue.logger
logger[metadataKey: "queue"] = "\(self.queue.queueName.string)"
logger.trace("Popping job from queue")

guard let id = try await self.queue.pop().get() else {
// No job found, go around again.
return logger.trace("No pending jobs")
logger.trace("No pending jobs")
return false
}

logger[metadataKey: "job-id"] = "\(id.string)"
Expand All @@ -39,20 +43,23 @@ public struct QueueWorker: Sendable {

guard let job = self.queue.configuration.jobs[data.jobName] else {
logger.warning("No job with the desired name is registered, discarding")
return try await self.queue.clear(id).get()
try await self.queue.clear(id).get()
return false
}

// If the job has a delay that isn't up yet, requeue it.
guard (data.delayUntil ?? .distantPast) < Date() else {
logger.trace("Job is delayed, requeueing for later execution", metadata: ["delayed-until": "\(data.delayUntil ?? .distantPast)"])
return try await self.queue.push(id).get()
try await self.queue.push(id).get()
return false
}

await self.queue.sendNotification(of: "dequeue", logger: logger) {
try await $0.didDequeue(jobId: id.string, eventLoop: self.queue.eventLoop).get()
}

try await self.run(id: id, job: job, jobData: data, logger: logger)
return true
}

private func run(id: JobIdentifier, job: any AnyJob, jobData: JobData, logger: Logger) async throws {
Expand Down