-
Notifications
You must be signed in to change notification settings - Fork 36
/
Copy pathcommands.py
632 lines (556 loc) · 22.3 KB
/
commands.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from ckan.lib.cli import CkanCommand
from ckan.logic.validators import isodate, boolean_validator
from ckan.lib.navl.dictization_functions import Invalid
import paste.script
from paste.script.util.logging_config import fileConfig
from ckanapi.cli.workers import worker_pool
from ckanapi.cli.utils import completion_stats
import ckan.lib.uploader as uploader
import csv
import io
import re
import os
import json
import time
import sys
import urllib2
from datetime import datetime, timedelta
from contextlib import contextmanager
from ckanext.canada import search_integration
from ckanext.canada.metadata_xform import metadata_xform
from ckanext.canada.triggers import update_triggers
from ckanapi import (
RemoteCKAN,
LocalCKAN,
NotFound,
ValidationError,
NotAuthorized,
CKANAPIError
)
# Matches a relative "time ago" argument such as "7d", "12h30m" or "1d2h":
# optional day, hour and minute counts, in that order, each captured
# separately (used by _portal_update to compute an activity start date).
PAST_RE = (
    r'^'
    # Days
    r'(?:(\d+)d)?'
    # Hours
    r'(?:(\d+)h)?'
    # Minutes
    r'(?:(\d+)m)?'
    r'$'
)

# Package types copied to the portal by copy-datasets; any other type is
# treated as deleted on the target.
DATASET_TYPES = 'info', 'dataset', 'prop'
class CanadaCommand(CkanCommand):
    """
    CKAN Canada Extension

    Usage::

        paster canada portal-update <portal.ini>
                      [<last activity date> | [<k>d][<k>h][<k>m]]
                      [-p <num>] [-m]
                      [-l <log file>] [-t <num> [-d <seconds>]]
               copy-datasets [-m]
               changed-datasets [<since date>] [-s <remote server>] [-b]
               metadata-xform [--portal]
               load-suggested [--use-created-date] <suggested-datasets.csv>
               rebuild-external-search [-r | -f]
               update-triggers
               update-inventory-votes <votes.json>

        <last activity date> for reading activites, default: 7 days ago
        <k> number of hours/minutes/seconds in the past for reading
        activities

    Options::

        -a/--push-apikey <apikey>   push to <remote server> using apikey
        -b/--brief                  don't output requested dates
        -c/--config <ckan config>   use named ckan config file
                                    (available to all commands)
        -d/--delay <seconds>        delay between retries, default: 60
        -l/--log <log filename>     write log of actions to log filename
        -m/--mirror                 copy all datasets, default is to treat
                                    unreleased datasets as deleted
        -p/--processes <num>        sets the number of worker processes,
                                    default: 1
        --portal                    don't filter record types
        -s/--server <remote server> retrieve from <remote server>
        -t/--tries <num>            try <num> times, set > 1 to retry on
                                    failures, default: 1
        -u/--ckan-user <username>   sets the owner of packages created,
                                    default: ckan system user
        --use-created-date          use date_created field for date forwarded
                                    to data owner and other statuses instead
                                    of today's date
        -r/--rebuild-unindexed-only When rebuilding the advanced search Solr
                                    core only index datasets not already
                                    present in the second Solr core
        -f/--freshen                When rebuilding the advanced search Solr
                                    core re-index all datasets, but do not
                                    purge the Solr core
    """
    summary = __doc__.split('\n')[0]
    usage = __doc__

    # Shared optparse parser: every option below is accepted by every
    # subcommand, whether or not it uses it.
    parser = paste.script.command.Command.standard_parser(verbose=True)
    parser.add_option(
        '-c',
        '--config',
        dest='config',
        default='development.ini',
        help='Config file to use.'
    )
    parser.add_option(
        '-p',
        '--processes',
        dest='processes',
        default=1,
        type='int'
    )
    parser.add_option(
        '-u',
        '--ckan-user',
        dest='ckan_user',
        default=None
    )
    parser.add_option('-l', '--log', dest='log', default=None)
    parser.add_option('-m', '--mirror', dest='mirror', action='store_true')
    parser.add_option(
        '-a',
        '--push-apikey',
        dest='push_apikey',
        default=None
    )
    parser.add_option('-s', '--server', dest='server', default=None)
    parser.add_option('-b', '--brief', dest='brief', action='store_true')
    parser.add_option('-t', '--tries', dest='tries', default=1, type='int')
    parser.add_option('-d', '--delay', dest='delay', default=60, type='float')
    parser.add_option('--portal', dest='portal', action='store_true')
    parser.add_option('-r', '--rebuild-unindexed-only', dest='unindexed_only', action='store_true')
    parser.add_option('-f', '--freshen', dest='refresh_index', action='store_true')
    parser.add_option('--use-created-date', dest='use_created_date', action='store_true')

    def command(self):
        '''
        Parse command line arguments and call appropriate method.
        '''
        if not self.args or self.args[0] in ['--help', '-h', 'help']:
            print self.__doc__
            return

        cmd = self.args[0]
        self._load_config()

        if cmd == 'portal-update':
            self.portal_update(self.args[1], *self.args[2:])
        elif cmd == 'copy-datasets':
            # copy-datasets runs as a worker child of portal-update with
            # its stdout consumed by the parent; swallow EPIPE/ctrl-C
            with _quiet_int_pipe():
                self.copy_datasets(self.args[2:])
        elif cmd == 'changed-datasets':
            self.changed_datasets(*self.args[1:])
        elif cmd == 'metadata-xform':
            metadata_xform(self.options.portal)
        elif cmd == 'update-triggers':
            update_triggers()
        elif cmd == 'update-inventory-votes':
            update_inventory_votes(*self.args[1:])
        elif cmd == 'rebuild-external-search':
            self.rebuild_external_search()
        elif cmd == 'load-suggested':
            self.load_suggested(self.options.use_created_date, *self.args[1:])
        else:
            print self.__doc__

    def _app_config(self):
        """
        This is the first part of CkanCommand._load_config()

        Loads the paste app config and logging configuration without
        starting a full CKAN app.
        """
        from paste.deploy import appconfig
        if not self.options.config:
            msg = 'No config file supplied'
            raise self.BadCommand(msg)
        self.filename = os.path.abspath(self.options.config)
        if not os.path.exists(self.filename):
            raise AssertionError(
                'Config filename %r does not exist.' % self.filename
            )
        fileConfig(self.filename)
        appconfig('config:' + self.filename)

    def portal_update(self, portal_ini, activity_date=None):
        """
        collect batches of packages modified at local CKAN since activity_date
        and apply the package updates to the portal instance for all
        packages with published_date set to any time in the past.
        """
        tries = self.options.tries
        self._portal_update_completed = False
        # resume marker: _portal_update advances this after every batch,
        # so a retry after a failure continues where the last run stopped
        self._portal_update_activity_date = activity_date
        while tries > 0:
            tries -= 1
            self._portal_update(portal_ini, self._portal_update_activity_date)
            if self._portal_update_completed or not tries:
                return
            time.sleep(self.options.delay)

    def _portal_update(self, portal_ini, activity_date):
        # Single attempt at a portal update run; retried by portal_update().
        # activity_date may be None (default: 7 days ago), a relative
        # "[<k>d][<k>h][<k>m]" spec, or an ISO date string.
        if activity_date:
            past = re.match(PAST_RE, activity_date)
            if past:
                # relative spec: count back from now
                days, hours, minutes = (
                    int(x) if x else 0 for x in past.groups()
                )
                activity_date = datetime.now() - timedelta(
                    days=days,
                    seconds=(hours * 60 + minutes) * 60
                )
            else:
                activity_date = isodate(activity_date, None)
        else:
            activity_date = datetime.now() - timedelta(days=7)

        log = None
        if self.options.log:
            log = open(self.options.log, 'a')

        registry = LocalCKAN()

        def changed_package_id_runs(start_date):
            # yield (packages, next_date) batches until no more changes
            while True:
                packages, next_date = self._changed_packages_since(
                    registry, start_date)
                if next_date is None:
                    return
                yield packages, next_date
                start_date = next_date

        # worker command line: this same paster command in copy-datasets
        # mode, reading packages from stdin
        cmd = [
            sys.argv[0],
            'canada',
            'copy-datasets',
            '-c',
            portal_ini
        ]
        if self.options.mirror:
            cmd.append('-m')

        pool = worker_pool(
            cmd,
            self.options.processes,
            [],
            stop_when_jobs_done=False,
            stop_on_keyboard_interrupt=False,
        )

        # Advance generator so we may call send() below
        pool.next()

        def append_log(finished, package_id, action, reason):
            # write one JSON list per line to the -l log file, if any
            if not log:
                return
            log.write(json.dumps([
                datetime.now().isoformat(),
                finished,
                package_id,
                action,
                reason,
            ]) + '\n')
            log.flush()

        with _quiet_int_pipe():
            append_log(
                None,
                None,
                "started updating from:",
                activity_date.isoformat()
            )

            for packages, next_date in (
                    changed_package_id_runs(activity_date)):
                # feed this batch to the pool; drain worker results as
                # (package_id, action, reason) JSON lines
                job_ids, finished, result = pool.send(enumerate(packages))
                stats = completion_stats(self.options.processes)
                while result is not None:
                    package_id, action, reason = json.loads(result)
                    print job_ids, stats.next(), finished, package_id, \
                        action, reason
                    append_log(finished, package_id, action, reason)
                    job_ids, finished, result = pool.next()

                print " --- next batch starting at: " + next_date.isoformat()
                append_log(
                    None,
                    None,
                    "next batch starting at:",
                    next_date.isoformat()
                )
                # record progress so a retry resumes from this point
                self._portal_update_activity_date = next_date.isoformat()
            self._portal_update_completed = True

    def _changed_packages_since(self, registry, since_time):
        """
        Query source ckan instance for packages changed since_time.

        returns (packages, next since_time to query) or (None, None)
        when no more changes are found.

        registry - LocalCKAN or RemoteCKAN instance
        since_time - local datetime to start looking for changes

        Each package is returned as a JSON string, ready to be written
        to a copy-datasets worker's stdin.
        """
        data = registry.action.changed_packages_activity_list_since(
            since_time=since_time.isoformat())
        if not data:
            return None, None

        packages = []
        for result in data:
            package_id = result['data']['package']['id']
            try:
                packages.append(json.dumps(registry.action.package_show(id=package_id)))
            except NotFound:
                # activity may reference a package that no longer exists
                pass

        if data:
            # continue the next query from the newest activity seen
            since_time = isodate(data[-1]['timestamp'], None)

        return packages, since_time

    def copy_datasets(self, remote, package_ids=None):
        """
        a process that accepts packages on stdin which are compared
        to the local version of the same package. The local package is
        then created, updated, deleted or left unchanged. This process
        outputs that action as a string 'created', 'updated', 'deleted'
        or 'unchanged'
        """
        # NOTE(review): the 'remote' and 'package_ids' parameters are
        # never used in this body -- confirm before removing.
        portal = LocalCKAN()
        now = datetime.now()

        # iterate stdin line-by-line until EOF; each line is one package
        # serialized as JSON by _changed_packages_since
        packages = iter(sys.stdin.readline, '')
        for package in packages:
            source_pkg = json.loads(package)
            package_id = source_pkg['id']
            reason = None
            target_deleted = False
            # source_pkg is set to None below whenever the package should
            # be absent (deleted) on the portal
            if source_pkg and source_pkg['state'] == 'deleted':
                source_pkg = None
            if source_pkg and source_pkg['type'] not in DATASET_TYPES:
                # non-default dataset types ignored
                source_pkg = None

            _trim_package(source_pkg)

            action = None
            if source_pkg and not self.options.mirror:
                # without --mirror, unreleased packages are treated as
                # deleted on the portal
                if source_pkg.get('ready_to_publish') == 'false':
                    source_pkg = None
                    reason = 'marked not ready to publish'
                elif not source_pkg.get('portal_release_date'):
                    source_pkg = None
                    reason = 'release date not set'
                elif isodate(source_pkg['portal_release_date'], None) > now:
                    source_pkg = None
                    reason = 'release date in future'
                else:
                    # portal packages published public
                    source_pkg['private'] = False

            # NOTE(review): nothing above ever sets action to 'skip', so
            # the 'skip' branches appear to be dead code -- confirm.
            if action != 'skip':
                try:
                    target_pkg = portal.call_action('package_show', {
                        'id': package_id
                    })
                except (NotFound, NotAuthorized):
                    target_pkg = None
                except (CKANAPIError, urllib2.URLError), e:
                    # report the failure on stdout for the parent process,
                    # then let the worker die so it can be retried
                    sys.stdout.write(
                        json.dumps([
                            package_id,
                            'target error',
                            unicode(e.args)
                        ]) + '\n'
                    )
                    raise
                if target_pkg and target_pkg['state'] == 'deleted':
                    target_pkg = None
                    target_deleted = True
                _trim_package(target_pkg)

            if action == 'skip':
                pass
            elif target_pkg is None and source_pkg is None:
                action = 'unchanged'
                reason = reason or 'deleted on registry'
            elif target_deleted:
                action = 'updated'
                reason = 'undeleting on target'
                portal.action.package_update(**source_pkg)
            elif target_pkg is None:
                action = 'created'
                portal.action.package_create(**source_pkg)
            elif source_pkg is None:
                action = 'deleted'
                portal.action.package_delete(id=package_id)
            elif source_pkg == target_pkg:
                action = 'unchanged'
                reason = 'no difference found'
            else:
                action = 'updated'
                portal.action.package_update(**source_pkg)

            sys.stdout.write(json.dumps([package_id, action, reason]) + '\n')
            sys.stdout.flush()

    def changed_datasets(self, since_date):
        """
        Produce a list of dataset ids and requested dates. Each package
        id will appear at most once, showing the activity date closest
        to since_date. Requested dates are preceeded with a "#"
        """
        since_date = isodate(since_date, None)
        seen_ids = set()

        # -s/--server selects a remote source; default is the local site
        if self.options.server:
            registry = RemoteCKAN(self.options.server)
        else:
            registry = LocalCKAN()

        while True:
            # NOTE(review): _changed_package_ids_since is not defined in
            # this class -- confirm it exists on a parent class or that
            # _changed_packages_since was intended here.
            ids, since_date = self._changed_package_ids_since(
                registry, since_date, seen_ids)
            if not ids:
                return
            for i in ids:
                print i
            if not self.options.brief:
                print "# {0}".format(since_date.isoformat())

    def rebuild_external_search(self):
        # Rebuild the advanced-search Solr core; -r limits the rebuild to
        # unindexed datasets, -f re-indexes everything without purging.
        search_integration.rebuild_search_index(LocalCKAN(), self.options.unindexed_only, self.options.refresh_index)

    def load_suggested(self, use_created_date, filename):
        """
        a process that loads suggested datasets from Drupal into CKAN

        use_created_date - when true, use each row's date_created instead
        of today's date for date_forwarded and imported statuses
        filename - path to the suggested-datasets CSV export
        """
        registry = LocalCKAN()

        # load packages as dict
        results = True
        counter = 0
        batch_size = 100
        existing_suggestions = {}
        while results:
            # page through every existing "prop" (suggested) dataset so
            # rows already imported can be skipped
            packages = registry.action.package_search(q='type:prop', start=counter, rows=batch_size, include_private=True)['results']
            if packages:
                for package in packages:
                    existing_suggestions[package['id']] = package
                counter += len(packages)
            else:
                results = False

        # load data from csv
        # utf-8-sig strips a BOM if present; rows are re-encoded to utf-8
        # bytes for the (Python 2) csv module
        csv_file = io.open(filename, "r", encoding='utf-8-sig')
        csv_reader = csv.DictReader((l.encode('utf-8') for l in csv_file))
        today = datetime.now().strftime('%Y-%m-%d')

        for row in csv_reader:
            uuid = row['uuid']
            if uuid in existing_suggestions:
                continue

            if use_created_date:
                today = row['date_created']

            # add record
            record = {
                "type": "prop",
                "state": "active",
                "id": uuid,
                "title_translated": {
                    "en": row['title_en'],
                    "fr": row['title_fr']
                },
                "owner_org": row['organization'],
                "notes_translated": {
                    "en": row['description_en'],
                    "fr": row['description_fr'],
                },
                "comments": {
                    "en": row['additional_comments_and_feedback_en'],
                    "fr": row['additional_comments_and_feedback_fr']
                },
                "reason": row['reason'],
                "subject": row['subject'].split(',') if row['subject'] else ['information_and_communications'],
                "keywords": {
                    "en": row['keywords_en'].split(',') if row['keywords_en'] else ['dataset'],
                    "fr": row['keywords_fr'].split(',') if row['keywords_fr'] else ['Jeu de données'],
                },
                "date_submitted": row['date_created'],
                "date_forwarded": today,
                "status": [] if row['dataset_suggestion_status'] == 'department_contacted' else [
                    {
                        "reason": row['dataset_suggestion_status'],
                        "date": row['dataset_released_date'] if row['dataset_released_date'] else today,
                        "comments": {
                            "en": row['dataset_suggestion_status_link'] or u'Status imported from previous ‘suggest a dataset’ system',
                            "fr": row['dataset_suggestion_status_link'] or u'État importé du système précédent « Proposez un jeu de données »',
                        }
                    }
                ]
            }

            try:
                registry.action.package_create(**record)
                print uuid + ' suggested dataset created'
            except ValidationError as e:
                # an 'id' error means the record already exists (possibly
                # deleted): update it in place instead
                if 'id' in e.error_dict:
                    try:
                        registry.action.package_update(**record)
                        print uuid + ' suggested dataset update deleted'
                    except ValidationError as e:
                        print uuid + ' (update deleted) ' + str(e)
                else:
                    print uuid + ' ' + str(e)
        csv_file.close()
def _trim_package(pkg):
"""
remove keys from pkg that we don't care about when comparing
or updating/creating packages. Also try to convert types and
create missing fields that will be present in package_show.
"""
# XXX full of custom hacks and deep knowledge of our schema :-(
if not pkg:
return
for k in ['extras', 'metadata_modified', 'metadata_created',
'revision_id', 'revision_timestamp', 'organization',
'version', 'tracking_summary',
'tags', # just because we don't use them
'num_tags', 'num_resources', 'maintainer',
'isopen', 'relationships_as_object', 'license_title',
'license_title_fra', 'license_url_fra', 'license_url',
'author',
'groups', # just because we don't use them
'relationships_as_subject', 'department_number',
# FIXME: remove these when we can:
'resource_type',
# new in 2.3:
'creator_user_id',
]:
if k in pkg:
del pkg[k]
for r in pkg['resources']:
for k in ['package_id', 'revision_id',
'revision_timestamp', 'cache_last_updated',
'webstore_last_updated', 'state', 'hash',
'description', 'tracking_summary', 'mimetype_inner',
'mimetype', 'cache_url', 'created', 'webstore_url',
'last_modified', 'position']:
if k in r:
del r[k]
if r.get('url_type') == 'upload' and r['url']:
r['url'] = r['url'].rsplit('/', 1)[-1]
for k in ['name', 'size']:
if k not in r:
r[k] = None
if 'name' not in pkg:
pkg['name'] = pkg['id']
if 'type' not in pkg:
pkg['type'] = 'dataset'
if 'state' not in pkg:
pkg['state'] = 'active'
for k in ['url']:
if k not in pkg:
pkg[k] = ''
@contextmanager
def _quiet_int_pipe():
"""
let pipe errors and KeyboardIterrupt exceptions cause silent exit
"""
try:
yield
except KeyboardInterrupt:
pass
except IOError, e:
if e.errno != 32:
raise
def update_inventory_votes(json_name):
with open(json_name) as j:
votes = json.load(j)
registry = LocalCKAN()
for org in votes:
print org, len(votes[org]),
rs = registry.action.recombinant_show(
dataset_type='inventory',
owner_org=org)
resource_id = rs['resources'][0]['id']
result = registry.action.datastore_search(
resource_id=resource_id,
limit=len(votes[org]),
filters={'ref_number': list(votes[org])})
update = []
for r in result['records']:
expected = votes[org][r['ref_number']]
if r['user_votes'] != expected:
r['user_votes'] = expected
del r['_id']
update.append(r)
print len(update)
if update:
registry.action.datastore_upsert(
resource_id=resource_id,
records=update)