diff --git a/.dockerignore b/.dockerignore
index 985c99a..8ab2b7d 100644
--- a/.dockerignore
+++ b/.dockerignore
@@ -1,4 +1,8 @@
/.git
/.github
/.idea
-/.pytest_cache
\ No newline at end of file
+/.pytest_cache
+/tests
+README.md
+README.gr.md
+LICENSE.md
\ No newline at end of file
diff --git a/Dockerfile b/Dockerfile
index 9434cf0..baab8d8 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -1,5 +1,5 @@
ARG PYTHON_VERSION=3.12
-FROM alpine:3.20.1 as build-stage
+FROM alpine:3.20.1 AS build-stage
ARG PYTHON_VERSION
COPY . /app/instaunfollowers/
RUN apk add --no-cache python3~=${PYTHON_VERSION} py3-pip
diff --git a/app/app.py b/app/app.py
index 8c0c2ba..4929458 100644
--- a/app/app.py
+++ b/app/app.py
@@ -1,10 +1,9 @@
"""Imports"""
import logging
import os.path
+import platform
import re
import zipfile
-import multiprocessing
-from gunicorn.app.base import BaseApplication
from flask import Flask, render_template, request, redirect, url_for, session
from flask_paginate import Pagination
from werkzeug.utils import secure_filename
@@ -27,7 +26,7 @@
# Upload folder
UPLOAD_FOLDER = os.path.join(os.getcwd(), 'uploads')
# Current version
-CURRENT_VERSION = 'v2.0.0'
+CURRENT_VERSION = 'v2.1.0'
# Update needed
UPDATE_NEEDED = bool(update_needed(CURRENT_VERSION, get_latest_version()))
@@ -36,13 +35,14 @@ def create_upload_dir():
"""
Create the upload directory if it does not exist.
"""
- try:
- os.makedirs(UPLOAD_FOLDER)
- logging.info("Directory %s created successfully.", {UPLOAD_FOLDER})
- except FileExistsError:
- logging.info("Directory %s already exists.", {UPLOAD_FOLDER})
- except OSError as exc:
- logging.error("Error creating directory: %s", {exc})
+ if not os.path.exists(UPLOAD_FOLDER):
+ try:
+ os.makedirs(UPLOAD_FOLDER)
+ logging.info("Directory %s created successfully.", UPLOAD_FOLDER)
+ except OSError as exc:
+ logging.error("Error creating directory: %s", exc)
+ else:
+ logging.info("Directory %s already exists.", UPLOAD_FOLDER)
def parse_usernames(html_source):
@@ -172,56 +172,63 @@ def unfollowers():
per_page=per_page,
pagination=pagination, )
+if platform.system() != "Windows":
+ from gunicorn.app.base import BaseApplication
+ import multiprocessing
-class InstaUnFollowers(BaseApplication):
- """
- This class extends the `gunicorn.app.base.BaseApplication` class and
- provides custom implementation for the `load_config` and `load` methods.
-
- Attributes:
- options (dict): Options for the application.
- application (obj): The application object.
-
- Args:
- application (obj): The application object.
- options (dict, optional): Options for the Gunicorn server.
-
- Note:
- 'init' and 'load' methods are implemented by WSGIApplication.
- """
-
- # pylint: disable=abstract-method
- def __init__(self, application, options=None):
- self.options = options or {}
- self.application = application
- super().__init__()
-
- def load_config(self):
+ class InstaUnFollowers(BaseApplication):
"""
- Load the configuration for the Gunicorn server.
+ This class extends the `gunicorn.app.base.BaseApplication` class and
+ provides custom implementation for the `load_config` and `load` methods.
- This method sets the Gunicorn server configuration values based on the provided options.
- """
- config = {key: value for key, value in self.options.items()
- if key in self.cfg.settings and value is not None}
- for key, value in config.items():
- self.cfg.set(key.lower(), value)
+ Attributes:
+ options (dict): Options for the application.
+ application (obj): The application object.
- def load(self):
- """
- Load the application.
+ Args:
+ application (obj): The application object.
+ options (dict, optional): Options for the Gunicorn server.
- Returns:
- obj: The application object.
+ Note:
+ 'init' and 'load' methods are implemented by WSGIApplication.
"""
- return self.application
-
-if __name__ == '__main__':
- create_upload_dir()
- gunicorn_options = {
- 'bind': '0.0.0.0:5000',
- 'workers': (multiprocessing.cpu_count() * 2) + 1,
- 'timeout': 500,
- }
- InstaUnFollowers(app, gunicorn_options).run()
+ # pylint: disable=abstract-method
+ def __init__(self, application, options=None):
+ self.options = options or {}
+ self.application = application
+ super().__init__()
+
+ def load_config(self):
+ """
+ Load the configuration for the Gunicorn server.
+
+ This method sets the Gunicorn server configuration values based on the provided options.
+ """
+ config = {key: value for key, value in self.options.items()
+ if key in self.cfg.settings and value is not None}
+ for key, value in config.items():
+ self.cfg.set(key.lower(), value)
+
+ def load(self):
+ """
+ Load the application.
+
+ Returns:
+ obj: The application object.
+ """
+ return self.application
+
+
+ if __name__ == '__main__':
+ create_upload_dir()
+ gunicorn_options = {
+ 'bind': '0.0.0.0:5000',
+ 'workers': (multiprocessing.cpu_count() * 2) + 1,
+ 'timeout': 500,
+ }
+ InstaUnFollowers(app, gunicorn_options).run()
+else:
+ if __name__ == '__main__':
+ create_upload_dir()
+ app.run(debug=True, host="0.0.0.0", port=5000)
diff --git a/app/healthcheck.py b/app/healthcheck.py
index 22e4b10..c382171 100644
--- a/app/healthcheck.py
+++ b/app/healthcheck.py
@@ -28,4 +28,5 @@ def fetch_url(url):
sys.exit(1)
-fetch_url('http://localhost:5000')
+if __name__ == "__main__":
+ fetch_url('http://localhost:5000')
diff --git a/tests/test_app.py b/tests/test_app.py
index 1c5c889..9ca8baf 100644
--- a/tests/test_app.py
+++ b/tests/test_app.py
@@ -1,15 +1,65 @@
"""Imports"""
-from unittest import TestCase
-from app.app import parse_usernames, get_unfollowers_paginated
+import unittest
+import zipfile
+from unittest import TestCase, mock
+from app.app import (app, create_upload_dir, find_unfollowers,
+ get_unfollowers_paginated, parse_usernames,
+ UPLOAD_FOLDER)
-class Test(TestCase):
+class TestApp(TestCase):
"""Unit Tests"""
- def test_parse_usernames_valid_input(self):
+ def setUp(self):
"""
- Test the function with valid html source
+ Set up the test environment before each test case.
"""
+ self.app = app.test_client()
+ self.app.testing = True
+
+ @mock.patch('app.app.os.makedirs')
+ @mock.patch('app.app.os.path.exists')
+ def test_directory_does_not_exist_makedirs_called(self, mock_exists, mock_makedirs):
+ """
+ Test that 'create_upload_dir' calls 'os.makedirs' when the directory does not exist.
+ """
+ mock_exists.return_value = False
+ create_upload_dir()
+ mock_exists.assert_called_once_with(UPLOAD_FOLDER)
+ mock_makedirs.assert_called_once_with(UPLOAD_FOLDER)
+
+ @mock.patch('app.app.os.makedirs')
+ @mock.patch('app.app.os.path.exists')
+ def test_directory_exists_makedirs_not_called(self, mock_exists, mock_makedirs):
+ """
+ Test that 'create_upload_dir' does not call 'os.makedirs' when the directory exists.
+ """
+ mock_exists.return_value = True
+ create_upload_dir()
+ mock_exists.assert_called_once_with(UPLOAD_FOLDER)
+ mock_makedirs.assert_not_called()
+
+ @mock.patch('app.app.os.makedirs')
+ @mock.patch('app.app.os.path.exists')
+ def test_makedirs_raises_exception_handled(self, mock_exists, mock_makedirs):
+ """
+ Test that 'create_upload_dir' handles exceptions raised by 'os.makedirs'.
+ """
+ mock_exists.return_value = False
+ mock_makedirs.side_effect = OSError("Unable to create directory")
+ try:
+ create_upload_dir()
+ except OSError:
+ self.fail("create_upload_dir raised an OSError unexpectedly!")
+ mock_exists.assert_called_once_with(UPLOAD_FOLDER)
+ mock_makedirs.assert_called_once_with(UPLOAD_FOLDER)
+
+ @mock.patch('app.app.parse_usernames')
+ def test_parse_usernames_valid_input(self, mock_parse_usernames):
+ """
+ Test that 'parse_usernames' extracts usernames correctly from valid HTML source.
+ """
+ mock_parse_usernames.return_value = ['user1', 'user2']
html_source = '
user1',
'
user2']
- self.assertRaises(TypeError, parse_usernames, html_source)
+ with self.assertRaises(TypeError):
+ parse_usernames(html_source)
- def test_get_unfollowers_paginated(self):
+ @mock.patch('app.app.get_unfollowers_paginated')
+ def test_get_unfollowers_paginated(self, mock_get_unfollowers_paginated):
"""
- Test for normal usage of the function.
+ Test that 'get_unfollowers_paginated' returns the correct paginated results.
"""
+ mock_get_unfollowers_paginated.return_value = ['unfollower3',
+ 'unfollower4']
unfollowers_list = ['unfollower1', 'unfollower2', 'unfollower3',
'unfollower4', 'unfollower5']
offset = 2
per_page = 2
- expected_result = ['unfollower3', 'unfollower4']
- self.assertEqual(get_unfollowers_paginated(unfollowers_list,
- offset,
- per_page),
- expected_result)
+ result = get_unfollowers_paginated(unfollowers_list,
+ offset,
+ per_page)
+ self.assertEqual(result, ['unfollower3', 'unfollower4'])
- def test_get_unfollowers_paginated_invalid_offset(self):
+ @mock.patch('app.app.get_unfollowers_paginated')
+ def test_get_unfollowers_paginated_invalid_offset(self, mock_get_unfollowers_paginated):
"""
- Test the function with invalid offset value
+ Test that 'get_unfollowers_paginated' returns an empty list for an invalid offset.
"""
+ mock_get_unfollowers_paginated.return_value = []
unfollowers_list = ['unfollower1', 'unfollower2', 'unfollower3',
'unfollower4', 'unfollower5']
offset = 10
per_page = 2
- expected_result = []
- self.assertEqual(get_unfollowers_paginated(unfollowers_list,
- offset,
- per_page),
- expected_result)
+ result = get_unfollowers_paginated(unfollowers_list, offset, per_page)
+ self.assertEqual(result, [])
- def test_get_unfollowers_paginated_list_with_one_element(self):
+ @mock.patch('app.app.get_unfollowers_paginated')
+ def test_get_unfollowers_paginated_list_with_one_element(self, mock_get_unfollowers_paginated):
"""
- Test the function with list that has only one element
+ Test that 'get_unfollowers_paginated' works with a list that has one element.
"""
+ mock_get_unfollowers_paginated.return_value = ['unfollower1']
unfollowers_list = ['unfollower1']
offset = 0
per_page = 1
- expected_result = ['unfollower1']
- self.assertEqual(get_unfollowers_paginated(unfollowers_list,
- offset,
- per_page),
- expected_result)
+ result = get_unfollowers_paginated(unfollowers_list,
+ offset,
+ per_page)
+ self.assertEqual(result, ['unfollower1'])
- def test_get_unfollowers_paginated_list_empty(self):
+ @mock.patch('app.app.get_unfollowers_paginated')
+ def test_get_unfollowers_paginated_list_empty(self, mock_get_unfollowers_paginated):
"""
- Test the function with empty list
+ Test that 'get_unfollowers_paginated' returns an empty list when given an empty list.
"""
+ mock_get_unfollowers_paginated.return_value = []
unfollowers_list = []
offset = 0
per_page = 1
- expected_result = []
- self.assertEqual(get_unfollowers_paginated(unfollowers_list,
- offset,
- per_page),
- expected_result)
+ result = get_unfollowers_paginated(unfollowers_list,
+ offset,
+ per_page)
+ self.assertEqual(result, [])
+
+ @mock.patch('app.app.zipfile.ZipFile')
+ def test_find_unfollowers_valid_zip(self, mock_zipfile):
+ """
+ Test that 'find_unfollowers' correctly identifies unfollowers from a valid zip file.
+ """
+ mock_zipfile.return_value.__enter__.return_value.namelist.return_value = ['followers.html',
+ 'following.html']
+ mock_zipfile.return_value.__enter__.return_value.read.side_effect = [
+ b'
user1',
+ b'
user2'
+ ]
+ unfollowers = find_unfollowers('test.zip')
+ self.assertEqual(unfollowers, ['user2'])
+
+ @mock.patch('app.app.zipfile.ZipFile')
+ def test_find_unfollowers_invalid_zip(self, mock_zipfile):
+ """
+ Test that 'find_unfollowers' handles invalid zip files correctly.
+ """
+ mock_zipfile.side_effect = zipfile.BadZipFile
+ unfollowers = find_unfollowers('invalid.zip')
+ self.assertIsNone(unfollowers)
+
+ @mock.patch('app.app.zipfile.ZipFile')
+ def test_find_unfollowers_empty_html(self, mock_zipfile):
+ """
+ Test that 'find_unfollowers' returns an empty list when
+ both the 'followers' and 'following' files are empty.
+ """
+ mock_zipfile.return_value.__enter__.return_value.namelist.return_value = ['followers.html',
+ 'following.html']
+ mock_zipfile.return_value.__enter__.return_value.read.side_effect = [b'', b'']
+ unfollowers = find_unfollowers('test.zip')
+ self.assertEqual(unfollowers, [])
+
+ def test_unfollowers_route(self):
+ """
+ Test that the '/unfollowers' route returns the correct paginated results.
+ """
+ with self.app as client:
+ unfollowers_list = [f'user{i}' for i in range(1, 21)]
+ with client.session_transaction() as sess:
+ sess['unfollowers'] = unfollowers_list
+ response = client.get('/unfollowers?page=1')
+ self.assertEqual(response.status_code, 200)
+ for i in range(1, 11):
+ self.assertIn(f'user{i}'.encode(), response.data)
+ response = client.get('/unfollowers?page=2')
+ self.assertEqual(response.status_code, 200)
+ for i in range(11, 21):
+ self.assertIn(f'user{i}'.encode(), response.data)
+
+ @mock.patch('app.app.os.remove')
+ @mock.patch('app.app.os.path.exists')
+ def test_upload_no_file(self, mock_exists, _):
+ """
+ Test the file upload route when no file is uploaded.
+ """
+ mock_exists.return_value = False
+ response = self.app.post('/', data={})
+ self.assertEqual(response.status_code, 302)
+
+ @mock.patch('app.app.os.remove')
+ @mock.patch('app.app.os.path.exists')
+ def test_upload_empty_filename(self, mock_exists, _):
+ """
+ Test the file upload route with an empty file name.
+ """
+ mock_exists.return_value = False
+ data = {'file': (b'', '')}
+ response = self.app.post('/', data=data)
+ self.assertEqual(response.status_code, 302)
+
+ @mock.patch('app.app.os.remove')
+ @mock.patch('app.app.os.path.exists')
+ def test_upload_invalid_extension(self, mock_exists, _):
+ """
+ Test the file upload route with an invalid file extension.
+ """
+ mock_exists.return_value = False
+ data = {'file': (b'test content', 'test.txt')}
+ response = self.app.post('/', data=data)
+ self.assertEqual(response.status_code, 302)
+
+ @mock.patch('platform.system')
+ def test_windows_specific_logic(self, mock_platform):
+ """
+ Test that the application runs correctly on the Windows platform.
+ """
+ mock_platform.return_value = "Windows"
+ response = self.app.get('/')
+ self.assertEqual(response.status_code, 200)
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/tests/test_healthcheck.py b/tests/test_healthcheck.py
new file mode 100644
index 0000000..e919ce0
--- /dev/null
+++ b/tests/test_healthcheck.py
@@ -0,0 +1,46 @@
+"""Imports"""
+import unittest
+from unittest.mock import patch, MagicMock
+import requests
+from app.healthcheck import fetch_url
+
+
+class TestHealthCheck(unittest.TestCase):
+ """Unit Tests"""
+
+ @patch('app.healthcheck.requests.get')
+ @patch('app.healthcheck.sys.exit')
+ def test_fetch_url_success(self, mock_exit, mock_get):
+ """
+ Test fetch_url with a successful 200 status code
+ """
+ mock_response = MagicMock()
+ mock_response.status_code = 200
+ mock_get.return_value = mock_response
+ fetch_url('http://localhost:5000')
+ mock_exit.assert_called_once_with(0)
+
+ @patch('app.healthcheck.requests.get')
+ @patch('app.healthcheck.sys.exit')
+ def test_fetch_url_non_200(self, mock_exit, mock_get):
+ """
+ Test fetch_url with a non-200 status code
+ """
+ mock_response = MagicMock()
+ mock_response.status_code = 500
+ mock_get.return_value = mock_response
+ fetch_url('http://localhost:5000')
+ mock_exit.assert_called_once_with(1)
+
+ @patch('app.healthcheck.requests.get', side_effect=requests.exceptions.Timeout)
+ @patch('app.healthcheck.sys.exit')
+ def test_fetch_url_timeout(self, mock_exit, _):
+ """
+ Test fetch_url handling of a timeout exception
+ """
+ fetch_url('http://localhost:5000')
+ mock_exit.assert_called_once_with(1)
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/tests/test_upd.py b/tests/test_upd.py
index 9cb0615..a439e20 100644
--- a/tests/test_upd.py
+++ b/tests/test_upd.py
@@ -1,60 +1,54 @@
"""Imports"""
from unittest import TestCase
-from app.updsys.upd import update_needed
+from unittest.mock import patch
+import requests
+from app.updsys.upd import get_latest_version, update_needed
class Test(TestCase):
"""Unit Tests"""
- def test_update_needed(self):
+
+ @patch('requests.get')
+ def test_get_latest_version(self, mock_get):
"""
- Test the function with multiple conditions.
+ Test the function with mocked HTTP responses.
"""
- # Test same version
- current_version = "v1.0.0"
- latest_version = "v1.0.0"
- self.assertFalse(update_needed(current_version, latest_version))
-
- # Test current version older
- current_version = "v1.0.0"
- latest_version = "v2.0.0"
- self.assertTrue(update_needed(current_version, latest_version))
-
- # Test current version newer
- current_version = "v2.0.0"
- latest_version = "v1.0.0"
- self.assertFalse(update_needed(current_version, latest_version))
-
- # Test major version of current version older
- current_version = "v1.0.0"
- latest_version = "v2.0.0"
- self.assertTrue(update_needed(current_version, latest_version))
-
- # Test major version of current version newer
- current_version = "v2.0.0"
- latest_version = "v1.0.0"
- self.assertFalse(update_needed(current_version, latest_version))
-
- # Test minor version of current version older
- current_version = "v1.1.0"
- latest_version = "v1.2.0"
- self.assertTrue(update_needed(current_version, latest_version))
+ # Mock a successful response with a valid version string
+ mock_response = mock_get.return_value
+ mock_response.status_code = 200
+ mock_response.text = '... v2.0.0 ...'
+ self.assertEqual(get_latest_version(), 'v2.0.0')
+ # Mock a response with no valid version string
+ mock_response.text = '... no version here ...'
+ self.assertIsNone(get_latest_version())
+ # Mock an error response
+ mock_get.side_effect = requests.exceptions.RequestException
+ self.assertIsNone(get_latest_version())
- # Test minor version of current version newer
- current_version = "v1.2.0"
- latest_version = "v1.1.0"
- self.assertFalse(update_needed(current_version, latest_version))
-
- # Test patch version of current version older
- current_version = "v1.0.1"
- latest_version = "v1.0.2"
- self.assertTrue(update_needed(current_version, latest_version))
-
- # Test patch version of current version newer
- current_version = "v1.0.2"
- latest_version = "v1.0.1"
- self.assertFalse(update_needed(current_version, latest_version))
-
- # Test invalid version strings
- current_version = "1.0.0"
- latest_version = "v2.0.0"
- self.assertFalse(update_needed(current_version, latest_version))
+ def test_update_needed(self):
+ """
+ Test the function with multiple conditions
+ and return comments for failed tests.
+ """
+ test_cases = [
+ ("v1.0.0", "v1.0.0", False, "Test same version"),
+ ("v1.0.0", "v2.0.0", True, "Test current version older"),
+ ("v2.0.0", "v1.0.0", False, "Test current version newer"),
+ ("v1.0.0", "v2.0.0", True, "Test major version of current version older"),
+ ("v2.0.0", "v1.0.0", False, "Test major version of current version newer"),
+ ("v1.1.0", "v1.2.0", True, "Test minor version of current version older"),
+ ("v1.2.0", "v1.1.0", False, "Test minor version of current version newer"),
+ ("v1.0.1", "v1.0.2", True, "Test patch version of current version older"),
+ ("v1.0.2", "v1.0.1", False, "Test patch version of current version newer"),
+ ("1.0.0", "v2.0.0", False, "Test invalid version strings"),
+ ]
+ failed_tests = []
+ for current_version, latest_version, expected_result, comment in test_cases:
+ with self.subTest(current_version=current_version, latest_version=latest_version):
+ try:
+ self.assertEqual(update_needed(current_version, latest_version),
+ expected_result)
+ except AssertionError:
+ failed_tests.append(comment)
+ if failed_tests:
+ self.fail(f"Tests failed for the following cases: {', '.join(failed_tests)}")