feat: SQLite event storage with pagination and filtering

Implement SQLite-based event persistence as specified in sqlite-event-manager.md.

## Changes

### Backend
- **EventStorage** (`src/classes/event_storage.py`): New SQLite storage layer
  - Cursor-based pagination with compound cursor `{month_stamp}_{rowid}`
  - Avatar filtering (single and pair queries)
  - Major/minor event separation
  - Cleanup API with `keep_major` and `before_month_stamp` filters

- **EventManager** (`src/classes/event_manager.py`): Refactored to use SQLite
  - Delegates to EventStorage for persistence
  - Memory fallback mode for testing
  - New `get_events_paginated()` method

- **API** (`src/server/main.py`):
  - `GET /api/events` - Paginated event retrieval with filtering
  - `DELETE /api/events/cleanup` - User-triggered cleanup

### Frontend
- **EventPanel.vue**: Scroll-to-load pagination, dual-person filter UI
- **world.ts**: Event state management with pagination
- **game.ts**: New API client methods

### Testing
- 81 new tests for EventStorage, EventManager, and API
- Added `pytest-asyncio` and `httpx` to requirements.txt

## Known Issues: Save/Load is Currently Broken

After loading a saved game, the following issues occur:

1. **Wrong database used**: API returns events from the startup database instead
   of the loaded save's `_events.db` file
2. **Events from wrong time period**: Shows events from year 115 when loaded
   save is at year 114
3. **Pagination broken after load**: `has_more` returns `False` despite hundreds
   of events in the saved database
4. **Filter functionality broken**: Character selection filter stops working
   after loading a game

Root cause: `load_game.py` does not properly switch the EventManager's database
connection to the loaded save's events database.
This commit is contained in:
Zihao Xu
2026-01-07 00:40:34 -08:00
parent e4ff312f58
commit a1f08dd0ab
14 changed files with 2892 additions and 195 deletions

389
tests/test_api_events.py Normal file
View File

@@ -0,0 +1,389 @@
"""
Tests for the Events API endpoints.
Covers:
- GET /api/events - pagination and filtering
- DELETE /api/events/cleanup - event cleanup
Uses FastAPI TestClient to test the API directly.
"""
import pytest
import tempfile
from pathlib import Path
from unittest.mock import patch, MagicMock
from fastapi.testclient import TestClient
from src.classes.world import World
from src.classes.map import Map
from src.classes.tile import TileType
from src.classes.calendar import Month, Year, create_month_stamp
from src.classes.event import Event
from src.classes.event_storage import EventStorage
from src.classes.event_manager import EventManager
def create_test_map():
"""Create a simple 10x10 plain map for testing."""
m = Map(width=10, height=10)
for x in range(10):
for y in range(10):
m.create_tile(x, y, TileType.PLAIN)
return m
def make_event(
year: int,
month: int,
content: str,
avatar_ids: list[str] | None = None,
is_major: bool = False,
is_story: bool = False,
) -> Event:
"""Helper to create an Event."""
month_stamp = create_month_stamp(Year(year), Month(month))
return Event(
month_stamp=month_stamp,
content=content,
related_avatars=avatar_ids,
is_major=is_major,
is_story=is_story,
)
@pytest.fixture
def temp_db_path():
"""Create a temporary database file path."""
with tempfile.TemporaryDirectory() as tmpdir:
yield Path(tmpdir) / "test_events.db"
@pytest.fixture
def mock_world_with_events(temp_db_path):
"""Create a mock world with event manager."""
game_map = create_test_map()
month_stamp = create_month_stamp(Year(100), Month.JANUARY)
world = World.create_with_db(
map=game_map,
month_stamp=month_stamp,
events_db_path=temp_db_path,
)
# Add some test events
world.event_manager.add_event(make_event(100, 1, "Event 1", ["a1"]))
world.event_manager.add_event(make_event(100, 2, "Event 2", ["a2"]))
world.event_manager.add_event(make_event(100, 3, "Event between", ["a1", "a2"]))
world.event_manager.add_event(make_event(100, 4, "Major event", ["a1"], is_major=True))
world.event_manager.add_event(make_event(100, 5, "Story event", ["a1"], is_story=True))
yield world
world.event_manager.close()
@pytest.fixture
def client_with_world(mock_world_with_events):
"""Create a TestClient with mocked game_instance."""
# We need to patch the game_instance in main.py
from src.server import main
# Backup original
original_instance = main.game_instance.copy()
# Set up mock
main.game_instance["world"] = mock_world_with_events
main.game_instance["sim"] = MagicMock()
main.game_instance["is_paused"] = True
client = TestClient(main.app)
yield client
# Restore
main.game_instance.update(original_instance)
class TestGetEventsAPI:
"""Tests for GET /api/events endpoint."""
def test_get_events_returns_all(self, client_with_world):
"""Test getting all events without filters."""
response = client_with_world.get("/api/events")
assert response.status_code == 200
data = response.json()
assert "events" in data
assert "next_cursor" in data
assert "has_more" in data
assert len(data["events"]) == 5
assert data["has_more"] is False
def test_get_events_with_limit(self, client_with_world):
"""Test pagination with limit parameter."""
response = client_with_world.get("/api/events?limit=2")
assert response.status_code == 200
data = response.json()
assert len(data["events"]) == 2
assert data["has_more"] is True
assert data["next_cursor"] is not None
def test_get_events_pagination_cursor(self, client_with_world):
"""Test pagination with cursor."""
# First page
response1 = client_with_world.get("/api/events?limit=3")
data1 = response1.json()
cursor = data1["next_cursor"]
assert cursor is not None
# Second page
response2 = client_with_world.get(f"/api/events?limit=3&cursor={cursor}")
data2 = response2.json()
assert len(data2["events"]) == 2 # 5 total, 3 in first page
# No overlap in event IDs
ids1 = {e["id"] for e in data1["events"]}
ids2 = {e["id"] for e in data2["events"]}
assert ids1.isdisjoint(ids2)
def test_get_events_by_avatar(self, client_with_world):
"""Test filtering by single avatar."""
response = client_with_world.get("/api/events?avatar_id=a1")
assert response.status_code == 200
data = response.json()
# a1 has: Event 1, Event between, Major event, Story event
assert len(data["events"]) == 4
for event in data["events"]:
assert "a1" in event["related_avatar_ids"]
def test_get_events_by_avatar_pair(self, client_with_world):
"""Test filtering by avatar pair."""
response = client_with_world.get("/api/events?avatar_id_1=a1&avatar_id_2=a2")
assert response.status_code == 200
data = response.json()
# Only "Event between" involves both
assert len(data["events"]) == 1
assert data["events"][0]["content"] == "Event between"
def test_get_events_returns_correct_structure(self, client_with_world):
"""Test that events have correct structure."""
response = client_with_world.get("/api/events?limit=1")
assert response.status_code == 200
data = response.json()
assert len(data["events"]) == 1
event = data["events"][0]
# Check required fields
assert "id" in event
assert "text" in event
assert "content" in event
assert "year" in event
assert "month" in event
assert "month_stamp" in event
assert "related_avatar_ids" in event
assert "is_major" in event
assert "is_story" in event
def test_get_events_no_world(self):
"""Test API response when no world is loaded."""
from src.server import main
original = main.game_instance.copy()
main.game_instance["world"] = None
try:
client = TestClient(main.app)
response = client.get("/api/events")
assert response.status_code == 200
data = response.json()
assert data["events"] == []
assert data["next_cursor"] is None
assert data["has_more"] is False
finally:
main.game_instance.update(original)
class TestCleanupEventsAPI:
"""Tests for DELETE /api/events/cleanup endpoint."""
def test_cleanup_deletes_minor_events(self, client_with_world, mock_world_with_events):
"""Test that cleanup deletes minor events."""
initial_count = mock_world_with_events.event_manager.count()
response = client_with_world.delete("/api/events/cleanup")
assert response.status_code == 200
data = response.json()
# Should delete non-major events (4 of them)
assert data["deleted"] == 4
assert mock_world_with_events.event_manager.count() == 1
def test_cleanup_with_keep_major_false(self, client_with_world, mock_world_with_events):
"""Test cleanup with keep_major=false deletes all."""
response = client_with_world.delete("/api/events/cleanup?keep_major=false")
assert response.status_code == 200
data = response.json()
assert data["deleted"] == 5
assert mock_world_with_events.event_manager.count() == 0
def test_cleanup_with_before_month_stamp(self, client_with_world, mock_world_with_events):
"""Test cleanup with before_month_stamp filter."""
# Add an older event
old_event = make_event(50, 1, "Old event", is_major=False)
mock_world_with_events.event_manager.add_event(old_event)
before_stamp = int(create_month_stamp(Year(99), Month.JANUARY))
response = client_with_world.delete(
f"/api/events/cleanup?keep_major=false&before_month_stamp={before_stamp}"
)
assert response.status_code == 200
data = response.json()
# Only the old event should be deleted
assert data["deleted"] == 1
assert mock_world_with_events.event_manager.count() == 5
def test_cleanup_no_world(self):
"""Test cleanup response when no world is loaded."""
from src.server import main
original = main.game_instance.copy()
main.game_instance["world"] = None
try:
client = TestClient(main.app)
response = client.delete("/api/events/cleanup")
assert response.status_code == 200
data = response.json()
assert data["deleted"] == 0
assert "error" in data
finally:
main.game_instance.update(original)
class TestEventsPaginationIntegration:
"""Integration tests for events pagination."""
def test_full_pagination_cycle(self, temp_db_path):
"""Test complete pagination through many events."""
from src.server import main
# Create world with many events
game_map = create_test_map()
month_stamp = create_month_stamp(Year(100), Month.JANUARY)
world = World.create_with_db(
map=game_map,
month_stamp=month_stamp,
events_db_path=temp_db_path,
)
# Add 50 events
for i in range(50):
world.event_manager.add_event(
make_event(100 + (i // 12), (i % 12) + 1, f"Event {i}", ["a1"])
)
original = main.game_instance.copy()
main.game_instance["world"] = world
main.game_instance["sim"] = MagicMock()
try:
client = TestClient(main.app)
all_event_ids = set()
cursor = None
page_count = 0
while True:
url = "/api/events?limit=15"
if cursor:
url += f"&cursor={cursor}"
response = client.get(url)
assert response.status_code == 200
data = response.json()
for event in data["events"]:
assert event["id"] not in all_event_ids, "Duplicate event in pagination"
all_event_ids.add(event["id"])
page_count += 1
if not data["has_more"]:
break
cursor = data["next_cursor"]
# Should have gotten all 50 events
assert len(all_event_ids) == 50
# Should have taken 4 pages (15+15+15+5)
assert page_count == 4
finally:
world.event_manager.close()
main.game_instance.update(original)
def test_events_order_consistency(self, temp_db_path):
"""Test that events maintain consistent ordering across pages."""
from src.server import main
game_map = create_test_map()
month_stamp = create_month_stamp(Year(100), Month.JANUARY)
world = World.create_with_db(
map=game_map,
month_stamp=month_stamp,
events_db_path=temp_db_path,
)
# Add events with known order
for i in range(10):
world.event_manager.add_event(
make_event(100, i + 1, f"Event {i}")
)
original = main.game_instance.copy()
main.game_instance["world"] = world
main.game_instance["sim"] = MagicMock()
try:
client = TestClient(main.app)
# Get events in two pages
response1 = client.get("/api/events?limit=5")
response2 = client.get(f"/api/events?limit=5&cursor={response1.json()['next_cursor']}")
page1 = response1.json()["events"]
page2 = response2.json()["events"]
# Events should be in descending order (newest first)
all_events = page1 + page2
month_stamps = [e["month_stamp"] for e in all_events]
# Each month_stamp should be >= the next (descending order)
for i in range(len(month_stamps) - 1):
assert month_stamps[i] >= month_stamps[i + 1]
finally:
world.event_manager.close()
main.game_instance.update(original)

700
tests/test_event_storage.py Normal file
View File

@@ -0,0 +1,700 @@
"""
Tests for EventStorage and EventManager.
Covers:
- EventStorage: add_event, get_events, pagination, cursor handling, cleanup
- EventManager: all query methods, get_events_paginated
- Memory fallback mode
"""
import pytest
import tempfile
from pathlib import Path
from src.classes.event import Event, NULL_EVENT
from src.classes.event_storage import EventStorage
from src.classes.event_manager import EventManager
from src.classes.calendar import MonthStamp, Year, Month, create_month_stamp
# --- Fixtures ---
@pytest.fixture
def temp_db_path():
"""Create a temporary database file path."""
with tempfile.TemporaryDirectory() as tmpdir:
yield Path(tmpdir) / "test_events.db"
@pytest.fixture
def event_storage(temp_db_path):
"""Create an EventStorage instance with a temporary database."""
storage = EventStorage(temp_db_path)
yield storage
storage.close()
@pytest.fixture
def event_manager(temp_db_path):
"""Create an EventManager with SQLite storage."""
manager = EventManager.create_with_db(temp_db_path)
yield manager
manager.close()
@pytest.fixture
def memory_event_manager():
"""Create an EventManager in memory mode (no SQLite)."""
return EventManager.create_in_memory()
def make_event(
year: int,
month: int,
content: str,
avatar_ids: list[str] | None = None,
is_major: bool = False,
is_story: bool = False,
event_id: str | None = None,
) -> Event:
"""Helper to create an Event with the given parameters."""
month_stamp = create_month_stamp(Year(year), Month(month))
kwargs = {
"month_stamp": month_stamp,
"content": content,
"related_avatars": avatar_ids,
"is_major": is_major,
"is_story": is_story,
}
if event_id is not None:
kwargs["id"] = event_id
return Event(**kwargs)
# --- EventStorage Tests ---
class TestEventStorageBasic:
"""Basic EventStorage functionality tests."""
def test_init_creates_tables(self, temp_db_path):
"""Test that EventStorage creates necessary tables on init."""
storage = EventStorage(temp_db_path)
assert storage._conn is not None
# Verify tables exist
cursor = storage._conn.execute(
"SELECT name FROM sqlite_master WHERE type='table' AND name IN ('events', 'event_avatars')"
)
tables = [row[0] for row in cursor.fetchall()]
assert "events" in tables
assert "event_avatars" in tables
storage.close()
def test_add_event_success(self, event_storage):
"""Test adding a single event."""
event = make_event(100, 5, "Test event content", ["avatar_1", "avatar_2"])
result = event_storage.add_event(event)
assert result is True
assert event_storage.count() == 1
def test_add_event_duplicate_ignored(self, event_storage):
"""Test that duplicate events (same ID) are ignored."""
event = make_event(100, 5, "Original content", event_id="fixed-id")
event_storage.add_event(event)
# Try to add with same ID but different content
duplicate = make_event(100, 5, "Different content", event_id="fixed-id")
result = event_storage.add_event(duplicate)
assert result is True # INSERT OR IGNORE doesn't fail
assert event_storage.count() == 1
def test_add_event_without_avatars(self, event_storage):
"""Test adding an event without related avatars."""
event = make_event(100, 5, "World event", avatar_ids=None)
result = event_storage.add_event(event)
assert result is True
assert event_storage.count() == 1
def test_count(self, event_storage):
"""Test event counting."""
assert event_storage.count() == 0
event_storage.add_event(make_event(100, 1, "Event 1"))
assert event_storage.count() == 1
event_storage.add_event(make_event(100, 2, "Event 2"))
assert event_storage.count() == 2
class TestEventStorageQueries:
"""EventStorage query functionality tests."""
def test_get_events_empty_db(self, event_storage):
"""Test querying an empty database."""
events, cursor = event_storage.get_events()
assert events == []
assert cursor is None
def test_get_events_all(self, event_storage):
"""Test getting all events (no filter)."""
event_storage.add_event(make_event(100, 1, "Event 1", ["a1"]))
event_storage.add_event(make_event(100, 2, "Event 2", ["a2"]))
event_storage.add_event(make_event(100, 3, "Event 3", ["a1", "a2"]))
events, cursor = event_storage.get_events()
assert len(events) == 3
# Events returned in descending order (newest first)
assert events[0].content == "Event 3"
assert events[1].content == "Event 2"
assert events[2].content == "Event 1"
def test_get_events_by_avatar(self, event_storage):
"""Test filtering events by single avatar."""
event_storage.add_event(make_event(100, 1, "Event A1 only", ["a1"]))
event_storage.add_event(make_event(100, 2, "Event A2 only", ["a2"]))
event_storage.add_event(make_event(100, 3, "Event both", ["a1", "a2"]))
events, _ = event_storage.get_events(avatar_id="a1")
assert len(events) == 2
contents = [e.content for e in events]
assert "Event A1 only" in contents
assert "Event both" in contents
assert "Event A2 only" not in contents
def test_get_events_by_avatar_pair(self, event_storage):
"""Test filtering events by avatar pair."""
event_storage.add_event(make_event(100, 1, "Event A1 only", ["a1"]))
event_storage.add_event(make_event(100, 2, "Event A2 only", ["a2"]))
event_storage.add_event(make_event(100, 3, "Event A1+A2", ["a1", "a2"]))
event_storage.add_event(make_event(100, 4, "Event A1+A3", ["a1", "a3"]))
events, _ = event_storage.get_events(avatar_id_pair=("a1", "a2"))
assert len(events) == 1
assert events[0].content == "Event A1+A2"
def test_get_events_by_avatar_returns_related_avatars(self, event_storage):
"""Test that related_avatars are correctly returned."""
event_storage.add_event(make_event(100, 1, "Multi avatar", ["a1", "a2", "a3"]))
events, _ = event_storage.get_events(avatar_id="a1")
assert len(events) == 1
assert set(events[0].related_avatars) == {"a1", "a2", "a3"}
class TestEventStoragePagination:
"""EventStorage pagination tests."""
def test_pagination_limit(self, event_storage):
"""Test that limit parameter works."""
for i in range(10):
event_storage.add_event(make_event(100, i + 1, f"Event {i}"))
events, cursor = event_storage.get_events(limit=5)
assert len(events) == 5
assert cursor is not None # Has more
def test_pagination_cursor_format(self, event_storage):
"""Test cursor format is {month_stamp}_{rowid}."""
for i in range(10):
event_storage.add_event(make_event(100, i + 1, f"Event {i}"))
_, cursor = event_storage.get_events(limit=5)
assert cursor is not None
parts = cursor.split("_")
assert len(parts) == 2
# Both parts should be integers
assert parts[0].isdigit()
assert parts[1].isdigit()
def test_pagination_cursor_continues(self, event_storage):
"""Test that using cursor returns next page."""
for i in range(10):
event_storage.add_event(make_event(100, i + 1, f"Event {i}"))
# First page
page1, cursor1 = event_storage.get_events(limit=5)
assert len(page1) == 5
assert cursor1 is not None # More events exist
# Second page
page2, cursor2 = event_storage.get_events(limit=5, cursor=cursor1)
assert len(page2) == 5
# No overlap between pages
page1_ids = {e.id for e in page1}
page2_ids = {e.id for e in page2}
assert page1_ids.isdisjoint(page2_ids)
# cursor2 is None because all 10 events have been returned
assert cursor2 is None
# All 10 unique events were returned across both pages
all_ids = page1_ids | page2_ids
assert len(all_ids) == 10
def test_pagination_no_more_events(self, event_storage):
"""Test that cursor is None when no more events."""
for i in range(3):
event_storage.add_event(make_event(100, i + 1, f"Event {i}"))
events, cursor = event_storage.get_events(limit=10)
assert len(events) == 3
assert cursor is None # No more
def test_pagination_with_filter(self, event_storage):
"""Test pagination combined with avatar filter."""
for i in range(10):
avatar_id = "a1" if i % 2 == 0 else "a2"
event_storage.add_event(make_event(100, i + 1, f"Event {i}", [avatar_id]))
# Get a1's events (5 total)
page1, cursor = event_storage.get_events(avatar_id="a1", limit=3)
assert len(page1) == 3
page2, _ = event_storage.get_events(avatar_id="a1", limit=3, cursor=cursor)
assert len(page2) == 2 # Only 2 remaining
class TestEventStorageHelperMethods:
"""Tests for helper query methods."""
def test_get_events_by_avatar_method(self, event_storage):
"""Test get_events_by_avatar returns in chronological order."""
event_storage.add_event(make_event(100, 1, "First", ["a1"]))
event_storage.add_event(make_event(100, 6, "Second", ["a1"]))
event_storage.add_event(make_event(101, 1, "Third", ["a1"]))
events = event_storage.get_events_by_avatar("a1")
# Should be in chronological order (oldest first)
assert events[0].content == "First"
assert events[1].content == "Second"
assert events[2].content == "Third"
def test_get_events_between_method(self, event_storage):
"""Test get_events_between returns in chronological order."""
event_storage.add_event(make_event(100, 1, "First pair", ["a1", "a2"]))
event_storage.add_event(make_event(100, 6, "Second pair", ["a1", "a2"]))
event_storage.add_event(make_event(100, 3, "A1 only", ["a1"]))
events = event_storage.get_events_between("a1", "a2")
assert len(events) == 2
# Chronological order
assert events[0].content == "First pair"
assert events[1].content == "Second pair"
def test_get_major_events_by_avatar(self, event_storage):
"""Test getting only major events for an avatar."""
event_storage.add_event(make_event(100, 1, "Minor 1", ["a1"], is_major=False))
event_storage.add_event(make_event(100, 2, "Major 1", ["a1"], is_major=True))
event_storage.add_event(make_event(100, 3, "Story", ["a1"], is_major=True, is_story=True))
event_storage.add_event(make_event(100, 4, "Major 2", ["a1"], is_major=True))
events = event_storage.get_major_events_by_avatar("a1")
# Should only include major non-story events
assert len(events) == 2
contents = [e.content for e in events]
assert "Major 1" in contents
assert "Major 2" in contents
assert "Story" not in contents
assert "Minor 1" not in contents
def test_get_minor_events_by_avatar(self, event_storage):
"""Test getting minor events (including stories) for an avatar."""
event_storage.add_event(make_event(100, 1, "Minor 1", ["a1"], is_major=False))
event_storage.add_event(make_event(100, 2, "Major 1", ["a1"], is_major=True))
event_storage.add_event(make_event(100, 3, "Story", ["a1"], is_major=True, is_story=True))
events = event_storage.get_minor_events_by_avatar("a1")
# Should include minor and story events
assert len(events) == 2
contents = [e.content for e in events]
assert "Minor 1" in contents
assert "Story" in contents
assert "Major 1" not in contents
def test_get_recent_events(self, event_storage):
"""Test get_recent_events returns in chronological order."""
event_storage.add_event(make_event(100, 1, "First"))
event_storage.add_event(make_event(100, 6, "Second"))
event_storage.add_event(make_event(101, 1, "Third"))
events = event_storage.get_recent_events()
# Should be chronological (oldest first)
assert events[0].content == "First"
assert events[1].content == "Second"
assert events[2].content == "Third"
class TestEventStorageCleanup:
"""Tests for event cleanup functionality."""
def test_cleanup_keeps_major_by_default(self, event_storage):
"""Test that cleanup keeps major events by default."""
event_storage.add_event(make_event(100, 1, "Minor", is_major=False))
event_storage.add_event(make_event(100, 2, "Major", is_major=True))
deleted = event_storage.cleanup()
assert deleted == 1
assert event_storage.count() == 1
events = event_storage.get_recent_events()
assert events[0].content == "Major"
def test_cleanup_deletes_all_when_keep_major_false(self, event_storage):
"""Test cleanup with keep_major=False."""
event_storage.add_event(make_event(100, 1, "Minor", is_major=False))
event_storage.add_event(make_event(100, 2, "Major", is_major=True))
deleted = event_storage.cleanup(keep_major=False)
assert deleted == 2
assert event_storage.count() == 0
def test_cleanup_before_month_stamp(self, event_storage):
"""Test cleanup with before_month_stamp filter."""
event_storage.add_event(make_event(100, 1, "Old", is_major=False))
event_storage.add_event(make_event(200, 1, "New", is_major=False))
# Delete events before year 150
before_stamp = int(create_month_stamp(Year(150), Month.JANUARY))
deleted = event_storage.cleanup(keep_major=False, before_month_stamp=before_stamp)
assert deleted == 1
assert event_storage.count() == 1
events = event_storage.get_recent_events()
assert events[0].content == "New"
class TestEventStorageCursorParsing:
"""Tests for cursor parsing edge cases."""
def test_parse_cursor_valid(self, event_storage):
"""Test parsing a valid cursor."""
month_stamp, rowid = event_storage._parse_cursor("1200_42")
assert month_stamp == 1200
assert rowid == 42
def test_parse_cursor_invalid_format(self, event_storage):
"""Test parsing an invalid cursor raises ValueError."""
with pytest.raises(ValueError):
event_storage._parse_cursor("invalid")
def test_make_cursor(self, event_storage):
"""Test cursor generation."""
cursor = event_storage._make_cursor(1200, 42)
assert cursor == "1200_42"
# --- EventManager Tests ---
class TestEventManagerWithStorage:
"""EventManager tests with SQLite storage."""
def test_add_event(self, event_manager):
"""Test adding events through EventManager."""
event = make_event(100, 5, "Test event", ["a1"])
event_manager.add_event(event)
assert event_manager.count() == 1
def test_add_null_event_ignored(self, event_manager):
"""Test that NULL_EVENT is ignored."""
event_manager.add_event(NULL_EVENT)
assert event_manager.count() == 0
def test_get_recent_events(self, event_manager):
"""Test getting recent events."""
event_manager.add_event(make_event(100, 1, "First", ["a1"]))
event_manager.add_event(make_event(100, 6, "Second", ["a1"]))
events = event_manager.get_recent_events()
assert len(events) == 2
# Chronological order
assert events[0].content == "First"
assert events[1].content == "Second"
def test_get_events_by_avatar(self, event_manager):
"""Test getting events by avatar."""
event_manager.add_event(make_event(100, 1, "A1 event", ["a1"]))
event_manager.add_event(make_event(100, 2, "A2 event", ["a2"]))
events = event_manager.get_events_by_avatar("a1")
assert len(events) == 1
assert events[0].content == "A1 event"
def test_get_events_between(self, event_manager):
"""Test getting events between two avatars."""
event_manager.add_event(make_event(100, 1, "A1 only", ["a1"]))
event_manager.add_event(make_event(100, 2, "A1+A2", ["a1", "a2"]))
events = event_manager.get_events_between("a1", "a2")
assert len(events) == 1
assert events[0].content == "A1+A2"
def test_get_major_events_by_avatar(self, event_manager):
"""Test getting major events for an avatar."""
event_manager.add_event(make_event(100, 1, "Minor", ["a1"], is_major=False))
event_manager.add_event(make_event(100, 2, "Major", ["a1"], is_major=True))
events = event_manager.get_major_events_by_avatar("a1")
assert len(events) == 1
assert events[0].content == "Major"
def test_get_minor_events_by_avatar(self, event_manager):
"""Test getting minor events for an avatar."""
event_manager.add_event(make_event(100, 1, "Minor", ["a1"], is_major=False))
event_manager.add_event(make_event(100, 2, "Major", ["a1"], is_major=True))
events = event_manager.get_minor_events_by_avatar("a1")
assert len(events) == 1
assert events[0].content == "Minor"
def test_get_major_events_between(self, event_manager):
"""Test getting major events between two avatars."""
event_manager.add_event(make_event(100, 1, "Minor pair", ["a1", "a2"], is_major=False))
event_manager.add_event(make_event(100, 2, "Major pair", ["a1", "a2"], is_major=True))
events = event_manager.get_major_events_between("a1", "a2")
assert len(events) == 1
assert events[0].content == "Major pair"
def test_get_minor_events_between(self, event_manager):
"""Test getting minor events between two avatars."""
event_manager.add_event(make_event(100, 1, "Minor pair", ["a1", "a2"], is_major=False))
event_manager.add_event(make_event(100, 2, "Major pair", ["a1", "a2"], is_major=True))
events = event_manager.get_minor_events_between("a1", "a2")
assert len(events) == 1
assert events[0].content == "Minor pair"
class TestEventManagerPagination:
"""EventManager pagination tests."""
def test_get_events_paginated_basic(self, event_manager):
"""Test basic pagination through EventManager."""
for i in range(10):
event_manager.add_event(make_event(100, i + 1, f"Event {i}"))
events, cursor, has_more = event_manager.get_events_paginated(limit=5)
assert len(events) == 5
assert cursor is not None
assert has_more is True
def test_get_events_paginated_with_filter(self, event_manager):
"""Test paginated query with avatar filter."""
for i in range(10):
avatar = "a1" if i % 2 == 0 else "a2"
event_manager.add_event(make_event(100, i + 1, f"Event {i}", [avatar]))
events, cursor, has_more = event_manager.get_events_paginated(avatar_id="a1", limit=3)
assert len(events) == 3
assert has_more is True
for e in events:
assert "a1" in e.related_avatars
def test_get_events_paginated_with_pair_filter(self, event_manager):
"""Test paginated query with avatar pair filter."""
event_manager.add_event(make_event(100, 1, "A1 only", ["a1"]))
event_manager.add_event(make_event(100, 2, "A1+A2", ["a1", "a2"]))
event_manager.add_event(make_event(100, 3, "A2 only", ["a2"]))
events, _, _ = event_manager.get_events_paginated(avatar_id_pair=("a1", "a2"))
assert len(events) == 1
assert events[0].content == "A1+A2"
def test_get_events_paginated_no_more(self, event_manager):
"""Test pagination when there are no more events."""
event_manager.add_event(make_event(100, 1, "Event 1"))
event_manager.add_event(make_event(100, 2, "Event 2"))
events, cursor, has_more = event_manager.get_events_paginated(limit=10)
assert len(events) == 2
assert cursor is None
assert has_more is False
class TestEventManagerMemoryMode:
"""EventManager tests in memory fallback mode."""
def test_add_and_get_events(self, memory_event_manager):
"""Test basic operations in memory mode."""
memory_event_manager.add_event(make_event(100, 1, "Event 1", ["a1"]))
memory_event_manager.add_event(make_event(100, 2, "Event 2", ["a2"]))
events = memory_event_manager.get_recent_events()
assert len(events) == 2
def test_get_events_by_avatar_memory(self, memory_event_manager):
"""Test avatar filtering in memory mode."""
memory_event_manager.add_event(make_event(100, 1, "A1 event", ["a1"]))
memory_event_manager.add_event(make_event(100, 2, "A2 event", ["a2"]))
events = memory_event_manager.get_events_by_avatar("a1")
assert len(events) == 1
assert events[0].content == "A1 event"
def test_get_events_between_memory(self, memory_event_manager):
"""Test pair filtering in memory mode."""
memory_event_manager.add_event(make_event(100, 1, "A1 only", ["a1"]))
memory_event_manager.add_event(make_event(100, 2, "A1+A2", ["a1", "a2"]))
events = memory_event_manager.get_events_between("a1", "a2")
assert len(events) == 1
assert events[0].content == "A1+A2"
def test_get_major_events_memory(self, memory_event_manager):
"""Test major event filtering in memory mode."""
memory_event_manager.add_event(make_event(100, 1, "Minor", ["a1"], is_major=False))
memory_event_manager.add_event(make_event(100, 2, "Major", ["a1"], is_major=True))
events = memory_event_manager.get_major_events_by_avatar("a1")
assert len(events) == 1
assert events[0].content == "Major"
def test_get_minor_events_memory(self, memory_event_manager):
"""Test minor event filtering in memory mode."""
memory_event_manager.add_event(make_event(100, 1, "Minor", ["a1"], is_major=False))
memory_event_manager.add_event(make_event(100, 2, "Story", ["a1"], is_major=True, is_story=True))
memory_event_manager.add_event(make_event(100, 3, "Major", ["a1"], is_major=True))
events = memory_event_manager.get_minor_events_by_avatar("a1")
assert len(events) == 2
contents = [e.content for e in events]
assert "Minor" in contents
assert "Story" in contents
def test_pagination_memory_mode(self, memory_event_manager):
"""Test that pagination in memory mode returns all events without real pagination."""
for i in range(10):
memory_event_manager.add_event(make_event(100, i + 1, f"Event {i}"))
events, cursor, has_more = memory_event_manager.get_events_paginated(limit=5)
# Memory mode doesn't support real pagination
assert len(events) == 5 # Still respects limit
assert cursor is None
assert has_more is False
def test_cleanup_memory_mode(self, memory_event_manager):
"""Test cleanup in memory mode clears all events."""
memory_event_manager.add_event(make_event(100, 1, "Event 1"))
memory_event_manager.add_event(make_event(100, 2, "Event 2"))
deleted = memory_event_manager.cleanup()
assert deleted == 2
assert memory_event_manager.count() == 0
class TestEventManagerCleanup:
"""EventManager cleanup tests with SQLite storage."""
def test_cleanup_delegates_to_storage(self, event_manager):
"""Test that cleanup delegates to storage."""
event_manager.add_event(make_event(100, 1, "Minor", is_major=False))
event_manager.add_event(make_event(100, 2, "Major", is_major=True))
deleted = event_manager.cleanup()
assert deleted == 1
assert event_manager.count() == 1
# --- Edge Cases ---
class TestEdgeCases:
"""Tests for edge cases and error handling."""
def test_storage_closed_operations_fail_gracefully(self, temp_db_path):
"""Test that operations on closed storage fail gracefully."""
storage = EventStorage(temp_db_path)
storage.close()
# Should return False/empty rather than throwing
assert storage.add_event(make_event(100, 1, "Test")) is False
events, cursor = storage.get_events()
assert events == []
assert storage.count() == 0
def test_event_with_many_avatars(self, event_storage):
"""Test event with many related avatars."""
avatar_ids = [f"avatar_{i}" for i in range(20)]
event = make_event(100, 1, "Large group event", avatar_ids)
event_storage.add_event(event)
events, _ = event_storage.get_events()
assert len(events) == 1
assert set(events[0].related_avatars) == set(avatar_ids)
def test_empty_content(self, event_storage):
"""Test event with empty content."""
event = make_event(100, 1, "", ["a1"])
result = event_storage.add_event(event)
assert result is True
events, _ = event_storage.get_events()
assert events[0].content == ""
def test_special_characters_in_content(self, event_storage):
"""Test event with special characters in content."""
content = "测试中文 & 'quotes' \"double\" <tag> END"
event = make_event(100, 1, content, ["a1"])
event_storage.add_event(event)
events, _ = event_storage.get_events()
assert events[0].content == content
def test_same_month_stamp_ordering(self, event_storage):
"""Test that events with same month_stamp maintain insertion order."""
# Add multiple events in the same month
for i in range(5):
event_storage.add_event(make_event(100, 6, f"Event {i}"))
events, _ = event_storage.get_events()
# Should be in reverse insertion order (newest first)
assert events[0].content == "Event 4"
assert events[4].content == "Event 0"

View File

@@ -0,0 +1,489 @@
"""
Tests for save/load functionality with SQLite event storage.
Covers:
- Events persistence across save/load cycles
- Database file switching when loading different saves
- Event retrieval after loading
"""
import pytest
import tempfile
from pathlib import Path
from unittest.mock import patch, MagicMock
from src.classes.world import World
from src.classes.map import Map
from src.classes.tile import TileType
from src.classes.calendar import Month, Year, create_month_stamp, MonthStamp
from src.classes.avatar import Avatar, Gender
from src.classes.age import Age
from src.classes.cultivation import Realm
from src.classes.event import Event
from src.classes.event_storage import EventStorage
from src.classes.event_manager import EventManager
from src.sim.simulator import Simulator
from src.sim.save.save_game import save_game
from src.sim.load.load_game import load_game
from src.utils.id_generator import get_avatar_id
def create_test_map():
"""Create a simple 10x10 plain map for testing."""
m = Map(width=10, height=10)
for x in range(10):
for y in range(10):
m.create_tile(x, y, TileType.PLAIN)
return m
def make_event(
year: int,
month: int,
content: str,
avatar_ids: list[str] | None = None,
is_major: bool = False,
) -> Event:
"""Helper to create an Event."""
month_stamp = create_month_stamp(Year(year), Month(month))
return Event(
month_stamp=month_stamp,
content=content,
related_avatars=avatar_ids,
is_major=is_major,
)
def make_event_by_index(
index: int,
content: str,
avatar_ids: list[str] | None = None,
) -> Event:
"""Helper to create an Event from an index (handles year/month calculation)."""
year = 100 + (index // 12)
month = (index % 12) + 1
return make_event(year, month, content, avatar_ids)
@pytest.fixture
def temp_save_dir(tmp_path):
"""Create a temporary directory for saves."""
d = tmp_path / "saves"
d.mkdir()
return d
class TestEventManagerWithWorld:
"""Tests for EventManager integration with World."""
def test_world_creates_event_manager_with_db(self, tmp_path):
"""Test that World.create_with_db creates proper EventManager."""
db_path = tmp_path / "events.db"
game_map = create_test_map()
month_stamp = create_month_stamp(Year(100), Month.JANUARY)
world = World.create_with_db(
map=game_map,
month_stamp=month_stamp,
events_db_path=db_path,
)
# EventManager should be connected to SQLite
assert world.event_manager is not None
assert world.event_manager._storage is not None
assert db_path.exists()
# Clean up
world.event_manager.close()
def test_events_written_to_sqlite(self, tmp_path):
"""Test that events added to World are written to SQLite."""
db_path = tmp_path / "events.db"
game_map = create_test_map()
month_stamp = create_month_stamp(Year(100), Month.JANUARY)
world = World.create_with_db(
map=game_map,
month_stamp=month_stamp,
events_db_path=db_path,
)
# Add events
event1 = make_event(100, 1, "First event", ["a1"])
event2 = make_event(100, 2, "Second event", ["a2"])
world.event_manager.add_event(event1)
world.event_manager.add_event(event2)
# Verify in SQLite
assert world.event_manager.count() == 2
# Clean up and verify persistence
world.event_manager.close()
# Reopen and verify
storage = EventStorage(db_path)
assert storage.count() == 2
storage.close()
class TestSaveLoadWithEvents:
"""Tests for save/load cycle with SQLite events."""
def test_save_load_preserves_events(self, temp_save_dir, tmp_path):
"""Test that events are preserved across save/load cycle."""
# Setup world with SQLite events
db_path = tmp_path / "events.db"
game_map = create_test_map()
month_stamp = create_month_stamp(Year(100), Month.JANUARY)
world = World.create_with_db(
map=game_map,
month_stamp=month_stamp,
events_db_path=db_path,
)
# Create avatar
avatar_id = get_avatar_id()
avatar = Avatar(
world=world,
name="TestAvatar",
id=avatar_id,
birth_month_stamp=create_month_stamp(Year(80), Month.JANUARY),
age=Age(20, Realm.Qi_Refinement),
gender=Gender.MALE,
)
world.avatar_manager.avatars[avatar.id] = avatar
# Add events
for i in range(10):
event = make_event(
100, i + 1,
f"Event {i} for avatar",
[avatar_id],
is_major=(i % 3 == 0),
)
world.event_manager.add_event(event)
original_count = world.event_manager.count()
assert original_count == 10
# Save
sim = Simulator(world)
save_path = temp_save_dir / "test_events.json"
success, _ = save_game(world, sim, [], save_path)
assert success
# Close current event manager
world.event_manager.close()
# Load
with patch('src.run.load_map.load_cultivation_world_map', return_value=create_test_map()):
loaded_world, loaded_sim, _ = load_game(save_path)
# Verify events are accessible
# Note: After loading, the world should use a new EventManager
# connected to the loaded save's database
loaded_events = loaded_world.event_manager.get_recent_events()
# The exact behavior depends on implementation -
# if events DB path is derived from save path, they should be preserved
# This test may need adjustment based on actual load_game implementation
def test_events_filtered_by_avatar_after_load(self, temp_save_dir, tmp_path):
"""Test that avatar-specific event queries work after loading."""
db_path = tmp_path / "events.db"
game_map = create_test_map()
month_stamp = create_month_stamp(Year(100), Month.JANUARY)
world = World.create_with_db(
map=game_map,
month_stamp=month_stamp,
events_db_path=db_path,
)
# Create two avatars
avatar1_id = get_avatar_id()
avatar2_id = get_avatar_id()
avatar1 = Avatar(
world=world,
name="Avatar1",
id=avatar1_id,
birth_month_stamp=create_month_stamp(Year(80), Month.JANUARY),
age=Age(20, Realm.Qi_Refinement),
gender=Gender.MALE,
)
avatar2 = Avatar(
world=world,
name="Avatar2",
id=avatar2_id,
birth_month_stamp=create_month_stamp(Year(80), Month.JANUARY),
age=Age(20, Realm.Qi_Refinement),
gender=Gender.FEMALE,
)
world.avatar_manager.avatars[avatar1.id] = avatar1
world.avatar_manager.avatars[avatar2.id] = avatar2
# Add events for different avatars
world.event_manager.add_event(make_event(100, 1, "Avatar1 event", [avatar1_id]))
world.event_manager.add_event(make_event(100, 2, "Avatar2 event", [avatar2_id]))
world.event_manager.add_event(make_event(100, 3, "Both avatars", [avatar1_id, avatar2_id]))
# Query before save
avatar1_events = world.event_manager.get_events_by_avatar(avatar1_id)
assert len(avatar1_events) == 2 # "Avatar1 event" and "Both avatars"
between_events = world.event_manager.get_events_between(avatar1_id, avatar2_id)
assert len(between_events) == 1 # "Both avatars"
# Clean up
world.event_manager.close()
class TestEventPagination:
"""Tests for event pagination functionality."""
def test_pagination_returns_correct_pages(self, tmp_path):
"""Test that pagination returns events in correct order."""
db_path = tmp_path / "events.db"
storage = EventStorage(db_path)
manager = EventManager(storage)
# Add 25 events
for i in range(25):
year = 100 + (i // 12)
month = (i % 12) + 1
manager.add_event(make_event(year, month, f"Event {i}"))
# Get first page (10 items)
page1, cursor1, has_more1 = manager.get_events_paginated(limit=10)
assert len(page1) == 10
assert has_more1 is True
assert cursor1 is not None
# Events should be in descending order (newest first)
assert page1[0].content == "Event 24" # Newest
assert page1[9].content == "Event 15"
# Get second page
page2, cursor2, has_more2 = manager.get_events_paginated(limit=10, cursor=cursor1)
assert len(page2) == 10
assert has_more2 is True
# Get third page (only 5 remaining)
page3, cursor3, has_more3 = manager.get_events_paginated(limit=10, cursor=cursor2)
assert len(page3) == 5
assert has_more3 is False
assert cursor3 is None
# Verify no duplicates across pages
all_ids = {e.id for e in page1} | {e.id for e in page2} | {e.id for e in page3}
assert len(all_ids) == 25
manager.close()
def test_pagination_with_avatar_filter(self, tmp_path):
"""Test pagination with avatar filter."""
db_path = tmp_path / "events.db"
storage = EventStorage(db_path)
manager = EventManager(storage)
avatar1_id = "avatar_1"
avatar2_id = "avatar_2"
# Add events alternating between avatars
for i in range(20):
avatar_id = avatar1_id if i % 2 == 0 else avatar2_id
manager.add_event(make_event(100, (i % 12) + 1, f"Event {i}", [avatar_id]))
# Get avatar1's events (should be 10)
page1, cursor, has_more = manager.get_events_paginated(
avatar_id=avatar1_id,
limit=5
)
assert len(page1) == 5
assert has_more is True
# All events should be for avatar1
for e in page1:
assert avatar1_id in e.related_avatars
# Get remaining
page2, _, _ = manager.get_events_paginated(
avatar_id=avatar1_id,
limit=10,
cursor=cursor
)
assert len(page2) == 5
manager.close()
def test_pagination_cursor_format_stability(self, tmp_path):
"""Test that cursor format is stable and parseable."""
db_path = tmp_path / "events.db"
storage = EventStorage(db_path)
# Add some events
for i in range(5):
storage.add_event(make_event(100, i + 1, f"Event {i}"))
_, cursor = storage.get_events(limit=3)
# Cursor should be in format: month_stamp_rowid
assert cursor is not None
parts = cursor.split("_")
assert len(parts) == 2
assert parts[0].isdigit()
assert parts[1].isdigit()
# Cursor should be parseable
month_stamp, rowid = storage._parse_cursor(cursor)
assert isinstance(month_stamp, int)
assert isinstance(rowid, int)
storage.close()
class TestEventStorageEdgeCases:
"""Edge case tests for event storage."""
def test_concurrent_writes(self, tmp_path):
"""Test that concurrent writes don't corrupt data."""
db_path = tmp_path / "events.db"
storage = EventStorage(db_path)
# Simulate rapid writes (use make_event_by_index to handle month > 12)
events = [make_event_by_index(i, f"Event {i}") for i in range(100)]
for event in events:
result = storage.add_event(event)
assert result is True
assert storage.count() == 100
storage.close()
def test_large_event_content(self, tmp_path):
"""Test handling of large event content."""
db_path = tmp_path / "events.db"
storage = EventStorage(db_path)
# Create event with large content (10KB)
large_content = "测试内容" * 2500 # ~10KB of Chinese characters
event = make_event(100, 1, large_content, ["a1"])
result = storage.add_event(event)
assert result is True
events, _ = storage.get_events()
assert len(events) == 1
assert events[0].content == large_content
storage.close()
def test_special_characters_in_avatar_id(self, tmp_path):
"""Test handling of special characters in avatar IDs."""
db_path = tmp_path / "events.db"
storage = EventStorage(db_path)
# UUID-style IDs with hyphens
avatar_id = "550e8400-e29b-41d4-a716-446655440000"
event = make_event(100, 1, "Test event", [avatar_id])
storage.add_event(event)
events = storage.get_events_by_avatar(avatar_id)
assert len(events) == 1
assert avatar_id in events[0].related_avatars
storage.close()
def test_empty_database_queries(self, tmp_path):
"""Test queries on empty database return sensible results."""
db_path = tmp_path / "events.db"
storage = EventStorage(db_path)
# All queries should return empty lists, not errors
assert storage.get_events() == ([], None)
assert storage.get_events_by_avatar("nonexistent") == []
assert storage.get_events_between("a1", "a2") == []
assert storage.get_major_events_by_avatar("a1") == []
assert storage.get_minor_events_by_avatar("a1") == []
assert storage.get_recent_events() == []
assert storage.count() == 0
storage.close()
class TestEventManagerMemoryFallback:
"""Tests for EventManager memory fallback mode."""
def test_memory_mode_basic_operations(self):
"""Test that memory mode works for basic operations."""
manager = EventManager.create_in_memory()
manager.add_event(make_event(100, 1, "Event 1", ["a1"]))
manager.add_event(make_event(100, 2, "Event 2", ["a2"]))
assert manager.count() == 2
events = manager.get_recent_events()
assert len(events) == 2
a1_events = manager.get_events_by_avatar("a1")
assert len(a1_events) == 1
def test_memory_mode_cleanup(self):
"""Test that cleanup works in memory mode."""
manager = EventManager.create_in_memory()
manager.add_event(make_event(100, 1, "Event 1"))
manager.add_event(make_event(100, 2, "Event 2"))
deleted = manager.cleanup()
assert deleted == 2
assert manager.count() == 0
class TestEventStorageCleanup:
"""Tests for event cleanup functionality."""
def test_cleanup_with_time_filter(self, tmp_path):
"""Test cleanup with before_month_stamp filter."""
db_path = tmp_path / "events.db"
storage = EventStorage(db_path)
# Add events at different times
storage.add_event(make_event(50, 1, "Very old", is_major=False))
storage.add_event(make_event(100, 1, "Old", is_major=False))
storage.add_event(make_event(150, 1, "Recent", is_major=False))
# Delete events before year 100
cutoff = int(create_month_stamp(Year(100), Month.JANUARY))
deleted = storage.cleanup(keep_major=False, before_month_stamp=cutoff)
assert deleted == 1 # Only "Very old" deleted
assert storage.count() == 2
storage.close()
def test_cleanup_preserves_major_events(self, tmp_path):
"""Test that cleanup preserves major events by default."""
db_path = tmp_path / "events.db"
storage = EventStorage(db_path)
storage.add_event(make_event(100, 1, "Minor 1", is_major=False))
storage.add_event(make_event(100, 2, "Major 1", is_major=True))
storage.add_event(make_event(100, 3, "Minor 2", is_major=False))
deleted = storage.cleanup(keep_major=True)
assert deleted == 2
assert storage.count() == 1
events = storage.get_recent_events()
assert events[0].content == "Major 1"
storage.close()