On demand #1
Open
wants to merge 4 commits into base: main
6 changes: 5 additions & 1 deletion vllm/model_executor/layers/fused_moe/__init__.py
@@ -1,10 +1,14 @@
from vllm.model_executor.layers.fused_moe.layer import (FusedMoE,
FusedMoEMethodBase)
FusedMoEMethodBase,
MoeGpuBuffer,
DebugCudaEvent)
from vllm.triton_utils import HAS_TRITON

__all__ = [
"FusedMoE",
"FusedMoEMethodBase",
"MoeGpuBuffer",
"DebugCudaEvent",
]

if HAS_TRITON:
45 changes: 35 additions & 10 deletions vllm/model_executor/layers/fused_moe/fused_moe.py
@@ -12,6 +12,7 @@
from vllm import _custom_ops as ops
from vllm.logger import init_logger
from vllm.platforms import current_platform
from vllm.model_executor.layers.fused_moe import DebugCudaEvent, MoeGpuBuffer

logger = init_logger(__name__)

@@ -207,7 +208,7 @@ def moe_align_block_size(
- First, flatten topk_ids to [2, 3, 4, 1, 2, 4, 1, 3, 4, 1, 2, 3].
- Then append padding tokens [12, 12, 12, 12] for each block.
- After sorting by expert index, we obtain token_ids
[3, 6, 9, 12, 0, 4, 10, 12, 1, 7, 11, 12, 2, 5, 8, 12].
[3, 6, 9, 12, 0, 4, 10, 12, 1, 7, 11, 12, 2, 5, 8, 12]. The token IDs are sorted by expert, so all tokens routed to the same expert end up contiguous.
Tokens 12 are non-existent (padding) and are ignored in
the subsequent matrix multiplication.
- The padding ensures that the total number of tokens is now divisible
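As an aside on the docstring above: the padding-and-sorting it describes can be reproduced with a small host-side sketch. This is only an illustration of the alignment idea, not the Triton-backed moe_align_block_size; the block size of 4 and the expert numbering (1–4) are taken from the docstring example, and the helper name is illustrative.

```python
import torch


def moe_align_block_size_reference(topk_ids: torch.Tensor, block_size: int,
                                   num_experts: int):
    """Pure-Python illustration of the padding/sorting described above; the
    real moe_align_block_size does this inside a kernel."""
    flat = topk_ids.flatten()
    pad_value = flat.numel()          # sentinel id used for padding tokens
    sorted_token_ids, expert_ids = [], []
    for expert in range(num_experts):
        token_idx = torch.nonzero(flat == expert, as_tuple=True)[0].tolist()
        pad = (-len(token_idx)) % block_size  # pad up to a multiple of block_size
        token_idx += [pad_value] * pad
        sorted_token_ids.extend(token_idx)
        # One expert id per block of block_size padded tokens.
        expert_ids.extend([expert] * (len(token_idx) // block_size))
    num_tokens_post_padded = len(sorted_token_ids)
    return (torch.tensor(sorted_token_ids), torch.tensor(expert_ids),
            torch.tensor([num_tokens_post_padded]))


# Reproduces the docstring example (experts are numbered 1..4 there, so the
# expert range is 0..4 here and expert 0 simply contributes no blocks).
topk = torch.tensor([[2, 3, 4], [1, 2, 4], [1, 3, 4], [1, 2, 3]])
ids, experts, total = moe_align_block_size_reference(topk, block_size=4, num_experts=5)
print(ids)  # tensor([ 3,  6,  9, 12,  0,  4, 10, 12,  1,  7, 11, 12,  2,  5,  8, 12])
```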
@@ -456,8 +457,8 @@ def get_config_dtype_str(dtype: torch.dtype,


def fused_experts(hidden_states: torch.Tensor,
w1: torch.Tensor,
w2: torch.Tensor,
moe_gpu_buffer: MoeGpuBuffer,
moe_events: DebugCudaEvent,
topk_weights: torch.Tensor,
topk_ids: torch.Tensor,
inplace: bool = False,
@@ -468,9 +469,12 @@ def fused_experts(hidden_states: torch.Tensor,
w2_scale: Optional[torch.Tensor] = None,
a1_scale: Optional[torch.Tensor] = None,
a2_scale: Optional[torch.Tensor] = None):

# Check constraints.
w1 = moe_gpu_buffer.w13_gpu
w2 = moe_gpu_buffer.w2_gpu
assert hidden_states.shape[1] == w1.shape[2], "Hidden size mismatch"
assert topk_weights.shape == topk_ids.shape, "topk shape mismatch"
#assert topk_weights.shape == topk_ids.shape, "topk shape mismatch"
assert hidden_states.is_contiguous(), "Hidden_states must be contiguous"
assert w1.is_contiguous(), "Expert weights1 must be contiguous"
assert w2.is_contiguous(), "Expert weights2 must be contiguous"
@@ -543,6 +547,10 @@ def fused_experts(hidden_states: torch.Tensor,
sorted_token_ids, expert_ids, num_tokens_post_padded = (
moe_align_block_size(curr_topk_ids, config['BLOCK_SIZE_M'], E))

print(f"Top_k IDs: {curr_topk_ids}")
print(f"sorted_token_ids IDs: {sorted_token_ids}")
print(f"expert ids: {expert_ids}")

invoke_fused_moe_kernel(curr_hidden_states,
w1,
intermediate_cache1,
@@ -582,13 +590,19 @@ def fused_experts(hidden_states: torch.Tensor,
torch.sum(intermediate_cache3.view(*intermediate_cache3.shape),
dim=1,
out=out_hidden_states[begin_chunk_idx:end_chunk_idx])

# Ideally this event would be recorded right after the kernels complete, but
# it is not clear how many iterations this chunk loop runs.
moe_events.mlp_w2_finished_event.record()

return out_hidden_states


def fused_moe(
hidden_states: torch.Tensor,
w1: torch.Tensor,
w2: torch.Tensor,
stream: torch.cuda.Stream,
gating_output: torch.Tensor,
topk: int,
renormalize: bool,
@@ -637,7 +651,7 @@ def fused_moe(
- torch.Tensor: The output tensor after applying the MoE layer.
"""
# Check constraints.
assert gating_output.shape[1] == w1.shape[0], "Number of experts mismatch"
#assert gating_output.shape[1] == w1.shape[0], "Number of experts mismatch"

if use_grouped_topk:
assert num_expert_group is not None and topk_group is not None
@@ -648,11 +662,22 @@
topk_weights, topk_ids = fused_topk(hidden_states, gating_output, topk,
renormalize)

return fused_experts(hidden_states,
w1,
w2,
topk_weights,
topk_ids,
unique_indices = torch.unique(topk_ids.flatten())

# This number needs to match the number of experts in the layer
# 64 for deepseek, 8 for Mixtral
if unique_indices.shape[0] != 8:
sorted_elements, _ = torch.sort(unique_indices)
rank_mapping = {elem.item(): rank for rank, elem in enumerate(sorted_elements)}
for old_value, new_value in rank_mapping.items():
topk_ids[topk_ids == old_value] = new_value

return fused_experts(hidden_states=hidden_states,
w1=w1,
w2=w2,
topk_weights=topk_weights,
topk_ids=topk_ids,
stream=stream,
inplace=inplace,
override_config=override_config,
use_fp8_w8a8=use_fp8_w8a8,
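The re-indexing added to fused_moe above compresses the selected expert ids into a dense 0..k-1 range so that topk_ids can address a small on-GPU expert buffer instead of the full expert table. Below is a standalone sketch of that transformation, assuming a layer with 8 experts as in the Mixtral case mentioned in the comment; the helper name remap_topk_ids is illustrative.

```python
import torch


def remap_topk_ids(topk_ids: torch.Tensor, num_experts_in_layer: int = 8) -> torch.Tensor:
    """Map the selected expert ids onto their rank among the unique ids so
    they can index a compact buffer (8 experts assumed, as for Mixtral)."""
    unique_indices = torch.unique(topk_ids.flatten())
    if unique_indices.shape[0] == num_experts_in_layer:
        return topk_ids  # every expert is in use; nothing to remap
    sorted_elements, _ = torch.sort(unique_indices)
    rank_mapping = {elem.item(): rank for rank, elem in enumerate(sorted_elements)}
    remapped = topk_ids.clone()
    for old_value, new_value in rank_mapping.items():
        remapped[topk_ids == old_value] = new_value
    return remapped


# Example: experts {3, 17, 42} collapse to slots {0, 1, 2}.
print(remap_topk_ids(torch.tensor([[17, 3], [42, 17]])))
# tensor([[1, 0],
#         [2, 1]])
```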
111 changes: 102 additions & 9 deletions vllm/model_executor/layers/fused_moe/layer.py
@@ -11,10 +11,38 @@
from vllm.model_executor.layers.quantization.base_config import (
QuantizationConfig, QuantizeMethodBase)
from vllm.model_executor.utils import set_weight_attrs

import numpy as np
logger = init_logger(__name__)


class DebugCudaEvent:
def __init__(self, topk: int):
self._topk_decided_event = torch.cuda.Event()
self.mlp_w13_finished_event = torch.cuda.Event()
self.mlp_w2_finished_event = torch.cuda.Event()
self.experts = np.full((topk,), fill_value=0, dtype=np.int64)
self.is_first_layer = True
self.is_prefill = False

def reset_events(self):
self._topk_decided_event = torch.cuda.Event()
self.mlp_w13_finished_event = torch.cuda.Event()
self.mlp_w2_finished_event = torch.cuda.Event()

def triggerTopkEvent(self, experts: torch.Tensor):
self.experts = experts[0].cpu().numpy()
self._topk_decided_event.record()


class MoeGpuBuffer:
def __init__(self, w13_shape: tuple[int, int, int], w2_shape: tuple[int, int, int]):
assert w13_shape[0] == w2_shape[0], "Moe GPU buffers must have the same number of experts"
self.w13_gpu = torch.nn.Parameter(torch.zeros(w13_shape), requires_grad=False)
self.w2_gpu = torch.nn.Parameter(torch.zeros(w2_shape), requires_grad=False)
self.expert_ids: List[int] = []
self.load_predicted_experts_stream = torch.cuda.Stream()


class FusedMoEMethodBase(QuantizeMethodBase):

@abstractmethod
@@ -25,23 +53,31 @@ def create_weights(self, layer: torch.nn.Module, num_experts: int,

@abstractmethod
def apply(self, layer: torch.nn.Module, x: torch.Tensor,
router_logits: torch.Tensor, top_k: int, renormalize: bool,
router_logits: torch.Tensor, top_k: int, stream: torch.cuda.Stream,
w13_gpu: torch.nn.Parameter, w2_gpu: torch.nn.Parameter, w1_cpu: torch.nn.Parameter, w2_cpu: torch.nn.Parameter, renormalize: bool,
use_grouped_topk: bool) -> torch.Tensor:
raise NotImplementedError


def find_invalid_indices(predicted_expert_ids: torch.Tensor, actual_expert_ids: torch.Tensor) -> List[int]:
mask = torch.isin(predicted_expert_ids, actual_expert_ids)
invalid_indices = torch.nonzero(~mask, as_tuple=True)[0]

return invalid_indices.tolist()


class UnquantizedFusedMoEMethod(FusedMoEMethodBase, CustomOp):
"""MoE method without quantization."""

def create_weights(self, layer: torch.nn.Module, num_experts: int,
hidden_size: int, intermediate_size: int,
params_dtype: torch.dtype, **extra_weight_attrs):

# Fused gate_up_proj (column parallel)
w13_weight = torch.nn.Parameter(torch.empty(num_experts,
2 * intermediate_size,
hidden_size,
dtype=params_dtype),
dtype=params_dtype,
device='cpu'),
requires_grad=False)
layer.register_parameter("w13_weight", w13_weight)
set_weight_attrs(w13_weight, extra_weight_attrs)
@@ -50,7 +86,8 @@ def create_weights(self, layer: torch.nn.Module, num_experts: int,
w2_weight = torch.nn.Parameter(torch.empty(num_experts,
hidden_size,
intermediate_size,
dtype=params_dtype),
dtype=params_dtype,
device='cpu'),
requires_grad=False)
layer.register_parameter("w2_weight", w2_weight)
set_weight_attrs(w2_weight, extra_weight_attrs)
@@ -62,21 +99,33 @@ def apply(self,
top_k: int,
renormalize: bool,
use_grouped_topk: bool,
moe_gpu_buffer: MoeGpuBuffer,
router_event: DebugCudaEvent,
w1_cpu: torch.nn.Parameter,
w2_cpu: torch.nn.Parameter,
topk_group: Optional[int] = None,
num_expert_group: Optional[int] = None) -> torch.Tensor:

return self.forward(x=x,
layer=layer,
router_logits=router_logits,
top_k=top_k,
moe_gpu_buffer=moe_gpu_buffer,
router_event=router_event,
w1_cpu=w1_cpu,
w2_cpu=w2_cpu,
renormalize=renormalize,
use_grouped_topk=use_grouped_topk,
topk_group=topk_group,
num_expert_group=num_expert_group)
num_expert_group=num_expert_group)

def forward_cuda(self,
layer: torch.nn.Module,
x: torch.Tensor,
moe_gpu_buffer: MoeGpuBuffer,
router_event: DebugCudaEvent,
w1_cpu: torch.Tensor,
w2_cpu: torch.Tensor,
use_grouped_topk: bool,
top_k: int,
router_logits: torch.Tensor,
@@ -96,11 +145,47 @@ def forward_cuda(self,
topk_group=topk_group,
num_expert_group=num_expert_group)

# Handles the prefill stage. Could have issues if the prompt is only one token long

if topk_ids.shape[0] > 1:
return fused_experts(hidden_states=x,
moe_gpu_buffer=moe_gpu_buffer,
topk_weights=topk_weights,
topk_ids=topk_ids,
moe_events=router_event,
inplace=True)

if router_event.is_first_layer:
w1_cpu.pin_memory()
w2_cpu.pin_memory()
moe_gpu_buffer.w13_gpu[:top_k, :, :] = w1_cpu[topk_ids[0].tolist()].to('cuda', non_blocking=True)
moe_gpu_buffer.w2_gpu[:top_k, :, :] = w2_cpu[topk_ids[0].tolist()].to('cuda', non_blocking=True)
topk_ids[0] = torch.arange(0, top_k)

else:
router_event.triggerTopkEvent(topk_ids)
invalid_indices = find_invalid_indices(torch.tensor(moe_gpu_buffer.expert_ids).to('cuda'), topk_ids[0])
for index in range(len(topk_ids[0])):
actual_id = topk_ids[0][index]
if actual_id not in moe_gpu_buffer.expert_ids:
with torch.cuda.stream(moe_gpu_buffer.load_predicted_experts_stream):
replaced_index = invalid_indices.pop(0) # Pop the first value
w1_cpu.pin_memory()
w2_cpu.pin_memory()

moe_gpu_buffer.w13_gpu[replaced_index, :, :] = w1_cpu[actual_id].to('cuda', non_blocking=True)
moe_gpu_buffer.w2_gpu[replaced_index, :, :] = w2_cpu[actual_id].to('cuda', non_blocking=True)
moe_gpu_buffer.expert_ids[replaced_index] = actual_id
topk_ids[0][index] = replaced_index
else:
new_index = moe_gpu_buffer.expert_ids.index(actual_id)
topk_ids[0][index] = new_index

return fused_experts(hidden_states=x,
w1=layer.w13_weight,
w2=layer.w2_weight,
moe_gpu_buffer=moe_gpu_buffer,
topk_weights=topk_weights,
topk_ids=topk_ids,
moe_events=router_event,
inplace=True)

def forward_cpu(self, *args, **kwargs):
@@ -280,13 +365,21 @@ def select_experts(hidden_states: torch.Tensor,
return topk_weights, topk_ids

def forward(self, hidden_states: torch.Tensor,
router_logits: torch.Tensor):
router_logits: torch.Tensor,
moe_gpu_buffer: MoeGpuBuffer,
router_event: DebugCudaEvent,
w1_cpu: torch.nn.Parameter,
w2_cpu: torch.nn.Parameter):
assert self.quant_method is not None

# Matrix multiply.
final_hidden_states = self.quant_method.apply(
layer=self,
x=hidden_states,
moe_gpu_buffer=moe_gpu_buffer,
router_event=router_event,
w1_cpu=w1_cpu,
w2_cpu=w2_cpu,
router_logits=router_logits,
top_k=self.top_k,
renormalize=self.renormalize,
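The decode path added to forward_cuda above keeps only the top-k experts resident on the GPU and refills stale buffer slots from the CPU copies on a dedicated stream. Here is a minimal sketch of that cache-update step, assuming the GPU buffer holds exactly top_k experts and cached_expert_ids records which expert currently occupies each slot; the helper name update_expert_cache is illustrative, and a real caller would still need to synchronize the copy stream before launching the MoE kernels.

```python
import torch
from typing import List


def update_expert_cache(topk_ids: torch.Tensor,
                        cached_expert_ids: List[int],
                        w13_gpu: torch.Tensor, w2_gpu: torch.Tensor,
                        w13_cpu: torch.Tensor, w2_cpu: torch.Tensor,
                        copy_stream: torch.cuda.Stream) -> torch.Tensor:
    """Reuse experts already resident in the GPU buffer, refill stale slots
    from the CPU copies on a side stream, and rewrite topk_ids to address
    buffer slots instead of global expert ids."""
    cached = torch.tensor(cached_expert_ids, device=topk_ids.device)
    # Slots whose cached expert is not requested this step can be overwritten.
    reusable_slots = torch.nonzero(~torch.isin(cached, topk_ids[0]),
                                   as_tuple=True)[0].tolist()
    for i in range(topk_ids.shape[1]):
        expert = int(topk_ids[0, i])
        if expert in cached_expert_ids:
            # Cache hit: point at the slot that already holds this expert.
            topk_ids[0, i] = cached_expert_ids.index(expert)
            continue
        # Cache miss: overwrite a reusable slot via an async host-to-device copy.
        slot = reusable_slots.pop(0)
        with torch.cuda.stream(copy_stream):
            w13_gpu[slot].copy_(w13_cpu[expert], non_blocking=True)
            w2_gpu[slot].copy_(w2_cpu[expert], non_blocking=True)
        cached_expert_ids[slot] = expert
        topk_ids[0, i] = slot
    return topk_ids
```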
30 changes: 24 additions & 6 deletions vllm/model_executor/models/deepseek_v2.py
@@ -81,7 +81,7 @@ def __init__(
"Only silu is supported for now.")
self.act_fn = SiluAndMul()

def forward(self, x):
def forward(self, x, w13_gpu: torch.nn.Parameter = None, w2_gpu: torch.nn.Parameter = None):
gate_up, _ = self.gate_up_proj(x)
x = self.act_fn(gate_up)
x, _ = self.down_proj(x)
@@ -138,16 +138,25 @@ def __init__(
reduce_results=False,
)

def forward(self, hidden_states: torch.Tensor) -> torch.Tensor:
def forward(self, hidden_states: torch.Tensor,
w13_gpu: torch.nn.Parameter,
w2_gpu: torch.nn.Parameter) -> torch.Tensor:
num_tokens, hidden_dim = hidden_states.shape
hidden_states = hidden_states.view(-1, hidden_dim)
if self.n_shared_experts is not None:
shared_output = self.shared_experts(hidden_states)
# router_logits: (num_tokens, n_experts)
router_logits, _ = self.gate(hidden_states)

stream = None
final_hidden_states = self.experts(
hidden_states=hidden_states,
router_logits=router_logits) * self.routed_scaling_factor
router_logits=router_logits,
w13_gpu=w13_gpu,
w2_gpu=w2_gpu,
w1_cpu=self.experts.w13_weight,
w2_cpu=self.experts.w2_weight,
stream=stream) * self.routed_scaling_factor
if shared_output is not None:
final_hidden_states = final_hidden_states + shared_output
if self.tp_size > 1:
@@ -380,6 +389,8 @@ def forward(
kv_cache: torch.Tensor,
attn_metadata: AttentionMetadata,
residual: Optional[torch.Tensor],
w13_gpu_weights: torch.nn.Parameter,
w2_gpu_weights: torch.nn.Parameter,
) -> torch.Tensor:
# Self Attention
if residual is None:
@@ -398,7 +409,7 @@
# Fully Connected
hidden_states, residual = self.post_attention_layernorm(
hidden_states, residual)
hidden_states = self.mlp(hidden_states)
hidden_states = self.mlp(hidden_states, w13_gpu=w13_gpu_weights, w2_gpu=w2_gpu_weights)
return hidden_states, residual


@@ -435,6 +446,9 @@ def __init__(
),
prefix=f"{prefix}.layers")

self.gpu_weight13 = torch.nn.Parameter(requires_grad=False)
self.gpu_weight2 = torch.nn.Parameter(requires_grad=False)

if get_pp_group().is_last_rank:
self.norm = RMSNorm(config.hidden_size, eps=config.rms_norm_eps)
else:
@@ -458,9 +472,13 @@

for i in range(self.start_layer, self.end_layer):
layer = self.layers[i]
hidden_states, residual = layer(positions, hidden_states,
hidden_states, residual = layer(positions,
hidden_states,
kv_caches[i - self.start_layer],
attn_metadata, residual)
attn_metadata,
residual,
w13_gpu_weights=self.gpu_weight13,
w2_gpu_weights=self.gpu_weight2)

if not get_pp_group().is_last_rank:
return IntermediateTensors({
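The deepseek_v2.py changes mostly thread one model-level pair of GPU weight buffers down through every decoder layer into the MoE MLP. A condensed sketch of that plumbing pattern follows, using stand-in classes rather than the real vLLM modules.

```python
import torch
import torch.nn as nn


class ToyMoEMLP(nn.Module):
    def forward(self, x, w13_gpu, w2_gpu):
        # Stand-in for the fused MoE call; it only shows where the shared
        # buffers end up.
        return x


class ToyDecoderLayer(nn.Module):
    def __init__(self):
        super().__init__()
        self.mlp = ToyMoEMLP()

    def forward(self, hidden_states, w13_gpu, w2_gpu):
        # Attention and layer norms are elided in this sketch.
        return self.mlp(hidden_states, w13_gpu=w13_gpu, w2_gpu=w2_gpu)


class ToyModel(nn.Module):
    def __init__(self, num_layers: int):
        super().__init__()
        self.layers = nn.ModuleList(ToyDecoderLayer() for _ in range(num_layers))
        # One pair of buffers shared by every layer, filled on demand with
        # whichever experts the current step actually routes to.
        self.gpu_weight13 = nn.Parameter(requires_grad=False)
        self.gpu_weight2 = nn.Parameter(requires_grad=False)

    def forward(self, hidden_states):
        for layer in self.layers:
            hidden_states = layer(hidden_states,
                                  w13_gpu=self.gpu_weight13,
                                  w2_gpu=self.gpu_weight2)
        return hidden_states


print(ToyModel(num_layers=2)(torch.randn(2, 8)).shape)  # torch.Size([2, 8])
```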