Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Immediately reschedule when circumstances for a candidate change #195

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion janitor/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@
FileSystemLogFileManager,
)
from .queue import QueueItem, Queue
from .schedule import do_schedule_control, do_schedule, CandidateUnavailable
from .schedule import do_schedule_control, do_schedule, CandidateUnavailable, bulk_queue_refresh
from .vcs import (
get_vcs_abbreviation,
is_authenticated_url,
Expand Down Expand Up @@ -1701,6 +1701,7 @@ async def finish_run(self, active_run: ActiveRun, result: JanitorResult) -> None
if result.builder_result:
await result.builder_result.store(conn, result.log_id)
await conn.execute("DELETE FROM queue WHERE id = $1", active_run.queue_id)
await bulk_queue_refresh(conn, [active_run.package])
if self.followup_run:
await self.followup_run(active_run, result)

Expand Down Expand Up @@ -1933,6 +1934,7 @@ async def handle_log(request):
async def handle_codebases(request):
queue_processor = request.app['queue_processor']

queue_refresh = []
codebases = []
for entry in await request.json():
if 'branch_url' in entry:
Expand All @@ -1957,6 +1959,9 @@ async def handle_codebases(request):
entry.get('vcs_last_revision'),
entry.get('value')))

if entry.get('name'):
queue_refresh.append(entry['name'])

async with queue_processor.database.acquire() as conn:
# TODO(jelmer): When a codebase with a certain name already exists,
# steal its name
Expand All @@ -1972,6 +1977,7 @@ async def handle_codebases(request):
"value = EXCLUDED.value, url = EXCLUDED.url, "
"branch = EXCLUDED.branch",
codebases)
await bulk_queue_refresh(conn, queue_refresh)

return web.json_response({})

Expand All @@ -1983,6 +1989,7 @@ async def handle_candidates(request):
unknown_campaigns = []
unknown_publish_policies = []
queue_processor = request.app['queue_processor']
queue_refresh = []
async with queue_processor.database.acquire() as conn, conn.transaction():
known_packages = set()
for record in (await conn.fetch('SELECT name FROM package')):
Expand Down Expand Up @@ -2052,10 +2059,12 @@ async def handle_candidates(request):
"codebase = EXCLUDED.codebase",
entries,
)
await bulk_queue_refresh(conn, queue_refresh)
return web.json_response({
'unknown_campaigns': unknown_campaigns,
'unknown_codebases': unknown_codebases,
'unknown_publish_policies': unknown_publish_policies,
'unknown_codebases': unknown_codebases,
'unknown_packages': unknown_packages})


Expand Down
39 changes: 38 additions & 1 deletion janitor/schedule.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

from datetime import datetime, timedelta
import logging
from typing import Optional, List, Tuple
from typing import Optional, List, Tuple, Union

from debian.changelog import Version

Expand Down Expand Up @@ -67,6 +67,7 @@ async def iter_candidates_with_publish_policy(
package.name AS package,
package.codebase AS codebase,
package.branch_url AS branch_url,
candidate.codebase AS codebase,
candidate.suite AS campaign,
candidate.context AS context,
candidate.value AS value,
Expand Down Expand Up @@ -200,6 +201,42 @@ async def estimate_duration(
return timedelta(seconds=DEFAULT_ESTIMATED_DURATION)


async def bulk_queue_refresh(
    conn: asyncpg.Connection,
    todo: List[str],
    *,
    dry_run: bool = False
):
    """Reschedule all candidates for the given packages.

    Fetches every candidate (joined with its package and publish policy)
    for the packages named in *todo* and feeds the resulting queue items
    to ``bulk_add_to_queue``.

    Args:
      conn: open database connection
      todo: package names whose candidates should be (re)queued;
        an empty list is a no-op
      dry_run: when True, passed through to ``bulk_add_to_queue`` so
        nothing is actually written
    """
    if not todo:
        # Nothing to refresh; avoid a pointless query round-trip.
        return
    query = """
SELECT
  package.name AS package,
  package.branch_url AS branch_url,
  candidate.codebase AS codebase,
  candidate.suite AS campaign,
  candidate.context AS context,
  candidate.value AS value,
  candidate.success_chance AS success_chance,
  named_publish_policy.per_branch_policy AS publish,
  candidate.command AS command
FROM candidate
INNER JOIN package on package.name = candidate.package
INNER JOIN named_publish_policy ON
    named_publish_policy.name = candidate.publish_policy
WHERE
  NOT package.removed AND
  package.branch_url IS NOT NULL AND
  candidate.package = ANY($1::text[])
"""
    # NOTE: the parameter must be cast to text[] (an array), not text;
    # "= ANY($1::text)" is a PostgreSQL type error for a list argument.
    q = [
        queue_item_from_candidate_and_publish_policy(row)
        for row in
        await conn.fetch(query, todo)]
    # Log the number of queue items actually produced, which can differ
    # from len(todo) when packages have zero or multiple candidates.
    logging.info('Adding %d items to queue', len(q))
    return await bulk_add_to_queue(conn, q, dry_run=dry_run)


async def bulk_add_to_queue(
conn: asyncpg.Connection,
todo,
Expand Down
2 changes: 1 addition & 1 deletion state.sql
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ CREATE TABLE IF NOT EXISTS queue (
id serial,
bucket queue_bucket not null default 'default',
package text not null,
codebase text,
codebase text references codebase(name),
branch_url text,
suite suite_name not null,
command text,
Expand Down