Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[batch] Show cost breakdown in the UI #13491

Merged
merged 10 commits into from
Sep 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions batch/batch/batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@
log = logging.getLogger('batch')


def cost_breakdown_to_dict(cost_breakdown: dict):
return [{'resource': resource, 'cost': cost} for resource, cost in cost_breakdown.items()]


def batch_record_to_dict(record: Dict[str, Any]) -> Dict[str, Any]:
if record['state'] == 'open':
state = 'open'
Expand Down Expand Up @@ -39,6 +43,9 @@ def _time_msecs_str(t):
else:
duration = None

if record['cost_breakdown'] is not None:
record['cost_breakdown'] = cost_breakdown_to_dict(json.loads(record['cost_breakdown']))

d = {
'id': record['id'],
'user': record['user'],
Expand All @@ -58,6 +65,7 @@ def _time_msecs_str(t):
'duration': duration,
'msec_mcpu': record['msec_mcpu'],
'cost': coalesce(record['cost'], 0),
'cost_breakdown': record['cost_breakdown'],
}

attributes = json.loads(record['attributes'])
Expand All @@ -78,6 +86,9 @@ def job_record_to_dict(record: Dict[str, Any], name: Optional[str]) -> Dict[str,
exit_code = None
duration = None

if record['cost_breakdown'] is not None:
record['cost_breakdown'] = cost_breakdown_to_dict(json.loads(record['cost_breakdown']))

result = {
'batch_id': record['batch_id'],
'job_id': record['job_id'],
Expand All @@ -89,6 +100,7 @@ def job_record_to_dict(record: Dict[str, Any], name: Optional[str]) -> Dict[str,
'duration': duration,
'cost': coalesce(record['cost'], 0),
'msec_mcpu': record['msec_mcpu'],
'cost_breakdown': record['cost_breakdown'],
}

return result
Expand Down
34 changes: 21 additions & 13 deletions batch/batch/driver/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,26 +31,34 @@
async def notify_batch_job_complete(db: Database, client_session: httpx.ClientSession, batch_id):
record = await db.select_and_fetchone(
'''
SELECT batches.*, COALESCE(SUM(`usage` * rate), 0) AS cost, batches_cancelled.id IS NOT NULL AS cancelled, batches_n_jobs_in_complete_states.n_completed, batches_n_jobs_in_complete_states.n_succeeded, batches_n_jobs_in_complete_states.n_failed, batches_n_jobs_in_complete_states.n_cancelled
SELECT batches.*,
cost_t.cost,
cost_t.cost_breakdown,
batches_cancelled.id IS NOT NULL AS cancelled,
batches_n_jobs_in_complete_states.n_completed,
batches_n_jobs_in_complete_states.n_succeeded,
batches_n_jobs_in_complete_states.n_failed,
batches_n_jobs_in_complete_states.n_cancelled
FROM batches
LEFT JOIN batches_n_jobs_in_complete_states
ON batches.id = batches_n_jobs_in_complete_states.id
LEFT JOIN (
SELECT batch_id, resource_id, CAST(COALESCE(SUM(`usage`), 0) AS SIGNED) AS `usage`
FROM aggregated_batch_resources_v2
WHERE batch_id = %s
GROUP BY batch_id, resource_id
) AS abr
ON batches.id = abr.batch_id
LEFT JOIN resources
ON abr.resource_id = resources.resource_id
LEFT JOIN LATERAL (
SELECT COALESCE(SUM(`usage` * rate), 0) AS cost, JSON_OBJECTAGG(resources.resource, COALESCE(`usage` * rate, 0)) AS cost_breakdown
FROM (
SELECT batch_id, resource_id, CAST(COALESCE(SUM(`usage`), 0) AS SIGNED) AS `usage`
FROM aggregated_batch_resources_v2
WHERE batches.id = aggregated_batch_resources_v2.batch_id
GROUP BY batch_id, resource_id
) AS usage_t
LEFT JOIN resources ON usage_t.resource_id = resources.resource_id
GROUP BY batch_id
) AS cost_t ON TRUE
LEFT JOIN batches_cancelled
ON batches.id = batches_cancelled.id
WHERE batches.id = %s AND NOT deleted AND callback IS NOT NULL AND
batches.`state` = 'complete'
GROUP BY batches.id;
batches.`state` = 'complete';
''',
(batch_id, batch_id),
(batch_id,),
'notify_batch_job_complete',
)

Expand Down
59 changes: 33 additions & 26 deletions batch/batch/front_end/front_end.py
Original file line number Diff line number Diff line change
Expand Up @@ -1467,30 +1467,30 @@ async def _get_batch(app, batch_id):

record = await db.select_and_fetchone(
'''
WITH base_t AS (
SELECT batches.*,
batches_cancelled.id IS NOT NULL AS cancelled,
batches_n_jobs_in_complete_states.n_completed,
batches_n_jobs_in_complete_states.n_succeeded,
batches_n_jobs_in_complete_states.n_failed,
batches_n_jobs_in_complete_states.n_cancelled
batches_n_jobs_in_complete_states.n_cancelled,
cost_t.*
FROM batches
LEFT JOIN batches_n_jobs_in_complete_states
ON batches.id = batches_n_jobs_in_complete_states.id
LEFT JOIN batches_cancelled
ON batches.id = batches_cancelled.id
WHERE batches.id = %s AND NOT deleted
)
SELECT base_t.*, COALESCE(SUM(`usage` * rate), 0) AS cost
FROM base_t
LEFT JOIN (
SELECT aggregated_batch_resources_v2.batch_id, resource_id, CAST(COALESCE(SUM(`usage`), 0) AS SIGNED) AS `usage`
FROM base_t
LEFT JOIN aggregated_batch_resources_v2 ON base_t.id = aggregated_batch_resources_v2.batch_id
GROUP BY aggregated_batch_resources_v2.batch_id, aggregated_batch_resources_v2.resource_id
) AS usage_t ON base_t.id = usage_t.batch_id
LEFT JOIN resources ON usage_t.resource_id = resources.resource_id
GROUP BY base_t.id;
LEFT JOIN LATERAL (
SELECT COALESCE(SUM(`usage` * rate), 0) AS cost, JSON_OBJECTAGG(resources.resource, COALESCE(`usage` * rate, 0)) AS cost_breakdown
FROM (
SELECT batch_id, resource_id, CAST(COALESCE(SUM(`usage`), 0) AS SIGNED) AS `usage`
FROM aggregated_batch_resources_v2
WHERE batches.id = aggregated_batch_resources_v2.batch_id
GROUP BY batch_id, resource_id
) AS usage_t
LEFT JOIN resources ON usage_t.resource_id = resources.resource_id
GROUP BY batch_id
) AS cost_t ON TRUE
WHERE batches.id = %s AND NOT deleted;
''',
(batch_id,),
)
Expand Down Expand Up @@ -1666,6 +1666,11 @@ async def ui_batch(request, userdata, batch_id):

batch['cost'] = cost_str(batch['cost'])

if batch['cost_breakdown'] is not None:
for record in batch['cost_breakdown']:
record['cost'] = cost_str(record['cost'])
batch['cost_breakdown'].sort(key=lambda record: record['resource'])

page_context = {
'batch': batch,
'q': q,
Expand Down Expand Up @@ -1759,21 +1764,18 @@ async def _get_job(app, batch_id, job_id):
ON jobs.batch_id = batch_updates.batch_id AND jobs.update_id = batch_updates.update_id
WHERE jobs.batch_id = %s AND NOT deleted AND jobs.job_id = %s AND batch_updates.committed
)
SELECT base_t.*, COALESCE(SUM(`usage` * rate), 0) AS cost
SELECT base_t.*, cost_t.cost, cost_t.cost_breakdown
FROM base_t
LEFT JOIN (
SELECT aggregated_job_resources_v2.batch_id,
aggregated_job_resources_v2.job_id,
aggregated_job_resources_v2.resource_id,
CAST(COALESCE(SUM(`usage`), 0) AS SIGNED) AS `usage`
FROM base_t
LEFT JOIN aggregated_job_resources_v2
ON aggregated_job_resources_v2.batch_id = base_t.batch_id AND
aggregated_job_resources_v2.job_id = base_t.job_id
LEFT JOIN LATERAL (
SELECT COALESCE(SUM(`usage` * rate), 0) AS cost, JSON_OBJECTAGG(resources.resource, COALESCE(`usage` * rate, 0)) AS cost_breakdown
FROM (SELECT aggregated_job_resources_v2.batch_id, aggregated_job_resources_v2.job_id, resource_id, CAST(COALESCE(SUM(`usage`), 0) AS SIGNED) AS `usage`
FROM aggregated_job_resources_v2
WHERE aggregated_job_resources_v2.batch_id = base_t.batch_id AND aggregated_job_resources_v2.job_id = base_t.job_id
GROUP BY aggregated_job_resources_v2.batch_id, aggregated_job_resources_v2.job_id, aggregated_job_resources_v2.resource_id
) AS usage_t ON usage_t.batch_id = base_t.batch_id AND usage_t.job_id = base_t.job_id
) AS usage_t
LEFT JOIN resources ON usage_t.resource_id = resources.resource_id
GROUP BY base_t.batch_id, base_t.job_id, base_t.last_cancelled_attempt_id;
GROUP BY usage_t.batch_id, usage_t.job_id
) AS cost_t ON TRUE;
''',
(batch_id, job_id, batch_id, job_id),
)
Expand Down Expand Up @@ -2073,6 +2075,11 @@ async def ui_get_job(request, userdata, batch_id):
job['duration'] = humanize_timedelta_msecs(job['duration'])
job['cost'] = cost_str(job['cost'])

if job['cost_breakdown'] is not None:
for record in job['cost_breakdown']:
record['cost'] = cost_str(record['cost'])
job['cost_breakdown'].sort(key=lambda record: record['resource'])

job_status = job['status']
container_status_spec = dictfix.NoneOr(
{
Expand Down
37 changes: 21 additions & 16 deletions batch/batch/front_end/query/query_v1.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,16 +102,19 @@ def parse_list_batches_query_v1(user: str, q: str, last_batch_id: Optional[int])
ORDER BY id DESC
LIMIT 51
)
SELECT base_t.*, COALESCE(SUM(`usage` * rate), 0) AS cost
SELECT base_t.*, cost_t.cost, cost_t.cost_breakdown
FROM base_t
LEFT JOIN (
SELECT batch_id, resource_id, CAST(COALESCE(SUM(`usage`), 0) AS SIGNED) AS `usage`
FROM base_t
LEFT JOIN aggregated_batch_resources_v2 ON base_t.id = aggregated_batch_resources_v2.batch_id
GROUP BY batch_id, resource_id
) AS usage_t ON base_t.id = usage_t.batch_id
LEFT JOIN resources ON usage_t.resource_id = resources.resource_id
GROUP BY id
LEFT JOIN LATERAL (
SELECT COALESCE(SUM(`usage` * rate), 0) AS cost, JSON_OBJECTAGG(resources.resource, COALESCE(`usage` * rate, 0)) AS cost_breakdown
FROM (
SELECT batch_id, resource_id, CAST(COALESCE(SUM(`usage`), 0) AS SIGNED) AS `usage`
FROM aggregated_batch_resources_v2
WHERE base_t.id = aggregated_batch_resources_v2.batch_id
GROUP BY batch_id, resource_id
) AS usage_t
LEFT JOIN resources ON usage_t.resource_id = resources.resource_id
GROUP BY batch_id
) AS cost_t ON TRUE
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Food for thought for a possible clean up PR: The fact that we repeat this query in a few places makes me wonder if we're better off using two DB queries in a transaction. I suspect the DB round-trip-latency is far from our main bottleneck and it makes it easier to understand the code if we had something like:

def batch_cost_info(tx: Transaction, batch_id):
    return tx.select_and_fetchone(...)

That way we can abstract the cost calculation out from the query code or the UI code.

Copy link
Contributor Author

@jigold jigold Aug 25, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These queries are a mess anyways. They need to be rewritten to use lateral joins without the WITH base_t anyways. I was trying not to do that in this PR. The issue why the queries needed to be changed more drastically is because doing JSON_OBJECTAGG() the way we had it before was returning {null: ...} if there were no records which was causing lots of downstream problems.

Should I go ahead and simplify the queries to what is optimal here in this PR?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no, I was just noodling on how we could make this more readable for the future. let's keep this PR simple and to the point. A separate PR to explore alternative organization of the SQL seems fine.

ORDER BY id DESC;
'''

Expand Down Expand Up @@ -187,16 +190,18 @@ def parse_batch_jobs_query_v1(batch_id: int, q: str, last_job_id: Optional[int])
WHERE {' AND '.join(where_conditions)}
LIMIT 50
)
SELECT base_t.*, COALESCE(SUM(`usage` * rate), 0) AS cost
SELECT base_t.*, cost_t.cost, cost_t.cost_breakdown
FROM base_t
LEFT JOIN (
SELECT aggregated_job_resources_v2.batch_id, aggregated_job_resources_v2.job_id, resource_id, CAST(COALESCE(SUM(`usage`), 0) AS SIGNED) AS `usage`
FROM base_t
LEFT JOIN aggregated_job_resources_v2 ON base_t.batch_id = aggregated_job_resources_v2.batch_id AND base_t.job_id = aggregated_job_resources_v2.job_id
LEFT JOIN LATERAL (
SELECT COALESCE(SUM(`usage` * rate), 0) AS cost, JSON_OBJECTAGG(resources.resource, COALESCE(`usage` * rate, 0)) AS cost_breakdown
FROM (SELECT aggregated_job_resources_v2.batch_id, aggregated_job_resources_v2.job_id, resource_id, CAST(COALESCE(SUM(`usage`), 0) AS SIGNED) AS `usage`
FROM aggregated_job_resources_v2
WHERE aggregated_job_resources_v2.batch_id = base_t.batch_id AND aggregated_job_resources_v2.job_id = base_t.job_id
GROUP BY aggregated_job_resources_v2.batch_id, aggregated_job_resources_v2.job_id, aggregated_job_resources_v2.resource_id
) AS usage_t ON base_t.batch_id = usage_t.batch_id AND base_t.job_id = usage_t.job_id
) AS usage_t
LEFT JOIN resources ON usage_t.resource_id = resources.resource_id
GROUP BY base_t.batch_id, base_t.job_id;
GROUP BY usage_t.batch_id, usage_t.job_id
) AS cost_t ON TRUE;
'''

return (sql, where_args)
9 changes: 5 additions & 4 deletions batch/batch/front_end/query/query_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,14 +133,14 @@ def parse_list_batches_query_v2(user: str, q: str, last_batch_id: Optional[int])
batches_n_jobs_in_complete_states.n_succeeded,
batches_n_jobs_in_complete_states.n_failed,
batches_n_jobs_in_complete_states.n_cancelled,
cost_t.cost
cost_t.cost, cost_t.cost_breakdown
FROM batches
LEFT JOIN billing_projects ON batches.billing_project = billing_projects.name
LEFT JOIN batches_n_jobs_in_complete_states ON batches.id = batches_n_jobs_in_complete_states.id
LEFT JOIN batches_cancelled ON batches.id = batches_cancelled.id
STRAIGHT_JOIN billing_project_users ON batches.billing_project = billing_project_users.billing_project
LEFT JOIN LATERAL (
SELECT COALESCE(SUM(`usage` * rate), 0) AS cost
SELECT COALESCE(SUM(`usage` * rate), 0) AS cost, JSON_OBJECTAGG(resources.resource, COALESCE(`usage` * rate, 0)) AS cost_breakdown
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks great.

FROM (
SELECT batch_id, resource_id, CAST(COALESCE(SUM(`usage`), 0) AS SIGNED) AS `usage`
FROM aggregated_batch_resources_v2
Expand Down Expand Up @@ -269,7 +269,8 @@ def parse_batch_jobs_query_v2(batch_id: int, q: str, last_job_id: Optional[int])
attempts_table_join_str = ''

sql = f'''
SELECT jobs.*, batches.user, batches.billing_project, batches.format_version, job_attributes.value AS name, cost_t.cost
SELECT jobs.*, batches.user, batches.billing_project, batches.format_version, job_attributes.value AS name, cost_t.cost,
cost_t.cost_breakdown
FROM jobs
INNER JOIN batches ON jobs.batch_id = batches.id
INNER JOIN batch_updates ON jobs.batch_id = batch_updates.batch_id AND jobs.update_id = batch_updates.update_id
Expand All @@ -279,7 +280,7 @@ def parse_batch_jobs_query_v2(batch_id: int, q: str, last_job_id: Optional[int])
job_attributes.`key` = 'name'
{attempts_table_join_str}
LEFT JOIN LATERAL (
SELECT COALESCE(SUM(`usage` * rate), 0) AS cost
SELECT COALESCE(SUM(`usage` * rate), 0) AS cost, JSON_OBJECTAGG(resources.resource, COALESCE(`usage` * rate, 0)) AS cost_breakdown
FROM (SELECT aggregated_job_resources_v2.batch_id, aggregated_job_resources_v2.job_id, resource_id, CAST(COALESCE(SUM(`usage`), 0) AS SIGNED) AS `usage`
FROM aggregated_job_resources_v2
WHERE aggregated_job_resources_v2.batch_id = jobs.batch_id AND aggregated_job_resources_v2.job_id = jobs.job_id
Expand Down
22 changes: 22 additions & 0 deletions batch/batch/front_end/templates/batch.html
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,28 @@ <h2>Attributes</h2>
{% endfor %}
{% endif %}

<h2>Cost Breakdown</h2>
{% if batch['cost_breakdown'] %}
<table class="data-table">
<thead>
<tr>
<th>Resource</th>
<th>Cost</th>
</tr>
</thead>
<tbody>
{% for resource_cost in batch['cost_breakdown'] %}
<tr>
<td>{{ resource_cost['resource'] }}</td>
<td>{{ resource_cost['cost'] }}</td>
</tr>
{% endfor %}
</tbody>
</table>
{% else %}
<p>No accrued costs</p>
{% endif %}

<h2>Jobs</h2>
<div class="flex-col">
{{ table_search("job-search", base_path ~ "/batches/" ~ batch["id"]) }}
Expand Down
22 changes: 22 additions & 0 deletions batch/batch/front_end/templates/job.html
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,28 @@ <h2>Properties</h2>
<li>Cost: {% if 'cost' in job and job['cost'] is not none %}{{ job['cost'] }}{% endif %}</li>
</ul>

<h2>Cost Breakdown</h2>
{% if job['cost_breakdown'] %}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What causes this to be None? It seems like the DB queries always return it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's None if the job hasn't started running yet. Same with batches with no jobs running yet.

<table class="data-table">
<thead>
<tr>
<th>Resource</th>
<th>Cost</th>
</tr>
</thead>
<tbody>
{% for resource_cost in job['cost_breakdown'] %}
<tr>
<td>{{ resource_cost['resource'] }}</td>
<td>{{ resource_cost['cost'] }}</td>
</tr>
{% endfor %}
</tbody>
</table>
{% else %}
<p>No accrued costs</p>
{% endif %}

<h2>Attempts</h2>
{% if attempts %}
<table class="data-table">
Expand Down
1 change: 1 addition & 0 deletions batch/test/test_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ async def callback(request):
callback_body.pop('time_closed')
callback_body.pop('time_completed')
callback_body.pop('duration')
callback_body.pop('cost_breakdown')
assert callback_body == {
'id': b.id,
'user': 'test',
Expand Down
4 changes: 4 additions & 0 deletions hail/python/hailtop/utils/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ def first_extant_file(*files: Optional[str]) -> Optional[str]:
def cost_str(cost: Optional[int]) -> Optional[str]:
if cost is None:
return None
if cost == 0.0:
return '$0.0000'
if cost < 0.0001:
return '<$0.0001'
return f'${cost:.4f}'


Expand Down