[core][misc] improve free_finished_seq_groups #6865

Merged (2 commits) on Jul 30, 2024
vllm/core/scheduler.py: 13 additions & 7 deletions
```diff
@@ -566,6 +566,7 @@ def _schedule_swapped(
                     "Failing the request %s because there's not enough kv "
                     "cache blocks to run the entire sequence.",
                     seq_group.request_id)
+                self._finished_requests_ids.append(seq_group.request_id)
                 for seq in seq_group.get_seqs():
                     seq.status = SequenceStatus.FINISHED_IGNORED
                 infeasible_seq_groups.append(seq_group)
```
```diff
@@ -699,6 +700,7 @@ def _schedule_prefills(
                 logger.warning(
                     "Input prompt (%d tokens) is too long"
                     " and exceeds limit of %d", num_new_tokens, prompt_limit)
+                self._finished_requests_ids.append(seq_group.request_id)
                 for seq in waiting_seqs:
                     seq.status = SequenceStatus.FINISHED_IGNORED
                 ignored_seq_groups.append(seq_group)
```
```diff
@@ -714,6 +716,7 @@ def _schedule_prefills(
                     "Input prompt (%d tokens) is too long"
                     " and exceeds the capacity of block_manager",
                     num_new_tokens)
+                self._finished_requests_ids.append(seq_group.request_id)
                 for seq in waiting_seqs:
                     seq.status = SequenceStatus.FINISHED_IGNORED
                 ignored_seq_groups.append(seq_group)
```
```diff
@@ -1058,13 +1061,16 @@ def free_seq(self, seq: Sequence) -> None:
         self.block_manager.free(seq)
 
     def free_finished_seq_groups(self) -> None:
```
WoosukKwon (Collaborator) commented on Jul 30, 2024:

@mzusman I actually don't get the current (before this PR) logic here.

If I understand correctly, the order of execution is schedule -> get_and_reset_finished_requests_ids -> model execution -> free_finished_seq_groups. Hence, the requests freed from self.swapped or self.waiting were not actually added to finished_requests_ids before the model execution. This means the finished_requests_ids holds stale information lagged by 1 step. Is my understanding correct?
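
To make the ordering concrete, here is a minimal sketch of the step order described above. `ToyEngine` and its methods are illustrative names, not vLLM's actual engine API; the point is only why an id appended in `free_finished_seq_groups()` misses the snapshot taken in the same step.

```python
# A toy model of the per-step ordering discussed in this thread.
class ToyEngine:

    def __init__(self, scheduler, model_runner):
        self.scheduler = scheduler
        self.model_runner = model_runner

    def step(self):
        # 1. Schedule. With this PR, requests rejected from self.waiting or
        #    self.swapped are appended to _finished_requests_ids here.
        scheduler_outputs = self.scheduler.schedule()
        # 2. Snapshot and clear the finished ids collected so far.
        finished_ids = self.scheduler.get_and_reset_finished_requests_ids()
        # 3. Execute the model with that snapshot.
        outputs = self.model_runner.execute(scheduler_outputs, finished_ids)
        # 4. Requests that finish in self.running are recorded only now,
        #    after execution, so their ids reach the model runner one step
        #    late: the lag described above.
        self.scheduler.free_finished_seq_groups()
        return outputs
```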

WoosukKwon (Collaborator) commented on Jul 30, 2024:

OK, to my understanding:

  1. finished_requests_ids actually doesn't have to include the requests rejected from self.waiting, since they (ignored_seq_groups) are never passed into the model runner (as sketched below).
  2. finished_requests_ids must include the requests rejected from self.swapped in the current step, which means the current code (before this PR) has a bug. However, the bug has gone unnoticed because this case is very rare.
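
A compressed sketch of point 1, with simplified control flow; `num_prompt_tokens` is a hypothetical attribute standing in for the real token-count logic. Rejected prompts land in `ignored_seq_groups`, which is surfaced to the engine for error reporting but never enters the batch the model runner executes.

```python
from collections import deque

# Simplified sketch of the prefill-scheduling split from point 1.
def schedule_prefills_sketch(waiting: deque, prompt_limit: int):
    scheduled, ignored_seq_groups = [], []
    while waiting:
        seq_group = waiting.popleft()
        if seq_group.num_prompt_tokens > prompt_limit:
            # Rejected: reported back to the user, never sent to the model
            # runner, so it need not appear in finished_requests_ids.
            ignored_seq_groups.append(seq_group)
        else:
            scheduled.append(seq_group)
    return scheduled, ignored_seq_groups
```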

mzusman (Contributor) commented:

Yeah, that's correct: finished_requests_ids holds information that is lagged by one step; finished requests will always be released upon the next step.

AFAIU, self.waiting also includes preempted requests that got rescheduled. Preempted requests were previously passed into the model runner and are already registered in the Mamba cache. If those requests get aborted, then we do add them to finished_requests_ids and release them through here.
BTW, just to be sure: by "requests rejected", do you mean aborted requests?

So actually, yeah, checking for finished requests in self.waiting and self.swapped in free_finished_seq_groups is not necessary.
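
For contrast with rejection, a rough sketch of the abort path mentioned above. This is a hypothetical helper, not vLLM's actual abort logic (which lives in Scheduler.abort_seq_group and differs in detail): an aborted request gets its id appended regardless of which queue it sits in, so any state keyed by it can still be released.

```python
# Hypothetical sketch of aborting a request by id.
def abort_request_sketch(scheduler, request_id: str) -> None:
    for queue in (scheduler.waiting, scheduler.running, scheduler.swapped):
        for seq_group in list(queue):
            if seq_group.request_id == request_id:
                # Record the id so the model runner can evict any state
                # (e.g. a Mamba cache entry) registered under it.
                scheduler._finished_requests_ids.append(request_id)
                queue.remove(seq_group)
                return
```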

WoosukKwon (Collaborator) commented:

@mzusman Thanks for the explanation! However, it's a bit unclear to me how this handles preemption, both recomputation and swapping. Could you elaborate more on that?

mzusman (Contributor) commented on Jul 30, 2024:

@WoosukKwon Sure! Upon recomputation we actually keep the Mamba cache as it is and do not evict it (since the Mamba cache is quite small); the request id persists through the preemption, so we can still use its corresponding cache upon recomputation.
RE swapping: we do not handle swapping at the moment, as it occurs fairly rarely and is quite complicated to implement for the Mamba cache atm.

Therefore there's no reason to run through self.swapped in search of finished requests.
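
A minimal sketch of the behavior described above, assuming a dict keyed by request id. `ToyMambaCacheManager` and its fields are illustrative, not the real vLLM Mamba cache code: entries survive preemption-by-recomputation because the request id is unchanged, and are dropped only when the id shows up in finished_requests_ids.

```python
from typing import Any, Dict, List

class ToyMambaCacheManager:
    """Illustrative stand-in for a request-id-keyed Mamba state cache."""

    def __init__(self) -> None:
        self._cache: Dict[str, Any] = {}  # request_id -> cached state

    def get_or_create(self, request_id: str) -> Any:
        # A preempted-then-recomputed request keeps its request id, so it
        # finds its old entry here instead of rebuilding the state.
        if request_id not in self._cache:
            self._cache[request_id] = {}  # placeholder for the real state
        return self._cache[request_id]

    def release_finished(self, finished_requests_ids: List[str]) -> None:
        # Called with the ids collected by the scheduler; eviction happens
        # only for genuinely finished (or aborted) requests.
        for request_id in finished_requests_ids:
            self._cache.pop(request_id, None)
```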

WoosukKwon (Collaborator) commented:

@mzusman Thanks for the detailed explanation! It's super helpful for understanding the implementation.

I think we should revisit this design decision in the near future; currently, it's a bit confusing for code readers.

```diff
-        for queue in [self.running, self.swapped, self.waiting]:
-            self._finished_requests_ids += [
-                seq_group.request_id for seq_group in queue
-                if seq_group.is_finished()
-            ]
-        self.running = deque(seq_group for seq_group in self.running
-                             if not seq_group.is_finished())
+        # finished requests in self.waiting and self.swapped are already
+        # appended to self._finished_requests_ids during the scheduling.
+        # the only new finished requests are in self.running.
+        remaining: Deque[SequenceGroup] = deque()
+        for seq_group in self.running:
+            if seq_group.is_finished():
+                self._finished_requests_ids.append(seq_group.request_id)
+            else:
+                remaining.append(seq_group)
+        self.running = remaining
```
mgoin (Member) commented:

@youkaichao how about wrapping this in

    if any(seq_group.is_finished() for seq_group in self.running):

to avoid rebuilding the deque every time, since requests will be finished relatively rarely?
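
Spelled out, the suggestion could look like the sketch below. It reuses the names from the diff above (Deque, SequenceGroup, deque, assumed imported as in scheduler.py) and is not the code that was merged.

```python
def free_finished_seq_groups(self) -> None:
    # Fast path: on most steps nothing in self.running has finished,
    # so skip rebuilding the deque entirely.
    if not any(seq_group.is_finished() for seq_group in self.running):
        return
    remaining: Deque[SequenceGroup] = deque()
    for seq_group in self.running:
        if seq_group.is_finished():
            self._finished_requests_ids.append(seq_group.request_id)
        else:
            remaining.append(seq_group)
    self.running = remaining
```

The trade-off is a second pass over self.running on the rare steps where something did finish.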

youkaichao (Member Author) commented:

I plan to refactor this into a dictionary in the future, so that we can easily delete requests.
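
That future refactor might look roughly like the following sketch. It is hypothetical: it assumes self.running becomes a Dict[str, SequenceGroup] mapping, which this PR does not do.

```python
def free_finished_seq_groups(self) -> None:
    # Hypothetical dict-based variant: self.running is assumed to map
    # request_id -> SequenceGroup, so finished entries can be deleted
    # in place without rebuilding a container.
    finished_ids = [
        request_id for request_id, seq_group in self.running.items()
        if seq_group.is_finished()
    ]
    for request_id in finished_ids:
        self._finished_requests_ids.append(request_id)
        del self.running[request_id]
```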


```diff
 
     def _allocate_and_set_running(self, seq_group: SequenceGroup) -> None:
         self.block_manager.allocate(seq_group)
```