diff --git a/README.rst b/README.rst index 2be74d5fa..d851b6489 100644 --- a/README.rst +++ b/README.rst @@ -70,6 +70,16 @@ below shows the available options and their default values: - ``ckan.harvest.mq.virtual_host`` (/) +**Note**: it is safe to use the same backend server (either Redis or RabbitMQ) +for different CKAN instances, as long as they have different site ids. The ``ckan.site_id`` +config option (or ``default``) will be used to namespace the relevant things: + +* On RabbitMQ it will be used to name the queues used, eg ``ckan.harvest.site1.gather`` and + ``ckan.harvest.site1.fetch``. + +* On Redis, it will namespace the keys used, so only the relevant instance gets them, eg + ``site1:harvest_job_id``, ``site1:harvest_object__id:804f114a-8f68-4e7c-b124-3eb00f66202f`` + Configuration ============= diff --git a/ckanext/harvest/queue.py b/ckanext/harvest/queue.py index dfe1518fc..2b57be4fe 100644 --- a/ckanext/harvest/queue.py +++ b/ckanext/harvest/queue.py @@ -77,6 +77,16 @@ def get_fetch_queue_name(): 'default')) +def get_gather_routing_key(): + return '{0}:harvest_job_id'.format(config.get('ckan.site_id', + 'default')) + + +def get_fetch_routing_key(): + return '{0}:harvest_object_id'.format(config.get('ckan.site_id', + 'default')) + + def purge_queues(): backend = config.get('ckan.harvest.mq.type', MQ_TYPE) @@ -103,23 +113,25 @@ def resubmit_jobs(): redis = get_connection() # fetch queue - harvest_object_pending = redis.keys('harvest_object_id:*') + harvest_object_pending = redis.keys(get_fetch_routing_key() + ':*') for key in harvest_object_pending: date_of_key = datetime.datetime.strptime(redis.get(key), "%Y-%m-%d %H:%M:%S.%f") - if (datetime.datetime.now() - date_of_key).seconds > 180: # 3 minutes for fetch and import max - redis.rpush('harvest_object_id', + # 3 minutes for fetch and import max + if (datetime.datetime.now() - date_of_key).seconds > 180: + redis.rpush(get_fetch_routing_key(), json.dumps({'harvest_object_id': key.split(':')[-1]}) ) redis.delete(key) # gather queue - harvest_jobs_pending = redis.keys('harvest_job_id:*') + harvest_jobs_pending = redis.keys(get_gather_routing_key() + ':*') for key in harvest_jobs_pending: date_of_key = datetime.datetime.strptime(redis.get(key), "%Y-%m-%d %H:%M:%S.%f") - if (datetime.datetime.now() - date_of_key).seconds > 7200: # 3 hours for a gather - redis.rpush('harvest_job_id', + # 3 hours for a gather + if (datetime.datetime.now() - date_of_key).seconds > 7200: + redis.rpush(get_gather_routing_key(), json.dumps({'harvest_job_id': key.split(':')[-1]}) ) redis.delete(key) @@ -148,7 +160,7 @@ def __init__(self, redis, routing_key): def send(self, body, **kw): value = json.dumps(body) # remove if already there - if self.routing_key == 'harvest_job_id': + if self.routing_key == get_gather_routing_key(): self.redis.lrem(self.routing_key, 0, value) self.redis.rpush(self.routing_key, value) @@ -174,27 +186,42 @@ class FakeMethod(object): def __init__(self, message): self.delivery_tag = message + class RedisConsumer(object): def __init__(self, redis, routing_key): self.redis = redis + # Routing keys are constructed with {site-id}:{message-key}, eg: + # default:harvest_job_id or default:harvest_object_id self.routing_key = routing_key + # Message keys are harvest_job_id for the gather consumer and + # harvest_object_id for the fetch consumer + self.message_key = routing_key.split(':')[-1] + def consume(self, queue): while True: key, body = self.redis.blpop(self.routing_key) self.redis.set(self.persistance_key(body), str(datetime.datetime.now())) yield (FakeMethod(body), self, body) + def persistance_key(self, message): + # Persistance keys are constructed with + # {site-id}:{message-key}:{object-id}, eg: + # default:harvest_job_id:804f114a-8f68-4e7c-b124-3eb00f66202e message = json.loads(message) - return self.routing_key + ':' + message[self.routing_key] + return self.routing_key + ':' + message[self.message_key] + def basic_ack(self, message): self.redis.delete(self.persistance_key(message)) + def queue_purge(self, queue): self.redis.flushdb() + def basic_get(self, queue): body = self.redis.lpop(self.routing_key) return (FakeMethod(body), self, body) + def get_consumer(queue_name, routing_key): connection = get_connection() @@ -400,22 +427,26 @@ def fetch_and_import_stages(harvester, obj): obj.report_status = 'added' obj.save() + def get_gather_consumer(): - consumer = get_consumer(get_gather_queue_name(), 'harvest_job_id') + gather_routing_key = get_gather_routing_key() + consumer = get_consumer(get_gather_queue_name(), gather_routing_key) log.debug('Gather queue consumer registered') return consumer + def get_fetch_consumer(): - consumer = get_consumer(get_fetch_queue_name(), 'harvest_object_id') + fetch_routing_key = get_fetch_routing_key() + consumer = get_consumer(get_fetch_queue_name(), fetch_routing_key) log.debug('Fetch queue consumer registered') return consumer -def get_gather_publisher(): - return get_publisher('harvest_job_id') -def get_fetch_publisher(): - return get_publisher('harvest_object_id') +def get_gather_publisher(): + gather_routing_key = get_gather_routing_key() + return get_publisher(gather_routing_key) -# Get a publisher for the fetch queue -#fetch_publisher = get_fetch_publisher() +def get_fetch_publisher(): + fetch_routing_key = get_fetch_routing_key() + return get_publisher(fetch_routing_key)