fix: integrate SQLite event storage into load/save system

- Fix load_game to use World.create_with_db() for SQLite event storage
- Add get_events_db_path() to compute event database path from save path
- Add JSON to SQLite migration for backward compatibility with old saves
- Close old EventManager before loading new save to prevent connection leaks
- Add events_db metadata to save file
- Add comprehensive tests for database switching bug and save/load cycle
This commit is contained in:
Zihao Xu
2026-01-07 23:21:55 -08:00
parent a6b8198c3f
commit 06d1bed987
4 changed files with 606 additions and 16 deletions

View File

@@ -1307,6 +1307,11 @@ def api_load_game(req: LoadGameRequest):
if not target_path.exists():
raise HTTPException(status_code=404, detail="File not found")
# 关闭旧 World 的 EventManager释放 SQLite 连接。
old_world = game_instance.get("world")
if old_world and hasattr(old_world, "event_manager"):
old_world.event_manager.close()
# 加载
new_world, new_sim, new_sects = load_game(target_path)

View File

@@ -3,6 +3,7 @@
主要功能:
- load_game: 从JSON文件加载游戏完整状态
- get_events_db_path: 根据存档路径计算事件数据库路径
- check_save_compatibility: 检查存档版本兼容性(当前未实现严格检查)
加载流程(两阶段):
@@ -17,9 +18,12 @@
- 无法重建的动作会被置为None
- 不存在的Avatar引用会被忽略
事件存储:
- 事件存储在 SQLite 数据库中({save_name}_events.db
- 旧存档的 JSON 事件会自动迁移到 SQLite
注意事项:
- 读档后会重置前端UI状态头像图像、插值等
- 事件历史完整恢复(受限于保存时的数量)
- 地图从头重建(因为地图是固定的),但会恢复宗门总部位置
"""
import json
@@ -37,6 +41,15 @@ from src.classes.relation import Relation
from src.utils.config import CONFIG
def get_events_db_path(save_path: Path) -> Path:
"""
根据存档路径计算事件数据库路径。
例如save_20260105_1423.json -> save_20260105_1423_events.db
"""
return save_path.with_suffix("").with_name(save_path.stem + "_events.db")
def load_game(save_path: Optional[Path] = None) -> Tuple["World", "Simulator", List["Sect"]]:
"""
从文件加载游戏状态
@@ -85,8 +98,15 @@ def load_game(save_path: Optional[Path] = None) -> Tuple["World", "Simulator", L
world_data = save_data.get("world", {})
month_stamp = MonthStamp(world_data["month_stamp"])
# 重建World对象
world = World(map=game_map, month_stamp=month_stamp)
# 计算事件数据库路径。
events_db_path = get_events_db_path(save_path)
# 重建World对象使用 SQLite 事件存储)。
world = World.create_with_db(
map=game_map,
month_stamp=month_stamp,
events_db_path=events_db_path,
)
# 重建天地灵机
from src.classes.celestial_phenomenon import celestial_phenomena_by_id
@@ -153,18 +173,26 @@ def load_game(save_path: Optional[Path] = None) -> Tuple["World", "Simulator", L
if t_name in techniques_by_name:
sect.techniques.append(techniques_by_name[t_name])
# 重建事件历史
# 检查是否需要从 JSON 迁移事件(向后兼容旧存档)。
db_event_count = world.event_manager.count()
events_data = save_data.get("events", [])
for event_data in events_data:
event = Event.from_dict(event_data)
world.event_manager.add_event(event)
if db_event_count == 0 and len(events_data) > 0:
# SQLite 数据库是空的,但 JSON 中有事件,执行迁移。
print(f"正在从 JSON 迁移 {len(events_data)} 条事件到 SQLite...")
for event_data in events_data:
event = Event.from_dict(event_data)
world.event_manager.add_event(event)
print("事件迁移完成")
else:
print(f"已从 SQLite 加载 {db_event_count} 条事件")
# 重建Simulator
simulator_data = save_data.get("simulator", {})
simulator = Simulator(world)
simulator.birth_rate = simulator_data.get("birth_rate", CONFIG.game.npc_birth_rate_per_month)
print(f"存档加载成功!共加载 {len(all_avatars)} 个角色{len(events_data)} 条事件")
print(f"存档加载成功!共加载 {len(all_avatars)} 个角色")
return world, simulator, existed_sects
except Exception as e:

View File

@@ -7,20 +7,22 @@
- list_saves: 列出所有存档文件
存档内容:
- meta: 版本号、保存时间、游戏时间
- meta: 版本号、保存时间、游戏时间、事件数据库信息
- world: 游戏时间戳、本局启用的宗门列表
- avatars: 所有角色的完整状态通过AvatarSaveMixin.to_save_dict序列化
- events: 最近N条事件历史N在config.yml中配置
- events: 最近N条事件历史仅用于向后兼容迁移新事件存储在SQLite中
- simulator: 模拟器配置(如出生率)
存档格式:JSON明文易于调试
存档位置assets/saves/ (配置在config.yml中)
存档格式:
- JSON明文易于调试+ SQLite事件数据库
- 存档位置assets/saves/ (配置在config.yml中)
- 事件数据库:{save_name}_events.db与JSON文件同目录
注意事项:
- 当前版本只支持单一存档槽位save.json
- 不支持跨版本兼容(版本号仅记录,不做检查)
- 地图本身不保存(因为地图是固定的,只保存宗门总部位置)
- relations在Avatar中已转换为id映射避免循环引用
- 事件实时写入SQLiteJSON中的events字段仅用于旧存档迁移
"""
import json
from pathlib import Path
@@ -33,6 +35,7 @@ if TYPE_CHECKING:
from src.classes.sect import Sect
from src.utils.config import CONFIG
from src.sim.load.load_game import get_events_db_path
def save_game(
@@ -72,11 +75,17 @@ def save_game(
save_path = Path(save_path)
save_path.parent.mkdir(parents=True, exist_ok=True)
# 计算事件数据库路径。
events_db_path = get_events_db_path(save_path)
# 构建元信息
meta = {
"version": CONFIG.meta.version,
"save_time": datetime.now().isoformat(),
"game_time": f"{world.month_stamp.get_year()}{world.month_stamp.get_month().value}"
"game_time": f"{world.month_stamp.get_year()}{world.month_stamp.get_month().value}",
# SQLite 事件数据库信息。
"events_db": str(events_db_path.name),
"event_count": world.event_manager.count(),
}
# 构建世界数据

View File

@@ -10,9 +10,11 @@ from src.classes.calendar import Month, Year, create_month_stamp
from src.classes.avatar import Avatar, Gender
from src.classes.age import Age
from src.classes.cultivation import Realm
from src.classes.event import Event
from src.classes.event_storage import EventStorage
from src.sim.simulator import Simulator
from src.sim.save.save_game import save_game
from src.sim.load.load_game import load_game
from src.sim.load.load_game import load_game, get_events_db_path
from src.utils.id_generator import get_avatar_id
from src.utils.config import CONFIG
@@ -157,3 +159,549 @@ def test_save_load_with_relations(temp_save_dir):
assert l_av2 in l_av1.relations
assert l_av1.relations[l_av2] == Relation.FRIEND
# =============================================================================
# SQLite Event Storage Tests
# =============================================================================
def make_event(
year: int,
month: int,
content: str,
avatar_ids: list[str] | None = None,
is_major: bool = False,
is_story: bool = False,
) -> Event:
"""Helper to create an Event."""
month_stamp = create_month_stamp(Year(year), Month(month))
return Event(
month_stamp=month_stamp,
content=content,
related_avatars=avatar_ids,
is_major=is_major,
is_story=is_story,
)
class TestGetEventsDbPath:
"""Tests for get_events_db_path utility function."""
def test_json_to_db_path(self, tmp_path):
"""Test converting .json path to _events.db path."""
save_path = tmp_path / "save_20260105_1423.json"
db_path = get_events_db_path(save_path)
assert db_path.name == "save_20260105_1423_events.db"
assert db_path.parent == save_path.parent
def test_nested_path(self, tmp_path):
"""Test with nested directory structure."""
nested = tmp_path / "saves" / "slot1"
nested.mkdir(parents=True)
save_path = nested / "game.json"
db_path = get_events_db_path(save_path)
assert db_path == nested / "game_events.db"
class TestSaveLoadWithSQLiteEvents:
"""Tests for save/load cycle with SQLite event storage."""
def test_save_creates_events_db_metadata(self, temp_save_dir):
"""Test that save_game records events_db info in meta."""
game_map = create_test_map()
month_stamp = create_month_stamp(Year(100), Month.JANUARY)
events_db_path = temp_save_dir / "test_meta_events.db"
world = World.create_with_db(
map=game_map,
month_stamp=month_stamp,
events_db_path=events_db_path,
)
# Add some events.
world.event_manager.add_event(make_event(100, 1, "Event 1"))
world.event_manager.add_event(make_event(100, 2, "Event 2"))
sim = Simulator(world)
save_path = temp_save_dir / "test_meta.json"
success, _ = save_game(world, sim, [], save_path)
assert success
# Check meta contains events_db info.
with open(save_path, "r") as f:
data = json.load(f)
meta = data["meta"]
assert "events_db" in meta
assert "event_count" in meta
assert meta["event_count"] == 2
world.event_manager.close()
def test_load_uses_sqlite_events(self, temp_save_dir):
"""Test that load_game connects to the correct SQLite database."""
game_map = create_test_map()
month_stamp = create_month_stamp(Year(100), Month.JANUARY)
save_path = temp_save_dir / "test_sqlite.json"
events_db_path = get_events_db_path(save_path)
# Create world with SQLite storage.
world = World.create_with_db(
map=game_map,
month_stamp=month_stamp,
events_db_path=events_db_path,
)
# Add avatar and events.
avatar_id = get_avatar_id()
avatar = Avatar(
world=world,
name="TestAvatar",
id=avatar_id,
birth_month_stamp=create_month_stamp(Year(80), Month.JANUARY),
age=Age(20, Realm.Qi_Refinement),
gender=Gender.MALE,
)
world.avatar_manager.avatars[avatar.id] = avatar
# Add events - some related to avatar, some not.
world.event_manager.add_event(make_event(100, 1, "Avatar event 1", [avatar_id]))
world.event_manager.add_event(make_event(100, 2, "World event"))
world.event_manager.add_event(make_event(100, 3, "Avatar event 2", [avatar_id], is_major=True))
original_count = world.event_manager.count()
assert original_count == 3
# Save.
sim = Simulator(world)
success, _ = save_game(world, sim, [], save_path)
assert success
# Verify DB file exists.
assert events_db_path.exists()
# Close original world.
world.event_manager.close()
# Load.
with patch('src.run.load_map.load_cultivation_world_map', return_value=create_test_map()):
loaded_world, _, _ = load_game(save_path)
# Verify events are loaded from SQLite.
assert loaded_world.event_manager.count() == 3
# Verify event queries work.
avatar_events = loaded_world.event_manager.get_events_by_avatar(avatar_id)
assert len(avatar_events) == 2
major_events = loaded_world.event_manager.get_major_events_by_avatar(avatar_id)
assert len(major_events) == 1
assert major_events[0].content == "Avatar event 2"
loaded_world.event_manager.close()
def test_load_migrates_json_events_to_sqlite(self, temp_save_dir):
"""Test that loading an old save (with JSON events) migrates to SQLite."""
game_map = create_test_map()
month_stamp = create_month_stamp(Year(100), Month.JANUARY)
# Create a "legacy" save file with events in JSON (no _events.db).
save_path = temp_save_dir / "legacy_save.json"
events_db_path = get_events_db_path(save_path)
# Make sure no DB exists.
if events_db_path.exists():
events_db_path.unlink()
# Create legacy JSON save data.
legacy_events = [
make_event(100, 1, "Legacy event 1").to_dict(),
make_event(100, 2, "Legacy event 2").to_dict(),
make_event(100, 3, "Legacy event 3", is_major=True).to_dict(),
]
legacy_save_data = {
"meta": {"version": "1.0", "save_time": "2026-01-01", "game_time": "100年1月"},
"world": {"month_stamp": int(month_stamp), "existed_sect_ids": []},
"avatars": [],
"events": legacy_events,
"simulator": {"birth_rate": 0.1},
}
with open(save_path, "w") as f:
json.dump(legacy_save_data, f)
# Load - should migrate events to SQLite.
with patch('src.run.load_map.load_cultivation_world_map', return_value=create_test_map()):
loaded_world, _, _ = load_game(save_path)
# Verify events were migrated.
assert loaded_world.event_manager.count() == 3
# Verify DB file was created.
assert events_db_path.exists()
# Verify events are queryable.
events = loaded_world.event_manager.get_recent_events()
assert len(events) == 3
contents = [e.content for e in events]
assert "Legacy event 1" in contents
assert "Legacy event 2" in contents
assert "Legacy event 3" in contents
loaded_world.event_manager.close()
def test_load_does_not_duplicate_events_on_reload(self, temp_save_dir):
"""Test that reloading a save doesn't duplicate events."""
game_map = create_test_map()
month_stamp = create_month_stamp(Year(100), Month.JANUARY)
save_path = temp_save_dir / "test_no_dup.json"
events_db_path = get_events_db_path(save_path)
# Create and save.
world = World.create_with_db(
map=game_map,
month_stamp=month_stamp,
events_db_path=events_db_path,
)
world.event_manager.add_event(make_event(100, 1, "Event 1"))
world.event_manager.add_event(make_event(100, 2, "Event 2"))
sim = Simulator(world)
save_game(world, sim, [], save_path)
world.event_manager.close()
# Load twice.
for _ in range(2):
with patch('src.run.load_map.load_cultivation_world_map', return_value=create_test_map()):
loaded_world, _, _ = load_game(save_path)
# Should still be 2, not 4 or 6.
assert loaded_world.event_manager.count() == 2
loaded_world.event_manager.close()
def test_multiple_saves_have_separate_event_dbs(self, temp_save_dir):
"""Test that different saves use different event databases."""
game_map = create_test_map()
# Create first save.
save_path1 = temp_save_dir / "save1.json"
events_db_path1 = get_events_db_path(save_path1)
month_stamp1 = create_month_stamp(Year(100), Month.JANUARY)
world1 = World.create_with_db(
map=game_map,
month_stamp=month_stamp1,
events_db_path=events_db_path1,
)
world1.event_manager.add_event(make_event(100, 1, "Save1 Event 1"))
world1.event_manager.add_event(make_event(100, 2, "Save1 Event 2"))
sim1 = Simulator(world1)
save_game(world1, sim1, [], save_path1)
world1.event_manager.close()
# Create second save with different events.
save_path2 = temp_save_dir / "save2.json"
events_db_path2 = get_events_db_path(save_path2)
month_stamp2 = create_month_stamp(Year(200), Month.JUNE)
world2 = World.create_with_db(
map=game_map,
month_stamp=month_stamp2,
events_db_path=events_db_path2,
)
world2.event_manager.add_event(make_event(200, 6, "Save2 Event 1"))
world2.event_manager.add_event(make_event(200, 7, "Save2 Event 2"))
world2.event_manager.add_event(make_event(200, 8, "Save2 Event 3"))
sim2 = Simulator(world2)
save_game(world2, sim2, [], save_path2)
world2.event_manager.close()
# Load save1 and verify its events.
with patch('src.run.load_map.load_cultivation_world_map', return_value=create_test_map()):
loaded1, _, _ = load_game(save_path1)
assert loaded1.event_manager.count() == 2
events1 = loaded1.event_manager.get_recent_events()
assert all("Save1" in e.content for e in events1)
loaded1.event_manager.close()
# Load save2 and verify its events.
with patch('src.run.load_map.load_cultivation_world_map', return_value=create_test_map()):
loaded2, _, _ = load_game(save_path2)
assert loaded2.event_manager.count() == 3
events2 = loaded2.event_manager.get_recent_events()
assert all("Save2" in e.content for e in events2)
loaded2.event_manager.close()
class TestEventPaginationAfterLoad:
"""Tests for event pagination functionality after loading."""
def test_pagination_works_after_load(self, temp_save_dir):
"""Test that event pagination works correctly after loading a save."""
game_map = create_test_map()
month_stamp = create_month_stamp(Year(100), Month.JANUARY)
save_path = temp_save_dir / "test_pagination.json"
events_db_path = get_events_db_path(save_path)
world = World.create_with_db(
map=game_map,
month_stamp=month_stamp,
events_db_path=events_db_path,
)
# Add 25 events.
for i in range(25):
year = 100 + (i // 12)
month = (i % 12) + 1
world.event_manager.add_event(make_event(year, month, f"Event {i}"))
sim = Simulator(world)
save_game(world, sim, [], save_path)
world.event_manager.close()
# Load and test pagination.
with patch('src.run.load_map.load_cultivation_world_map', return_value=create_test_map()):
loaded_world, _, _ = load_game(save_path)
# First page.
page1, cursor1, has_more1 = loaded_world.event_manager.get_events_paginated(limit=10)
assert len(page1) == 10
assert has_more1 is True
assert cursor1 is not None
# Second page.
page2, cursor2, has_more2 = loaded_world.event_manager.get_events_paginated(limit=10, cursor=cursor1)
assert len(page2) == 10
assert has_more2 is True
# Third page (only 5 remaining).
page3, cursor3, has_more3 = loaded_world.event_manager.get_events_paginated(limit=10, cursor=cursor2)
assert len(page3) == 5
assert has_more3 is False
assert cursor3 is None
# Verify no duplicates.
all_ids = {e.id for e in page1} | {e.id for e in page2} | {e.id for e in page3}
assert len(all_ids) == 25
loaded_world.event_manager.close()
def test_avatar_filter_works_after_load(self, temp_save_dir):
"""Test that avatar filtering works correctly after loading."""
game_map = create_test_map()
month_stamp = create_month_stamp(Year(100), Month.JANUARY)
save_path = temp_save_dir / "test_filter.json"
events_db_path = get_events_db_path(save_path)
world = World.create_with_db(
map=game_map,
month_stamp=month_stamp,
events_db_path=events_db_path,
)
avatar1_id = get_avatar_id()
avatar2_id = get_avatar_id()
# Add events for different avatars.
world.event_manager.add_event(make_event(100, 1, "Avatar1 only", [avatar1_id]))
world.event_manager.add_event(make_event(100, 2, "Avatar2 only", [avatar2_id]))
world.event_manager.add_event(make_event(100, 3, "Both avatars", [avatar1_id, avatar2_id]))
world.event_manager.add_event(make_event(100, 4, "World event"))
sim = Simulator(world)
save_game(world, sim, [], save_path)
world.event_manager.close()
# Load and test filtering.
with patch('src.run.load_map.load_cultivation_world_map', return_value=create_test_map()):
loaded_world, _, _ = load_game(save_path)
# Filter by avatar1.
avatar1_events = loaded_world.event_manager.get_events_by_avatar(avatar1_id)
assert len(avatar1_events) == 2
contents = [e.content for e in avatar1_events]
assert "Avatar1 only" in contents
assert "Both avatars" in contents
# Filter by pair.
pair_events = loaded_world.event_manager.get_events_between(avatar1_id, avatar2_id)
assert len(pair_events) == 1
assert pair_events[0].content == "Both avatars"
# Paginated filter.
page, cursor, has_more = loaded_world.event_manager.get_events_paginated(avatar_id=avatar1_id)
assert len(page) == 2
loaded_world.event_manager.close()
class TestEventCleanupAfterLoad:
"""Tests for event cleanup functionality after loading."""
def test_cleanup_works_after_load(self, temp_save_dir):
"""Test that event cleanup works correctly after loading."""
game_map = create_test_map()
month_stamp = create_month_stamp(Year(100), Month.JANUARY)
save_path = temp_save_dir / "test_cleanup.json"
events_db_path = get_events_db_path(save_path)
world = World.create_with_db(
map=game_map,
month_stamp=month_stamp,
events_db_path=events_db_path,
)
# Add mix of major and minor events.
world.event_manager.add_event(make_event(100, 1, "Minor 1", is_major=False))
world.event_manager.add_event(make_event(100, 2, "Major 1", is_major=True))
world.event_manager.add_event(make_event(100, 3, "Minor 2", is_major=False))
world.event_manager.add_event(make_event(100, 4, "Major 2", is_major=True))
sim = Simulator(world)
save_game(world, sim, [], save_path)
world.event_manager.close()
# Load and cleanup.
with patch('src.run.load_map.load_cultivation_world_map', return_value=create_test_map()):
loaded_world, _, _ = load_game(save_path)
assert loaded_world.event_manager.count() == 4
# Cleanup minor events (keep major).
deleted = loaded_world.event_manager.cleanup(keep_major=True)
assert deleted == 2
assert loaded_world.event_manager.count() == 2
# Verify only major events remain.
events = loaded_world.event_manager.get_recent_events()
assert all(e.is_major for e in events)
loaded_world.event_manager.close()
class TestDatabaseSwitchingOnLoad:
"""Tests specifically for the database switching bug that was fixed."""
def test_load_switches_to_correct_database(self, temp_save_dir):
"""
Test that loading a different save properly switches the event database.
This tests the bug where loading a save would still query the old database.
"""
game_map = create_test_map()
# Create "current" session with some events.
current_db_path = temp_save_dir / "current_events.db"
current_world = World.create_with_db(
map=game_map,
month_stamp=create_month_stamp(Year(100), Month.JANUARY),
events_db_path=current_db_path,
)
current_world.event_manager.add_event(make_event(100, 1, "Current session event"))
current_event_count = current_world.event_manager.count()
assert current_event_count == 1
# Create a different save with more events.
other_save_path = temp_save_dir / "other_save.json"
other_db_path = get_events_db_path(other_save_path)
other_world = World.create_with_db(
map=game_map,
month_stamp=create_month_stamp(Year(200), Month.JUNE),
events_db_path=other_db_path,
)
# Add 10 events to other save.
for i in range(10):
other_world.event_manager.add_event(make_event(200, i % 12 + 1, f"Other save event {i}"))
sim = Simulator(other_world)
save_game(other_world, sim, [], other_save_path)
other_world.event_manager.close()
# Now "load" the other save (simulating what main.py does).
# Close current world's event manager first.
current_world.event_manager.close()
with patch('src.run.load_map.load_cultivation_world_map', return_value=create_test_map()):
loaded_world, _, _ = load_game(other_save_path)
# The loaded world should have 10 events, not 1.
assert loaded_world.event_manager.count() == 10
# Verify it's querying the correct database.
events = loaded_world.event_manager.get_recent_events()
assert all("Other save event" in e.content for e in events)
# Pagination should work.
page, cursor, has_more = loaded_world.event_manager.get_events_paginated(limit=5)
assert len(page) == 5
assert has_more is True
loaded_world.event_manager.close()
def test_events_persist_across_save_load_cycle(self, temp_save_dir):
"""Test that events added before save are available after load."""
game_map = create_test_map()
save_path = temp_save_dir / "persist_test.json"
events_db_path = get_events_db_path(save_path)
# Create world, add events, save.
world = World.create_with_db(
map=game_map,
month_stamp=create_month_stamp(Year(100), Month.JANUARY),
events_db_path=events_db_path,
)
avatar_id = get_avatar_id()
avatar = Avatar(
world=world,
name="TestAvatar",
id=avatar_id,
birth_month_stamp=create_month_stamp(Year(80), Month.JANUARY),
age=Age(20, Realm.Qi_Refinement),
gender=Gender.MALE,
)
world.avatar_manager.avatars[avatar.id] = avatar
# Add various events.
world.event_manager.add_event(make_event(100, 1, "Birth event", [avatar_id], is_major=True))
world.event_manager.add_event(make_event(100, 2, "Training event", [avatar_id]))
world.event_manager.add_event(make_event(100, 3, "World announcement"))
sim = Simulator(world)
save_game(world, sim, [], save_path)
# Get event count before closing.
original_count = world.event_manager.count()
original_avatar_events = len(world.event_manager.get_events_by_avatar(avatar_id))
world.event_manager.close()
# Load and verify.
with patch('src.run.load_map.load_cultivation_world_map', return_value=create_test_map()):
loaded_world, _, _ = load_game(save_path)
assert loaded_world.event_manager.count() == original_count
assert len(loaded_world.event_manager.get_events_by_avatar(avatar_id)) == original_avatar_events
# Check specific event content.
major_events = loaded_world.event_manager.get_major_events_by_avatar(avatar_id)
assert len(major_events) == 1
assert major_events[0].content == "Birth event"
loaded_world.event_manager.close()