
Job abort not working ? #394

Open
gerazenobi opened this issue Apr 20, 2023 · 9 comments


@gerazenobi

gerazenobi commented Apr 20, 2023

Hi there 👋

I am starting to use arq and it looks wonderful; thank you so much for this much-needed framework!

I was having the following two issues:

  • job.abort() is not working.
  • I can't delete a job (not sure whether this is supported, or whether there is a workaround, so that we are able to re-enqueue a job with the same custom ID).

Here are the details:

Context

  • I enqueue jobs with a custom ID
  • I want results to live for X amount of time (e.g. for 24 hours)
  • A new job arrives; if a job with the same ID already exists (queued, in_progress or complete), I need to overwrite it with this fresher one (a rough sketch of the flow follows this list):
    • (A) for the queued or in_progress scenario: I understand I'd need to abort the existing job (as I don't want to wait for its completion) and enqueue the new one with fresher arguments.
    • (B) if the existing job with the same ID is complete, overwrite it: enqueue the new job with fresher arguments.
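
For reference, the replacement flow I'm trying to build looks roughly like this (a minimal sketch, assuming abort() behaves as documented; replace_job and its arguments are a hypothetical helper of mine, not arq API):

import asyncio

from arq.connections import ArqRedis
from arq.jobs import Job, JobStatus


async def replace_job(redis: ArqRedis, job_id: str, queue_name: str, **kwargs):
    # Hypothetical helper (mine, not arq API): drop any existing job with
    # this ID, then enqueue a fresh one with the new arguments.
    existing = Job(job_id, redis, _queue_name=queue_name)
    status = await existing.status()
    if status in (JobStatus.queued, JobStatus.in_progress):
        # (A) abort the stale job instead of waiting for it to finish;
        # note abort() may raise TimeoutError if the worker never confirms
        await existing.abort(timeout=1)
    # (B) a complete job would also need its result removed first,
    # otherwise enqueue_job returns None because of job uniqueness
    return await redis.enqueue_job("compute", _job_id=job_id, _queue_name=queue_name, **kwargs)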

The issues I was having then:

A) Running job.abort() with or without a timeout seems to have no effect on either queued or in_progress jobs.
FWIW, I have observed one thing though: if I abort with a timeout of 0, the job is only skipped once the worker dequeues it: 10:56:34: 131.05s ⊘ my_id_10:compute aborted before start

Question: is there anything I am missing regarding aborting jobs? Any idea why it isn't working for me?

B) If I enqueue a job with the same ID as a complete one, it will not be enqueued; enqueue_job returns None instead, as per job uniqueness.
Question: how do I delete the job then, so that I am able to enqueue the fresher one?
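
The only workaround I can think of is deleting arq's Redis keys for the job by hand (an untested sketch on my side: the key prefixes do come from arq.constants, but deleting them directly bypasses arq's API):

from arq.connections import ArqRedis
from arq.constants import job_key_prefix, result_key_prefix


async def delete_job_keys(redis: ArqRedis, job_id: str) -> None:
    # Hypothetical helper: remove the job definition and its stored result
    # so a new job with the same ID passes the uniqueness check on enqueue.
    await redis.delete(job_key_prefix + job_id, result_key_prefix + job_id)

With both keys gone, enqueue_job should accept the same ID again.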


Worker settings and job generation
import asyncio
from arq import create_pool
from arq.connections import RedisSettings


async def compute(ctx):
    print(f'compute called {ctx["job_id"]}')
    await asyncio.sleep(3600)
    return ctx["job_id"]


async def main():
    redis = await create_pool(RedisSettings())
    for job_index in range(1, 11):
        job = await redis.enqueue_job("compute", _job_id=f"my_id_{job_index}", _queue_name="example_queue")
        if job:
            print(job.job_id)


class WorkerSettings:
    functions = [compute]
    max_jobs = 3
    keep_result = 3600 * 24
    queue_name = "example_queue"
    allow_abort_jobs = True
    health_check_interval = 5


if __name__ == "__main__":
    asyncio.run(main())
Code trying to abort job
import asyncio
from arq import create_pool
from arq.connections import RedisSettings
from arq.jobs import Job

JOB_ID = "my_id_10"

async def main(queue_name):
    redis = await create_pool(RedisSettings())
    jobs = await redis.queued_jobs(queue_name=queue_name)
    print(f"queued jobs {len(jobs)}")

    job = Job(JOB_ID, redis, _queue_name=queue_name)
    print(f"job info: {await job.info()}")
    job_status = await job.status()
    print(f"status: {job_status}")
    print(f"result_info: {await job.result_info()}")
    try:
        aborted = await job.abort(timeout=1)
        print(f"aborted: {aborted}")
    except Exception as e:
        print("exception raised when aborting job")
        print(f"exception when aborting: {type(e)}")


if __name__ == "__main__":
    asyncio.run(main("example_queue"))

Any help would be much appreciated 🙏

@gerazenobi gerazenobi changed the title Question - Managing Job override / replacement (abort and re-enqueue) Managing Job override / replacement (abort and re-enqueue) Apr 20, 2023
@gerazenobi gerazenobi changed the title Managing Job override / replacement (abort and re-enqueue) Managing Job override / replacement (delete and re-enqueue) Apr 21, 2023
@gerazenobi gerazenobi changed the title Managing Job override / replacement (delete and re-enqueue) Managing Job: abort / replacement (delete and re-enqueue) Apr 24, 2023
@gerazenobi gerazenobi changed the title Managing Job: abort / replacement (delete and re-enqueue) Job abort / replacement (delete and re-enqueue) Apr 24, 2023
@gerazenobi gerazenobi changed the title Job abort / replacement (delete and re-enqueue) Job abort - Job delete Apr 26, 2023
@gerazenobi gerazenobi changed the title Job abort - Job delete Job abort not working ? May 10, 2023
@JonasKs
Collaborator

JonasKs commented May 15, 2023

I'll have to look into this further tomorrow, as I have never had the need to cancel jobs myself.

Running job.abort() with or without a timeout seems to have no effect on either queued or in_progress jobs.

This seems like a bug.

@JonasKs
Collaborator

JonasKs commented May 17, 2023

Hey, apologies for the lack of response. I haven't forgotten, I just haven't found the time. I've got Friday off, and I'll use that day for open source/personal projects and get back to you then.

@gerazenobi
Author

@JonasKs no problem, and thanks ❤️

@JonasKs
Collaborator

JonasKs commented May 19, 2023

Hi again @gerazenobi 😊 First, in case someone else reads this issue at a later point:

The abort() function requires that the allow_abort_jobs flag be set on the worker:

:param allow_abort_jobs: whether to abort jobs on a call to :func:arq.jobs.Job.abort

The docs also state this here.

I can see you've done this! 😊

The next thing I did was to check out your implementation. Knowing that this library has some quirkiness around queue names, I tried removing queue_name from your implementation, and abort seems to work fine. I started working on a queue-name fix here, which I just committed (a very old branch though; it doesn't work, and I don't remember why 😁). The issue is somewhat explained in #346.

So, short workaround: use the default queue.
Best workaround: fix queue names. I won't have time to do this at the moment; I'm flooded with work at work, hence I took time off to answer my 50 GitHub notifications 😁 A PR would be very welcome.

@gerazenobi
Author

Hi @JonasKs 👋

Thanks so much for taking a look at this and providing a workaround 🙏 I will test on my side and get back to you.

@gerazenobi
Author

gerazenobi commented May 22, 2023

@JonasKs
I tried again with the code provided previously and it didn't work for me, even though I removed the queue name and the custom IDs in order to have the bare minimum:

import asyncio

from arq import create_pool
from arq.connections import RedisSettings


# the job
async def compute(ctx):
    await asyncio.sleep(30)
    return ctx["job_id"]


# enqueuing jobs
async def main():
    redis = await create_pool(RedisSettings())
    for job_index in range(1, 50):
        job = await redis.enqueue_job("compute")
        if job:
            print(job.job_id)


class WorkerSettings:
    functions = [compute]
    keep_result = 3600 * 24
    allow_abort_jobs = True

how I am aborting:

import asyncio

from arq import create_pool
from arq.connections import RedisSettings
from arq.jobs import Job

JOB_ID = "d85e5e5b62074cf59aea481783e8200a"


async def main():
    redis = await create_pool(RedisSettings())
    job = Job(JOB_ID, redis)
    job_info = await job.info()
    job_status = await job.status()
    print(f"job info: {job_info}")
    print(f"status: {job_status}")
    print(f"result_info: {await job.result_info()}")  # was re-printing job_info by mistake
    try:
        await job.abort(timeout=0)
    except Exception as e:
        print("job abort raised exception", e)


if __name__ == "__main__":
    asyncio.run(main())

I enqueued the jobs and ran the abort script above multiple times: it always prints status: queued.
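
One thing I could check next (a sketch; as far as I understand, Job.abort() writes the job ID to the abort_jobs_ss sorted set from arq.constants, which the worker then polls) is whether the abort request even reaches Redis:

import asyncio

from arq import create_pool
from arq.connections import RedisSettings
from arq.constants import abort_jobs_ss


async def dump_abort_set():
    redis = await create_pool(RedisSettings())
    # If the ID shows up here but the job keeps running, the request made
    # it to Redis and the problem is on the worker side.
    entries = await redis.zrange(abort_jobs_ss, 0, -1)
    print(f"abort set: {[e.decode() for e in entries]}")


if __name__ == "__main__":
    asyncio.run(dump_abort_set())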

@JonasKs
Collaborator

JonasKs commented May 25, 2023

Using your example from the first post results in this output:

queued jobs 10
job info: JobDef(function='compute', args=(), kwargs={}, job_try=None, enqueue_time=datetime.datetime(2023, 5, 25, 7, 14, 55, 350000, tzinfo=datetime.timezone.utc), score=1684998895350)
status: JobStatus.queued
result_info: None
exception raised when aborting job
exception when aborting: <class 'TimeoutError'>

Removing queue names:

import asyncio
from arq import create_pool
from arq.connections import RedisSettings


async def compute(ctx):
    print(f'compute called {ctx["job_id"]}')
    await asyncio.sleep(3600)
    return ctx["job_id"]


async def main():
    redis = await create_pool(RedisSettings())
    for job_index in range(1, 11):
        job = await redis.enqueue_job("compute", _job_id=f"my_id_{job_index}")
        if job:
            print(job.job_id)


class WorkerSettings:
    functions = [compute]
    max_jobs = 3
    keep_result = 3600 * 24
    allow_abort_jobs = True
    health_check_interval = 5


if __name__ == "__main__":
    asyncio.run(main())
import asyncio
from arq import create_pool
from arq.connections import RedisSettings
from arq.jobs import Job

JOB_ID = "my_id_10"

async def main():
    redis = await create_pool(RedisSettings())
    jobs = await redis.queued_jobs()
    print(f"queued jobs {len(jobs)}")

    job = Job(JOB_ID, redis)
    print(f"job info: {await job.info()}")
    job_status = await job.status()
    print(f"status: {job_status}")
    print(f"result_info: {await job.result_info()}")
    try:
        aborted = await job.abort(timeout=1)
        print(f"aborted: {aborted}")
    except Exception as e:
        print("exception raised when aborting job")
        print(f"exception when aborting: {type(e)}")


if __name__ == "__main__":
    asyncio.run(main())

Results in this output:

queued jobs 0
job info: JobResult(function='compute', args=(), kwargs={}, job_try=1, enqueue_time=datetime.datetime(2023, 5, 25, 7, 14, 55, 350000, tzinfo=datetime.timezone.utc), score=None, success=False, result=CancelledError(), start_time=datetime.datetime(2023, 5, 25, 7, 29, 55, 591000, tzinfo=datetime.timezone.utc), finish_time=datetime.datetime(2023, 5, 25, 7, 29, 55, 594000, tzinfo=datetime.timezone.utc), queue_name='example_queue', job_id=None)
status: JobStatus.complete
result_info: JobResult(function='compute', args=(), kwargs={}, job_try=1, enqueue_time=datetime.datetime(2023, 5, 25, 7, 14, 55, 350000, tzinfo=datetime.timezone.utc), score=None, success=False, result=CancelledError(), start_time=datetime.datetime(2023, 5, 25, 7, 29, 55, 591000, tzinfo=datetime.timezone.utc), finish_time=datetime.datetime(2023, 5, 25, 7, 29, 55, 594000, tzinfo=datetime.timezone.utc), queue_name='example_queue', job_id=None)
aborted: True

@JonasKs
Collaborator

JonasKs commented May 25, 2023

Aha, I see, with the new example there's something iffy going on.

With the job:

async def compute(ctx):
    print(f'starting job {ctx}')
    await asyncio.sleep(30)
    print('slept')
    return ctx["job_id"]

Then finding the log from the worker:

starting job {'redis': ArqRedis<ConnectionPool<Connection<host=localhost,port=6379,db=0>>>, 'job_id': 'afc5d7518819482eaaedffb0a82372f8', 'job_try': 1, 'enqueue_time': datetime.datetime(2023, 5, 25, 8, 17, 15, 800000, tzinfo=datetime.timezone.utc), 'score': 1685002635800}

Then aborting the job:

job info: JobDef(function='compute', args=(), kwargs={}, job_try=None, enqueue_time=datetime.datetime(2023, 5, 25, 8, 17, 15, 815000, tzinfo=datetime.timezone.utc), score=1685002635815)
status: JobStatus.in_progress
result_info: JobDef(function='compute', args=(), kwargs={}, job_try=None, enqueue_time=datetime.datetime(2023, 5, 25, 8, 17, 15, 815000, tzinfo=datetime.timezone.utc), score=1685002635815)
job abort raised exception 

Then waiting for job to complete:

job info: JobResult(function='compute', args=(), kwargs={}, job_try=1, enqueue_time=datetime.datetime(2023, 5, 25, 8, 17, 15, 815000, tzinfo=datetime.timezone.utc), score=None, success=True, result='f859323bff434573850b10b5363d015d', start_time=datetime.datetime(2023, 5, 25, 8, 18, 16, 173000, tzinfo=datetime.timezone.utc), finish_time=datetime.datetime(2023, 5, 25, 8, 18, 46, 177000, tzinfo=datetime.timezone.utc), queue_name='arq:queue', job_id=None)
status: JobStatus.complete
result_info: JobResult(function='compute', args=(), kwargs={}, job_try=1, enqueue_time=datetime.datetime(2023, 5, 25, 8, 17, 15, 815000, tzinfo=datetime.timezone.utc), score=None, success=True, result='f859323bff434573850b10b5363d015d', start_time=datetime.datetime(2023, 5, 25, 8, 18, 16, 173000, tzinfo=datetime.timezone.utc), finish_time=datetime.datetime(2023, 5, 25, 8, 18, 46, 177000, tzinfo=datetime.timezone.utc), queue_name='arq:queue', job_id=None)

I agree, this seems like a bug. I'll try to look into it this weekend.

I managed to close the issue instead of commenting, sorry; reopened.

@JonasKs JonasKs closed this as completed May 25, 2023
@JonasKs JonasKs reopened this May 25, 2023
@gerazenobi
Author

gerazenobi commented May 29, 2023

Thanks @JonasKs.

Not related to this issue, but I thought I'd share in case someone else lands here: at the moment, to work around the fact that we can't abort (and hence can't use custom IDs), I was using the queued_jobs method and then querying the jobs' kwargs in JobDef to track which jobs are in the queue:

    async def queued_jobs(self, *, queue_name: str = default_queue_name) -> List[JobDef]:
        """
        Get information about queued, mostly useful when testing.
        """
        jobs = await self.zrange(queue_name, withscores=True, start=0, end=-1)
        return await asyncio.gather(*[self._get_job_def(job_id, int(score)) for job_id, score in jobs])

However, this method will quickly exhaust the available Redis connections (it doesn't scale) when dealing with concurrent requests/jobs, as it launches a concurrent _get_job_def for each job in the queue:

    async def _get_job_def(self, job_id: bytes, score: int) -> JobDef:
        key = job_key_prefix + job_id.decode()
        v = await self.get(key)  # <== each call checks out a connection from the pool
        # ...

I was starting to see redis.exceptions.ConnectionError: max number of clients reached

All that to say: queued_jobs should be used with care, and perhaps only for testing 🤔 .
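
If anyone needs something similar, a variant that bounds the fan-out with a semaphore might be gentler on the pool (a sketch that leans on arq's private _get_job_def, so it may break across versions):

import asyncio

from arq.connections import ArqRedis
from arq.constants import default_queue_name


async def queued_jobs_bounded(redis: ArqRedis, queue_name: str = default_queue_name, max_concurrency: int = 10):
    # Same idea as ArqRedis.queued_jobs, but cap the number of in-flight
    # _get_job_def calls so we don't exhaust the Redis connection pool.
    sem = asyncio.Semaphore(max_concurrency)

    async def get_def(job_id: bytes, score: float):
        async with sem:
            return await redis._get_job_def(job_id, int(score))

    jobs = await redis.zrange(queue_name, withscores=True, start=0, end=-1)
    return await asyncio.gather(*[get_def(job_id, score) for job_id, score in jobs])

This keeps at most max_concurrency commands in flight at once instead of one per queued job.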
