Skip to content

Commit

Permalink
Namespace Redis keys to avoid conflicts between instances
Browse files Browse the repository at this point in the history
The `ckan.site_id` config option (or `default` if missing) is used to
namespace the Redis keys: routing key and persistence key. Consumers
will only get the relevant keys for their instance.
  • Loading branch information
amercader committed Nov 20, 2015
1 parent 920df68 commit f1ba2bc
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 16 deletions.
10 changes: 10 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,16 @@ below shows the available options and their default values:
- ``ckan.harvest.mq.virtual_host`` (/)


**Note**: it is safe to use the same backend server (either Redis or RabbitMQ)
for different CKAN instances, as long as they have different site ids. The ``ckan.site_id``
config option (or ``default`` if it is not set) will be used to namespace the queues and keys:

* On RabbitMQ it will be used to name the queues used, eg ``ckan.harvest.site1.gather`` and
``ckan.harvest.site1.fetch``.

* On Redis, it will namespace the keys used, so only the relevant instance gets them, eg
``site1:harvest_job_id``, ``site1:harvest_object_id:804f114a-8f68-4e7c-b124-3eb00f66202f``


Configuration
=============
Expand Down
63 changes: 47 additions & 16 deletions ckanext/harvest/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,16 @@ def get_fetch_queue_name():
'default'))


def get_gather_routing_key():
    """Return the routing key of the gather queue, prefixed with the site id.

    The prefix is the ``ckan.site_id`` config option, or ``default`` when
    that option is not set, e.g. ``default:harvest_job_id``.
    """
    site_id = config.get('ckan.site_id', 'default')
    return '{0}:harvest_job_id'.format(site_id)


def get_fetch_routing_key():
    """Return the routing key of the fetch queue, prefixed with the site id.

    The prefix is the ``ckan.site_id`` config option, or ``default`` when
    that option is not set, e.g. ``default:harvest_object_id``.
    """
    site_id = config.get('ckan.site_id', 'default')
    return '{0}:harvest_object_id'.format(site_id)


def purge_queues():

backend = config.get('ckan.harvest.mq.type', MQ_TYPE)
Expand All @@ -103,23 +113,25 @@ def resubmit_jobs():
redis = get_connection()

# fetch queue
harvest_object_pending = redis.keys('harvest_object_id:*')
harvest_object_pending = redis.keys(get_fetch_routing_key() + ':*')
for key in harvest_object_pending:
date_of_key = datetime.datetime.strptime(redis.get(key),
"%Y-%m-%d %H:%M:%S.%f")
if (datetime.datetime.now() - date_of_key).seconds > 180: # 3 minutes for fetch and import max
redis.rpush('harvest_object_id',
# 3 minutes for fetch and import max
if (datetime.datetime.now() - date_of_key).seconds > 180:
redis.rpush(get_fetch_routing_key(),
json.dumps({'harvest_object_id': key.split(':')[-1]})
)
redis.delete(key)

# gather queue
harvest_jobs_pending = redis.keys('harvest_job_id:*')
harvest_jobs_pending = redis.keys(get_gather_routing_key() + ':*')
for key in harvest_jobs_pending:
date_of_key = datetime.datetime.strptime(redis.get(key),
"%Y-%m-%d %H:%M:%S.%f")
if (datetime.datetime.now() - date_of_key).seconds > 7200: # 3 hours for a gather
redis.rpush('harvest_job_id',
# 3 hours for a gather
if (datetime.datetime.now() - date_of_key).seconds > 7200:
redis.rpush(get_gather_routing_key(),
json.dumps({'harvest_job_id': key.split(':')[-1]})
)
redis.delete(key)
Expand Down Expand Up @@ -148,7 +160,7 @@ def __init__(self, redis, routing_key):
def send(self, body, **kw):
    """Serialize ``body`` as JSON and append it to this publisher's list.

    When this publisher writes to the gather queue, any copy of the same
    serialized message already waiting in the list is removed first, so a
    job is never queued twice.  Extra keyword arguments are accepted and
    ignored.
    """
    value = json.dumps(body)
    # Compare against the site-namespaced gather key: a check against the
    # bare 'harvest_job_id' literal does not match namespaced routing keys
    # and would silently disable the de-duplication below.
    if self.routing_key == get_gather_routing_key():
        # remove if already there
        self.redis.lrem(self.routing_key, 0, value)
    self.redis.rpush(self.routing_key, value)

Expand All @@ -174,27 +186,42 @@ class FakeMethod(object):
# Keeps the message it is given as its ``delivery_tag`` attribute.
def __init__(self, message):
self.delivery_tag = message


class RedisConsumer(object):
    """Queue consumer backed by a Redis list.

    Messages are JSON objects popped from the list stored under
    ``routing_key``.  While a message is being processed, a "persistence
    key" holding the time it was taken is kept in Redis; ``basic_ack``
    removes it again.

    (The ``persistance_key`` method keeps its historical spelling so that
    existing callers continue to work.)
    """

    def __init__(self, redis, routing_key):
        """Store the Redis connection and derive the message key.

        :param redis: connection object used for every Redis command
        :param routing_key: namespaced key of the list to consume from
        """
        self.redis = redis
        # Routing keys are constructed with {site-id}:{message-key}, eg:
        # default:harvest_job_id or default:harvest_object_id
        self.routing_key = routing_key
        # Message keys are harvest_job_id for the gather consumer and
        # harvest_object_id for the fetch consumer
        self.message_key = routing_key.split(':')[-1]

    def consume(self, queue):
        """Yield ``(method, consumer, body)`` tuples as messages arrive.

        Blocks on the Redis list until a message is available, records the
        current time under the message's persistence key and then yields.
        The loop never terminates on its own.  ``queue`` is not used.
        """
        while True:
            _, body = self.redis.blpop(self.routing_key)
            self.redis.set(self.persistance_key(body),
                           str(datetime.datetime.now()))
            yield (FakeMethod(body), self, body)

    def persistance_key(self, message):
        """Return the persistence key for a raw JSON ``message``.

        Persistence keys are constructed with
        {site-id}:{message-key}:{object-id}, eg:
        default:harvest_job_id:804f114a-8f68-4e7c-b124-3eb00f66202e
        """
        message = json.loads(message)
        # The message body is keyed by the bare message key, not by the
        # namespaced routing key, so look the id up with ``message_key``.
        return self.routing_key + ':' + message[self.message_key]

    def basic_ack(self, message):
        """Acknowledge ``message`` by deleting its persistence key."""
        self.redis.delete(self.persistance_key(message))

    def queue_purge(self, queue):
        """Remove this consumer's queued messages and persistence keys.

        Only keys in this consumer's namespace are deleted, so other
        instances sharing the same Redis database are left untouched.
        ``queue`` is not used.
        """
        pending = self.redis.keys(self.routing_key + ':*')
        # Passing the routing key explicitly guarantees ``delete`` always
        # receives at least one name, even when nothing is pending.
        self.redis.delete(self.routing_key, *pending)

    def basic_get(self, queue):
        """Pop one message without blocking.

        Returns a ``(method, consumer, body)`` tuple; ``body`` is whatever
        ``lpop`` returned for the routing key.  ``queue`` is not used.
        """
        body = self.redis.lpop(self.routing_key)
        return (FakeMethod(body), self, body)


def get_consumer(queue_name, routing_key):

connection = get_connection()
Expand Down Expand Up @@ -400,22 +427,26 @@ def fetch_and_import_stages(harvester, obj):
obj.report_status = 'added'
obj.save()


def get_gather_consumer():
    """Return a consumer for the gather queue of this instance.

    The consumer is created once, on the site-namespaced routing key
    returned by ``get_gather_routing_key()``.
    """
    gather_routing_key = get_gather_routing_key()
    consumer = get_consumer(get_gather_queue_name(), gather_routing_key)
    log.debug('Gather queue consumer registered')
    return consumer


def get_fetch_consumer():
    """Return a consumer for the fetch queue of this instance.

    The consumer is created once, on the site-namespaced routing key
    returned by ``get_fetch_routing_key()``.
    """
    fetch_routing_key = get_fetch_routing_key()
    consumer = get_consumer(get_fetch_queue_name(), fetch_routing_key)
    log.debug('Fetch queue consumer registered')
    return consumer

def get_gather_publisher():
    """Return a publisher for the gather queue of this instance.

    The publisher uses the site-namespaced routing key returned by
    ``get_gather_routing_key()``.
    """
    gather_routing_key = get_gather_routing_key()
    return get_publisher(gather_routing_key)


# Get a publisher for the fetch queue
#fetch_publisher = get_fetch_publisher()


def get_fetch_publisher():
    """Return a publisher for the fetch queue of this instance.

    The publisher uses the site-namespaced routing key returned by
    ``get_fetch_routing_key()``.
    """
    fetch_routing_key = get_fetch_routing_key()
    return get_publisher(fetch_routing_key)

0 comments on commit f1ba2bc

Please sign in to comment.