Skip to content

Commit

Permalink
reworking task status
Browse files Browse the repository at this point in the history
  • Loading branch information
Thomas Hanke committed Nov 2, 2023
1 parent d88d0ea commit af072a7
Show file tree
Hide file tree
Showing 4 changed files with 132 additions and 49 deletions.
135 changes: 104 additions & 31 deletions ckanext/csvwmapandtransform/action.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from ckan.lib.jobs import DEFAULT_QUEUE_NAME
from typing import Any
import re
import json

from ckanext.csvwmapandtransform import plugin
from ckanext.csvwmapandtransform import mapper
Expand All @@ -14,6 +15,9 @@
import ckan.plugins.toolkit as toolkit
import ckanapi
import itertools
import datetime
from dateutil.parser import parse as parse_date
from dateutil.parser import isoparse as parse_iso_date

_get_or_bust = logic.get_or_bust

Expand Down Expand Up @@ -87,8 +91,8 @@ def csvwmapandtransform_transform(
u'id': res_id
})
log.debug('transform_started for: {}'.format(resource))
enqueue_find_mapping(resource['id'], resource['name'], resource['url'], resource['package_id'], operation='changed')
return {}
res=enqueue_find_mapping(resource['id'], resource['name'], resource['url'], resource['package_id'], operation='changed')
return res


def csvwmapandtransform_map(context, data_dict):
Expand All @@ -110,11 +114,16 @@ def csvwmapandtransform_transform_status(
data_dict['resource_id'] = data_dict['id']
res_id = _get_or_bust(data_dict, 'resource_id')

task = p.toolkit.get_action('task_status_show')(context, {
'entity_id': res_id,
'task_type': 'csvwmapandtransform',
'key': 'csvwmapandtransform'
})
# task = p.toolkit.get_action('task_status_show')(context, {
# 'entity_id': res_id,
# 'task_type': 'csvwmapandtransform',
# 'key': 'csvwmapandtransform'
# })
joblist = toolkit.get_action("job_list")({"ignore_auth": True}, {})
joblist = get_jobs()
log.debug('jobs queried')

log.debug(joblist)

# datapusher_url = config.get('ckan.datapusher.url')
# if not datapusher_url:
Expand Down Expand Up @@ -149,13 +158,15 @@ def csvwmapandtransform_transform_status(
# job_detail = {'error': 'cannot connect to datapusher'}

return {
'status': task['state'],
# 'job_id': job_id,
# 'job_url': url,
'last_updated': task['last_updated'],
# 'job_key': job_key,
# 'task_info': job_detail,
'error': json.loads(task['error'])
'joblist': joblist,
'status': 'dont have one',
# 'status': task['state'],
# # 'job_id': job_id,
# # 'job_url': url,
# 'last_updated': task['last_updated'],
# # 'job_key': job_key,
# # 'task_info': job_detail,
# 'error': json.loads(task['error'])
}

def get_actions():
Expand All @@ -174,32 +185,94 @@ def create_group(name):
group = local_ckan.action.group_create(name=name)
return group

def get_jobs():
local_ckan = ckanapi.LocalCKAN()
jobs = local_ckan.action.job_list()
return jobs

def enqueue_find_mapping(res_id, res_name, res_url, dataset_id, operation):
# skip task if the dataset is already queued
queue = DEFAULT_QUEUE_NAME
jobs = toolkit.get_action("job_list")({"ignore_auth": True}, {"queues": [queue]})
log.debug("jobs")
log.debug("test-jobs")
log.debug(jobs)

if jobs:
for job in jobs:
if not job["title"]:
continue
match = re.match(r'csvwmapandtransform \w+ "[^"]*" ([\w-]+)', job["title"])
log.debug("match")
log.debug(match)

if match:
queued_resource_id = match.groups()[0]
if res_id == queued_resource_id:
log.info("Already queued resource: {} {}".format(res_name, res_id))
return
# Check if this resource is already in the process of being xloadered
task = {
'entity_id': res_id,
'entity_type': 'resource',
'task_type': 'csvwmapandtransform',
'last_updated': str(datetime.datetime.utcnow()),
'state': 'submitting',
'key': 'csvwmapandtransform',
'value': '{}',
'error': '{}',
}
try:
existing_task = p.toolkit.get_action('task_status_show')({}, {
'entity_id': res_id,
'task_type': 'csvwmapandtransform',
'key': 'csvwmapandtransform'
})
assume_task_stale_after = datetime.timedelta(seconds=3600)
assume_task_stillborn_after = \
datetime.timedelta(seconds=int(5))
if existing_task.get('state') == 'pending':
# queued_res_ids = [
# re.search(r"'resource_id': u?'([^']+)'",
# job.description).groups()[0]
# for job in get_queue().get_jobs()
# if 'xloader_to_datastore' in str(job) # filter out test_job etc
# ]
updated = parse_iso_date(existing_task['last_updated'])
time_since_last_updated = datetime.datetime.utcnow() - updated
# if (res_id not in queued_res_ids
# and time_since_last_updated > assume_task_stillborn_after):
# # it's not on the queue (and if it had just been started then
# # its taken too long to update the task_status from pending -
# # the first thing it should do in the xloader job).
# # Let it be restarted.
# log.info('A pending task was found %r, but its not found in '
# 'the queue %r and is %s hours old',
# existing_task['id'], queued_res_ids,
# time_since_last_updated)
# elif time_since_last_updated > assume_task_stale_after:
if time_since_last_updated > assume_task_stale_after:
# it's been a while since the job was last updated - it's more
# likely something went wrong with it and the state wasn't
# updated than its still in progress. Let it be restarted.
log.info('A pending task was found %r, but it is only %s hours'
' old', existing_task['id'], time_since_last_updated)
else:
log.info('A pending task was found %s for this resource, so '
'skipping this duplicate task', existing_task['id'])
return False

task['id'] = existing_task['id']
except p.toolkit.ObjectNotFound:
pass

p.toolkit.get_action('task_status_update')(
# {'session': model.meta.create_local_session(), 'ignore_auth': True},
{'ignore_auth': True},
task
)

# add this dataset to the queue
log.debug("Queuing job find_mapping: {} {}".format(operation, res_name))
toolkit.enqueue_job(
job=toolkit.enqueue_job(
find_mapping,
[res_url, res_id, dataset_id],
title='csvwmapandtransform {} "{}" {}'.format(operation, res_name, res_url),
queue=queue,
)
log.debug("Enqueued job {} to {} resource {}".format(job.id, operation, res_name))

value = json.dumps({'job_id': job.id})
task['value'] = value
task['state'] = 'pending'
task['last_updated'] = str(datetime.datetime.utcnow())
p.toolkit.get_action('task_status_update')(
# {'session': model.meta.create_local_session(), 'ignore_auth': True},
{'ignore_auth': True},
task
)
return True
23 changes: 17 additions & 6 deletions ckanext/csvwmapandtransform/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,17 @@

log = __import__("logging").getLogger(__name__)

DEFAULT_FORMATS = [
"json-ld",
"turtle",
"n3",
"nt",
"hext",
"trig",
"longturtle",
"xml"
]


class CsvwMapAndTransformPlugin(plugins.SingletonPlugin, DefaultTranslation):
plugins.implements(plugins.ITranslation)
Expand Down Expand Up @@ -39,19 +50,19 @@ def notify(self, entity, operation):
"""
if operation == "deleted":
return

log.debug(
"notify: {} {} '{}'".format(operation, type(entity).__name__, entity.name)
"notify: {} {} '{}'".format(operation, type(entity).__name__, entity)
)
if isinstance(entity, model.Resource):
log.debug("new uploaded resource")
dataset = entity.related_packages()[0]

if entity.format in ("json-ld","turtle") and "-joined" not in entity.url:
if entity.format in DEFAULT_FORMATS and "-joined" not in entity.url:
log.debug("plugin notify event for resource: {}".format(entity.id))
action.enqueue_find_mapping(
entity.id, entity.name, entity.url, dataset.id, operation
)
toolkit.get_action('csvwmapandtransform_transform')({},{u'id': entity.id})
# action.enqueue_find_mapping(
# entity.id, entity.name, entity.url, dataset.id, operation
# )
else:
return

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,6 @@ <h1 >{% block form_title %}{{ _('Transform Status') }}{% endblock %}</h1>
<td>{{ h.csvwmapandtransform__status_description(transform_status) }}</td>
</tr>
<tr>
<th>{{ _('Last updated') }}</th>
{% if status.status %}
<td><span class="date" title="{{ h.render_datetime(status.last_updated, with_hours=True) }}">{{ h.time_ago_from_timestamp(status.last_updated) }}</span></td>
{% else %}
<td>{{ _('Never') }}</td>
{% endif %}
</tr>
</table>
{% endif %}
Expand Down
17 changes: 11 additions & 6 deletions ckanext/csvwmapandtransform/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,19 @@ def get(self, id: str, resource_id: str):

except (logic.NotFound, logic.NotAuthorized):
base.abort(404, _(u'Resource not found'))
try:
transform_status=toolkit.get_action(u'csvwmapandtransform_transform_status')(
{}, {
transform_status=toolkit.get_action(u'csvwmapandtransform_transform_status')(
{}, {
u'resource_id': resource_id
}
)
except:
transform_status=None
)
# try:
# transform_status=toolkit.get_action(u'csvwmapandtransform_transform_status')(
# {}, {
# u'resource_id': resource_id
# }
# )
# except:
# transform_status=None


return base.render(
Expand Down

0 comments on commit af072a7

Please sign in to comment.