diff --git a/doc/source/middlewarearchitecture.rst b/doc/source/middlewarearchitecture.rst index 803fbd905..894d40dd9 100644 --- a/doc/source/middlewarearchitecture.rst +++ b/doc/source/middlewarearchitecture.rst @@ -1,5 +1,5 @@ .. - Copyright 2011-2012 OpenStack, LLC + Copyright 2011-2013 OpenStack, LLC All Rights Reserved. Licensed under the Apache License, Version 2.0 (the "License"); you may @@ -188,7 +188,8 @@ Configuration Options the timeout when validating token by http). * ``auth_port``: (optional, default `35357`) the port used to validate tokens * ``auth_protocol``: (optional, default `https`) -* ``auth_uri``: (optional, defaults to `auth_protocol`://`auth_host`:`auth_port`) +* ``auth_uri``: (optional, defaults to + `auth_protocol`://`auth_host`:`auth_port`) * ``certfile``: (required, if Keystone server requires client cert) * ``keyfile``: (required, if Keystone server requires client cert) This can be the same as the certfile if the certfile includes the private key. @@ -232,22 +233,24 @@ Memcache Protection =================== When using memcached, we are storing user tokens and token validation -information into the cache as raw data. Which means anyone who have access -to the memcache servers can read and modify data stored there. To mitigate -this risk, ``auth_token`` middleware provides an option to either encrypt -or authenticate the token data stored in the cache. - -* ``memcache_security_strategy``: (optional) if defined, indicate whether token - data should be encrypted or authenticated. Acceptable values are ``ENCRYPT`` - or ``MAC``. If ``ENCRYPT``, token data is encrypted in the cache. If - ``MAC``, token data is authenticated (with HMAC) in the cache. If its value - is neither ``MAC`` nor ``ENCRYPT``, ``auth_token`` will raise an exception - on initialization. +information into the cache as raw data. Which means that anyone who +has access to the memcache servers can read and modify data stored +there. To mitigate this risk, ``auth_token`` middleware provides an +option to authenticate and optionally encrypt the token data stored in +the cache. + +* ``memcache_security_strategy``: (optional) if defined, indicate + whether token data should be authenticated or authenticated and + encrypted. Acceptable values are ``MAC`` or ``ENCRYPT``. If ``MAC``, + token data is authenticated (with HMAC) in the cache. If + ``ENCRYPT``, token data is encrypted and authenticated in the + cache. If the value is not one of these options or empty, + ``auth_token`` will raise an exception on initialization. * ``memcache_secret_key``: (optional, mandatory if - ``memcache_security_strategy`` is defined) if defined, - a random string to be used for key derivation. If - ``memcache_security_strategy`` is defined and ``memcache_secret_key`` is - absent, ``auth_token`` will raise an exception on initialization. + ``memcache_security_strategy`` is defined) this string is used for + key derivation. If ``memcache_security_strategy`` is defined and + ``memcache_secret_key`` is absent, ``auth_token`` will raise an + exception on initialization. Exchanging User Information =========================== diff --git a/keystoneclient/middleware/auth_token.py b/keystoneclient/middleware/auth_token.py index 7e3012cb8..e50f723c8 100644 --- a/keystoneclient/middleware/auth_token.py +++ b/keystoneclient/middleware/auth_token.py @@ -222,6 +222,7 @@ CONF.register_opts(opts, group='keystone_authtoken') LIST_OF_VERSIONS_TO_ATTEMPT = ['v2.0', 'v3.0'] +CACHE_KEY_TEMPLATE = 'tokens/%s' def will_expire_soon(expiry): @@ -847,91 +848,81 @@ def _get_header(self, env, key, default=None): env_key = self._header_to_env_var(key) return env.get(env_key, default) - def _protect_cache_value(self, token, data): - """ Encrypt or sign data if necessary. """ - try: - if self._memcache_security_strategy == 'ENCRYPT': - return memcache_crypt.encrypt_data(token, - self._memcache_secret_key, - data) - elif self._memcache_security_strategy == 'MAC': - return memcache_crypt.sign_data(token, data) - else: - return data - except: - msg = 'Failed to encrypt/sign cache data.' - self.LOG.exception(msg) - return data - - def _unprotect_cache_value(self, token, data): - """ Decrypt or verify signed data if necessary. """ - if data is None: - return data - - try: - if self._memcache_security_strategy == 'ENCRYPT': - return memcache_crypt.decrypt_data(token, - self._memcache_secret_key, - data) - elif self._memcache_security_strategy == 'MAC': - return memcache_crypt.verify_signed_data(token, data) - else: - return data - except: - msg = 'Failed to decrypt/verify cache data.' - self.LOG.exception(msg) - # this should have the same effect as data not found in cache - return None - - def _get_cache_key(self, token): - """ Return the cache key. - - Do not use clear token as key if memcache protection is on. - - """ - htoken = token - if self._memcache_security_strategy in ('ENCRYPT', 'MAC'): - derv_token = token + self._memcache_secret_key - htoken = memcache_crypt.hash_data(derv_token) - return 'tokens/%s' % htoken - - def _cache_get(self, token): + def _cache_get(self, token, ignore_expires=False): """Return token information from cache. If token is invalid raise InvalidUserToken return token only if fresh (not expired). """ + if self._cache and token: - key = self._get_cache_key(token) - cached = self._cache.get(key) - cached = self._unprotect_cache_value(token, cached) + if self._memcache_security_strategy is None: + key = CACHE_KEY_TEMPLATE % token + serialized = self._cache.get(key) + else: + keys = memcache_crypt.derive_keys( + token, + self._memcache_secret_key, + self._memcache_security_strategy) + cache_key = CACHE_KEY_TEMPLATE % ( + memcache_crypt.get_cache_key(keys)) + raw_cached = self._cache.get(cache_key) + try: + # unprotect_data will return None if raw_cached is None + serialized = memcache_crypt.unprotect_data(keys, + raw_cached) + except Exception: + msg = 'Failed to decrypt/verify cache data' + self.LOG.exception(msg) + # this should have the same effect as data not + # found in cache + serialized = None + + if serialized is None: + return None + + # Note that 'invalid' and (data, expires) are the only + # valid types of serialized cache entries, so there is not + # a collision with json.loads(serialized) == None. + cached = json.loads(serialized) if cached == 'invalid': self.LOG.debug('Cached Token %s is marked unauthorized', token) raise InvalidUserToken('Token authorization failed') - if cached: - data, expires = cached - if time.time() < float(expires): - self.LOG.debug('Returning cached token %s', token) - return data - else: - self.LOG.debug('Cached Token %s seems expired', token) - - def _cache_store(self, token, data, expires=None): - """ Store value into memcache. """ - key = self._get_cache_key(token) - data = self._protect_cache_value(token, data) - data_to_store = data - if expires: - data_to_store = (data, expires) + + data, expires = cached + if ignore_expires or time.time() < float(expires): + self.LOG.debug('Returning cached token %s', token) + return data + else: + self.LOG.debug('Cached Token %s seems expired', token) + + def _cache_store(self, token, data): + """ Store value into memcache. + + data may be the string 'invalid' or a tuple like (data, expires) + + """ + serialized_data = json.dumps(data) + if self._memcache_security_strategy is None: + cache_key = CACHE_KEY_TEMPLATE % token + data_to_store = serialized_data + else: + keys = memcache_crypt.derive_keys( + token, + self._memcache_secret_key, + self._memcache_security_strategy) + cache_key = CACHE_KEY_TEMPLATE % memcache_crypt.get_cache_key(keys) + data_to_store = memcache_crypt.protect_data(keys, serialized_data) + # we need to special-case set() because of the incompatibility between # Swift MemcacheRing and python-memcached. See # https://bugs.launchpad.net/swift/+bug/1095730 if self._use_keystone_cache: - self._cache.set(key, + self._cache.set(cache_key, data_to_store, time=self.token_cache_time) else: - self._cache.set(key, + self._cache.set(cache_key, data_to_store, timeout=self.token_cache_time) @@ -959,7 +950,7 @@ def _cache_put(self, token, data, expires): """ if self._cache: self.LOG.debug('Storing %s token in memcache', token) - self._cache_store(token, data, expires) + self._cache_store(token, (data, expires)) def _cache_store_invalid(self, token): """Store invalid token in cache.""" diff --git a/keystoneclient/middleware/memcache_crypt.py b/keystoneclient/middleware/memcache_crypt.py index 91e261da0..6cadf3ab9 100755 --- a/keystoneclient/middleware/memcache_crypt.py +++ b/keystoneclient/middleware/memcache_crypt.py @@ -1,6 +1,6 @@ # vim: tabstop=4 shiftwidth=4 softtabstop=4 -# Copyright 2010-2012 OpenStack LLC +# Copyright 2010-2013 OpenStack LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -18,33 +18,34 @@ """ Utilities for memcache encryption and integrity check. -Data is serialized before been encrypted or MACed. Encryption have a -dependency on the pycrypto. If pycrypto is not available, -CryptoUnabailableError will be raised. +Data should be serialized before entering these functions. Encryption +has a dependency on the pycrypto. If pycrypto is not available, +CryptoUnavailableError will be raised. -Encrypted data stored in memcache are prefixed with '{ENCRYPT:AES256}'. - -MACed data stored in memcache are prefixed with '{MAC:SHA1}'. +This module will not be called unless signing or encryption is enabled +in the config. It will always validate signatures, and will decrypt +data if encryption is enabled. It is not valid to mix protection +modes. """ import base64 import functools import hashlib -import json +import hmac +import math import os -# make sure pycrypt is available +# make sure pycrypto is available try: from Crypto.Cipher import AES except ImportError: AES = None - -# prefix marker indicating data is HMACed (signed by a secret key) -MAC_MARKER = '{MAC:SHA1}' -# prefix marker indicating data is encrypted -ENCRYPT_MARKER = '{ENCRYPT:AES256}' +HASH_FUNCTION = hashlib.sha384 +DIGEST_LENGTH = HASH_FUNCTION().digest_size +DIGEST_SPLIT = DIGEST_LENGTH // 3 +DIGEST_LENGTH_B64 = 4 * int(math.ceil(DIGEST_LENGTH / 3.0)) class InvalidMacError(Exception): @@ -81,77 +82,121 @@ def wrapper(*args, **kwds): return wrapper -def generate_aes_key(token, secret): - """ Generates and returns a 256 bit AES key, based on sha256 hash. """ - return hashlib.sha256(token + secret).digest() - - -def compute_mac(token, serialized_data): - """ Computes and returns the base64 encoded MAC. """ - return hash_data(serialized_data + token) +def constant_time_compare(first, second): + """ Returns True if both string inputs are equal, otherwise False + This function should take a constant amount of time regardless of + how many characters in the strings match. -def hash_data(data): - """ Return the base64 encoded SHA1 hash of the data. """ - return base64.b64encode(hashlib.sha1(data).digest()) - - -def sign_data(token, data): - """ MAC the data using SHA1. """ - mac_data = {} - mac_data['serialized_data'] = json.dumps(data) - mac = compute_mac(token, mac_data['serialized_data']) - mac_data['mac'] = mac - md = MAC_MARKER + base64.b64encode(json.dumps(mac_data)) - return md + """ + if len(first) != len(second): + return False + result = 0 + for x, y in zip(first, second): + result |= ord(x) ^ ord(y) + return result == 0 + + +def derive_keys(token, secret, strategy): + """ Derives keys for MAC and ENCRYPTION from the user-provided + secret. The resulting keys should be passed to the protect and + unprotect functions. + + As suggested by NIST Special Publication 800-108, this uses the + first 128 bits from the sha384 KDF for the obscured cache key + value, the second 128 bits for the message authentication key and + the remaining 128 bits for the encryption key. + + This approach is faster than computing a separate hmac as the KDF + for each desired key. + """ + digest = hmac.new(secret, token + strategy, HASH_FUNCTION).digest() + return {'CACHE_KEY': digest[:DIGEST_SPLIT], + 'MAC': digest[DIGEST_SPLIT: 2 * DIGEST_SPLIT], + 'ENCRYPTION': digest[2 * DIGEST_SPLIT:], + 'strategy': strategy} -def verify_signed_data(token, data): - """ Verify data integrity by ensuring MAC is valid. """ - if data.startswith(MAC_MARKER): - try: - data = data[len(MAC_MARKER):] - mac_data = json.loads(base64.b64decode(data)) - mac = compute_mac(token, mac_data['serialized_data']) - if mac != mac_data['mac']: - raise InvalidMacError('invalid MAC; expect=%s, actual=%s' % - (mac_data['mac'], mac)) - return json.loads(mac_data['serialized_data']) - except: - raise InvalidMacError('invalid MAC; data appeared to be corrupted') - else: - # doesn't appear to be MACed data - return data +def sign_data(key, data): + """ Sign the data using the defined function and the derived key""" + mac = hmac.new(key, data, HASH_FUNCTION).digest() + return base64.b64encode(mac) @assert_crypto_availability -def encrypt_data(token, secret, data): - """ Encryptes the data with the given secret key. """ +def encrypt_data(key, data): + """ Encrypt the data with the given secret key. + + Padding is n bytes of the value n, where 1 <= n <= blocksize. + """ iv = os.urandom(16) - aes_key = generate_aes_key(token, secret) - cipher = AES.new(aes_key, AES.MODE_CFB, iv) - data = json.dumps(data) - encoded_data = base64.b64encode(iv + cipher.encrypt(data)) - encoded_data = ENCRYPT_MARKER + encoded_data - return encoded_data + cipher = AES.new(key, AES.MODE_CBC, iv) + padding = 16 - len(data) % 16 + return iv + cipher.encrypt(data + chr(padding) * padding) @assert_crypto_availability -def decrypt_data(token, secret, data): +def decrypt_data(key, data): """ Decrypt the data with the given secret key. """ - if data.startswith(ENCRYPT_MARKER): - try: - # encrypted data - encoded_data = data[len(ENCRYPT_MARKER):] - aes_key = generate_aes_key(token, secret) - decoded_data = base64.b64decode(encoded_data) - iv = decoded_data[:16] - encrypted_data = decoded_data[16:] - cipher = AES.new(aes_key, AES.MODE_CFB, iv) - decrypted_data = cipher.decrypt(encrypted_data) - return json.loads(decrypted_data) - except: - raise DecryptError('data appeared to be corrupted') - else: - # doesn't appear to be encrypted data - return data + iv = data[:16] + cipher = AES.new(key, AES.MODE_CBC, iv) + try: + result = cipher.decrypt(data[16:]) + except Exception: + raise DecryptError('Encrypted data appears to be corrupted.') + + # Strip the last n padding bytes where n is the last value in + # the plaintext + padding = ord(result[-1]) + return result[:-1 * padding] + + +def protect_data(keys, data): + """ Given keys and serialized data, returns an appropriately + protected string suitable for storage in the cache. + + """ + if keys['strategy'] == 'ENCRYPT': + data = encrypt_data(keys['ENCRYPTION'], data) + + encoded_data = base64.b64encode(data) + + signature = sign_data(keys['MAC'], encoded_data) + return signature + encoded_data + + +def unprotect_data(keys, signed_data): + """ Given keys and cached string data, verifies the signature, + decrypts if necessary, and returns the original serialized data. + + """ + # cache backends return None when no data is found. We don't mind + # that this particular special value is unsigned. + if signed_data is None: + return None + + # First we calculate the signature + provided_mac = signed_data[:DIGEST_LENGTH_B64] + calculated_mac = sign_data( + keys['MAC'], + signed_data[DIGEST_LENGTH_B64:]) + + # Then verify that it matches the provided value + if not constant_time_compare(provided_mac, calculated_mac): + raise InvalidMacError('Invalid MAC; data appears to be corrupted.') + + data = base64.b64decode(signed_data[DIGEST_LENGTH_B64:]) + + # then if necessary decrypt the data + if keys['strategy'] == 'ENCRYPT': + data = decrypt_data(keys['ENCRYPTION'], data) + + return data + + +def get_cache_key(keys): + """ Given keys generated by derive_keys(), returns a base64 + encoded value suitable for use as a cache key in memcached. + + """ + return base64.b64encode(keys['CACHE_KEY']) diff --git a/tests/test_auth_token_middleware.py b/tests/test_auth_token_middleware.py index 06054d088..a4285043c 100644 --- a/tests/test_auth_token_middleware.py +++ b/tests/test_auth_token_middleware.py @@ -28,7 +28,6 @@ from keystoneclient.common import cms from keystoneclient import utils from keystoneclient.middleware import auth_token -from keystoneclient.middleware import memcache_crypt from keystoneclient.openstack.common import memorycache from keystoneclient.openstack.common import jsonutils from keystoneclient.openstack.common import timeutils @@ -1013,9 +1012,7 @@ def test_request_blank_token(self): def _get_cached_token(self, token): token_id = cms.cms_hash_token(token) # NOTE(vish): example tokens are expired so skip the expiration check. - key = self.middleware._get_cache_key(token_id) - cached = self.middleware._cache.get(key) - return self.middleware._unprotect_cache_value(token, cached) + return self.middleware._cache_get(token_id, ignore_expires=True) def test_memcache(self): req = webob.Request.blank('/') @@ -1036,7 +1033,8 @@ def test_memcache_set_invalid(self): token = 'invalid-token' req.headers['X-Auth-Token'] = token self.middleware(req.environ, self.start_fake_response) - self.assertEqual(self._get_cached_token(token), "invalid") + self.assertRaises(auth_token.InvalidUserToken, + self._get_cached_token, token) def test_memcache_set_expired(self): token_cache_time = 10 @@ -1096,18 +1094,11 @@ def test_encrypt_cache_data(self): 'memcache_secret_key': 'mysecret' } self.set_middleware(conf=conf) - encrypted_data = self.middleware._protect_cache_value( - 'token', TOKEN_RESPONSES[self.token_dict['uuid_token_default']]) - self.assertEqual('{ENCRYPT:AES256}', encrypted_data[:16]) - self.assertEqual( - TOKEN_RESPONSES[self.token_dict['uuid_token_default']], - self.middleware._unprotect_cache_value('token', encrypted_data)) - # should return None if unable to decrypt - self.assertIsNone( - self.middleware._unprotect_cache_value( - 'token', '{ENCRYPT:AES256}corrupted')) - self.assertIsNone( - self.middleware._unprotect_cache_value('mykey', encrypted_data)) + token = 'my_token' + data = ('this_data', 10e100) + self.middleware._init_cache({}) + self.middleware._cache_store(token, data) + self.assertEqual(self.middleware._cache_get(token), data[0]) def test_sign_cache_data(self): conf = { @@ -1119,19 +1110,11 @@ def test_sign_cache_data(self): 'memcache_secret_key': 'mysecret' } self.set_middleware(conf=conf) - signed_data = self.middleware._protect_cache_value( - 'mykey', TOKEN_RESPONSES[self.token_dict['uuid_token_default']]) - expected = '{MAC:SHA1}' - self.assertEqual( - signed_data[:10], - expected) - self.assertEqual( - TOKEN_RESPONSES[self.token_dict['uuid_token_default']], - self.middleware._unprotect_cache_value('mykey', signed_data)) - # should return None on corrupted data - self.assertIsNone( - self.middleware._unprotect_cache_value('mykey', - '{MAC:SHA1}corrupted')) + token = 'my_token' + data = ('this_data', 10e100) + self.middleware._init_cache({}) + self.middleware._cache_store(token, data) + self.assertEqual(self.middleware._cache_get(token), data[0]) def test_no_memcache_protection(self): conf = { @@ -1142,47 +1125,11 @@ def test_no_memcache_protection(self): 'memcache_secret_key': 'mysecret' } self.set_middleware(conf=conf) - data = self.middleware._protect_cache_value('mykey', - 'This is a test!') - self.assertEqual(data, 'This is a test!') - self.assertEqual( - 'This is a test!', - self.middleware._unprotect_cache_value('mykey', data)) - - def test_get_cache_key(self): - conf = { - 'auth_host': 'keystone.example.com', - 'auth_port': 1234, - 'auth_admin_prefix': '/testadmin', - 'memcache_servers': ['localhost:11211'], - 'memcache_secret_key': 'mysecret' - } - self.set_middleware(conf=conf) - self.assertEqual( - 'tokens/mytoken', - self.middleware._get_cache_key('mytoken')) - conf = { - 'auth_host': 'keystone.example.com', - 'auth_port': 1234, - 'auth_admin_prefix': '/testadmin', - 'memcache_servers': ['localhost:11211'], - 'memcache_security_strategy': 'mac', - 'memcache_secret_key': 'mysecret' - } - self.set_middleware(conf=conf) - expected = 'tokens/' + memcache_crypt.hash_data('mytoken' + 'mysecret') - self.assertEqual(self.middleware._get_cache_key('mytoken'), expected) - conf = { - 'auth_host': 'keystone.example.com', - 'auth_port': 1234, - 'auth_admin_prefix': '/testadmin', - 'memcache_servers': ['localhost:11211'], - 'memcache_security_strategy': 'Encrypt', - 'memcache_secret_key': 'abc!' - } - self.set_middleware(conf=conf) - expected = 'tokens/' + memcache_crypt.hash_data('mytoken' + 'abc!') - self.assertEqual(self.middleware._get_cache_key('mytoken'), expected) + token = 'my_token' + data = ('this_data', 10e100) + self.middleware._init_cache({}) + self.middleware._cache_store(token, data) + self.assertEqual(self.middleware._cache_get(token), data[0]) def test_assert_valid_memcache_protection_config(self): # test missing memcache_secret_key diff --git a/tests/test_memcache_crypt.py b/tests/test_memcache_crypt.py index b2281d935..524cd21da 100644 --- a/tests/test_memcache_crypt.py +++ b/tests/test_memcache_crypt.py @@ -4,48 +4,66 @@ class MemcacheCryptPositiveTests(testtools.TestCase): - def test_generate_aes_key(self): - self.assertEqual( - len(memcache_crypt.generate_aes_key('Gimme Da Key', 'hush')), 32) + def _setup_keys(self, strategy): + return memcache_crypt.derive_keys('token', 'secret', strategy) - def test_compute_mac(self): - self.assertEqual( - memcache_crypt.compute_mac('mykey', 'This is a test!'), - 'tREu41yR5tEgeBWIuv9ag4AeKA8=') + def test_constant_time_compare(self): + # make sure it works as a compare, the "constant time" aspect + # isn't appropriate to test in unittests + ctc = memcache_crypt.constant_time_compare + self.assertTrue(ctc('abcd', 'abcd')) + self.assertTrue(ctc('', '')) + self.assertFalse(ctc('abcd', 'efgh')) + self.assertFalse(ctc('abc', 'abcd')) + self.assertFalse(ctc('abc', 'abc\x00')) + self.assertFalse(ctc('', 'abc')) + + def test_derive_keys(self): + keys = memcache_crypt.derive_keys('token', 'secret', 'strategy') + self.assertEqual(len(keys['ENCRYPTION']), + len(keys['CACHE_KEY'])) + self.assertEqual(len(keys['CACHE_KEY']), + len(keys['MAC'])) + self.assertNotEqual(keys['ENCRYPTION'], + keys['MAC']) + self.assertIn('strategy', keys.keys()) + + def test_key_strategy_diff(self): + k1 = self._setup_keys('MAC') + k2 = self._setup_keys('ENCRYPT') + self.assertNotEqual(k1, k2) def test_sign_data(self): - expected = '{MAC:SHA1}eyJtYWMiOiAiM0FrQmdPZHRybGo1RFFESHA1eUxqcDVq' +\ - 'Si9BPSIsICJzZXJpYWxpemVkX2RhdGEiOiAiXCJUaGlzIGlzIGEgdG' +\ - 'VzdCFcIiJ9' - self.assertEqual( - memcache_crypt.sign_data('mykey', 'This is a test!'), - expected) - - def test_verify_signed_data(self): - signed = memcache_crypt.sign_data('mykey', 'Testz') - self.assertEqual( - memcache_crypt.verify_signed_data('mykey', signed), - 'Testz') - self.assertEqual( - memcache_crypt.verify_signed_data('aasSFWE13WER', 'not MACed'), - 'not MACed') - - def test_encrypt_data(self): - expected = '{ENCRYPT:AES256}' - self.assertEqual( - memcache_crypt.encrypt_data('mykey', 'mysecret', - 'This is a test!')[:16], - expected) - - def test_decrypt_data(self): - encrypted = memcache_crypt.encrypt_data('mykey', 'mysecret', 'Testz') - self.assertEqual( - memcache_crypt.decrypt_data('mykey', 'mysecret', encrypted), - 'Testz') - self.assertEqual( - memcache_crypt.decrypt_data('mykey', 'mysecret', - 'Not Encrypted!'), - 'Not Encrypted!') + keys = self._setup_keys('MAC') + sig = memcache_crypt.sign_data(keys['MAC'], 'data') + self.assertEqual(len(sig), memcache_crypt.DIGEST_LENGTH_B64) + + def test_encryption(self): + keys = self._setup_keys('ENCRYPT') + # what you put in is what you get out + for data in ['data', '1234567890123456', '\x00\xFF' * 13 + ] + [chr(x % 256) * x for x in range(768)]: + crypt = memcache_crypt.encrypt_data(keys['ENCRYPTION'], data) + decrypt = memcache_crypt.decrypt_data(keys['ENCRYPTION'], crypt) + self.assertEqual(data, decrypt) + self.assertRaises(memcache_crypt.DecryptError, + memcache_crypt.decrypt_data, + keys['ENCRYPTION'], crypt[:-1]) + + def test_protect_wrappers(self): + data = 'My Pretty Little Data' + for strategy in ['MAC', 'ENCRYPT']: + keys = self._setup_keys(strategy) + protected = memcache_crypt.protect_data(keys, data) + self.assertNotEqual(protected, data) + if strategy == 'ENCRYPT': + self.assertNotIn(data, protected) + unprotected = memcache_crypt.unprotect_data(keys, protected) + self.assertEqual(data, unprotected) + self.assertRaises(memcache_crypt.InvalidMacError, + memcache_crypt.unprotect_data, + keys, protected[:-1]) + self.assertIsNone(memcache_crypt.unprotect_data(keys, None)) def test_no_pycrypt(self): aes = memcache_crypt.AES