Appearance
执行器与 Worker — 代码走读
Executor 抽象 — vllm/v1/executor/abstract.py
python
class Executor(ABC):
    """Abstract base class for executors, which drive the model workers.

    Concrete subclasses decide how workers are launched (single process,
    multiple processes, or Ray) and how each inference step reaches them.
    """

    @abstractmethod
    def determine_available_memory(self) -> int:
        """Return the GPU memory available for the KV cache."""
        ...

    @abstractmethod
    def initialize(self) -> None:
        """Initialize the executor and its workers."""
        ...

    @abstractmethod
    def execute_model(self, scheduler_output) -> list[EngineCoreOutput]:
        """Run one step of model inference for ``scheduler_output``."""
        ...

    @staticmethod
    def create(vllm_config):
        """Factory method: choose the executor implementation from the config.

        Selection order: a Ray executor when ``parallel_config.use_ray`` is
        set; otherwise a multi-process executor when ``world_size > 1``;
        otherwise a single-process executor.
        """
        parallel_config = vllm_config.parallel_config
        if parallel_config.use_ray:
            return RayExecutor(vllm_config)
        elif parallel_config.world_size > 1:
            return MultiProcExecutor(vllm_config)
        else:
            return UniProcExecutor(vllm_config)


## MultiProcExecutor — vllm/v1/executor/multiproc_executor.py
python
class MultiProcExecutor:
    """Executor that starts one ``WorkerProc`` per rank of the parallel world."""

    def __init__(self, vllm_config):
        # Create one worker process per rank.
        world_size = vllm_config.parallel_config.world_size
        self.workers = [
            WorkerProc(rank=rank, vllm_config=vllm_config)
            for rank in range(world_size)
        ]

    def execute_model(self, scheduler_output):
        """Broadcast one scheduling step to every worker and gather replies.

        Every worker is sent the same ``scheduler_output``. A reply is then
        read from every worker, not only rank 0. Presumably this keeps all
        ranks in lock-step, so no worker still has an unread result when the
        next step starts. Only rank 0's output is returned.
        """
        # Broadcast the scheduler output to all workers.
        for worker in self.workers:
            worker.send(scheduler_output)
        # Wait for every worker to finish (one recv per worker, in rank order).
        outputs = [worker.recv() for worker in self.workers]
        return outputs[0]  # output of rank 0


## GPU Worker — vllm/v1/worker/gpu_worker.py
python
class GPUWorker:
    """Worker bound to one CUDA device that delegates model work to a ModelRunner.

    NOTE(review): ``self.device`` and ``self.vllm_config`` are read here but
    not set in the code shown (no ``__init__`` is visible); they are assumed to
    be assigned elsewhere. ``self.model_runner`` is created by ``load_model``,
    so ``profile_run``, ``allocate_kv_cache`` and ``execute_model`` must only
    be called after it.
    """

    def init_device(self):
        """Select this worker's CUDA device and release unused cached memory."""
        torch.cuda.set_device(self.device)
        torch.cuda.empty_cache()

    def load_model(self):
        """Create the ModelRunner and have it load the model."""
        self.model_runner = ModelRunner(self.vllm_config)
        self.model_runner.load_model()

    def profile_run(self):
        """Run a dummy inference pass to measure GPU memory usage."""
        self.model_runner.profile_run()

    def allocate_kv_cache(self, num_blocks):
        """Ask the ModelRunner to allocate ``num_blocks`` KV-cache blocks."""
        self.model_runner.allocate_kv_cache(num_blocks)

    def execute_model(self, scheduler_output):
        """Execute one model step and return the ModelRunner's output."""
        return self.model_runner.execute_model(scheduler_output)


## GPU ModelRunner — vllm/v1/worker/gpu_model_runner.py
这是 vLLM 中最大的单个源文件(约 324 KB),其核心方法如下:
python
class ModelRunner:
    """Runs the model forward pass on the GPU."""

    def execute_model(self, scheduler_output):
        """Run one forward step, replaying a CUDA Graph when possible.

        Steps:
            1. Build the input batch from ``scheduler_output``.
            2. If ``_can_use_cudagraph`` accepts the batch, replay a captured
               CUDA Graph.
            3. Otherwise fall back to regular (non-graph) execution.
        """
        # 1. Build the inputs.
        input_batch = self._prepare_inputs(scheduler_output)
        # 2. Use a captured CUDA Graph if this batch is eligible.
        if self._can_use_cudagraph(input_batch):
            return self._execute_with_cudagraph(input_batch)
        # 3. Regular execution.
        return self._execute_model_native(input_batch)


### CUDA Graph 执行
python
def _execute_with_cudagraph(self, input_batch):
    """Replay the CUDA Graph captured for this batch size.

    Args:
        input_batch: The prepared batch; its ``num_seqs`` selects the graph
            and ``to_tensor()`` supplies the new input data.

    Returns:
        The graph's output buffer after replay (the buffer object itself,
        not a copy).
    """
    batch_size = input_batch.num_seqs
    # Look up the graph captured for exactly this batch size. A missing entry
    # raises KeyError; presumably _can_use_cudagraph() has already ruled that
    # out — TODO confirm.
    graph = self.cudagraphs[batch_size]
    # A CUDA Graph replays against the memory addresses recorded at capture
    # time, so new inputs are copied in place into the captured input buffer
    # instead of being passed as fresh tensors.
    graph.input_buffer.copy_(input_batch.to_tensor())
    # Replay the recorded kernels.
    graph.replay()
    # NOTE(review): this is the graph's static output buffer, not a copy; the
    # next replay overwrites it, so callers must consume (or clone) it first.
    return graph.output_buffer


## 关键函数索引
| 函数/类 | 文件 | 职责 |
|---|---|---|
| `Executor.create()` | `vllm/v1/executor/abstract.py` | 工厂方法,按配置选择执行器 |
| `MultiProcExecutor.execute_model()` | `vllm/v1/executor/multiproc_executor.py` | 多进程模型执行 |
| `GPUWorker.init_device()` | `vllm/v1/worker/gpu_worker.py` | CUDA 设备初始化 |
| `GPUWorker.load_model()` | `vllm/v1/worker/gpu_worker.py` | 模型加载入口 |
| `ModelRunner.execute_model()` | `vllm/v1/worker/gpu_model_runner.py` | GPU 前向传播 |
| `ModelRunner._prepare_inputs()` | `vllm/v1/worker/gpu_model_runner.py` | 构建输入批次 |
| `ModelRunner._execute_with_cudagraph()` | `vllm/v1/worker/gpu_model_runner.py` | CUDA Graph 重放 |