Skip to content

调度系统 — 代码走读

Scheduler 类 — vllm/v1/core/sched/scheduler.py

这是 vLLM 中较大的文件之一(约 2300 行),实现了所有调度逻辑。

核心数据结构

python
class Scheduler:
    # NOTE: the `...` in the signature stands for constructor parameters that
    # this excerpt elides; the snippet is illustrative, not runnable as written.
    def __init__(self, ...):
        # Three request collections: a deque of waiting requests, plus lists
        # of running and swapped requests. All start empty.
        self.waiting: deque[Request] = deque()
        self.running: list[Request] = []
        self.swapped: list[Request] = []

        # KV cache manager (constructor arguments elided in this excerpt).
        self.kv_cache_manager = KVCacheManager(...)

        # Scheduling budget, copied from scheduler_config.
        # NOTE(review): scheduler_config is not defined in this excerpt —
        # presumably one of the elided constructor parameters; confirm.
        self.max_num_seqs = scheduler_config.max_num_seqs
        self.max_num_batched_tokens = scheduler_config.max_num_batched_tokens

schedule() 主方法

python
def schedule(self) -> SchedulerOutput:
    """Run one scheduling pass and return the accumulated output.

    The pass has three stages, executed in this order, each receiving the
    output produced by the previous one:

    1. swapped queue   (``_schedule_swapped``)
    2. running queue   (``_schedule_running``, decode)
    3. waiting queue   (``_schedule_prefills``, prefill)
    """
    after_swapped = self._schedule_swapped()
    after_decode = self._schedule_running(after_swapped)
    return self._schedule_prefills(after_decode)

_schedule_prefills 实现

python
def _schedule_prefills(self, scheduler_output):
    """Admit waiting requests for prefill until a budget runs out.

    Requests are taken from the front of ``self.waiting``. Each admitted
    request is moved to ``self.running`` and recorded on
    ``scheduler_output`` via ``add_prefill``. The loop stops when the
    waiting queue is empty, the sequence or token budget is exhausted, or
    a block allocation fails and ``self._preempt`` reports no success.

    Args:
        scheduler_output: Output object from the earlier stages; its
            ``num_batched_tokens`` is subtracted from the token budget.

    Returns:
        The same ``scheduler_output`` object that was passed in.
    """
    # Budget left over after the earlier scheduling stages.
    token_budget = self.max_num_batched_tokens - scheduler_output.num_batched_tokens
    seq_slots = self.max_num_seqs - len(self.running)

    while True:
        if not self.waiting or seq_slots <= 0 or token_budget <= 0:
            break

        head = self.waiting[0]

        # Chunked prefill: schedule at most as many tokens as the budget allows.
        chunk = min(len(head.remaining_token_ids), token_budget)

        # Try to obtain KV cache blocks for this chunk.
        blocks = self.kv_cache_manager.allocate(head, chunk)
        if blocks is None:
            # Allocation failed: give up unless preemption succeeds, in
            # which case the same head request is retried.
            if not self._preempt(head):
                break
            continue

        # Move the request from waiting to running and charge the budget.
        self.waiting.popleft()
        self.running.append(head)
        scheduler_output.add_prefill(head, chunk, blocks)
        token_budget -= chunk
        seq_slots -= 1

    return scheduler_output

KVCacheManager — vllm/v1/core/kv_cache_manager.py

块分配

python
class KVCacheManager:
    def allocate(self, request, num_tokens):
        """Allocate the KV cache blocks needed to schedule ``num_tokens`` more tokens.

        Args:
            request: The request being scheduled; ``num_computed_tokens`` and
                ``num_blocks`` are read from it.
            num_tokens: Number of additional tokens to make room for.

        Returns:
            The newly allocated blocks, or ``None`` when the block pool has
            fewer free blocks than required. With prefix caching enabled the
            result is first passed through ``self._deduplicate``.
        """
        # Blocks required once the new tokens are counted, minus the blocks
        # the request already holds.
        blocks_after = cdiv(
            request.num_computed_tokens + num_tokens,
            self.block_size,
        )
        blocks_needed = blocks_after - request.num_blocks

        if blocks_needed > self.block_pool.num_free_blocks:
            return None  # not enough free blocks

        # Take the blocks from the free pool.
        allocated = self.block_pool.allocate(blocks_needed)

        if not self.enable_prefix_caching:
            return allocated

        # Prefix caching: look for blocks that can be reused.
        # NOTE(review): the exact semantics of _find_shared_prefix and
        # _deduplicate are not visible in this excerpt.
        reusable = self._find_shared_prefix(request)
        return self._deduplicate(allocated, reusable)

BlockPool — vllm/v1/core/block_pool.py

python
class BlockPool:
    """Pool of free KV cache blocks, handed out from the front of a list.

    ``free_blocks`` holds the blocks that are currently available and
    ``num_free_blocks`` mirrors its length.
    """

    def __init__(self, num_blocks):
        """Create ``num_blocks`` blocks with ids ``0 .. num_blocks - 1``, all free."""
        self.free_blocks: list[KVCacheBlock] = [
            KVCacheBlock(id=i) for i in range(num_blocks)
        ]
        self.num_free_blocks = len(self.free_blocks)

    def allocate(self, num_blocks):
        """Take ``num_blocks`` blocks from the front of the free list.

        Args:
            num_blocks: Number of blocks requested.

        Returns:
            The allocated blocks; an empty list when ``num_blocks`` is zero
            or negative; ``None`` when fewer than ``num_blocks`` blocks are
            free (the pool is left untouched in that case).
        """
        if num_blocks <= 0:
            # A negative count would make the slices below count from the
            # end of the list and would *increase* num_free_blocks, leaving
            # the counter out of sync with the list. Treat it as "nothing
            # to allocate", which is also what a count of zero yields.
            return []
        if num_blocks > self.num_free_blocks:
            return None
        blocks = self.free_blocks[:num_blocks]
        self.free_blocks = self.free_blocks[num_blocks:]
        self.num_free_blocks -= num_blocks
        return blocks

    def free(self, blocks):
        """Return ``blocks`` to the end of the free list.

        Args:
            blocks: Any iterable of blocks, including one-shot iterators.
        """
        # Measure the growth of the list instead of calling len(blocks), so
        # that iterables without a length keep the counter consistent.
        before = len(self.free_blocks)
        self.free_blocks.extend(blocks)
        self.num_free_blocks += len(self.free_blocks) - before

RequestQueue — vllm/v1/core/sched/request_queue.py

支持多种调度策略:

python
class RequestQueue:
    def __init__(self, policy="fcfs"):
        self.policy = policy
        self.queue: list[Request] = []

    def push(self, request):
        if self.policy == "fcfs":
            self.queue.append(request)
        elif self.policy == "priority":
            # 按优先级插入排序
            bisect.insort(self.queue, request, key=lambda r: -r.priority)

    def pop(self):
        return self.queue.pop(0)

关键函数索引

| 函数/类 | 文件 | 职责 |
| --- | --- | --- |
| `Scheduler.schedule()` | `v1/core/sched/scheduler.py` | 主调度入口 |
| `Scheduler._schedule_prefills()` | `v1/core/sched/scheduler.py` | 处理等待中的 prefill 请求 |
| `Scheduler._schedule_running()` | `v1/core/sched/scheduler.py` | 处理活跃 decode 请求 |
| `Scheduler._schedule_swapped()` | `v1/core/sched/scheduler.py` | 恢复被交换的请求 |
| `Scheduler._preempt()` | `v1/core/sched/scheduler.py` | 抢占低优先级请求 |
| `KVCacheManager.allocate()` | `v1/core/kv_cache_manager.py` | 分配 KV 缓存块 |
| `BlockPool.allocate()` | `v1/core/block_pool.py` | 空闲块分配 |
| `RequestQueue.push()` | `v1/core/sched/request_queue.py` | 请求入队 |