-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathIdentificationServiceHttpClientHelper.py
359 lines (310 loc) · 13.9 KB
/
IdentificationServiceHttpClientHelper.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
""" Copyright (c) Microsoft. All rights reserved.
Licensed under the MIT license.
Microsoft Cognitive Services (formerly Project Oxford): https://www.microsoft.com/cognitive-services
Microsoft Cognitive Services (formerly Project Oxford) GitHub:
https://github.com/Microsoft/ProjectOxford-ClientSDK
Copyright (c) Microsoft Corporation
All rights reserved.
MIT License:
Permission is hereby granted, free of charge, to any person obtaining
a copy of this software and associated documentation files (the
"Software"), to deal in the Software without restriction, including
without limitation the rights to use, copy, modify, merge, publish,
distribute, sublicense, and/or sell copies of the Software, and to
permit persons to whom the Software is furnished to do so, subject to
the following conditions:
The above copyright notice and this permission notice shall be
included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
"""
import http.client
import urllib.parse
import json
import time
from contextlib import closing
import IdentificationProfile
import IdentificationResponse
import EnrollmentResponse
import ProfileCreationResponse
import logging
class IdentificationServiceHttpClientHelper:
    """Abstracts the interaction with the Identification service.

    Every public method sends one HTTPS request (followed by status polling
    when the service answers 202 Accepted) carrying the subscription key
    given to the constructor. A response with an unexpected status code is
    reported by raising ``Exception`` with the service message, or the HTTP
    reason phrase when the response body is empty.
    """

    _STATUS_OK = 200
    _STATUS_ACCEPTED = 202
    _BASE_URI = 'westus.api.cognitive.microsoft.com'
    _IDENTIFICATION_PROFILES_URI = '/spid/v1.0/identificationProfiles'
    _IDENTIFICATION_URI = '/spid/v1.0/identify'
    _SUBSCRIPTION_KEY_HEADER = 'Ocp-Apim-Subscription-Key'
    _CONTENT_TYPE_HEADER = 'Content-Type'
    _JSON_CONTENT_HEADER_VALUE = 'application/json'
    _STREAM_CONTENT_HEADER_VALUE = 'application/octet-stream'
    _SHORT_AUDIO_PARAMETER_NAME = 'shortAudio'
    _OPERATION_LOCATION_HEADER = 'Operation-Location'
    _OPERATION_STATUS_FIELD_NAME = 'status'
    _OPERATION_PROC_RES_FIELD_NAME = 'processingResult'
    _OPERATION_MESSAGE_FIELD_NAME = 'message'
    _OPERATION_STATUS_SUCCEEDED = 'succeeded'
    _OPERATION_STATUS_FAILED = 'failed'
    # Seconds to sleep between two polls of an operation that is still pending.
    _OPERATION_STATUS_UPDATE_DELAY = 5

    def __init__(self, subscription_key):
        """Constructor of the IdentificationServiceHttpClientHelper class.

        Arguments:
        subscription_key -- the subscription key string
        """
        self._subscription_key = subscription_key

    def get_all_profiles(self):
        """Return a list of all profiles on the server.

        Returns:
        a list with one IdentificationProfile per entry of the JSON response

        Raises:
        Exception -- if the service does not answer with status 200
        """
        try:
            res, message = self._send_request(
                'GET',
                self._BASE_URI,
                self._IDENTIFICATION_PROFILES_URI,
                self._JSON_CONTENT_HEADER_VALUE)

            if res.status != self._STATUS_OK:
                raise Exception(
                    'Error getting all profiles: ' + self._error_reason(res, message))

            return [IdentificationProfile.IdentificationProfile(profile_raw)
                    for profile_raw in json.loads(message)]
        except Exception:
            logging.error('Error getting all profiles.')
            raise

    def get_profile(self, profile_id):
        """Get a speaker's profile with the given profile ID.

        Arguments:
        profile_id -- the profile ID of the profile to fetch

        Returns:
        an IdentificationProfile built from the JSON response

        Raises:
        Exception -- if the service does not answer with status 200
        """
        try:
            res, message = self._send_request(
                'GET',
                self._BASE_URI,
                self._profile_url(profile_id),
                self._JSON_CONTENT_HEADER_VALUE)

            if res.status != self._STATUS_OK:
                raise Exception(
                    'Error getting profile: ' + self._error_reason(res, message))

            return IdentificationProfile.IdentificationProfile(json.loads(message))
        except Exception:
            logging.error('Error getting profile.')
            raise

    def create_profile(self, locale):
        """Create a profile on the server and return the creation response.

        Arguments:
        locale -- the locale string for the profile

        Returns:
        a ProfileCreationResponse built from the JSON response

        Raises:
        Exception -- if the service does not answer with status 200
        """
        try:
            body = json.dumps({'locale': '{0}'.format(locale)})

            res, message = self._send_request(
                'POST',
                self._BASE_URI,
                self._IDENTIFICATION_PROFILES_URI,
                self._JSON_CONTENT_HEADER_VALUE,
                body)

            if res.status != self._STATUS_OK:
                raise Exception(
                    'Error creating profile: ' + self._error_reason(res, message))

            return ProfileCreationResponse.ProfileCreationResponse(json.loads(message))
        except Exception:
            logging.error('Error creating profile.')
            raise

    def delete_profile(self, profile_id):
        """Delete a profile from the server.

        Arguments:
        profile_id -- the profile ID string of the user to delete

        Raises:
        Exception -- if the service does not answer with status 200
        """
        try:
            res, message = self._send_request(
                'DELETE',
                self._BASE_URI,
                self._profile_url(profile_id),
                self._JSON_CONTENT_HEADER_VALUE)

            if res.status != self._STATUS_OK:
                raise Exception(
                    'Error deleting profile: ' + self._error_reason(res, message))
        except Exception:
            logging.error('Error deleting profile')
            raise

    def reset_enrollments(self, profile_id):
        """Reset the enrollments of a given profile on the server.

        Arguments:
        profile_id -- the profile ID of the profile to reset

        Raises:
        Exception -- if the service does not answer with status 200
        """
        try:
            res, message = self._send_request(
                'POST',
                self._BASE_URI,
                self._profile_url(profile_id, '/reset'),
                self._JSON_CONTENT_HEADER_VALUE)

            if res.status != self._STATUS_OK:
                raise Exception(
                    'Error resetting profile: ' + self._error_reason(res, message))
        except Exception:
            logging.error('Error resetting profile')
            raise

    def enroll_profile(self, profile_id, file_path, force_short_audio=False):
        """Enroll a profile using an audio file and return the enrollment response.

        Arguments:
        profile_id -- the profile ID string of the user to enroll
        file_path -- the file path string of the audio file to use
        force_short_audio -- instruct the service to waive the recommended minimum audio limit
                             needed for enrollment

        Returns:
        an EnrollmentResponse built from the immediate (200) or polled (202) result

        Raises:
        Exception -- if the service rejects the request or the operation fails
        """
        try:
            # NOTE(review): force_short_audio is formatted as-is, so a Python
            # bool is sent as "True"/"False" -- confirm the service accepts it.
            request_url = self._profile_url(
                profile_id,
                '/enroll?{0}={1}'.format(
                    self._SHORT_AUDIO_PARAMETER_NAME,
                    force_short_audio))

            return EnrollmentResponse.EnrollmentResponse(
                self._submit_audio(request_url, file_path, 'Error enrolling profile: '))
        except Exception:
            logging.error('Error enrolling profile.')
            raise

    def identify_file(self, file_path, test_profile_ids, force_short_audio=False):
        """Identify the speaker of an audio file among the given profiles.

        Arguments:
        file_path -- the file path of the audio file to test
        test_profile_ids -- an array of test profile IDs strings
        force_short_audio -- instruct the service to waive the recommended minimum audio limit
                             needed for identification

        Returns:
        an IdentificationResponse built from the immediate (200) or polled (202) result

        Raises:
        Exception -- if no profile ID is given, the service rejects the
                     request, or the operation fails
        """
        try:
            if not test_profile_ids:
                raise Exception('Error identifying file: no test profile IDs are provided.')

            request_url = '{0}?identificationProfileIds={1}&{2}={3}'.format(
                self._IDENTIFICATION_URI,
                urllib.parse.quote(','.join(test_profile_ids)),
                self._SHORT_AUDIO_PARAMETER_NAME,
                force_short_audio)

            return IdentificationResponse.IdentificationResponse(
                self._submit_audio(request_url, file_path, 'Error identifying file: '))
        except Exception:
            logging.error('Error identifying file.')
            raise

    @staticmethod
    def _error_reason(res, message):
        """Return the response body if it is non-empty, else the HTTP reason phrase."""
        return message or res.reason

    def _profile_url(self, profile_id, suffix=''):
        """Return the request path of a profile with the ID percent-encoded.

        Arguments:
        profile_id -- the profile ID to place in the path
        suffix -- text appended verbatim after the encoded profile ID
        """
        # quote() only accepts str/bytes; other ID objects were previously
        # rendered through str.format, so keep accepting them.
        if not isinstance(profile_id, (str, bytes)):
            profile_id = str(profile_id)

        return '{0}/{1}{2}'.format(
            self._IDENTIFICATION_PROFILES_URI,
            urllib.parse.quote(profile_id),
            suffix)

    def _submit_audio(self, request_url, file_path, error_prefix):
        """POST an audio file and return the parsed result of the operation.

        Arguments:
        request_url -- the request url (path and query) to post the audio to
        file_path -- the path of the audio file sent as the request body
        error_prefix -- the text placed before the failure reason in the raised message

        Returns:
        the decoded JSON body on status 200, or the polled processing result on status 202

        Raises:
        Exception -- on any other status code
        """
        # The file is only needed while the request is sent; it is closed
        # before any (potentially long) polling starts.
        with open(file_path, 'rb') as body:
            res, message = self._send_request(
                'POST',
                self._BASE_URI,
                request_url,
                self._STREAM_CONTENT_HEADER_VALUE,
                body)

        if res.status == self._STATUS_OK:
            return json.loads(message)
        if res.status == self._STATUS_ACCEPTED:
            return self._poll_operation(res.getheader(self._OPERATION_LOCATION_HEADER))
        raise Exception(error_prefix + self._error_reason(res, message))

    def _poll_operation(self, operation_url):
        """Poll an operation until it is done and return its processing result.

        The loop has no upper bound: it only ends when the operation reports
        'succeeded' or 'failed', or when a poll request itself fails.

        Arguments:
        operation_url -- the url to poll for the operation status

        Raises:
        Exception -- if no operation url is given, a poll does not answer with
                     status 200, or the operation reports 'failed'
        """
        try:
            if not operation_url:
                raise Exception('Operation Error: no operation URL was returned by the service.')

            parsed_url = urllib.parse.urlparse(operation_url)
            # Keep the query string, if any, so the polled URL matches the
            # one handed out by the service.
            request_url = parsed_url.path
            if parsed_url.query:
                request_url += '?' + parsed_url.query

            while True:
                res, message = self._send_request(
                    'GET',
                    parsed_url.netloc,
                    request_url,
                    self._JSON_CONTENT_HEADER_VALUE)

                if res.status != self._STATUS_OK:
                    raise Exception('Operation Error: ' + self._error_reason(res, message))

                operation_response = json.loads(message)
                status = operation_response[self._OPERATION_STATUS_FIELD_NAME]

                if status == self._OPERATION_STATUS_SUCCEEDED:
                    return operation_response[self._OPERATION_PROC_RES_FIELD_NAME]
                if status == self._OPERATION_STATUS_FAILED:
                    raise Exception('Operation Error: ' +
                                    operation_response[self._OPERATION_MESSAGE_FIELD_NAME])

                # Any other status is treated as "still running".
                time.sleep(self._OPERATION_STATUS_UPDATE_DELAY)
        except Exception:
            logging.error('Error polling the operation status.')
            raise

    def _send_request(self, method, base_url, request_url, content_type_value, body=None):
        """Send the request to the server then return the response and the response body string.

        A new HTTPS connection is opened for every call and closed before
        returning; the body is read and decoded as UTF-8 while the connection
        is still open.

        Arguments:
        method -- the HTTP method of the request (e.g. GET, POST or DELETE)
        base_url -- the base url (host) for the connection
        request_url -- the request url for the connection
        content_type_value -- the value of the content type field in the headers
        body -- the body of the request (needed only in POST methods)

        Returns:
        a (response, message) tuple: the HTTP response object and its decoded body
        """
        try:
            headers = {self._CONTENT_TYPE_HEADER: content_type_value,
                       self._SUBSCRIPTION_KEY_HEADER: self._subscription_key}

            with closing(http.client.HTTPSConnection(base_url)) as conn:
                conn.request(method, request_url, body, headers)
                res = conn.getresponse()
                message = res.read().decode('utf-8')
                return res, message
        except Exception:
            logging.error('Error sending the request.')
            raise