-
Notifications
You must be signed in to change notification settings - Fork 330
/
azure_rm_common_rest.py
109 lines (84 loc) · 4.22 KB
/
azure_rm_common_rest.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
# Copyright (c) 2018 Zim Kalinowski, <zikalino@microsoft.com>
#
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import absolute_import, division, print_function
__metaclass__ = type
try:
from ansible.module_utils.ansible_release import __version__ as ANSIBLE_VERSION
except Exception:
ANSIBLE_VERSION = 'unknown'
try:
from azure.core._pipeline_client import PipelineClient
from azure.core.polling import LROPoller
from azure.core.pipeline import PipelineResponse
from azure.core.pipeline.policies import BearerTokenCredentialPolicy
from azure.mgmt.core.polling.arm_polling import ARMPolling
import uuid
from azure.core.configuration import Configuration
except ImportError:
# This is handled in azure_rm_common
Configuration = object
# 'Ansible/<version>' string built from ANSIBLE_VERSION, which is the literal
# 'unknown' when the ansible release module could not be imported.
# NOTE(review): presumably used as a User-Agent token by importing modules --
# it is not referenced anywhere else in this file; confirm against callers.
ANSIBLE_USER_AGENT = 'Ansible/{0}'.format(ANSIBLE_VERSION)
class GenericRestClientConfiguration(Configuration):
    """Pipeline configuration used by the generic Azure REST client.

    Validates the mandatory inputs, substitutes the public Azure Resource
    Manager defaults for anything left unset, and installs a bearer-token
    authentication policy built from the credential.

    :param credential: Credential object handed to
        ``BearerTokenCredentialPolicy``. Must not be ``None``.
    :param subscription_id: Subscription identifier, stored unchanged.
        Must not be ``None``.
    :param credential_scopes: A single scope string, or a list/tuple of scope
        strings. Falls back to ``'https://management.azure.com/.default'``
        when empty or ``None``.
    :param base_url: Service endpoint. Falls back to
        ``'https://management.azure.com'`` when empty or ``None``.
    :raises ValueError: If ``credential`` or ``subscription_id`` is ``None``.
    """

    def __init__(self, credential, subscription_id, credential_scopes=None, base_url=None):
        if credential is None:
            # The message names the actual parameter ('credential'); it used
            # to refer to a non-existent 'credentials' parameter.
            raise ValueError("Parameter 'credential' must not be None.")
        if subscription_id is None:
            raise ValueError("Parameter 'subscription_id' must not be None.")
        if not base_url:
            base_url = 'https://management.azure.com'
        if not credential_scopes:
            credential_scopes = 'https://management.azure.com/.default'

        super(GenericRestClientConfiguration, self).__init__()

        # The policy takes scopes as separate positional arguments, so a
        # list/tuple has to be unpacked rather than passed as one object.
        if isinstance(credential_scopes, (list, tuple)):
            scopes = tuple(credential_scopes)
        else:
            scopes = (credential_scopes,)

        # 'credentials' (plural) is the historical attribute name; kept so
        # existing readers of the attribute continue to work.
        self.credentials = credential
        self.subscription_id = subscription_id
        # Keep the resolved endpoint instead of discarding the computed default.
        self.base_url = base_url
        self.authentication_policy = BearerTokenCredentialPolicy(credential, *scopes)
class GenericRestClient(object):
    """Minimal REST client that sends arbitrary requests through a ``PipelineClient``.

    :param credential: Credential forwarded to ``GenericRestClientConfiguration``.
    :param subscription_id: Subscription identifier forwarded to the configuration.
    :param base_url: Endpoint given to ``PipelineClient``. Falls back to
        ``'https://management.azure.com'`` when empty or ``None``, the same
        default ``GenericRestClientConfiguration`` applies.
    :param credential_scopes: A scope string, or a list/tuple of scopes of
        which the first is used. ``None`` or an empty sequence lets the
        configuration pick its default scope.
    """

    # Supported HTTP verb -> name of the request-builder method on the client.
    _REQUEST_BUILDERS = {
        'GET': 'get',
        'PUT': 'put',
        'POST': 'post',
        'HEAD': 'head',
        'PATCH': 'patch',
        'DELETE': 'delete',
        'MERGE': 'merge',
    }

    def __init__(self, credential, subscription_id, base_url=None, credential_scopes=None):
        # credential_scopes defaults to None, so it must not be indexed
        # unconditionally; a plain string must not be cut down to its first
        # character either.
        if isinstance(credential_scopes, (list, tuple)):
            scope = credential_scopes[0] if credential_scopes else None
        else:
            scope = credential_scopes
        if not base_url:
            base_url = 'https://management.azure.com'

        self.config = GenericRestClientConfiguration(credential, subscription_id, scope)
        self._client = PipelineClient(base_url, config=self.config)
        self.models = None

    def query(self, url, method, query_parameters, header_parameters, body, expected_status_codes, polling_timeout, polling_interval):
        """Build and send one request, optionally polling a long-running operation.

        :param url: Request URL handed to the request builder.
        :param method: One of ``GET``, ``PUT``, ``POST``, ``HEAD``, ``PATCH``,
            ``DELETE`` or ``MERGE`` (exact, upper-case match).
        :param query_parameters: Query parameters for the request builder.
        :param header_parameters: Header mapping or ``None``. It is copied,
            so the caller's mapping is never modified.
        :param body: Request content, passed as the builder's fourth
            positional argument.
        :param expected_status_codes: Container of acceptable status codes.
        :param polling_timeout: When greater than zero and the response status
            is 202, the operation is polled with this timeout. ``None`` and
            non-positive values disable polling.
        :param polling_interval: Passed as the first argument to
            ``ARMPolling``.
            NOTE(review): presumably the delay between polls -- confirm.
        :returns: The response from ``send_request``, or the poller's result
            when a 202 response was polled.
        :raises ValueError: If ``method`` is not a supported verb.
        :raises SendRequestException: If the status code is not in
            ``expected_status_codes``.
        """
        builder_name = self._REQUEST_BUILDERS.get(method)
        if builder_name is None:
            # Fail here with a clear message instead of sending a None request.
            raise ValueError("Unsupported HTTP method: {0!r}".format(method))

        # Work on a copy: a fresh request id is injected per call and the
        # caller's dict should not be altered as a side effect.
        headers = dict(header_parameters) if header_parameters else {}
        headers['x-ms-client-request-id'] = str(uuid.uuid1())

        build_request = getattr(self._client, builder_name)
        request = build_request(url, query_parameters, headers, body)
        response = self._client.send_request(request)

        if response.status_code not in expected_status_codes:
            raise SendRequestException(response.text(), response.status_code)

        if response.status_code == 202 and polling_timeout is not None and polling_timeout > 0:
            def get_long_running_output(response):
                # The poller's final output is the pipeline response itself.
                return response

            poller = LROPoller(self._client,
                               PipelineResponse(None, response, None),
                               get_long_running_output,
                               ARMPolling(polling_interval))
            response = self.get_poller_result(poller, polling_timeout)

        return response

    def get_poller_result(self, poller, timeout):
        """Wait on ``poller`` with the given timeout and return its result.

        :param poller: Object exposing ``wait(timeout=...)`` and ``result()``.
        :param timeout: Value forwarded to ``poller.wait``.
        :returns: Whatever ``poller.result()`` returns.

        Any exception raised by the poller propagates unchanged.
        """
        poller.wait(timeout=timeout)
        return poller.result()
class SendRequestException(Exception):
    """Error carrying the payload and status code of a rejected response.

    ``GenericRestClient.query`` raises it when a response's status code is not
    among the expected ones, passing the response body text and the code.

    :param response: Response payload supplied by the raiser.
    :param status_code: HTTP status code supplied by the raiser.
    """

    def __init__(self, response, status_code):
        # Initialise the base Exception explicitly so ``args`` and ``str()``
        # are populated even when the exception is built with keyword
        # arguments (which otherwise leaves ``args`` empty).
        super(SendRequestException, self).__init__(response, status_code)
        self.response = response
        self.status_code = status_code