diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json index 5414329c..9101fcd4 100644 --- a/.devcontainer/devcontainer.json +++ b/.devcontainer/devcontainer.json @@ -11,6 +11,6 @@ // uses the right one for terminals and tasks. For example, /bin/bash (or /bin/ash for Alpine). "terminal.integrated.shell.linux": null }, - "postCreateCommand": "pip install --user -e netkan/.", + "postCreateCommand": "pip install --user -e netkan/.['development']", "extensions": [] } diff --git a/.devcontainer/docker-compose.yml b/.devcontainer/docker-compose.yml index ce9816f9..3edc5a5d 100644 --- a/.devcontainer/docker-compose.yml +++ b/.devcontainer/docker-compose.yml @@ -8,10 +8,10 @@ services: target: dev environment: SSH_KEY: ${CKAN_NETKAN_SSHKEY} - CKANMETA_REMOTE: ${CKAN_METADATA_PATH} - CKANMETA_USER: ${CKAN_METADATA_USER} - CKANMETA_REPO: ${CKAN_METADATA_REPO} - NETKAN_REMOTE: ${NETKAN_METADATA_PATH} + CKANMETA_REMOTES: ${CKAN_METADATA_PATHS} + CKAN_USER: ${CKAN_METADATA_USER} + CKAN_REPOS: ${CKAN_METADATA_REPOS} + NETKAN_REMOTES: ${NETKAN_METADATA_PATHS} AWS_DEFAULT_REGION: ${CKAN_AWS_DEFAULT_REGION} AWS_SECRET_ACCESS_KEY: ${CKAN_AWS_SECRET_ACCESS_KEY} AWS_ACCESS_KEY_ID: ${CKAN_AWS_ACCESS_KEY_ID} @@ -19,7 +19,7 @@ services: SQS_TIMEOUT: 30 STATUS_DB: DevNetKANStatus XKAN_GHSECRET: test - INFLATION_SQS_QUEUE: InboundDev.fifo + INFLATION_QUEUES: ksp=InboundDevKsp.fifo ksp2=InboundDevKsp2.fifo MIRROR_SQS_QUEUE: MirroringDev.fifo STATUS_BUCKET: ckan-test-status STATUS_INTERVAL: 0 diff --git a/.github/workflows/coverage.yml b/.github/workflows/coverage.yml new file mode 100644 index 00000000..e15c4e2c --- /dev/null +++ b/.github/workflows/coverage.yml @@ -0,0 +1,30 @@ +name: Coverage + +on: + - pull_request + +jobs: + coverage: + runs-on: ubuntu-latest + defaults: + run: + working-directory: netkan + steps: + - uses: actions/checkout@v3 + - name: Setup Python + uses: actions/setup-python@v1 + with: + python-version: 3.7 + - name: Install test dependencies + run: pip install .[test] + - name: force our git config + run: cp .gitconfig ~/. + - name: Run Coverage + run: coverage run -m pytest + - name: Generate report + run: coverage xml + - name: Get Cover + uses: orgoro/coverage@v3 + with: + coverageFile: netkan/coverage.xml + token: ${{ secrets.GITHUB_TOKEN }} diff --git a/.gitignore b/.gitignore index 5e764a9c..5543f08c 100644 --- a/.gitignore +++ b/.gitignore @@ -8,3 +8,5 @@ __pycache__ .pytest_cache/ .eggs netkan/build/ +coverage.xml +.coverage diff --git a/dev-stack.py b/dev-stack.py index b7823824..4b85c881 100644 --- a/dev-stack.py +++ b/dev-stack.py @@ -20,10 +20,14 @@ t.set_description("Generate NetKAN Infrastructure CF Template") -inbound = t.add_resource(Queue("InboundDev", - QueueName="InboundDev.fifo", - ReceiveMessageWaitTimeSeconds=20, - FifoQueue=True)) +inbound_ksp = t.add_resource(Queue("InboundDevKsp", + QueueName="InboundDevKsp.fifo", + ReceiveMessageWaitTimeSeconds=20, + FifoQueue=True)) +inbound_ksp2 = t.add_resource(Queue("InboundDevKsp2", + QueueName="InboundDevKsp2.fifo", + ReceiveMessageWaitTimeSeconds=20, + FifoQueue=True)) outbound = t.add_resource(Queue("OutboundDev", QueueName="OutboundDev.fifo", ReceiveMessageWaitTimeSeconds=20, @@ -56,7 +60,8 @@ "sqs:GetQueueAttributes", ], "Resource": [ - GetAtt(inbound, "Arn"), + GetAtt(inbound_ksp, "Arn"), + GetAtt(inbound_ksp2, "Arn"), GetAtt(outbound, "Arn"), GetAtt(addqueue, "Arn"), GetAtt(mirrorqueue, "Arn"), @@ -71,7 +76,7 @@ } )) -for queue in [inbound, outbound, addqueue, mirrorqueue]: +for queue in [inbound_ksp, inbound_ksp2, outbound, addqueue, mirrorqueue]: t.add_output([ Output( "{}QueueURL".format(queue.title), diff --git a/docker-compose.yml b/docker-compose.yml index 7abfddba..27f7f95b 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -10,9 +10,9 @@ services: target: dev environment: SSH_KEY: ${CKAN_NETKAN_SSHKEY} - CKANMETA_REMOTE: ${CKAN_METADATA_PATH} + CKANMETA_REMOTES: ${CKAN_METADATA_PATHS} CKANMETA_USER: ${CKAN_METADATA_USER} - CKANMETA_REPO: ${CKAN_METADATA_REPO} + CKANMETA_REPOS: ${CKAN_METADATA_REPOS} AWS_DEFAULT_REGION: ${CKAN_AWS_DEFAULT_REGION} AWS_SECRET_ACCESS_KEY: ${CKAN_AWS_SECRET_ACCESS_KEY} AWS_ACCESS_KEY_ID: ${CKAN_AWS_ACCESS_KEY_ID} @@ -30,13 +30,33 @@ services: context: netkan/. target: dev environment: - NETKAN_REMOTE: ${NETKAN_METADATA_PATH} + NETKAN_REMOTES: ${NETKAN_METADATA_PATHS} SSH_KEY: ${CKAN_NETKAN_SSHKEY} - CKANMETA_REMOTE: ${CKAN_METADATA_PATH} + CKANMETA_REMOTES: ${CKAN_METADATA_PATHS} AWS_DEFAULT_REGION: ${CKAN_AWS_DEFAULT_REGION} AWS_SECRET_ACCESS_KEY: ${CKAN_AWS_SECRET_ACCESS_KEY} AWS_ACCESS_KEY_ID: ${CKAN_AWS_ACCESS_KEY_ID} - SQS_QUEUE: InboundDev.fifo + INFLATION_QUEUES: ksp=InboundDevKsp.fifo ksp2=InboundDevKsp2.fifo + GAME_ID: ksp2 + MAX_QUEUED: 1 + DISCORD_WEBHOOK_ID: ${DISCORD_WEBHOOK_ID} + DISCORD_WEBHOOK_TOKEN: ${DISCORD_WEBHOOK_TOKEN} + volumes: + - ./netkan:/home/netkan/netkan + command: scheduler --dev + scheduler: + build: + context: netkan/. + target: dev + environment: + NETKAN_REMOTES: ${NETKAN_METADATA_PATHS} + SSH_KEY: ${CKAN_NETKAN_SSHKEY} + CKANMETA_REMOTES: ${CKAN_METADATA_PATHS} + AWS_DEFAULT_REGION: ${CKAN_AWS_DEFAULT_REGION} + AWS_SECRET_ACCESS_KEY: ${CKAN_AWS_SECRET_ACCESS_KEY} + AWS_ACCESS_KEY_ID: ${CKAN_AWS_ACCESS_KEY_ID} + INFLATION_QUEUES: ksp=InboundDevKsp.fifo ksp2=InboundDevKsp2.fifo + GAME_ID: ksp2 MAX_QUEUED: 1 DISCORD_WEBHOOK_ID: ${DISCORD_WEBHOOK_ID} DISCORD_WEBHOOK_TOKEN: ${DISCORD_WEBHOOK_TOKEN} @@ -46,7 +66,8 @@ services: inflator: image: kspckan/inflator environment: - QUEUES: InboundDev.fifo,OutboundDev.fifo + QUEUES: InboundDevKsp.fifo,OutboundDev.fifo + GAME: KSP GH_Token: ${CKAN_GH_Token} AWS_REGION: ${CKAN_AWS_DEFAULT_REGION} AWS_SECRET_ACCESS_KEY: ${CKAN_AWS_SECRET_ACCESS_KEY} @@ -57,18 +78,20 @@ services: # distros is 1000:1000, which will match the # user in the container. - ${HOME}/ckan_cache:/home/netkan/ckan_cache - legacyhooks: - image: kspckan/webhooks + inflator-ksp2: + image: kspckan/inflator environment: - SSH_KEY: ${CKAN_NETKAN_SSHKEY} + QUEUES: InboundDevKsp2.fifo,OutboundDev.fifo + GAME: KSP2 GH_Token: ${CKAN_GH_Token} - XKAN_GHSECRET: test - CKAN_meta: ${CKAN_METADATA_PATH} - NetKAN: ${NETKAN_METADATA_PATH} - IA_access: test - IA_secret: test - IA_collection: test + AWS_REGION: ${CKAN_AWS_DEFAULT_REGION} + AWS_SECRET_ACCESS_KEY: ${CKAN_AWS_SECRET_ACCESS_KEY} + AWS_ACCESS_KEY_ID: ${CKAN_AWS_ACCESS_KEY_ID} volumes: + # Docker will create this with root perms if + # it's not created first. First user on most + # distros is 1000:1000, which will match the + # user in the container. - ${HOME}/ckan_cache:/home/netkan/ckan_cache webhooks: build: @@ -77,14 +100,17 @@ services: environment: XKAN_GHSECRET: test SSH_KEY: ${CKAN_NETKAN_SSHKEY} - NETKAN_REMOTE: ${NETKAN_METADATA_PATH} - INFLATION_SQS_QUEUE: InboundDev.fifo + NETKAN_REMOTES: ${NETKAN_METADATA_PATHS} + INFLATION_SQS_QUEUES: ksp=InboundDevKsp.fifo ksp2=InboundDevKsp2.fifo MIRROR_SQS_QUEUE: MirroringDev.fifo AWS_DEFAULT_REGION: ${CKAN_AWS_DEFAULT_REGION} AWS_SECRET_ACCESS_KEY: ${CKAN_AWS_SECRET_ACCESS_KEY} AWS_ACCESS_KEY_ID: ${CKAN_AWS_ACCESS_KEY_ID} DISCORD_WEBHOOK_ID: ${DISCORD_WEBHOOK_ID} DISCORD_WEBHOOK_TOKEN: ${DISCORD_WEBHOOK_TOKEN} + CKANMETA_REMOTES: ${CKAN_METADATA_PATHS} + CKANMETA_USER: ${CKAN_METADATA_USER} + CKANMETA_REPOS: ${CKAN_METADATA_REPOS} entrypoint: .local/bin/gunicorn command: -b 0.0.0.0:5000 --access-logfile - "netkan.webhooks:create_app()" adder: @@ -93,7 +119,7 @@ services: target: dev environment: SQS_QUEUE: AddingDev.info - NETKAN_REMOTE: ${NETKAN_METADATA_PATH} + NETKAN_REMOTES: ${NETKAN_METADATA_PATHS} NETKAN_USER: ${CKAN_NETKAN_USER} NETKAN_REPO: ${CKAN_NETKAN_REPO} command: spacedock-adder @@ -107,7 +133,7 @@ services: AWS_DEFAULT_REGION: ${CKAN_AWS_DEFAULT_REGION} AWS_SECRET_ACCESS_KEY: ${CKAN_AWS_SECRET_ACCESS_KEY} AWS_ACCESS_KEY_ID: ${CKAN_AWS_ACCESS_KEY_ID} - CKANMETA_REMOTE: ${CKAN_METADATA_PATH} + CKANMETA_REMOTES: ${CKAN_METADATA_PATHS} IA_access: test IA_secret: test IA_collection: test @@ -162,8 +188,8 @@ services: target: dev environment: SSH_KEY: ${CKAN_NETKAN_SSHKEY} - NETKAN_REMOTE: ${NETKAN_METADATA_PATH} - CKANMETA_REMOTE: ${CKAN_METADATA_PATH} + NETKAN_REMOTES: ${NETKAN_METADATA_PATHS} + CKANMETA_REMOTES: ${CKAN_METADATA_PATHS} GH_Token: ${CKAN_GH_Token} DISCORD_WEBHOOK_ID: ${DISCORD_WEBHOOK_ID} DISCORD_WEBHOOK_TOKEN: ${DISCORD_WEBHOOK_TOKEN} diff --git a/netkan/.coveragerc b/netkan/.coveragerc new file mode 100644 index 00000000..c7109eec --- /dev/null +++ b/netkan/.coveragerc @@ -0,0 +1,7 @@ +[report] +include = + netkan/* + +exclude_lines = + pragma: no cover + if TYPE_CHECKING: diff --git a/netkan/netkan/cli/common.py b/netkan/netkan/cli/common.py index 439263a8..a9db7de3 100644 --- a/netkan/netkan/cli/common.py +++ b/netkan/netkan/cli/common.py @@ -1,9 +1,10 @@ import sys import logging from pathlib import Path -from typing import Union, Callable, Any, Optional +from typing import Union, Callable, Any, List, Optional, Tuple import click +from git import Repo from ..repos import NetkanRepo, CkanMetaRepo from ..utils import init_repo, init_ssh @@ -24,19 +25,24 @@ def ctx_callback(ctx: click.Context, param: click.Parameter, help='Enable debug logging', callback=ctx_callback), click.option('--queue', envvar='SQS_QUEUE', expose_value=False, help='SQS Queue to poll for metadata', callback=ctx_callback), + click.option('--inflation-queues', envvar='INFLATION_QUEUES', expose_value=False, + help='SQS Queues to publish inflation tasks', multiple=True, + callback=ctx_callback), click.option('--ssh-key', envvar='SSH_KEY', expose_value=False, help='SSH key for accessing repositories', callback=ctx_callback), click.option('--deep-clone', is_flag=True, default=False, expose_value=False, help='Perform a deep clone of the git repos', callback=ctx_callback), - click.option('--ckanmeta-remote', envvar='CKANMETA_REMOTE', expose_value=False, - help='Path/URL/SSH to Metadata Repo', callback=ctx_callback), - click.option('--netkan-remote', envvar='NETKAN_REMOTE', expose_value=False, - help='Path/URL/SSH to the Stub Metadata Repo', callback=ctx_callback), + click.option('--ckanmeta-remotes', envvar='CKANMETA_REMOTES', expose_value=False, + help='game=Path/URL/SSH to Metadata Repos, ie ksp=http://gihub.com', + multiple=True, callback=ctx_callback), + click.option('--netkan-remotes', envvar='NETKAN_REMOTES', expose_value=False, + help='game=Path/URL/SSH to the Stub Metadata Repos, ie ksp=git@github.com', + multiple=True, callback=ctx_callback), click.option('--token', envvar='GH_Token', expose_value=False, help='GitHub Token for PRs', callback=ctx_callback), - click.option('--repo', envvar='CKAN_REPO', expose_value=False, - help='GitHub repo to raise PR against (Org Repo: CKAN-meta/NetKAN)', - callback=ctx_callback), + click.option('--repos', envvar='CKAN_REPOS', expose_value=False, + help='GitHub repos to raise PR against (Org Repo: ksp=CKAN-meta/NetKAN)', + multiple=True, callback=ctx_callback), click.option('--user', envvar='CKAN_USER', expose_value=False, help='GitHub user/org repo resides under (Org User: KSP-CKAN)', callback=ctx_callback), @@ -48,27 +54,140 @@ def ctx_callback(ctx: click.Context, param: click.Parameter, help='Credentials for Internet Archive', callback=ctx_callback), click.option('--ia-secret', envvar='IA_secret', expose_value=False, help='Credentials for Internet Archive', callback=ctx_callback), - click.option('--ia-collection', envvar='IA_collection', expose_value=False, - help='Collection to put mirrored mods in on Internet Archive', - callback=ctx_callback), + click.option('--ia-collections', envvar='IA_COLLECTIONS', expose_value=False, + help='game=Collection, for mirroring mods in on Internet Archive', + multiple=True, callback=ctx_callback), + click.option('--game-id', envvar='GAME_ID', help='Game ID for this task', + expose_value=False, callback=ctx_callback) ] +class Game: + name: str + shared: 'SharedArgs' + clone_base: str = '/tmp' + _ckanmeta_repo: CkanMetaRepo + _ckanmeta_remote: str + _netkan_repo: NetkanRepo + _netkan_remote: str + _github_pr: GitHubPR + _ia_collection: str + _inflation_queue: str + + def __init__(self, name: str, shared: 'SharedArgs') -> None: + self.name = name.lower() + self.shared = shared + + def args(self, arg: str) -> str: + result = None + try: + result = [x.split('=')[1] for x in getattr( + self.shared, arg) if x.split('=')[0] == self.name][0] + except IndexError: + pass + if result is None: + logging.fatal( + "Expecting attribute '%s' to be set for '%s'; exiting disgracefully!", + arg, self.name + ) + sys.exit(1) + return result + + def repo_base_path(self, path: str) -> str: + return f'{self.clone_base}/{self.name}/{path}' + + @property + def ckanmeta_repo(self) -> CkanMetaRepo: + if getattr(self, '_ckanmeta_repo', None) is None: + self._ckanmeta_repo = CkanMetaRepo( + init_repo( + self.ckanmeta_remote, + self.repo_base_path('CKAN-meta'), + self.shared.deep_clone + ), + game_id=self.name + ) + return self._ckanmeta_repo + + @property + def ckanmeta_remote(self) -> str: + if getattr(self, '_ckanmeta_remote', None) is None: + self._ckanmeta_remote = self.args('ckanmeta_remotes') + return self._ckanmeta_remote + + @property + def netkan_repo(self) -> NetkanRepo: + if getattr(self, '_netkan_repo', None) is None: + self._netkan_repo = NetkanRepo( + init_repo( + self.netkan_remote, + self.repo_base_path('NetKAN'), + self.shared.deep_clone + ), + game_id=self.name + ) + return self._netkan_repo + + @property + def repos(self) -> List[Repo]: + return [self.ckanmeta_repo.git_repo, self.netkan_repo.git_repo] + + @property + def netkan_remote(self) -> str: + if getattr(self, '_netkan_remote', None) is None: + self._netkan_remote = self.args('netkan_remotes') + return self._netkan_remote + + @property + def github_pr(self) -> GitHubPR: + if getattr(self, '_github_pr', None) is None: + self._github_pr = GitHubPR( + self.shared.token, self.args('repos'), self.shared.user) + return self._github_pr + + @property + def ia_collection(self) -> str: + if getattr(self, '_ia_collection', None) is None: + self._ia_collection = self.args('ia_collections') + return self._ia_collection + + @property + def inflation_queue(self) -> str: + if getattr(self, '_inflation_queue', None) is None: + self._inflation_queue = self.args('inflation_queues') + return self._inflation_queue + + class SharedArgs: + ckanmeta_remotes: Tuple[str, ...] + deep_clone: bool + dev: bool + game_id: str + inflation_queues: Tuple[str, ...] + netkan_remotes: Tuple[str, ...] + queue: str + ia_access: str + ia_secret: str + ia_collections: Tuple[str, ...] + repos: Tuple[str, ...] + timeout: int + token: str + user: str + _debug: bool + _ssh_key: str def __init__(self) -> None: self._environment_data = None - self._debug: Optional[bool] = None - self._ssh_key: Optional[str] = None - self._ckanmeta_repo: Optional[CkanMetaRepo] = None - self._netkan_repo: Optional[NetkanRepo] = None - self._github_pr: Optional[GitHubPR] = None - self.deep_clone = False def __getattribute__(self, name: str) -> Any: - attr = super().__getattribute__(name) + attr = None + try: + attr = super().__getattribute__(name) + except AttributeError: + pass if not name.startswith('_') and attr is None: - logging.fatal("Expecting attribute '%s' to be set; exiting disgracefully!", name) + logging.fatal( + "Expecting attribute '%s' to be set; exiting disgracefully!", name) sys.exit(1) return attr @@ -95,44 +214,15 @@ def ssh_key(self, value: str) -> None: init_ssh(value, Path(Path.home(), '.ssh')) self._ssh_key = value - @property - def ckanmeta_repo(self) -> CkanMetaRepo: - if not self._ckanmeta_repo: - self._ckanmeta_repo = CkanMetaRepo( - init_repo(self._ckanmeta_remote, '/tmp/CKAN-meta', self.deep_clone)) - return self._ckanmeta_repo - - @property - def ckanmeta_remote(self) -> str: - return self._ckanmeta_remote - - @ckanmeta_remote.setter - def ckanmeta_remote(self, value: str) -> None: - self._ckanmeta_remote = value - - @property - def netkan_repo(self) -> NetkanRepo: - if not self._netkan_repo: - self._netkan_repo = NetkanRepo( - init_repo(self._netkan_remote, '/tmp/NetKAN', self.deep_clone)) - return self._netkan_repo - - @property - def netkan_remote(self) -> str: - return self._netkan_remote - - @netkan_remote.setter - def netkan_remote(self, value: str) -> None: - self._netkan_remote = value - - @property - def github_pr(self) -> GitHubPR: - if not self._github_pr: - self._github_pr = GitHubPR(self.token, self.repo, self.user) - return self._github_pr + def game(self, game: str) -> Game: + game_id = game.lower() + if getattr(self, f'_game_{game_id}', None) is None: + setattr(self, f'_game_{game_id}', Game(game_id, self)) + return getattr(self, f'_game_{game_id}') -pass_state = click.make_pass_decorator(SharedArgs, ensure=True) # pylint: disable=invalid-name +pass_state = click.make_pass_decorator( + SharedArgs, ensure=True) # pylint: disable=invalid-name def common_options(func: Callable[..., Any]) -> Callable[..., Any]: diff --git a/netkan/netkan/cli/services.py b/netkan/netkan/cli/services.py index 5ff8e096..346d2756 100644 --- a/netkan/netkan/cli/services.py +++ b/netkan/netkan/cli/services.py @@ -1,13 +1,12 @@ import logging -import boto3 import click from .common import common_options, pass_state, SharedArgs -from ..indexer import MessageHandler +from ..indexer import IndexerQueueHandler from ..scheduler import NetkanScheduler -from ..spacedock_adder import SpaceDockAdder +from ..spacedock_adder import SpaceDockAdderQueueHandler from ..mirrorer import Mirrorer @@ -15,24 +14,7 @@ @common_options @pass_state def indexer(common: SharedArgs) -> None: - sqs = boto3.resource('sqs') - queue = sqs.get_queue_by_name(QueueName=common.queue) - - while True: - messages = queue.receive_messages( - MaxNumberOfMessages=10, - MessageAttributeNames=['All'], - VisibilityTimeout=common.timeout - ) - if not messages: - continue - with MessageHandler(common.ckanmeta_repo, common.github_pr) as handler: - for message in messages: - handler.append(message) - handler.process_ckans() - queue.delete_messages( - Entries=handler.sqs_delete_entries() - ) + IndexerQueueHandler(common).run() @click.command() @@ -55,24 +37,33 @@ def indexer(common: SharedArgs) -> None: ) @common_options @pass_state -def scheduler(common: SharedArgs, max_queued: int, group: str, min_cpu: int, min_io: int) -> None: +def scheduler( + common: SharedArgs, + max_queued: int, + group: str, + min_cpu: int, + min_io: int +) -> None: + game = common.game(common.game_id) sched = NetkanScheduler( - common, common.queue, common.token, + common, game.inflation_queue, common.token, game.name, nonhooks_group=(group in ('all', 'nonhooks')), webhooks_group=(group in ('all', 'webhooks')), ) if sched.can_schedule(max_queued, common.dev, min_cpu, min_io): sched.schedule_all_netkans() - logging.info("NetKANs submitted to %s", common.queue) + logging.info("NetKANs submitted to %s", game.inflation_queue) @click.command() @common_options @pass_state def mirrorer(common: SharedArgs) -> None: + # We need at least 50 mods for a collection for ksp2, keeping + # to just ksp for now Mirrorer( - common.ckanmeta_repo, common.ia_access, common.ia_secret, - common.ia_collection, common.token + common.game('ksp').ckanmeta_repo, common.ia_access, common.ia_secret, + common.game('ksp').ia_collection, common.token ).process_queue(common.queue, common.timeout) @@ -80,10 +71,4 @@ def mirrorer(common: SharedArgs) -> None: @common_options @pass_state def spacedock_adder(common: SharedArgs) -> None: - sd_adder = SpaceDockAdder( - common.queue, - common.timeout, - common.netkan_repo, - common.github_pr, - ) - sd_adder.run() + SpaceDockAdderQueueHandler(common).run() diff --git a/netkan/netkan/cli/utilities.py b/netkan/netkan/cli/utilities.py index 04d6b632..5ee91a0b 100644 --- a/netkan/netkan/cli/utilities.py +++ b/netkan/netkan/cli/utilities.py @@ -30,8 +30,8 @@ @pass_state def auto_freezer(common: SharedArgs, days_limit: int, days_till_ignore: int) -> None: afr = AutoFreezer( - common.netkan_repo, - common.github_pr, + common.game(common.game_id).netkan_repo, + common.game(common.game_id).github_pr, ) afr.freeze_idle_mods(days_limit, days_till_ignore) afr.mark_frozen_mods() @@ -43,7 +43,7 @@ def auto_freezer(common: SharedArgs, days_limit: int, days_till_ignore: int) -> def download_counter(common: SharedArgs) -> None: logging.info('Starting Download Count Calculation...') DownloadCounter( - common.ckanmeta_repo, + common.game(common.game_id).ckanmeta_repo, common.token ).update_counts() logging.info('Download Counter completed!') @@ -93,7 +93,7 @@ def restore_status(filename: str) -> None: @common_options @pass_state def recover_status_timestamps(common: SharedArgs) -> None: - ModStatus.recover_timestamps(common.ckanmeta_repo) + ModStatus.recover_timestamps(common.game(common.game_id).ckanmeta_repo) @click.command() @@ -175,6 +175,6 @@ def clean_cache(days: int, cache: str) -> None: @pass_state def mirror_purge_epochs(common: SharedArgs, dry_run: bool) -> None: Mirrorer( - common.ckanmeta_repo, common.ia_access, - common.ia_secret, common.ia_collection + common.game(common.game_id).ckanmeta_repo, common.ia_access, + common.ia_secret, common.game(common.game_id).ia_collection ).purge_epochs(dry_run) diff --git a/netkan/netkan/common.py b/netkan/netkan/common.py index e5bba0eb..c4c53306 100644 --- a/netkan/netkan/common.py +++ b/netkan/netkan/common.py @@ -1,5 +1,5 @@ -from typing import List, Iterable, Dict, Any, IO -import boto3 # pylint: disable=unused-import + +from typing import List, Iterable, IO, TYPE_CHECKING import requests import github @@ -8,17 +8,29 @@ from .metadata import Netkan from .repos import NetkanRepo +if TYPE_CHECKING: + from mypy_boto3_sqs.service_resource import Message + from mypy_boto3_sqs.type_defs import ( + DeleteMessageBatchRequestEntryTypeDef, + SendMessageBatchRequestEntryTypeDef, + ) +else: + Message = object + DeleteMessageBatchRequestEntryTypeDef = object + SendMessageBatchRequestEntryTypeDef = object + + USER_AGENT = 'Mozilla/5.0 (compatible; Netkanbot/1.0; CKAN; +https://github.com/KSP-CKAN/NetKAN-Infra)' -def netkans(path: str, ids: Iterable[str]) -> Iterable[Netkan]: +def netkans(path: str, ids: Iterable[str], game_id: str) -> Iterable[Netkan]: repo = NetkanRepo(Repo(path)) - return (Netkan(p) for p in repo.nk_paths(ids)) + return (Netkan(p, game_id=game_id) for p in repo.nk_paths(ids)) -def sqs_batch_entries(messages: Iterable[Dict[str, str]], - batch_size: int = 10) -> Iterable[List[Dict[str, str]]]: - batch = [] +def sqs_batch_entries(messages: Iterable[SendMessageBatchRequestEntryTypeDef], + batch_size: int = 10) -> Iterable[List[SendMessageBatchRequestEntryTypeDef]]: + batch: List[SendMessageBatchRequestEntryTypeDef] = [] for msg in messages: batch.append(msg) if len(batch) == batch_size: @@ -37,7 +49,7 @@ def github_limit_remaining(token: str) -> int: return github.Github(token, user_agent=USER_AGENT).get_rate_limit().core.remaining -def deletion_msg(msg: 'boto3.resources.factory.sqs.Message') -> Dict[str, Any]: +def deletion_msg(msg: Message) -> DeleteMessageBatchRequestEntryTypeDef: return { 'Id': msg.message_id, 'ReceiptHandle': msg.receipt_handle, diff --git a/netkan/netkan/github_pr.py b/netkan/netkan/github_pr.py index 5b915bcb..65f998dd 100644 --- a/netkan/netkan/github_pr.py +++ b/netkan/netkan/github_pr.py @@ -1,19 +1,34 @@ import logging from typing import Optional, List, Dict, Any from github import Github, GithubException +from github.Repository import Repository from .common import USER_AGENT class GitHubPR: + token: str + git_repo: str + user: str + _repo: Repository def __init__(self, token: str, repo: str, user: str) -> None: - self.repo = Github(token, user_agent=USER_AGENT).get_repo(f'{user}/{repo}') + self.token = token + self.git_repo = repo + self.user = user + + @property + def repo(self) -> Repository: + if getattr(self, '_repo', None) is None: + self._repo = Github(self.token, user_agent=USER_AGENT).get_repo( + f'{self.user}/{self.git_repo}') + return self._repo def create_pull_request(self, title: str, branch: str, body: str, labels: Optional[List[str]] = None) -> None: try: pull = self.repo.create_pull(title, body, 'master', branch) - logging.info('Pull request for %s opened at %s', branch, pull.html_url) + logging.info('Pull request for %s opened at %s', + branch, pull.html_url) if labels: # Labels have to be set separately diff --git a/netkan/netkan/indexer.py b/netkan/netkan/indexer.py index c0700930..eab2f48c 100644 --- a/netkan/netkan/indexer.py +++ b/netkan/netkan/indexer.py @@ -3,22 +3,32 @@ from pathlib import Path, PurePath from collections import deque from datetime import datetime, timezone -from typing import List, Optional, Type, Dict, Any, Deque -from types import TracebackType +from typing import List, Optional, Dict, Any, Deque, Type, TYPE_CHECKING -import boto3 # pylint: disable=unused-import from dateutil.parser import parse from git.objects.commit import Commit +from .cli.common import Game from .metadata import Ckan +from .queue_handler import BaseMessageHandler, QueueHandler from .repos import CkanMetaRepo from .status import ModStatus from .github_pr import GitHubPR +if TYPE_CHECKING: + from mypy_boto3_sqs.service_resource import Message + from mypy_boto3_sqs.type_defs import DeleteMessageBatchRequestEntryTypeDef +else: + Message = object + DeleteMessageBatchRequestEntryTypeDef = object + +USER_AGENT = 'Mozilla/5.0 (compatible; Netkanbot/1.0; CKAN; +https://github.com/KSP-CKAN/NetKAN-Infra)' + + class CkanMessage: - def __init__(self, msg: 'boto3.resources.factory.sqs.Message', + def __init__(self, msg: Message, ckm_repo: CkanMetaRepo, github_pr: Optional[GitHubPR] = None) -> None: self.body = msg.body self.ckan = Ckan(contents=self.body) @@ -34,7 +44,7 @@ def __init__(self, msg: 'boto3.resources.factory.sqs.Message', self.indexed = False for item in msg.message_attributes.items(): attr_type = f'{item[1]["DataType"]}Value' - content = item[1][attr_type] + content = item[1][attr_type] # type: ignore[literal-required] if content.lower() in ['true', 'false']: content = content.lower() == 'true' if item[0] == 'FileName': @@ -154,45 +164,42 @@ def process_ckan(self) -> None: self._process_ckan() @property - def delete_attrs(self) -> Dict[str, Any]: + def delete_attrs(self) -> DeleteMessageBatchRequestEntryTypeDef: return { 'Id': self.message_id, 'ReceiptHandle': self.receipt_handle } -class MessageHandler: +class MessageHandler(BaseMessageHandler): + master: Deque[CkanMessage] + staged: Deque[CkanMessage] + processed: List[CkanMessage] - def __init__(self, repo: CkanMetaRepo, github_pr: Optional[GitHubPR] = None) -> None: - self.ckm_repo = repo - self.github_pr = github_pr - self.master: Deque[CkanMessage] = deque() - self.staged: Deque[CkanMessage] = deque() - self.processed: List[CkanMessage] = [] + def __init__(self, game: Game) -> None: + super().__init__(game) + self.master = deque() + self.staged = deque() + self.processed = [] def __str__(self) -> str: - return str(self.master + self.staged) + return str(' '.join([str(x) for x in self.master + self.staged])) def __len__(self) -> int: return len(self.master + self.staged) - # Apparently gitpython can be leaky on long running processes - # we can ensure we call close on it and run our handler inside - # a context manager - def __enter__(self) -> 'MessageHandler': - if not self.ckm_repo.is_active_branch('master'): - self.ckm_repo.checkout_branch('master') - self.ckm_repo.pull_remote_branch('master') - return self + @property + def repo(self) -> CkanMetaRepo: + return self.game.ckanmeta_repo - def __exit__(self, exc_type: Type[BaseException], - exc_value: BaseException, traceback: TracebackType) -> None: - self.ckm_repo.close_repo() + @property + def github_pr(self) -> GitHubPR: + return self.game.github_pr - def append(self, message: 'boto3.resources.factory.sqs.Message') -> None: + def append(self, message: Message) -> None: ckan = CkanMessage( message, - self.ckm_repo, + self.repo, self.github_pr ) if not ckan.Staged: @@ -206,15 +213,19 @@ def _process_queue(self, queue: Deque[CkanMessage]) -> None: ckan.process_ckan() self.processed.append(ckan) - def sqs_delete_entries(self) -> List[Dict[str, Any]]: + def sqs_delete_entries(self) -> List[DeleteMessageBatchRequestEntryTypeDef]: return [c.delete_attrs for c in self.processed] # Currently we intermingle Staged/Master commits # separating them out will be a little more efficient # with our push/pull traffic. - def process_ckans(self) -> None: + def process_messages(self) -> None: self._process_queue(self.master) if any(ckan.indexed for ckan in self.processed): - self.ckm_repo.pull_remote_branch('master') - self.ckm_repo.push_remote_branch('master') + self.repo.pull_remote_branch('master') + self.repo.push_remote_branch('master') self._process_queue(self.staged) + + +class IndexerQueueHandler(QueueHandler): + _handler_class: Type[BaseMessageHandler] = MessageHandler diff --git a/netkan/netkan/metadata.py b/netkan/netkan/metadata.py index 87706f40..aa1f582b 100644 --- a/netkan/netkan/metadata.py +++ b/netkan/netkan/metadata.py @@ -5,23 +5,34 @@ from hashlib import sha1 import uuid import urllib.parse -from typing import Optional, List, Tuple, Union, Any, Dict +from typing import Optional, List, Tuple, Union, Any, Dict, TYPE_CHECKING from ruamel.yaml import YAML import dateutil.parser from .csharp_compat import csharp_uri_tostring +if TYPE_CHECKING: + from mypy_boto3_sqs.type_defs import SendMessageBatchRequestEntryTypeDef +else: + SendMessageBatchRequestEntryTypeDef = object + class Netkan: KREF_PATTERN = re.compile('^#/ckan/([^/]+)/(.+)$') - def __init__(self, filename: Optional[Union[str, Path]] = None, contents: Optional[str] = None) -> None: + def __init__( + self, + filename: Optional[Union[str, Path]] = None, + contents: Optional[str] = None, + game_id: Optional[str] = None, + ) -> None: if filename: self.filename = Path(filename) self.contents = self.filename.read_text(encoding='UTF-8') elif contents: self.contents = contents + self.game_id = game_id yaml = YAML(typ='safe') # YAML parser doesn't allow tabs, so replace with spaces self._raw = yaml.load(self.contents.replace('\t', ' ')) @@ -85,12 +96,15 @@ def string_attrib(val: str) -> Dict[str, str]: } def sqs_message_attribs(self, high_ver: Optional['Ckan.Version'] = None) -> Dict[str, Any]: - attribs = {} + attribs: Dict[str, Any] = { + 'GameId': self.string_attrib(self.game_id or 'ksp') + } if high_ver and not getattr(self, 'x_netkan_allow_out_of_order', False): attribs['HighestVersion'] = self.string_attrib(high_ver.string) return attribs - def sqs_message(self, high_ver: Optional['Ckan.Version'] = None) -> Dict[str, Any]: + def sqs_message( + self, high_ver: Optional['Ckan.Version'] = None) -> SendMessageBatchRequestEntryTypeDef: hex_id = uuid.uuid4().hex return { 'Id': hex_id, diff --git a/netkan/netkan/mirrorer.py b/netkan/netkan/mirrorer.py index c4173844..2b57f33d 100644 --- a/netkan/netkan/mirrorer.py +++ b/netkan/netkan/mirrorer.py @@ -7,7 +7,7 @@ import shutil from pathlib import Path from importlib.resources import read_text -from typing import Optional, List, Union, Iterable, BinaryIO, Dict, Any +from typing import Optional, List, Union, Iterable, BinaryIO, Dict, Any, TYPE_CHECKING import boto3 import github import internetarchive @@ -17,10 +17,16 @@ from .repos import CkanMetaRepo from .common import deletion_msg, download_stream_to_file, USER_AGENT +if TYPE_CHECKING: + from mypy_boto3_sqs.type_defs import DeleteMessageBatchRequestEntryTypeDef +else: + DeleteMessageBatchRequestEntryTypeDef = object + class CkanMirror(Ckan): - DESCRIPTION_TEMPLATE = Template(read_text('netkan', 'mirror_description_template.jinja2')) + DESCRIPTION_TEMPLATE = Template( + read_text('netkan', 'mirror_description_template.jinja2')) REDISTRIBUTABLE_LICENSES = { "public-domain", @@ -310,7 +316,7 @@ def process_queue(self, queue_name: str, timeout: int) -> None: self.ckm_repo.git_repo.heads.master.checkout() self.ckm_repo.git_repo.remotes.origin.pull('master', strategy_option='theirs') # Start processing the messages - to_delete = [] + to_delete: List[DeleteMessageBatchRequestEntryTypeDef] = [] for msg in messages: # Check if archive.org is overloaded before each upload if self.ia_session.s3_is_overloaded(access_key=self.ia_access): diff --git a/netkan/netkan/queue_handler.py b/netkan/netkan/queue_handler.py new file mode 100644 index 00000000..170c4980 --- /dev/null +++ b/netkan/netkan/queue_handler.py @@ -0,0 +1,105 @@ +import logging + +from typing import Dict, List, Type, TYPE_CHECKING, Union +from types import TracebackType + +import boto3 + +from .repos import CkanMetaRepo, NetkanRepo +from .cli.common import Game, SharedArgs + +if TYPE_CHECKING: + from mypy_boto3_sqs.service_resource import Message + from mypy_boto3_sqs.type_defs import ( + DeleteMessageBatchRequestEntryTypeDef, + ) +else: + Game = object + Message = object + DeleteMessageBatchRequestEntryTypeDef = object + + +class BaseMessageHandler: + STRATEGY_OPTION = 'ours' + game: Game + + def __init__(self, game: Game) -> None: + self.game = game + + # Apparently gitpython can be leaky on long running processes + # we can ensure we call close on it and run our handler inside + # a context manager + def __enter__(self) -> 'BaseMessageHandler': + if not self.repo.is_active_branch('master'): + self.repo.checkout_branch('master') + self.repo.pull_remote_branch( + 'master', strategy_option=self.STRATEGY_OPTION) + return self + + def __exit__(self, exc_type: Type[BaseException], + exc_value: BaseException, traceback: TracebackType) -> None: + self.repo.close_repo() + + @property + def repo(self) -> Union[CkanMetaRepo, NetkanRepo]: + raise NotImplementedError + + def append(self, message: Message) -> None: + raise NotImplementedError + + def process_messages(self) -> None: + raise NotImplementedError + + def sqs_delete_entries(self) -> List[DeleteMessageBatchRequestEntryTypeDef]: + raise NotImplementedError + + +class QueueHandler: + common: SharedArgs + _game_handlers: Dict[str, BaseMessageHandler] + _handler_class: Type[BaseMessageHandler] = BaseMessageHandler + + def __init__(self, common: SharedArgs) -> None: + self.common = common + + @property + def game_handlers(self) -> Dict[str, BaseMessageHandler]: + if getattr(self, '_game_handlers', None) is None: + self._game_handlers = {} + return self._game_handlers + + def game_handler(self, game_id: str) -> BaseMessageHandler: + if self.game_handlers.get(game_id, None) is None: + self.game_handlers.update({ + game_id: self._handler_class(self.common.game(game_id)) + }) + return self.game_handlers[game_id] + + def append_message(self, game_id: str, message: Message) -> None: + self.game_handler(game_id).append(message) + + def run(self) -> None: + sqs = boto3.resource('sqs') + queue = sqs.get_queue_by_name(QueueName=self.common.queue) + while True: + messages = queue.receive_messages( + MaxNumberOfMessages=10, + MessageAttributeNames=['All'], + VisibilityTimeout=self.common.timeout + ) + if not messages: + continue + for message in messages: + game_id = message.message_attributes.get( # type: ignore[union-attr,call-overload] + 'GameId', {}).get('StringValue', None) + if game_id is None: + logging.error('GameId missing from MessageAttributes') + continue + self.append_message(game_id, message) + + for _, handler in self.game_handlers.items(): + with handler: + handler.process_messages() + queue.delete_messages( + Entries=handler.sqs_delete_entries() + ) diff --git a/netkan/netkan/repos.py b/netkan/netkan/repos.py index 0f9cbc43..4e0cb54f 100644 --- a/netkan/netkan/repos.py +++ b/netkan/netkan/repos.py @@ -14,8 +14,9 @@ class XkanRepo: Concantenates all common repo operations in one place """ - def __init__(self, git_repo: Repo) -> None: + def __init__(self, git_repo: Repo, game_id: Optional[str] = None) -> None: self.git_repo = git_repo + self.game_id = game_id def __repr__(self) -> str: return f'<{self.__class__.__name__}({self.git_repo.__repr__()})>' @@ -45,7 +46,8 @@ def checkout_branch(self, branch_name: str) -> None: branch.checkout() def pull_remote_branch(self, branch_name: str, strategy_option: str = 'ours') -> None: - self.git_repo.remotes.origin.pull(branch_name, strategy_option=strategy_option) + self.git_repo.remotes.origin.pull( + branch_name, strategy_option=strategy_option) def push_remote_branch(self, branch_name: str) -> None: self.git_repo.remotes.origin.push(branch_name) @@ -136,7 +138,7 @@ def nk_paths(self, identifiers: Iterable[str]) -> Iterable[Path]: return (self.nk_path(identifier) for identifier in identifiers) def netkans(self) -> Iterable[Netkan]: - return (Netkan(f) for f in self.all_nk_paths()) + return (Netkan(f, game_id=self.game_id) for f in self.all_nk_paths()) @staticmethod def _nk_sort(path: Path) -> str: @@ -160,9 +162,9 @@ def ckm_dir(self) -> Path: def identifiers(self) -> Iterable[str]: return (path.stem for path in self.ckm_dir.iterdir() if path.is_dir() - and self.IDENTIFIER_PATTERN.fullmatch(path.stem) - and any(child.match('*.ckan') - for child in path.iterdir())) + and self.IDENTIFIER_PATTERN.fullmatch(path.stem) + and any(child.match('*.ckan') + for child in path.iterdir())) def all_latest_modules(self) -> Iterable[Ckan]: return filter(None, diff --git a/netkan/netkan/scheduler.py b/netkan/netkan/scheduler.py index 5904d8a0..35d1c542 100644 --- a/netkan/netkan/scheduler.py +++ b/netkan/netkan/scheduler.py @@ -1,6 +1,6 @@ import datetime import logging -from typing import List, Dict, Any +from typing import List, Dict, Any, TYPE_CHECKING import boto3 import requests @@ -10,12 +10,20 @@ from .common import sqs_batch_entries, github_limit_remaining from .cli.common import SharedArgs +if TYPE_CHECKING: + from mypy_boto3_cloudwatch.client import CloudWatchClient + from mypy_boto3_sqs.type_defs import SendMessageBatchRequestEntryTypeDef +else: + CloudWatchClient = object + SendMessageBatchRequestEntryTypeDef = object + class NetkanScheduler: - def __init__(self, common: SharedArgs, queue: str, github_token: str, + def __init__(self, common: SharedArgs, queue: str, github_token: str, game_id: str, nonhooks_group: bool = False, webhooks_group: bool = False) -> None: self.common = common + self.game_id = game_id self.nonhooks_group = nonhooks_group self.webhooks_group = webhooks_group self.github_token = github_token @@ -30,13 +38,13 @@ def __init__(self, common: SharedArgs, queue: str, github_token: str, @property def nk_repo(self) -> NetkanRepo: - return self.common.netkan_repo + return self.common.game(self.game_id).netkan_repo @property def ckm_repo(self) -> CkanMetaRepo: - return self.common.ckanmeta_repo + return self.common.game(self.game_id).ckanmeta_repo - def sqs_batch_attrs(self, batch: List[Dict[str, Any]]) -> Dict[str, Any]: + def sqs_batch_attrs(self, batch: List[SendMessageBatchRequestEntryTypeDef]) -> Dict[str, Any]: return { 'QueueUrl': self.queue_url, 'Entries': batch @@ -52,7 +60,7 @@ def schedule_all_netkans(self) -> None: self.client.send_message_batch(**self.sqs_batch_attrs(batch)) @staticmethod - def cpu_credits(cloudwatch: 'boto3.CloudWatch.Client', instance_id: str, + def cpu_credits(cloudwatch: CloudWatchClient, instance_id: str, start: datetime.datetime, end: datetime.datetime) -> int: stats = cloudwatch.get_metric_statistics( Dimensions=[{'Name': 'InstanceId', 'Value': instance_id}], @@ -68,13 +76,13 @@ def cpu_credits(cloudwatch: 'boto3.CloudWatch.Client', instance_id: str, # check, but useful to avoid DoS'ing the service when we're doing high CPU operations. creds = 0 try: - creds = stats['Datapoints'][0]['Average'] + creds = int(stats['Datapoints'][0]['Average']) except IndexError: logging.error("Couldn't acquire CPU Credit Stats") - return int(creds) + return creds @staticmethod - def volume_credits_percent(cloudwatch: 'boto3.CloudWatch.Client', instance_id: str, + def volume_credits_percent(cloudwatch: CloudWatchClient, instance_id: str, start: datetime.datetime, end: datetime.datetime) -> int: client = boto3.client('ec2') response = client.describe_volumes( @@ -97,10 +105,10 @@ def volume_credits_percent(cloudwatch: 'boto3.CloudWatch.Client', instance_id: s ) creds = 0 try: - creds = stats['Datapoints'][0]['Average'] + creds = int(stats['Datapoints'][0]['Average']) except IndexError: logging.error("Couldn't acquire Volume Credit Stats") - return int(creds) + return creds def can_schedule(self, max_queued: int, dev: bool = False, min_cpu: int = 50, min_io: int = 70, min_gh: int = 1500) -> bool: diff --git a/netkan/netkan/spacedock_adder.py b/netkan/netkan/spacedock_adder.py index 10ae4139..d18b497a 100644 --- a/netkan/netkan/spacedock_adder.py +++ b/netkan/netkan/spacedock_adder.py @@ -3,60 +3,50 @@ import io from importlib.resources import read_text from string import Template -from collections import defaultdict +from collections import defaultdict, deque import logging -from typing import Dict, Any, Optional +from typing import Dict, Deque, Any, List, Optional, Type, TYPE_CHECKING import git -import boto3 from ruamel.yaml import YAML +from .cli.common import Game from .github_pr import GitHubPR from .mod_analyzer import ModAnalyzer -from .common import deletion_msg - +from .queue_handler import BaseMessageHandler, QueueHandler from .repos import NetkanRepo +if TYPE_CHECKING: + from mypy_boto3_sqs.service_resource import Message + from mypy_boto3_sqs.type_defs import DeleteMessageBatchRequestEntryTypeDef +else: + Message = object + DeleteMessageBatchRequestEntryTypeDef = object + # https://github.com/KSP-SpaceDock/SpaceDock/blob/master/KerbalStuff/ckan.py class SpaceDockAdder: - PR_BODY_TEMPLATE = Template(read_text('netkan', 'pr_body_template.md')) USER_TEMPLATE = Template('[$username]($user_url)') + _info: Dict[str, Any] - def __init__(self, queue: str, timeout: int, nk_repo: NetkanRepo, github_pr: Optional[GitHubPR] = None) -> None: - sqs = boto3.resource('sqs') - self.sqs_client = boto3.client('sqs') - self.queue = sqs.get_queue_by_name(QueueName=queue) - self.timeout = timeout + def __init__(self, message: Message, nk_repo: NetkanRepo, github_pr: Optional[GitHubPR] = None) -> None: + self.message = message self.nk_repo = nk_repo self.github_pr = github_pr self.yaml = YAML() self.yaml.indent(mapping=2, sequence=4, offset=2) - def run(self) -> None: - while True: - messages = self.queue.receive_messages( - MaxNumberOfMessages=10, - MessageAttributeNames=['All'], - VisibilityTimeout=self.timeout, - ) - if messages: - self.nk_repo.git_repo.heads.master.checkout() - self.nk_repo.git_repo.remotes.origin.pull( - 'master', strategy_option='ours') - - # Start processing the messages - to_delete = [] - for msg in messages: - if self.try_add(json.loads(msg.body)): - # Successfully handled -> OK to delete - to_delete.append(deletion_msg(msg)) - self.queue.delete_messages(Entries=to_delete) - # Clean up GitPython's lingering file handles between batches - self.nk_repo.git_repo.close() - - def try_add(self, info: Dict[str, Any]) -> bool: - netkan = self.make_netkan(info) + def __str__(self) -> str: + return f"{self.info.get('name', '')}" + + @property + def info(self) -> Dict[str, Any]: + if getattr(self, '_info', None) is None: + self._info = json.loads(self.message.body) + return self._info + + def try_add(self) -> bool: + netkan = self.make_netkan(self.info) # Create .netkan file or quit if already there netkan_path = self.nk_repo.nk_path(netkan.get('identifier', '')) @@ -92,10 +82,10 @@ def try_add(self, info: Dict[str, Any]) -> bool: # Commit self.nk_repo.git_repo.index.commit( ( - f"Add {info.get('name')} from {info.get('site_name')}" - f"\n\nThis is an automated commit on behalf of {info.get('username')}" + f"Add {self.info.get('name')} from {self.info.get('site_name')}" + f"\n\nThis is an automated commit on behalf of {self.info.get('username')}" ), - author=git.Actor(info.get('username'), info.get('email')) + author=git.Actor(self.info.get('username'), self.info.get('email')) ) # Push branch @@ -105,10 +95,12 @@ def try_add(self, info: Dict[str, Any]) -> bool: # Create pull request if self.github_pr: self.github_pr.create_pull_request( - title=f"Add {info.get('name')} from {info.get('site_name')}", + title=f"Add {self.info.get('name')} from {self.info.get('site_name')}", branch=branch_name, - body=SpaceDockAdder._pr_body(info), - labels=['Pull request', 'Mod request']) + body=self.PR_BODY_TEMPLATE.safe_substitute( + defaultdict(lambda: '', self.info)), + labels=['Pull request', 'Mod-request'], + ) return True @staticmethod @@ -149,3 +141,58 @@ def make_netkan(cls, info: Dict[str, Any]) -> Dict[str, Any]: **(props), 'x_via': f"Automated {info.get('site_name')} CKAN submission" } + + @property + def delete_attrs(self) -> DeleteMessageBatchRequestEntryTypeDef: + return { + 'Id': self.message.message_id, + 'ReceiptHandle': self.message.receipt_handle + } + + +class SpaceDockMessageHandler(BaseMessageHandler): + queued: Deque[SpaceDockAdder] + processed: List[SpaceDockAdder] + + def __init__(self, game: Game) -> None: + super().__init__(game) + self.queued = deque() + self.processed = [] + + def __str__(self) -> str: + return str(' '.join([str(x) for x in self.queued])) + + def __len__(self) -> int: + return len(self.queued) + + @property + def repo(self) -> NetkanRepo: + return self.game.netkan_repo + + @property + def github_pr(self) -> GitHubPR: + return self.game.github_pr + + def append(self, message: Message) -> None: + netkan = SpaceDockAdder( + message, + self.repo, + self.github_pr + ) + self.queued.append(netkan) + + def _process_queue(self, queue: Deque[SpaceDockAdder]) -> None: + while queue: + netkan = queue.popleft() + if netkan.try_add(): + self.processed.append(netkan) + + def sqs_delete_entries(self) -> List[DeleteMessageBatchRequestEntryTypeDef]: + return [c.delete_attrs for c in self.processed] + + def process_messages(self) -> None: + self._process_queue(self.queued) + + +class SpaceDockAdderQueueHandler(QueueHandler): + _handler_class: Type[BaseMessageHandler] = SpaceDockMessageHandler diff --git a/netkan/netkan/webhooks/__init__.py b/netkan/netkan/webhooks/__init__.py index 8859b49c..c4ca0a3b 100644 --- a/netkan/netkan/webhooks/__init__.py +++ b/netkan/netkan/webhooks/__init__.py @@ -33,12 +33,12 @@ def __init__(self) -> None: def create_app() -> NetkanWebhooks: # Set config values for other modules to retrieve current_config.setup( - os.environ.get('SSH_KEY', ''), - os.environ.get('XKAN_GHSECRET', ''), - os.environ.get('NETKAN_REMOTE', ''), '/tmp/NetKAN', - os.environ.get('CKANMETA_REMOTE', ''), '/tmp/CKAN-meta', - os.environ.get('INFLATION_SQS_QUEUE', ''), - os.environ.get('ADD_SQS_QUEUE', ''), - os.environ.get('MIRROR_SQS_QUEUE', '') + ssh_key=os.environ.get('SSH_KEY', ''), + secret=os.environ.get('XKAN_GHSECRET', ''), + netkan_remotes=os.environ.get('NETKAN_REMOTES', ''), + ckanmeta_remotes=os.environ.get('CKANMETA_REMOTES', ''), + inf_queue_names=os.environ.get('INFLATION_SQS_QUEUES', ''), + add_queue_name=os.environ.get('ADD_SQS_QUEUE', ''), + mir_queue_name=os.environ.get('MIRROR_SQS_QUEUE', '') ) return NetkanWebhooks() diff --git a/netkan/netkan/webhooks/config.py b/netkan/netkan/webhooks/config.py index 0cbfd708..9927389e 100644 --- a/netkan/netkan/webhooks/config.py +++ b/netkan/netkan/webhooks/config.py @@ -1,11 +1,22 @@ -from pathlib import Path +from typing import TYPE_CHECKING + import boto3 -from ..repos import NetkanRepo, CkanMetaRepo -from ..utils import init_repo, init_ssh +from ..cli.common import SharedArgs + + +if TYPE_CHECKING: + from mypy_boto3_sqs.client import SQSClient + from mypy_boto3_sqs.service_resource import Queue +else: + SQSClient = object + Queue = object class WebhooksConfig: + _client: SQSClient + _add_queue: Queue + _mirror_queue: Queue # pylint: disable=attribute-defined-outside-init @@ -13,24 +24,48 @@ class WebhooksConfig: # import a reference to our global config object before we set # its properties, and that requires a temporary 'empty' state. def setup(self, ssh_key: str, secret: str, - netkan_remote: str, netkan_path: str, - ckanmeta_remote: str, ckanmeta_path: str, - inf_queue_name: str, add_queue_name: str, mir_queue_name: str) -> None: + netkan_remotes: str, ckanmeta_remotes: str, + inf_queue_names: str, add_queue_name: str, mir_queue_name: str) -> None: self.secret = secret - # Cloning the repos requires an SSH key set up in our home dir - init_ssh(ssh_key, Path(Path.home(), '.ssh')) + self.common = SharedArgs() + self.common.ssh_key = ssh_key + self.common.ckanmeta_remotes = tuple(ckanmeta_remotes.split(' ')) + self.common.netkan_remotes = tuple(netkan_remotes.split(' ')) + self.common.inflation_queues = tuple(inf_queue_names.split(' ')) + self.common.deep_clone = False + self._add_queue_name = add_queue_name + self._mir_queue_name = mir_queue_name + + @property + def client(self) -> SQSClient: + if getattr(self, '_client', None) is None: + self._client = boto3.client('sqs') + return self._client - self.nk_repo = NetkanRepo(init_repo(netkan_remote, netkan_path, False)) - self.ckm_repo = CkanMetaRepo(init_repo(ckanmeta_remote, ckanmeta_path, False)) - self.repos = [self.nk_repo.git_repo, self.ckm_repo.git_repo] + def inflation_queue(self, game: str) -> Queue: + game_id = game.lower() + if getattr(self, f'_{game_id}_inflation_queue', None) is None: + sqs = boto3.resource('sqs') + setattr(self, f'_{game_id}_inflation_queue', sqs.get_queue_by_name( + QueueName=self.common.game(game_id).inflation_queue)) + return getattr(self, f'_{game_id}_inflation_queue') + + @property + def add_queue(self) -> Queue: + if getattr(self, '_add_queue', None) is None: + sqs = boto3.resource('sqs') + self._add_queue = sqs.get_queue_by_name( + QueueName=self._add_queue_name) + return self._add_queue - if inf_queue_name or add_queue_name or mir_queue_name: - self.client = boto3.client('sqs') + @property + def mirror_queue(self) -> Queue: + if getattr(self, '_mirror_queue', None) is None: sqs = boto3.resource('sqs') - self.inflation_queue = sqs.get_queue_by_name(QueueName=inf_queue_name) - self.add_queue = sqs.get_queue_by_name(QueueName=add_queue_name) - self.mirror_queue = sqs.get_queue_by_name(QueueName=mir_queue_name) + self._mirror_queue = sqs.get_queue_by_name( + QueueName=self._mir_queue_name) + return self._mirror_queue # Provide the active config to other modules diff --git a/netkan/netkan/webhooks/github_inflate.py b/netkan/netkan/webhooks/github_inflate.py index 003d8d5c..e29940f3 100644 --- a/netkan/netkan/webhooks/github_inflate.py +++ b/netkan/netkan/webhooks/github_inflate.py @@ -4,29 +4,32 @@ from flask import Blueprint, current_app, request, jsonify, Response from ..common import netkans, sqs_batch_entries, pull_all +from ..repos import NetkanRepo from ..status import ModStatus from .github_utils import signature_required from .config import current_config -github_inflate = Blueprint('github_inflate', __name__) # pylint: disable=invalid-name +github_inflate = Blueprint( + 'github_inflate', __name__) # pylint: disable=invalid-name # For after-commit hook in NetKAN repo # Handles: https://netkan.ksp-ckan.space/gh/inflate -@github_inflate.route('/inflate', methods=['POST']) +@github_inflate.route('/inflate/', methods=['POST']) @signature_required -def inflate_hook() -> Tuple[Union[Response, str], int]: +def inflate_hook(game_id: str) -> Tuple[Union[Response, str], int]: raw = request.get_json(silent=True) branch = raw.get('ref') # type: ignore[union-attr] - if branch != current_config.nk_repo.git_repo.head.ref.path: - current_app.logger.info('Received inflation request for wrong ref %s, ignoring', branch) + if branch != current_config.common.game(game_id).netkan_repo.git_repo.head.ref.path: + current_app.logger.info( + 'Received inflation request for wrong ref %s, ignoring', branch) return jsonify({'message': 'Wrong branch'}), 200 commits = raw.get('commits') # type: ignore[union-attr] if not commits: current_app.logger.info('No commits received') return jsonify({'message': 'No commits received'}), 200 - inflate(ids_from_commits(commits)) - freeze(frozen_ids_from_commits(commits)) + inflate(ids_from_commits(commits), game_id) + freeze(frozen_ids_from_commits(commits), game_id) return '', 204 @@ -34,18 +37,18 @@ def inflate_hook() -> Tuple[Union[Response, str], int]: # Handles: https://netkan.ksp-ckan.space/gh/release?identifier=AwesomeMod # Putting this here instead of in a github_release.py file # because it's small and quite similar to inflate -@github_inflate.route('/release', methods=['POST']) +@github_inflate.route('/release/', methods=['POST']) @signature_required -def release_hook() -> Tuple[str, int]: +def release_hook(game_id: str) -> Tuple[str, int]: ident = request.args.get('identifier') if not ident: return 'Param "identifier" is required, e.g. http://netkan.ksp-ckan.space/gh/release?identifier=AwesomeMod', 400 - inflate([ident]) + inflate([ident], game_id) return '', 204 def ends_with_netkan(filename: str) -> bool: - return filename.endswith(f".{current_config.nk_repo.UNFROZEN_SUFFIX}") + return filename.endswith(f".{NetkanRepo.UNFROZEN_SUFFIX}") def ids_from_commits(commits: List[Dict[str, Any]]) -> Iterable[str]: @@ -56,21 +59,22 @@ def ids_from_commits(commits: List[Dict[str, Any]]) -> Iterable[str]: return (Path(f).stem for f in files) -def inflate(ids: Iterable[str]) -> None: - if current_config.nk_repo.git_repo.working_dir: +def inflate(ids: Iterable[str], game_id: str) -> None: + game = current_config.common.game(game_id) + if game.netkan_repo.git_repo.working_dir: # Make sure our NetKAN and CKAN-meta repos are up to date - pull_all(current_config.repos) - messages = (nk.sqs_message(current_config.ckm_repo.highest_version(nk.identifier)) - for nk in netkans(str(current_config.nk_repo.git_repo.working_dir), ids)) + pull_all(game.repos) + messages = (nk.sqs_message(game.ckanmeta_repo.highest_version(nk.identifier)) + for nk in netkans(str(game.netkan_repo.git_repo.working_dir), ids, game_id)) for batch in sqs_batch_entries(messages): current_config.client.send_message_batch( - QueueUrl=current_config.inflation_queue.url, + QueueUrl=current_config.inflation_queue(game_id).url, Entries=batch ) def ends_with_frozen(filename: str) -> bool: - return filename.endswith(f".{current_config.nk_repo.FROZEN_SUFFIX}") + return filename.endswith(f".{NetkanRepo.FROZEN_SUFFIX}") def frozen_ids_from_commits(commits: List[Dict[str, Any]]) -> List[str]: @@ -81,7 +85,7 @@ def frozen_ids_from_commits(commits: List[Dict[str, Any]]) -> List[str]: return [Path(f).stem for f in files] -def freeze(ids: List[str]) -> None: +def freeze(ids: List[str], game_id: str) -> None: if ids: logging.info('Marking frozen mods...') for ident in ids: @@ -98,7 +102,7 @@ def freeze(ids: List[str]) -> None: cached_downloads = list(filter( None, (ck.cache_find_file - for ck in current_config.ckm_repo.ckans(ident)))) + for ck in current_config.common.game(game_id).ckanmeta_repo.ckans(ident)))) if cached_downloads: logging.info('Purging %s files from cache for %s', len(cached_downloads), ident) diff --git a/netkan/netkan/webhooks/github_mirror.py b/netkan/netkan/webhooks/github_mirror.py index fbb642e0..a95aa456 100644 --- a/netkan/netkan/webhooks/github_mirror.py +++ b/netkan/netkan/webhooks/github_mirror.py @@ -1,25 +1,31 @@ import re from pathlib import Path from hashlib import md5 -from typing import Tuple, List, Iterable, Dict, Any, Set, Union +from typing import Tuple, List, Iterable, Dict, Any, Set, Union, TYPE_CHECKING from flask import Blueprint, current_app, request, jsonify, Response from .github_utils import signature_required from ..common import sqs_batch_entries from .config import current_config +if TYPE_CHECKING: + from mypy_boto3_sqs.type_defs import SendMessageBatchRequestEntryTypeDef +else: + SendMessageBatchRequestEntryTypeDef = object -github_mirror = Blueprint('github_mirror', __name__) # pylint: disable=invalid-name +github_mirror = Blueprint( + 'github_mirror', __name__) # pylint: disable=invalid-name # For after-commit hook in CKAN-meta repo # Handles: https://netkan.ksp-ckan.space/gh/mirror -@github_mirror.route('/mirror', methods=['POST']) +@github_mirror.route('/mirror/', methods=['POST']) @signature_required -def mirror_hook() -> Tuple[Union[Response, str], int]: +def mirror_hook(game_id: str) -> Tuple[Union[Response, str], int]: raw = request.get_json(silent=True) ref = raw.get('ref') # type: ignore[union-attr] - expected_ref = current_config.ckm_repo.git_repo.heads.master.path + expected_ref = current_config.common.game( + game_id).ckanmeta_repo.git_repo.heads.master.path if ref != expected_ref: current_app.logger.info( "Wrong branch. Expected '%s', got '%s'", expected_ref, ref) @@ -28,16 +34,8 @@ def mirror_hook() -> Tuple[Union[Response, str], int]: if not commits: current_app.logger.info('No commits received') return jsonify({'message': 'No commits received'}), 200 - # Make sure it's not from the crawler - sender = raw.get('sender') # type: ignore[union-attr] - if sender: - login = sender.get('login') - if login: - if login == 'kspckan-crawler': - current_app.logger.info('Commits sent by crawler, skipping on demand mirror') - return '', 204 # Submit mirroring requests to queue in batches of <=10 - messages = (batch_message(p) for p in paths_from_commits(commits)) + messages = (batch_message(p, game_id) for p in paths_from_commits(commits)) for batch in sqs_batch_entries(messages): current_app.logger.info(f'Queueing mirroring request batch: {batch}') current_config.client.send_message_batch( @@ -47,16 +45,23 @@ def mirror_hook() -> Tuple[Union[Response, str], int]: return '', 204 -forbidden_id_chars = re.compile('[^-_A-Za-z0-9]') # pylint: disable=invalid-name +forbidden_id_chars = re.compile( + '[^-_A-Za-z0-9]') # pylint: disable=invalid-name -def batch_message(path: Path) -> Dict[str, Any]: +def batch_message(path: Path, game_id: str) -> SendMessageBatchRequestEntryTypeDef: body = path.as_posix() return { 'Id': forbidden_id_chars.sub('_', body)[-80:], 'MessageBody': body, 'MessageGroupId': '1', - 'MessageDeduplicationId': md5(body.encode()).hexdigest() + 'MessageDeduplicationId': md5(body.encode()).hexdigest(), + 'MessageAttributes': { + 'GameId': { + 'DataType': 'String', + 'StringValue': game_id, + } + } } diff --git a/netkan/netkan/webhooks/inflate.py b/netkan/netkan/webhooks/inflate.py index c6c2c1d4..848a4227 100644 --- a/netkan/netkan/webhooks/inflate.py +++ b/netkan/netkan/webhooks/inflate.py @@ -11,23 +11,24 @@ # For SpaceDock's trigger when new versions are uploaded # Handles: https://netkan.ksp-ckan.space/inflate # Payload: { "identifiers": [ "Id1", "Id2", ... ] } -@inflate.route('/inflate', methods=['POST']) -def inflate_hook() -> Tuple[str, int]: - if current_config.nk_repo.git_repo.working_dir: - # SpaceDock doesn't set the `Content-Type: application/json` header - raw = request.get_json(force=True) - ids = raw.get('identifiers') # type: ignore[union-attr] - if not ids: - current_app.logger.info('No identifiers received') - return 'An array of identifiers is required', 400 - # Make sure our NetKAN and CKAN-meta repos are up to date - pull_all(current_config.repos) - messages = (nk.sqs_message(current_config.ckm_repo.highest_version(nk.identifier)) - for nk in netkans(str(current_config.nk_repo.git_repo.working_dir), ids)) - for batch in sqs_batch_entries(messages): - current_app.logger.info(f'Queueing inflation request batch: {batch}') - current_config.client.send_message_batch( - QueueUrl=current_config.inflation_queue.url, - Entries=batch - ) +@inflate.route('/inflate/', methods=['POST']) +def inflate_hook(game_id: str) -> Tuple[str, int]: + # SpaceDock doesn't set the `Content-Type: application/json` header + raw = request.get_json(force=True) + game = current_config.common.game(game_id) + ids = raw.get('identifiers') # type: ignore[union-attr] + if not ids: + current_app.logger.info('No identifiers received') + return 'An array of identifiers is required', 400 + # Make sure our NetKAN and CKAN-meta repos are up to date + pull_all(game.repos) + messages = (nk.sqs_message(game.ckanmeta_repo.highest_version(nk.identifier)) + for nk in netkans(str(game.netkan_repo.git_repo.working_dir), ids, game_id=game_id)) + for batch in sqs_batch_entries(messages): + current_app.logger.info( + f'Queueing inflation request batch: {batch}') + current_config.client.send_message_batch( + QueueUrl=current_config.inflation_queue(game_id).url, + Entries=batch + ) return '', 204 diff --git a/netkan/netkan/webhooks/spacedock_add.py b/netkan/netkan/webhooks/spacedock_add.py index 9f5ecd37..eb70e386 100644 --- a/netkan/netkan/webhooks/spacedock_add.py +++ b/netkan/netkan/webhooks/spacedock_add.py @@ -1,12 +1,18 @@ from hashlib import md5 import json -from typing import Tuple, Dict, Any +from typing import Tuple, Dict, Any, TYPE_CHECKING from flask import Blueprint, current_app, request from ..common import sqs_batch_entries from .config import current_config -spacedock_add = Blueprint('spacedock_add', __name__) # pylint: disable=invalid-name +if TYPE_CHECKING: + from mypy_boto3_sqs.type_defs import SendMessageBatchRequestEntryTypeDef +else: + SendMessageBatchRequestEntryTypeDef = object + +spacedock_add = Blueprint( + 'spacedock_add', __name__) # pylint: disable=invalid-name # For mod creation hook on SpaceDock, creates pull requests @@ -24,10 +30,10 @@ # user_url: https://spacedock.info/profile/ModAuthor1 # mod_url: https://spacedock.info/mod/12345 # site_name: SpaceDock -@spacedock_add.route('/add', methods=['POST']) -def add_hook() -> Tuple[str, int]: +@spacedock_add.route('/add/', methods=['POST']) +def add_hook(game_id: str) -> Tuple[str, int]: # Submit add requests to queue in batches of <=10 - messages = [batch_message(request.form)] + messages = [batch_message(request.form, game_id)] for batch in sqs_batch_entries(messages): current_app.logger.info(f'Queueing add request batch: {batch}') current_config.client.send_message_batch( @@ -37,11 +43,17 @@ def add_hook() -> Tuple[str, int]: return '', 204 -def batch_message(raw: Dict[str, Any]) -> Dict[str, Any]: +def batch_message(raw: Dict[str, Any], game_id: str) -> SendMessageBatchRequestEntryTypeDef: body = json.dumps(raw) return { 'Id': '1', 'MessageBody': body, 'MessageGroupId': '1', - 'MessageDeduplicationId': md5(body.encode()).hexdigest() + 'MessageDeduplicationId': md5(body.encode()).hexdigest(), + 'MessageAttributes': { + 'GameId': { + 'DataType': 'String', + 'StringValue': game_id, + } + } } diff --git a/netkan/netkan/webhooks/spacedock_inflate.py b/netkan/netkan/webhooks/spacedock_inflate.py index 893e416d..445cec48 100644 --- a/netkan/netkan/webhooks/spacedock_inflate.py +++ b/netkan/netkan/webhooks/spacedock_inflate.py @@ -6,7 +6,8 @@ from .config import current_config -spacedock_inflate = Blueprint('spacedock_inflate', __name__) # pylint: disable=invalid-name +spacedock_inflate = Blueprint( + 'spacedock_inflate', __name__) # pylint: disable=invalid-name # For after-upload hook on SpaceDock @@ -16,12 +17,12 @@ # event_type: update - New version of mod was uploaded # version-update - Default version changed # delete - Mod was deleted from SpaceDock -@spacedock_inflate.route('/inflate', methods=['POST']) -def inflate_hook() -> Tuple[str, int]: +@spacedock_inflate.route('/inflate/', methods=['POST']) +def inflate_hook(game_id: str) -> Tuple[str, int]: # Make sure our NetKAN and CKAN-meta repos are up to date - pull_all(current_config.repos) + pull_all(current_config.common.game(game_id).repos) # Get the relevant netkans - nks = find_netkans(request.form.get('mod_id', '')) + nks = find_netkans(request.form.get('mod_id', ''), game_id) if nks: if request.form.get('event_type') == 'delete': # Just let the team know on Discord @@ -43,17 +44,18 @@ def inflate_hook() -> Tuple[str, int]: return '', 204 # Submit them to the queue - messages = (nk.sqs_message(current_config.ckm_repo.highest_version(nk.identifier)) - for nk in nks) + messages = (nk.sqs_message( + current_config.common.game(game_id).ckanmeta_repo.highest_version(nk.identifier)) + for nk in nks) for batch in sqs_batch_entries(messages): current_config.client.send_message_batch( - QueueUrl=current_config.inflation_queue.url, + QueueUrl=current_config.inflation_queue(game_id).url, Entries=batch ) return '', 204 return 'No such module', 404 -def find_netkans(sd_id: str) -> List[Netkan]: - all_nk = current_config.nk_repo.netkans() +def find_netkans(sd_id: str, game_id: str) -> List[Netkan]: + all_nk = current_config.common.game(game_id).netkan_repo.netkans() return [nk for nk in all_nk if nk.kref_src == 'spacedock' and nk.kref_id == sd_id] diff --git a/netkan/pytest.ini b/netkan/pytest.ini index fa58e925..b1fbcc36 100644 --- a/netkan/pytest.ini +++ b/netkan/pytest.ini @@ -1,3 +1,7 @@ [pytest] python_files = tests/__init__.py addopts = -p no:cacheprovider --mypy --pylint +filterwarnings = + ignore + default:::netkan.* + default:::tests.* diff --git a/netkan/setup.py b/netkan/setup.py index 9e4d045f..5d353d8f 100644 --- a/netkan/setup.py +++ b/netkan/setup.py @@ -37,6 +37,8 @@ 'development': [ 'ptvsd', 'autopep8', + 'boto3-stubs[essential,cloudwatch]', + 'coverage', 'troposphere', 'pytest', 'pytest-mypy', @@ -50,6 +52,8 @@ 'types-Jinja2', ], 'test': [ + 'boto3-stubs[essential,cloudwatch]', + 'coverage', 'pytest', 'pytest-mypy', 'mypy', diff --git a/netkan/tests/__init__.py b/netkan/tests/__init__.py index 9cc81985..135084ca 100644 --- a/netkan/tests/__init__.py +++ b/netkan/tests/__init__.py @@ -9,3 +9,5 @@ from .utils import * from .csharp_compat import * from .auto_freezer import * +from .spacedock_adder import * +from .webhooks import * diff --git a/netkan/tests/cli.py b/netkan/tests/cli.py index 2ccfb4a1..4103c3f5 100644 --- a/netkan/tests/cli.py +++ b/netkan/tests/cli.py @@ -1,22 +1,27 @@ -import unittest +import sys from time import time from os import utime from pathlib import Path, PurePath from shutil import copy2 from tempfile import TemporaryDirectory +from unittest import TestCase, mock + from click.testing import CliRunner from git import Repo from netkan.cli import clean_cache +from netkan.cli.common import SharedArgs, Game from netkan.repos import NetkanRepo +from .common import SharedArgsHarness + # This file is intended to test the commands in cli.py, running them directly via click.testing.CliRunner().invoke(). -class TestCleanCache(unittest.TestCase): +class TestCleanCache(TestCase): cache_path = TemporaryDirectory() testdata_path = Path(PurePath(__file__).parent, 'testdata/NetKAN/') - repo = NetkanRepo(Repo.init(testdata_path)) + repo = NetkanRepo(Repo.init(testdata_path), 'ksp') source_file_1 = repo.nk_path('DogeCoinFlag') source_file_2 = repo.nk_path('FlagCoinDoge') @@ -44,8 +49,61 @@ def tearDown(self): def test_clean_all(self): - result = self.runner.invoke(clean_cache, ['--days', '42', '--cache', self.cache_path.name]) + result = self.runner.invoke( + clean_cache, ['--days', '42', '--cache', self.cache_path.name]) self.assertEqual(result.exit_code, 0) self.assertFalse(Path.exists(self.target_file_1)) self.assertTrue(Path.exists(self.target_file_2)) + + +class TestSharedArgs(TestCase): + + def test_debug_unset(self): + shared = SharedArgs() + with mock.patch.object(sys, 'argv', ['group', 'command']): + setattr(shared, 'debug', None) + self.assertFalse(shared.debug) + + def test_shared_unset_arg_exits(self): + shared = SharedArgs() + with self.assertRaises(SystemExit) as error: + shared.ckanmeta_remote # pylint: disable=pointless-statement + self.assertEqual(error.exception.code, 1) + + +class TestGame(SharedArgsHarness): + + def test_game_unset_var_exits(self): + game = Game('unknown', self.shared_args) + with self.assertRaises(SystemExit) as error: + game.ckanmeta_remote # pylint: disable=pointless-statement + self.assertEqual(error.exception.code, 1) + + def test_shared_args_game_ksp(self): + self.assertEqual(self.shared_args.game('ksp').name, 'ksp') + self.assertIsInstance(self.shared_args.game('ksp'), Game) + + def test_ckanmeta_remote_ksp(self): + path = f'{self.tmpdir.name}/upstream/ckan' + self.assertEqual( + Game('ksp', self.shared_args).ckanmeta_remote, path) + + def test_netkan_remote_ksp(self): + path = f'{self.tmpdir.name}/upstream/netkan' + self.assertEqual( + Game('ksp', self.shared_args).netkan_remote, path) + + def test_shared_args_game_ksp2(self): + self.assertEqual(self.shared_args.game('ksp2').name, 'ksp2') + self.assertIsInstance(self.shared_args.game('ksp'), Game) + + def test_ckanmeta_remote_ksp2(self): + path = f'{self.tmpdir.name}/upstream/ckan' + self.assertEqual( + Game('ksp2', self.shared_args).ckanmeta_remote, path) + + def test_netkan_remote_ksp2(self): + path = f'{self.tmpdir.name}/upstream/netkan' + self.assertEqual( + Game('ksp2', self.shared_args).netkan_remote, path) diff --git a/netkan/tests/common.py b/netkan/tests/common.py new file mode 100644 index 00000000..a6e437f7 --- /dev/null +++ b/netkan/tests/common.py @@ -0,0 +1,125 @@ +import shutil +import tempfile + +from abc import ABC +from unittest import TestCase, mock +from pathlib import Path, PurePath + +from git import Repo +from gitdb.exc import BadName + +from netkan.cli.common import SharedArgs + + +class SharedArgsMixin(ABC): + _cls_tmpdir: tempfile.TemporaryDirectory + repos = ['ckan', 'netkan'] + ckan_data = Path(PurePath(__file__).parent, 'testdata/CKAN-meta') + netkan_data = Path(PurePath(__file__).parent, 'testdata/NetKAN') + + @classmethod + def _tmpdir(cls) -> tempfile.TemporaryDirectory: + if getattr(cls, '_cls_tmpdir', None) is None: + cls._cls_tmpdir = tempfile.TemporaryDirectory() + return cls._cls_tmpdir + + @classmethod + def configure_repos(cls) -> None: + for repo in cls.repos: + working = Path(cls._tmpdir().name, 'working', repo) + upstream = Path(cls._tmpdir().name, 'upstream', repo) + upstream.mkdir(parents=True) + Repo.init(upstream, bare=True) + shutil.copytree(getattr(cls, f'{repo}_data'), working) + git_repo = Repo.init(working) + shutil.copy(Path(__file__).parent.parent / '.gitconfig', + working / '.git' / 'config') + git_repo.index.add(git_repo.untracked_files) + git_repo.index.commit('Test Data') + git_repo.create_remote('origin', upstream.as_posix()) + git_repo.remotes.origin.push('master:master') + setattr(cls, f'{repo}_path', working) + setattr(cls, f'{repo}_upstream', upstream) + + @classmethod + def cleanup(cls) -> None: + cls._tmpdir().cleanup() + + @property + def tmpdir(self) -> tempfile.TemporaryDirectory: + return self._tmpdir() + + +class SharedArgsHarness(TestCase, SharedArgsMixin): + + @classmethod + def setUpClass(cls): + super(SharedArgsHarness, cls).setUpClass() + cls.configure_repos() + + @classmethod + def tearDownClass(cls): + super(SharedArgsHarness, cls).tearDownClass() + cls.cleanup() + + def setUp(self): + patch = mock.patch( + 'netkan.cli.common.Game.clone_base', self.tmpdir.name) + patch.start() + self.shared_args = SharedArgs() + self.shared_args.deep_clone = True + self.shared_args.token = '1234' + self.shared_args.user = 'ckan-test' + ckan_upstream = getattr(self, 'ckan_upstream') + netkan_upstream = getattr(self, 'netkan_upstream') + attributes = [ + ('ckanmeta_remotes', + (f'ksp={ckan_upstream}', f'ksp2={ckan_upstream}')), + ('netkan_remotes', + (f'ksp={netkan_upstream}', f'ksp2={netkan_upstream}')), + ('repos', + ('ksp=Test/KSP', 'ksp2=Test/KSP2')), + ('inflation_queues', + ('ksp=ksp.queue.url', 'ksp2=ksp2.queue.url')), + ('ia_collections', + ('ksp=kspcollection ksp2=ksp2collection')) + ] + for attr, val in attributes: + setattr(self.shared_args, attr, val) + + def tearDown(self): + for repo in ['ckan_path', 'netkan_path']: + meta = Repo(getattr(self, repo)) + meta.git.clean('-df') + meta.heads.master.checkout() + try: + cleanup = meta.create_head('cleanup', 'HEAD~1') + meta.head.reference = cleanup + meta.head.reset(index=True, working_tree=True) + except BadName: + pass + mock.patch.stopall() + + @staticmethod + def mocked_message(staged=False, filename='DogeCoinFlag-v1.02.ckan'): + msg = mock.Mock() + msg.body = Path( + PurePath(__file__).parent, + 'testdata/', filename + ).read_text('utf-8') + msg.message_attributes = { + 'CheckTime': { + 'StringValue': '2019-06-24T19:06:14', 'DataType': 'String'}, + 'ModIdentifier': { + 'StringValue': 'DogeCoinFlag', 'DataType': 'String'}, + 'Staged': {'StringValue': str(staged), 'DataType': 'String'}, + 'Success': {'StringValue': 'True', 'DataType': 'String'}, + 'FileName': { + 'StringValue': f'./{filename}', + 'DataType': 'String' + } + } + msg.message_id = 'MessageMcMessageFace' + msg.receipt_handle = 'HandleMcHandleFace' + msg.md5_of_body = '709d9d3484f8c1c719b15a8c3425276a' + return msg diff --git a/netkan/tests/indexer.py b/netkan/tests/indexer.py index 3cb5320b..ed1788f0 100644 --- a/netkan/tests/indexer.py +++ b/netkan/tests/indexer.py @@ -10,9 +10,11 @@ from datetime import datetime from gitdb.exc import BadName -from netkan.indexer import CkanMessage +from netkan.indexer import CkanMessage, MessageHandler, IndexerQueueHandler from netkan.repos import CkanMetaRepo +from .common import SharedArgsHarness + class TestCkan(unittest.TestCase): test_data = Path(PurePath(__file__).parent, 'testdata/no_change') @@ -72,6 +74,10 @@ def tearDown(self): def test_ckan_message_changed(self): self.assertFalse(self.message.metadata_changed()) + def test_ckan_message_str(self): + self.assertEqual('DogeCoinFlag: 2019-06-24T19:06:14', + str(self.message)) + def test_ckan_message_mod_version(self): self.assertEqual('DogeCoinFlag-v1.02', self.message.mod_version) @@ -87,6 +93,10 @@ def test_ckan_message_filename(self): def test_ckan_message_stage(self): self.assertFalse(self.message.Staged) + def test_ckan_message_stage_name(self): + self.assertEqual('add/DogeCoinFlag-v1.02', + self.message.staging_branch_name) + def test_ckan_message_delete_attrs(self): self.assertEqual( self.message.delete_attrs['Id'], 'MessageMcMessageFace' @@ -237,3 +247,82 @@ def test_ckan_message_status_attrs(self): ) with self.assertRaises(KeyError): attrs['last_indexed'] + + +class TestMessageHandler(SharedArgsHarness): + + def setUp(self): + super().setUp() + self.handler = MessageHandler(game=self.shared_args.game('ksp')) + + def test_class_string(self): + self.handler.append(self.mocked_message()) + self.handler.append(self.mocked_message(staged=True)) + self.assertEqual(str( + self.handler), 'DogeCoinFlag: 2019-06-24T19:06:14 DogeCoinFlag: 2019-06-24T19:06:14') + + def test_add_primary(self): + self.handler.append(self.mocked_message()) + self.assertEqual(len(self.handler), 1) + self.assertEqual( + self.handler.master[0].ckan.name, + 'Dogecoin Flag' + ) + + def test_add_staged(self): + self.handler.append(self.mocked_message(staged=True)) + self.assertEqual(len(self.handler), 1) + self.assertEqual( + self.handler.staged[0].ckan.name, + 'Dogecoin Flag' + ) + + def test_add_both(self): + self.handler.append(self.mocked_message()) + self.handler.append(self.mocked_message(staged=True)) + self.assertEqual(len(self.handler), 2) + self.assertEqual(len(self.handler.master), 1) + self.assertEqual(len(self.handler.staged), 1) + + def test_branch_checkout_primary_on_enter(self): + repo = self.shared_args.game('ksp').ckanmeta_repo + with repo.change_branch('test_branch'): + self.assertTrue(repo.is_active_branch('test_branch')) + repo.checkout_branch('test_branch') + self.assertTrue(repo.is_active_branch('test_branch')) + with MessageHandler(game=self.shared_args.game('ksp')) as handler: + self.assertTrue(repo.is_active_branch('master')) + + @ mock.patch('netkan.indexer.CkanMessage.process_ckan') + def test_process_ckans(self, mocked_process): + self.handler.append(self.mocked_message()) + self.handler.append(self.mocked_message(staged=True)) + self.handler.process_messages() + self.assertEqual(len(self.handler.processed), 2) + + @ mock.patch('netkan.indexer.CkanMessage.process_ckan') + def test_delete_attrs(self, mocked_process): + self.handler.append(self.mocked_message()) + self.handler.append(self.mocked_message(staged=True)) + self.handler.process_messages() + attrs = [{'Id': 'MessageMcMessageFace', 'ReceiptHandle': 'HandleMcHandleFace'}, { + 'Id': 'MessageMcMessageFace', 'ReceiptHandle': 'HandleMcHandleFace'}] + self.assertEqual(self.handler.sqs_delete_entries(), attrs) + + +class TestIndexerQueueHandler(SharedArgsHarness): + + def test_ksp_message_append(self): + indexer = IndexerQueueHandler(self.shared_args) + indexer.append_message('ksp', self.mocked_message()) + self.assertTrue('ksp' in indexer.game_handlers) + + def test_ksp_message_no_ksp2(self): + indexer = IndexerQueueHandler(self.shared_args) + indexer.append_message('ksp', self.mocked_message()) + self.assertFalse('ksp2' in indexer.game_handlers) + + def test_ksp2_message_append(self): + indexer = IndexerQueueHandler(self.shared_args) + indexer.append_message('ksp2', self.mocked_message()) + self.assertTrue('ksp2' in indexer.game_handlers) diff --git a/netkan/tests/metadata.py b/netkan/tests/metadata.py index e623c902..69e7cb42 100644 --- a/netkan/tests/metadata.py +++ b/netkan/tests/metadata.py @@ -10,7 +10,8 @@ class TestNetKAN(unittest.TestCase): - nk_repo = NetkanRepo(Repo(Path(PurePath(__file__).parent, 'testdata/NetKAN'))) + nk_repo = NetkanRepo( + Repo(Path(PurePath(__file__).parent, 'testdata/NetKAN')), 'ksp') def test_netkan_message(self): dogecoinflag = self.nk_repo.nk_path('DogeCoinFlag') diff --git a/netkan/tests/scheduler.py b/netkan/tests/scheduler.py index 7616da3c..bbb92848 100644 --- a/netkan/tests/scheduler.py +++ b/netkan/tests/scheduler.py @@ -1,22 +1,21 @@ -import unittest -from unittest.mock import Mock from pathlib import Path, PurePath -from git import Repo from netkan.common import sqs_batch_entries -from netkan.repos import CkanMetaRepo, NetkanRepo from netkan.scheduler import NetkanScheduler +from .common import SharedArgsHarness -class TestScheduler(unittest.TestCase): - test_data = PurePath(__file__).parent.joinpath('testdata', 'NetKAN') - ckm_root = PurePath(__file__).parent.joinpath('testdata', 'CKAN-meta') + +class TestScheduler(SharedArgsHarness): + repos = ['ckan', 'netkan', 'countkan'] + netkan_data = Path(PurePath(__file__).parent, 'testdata', 'NetKAN') + ckan_data = Path(PurePath(__file__).parent, 'testdata', 'CKAN-meta') + countkan_data = Path(PurePath(__file__).parent, 'testdata', 'NetTEN') def setUp(self): - common = Mock() - common.netkan_repo = NetkanRepo(Repo.init(self.test_data)) - common.ckanmeta_repo = CkanMetaRepo(Repo.init(self.ckm_root)) - self.scheduler = NetkanScheduler(common, 'TestyMcTestFace', 'token') + super().setUp() + self.scheduler = NetkanScheduler( + self.shared_args, 'TestyMcTestFace', 'token', 'ksp') self.messages = (nk.sqs_message(self.scheduler.ckm_repo.highest_version(nk.identifier)) for nk in self.scheduler.nk_repo.netkans()) @@ -39,10 +38,12 @@ def test_sqs_batch_attrs(self): self.assertEqual(len(attrs['Entries']), 10) def test_sqs_batching_ten(self): - common = Mock() - common.netkan_repo = NetkanRepo(Repo.init(Path(PurePath(__file__).parent, 'testdata/NetTEN'))) - common.ckanmeta_repo = CkanMetaRepo(Repo.init(self.ckm_root)) - scheduler = NetkanScheduler(common, 'TestyMcTestFace', 'token') + setattr(self.shared_args, 'netkan_remotes', + (f'count={getattr(self, "countkan_upstream")}',)) + setattr(self.shared_args, 'ckanmeta_remotes', + (f'count={getattr(self, "ckan_upstream")}',)) + scheduler = NetkanScheduler( + self.shared_args, 'TestyMcTestFace', 'token', 'count') messages = (nk.sqs_message(scheduler.ckm_repo.highest_version(nk.identifier)) for nk in scheduler.nk_repo.netkans()) diff --git a/netkan/tests/spacedock_adder.py b/netkan/tests/spacedock_adder.py new file mode 100644 index 00000000..47513635 --- /dev/null +++ b/netkan/tests/spacedock_adder.py @@ -0,0 +1,85 @@ +# pylint: disable-all +# flake8: noqa + +from unittest import mock + +from netkan.spacedock_adder import ( + SpaceDockMessageHandler, SpaceDockAdderQueueHandler +) +from netkan.repos import NetkanRepo + +from .common import SharedArgsHarness + + +class TestSpaceDockMessageHandler(SharedArgsHarness): + + def setUp(self): + super().setUp() + self.handler = SpaceDockMessageHandler( + game=self.shared_args.game('ksp')) + + def test_class_string(self): + self.handler.append(self.mocked_message()) + self.assertEqual(str( + self.handler), 'Dogecoin Flag') + + def test_add_primary(self): + self.handler.append(self.mocked_message( + filename='DogeCoinFlag.netkan')) + self.assertEqual(len(self.handler), 1) + self.assertEqual( + self.handler.queued[0].info.get('name'), + 'Dogecoin Flag' + ) + + def test_branch_checkout_primary_on_enter(self): + repo = self.shared_args.game('ksp').netkan_repo + with repo.change_branch('test_branch'): + self.assertTrue(repo.is_active_branch('test_branch')) + repo.checkout_branch('test_branch') + self.assertTrue(repo.is_active_branch('test_branch')) + with SpaceDockMessageHandler(game=self.shared_args.game('ksp')) as handler: + self.assertIsInstance(repo, NetkanRepo) + self.assertTrue(repo.is_active_branch('master')) + + @mock.patch('netkan.spacedock_adder.SpaceDockAdder.try_add') + def test_process_netkans(self, mocked_process): + mocked_process.return_value = True + self.handler.append(self.mocked_message()) + self.handler.process_messages() + self.assertEqual(len(self.handler.processed), 1) + + @mock.patch('netkan.spacedock_adder.SpaceDockAdder.try_add') + def test_process_netkans_fail(self, mocked_process): + mocked_process.return_value = False + self.handler.append(self.mocked_message()) + self.assertEqual(len(self.handler.queued), 1) + self.handler.process_messages() + self.assertEqual(len(self.handler.processed), 0) + + @mock.patch('netkan.spacedock_adder.SpaceDockAdder.try_add') + def test_delete_attrs(self, mocked_process): + mocked_process.return_value = True + self.handler.append(self.mocked_message()) + self.handler.process_messages() + attrs = [{'Id': 'MessageMcMessageFace', + 'ReceiptHandle': 'HandleMcHandleFace'}] + self.assertEqual(self.handler.sqs_delete_entries(), attrs) + + +class TestSpaceDockAdderQueueHandler(SharedArgsHarness): + + def test_ksp_message_append(self): + adder = SpaceDockAdderQueueHandler(self.shared_args) + adder.append_message('ksp', self.mocked_message()) + self.assertTrue('ksp' in adder.game_handlers) + + def test_ksp_message_no_ksp2(self): + adder = SpaceDockAdderQueueHandler(self.shared_args) + adder.append_message('ksp', self.mocked_message()) + self.assertFalse('ksp2' in adder.game_handlers) + + def test_ksp2_message_append(self): + adder = SpaceDockAdderQueueHandler(self.shared_args) + adder.append_message('ksp2', self.mocked_message()) + self.assertTrue('ksp2' in adder.game_handlers) diff --git a/netkan/tests/testdata/DogeCoinFlag.netkan b/netkan/tests/testdata/DogeCoinFlag.netkan new file mode 100644 index 00000000..da2be319 --- /dev/null +++ b/netkan/tests/testdata/DogeCoinFlag.netkan @@ -0,0 +1,15 @@ +{ + "spec_version" : 1, + "identifier" : "DogeCoinFlag", + "name" : "Dogecoin Flag", + "abstract" : "Such flag. Very currency. Wow.", + "description" : "Adorn your craft with your favourite cryptocurrency. To the mün!", + "$kref" : "#/ckan/github/pjf/DogeCoinFlag", + "ksp_version" : "any", + "comment" : "flagmod", + "license" : "CC-BY", + "author" : "daviddwk", + "resources" : { + "homepage" : "https://www.reddit.com/r/dogecoin/comments/1tdlgg/i_made_a_more_accurate_dogecoin_and_a_ksp_flag/" + } +} diff --git a/netkan/tests/webhooks.py b/netkan/tests/webhooks.py new file mode 100644 index 00000000..f518e045 --- /dev/null +++ b/netkan/tests/webhooks.py @@ -0,0 +1,468 @@ +import os +import sys + +from unittest import mock, TestCase +from unittest.mock import MagicMock + +from netkan.webhooks import create_app + +from .common import SharedArgsMixin + + +def inflation_queue(_, game: str) -> MagicMock: + queue = MagicMock() + queue.url = f'{game}.queue.url' + return queue + + +class WebhooksHarness(TestCase, SharedArgsMixin): + + @classmethod + def setUpClass(cls) -> None: + super(WebhooksHarness, cls).setUpClass() + cls.configure_repos() + + @classmethod + def tearDownClass(cls) -> None: + super(WebhooksHarness, cls).setUpClass() + cls.cleanup() + + def setUp(self) -> None: + netkan = getattr(self, "netkan_upstream") + ckan = getattr(self, "ckan_upstream") + env_patcher = mock.patch.dict( + os.environ, + { + 'LC_ALL': os.environ.get('LC_ALL', 'C.UTF-8'), + 'LANG': os.environ.get('LANG', 'C.UTF-8'), + 'NETKAN_REMOTES': f'ksp={netkan} ksp2={netkan}', + 'CKANMETA_REMOTES': f'ksp={ckan} ksp2={ckan}', + 'SSH_KEY': '12345' + }, + clear=True, + ) + env_patcher.start() + sys_patch = mock.patch.object(sys, 'argv', ['group', 'command']) + sys_patch.start() + patch = mock.patch( + 'netkan.cli.common.Game.clone_base', self.tmpdir.name) + patch.start() + app = create_app() + app.config.update({ + "TESTING": True, + }) + self.ctx = app.app_context() + self.ctx.push() + self.client = app.test_client() + + def tearDown(self) -> None: + super().tearDown() + mock.patch.stopall() + + +class TestWebhookGitHubInflate(WebhooksHarness): + + def setUp(self) -> None: + super().setUp() + queue_url = mock.patch( + 'netkan.webhooks.config.WebhooksConfig.inflation_queue', inflation_queue) + queue_url.start() + + @staticmethod + def mock_netkan_hook() -> dict: + return { + 'ref': 'refs/heads/master', + 'commits': [ + { + 'id': 'fec27dc0350adc7dc8659cde980d1eca9ce30167', + 'added': [], + 'modified': [ + "NetKAN/DogeCoinFlag.netkan" + ] + } + ] + } + + @mock.patch('netkan.webhooks.github_utils.sig_match') + @mock.patch('netkan.webhooks.config.WebhooksConfig.client') + def test_inflate_ksp(self, queued: MagicMock, sig: MagicMock): + sig.return_value = True + response = self.client.post( + '/gh/inflate/ksp', json=self.mock_netkan_hook(), follow_redirects=True) + self.assertEqual(response.status_code, 204) + call = queued.method_calls.pop().call_list().pop() + self.assertEqual( + call[2].get('Entries')[0].get('MessageAttributes').get( + 'GameId').get('StringValue'), + 'ksp', + ) + self.assertEqual(call[2].get('QueueUrl'), 'ksp.queue.url') + + @mock.patch('netkan.webhooks.github_utils.sig_match') + @mock.patch('netkan.webhooks.config.WebhooksConfig.client') + def test_inflate_ksp2(self, queued: MagicMock, sig: MagicMock): + sig.return_value = True + response = self.client.post( + '/gh/inflate/ksp2', json=self.mock_netkan_hook(), follow_redirects=True) + self.assertEqual(response.status_code, 204) + call = queued.method_calls.pop().call_list().pop() + self.assertEqual( + call[2].get('Entries')[0].get('MessageAttributes').get( + 'GameId').get('StringValue'), + 'ksp2', + ) + self.assertEqual(call[2].get('QueueUrl'), 'ksp2.queue.url') + + @mock.patch('netkan.webhooks.github_utils.sig_match') + def test_inflate_ksp_wrong_branch(self, sig: MagicMock): + sig.return_value = True + data = self.mock_netkan_hook() + data.update(ref='refs/heads/not_primary') + response = self.client.post( + '/gh/inflate/ksp', json=data, follow_redirects=True) + self.assertEqual(response.status_code, 200) + self.assertDictEqual(response.json, {'message': 'Wrong branch'}) + + @mock.patch('netkan.webhooks.github_utils.sig_match') + def test_inflate_ksp2_wrong_branch(self, sig: MagicMock): + sig.return_value = True + data = self.mock_netkan_hook() + data.update(ref='refs/heads/not_primary') + response = self.client.post( + '/gh/inflate/ksp2', json=data, follow_redirects=True) + self.assertEqual(response.status_code, 200) + self.assertDictEqual(response.json, {'message': 'Wrong branch'}) + + @mock.patch('netkan.webhooks.github_utils.sig_match') + def test_inflate_ksp_no_commits(self, sig: MagicMock): + sig.return_value = True + data = self.mock_netkan_hook() + data.update(commits=[]) + response = self.client.post( + '/gh/inflate/ksp', json=data, follow_redirects=True) + self.assertEqual(response.status_code, 200) + self.assertDictEqual(response.json, {'message': 'No commits received'}) + + @mock.patch('netkan.webhooks.github_utils.sig_match') + def test_inflate_ksp2_no_commits(self, sig: MagicMock): + sig.return_value = True + data = self.mock_netkan_hook() + data.update(commits=[]) + response = self.client.post( + '/gh/inflate/ksp2', json=data, follow_redirects=True) + self.assertEqual(response.status_code, 200) + self.assertDictEqual(response.json, {'message': 'No commits received'}) + + @mock.patch('netkan.status.ModStatus.get') + @mock.patch('netkan.webhooks.github_utils.sig_match') + def test_freeze_ksp(self, sig: MagicMock, status: MagicMock): + # This does not test the status, rather that we return a 204 + # when there are mods to freeze. + sig.return_value = True + mocked_status = mock.MagicMock() + mocked_status.frozen = False + status.return_value = mocked_status + data = self.mock_netkan_hook() + data.get('commits', [{}])[0].update({ + 'added': ['NetKAN/DogeCoinFlag.frozen'], + 'modified': [] + }) + response = self.client.post( + '/gh/inflate/ksp', json=data, follow_redirects=True) + self.assertEqual(response.status_code, 204) + + @mock.patch('netkan.webhooks.github_utils.sig_match') + @mock.patch('netkan.status.ModStatus.get') + def test_freeze_ksp2(self, status: MagicMock, sig: MagicMock): + # This does not test the status, rather that we return a 204 + # when there are mods to freeze. + sig.return_value = True + mocked_status = mock.MagicMock() + mocked_status.frozen = False + status.return_value = mocked_status + data = self.mock_netkan_hook() + data.get('commits', [{}])[0].update({ + 'added': ['NetKAN/DogeCoinFlag.frozen'], + 'modified': [] + }) + response = self.client.post( + '/gh/inflate/ksp2', json=data, follow_redirects=True) + self.assertEqual(response.status_code, 204) + + +class TestWebhookGitHubMirror(WebhooksHarness): + + def setUp(self) -> None: + super().setUp() + queue = MagicMock() + queue.url = 'some.queue.url' + queue_url = mock.patch( + 'netkan.webhooks.config.WebhooksConfig.mirror_queue', queue) + queue_url.start() + + @staticmethod + def mock_netkan_hook() -> dict: + return { + 'ref': 'refs/heads/master', + 'commits': [ + { + 'id': 'fec27dc0350adc7dc8659cde980d1eca9ce30167', + 'added': [], + 'modified': [ + "DogeCoinFlag/DogeCoinFlag-v1.0.2.ckan" + ] + } + ] + } + + @mock.patch('netkan.webhooks.github_utils.sig_match') + @mock.patch('netkan.webhooks.config.WebhooksConfig.client') + def test_inflate_ksp(self, queued: MagicMock, sig: MagicMock): + sig.return_value = True + response = self.client.post( + '/gh/mirror/ksp', json=self.mock_netkan_hook(), follow_redirects=True) + self.assertEqual(response.status_code, 204) + call = queued.method_calls.pop().call_list().pop() + self.assertEqual( + call[2].get('Entries')[0].get('MessageAttributes').get( + 'GameId').get('StringValue'), + 'ksp', + ) + + @mock.patch('netkan.webhooks.github_utils.sig_match') + @mock.patch('netkan.webhooks.config.WebhooksConfig.client') + def test_inflate_ksp2(self, queued: MagicMock, sig: MagicMock): + sig.return_value = True + response = self.client.post( + '/gh/mirror/ksp2', json=self.mock_netkan_hook(), follow_redirects=True) + self.assertEqual(response.status_code, 204) + call = queued.method_calls.pop().call_list().pop() + self.assertEqual( + call[2].get('Entries')[0].get('MessageAttributes').get( + 'GameId').get('StringValue'), + 'ksp2', + ) + + @mock.patch('netkan.webhooks.github_utils.sig_match') + def test_inflate_ksp_wrong_branch(self, sig: MagicMock): + sig.return_value = True + data = self.mock_netkan_hook() + data.update(ref='refs/heads/not_primary') + response = self.client.post( + '/gh/mirror/ksp', json=data, follow_redirects=True) + self.assertEqual(response.status_code, 200) + self.assertDictEqual(response.json, {'message': 'Wrong branch'}) + + @mock.patch('netkan.webhooks.github_utils.sig_match') + def test_inflate_ksp2_wrong_branch(self, sig: MagicMock): + sig.return_value = True + data = self.mock_netkan_hook() + data.update(ref='refs/heads/not_primary') + response = self.client.post( + '/gh/mirror/ksp2', json=data, follow_redirects=True) + self.assertEqual(response.status_code, 200) + self.assertDictEqual(response.json, {'message': 'Wrong branch'}) + + @mock.patch('netkan.webhooks.github_utils.sig_match') + def test_inflate_ksp_no_commits(self, sig: MagicMock): + sig.return_value = True + data = self.mock_netkan_hook() + data.update(commits=[]) + response = self.client.post( + '/gh/mirror/ksp', json=data, follow_redirects=True) + self.assertEqual(response.status_code, 200) + self.assertDictEqual(response.json, {'message': 'No commits received'}) + + @mock.patch('netkan.webhooks.github_utils.sig_match') + def test_inflate_ksp2_no_commits(self, sig: MagicMock): + sig.return_value = True + data = self.mock_netkan_hook() + data.update(commits=[]) + response = self.client.post( + '/gh/mirror/ksp2', json=data, follow_redirects=True) + self.assertEqual(response.status_code, 200) + self.assertDictEqual(response.json, {'message': 'No commits received'}) + + +class TestWebhookInflate(WebhooksHarness): + + def setUp(self) -> None: + super().setUp() + queue_url = mock.patch( + 'netkan.webhooks.config.WebhooksConfig.inflation_queue', inflation_queue) + queue_url.start() + + @staticmethod + def mock_netkan_hook() -> dict: + return { + 'identifiers': ['DogeCoinFlag'], + } + + @mock.patch('netkan.webhooks.config.WebhooksConfig.client') + def test_inflate_ksp(self, queued: MagicMock): + response = self.client.post( + '/inflate/ksp', json=self.mock_netkan_hook(), follow_redirects=True) + self.assertEqual(response.status_code, 204) + call = queued.method_calls.pop().call_list().pop() + self.assertEqual( + call[2].get('Entries')[0].get('MessageAttributes').get( + 'GameId').get('StringValue'), + 'ksp', + ) + self.assertEqual(call[2].get('QueueUrl'), 'ksp.queue.url') + + @mock.patch('netkan.webhooks.config.WebhooksConfig.client') + def test_inflate_ksp2(self, queued: MagicMock): + response = self.client.post( + '/inflate/ksp2', json=self.mock_netkan_hook(), follow_redirects=True) + self.assertEqual(response.status_code, 204) + call = queued.method_calls.pop().call_list().pop() + self.assertEqual( + call[2].get('Entries')[0].get('MessageAttributes').get( + 'GameId').get('StringValue'), + 'ksp2', + ) + self.assertEqual(call[2].get('QueueUrl'), 'ksp2.queue.url') + + def test_inflate_ksp_no_identifiers(self): + data = self.mock_netkan_hook() + data.update(identifiers=[]) + response = self.client.post( + '/inflate/ksp', json=data, follow_redirects=True) + self.assertEqual(response.status_code, 400) + self.assertEqual(response.text, 'An array of identifiers is required') + + def test_inflate_ksp2_no_identifiers(self): + data = self.mock_netkan_hook() + data.update(identifiers=[]) + response = self.client.post( + '/inflate/ksp2', json=data, follow_redirects=True) + self.assertEqual(response.status_code, 400) + self.assertEqual(response.text, 'An array of identifiers is required') + + +class TestWebhookSpaceDockInflate(WebhooksHarness): + + def setUp(self) -> None: + super().setUp() + queue = MagicMock() + queue.url = 'some.queue.url' + queue_url = mock.patch( + 'netkan.webhooks.config.WebhooksConfig.inflation_queue', queue) + queue_url.start() + + @staticmethod + def mock_netkan_hook() -> dict: + return { + 'mod_id': '777', + 'event_type': 'update', + } + + @mock.patch('netkan.webhooks.config.WebhooksConfig.client') + def test_inflate_ksp(self, queued: MagicMock): + response = self.client.post( + '/sd/inflate/ksp', data=self.mock_netkan_hook(), follow_redirects=True) + self.assertEqual(response.status_code, 204) + call = queued.method_calls.pop().call_list().pop() + self.assertEqual( + call[2].get('Entries')[0].get('MessageAttributes').get( + 'GameId').get('StringValue'), + 'ksp', + ) + + @mock.patch('netkan.webhooks.config.WebhooksConfig.client') + def test_inflate_ksp2(self, queued: MagicMock): + response = self.client.post( + '/sd/inflate/ksp2', data=self.mock_netkan_hook(), follow_redirects=True) + self.assertEqual(response.status_code, 204) + call = queued.method_calls.pop().call_list().pop() + self.assertEqual( + call[2].get('Entries')[0].get('MessageAttributes').get( + 'GameId').get('StringValue'), + 'ksp2', + ) + + def test_inflate_ksp_no_identifiers(self): + data = self.mock_netkan_hook() + data.update(mod_id='ABC') + response = self.client.post( + '/sd/inflate/ksp', json=data, follow_redirects=True) + self.assertEqual(response.status_code, 404) + self.assertEqual(response.text, 'No such module') + + def test_inflate_ksp2_invalid_id(self): + data = self.mock_netkan_hook() + data.update(mod_id='ABC') + response = self.client.post( + '/sd/inflate/ksp2', json=data, follow_redirects=True) + self.assertEqual(response.status_code, 404) + self.assertEqual(response.text, 'No such module') + + def test_inflate_delete(self): + data = self.mock_netkan_hook() + data.update(event_type='delete') + response = self.client.post( + '/sd/inflate/ksp', data=data, follow_redirects=True) + self.assertEqual(response.status_code, 204) + + def test_inflate_ksp_locked(self): + data = self.mock_netkan_hook() + data.update(event_type='locked') + response = self.client.post( + '/sd/inflate/ksp', data=data, follow_redirects=True) + self.assertEqual(response.status_code, 204) + + def test_inflate_ksp_unlocked(self): + data = self.mock_netkan_hook() + data.update(event_type='unlocked') + response = self.client.post( + '/sd/inflate/ksp', data=data, follow_redirects=True) + self.assertEqual(response.status_code, 204) + + +class TestWebhookSpaceDockAdd(WebhooksHarness): + + def setUp(self) -> None: + super().setUp() + queue = MagicMock() + queue.url = 'some.queue.url' + queue_url = mock.patch( + 'netkan.webhooks.config.WebhooksConfig.add_queue', queue) + queue_url.start() + + @staticmethod + def mock_netkan_hook() -> dict: + return { + 'name: Mod Name Entered by the User on spacedock' + 'id': '12345', + 'license': 'GPL-3.0', + 'username': 'modauthor1', + 'email': 'modauthor1@gmail.com', + 'short_description': 'A mod that you should definitely install', + 'description': 'A mod that you should definitely install, and so on and so on', + 'site_name': 'SpaceDock', + } + + @mock.patch('netkan.webhooks.config.WebhooksConfig.client') + def test_inflate_ksp(self, queued: MagicMock): + response = self.client.post( + '/sd/add/ksp', data=self.mock_netkan_hook(), follow_redirects=True) + self.assertEqual(response.status_code, 204) + call = queued.method_calls.pop().call_list().pop() + self.assertEqual( + call[2].get('Entries')[0].get('MessageAttributes').get( + 'GameId').get('StringValue'), + 'ksp', + ) + + @mock.patch('netkan.webhooks.config.WebhooksConfig.client') + def test_inflate_ksp2(self, queued: MagicMock): + response = self.client.post( + '/sd/add/ksp2', data=self.mock_netkan_hook(), follow_redirects=True) + self.assertEqual(response.status_code, 204) + call = queued.method_calls.pop().call_list().pop() + self.assertEqual( + call[2].get('Entries')[0].get('MessageAttributes').get( + 'GameId').get('StringValue'), + 'ksp2', + ) diff --git a/nginx/nginx-dev.conf b/nginx/nginx-dev.conf index 3583c6ee..1f3227e2 100644 --- a/nginx/nginx-dev.conf +++ b/nginx/nginx-dev.conf @@ -12,10 +12,6 @@ http { log_format main '$remote_addr "$request" $status $body_bytes_sent "$http_user_agent"'; access_log /dev/stdout main; - upstream legacy { - server legacyhooks:5000 fail_timeout=0; - } - upstream webhooks { server webhooks:5000 fail_timeout=0; } @@ -41,19 +37,20 @@ http { ssl_ciphers "ECDHE-ECDSA-CHACHA20-POLY1305:ECDHE-RSA-CHACHA20-POLY1305:ECDHE-ECDSA-AES128-GCM-SHA256:ECDHE-RSA-AES128-GCM-SHA256:ECDHE-ECDSA-AES256-GCM-SHA384:ECDHE-RSA-AES256-GCM-SHA384:DHE-RSA-AES128-GCM-SHA256:DHE-RSA-AES256-GCM-SHA384:ECDHE-ECDSA-AES128-SHA256:ECDHE-RSA-AES128-SHA256:ECDHE-ECDSA-AES128-SHA:ECDHE-RSA-AES256-SHA384:ECDHE-RSA-AES128-SHA:ECDHE-ECDSA-AES256-SHA384:ECDHE-ECDSA-AES256-SHA:ECDHE-RSA-AES256-SHA:DHE-RSA-AES128-SHA256:DHE-RSA-AES128-SHA:DHE-RSA-AES256-SHA256:DHE-RSA-AES256-SHA:ECDHE-ECDSA-DES-CBC3-SHA:ECDHE-RSA-DES-CBC3-SHA:EDH-RSA-DES-CBC3-SHA:AES128-GCM-SHA256:AES256-GCM-SHA384:AES128-SHA256:AES256-SHA256:AES128-SHA:AES256-SHA:DES-CBC3-SHA:!DSS"; - location /gh/mirror { - proxy_set_header Host $http_host; - proxy_set_header X-Forwarded-Host $host; - proxy_set_header X-Real-IP $remote_addr; - proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; - proxy_pass http://legacy; - } - location / { proxy_set_header Host $http_host; proxy_set_header X-Forwarded-Host $host; proxy_set_header X-Real-IP $remote_addr; proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; + + # multi-game transition + rewrite ^/inflate(?!/) /inflate/ksp break; + rewrite ^/sd/inflate(?!/) /sd/inflate/ksp break; + rewrite ^/sd/add(?!/) /sd/add/ksp break; + rewrite ^/gh/inflate(?!/) /gh/inflate/ksp break; + rewrite ^/gh/release(?!/) /gh/release/ksp break; + rewrite ^/gh/mirror(?!/) /gh/mirror/ksp break; + proxy_pass http://webhooks; } } diff --git a/nginx/nginx.conf b/nginx/nginx.conf index 49f8ab95..a6f48a19 100644 --- a/nginx/nginx.conf +++ b/nginx/nginx.conf @@ -42,6 +42,15 @@ http { proxy_set_header X-Forwarded-Host $host; proxy_set_header X-Real-IP $remote_addr; proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; + + # multi-game transition + rewrite ^/inflate(?!/) /inflate/ksp break; + rewrite ^/sd/inflate(?!/) /sd/inflate/ksp break; + rewrite ^/sd/add(?!/) /sd/add/ksp break; + rewrite ^/gh/inflate(?!/) /gh/inflate/ksp break; + rewrite ^/gh/release(?!/) /gh/release/ksp break; + rewrite ^/gh/mirror(?!/) /gh/mirror/ksp break; + proxy_pass http://webhooks; } } diff --git a/prod-stack.py b/prod-stack.py index eb8743ed..59402943 100644 --- a/prod-stack.py +++ b/prod-stack.py @@ -22,14 +22,13 @@ BOT_FQDN = 'netkan.ksp-ckan.space' EMAIL = 'domains@ksp-ckan.space' PARAM_NAMESPACE = '/NetKAN/Indexer/' -NETKAN_REMOTE = 'git@github.com:KSP-CKAN/NetKAN.git' +NETKAN_REMOTES = 'ksp=git@github.com:KSP-CKAN/NetKAN.git' NETKAN_USER = 'KSP-CKAN' -NETKAN_REPO = 'NetKAN' -CKANMETA_REMOTE = 'git@github.com:KSP-CKAN/CKAN-meta.git' +NETKAN_REPOS = 'ksp=NetKAN' +CKANMETA_REMOTES = 'ksp=git@github.com:KSP-CKAN/CKAN-meta.git' CKANMETA_USER = 'KSP-CKAN' -CKANMETA_REPO = 'CKAN-meta' +CKANMETA_REPOS = 'ksp=CKAN-meta' NETKAN_USER = 'KSP-CKAN' -NETKAN_REPO = 'NetKAN' STATUS_BUCKET = 'status.ksp-ckan.space' status_key = 'status/netkan.json' @@ -80,6 +79,7 @@ ), ]) +INFLATION_QUEUES = Sub('ksp=${ksp}', ksp=GetAtt(inbound, 'QueueName')) # DyanamoDB: NetKAN Status netkan_db = t.add_resource(Table( @@ -692,9 +692,9 @@ 'SSH_KEY', 'GH_Token', ], 'env': [ - ('CKANMETA_REMOTE', CKANMETA_REMOTE), + ('CKANMETA_REMOTES', CKANMETA_REMOTES), ('CKAN_USER', CKANMETA_USER), - ('CKAN_REPO', CKANMETA_REPO), + ('CKAN_REPOS', CKANMETA_REPOS), ('SQS_QUEUE', GetAtt(outbound, 'QueueName')), ('AWS_DEFAULT_REGION', Sub('${AWS::Region}')), ], @@ -704,20 +704,21 @@ 'linux_parameters': LinuxParameters(InitProcessEnabled=True), }, { - 'name': 'Scheduler', + 'name': 'SchedulerKsp', 'command': 'scheduler', 'memory': '156', 'secrets': ['SSH_KEY', 'GH_Token'], 'env': [ - ('SQS_QUEUE', GetAtt(inbound, 'QueueName')), - ('NETKAN_REMOTE', NETKAN_REMOTE), - ('CKANMETA_REMOTE', CKANMETA_REMOTE), + ('GAME_ID', 'ksp'), + ('INFLATION_QUEUES', INFLATION_QUEUES), + ('NETKAN_REMOTES', NETKAN_REMOTES), + ('CKANMETA_REMOTES', CKANMETA_REMOTES), ('AWS_DEFAULT_REGION', Sub('${AWS::Region}')), ], 'schedule': 'rate(30 minutes)', }, { - 'name': 'SchedulerWebhooksPass', + 'name': 'SchedulerKspWebhooksPass', 'command': [ 'scheduler', '--group', 'webhooks', '--max-queued', '2000', @@ -727,9 +728,10 @@ 'memory': '156', 'secrets': ['SSH_KEY', 'GH_Token'], 'env': [ - ('SQS_QUEUE', GetAtt(inbound, 'QueueName')), - ('NETKAN_REMOTE', NETKAN_REMOTE), - ('CKANMETA_REMOTE', CKANMETA_REMOTE), + ('GAME_ID', 'ksp'), + ('INFLATION_QUEUES', INFLATION_QUEUES), + ('NETKAN_REMOTES', NETKAN_REMOTES), + ('CKANMETA_REMOTES', CKANMETA_REMOTES), ('AWS_DEFAULT_REGION', Sub('${AWS::Region}')), ], 'schedule': 'rate(1 day)', @@ -797,15 +799,16 @@ 'schedule': 'rate(5 minutes)', }, { - 'name': 'DownloadCounter', + 'name': 'DownloadCounterKsp', 'command': 'download-counter', 'memory': '156', 'secrets': [ 'SSH_KEY', 'GH_Token', ], 'env': [ - ('NETKAN_REMOTE', NETKAN_REMOTE), - ('CKANMETA_REMOTE', CKANMETA_REMOTE), + ('GAME_ID', 'ksp'), + ('NETKAN_REMOTES', NETKAN_REMOTES), + ('CKANMETA_REMOTES', CKANMETA_REMOTES), ], 'schedule': 'rate(1 day)', }, @@ -847,9 +850,10 @@ 'name': 'AutoFreezer', 'command': 'auto-freezer', 'env': [ - ('NETKAN_REMOTE', NETKAN_REMOTE), + ('GAME_ID', 'ksp'), + ('NETKAN_REMOTES', NETKAN_REMOTES), ('CKAN_USER', NETKAN_USER), - ('CKAN_REPO', NETKAN_REPO), + ('CKAN_REPOS', NETKAN_REPOS), ], 'secrets': [ 'SSH_KEY', 'GH_Token', @@ -871,10 +875,10 @@ 'XKAN_GHSECRET', 'SSH_KEY', ], 'env': [ - ('NETKAN_REMOTE', NETKAN_REMOTE), - ('CKANMETA_REMOTE', CKANMETA_REMOTE), + ('NETKAN_REMOTES', NETKAN_REMOTES), + ('CKANMETA_REMOTES', CKANMETA_REMOTES), ('AWS_DEFAULT_REGION', Sub('${AWS::Region}')), - ('INFLATION_SQS_QUEUE', GetAtt(inbound, 'QueueName')), + ('INFLATION_SQS_QUEUES', INFLATION_QUEUES), ('ADD_SQS_QUEUE', GetAtt(addqueue, 'QueueName')), ('MIRROR_SQS_QUEUE', GetAtt(mirrorqueue, 'QueueName')), ], @@ -898,9 +902,9 @@ 'env': [ ('SQS_QUEUE', GetAtt(addqueue, 'QueueName')), ('AWS_DEFAULT_REGION', Sub('${AWS::Region}')), - ('NETKAN_REMOTE', NETKAN_REMOTE), + ('NETKAN_REMOTES', NETKAN_REMOTES), ('CKAN_USER', NETKAN_USER), - ('CKAN_REPO', NETKAN_REPO), + ('CKAN_REPOS', NETKAN_REPOS), ], }, { @@ -910,8 +914,8 @@ 'IA_access', 'IA_secret', 'SSH_KEY', 'GH_Token' ], 'env': [ - ('CKANMETA_REMOTE', CKANMETA_REMOTE), - ('IA_collection', 'kspckanmods'), + ('CKANMETA_REMOTES', CKANMETA_REMOTES), + ('IA_COLLECTIONS', 'ksp=kspckanmods'), ('SQS_QUEUE', GetAtt(mirrorqueue, 'QueueName')), ('AWS_DEFAULT_REGION', Sub('${AWS::Region}')), ], @@ -969,7 +973,8 @@ Links=[], ) if entrypoint: - entrypoint = entrypoint if isinstance(entrypoint, list) else [entrypoint] + entrypoint = entrypoint if isinstance( + entrypoint, list) else [entrypoint] definition.EntryPoint = entrypoint if command: command = command if isinstance(command, list) else [command]