from __future__ import absolute_import
# stdlib, alphabetical
from collections import namedtuple
import distutils.dir_util
import json
import logging
import os
import re
import shutil
import subprocess
import tempfile
from uuid import uuid4
# Core Django, alphabetical
from django.conf import settings
from django.db import models
from django.utils.translation import ugettext_lazy as _
from django.utils import timezone
# Third party dependencies, alphabetical
import bagit
import jsonfield
from django_extensions.db.fields import UUIDField
from lxml import etree
import metsrw
from metsrw.plugins import premisrw
import requests
# This project, alphabetical
from common import utils
from locations import signals
# This module, alphabetical
from . import StorageException
from .location import Location
from .space import Space, PosixMoveUnsupportedError
from .event import File
from .fixity_log import FixityLog
__all__ = ('Package', )
LOGGER = logging.getLogger(__name__)
class Package(models.Model):
""" A package stored in a specific location. """
uuid = UUIDField(editable=False, unique=True, version=4,
help_text=_("Unique identifier"))
description = models.CharField(
max_length=256, default=None, null=True, blank=True,
help_text=_("Human-readable description."))
origin_pipeline = models.ForeignKey('Pipeline', to_field='uuid', null=True,
blank=True)
current_location = models.ForeignKey(Location, to_field='uuid')
current_path = models.TextField()
pointer_file_location = models.ForeignKey(
Location, to_field='uuid', related_name='+', null=True, blank=True)
pointer_file_path = models.TextField(null=True, blank=True)
size = models.IntegerField(default=0,
help_text=_('Size in bytes of the package'))
encryption_key_fingerprint = models.CharField(
max_length=512, blank=True, null=True, default=None,
help_text=_('The fingerprint of the GPG key used to encrypt the'
' package, if applicable'))
replicated_package = models.ForeignKey('Package', to_field='uuid',
null=True, blank=True,
related_name='replicas')
AIP = "AIP"
AIC = "AIC"
SIP = "SIP"
DIP = "DIP"
TRANSFER = "transfer"
FILE = 'file'
DEPOSIT = 'deposit'
PACKAGE_TYPE_CHOICES = (
(AIP, 'AIP'),
(AIC, 'AIC'),
(SIP, 'SIP'),
(DIP, 'DIP'),
(TRANSFER, _('Transfer')),
(FILE, _('Single File')),
(DEPOSIT, _('FEDORA Deposit'))
)
package_type = models.CharField(max_length=8, choices=PACKAGE_TYPE_CHOICES)
related_packages = models.ManyToManyField('self', related_name='related')
DEFAULT_CHECKSUM_ALGORITHM = 'sha256'
PENDING = 'PENDING'
STAGING = 'STAGING'
UPLOADED = 'UPLOADED'
VERIFIED = 'VERIFIED'
DEL_REQ = 'DEL_REQ'
DELETED = 'DELETED'
RECOVER_REQ = 'RECOVER_REQ'
MOVING = 'MOVING'
FAIL = 'FAIL'
FINALIZED = 'FINALIZE'
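    # NB: the value is 'FINALIZE' (8 chars) rather than 'FINALIZED',
    # presumably so that it fits the status field's max_length=8 below.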
STATUS_CHOICES = (
(PENDING, _("Upload Pending")), # Still on Archivematica
(STAGING, _("Staged on Storage Service")), # In Storage Service staging dir
(UPLOADED, _("Uploaded")), # In final storage location
(VERIFIED, _("Verified")), # Verified to be in final storage location
        (FAIL, _("Failed")), # Error occurred - may or may not be at final location
(DEL_REQ, _("Delete requested")),
(DELETED, _("Deleted")),
(FINALIZED, _("Deposit Finalized")),
)
status = models.CharField(
max_length=8, choices=STATUS_CHOICES, default=FAIL,
help_text=_("Status of the package in the storage service."))
# NOTE Do not put anything important here because you cannot easily query
# JSONFields! Add a new column if you need to query it
misc_attributes = jsonfield.JSONField(
blank=True, null=True, default={},
help_text=_('For storing flexible, often Space-specific, attributes'))
# Temporary attributes to track path on locally accessible filesystem
local_path = None
local_path_location = None
PACKAGE_TYPE_CAN_DELETE = (AIP, AIC, TRANSFER)
PACKAGE_TYPE_CAN_DELETE_DIRECTLY = (DIP,)
PACKAGE_TYPE_CAN_EXTRACT = (AIP, AIC)
PACKAGE_TYPE_CAN_RECOVER = (AIP, AIC)
PACKAGE_TYPE_CAN_REINGEST = (AIP, AIC)
    # Reingest type options, in lower case. The value in the request body is
    # converted to lower case before being checked against these values, to
    # make the API more robust to human error.
METADATA_ONLY = 'metadata' # Re-ingest metadata only
OBJECTS = 'objects' # Re-ingest metadata and objects for DIP generation
FULL = 'full' # Full re-ingest
REINGEST_CHOICES = (METADATA_ONLY, OBJECTS, FULL)
class Meta:
verbose_name = _("Package")
app_label = 'locations'
def __unicode__(self):
return u"{uuid}: {path}".format(
uuid=self.uuid,
path=self.full_path,
)
# return "File: {}".format(self.uuid)
# Attributes
@property
def full_path(self):
""" Return the full path of the package's current location.
Includes the space, location, and package paths joined. """
return os.path.normpath(
os.path.join(self.current_location.full_path, self.current_path))
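    # Illustrative sketch (paths hypothetical): with a location full_path of
    # "/var/aipstore" and a current_path of "41f3/3b5b/pkg.7z", full_path
    # yields "/var/aipstore/41f3/3b5b/pkg.7z".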
@property
def full_pointer_file_path(self):
""" Return the full path of the AIP's pointer file, None if not an AIP.
Includes the space, location and package paths joined."""
if not self.pointer_file_location:
return None
return os.path.join(self.pointer_file_location.full_path,
self.pointer_file_path)
def is_encrypted(self, local_path):
"""Determines whether or not the package at ``local_path`` is
encrypted. Note that we can't compare the type of the child space to
GPG because that would cause a circular import.
"""
space_is_encr = getattr(self.current_location.space.get_child_space(),
'encrypted_space',
False)
is_file = os.path.isfile(local_path)
return space_is_encr and is_file
@property
def is_compressed(self):
""" Determines whether or not the package is a compressed file. """
full_path = self.fetch_local_path()
if os.path.isdir(full_path):
return False
elif os.path.isfile(full_path):
return True
else:
if not os.path.exists(full_path):
message = _('Package %(uuid)s (located at %(path)s) does not exist') % {'uuid': self.uuid, 'path': full_path}
else:
message = _('%(path)s is neither a file nor a directory') % {'path': full_path}
raise StorageException(message)
@property
def latest_fixity_check_datetime(self):
latest_check = self._latest_fixity_check()
return latest_check.datetime_reported if latest_check is not None else None
@property
def latest_fixity_check_result(self):
latest_check = self._latest_fixity_check()
return latest_check.success if latest_check is not None else None
def _latest_fixity_check(self):
try:
return FixityLog.objects.filter(package=self).order_by('-datetime_reported')[0] # limit 1
except IndexError:
return None
def get_download_path(self, lockss_au_number=None):
full_path = self.fetch_local_path()
if lockss_au_number is None:
if not self.is_compressed:
raise StorageException(_("Cannot return a download path for an uncompressed package"))
path = full_path
elif self.current_location.space.access_protocol == Space.LOM:
# Only LOCKSS breaks files into AUs
# TODO Get path from pointer file
# self.pointer_root.find("mets:structMap/*/mets:div[@ORDER='{}']".format(lockss_au_number), namespaces=NSMAP)
path = os.path.splitext(full_path)[0] + '.tar-' + str(lockss_au_number)
else: # LOCKSS AU number specified, but not a LOCKSS package
LOGGER.warning('Trying to download LOCKSS chunk for a non-LOCKSS package.')
path = full_path
return path
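    # Illustrative example (filename hypothetical): for a LOCKSS package at
    # "/store/aip-foo.7z", AU number 3 resolves to "/store/aip-foo.tar-3",
    # since os.path.splitext drops the final extension before the
    # ".tar-<N>" suffix is appended.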
def get_local_path(self):
"""Return a locally accessible path to this Package if available.
If a cached copy of the local path is available (possibly from
fetch_local_path), return that. If the package is available locally,
return self.full_path. Otherwise, local_path is None.
:returns: Local path to this package or None
"""
# Return cached copy
if self.local_path is not None and os.path.exists(self.local_path):
return self.local_path
# Package is locally accessible
if os.path.exists(self.full_path):
# TODO use Space protocol to determine if this is possible?
self.local_path = self.full_path
return self.local_path
return None
def fetch_local_path(self):
"""Fetches a local copy of the package.
Return local path if package is already available locally. Otherwise,
copy to SS Internal Location, and return that path.
:returns: Local path to this package.
"""
local_path = self.get_local_path()
if local_path and not self.is_encrypted(local_path):
return local_path
# Not locally accessible, so copy to SS internal temp dir
ss_internal = Location.active.get(
purpose=Location.STORAGE_SERVICE_INTERNAL)
temp_dir = tempfile.mkdtemp(dir=ss_internal.full_path)
int_path = os.path.join(temp_dir, self.current_path)
# If encrypted, this will decrypt.
self.current_location.space.move_to_storage_service(
source_path=os.path.join(
self.current_location.relative_path, self.current_path),
destination_path=self.current_path,
destination_space=ss_internal.space,
)
relative_path = int_path.replace(
ss_internal.space.path, '', 1).lstrip('/')
ss_internal.space.move_from_storage_service(
source_path=self.current_path,
destination_path=relative_path,
package=self,
)
self.local_path_location = ss_internal
self.local_path = int_path
return self.local_path
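    # Usage sketch (variable names hypothetical): callers needing byte-level
    # access typically do
    #
    #   local_path = package.fetch_local_path()
    #   digest = utils.generate_checksum(
    #       local_path, Package.DEFAULT_CHECKSUM_ALGORITHM).hexdigest()
    #
    # which transparently stages (and, for encrypted spaces, decrypts) a
    # local copy first when the package is not already locally accessible.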
def get_base_directory(self):
"""
Returns the base directory of a package. This is the directory in
which all of the contents of the package are nested. For example,
given the following bag:
.
|-- package-00000000-0000-0000-0000-000000000000
|-- bagit.txt
|-- manifest-sha512.txt
...
The string "package-00000000-0000-0000-0000-000000000000" would be
returned.
Note that this currently only supports locally-available packages.
If the package is stored externally, raises NotImplementedError.
"""
full_path = self.get_local_path()
if full_path is None:
raise NotImplementedError(_("This method currently only retrieves base directories for locally-available AIPs."))
if self.is_compressed:
# Use lsar's JSON output to determine the directories in a
# compressed file. Since the index of the base directory may
# not be consistent, determine it by filtering all entries
# for directories, then determine the directory with the
# shortest name. (e.g. foo is the parent of foo/bar)
# NOTE: lsar's JSON output is broken in certain circumstances in
# all released versions; make sure to use a patched version
# for this to work.
command = ['lsar', '-ja', full_path]
output = subprocess.check_output(command)
output = json.loads(output)
directories = [d['XADFileName'] for d in output['lsarContents'] if
d.get('XADIsDirectory', False)]
directories = sorted(directories, key=len)
return directories[0]
return os.path.basename(full_path)
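    # Minimal sketch of the lsar JSON shape relied on above (keys from the
    # code; values hypothetical):
    #
    #   {"lsarContents": [
    #       {"XADFileName": "pkg-1234", "XADIsDirectory": true},
    #       {"XADFileName": "pkg-1234/bagit.txt"}]}
    #
    # Sorting the directory names by length makes "pkg-1234", the shortest,
    # the base directory.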
def _check_quotas(self, dest_space, dest_location):
"""
Verify that there is enough storage space on dest_space and dest_location for this package. All sizes in bytes.
"""
# Check if enough space on the space and location
# All sizes expected to be in bytes
if dest_space.size is not None and dest_space.used + self.size > dest_space.size:
raise StorageException(_('Not enough space for AIP on storage device %(space)s; Used: %(used)s; Size: %(size)s; AIP size: %(aip_size)s') % {'space': dest_space, 'used': dest_space.used, 'size': dest_space.size, 'aip_size': self.size})
if (dest_location.quota is not None and
dest_location.used + self.size > dest_location.quota):
raise StorageException(_('AIP too big for quota on %(location)s; Used: %(used)s; Quota: %(quota)s; AIP size: %(aip_size)s') % {'location': dest_location, 'used': dest_location.used, 'quota': dest_location.quota, 'aip_size': self.size})
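    # Worked example (numbers hypothetical): with dest_space.size=10**9,
    # dest_space.used=9 * 10**8 and self.size=2 * 10**8, used + size exceeds
    # the space's capacity, so a StorageException is raised before any bytes
    # are moved.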
def _update_quotas(self, space, location):
"""
Add this package's size to the space and location.
"""
space.used += self.size
space.save()
location.used += self.size
location.save()
def move(self, to_location):
"""Move the package to location."""
if self.current_location == to_location:
return
origin_space = self.current_location.space
destination_space = to_location.space
source_path = os.path.join(
self.current_location.relative_path,
self.current_path)
destination_path = os.path.join(
to_location.relative_path,
self.current_path)
try:
origin_space.posix_move(
source_path=source_path,
destination_path=destination_path,
destination_space=destination_space,
package=None)
except PosixMoveUnsupportedError:
origin_space.move_to_storage_service(
source_path=source_path,
destination_path=destination_path,
destination_space=destination_space,)
origin_space.post_move_to_storage_service()
destination_space.move_from_storage_service(
source_path=destination_path,
destination_path=destination_path,
package=None,)
destination_space.post_move_from_storage_service(
destination_path, destination_path)
# If we get here everything went well, update with new location
self.current_location = to_location
self.save()
self.current_location.space.update_package_status(self)
self._update_existing_ptr_loc_info()
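    # Usage sketch (UUID hypothetical): moving a stored package to another
    # location
    #
    #   new_location = Location.objects.get(uuid='...')
    #   package.move(new_location)
    #
    # A single-hop posix_move is attempted first; spaces that do not support
    # it fall back to the two-hop move through the storage service above.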
def recover_aip(self, origin_location, origin_path):
""" Recovers an AIP using files at a given location.
Creates a temporary package associated with recovery AIP files within
a space. Does fixity check on recovery AIP package. Makes backup of
AIP files being replaced by recovery files. Replaces AIP files with
recovery files.
"""
# Create temporary AIP package
temp_aip = Package()
temp_aip.package_type = 'AIP'
temp_aip.origin_pipeline = self.origin_pipeline
temp_aip.current_location = origin_location
temp_aip.current_path = origin_path
temp_aip.save()
# Check integrity of temporary AIP package
(success, failures, message, __) = temp_aip.check_fixity(force_local=True)
# If the recovered AIP doesn't pass check, delete and return error info
if not success:
temp_aip.delete()
return (success, failures, message)
origin_space = temp_aip.current_location.space
destination_space = self.current_location.space
# Copy corrupt files to storage service staging
source_path = os.path.join(
self.current_location.relative_path,
self.current_path)
destination_path = os.path.join(
origin_location.relative_path,
'backup')
origin_space.move_to_storage_service(
source_path=source_path,
destination_path=destination_path,
destination_space=destination_space)
origin_space.post_move_to_storage_service()
# Copy corrupt files from staging to backup directory
destination_space.move_from_storage_service(
source_path=destination_path,
destination_path=destination_path,
package=self)
destination_space.post_move_from_storage_service(
staging_path=None,
destination_path=None)
# Copy recovery files to storage service staging
source_path = os.path.join(
temp_aip.current_location.relative_path, origin_path)
destination_path = os.path.join(
self.current_location.relative_path,
os.path.dirname(self.current_path))
origin_space.move_to_storage_service(
source_path=source_path,
destination_path=destination_path,
destination_space=destination_space)
origin_space.post_move_to_storage_service()
# Copy recovery files from staging to AIP store
destination_space.move_from_storage_service(
source_path=destination_path,
destination_path=destination_path,
package=self)
destination_space.post_move_from_storage_service(
staging_path=None,
destination_path=None)
temp_aip.delete()
# Do fixity check of AIP with recovered files
success, failures, message, __ = self.check_fixity(force_local=True)
return success, failures, message
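    # Return contract: a 3-tuple (success, failures, message) taken from
    # check_fixity, e.g. (True, [], '') after a clean recovery (example
    # values hypothetical).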
def replicate(self, replicator_location_uuid):
"""Replicate this package in the database and on disk by
1. creating a new ``Package`` model instance that references this one in
its ``replicated_package`` attribute,
2. copying the AIP on disk to a new path in the replicator location
referenced by ``replicator_location_uuid``,
3. creating a new pointer file for the replica, which encodes the
replication event, and
4. updating the pointer file for the replicated AIP, which encodes the
replication event.
"""
self_uuid = self.uuid
replicator_location = Location.objects.get(
uuid=replicator_location_uuid)
# Replicandum is the package to be replicated, i.e., ``self``
replicandum_location = self.current_location
replicandum_path = self.current_path
replicandum_uuid = self.uuid
LOGGER.info('Replicating package %s to replicator location %s',
replicandum_uuid, replicator_location_uuid)
replica_package = _replicate_package_mdl_inst(self)
# It is necessary to re-retrieve ``self`` here because otherwise the
# package model instance replication will cause ``self`` to reference
# the replica.
self = Package.objects.get(uuid=self_uuid)
# Remove the /uuid/path from the replica's current_path and replace the
# old UUID in the basename with the new UUID.
replica_package.current_path = os.path.basename(
replicandum_path.rstrip('/')).replace(
replicandum_uuid, replica_package.uuid, 1)
replica_package.current_location = replicator_location
# Check if enough space on the space and location
src_space = replicandum_location.space
dest_space = replica_package.current_location.space
self._check_quotas(dest_space, replica_package.current_location)
# Replicate AIP at
# destination_location/uuid/split/into/chunks/destination_path
uuid_path = utils.uuid_to_path(replica_package.uuid)
replica_package.current_path = os.path.join(
uuid_path, replica_package.current_path)
replica_destination_path = os.path.join(
replica_package.current_location.relative_path,
replica_package.current_path)
replica_package.status = Package.PENDING
replica_package.save()
# Copy replicandum AIP from its source location to the SS
src_space.move_to_storage_service(
source_path=os.path.join(replicandum_location.relative_path,
replicandum_path),
destination_path=replica_package.current_path,
destination_space=dest_space)
replica_package.status = Package.STAGING
replica_package.save()
src_space.post_move_to_storage_service()
# Get the master AIP's pointer file and extract the checksum details
master_ptr = self.get_pointer_instance()
if master_ptr:
master_ptr_aip_fsentry = master_ptr.get_file(file_uuid=self.uuid)
master_premis_object = master_ptr_aip_fsentry.get_premis_objects()[0]
master_checksum_algorithm = master_premis_object.message_digest_algorithm
master_checksum = master_premis_object.message_digest
# Calculate the checksum of the replica while we have it locally,
# compare it to the master's checksum and create a PREMIS validation
# event out of the result.
replica_local_path = self.get_local_path()
replica_checksum = utils.generate_checksum(
replica_local_path, master_checksum_algorithm).hexdigest()
checksum_report = _get_checksum_report(
master_checksum, self.uuid, replica_checksum, replica_package.uuid,
master_checksum_algorithm)
replication_validation_event = (
replica_package.get_replication_validation_event(
checksum_report=checksum_report,
master_aip_uuid=self.uuid))
# Create and write to disk the pointer file for the replica, which
# contains the PREMIS replication event.
replication_event_uuid = str(uuid4())
replica_pointer_file = self.create_replica_pointer_file(
replica_package, replication_event_uuid,
replication_validation_event, master_ptr=master_ptr)
write_pointer_file(replica_pointer_file,
replica_package.full_pointer_file_path)
replica_package.save()
# Copy replicandum AIP from the SS to replica package's replicator
# location.
replica_storage_effects = dest_space.move_from_storage_service(
source_path=replica_package.current_path,
destination_path=replica_destination_path,
package=replica_package)
if dest_space.access_protocol not in (Space.LOM, Space.ARKIVUM):
replica_package.status = Package.UPLOADED
replica_package.save()
dest_space.post_move_from_storage_service(
staging_path=replica_package.current_path,
destination_path=replica_destination_path,
package=replica_package)
self._update_quotas(dest_space, replica_package.current_location)
# Any effects resulting from AIP storage (e.g., encryption) are
# recorded in the replica's pointer file.
if replica_storage_effects:
# Note: unclear why the existing ``replica_pointer_file`` is
            # an ``lxml.etree._Element`` instance and not the expected
# ``premisrw.PREMISObject``. As a result, the following is required:
replica_pointer_file = replica_package.get_pointer_instance()
if replica_pointer_file:
revised_replica_pointer_file = (
replica_package.create_new_pointer_file_given_storage_effects(
replica_pointer_file, replica_storage_effects))
write_pointer_file(revised_replica_pointer_file,
replica_package.full_pointer_file_path)
# Update the pointer file of the replicated AIP (master) so that it
# contains a record of its replication.
if master_ptr:
new_master_pointer_file = self.create_new_pointer_file_with_replication(
master_ptr, replica_package, replication_event_uuid)
write_pointer_file(new_master_pointer_file, self.full_pointer_file_path)
LOGGER.info('Finished replicating package %s as replica package %s',
replicandum_uuid, replica_package.uuid)
def should_have_pointer_file(self, package_full_path=None,
package_type=None):
"""Returns ``True`` if the package is both an AIP/AIC and is a file.
Note: because storage in certain locations (e.g., GPG encrypted
locations) can result in packaging and hence transformation of an AIP
directory to an AIP file, this predicate may return ``True`` after
``move_from_storage_service`` is called but ``False`` before.
"""
if not package_full_path:
package_full_path = os.path.join(
self.current_location.space.path,
self.current_location.relative_path,
self.current_path)
if not package_type:
package_type = self.package_type
# The package hasn't been moved yet, test for it being a file on the
# originating Space.
try:
isfile = self.origin_location.space.isfile(package_full_path)
except NotImplementedError:
isfile = os.path.isfile(package_full_path)
isaip = package_type in (Package.AIP, Package.AIC)
ret = isfile and isaip
if not ret:
if not isfile:
LOGGER.info('Package should not have a pointer file because %s'
' is not a file', package_full_path)
if not isaip:
LOGGER.info('Package should not have a pointer file because it'
' is not an AIP or an AIC; it is a(n) %s', package_type)
return ret
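    # In effect: a compressed (single-file) AIP or AIC -> True; an
    # uncompressed AIP directory, or any transfer/DIP/deposit -> False.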
# ==========================================================================
# Store AIP methods
# ==========================================================================
def store_aip(self, origin_location, origin_path, related_package_uuid=None,
premis_events=None, premis_agents=None, aip_subtype=None):
"""Stores an AIP in the correct Location.
Invokes different transfer mechanisms depending on what the source and
destination Spaces are. High-level steps (see auxiliary methods for
details):
        1. Get AIP to the "pending" stage: check space quotas (raising
           ``StorageException`` if insufficient) and get needed vars into ``v``.
2. Get AIP to the "uploaded" stage: move the AIP to its AIP Storage
location and update space quotas after move.
3. Ensure the AIP has a pointer file, if applicable.
4. Create replicas of the AIP, if applicable.
The AIP is initially located in location ``origin_location`` at
relative path ``origin_path``. Once stored, the AIP should be in
location ``self.current_location`` at relative path
``<UUID_AS_PATH>/self.current_path``. In the course of this method,
values on the ``Package`` instance are updated (including status) and
periodically saved to the db.
"""
LOGGER.info('store_aip called in Package class of SS')
v = self._store_aip_to_pending(origin_location, origin_path)
storage_effects, checksum = self._store_aip_to_uploaded(v, related_package_uuid)
self._store_aip_ensure_pointer_file(
v, checksum, premis_events=premis_events,
premis_agents=premis_agents, aip_subtype=aip_subtype)
if storage_effects:
pointer_file = self.get_pointer_instance()
if pointer_file:
revised_pointer_file = (
self.create_new_pointer_file_given_storage_effects(
pointer_file, storage_effects))
write_pointer_file(revised_pointer_file,
self.full_pointer_file_path)
self.create_replicas()
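    # Usage sketch (arguments hypothetical): a typical call when an AIP
    # arrives from a pipeline looks roughly like
    #
    #   package.store_aip(
    #       origin_location, origin_path,
    #       premis_events=events, premis_agents=agents)
    #
    # after which the AIP lives at
    # current_location/<uuid-as-path>/current_path with status UPLOADED (or
    # still STAGING for asynchronous spaces such as LOCKSS-o-matic and
    # Arkivum).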
def _store_aip_to_pending(self, origin_location, origin_path):
"""Get this AIP to the "pending" stage of ``store_aip`` by
        1. setting and persisting attributes on ``self`` (including
``status=Package.PENDING``),
2. checking that the destination space has enough space for the AIP to
be stored (and raising an exception if not), and
3. returning a simple object with attributes needed in the rest of
``store_aip``.
"""
V = namedtuple('V', ['src_space', 'dest_space', 'should_have_pointer',
'pointer_file_src', 'pointer_file_dst',
'already_generated_ptr_exists'])
self.origin_location = origin_location
self.origin_path = origin_path
origin_full_path = os.path.join(
self.origin_location.space.path,
self.origin_location.relative_path,
self.origin_path)
# Check if enough space on the space and location
# All sizes expected to be in bytes
src_space = self.origin_location.space
dest_space = self.current_location.space
self._check_quotas(dest_space, self.current_location)
# Store AIP at
# destination_location/uuid/split/into/chunks/destination_path
uuid_path = utils.uuid_to_path(self.uuid)
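        # For example, assuming utils.uuid_to_path splits the hex digits into
        # 4-character directory names, UUID
        # "41f33b5b-6294-4b80-a387-90e6c4e4e6ae" maps to
        # "41f3/3b5b/6294/4b80/a387/90e6/c4e4/e6ae".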
self.current_path = os.path.join(uuid_path, self.current_path)
self.status = Package.PENDING
self.save()
# If applicable, we will store the AIP pointer file at
# internal_usage_location/uuid/split/into/chunks/pointer.uuid.xml
should_have_pointer = self.should_have_pointer_file(
package_full_path=origin_full_path)
pointer_file_src = pointer_file_dst = already_generated_ptr_exists = \
None
if should_have_pointer:
self.pointer_file_location = Location.active.get(
purpose=Location.STORAGE_SERVICE_INTERNAL)
self.pointer_file_path = os.path.join(
uuid_path, 'pointer.{}.xml'.format(self.uuid))
pointer_file_src = os.path.join(
self.origin_location.relative_path,
os.path.dirname(self.origin_path),
'pointer.xml')
pointer_file_dst = os.path.join(
self.pointer_file_location.relative_path,
self.pointer_file_path)
already_generated_ptr_full_path = os.path.join(
self.origin_location.space.path,
pointer_file_src)
already_generated_ptr_exists = os.path.isfile(
already_generated_ptr_full_path)
return V(
src_space=src_space,
dest_space=dest_space,
should_have_pointer=should_have_pointer,
pointer_file_src=pointer_file_src,
pointer_file_dst=pointer_file_dst,
already_generated_ptr_exists=already_generated_ptr_exists)
def _store_aip_to_uploaded(self, v, related_package_uuid):
"""Get this AIP to the "uploaded" stage of ``store_aip``
:param namedtuple v: object with attributes needed for processing.
:param str related_package_uuid: UUID of a related package.
        :returns: 2-tuple of (storage_effects, checksum)
"""
try:
# Both spaces are POSIX filesystems and support `posix_move`
# 1. move direct to the SS destination space/location,
# 2. calculate the checksum,
# 3. set the status to "uploaded",
# 4. set a related package (if applicable),
# 5. update quotas on the destination space, and
# 6. persist the package to the database.
source_path = os.path.join(self.origin_location.relative_path,
self.origin_path)
destination_path = os.path.join(
self.current_location.relative_path,
self.current_path)
storage_effects = v.src_space.posix_move(
source_path=source_path,
destination_path=destination_path,
destination_space=v.dest_space,
package=self
)
checksum = None
if v.should_have_pointer and (not v.already_generated_ptr_exists):
                # If posix_move didn't raise, then get_local_path() should
                # not return None
checksum = utils.generate_checksum(
self.get_local_path(),
Package.DEFAULT_CHECKSUM_ALGORITHM).hexdigest()
if related_package_uuid is not None:
related_package = Package.objects.get(uuid=related_package_uuid)
self.related_packages.add(related_package)
self.status = Package.UPLOADED
self.save()
self._update_quotas(v.dest_space, self.current_location)
return storage_effects, checksum
except PosixMoveUnsupportedError:
# 1. move AIP to the SS internal location,
# 2. get its checksum,
# 3. set its status to "staging",
# 4. call ``post_move_to_storage_service`` on the source space,
# 5. move it to the destination space/location,
# 6. set the status to "uploaded" (if applicable),
# 7. set a related package (if applicable),
# 8. call ``post_move_from_storage_service`` on the destination space,
# 9. update quotas on the destination space, and
# 10. persist the package to the database.
v.src_space.move_to_storage_service(
source_path=os.path.join(self.origin_location.relative_path,
self.origin_path),
destination_path=self.current_path, # This should include Location.path
destination_space=v.dest_space)
# We have to manually construct the AIP's current path here;
# ``self.get_local_path()`` won't work.
local_aip_path = os.path.join(v.dest_space.staging_path, self.current_path)
checksum = None
if v.should_have_pointer and (not v.already_generated_ptr_exists):
checksum = utils.generate_checksum(
local_aip_path,
Package.DEFAULT_CHECKSUM_ALGORITHM).hexdigest()
self.status = Package.STAGING
self.save()
v.src_space.post_move_to_storage_service()
storage_effects = v.dest_space.move_from_storage_service(
source_path=self.current_path, # This should include Location.path
destination_path=os.path.join(
self.current_location.relative_path,
self.current_path),
package=self,
)
# Update package status once transferred to SS
if v.dest_space.access_protocol not in (Space.LOM, Space.ARKIVUM):
self.status = Package.UPLOADED
if related_package_uuid is not None:
related_package = Package.objects.get(uuid=related_package_uuid)
self.related_packages.add(related_package)
self.save()
v.dest_space.post_move_from_storage_service(
staging_path=self.current_path,
destination_path=os.path.join(
self.current_location.relative_path, self.current_path),
package=self)
self._update_quotas(v.dest_space, self.current_location)
return storage_effects, checksum
def _store_aip_ensure_pointer_file(self, v, checksum, premis_events=None,
premis_agents=None, aip_subtype=None):
"""Ensure that this newly stored AIP has a pointer file by moving an
AM-created pointer file to the appropriate SS location if such a
pointer file exists or by creating a pointer file (if necessary)
otherwise. Set pointer file-related attributes on the model instance
and save to the database. Optional args are only used if a pointer file
must be created; see ``create_pointer_file`` for details.
:param namedtuple v: object with attributes needed for processing.
:returns NoneType None:
"""
if v.should_have_pointer:
if v.already_generated_ptr_exists:
# Move an already-generated pointer file if exists.
v.src_space.move_to_storage_service(
v.pointer_file_src,
self.pointer_file_path,
self.pointer_file_location.space)
self.pointer_file_location.space.move_from_storage_service(
self.pointer_file_path,
v.pointer_file_dst,
package=None)
self._update_existing_ptr_loc_info() # Update its location info
else: # Otherwise, create a pointer file here.
pointer_file_dst = os.path.join(
self.pointer_file_location.space.path, v.pointer_file_dst)
self._create_pointer_file_write_to_disk(
pointer_file_dst, checksum, premis_events,
premis_agents=premis_agents, aip_subtype=aip_subtype)
else: # This package should not have a pointer file
self.pointer_file_location = None
self.pointer_file_path = None
self.save()
def _create_pointer_file_write_to_disk(self, pointer_file_dst, checksum,
premis_events, premis_agents=None,
aip_subtype=None):
"""Create a pointer file and write it to disk for the ``store_aip``
method.
:param str pointer_file_dst: Full path to where the pointer file should
be written.
:param list premis_events:
:param list premis_agents:
:param str aip_subtype:
:returns NoneType None:
See ``create_pointer_file`` for details.
"""
checksum_algorithm = Package.DEFAULT_CHECKSUM_ALGORITHM
premis_events = [
premisrw.PREMISEvent(data=event) for event in premis_events]
__, compression_program_version, archive_tool = (
_get_compression_details_from_premis_events(
premis_events, self.uuid))
premis_object = self._create_aip_premis_object(
checksum_algorithm, checksum, archive_tool,
compression_program_version)
pointer_file = self.create_pointer_file(
premis_object, premis_events, premis_agents=premis_agents,
package_subtype=aip_subtype)
if pointer_file is None:
self.pointer_file_location = None
self.pointer_file_path = None
else:
write_pointer_file(pointer_file, pointer_file_dst)
def _update_existing_ptr_loc_info(self):
"""Update an AM-created pointer file's location information."""
pointer_absolute_path = self.full_pointer_file_path
root = etree.parse(pointer_absolute_path)
element = root.find('.//mets:file', namespaces=utils.NSMAP)
flocat = element.find('mets:FLocat', namespaces=utils.NSMAP)
if self.uuid in element.get('ID', '') and flocat is not None:
flocat.set('{{{ns}}}href'.format(ns=utils.NSMAP['xlink']),
self.full_path)
# Add USE="Archival Information Package" to fileGrp. Required for
# LOCKSS, and not provided in Archivematica <=1.1
if root.find('.//mets:fileGrp[@USE="Archival Information Package"]',
namespaces=utils.NSMAP) is not None:
root.find('.//mets:fileGrp', namespaces=utils.NSMAP).set(
'USE', 'Archival Information Package')
with open(pointer_absolute_path, 'w') as f:
f.write(etree.tostring(root, pretty_print=True, xml_declaration=True, encoding='utf-8'))
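    # The pointer-file fragment rewritten above looks roughly like this
    # (attribute values hypothetical):
    #
    #   <mets:fileGrp USE="Archival Information Package">
    #     <mets:file ID="file-<uuid>">
    #       <mets:FLocat xlink:href="/new/full/path/to/aip.7z" ... />
    #     </mets:file>
    #   </mets:fileGrp>
    #
    # i.e. the FLocat's xlink:href is repointed at the package's new
    # full_path.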
# ==========================================================================
# END Store AIP methods
# ==========================================================================
def get_pointer_instance(self):
"""Return this package's pointer file as a ``metsrw.METSDocument``
        instance. Return ``None`` for packages without a pointer file, such as
        uncompressed AIPs. See artefactual/archivematica-storage-service#324
for further enhancements.
"""
if not self.should_have_pointer_file():
return None
ptr_path = self.full_pointer_file_path
if not ptr_path:
return None
return metsrw.METSDocument.fromfile(ptr_path)
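    # Usage sketch: downstream code (see ``replicate`` above) pulls PREMIS
    # details out of the returned document, e.g.
    #
    #   ptr = package.get_pointer_instance()
    #   if ptr:
    #       fsentry = ptr.get_file(file_uuid=package.uuid)
    #       premis_object = fsentry.get_premis_objects()[0]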
def create_replica_pointer_file(self, replica_package,
replication_event_uuid,
replication_validation_event,
master_ptr=None):
"""Create and write to disk a new pointer file for the replica package
Model instance ``replica_package``. Assume that ``self`` is the
source/master of the replica.
NOTE: Fixity check results are not currently included in the
        replication validation event because of a circular dependency issue: the
        pointer file must exist (so we can get the compression command used
        from it) before we can extract the package in order to check its
fixity. Here's how it could be done::
>>> __, fixity_report = (
replica_package.get_fixity_check_report_send_signals())
>>> replication_validation_event = (
replica_package.get_replication_validation_event(
checksum_report=checksum_report,
master_aip_uuid=self.uuid,
fixity_report=fixity_report,
agents=replica_premis_agents))
Given that a checksum is calculated for the replica and that checksum
is compared for equality to the master's checksum, it seems overkill to
perform a BagIt fixity check as well. Is checksum comparison sufficient
for replication validation or does a fixity check need to be performed
also?
"""
# 1. Set attrs and get full path to pointer file.
if not master_ptr:
master_ptr = self.get_pointer_instance()
if not master_ptr:
LOGGER.warning('Not creating a pointer file for replica package %s'
' because its master package does not have one.',
replica_package.uuid)
return None
uuid_path = utils.uuid_to_path(replica_package.uuid)
replica_package.pointer_file_location = Location.active.get(
purpose=Location.STORAGE_SERVICE_INTERNAL)
replica_package.pointer_file_path = os.path.join(
uuid_path, 'pointer.{}.xml'.format(replica_package.uuid))
master_aip_uuid = self.uuid
# 2. Get the master AIP's pointer file and extract what we need from it
# in order to create the replica's pointer file.
master_ptr_aip_fsentry = master_ptr.get_file(file_uuid=self.uuid)
master_package_subtype = master_ptr_aip_fsentry.mets_div_type
master_compression_event = [
pe for pe in master_ptr_aip_fsentry.get_premis_events()
if pe.event_type == 'compression'][0]
master_premis_object = master_ptr_aip_fsentry.get_premis_objects()[0]
master_checksum_algorithm = master_premis_object.message_digest_algorithm
master_checksum = master_premis_object.message_digest
master_premis_agents = master_ptr_aip_fsentry.get_premis_agents()
# 3. Construct the pointer file and return it
replica_premis_creation_agents = utils.get_ss_premis_agents()
__, compression_program_version, archive_tool = (
master_compression_event.compression_details)
replica_premis_relationships = [
_get_replication_derivation_relationship(master_aip_uuid,
replication_event_uuid)]
replica_premis_object = replica_package._create_aip_premis_object(
master_checksum_algorithm, master_checksum, archive_tool,
compression_program_version,
premis_relationships=replica_premis_relationships)
replica_premis_creation_event = (
replica_package.get_premis_aip_creation_event(
master_aip_uuid=master_aip_uuid,
agents=replica_premis_creation_agents))
replica_premis_agents = list(
set(master_premis_agents + replica_premis_creation_agents))
replica_premis_events = [
master_compression_event,
replica_premis_creation_event,
replication_validation_event
]
return replica_package.create_pointer_file(
replica_premis_object,