Skip to content

Commit

Permalink
Merge pull request #201 from aaschaer/endpoint_manager_cancel_pause_r…
Browse files Browse the repository at this point in the history
…esume

Add endpoint_manager cancel pause resume to transfer client
  • Loading branch information
sirosen authored Apr 20, 2017
2 parents 41de1b8 + f590584 commit 28cd0e3
Show file tree
Hide file tree
Showing 2 changed files with 282 additions and 2 deletions.
142 changes: 142 additions & 0 deletions globus_sdk/transfer/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1670,3 +1670,145 @@ def endpoint_manager_task_successful_transfers(self, task_id,
self.get, resource_path, {'params': params},
num_results=num_results, max_results_per_call=1000,
paging_style=PaginatedResource.PAGING_STYLE_MARKER)

def endpoint_manager_cancel_tasks(self, task_ids, message, **params):
"""
Cancel a list of tasks as an admin. Requires activity manager effective
role on the task(s) source or destination endpoint(s).
``POST /endpoint_manager/admin_cancel``
:rtype: :class:`TransferResponse
<globus_sdk.transfer.response.TransferResponse>`
**Parameters**
``task_ids`` (*list of string*)
List of task ids to cancel.
``message`` (*string*)
Message given to all users who's tasks have been canceled.
``params``
Any additional parameters will be passed through as query params.
**External Documentation**
See
`Cancel tasks as admin \
<https://docs.globus.org/api/transfer/advanced_endpoint_management/#admin_cancel>`_
in the REST documentation for details.
"""
self.logger.info(("TransferClient.endpoint_manager_"
"cancel_tasks({},{})".format(task_ids, message)))
json_body = {
"message": safe_stringify(message),
"task_id_list": [safe_stringify(i) for i in task_ids]
}
path = self.qjoin_path("endpoint_manager", "admin_cancel")
return self.post(path, json_body=json_body, params=params)

def endpoint_manager_cancel_status(self, admin_cancel_id, **params):
"""
Get the status of an an admin cancel (result of endpoint_manager_
cancel_tasks).
``GET /endpoint_manager/admin_cancel/<admin_cancel_id>``
:rtype: :class:`TransferResponse
<globus_sdk.transfer.response.TransferResponse>`
**Parameters**
``admin_cancel_id`` (*string*)
The ID of the the cancel to inspect.
``params``
Any additional parameters will be passed through as query params.
**External Documentation**
See
`Get cancel status by id \
<https://docs.globus.org/api/transfer/advanced_endpoint_management/#get_cancel_status_by_id>`_
in the REST documentation for details.
"""
self.logger.info(("TransferClient.endpoint_manager_"
"cancel_status({})".format(admin_cancel_id)))
path = self.qjoin_path("endpoint_manager", "admin_cancel",
admin_cancel_id)
return self.get(path, params=params)

def endpoint_manager_pause_tasks(self, task_ids, message, **params):
"""
Pause a list of tasks as an admin. Requires activity manager effective
role on the task(s) source or destination endpoint(s).
``POST /endpoint_manager/admin_pause``
:rtype: :class:`TransferResponse
<globus_sdk.transfer.response.TransferResponse>`
**Parameters**
``task_ids`` (*list of string*)
List of task ids to pause.
``message`` (*string*)
Message given to all users who's tasks have been paused.
``params``
Any additional parameters will be passed through as query params.
**External Documentation**
See
`Cancel tasks as admin \
<https://docs.globus.org/api/transfer/advanced_endpoint_management/#pause_tasks_as_admin>`_
in the REST documentation for details.
"""
self.logger.info(("TransferClient.endpoint_manager_"
"pause_tasks({},{})".format(task_ids, message)))
json_body = {
"message": safe_stringify(message),
"task_id_list": [safe_stringify(i) for i in task_ids]
}
path = self.qjoin_path("endpoint_manager", "admin_pause")
return self.post(path, json_body=json_body, params=params)

def endpoint_manager_resume_tasks(self, task_ids, message, **params):
"""
Resume a list of tasks as an admin. Requires activity manager effective
role on the task(s) source or destination endpoint(s).
``POST /endpoint_manager/admin_resume``
:rtype: :class:`TransferResponse
<globus_sdk.transfer.response.TransferResponse>`
**Parameters**
``task_ids`` (*list of string*)
List of task ids to resume.
``message`` (*string*)
Message given to all users who's tasks have been canceled.
``params``
Any additional parameters will be passed through as query params.
**External Documentation**
See
`Cancel tasks as admin \
<https://docs.globus.org/api/transfer/advanced_endpoint_management/#admin_cancel>`_
in the REST documentation for details.
"""
self.logger.info(("TransferClient.endpoint_manager_"
"resume_tasks({},{})".format(task_ids, message)))
json_body = {
"message": safe_stringify(message),
"task_id_list": [safe_stringify(i) for i in task_ids]
}
path = self.qjoin_path("endpoint_manager", "admin_resume")
return self.post(path, json_body=json_body, params=params)
142 changes: 140 additions & 2 deletions tests/unit/test_transfer_client.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import re
import time
import unittest
from random import getrandbits
from datetime import datetime, timedelta

Expand Down Expand Up @@ -1604,7 +1606,7 @@ def test_endpoint_manager_get_task(self):
"""
# sdktester2b subits no-op delete task
ddata = globus_sdk.DeleteData(self.tc2, self.managed_ep_id,
notify_on_succeeded=False)
notify_on_fail=False)
ddata.add_item("no-op.txt")
task_id = self.tc2.submit_delete(ddata)["task_id"]

Expand Down Expand Up @@ -1632,7 +1634,7 @@ def test_endpoint_manager_task_event_list(self):
"""
# sdktester2b subits no-op delete task and waits for completion
ddata = globus_sdk.DeleteData(self.tc2, self.managed_ep_id,
notify_on_succeeded=False)
notify_on_fail=False)
ddata.add_item("no-op.txt")
task_id = self.tc2.submit_delete(ddata)["task_id"]
self.assertTrue(
Expand Down Expand Up @@ -1722,3 +1724,139 @@ def test_endpoint_manager_task_successful_transfers(self):
transfer["destination_path"]))
count += 1
self.assertEqual(count, 3)

def _unauthorized_transfers(self):
"""
Helper that has sdktester2b submits 3 unauthorized transfers from the
managed endpoint, returns a list of their task_ids,
and tracks them for cleanup.
"""
# submit the tasks
task_ids = []
for i in range(3):
tdata = globus_sdk.TransferData(self.tc2, self.managed_ep_id,
GO_EP1_ID, notify_on_fail=False)
tdata.add_item("/", "/", recursive=True)
task_ids.append(self.tc2.submit_transfer(tdata)["task_id"])

# track assets for cleanup
self.asset_cleanup.append(
{"function": self.tc.endpoint_manager_cancel_tasks,
"args": [task_ids, "Cleanup for unauthorized_transfers helper"]})

return task_ids

def test_endpoint_manager_cancel_tasks(self):
"""
Get task ids from _unauthorized transfers, and has sdktester1a cancel
those tasks. Validates results.
Confirms 403 when non manager attempts to use this resource.
"""
# cancel the tasks
task_ids = self._unauthorized_transfers()
message = "SDK test cancel tasks"
cancel_doc = self.tc.endpoint_manager_cancel_tasks(task_ids, message)

# validate results
self.assertEqual(cancel_doc["DATA_TYPE"], "admin_cancel")
self.assertIn("done", cancel_doc)
self.assertIn("id", cancel_doc)

# 403 for non managers, even if they submitted the tasks
with self.assertRaises(TransferAPIError) as apiErr:
self.tc2.endpoint_manager_cancel_tasks(task_ids, message)
self.assertEqual(apiErr.exception.http_status, 403)
self.assertEqual(apiErr.exception.code, "PermissionDenied")

def test_endpoint_manager_cancel_status(self):
"""
Has sdktester2b submit three unauthorized transfers from the managed
endpoint, and sdktester1a admin_cancel those tasks.
Gets the cancel status of the cancel and validates results.
Loops while status is not done, then confirms all tasks canceled.
"""
# cancel the tasks and get the cancel id
task_ids = self._unauthorized_transfers()
message = "SDK test cancel status"
cancel_id = self.tc.endpoint_manager_cancel_tasks(
task_ids, message)["id"]

# loop while not done or fail after 30 tries, 1 try per second
for tries in range(30):
# get and validate cancel status
status_doc = self.tc.endpoint_manager_cancel_status(cancel_id)
self.assertEqual(status_doc["DATA_TYPE"], "admin_cancel")
self.assertEqual(status_doc["id"], cancel_id)
if status_doc["done"]:
break
else:
time.sleep(1)

# confirm sdktester2b now sees all tasks as canceled by admin.
for task_id in task_ids:
task_doc = self.tc2.get_task(task_id)
self.assertEqual(task_doc["canceled_by_admin"], "SOURCE")
self.assertEqual(task_doc["canceled_by_admin_message"], message)

# TODO: stop skipping these tests when
# https://github.com/globusonline/koa/issues/49
# is resolved.
@unittest.skipIf(True, "github.com/globusonline/koa/issues/49")
def test_endpoint_manager_pause_tasks(self):
"""
Has sdktester2b submit three unauthorized transfers,
and sdktester1a pause the tasks as an admin.
Validates results and confirms the tasks are paused.
Confirms 403 when non manager attempts to use this resource.
"""
# pause the tasks
task_ids = self._unauthorized_transfers()
message = "SDK test pause tasks"
pause_doc = self.tc.endpoint_manager_pause_tasks(task_ids, message)

# validate results
self.assertEqual(pause_doc["DATA_TYPE"], "result")
self.assertEqual(pause_doc["code"], "PauseAccepted")

# confirm sdktester2b sees the tasks as paused
for task_id in task_ids:
task_doc = self.tc2.get_task(task_id)
self.assertTrue(task_doc["is_paused"])

# 403 for non managers
with self.assertRaises(TransferAPIError) as apiErr:
self.tc2.endpoint_manager_pause_tasks(task_ids, message)
self.assertEqual(apiErr.exception.http_status, 403)
self.assertEqual(apiErr.exception.code, "PermissionDenied")

@unittest.skipIf(True, "github.com/globusonline/koa/issues/49")
def test_endpoint_manager_resume_tasks(self):
"""
Has sdktester2b submit three unauthorized transfers,
then sdktester1a pauses then resumes the tasks as an admin.
Confirms tasks go from paused to active.
Confirms 403 when non manager attempts to use this resource.
"""
# pause the tasks and confirm they are paused
task_ids = self._unauthorized_transfers()
message = "SDK test resume tasks"
self.tc.endpoint_manager_pause_tasks(task_ids, message)
for task_id in task_ids:
task_doc = self.tc2.get_task(task_id)
self.assertTrue(task_doc["is_paused"])

# resume the tasks and validate results
resume_doc = self.tc.endpoint_manager_resume_tasks(task_ids, message)
self.assertEqual(resume_doc["DATA_TYPE"], "result")
self.assertEqual(resume_doc["code"], "ResumeAccepted")

# confirm tasks are now active.
for task_id in task_ids:
task_doc = self.tc2.get_task(task_id)
self.assertFalse(task_doc["is_paused"])

# 403 for non managers
with self.assertRaises(TransferAPIError) as apiErr:
self.tc2.endpoint_manager_resume_tasks(task_ids, message)
self.assertEqual(apiErr.exception.http_status, 403)
self.assertEqual(apiErr.exception.code, "PermissionDenied")

0 comments on commit 28cd0e3

Please sign in to comment.