Skip to content
This repository has been archived by the owner on Mar 3, 2023. It is now read-only.

Commit

Permalink
Update tests
Browse files Browse the repository at this point in the history
  • Loading branch information
xlz-jbleclere committed Jan 21, 2022
1 parent 32bf9b3 commit 995c1ef
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 98 deletions.
4 changes: 2 additions & 2 deletions tests/test_drm_license_error.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ def test_header_error_on_licenseTimer(accelize_drm, conf_json, cred_json, async_
activators.autotest(is_activated=False)
assert async_cb.was_called
assert async_cb.errcode == accelize_drm.exceptions.DRMCtlrError.error_code
assert "License MAC check error" in async_cb.message
assert search(r"License (header|MAC) check error", async_cb.message)


@pytest.mark.no_parallel
Expand Down Expand Up @@ -163,4 +163,4 @@ def test_session_id_error(accelize_drm, conf_json, cred_json, async_handler,
drm_manager.deactivate()
assert async_cb.was_called
assert async_cb.errcode == accelize_drm.exceptions.DRMCtlrError.error_code
assert "License header check error" in async_cb.message
assert search(r"License (header|MAC) check error", async_cb.message)
177 changes: 81 additions & 96 deletions tests/test_retry_mechanism.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,20 +35,19 @@ def test_api_retry_disabled(accelize_drm, conf_json, cred_json, async_handler,
logfile = log_file_factory.create(2)
conf_json['settings'].update(logfile.json)
conf_json.save()
drm_manager = accelize_drm.DrmManager(
conf_json.path,
cred_json.path,
driver.read_register_callback,
driver.write_register_callback,
async_cb.callback
)
assert not drm_manager.get('license_status')
start = datetime.now()
with pytest.raises(accelize_drm.exceptions.DRMWSMayRetry) as excinfo:
drm_manager.activate()
end = datetime.now()
del drm_manager
assert (end - start).total_seconds() < 1
with accelize_drm.DrmManager(
conf_json.path,
cred_json.path,
driver.read_register_callback,
driver.write_register_callback,
async_cb.callback
) as drm_manager:
assert not drm_manager.get('license_status')
start = datetime.now()
with pytest.raises(accelize_drm.exceptions.DRMWSMayRetry) as excinfo:
drm_manager.activate()
end = datetime.now()
assert (end - start).total_seconds() < 2
log_content = logfile.read()
assert search(r'\[\s*critical\s*\]\s*\d+\s*,\s*\[errCode=\d+\]\s*Metering Web Service error 408',
log_content, IGNORECASE)
Expand All @@ -73,26 +72,25 @@ def test_api_retry_enabled(accelize_drm, conf_json, cred_json, async_handler,
logfile = log_file_factory.create(2)
conf_json['settings'].update(logfile.json)
conf_json.save()
drm_manager = accelize_drm.DrmManager(
conf_json.path,
cred_json.path,
driver.read_register_callback,
driver.write_register_callback,
async_cb.callback
)
api_retry_duration = 10
drm_manager.set(ws_api_retry_duration=api_retry_duration)
retry_duration = drm_manager.get('ws_api_retry_duration')
assert retry_duration == api_retry_duration
assert not drm_manager.get('license_status')
retry_sleep = drm_manager.get('ws_retry_period_short')
nb_attempts_expected = retry_duration / retry_sleep
assert nb_attempts_expected > 1
start = datetime.now()
with pytest.raises(accelize_drm.exceptions.DRMWSTimedOut) as excinfo:
drm_manager.activate()
end = datetime.now()
del drm_manager
with accelize_drm.DrmManager(
conf_json.path,
cred_json.path,
driver.read_register_callback,
driver.write_register_callback,
async_cb.callback
) as drm_manager:
drm_manager.set(ws_api_retry_duration=api_retry_duration)
retry_duration = drm_manager.get('ws_api_retry_duration')
assert retry_duration == api_retry_duration
assert not drm_manager.get('license_status')
retry_sleep = drm_manager.get('ws_retry_period_short')
nb_attempts_expected = retry_duration / retry_sleep
assert nb_attempts_expected > 1
start = datetime.now()
with pytest.raises(accelize_drm.exceptions.DRMWSTimedOut) as excinfo:
drm_manager.activate()
end = datetime.now()
total_seconds = int((end - start).total_seconds())
assert retry_duration - 1 <= total_seconds <= retry_duration + 1
log_content = logfile.read()
Expand Down Expand Up @@ -131,27 +129,23 @@ def test_long_to_short_retry_switch_on_authentication(accelize_drm, conf_json,
conf_json['settings']['ws_retry_period_long'] = retryLongPeriod
conf_json.save()

drm_manager = accelize_drm.DrmManager(
conf_json.path,
cred_json.path,
driver.read_register_callback,
driver.write_register_callback,
async_cb.callback
)

context = {'data':list(),
'cnt':0,
'expires_in':expires_in,
'timeoutSecond':timeoutSecond,
'exit': False
}
set_context(context)
assert get_context() == context

try:
with accelize_drm.DrmManager(
conf_json.path,
cred_json.path,
driver.read_register_callback,
driver.write_register_callback,
async_cb.callback
) as drm_manager:
context = {'data':list(),
'cnt':0,
'expires_in':expires_in,
'timeoutSecond':timeoutSecond,
'exit': False
}
set_context(context)
assert get_context() == context
drm_manager.activate()
wait_func_true(lambda: async_cb.was_called, timeout=timeoutSecond*2)
finally:
context = get_context()
context['exit'] = True
set_context(context)
Expand Down Expand Up @@ -197,27 +191,24 @@ def test_long_to_short_retry_switch_on_license(accelize_drm, conf_json, cred_jso
conf_json['settings']['ws_retry_period_long'] = retryLongPeriod
conf_json.save()

drm_manager = accelize_drm.DrmManager(
conf_json.path,
cred_json.path,
driver.read_register_callback,
driver.write_register_callback,
async_cb.callback
)

context = {'data':list(),
'cnt':0,
'timeoutSecond':timeoutSecond
}
set_context(context)
assert get_context() == context

drm_manager.activate()
lic_duration = drm_manager.get('license_duration')
try:
with accelize_drm.DrmManager(
conf_json.path,
cred_json.path,
driver.read_register_callback,
driver.write_register_callback,
async_cb.callback
) as drm_manager:
drm_manager.activate()
lic_duration = drm_manager.get('license_duration')
wait_func_true(lambda: async_cb.was_called,
timeout= lic_duration + 2*timeoutSecond)
finally:
timeout= lic_duration + 2*timeoutSecond)
drm_manager.deactivate()
assert async_cb.was_called
assert 'Timeout on License' in async_cb.message
Expand Down Expand Up @@ -257,26 +248,24 @@ def test_api_retry_on_lost_connection(accelize_drm, conf_json, cred_json, async_
conf_json['settings'].update(logfile.json)
conf_json.save()

drm_manager = accelize_drm.DrmManager(
conf_json.path,
cred_json.path,
driver.read_register_callback,
driver.write_register_callback,
async_cb.callback
)
retry_duration = drm_manager.get('ws_api_retry_duration')
retry_timeout = drm_manager.get('ws_request_timeout')
retry_sleep = drm_manager.get('ws_retry_period_short')
nb_attempts_expected = int(retry_duration / (retry_timeout + retry_sleep)) + 1
assert nb_attempts_expected > 1

context = {'data': list(),
'sleep':retry_timeout + 1}
set_context(context)
assert get_context() == context

with pytest.raises(accelize_drm.exceptions.DRMWSTimedOut) as excinfo:
drm_manager.activate()
with accelize_drm.DrmManager(
conf_json.path,
cred_json.path,
driver.read_register_callback,
driver.write_register_callback,
async_cb.callback
) as drm_manager:
retry_duration = drm_manager.get('ws_api_retry_duration')
retry_timeout = drm_manager.get('ws_request_timeout')
retry_sleep = drm_manager.get('ws_retry_period_short')
nb_attempts_expected = int(retry_duration / (retry_timeout + retry_sleep)) + 1
assert nb_attempts_expected > 1
context = {'data': list(),
'sleep':retry_timeout + 1}
set_context(context)
assert get_context() == context
with pytest.raises(accelize_drm.exceptions.DRMWSTimedOut) as excinfo:
drm_manager.activate()
assert async_handler.get_error_code(str(excinfo.value)) == accelize_drm.exceptions.DRMWSTimedOut.error_code
m = search(r'Timeout on License request after (\d+) attempts', str(excinfo.value))
assert m is not None
Expand Down Expand Up @@ -328,27 +317,23 @@ def test_thread_retry_on_lost_connection(accelize_drm, conf_json, cred_json, asy
conf_json['settings'].update(logfile.json)
conf_json.save()

drm_manager = accelize_drm.DrmManager(
conf_json.path,
cred_json.path,
driver.read_register_callback,
driver.write_register_callback,
async_cb.callback
)

context = {'cnt':0,
'timeoutSecond':licDuration
}
set_context(context)
assert get_context() == context

try:
with accelize_drm.DrmManager(
conf_json.path,
cred_json.path,
driver.read_register_callback,
driver.write_register_callback,
async_cb.callback
) as drm_manager:
drm_manager.activate()
wait_func_true(lambda: async_cb.was_called,
timeout=licDuration*2)
finally:
drm_manager.deactivate()
del drm_manager
assert async_cb.was_called
assert async_cb.errcode == accelize_drm.exceptions.DRMWSTimedOut.error_code
m = search(r'Timeout on License request after (\d+) attempts', async_cb.message)
Expand Down

0 comments on commit 995c1ef

Please sign in to comment.