todo.org

concurrent job execution with ensure_future
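A minimal sketch of what this item could look like, assuming each job is run by a coroutine. `perform_job` and `run_jobs_concurrently` are hypothetical names used only for illustration; only `asyncio.ensure_future` and `asyncio.gather` are real library calls.

```python
import asyncio


async def perform_job(job_id, delay):
    """Hypothetical stand-in for running one job; sleeps to simulate I/O."""
    await asyncio.sleep(delay)
    return job_id


async def run_jobs_concurrently(specs):
    """Schedule every job as its own task, then wait for all of them."""
    tasks = [asyncio.ensure_future(perform_job(job_id, delay))
             for job_id, delay in specs]
    # gather() returns results in the order the tasks were passed in,
    # regardless of which job finishes first.
    return await asyncio.gather(*tasks)


def demo():
    specs = [("job-1", 0.02), ("job-2", 0.01), ("job-3", 0.0)]
    loop = asyncio.new_event_loop()
    try:
        return loop.run_until_complete(run_jobs_concurrently(specs))
    finally:
        loop.close()
```

The jobs overlap while they wait, so the total run time is close to the longest single delay rather than the sum of all delays.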

extract protocol layer

We can’t use a connection pool and pipelines without explicit entry points.

job_result

cleanup registries

suspension

around dequeue job -> worker heartbeat (expire worker key)

adapt jobs, queues and workers to use this protocol layer

Leave only public methods on each component

__init__

  • Don’t import the job class, but return its instances as part of the public API.
  • cancel_job(job_id, pool)
  • requeue_job(job_id, pool)
  • get_current_job() # Unique job instance for each asyncio task.
  • get_current_worker() # To store state (HTTP session for example).
  • job_exists(job_id, pool)
  • fetch_job(job_id, pool) # Return instance from redis.
  • all_queues(pool)
  • all_workers(pool)
  • is_suspended(pool)
  • suspend(pool)
  • resume(pool)
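One possible way to give each asyncio task its own `get_current_job()` value is a context variable. This is only a sketch: it assumes Python 3.7+ for `contextvars`, and `_current_job` and `perform` are hypothetical names, not part of the planned API.

```python
import asyncio
import contextvars

# Hypothetical module-level slot; every asyncio task gets its own copy
# of the context, so values set inside one task are invisible to others.
_current_job = contextvars.ContextVar("current_job", default=None)


def get_current_job():
    """Return the job bound to the running asyncio task, if any."""
    return _current_job.get()


async def perform(job_id, seen):
    _current_job.set(job_id)
    await asyncio.sleep(0)  # yield so the other task runs in between
    seen[job_id] = get_current_job()


async def main():
    seen = {}
    # gather() wraps each coroutine in its own task.
    await asyncio.gather(perform("a", seen), perform("b", seen))
    # Outside the job tasks nothing was set, so the default is returned.
    return seen, get_current_job()
```

The same pattern could back `get_current_worker()` if worker state also needs to be task-local.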

job

  • Convert redis spec into executable spec
  • Convert something into Job instance
  • class Job
    • job.get_status()
    • job.is_finished()
    • job.is_queued()
    • job.is_failed()
    • job.is_started()
    • all data attributes from spec
    • job.result # Immediate check for result
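The job interface listed above could be sketched roughly as follows. The status strings and the shape of the spec dictionary are assumptions modeled on rq conventions, and `job_from_spec` is a hypothetical helper.

```python
class Job:
    """Hypothetical job wrapper; status values are assumed, not final."""

    def __init__(self, id, status, result=None, **attributes):
        self.id = id
        self._status = status
        self.result = result  # immediate check, no redis round trip
        # Expose the remaining spec entries as plain data attributes.
        for name, value in attributes.items():
            setattr(self, name, value)

    def get_status(self):
        return self._status

    def is_finished(self):
        return self._status == "finished"

    def is_queued(self):
        return self._status == "queued"

    def is_failed(self):
        return self._status == "failed"

    def is_started(self):
        return self._status == "started"


def job_from_spec(spec):
    """Convert an (assumed) executable spec dictionary into a Job."""
    return Job(**spec)
```

For example, `job_from_spec({"id": "42", "status": "finished", "result": 7})` yields a job whose `is_finished()` is true and whose `result` is `7`.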

queue

  • class Queue
    • queue.empty()
    • queue.is_empty()
    • queue.count
    • queue.jobs
    • queue.compact()
    • queue.enqueue_call()
    • queue.enqueue()

worker

  • class Worker
    • This class does not contain anything affecting the redis protocol.
    • This class is mostly used to pass custom exception handlers.
    • worker.get_state()
    • worker.work()

use redis connection pool

Concurrent job execution is blocked by the single redis connection.

remove rq dependency

python 3.5 async/await syntax support

  • async for job in Queue
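A sketch of how `async for job in Queue` might be supported through the asynchronous iterator protocol. The in-memory list stands in for redis, and the stop condition (an empty queue ends iteration) is an assumption; a real worker loop would more likely block until a job arrives.

```python
import asyncio


class Queue:
    """Hypothetical queue backed by a list instead of redis."""

    def __init__(self, jobs):
        self._jobs = list(jobs)

    def __aiter__(self):
        return self

    async def __anext__(self):
        if not self._jobs:
            raise StopAsyncIteration
        await asyncio.sleep(0)  # placeholder for a redis round trip
        return self._jobs.pop(0)


async def drain(queue):
    """Collect every job the queue yields, in dequeue order."""
    seen = []
    async for job in queue:
        seen.append(job)
    return seen
```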

yield from job.wait_result

It must return after the worker process has finished this task or the task has failed.
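One way to get this behaviour is to poll the job status until it reaches a terminal state. This is a sketch under assumptions: `FakeJob`, its `refresh()` method, the status strings and the polling interval are all hypothetical, and raising on failure is just one possible design.

```python
import asyncio


class FakeJob:
    """Hypothetical job that replays a fixed sequence of statuses."""

    def __init__(self, statuses, result=None):
        self._statuses = iter(statuses)
        self.status = "queued"
        self.result = result

    async def refresh(self):
        # A real implementation would re-read the job hash from redis.
        self.status = next(self._statuses, self.status)


async def wait_result(job, interval=0.01):
    """Return the result once the job finishes; raise if it failed."""
    while True:
        await job.refresh()
        if job.status == "finished":
            return job.result
        if job.status == "failed":
            raise RuntimeError("job failed")
        await asyncio.sleep(interval)


def demo_failure():
    try:
        asyncio.run(wait_result(FakeJob(["started", "failed"])))
    except RuntimeError:
        return True
    return False
```

Polling keeps the sketch short; a pub/sub notification from the worker would avoid the repeated reads at the cost of more moving parts.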

redis cluster support

eta

rate limit