Skip to content

引擎核心 — 代码走读

EngineCore 初始化 — vllm/v1/engine/core.py

python
# 简化的初始化流程
class EngineCore:
    """Simplified engine core that owns one scheduler and one executor."""

    def __init__(self, vllm_config: VllmConfig):
        """Create the scheduler and the executor from ``vllm_config``.

        Args:
            vllm_config: Top-level config. Its ``scheduler_config``,
                ``model_config`` and ``cache_config`` attributes are handed
                to the scheduler; the whole object is handed to the executor.
        """
        cfg = vllm_config

        # Step 1: build the scheduler from the three sub-configs it needs.
        self.scheduler = Scheduler(
            scheduler_config=cfg.scheduler_config,
            model_config=cfg.model_config,
            cache_config=cfg.cache_config,
        )

        # Step 2: build the executor. Per the original notes, the concrete
        # kind (MultiProc / Ray / UniProc) is chosen from parallel_config;
        # that selection happens inside Executor.create, not here.
        self.executor = Executor.create(vllm_config=cfg)

主循环

python
def run_loop(self):
    """Run the engine's main loop: receive, enqueue, schedule, execute, reply.

    The loop has no exit condition of its own; it only stops when one of
    the calls it makes raises.
    """
    while True:
        # Step 1: collect what the client sent since the previous pass
        # (the original notes describe this as new requests and abort
        # signals).
        incoming = self._recv_from_client()

        # Step 2: hand every received request to the scheduler's queue.
        for request in incoming:
            self.scheduler.add_request(request)

        # Step 3: let the scheduler decide what runs in this step.
        step_plan = self.scheduler.schedule()

        # Step 4: run model inference for the scheduled work.
        step_outputs = self.executor.execute_model(step_plan)

        # Step 5: ship the outputs back to the front end.
        self._send_to_client(step_outputs)

AsyncLLM — vllm/v1/engine/async_llm.py

请求提交流程

python
class AsyncLLM:
    async def generate(self, prompt, sampling_params, ...):
        # 1. 预处理输入
        processed_inputs = self.input_processor.preprocess(prompt)

        # 2. 创建请求
        request = EngineCoreRequest(
            request_id=request_id,
            prompt_token_ids=processed_inputs.token_ids,
            sampling_params=sampling_params,
        )

        # 3. 发送到 EngineCore
        await self.engine_core_client.submit_request(request)

        # 4. 流式返回输出
        async for output in self._output_stream(request_id):
            yield output

输出处理

python
class OutputProcessor:
    def process_outputs(self, engine_core_outputs):
        for output in engine_core_outputs:
            # 1. Detokenize: token IDs → text
            text = self.detokenizer.decode(output.new_token_ids)

            # 2. 检查是否完成
            if output.finish_reason is not None:
                # 最终输出
                yield RequestOutput(
                    outputs=[CompletionOutput(text=text, ...)],
                    finished=True,
                )
            else:
                # 流式中间输出
                yield RequestOutput(
                    outputs=[CompletionOutput(text=text, ...)],
                    finished=False,
                )

EngineCoreClient — vllm/v1/engine/core_client.py

异步模式初始化

python
class EngineCoreClient:
    """Factory for clients that reach the EngineCore process through ZMQ."""

    @staticmethod
    def make_async(vllm_config):
        """Bind a ROUTER socket, start the engine process, return a client.

        Args:
            vllm_config: Passed unchanged as the single argument of
                ``EngineCore.run_engine_core`` in the new process.

        Returns:
            An ``AsyncEngineCoreClient`` wrapping the bound socket.
        """
        # Step 1: asyncio-flavoured ZMQ context.
        zmq_context = zmq.asyncio.Context()

        # Step 2: ROUTER socket. The client side binds; per the original
        # notes the server side is the one that connects.
        # NOTE(review): socket_path is not defined in this snippet --
        # presumably computed elsewhere; confirm.
        router = zmq_context.socket(zmq.ROUTER)
        router.bind(f"ipc://{socket_path}")

        # Step 3: launch the EngineCore process.
        engine_process = multiprocessing.Process(
            target=EngineCore.run_engine_core,
            args=(vllm_config,),
        )
        engine_process.start()

        return AsyncEngineCoreClient(router)

请求提交通道

python
async def submit_request(self, request):
    """Encode ``request`` with msgspec's msgpack codec and send it on ``self.socket``."""
    # Binary serialization via msgspec.
    payload = msgspec.msgpack.encode(request)
    await self.socket.send(payload)

Detokenizer — vllm/v1/engine/detokenizer.py

Detokenizer 负责将 token IDs 转换回文本:

python
class Detokenizer:
    """Turns token IDs back into text, returning only the newly added part."""

    def decode(self, new_token_ids, request_state):
        """Return the text contributed by ``new_token_ids``.

        The concatenation of ``request_state.all_token_ids`` and
        ``new_token_ids`` is decoded with special tokens skipped, then the
        first ``len(request_state.decoded_text)`` characters are dropped.
        Nothing is assigned to ``request_state`` in this method.

        Args:
            new_token_ids: Token IDs produced since the last call.
            request_state: Object exposing ``all_token_ids`` and
                ``decoded_text`` for the request.

        Returns:
            The tail of the decoded text past the already-decoded prefix.
        """
        # Only the *returned* text is incremental: the tokenizer is still
        # asked to decode the whole sequence on every call.
        full_text = self.tokenizer.decode(
            request_state.all_token_ids + new_token_ids,
            skip_special_tokens=True,
        )
        # Slice by length -- this assumes decoded_text is a prefix of
        # full_text; TODO confirm against the caller.
        prefix_len = len(request_state.decoded_text)
        return full_text[prefix_len:]

关键函数索引

| 函数/类 | 文件 | 职责 |
| --- | --- | --- |
| `EngineCore.__init__()` | `v1/engine/core.py` | 初始化调度器和执行器 |
| `EngineCore.run_loop()` | `v1/engine/core.py` | 主循环:接收→调度→执行→输出 |
| `AsyncLLM.generate()` | `v1/engine/async_llm.py` | 异步推理入口 |
| `AsyncLLM._run_output_handler()` | `v1/engine/async_llm.py` | 输出处理循环 |
| `EngineCoreClient.make_async()` | `v1/engine/core_client.py` | 创建异步 IPC 客户端 |
| `InputProcessor.preprocess()` | `v1/engine/input_processor.py` | tokenize 和预处理 |
| `OutputProcessor.process_outputs()` | `v1/engine/output_processor.py` | 模型输出转换 |
| `Detokenizer.decode()` | `v1/engine/detokenizer.py` | 增量 detokenize |