Job abort not working? #394
I'll have to look into this further tomorrow, as I have never had the need to cancel jobs myself.
This seems like a bug.
Hey, apologies for the lack of response. I haven't forgotten, I just haven't found the time. I have Friday off, and I'll use that day for open source/personal projects and get back to you then.
@JonasKs no problem! And thanks ❤️
Hi again @gerazenobi 😊

First, in case someone else reads this issue at a later point: the worker needs allow_abort_jobs = True in its settings for job.abort() to have any effect. The docs also state this. I can see you've done this! 😊

The next thing I did was to check out your implementation. Knowing that this library has some quirkiness around queue names, I tried removing the custom queue name.

So, short workaround: use the default queue.
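To make that workaround concrete, here is a minimal sketch (mine, not from the thread) of enqueueing and aborting without any custom queue name, so everything falls back to arq's default queue; the compute function name just mirrors the examples further down:

import asyncio

from arq import create_pool
from arq.connections import RedisSettings
from arq.jobs import Job


async def main():
    # No _queue_name passed to enqueue_job, no queue_name on the worker or Job:
    # all three then use arq's default queue.
    redis = await create_pool(RedisSettings())
    job = await redis.enqueue_job("compute")
    if job:
        # Abort through a Job handle built from the id, as in the scripts below.
        handle = Job(job.job_id, redis)
        print("aborted:", await handle.abort(timeout=5))


if __name__ == "__main__":
    asyncio.run(main())

(This assumes a worker with allow_abort_jobs = True is running, as in the settings shown further down.)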
Hi @JonasKs 👋 Thanks so much for taking a look at this and providing a workaround 🙏 I will test on my side and get back to you.
@JonasKs

# shared imports for the snippets below
import asyncio

from arq import create_pool
from arq.connections import RedisSettings
from arq.jobs import Job


# the job
async def compute(ctx):
    await asyncio.sleep(30)
    return ctx["job_id"]


# enqueuing jobs
async def main():
    redis = await create_pool(RedisSettings())
    for job_index in range(1, 50):
        job = await redis.enqueue_job("compute")
        if job:
            print(job.job_id)


# worker settings
class WorkerSettings:
    functions = [compute]
    keep_result = 3600 * 24
    allow_abort_jobs = True

How I am aborting:

JOB_ID = "d85e5e5b62074cf59aea481783e8200a"


async def main():
    redis = await create_pool(RedisSettings())
    job = Job(JOB_ID, redis)
    job_info = await job.info()
    job_status = await job.status()
    print(f"job info: {job_info}")
    print(f"status: {job_status}")
    print(f"result_info: {job_info}")  # note: this prints job_info again
    try:
        await job.abort(timeout=0)
    except Exception as e:
        print("job abort raised exception", e)


if __name__ == "__main__":
    asyncio.run(main())

I enqueued the jobs and ran the abort script above multiple times; it prints:
Using your example from the first post results in this output:

Removing queue names:

import asyncio

from arq import create_pool
from arq.connections import RedisSettings


async def compute(ctx):
    print(f'compute called {ctx["job_id"]}')
    await asyncio.sleep(3600)
    return ctx["job_id"]


async def main():
    redis = await create_pool(RedisSettings())
    for job_index in range(1, 11):
        job = await redis.enqueue_job("compute", _job_id=f"my_id_{job_index}")
        if job:
            print(job.job_id)


class WorkerSettings:
    functions = [compute]
    max_jobs = 3
    keep_result = 3600 * 24
    allow_abort_jobs = True
    health_check_interval = 5


if __name__ == "__main__":
    asyncio.run(main())

import asyncio

from arq import create_pool
from arq.connections import RedisSettings
from arq.jobs import Job

JOB_ID = "my_id_10"


async def main():
    redis = await create_pool(RedisSettings())
    jobs = await redis.queued_jobs()
    print(f"queued jobs {len(jobs)}")
    job = Job(JOB_ID, redis)
    print(f"job info: {await job.info()}")
    job_status = await job.status()
    print(f"status: {job_status}")
    print(f"result_info: {await job.result_info()}")
    try:
        aborted = await job.abort(timeout=1)
        print(f"aborted: {aborted}")
    except Exception as e:
        print("exception raised when aborting job")
        print(f"exception when aborting: {type(e)}")


if __name__ == "__main__":
    asyncio.run(main())

Results in this output:
Aha, I see, with the new example there's something iffy going on.

With the job:

async def compute(ctx):
    print(f'starting job {ctx}')
    await asyncio.sleep(30)
    print('slept')
    return ctx["job_id"]

Then finding the log from the worker:

starting job {'redis': ArqRedis<ConnectionPool<Connection<host=localhost,port=6379,db=0>>>, 'job_id': 'afc5d7518819482eaaedffb0a82372f8', 'job_try': 1, 'enqueue_time': datetime.datetime(2023, 5, 25, 8, 17, 15, 800000, tzinfo=datetime.timezone.utc), 'score': 1685002635800}

Then aborting the job:

job info: JobDef(function='compute', args=(), kwargs={}, job_try=None, enqueue_time=datetime.datetime(2023, 5, 25, 8, 17, 15, 815000, tzinfo=datetime.timezone.utc), score=1685002635815)
status: JobStatus.in_progress
result_info: JobDef(function='compute', args=(), kwargs={}, job_try=None, enqueue_time=datetime.datetime(2023, 5, 25, 8, 17, 15, 815000, tzinfo=datetime.timezone.utc), score=1685002635815)
job abort raised exception

Then waiting for the job to complete:

job info: JobResult(function='compute', args=(), kwargs={}, job_try=1, enqueue_time=datetime.datetime(2023, 5, 25, 8, 17, 15, 815000, tzinfo=datetime.timezone.utc), score=None, success=True, result='f859323bff434573850b10b5363d015d', start_time=datetime.datetime(2023, 5, 25, 8, 18, 16, 173000, tzinfo=datetime.timezone.utc), finish_time=datetime.datetime(2023, 5, 25, 8, 18, 46, 177000, tzinfo=datetime.timezone.utc), queue_name='arq:queue', job_id=None)
status: JobStatus.complete
result_info: JobResult(function='compute', args=(), kwargs={}, job_try=1, enqueue_time=datetime.datetime(2023, 5, 25, 8, 17, 15, 815000, tzinfo=datetime.timezone.utc), score=None, success=True, result='f859323bff434573850b10b5363d015d', start_time=datetime.datetime(2023, 5, 25, 8, 18, 16, 173000, tzinfo=datetime.timezone.utc), finish_time=datetime.datetime(2023, 5, 25, 8, 18, 46, 177000, tzinfo=datetime.timezone.utc), queue_name='arq:queue', job_id=None)

I agree, this seems like a bug. I'll try to look into it this weekend.

(I managed to close the issue instead of commenting, sorry; reopened.)
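Based on the output above (abort raised an exception with an empty message, yet the worker still completed the job), it looks as though job.abort(timeout=0) gives up waiting for the result immediately. Below is a sketch, not from the thread, of treating that timeout as "abort requested, outcome unknown" rather than as a hard failure; it assumes the raised exception is asyncio.TimeoutError, which is not confirmed here:

import asyncio

from arq.jobs import Job


async def request_abort(job: Job) -> bool:
    """Ask the worker to abort a job without blocking for the outcome.

    Sketch only: assumes abort(timeout=0) records the abort request before
    the wait for the result times out, which matches the logs above but is
    not a documented guarantee.
    """
    try:
        return await job.abort(timeout=0)
    except asyncio.TimeoutError:
        # Abort was requested; whether the worker honours it is unknown here.
        return False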
Thanks @JonasKs. Not related to this issue, but I thought I would share for the sake of knowledge sharing in case someone else arrives here: at the moment, to work around the fact that we can't abort (and hence can't reuse custom IDs), I was using queued_jobs():

async def queued_jobs(self, *, queue_name: str = default_queue_name) -> List[JobDef]:
    """
    Get information about queued, mostly useful when testing.
    """
    jobs = await self.zrange(queue_name, withscores=True, start=0, end=-1)
    return await asyncio.gather(*[self._get_job_def(job_id, int(score)) for job_id, score in jobs])

However, this method will quickly exhaust the available Redis connections (it doesn't scale) when dealing with concurrent requests/jobs, as it launches a concurrent _get_job_def call, and therefore a GET, for every queued job:

async def _get_job_def(self, job_id: bytes, score: int) -> JobDef:
    key = job_key_prefix + job_id.decode()
    v = await self.get(key)  # <================ new connection
    # ...

I was starting to see connection errors under load.

All that to say:
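If all you need is to know whether a particular job id is still queued, a lighter-weight alternative is a single ZSCORE lookup instead of fetching every JobDef. This is a sketch of mine, not arq API; it leans on the same internal detail visible above, namely that the queue is a Redis sorted set keyed by job id:

from arq.connections import ArqRedis
from arq.constants import default_queue_name


async def job_is_queued(redis: ArqRedis, job_id: str) -> bool:
    # One ZSCORE round-trip, instead of one GET per queued job.
    score = await redis.zscore(default_queue_name, job_id)
    return score is not None

Alternatively, Job(job_id, redis).status() answers the same question through the public API, at the cost of a few extra round-trips.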
Hi there 👋
I am starting to use Arq and it looks wonderful, thank you so much for this much-needed framework!

I was having the following 2 issues, the first being that job.abort() is not working. Here are the details:

Context: jobs are enqueued with custom IDs, and when a newer job arrives with the same ID as an existing one (whose status could be queued, in_progress or complete), I need to overwrite it with this fresher one:

- queued or in_progress scenario: I understand I'd need to abort the existing job (as I don't want to wait for its completion) and enqueue the new one with fresher arguments.
- complete scenario: overwrite it, i.e. enqueue the new job with fresher arguments.

The issues I was having then:
A) Running job.abort(), with or without a timeout, seems to have no effect on either queued or in_progress jobs.

FWIW, I have observed one thing though: if I abort with a timeout of 0, then only when the worker dequeues that particular job does it decide not to run it:

10:56:34: 131.05s ⊘ my_id_10:compute aborted before start

Question: is there anything I am missing regarding aborting jobs? Any idea why it isn't working for me?
B) If I enqueue a job with the same ID as a complete one, it will not be enqueued and enqueue_job instead returns None, as per job uniqueness.

Question: how do I delete the job then so that I am able to enqueue the fresher one?
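One possible answer, not from the thread: arq stores a completed job's result under a key built from arq.constants.result_key_prefix (and the pending payload under job_key_prefix, as seen in _get_job_def further up). A sketch of deleting those keys before re-enqueueing follows; whether this is the supported way to free up a job id is an assumption, not something confirmed here:

from arq.connections import ArqRedis
from arq.constants import job_key_prefix, result_key_prefix


async def forget_job(redis: ArqRedis, job_id: str) -> None:
    # Assumption: removing the stored result (and any leftover job payload)
    # is enough for enqueue_job() to stop treating this id as taken.
    await redis.delete(result_key_prefix + job_id, job_key_prefix + job_id)

Note that this discards the stored result, so it trades away the keep_result history for that id.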
Worker settings and job generation
Code trying to abort job
Any help would be much appreciated 🙏