[core] simplify seq group code (vllm-project#9569)
Co-authored-by: Zhuohan Li <zhuohan123@gmail.com>
Signed-off-by: Maxime Fournioux <55544262+mfournioux@users.noreply.github.com>
2 people authored and mfournioux committed Nov 20, 2024
1 parent 2b37edd commit 92b3b9b
Showing 6 changed files with 62 additions and 566 deletions.
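The two test files shown below lose the scheduler tests that exercised swapping and best_of > 1 sequence groups. For orientation, here is a minimal sketch of the harness pattern those deleted tests share: build a SchedulerConfig and CacheConfig, construct a Scheduler, enqueue a dummy prompt, and take one scheduling step. It assumes the helpers from tests/core/utils.py (create_dummy_prompt, schedule_and_update_computed_tokens) behave as they do in the tests below; the function name is illustrative and not part of the commit.

from vllm.config import CacheConfig, SchedulerConfig
from vllm.core.scheduler import Scheduler

from tests.core.utils import (create_dummy_prompt,
                              schedule_and_update_computed_tokens)


def one_chunked_prefill_step():
    # A 30-token batch budget splits a 60-token prompt into two prefill chunks.
    scheduler_config = SchedulerConfig(
        "generate", 30, 30, 200, enable_chunked_prefill=True)
    cache_config = CacheConfig(4, 1.0, 1, "auto")
    cache_config.num_cpu_blocks = 16
    cache_config.num_gpu_blocks = 16
    scheduler = Scheduler(scheduler_config, cache_config, None)
    _, seq_group = create_dummy_prompt("1", prompt_length=60, block_size=4)
    scheduler.add_seq_group(seq_group)
    _, out = schedule_and_update_computed_tokens(scheduler)
    assert out.num_batched_tokens == 30  # only the first chunk is scheduled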
153 changes: 0 additions & 153 deletions tests/core/test_chunked_prefill_scheduler.py
@@ -4,7 +4,6 @@
import pytest # noqa

from vllm.config import CacheConfig, SchedulerConfig
from vllm.core.interfaces import AllocStatus
from vllm.core.scheduler import Scheduler
from vllm.sequence import Logprob, SequenceGroup

@@ -347,158 +346,6 @@ def test_prompt_limit_exceed():
assert out.ignored_seq_groups[0] == seq_group


def test_swap():
"""Verify swapping works with chunked prefill requests"""
block_size = 4
max_seqs = 30
max_model_len = 200
max_num_batched_tokens = 30
scheduler_config = SchedulerConfig(
"generate",
max_num_batched_tokens,
max_seqs,
max_model_len,
enable_chunked_prefill=True,
)
cache_config = CacheConfig(block_size, 1.0, 1, "auto")
cache_config.num_cpu_blocks = 16
cache_config.num_gpu_blocks = 16
scheduler = Scheduler(scheduler_config, cache_config, None)

_, seq_group = create_dummy_prompt("1",
prompt_length=60,
best_of=2,
block_size=block_size)
scheduler.add_seq_group(seq_group)
_, out = schedule_and_update_computed_tokens(scheduler)
# The request is chunked: only its first 30-token prefill chunk
# is scheduled now.
assert len(out.scheduled_seq_groups) == 1
assert out.num_prefill_groups == 1
assert seq_group.is_prefill()
assert out.num_batched_tokens == max_num_batched_tokens

# Mock the block manager so the running request must be swapped out.
scheduler.block_manager.can_append_slots = MagicMock()

def cannot_append_second_group(seq_group, num_lookahead_slots):
return seq_group.request_id != "1"

scheduler.block_manager.can_append_slots.side_effect = (
cannot_append_second_group)

# The running prefill is now swapped.
_, out = schedule_and_update_computed_tokens(scheduler)
assert len(out.scheduled_seq_groups) == 0
assert out.num_batched_tokens == 0
assert out.blocks_to_swap_out != []
assert out.blocks_to_swap_in == []

# Add 1 more task. Swap should be prioritized over new prefill.
_, seq_group = create_dummy_prompt("2", prompt_length=60)
scheduler.add_seq_group(seq_group)
_, out = schedule_and_update_computed_tokens(scheduler)
assert len(out.scheduled_seq_groups) == 1
# The swapped request is swapped back in and resumes its prefill
# (30 tokens); swap-in is prioritized over the new prefill.
assert out.num_batched_tokens == 30
assert out.blocks_to_swap_in != []
assert out.blocks_to_swap_out == []
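
# A hedged aside, not part of the original test: with prompt_length=60 and a
# 30-token max_num_batched_tokens budget, prefill needs ceil(60 / 30) = 2
# scheduler steps, which is why each prefill step above batches exactly 30
# tokens.
prompt_len, token_budget = 60, 30
num_prefill_chunks = -(-prompt_len // token_budget)  # ceiling division
assert num_prefill_chunks == 2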


def test_running_prefill_prioritized_over_swap():
block_size = 4
max_seqs = 30
max_model_len = 200
max_num_batched_tokens = 30
scheduler_config = SchedulerConfig(
"generate",
max_num_batched_tokens,
max_seqs,
max_model_len,
enable_chunked_prefill=True,
)
cache_config = CacheConfig(block_size, 1.0, 1, "auto")
cache_config.num_cpu_blocks = 32
cache_config.num_gpu_blocks = 32
scheduler = Scheduler(scheduler_config, cache_config, None)

_, seq_group = create_dummy_prompt("1",
prompt_length=60,
best_of=2,
block_size=block_size)
scheduler.add_seq_group(seq_group)
_, out = schedule_and_update_computed_tokens(scheduler)
# The request is chunked: only its first 30-token prefill chunk
# is scheduled now.
assert len(out.scheduled_seq_groups) == 1
assert out.num_prefill_groups == 1
assert seq_group.is_prefill()
assert out.num_batched_tokens == max_num_batched_tokens

# The request should be swapped out.
scheduler.block_manager.can_append_slots = MagicMock()

def cannot_append_second_group(seq_group, num_lookahead_slots):
return seq_group.request_id != "1"

scheduler.block_manager.can_append_slots.side_effect = (
cannot_append_second_group)

# The running prefill is now swapped.
_, out = schedule_and_update_computed_tokens(scheduler)
assert len(out.scheduled_seq_groups) == 0
assert out.num_batched_tokens == 0
assert out.blocks_to_swap_out != []
assert out.blocks_to_swap_in == []

# Add one more request. Swap-in is not possible, so the new prefill runs.
scheduler.block_manager.can_swap_in = MagicMock()
scheduler.block_manager.can_swap_in.return_value = AllocStatus.LATER

_, seq_group2 = create_dummy_prompt("2",
prompt_length=60,
block_size=block_size)
scheduler.add_seq_group(seq_group2)
_, out = schedule_and_update_computed_tokens(scheduler)
assert len(out.scheduled_seq_groups) == 1
# The new request's first 30-token prefill chunk runs; nothing is
# swapped in or out.
assert out.num_batched_tokens == 30
assert out.blocks_to_swap_in == []
assert out.blocks_to_swap_out == []
assert out.scheduled_seq_groups[0].seq_group == seq_group2

# Now, although swap-in is possible, the running prefill is prioritized.
scheduler.block_manager.can_swap_in.return_value = AllocStatus.OK
_, out = schedule_and_update_computed_tokens(scheduler)
assert len(out.scheduled_seq_groups) == 1
# The second 30-token chunk completes seq_group2's prefill; the
# swapped-out request still waits.
assert out.num_batched_tokens == 30
assert out.blocks_to_swap_in == []
assert out.blocks_to_swap_out == []
assert not seq_group2.is_prefill()
assert out.scheduled_seq_groups[0].seq_group == seq_group2
append_new_token(seq_group2, 1)

# Decoding is prioritized.
_, out = schedule_and_update_computed_tokens(scheduler)
assert len(out.scheduled_seq_groups) == 1
# A single decode token is scheduled; the swapped-out request still waits.
assert out.num_batched_tokens == 1
assert out.blocks_to_swap_in == []
assert out.blocks_to_swap_out == []
assert not seq_group2.is_prefill()
assert out.scheduled_seq_groups[0].seq_group == seq_group2
append_new_token(seq_group2, 1)

# Once the running group is aborted, the swapped-out request can
# finally be swapped back in.
scheduler.abort_seq_group(seq_group2.request_id)
_, out = schedule_and_update_computed_tokens(scheduler)
assert len(out.scheduled_seq_groups) == 1
assert out.num_batched_tokens == 30
assert out.blocks_to_swap_in != []
assert out.blocks_to_swap_out == []


def test_chunked_prefill_preempt():
"""Verify preempt works with chunked prefill requests"""
block_size = 4
204 changes: 1 addition & 203 deletions tests/core/test_scheduler.py
@@ -10,7 +10,7 @@
from vllm.core.interfaces import AllocStatus
from vllm.core.scheduler import Scheduler, SchedulingBudget
from vllm.lora.request import LoRARequest
from vllm.sequence import SequenceGroup, SequenceStatus
from vllm.sequence import SequenceGroup

from .utils import (append_new_token, append_new_token_seq_group,
create_dummy_prompt, get_sequence_groups,
@@ -296,55 +296,6 @@ def test_scheduler_delay_factor():
append_new_token(out, 1)


def test_swapped_out_prioritized():
block_size = 4
scheduler = initialize_scheduler(max_num_seqs=6,
block_size=block_size,
num_cpu_blocks=64,
num_gpu_blocks=64)
# 3 requests with best_of=2 give 6 sequences in total.
for i in range(3):
_, seq_group = create_dummy_prompt(str(i),
prompt_length=60,
best_of=2,
block_size=block_size)
scheduler.add_seq_group(seq_group)
seq_group_meta, out = schedule_and_update_computed_tokens(scheduler)
# All three prefills are scheduled now.
assert len(out.scheduled_seq_groups) == 3
append_new_token(out, 1)

# The last request should be swapped out.
scheduler.block_manager.can_append_slots = MagicMock()

def cannot_append_second_group(seq_group, num_lookahead_slots):
return seq_group.request_id != "2"

scheduler.block_manager.can_append_slots.side_effect = (
cannot_append_second_group)

seq_group_meta, out = schedule_and_update_computed_tokens(scheduler)
assert len(out.scheduled_seq_groups) == 2
assert out.num_batched_tokens == 2
assert out.blocks_to_swap_out != []
assert out.blocks_to_swap_in == []
append_new_token(out, 1)

# Add 1 more task. Swap should be prioritized over prefill.
_, seq_group = create_dummy_prompt(str(i),
prompt_length=60,
best_of=2,
block_size=block_size)
scheduler.add_seq_group(seq_group)
seq_group_meta, out = schedule_and_update_computed_tokens(scheduler)
append_new_token(out, 1)
assert len(out.scheduled_seq_groups) == 3
# 3 decode tokens; the swapped-out request is swapped back in.
assert out.num_batched_tokens == 3
assert out.blocks_to_swap_in != []
assert out.blocks_to_swap_out == []


def initialize_scheduler(
*,
max_num_seqs=1000,
@@ -646,60 +597,6 @@ def cannot_append_second_group(seq_group, num_lookahead_slots):
assert output.blocks_to_copy == []


def test_decode_swap_beam_search():
"""
Test that a group with best_of > 1 swaps its blocks out.
"""
block_size = 4
scheduler = initialize_scheduler(block_size=block_size,
num_gpu_blocks=64,
num_cpu_blocks=64)
curr_loras = None
budget = create_token_budget()
for i in range(3):
_, seq_group = create_dummy_prompt(str(i),
prompt_length=60,
best_of=2,
block_size=block_size)
scheduler._allocate_and_set_running(seq_group)
scheduler._add_seq_group_to_running(seq_group)
append_new_token_seq_group(60, seq_group, 1)
budget.add_num_seqs(seq_group.request_id,
seq_group.get_max_num_running_seqs())
budget.add_num_batched_tokens(
seq_group.request_id, seq_group.num_seqs(SequenceStatus.RUNNING))

# The last request should be swapped out.
scheduler.block_manager.can_append_slots = MagicMock()

def cannot_append_second_group(seq_group, num_lookahead_slots):
return seq_group.request_id != "2"

scheduler.block_manager.can_append_slots.side_effect = (
cannot_append_second_group)
scheduler.block_manager.swap_out = MagicMock()
expected_swap_mapping = [("5", "7")]
scheduler.block_manager.swap_out.return_value = expected_swap_mapping

output = scheduler._schedule_running(budget, curr_loras)
remaining_running = scheduler.running
assert len(remaining_running) == 0
assert len(output.decode_seq_groups) == 2
assert len(output.prefill_seq_groups) == 0
assert output.decode_seq_groups[0].seq_group.request_id == "0"
assert output.decode_seq_groups[1].seq_group.request_id == "1"
assert len(output.preempted) == 0
assert len(output.swapped_out) == 1
# Budget should reflect the swapped-out request.
assert budget.num_batched_tokens == 2
# The swapped group holds 2 sequences, so 2 are subtracted (6 -> 4).
assert budget.num_curr_seqs == 4
# The group is swapped out, not recompute-preempted, so the mapping is set.
assert output.blocks_to_swap_out == expected_swap_mapping
# Nothing is copied.
assert output.blocks_to_copy == []
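
# A hedged recap of the budget math above, using this test's numbers only:
# 3 groups x best_of=2 leaves 6 current sequences; swapping one group out
# removes its 2 sequences, and each still-running group contributes one
# batched decode token.
groups, seqs_per_group = 3, 2
assert groups * seqs_per_group - seqs_per_group == 4  # budget.num_curr_seqs
assert groups - 1 == 2  # budget.num_batched_tokens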


def test_schedule_decode_blocks_to_copy_update():
"""
Verify blocks_to_copy is updated.
@@ -736,105 +633,6 @@ def test_schedule_decode_blocks_to_copy_update():
assert output.blocks_to_copy == [(2, 3)]


def test_schedule_swapped_simple():
block_size = 4
scheduler = initialize_scheduler(block_size=block_size)
curr_loras = None
blocks_to_swap_out: List[Tuple[int, int]] = []
_, seq_group = create_dummy_prompt("1",
prompt_length=4,
best_of=2,
block_size=block_size)
scheduler._allocate_and_set_running(seq_group)
append_new_token_seq_group(4, seq_group, 1)
scheduler._swap_out(seq_group, blocks_to_swap_out)
scheduler._add_seq_group_to_swapped(seq_group)

budget = create_token_budget()
output = scheduler._schedule_swapped(budget, curr_loras)
remaining_swapped = scheduler.swapped
assert len(remaining_swapped) == 0
assert budget.num_batched_tokens == 1
assert budget.num_curr_seqs == 2
assert len(output.decode_seq_groups) == 1
assert len(output.prefill_seq_groups) == 0
# Swap-in mappings are the reverse of swap-out mappings.
blocks_to_swap_in_reverse = []
for swapin, swapout in output.blocks_to_swap_in:
blocks_to_swap_in_reverse.append((swapout, swapin))
assert blocks_to_swap_out == blocks_to_swap_in_reverse
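
# A hedged illustration of the reversal checked above, with made-up block
# numbers: swap-out pairs are (gpu_block, cpu_block) and swap-in pairs are
# the inverse (cpu_block, gpu_block) mapping.
example_swap_out = [(3, 11)]
example_swap_in = [(11, 3)]
assert [(dst, src) for src, dst in example_swap_in] == example_swap_out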


def test_schedule_swapped_max_token_budget():
block_size = 4
scheduler = initialize_scheduler(block_size=block_size,
num_cpu_blocks=32,
num_gpu_blocks=32)
curr_loras = None
blocks_to_swap_out: List[Tuple[int, int]] = []
for i in range(2):
_, seq_group = create_dummy_prompt(str(i), prompt_length=60, best_of=2)
scheduler._allocate_and_set_running(seq_group)
append_new_token_seq_group(60, seq_group, 1)
scheduler._swap_out(seq_group, blocks_to_swap_out)
scheduler._add_seq_group_to_swapped(seq_group)

budget = create_token_budget(token_budget=1)
output = scheduler._schedule_swapped(budget, curr_loras)
remaining_swapped = scheduler.swapped
assert len(remaining_swapped) == 1
assert budget.num_batched_tokens == 1
assert budget.num_curr_seqs == 2
assert len(output.decode_seq_groups) == 1
assert len(output.prefill_seq_groups) == 0

# Verify num_batched_tokens is respected.
budget = create_token_budget(token_budget=1)
add_token_budget(budget, 1, 0)
output = scheduler._schedule_swapped(budget, curr_loras)
remaining_swapped = scheduler.swapped
assert len(remaining_swapped) == 1
assert budget.num_batched_tokens == 1
assert budget.num_curr_seqs == 0
assert len(output.decode_seq_groups) == 0
assert len(output.prefill_seq_groups) == 0


def test_schedule_swapped_max_seqs():
block_size = 4
scheduler = initialize_scheduler(block_size=block_size,
num_cpu_blocks=64,
num_gpu_blocks=64)
curr_loras = None
blocks_to_swap_out: List[Tuple[int, int]] = []
for i in range(4):
_, seq_group = create_dummy_prompt(str(i),
prompt_length=60,
block_size=4)
scheduler._allocate_and_set_running(seq_group)
append_new_token_seq_group(60, seq_group, 1)
scheduler._swap_out(seq_group, blocks_to_swap_out)
scheduler._add_seq_group_to_swapped(seq_group)

budget = create_token_budget(max_num_seqs=2)
output = scheduler._schedule_swapped(budget, curr_loras)
remaining_swapped = scheduler.swapped
assert len(remaining_swapped) == 2
assert budget.num_batched_tokens == 2
assert budget.num_curr_seqs == 2
assert len(output.decode_seq_groups) == 2
assert len(output.prefill_seq_groups) == 0

# Verify num_curr_seqs is respected.
output = scheduler._schedule_swapped(budget, curr_loras)
remaining_swapped = scheduler.swapped
assert len(remaining_swapped) == 2
assert budget.num_batched_tokens == 2
assert budget.num_curr_seqs == 2
assert len(output.decode_seq_groups) == 0
assert len(output.prefill_seq_groups) == 0
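
# A hedged recap of the cap above: 4 swapped single-sequence groups against
# max_num_seqs=2 means only the first 2 are swapped back in, and a second
# scheduling pass admits nothing because the sequence budget is exhausted.
swapped_groups, max_num_seqs = 4, 2
admitted = min(swapped_groups, max_num_seqs)
assert admitted == 2 and swapped_groups - admitted == 2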


def test_schedule_swapped_max_loras():
block_size = 4
lora_config = LoRAConfig(max_lora_rank=8, max_loras=1)