Skip to content

Commit

Permalink
Merge pull request #5517 from archesproject/5474_celery
Browse files Browse the repository at this point in the history
Implements celery in Arches
  • Loading branch information
chiatt authored Nov 6, 2019
2 parents b3159d3 + 9186707 commit bb6e9a4
Show file tree
Hide file tree
Showing 13 changed files with 124 additions and 329 deletions.
12 changes: 11 additions & 1 deletion arches/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,15 @@
from __future__ import absolute_import
from arches.setup import get_version

VERSION = (4, 4, 1, 'final', 0)
try:
from .celery import app as celery_app
except ModuleNotFoundError as e:
print(e)

VERSION = (5, 0, 0, 'final', 0)

__version__ = get_version(VERSION)

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
__all__ = ('celery_app',)
9 changes: 9 additions & 0 deletions arches/app/tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# Create your tasks here
from __future__ import absolute_import, unicode_literals
from celery import shared_task
from django.core import management

@shared_task
def sync(surveyid, userid):
management.call_command('mobile', operation='sync_survey', id=surveyid, user=userid)
return 'sync complete'
24 changes: 24 additions & 0 deletions arches/app/utils/task_management.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import logging
from kombu import Connection
from django.utils.translation import ugettext as _
from arches.app.models.system_settings import settings
from arches.celery import app

logger = logging.getLogger(__name__)


def check_if_celery_available():
try:
conn = Connection(settings.CELERY_BROKER_URL)
conn.ensure_connection(max_retries=2)
except Exception as e:
logger.warning(_("Unable to connect to a celery broker"))
return False
inspect = app.control.inspect()
result = inspect.ping()
if result is None:
logger.warning(_("A celery broker is running, but a celery worker is not available"))
result = False # ping returns True or None, assigning False here so we return only a boolean value
else:
result = True
return result
9 changes: 8 additions & 1 deletion arches/app/views/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import re
import sys
import uuid
import arches.app.tasks as tasks
import arches.app.utils.task_management as task_management
from io import StringIO
from django.shortcuts import render
from django.views.generic import View
Expand Down Expand Up @@ -39,6 +41,7 @@
from pyld.jsonld import compact, frame, from_rdf
from rdflib import RDF
from rdflib.namespace import SKOS, DCTERMS
from arches.celery import app

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -123,7 +126,11 @@ def get(self, request, surveyid=None):
if can_sync:
try:
logger.info("Starting sync for user {0}".format(request.user.username))
management.call_command('mobile', operation='sync_survey', id=surveyid, user=request.user.id)
celery_worker_running = task_management.check_if_celery_available()
if celery_worker_running is True:
tasks.sync.delay(surveyid=surveyid, userid=request.user.id)
else:
management.call_command('mobile', operation='sync_survey', id=surveyid, user=request.user.id)
logger.info("Sync complete for user {0}".format(request.user.username))
except Exception:
logger.exception(_('Sync Failed'))
Expand Down
22 changes: 22 additions & 0 deletions arches/celery.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
from __future__ import absolute_import, unicode_literals
import os
import celery
from celery import Celery

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'settings')

app = Celery('arches')

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
# should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django app configs.
app.autodiscover_tasks()

@app.task(bind=True)
def debug_task(self):
print('Request: {0!r}'.format(self.request))
21 changes: 21 additions & 0 deletions arches/install/arches-templates/project_name/celery.py-tpl
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', '{{ project_name }}.settings')

app = Celery('{{ project_name }}')

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
# should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django app configs.
app.autodiscover_tasks()

@app.task(bind=True)
def debug_task(self):
print('Request: {0!r}'.format(self.request))
10 changes: 8 additions & 2 deletions arches/install/arches-templates/project_name/settings.py-tpl
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ INSTALLED_APPS = (
'revproxy',
'corsheaders',
'oauth2_provider',
'django_celery_results',
'{{ project_name }}',
)

Expand Down Expand Up @@ -147,12 +148,17 @@ APP_TITLE = 'Arches | Heritage Data Management'
COPYRIGHT_TEXT = 'All Rights Reserved.'
COPYRIGHT_YEAR = '2019'

CELERY_BROKER_URL = 'amqp://guest:guest@localhost'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_RESULT_BACKEND = 'django-db' # Use 'django-cache' if you want to use your cache as your backend
CELERY_TASK_SERIALIZER = 'json'

try:
from package_settings import *
from .package_settings import *
except ImportError:
pass

try:
from settings_local import *
from .settings_local import *
except ImportError:
pass
Loading

0 comments on commit bb6e9a4

Please sign in to comment.