Skip to content

Commit

Permalink
[batch] Finalize job groups in database
Browse files Browse the repository at this point in the history
  • Loading branch information
jigold committed Nov 9, 2023
1 parent b7b8cc5 commit e038f55
Show file tree
Hide file tree
Showing 5 changed files with 783 additions and 17 deletions.
3 changes: 2 additions & 1 deletion batch/batch/batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from hailtop.utils import humanize_timedelta_msecs, time_msecs_str

from .batch_format_version import BatchFormatVersion
from .constants import ROOT_JOB_GROUP_ID
from .exceptions import NonExistentBatchError, OpenBatchError
from .utils import coalesce

Expand Down Expand Up @@ -125,6 +126,6 @@ async def cancel(tx):
if record['state'] == 'open':
raise OpenBatchError(batch_id)

await tx.just_execute('CALL cancel_batch(%s);', (batch_id,))
await tx.just_execute('CALL cancel_job_group(%s, %s);', (batch_id, ROOT_JOB_GROUP_ID))

await cancel()
2 changes: 1 addition & 1 deletion batch/batch/front_end/front_end.py
Original file line number Diff line number Diff line change
Expand Up @@ -1557,7 +1557,7 @@ async def _delete_batch(app, batch_id):
if not record:
raise web.HTTPNotFound()

await db.just_execute('CALL cancel_batch(%s);', (batch_id,))
await db.just_execute('CALL cancel_job_group(%s, %s);', (batch_id, ROOT_JOB_GROUP_ID))
await db.execute_update('UPDATE batches SET deleted = 1 WHERE id = %s;', (batch_id,))

if record['state'] == 'running':
Expand Down
156 changes: 141 additions & 15 deletions batch/sql/estimated-current.sql
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,8 @@ CREATE TABLE IF NOT EXISTS `batch_updates` (
`token` VARCHAR(100) DEFAULT NULL,
`start_job_id` INT NOT NULL,
`n_jobs` INT NOT NULL,
`start_job_group_id` INT NOT NULL DEFAULT 0,
`n_job_groups` INT NOT NULL DEFAULT 1,
`committed` BOOLEAN NOT NULL DEFAULT FALSE,
`time_created` BIGINT NOT NULL,
`time_committed` BIGINT,
Expand Down Expand Up @@ -620,21 +622,27 @@ BEGIN
WHERE attempt_resources.batch_id = NEW.batch_id AND attempt_resources.job_id = NEW.job_id AND attempt_id = NEW.attempt_id AND migrated = 1
ON DUPLICATE KEY UPDATE `usage` = aggregated_billing_project_user_resources_v3.`usage` + msec_diff_rollup * quantity;

INSERT INTO aggregated_job_group_resources_v2 (batch_id, resource_id, token, `usage`)
SELECT batch_id,
INSERT INTO aggregated_job_group_resources_v2 (batch_id, job_group_id, resource_id, token, `usage`)
SELECT attempt_resources.batch_id,
job_group_self_and_ancestors.ancestor_id,
resource_id,
rand_token,
msec_diff_rollup * quantity
FROM attempt_resources
WHERE batch_id = NEW.batch_id AND job_id = NEW.job_id AND attempt_id = NEW.attempt_id
LEFT JOIN jobs ON attempt_resources.batch_id = jobs.batch_id AND attempt_resources.job_id = jobs.job_id
LEFT JOIN job_group_self_and_ancestors ON jobs.batch_id = job_group_self_and_ancestors.batch_id AND jobs.job_group_id = job_group_self_and_ancestors.job_group_id
WHERE attempt_resources.batch_id = NEW.batch_id AND attempt_resources.job_id = NEW.job_id AND attempt_resources.attempt_id = NEW.attempt_id
ON DUPLICATE KEY UPDATE `usage` = `usage` + msec_diff_rollup * quantity;

INSERT INTO aggregated_job_group_resources_v3 (batch_id, resource_id, token, `usage`)
INSERT INTO aggregated_job_group_resources_v3 (batch_id, job_group_id, resource_id, token, `usage`)
SELECT attempt_resources.batch_id,
job_group_self_and_ancestors.ancestor_id,
attempt_resources.deduped_resource_id,
rand_token,
msec_diff_rollup * quantity
FROM attempt_resources
LEFT JOIN jobs ON attempt_resources.batch_id = jobs.batch_id AND attempt_resources.job_id = jobs.job_id
LEFT JOIN job_group_self_and_ancestors ON jobs.batch_id = job_group_self_and_ancestors.batch_id AND jobs.job_group_id = job_group_self_and_ancestors.job_group_id
JOIN aggregated_job_group_resources_v2 ON
aggregated_job_group_resources_v2.batch_id = attempt_resources.batch_id AND
aggregated_job_group_resources_v2.resource_id = attempt_resources.resource_id AND
Expand Down Expand Up @@ -711,6 +719,22 @@ BEGIN
END IF;
END $$

DROP TRIGGER IF EXISTS jobs_before_insert $$
CREATE TRIGGER jobs_before_insert BEFORE INSERT ON jobs
FOR EACH ROW
BEGIN
DECLARE job_group_cancelled BOOLEAN;

SET job_group_cancelled = EXISTS (SELECT TRUE
FROM job_groups_cancelled
WHERE id = NEW.batch_id AND job_group_id = NEW.job_group_id
LOCK IN SHARE MODE);

IF job_group_cancelled THEN
SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = "job group has already been cancelled";
END IF;
END $$

DROP TRIGGER IF EXISTS jobs_after_update $$
CREATE TRIGGER jobs_after_update AFTER UPDATE ON jobs
FOR EACH ROW
Expand Down Expand Up @@ -804,18 +828,20 @@ BEGIN
SET delta_running_cancellable_cores_mcpu = delta_n_running_cancellable_jobs * cores_mcpu;
SET delta_running_cores_mcpu = delta_n_running_jobs * cores_mcpu;

INSERT INTO job_group_inst_coll_cancellable_resources (batch_id, update_id, inst_coll, token,
INSERT INTO job_group_inst_coll_cancellable_resources (batch_id, job_group_id, update_id, inst_coll, token,
n_ready_cancellable_jobs,
ready_cancellable_cores_mcpu,
n_creating_cancellable_jobs,
n_running_cancellable_jobs,
running_cancellable_cores_mcpu)
VALUES (NEW.batch_id, NEW.update_id, NEW.inst_coll, rand_token,
SELECT NEW.batch_id, NEW.update_id, job_group_self_and_ancestors.ancestor_id, NEW.inst_coll, rand_token,
delta_n_ready_cancellable_jobs,
delta_ready_cancellable_cores_mcpu,
delta_n_creating_cancellable_jobs,
delta_n_running_cancellable_jobs,
delta_running_cancellable_cores_mcpu)
delta_running_cancellable_cores_mcpu
FROM job_group_self_and_ancestors
WHERE job_group_self_and_ancestors.batch_id = NEW.batch_id AND job_group_self_and_ancestors.job_group_id = NEW.job_group_id
ON DUPLICATE KEY UPDATE
n_ready_cancellable_jobs = n_ready_cancellable_jobs + delta_n_ready_cancellable_jobs,
ready_cancellable_cores_mcpu = ready_cancellable_cores_mcpu + delta_ready_cancellable_cores_mcpu,
Expand Down Expand Up @@ -861,19 +887,24 @@ BEGIN
DECLARE cur_start_time BIGINT;
DECLARE cur_rollup_time BIGINT;
DECLARE cur_billing_project VARCHAR(100);
DECLARE cur_job_group_id INT;
DECLARE cur_user VARCHAR(100);
DECLARE msec_diff_rollup BIGINT;
DECLARE cur_n_tokens INT;
DECLARE rand_token INT;
DECLARE cur_billing_date DATE;
DECLARE bp_user_resources_migrated BOOLEAN DEFAULT FALSE;
DECLARE bp_user_resources_by_date_migrated BOOLEAN DEFAULT FALSE;
DECLARE batch_resources_migrated BOOLEAN DEFAULT FALSE;
DECLARE job_group_resources_migrated BOOLEAN DEFAULT FALSE;
DECLARE job_resources_migrated BOOLEAN DEFAULT FALSE;

SELECT billing_project, user INTO cur_billing_project, cur_user
FROM batches WHERE id = NEW.batch_id;

SELECT job_group_id INTO cur_job_group_id
FROM jobs
WHERE batch_id = NEW.batch_id AND job_id = NEW.job_id;

SELECT n_tokens INTO cur_n_tokens FROM globals LOCK IN SHARE MODE;
SET rand_token = FLOOR(RAND() * cur_n_tokens);

Expand Down Expand Up @@ -904,19 +935,23 @@ BEGIN
`usage` = `usage` + NEW.quantity * msec_diff_rollup;
END IF;

INSERT INTO aggregated_job_group_resources_v2 (batch_id, resource_id, token, `usage`)
VALUES (NEW.batch_id, NEW.resource_id, rand_token, NEW.quantity * msec_diff_rollup)
INSERT INTO aggregated_job_group_resources_v2 (batch_id, job_group_id, resource_id, token, `usage`)
SELECT NEW.batch_id, ancestor_id, NEW.resource_id, rand_token, NEW.quantity * msec_diff_rollup
FROM job_group_self_and_ancestors
WHERE job_group_self_and_ancestors.batch_id = NEW.batch_id AND job_group_self_and_ancestors.job_group_id = cur_job_group_id
ON DUPLICATE KEY UPDATE
`usage` = `usage` + NEW.quantity * msec_diff_rollup;

SELECT migrated INTO batch_resources_migrated
SELECT migrated INTO job_group_resources_migrated
FROM aggregated_job_group_resources_v2
WHERE batch_id = NEW.batch_id AND resource_id = NEW.resource_id AND token = rand_token
WHERE batch_id = NEW.batch_id AND job_group_id = cur_job_group_id AND resource_id = NEW.resource_id AND token = rand_token
FOR UPDATE;

IF batch_resources_migrated THEN
INSERT INTO aggregated_job_group_resources_v3 (batch_id, resource_id, token, `usage`)
VALUES (NEW.batch_id, NEW.deduped_resource_id, rand_token, NEW.quantity * msec_diff_rollup)
IF job_group_resources_migrated THEN
INSERT INTO aggregated_job_group_resources_v3 (batch_id, job_group_id, resource_id, token, `usage`)
SELECT NEW.batch_id, ancestor_id, NEW.deduped_resource_id, rand_token, NEW.quantity * msec_diff_rollup
FROM job_group_self_and_ancestors
WHERE job_group_self_and_ancestors.batch_id = NEW.batch_id AND job_group_self_and_ancestors.job_group_id = cur_job_group_id
ON DUPLICATE KEY UPDATE
`usage` = `usage` + NEW.quantity * msec_diff_rollup;
END IF;
Expand Down Expand Up @@ -1231,6 +1266,22 @@ BEGIN
jobs.job_id < cur_update_start_job_id + staging_n_jobs;
END IF;

INSERT INTO job_group_self_and_ancestors (batch_id, job_group_id, ancestor_id, level)
SELECT batch_id, job_group_id, job_group_id, level + 1
FROM (
SELECT batch_id, job_group_id, MIN(ancestor_id) AS last_known_ancestor, MAX(level) AS last_known_level
FROM job_group_self_and_ancestors
WHERE batch_id = in_batch_id
GROUP BY batch_id, job_group_id
HAVING last_known_ancestor != 0
) AS last_known_ancestors
LEFT JOIN LATERAL (
SELECT batch_id, last_known_ancestors.job_group_id, ancestor_id, last_known_ancestors.last_known_level + 1
FROM job_group_self_and_ancestors
WHERE last_known_ancestors.batch_id = job_group_self_and_ancestors.batch_id AND
last_known_ancestors.last_known_ancestor = job_group_self_and_ancestors.job_group_id
) AS new_ancestors ON TRUE;

COMMIT;
SELECT 0 as rc;
ELSE
Expand Down Expand Up @@ -1306,6 +1357,81 @@ BEGIN
COMMIT;
END $$

DROP PROCEDURE IF EXISTS cancel_job_group $$
CREATE PROCEDURE cancel_job_group(
IN in_batch_id VARCHAR(100),
IN in_job_group_id INT
)
BEGIN
DECLARE cur_user VARCHAR(100);
DECLARE cur_batch_state VARCHAR(40);
DECLARE cur_cancelled BOOLEAN;
DECLARE cur_n_cancelled_ready_jobs INT;
DECLARE cur_cancelled_ready_cores_mcpu BIGINT;
DECLARE cur_n_cancelled_running_jobs INT;
DECLARE cur_cancelled_running_cores_mcpu BIGINT;
DECLARE cur_n_n_cancelled_creating_jobs INT;

START TRANSACTION;

SELECT user, `state` INTO cur_user, cur_batch_state FROM batches
WHERE id = in_batch_id
FOR UPDATE;

SET cur_cancelled = EXISTS (SELECT TRUE
FROM job_groups_cancelled
WHERE id = in_batch_id AND job_group_id = in_job_group_id
FOR UPDATE);

IF cur_batch_state = 'running' AND NOT cur_cancelled THEN
INSERT INTO user_inst_coll_resources (user, inst_coll, token,
n_ready_jobs, ready_cores_mcpu,
n_running_jobs, running_cores_mcpu,
n_creating_jobs,
n_cancelled_ready_jobs, n_cancelled_running_jobs, n_cancelled_creating_jobs)
SELECT user, inst_coll, 0,
-1 * (@n_ready_cancellable_jobs := COALESCE(SUM(n_ready_cancellable_jobs), 0)),
-1 * (@ready_cancellable_cores_mcpu := COALESCE(SUM(ready_cancellable_cores_mcpu), 0)),
-1 * (@n_running_cancellable_jobs := COALESCE(SUM(n_running_cancellable_jobs), 0)),
-1 * (@running_cancellable_cores_mcpu := COALESCE(SUM(running_cancellable_cores_mcpu), 0)),
-1 * (@n_creating_cancellable_jobs := COALESCE(SUM(n_creating_cancellable_jobs), 0)),
COALESCE(SUM(n_ready_cancellable_jobs), 0),
COALESCE(SUM(n_running_cancellable_jobs), 0),
COALESCE(SUM(n_creating_cancellable_jobs), 0)
FROM job_group_inst_coll_cancellable_resources
JOIN batches ON batches.id = job_group_inst_coll_cancellable_resources.batch_id
INNER JOIN batch_updates ON job_group_inst_coll_cancellable_resources.batch_id = batch_updates.batch_id AND
job_group_inst_coll_cancellable_resources.update_id = batch_updates.update_id
WHERE job_group_inst_coll_cancellable_resources.batch_id = in_batch_id AND
job_group_inst_coll_cancellable_resources.job_group_id = in_job_group_id AND
job_group_updates.committed
GROUP BY user, inst_coll
ON DUPLICATE KEY UPDATE
n_ready_jobs = n_ready_jobs - @n_ready_cancellable_jobs,
ready_cores_mcpu = ready_cores_mcpu - @ready_cancellable_cores_mcpu,
n_running_jobs = n_running_jobs - @n_running_cancellable_jobs,
running_cores_mcpu = running_cores_mcpu - @running_cancellable_cores_mcpu,
n_creating_jobs = n_creating_jobs - @n_creating_cancellable_jobs,
n_cancelled_ready_jobs = n_cancelled_ready_jobs + @n_ready_cancellable_jobs,
n_cancelled_running_jobs = n_cancelled_running_jobs + @n_running_cancellable_jobs,
n_cancelled_creating_jobs = n_cancelled_creating_jobs + @n_creating_cancellable_jobs;

# delete all rows that are children of this job group
DELETE job_group_inst_coll_cancellable_resources FROM job_group_inst_coll_cancellable_resources
LEFT JOIN batch_updates ON job_group_inst_coll_cancellable_resources.batch_id = batch_updates.batch_id AND
job_group_inst_coll_cancellable_resources.update_id = batch_updates.update_id
INNER JOIN job_group_self_and_ancestors ON job_group_inst_coll_cancellable_resources.batch_id = job_group_self_and_ancestors.batch_id AND
job_group_inst_coll_cancellable_resources.job_group_id = job_group_self_and_ancestors.job_group_id
WHERE job_group_inst_coll_cancellable_resources.batch_id = in_batch_id AND
job_group_self_and_ancestors.parent_id = in_job_group_id AND
batch_updates.committed;

INSERT INTO job_groups_cancelled (id, job_group_id) VALUES (in_batch_id, in_job_group_id);
END IF;

COMMIT;
END $$

DROP PROCEDURE IF EXISTS add_attempt $$
CREATE PROCEDURE add_attempt(
IN in_batch_id BIGINT,
Expand Down
Loading

0 comments on commit e038f55

Please sign in to comment.