This commit is contained in:
bridge
2025-09-24 22:05:08 +08:00
parent e5d8bf9bf4
commit 30f91d264c
8 changed files with 332 additions and 284 deletions

View File

@@ -14,7 +14,8 @@ from src.classes.cultivation import CultivationProgress
from src.classes.root import Root
from src.classes.age import Age
from src.classes.event import NULL_EVENT, Event
from src.classes.typings import ACTION_NAME, ACTION_PARAMS, ACTION_PAIR, ACTION_NAME_PARAMS_PAIRS, ACTION_NAME_PARAMS_PAIR
from src.classes.typings import ACTION_NAME, ACTION_PARAMS, ACTION_NAME_PARAMS_PAIRS, ACTION_NAME_PARAMS_PAIR
from src.classes.action_runtime import ActionPlan, ActionInstance
from src.classes.persona import Persona, personas_by_id, get_random_compatible_personas
from src.classes.item import Item
@@ -60,10 +61,10 @@ class Avatar:
root: Root = field(default_factory=lambda: random.choice(list(Root)))
personas: List[Persona] = field(default_factory=list)
cur_action_pair: Optional[ACTION_PAIR] = None
history_events: List[Event] = field(default_factory=list)
_pending_events: List[Event] = field(default_factory=list)
next_actions: ACTION_NAME_PARAMS_PAIRS = field(default_factory=list)
current_action: Optional[ActionInstance] = None
planned_actions: List[ActionPlan] = field(default_factory=list)
thinking: str = ""
objective: str = ""
magic_stone: MagicStone = field(default_factory=lambda: MagicStone(0)) # 灵石,即货币
@@ -130,88 +131,101 @@ class Avatar:
"""
if not action_name_params_pairs:
return
first_action_name, first_action_params = action_name_params_pairs[0]
action = self.create_action(first_action_name)
self.thinking = avatar_thinking
self.objective = objective
self.cur_action_pair = (action, first_action_params)
# 余下的动作进入队列
if len(action_name_params_pairs) > 1:
self.next_actions.extend(action_name_params_pairs[1:])
# 转为计划并入队(不立即提交,交由提交阶段统一触发开始事件)
plans: List[ActionPlan] = [ActionPlan(name, params) for name, params in action_name_params_pairs]
self.planned_actions.extend(plans)
def clear_next_actions(self) -> None:
"""
清空后续动作队列(不影响当前动作)。
"""
self.next_actions.clear()
def clear_plans(self) -> None:
self.planned_actions.clear()
def has_next_actions(self) -> bool:
return len(self.next_actions) > 0
def has_plans(self) -> bool:
return len(self.planned_actions) > 0
def pop_next_action_and_set_current(self) -> Optional[Event]:
def commit_next_plan(self) -> Optional[Event]:
"""
从队列中取出下一个动作并设置为当前动作,同时返回开始事件。
若队列为空则返回None。
提交下一个可启动的计划为当前动作返回开始事件(若有)
"""
if not self.next_actions:
if self.current_action is not None:
return None
action_name, action_params = self.next_actions.pop(0)
action = self.create_action(action_name)
while not action.is_doable and self.next_actions:
action_name, action_params = self.next_actions.pop(0)
action = self.create_action(action_name)
while self.planned_actions:
plan = self.planned_actions.pop(0)
action = self.create_action(plan.action_name)
# 再验证
can_start = True
try:
can_start = bool(action.can_start(**plan.params))
except TypeError:
can_start = bool(action.can_start())
if not can_start:
continue
# 启动
try:
start_event = action.start(**plan.params)
except TypeError:
start_event = action.start()
self.current_action = ActionInstance(action=action, params=plan.params, status="running")
return start_event
return None
if not action.is_doable:
def peek_next_plan(self) -> Optional[ActionPlan]:
if not self.planned_actions:
return None
return self.planned_actions[0]
self.cur_action_pair = (action, action_params)
async def tick_action(self) -> List[Event]:
"""
推进当前动作一步;返回过程中由动作内部产生的事件(通过 add_event 收集)。
"""
if self.current_action is None:
return []
action = self.current_action.action
params = self.current_action.params
try:
event = action.get_event(**action_params)
status, mid_events = action.step(**params)
except TypeError:
# 兼容无参数的 get_event 定义
event = action.get_event()
return event
def peek_next_action(self) -> Optional[ACTION_NAME_PARAMS_PAIR]:
"""
查看下一个动作但不弹出。
"""
if not self.next_actions:
return None
return self.next_actions[0]
def is_next_action_doable(self) -> bool:
"""
判断队列中的下一个动作当前是否可执行。
若没有下一个动作返回False。
"""
pair = self.peek_next_action()
if pair is None:
return False
action_name, _ = pair
action = self.create_action(action_name)
doable = action.is_doable
assert isinstance(doable, bool)
del action
return doable
async def act(self) -> List[Event]:
"""
角色执行动作。
注意这里只负责执行,不负责决定做什么动作。
事件只在决定动作时产生,执行过程不产生事件
"""
# 纯粹执行动作。具体事件由决定阶段或动作内部通过 add_event 添加
action, action_params = self.cur_action_pair
action.execute(**action_params)
if action.is_finished(**action_params):
# 完成后清空当前动作
self.cur_action_pair = None
# 返回并清空待派发事件
status, mid_events = action.step()
if status == "completed":
try:
finish_events = action.finish(**params)
except TypeError:
finish_events = action.finish()
self.current_action = None
if finish_events:
# 允许 finish 直接返回事件(极少用),统一并入 pending
for e in finish_events:
self._pending_events.append(e)
# 合并动作返回的事件(通常为空)
if mid_events:
for e in mid_events:
self._pending_events.append(e)
events, self._pending_events = self._pending_events, []
return events
def _commit_specific_plan(self, plan: ActionPlan) -> bool:
"""
尝试提交指定计划为当前动作,不返回事件(用于内部/立即触发)。
"""
if self.current_action is not None:
# 已有当前动作则直接入队
self.planned_actions.insert(0, plan)
return False
action = self.create_action(plan.action_name)
try:
can_start = bool(action.can_start(**plan.params))
except TypeError:
can_start = bool(action.can_start())
if not can_start:
# 无法启动则丢入计划队列等待
self.planned_actions.insert(0, plan)
return False
try:
_ = action.start(**plan.params)
except TypeError:
_ = action.start()
self.current_action = ActionInstance(action=action, params=plan.params, status="running")
return True
def update_cultivation(self, new_level: int):
"""
@@ -353,7 +367,15 @@ class Avatar:
获取动作空间
"""
actual_actions = [self.create_action(action_cls_name) for action_cls_name in ALL_ACTUAL_ACTION_NAMES]
doable_actions = [action for action in actual_actions if action.is_doable]
doable_actions: list[Action] = []
for action in actual_actions:
# 用 can_start 的无参形式,用于“是否在动作空间中显示”
try:
if action.can_start():
doable_actions.append(action)
except TypeError:
# 无参展示判定失败时,简单跳过(避免在空间中展示需要参数才能判定的动作)
continue
action_space = [action.name for action in doable_actions]
return action_space