Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#50 adds psutil as alternative os.getppid provider #51

Merged
merged 6 commits into from
Aug 28, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 5 additions & 6 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ Installation
- Make sure you have a `Redis <http://redis.io/>`__ server running
somewhere

Read the full documentation at `http://django-q.readthedocs.org <http://django-q.readthedocs.org>`__
Read the full documentation at `https://django-q.readthedocs.org <https://django-q.readthedocs.org>`__


Configuration
Expand All @@ -84,10 +84,10 @@ All configuration settings are optional. e.g:
'db': 0, }
}

For full configuration options, see the `configuration documentation <http://django-q.readthedocs.org/en/latest/install.html#configuration>`__.
For full configuration options, see the `configuration documentation <https://django-q.readthedocs.org/en/latest/configure.html>`__.


If you are using `django-redis <https://github.com/niwinz/django-redis>`__ , you can `configure <https://django-q.readthedocs.org/en/latest/install.html#django-redis>`__ Django Q to use its connection pool.
If you are using `django-redis <https://github.com/niwinz/django-redis>`__ , you can `configure <https://django-q.readthedocs.org/en/latest/configure.html#django-redis>`__ Django Q to use its connection pool.

Management Commands
~~~~~~~~~~~~~~~~~~~
Expand Down Expand Up @@ -129,7 +129,7 @@ Use `async` from your code to quickly offload tasks:
def print_result(task):
print(task.result)

For more info see `Tasks <http://django-q.readthedocs.org/en/latest/tasks.html>`__
For more info see `Tasks <https://django-q.readthedocs.org/en/latest/tasks.html>`__


Schedule
Expand Down Expand Up @@ -168,7 +168,7 @@ Admin page or directly from your code:
repeats=24,
next_run=arrow.utcnow().replace(hour=18, minute=0))

For more info check the `Schedules <http://django-q.readthedocs.org/en/latest/schedules.html>`__ documentation.
For more info check the `Schedules <https://django-q.readthedocs.org/en/latest/schedules.html>`__ documentation.


Testing
Expand All @@ -181,7 +181,6 @@ Todo
~~~~

- Better tests and coverage
- Get out of Alpha
- Less dependencies?

Acknowledgements
Expand Down
2 changes: 1 addition & 1 deletion django_q/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from .tasks import async, schedule, result, result_group, fetch, fetch_group, count_group, delete_group, queue_size
from .models import Task, Schedule, Success, Failure
from .cluster import Cluster
from .monitor import Stat
from .status import Stat

VERSION = (0, 5, 3)

Expand Down
12 changes: 6 additions & 6 deletions django_q/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@

# Standard
import importlib
import os
import signal
import socket
import sys
Expand All @@ -31,9 +30,9 @@
import signing
import tasks

from django_q.conf import Conf, redis_client, logger, psutil
from django_q.conf import Conf, redis_client, logger, psutil, get_ppid
from django_q.models import Task, Success, Schedule
from django_q.monitor import Status, Stat, ping_redis
from django_q.status import Stat, Status, ping_redis


class Cluster(object):
Expand Down Expand Up @@ -111,7 +110,7 @@ def __init__(self, stop_event, start_event, list_key=Conf.Q_LIST, timeout=Conf.T
signal.signal(signal.SIGINT, signal.SIG_IGN)
signal.signal(signal.SIGTERM, signal.SIG_DFL)
self.pid = current_process().pid
self.parent_pid = os.getppid()
self.parent_pid = get_ppid()
self.name = current_process().name
self.list_key = list_key
self.r = redis_client
Expand Down Expand Up @@ -166,7 +165,7 @@ def dummy_close():
return p

def spawn_pusher(self):
return self.spawn_process(pusher, self.task_queue, self.event_out, self.list_key, self.r)
return self.spawn_process(pusher, self.task_queue, self.event_out, self.list_key)

def spawn_worker(self):
self.spawn_process(worker, self.task_queue, self.result_queue, Value('f', -1), self.timeout)
Expand Down Expand Up @@ -288,14 +287,15 @@ def stop(self):
Stat(self).save()


def pusher(task_queue, event, list_key=Conf.Q_LIST, r=redis_client):
def pusher(task_queue, event, list_key=Conf.Q_LIST):
"""
Pulls tasks of the Redis List and puts them in the task queue
:type task_queue: multiprocessing.Queue
:type event: multiprocessing.Event
:type list_key: str
"""
logger.info(_('{} pushing tasks at {}').format(current_process().name, current_process().pid))
r = redis_client
while True:
try:
task = r.blpop(list_key, 1)
Expand Down
11 changes: 11 additions & 0 deletions django_q/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from django.conf import settings

# external
import os
import redis

# optional
Expand Down Expand Up @@ -143,3 +144,13 @@ def get_redis_client():

# redis client
redis_client = get_redis_client()


# get parent pid compatibility
def get_ppid():
if hasattr(os, 'getppid'):
return os.getppid()
elif psutil:
return psutil.Process(os.getpid()).ppid()
else:
raise OSError('Your OS does not support `os.getppid`. Please install `psutil` as an alternative provider.')
120 changes: 2 additions & 118 deletions django_q/monitor.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
from datetime import timedelta
import socket

# external
from blessed import Terminal
Expand All @@ -11,8 +10,8 @@
from django.utils.translation import ugettext as _

# local
import signing
from django_q.conf import Conf, redis_client, logger
from django_q.conf import Conf, redis_client
from django_q.status import Stat, ping_redis
from django_q import models


Expand Down Expand Up @@ -85,113 +84,6 @@ def monitor(run_once=False, r=redis_client):
val = term.inkey(timeout=1)


class Status(object):
"""Cluster status base class."""

def __init__(self, pid):
self.workers = []
self.tob = None
self.reincarnations = 0
self.cluster_id = pid
self.sentinel = 0
self.status = Conf.STOPPED
self.done_q_size = 0
self.host = socket.gethostname()
self.monitor = 0
self.task_q_size = 0
self.pusher = 0
self.timestamp = timezone.now()


class Stat(Status):
"""Status object for Cluster monitoring."""

def __init__(self, sentinel):
super(Stat, self).__init__(sentinel.parent_pid or sentinel.pid)
self.r = sentinel.r
self.tob = sentinel.tob
self.reincarnations = sentinel.reincarnations
self.sentinel = sentinel.pid
self.status = sentinel.status()
self.done_q_size = 0
self.task_q_size = 0
if Conf.QSIZE:
self.done_q_size = sentinel.result_queue.qsize()
self.task_q_size = sentinel.task_queue.qsize()
if sentinel.monitor:
self.monitor = sentinel.monitor.pid
if sentinel.pusher:
self.pusher = sentinel.pusher.pid
self.workers = [w.pid for w in sentinel.pool]

def uptime(self):
return (timezone.now() - self.tob).total_seconds()

@property
def key(self):
"""
:return: redis key for this cluster statistic
"""
return self.get_key(self.cluster_id)

@staticmethod
def get_key(cluster_id):
"""
:param cluster_id: cluster ID
:return: redis key for the cluster statistic
"""
return '{}:{}'.format(Conf.Q_STAT, cluster_id)

def save(self):
try:
self.r.set(self.key, signing.SignedPackage.dumps(self, True), 3)
except Exception as e:
logger.error(e)

def empty_queues(self):
return self.done_q_size + self.task_q_size == 0

@staticmethod
def get(cluster_id, r=redis_client):
"""
gets the current status for the cluster
:param cluster_id: id of the cluster
:return: Stat or Status
"""
key = Stat.get_key(cluster_id)
if r.exists(key):
pack = r.get(key)
try:
return signing.SignedPackage.loads(pack)
except signing.BadSignature:
return None
return Status(cluster_id)

@staticmethod
def get_all(r=redis_client):
"""
Get the status for all currently running clusters with the same prefix
and secret key.
:return: list of type Stat
"""
stats = []
keys = r.keys(pattern='{}:*'.format(Conf.Q_STAT))
if keys:
packs = r.mget(keys)
for pack in packs:
try:
stats.append(signing.SignedPackage.loads(pack))
except signing.BadSignature:
continue
return stats

def __getstate__(self):
# Don't pickle the redis connection
state = dict(self.__dict__)
del state['r']
return state


def info(r=redis_client):
term = Terminal()
ping_redis(r)
Expand Down Expand Up @@ -272,11 +164,3 @@ def info(r=redis_client):
term.white('{0:.4f}'.format(exec_time))
)
return True


def ping_redis(r):
try:
r.ping()
except Exception as e:
logger.error('Can not connect to Redis server.')
raise e
119 changes: 119 additions & 0 deletions django_q/status.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
import socket
from django.utils import timezone
from django_q.conf import Conf, logger, redis_client
import signing


class Status(object):
"""Cluster status base class."""

def __init__(self, pid):
self.workers = []
self.tob = None
self.reincarnations = 0
self.cluster_id = pid
self.sentinel = 0
self.status = Conf.STOPPED
self.done_q_size = 0
self.host = socket.gethostname()
self.monitor = 0
self.task_q_size = 0
self.pusher = 0
self.timestamp = timezone.now()


class Stat(Status):
"""Status object for Cluster monitoring."""

def __init__(self, sentinel):
super(Stat, self).__init__(sentinel.parent_pid or sentinel.pid)
self.r = sentinel.r
self.tob = sentinel.tob
self.reincarnations = sentinel.reincarnations
self.sentinel = sentinel.pid
self.status = sentinel.status()
self.done_q_size = 0
self.task_q_size = 0
if Conf.QSIZE:
self.done_q_size = sentinel.result_queue.qsize()
self.task_q_size = sentinel.task_queue.qsize()
if sentinel.monitor:
self.monitor = sentinel.monitor.pid
if sentinel.pusher:
self.pusher = sentinel.pusher.pid
self.workers = [w.pid for w in sentinel.pool]

def uptime(self):
return (timezone.now() - self.tob).total_seconds()

@property
def key(self):
"""
:return: redis key for this cluster statistic
"""
return self.get_key(self.cluster_id)

@staticmethod
def get_key(cluster_id):
"""
:param cluster_id: cluster ID
:return: redis key for the cluster statistic
"""
return '{}:{}'.format(Conf.Q_STAT, cluster_id)

def save(self):
try:
self.r.set(self.key, signing.SignedPackage.dumps(self, True), 3)
except Exception as e:
logger.error(e)

def empty_queues(self):
return self.done_q_size + self.task_q_size == 0

@staticmethod
def get(cluster_id, r=redis_client):
"""
gets the current status for the cluster
:param cluster_id: id of the cluster
:return: Stat or Status
"""
key = Stat.get_key(cluster_id)
if r.exists(key):
pack = r.get(key)
try:
return signing.SignedPackage.loads(pack)
except signing.BadSignature:
return None
return Status(cluster_id)

@staticmethod
def get_all(r=redis_client):
"""
Get the status for all currently running clusters with the same prefix
and secret key.
:return: list of type Stat
"""
stats = []
keys = r.keys(pattern='{}:*'.format(Conf.Q_STAT))
if keys:
packs = r.mget(keys)
for pack in packs:
try:
stats.append(signing.SignedPackage.loads(pack))
except signing.BadSignature:
continue
return stats

def __getstate__(self):
# Don't pickle the redis connection
state = dict(self.__dict__)
del state['r']
return state


def ping_redis(r):
try:
r.ping()
except Exception as e:
logger.error('Can not connect to Redis server.')
raise e
Loading