# 引擎核心 — 代码走读
## EngineCore 初始化 — vllm/v1/engine/core.py
python
# Simplified initialization flow.
class EngineCore:
    """Simplified engine core: owns one scheduler and one model executor."""

    def __init__(self, vllm_config: VllmConfig):
        # Step 1: build the scheduler from the scheduler, model and cache
        # sections of the overall config.
        sched_cfg = vllm_config.scheduler_config
        model_cfg = vllm_config.model_config
        cache_cfg = vllm_config.cache_config
        self.scheduler = Scheduler(
            scheduler_config=sched_cfg,
            model_config=model_cfg,
            cache_config=cache_cfg,
        )

        # Step 2: build the executor from the whole config; Executor.create
        # chooses a MultiProc / Ray / UniProc backend based on parallel_config.
        self.executor = Executor.create(vllm_config=vllm_config)


### 主循环
python
def run_loop(self):
    """Run the engine-core busy loop forever.

    Each iteration: receive new requests -> schedule -> execute the model ->
    fold the model output back into the scheduler -> send outputs to the
    frontend.
    """
    while True:
        # 1. Receive new requests from the client. Abort handling is omitted
        #    in this simplified loop: every received item is added.
        new_requests = self._recv_from_client()

        # 2. Enqueue them with the scheduler.
        for req in new_requests:
            self.scheduler.add_request(req)

        # 3. Decide what runs in this step.
        scheduler_output = self.scheduler.schedule()

        # 4. Run the model on the scheduled batch.
        model_output = self.executor.execute_model(scheduler_output)

        # 5. Hand the raw model output back to the scheduler so its
        #    per-request state advances (new tokens, finished requests);
        #    without this step the scheduler never observes any progress.
        #    The scheduler returns the outputs destined for the frontend.
        engine_core_outputs = self.scheduler.update_from_output(
            scheduler_output, model_output
        )

        # 6. Send the outputs to the frontend.
        self._send_to_client(engine_core_outputs)


## AsyncLLM — vllm/v1/engine/async_llm.py
### 请求提交流程
python
class AsyncLLM:
    """Async frontend: preprocesses prompts and streams results from EngineCore."""

    async def generate(self, prompt, sampling_params, request_id):
        """Submit one request to EngineCore and stream its outputs.

        Args:
            prompt: raw prompt handed to the input processor.
            sampling_params: sampling parameters attached to the request.
            request_id: caller-chosen ID used both to tag the request and to
                select its output stream. Further optional parameters of the
                real API are omitted in this walkthrough.

        Yields:
            Outputs produced for this request.
        """
        # 1. Preprocess the input (tokenization and other preprocessing).
        processed_inputs = self.input_processor.preprocess(prompt)
        # 2. Build the request that crosses the process boundary.
        request = EngineCoreRequest(
            request_id=request_id,
            prompt_token_ids=processed_inputs.token_ids,
            sampling_params=sampling_params,
        )
        # 3. Hand it to EngineCore.
        await self.engine_core_client.submit_request(request)
        # 4. Stream this request's outputs back to the caller.
        async for output in self._output_stream(request_id):
            yield output


### 输出处理
python
class OutputProcessor:
    """Turn EngineCore outputs into user-facing RequestOutput objects."""

    def process_outputs(self, engine_core_outputs):
        """Yield one RequestOutput per engine-core output.

        Args:
            engine_core_outputs: iterable of per-request outputs, each exposing
                ``new_token_ids`` and ``finish_reason`` (and, assumed here,
                ``request_id``).

        Yields:
            RequestOutput with the newly decoded text; ``finished`` is True
            once the engine reported a finish reason.
        """
        for output in engine_core_outputs:
            # NOTE(review): assumes per-request state is kept in
            # self.request_states keyed by request_id — confirm.
            req_state = self.request_states[output.request_id]
            # 1. Detokenize: token IDs -> text. Detokenizer.decode takes the
            #    request's state as its second argument to decode incrementally.
            text = self.detokenizer.decode(output.new_token_ids, req_state)
            # 2. A non-None finish_reason marks the final output; anything
            #    else is an intermediate streaming chunk.
            finished = output.finish_reason is not None
            yield RequestOutput(
                request_id=output.request_id,
                outputs=[CompletionOutput(text=text)],  # other fields omitted
                finished=finished,
            )


## EngineCoreClient — vllm/v1/engine/core_client.py
### 异步模式初始化
python
class EngineCoreClient:
    """Factory for clients that talk to an EngineCore process over ZMQ."""

    @staticmethod
    def make_async(vllm_config):
        """Launch an EngineCore process and return an async client for it.

        Returns:
            AsyncEngineCoreClient wrapping a ROUTER socket bound to a fresh
            IPC address.
        """
        import tempfile
        import uuid

        # 1. asyncio-aware ZMQ context.
        ctx = zmq.asyncio.Context()
        # 2. ROUTER socket: the client binds, the engine process connects.
        #    A unique per-instance IPC path avoids clashing with other clients.
        socket_addr = (
            f"ipc://{tempfile.gettempdir()}/engine_core_{uuid.uuid4().hex}"
        )
        socket = ctx.socket(zmq.ROUTER)
        socket.bind(socket_addr)
        # 3. Start the EngineCore process. The engine is the side that
        #    connects, so it must be told the address.
        #    NOTE(review): assumes run_engine_core accepts the address as its
        #    second positional argument — confirm against its real signature.
        proc = multiprocessing.Process(
            target=EngineCore.run_engine_core,
            args=(vllm_config, socket_addr),
        )
        proc.start()
        return AsyncEngineCoreClient(socket)


### 请求提交通道
python
async def submit_request(self, request):
    """Serialize ``request`` with msgpack and send it to the EngineCore.

    The client socket is a ZMQ ROUTER. On send, a ROUTER treats the first
    frame as the routing identity of the destination peer and, by default,
    silently drops messages addressed to unknown identities. The payload
    therefore travels in its own frame after the engine's identity.
    """
    # msgspec binary (msgpack) serialization.
    payload = msgspec.msgpack.encode(request)
    # NOTE(review): assumes self.engine_identity holds the identity of the
    # connected EngineCore peer — confirm where the client records it.
    await self.socket.send_multipart([self.engine_identity, payload])


## Detokenizer — vllm/v1/engine/detokenizer.py
Detokenizer 负责将 token IDs 转换回文本:
python
class Detokenizer:
    """Convert generated token IDs back into text, one step at a time."""

    def decode(self, new_token_ids, request_state):
        """Record ``new_token_ids`` on the request and return the new text.

        The full token sequence is re-decoded on every call (cost grows with
        the total number of tokens), and the text already returned is sliced
        off, so only newly produced characters are returned.

        Args:
            new_token_ids: token IDs generated in this step.
            request_state: per-request state with ``all_token_ids`` (tokens
                seen so far) and ``decoded_text`` (text already returned);
                both are updated in place.

        Returns:
            The incremental text, or "" while the decoded tail ends in U+FFFD.
        """
        all_token_ids = request_state.all_token_ids + new_token_ids
        # Record the tokens even when no text is emitted yet, so the next call
        # decodes the complete sequence.
        request_state.all_token_ids = all_token_ids
        full_text = self.tokenizer.decode(all_token_ids, skip_special_tokens=True)
        # A trailing U+FFFD typically means the last token stops partway
        # through a multi-byte UTF-8 character. Emitting it now would misalign
        # the prefix slice once later tokens complete the character, so hold
        # the text back until the tail decodes cleanly.
        if full_text.endswith("\ufffd"):
            return ""
        incremental_text = full_text[len(request_state.decoded_text):]
        request_state.decoded_text = full_text
        return incremental_text


## 关键函数索引
| 函数/类 | 文件 | 职责 |
|---|---|---|
| `EngineCore.__init__()` | `v1/engine/core.py` | 初始化调度器和执行器 |
| `EngineCore.run_loop()` | `v1/engine/core.py` | 主循环:接收→调度→执行→输出 |
| `AsyncLLM.generate()` | `v1/engine/async_llm.py` | 异步推理入口 |
| `AsyncLLM._run_output_handler()` | `v1/engine/async_llm.py` | 输出处理循环 |
| `EngineCoreClient.make_async()` | `v1/engine/core_client.py` | 创建异步 IPC 客户端 |
| `InputProcessor.preprocess()` | `v1/engine/input_processor.py` | tokenize 和预处理 |
| `OutputProcessor.process_outputs()` | `v1/engine/output_processor.py` | 模型输出转换 |
| `Detokenizer.decode()` | `v1/engine/detokenizer.py` | 增量 detokenize |