Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Jobs sometimes executed multiple times. #27

Open
sidepelican opened this issue Dec 19, 2022 · 0 comments · May be fixed by #28
Open

Jobs sometimes executed multiple times. #27

sidepelican opened this issue Dec 19, 2022 · 0 comments · May be fixed by #28
Labels
bug Something isn't working

Comments

@sidepelican
Copy link

Jobs sometimes executed multiple times.

Issue

I have found a concurrency safety issue.
A dispatched job will be dequeued twice with same jobID and payload.

Reproducing

This is a reproducing repository: https://github.com/sidepelican/QueuesFluentDriverMultipleExecution
This repository dispatches a simple job several times, and automatically detects when the same job launched multiple times.

$ swift run
...
2022-12-19T19:08:24+0900 info main : job_id=940F558A-EC00-47B9-8935-45D884281708 p_id=51675C99-B581-4555-9072-A376D1E95770 [App] EchoJob!
2022-12-19T19:08:24+0900 info main : job_id=940F558A-EC00-47B9-8935-45D884281708 p_id=51675C99-B581-4555-9072-A376D1E95770 [App] EchoJob!
p_id=51675C99-B581-4555-9072-A376D1E95770 is multiple executed!

Cause

  1. Queues calls Queue.set and Queue.push in Queue.dispatch.

https://github.com/vapor/queues/blob/c95c891c3c04817eac1165587fb02457c749523a/Sources/Queues/Queue.swift#L84-L86

  1. FluentQueue.set save a JobModel. JobModel.state has .pending as initial state.

public func set(_ id: JobIdentifier, to jobStorage: JobData) -> EventLoopFuture<Void> {
let jobModel = JobModel(id: id, queue: queueName.string, jobData: JobDataModel(jobData: jobStorage))
// If the job must run at a later time, ensure it won't be picked earlier since
// we sort pending jobs by date when querying
jobModel.runAtOrAfter = jobStorage.delayUntil ?? Date()
return jobModel.save(on: db).map { metadata in
return
}
}

  1. FluentQueue.push writes the job's state to pending. The default value of state is .pending, so this operation is seemingly meaningless.

public func push(_ id: JobIdentifier) -> EventLoopFuture<Void> {
guard let sqlDb = db as? SQLDatabase else {
return self.context.eventLoop.makeFailedFuture(QueuesFluentError.databaseNotFound)
}
return sqlDb
.update(JobModel.schema)
.set(SQLColumn("\(FieldKey.state)"), to: SQLBind(QueuesFluentJobState.pending))
.where(SQLColumn("\(FieldKey.id)"), .equal, SQLBind(id.string))
.run()
}

  1. The jobs set in 2 is ready for the workers to dequeue. What happens if a worker dequeues a job set in 2 between 2 and 3? The worker set the state to .processing and then it is overridden to .pending in 3.
  2. The state is .pending so another worker can dequeue the job. Incident happens.

How to fix?

I think there are two ways.
One is to add .initialized to QueuesFluentJobState and use it as an initial value of JobModel.state.
The other is to do nothing in FluentQueue.push.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working
Projects
None yet
Development

Successfully merging a pull request may close this issue.

2 participants