Skip to content

Commit

Permalink
Add endpoint_manager pause_rule unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
aaschaer committed Apr 20, 2017
1 parent 1749fc9 commit 795d342
Showing 1 changed file with 182 additions and 17 deletions.
199 changes: 182 additions & 17 deletions tests/unit/test_transfer_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,19 @@ class BaseTransferClientTests(CapturedIOTestCase):
@classmethod
def setUpClass(self):
"""
Does an auth flow to create an authorized client
Does an auth flow to create an authorized client for
sdktester1a and sdktester2b
"""
ac = globus_sdk.NativeAppAuthClient(
client_id=get_client_data()["native_app_client1"]["id"])
authorizer = globus_sdk.RefreshTokenAuthorizer(

authorizer1 = globus_sdk.RefreshTokenAuthorizer(
SDKTESTER1A_NATIVE1_TRANSFER_RT, ac)
self.tc = globus_sdk.TransferClient(authorizer=authorizer)
self.tc = globus_sdk.TransferClient(authorizer=authorizer1)

authorizer2 = globus_sdk.RefreshTokenAuthorizer(
SDKTESTER2B_NATIVE1_TRANSFER_RT, ac)
self.tc2 = globus_sdk.TransferClient(authorizer=authorizer2)

def setUp(self):
"""
Expand Down Expand Up @@ -919,9 +925,6 @@ def test_submit_delete(self):
self.assertEqual(resub_delete_doc["submission_id"], sub_id)
self.assertEqual(resub_delete_doc["task_id"], task_id)

# def test_def endpoint_manager_task_list(self):
# TODO: give SDK test activity_monitor role on an endpoint

def test_task_list(self):
"""
Gets task list, validates results, tests num_results and filter params
Expand Down Expand Up @@ -1156,8 +1159,8 @@ def test_task_successful_transfers(self):
self.assertIn("destination_path", success)


# class for Transfer Client Tests that require a shared endpoint
# since tearDown takes significantly longer if a shared endpoint was made
# class for TransferClientTests that require a unique shared endpoint per test
# since tearDown takes significantly longer if a shared endpoint is deleted
class SharedTransferClientTests(BaseTransferClientTests):

__test__ = True # marks sub-class as having tests
Expand Down Expand Up @@ -1414,30 +1417,192 @@ def test_delete_endpoint_acl_rule(self):
self.assertEqual(apiErr.exception.http_status, 404)
self.assertEqual(apiErr.exception.code, "AccessRuleNotFound")

def test_endpoint_manager_create_pause_rule(self):
"""
Creates a pause rule on the shared endpoint, validates results.
Returns the rule's id for use in other tests that need a pause rule.
Confirms 403 when non manager attempts to use this resource.
"""
rule_data = {
"DATA_TYPE": "pause_rule",
"message": "SDK Test Pause Rule",
"endpoint_id": self.test_share_ep_id,
"identity_id": None, # affect all users on endpoint
"start_time": None # start now
}
create_doc = self.tc.endpoint_manager_create_pause_rule(rule_data)

# verify results
for key in rule_data:
self.assertEqual(create_doc[key], rule_data[key])
self.assertTrue(create_doc["editable"])
self.assertFalse(create_doc["created_by_host_manager"])
rule_id = create_doc["id"]

# 403 for non managers
with self.assertRaises(TransferAPIError) as apiErr:
self.tc2.endpoint_manager_create_pause_rule(rule_data)
self.assertEqual(apiErr.exception.http_status, 403)
self.assertEqual(apiErr.exception.code, "PermissionDenied")

# track for cleanup, note this must be placed earlier in the
# list than the shared endpoint
self.asset_cleanup.insert(
0,
{"function": self.tc.endpoint_manager_delete_pause_rule,
"args": [rule_id],
"name": "test_pause_rule"})

# return id for use in other tests
return rule_id

def test_endpoint_manager_pause_rule_list(self):
"""
Gets a pause_rule id from test_endpoint_manager_create_pause_rule.
Gets pause rule lists with/without endpoint filters, validates results.
Confirms 403 when non manager attempts to use this resource.
"""
rule_id = self.test_endpoint_manager_create_pause_rule()

# all endpoint_rules
list_doc = self.tc.endpoint_manager_pause_rule_list()
self.assertEqual(list_doc["DATA_TYPE"], "pause_rule_list")
for rule in list_doc:
self.assertEqual(rule["DATA_TYPE"], "pause_rule")
if rule["id"] == rule_id:
break
else:
self.assertFalse("rule_id not found")

# filtered by endpoint, should only have the created rule
list_doc = self.tc.endpoint_manager_pause_rule_list(
filter_endpoint=self.test_share_ep_id)
rule = list_doc["DATA"][0]
self.assertEqual(rule["id"], rule_id)

# filterd by host endpoint
list_doc = self.tc.endpoint_manager_pause_rule_list(
filter_host_endpoint=GO_EP1_ID)
for rule in list_doc:
self.assertEqual(rule["DATA_TYPE"], "pause_rule")
if rule["id"] == rule_id:
break
else:
self.assertFalse("rule_id not found")

# 403 for non managers
with self.assertRaises(TransferAPIError) as apiErr:
self.tc2.endpoint_manager_pause_rule_list()
self.assertEqual(apiErr.exception.http_status, 403)
self.assertEqual(apiErr.exception.code, "PermissionDenied")

def test_endpoint_manager_get_pause_rule(self):
"""
Gets a pause rule created by test_endpoint_manager_create_pause_rule.
Validates results and checks expected fields.
Confirms 403 when non manager attempts to use this resource.
"""
rule_id = self.test_endpoint_manager_create_pause_rule()
get_doc = self.tc.endpoint_manager_get_pause_rule(rule_id)

# validate results have expected fields and values
expected = {
"DATA_TYPE": "pause_rule", "id": rule_id,
"message": "SDK Test Pause Rule", "start_time": None,
"endpoint_id": self.test_share_ep_id, "identity_id": None,
"modified_by_id": get_user_data()["sdktester1a"]["id"],
"created_by_host_manager": False, "editable": True,
"pause_ls": True, "pause_mkdir": True, "pause_rename": True,
"pause_task_delete": True, "pause_task_transfer_write": True,
"pause_task_transfer_read": True}
for key in expected:
self.assertEqual(get_doc[key], expected[key])

# 403 for non managers
with self.assertRaises(TransferAPIError) as apiErr:
self.tc2.endpoint_manager_get_pause_rule(rule_id)
self.assertEqual(apiErr.exception.http_status, 403)
self.assertEqual(apiErr.exception.code, "PermissionDenied")

def test_endpoint_manager_update_pause_rule(self):
"""
Updates a pause rule created by test_endpoint_manager_create_pause_rule
Validates results and confirms fields have updated with get.
Confirms 403 when non manager attempts to use this resource.
"""
rule_id = self.test_endpoint_manager_create_pause_rule()

# update the rule
update_data = {
"message": "New Message", "pause_ls": False, "pause_mkdir": False,
"pause_rename": False, "pause_task_delete": False,
"pause_task_transfer_write": False,
"pause_task_transfer_read": False}
update_doc = self.tc.endpoint_manager_update_pause_rule(
rule_id, update_data)
self.assertEqual(update_doc["DATA_TYPE"], "pause_rule")

# get the the rule after update
get_doc = self.tc.endpoint_manager_get_pause_rule(rule_id)

# validate results and confirm get sees updated fields
for key in update_data:
self.assertEqual(update_doc[key], update_data[key])
self.assertEqual(get_doc[key], update_data[key])

# 403 for non managers
with self.assertRaises(TransferAPIError) as apiErr:
self.tc2.endpoint_manager_update_pause_rule(rule_id, update_data)
self.assertEqual(apiErr.exception.http_status, 403)
self.assertEqual(apiErr.exception.code, "PermissionDenied")

def test_endpoint_manager_delete_pause_rule(self):
"""
Deletes a pause rule created by test_endpoint_manager_create_pause_rule
Validates results and confirms 404 when trying to get rule.
Confirms 403 when non manager attempts to use this resource.
"""
rule_id = self.test_endpoint_manager_create_pause_rule()

# confirm 403 for non managers before deleting the rule
with self.assertRaises(TransferAPIError) as apiErr:
self.tc2.endpoint_manager_delete_pause_rule(rule_id)
self.assertEqual(apiErr.exception.http_status, 403)
self.assertEqual(apiErr.exception.code, "PermissionDenied")

# delete the rule, validate results
delete_doc = self.tc.endpoint_manager_delete_pause_rule(rule_id)
self.assertEqual(delete_doc["DATA_TYPE"], "result")
self.assertEqual(delete_doc["code"], "Deleted")

# confirm get returns 404 as rule no longer exists
with self.assertRaises(TransferAPIError) as apiErr:
self.tc.endpoint_manager_get_pause_rule(rule_id)
self.assertEqual(apiErr.exception.http_status, 404)
self.assertEqual(apiErr.exception.code, "PauseRuleNotFound")

# stop tracking pause rule for cleanup
for cleanup in self.asset_cleanup:
if "name" in cleanup and cleanup["name"] == "test_pause_rule":
self.asset_cleanup.remove(cleanup)
break


# class for Transfer Client Tests that require the activity_manager
# role on an endpoint. Setup checks to see if the managed shared endpoint
# exists and if not creates one. This endpoint is not removed during cleanup.
# role on an endpoint but don't require a unique endpoint.
class ManagerTransferClientTests(BaseTransferClientTests):

__test__ = True # marks sub-class as having tests

@classmethod
def setUpClass(self):
"""
Does an auth flow for sdktester2b's transfer client.
Sets up a shared endpoint on test gp#ep2 managed by sdktester1a,
and shares it with sdktester2b,
or sees that this endpoint already exits and gets its id.
"""
super(ManagerTransferClientTests, self).setUpClass()

ac = globus_sdk.NativeAppAuthClient(
client_id=get_client_data()["native_app_client1"]["id"])
authorizer = globus_sdk.RefreshTokenAuthorizer(
SDKTESTER2B_NATIVE1_TRANSFER_RT, ac)
self.tc2 = globus_sdk.TransferClient(authorizer=authorizer)

try:
# shared endpoint hosted on go#ep2 managed by sdktester1a
host_path = "/~/managed_ep"
Expand Down

0 comments on commit 795d342

Please sign in to comment.