Skip to content

Commit

Permalink
feat(transfer): Adds action to set dataset permissions
Browse files Browse the repository at this point in the history
Closes #438
  • Loading branch information
vchendrix committed Oct 28, 2024
1 parent 1dbd96b commit 9822023
Show file tree
Hide file tree
Showing 4 changed files with 125 additions and 63 deletions.
10 changes: 9 additions & 1 deletion archive_api/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

from archive_api.models import DataSet, DataSetDownloadLog, EssDiveTransfer, MeasurementVariable, Person, Plot, \
ServiceAccount, Site

from archive_api.forms import ServiceAccountForm
from simple_history.admin import SimpleHistoryAdmin

Expand All @@ -33,7 +34,8 @@ class DataSetHistoryAdmin(SimpleHistoryAdmin):
history_list_display = ["list_changes"]
search_fields = ['name', 'status', "ngt_id", "version"]

actions = ['osti_synchronize', 'osti_mint', 'essdive_transfer_metadata', 'essdive_transfer_data']
actions = ['osti_synchronize', 'osti_mint', 'essdive_transfer_metadata', 'essdive_transfer_data',
'update_essdive_permissions']

def last_transfer_status(self, obj):
"""
Expand All @@ -60,6 +62,12 @@ def last_transfer_date(self, obj):
else:
return None

def update_essdive_permissions(self, request, queryset):
    """Queue an ESS-DIVE permissions update for each selected dataset."""
    transfer_type = EssDiveTransfer.TYPE_PERMISSIONS
    return self._essdive_transfer(request, queryset, transfer_type)

update_essdive_permissions.short_description = "Update permissions for dataset in ESS-DIVE"

def essdive_transfer_data(self, request, queryset):
    """Queue an ESS-DIVE transfer of both metadata and data for the selected datasets."""
    transfer_type = EssDiveTransfer.TYPE_DATA
    return self._essdive_transfer(request, queryset, transfer_type)
Expand Down
4 changes: 3 additions & 1 deletion archive_api/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -554,10 +554,12 @@ class EssDiveTransfer(models.Model):

TYPE_METADATA = 0
TYPE_DATA = 1
TYPE_PERMISSIONS = 2

TYPE_CHOICES = (
(TYPE_METADATA, "Metadata"),
(TYPE_DATA, "Data")
(TYPE_DATA, "Data"),
(TYPE_PERMISSIONS, "Permissions")
)

dataset = models.ForeignKey(DataSet, on_delete=models.DO_NOTHING, blank=False, null=False)
Expand Down
170 changes: 111 additions & 59 deletions archive_api/service/essdive_transfer/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
from archive_api.models import EssDiveTransfer, SERVICE_ACCOUNT_ESSDIVE, ServiceAccount
from archive_api.service.essdive_transfer import crosswalk

TEAM_NGEE_TROPICS_ADMIN = "CN=NGEE-Tropics Admins,DC=dataone,DC=org"

log = get_task_logger(__name__)


Expand Down Expand Up @@ -49,7 +51,6 @@ def get(essdive_id):
# if the NGT ID is in the text of the metadata.
response = requests.get(f"{service_account.endpoint}/{essdive_id}",
headers={"Authorization": f"Bearer {service_account.secret}"})

return response


Expand Down Expand Up @@ -154,8 +155,7 @@ def transfer_start(run_id):
except RunError as re:
raise re
except json.decoder.JSONDecodeError as je:
_raise_transfer_failure(f"Failed decoding ESS-DIVE response ({str(je)}) = "
f"{response_json.text()}", run_id)
_raise_transfer_failure(f"Failed decoding ESS-DIVE response ({str(je)})", run_id)
except Exception as e:
_raise_run_error(e, run_id)

Expand Down Expand Up @@ -196,62 +196,65 @@ def transfer(result):
transfer_job.start_time = timezone.now()
transfer_job.save()

method = essdive_id and "PUT" or "POST"
json_ld, ack_fp = crosswalk.dataset_transform(transfer_job.dataset)
log.info(f"{essdive_id and 'Updating' or 'Creating'} ESS-DIVE dataset metadata with ESS-DIVE identifier {essdive_id}")

# Prepare the multi part encoding to stream the data
files_tuples_array = list()
files_tuples_array.append(("json-ld", json.dumps(json_ld)))

# Get the locations csv for this dataset
locations_fp = crosswalk.locations_csv(transfer_job.dataset)
if locations_fp:
log.info(
f"Prepared ESS-DIVE dataset locations.csv file for ESS-DIVE identifier {essdive_id}")
files_tuples_array.append(
('data', (f"{transfer_job.dataset.data_set_id()}_locations.csv", locations_fp)))
if ack_fp:
# There is an acknowledgements file
log.info(
f"Prepared ESS-DIVE dataset acknowledgements.txt file for ESS-DIVE identifier {essdive_id}")
files_tuples_array.append(
('data', (f"{transfer_job.dataset.data_set_id()}_acknowledgements.txt", ack_fp)))

# Is this a data update?
if transfer_job.type == EssDiveTransfer.TYPE_DATA:

log.info(f"Uploading ESS-DIVE dataset file '{transfer_job.dataset.archive.path}' for ESS-DIVE identifier {essdive_id}")
files_tuples_array.append(
('data', (transfer_job.dataset.archive.name, open(transfer_job.dataset.archive.path, 'rb'))))

encoder = MultipartEncoder(fields=files_tuples_array)
# need to limit number of messages at each percent
monitor_messages = []

def _upload_progress(monitor: MultipartEncoderMonitor):
"""
Callback for MultipartEncoder Monitor to log upload progress
:param monitor: The monitor for this callback
:return: None
"""
# Determine percentage complete
percent_complete = math.floor(monitor.bytes_read/monitor.len * 100)
if percent_complete % 10 == 0 and percent_complete not in monitor_messages:
monitor_messages.append(percent_complete)
log.info(f"Upload progress {transfer_job.dataset.data_set_id()} - bytes {monitor.bytes_read} of {monitor.len} ({percent_complete}%) read")

monitor = MultipartEncoderMonitor(encoder, _upload_progress)
endpoint = f"{service_account.endpoint}/{essdive_id or ''}"
response = requests.request(method=method, url=endpoint,
headers={"Authorization": f"Bearer {service_account.secret}",
'Content-Type': monitor.content_type},
data=monitor)

json_response = response.json()

# Return run information
return {"run_id": run_id, "response": json_response, "status_code": response.status_code}
if transfer_job.type in [EssDiveTransfer.TYPE_METADATA, EssDiveTransfer.TYPE_DATA]:
method = essdive_id and "PUT" or "POST"
json_ld, ack_fp = crosswalk.dataset_transform(transfer_job.dataset)
log.info(f"{essdive_id and 'Updating' or 'Creating'} ESS-DIVE dataset metadata with ESS-DIVE identifier {essdive_id}")

# Prepare the multi part encoding to stream the data
files_tuples_array = list()
files_tuples_array.append(("json-ld", json.dumps(json_ld)))

# Get the locations csv for this dataset
locations_fp = crosswalk.locations_csv(transfer_job.dataset)
if locations_fp:
log.info(
f"Prepared ESS-DIVE dataset locations.csv file for ESS-DIVE identifier {essdive_id}")
files_tuples_array.append(
('data', (f"{transfer_job.dataset.data_set_id()}_locations.csv", locations_fp)))
if ack_fp:
# There is an acknowledgements file
log.info(
f"Prepared ESS-DIVE dataset acknowledgements.txt file for ESS-DIVE identifier {essdive_id}")
files_tuples_array.append(
('data', (f"{transfer_job.dataset.data_set_id()}_acknowledgements.txt", ack_fp)))

# Is this a data update?
if transfer_job.type == EssDiveTransfer.TYPE_DATA:

log.info(f"Uploading ESS-DIVE dataset file '{transfer_job.dataset.archive.path}' for ESS-DIVE identifier {essdive_id}")
files_tuples_array.append(
('data', (transfer_job.dataset.archive.name, open(transfer_job.dataset.archive.path, 'rb'))))

encoder = MultipartEncoder(fields=files_tuples_array)
# need to limit number of messages at each percent
monitor_messages = []

def _upload_progress(monitor: MultipartEncoderMonitor):
"""
Callback for MultipartEncoder Monitor to log upload progress
:param monitor: The monitor for this callback
:return: None
"""
# Determine percentage complete
percent_complete = math.floor(monitor.bytes_read/monitor.len * 100)
if percent_complete % 10 == 0 and percent_complete not in monitor_messages:
monitor_messages.append(percent_complete)
log.info(f"Upload progress {transfer_job.dataset.data_set_id()} - bytes {monitor.bytes_read} of {monitor.len} ({percent_complete}%) read")

monitor = MultipartEncoderMonitor(encoder, _upload_progress)
endpoint = f"{service_account.endpoint}/{essdive_id or ''}"
response = requests.request(method=method, url=endpoint,
headers={"Authorization": f"Bearer {service_account.secret}",
'Content-Type': monitor.content_type},
data=monitor)

json_response = response.json()

# Return run information
return {"run_id": run_id, "response": json_response, "status_code": response.status_code}
elif transfer_job.type == EssDiveTransfer.TYPE_PERMISSIONS:
return _update_dataset_permissions(result)
except Exception as e:
_raise_run_error(e, run_id)

Expand Down Expand Up @@ -304,6 +307,55 @@ def transfer_end(result):
RunError(None, "run_id is missing from result")


def _update_dataset_permissions(result):
    """
    Update the sharing permissions for a dataset on ESS-DIVE.

    Grants "manage" access to the ESS-DIVE service-account identity and the
    NGEE-Tropics admin team via the ESS-DIVE ``/share`` endpoint.

    :param result: dict produced by the previous task in the chain; must
        contain ``run_id`` and ``essdive_id``.
    :return: dict with ``run_id``, the decoded JSON ``response``, the HTTP
        ``status_code``, and ``essdive_id``.
    :raises ValueError: if ``run_id`` or ``essdive_id`` is missing/empty.
    """
    # Lazy %-style args avoid formatting the message when INFO is disabled.
    log.info("inputs: %s", result)

    run_id = result.get("run_id")
    essdive_id = result.get("essdive_id")

    # Validate with a real exception rather than ``assert`` — asserts are
    # stripped when Python runs with -O, which would silently skip the check.
    if not run_id or not essdive_id:
        raise ValueError("Invalid parameters provided for updating dataset permissions")

    try:
        # Get the ESS-DIVE account information
        service_account = ServiceAccount.objects.get(service=SERVICE_ACCOUNT_ESSDIVE)
        log.info(f"Using {service_account.get_service_display()} service account to update permissions.")

        # Both the service account and the NGEE-Tropics admin team receive
        # management rights over the dataset.
        payload = json.dumps({
            "manage": [
                service_account.identity,
                TEAM_NGEE_TROPICS_ADMIN
            ]
        })
        # Update the dataset permissions (essdive_id is guaranteed non-empty
        # by the validation above).
        endpoint = f"{service_account.endpoint}/{essdive_id}/share"
        response = requests.put(endpoint,
                                headers={"Authorization": f"Bearer {service_account.secret}",
                                         'Content-Type': "application/json"},
                                data=payload)

        json_response = response.json()

        # Return run information
        return {"run_id": run_id, "response": json_response,
                "status_code": response.status_code, "essdive_id": essdive_id}
    except RunError as re:
        raise re
    except json.decoder.JSONDecodeError as je:
        _raise_transfer_failure(f"Failed decoding ESS-DIVE response ({str(je)})", run_id)
    except Exception as e:
        _raise_run_error(e, run_id)


def _raise_run_error(e, run_id):
"""
Raises run error and logs to the transfer job record
Expand Down
4 changes: 2 additions & 2 deletions archive_api/tests/test_essdive_transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -249,8 +249,8 @@ def test_transfer_task(celery_setup, monkeypatch):
def test_transfer_task_not_authorized(celery_setup, monkeypatch):
"""Test transfer task"""

task = tasks.transfer.delay({'run_id': RUN_ID_TRANSFER, "ngt_id": "NGT0001"})
monkeypatch.setattr(requests, "request", create_mock_request("ngt_essdive_dataset_unauthorized.json", 401))
task = tasks.transfer.delay({'run_id': RUN_ID_TRANSFER, "ngt_id": "NGT0001"})

results = task.get()
assert results == {'response': {'detail': 'You do not have authorized access'},
Expand All @@ -262,7 +262,7 @@ def test_transfer_task_not_authorized(celery_setup, monkeypatch):
def test_essdive_task_end(celery_setup):
    """Test start end"""

    outcome = tasks.transfer_end.delay({'run_id': RUN_ID_END}).get()
    assert outcome == {'run_id': RUN_ID_END}

Expand Down

0 comments on commit 9822023

Please sign in to comment.