Skip to content

Commit

Permalink
[batch] Rename batches tables to job groups
Browse files (browse the repository at this point in the history)
  • Loading branch information
jigold committed Oct 13, 2023
1 parent 8149391 commit 92e463e
Show file tree
Hide file tree
Showing 12 changed files with 1,071 additions and 144 deletions.
14 changes: 7 additions & 7 deletions batch/batch/driver/canceller.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -96,10 +96,10 @@ async def cancel_cancelled_ready_jobs_loop_body(self):
async def user_cancelled_ready_jobs(user, remaining) -> AsyncIterator[Dict[str, Any]]:
async for batch in self.db.select_and_fetchall(
'''
SELECT batches.id, batches_cancelled.id IS NOT NULL AS cancelled
SELECT batches.id, job_groups_cancelled.id IS NOT NULL AS cancelled
FROM batches
LEFT JOIN batches_cancelled
ON batches.id = batches_cancelled.id
LEFT JOIN job_groups_cancelled
ON batches.id = job_groups_cancelled.id
WHERE user = %s AND `state` = 'running';
''',
(user,),
Expand Down Expand Up @@ -186,8 +186,8 @@ async def user_cancelled_creating_jobs(user, remaining) -> AsyncIterator[Dict[st
'''
SELECT batches.id
FROM batches
INNER JOIN batches_cancelled
ON batches.id = batches_cancelled.id
INNER JOIN job_groups_cancelled
ON batches.id = job_groups_cancelled.id
WHERE user = %s AND `state` = 'running';
''',
(user,),
Expand Down Expand Up @@ -283,8 +283,8 @@ async def user_cancelled_running_jobs(user, remaining) -> AsyncIterator[Dict[str
'''
SELECT batches.id
FROM batches
INNER JOIN batches_cancelled
ON batches.id = batches_cancelled.id
INNER JOIN job_groups_cancelled
ON batches.id = job_groups_cancelled.id
WHERE user = %s AND `state` = 'running';
''',
(user,),
Expand Down
6 changes: 3 additions & 3 deletions batch/batch/driver/instance_collection/job_private.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -351,10 +351,10 @@ async def create_instances_loop_body(self):
async def user_runnable_jobs(user, remaining) -> AsyncIterator[Dict[str, Any]]:
async for batch in self.db.select_and_fetchall(
'''
SELECT batches.id, batches_cancelled.id IS NOT NULL AS cancelled, userdata, user, format_version
SELECT batches.id, job_groups_cancelled.id IS NOT NULL AS cancelled, userdata, user, format_version
FROM batches
LEFT JOIN batches_cancelled
ON batches.id = batches_cancelled.id
LEFT JOIN job_groups_cancelled
ON batches.id = job_groups_cancelled.id
WHERE user = %s AND `state` = 'running';
''',
(user,),
Expand Down
10 changes: 5 additions & 5 deletions batch/batch/driver/instance_collection/pool.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -340,8 +340,8 @@ async def regions_to_ready_cores_mcpu_from_estimated_job_queue(self) -> List[Tup
SELECT jobs.batch_id, jobs.job_id, cores_mcpu, always_run, n_regions, regions_bits_rep
FROM jobs FORCE INDEX(jobs_batch_id_state_always_run_cancelled)
LEFT JOIN batches ON jobs.batch_id = batches.id
LEFT JOIN batches_cancelled ON batches.id = batches_cancelled.id
WHERE user = %s AND batches.`state` = 'running' AND jobs.state = 'Ready' AND NOT always_run AND batches_cancelled.id IS NULL AND inst_coll = %s
LEFT JOIN job_groups_cancelled ON batches.id = job_groups_cancelled.id
WHERE user = %s AND batches.`state` = 'running' AND jobs.state = 'Ready' AND NOT always_run AND job_groups_cancelled.id IS NULL AND inst_coll = %s
ORDER BY jobs.batch_id ASC, jobs.job_id ASC
LIMIT {share * self.job_queue_scheduling_window_secs}
)
Expand Down Expand Up @@ -607,10 +607,10 @@ async def schedule_loop_body(self):
async def user_runnable_jobs(user):
async for batch in self.db.select_and_fetchall(
'''
SELECT batches.id, batches_cancelled.id IS NOT NULL AS cancelled, userdata, user, format_version
SELECT batches.id, job_groups_cancelled.id IS NOT NULL AS cancelled, userdata, user, format_version
FROM batches
LEFT JOIN batches_cancelled
ON batches.id = batches_cancelled.id
LEFT JOIN job_groups_cancelled
ON batches.id = job_groups_cancelled.id
WHERE user = %s AND `state` = 'running';
''',
(user,),
Expand Down
22 changes: 11 additions & 11 deletions batch/batch/driver/job.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -34,27 +34,27 @@ async def notify_batch_job_complete(db: Database, client_session: httpx.ClientSe
SELECT batches.*,
cost_t.cost,
cost_t.cost_breakdown,
batches_cancelled.id IS NOT NULL AS cancelled,
batches_n_jobs_in_complete_states.n_completed,
batches_n_jobs_in_complete_states.n_succeeded,
batches_n_jobs_in_complete_states.n_failed,
batches_n_jobs_in_complete_states.n_cancelled
job_groups_cancelled.id IS NOT NULL AS cancelled,
job_groups_n_jobs_in_complete_states.n_completed,
job_groups_n_jobs_in_complete_states.n_succeeded,
job_groups_n_jobs_in_complete_states.n_failed,
job_groups_n_jobs_in_complete_states.n_cancelled
FROM batches
LEFT JOIN batches_n_jobs_in_complete_states
ON batches.id = batches_n_jobs_in_complete_states.id
LEFT JOIN job_groups_n_jobs_in_complete_states
ON batches.id = job_groups_n_jobs_in_complete_states.id
LEFT JOIN LATERAL (
SELECT COALESCE(SUM(`usage` * rate), 0) AS cost, JSON_OBJECTAGG(resources.resource, COALESCE(`usage` * rate, 0)) AS cost_breakdown
FROM (
SELECT batch_id, resource_id, CAST(COALESCE(SUM(`usage`), 0) AS SIGNED) AS `usage`
FROM aggregated_batch_resources_v2
WHERE batches.id = aggregated_batch_resources_v2.batch_id
FROM aggregated_job_group_resources_v2
WHERE batches.id = aggregated_job_group_resources_v2.batch_id
GROUP BY batch_id, resource_id
) AS usage_t
LEFT JOIN resources ON usage_t.resource_id = resources.resource_id
GROUP BY batch_id
) AS cost_t ON TRUE
LEFT JOIN batches_cancelled
ON batches.id = batches_cancelled.id
LEFT JOIN job_groups_cancelled
ON batches.id = job_groups_cancelled.id
WHERE batches.id = %s AND NOT deleted AND callback IS NOT NULL AND
batches.`state` = 'complete';
''',
Expand Down
14 changes: 7 additions & 7 deletions batch/batch/driver/main.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -1019,11 +1019,11 @@ async def check(tx):
FROM
(
SELECT batches.user, jobs.state, jobs.cores_mcpu, jobs.inst_coll,
(jobs.always_run OR NOT (jobs.cancelled OR batches_cancelled.id IS NOT NULL)) AS runnable,
(NOT jobs.always_run AND (jobs.cancelled OR batches_cancelled.id IS NOT NULL)) AS cancelled
(jobs.always_run OR NOT (jobs.cancelled OR job_groups_cancelled.id IS NOT NULL)) AS runnable,
(NOT jobs.always_run AND (jobs.cancelled OR job_groups_cancelled.id IS NOT NULL)) AS cancelled
FROM batches
INNER JOIN jobs ON batches.id = jobs.batch_id
LEFT JOIN batches_cancelled ON batches.id = batches_cancelled.id
LEFT JOIN job_groups_cancelled ON batches.id = job_groups_cancelled.id
WHERE batches.`state` = 'running'
) as v
GROUP BY user, inst_coll
Expand Down Expand Up @@ -1138,7 +1138,7 @@ async def check(tx):
SELECT batch_id, billing_project, JSON_OBJECTAGG(resource, `usage`) as resources
FROM (
SELECT batch_id, resource_id, CAST(COALESCE(SUM(`usage`), 0) AS SIGNED) AS `usage`
FROM aggregated_batch_resources_v2
FROM aggregated_job_group_resources_v2
GROUP BY batch_id, resource_id) AS t
LEFT JOIN resources ON t.resource_id = resources.resource_id
JOIN batches ON batches.id = t.batch_id
Expand Down Expand Up @@ -1245,10 +1245,10 @@ async def cancel_fast_failing_batches(app):

records = db.select_and_fetchall(
'''
SELECT batches.id, batches_n_jobs_in_complete_states.n_failed
SELECT batches.id, job_groups_n_jobs_in_complete_states.n_failed
FROM batches
LEFT JOIN batches_n_jobs_in_complete_states
ON batches.id = batches_n_jobs_in_complete_states.id
LEFT JOIN job_groups_n_jobs_in_complete_states
ON batches.id = job_groups_n_jobs_in_complete_states.id
WHERE state = 'running' AND cancel_after_n_failures IS NOT NULL AND n_failed >= cancel_after_n_failures
'''
)
Expand Down
56 changes: 28 additions & 28 deletions batch/batch/front_end/front_end.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -1097,7 +1097,7 @@ async def insert_jobs_into_db(tx):
query_name='insert_jobs_telemetry',
)

batches_inst_coll_staging_args = [
job_groups_inst_coll_staging_args = [
(
batch_id,
update_id,
Expand All @@ -1112,18 +1112,18 @@ async def insert_jobs_into_db(tx):
]
await tx.execute_many(
'''
INSERT INTO batches_inst_coll_staging (batch_id, update_id, job_group_id, inst_coll, token, n_jobs, n_ready_jobs, ready_cores_mcpu)
INSERT INTO job_groups_inst_coll_staging (batch_id, update_id, job_group_id, inst_coll, token, n_jobs, n_ready_jobs, ready_cores_mcpu)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
ON DUPLICATE KEY UPDATE
n_jobs = n_jobs + VALUES(n_jobs),
n_ready_jobs = n_ready_jobs + VALUES(n_ready_jobs),
ready_cores_mcpu = ready_cores_mcpu + VALUES(ready_cores_mcpu);
''',
batches_inst_coll_staging_args,
query_name='insert_batches_inst_coll_staging',
job_groups_inst_coll_staging_args,
query_name='insert_job_groups_inst_coll_staging',
)

batch_inst_coll_cancellable_resources_args = [
job_group_inst_coll_cancellable_resources_args = [
(
batch_id,
update_id,
Expand All @@ -1137,13 +1137,13 @@ async def insert_jobs_into_db(tx):
]
await tx.execute_many(
'''
INSERT INTO batch_inst_coll_cancellable_resources (batch_id, update_id, job_group_id, inst_coll, token, n_ready_cancellable_jobs, ready_cancellable_cores_mcpu)
INSERT INTO job_group_inst_coll_cancellable_resources (batch_id, update_id, job_group_id, inst_coll, token, n_ready_cancellable_jobs, ready_cancellable_cores_mcpu)
VALUES (%s, %s, %s, %s, %s, %s, %s)
ON DUPLICATE KEY UPDATE
n_ready_cancellable_jobs = n_ready_cancellable_jobs + VALUES(n_ready_cancellable_jobs),
ready_cancellable_cores_mcpu = ready_cancellable_cores_mcpu + VALUES(ready_cancellable_cores_mcpu);
''',
batch_inst_coll_cancellable_resources_args,
job_group_inst_coll_cancellable_resources_args,
query_name='insert_inst_coll_cancellable_resources',
)

Expand Down Expand Up @@ -1352,20 +1352,20 @@ async def insert(tx):

await tx.execute_insertone(
'''
INSERT INTO batches_n_jobs_in_complete_states (id, job_group_id) VALUES (%s, %s);
INSERT INTO job_groups_n_jobs_in_complete_states (id, job_group_id) VALUES (%s, %s);
''',
(id, ROOT_JOB_GROUP_ID),
query_name='insert_batches_n_jobs_in_complete_states',
query_name='insert_job_groups_n_jobs_in_complete_states',
)

if attributes:
await tx.execute_many(
'''
INSERT INTO `batch_attributes` (batch_id, job_group_id, `key`, `value`)
INSERT INTO `job_group_attributes` (batch_id, job_group_id, `key`, `value`)
VALUES (%s, %s, %s, %s)
''',
[(id, ROOT_JOB_GROUP_ID, k, v) for k, v in attributes.items()],
query_name='insert_batch_attributes',
query_name='insert_job_group_attributes',
)
return id

Expand Down Expand Up @@ -1452,9 +1452,9 @@ async def update(tx: Transaction):
# but do allow updates to batches with jobs that have been cancelled.
record = await tx.execute_and_fetchone(
'''
SELECT batches_cancelled.id IS NOT NULL AS cancelled
SELECT job_groups_cancelled.id IS NOT NULL AS cancelled
FROM batches
LEFT JOIN batches_cancelled ON batches.id = batches_cancelled.id
LEFT JOIN job_groups_cancelled ON batches.id = job_groups_cancelled.id
WHERE batches.id = %s AND user = %s AND NOT deleted
FOR UPDATE;
''',
Expand Down Expand Up @@ -1504,23 +1504,23 @@ async def _get_batch(app, batch_id):
record = await db.select_and_fetchone(
'''
SELECT batches.*,
batches_cancelled.id IS NOT NULL AS cancelled,
batches_n_jobs_in_complete_states.n_completed,
batches_n_jobs_in_complete_states.n_succeeded,
batches_n_jobs_in_complete_states.n_failed,
batches_n_jobs_in_complete_states.n_cancelled,
job_groups_cancelled.id IS NOT NULL AS cancelled,
job_groups_n_jobs_in_complete_states.n_completed,
job_groups_n_jobs_in_complete_states.n_succeeded,
job_groups_n_jobs_in_complete_states.n_failed,
job_groups_n_jobs_in_complete_states.n_cancelled,
cost_t.*
FROM batches
LEFT JOIN batches_n_jobs_in_complete_states
ON batches.id = batches_n_jobs_in_complete_states.id
LEFT JOIN batches_cancelled
ON batches.id = batches_cancelled.id
LEFT JOIN job_groups_n_jobs_in_complete_states
ON batches.id = job_groups_n_jobs_in_complete_states.id
LEFT JOIN job_groups_cancelled
ON batches.id = job_groups_cancelled.id
LEFT JOIN LATERAL (
SELECT COALESCE(SUM(`usage` * rate), 0) AS cost, JSON_OBJECTAGG(resources.resource, COALESCE(`usage` * rate, 0)) AS cost_breakdown
FROM (
SELECT batch_id, resource_id, CAST(COALESCE(SUM(`usage`), 0) AS SIGNED) AS `usage`
FROM aggregated_batch_resources_v2
WHERE batches.id = aggregated_batch_resources_v2.batch_id
FROM aggregated_job_group_resources_v2
WHERE batches.id = aggregated_job_group_resources_v2.batch_id
GROUP BY batch_id, resource_id
) AS usage_t
LEFT JOIN resources ON usage_t.resource_id = resources.resource_id
Expand Down Expand Up @@ -1590,9 +1590,9 @@ async def close_batch(request, userdata):

record = await db.select_and_fetchone(
'''
SELECT batches_cancelled.id IS NOT NULL AS cancelled
SELECT job_groups_cancelled.id IS NOT NULL AS cancelled
FROM batches
LEFT JOIN batches_cancelled ON batches.id = batches_cancelled.id
LEFT JOIN job_groups_cancelled ON batches.id = job_groups_cancelled.id
WHERE user = %s AND batches.id = %s AND NOT deleted;
''',
(user, batch_id),
Expand Down Expand Up @@ -1627,10 +1627,10 @@ async def commit_update(request: web.Request, userdata):

record = await db.select_and_fetchone(
'''
SELECT start_job_id, batches_cancelled.id IS NOT NULL AS cancelled
SELECT start_job_id, job_groups_cancelled.id IS NOT NULL AS cancelled
FROM batches
LEFT JOIN batch_updates ON batches.id = batch_updates.batch_id
LEFT JOIN batches_cancelled ON batches.id = batches_cancelled.id
LEFT JOIN job_groups_cancelled ON batches.id = job_groups_cancelled.id
WHERE user = %s AND batches.id = %s AND batch_updates.update_id = %s AND NOT deleted;
''',
(user, batch_id, update_id),
Expand Down
8 changes: 4 additions & 4 deletions batch/batch/front_end/query/query.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -373,7 +373,7 @@ def query(self) -> Tuple[str, List[Any]]:
condition = "(`state` = 'running')"
args = []
elif self.state == BatchState.CANCELLED:
condition = '(batches_cancelled.id IS NOT NULL)'
condition = '(job_groups_cancelled.id IS NOT NULL)'
args = []
elif self.state == BatchState.FAILURE:
condition = '(n_failed > 0)'
Expand Down Expand Up @@ -457,7 +457,7 @@ def __init__(self, term: str):
def query(self) -> Tuple[str, List[str]]:
sql = '''
((batches.id) IN
(SELECT batch_id FROM batch_attributes
(SELECT batch_id FROM job_group_attributes
WHERE `key` = %s OR `value` = %s))
'''
return (sql, [self.term, self.term])
Expand All @@ -480,7 +480,7 @@ def __init__(self, term: str):
def query(self) -> Tuple[str, List[str]]:
sql = '''
((batches.id) IN
(SELECT batch_id FROM batch_attributes
(SELECT batch_id FROM job_group_attributes
WHERE `key` LIKE %s OR `value` LIKE %s))
'''
escaped_term = f'%{self.term}%'
Expand All @@ -507,7 +507,7 @@ def query(self) -> Tuple[str, List[str]]:
value = f'%{value}%'
sql = f'''
((batches.id) IN
(SELECT batch_id FROM batch_attributes
(SELECT batch_id FROM job_group_attributes
WHERE `key` = %s AND `value` {op} %s))
'''
return (sql, [self.key, value])
Expand Down
28 changes: 14 additions & 14 deletions batch/batch/front_end/query/query_v1.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -28,15 +28,15 @@ def parse_list_batches_query_v1(user: str, q: str, last_batch_id: Optional[int])
k, v = t.split('=', 1)
condition = '''
((batches.id) IN
(SELECT batch_id FROM batch_attributes
(SELECT batch_id FROM job_group_attributes
WHERE `key` = %s AND `value` = %s))
'''
args = [k, v]
elif t.startswith('has:'):
k = t[4:]
condition = '''
((batches.id) IN
(SELECT batch_id FROM batch_attributes
(SELECT batch_id FROM job_group_attributes
WHERE `key` = %s))
'''
args = [k]
Expand Down Expand Up @@ -65,7 +65,7 @@ def parse_list_batches_query_v1(user: str, q: str, last_batch_id: Optional[int])
condition = "(`state` = 'running')"
args = []
elif t == 'cancelled':
condition = '(batches_cancelled.id IS NOT NULL)'
condition = '(job_groups_cancelled.id IS NOT NULL)'
args = []
elif t == 'failure':
condition = '(n_failed > 0)'
Expand All @@ -86,17 +86,17 @@ def parse_list_batches_query_v1(user: str, q: str, last_batch_id: Optional[int])
sql = f'''
WITH base_t AS (
SELECT batches.*,
batches_cancelled.id IS NOT NULL AS cancelled,
batches_n_jobs_in_complete_states.n_completed,
batches_n_jobs_in_complete_states.n_succeeded,
batches_n_jobs_in_complete_states.n_failed,
batches_n_jobs_in_complete_states.n_cancelled
job_groups_cancelled.id IS NOT NULL AS cancelled,
job_groups_n_jobs_in_complete_states.n_completed,
job_groups_n_jobs_in_complete_states.n_succeeded,
job_groups_n_jobs_in_complete_states.n_failed,
job_groups_n_jobs_in_complete_states.n_cancelled
FROM batches
LEFT JOIN billing_projects ON batches.billing_project = billing_projects.name
LEFT JOIN batches_n_jobs_in_complete_states
ON batches.id = batches_n_jobs_in_complete_states.id
LEFT JOIN batches_cancelled
ON batches.id = batches_cancelled.id
LEFT JOIN job_groups_n_jobs_in_complete_states
ON batches.id = job_groups_n_jobs_in_complete_states.id
LEFT JOIN job_groups_cancelled
ON batches.id = job_groups_cancelled.id
STRAIGHT_JOIN billing_project_users ON batches.billing_project = billing_project_users.billing_project
WHERE {' AND '.join(where_conditions)}
ORDER BY id DESC
Expand All @@ -108,8 +108,8 @@ def parse_list_batches_query_v1(user: str, q: str, last_batch_id: Optional[int])
SELECT COALESCE(SUM(`usage` * rate), 0) AS cost, JSON_OBJECTAGG(resources.resource, COALESCE(`usage` * rate, 0)) AS cost_breakdown
FROM (
SELECT batch_id, resource_id, CAST(COALESCE(SUM(`usage`), 0) AS SIGNED) AS `usage`
FROM aggregated_batch_resources_v2
WHERE base_t.id = aggregated_batch_resources_v2.batch_id
FROM aggregated_job_group_resources_v2
WHERE base_t.id = aggregated_job_group_resources_v2.batch_id
GROUP BY batch_id, resource_id
) AS usage_t
LEFT JOIN resources ON usage_t.resource_id = resources.resource_id
Expand Down
Loading

0 comments on commit 92e463e

Please sign in to comment.