Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move processing time to qos #142

Merged
merged 17 commits into from
Nov 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 64 additions & 1 deletion cads_broker/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
"DISMISSED_MESSAGE", "The request has been dismissed by the system."
)

QOS_FUNCTIONS_CACHE: dict[str, dict[str, Any]] = dict()


class NoResultFound(Exception):
pass
Expand Down Expand Up @@ -379,7 +381,7 @@ def reset_qos_rules(session: sa.orm.Session, qos):
for request in get_running_requests(session):
# Recompute the limits
# It just updates the database. Internal qos is already updated.
limits = qos.limits_for(request, session)
limits = qos.limits_for(request)
_, rules = delete_request_qos_status(
request_uid=request.request_uid,
rules=limits,
Expand Down Expand Up @@ -493,6 +495,67 @@ def get_users_queue_from_processing_time(
return queueing_user_costs | running_user_costs


def users_last_finished_at(
session: sa.orm.Session, after: datetime.datetime
) -> dict[str, datetime.datetime]:
"""Return the last completed request for each user."""
statement = (
sa.select(SystemRequest.user_uid, sa.func.max(SystemRequest.finished_at))
.where(SystemRequest.finished_at > after)
.group_by(SystemRequest.user_uid)
)
return dict(session.execute(statement).all())


def user_last_completed_request(
session: sa.orm.Session, user_uid: str, interval: int
) -> int:
global QOS_FUNCTIONS_CACHE
after_datetime = datetime.datetime.now() - datetime.timedelta(seconds=interval)

if QOS_FUNCTIONS_CACHE.get("users_last_finished_at") is not None:
users_last_finished_at_datetime = QOS_FUNCTIONS_CACHE.get(
"users_last_finished_at"
)
else:
users_last_finished_at_datetime = users_last_finished_at(
session, after_datetime
)
QOS_FUNCTIONS_CACHE["users_last_finished_at"] = users_last_finished_at_datetime

user_last_finished_at_datetime = users_last_finished_at_datetime.get(
user_uid, after_datetime
)

value = int(
(
datetime.datetime.now()
- max(user_last_finished_at_datetime, after_datetime)
).total_seconds()
)
return value


def user_resource_used(
user_uid: str,
session: sa.orm.Session,
interval: int,
) -> int:
"""Return the amount of resource used by a user."""
global QOS_FUNCTIONS_CACHE
if QOS_FUNCTIONS_CACHE.get("users_resources") is not None:
users_resources = QOS_FUNCTIONS_CACHE.get("users_resources")
else:
users_resources = get_users_queue_from_processing_time(
session=session,
interval_stop=datetime.datetime.now(),
interval=datetime.timedelta(hours=interval / 60 / 60),
)
QOS_FUNCTIONS_CACHE["users_resources"] = users_resources

return users_resources.get(user_uid, 0)


def get_stuck_requests(session: sa.orm.Session, minutes: int = 15) -> list[str]:
"""Get all running requests that are not assigned to any worker."""
subquery = (
Expand Down
109 changes: 27 additions & 82 deletions cads_broker/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ def set_request_error_status(
if queued_request:
self.queue.add(request_uid, request)
self.qos.notify_end_of_request(
request, session, scheduler=self.internal_scheduler
request, scheduler=self.internal_scheduler
)
else:
request = db.set_request_status(
Expand All @@ -387,13 +387,11 @@ def manage_dismissed_request(self, request, session):
else:
request.status = "deleted"
if previous_status == "running":
self.qos.notify_end_of_request(
request, session, scheduler=self.internal_scheduler
)
self.qos.notify_end_of_request(request, scheduler=self.internal_scheduler)
elif previous_status == "accepted":
self.queue.pop(request.request_uid, None)
self.qos.notify_dismission_of_request(
request, session, scheduler=self.internal_scheduler
request, scheduler=self.internal_scheduler
)
# set finished_at if it is not set
if request.finished_at is None:
Expand Down Expand Up @@ -461,7 +459,7 @@ def sync_database(self, session: sa.orm.Session) -> None:
# notify the qos only if the request has been set to successful or failed here.
if finished_request:
self.qos.notify_end_of_request(
finished_request, session, scheduler=self.internal_scheduler
finished_request, scheduler=self.internal_scheduler
)
logger.info(
"job has finished",
Expand All @@ -481,7 +479,7 @@ def sync_database(self, session: sa.orm.Session) -> None:
)
if successful_request:
self.qos.notify_end_of_request(
request, session, scheduler=self.internal_scheduler
request, scheduler=self.internal_scheduler
)
logger.info(
"job has finished",
Expand All @@ -503,7 +501,7 @@ def sync_database(self, session: sa.orm.Session) -> None:
if queued_request:
self.queue.add(queued_request.request_uid, request)
self.qos.notify_end_of_request(
request, session, scheduler=self.internal_scheduler
request, scheduler=self.internal_scheduler
)
else:
db.set_request_status(
Expand All @@ -514,7 +512,7 @@ def sync_database(self, session: sa.orm.Session) -> None:
session=session,
)
self.qos.notify_end_of_request(
request, session, scheduler=self.internal_scheduler
request, scheduler=self.internal_scheduler
)
logger.info("job has finished", **db.logger_kwargs(request=request))

Expand Down Expand Up @@ -602,7 +600,7 @@ def on_future_done(self, future: distributed.Future) -> str:
# self.futures.pop(future.key, None)
if request:
self.qos.notify_end_of_request(
request, session, scheduler=self.internal_scheduler
request, scheduler=self.internal_scheduler
)
logger.info(
"job has finished",
Expand All @@ -618,7 +616,7 @@ def cache_requests_qos_properties(self, requests, session: sa.orm.Session) -> No
# copy list of requests to avoid RuntimeError: dictionary changed size during iteration
for request in list(requests):
try:
self.qos._properties(request, check_permissions=True, session=session)
self.qos._properties(request, check_permissions=True)
except PermissionError as exception:
db.add_event(
event_type="user_visible_error",
Expand All @@ -635,87 +633,34 @@ def cache_requests_qos_properties(self, requests, session: sa.orm.Session) -> No
}
self.queue.pop(request.request_uid, None)
self.qos.notify_dismission_of_request(
request, session, scheduler=self.internal_scheduler
request, scheduler=self.internal_scheduler
)
logger.info("job has finished", **db.logger_kwargs(request=request))
session.commit()

def processing_time_priority_algorithm(
@perf_logger
def submit_requests(
self,
session_write: sa.orm.Session,
number_of_requests: int,
candidates: Iterable[db.SystemRequest],
) -> None:
"""Check the qos rules and submit the requests to the dask scheduler."""
user_requests: dict[str, list[db.SystemRequest]] = {}
for request in candidates:
user_requests.setdefault(request.user_uid, []).append(request)
# FIXME: this is a temporary solution to prioritize subrequests from the high priority user
interval_stop = datetime.datetime.now()
# temporary solution to prioritize high priority user
users_queue = db.get_users_queue_from_processing_time(
interval_stop=interval_stop,
session=session_write,
interval=ONE_HOUR * CONFIG.broker_priority_interval_hours,
queue = sorted(
candidates,
key=lambda candidate: self.qos.priority(candidate),
reverse=True,
)
requests_counter: int = 0
users_queue = {
k: v
for k, v in sorted(
users_queue.items(),
key=lambda user_cost: self.qos.user_priority(
user_uid=user_cost[0], priority_cost=user_cost[1]
),
)
}
for user_uid, user_cost in users_queue.items():
may_run: bool = True
if user_uid not in user_requests:
continue
requests = sorted(
user_requests[user_uid],
key=lambda candidate: self.qos.priority(candidate, session_write),
reverse=True,
)
for request in requests:
# need to check the limits on each request to update the qos_rules table
can_run = self.qos.can_run(
request, session=session_write, scheduler=self.internal_scheduler
)
if can_run and may_run and requests_counter < number_of_requests:
requests_counter = 0
for request in queue:
if self.qos.can_run(request, scheduler=self.internal_scheduler):
if requests_counter <= int(number_of_requests):
self.submit_request(
request, priority=user_cost, session=session_write
request,
session=session_write,
priority=self.qos.priority(request),
)
may_run = False
requests_counter += 1

@perf_logger
def submit_requests(
self,
session_write: sa.orm.Session,
number_of_requests: int,
candidates: Iterable[db.SystemRequest],
) -> None:
"""Check the qos rules and submit the requests to the dask scheduler."""
if CONFIG.broker_priority_algorithm == "processing_time":
logger.info("priority algorithm", algorithm="processing_time")
self.processing_time_priority_algorithm(
session_write, number_of_requests, candidates
)
else:
queue = sorted(
candidates,
key=lambda candidate: self.qos.priority(candidate, session_write),
reverse=True,
)
requests_counter = 0
for request in queue:
if self.qos.can_run(
request, session=session_write, scheduler=self.internal_scheduler
):
if requests_counter <= int(number_of_requests):
self.submit_request(request, session=session_write)
requests_counter += 1
requests_counter += 1

def submit_request(
self,
Expand All @@ -727,9 +672,7 @@ def submit_request(
request = db.set_request_status(
request_uid=request.request_uid, status="running", session=session
)
self.qos.notify_start_of_request(
request, session, scheduler=self.internal_scheduler
)
self.qos.notify_start_of_request(request, scheduler=self.internal_scheduler)
self.queue.pop(request.request_uid)
future = self.client.submit(
worker.submit_workflow,
Expand All @@ -756,6 +699,8 @@ def run(self) -> None:
"""Run the broker loop."""
while True:
start_loop = time.perf_counter()
# reset the cache of the qos functions
db.QOS_FUNCTIONS_CACHE.clear()
with self.session_maker_read() as session_read:
if (rules_hash := get_rules_hash(self.qos.path)) != self.qos.rules_hash:
logger.info("reloading qos rules")
Expand Down
16 changes: 8 additions & 8 deletions cads_broker/expressions/RulesParser.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,14 +60,14 @@ def parse_number(self):
if self.peek(True) == ".":
s += self.next()
c = self.next()
if not str.isdigit(self, c):
if not str.isdigit(c):
raise ParserError(
"parseNumber invalid '{c}'",
self.line + 1,
)

s += c
while str.isdigit(self, self.peek(True)):
while str.isdigit(self.peek(True)):
s += self.next()

c = self.peek(True)
Expand All @@ -79,14 +79,14 @@ def parse_number(self):
s += c
c = self.next()

if not str.isdigit(self, c):
if not str.isdigit(c):
raise ParserError(
f"parseNumber invalid '{c}'",
self.line + 1,
)

s += c
while str.isdigit(self, self.peek()):
while str.isdigit(self.peek()):
s += self.next()

try:
Expand Down Expand Up @@ -312,13 +312,13 @@ def parse_priority(self, rules, environment):

rules.add_priority(environment, info, condition, conclusion)

def parse_user_priority(self, rules, environment):
def parse_dynamic_priority(self, rules, environment):
info = self.parse_string()
condition = self.parse_expression()
self.consume(":")
conclusion = self.parse_expression()

rules.add_user_priority(environment, info, condition, conclusion)
rules.add_dynamic_priority(environment, info, condition, conclusion)

def parse_definition(self, rules):
self.peek()
Expand Down Expand Up @@ -395,8 +395,8 @@ def parse_rules(self, rules, environment, raise_exception=True):
self.parse_user_limit(rules, environment)
continue

if ident == "user_priority":
self.parse_user_priority(rules, environment)
if ident == "dynamic_priority":
self.parse_dynamic_priority(rules, environment)
continue

if ident == "define":
Expand Down
21 changes: 21 additions & 0 deletions cads_broker/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,27 @@ def register_functions():
session=context.environment.session,
),
)
expressions.FunctionFactory.FunctionFactory.register_function(
"user_resource_used",
lambda context, interval=24 * 60 * 60: database.user_resource_used(
user_uid=context.request.user_uid,
interval=interval,
session=context.environment.session,
),
)
expressions.FunctionFactory.FunctionFactory.register_function(
"user_last_completed_request",
lambda context, interval=24 * 60 * 60: database.user_last_completed_request(
user_uid=context.request.user_uid,
interval=interval,
session=context.environment.session,
),
)
expressions.FunctionFactory.FunctionFactory.register_function(
"request_age",
lambda context: context.request.age
)

expressions.FunctionFactory.FunctionFactory.register_function("tagged", tagged)
expressions.FunctionFactory.FunctionFactory.register_function(
"request_contains_all", request_contains_all
Expand Down
1 change: 1 addition & 0 deletions cads_broker/qos/Properties.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,5 @@ def __init__(self):
self.starting_priority = 0
self.limits = []
self.priorities = []
self.dynamic_priorities = []
self.permissions = []
Loading
Loading