From 232589e2445ab7643580e90f62c19b97913cc274 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jelmer=20Vernoo=C4=B3?= Date: Thu, 27 Oct 2022 19:02:32 +0100 Subject: [PATCH] Immediately reschedule when circumstances for a candidate change --- janitor/runner.py | 11 ++++++++++- janitor/schedule.py | 39 ++++++++++++++++++++++++++++++++++++++- state.sql | 2 +- 3 files changed, 49 insertions(+), 3 deletions(-) diff --git a/janitor/runner.py b/janitor/runner.py index 6010535a2..9c4a32067 100644 --- a/janitor/runner.py +++ b/janitor/runner.py @@ -106,7 +106,7 @@ FileSystemLogFileManager, ) from .queue import QueueItem, Queue -from .schedule import do_schedule_control, do_schedule, CandidateUnavailable +from .schedule import do_schedule_control, do_schedule, CandidateUnavailable, bulk_queue_refresh from .vcs import ( get_vcs_abbreviation, is_authenticated_url, @@ -1701,6 +1701,7 @@ async def finish_run(self, active_run: ActiveRun, result: JanitorResult) -> None if result.builder_result: await result.builder_result.store(conn, result.log_id) await conn.execute("DELETE FROM queue WHERE id = $1", active_run.queue_id) + await bulk_queue_refresh(conn, [active_run.package]) if self.followup_run: await self.followup_run(active_run, result) @@ -1933,6 +1934,7 @@ async def handle_log(request): async def handle_codebases(request): queue_processor = request.app['queue_processor'] + queue_refresh = [] codebases = [] for entry in await request.json(): if 'branch_url' in entry: @@ -1957,6 +1959,9 @@ async def handle_codebases(request): entry.get('vcs_last_revision'), entry.get('value'))) + if entry.get('name'): + queue_refresh.append(entry['name']) + async with queue_processor.database.acquire() as conn: # TODO(jelmer): When a codebase with a certain name already exists, # steal its name @@ -1972,6 +1977,7 @@ async def handle_codebases(request): "value = EXCLUDED.value, url = EXCLUDED.url, " "branch = EXCLUDED.branch", codebases) + await bulk_queue_refresh(conn, queue_refresh) return 
web.json_response({}) @@ -1983,6 +1989,7 @@ async def handle_candidates(request): unknown_campaigns = [] unknown_publish_policies = [] queue_processor = request.app['queue_processor'] + queue_refresh = [] async with queue_processor.database.acquire() as conn, conn.transaction(): known_packages = set() for record in (await conn.fetch('SELECT name FROM package')): @@ -2052,10 +2059,11 @@ async def handle_candidates(request): "codebase = EXCLUDED.codebase", entries, ) + await bulk_queue_refresh(conn, queue_refresh) return web.json_response({ 'unknown_campaigns': unknown_campaigns, 'unknown_codebases': unknown_codebases, 'unknown_publish_policies': unknown_publish_policies, 'unknown_packages': unknown_packages}) diff --git a/janitor/schedule.py b/janitor/schedule.py index f991ef12a..15838fe7d 100644 --- a/janitor/schedule.py +++ b/janitor/schedule.py @@ -21,7 +21,7 @@ from datetime import datetime, timedelta import logging -from typing import Optional, List, Tuple +from typing import Optional, List, Tuple, Union from debian.changelog import Version @@ -67,6 +67,6 @@ async def iter_candidates_with_publish_policy( package.name AS package, - package.codebase AS codebase, + candidate.codebase AS codebase, package.branch_url AS branch_url, candidate.suite AS campaign, candidate.context AS context, candidate.value AS value, @@ -200,6 +201,42 @@ async def estimate_duration( return timedelta(seconds=DEFAULT_ESTIMATED_DURATION) +async def bulk_queue_refresh( + conn: asyncpg.Connection, + todo: List[str], + *, + dry_run: bool = False +): + if not todo: + return + query = """ +SELECT + package.name AS package, + package.branch_url AS branch_url, + candidate.codebase AS codebase, + candidate.suite AS campaign, + candidate.context AS context, + candidate.value AS value, + candidate.success_chance AS success_chance, + named_publish_policy.per_branch_policy AS publish, + candidate.command AS command +FROM candidate +INNER JOIN package on
package.name = candidate.package +INNER JOIN named_publish_policy ON + named_publish_policy.name = candidate.publish_policy +WHERE + NOT package.removed AND + package.branch_url IS NOT NULL AND + candidate.package = ANY($1::text[]) +""" + q = [ + queue_item_from_candidate_and_publish_policy(row) + for row in + await conn.fetch(query, todo)] + logging.info('Adding %d items to queue', len(q)) + return await bulk_add_to_queue(conn, q, dry_run=dry_run) + + async def bulk_add_to_queue( conn: asyncpg.Connection, todo, diff --git a/state.sql b/state.sql index b9b92c42e..2920b8aa7 100644 --- a/state.sql +++ b/state.sql @@ -184,7 +184,7 @@ CREATE TABLE IF NOT EXISTS queue ( id serial, bucket queue_bucket not null default 'default', package text not null, - codebase text, + codebase text references codebase(name), branch_url text, suite suite_name not null, command text,