diff --git a/docker/importer/importer.py b/docker/importer/importer.py index 4db19d77912..6da950e8f79 100755 --- a/docker/importer/importer.py +++ b/docker/importer/importer.py @@ -25,9 +25,11 @@ import logging import os import requests +from requests.adapters import HTTPAdapter import shutil import threading import time +from urllib3.util import Retry import atexit from typing import List, Tuple, Optional @@ -701,42 +703,77 @@ def _process_deletions_bucket(self, self._public_log_bucket, import_failure_logs) def _process_updates_rest(self, source_repo: osv.SourceRepository): - """Process updates from REST API.""" + """Process updates from REST API. + + To find new updates, first makes a HEAD request to check the 'Last-Modified' + header, and skips processing if it's at or before the source's last_update_date + (and ignore_last_import_time isn't set). + + Otherwise, GETs the list of vulnerabilities and requests updates for + vulnerabilities modified after last_update_date. + + last_update_date is updated to the HEAD's 'Last-Modified' time, or the + latest vulnerability's modified date if 'Last-Modified' was missing/invalid. 
+ """ logging.info('Begin processing REST: %s', source_repo.name) - ignore_last_import_time = ( - source_repo.ignore_last_import_time or not source_repo.last_update_date) - if ignore_last_import_time: + last_update_date = source_repo.last_update_date or datetime.datetime.min + if source_repo.ignore_last_import_time: + last_update_date = datetime.datetime.min source_repo.ignore_last_import_time = False source_repo.put() - import_time_now = utcnow() - request = requests.head(source_repo.rest_api_url, timeout=_TIMEOUT_SECONDS) + + s = requests.Session() + adapter = HTTPAdapter( + max_retries=Retry( + total=3, status_forcelist=[502, 503, 504], backoff_factor=1)) + s.mount('http://', adapter) + s.mount('https://', adapter) + + try: + request = s.head(source_repo.rest_api_url, timeout=_TIMEOUT_SECONDS) + except Exception: + logging.exception('Exception querying REST API:') + return if request.status_code != 200: logging.error('Failed to fetch REST API: %s', request.status_code) return - if 'Last-Modified' in request.headers: - last_modified = datetime.datetime.strptime( - request.headers['Last-Modified'], _HTTP_LAST_MODIFIED_FORMAT) - # Check whether endpoint has been modified since last update - if not ignore_last_import_time and (last_modified - < source_repo.last_update_date): - logging.info('No changes since last update.') - return + request_last_modified = None + if last_modified := request.headers.get('Last-Modified'): + try: + request_last_modified = datetime.datetime.strptime( + last_modified, _HTTP_LAST_MODIFIED_FORMAT) + # Check whether endpoint has been modified since last update + if request_last_modified <= last_update_date: + logging.info('No changes since last update.') + return + except ValueError: + logging.error('Invalid Last-Modified header: "%s"', last_modified) - request = requests.get(source_repo.rest_api_url, timeout=_TIMEOUT_SECONDS) + try: + request = s.get(source_repo.rest_api_url, timeout=_TIMEOUT_SECONDS) + except Exception: + 
logging.exception('Exception querying REST API:') + return # Parse vulns into Vulnerability objects from the REST API request. vulns = osv.parse_vulnerabilities_from_data( request.text, source_repo.extension, strict=self._strict_validation) + + vulns_last_modified = last_update_date # Create tasks for changed files. for vuln in vulns: import_failure_logs = [] - if not ignore_last_import_time and vuln.modified.ToDatetime( - ) < source_repo.last_update_date: + vuln_modified = vuln.modified.ToDatetime() + if request_last_modified and vuln_modified > request_last_modified: + logging.warning('%s was modified (%s) after Last-Modified header (%s)', + vuln.id, vuln_modified, request_last_modified) + vulns_last_modified = max(vulns_last_modified, vuln_modified) + if vuln_modified <= last_update_date: continue try: # TODO(jesslowe): Use a ThreadPoolExecutor to parallelize this - single_vuln = requests.get( + single_vuln = s.get( source_repo.link + vuln.id + source_repo.extension, timeout=_TIMEOUT_SECONDS) # Validate the individual request @@ -754,14 +791,13 @@ def _process_updates_rest(self, source_repo: osv.SourceRepository): except Exception as e: logging.exception('Failed to parse %s: error type: %s, details: %s', vuln.id, e.__class__.__name__, e) - import_failure_logs.append('Failed to parse vulnerability "' + vuln.id + - '"') + import_failure_logs.append(f'Failed to parse vulnerability "{vuln.id}"') continue replace_importer_log(storage.Client(), source_repo.name, self._public_log_bucket, import_failure_logs) - source_repo.last_update_date = import_time_now + source_repo.last_update_date = request_last_modified or vulns_last_modified source_repo.put() logging.info('Finished processing REST: %s', source_repo.name) diff --git a/docker/importer/importer_test.py b/docker/importer/importer_test.py index 32c748c957a..c6d570524c1 100644 --- a/docker/importer/importer_test.py +++ b/docker/importer/importer_test.py @@ -12,6 +12,7 @@ # See the License for the specific language 
governing permissions and # limitations under the License. """Importer tests.""" +import contextlib import datetime import os import shutil @@ -22,6 +23,7 @@ import threading from unittest import mock +from urllib3.exceptions import SystemTimeWarning import warnings from google.cloud import ndb @@ -66,6 +68,7 @@ def setUp(self): self.tmp_dir = tempfile.mkdtemp() tests.mock_datetime(self) + warnings.filterwarnings('ignore', category=SystemTimeWarning) self.mock_repo = tests.mock_repository(self) storage_patcher = mock.patch('google.cloud.storage.Client') @@ -407,6 +410,7 @@ def setUp(self): self.tmp_dir = tempfile.mkdtemp() tests.mock_datetime(self) + warnings.filterwarnings('ignore', category=SystemTimeWarning) self.source_repo = osv.SourceRepository( type=osv.SourceRepositoryType.BUCKET, @@ -821,7 +825,7 @@ def setUp(self): self.tmp_dir = tempfile.mkdtemp() tests.mock_datetime(self) - warnings.filterwarnings("ignore", "unclosed", ResourceWarning) + warnings.filterwarnings('ignore', category=SystemTimeWarning) storage_patcher = mock.patch('google.cloud.storage.Client') self.addCleanup(storage_patcher.stop) @@ -841,7 +845,19 @@ def setUp(self): def tearDown(self): shutil.rmtree(self.tmp_dir, ignore_errors=True) - self.httpd.shutdown() + + @contextlib.contextmanager + def server(self, handler_class): + """REST mock server context manager.""" + httpd = http.server.HTTPServer(SERVER_ADDRESS, handler_class) + thread = threading.Thread(target=httpd.serve_forever) + thread.start() + try: + yield httpd + finally: + httpd.shutdown() + httpd.server_close() + thread.join() @mock.patch('google.cloud.pubsub_v1.PublisherClient.publish') @mock.patch('time.time', return_value=12345.0) @@ -849,17 +865,20 @@ def test_all_updated(self, unused_mock_time: mock.MagicMock, mock_publish: mock.MagicMock): """Testing basic rest endpoint import""" data_handler = MockDataHandler + data_handler.last_modified = 'Mon, 01 Jan 2024 00:00:00 GMT' data_handler.load_file(data_handler, 
'rest_test.json') - self.httpd = http.server.HTTPServer(SERVER_ADDRESS, data_handler) - thread = threading.Thread(target=self.httpd.serve_forever) - thread.start() self.source_repo.last_update_date = datetime.datetime(2020, 1, 1) - self.source_repo.put() + repo = self.source_repo.put() imp = importer.Importer('fake_public_key', 'fake_private_key', self.tmp_dir, importer.DEFAULT_PUBLIC_LOGGING_BUCKET, 'bucket', False, False) - imp.run() + with self.server(data_handler): + imp.run() self.assertEqual(mock_publish.call_count, data_handler.cve_count) + self.assertEqual( + repo.get().last_update_date, + datetime.datetime(2024, 1, 1), + msg='Expected last_update_date to equal REST Last-Modified date') @mock.patch('google.cloud.pubsub_v1.PublisherClient.publish') @mock.patch('time.time', return_value=12345.0) @@ -867,18 +886,21 @@ def test_last_update_ignored(self, unused_mock_time: mock.MagicMock, mock_publish: mock.MagicMock): """Testing last update ignored""" data_handler = MockDataHandler + data_handler.last_modified = 'Mon, 01 Jan 2024 00:00:00 GMT' data_handler.load_file(data_handler, 'rest_test.json') - self.httpd = http.server.HTTPServer(SERVER_ADDRESS, data_handler) - thread = threading.Thread(target=self.httpd.serve_forever) - thread.start() self.source_repo.last_update_date = datetime.datetime(2023, 6, 6) self.source_repo.ignore_last_import_time = True - self.source_repo.put() + repo = self.source_repo.put() imp = importer.Importer('fake_public_key', 'fake_private_key', self.tmp_dir, importer.DEFAULT_PUBLIC_LOGGING_BUCKET, 'bucket', False, False) - imp.run() + with self.server(data_handler): + imp.run() self.assertEqual(mock_publish.call_count, data_handler.cve_count) + self.assertEqual( + repo.get().last_update_date, + datetime.datetime(2024, 1, 1), + msg='Expected last_update_date to equal REST Last-Modified date') @mock.patch('google.cloud.pubsub_v1.PublisherClient.publish') @mock.patch('time.time', return_value=12345.0) @@ -886,18 +908,19 @@ def 
test_no_updates(self, unused_mock_time: mock.MagicMock, mock_publish: mock.MagicMock): """Testing none last modified""" MockDataHandler.last_modified = 'Fri, 01 Jan 2021 00:00:00 GMT' - self.httpd = http.server.HTTPServer(SERVER_ADDRESS, MockDataHandler) - thread = threading.Thread(target=self.httpd.serve_forever) - thread.start() - self.source_repo.last_update_date = datetime.datetime(2024, 1, 1) - self.source_repo.put() + self.source_repo.last_update_date = datetime.datetime(2024, 2, 1) + repo = self.source_repo.put() imp = importer.Importer('fake_public_key', 'fake_private_key', self.tmp_dir, importer.DEFAULT_PUBLIC_LOGGING_BUCKET, 'bucket', True, False) - with self.assertLogs() as logs: + with self.assertLogs() as logs, self.server(MockDataHandler): imp.run() mock_publish.assert_not_called() self.assertIn('INFO:root:No changes since last update.', logs.output[1]) + self.assertEqual( + repo.get().last_update_date, + datetime.datetime(2024, 2, 1), + msg='last_update_date should not have been updated') @mock.patch('google.cloud.pubsub_v1.PublisherClient.publish') @mock.patch('time.time', return_value=12345.0) @@ -905,15 +928,14 @@ def test_few_updates(self, unused_mock_time: mock.MagicMock, mock_publish: mock.MagicMock): """Testing from date between entries - only entries after 6/6/2023 should be called""" - self.httpd = http.server.HTTPServer(SERVER_ADDRESS, MockDataHandler) - thread = threading.Thread(target=self.httpd.serve_forever) - thread.start() + MockDataHandler.last_modified = 'Mon, 01 Jan 2024 00:00:00 GMT' self.source_repo.last_update_date = datetime.datetime(2023, 6, 6) - self.source_repo.put() + repo = self.source_repo.put() imp = importer.Importer('fake_public_key', 'fake_private_key', self.tmp_dir, importer.DEFAULT_PUBLIC_LOGGING_BUCKET, 'bucket', False, False) - imp.run() + with self.server(MockDataHandler): + imp.run() mock_publish.assert_has_calls([ mock.call( self.tasks_topic, @@ -976,6 +998,10 @@ def test_few_updates(self, unused_mock_time: 
mock.MagicMock, deleted='false', req_timestamp='12345') ]) + self.assertEqual( + repo.get().last_update_date, + datetime.datetime(2024, 1, 1), + msg='Expected last_update_date to equal REST Last-Modified date') @mock.patch('importer.utcnow', lambda: datetime.datetime(2024, 1, 1)) @@ -986,6 +1012,7 @@ def setUp(self): tests.reset_emulator() tests.mock_datetime(self) + warnings.filterwarnings('ignore', category=SystemTimeWarning) def test_add_finding(self): """Test that creating an import finding works.""" diff --git a/docker/mock_test/mock_test_handler.py b/docker/mock_test/mock_test_handler.py index 7c9ca954da1..a6c41f7d974 100644 --- a/docker/mock_test/mock_test_handler.py +++ b/docker/mock_test/mock_test_handler.py @@ -8,7 +8,7 @@ class MockDataHandler(http.server.BaseHTTPRequestHandler): """Mock data handler for testing.""" - last_modified = 'Tue, 13 Jun 2023 00:00:00 GMT' + last_modified = 'Mon, 01 Jan 2024 00:00:00 GMT' file_path = 'rest_test.json' cve_count = -1 data = None