From af072a797829f1e0e6723707160618d419d10f4a Mon Sep 17 00:00:00 2001 From: Thomas Hanke Date: Thu, 2 Nov 2023 17:48:45 +0100 Subject: [PATCH] reworking task status --- ckanext/csvwmapandtransform/action.py | 135 ++++++++++++++---- ckanext/csvwmapandtransform/plugin.py | 23 ++- .../csvwmapandtransform/transform.html | 6 - ckanext/csvwmapandtransform/views.py | 17 ++- 4 files changed, 132 insertions(+), 49 deletions(-) diff --git a/ckanext/csvwmapandtransform/action.py b/ckanext/csvwmapandtransform/action.py index eb8e8eb..6ab432b 100644 --- a/ckanext/csvwmapandtransform/action.py +++ b/ckanext/csvwmapandtransform/action.py @@ -4,6 +4,7 @@ from ckan.lib.jobs import DEFAULT_QUEUE_NAME from typing import Any import re +import json from ckanext.csvwmapandtransform import plugin from ckanext.csvwmapandtransform import mapper @@ -14,6 +15,9 @@ import ckan.plugins.toolkit as toolkit import ckanapi import itertools +import datetime +from dateutil.parser import parse as parse_date +from dateutil.parser import isoparse as parse_iso_date _get_or_bust = logic.get_or_bust @@ -87,8 +91,8 @@ def csvwmapandtransform_transform( u'id': res_id }) log.debug('transform_started for: {}'.format(resource)) - enqueue_find_mapping(resource['id'], resource['name'], resource['url'], resource['package_id'], operation='changed') - return {} + res=enqueue_find_mapping(resource['id'], resource['name'], resource['url'], resource['package_id'], operation='changed') + return res def csvwmapandtransform_map(context, data_dict): @@ -110,11 +114,16 @@ def csvwmapandtransform_transform_status( data_dict['resource_id'] = data_dict['id'] res_id = _get_or_bust(data_dict, 'resource_id') - task = p.toolkit.get_action('task_status_show')(context, { - 'entity_id': res_id, - 'task_type': 'csvwmapandtransform', - 'key': 'csvwmapandtransform' - }) + # task = p.toolkit.get_action('task_status_show')(context, { + # 'entity_id': res_id, + # 'task_type': 'csvwmapandtransform', + # 'key': 'csvwmapandtransform' + # }) + joblist = toolkit.get_action("job_list")({"ignore_auth": True}, {}) + joblist = get_jobs() + log.debug('jobs queried') + + log.debug(joblist) # datapusher_url = config.get('ckan.datapusher.url') # if not datapusher_url: @@ -149,13 +158,15 @@ def csvwmapandtransform_transform_status( # job_detail = {'error': 'cannot connect to datapusher'} return { - 'status': task['state'], - # 'job_id': job_id, - # 'job_url': url, - 'last_updated': task['last_updated'], - # 'job_key': job_key, - # 'task_info': job_detail, - 'error': json.loads(task['error']) + 'joblist': joblist, + 'status': 'dont have one', + # 'status': task['state'], + # # 'job_id': job_id, + # # 'job_url': url, + # 'last_updated': task['last_updated'], + # # 'job_key': job_key, + # # 'task_info': job_detail, + # 'error': json.loads(task['error']) } def get_actions(): @@ -174,32 +185,94 @@ def create_group(name): group = local_ckan.action.group_create(name=name) return group +def get_jobs(): + local_ckan = ckanapi.LocalCKAN() + jobs = local_ckan.action.job_list() + return jobs + def enqueue_find_mapping(res_id, res_name, res_url, dataset_id, operation): # skip task if the dataset is already queued queue = DEFAULT_QUEUE_NAME jobs = toolkit.get_action("job_list")({"ignore_auth": True}, {"queues": [queue]}) - log.debug("jobs") + log.debug("test-jobs") log.debug(jobs) - - if jobs: - for job in jobs: - if not job["title"]: - continue - match = re.match(r'csvwmapandtransform \w+ "[^"]*" ([\w-]+)', job["title"]) - log.debug("match") - log.debug(match) - - if match: - queued_resource_id = match.groups()[0] - if res_id == queued_resource_id: - log.info("Already queued resource: {} {}".format(res_name, res_id)) - return + # Check if this resource is already in the process of being xloadered + task = { + 'entity_id': res_id, + 'entity_type': 'resource', + 'task_type': 'csvwmapandtransform', + 'last_updated': str(datetime.datetime.utcnow()), + 'state': 'submitting', + 'key': 'csvwmapandtransform', + 'value': '{}', + 'error': '{}', + } + try: + existing_task = p.toolkit.get_action('task_status_show')({}, { + 'entity_id': res_id, + 'task_type': 'csvwmapandtransform', + 'key': 'csvwmapandtransform' + }) + assume_task_stale_after = datetime.timedelta(seconds=3600) + assume_task_stillborn_after = \ + datetime.timedelta(seconds=int(5)) + if existing_task.get('state') == 'pending': + # queued_res_ids = [ + # re.search(r"'resource_id': u?'([^']+)'", + # job.description).groups()[0] + # for job in get_queue().get_jobs() + # if 'xloader_to_datastore' in str(job) # filter out test_job etc + # ] + updated = parse_iso_date(existing_task['last_updated']) + time_since_last_updated = datetime.datetime.utcnow() - updated + # if (res_id not in queued_res_ids + # and time_since_last_updated > assume_task_stillborn_after): + # # it's not on the queue (and if it had just been started then + # # its taken too long to update the task_status from pending - + # # the first thing it should do in the xloader job). + # # Let it be restarted. + # log.info('A pending task was found %r, but its not found in ' + # 'the queue %r and is %s hours old', + # existing_task['id'], queued_res_ids, + # time_since_last_updated) + # elif time_since_last_updated > assume_task_stale_after: + if time_since_last_updated > assume_task_stale_after: + # it's been a while since the job was last updated - it's more + # likely something went wrong with it and the state wasn't + # updated than its still in progress. Let it be restarted. + log.info('A pending task was found %r, but it is only %s hours' + ' old', existing_task['id'], time_since_last_updated) + else: + log.info('A pending task was found %s for this resource, so ' + 'skipping this duplicate task', existing_task['id']) + return False + + task['id'] = existing_task['id'] + except p.toolkit.ObjectNotFound: + pass + + p.toolkit.get_action('task_status_update')( + # {'session': model.meta.create_local_session(), 'ignore_auth': True}, + {'ignore_auth': True}, + task + ) # add this dataset to the queue - log.debug("Queuing job find_mapping: {} {}".format(operation, res_name)) - toolkit.enqueue_job( + job=toolkit.enqueue_job( find_mapping, [res_url, res_id, dataset_id], title='csvwmapandtransform {} "{}" {}'.format(operation, res_name, res_url), queue=queue, ) + log.debug("Enqueued job {} to {} resource {}".format(job.id, operation, res_name)) + + value = json.dumps({'job_id': job.id}) + task['value'] = value + task['state'] = 'pending' + task['last_updated'] = str(datetime.datetime.utcnow()) + p.toolkit.get_action('task_status_update')( + # {'session': model.meta.create_local_session(), 'ignore_auth': True}, + {'ignore_auth': True}, + task + ) + return True \ No newline at end of file diff --git a/ckanext/csvwmapandtransform/plugin.py b/ckanext/csvwmapandtransform/plugin.py index cbb24de..d90e4ab 100644 --- a/ckanext/csvwmapandtransform/plugin.py +++ b/ckanext/csvwmapandtransform/plugin.py @@ -9,6 +9,17 @@ log = __import__("logging").getLogger(__name__) +DEFAULT_FORMATS = [ + "json-ld", + "turtle", + "n3", + "nt", + "hext", + "trig", + "longturtle", + "xml" +] + class CsvwMapAndTransformPlugin(plugins.SingletonPlugin, DefaultTranslation): plugins.implements(plugins.ITranslation) @@ -39,19 +50,19 @@ def notify(self, entity, operation): """ if operation == "deleted": return - log.debug( - "notify: {} {} '{}'".format(operation, type(entity).__name__, entity.name) + "notify: {} {} '{}'".format(operation, type(entity).__name__, entity) ) if isinstance(entity, model.Resource): log.debug("new uploaded resource") dataset = entity.related_packages()[0] - if entity.format in ("json-ld","turtle") and "-joined" not in entity.url: + if entity.format in DEFAULT_FORMATS and "-joined" not in entity.url: log.debug("plugin notify event for resource: {}".format(entity.id)) - action.enqueue_find_mapping( - entity.id, entity.name, entity.url, dataset.id, operation - ) + toolkit.get_action('csvwmapandtransform_transform')({},{u'id': entity.id}) + # action.enqueue_find_mapping( + # entity.id, entity.name, entity.url, dataset.id, operation + # ) else: return diff --git a/ckanext/csvwmapandtransform/templates/csvwmapandtransform/transform.html b/ckanext/csvwmapandtransform/templates/csvwmapandtransform/transform.html index 536490e..a75990f 100644 --- a/ckanext/csvwmapandtransform/templates/csvwmapandtransform/transform.html +++ b/ckanext/csvwmapandtransform/templates/csvwmapandtransform/transform.html @@ -47,12 +47,6 @@

{% block form_title %}{{ _('Transform Status') }}{% endblock %}

{{ h.csvwmapandtransform__status_description(transform_status) }} - {{ _('Last updated') }} - {% if status.status %} - {{ h.time_ago_from_timestamp(status.last_updated) }} - {% else %} - {{ _('Never') }} - {% endif %} {% endif %} diff --git a/ckanext/csvwmapandtransform/views.py b/ckanext/csvwmapandtransform/views.py index dd734f1..af7c878 100644 --- a/ckanext/csvwmapandtransform/views.py +++ b/ckanext/csvwmapandtransform/views.py @@ -43,14 +43,19 @@ def get(self, id: str, resource_id: str): except (logic.NotFound, logic.NotAuthorized): base.abort(404, _(u'Resource not found')) - try: - transform_status=toolkit.get_action(u'csvwmapandtransform_transform_status')( - {}, { + transform_status=toolkit.get_action(u'csvwmapandtransform_transform_status')( + {}, { u'resource_id': resource_id } - ) - except: - transform_status=None + ) + # try: + # transform_status=toolkit.get_action(u'csvwmapandtransform_transform_status')( + # {}, { + # u'resource_id': resource_id + # } + # ) + # except: + # transform_status=None return base.render(