路径锁与崩溃恢复
OpenViking 通过路径锁和Redo Log 两个简单原语保护核心写操作(rm、mv、add_resource、session.commit)的一致性,确保 VikingFS、VectorDB、QueueManager 三个子系统在故障时不会出现数据不一致。
设计哲学
OpenViking 是上下文数据库,FS 是源数据,VectorDB 是派生索引。索引丢了可从源数据重建,源数据丢失不可恢复。因此:
宁可搜不到,不要搜到坏结果。
设计原则
- 写互斥:通过路径锁保证同一路径同一时间只有一个写操作
- 默认生效:所有数据操作命令自动加锁,用户无需额外配置
- 锁即保护:进入 LockContext 时加锁,退出时释放,没有 undo/journal/commit 语义
- 仅 session_memory 需要崩溃恢复:通过 RedoLog 在进程崩溃后重做记忆提取
- Queue 操作在锁外执行:SemanticQueue/EmbeddingQueue 的 enqueue 是幂等的,失败可重试
架构
Service Layer (rm / mv / add_resource / session.commit)
|
v
+--[LockContext 异步上下文管理器]-------+
| |
| 1. 创建 LockHandle |
| 2. 获取路径锁(轮询 + 超时) |
| 3. 执行操作(FS + VectorDB) |
| 4. 释放锁 |
| |
| 异常时:自动释放锁,异常原样传播 |
+---------------------------------------+
|
v
Storage Layer (VikingFS, VectorDB, QueueManager)两个核心组件
组件 1:PathLock + LockManager + LockContext(路径锁系统)
PathLock 实现基于文件的分布式锁,支持 POINT 和 SUBTREE 两种锁类型,使用 fencing token 防止 TOCTOU 竞争,自动检测并清理过期锁。
LockHandle 是轻量的锁持有者令牌:
@dataclass
class LockHandle:
id: str # 唯一标识,用于生成 fencing token
locks: list[str] # 已获取的锁文件路径
created_at: float # handle 创建时间
last_active_at: float # 最近一次成功 acquire/refresh 的时间LockManager 是全局单例,管理锁生命周期:
- 创建/释放 LockHandle
- 后台清理泄漏的锁(进程内安全网)
- 启动时执行 RedoLog 恢复
LockContext 是异步上下文管理器,封装加锁/解锁生命周期:
from openviking.storage.transaction import LockContext, get_lock_manager
async with LockContext(get_lock_manager(), [path], lock_mode="point") as handle:
# 在锁保护下执行操作
...
# 退出时自动释放锁(包括异常情况)组件 2:RedoLog(崩溃恢复)
仅用于 session.commit 的记忆提取阶段。操作前写标记,成功后删标记,启动时扫描遗留标记并重做。
/local/_system/redo/{task_id}/redo.jsonMemory 提取是幂等的 — 从同一个 archive 重新提取会得到相同结果。
一致性问题与解决方案
rm(uri)
| 问题 | 方案 |
|---|---|
| 先删文件再删索引 -> 文件已删但索引残留 -> 搜索返回不存在的文件 | 调换顺序:先删索引再删文件。索引删除失败 -> 文件和索引都在,搜索正常 |
加锁策略(根据目标类型区分):
- 删除目录:
lock_mode="subtree",锁目录自身 - 删除文件:
lock_mode="point",锁文件的父目录
操作流程:
1. 检查目标是目录还是文件,选择锁模式
2. 获取锁
3. 删除 VectorDB 索引 -> 搜索立刻不可见
4. 删除 FS 文件
5. 释放锁VectorDB 删除失败 -> 直接抛异常,锁自动释放,文件和索引都在。FS 删除失败 -> VectorDB 已删但文件还在,重试即可。
mv(old_uri, new_uri)
| 问题 | 方案 |
|---|---|
| 文件移到新路径但索引指向旧路径 -> 搜索返回旧路径(不存在) | 先 copy 再更新索引,失败时清理副本 |
加锁策略(通过 lock_mode="mv" 自动处理):
- 移动目录:源路径和目标父目录各加 SUBTREE 锁
- 移动文件:源的父目录和目标父目录各加 POINT 锁
操作流程:
1. 检查源是目录还是文件,确定 src_is_dir
2. 获取 mv 锁(内部根据 src_is_dir 选择 SUBTREE 或 POINT)
3. Copy 到新位置(源还在,安全)
4. 如果是目录,删除副本中被 cp 带过去的锁文件
5. 更新 VectorDB 中的 URI
- 失败 -> 清理副本,源和旧索引都在,一致状态
6. 删除源
7. 释放锁add_resource
| 问题 | 方案 |
|---|---|
| 文件从临时目录移到正式目录后崩溃 -> 文件存在但永远搜不到 | 首次添加与增量更新分离为两条独立路径 |
| 资源已落盘但语义处理/向量化还在跑时被 rm 删除 -> 处理白跑 | 生命周期 SUBTREE 锁,从落盘持续到处理完成 |
首次添加(target 不存在)— 在 ResourceProcessor.process_resource Phase 3.5 中处理:
1. 获取 POINT 锁,锁 final_uri 的父目录
2. agfs.mv 临时目录 -> 正式位置
3. 获取 SUBTREE 锁,锁 final_uri(在 POINT 锁内,消除竞态窗口)
4. 释放 POINT 锁
5. 清理临时目录
6. 入队 SemanticMsg(lifecycle_lock_handle_id=...) -> DAG 在 final 上跑
7. DAG 启动锁刷新循环(每 lock_expire/2 秒刷新锁 token 并更新 handle 活跃时间)
8. DAG 完成 + 所有 embedding 完成 -> 释放 SUBTREE 锁此期间 rm 尝试获取同路径 SUBTREE 锁会失败,抛出 ResourceBusyError。
增量更新(target 已存在)— temp 保持不动:
1. 获取 SUBTREE 锁,锁 target_uri(保护已有资源)
2. 入队 SemanticMsg(uri=temp, target_uri=final, lifecycle_lock_handle_id=...)
3. DAG 在 temp 上跑,启动锁刷新循环
4. DAG 完成后触发 sync_diff_callback 或 move_temp_to_target_callback
5. callback 执行完毕 -> 释放 SUBTREE 锁注意:DAG callback 不在外层加锁。每个 VikingFS.rm 和 VikingFS.mv 内部各自有独立锁保护。外层锁会与内部锁冲突导致死锁。
服务重启恢复:SemanticMsg 持久化在 QueueFS 中。重启后 SemanticProcessor 发现 lifecycle_lock_handle_id 对应的 handle 不在内存中,会重新获取 SUBTREE 锁。
session.commit()
| 问题 | 方案 |
|---|---|
| 消息已清空但 archive 未写入 -> 对话数据丢失 | Phase 1 无锁(archive 不完整无副作用)+ Phase 2 RedoLog |
LLM 调用耗时不可控(5s~60s+),不能放在持锁操作内。设计拆为两个阶段:
Phase 1 — 归档(无锁):
1. 生成归档摘要(LLM)
2. 写 archive(history/archive_N/messages.jsonl + 摘要)
3. 清空 messages.jsonl
4. 清空内存中的消息列表
Phase 2 — 记忆提取 + 写入(RedoLog):
1. 写 redo 标记(archive_uri、session_uri、用户身份信息)
2. 从归档消息提取 memories(LLM)
3. 写当前消息状态
4. 写 relations
5. 直接 enqueue SemanticQueue
6. 删除 redo 标记崩溃恢复分析:
| 崩溃时间点 | 状态 | 恢复动作 |
|---|---|---|
| Phase 1 写 archive 中途 | 无标记 | archive 不完整,下次 commit 从 history/ 扫描 index,不受影响 |
| Phase 1 archive 完成但 messages 未清空 | 无标记 | archive 完整 + messages 仍在 = 数据冗余但安全 |
| Phase 2 记忆提取/写入中途 | redo 标记存在 | 启动恢复:从 archive 重做提取+写入+入队 |
| Phase 2 完成 | redo 标记已删 | 无需恢复 |
LockContext
LockContext 是异步上下文管理器,封装锁的获取和释放:
from openviking.storage.transaction import LockContext, get_lock_manager
lock_manager = get_lock_manager()
# Point 锁(写操作、语义处理)
async with LockContext(lock_manager, [path], lock_mode="point"):
# 执行操作...
pass
# Subtree 锁(删除操作)
async with LockContext(lock_manager, [path], lock_mode="subtree"):
# 执行操作...
pass
# MV 锁(移动操作)
async with LockContext(lock_manager, [src], lock_mode="mv", mv_dst_parent_path=dst):
# 执行操作...
pass锁模式:
| lock_mode | 用途 | 行为 |
|---|---|---|
point | 写操作、语义处理 | 锁定指定路径;与同路径的任何锁和祖先目录的 SUBTREE 锁冲突 |
subtree | 删除操作 | 锁定子树根节点;与同路径的任何锁、后代目录的任何锁和祖先目录的 SUBTREE 锁冲突 |
mv | 移动操作 | 目录移动:源和目标均加 SUBTREE 锁;文件移动:源父目录和目标均加 POINT 锁(通过 src_is_dir 控制) |
异常处理:__aexit__ 总是释放锁,不吞异常。获取锁失败时抛出 LockAcquisitionError。
锁类型(POINT vs SUBTREE)
锁机制使用两种锁类型来处理不同的冲突场景:
| 同路径 POINT | 同路径 SUBTREE | 后代 POINT | 祖先 SUBTREE | |
|---|---|---|---|---|
| POINT | 冲突 | 冲突 | — | 冲突 |
| SUBTREE | 冲突 | 冲突 | 冲突 | 冲突 |
- POINT (P):用于写操作和语义处理。只锁单个目录。若祖先目录持有 SUBTREE 锁则阻塞。
- SUBTREE (S):用于删除和移动操作。逻辑上覆盖整个子树,但只在根目录写一个锁文件。获取前扫描所有后代和祖先目录确认无冲突锁。
锁机制
锁协议
锁文件路径:{path}/.path.ovlock
锁文件内容(Fencing Token):
{handle_id}:{time_ns}:{lock_type}其中 lock_type 为 P(POINT)或 S(SUBTREE)。
获取锁流程(POINT 模式)
循环直到超时(轮询间隔:200ms):
1. 检查目标目录存在
2. 检查目标路径是否被其他操作锁定
- 陈旧锁? -> 移除后重试
- 活跃锁? -> 等待
3. 检查所有祖先目录是否有 SUBTREE 锁
- 陈旧锁? -> 移除后重试
- 活跃锁? -> 等待
4. 写入 POINT (P) 锁文件
5. TOCTOU 双重检查:重新扫描祖先目录的 SUBTREE 锁
- 发现冲突:比较 (timestamp, handle_id)
- 后到者(更大的 timestamp/handle_id)主动让步(删除自己的锁),防止活锁
- 等待后重试
6. 验证锁文件归属(fencing token 匹配)
7. 成功
超时(默认 0 = 不等待)抛出 LockAcquisitionError获取锁流程(SUBTREE 模式)
循环直到超时(轮询间隔:200ms):
1. 检查目标目录存在
2. 检查目标路径是否被其他操作锁定
- 陈旧锁? -> 移除后重试
- 活跃锁? -> 等待
3. 检查所有祖先目录是否有 SUBTREE 锁
- 陈旧锁? -> 移除后重试
- 活跃锁? -> 等待
4. 扫描所有后代目录,检查是否有其他操作持有的锁
- 陈旧锁? -> 移除后重试
- 活跃锁? -> 等待
5. 写入 SUBTREE (S) 锁文件(只写一个文件,在根路径)
6. TOCTOU 双重检查:重新扫描后代目录和祖先目录
- 发现冲突:比较 (timestamp, handle_id)
- 后到者(更大的 timestamp/handle_id)主动让步(删除自己的锁),防止活锁
- 等待后重试
7. 验证锁文件归属(fencing token 匹配)
8. 成功
超时(默认 0 = 不等待)抛出 LockAcquisitionError锁过期清理
陈旧锁检测:PathLock 检查 fencing token 中的时间戳。超过 lock_expire(默认 300s)的锁被视为陈旧锁,在加锁过程中自动移除。
进程内清理:LockManager 每 60 秒检查活跃的 LockHandle。仍持有锁文件且失活时间超过 lock_expire 的 handle 会被强制释放。
孤儿锁:进程崩溃后遗留的锁文件,在下次任何操作尝试获取同一路径锁时,通过 stale lock 检测自动移除。
崩溃恢复
LockManager.start() 启动时自动扫描 /local/_system/redo/ 目录中的遗留标记:
| 场景 | 恢复方式 |
|---|---|
| session_memory 提取中途崩溃 | 从 archive 重做记忆提取 + 写入 + enqueue |
| 锁持有期间崩溃 | 锁文件留在 AGFS,下次获取时 stale 检测自动清理(默认 300s 过期) |
| enqueue 后 worker 处理前崩溃 | QueueFS SQLite 持久化,worker 重启后自动拉取 |
| 孤儿索引 | L2 按需加载时清理 |
防线总结
| 异常场景 | 防线 | 恢复时机 |
|---|---|---|
| 操作中途崩溃 | 锁自动过期 + stale 检测 | 下次获取同路径锁时 |
| add_resource 语义处理中途崩溃 | 生命周期锁过期 + SemanticProcessor 重启时重新获取 | worker 重启后 |
| session.commit Phase 2 崩溃 | RedoLog 标记 + 重做 | 重启时 |
| enqueue 后 worker 处理前崩溃 | QueueFS SQLite 持久化 | worker 重启后 |
| 孤儿索引 | L2 按需加载时清理 | 用户访问时 |
配置
路径锁默认启用,无需额外配置。默认不等待:若路径被锁定则立即抛出 LockAcquisitionError。如需允许等待重试,可通过 storage.transaction 段配置:
{
"storage": {
"transaction": {
"lock_timeout": 5.0,
"lock_expire": 300.0
}
}
}| 参数 | 类型 | 说明 | 默认值 |
|---|---|---|---|
lock_timeout | float | 获取锁的等待超时(秒)。0 = 立即失败(默认);> 0 = 最多等待此时间 | 0.0 |
lock_expire | float | 锁失活阈值(秒),超过此时间未被 refresh 的锁会被视为陈旧锁并回收 | 300.0 |
QueueFS 持久化
路径锁机制依赖 QueueFS 使用 SQLite 后端,确保 enqueue 的任务在进程重启后可恢复。这是默认配置,无需手动设置。
