Skip to content

Commit

Permalink
Multiple queues, multiple clusters in one site (Koed00#71)
Browse files Browse the repository at this point in the history
* Support for multi-queue, multi-cluster configuration. API changes include:
  * Adding `cluster` to async_task() parameters and Task model
  * Adding a `--name` argument to the qcluster command
  * Necessary adjustments to Conf and Broker classes
  * Some admin improvements

* Add settings.Q_CLUSTER['ALT_CLUSTERS']: per-cluster overrides of the Q_CLUSTER config for alternative clusters.
Add Conf.CLUSTER_NAME, whose usage is now separate from Conf.PREFIX.
Enhance the QueueAdmin/OrmQ detail page: it now displays args/kwargs/q_options instead of the encrypted payload.

* If the `cluster` argument is not set (the default), tasks created by async_task() and schedule() are handled by the default cluster. Documentation update.

* Text cleanup

* Documentation update.

* Fix the TIMEOUT setting on Windows for non-default clusters

---------

Co-authored-by: Stan Triepels <1939656+GDay@users.noreply.github.com>
  • Loading branch information
sinowood and GDay authored Apr 2, 2023
1 parent f8520c9 commit 52c0421
Show file tree
Hide file tree
Showing 17 changed files with 215 additions and 47 deletions.
13 changes: 8 additions & 5 deletions django_q/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
class TaskAdmin(admin.ModelAdmin):
"""model admin for success tasks."""

list_display = ("name", "group", "func", "started", "stopped", "time_taken")
list_display = ("name", "group", "func", "cluster", "started", "stopped", "time_taken")

def has_add_permission(self, request):
"""Don't allow adds."""
Expand All @@ -26,7 +26,7 @@ def get_queryset(self, request):

search_fields = ("name", "func", "group")
readonly_fields = []
list_filter = ("group",)
list_filter = ("group", "cluster")

def get_readonly_fields(self, request, obj=None):
"""Set all fields readonly."""
Expand All @@ -36,7 +36,8 @@ def get_readonly_fields(self, request, obj=None):
def retry_failed(FailAdmin, request, queryset):
"""Submit selected tasks back to the queue."""
for task in queryset:
async_task(task.func, *task.args or (), hook=task.hook, **task.kwargs or {})
async_task(task.func, *task.args or (), hook=task.hook,
group=task.group, cluster=task.cluster, **task.kwargs or {})
task.delete()


Expand All @@ -46,15 +47,15 @@ def retry_failed(FailAdmin, request, queryset):
class FailAdmin(admin.ModelAdmin):
"""model admin for failed tasks."""

list_display = ("name", "group", "func", "started", "stopped", "short_result")
list_display = ("name", "group", "func", "cluster", "started", "stopped", "short_result")

def has_add_permission(self, request):
"""Don't allow adds."""
return False

actions = [retry_failed]
search_fields = ("name", "func", "group")
list_filter = ("group",)
list_filter = ("group", "cluster")
readonly_fields = []

def get_readonly_fields(self, request, obj=None):
Expand Down Expand Up @@ -123,6 +124,8 @@ class QueueAdmin(admin.ModelAdmin):
"""queue admin for ORM broker"""

list_display = ("id", "key", "name", "group", "func", "lock", "task_id")
fields = ("key", "lock", "task_id", "name", "group", "func", "args", "kwargs", "q_options")
readonly_fields = fields[2:]

def save_model(self, request, obj, form, change):
obj.save(using=Conf.ORM)
Expand Down
9 changes: 6 additions & 3 deletions django_q/brokers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@


class Broker:
def __init__(self, list_key: str = Conf.PREFIX):
def __init__(self, list_key: str = None):
# With same BROKER_CLASS, `list_key` is just a synonym for `queue_name` except for RedisBroker
list_key = list_key or Conf.CLUSTER_NAME
self.connection = self.get_connection(list_key)
self.list_key = list_key
self.cache = self.get_cache()
Expand Down Expand Up @@ -151,7 +153,7 @@ def get_cache():
return None

@staticmethod
def get_connection(list_key: str = Conf.PREFIX):
def get_connection(list_key: str = None):
"""
Gets a connection to the broker
:param list_key: Optional queue name
Expand All @@ -160,13 +162,14 @@ def get_connection(list_key: str = Conf.PREFIX):
return 0


def get_broker(list_key: str = Conf.PREFIX) -> Broker:
def get_broker(list_key: str = None) -> Broker:
"""
Gets the configured broker type
:param list_key: optional queue name
:type list_key: str
:return: a broker instance
"""
list_key = list_key or Conf.CLUSTER_NAME
# custom
if Conf.BROKER_CLASS:
module, func = Conf.BROKER_CLASS.rsplit(".", 1)
Expand Down
4 changes: 2 additions & 2 deletions django_q/brokers/aws_sqs.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@


class Sqs(Broker):
def __init__(self, list_key: str = Conf.PREFIX):
def __init__(self, list_key: str = None):
self.sqs = None
super(Sqs, self).__init__(list_key)
self.queue = self.get_queue()
Expand Down Expand Up @@ -77,7 +77,7 @@ def info(self) -> str:
return "AWS SQS"

@staticmethod
def get_connection(list_key: str = Conf.PREFIX) -> Session:
def get_connection(list_key: str = None) -> Session:
config = Conf.SQS
if "aws_region" in config:
config["region_name"] = config["aws_region"]
Expand Down
3 changes: 2 additions & 1 deletion django_q/brokers/ironmq.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ def acknowledge(self, task_id):
return self.delete(task_id)

@staticmethod
def get_connection(list_key: str = Conf.PREFIX) -> Queue:
def get_connection(list_key: str = None) -> Queue:
list_key = list_key or Conf.CLUSTER_NAME
ironmq = IronMQ(name=None, **Conf.IRON_MQ)
return ironmq.queue(queue_name=list_key)
4 changes: 2 additions & 2 deletions django_q/brokers/mongo.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ def _timeout():


class Mongo(Broker):
def __init__(self, list_key=Conf.PREFIX):
def __init__(self, list_key: str = None):
super(Mongo, self).__init__(list_key)
self.collection = self.get_collection()

Expand All @@ -24,7 +24,7 @@ def __setstate__(self, state):
self.collection = self.get_collection()

@staticmethod
def get_connection(list_key: str = Conf.PREFIX) -> MongoClient:
def get_connection(list_key: str = None) -> MongoClient:
return MongoClient(**Conf.MONGO)

def get_collection(self):
Expand Down
5 changes: 3 additions & 2 deletions django_q/brokers/orm.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ def _timeout():

class ORM(Broker):
@staticmethod
def get_connection(list_key: str = Conf.PREFIX):
def get_connection(list_key: str = None):
if transaction.get_autocommit(
using=Conf.ORM
): # Only True when not in an atomic block
Expand Down Expand Up @@ -55,8 +55,9 @@ def fail(self, task_id):
self.delete(task_id)

def enqueue(self, task):
# list_key might be null (e.g. in a test setup) but OrmQ.key has not-null constraint
package = self.get_connection().create(
key=self.list_key, payload=task, lock=timezone.now()
key=self.list_key or Conf.CLUSTER_NAME, payload=task, lock=timezone.now()
)
return package.pk

Expand Down
5 changes: 3 additions & 2 deletions django_q/brokers/redis_broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@


class Redis(Broker):
def __init__(self, list_key: str = Conf.PREFIX):
def __init__(self, list_key: str = None):
list_key = list_key or Conf.CLUSTER_NAME
super(Redis, self).__init__(list_key=f"django_q:{list_key}:q")

def enqueue(self, task):
Expand Down Expand Up @@ -57,7 +58,7 @@ def get_stats(self, pattern: str):
return self.connection.mget(keys)

@staticmethod
def get_connection(list_key: str = Conf.PREFIX) -> Redis:
def get_connection(list_key: str = None) -> Redis:
if django_redis and Conf.DJANGO_REDIS:
return django_redis.get_redis_connection(Conf.DJANGO_REDIS)
if isinstance(Conf.REDIS, str):
Expand Down
39 changes: 24 additions & 15 deletions django_q/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,16 @@

class Cluster:
def __init__(self, broker: Broker = None):
self.broker = broker or get_broker()
# Cluster do not need an init or default broker except for testing,
# The sentinel will create a broker for cluster and utilize ALT_CLUSTERS config in Conf.
self.broker = broker # DON'T USE get_broker() to set a default broker here.
self.sentinel = None
self.stop_event = None
self.start_event = None
self.pid = current_process().pid
self.cluster_id = uuid.uuid4()
self.host = socket.gethostname()
self.timeout = Conf.TIMEOUT
self.timeout = None
signal.signal(signal.SIGTERM, self.sig_handler)
signal.signal(signal.SIGINT, self.sig_handler)

Expand Down Expand Up @@ -141,7 +143,7 @@ def __init__(
start_event,
cluster_id,
broker=None,
timeout=Conf.TIMEOUT,
timeout=None,
start=True,
):
# Make sure we catch signals for the pool
Expand All @@ -158,7 +160,7 @@ def __init__(
self.start_event = start_event
self.pool_size = Conf.WORKERS
self.pool = []
self.timeout = timeout
self.timeout = timeout or Conf.TIMEOUT
self.task_queue = (
Queue(maxsize=Conf.QUEUE_LIMIT) if Conf.QUEUE_LIMIT else Queue()
)
Expand All @@ -169,6 +171,10 @@ def __init__(
if start:
self.start()

def queue_name(self):
# multi-queue: cluster name is (broker's) queue_name
return self.broker.list_key if self.broker else '--'

def start(self):
self.broker.ping()
self.spawn_cluster()
Expand Down Expand Up @@ -287,14 +293,14 @@ def guard(self):
_("%(name)s guarding cluster %(cluster_name)s")
% {
"name": current_process().name,
"cluster_name": humanize(self.cluster_id.hex),
"cluster_name": humanize(self.cluster_id.hex) + f" [{self.queue_name()}]",
}
)
self.start_event.set()
Stat(self).save()
logger.info(
_("Q Cluster %(cluster_name)s running.")
% {"cluster_name": humanize(self.cluster_id.hex)}
% {"cluster_name": humanize(self.cluster_id.hex) + f" [{self.queue_name()}]"}
)
counter = 0
cycle = Conf.GUARD_CYCLE # guard loop sleep in seconds
Expand Down Expand Up @@ -401,6 +407,7 @@ def pusher(task_queue: Queue, event: Event, broker: Broker = None):
logger.exception("Failed to push task to queue")
broker.fail(ack_id)
continue
task["cluster"] = Conf.CLUSTER_NAME # save actual cluster name to orm task table
task["ack_id"] = ack_id
task_queue.put(task)
logger.debug(
Expand Down Expand Up @@ -518,6 +525,7 @@ def worker(
pre_execute.send(sender="django_q", func=f, task=task)
# execute the payload
timer.value = timer_value # Busy

try:
if f is None:
# raise a meaningfull error if task["func"] is not a valid function
Expand Down Expand Up @@ -614,6 +622,7 @@ def save_task(task, broker: Broker):
hook=task.get("hook"),
args=task["args"],
kwargs=task["kwargs"],
cluster=task.get("cluster"),
started=task["started"],
stopped=task["stopped"],
result=task["result"],
Expand Down Expand Up @@ -685,13 +694,16 @@ def scheduler(broker: Broker = None):
broker = get_broker()
close_old_django_connections()
try:
# Only default cluster will handler schedule with default(null) cluster
Q_default = db.models.Q(cluster__isnull=True) if Conf.CLUSTER_NAME == Conf.PREFIX else db.models.Q(pk__in=[])

with db.transaction.atomic(using=db.router.db_for_write(Schedule)):
for s in (
Schedule.objects.select_for_update()
.exclude(repeats=0)
.filter(next_run__lt=timezone.now())
.filter(
db.models.Q(cluster__isnull=True) | db.models.Q(cluster=Conf.PREFIX)
Q_default | db.models.Q(cluster=Conf.CLUSTER_NAME)
)
):
args = ()
Expand Down Expand Up @@ -733,14 +745,11 @@ def scheduler(broker: Broker = None):

s.next_run = next_run
s.repeats += -1
# send it to the cluster
scheduled_broker = broker
try:
scheduled_broker = get_broker(q_options["broker_name"])
except: # noqa: E722
# invalid broker_name or non existing broker with broker_name
pass
q_options["broker"] = scheduled_broker
# send it to the cluster; any cluster name is allowed in multi-queue scenarios
# because `broker_name` is confusing, using `cluster` name is recommended and takes precedence
q_options["cluster"] = s.cluster or q_options.get("cluster", q_options.pop("broker_name", None))
if q_options['cluster'] is None or q_options['cluster'] == Conf.CLUSTER_NAME:
q_options["broker"] = broker
q_options["group"] = q_options.get("group", s.name or s.id)
kwargs["q_options"] = q_options
s.task = django_q.tasks.async_task(s.func, *args, **kwargs)
Expand Down
20 changes: 19 additions & 1 deletion django_q/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,22 @@ class Conf:
"""

try:
conf = settings.Q_CLUSTER
conf = settings.Q_CLUSTER.copy()
except AttributeError:
conf = {}

_Q_CLUSTER_NAME = os.getenv("Q_CLUSTER_NAME")
if _Q_CLUSTER_NAME and _Q_CLUSTER_NAME != conf.get("name") and \
_Q_CLUSTER_NAME != conf.get("cluster_name"):
conf["cluster_name"] = _Q_CLUSTER_NAME
alt_conf = conf.pop("ALT_CLUSTERS")
if isinstance(alt_conf, dict):
alt_conf = alt_conf.get(_Q_CLUSTER_NAME)
if isinstance(alt_conf, dict):
alt_conf.pop('name', None)
alt_conf.pop('cluster_name', None)
conf.update(alt_conf)

# Redis server configuration . Follows standard redis keywords
REDIS = conf.get("redis", {})

Expand Down Expand Up @@ -70,8 +82,14 @@ class Conf:
MONGO_DB = conf.get("mongo_db", None)

# Name of the cluster or site. For when you run multiple sites on one redis server
# It's also the `salt` for signing OrmQ, and part of the Redis stats caching key
# For all clusters in one site, PREFIX should be the same value to be able to decrypt payloads
PREFIX = conf.get("name", "default")

# Support alternative cluster name to use multiple queues in one site.
# cluster name and queue name are interchangeable, same thing.
CLUSTER_NAME = conf.get("cluster_name", PREFIX)

# Log output level
LOG_LEVEL = conf.get("log_level", "INFO")

Expand Down
14 changes: 14 additions & 0 deletions django_q/management/commands/qcluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from django.utils.translation import gettext as _

from django_q.cluster import Cluster
import os


class Command(BaseCommand):
Expand All @@ -16,8 +17,21 @@ def add_arguments(self, parser):
default=False,
help="Run once and then stop.",
)
parser.add_argument(
"-n",
"--name",
dest="cluster_name",
default=None,
help="Set alternative cluster name instead of the name in Q_CLUSTER settings (for multi-queue setup). "
"On Linux you should set name through `Q_CLUSTER_NAME=cluster_name python manage.py qcluster` instead."
)

def handle(self, *args, **options):
# Set alternative cluster_name before creating the cluster (cluster_name is broker's queue_name, too)
cluster_name = options.get("cluster_name")
if cluster_name:
os.environ["Q_CLUSTER_NAME"] = cluster_name

q = Cluster()
q.start()
if options.get("run_once", False):
Expand Down
Loading

0 comments on commit 52c0421

Please sign in to comment.