Skip to content

执行器与Worker — 代码走读

Executor 抽象 — vllm/v1/executor/abstract.py

python
class Executor(ABC):
    """Abstract interface shared by every executor backend."""

    @abstractmethod
    def determine_available_memory(self) -> int:
        """Return the GPU memory available for the KV cache."""

    @abstractmethod
    def initialize(self):
        """Initialize the executor and its workers."""

    @abstractmethod
    def execute_model(self, scheduler_output) -> list[EngineCoreOutput]:
        """Run one round of model inference for ``scheduler_output``."""

    @staticmethod
    def create(vllm_config):
        """Factory method: build the executor selected by the parallel config.

        ``use_ray`` takes precedence; otherwise a ``world_size`` greater than
        one selects the multi-process executor, and anything else falls back
        to the single-process executor.
        """
        parallel = vllm_config.parallel_config

        # Pick the class first, then instantiate once at the end.
        if parallel.use_ray:
            executor_cls = RayExecutor
        elif parallel.world_size > 1:
            executor_cls = MultiProcExecutor
        else:
            executor_cls = UniProcExecutor
        return executor_cls(vllm_config)

MultiProcExecutor — vllm/v1/executor/multiproc_executor.py

python
class MultiProcExecutor:
    """Executor that drives one ``WorkerProc`` per rank."""

    def __init__(self, vllm_config):
        """Create one worker process object for every rank in the world size."""
        world_size = vllm_config.parallel_config.world_size
        self.workers = []
        # extend() consumes the generator item by item, so workers are
        # registered in rank order as they are created.
        self.workers.extend(
            WorkerProc(rank=rank, vllm_config=vllm_config)
            for rank in range(world_size)
        )

    def execute_model(self, scheduler_output):
        """Send the scheduler output to all workers and return rank 0's result."""
        # Phase 1: broadcast the scheduler output to every worker.
        for proc in self.workers:
            proc.send(scheduler_output)

        # Phase 2: wait for every worker to finish, collecting in rank order.
        results = [proc.recv() for proc in self.workers]

        # Only the output of rank 0 is returned to the caller.
        return results[0]

GPU Worker — vllm/v1/worker/gpu_worker.py

python
class GPUWorker:
    """GPU worker whose model-related calls are forwarded to its model runner."""

    def init_device(self):
        """Make ``self.device`` the current CUDA device, then empty the CUDA cache."""
        device = self.device
        torch.cuda.set_device(device)
        torch.cuda.empty_cache()

    def load_model(self):
        """Create the model runner from ``self.vllm_config`` and load the model."""
        runner = ModelRunner(self.vllm_config)
        self.model_runner = runner
        runner.load_model()

    def profile_run(self):
        """Run a dummy inference pass to measure GPU memory usage."""
        runner = self.model_runner
        runner.profile_run()

    def allocate_kv_cache(self, num_blocks):
        """Ask the model runner to allocate ``num_blocks`` KV-cache blocks."""
        runner = self.model_runner
        runner.allocate_kv_cache(num_blocks)

    def execute_model(self, scheduler_output):
        """Forward ``scheduler_output`` to the model runner and return its result."""
        runner = self.model_runner
        return runner.execute_model(scheduler_output)

GPU ModelRunner — vllm/v1/worker/gpu_model_runner.py

这是 vLLM 中体积最大的单个源文件(约 324K),其核心方法如下:

python
class ModelRunner:
    """Runs the model for one scheduler step."""

    def execute_model(self, scheduler_output):
        """Build the input batch and run it via CUDA Graph replay or natively.

        The CUDA Graph path is taken only when ``_can_use_cudagraph`` accepts
        the prepared batch; every other batch goes through the native path.
        """
        # Step 1: build the inputs from the scheduler output.
        batch = self._prepare_inputs(scheduler_output)

        # Step 2: fall back to native execution when no CUDA Graph applies.
        if not self._can_use_cudagraph(batch):
            return self._execute_model_native(batch)

        # Step 3: CUDA Graph execution.
        return self._execute_with_cudagraph(batch)

CUDA Graph 执行

python
def _execute_with_cudagraph(self, input_batch):
    """Replay the captured graph for this batch size and return its output buffer."""
    num_seqs = input_batch.num_seqs

    # Captured graphs are looked up by batch size.
    captured = self.cudagraphs[num_seqs]

    # Copy the new inputs into the graph's input buffer.
    staging = captured.input_buffer
    staging.copy_(input_batch.to_tensor())

    # Replay the graph.
    captured.replay()

    # The output buffer is read only after the replay call.
    result = captured.output_buffer
    return result

关键函数索引

| 函数/类 | 文件 | 职责 |
| --- | --- | --- |
| `Executor.create()` | `v1/executor/abstract.py` | 工厂方法,选择执行器 |
| `MultiProcExecutor.execute_model()` | `v1/executor/multiproc_executor.py` | 多进程模型执行 |
| `GPUWorker.init_device()` | `v1/worker/gpu_worker.py` | CUDA 设备初始化 |
| `GPUWorker.load_model()` | `v1/worker/gpu_worker.py` | 模型加载入口 |
| `ModelRunner.execute_model()` | `v1/worker/gpu_model_runner.py` | GPU 前向传播 |
| `ModelRunner._prepare_inputs()` | `v1/worker/gpu_model_runner.py` | 输入批处理 |
| `ModelRunner._execute_with_cudagraph()` | `v1/worker/gpu_model_runner.py` | CUDA Graph 重放 |