Appearance
API服务与部署 — 代码走读
API Server — vllm/entrypoints/openai/api_server.py
python
# FastAPI 应用初始化
app = FastAPI()
# 中间件注册
app.add_middleware(CORSMiddleware, ...)
# 路由注册
app.include_router(chat_completion_router)
app.include_router(completion_router)
app.include_router(embedding_router)
app.include_router(responses_router)
# 引擎初始化
@app.on_event("startup")
async def startup():
global engine
engine = AsyncLLMEngine.from_engine_args(engine_args)
Chat Completion — vllm/entrypoints/openai/chat_completion/
python
@router.post("/v1/chat/completions")
async def create_chat_completion(request: ChatCompletionRequest):
# 1. 解析请求
generator = await chat_completion_handler(request, engine)
# 2. 流式或非流式返回
if request.stream:
return StreamingResponse(generator, media_type="text/event-stream")
else:
return await generator.__anext__()
请求处理流程
python
async def chat_completion_handler(request, engine):
# 1. 渲染 chat template
prompt = tokenizer.apply_chat_template(request.messages)
# 2. 创建采样参数
sampling_params = SamplingParams(
temperature=request.temperature,
top_p=request.top_p,
max_tokens=request.max_tokens,
stop=request.stop,
)
# 3. 提交到引擎
result_generator = engine.generate(prompt, sampling_params, request_id)
# 4. 流式返回
async for result in result_generator:
chunk = ChatCompletionStreamResponse(
choices=[{
"delta": {"content": result.outputs[0].text},
"finish_reason": result.outputs[0].finish_reason,
}],
)
yield f"data: {chunk.json()}\n\n"
LLM 离线类 — vllm/entrypoints/llm.py
python
# Mixin 组合架构
class LLM(BeamSearchOfflineMixin, PoolingOfflineMixin, OfflineInferenceMixin):
def __init__(self, model, **kwargs):
engine_args = EngineArgs(model=model, **kwargs)
self.llm_engine = LLMEngine.from_engine_args(engine_args)
# generate() 来自 OfflineInferenceMixin
# beam_search() 来自 BeamSearchOfflineMixin
OfflineInferenceMixin — vllm/entrypoints/offline_utils.py
从 LLM 类中提取的离线推理核心逻辑:
python
class OfflineInferenceMixin:
def _preprocess_cmpl(self, prompts, params):
"""预处理 completion 请求"""
...
def _preprocess_chat(self, prompts, params):
"""预处理 chat 请求"""
...
def _run_engine(self, *, lora_requests, prompt_adapter_requests):
"""运行引擎直到所有请求完成"""
while self.llm_engine.has_unfinished_requests():
step_outputs = self.llm_engine.step()
def generate(self, prompts, sampling_params):
"""核心生成方法"""
self._add_completion_requests(prompts, sampling_params)
self._run_engine()
return self._collect_outputs()
Beam Search 拆分
Beam search 也被拆分为独立模块:
- entrypoints/generate/beam_search/offline.py — BeamSearchOfflineMixin(LLM 类使用)
- entrypoints/generate/beam_search/online.py — BeamSearchOnlineMixin(OpenAIServing 使用)
- entrypoints/generate/beam_search/utils.py — 共享数据结构和工具函数
CLI 入口 — vllm/entrypoints/cli/main.py
python
def main():
parser = argparse.ArgumentParser()
subparsers = parser.add_subparsers()
# serve 子命令
serve_parser = subparsers.add_parser("serve")
serve_parser.set_defaults(func=run_server)
args = parser.parse_args()
args.func(args)
前端管理 — vllm/v1/utils.py
RustFrontendProcessManager
python
class RustFrontendProcessManager:
"""管理 Rust 前端子进程"""
def __init__(self, ...):
# 启动 vllm-rs 二进制文件
self.proc = subprocess.Popen(
[rust_frontend_path, "frontend",
"--args-json", serialized_args],
pass_fds=[listening_socket_fd],
)
前端模式选择 — vllm/entrypoints/cli/serve.py
python
if VLLM_RUST_FRONTEND_PATH:
# Rust 前端:通过 RustFrontendProcessManager 管理
frontend = RustFrontendProcessManager(...)
elif data_parallel_multi_port_external_lb:
# DP Supervisor:为每个 DP rank 生成独立 server
run_dp_supervisor(...)
else:
# 标准 Python API Server
run_api_server(...)
DP Supervisor — vllm/entrypoints/openai/dp_supervisor.py
python
class DPSupervisor:
"""节点本地数据并行管理器"""
def __init__(self, ...):
# 为每个 DP rank 创建子进程
self.children = []
for rank in range(dp_size):
port = base_port + rank
env = {"CUDA_VISIBLE_DEVICES": str(rank)}
proc = subprocess.Popen(
[python, "-m", "vllm.entrypoints.openai.api_server",
"--port", str(port)],
env=env,
)
self.children.append(proc)
async def health_check(self):
"""聚合所有子进程的健康状态"""
for child_url in self.child_urls:
resp = await probe(child_url + "/health")
...
关键函数索引
| 函数/类 | 文件 | 职责 |
|---|---|---|
| create_chat_completion() | openai/chat_completion/ | Chat Completion 端点 |
| create_completion() | openai/completion/ | Text Completion 端点 |
| LLM.generate() | entrypoints/llm.py | 离线批量推理 |
| OfflineInferenceMixin | entrypoints/offline_utils.py | 离线推理核心逻辑 |
| BeamSearchOfflineMixin | entrypoints/generate/beam_search/offline.py | 离线 beam search |
| RustFrontendProcessManager | v1/utils.py | Rust 前端子进程管理 |
| DPSupervisor | entrypoints/openai/dp_supervisor.py | 多端口数据并行管理 |
| LLMEngine.step() | v1/engine/llm_engine.py | 单步推理执行 |
| EngineArgs | engine/arg_utils.py | CLI 参数解析 |
| run_server() | entrypoints/cli/main.py | 启动 API 服务器 |