Appearance
# 调度系统 — 代码走读

## Scheduler 类 — vllm/v1/core/sched/scheduler.py

这是 vLLM 中较大的文件之一(约 2300 行),实现了所有调度逻辑。

### 核心数据结构
python
class Scheduler:
    """Excerpt of the scheduler's core state.

    The `...` placeholders mark arguments that the walkthrough elides.
    """

    def __init__(self, ...):
        # Three request queues: waiting is a deque, running and swapped are
        # plain lists.
        self.waiting: deque[Request] = deque()
        self.running: list[Request] = []
        self.swapped: list[Request] = []
        # KV cache manager (constructor arguments elided in this excerpt).
        self.kv_cache_manager = KVCacheManager(...)
        # Scheduling budget, copied from scheduler_config.
        # NOTE(review): scheduler_config is not defined in this excerpt --
        # presumably one of the elided constructor arguments; confirm.
        self.max_num_seqs = scheduler_config.max_num_seqs
        self.max_num_batched_tokens = scheduler_config.max_num_batched_tokens


### schedule() 主方法
python
def schedule(self) -> SchedulerOutput:
    """Run one scheduling pass and return the accumulated output.

    The stages run in a fixed order. The first one creates the output
    object; each later stage receives the output produced so far and
    returns the (possibly updated) output.

    Returns:
        The value returned by the final stage, `_schedule_prefills`.
    """
    # Stage 1: swapped queue.
    output = self._schedule_swapped()
    # Stage 2: running queue (decode); stage 3: waiting queue (prefill).
    # Each stage is looked up immediately before it is called.
    for stage_name in ("_schedule_running", "_schedule_prefills"):
        output = getattr(self, stage_name)(output)
    return output


### _schedule_prefills 实现
python
def _schedule_prefills(self, scheduler_output):
# 计算剩余预算
remaining_tokens = self.max_num_batched_tokens - scheduler_output.num_batched_tokens
remaining_seqs = self.max_num_seqs - len(self.running)
while self.waiting and remaining_seqs > 0 and remaining_tokens > 0:
request = self.waiting[0]
# Chunked prefill:只处理部分 tokens
num_tokens = min(
len(request.remaining_token_ids),
remaining_tokens,
)
# 尝试分配 KV 缓存块
new_blocks = self.kv_cache_manager.allocate(request, num_tokens)
if new_blocks is None:
# 显存不足,尝试 preemption
if self._preempt(request):
continue
break
# 将请求从 waiting 移到 running
self.waiting.popleft()
self.running.append(request)
scheduler_output.add_prefill(request, num_tokens, new_blocks)
remaining_tokens -= num_tokens
remaining_seqs -= 1
return scheduler_outputKVCacheManager — vllm/v1/core/kv_cache_manager.py
### 块分配
python
class KVCacheManager:
    """Excerpt of the KV cache manager showing block allocation only."""

    def allocate(self, request, num_tokens):
        """Allocate the extra blocks a request needs for `num_tokens` more tokens.

        Args:
            request: Request whose `num_computed_tokens` and `num_blocks`
                are read to work out how many blocks are missing.
            num_tokens: Number of additional tokens to make room for.

        Returns:
            None when the pool has fewer free blocks than required.
            Otherwise the blocks taken from the pool, passed through
            `_deduplicate` when prefix caching is enabled.
        """
        # Blocks needed in total, minus the ones the request already holds.
        total_blocks = cdiv(
            request.num_computed_tokens + num_tokens,
            self.block_size,
        )
        blocks_needed = total_blocks - request.num_blocks

        if blocks_needed > self.block_pool.num_free_blocks:
            return None  # not enough free blocks

        # Take the blocks from the free pool.
        allocated = self.block_pool.allocate(blocks_needed)
        if not self.enable_prefix_caching:
            return allocated

        # Prefix caching: check whether existing blocks can be reused.
        reusable = self._find_shared_prefix(request)
        return self._deduplicate(allocated, reusable)


## BlockPool — vllm/v1/core/block_pool.py
python
class BlockPool:
    """Free list of KV cache blocks with an explicit free-block counter.

    `free_blocks` holds the blocks currently available and
    `num_free_blocks` mirrors its length.
    """

    def __init__(self, num_blocks):
        """Create a pool of `num_blocks` blocks with ids 0..num_blocks-1."""
        self.free_blocks: list[KVCacheBlock] = [
            KVCacheBlock(id=i) for i in range(num_blocks)
        ]
        self.num_free_blocks = len(self.free_blocks)

    def allocate(self, num_blocks):
        """Take `num_blocks` blocks from the front of the free list.

        Args:
            num_blocks: Number of blocks requested; must be non-negative.

        Returns:
            A list of the allocated blocks, or None when fewer than
            `num_blocks` blocks are free.

        Raises:
            ValueError: If `num_blocks` is negative.
        """
        # A negative count would slice from the end of the list and *raise*
        # the free counter, silently corrupting the pool.
        if num_blocks < 0:
            raise ValueError(f"num_blocks must be non-negative, got {num_blocks}")
        if num_blocks > self.num_free_blocks:
            return None
        blocks = self.free_blocks[:num_blocks]
        self.free_blocks = self.free_blocks[num_blocks:]
        self.num_free_blocks -= num_blocks
        return blocks

    def free(self, blocks):
        """Return `blocks` to the end of the free list.

        Args:
            blocks: Iterable of blocks to release. It is materialized first
                so the counter stays in sync even for one-shot iterators.
        """
        returned = list(blocks)
        self.free_blocks.extend(returned)
        self.num_free_blocks += len(returned)


## RequestQueue — vllm/v1/core/sched/request_queue.py
支持多种调度策略:
python
class RequestQueue:
def __init__(self, policy="fcfs"):
self.policy = policy
self.queue: list[Request] = []
def push(self, request):
if self.policy == "fcfs":
self.queue.append(request)
elif self.policy == "priority":
# 按优先级插入排序
bisect.insort(self.queue, request, key=lambda r: -r.priority)
def pop(self):
return self.queue.pop(0)关键函数索引
| 函数/类 | 文件 | 职责 |
|---|---|---|
| `Scheduler.schedule()` | `v1/core/sched/scheduler.py` | 主调度入口 |
| `Scheduler._schedule_prefills()` | `v1/core/sched/scheduler.py` | 处理等待中的 prefill 请求 |
| `Scheduler._schedule_running()` | `v1/core/sched/scheduler.py` | 处理活跃 decode 请求 |
| `Scheduler._schedule_swapped()` | `v1/core/sched/scheduler.py` | 恢复被交换的请求 |
| `Scheduler._preempt()` | `v1/core/sched/scheduler.py` | 抢占低优先级请求 |
| `KVCacheManager.allocate()` | `v1/core/kv_cache_manager.py` | 分配 KV 缓存块 |
| `BlockPool.allocate()` | `v1/core/block_pool.py` | 空闲块分配 |
| `RequestQueue.push()` | `v1/core/sched/request_queue.py` | 请求入队 |