diff --git a/.travis.yml b/.travis.yml index a0a5579d..269d24fa 100644 --- a/.travis.yml +++ b/.travis.yml @@ -5,12 +5,11 @@ services: - mongodb python: - - "2.7" - "3.6" env: - - DJANGO=1.11.6 - - DJANGO=1.10.8 + - DJANGO=2.0 + - DJANGO=1.11.9 - DJANGO=1.8.18 sudo: false diff --git a/django_q/cluster.py b/django_q/cluster.py index 23db9639..4dd61f60 100644 --- a/django_q/cluster.py +++ b/django_q/cluster.py @@ -10,7 +10,7 @@ import socket import ast from time import sleep -from multiprocessing import Queue, Event, Process, Value, current_process +from multiprocessing import Event, Process, Value, current_process # external import arrow @@ -30,7 +30,7 @@ from django_q.status import Stat, Status from django_q.brokers import get_broker from django_q.signals import pre_execute - +from django_q.queues import Queue class Cluster(object): diff --git a/django_q/conf.py b/django_q/conf.py index 6e955a16..e8afac17 100644 --- a/django_q/conf.py +++ b/django_q/conf.py @@ -1,7 +1,7 @@ import logging from copy import deepcopy from signal import signal -from multiprocessing import cpu_count, Queue +from multiprocessing import cpu_count # django from django.utils.translation import ugettext_lazy as _ @@ -11,6 +11,9 @@ import os import pkg_resources +# local +from django_q.queues import Queue + # optional try: import psutil diff --git a/django_q/core_signing.py b/django_q/core_signing.py new file mode 100644 index 00000000..1790a0bb --- /dev/null +++ b/django_q/core_signing.py @@ -0,0 +1,76 @@ +from __future__ import unicode_literals + +import datetime +import time +import zlib + +from django.utils import baseconv +from django.utils.crypto import constant_time_compare +from django.utils.encoding import force_bytes, force_str, force_text +from django.core.signing import BadSignature, SignatureExpired, b64_decode, JSONSerializer, \ + Signer as Sgnr, TimestampSigner as TsS, dumps + +dumps = dumps + + +""" +The loads function is the same as the `django.core.signing.loads` function +The difference is that `this` loads function calls `TimestampSigner` and `Signer` +""" +def loads(s, key=None, salt='django.core.signing', serializer=JSONSerializer, max_age=None): + """ + Reverse of dumps(), raise BadSignature if signature fails. + + The serializer is expected to accept a bytestring. + """ + # TimestampSigner.unsign() returns str but base64 and zlib compression + # operate on bytes. + base64d = force_bytes(TimestampSigner(key, salt=salt).unsign(s, max_age=max_age)) + decompress = False + if base64d[:1] == b'.': + # It's compressed; uncompress it first + base64d = base64d[1:] + decompress = True + data = b64_decode(base64d) + if decompress: + data = zlib.decompress(data) + return serializer().loads(data) + + +class Signer(Sgnr): + + def unsign(self, signed_value): + # force_str is removed in Django 2.0 + signed_value = force_str(signed_value) + if self.sep not in signed_value: + raise BadSignature('No "%s" found in value' % self.sep) + value, sig = signed_value.rsplit(self.sep, 1) + if constant_time_compare(sig, self.signature(value)): + # force_text is removed in Django 2.0 + return force_text(value) + raise BadSignature('Signature "%s" does not match' % sig) + + +""" +TimestampSigner is also the same as `django.core.signing.TimestampSigner` but is +calling `this` Signer. +""" +class TimestampSigner(Signer, TsS): + + def unsign(self, value, max_age=None): + """ + Retrieve original value and check it wasn't signed more + than max_age seconds ago. + """ + result = super().unsign(value) + value, timestamp = result.rsplit(self.sep, 1) + timestamp = baseconv.base62.decode(timestamp) + if max_age is not None: + if isinstance(max_age, datetime.timedelta): + max_age = max_age.total_seconds() + # Check timestamp is not older than max_age + age = time.time() - timestamp + if age > max_age: + raise SignatureExpired( + 'Signature age %s > %s seconds' % (age, max_age)) + return value diff --git a/django_q/queues.py b/django_q/queues.py new file mode 100644 index 00000000..ad729a1b --- /dev/null +++ b/django_q/queues.py @@ -0,0 +1,69 @@ +""" +The code is derived from https://github.com/althonos/pronto/commit/3384010dfb4fc7c66a219f59276adef3288a886b +""" + +import multiprocessing +import multiprocessing.queues + + +class SharedCounter(object): + """ A synchronized shared counter. + + The locking done by multiprocessing.Value ensures that only a single + process or thread may read or write the in-memory ctypes object. However, + in order to do n += 1, Python performs a read followed by a write, so a + second process may read the old value before the new one is written by + the first process. The solution is to use a multiprocessing.Lock to + guarantee the atomicity of the modifications to Value. + + This class comes almost entirely from Eli Bendersky's blog: + http://eli.thegreenplace.net/2012/01/04/shared-counter-with-pythons-multiprocessing/ + """ + + def __init__(self, n=0): + self.count = multiprocessing.Value('i', n) + + def increment(self, n=1): + """ Increment the counter by n (default = 1) """ + with self.count.get_lock(): + self.count.value += n + + @property + def value(self): + """ Return the value of the counter """ + return self.count.value + + +class Queue(multiprocessing.queues.Queue): + """ A portable implementation of multiprocessing.Queue. + + Because of multithreading / multiprocessing semantics, Queue.qsize() may + raise the NotImplementedError exception on Unix platforms like Mac OS X + where sem_getvalue() is not implemented. This subclass addresses this + problem by using a synchronized shared counter (initialized to zero) and + increasing / decreasing its value every time the put() and get() methods + are called, respectively. This not only prevents NotImplementedError from + being raised, but also allows us to implement a reliable version of both + qsize() and empty(). + """ + + def __init__(self, *args, **kwargs): + super(Queue, self).__init__(*args, ctx=multiprocessing.get_context(), **kwargs) + self.size = SharedCounter(0) + + def put(self, *args, **kwargs): + super(Queue, self).put(*args, **kwargs) + self.size.increment(1) + + def get(self, *args, **kwargs): + x = super(Queue, self).get(*args, **kwargs) + self.size.increment(-1) + return x + + def qsize(self): + """ Reliable implementation of multiprocessing.Queue.qsize() """ + return self.size.value + + def empty(self): + """ Reliable implementation of multiprocessing.Queue.empty() """ + return not self.qsize() > 0 diff --git a/django_q/signing.py b/django_q/signing.py index 4ee8bf31..2460aaa5 100644 --- a/django_q/signing.py +++ b/django_q/signing.py @@ -4,7 +4,7 @@ except ImportError: import pickle -from django.core import signing +from django_q import core_signing as signing from django_q.conf import Conf diff --git a/django_q/tasks.py b/django_q/tasks.py index 5f1c46b1..e64bfc9d 100644 --- a/django_q/tasks.py +++ b/django_q/tasks.py @@ -1,5 +1,5 @@ """Provides task functionality.""" -from multiprocessing import Queue, Value +from multiprocessing import Value # django from django.db import IntegrityError @@ -14,6 +14,7 @@ from django_q.humanhash import uuid from django_q.brokers import get_broker from django_q.signals import pre_enqueue +from django_q.queues import Queue def async(func, *args, **kwargs): diff --git a/django_q/tests/settings.py b/django_q/tests/settings.py index 8bad66c8..b5449633 100644 --- a/django_q/tests/settings.py +++ b/django_q/tests/settings.py @@ -1,4 +1,5 @@ import os +import django BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) @@ -28,16 +29,18 @@ 'django_redis' ) + MIDDLEWARE_CLASSES = ( 'django.contrib.sessions.middleware.SessionMiddleware', 'django.middleware.common.CommonMiddleware', 'django.middleware.csrf.CsrfViewMiddleware', 'django.contrib.auth.middleware.AuthenticationMiddleware', - 'django.contrib.auth.middleware.SessionAuthenticationMiddleware', 'django.contrib.messages.middleware.MessageMiddleware', 'django.middleware.clickjacking.XFrameOptionsMiddleware', ) +MIDDLEWARE = MIDDLEWARE_CLASSES + ROOT_URLCONF = 'tests.urls' TEMPLATES = [ diff --git a/django_q/tests/test_cached.py b/django_q/tests/test_cached.py index 1cf99802..e32c60c0 100644 --- a/django_q/tests/test_cached.py +++ b/django_q/tests/test_cached.py @@ -1,4 +1,4 @@ -from multiprocessing import Event, Queue, Value +from multiprocessing import Event, Value import pytest @@ -8,6 +8,7 @@ from django_q.tasks import async, result, fetch, count_group, result_group, fetch_group, delete_group, delete_cached, \ async_iter, Chain, async_chain, Iter, Async from django_q.brokers import get_broker +from django_q.queues import Queue @pytest.fixture diff --git a/django_q/tests/test_cluster.py b/django_q/tests/test_cluster.py index d16d0ac3..15461d0a 100644 --- a/django_q/tests/test_cluster.py +++ b/django_q/tests/test_cluster.py @@ -1,6 +1,6 @@ import sys import threading -from multiprocessing import Queue, Event, Value +from multiprocessing import Event, Value from time import sleep from django.utils import timezone @@ -19,6 +19,7 @@ from django_q.status import Stat from django_q.brokers import get_broker from django_q.tests.tasks import multiply +from django_q.queues import Queue class WordClass(object): diff --git a/django_q/tests/test_scheduler.py b/django_q/tests/test_scheduler.py index 47a08603..51607764 100644 --- a/django_q/tests/test_scheduler.py +++ b/django_q/tests/test_scheduler.py @@ -1,5 +1,5 @@ from datetime import timedelta -from multiprocessing import Queue, Event, Value +from multiprocessing import Event, Value import arrow import pytest @@ -10,6 +10,7 @@ from django_q.cluster import pusher, worker, monitor, scheduler from django_q.conf import Conf from django_q.tasks import Schedule, fetch, schedule as create_schedule +from django_q.queues import Queue @pytest.fixture diff --git a/django_q/tests/urls.py b/django_q/tests/urls.py index eb91d283..ede2ec91 100644 --- a/django_q/tests/urls.py +++ b/django_q/tests/urls.py @@ -1,6 +1,6 @@ -from django.conf.urls import include, url +from django.conf.urls import url from django.contrib import admin urlpatterns = [ - url(r'^admin/', include(admin.site.urls)), + url(r'^admin/', admin.site.urls), ] diff --git a/requirements.txt b/requirements.txt index 6bd4a89b..d88c43d8 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,11 +4,11 @@ # # pip-compile --output-file requirements.txt requirements.in # -arrow==0.10.0 +arrow==0.12.0 blessed==1.14.2 -boto3==1.4.7 -botocore==1.7.28 # via boto3, s3transfer -certifi==2017.7.27.1 # via requests +boto3==1.5.9 +botocore==1.8.23 # via boto3, s3transfer +certifi==2017.11.5 # via requests chardet==3.0.4 # via requests django-picklefield==1.0.0 django-redis==4.8.0 @@ -18,13 +18,13 @@ idna==2.6 # via requests iron-core==1.2.0 # via iron-mq iron-mq==0.9 jmespath==0.9.3 # via boto3, botocore -psutil==5.4.0 -pymongo==3.5.1 +psutil==5.4.3 +pymongo==3.6.0 python-dateutil==2.6.1 # via arrow, botocore, iron-core redis==2.10.6 requests==2.18.4 # via iron-core, rollbar -rollbar==0.13.13 -s3transfer==0.1.11 # via boto3 +rollbar==0.13.17 +s3transfer==0.1.12 # via boto3 six==1.11.0 # via blessed, python-dateutil, rollbar urllib3==1.22 # via requests wcwidth==0.1.7 # via blessed