feat: reset will cancel unprocessed messages

This commit is contained in:
lanvent
2023-04-04 14:57:38 +08:00
parent 6c7e4aaf37
commit 28eb67bc24
2 changed files with 21 additions and 4 deletions

View File

@@ -243,9 +243,9 @@ class ChatChannel(Channel):
session_id = context['session_id']
with self.lock:
if session_id not in self.sessions:
self.sessions[session_id] = (Dequeue(), threading.BoundedSemaphore(conf().get("concurrency_in_session", 1)))
self.sessions[session_id] = [Dequeue(), threading.BoundedSemaphore(conf().get("concurrency_in_session", 1))]
if context.type == ContextType.TEXT and context.content.startswith("#"):
self.sessions[session_id][0].putleft(context) # 优先处理命令
self.sessions[session_id][0].putleft(context) # 优先处理管理命令
else:
self.sessions[session_id][0].put(context)
@@ -273,12 +273,26 @@ class ChatChannel(Channel):
semaphore.release()
time.sleep(0.1)
def cancel(self, session_id):
# 取消session_id对应的所有任务只能取消排队的消息和已提交线程池但未执行的任务
def cancel_session(self, session_id):
with self.lock:
if session_id in self.sessions:
for future in self.futures[session_id]:
future.cancel()
self.sessions[session_id][0]=Dequeue()
cnt = self.sessions[session_id][0].qsize()
if cnt>0:
logger.info("Cancel {} messages in session {}".format(cnt, session_id))
self.sessions[session_id][0] = Dequeue()
def cancel_all_session(self):
with self.lock:
for session_id in self.sessions:
for future in self.futures[session_id]:
future.cancel()
cnt = self.sessions[session_id][0].qsize()
if cnt>0:
logger.info("Cancel {} messages in session {}".format(cnt, session_id))
self.sessions[session_id][0] = Dequeue()
def check_prefix(content, prefix_list):