diff --git a/deployment/build-and-stage.yaml b/deployment/build-and-stage.yaml
index 942beee7208..5e3a67236ab 100644
--- a/deployment/build-and-stage.yaml
+++ b/deployment/build-and-stage.yaml
@@ -98,6 +98,16 @@ steps:
   args: ['push', '--all-tags', 'gcr.io/oss-vdb/alias-computation']
   waitFor: ['build-alias-computation', 'cloud-build-queue']
 
+# Build/push staging-api-test images to gcr.io/oss-vdb-test.
+- name: gcr.io/cloud-builders/docker
+  args: ['build', '-t', 'gcr.io/oss-vdb-test/staging-api-test:latest', '-t', 'gcr.io/oss-vdb-test/staging-api-test:$COMMIT_SHA', '.']
+  dir: 'docker/staging_api_test'
+  id: 'build-staging-api-test'
+  waitFor: ['build-worker']
+- name: gcr.io/cloud-builders/docker
+  args: ['push', '--all-tags', 'gcr.io/oss-vdb-test/staging-api-test']
+  waitFor: ['build-staging-api-test', 'cloud-build-queue']
+
 # Build/push cron job images.
 - name: gcr.io/cloud-builders/docker
   args: ['build', '-t', 'gcr.io/oss-vdb/cron:latest', '-t', 'gcr.io/oss-vdb/cron:$COMMIT_SHA', '.']
@@ -263,6 +273,7 @@ steps:
     importer=gcr.io/oss-vdb/importer:$COMMIT_SHA,\
     exporter=gcr.io/oss-vdb/exporter:$COMMIT_SHA,\
     alias-computation=gcr.io/oss-vdb/alias-computation:$COMMIT_SHA,\
+    staging-api-test=gcr.io/oss-vdb-test/staging-api-test:$COMMIT_SHA,\
     cron=gcr.io/oss-vdb/cron:$COMMIT_SHA,\
     debian-convert=gcr.io/oss-vdb/debian-convert:$COMMIT_SHA,\
     combine-to-osv=gcr.io/oss-vdb/combine-to-osv:$COMMIT_SHA,\
@@ -325,3 +336,4 @@ images:
 - 'gcr.io/oss-vdb/cpe-repo-gen:$COMMIT_SHA'
 - 'gcr.io/oss-vdb/nvd-cve-osv:$COMMIT_SHA'
 - 'gcr.io/oss-vdb/nvd-mirror:$COMMIT_SHA'
+- 'gcr.io/oss-vdb-test/staging-api-test:$COMMIT_SHA'
diff --git a/deployment/clouddeploy/gke-workers/environments/oss-vdb-test/kustomization.yaml b/deployment/clouddeploy/gke-workers/environments/oss-vdb-test/kustomization.yaml
index 1c7656f3b7a..747e6a9f7c4 100644
--- a/deployment/clouddeploy/gke-workers/environments/oss-vdb-test/kustomization.yaml
+++ b/deployment/clouddeploy/gke-workers/environments/oss-vdb-test/kustomization.yaml
@@ -1,5 +1,6 @@
 resources:
 - ../../base
+- staging-api-test.yaml
 patches:
 - path: workers.yaml
 - path: scaler.yaml
diff --git a/deployment/clouddeploy/gke-workers/environments/oss-vdb-test/staging-api-test.yaml b/deployment/clouddeploy/gke-workers/environments/oss-vdb-test/staging-api-test.yaml
new file mode 100644
index 00000000000..b44dd7fec6c
--- /dev/null
+++ b/deployment/clouddeploy/gke-workers/environments/oss-vdb-test/staging-api-test.yaml
@@ -0,0 +1,26 @@
+apiVersion: batch/v1
+kind: CronJob
+metadata:
+  name: staging-api-test
+spec:
+  schedule: "50 9 * * *"
+  concurrencyPolicy: Forbid
+  jobTemplate:
+    spec:
+      template:
+        spec:
+          containers:
+          - name: staging-api-test
+            image: staging-api-test
+            imagePullPolicy: Always
+            env:
+            - name: GOOGLE_CLOUD_PROJECT
+              value: "oss-vdb-test"
+            resources:
+              requests:
+                cpu: 1.5
+                memory: "4G"
+              limits:
+                cpu: 2
+                memory: "10G"
+          restartPolicy: Never
\ No newline at end of file
diff --git a/docker/staging_api_test/Dockerfile b/docker/staging_api_test/Dockerfile
new file mode 100644
index 00000000000..af19e653ccc
--- /dev/null
+++ b/docker/staging_api_test/Dockerfile
@@ -0,0 +1,26 @@
+# Copyright 2024 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+FROM gcr.io/oss-vdb/worker
+
+WORKDIR /staging_api_test
+
+COPY retrieve_bugs_from_db.py perform_api_calls.py run.sh ./
+
+# Add aiohttp lib
+RUN cd /env/docker/worker && POETRY_VIRTUALENVS_CREATE=false poetry add aiohttp
+
+RUN chmod 755 retrieve_bugs_from_db.py perform_api_calls.py run.sh
+
+ENTRYPOINT ["./run.sh"]
diff --git a/docker/staging_api_test/build.sh b/docker/staging_api_test/build.sh
new file mode 100755
index 00000000000..261f31a0d74
--- /dev/null
+++ b/docker/staging_api_test/build.sh
@@ -0,0 +1,19 @@
+#!/bin/bash -x
+# Copyright 2024 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+docker build -t gcr.io/oss-vdb-test/staging-api-test:"$1" . && \
+docker build -t gcr.io/oss-vdb-test/staging-api-test:latest . && \
+docker push gcr.io/oss-vdb-test/staging-api-test:"$1" && \
+docker push gcr.io/oss-vdb-test/staging-api-test:latest
diff --git a/docker/staging_api_test/perform_api_calls.py b/docker/staging_api_test/perform_api_calls.py
new file mode 100755
index 00000000000..62777ecdde1
--- /dev/null
+++ b/docker/staging_api_test/perform_api_calls.py
@@ -0,0 +1,466 @@
+#!/usr/bin/env python3
+# Copyright 2024 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Generate mock API queries and send them to the test API endpoint for
+performance testing. Run two instances of this script concurrently
+(e.g. in two terminals) to generate sufficient traffic."""
+
+import logging
+import aiohttp
+import asyncio
+import os
+import random
+import sys
+import time
+import json
+
+from collections import Counter, defaultdict
+from typing import Callable
+
+import osv
+import osv.logs
+
+BASE_URL = 'https://api.test.osv.dev/v1'
+GCP_PROJECT = 'oss-vdb-test'
+BUG_DIR = './all_bugs'
+
+# Total run time in seconds
+TOTAL_RUNTIME = 3600
+# Time interval (in seconds) in which each batch of pending requests
+# is dispatched.
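+# (With the default batch sizes below, a single process dispatches roughly
+# 50 + 100 + 30 + 3 + 2 = 185 requests per interval, i.e. ~185 requests/s,
+# and run.sh starts two such processes.)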
+FREQUENCY_IN_SECONDS = 1
+
+# Number of `vulnerability get` requests to send per second
+VULN_QUERY_BATCH_SIZE = 50
+# Number of `version query` requests to send per second
+VERSION_QUERY_BATCH_SIZE = 100
+# Number of `package query` requests to send per second
+PACKAGE_QUERY_BATCH_SIZE = 30
+# Number of `batch query` requests to send per second
+BATCH_QUERY_BATCH_SIZE = 3
+# Number of large `batch query` requests to send per second
+LARGE_BATCH_QUERY_BATCH_SIZE = 2
+
+
+class SimpleBug:
+  """A simplified bug that contains only the information essential
+  for making HTTP requests."""
+
+  def __init__(self, bug_dict: dict):
+    self.db_id = bug_dict['db_id']
+    # If the package/ecosystem/version value is None, then add a fake value in.
+    if not bug_dict['project']:
+      self.packages = ['foo']
+    else:
+      self.packages = list(bug_dict['project'])
+    self.purl = bug_dict['purl']
+    if not bug_dict['ecosystem']:
+      self.ecosystems = ['foo']
+    else:
+      self.ecosystems = list(bug_dict['ecosystem'])
+
+    # Use the `affected fuzzy` value as the query version.
+    # If no 'affected fuzzy' is present, assign a default value.
+    self.affected_fuzzy = bug_dict['affected_fuzzy']
+    if not self.affected_fuzzy:
+      self.affected_fuzzy = '1.0.0'
+
+
+def read_from_json(filename: str, ecosystem_map: defaultdict, bug_map: dict,
+                   package_map: defaultdict) -> None:
+  """Loads bugs from one JSON file into bug dicts.
+
+  Args:
+    filename: the JSON filename.
+
+    ecosystem_map:
+      A defaultdict mapping ecosystem names to their bug IDs. For example:
+      {'Maven': {'CVE-XXXX-XXXX', 'CVE-XXXX-XXXX'}, 'PyPI': set()}
+
+    bug_map:
+      A dict mapping each bug ID to its `SimpleBug` object. For example:
+      {'CVE-XXXX-XXXX': SimpleBug(...)}
+
+    package_map:
+      A defaultdict mapping package names to their bug IDs. For example:
+      {'tensorflow': {'CVE-XXXX-XXXX', 'CVE-XXXX-XXXX'}, 'curl': set()}
+
+  Returns:
+    None
+  """
+  with open(filename, "r") as f:
+    json_file = json.load(f)
+    for bug_data in json_file:
+      bug = SimpleBug(bug_data)
+      for ecosystem in bug.ecosystems:
+        ecosystem_map[ecosystem].add(bug.db_id)
+      for package in bug.packages:
+        package_map[package].add(bug.db_id)
+      bug_map[bug.db_id] = bug
+
+
+def load_all_bugs() -> tuple[defaultdict, dict, defaultdict]:
+  """Loads bugs from the JSON directory.
+
+  Returns:
+    A defaultdict mapping ecosystem names to their bug IDs. For example:
+    {'Maven': {'CVE-XXXX-XXXX', 'CVE-XXXX-XXXX'}, 'PyPI': set()}
+
+    A dict mapping each bug ID to its `SimpleBug` object. For example:
+    {'CVE-XXXX-XXXX': SimpleBug(...)}
+
+    A defaultdict mapping package names to their bug IDs.
+    For example:
+    {'tensorflow': {'CVE-XXXX-XXXX', 'CVE-XXXX-XXXX'}, 'curl': set()}
+  """
+
+  ecosystem_map = defaultdict(set)
+  bug_map = {}
+  package_map = defaultdict(set)
+  for filename in os.listdir(BUG_DIR):
+    if filename.endswith('.json'):
+      file_path = os.path.join(BUG_DIR, filename)
+      read_from_json(file_path, ecosystem_map, bug_map, package_map)
+  return ecosystem_map, bug_map, package_map
+
+
+async def make_http_request(session: aiohttp.ClientSession, request_url: str,
+                            request_type: str, request_body: dict) -> None:
+  """Makes one HTTP request.
+
+  Args:
+    session:
+      The HTTP ClientSession
+    request_url:
+      The HTTP request URL
+    request_type:
+      The HTTP request type: `GET` or `POST`
+    request_body:
+      The HTTP request body in JSON format
+  """
+  try:
+    timeout = aiohttp.ClientTimeout(sock_connect=300, sock_read=300)
+    if request_type == 'GET':
+      async with session.get(request_url, timeout=timeout):
+        pass  # We're not awaiting the response, just sending the request
+    elif request_type == 'POST':
+      async with session.post(request_url, json=request_body, timeout=timeout):
+        pass  # We're not awaiting the response, just sending the request
+  except Exception as e:
+    # When sending a large number of requests concurrently,
+    # some may fail due to timeout issues.
+    # These failures can be ignored as long as the server receives a
+    # sufficient volume of successful requests.
+    logging.warning('Error sending request %s with body %s: %s', request_url,
+                    request_body, type(e))
+
+
+async def make_http_requests_async(request_ids: list, bug_map: dict, url: str,
+                                   batch_size: int,
+                                   payload_func: Callable) -> None:
+  """Asynchronously makes the required number of HTTP requests per second.
+
+  Args:
+    request_ids:
+      A list of bug IDs
+    bug_map:
+      A dict mapping bug IDs to the corresponding `SimpleBug` objects
+    url:
+      The request URL
+    batch_size:
+      The number of requests to make per second
+    payload_func:
+      The payload function, such as `build_batch_payload`
+  """
+
+  begin_time = time.monotonic()
+  logging.info('[%s] Running make request %s for %d seconds', begin_time,
+               payload_func.__name__, TOTAL_RUNTIME)
+
+  total_run_time = time.monotonic() - begin_time
+  index = 0
+  length = len(request_ids)
+  async with aiohttp.ClientSession() as session:
+    while total_run_time < TOTAL_RUNTIME:
+      start_time = time.monotonic()
+
+      batch_request_ids = request_ids[index:batch_size + index]
+      if payload_func.__name__ == build_vulnerability_payload.__name__:
+        for request_id in batch_request_ids:
+          # Fetching a vulnerability's details from OSV is a GET request.
+          asyncio.create_task(
+              make_http_request(session, f'{url}/{request_id}', 'GET',
+                                payload_func()))
+      elif payload_func.__name__ == build_batch_payload.__name__:
+        for _ in range(batch_size):
+          asyncio.create_task(
+              make_http_request(session, url, 'POST',
+                                payload_func(request_ids, bug_map)))
+      else:
+        for request_id in batch_request_ids:
+          asyncio.create_task(
+              make_http_request(session, url, 'POST',
+                                payload_func(request_id, bug_map)))
+      index += batch_size
+      if index >= length:
+        index = 0
+
+      end_time = time.monotonic()
+      time_elapsed = end_time - start_time
+      if time_elapsed < FREQUENCY_IN_SECONDS:
+        await asyncio.sleep(FREQUENCY_IN_SECONDS - time_elapsed)
+      total_run_time = time.monotonic() - begin_time
+
+
+def build_vulnerability_payload() -> None:
+  """The vulnerability get query doesn't need a request body."""
+  return None
+
+
+def build_package_payload(request_id: str, bug_map: dict) -> dict[str, any]:
+  """Builds a package query payload.
+
+  Args:
+    request_id:
+      The bug ID
+    bug_map:
+      A dict mapping bug IDs to the corresponding `SimpleBug` objects
+
+  Returns:
+    A dict containing the package query payload, for example:
+    '{"package": {"name": "mruby", "ecosystem": "OSS-Fuzz"}}'
+  """
+
+  package = random.choice(bug_map[request_id].packages)
+  ecosystem = random.choice(bug_map[request_id].ecosystems)
+  return {"package": {"name": package, "ecosystem": ecosystem}}
+
+
+def build_version_payload(request_id: str, bug_map: dict) -> dict:
+  """Builds a version query payload.
+
+  Args:
+    request_id:
+      The bug ID
+    bug_map:
+      A dict mapping bug IDs to the corresponding `SimpleBug` objects
+
+  Returns:
+    A dict containing the package version query payload, for example:
+    '{"package": {"name": "mruby", "ecosystem": "OSS-Fuzz"},
+      "version": "2.1.2rc"}'
+  """
+  package = random.choice(bug_map[request_id].packages)
+  ecosystem = random.choice(bug_map[request_id].ecosystems)
+  return {
+      "version": bug_map[request_id].affected_fuzzy,
+      "package": {
+          "name": package,
+          "ecosystem": ecosystem
+      }
+  }
+
+
+def build_batch_payload(request_ids: list,
+                        bug_map: dict) -> dict[str, list[dict[str, any]]]:
+  """Builds a batch query payload.
+
+  Args:
+    request_ids:
+      A list of bug IDs
+    bug_map:
+      A dict mapping bug IDs to the corresponding `SimpleBug` objects
+
+  Returns:
+    A dict containing the OSV batch query payload, for example:
+    '{
+      "queries": [
+        {
+          "package": {
+            ...
+          },
+          "version": ...
+        },
+        {
+          "package": {
+            ...
+          },
+          "version": ...
+        },
+      ]
+    }'
+  """
+  size = random.randint(1, 100)
+  batch_ids = random.sample(request_ids, min(size, len(request_ids)))
+  queries = []
+  for bug_id in batch_ids:
+    query = {}
+    query_type = random.choice(['version', 'package'])
+    if query_type == 'version':
+      query = build_version_payload(bug_id, bug_map)
+    elif query_type == 'package':
+      query = build_package_payload(bug_id, bug_map)
+    queries.append(query)
+
+  return {"queries": queries}
+
+
+def get_large_batch_query(package_map: defaultdict) -> list[str]:
+  """Gets a list of bug IDs for large batch queries.
+
+  This list contains bug IDs from the packages with the highest
+  number of vulnerabilities.
+
+  Args:
+    package_map:
+      A defaultdict mapping package names to their bug IDs
+
+  Returns:
+    A shuffled list of bug IDs, one drawn from each of the most
+    vulnerable packages.
+  """
+  most_common = 5000
+  package_counter = Counter()
+  for package in package_map:
+    # Filter out the placeholder package name and the Linux Kernel.
+    if package in ('foo', 'Kernel'):
+      continue
+    package_counter[package] = len(package_map[package])
+  most_vulnerable_packages = package_counter.most_common(most_common)
+  large_batch_query_ids = []
+  for package, package_count in most_vulnerable_packages:
+    if package_count == 0:
+      break
+    large_batch_query_ids.append(package_map[package].pop())
+
+  random.shuffle(large_batch_query_ids)
+  return large_batch_query_ids
+
+
+async def send_version_requests(request_ids: list, bug_map: dict) -> None:
+  """Sends version query requests.
+
+  Args:
+    request_ids:
+      A list of bug IDs
+    bug_map:
+      A dict mapping bug IDs to the corresponding `SimpleBug` objects
+  """
+
+  url = f'{BASE_URL}/query'
+  batch_size = VERSION_QUERY_BATCH_SIZE
+  await make_http_requests_async(request_ids, bug_map, url, batch_size,
+                                 build_version_payload)
+
+
+async def send_package_requests(request_ids: list, bug_map: dict) -> None:
+  """Sends package query requests.
+
+  Args:
+    request_ids:
+      A list of bug IDs
+    bug_map:
+      A dict mapping bug IDs to the corresponding `SimpleBug` objects
+  """
+  url = f'{BASE_URL}/query'
+  batch_size = PACKAGE_QUERY_BATCH_SIZE
+  await make_http_requests_async(request_ids, bug_map, url, batch_size,
+                                 build_package_payload)
+
+
+async def send_vuln_requests(request_ids: list, bug_map: dict) -> None:
+  """Sends vulnerability get requests.
+
+  Args:
+    request_ids:
+      A list of bug IDs
+    bug_map:
+      A dict mapping bug IDs to the corresponding `SimpleBug` objects
+  """
+  url = f'{BASE_URL}/vulns'
+  batch_size = VULN_QUERY_BATCH_SIZE
+  await make_http_requests_async(request_ids, bug_map, url, batch_size,
+                                 build_vulnerability_payload)
+
+
+async def send_batch_requests(request_ids: list, bug_map: dict,
+                              batch_size: int) -> None:
+  """Sends batch query requests.
+
+  Args:
+    request_ids:
+      A list of bug IDs
+    bug_map:
+      A dict mapping bug IDs to the corresponding `SimpleBug` objects
+    batch_size:
+      The batch query size
+  """
+  url = f'{BASE_URL}/querybatch'
+  await make_http_requests_async(request_ids, bug_map, url, batch_size,
+                                 build_batch_payload)
+
+
+async def main() -> None:
+  osv.logs.setup_gcp_logging('staging-test')
+  seed = random.randrange(sys.maxsize)
+  # The seed value can be replaced for debugging.
+  random.seed(seed)
+  logging.info('Random seed %d', seed)
+  # The `ecosystem_map` can be used to filter our queries for a
+  # specific ecosystem.
+  ecosystem_map, bug_map, package_map = load_all_bugs()
+  vuln_query_ids = list(bug_map.keys())
+  package_query_ids = []
+  for package in package_map:
+    # Tests each package once.
+    package_query_ids.append(package_map[package].pop())
+  random.shuffle(package_query_ids)
+  random.shuffle(vuln_query_ids)
+  logging.info(
+      'It will send vulnerability get requests for %d vulnerabilities.',
+      len(vuln_query_ids))
+  logging.info(
+      'It will send package/version/batch query requests for '
+      '%d packages within %d ecosystems.', len(package_query_ids),
+      len(ecosystem_map))
+
+  # Get bug IDs from the packages with the most vulnerabilities,
+  # for use in large batch queries.
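+  # These IDs feed the large `querybatch` requests, so each batch is built
+  # from heavily-affected packages and should return large responses.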
+  large_batch_query_ids = get_large_batch_query(package_map)
+
+  await asyncio.gather(
+      send_vuln_requests(vuln_query_ids, bug_map),
+      send_package_requests(package_query_ids, bug_map),
+      send_version_requests(package_query_ids, bug_map),
+      send_batch_requests(package_query_ids, bug_map, BATCH_QUERY_BATCH_SIZE),
+      send_batch_requests(large_batch_query_ids, bug_map,
+                          LARGE_BATCH_QUERY_BATCH_SIZE))
+
+
+if __name__ == "__main__":
+  asyncio.run(main())
diff --git a/docker/staging_api_test/retrieve_bugs_from_db.py b/docker/staging_api_test/retrieve_bugs_from_db.py
new file mode 100644
index 00000000000..f4d1541d0f8
--- /dev/null
+++ b/docker/staging_api_test/retrieve_bugs_from_db.py
@@ -0,0 +1,118 @@
+#!/usr/bin/env python3
+# Copyright 2024 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Fetch bugs from datastore."""
+
+import logging
+import os
+import random
+import json
+
+import osv
+import osv.logs
+
+from google.cloud import ndb
+
+GCP_PROJECT = 'oss-vdb-test'
+BUG_DIR = './all_bugs'
+
+
+def format_bug_for_output(bug: osv.Bug) -> dict[str, any]:
+  """Formats an ndb bug query result for JSON output.
+
+  Args:
+    bug: an `osv.Bug` queried from ndb.
+
+  Returns:
+    A dict storing all the important `Bug` fields that we want to use later.
+  """
+
+  affected_fuzzy = None
+  # Store one version for use as the query version later.
+  if bug.affected_fuzzy:
+    version_index = random.randrange(len(bug.affected_fuzzy))
+    affected_fuzzy = bug.affected_fuzzy[version_index]
+
+  return {
+      'db_id': bug.db_id,
+      'purl': bug.purl,
+      'project': bug.project,
+      'ecosystem': bug.ecosystem,
+      'affected_fuzzy': affected_fuzzy
+  }
+
+
+def get_bugs_from_datastore() -> None:
+  """Gets all bugs from the datastore and writes them to `BUG_DIR`."""
+
+  entries_per_file = 10000  # number of bugs per file
+  batch_size = 1000
+  file_counter = 0
+  os.makedirs(BUG_DIR, exist_ok=True)
+
+  def write_to_json():
+    """Writes the current results to a new JSON file."""
+    file_name = f'{BUG_DIR}/all_bugs_{file_counter}.json'
+    with open(file_name, 'w+') as f:
+      json.dump(results, f, indent=2)
+    logging.info('Saved %d entries to %s', total_entries, file_name)
+
+  with ndb.Client(project=GCP_PROJECT).context():
+    query = osv.Bug.query()
+    query = query.filter(osv.Bug.status == osv.BugStatus.PROCESSED,
+                         osv.Bug.public == True)  # pylint: disable=singleton-comparison
+    logging.info('Querying %s', query)
+
+    results = []
+    total_entries = 0
+    next_cursor = None
+
+    while True:
+      bugs, next_cursor, has_more = query.fetch_page(
+          page_size=batch_size, start_cursor=next_cursor)
+
+      logging.info('Fetched %d entries.', len(bugs))
+      results.extend([format_bug_for_output(bug) for bug in bugs])
+      total_entries += len(bugs)
+
+      # Write bugs to separate files in case the query fails or times out.
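+      # Each file holds at most `entries_per_file` (10,000) bugs, so an
+      # interrupted run only loses the batch currently held in memory.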
+      if total_entries >= entries_per_file:
+        write_to_json()
+
+        # Reset for the next file.
+        results = []
+        total_entries = 0
+        file_counter += 1
+
+      # Process the final (possibly partial) page before exiting the loop.
+      if not has_more:
+        break
+
+    # Write any remaining entries to the last file.
+    if results:
+      write_to_json()
+
+    logging.info('All results saved to %s.', BUG_DIR)
+
+
+def main() -> None:
+  osv.logs.setup_gcp_logging('staging-test')
+  if not os.path.exists(BUG_DIR):
+    # This will take around 10 minutes.
+    get_bugs_from_datastore()
+    logging.info('Fetching data finished.')
+  else:
+    logging.info('%s exists, skipping fetching.', BUG_DIR)
+
+
+if __name__ == '__main__':
+  main()
diff --git a/docker/staging_api_test/run.sh b/docker/staging_api_test/run.sh
new file mode 100644
index 00000000000..5130c7ad915
--- /dev/null
+++ b/docker/staging_api_test/run.sh
@@ -0,0 +1,25 @@
+#!/bin/bash -x
+# Copyright 2024 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+python3 ./retrieve_bugs_from_db.py
+
+# `aiohttp` has limits on the number of simultaneous connections.
+# Running two instances of the program in parallel
+# can help circumvent this restriction.
+python3 ./perform_api_calls.py &
+python3 ./perform_api_calls.py &
+
+# Wait for both background processes to finish
+wait
\ No newline at end of file