diff --git a/tests/test_filelock.py b/tests/test_filelock.py index 41fd29b..d379595 100644 --- a/tests/test_filelock.py +++ b/tests/test_filelock.py @@ -1,392 +1,275 @@ -#!/usr/bin/env python - -# This is free and unencumbered software released into the public domain. -# -# Anyone is free to copy, modify, publish, use, compile, sell, or -# distribute this software, either in source code form or as a compiled -# binary, for any purpose, commercial or non-commercial, and by any -# means. -# -# In jurisdictions that recognize copyright laws, the author or authors -# of this software dedicate any and all copyright interest in the -# software to the public domain. We make this dedication for the benefit -# of the public at large and to the detriment of our heirs and -# successors. We intend this dedication to be an overt act of -# relinquishment in perpetuity of all present and future rights to this -# software under copyright law. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, -# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF -# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. -# IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR -# OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, -# ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR -# OTHER DEALINGS IN THE SOFTWARE. -# -# For more information, please refer to - - -""" -Some tests for the file lock. -""" - -import errno -import os +from __future__ import unicode_literals + import sys import threading -import unittest -import filelock +import pytest -PY2 = sys.version_info[0] == 2 -PY3 = sys.version_info[0] == 3 +from filelock import FileLock, SoftFileLock, Timeout -class ExThread(threading.Thread): - def __init__(self, *args, **kargs): - threading.Thread.__init__(self, *args, **kargs) - self.ex = None - return None +@pytest.mark.parametrize("lock_type", [FileLock, SoftFileLock]) +def test_simple(lock_type, tmp_path): + lock_path = tmp_path / "a" + lock = lock_type(str(lock_path)) - def run(self): - try: - threading.Thread.run(self) - except: - self.ex = sys.exc_info() - return None + with lock as locked: + assert lock.is_locked + assert lock is locked + assert not lock.is_locked - def join(self): - threading.Thread.join(self) - if self.ex is not None: - if PY3: - raise self.ex[0].with_traceback(self.ex[1], self.ex[2]) - elif PY2: - wrapper_ex = self.ex[1] - raise (wrapper_ex.__class__, wrapper_ex, self.ex[2]) - return None +@pytest.mark.parametrize("lock_type", [FileLock, SoftFileLock]) +def test_nested_context_manager(lock_type, tmp_path): + # lock is not released before the most outer with statement that locked the lock, is left + lock_path = tmp_path / "a" + lock = lock_type(str(lock_path)) -class BaseTest(object): - """ - Base class for all filelock tests. - """ + with lock as lock_1: + assert lock.is_locked + assert lock is lock_1 - # The filelock type (class), which is tested. - LOCK_TYPE = None + with lock as lock_2: + assert lock.is_locked + assert lock is lock_2 - # The path to the lockfile. - LOCK_PATH = "test.lock" + with lock as lock_3: + assert lock.is_locked + assert lock is lock_3 - def setUp(self): - """Deletes the potential lock file at :attr:`LOCK_PATH`.""" - try: - os.remove(self.LOCK_PATH) - except OSError as e: - # FileNotFound - if e.errno != errno.ENOENT: - raise - return None - - def tearDown(self): - """Deletes the potential lock file at :attr:`LOCK_PATH`.""" - try: - os.remove(self.LOCK_PATH) - except OSError as e: - # FileNotFound - if e.errno != errno.ENOENT: - raise - return None - - def test_simple(self): - """ - Asserts that the lock is locked in a context statement and that the - return value of the *__enter__* method is the lock. - """ - lock = self.LOCK_TYPE(self.LOCK_PATH) - - with lock as l: - self.assertTrue(lock.is_locked) - self.assertTrue(lock is l) - self.assertFalse(lock.is_locked) - return None - - def test_nested(self): - """ - Asserts, that the lock is not released before the most outer with - statement that locked the lock, is left. - """ - lock = self.LOCK_TYPE(self.LOCK_PATH) - - with lock as l1: - self.assertTrue(lock.is_locked) - self.assertTrue(lock is l1) - - with lock as l2: - self.assertTrue(lock.is_locked) - self.assertTrue(lock is l2) - - with lock as l3: - self.assertTrue(lock.is_locked) - self.assertTrue(lock is l3) - - self.assertTrue(lock.is_locked) - self.assertTrue(lock.is_locked) - self.assertFalse(lock.is_locked) - return None - - def test_nested1(self): - """ - The same as *test_nested*, but this method uses the *acquire()* method - to create the lock, rather than the implicit *__enter__* method. - """ - lock = self.LOCK_TYPE(self.LOCK_PATH) - - with lock.acquire() as l1: - self.assertTrue(lock.is_locked) - self.assertTrue(lock is l1) - - with lock.acquire() as l2: - self.assertTrue(lock.is_locked) - self.assertTrue(lock is l2) - - with lock.acquire() as l3: - self.assertTrue(lock.is_locked) - self.assertTrue(lock is l3) - - self.assertTrue(lock.is_locked) - self.assertTrue(lock.is_locked) - self.assertFalse(lock.is_locked) - return None - - def test_nested_forced_release(self): - """ - Acquires the lock using a with-statement and releases the lock - before leaving the with-statement. - """ - lock = self.LOCK_TYPE(self.LOCK_PATH) - - with lock: - self.assertTrue(lock.is_locked) - - lock.acquire() - self.assertTrue(lock.is_locked) - - lock.release(force=True) - self.assertFalse(lock.is_locked) - self.assertFalse(lock.is_locked) - return None - - def test_threaded(self): - """ - Runs 250 threads, which need the filelock. The lock must be acquired - if at least one thread required it and released, as soon as all threads - stopped. - """ - lock = self.LOCK_TYPE(self.LOCK_PATH) - - def my_thread(): - for i in range(100): - with lock: - self.assertTrue(lock.is_locked) - return None - - NUM_THREADS = 250 - - threads = [ExThread(target=my_thread) for i in range(NUM_THREADS)] - for thread in threads: - thread.start() - for thread in threads: - thread.join() - - self.assertFalse(lock.is_locked) - return None - - def test_threaded1(self): - """ - Runs multiple threads, which acquire the same lock file with a different - FileLock object. When thread group 1 acquired the lock, thread group 2 - must not hold their lock. - """ - - def thread1(): - """ - Requires lock1. - """ - for i in range(1000): - with lock1: - self.assertTrue(lock1.is_locked) - self.assertFalse(lock2.is_locked) # FIXME (Filelock) - return None - - def thread2(): - """ - Requires lock2. - """ - for i in range(1000): - with lock2: - self.assertFalse(lock1.is_locked) # FIXME (FileLock) - self.assertTrue(lock2.is_locked) - return None - - NUM_THREADS = 10 - - lock1 = self.LOCK_TYPE(self.LOCK_PATH) - lock2 = self.LOCK_TYPE(self.LOCK_PATH) - - threads1 = [ExThread(target=thread1) for i in range(NUM_THREADS)] - threads2 = [ExThread(target=thread2) for i in range(NUM_THREADS)] - - for i in range(NUM_THREADS): - threads1[i].start() - threads2[i].start() - for i in range(NUM_THREADS): - threads1[i].join() - threads2[i].join() - - self.assertFalse(lock1.is_locked) - self.assertFalse(lock2.is_locked) - return None - - def test_timeout(self): - """ - Tests if the lock raises a TimeOut error, when it can not be acquired. - """ - lock1 = self.LOCK_TYPE(self.LOCK_PATH) - lock2 = self.LOCK_TYPE(self.LOCK_PATH) - - # Acquire lock 1. - lock1.acquire() - self.assertTrue(lock1.is_locked) - self.assertFalse(lock2.is_locked) - - # Try to acquire lock 2. - self.assertRaises(filelock.Timeout, lock2.acquire, timeout=1) # FIXME (Filelock) - self.assertFalse(lock2.is_locked) - self.assertTrue(lock1.is_locked) - - # Release lock 1. - lock1.release() - self.assertFalse(lock1.is_locked) - self.assertFalse(lock2.is_locked) - return None - - def test_default_timeout(self): - """ - Test if the default timeout parameter works. - """ - lock1 = self.LOCK_TYPE(self.LOCK_PATH) - lock2 = self.LOCK_TYPE(self.LOCK_PATH, timeout=1) - - self.assertEqual(lock2.timeout, 1) - - # Acquire lock 1. - lock1.acquire() - self.assertTrue(lock1.is_locked) - self.assertFalse(lock2.is_locked) - - # Try to acquire lock 2. - self.assertRaises(filelock.Timeout, lock2.acquire) # FIXME (SoftFileLock) - self.assertFalse(lock2.is_locked) - self.assertTrue(lock1.is_locked) - - lock2.timeout = 0 - self.assertEqual(lock2.timeout, 0) - - self.assertRaises(filelock.Timeout, lock2.acquire) - self.assertFalse(lock2.is_locked) - self.assertTrue(lock1.is_locked) - - # Release lock 1. - lock1.release() - self.assertFalse(lock1.is_locked) - self.assertFalse(lock2.is_locked) - return None - - def test_context(self): - """ - Tests, if the filelock is released, when an exception is thrown in - a with-statement. - """ - lock = self.LOCK_TYPE(self.LOCK_PATH) + assert lock.is_locked + assert lock.is_locked + assert not lock.is_locked - try: - with lock as lock1: - self.assertIs(lock, lock1) - self.assertTrue(lock.is_locked) - raise Exception() - except: - self.assertFalse(lock.is_locked) - return None - - def test_context1(self): - """ - The same as *test_context1()*, but uses the *acquire()* method. - """ - lock = self.LOCK_TYPE(self.LOCK_PATH) - try: - with lock.acquire() as lock1: - self.assertIs(lock, lock1) - self.assertTrue(lock.is_locked) - raise Exception() - except: - self.assertFalse(lock.is_locked) - return None - - @unittest.skipIf(hasattr(sys, "pypy_version_info"), "del() does not trigger GC in PyPy") - def test_del(self): - """ - Tests, if the lock is released, when the object is deleted. - """ - lock1 = self.LOCK_TYPE(self.LOCK_PATH) - lock2 = self.LOCK_TYPE(self.LOCK_PATH) - - # Acquire lock 1. - lock1.acquire() - self.assertTrue(lock1.is_locked) - self.assertFalse(lock2.is_locked) - - # Try to acquire lock 2. - self.assertRaises(filelock.Timeout, lock2.acquire, timeout=1) # FIXME (SoftFileLock) - - # Delete lock 1 and try to acquire lock 2 again. - del lock1 +@pytest.mark.parametrize("lock_type", [FileLock, SoftFileLock]) +def test_nested_acquire(lock_type, tmp_path): + # lock is not released before the most outer with statement that locked the lock, is left + lock_path = tmp_path / "a" + lock = lock_type(str(lock_path)) - lock2.acquire() - self.assertTrue(lock2.is_locked) + with lock.acquire() as lock_1: + assert lock.is_locked + assert lock is lock_1 - lock2.release() - return None + with lock.acquire() as lock_2: + assert lock.is_locked + assert lock is lock_2 + with lock.acquire() as lock_3: + assert lock.is_locked + assert lock is lock_3 -class FileLockTest(BaseTest, unittest.TestCase): - """ - Tests the hard file lock, which is available on the current platform. - """ + assert lock.is_locked + assert lock.is_locked + assert not lock.is_locked - LOCK_TYPE = filelock.FileLock - LOCK_PATH = "test.lock" +@pytest.mark.parametrize("lock_type", [FileLock, SoftFileLock]) +def test_nested_forced_release(lock_type, tmp_path): + # acquires the lock using a with-statement and releases the lock before leaving the with-statement + lock_path = tmp_path / "a" + lock = lock_type(str(lock_path)) -class SoftFileLockTest(BaseTest, unittest.TestCase): - """ - Tests the soft file lock, which is always available. - """ + with lock: + assert lock.is_locked - LOCK_TYPE = filelock.SoftFileLock - LOCK_PATH = "test.softlock" + lock.acquire() + assert lock.is_locked - def test_cleanup(self): - """ - Tests if the lock file is removed after use. - """ - lock = self.LOCK_TYPE(self.LOCK_PATH) + lock.release(force=True) + assert not lock.is_locked + assert not lock.is_locked - with lock: - self.assertTrue(os.path.exists(self.LOCK_PATH)) - self.assertFalse(os.path.exists(self.LOCK_PATH)) - return None +class ExThread(threading.Thread): + def __init__(self, target): + super(ExThread, self).__init__(target=target) + self.ex = None -if __name__ == "__main__": - unittest.main() + def run(self): + try: + super(ExThread, self).run() + except Exception: + self.ex = sys.exc_info() + + def join(self, timeout=None): + super(ExThread, self).join() + if self.ex is not None: + if sys.version_info[0] == 2: + wrapper_ex = self.ex[1] + raise (wrapper_ex.__class__, wrapper_ex, self.ex[2]) + raise self.ex[0].with_traceback(self.ex[1], self.ex[2]) + + +@pytest.mark.parametrize("lock_type", [FileLock, SoftFileLock]) +def test_threaded_shared_lock_obj(lock_type, tmp_path): + # Runs 100 threads, which need the filelock. The lock must be acquired if at least one thread required it and + # released, as soon as all threads stopped. + lock_path = tmp_path / "a" + lock = lock_type(str(lock_path)) + + def thread_work(): + for _ in range(100): + with lock: + assert lock.is_locked + + threads = [ExThread(target=thread_work) for i in range(100)] + for thread in threads: + thread.start() + for thread in threads: + thread.join() + + assert not lock.is_locked + + +@pytest.mark.parametrize("lock_type", [FileLock, SoftFileLock]) +def test_threaded_lock_different_lock_obj(lock_type, tmp_path): + # Runs multiple threads, which acquire the same lock file with a different FileLock object. When thread group 1 + # acquired the lock, thread group 2 must not hold their lock. + + def thread_work_one(): + for i in range(1000): + with lock_1: + assert lock_1.is_locked + assert not lock_2.is_locked + + def thread_work_two(): + for i in range(1000): + with lock_2: + assert not lock_1.is_locked + assert lock_2.is_locked + + lock_path = tmp_path / "a" + lock_1, lock_2 = lock_type(str(lock_path)), lock_type(str(lock_path)) + threads = [(ExThread(target=thread_work_one), ExThread(target=thread_work_two)) for i in range(10)] + + for thread_1, thread_2 in threads: + thread_1.start() + thread_2.start() + for thread_1, thread_2 in threads: + thread_1.join() + thread_2.join() + + assert not lock_1.is_locked + assert not lock_2.is_locked + + +@pytest.mark.parametrize("lock_type", [FileLock, SoftFileLock]) +def test_timeout(lock_type, tmp_path): + # raises Timeout error when the lock cannot be acquired + lock_path = tmp_path / "a" + lock_1, lock_2 = lock_type(str(lock_path)), lock_type(str(lock_path)) + + # acquire lock 1 + lock_1.acquire() + assert lock_1.is_locked + assert not lock_2.is_locked + + # try to acquire lock 2 + with pytest.raises(Timeout, match="The file lock '.*' could not be acquired."): + lock_2.acquire(timeout=0.1) + assert not lock_2.is_locked + assert lock_1.is_locked + + # release lock 1 + lock_1.release() + assert not lock_1.is_locked + assert not lock_2.is_locked + + +@pytest.mark.parametrize("lock_type", [FileLock, SoftFileLock]) +def test_default_timeout(lock_type, tmp_path): + # test if the default timeout parameter works + lock_path = tmp_path / "a" + lock1, lock2 = lock_type(str(lock_path)), lock_type(str(lock_path), timeout=0.1) + assert lock2.timeout == 0.1 + + # acquire lock 1 + lock1.acquire() + assert lock1.is_locked + assert not lock2.is_locked + + # try to acquire lock 2 + with pytest.raises(Timeout, match="The file lock '.*' could not be acquired."): + lock2.acquire() + assert not lock2.is_locked + assert lock1.is_locked + + lock2.timeout = 0 + assert lock2.timeout == 0 + + with pytest.raises(Timeout, match="The file lock '.*' could not be acquired."): + lock2.acquire() + assert not lock2.is_locked + assert lock1.is_locked + + # release lock 1 + lock1.release() + assert not lock1.is_locked + assert not lock2.is_locked + + +@pytest.mark.parametrize("lock_type", [FileLock, SoftFileLock]) +def test_context_release_on_exc(lock_type, tmp_path): + # lock is released when an exception is thrown in a with-statement + lock_path = tmp_path / "a" + lock = lock_type(str(lock_path)) + + try: + with lock as lock1: + assert lock is lock1 + assert lock.is_locked + raise Exception + except Exception: + assert not lock.is_locked + + +@pytest.mark.parametrize("lock_type", [FileLock, SoftFileLock]) +def test_acquire_release_on_exc(lock_type, tmp_path): + # lock is released when an exception is thrown in a acquire statement + lock_path = tmp_path / "a" + lock = lock_type(str(lock_path)) + + try: + with lock.acquire() as lock1: + assert lock is lock1 + assert lock.is_locked + raise Exception + except Exception: + assert not lock.is_locked + + +@pytest.mark.skipif(hasattr(sys, "pypy_version_info"), reason="del() does not trigger GC in PyPy") +@pytest.mark.parametrize("lock_type", [FileLock, SoftFileLock]) +def test_del(lock_type, tmp_path): + # lock is released when the object is deleted + lock_path = tmp_path / "a" + lock_1, lock_2 = lock_type(str(lock_path)), lock_type(str(lock_path)) + + # acquire lock 1 + lock_1.acquire() + assert lock_1.is_locked + assert not lock_2.is_locked + + # try to acquire lock 2 + with pytest.raises(Timeout, match="The file lock '.*' could not be acquired."): + lock_2.acquire(timeout=0.1) + + # delete lock 1 and try to acquire lock 2 again + del lock_1 + + lock_2.acquire() + assert lock_2.is_locked + + lock_2.release() + + +def test_cleanup_soft_lock(tmp_path): + # tests if the lock file is removed after use + lock_path = tmp_path / "a" + lock = SoftFileLock(str(lock_path)) + + with lock: + assert lock_path.exists() + assert not lock_path.exists() diff --git a/whitelist.txt b/whitelist.txt index fac0e8e..0c45994 100644 --- a/whitelist.txt +++ b/whitelist.txt @@ -1,19 +1,13 @@ autoclass autodoc autosectionlabel -context1 creat -enoent exc fcntl filelock fmt intersphinx intervall -kargs -l1 -l2 -l3 lk lock1 lock2 @@ -21,24 +15,18 @@ lockfile merchantability msvcrt nblck -nested1 nitpicky noninfringement -num param pygments rdwr returnproxy seealso +skipif src -thread1 -thread2 -threaded1 -threads1 -threads2 +tmp trunc typehints -unittest unlck unlicense wronly