Skip to content
This repository has been archived by the owner on Mar 3, 2023. It is now read-only.

Commit

Permalink
Update unit test
Browse files Browse the repository at this point in the history
  • Loading branch information
xlz-jbleclere committed Jan 14, 2022
1 parent 3542d41 commit 252c050
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 61 deletions.
3 changes: 2 additions & 1 deletion python/accelize_drm/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@
See the License for the specific language governing permissions and
limitations under the License.
"""
import datetime

__version__ = "@ACCELIZEDRM_LONG_VERSION@"
__copyright__ = "Copyright 2018 Accelize"
__copyright__ = "Copyright %s Accelize" % datetime.date.today().year
__licence__ = "Apache 2.0"
__all__ = ['DrmManager', 'exceptions', 'get_api_version']

Expand Down
85 changes: 45 additions & 40 deletions tests/test_async_health.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,32 +34,28 @@ def test_health_period_disabled(accelize_drm, conf_json, cred_json,
conf_json['settings'].update(logfile.json)
conf_json.save()

drm_manager = accelize_drm.DrmManager(
conf_json.path, cred_json.path,
driver.read_register_callback,
driver.write_register_callback,
async_cb.callback
)

# Set initial context on the live server
nb_health = 2
healthPeriod = 2
context = {'cnt':0, 'healthPeriod':healthPeriod, 'nb_health':nb_health, 'exit':False}
set_context(context)
assert get_context() == context

drm_manager.activate()
try:
with accelize_drm.DrmManager(
conf_json.path, cred_json.path,
driver.read_register_callback,
driver.write_register_callback,
async_cb.callback
) as drm_manager:
drm_manager.activate()
assert drm_manager.get('health_period') == healthPeriod
wait_func_true(lambda: get_context()['exit'],
timeout=healthPeriod * (nb_health + 1) * 2)
assert drm_manager.get('health_period') == 0
assert get_context()['cnt'] == nb_health
sleep(healthPeriod + 1)
assert get_context()['cnt'] == nb_health
finally:
drm_manager.deactivate()
del drm_manager
log_content = logfile.read()
assert search(r'Exiting background thread which checks health', log_content, MULTILINE)
assert search(r'Health thread is disabled', log_content, MULTILINE)
Expand All @@ -73,7 +69,7 @@ def test_health_period_disabled(accelize_drm, conf_json, cred_json,

@pytest.mark.no_parallel
def test_health_period_modification(accelize_drm, conf_json, cred_json, async_handler,
live_server, request):
live_server, request, log_file_factory):
"""
Test the asynchronous health feature can be modified dynamically.
"""
Expand All @@ -83,28 +79,29 @@ def test_health_period_modification(accelize_drm, conf_json, cred_json, async_ha

conf_json.reset()
conf_json['licensing']['url'] = _request.url + request.function.__name__
logfile = log_file_factory.create(2)
conf_json['settings'].update(logfile.json)
conf_json.save()

with accelize_drm.DrmManager(
conf_json.path, cred_json.path,
driver.read_register_callback,
driver.write_register_callback,
async_cb.callback
) as drm_manager:

# Set initial context on the live server
nb_health = 4
healthPeriod = 2
healthRetry = 0 # no retry
healthRetrySleep = 1
context = {'data': list(),
'healthPeriod':healthPeriod,
'healthRetry':healthRetry,
'healthRetrySleep':healthRetrySleep
}
set_context(context)
assert get_context() == context
# Set initial context on the live server
nb_health = 4
healthPeriod = 2
healthRetry = 0 # no retry
healthRetrySleep = 1
context = {'data': list(),
'healthPeriod':healthPeriod,
'healthRetry':healthRetry,
'healthRetrySleep':healthRetrySleep
}
set_context(context)
assert get_context() == context

with accelize_drm.DrmManager(
conf_json.path, cred_json.path,
driver.read_register_callback,
driver.write_register_callback,
async_cb.callback
) as drm_manager:
drm_manager.activate()
wait_func_true(lambda: len(get_context()['data']) >= nb_health,
timeout=(healthPeriod+3) * (nb_health+2))
Expand All @@ -122,7 +119,7 @@ def test_health_period_modification(accelize_drm, conf_json, cred_json, async_ha

@pytest.mark.no_parallel
def test_health_retry_disabled(accelize_drm, conf_json, cred_json, async_handler,
live_server, request):
live_server, request, log_file_factory):
"""
Test the asynchronous health retry feature can be disabled.
"""
Expand All @@ -132,6 +129,8 @@ def test_health_retry_disabled(accelize_drm, conf_json, cred_json, async_handler

conf_json.reset()
conf_json['licensing']['url'] = _request.url + request.function.__name__
logfile = log_file_factory.create(2)
conf_json['settings'].update(logfile.json)
conf_json.save()

with accelize_drm.DrmManager(
Expand Down Expand Up @@ -173,7 +172,7 @@ def test_health_retry_disabled(accelize_drm, conf_json, cred_json, async_handler

@pytest.mark.no_parallel
def test_health_retry_modification(accelize_drm, conf_json, cred_json,
async_handler, live_server, request):
async_handler, live_server, request, log_file_factory):
"""
Test the asynchronous health retry can be modified dynamically.
"""
Expand All @@ -183,6 +182,8 @@ def test_health_retry_modification(accelize_drm, conf_json, cred_json,

conf_json.reset()
conf_json['licensing']['url'] = _request.url + request.function.__name__
logfile = log_file_factory.create(2)
conf_json['settings'].update(logfile.json)
conf_json.save()

healthPeriod = 3
Expand Down Expand Up @@ -233,7 +234,7 @@ def test_health_retry_modification(accelize_drm, conf_json, cred_json,

@pytest.mark.no_parallel
def test_health_retry_sleep_modification(accelize_drm, conf_json, cred_json,
async_handler, live_server, request):
async_handler, live_server, request, log_file_factory):
"""
Test the asynchronous health retry sleep value when changed dynamically.
"""
Expand All @@ -243,6 +244,8 @@ def test_health_retry_sleep_modification(accelize_drm, conf_json, cred_json,

conf_json.reset()
conf_json['licensing']['url'] = _request.url + request.function.__name__
logfile = log_file_factory.create(2)
conf_json['settings'].update(logfile.json)
conf_json.save()

healthPeriod = 3
Expand Down Expand Up @@ -295,7 +298,7 @@ def test_health_retry_sleep_modification(accelize_drm, conf_json, cred_json,
@pytest.mark.no_parallel
@pytest.mark.minimum
def test_health_metering_data(accelize_drm, conf_json, cred_json, async_handler,
live_server, ws_admin, request):
live_server, ws_admin, request, log_file_factory):
"""
Test the metering data returned to the web service is correct.
"""
Expand All @@ -308,6 +311,8 @@ def test_health_metering_data(accelize_drm, conf_json, cred_json, async_handler,

conf_json.reset()
conf_json['licensing']['url'] = _request.url + request.function.__name__
logfile = log_file_factory.create(2)
conf_json['settings'].update(logfile.json)
conf_json.save()

with accelize_drm.DrmManager(
Expand Down Expand Up @@ -356,7 +361,7 @@ def wait_and_check_on_next_health(drm):

@pytest.mark.no_parallel
def test_segment_index(accelize_drm, conf_json, cred_json, async_handler,
live_server, log_file_factory, request):
live_server, log_file_factory, request):
"""
Test the DRM Controller capacity to handle stressfully health and license requests
"""
Expand All @@ -366,7 +371,7 @@ def test_segment_index(accelize_drm, conf_json, cred_json, async_handler,

conf_json.reset()
conf_json['licensing']['url'] = _request.url + request.function.__name__
logfile = log_file_factory.create(1)
logfile = log_file_factory.create(2)
conf_json['settings'].update(logfile.json)
conf_json.save()

Expand Down Expand Up @@ -444,7 +449,7 @@ def test_async_call_on_pause_when_health_is_enabled(accelize_drm, conf_json, cre

conf_json.reset()
conf_json['licensing']['url'] = _request.url + 'test_async_call_on_pause_depending_on_health_status'
logfile = log_file_factory.create(1)
logfile = log_file_factory.create(2)
conf_json['settings'].update(logfile.json)
conf_json.save()

Expand Down Expand Up @@ -492,7 +497,7 @@ def test_async_call_on_pause_when_health_is_enabled(accelize_drm, conf_json, cre

@pytest.mark.no_parallel
def test_no_async_call_on_pause_when_health_is_disabled(accelize_drm, conf_json, cred_json,
async_handler, live_server, log_file_factory, request):
async_handler, live_server, log_file_factory, request):
"""
Test the DRM pause function does NOT perform a async request before pausing
"""
Expand All @@ -502,7 +507,7 @@ def test_no_async_call_on_pause_when_health_is_disabled(accelize_drm, conf_json,

conf_json.reset()
conf_json['licensing']['url'] = _request.url + 'test_async_call_on_pause_depending_on_health_status'
logfile = log_file_factory.create(1)
logfile = log_file_factory.create(2)
conf_json['settings'].update(logfile.json)
conf_json.save()

Expand Down
45 changes: 25 additions & 20 deletions tests/test_drm_license_error.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
@pytest.mark.skip(reason='Waiting a fix from LGDN')
@pytest.mark.no_parallel
def test_header_error_on_key(accelize_drm, conf_json, cred_json, async_handler,
live_server, request):
live_server, request, log_file_factory):
"""
Test a MAC error is returned if the key value in the response has been modified
"""
Expand All @@ -31,18 +31,21 @@ def test_header_error_on_key(accelize_drm, conf_json, cred_json, async_handler,

conf_json.reset()
conf_json['licensing']['url'] = _request.url + request.function.__name__
logfile = log_file_factory.create(1)
conf_json['settings'].update(logfile.json)
conf_json.save()

# Set initial context on the live server
context = {'cnt':0}
set_context(context)
assert get_context() == context

with accelize_drm.DrmManager(
conf_json.path, cred_json.path,
driver.read_register_callback,
driver.write_register_callback,
async_cb.callback
) as drm_manager:
# Set initial context on the live server
context = {'cnt':0}
set_context(context)
assert get_context() == context
# Check failure is detected
with pytest.raises(accelize_drm.exceptions.DRMCtlrError) as excinfo:
drm_manager.activate()
Expand All @@ -53,7 +56,7 @@ def test_header_error_on_key(accelize_drm, conf_json, cred_json, async_handler,

@pytest.mark.no_parallel
def test_header_error_on_licenseTimer(accelize_drm, conf_json, cred_json, async_handler,
live_server, request):
live_server, request, log_file_factory):
"""
Test a MAC error is returned if the licenseTimer value in the response has been modified
"""
Expand All @@ -72,18 +75,21 @@ def test_header_error_on_licenseTimer(accelize_drm, conf_json, cred_json, async_

conf_json.reset()
conf_json['licensing']['url'] = _request.url + request.function.__name__
logfile = log_file_factory.create(2)
conf_json['settings'].update(logfile.json)
conf_json.save()

# Set initial context on the live server
context = {'cnt':0}
set_context(context)
assert get_context() == context

with accelize_drm.DrmManager(
conf_json.path, cred_json.path,
driver.read_register_callback,
driver.write_register_callback,
async_cb.callback
) as drm_manager:
# Set initial context on the live server
context = {'cnt':0}
set_context(context)
assert get_context() == context
drm_manager.activate()
start = datetime.now()
lic_duration = drm_manager.get('license_duration')
Expand All @@ -95,14 +101,13 @@ def test_header_error_on_licenseTimer(accelize_drm, conf_json, cred_json, async_
activators.autotest(is_activated=False)
activators.autotest(is_activated=False)
assert async_cb.was_called
assert async_cb.message is not None
assert async_cb.errcode == accelize_drm.exceptions.DRMCtlrError.error_code
assert "License header check error" in async_cb.message
assert "License MAC check error" in async_cb.message


@pytest.mark.no_parallel
def test_session_id_error(accelize_drm, conf_json, cred_json, async_handler,
live_server, request):
live_server, request, log_file_factory):
"""
Test an error is returned if a wrong session id is provided
"""
Expand All @@ -116,20 +121,21 @@ def test_session_id_error(accelize_drm, conf_json, cred_json, async_handler,

conf_json.reset()
conf_json['licensing']['url'] = _request.url + request.function.__name__
logfile = log_file_factory.create(2)
conf_json['settings'].update(logfile.json)
conf_json.save()

# Set initial context on the live server
context = {'session_id':'0', 'session_cnt':0, 'request_cnt':0}
set_context(context)
assert get_context() == context

with accelize_drm.DrmManager(
conf_json.path, cred_json.path,
driver.read_register_callback,
driver.write_register_callback,
async_cb.callback
) as drm_manager:

# Set initial context on the live server
context = {'session_id':'0', 'session_cnt':0, 'request_cnt':0}
set_context(context)
assert get_context() == context

# Start session #1 to record
drm_manager.activate()
start = datetime.now()
Expand All @@ -156,6 +162,5 @@ def test_session_id_error(accelize_drm, conf_json, cred_json, async_handler,
activators.autotest(is_activated=False)
drm_manager.deactivate()
assert async_cb.was_called
assert async_cb.message is not None
assert async_cb.errcode == accelize_drm.exceptions.DRMCtlrError.error_code
assert "License header check error" in async_cb.message
35 changes: 35 additions & 0 deletions tests/test_trng_quality.py
Original file line number Diff line number Diff line change
Expand Up @@ -272,3 +272,38 @@ def test_dna_and_challenge_duplication(accelize_drm, conf_json, cred_json, async
if dna_score:
assert dna_score < DUPLICATE_THRESHOLD


@pytest.mark.no_parallel
def test_saas_challenge_quality(accelize_drm, conf_json, cred_json, async_handler,
log_file_factory):
nb_loop = 5
driver = accelize_drm.pytest_fpga_driver[0]
async_cb = async_handler.create()
async_cb.reset()
logfile = log_file_factory.create(1)
conf_json['settings'].update(logfile.json)
conf_json.save()
with accelize_drm.DrmManager(
conf_json.path,
cred_json.path,
driver.read_register_callback,
driver.write_register_callback,
async_cb.callback
) as drm_manager:
for i in range(nb_loop):
drm_manager.activate()
drm_manager.deactivate()
log_content = logfile.read()
challenge_list = list(findall(r'"saasChallenge" : "([a-fA-F0-9]{32})",', log_content))
challenge_set = set(challenge_list)
try:
assert len(challenge_list) == len( challenge_set), "Found duplicate saas challenge"
except AssertionError as e:
dupl_dict = {}
for e in challenge_set:
nb = challenge_list.count(e)
if nb > 1:
print(f'challenge {e} appears {nb} times')
raise
logfile.remove()
async_cb.assert_NoError()

0 comments on commit 252c050

Please sign in to comment.