Skip to content

API服务与部署 — 代码走读

API Server — vllm/entrypoints/openai/api_server.py

python
# FastAPI application setup
app = FastAPI()

# Register middleware
app.add_middleware(CORSMiddleware, ...)

# Register the endpoint routers on the app
app.include_router(chat_completion_router)
app.include_router(completion_router)
app.include_router(embedding_router)
app.include_router(responses_router)

# Engine initialization
@app.on_event("startup")
async def startup():
    """Build the engine from ``engine_args`` and store it in the module-level ``engine``."""
    global engine
    engine = AsyncLLMEngine.from_engine_args(engine_args)

Chat Completion — vllm/entrypoints/openai/chat_completion/

python
@router.post("/v1/chat/completions")
async def create_chat_completion(request: ChatCompletionRequest):
    """Handle ``POST /v1/chat/completions``.

    Args:
        request: The parsed chat-completion request body.

    Returns:
        A ``StreamingResponse`` wrapping the handler's async generator when
        ``request.stream`` is truthy; otherwise the first item the generator
        yields.
    """
    # 1. Parse the request.
    # chat_completion_handler is an async generator function (it uses
    # ``yield``), so calling it already returns the generator object.
    # Awaiting that object would raise TypeError.
    generator = chat_completion_handler(request, engine)

    # 2. Return a streaming or a non-streaming response.
    if request.stream:
        return StreamingResponse(generator, media_type="text/event-stream")

    # NOTE(review): only the first yielded item is returned here; the rest of
    # the generator is never consumed — confirm this is the intended
    # simplification for the non-streaming path.
    return await generator.__anext__()

请求处理流程

python
async def chat_completion_handler(request, engine):
    """Render the prompt, submit it to the engine and yield SSE-framed chunks.

    Args:
        request: Chat-completion request; ``messages``, ``temperature``,
            ``top_p``, ``max_tokens`` and ``stop`` are read from it.
        engine: Object whose ``generate(prompt, sampling_params, request_id)``
            returns an async iterable of results.

    Yields:
        One ``"data: <json>\\n\\n"`` string per result produced by the engine.
    """
    import uuid

    # 1. Render the chat template.
    prompt = tokenizer.apply_chat_template(request.messages)

    # 2. Build the sampling parameters.
    sampling_params = SamplingParams(
        temperature=request.temperature,
        top_p=request.top_p,
        max_tokens=request.max_tokens,
        stop=request.stop,
    )

    # 3. Submit to the engine. The original referenced ``request_id`` without
    # ever defining it; generate a unique id per request here.
    request_id = f"chatcmpl-{uuid.uuid4().hex}"
    result_generator = engine.generate(prompt, sampling_params, request_id)

    # 4. Stream the results back.
    async for result in result_generator:
        first_output = result.outputs[0]
        # NOTE(review): if the engine reports cumulative text on each result,
        # "delta" will repeat earlier content — confirm the output mode.
        chunk = ChatCompletionStreamResponse(
            choices=[{
                "delta": {"content": first_output.text},
                "finish_reason": first_output.finish_reason,
            }],
        )
        yield f"data: {chunk.json()}\n\n"

LLM 离线类 — vllm/entrypoints/llm.py

python
# Architecture: the class is composed from mixins
class LLM(BeamSearchOfflineMixin, PoolingOfflineMixin, OfflineInferenceMixin):
    """Offline entry-point class assembled from three mixins."""

    def __init__(self, model, **kwargs):
        """Build ``EngineArgs`` from ``model`` and ``kwargs`` and create the engine from them."""
        engine_args = EngineArgs(model=model, **kwargs)
        self.llm_engine = LLMEngine.from_engine_args(engine_args)

    # generate() comes from OfflineInferenceMixin
    # beam_search() comes from BeamSearchOfflineMixin

OfflineInferenceMixin — vllm/entrypoints/offline_utils.py

从 LLM 类中提取的离线推理核心逻辑:

python
class OfflineInferenceMixin:
    """Core offline-inference logic, packaged as a mixin.

    Methods read ``self.llm_engine``; ``generate`` also calls
    ``self._add_completion_requests`` and ``self._collect_outputs``, which are
    not defined in this class and must be supplied by the host class.
    """

    def _preprocess_cmpl(self, prompts, params):
        """Preprocess completion requests."""
        ...

    def _preprocess_chat(self, prompts, params):
        """Preprocess chat requests."""
        ...

    def _run_engine(self, *, lora_requests=None, prompt_adapter_requests=None):
        """Step the engine until it reports no unfinished requests.

        Both keyword-only arguments default to ``None`` so that the bare
        ``self._run_engine()`` call in ``generate`` is valid; previously they
        were required and that call raised ``TypeError``. They are not used
        in this body.
        """
        while self.llm_engine.has_unfinished_requests():
            self.llm_engine.step()

    def generate(self, prompts, sampling_params):
        """Core generation method: enqueue the requests, run the engine, collect the outputs."""
        self._add_completion_requests(prompts, sampling_params)
        self._run_engine()
        return self._collect_outputs()

Beam Search 拆分

Beam search 也被拆分为独立模块:

  • entrypoints/generate/beam_search/offline.py — BeamSearchOfflineMixin(LLM 类使用)
  • entrypoints/generate/beam_search/online.py — BeamSearchOnlineMixin(OpenAIServing 使用)
  • entrypoints/generate/beam_search/utils.py — 共享数据结构和工具函数

CLI 入口 — vllm/entrypoints/cli/main.py

python
def main():
    """CLI entry point: parse the command line and dispatch to the chosen subcommand.

    The subcommand is required. Without one, argparse prints a usage error and
    exits, instead of the function failing later with ``AttributeError`` on
    the missing ``args.func``.
    """
    parser = argparse.ArgumentParser()
    # ``dest`` gives the subcommand a name for argparse's "required" error message.
    subparsers = parser.add_subparsers(dest="command", required=True)

    # "serve" subcommand
    serve_parser = subparsers.add_parser("serve")
    serve_parser.set_defaults(func=run_server)

    args = parser.parse_args()
    args.func(args)

前端管理 — vllm/v1/utils.py

RustFrontendProcessManager

python
class RustFrontendProcessManager:
    """Manages the Rust frontend subprocess."""
    def __init__(self, ...):
        # Launch the vllm-rs binary with the "frontend" subcommand, handing it
        # the serialized arguments via --args-json.
        self.proc = subprocess.Popen(
            [rust_frontend_path, "frontend",
             "--args-json", serialized_args],
            # Keep the listening socket's file descriptor open in the child.
            pass_fds=[listening_socket_fd],
        )

前端模式选择 — vllm/entrypoints/cli/serve.py

python
# Frontend mode selection: the first matching branch wins.
if VLLM_RUST_FRONTEND_PATH:
    # Rust frontend: managed through RustFrontendProcessManager
    frontend = RustFrontendProcessManager(...)
elif data_parallel_multi_port_external_lb:
    # DP supervisor: spawns a separate server for each DP rank
    run_dp_supervisor(...)
else:
    # Standard Python API server
    run_api_server(...)

DP Supervisor — vllm/entrypoints/openai/dp_supervisor.py

python
class DPSupervisor:
    """节点本地数据并行管理器"""
    def __init__(self, ...):
        # 为每个 DP rank 创建子进程
        self.children = []
        for rank in range(dp_size):
            port = base_port + rank
            env = {"CUDA_VISIBLE_DEVICES": str(rank)}
            proc = subprocess.Popen(
                [python, "-m", "vllm.entrypoints.openai.api_server",
                 "--port", str(port)],
                env=env,
            )
            self.children.append(proc)

    async def health_check(self):
        """聚合所有子进程的健康状态"""
        for child_url in self.child_urls:
            resp = await probe(child_url + "/health")
            ...

关键函数索引

| 函数/类 | 文件 | 职责 |
| --- | --- | --- |
| create_chat_completion() | openai/chat_completion/ | Chat Completion 端点 |
| create_completion() | openai/completion/ | Text Completion 端点 |
| LLM.generate() | entrypoints/llm.py | 离线批量推理 |
| OfflineInferenceMixin | entrypoints/offline_utils.py | 离线推理核心逻辑 |
| BeamSearchOfflineMixin | entrypoints/generate/beam_search/offline.py | 离线 beam search |
| RustFrontendProcessManager | v1/utils.py | Rust 前端子进程管理 |
| DPSupervisor | entrypoints/openai/dp_supervisor.py | 多端口数据并行管理 |
| LLMEngine.step() | v1/engine/llm_engine.py | 单步推理执行 |
| EngineArgs | engine/arg_utils.py | CLI 参数解析 |
| run_server() | entrypoints/cli/main.py | 启动 API 服务器 |