mirror of
https://github.com/zhayujie/chatgpt-on-wechat.git
synced 2026-04-26 19:43:25 +08:00
feat: intergrate itchat to lib
This commit is contained in:
318
lib/itchat/storage/templates.py
Normal file
318
lib/itchat/storage/templates.py
Normal file
@@ -0,0 +1,318 @@
|
||||
import logging, copy, pickle
|
||||
from weakref import ref
|
||||
|
||||
from ..returnvalues import ReturnValue
|
||||
from ..utils import update_info_dict
|
||||
|
||||
logger = logging.getLogger('itchat')
|
||||
|
||||
class AttributeDict(dict):
|
||||
def __getattr__(self, value):
|
||||
keyName = value[0].upper() + value[1:]
|
||||
try:
|
||||
return self[keyName]
|
||||
except KeyError:
|
||||
raise AttributeError("'%s' object has no attribute '%s'" % (
|
||||
self.__class__.__name__.split('.')[-1], keyName))
|
||||
def get(self, v, d=None):
|
||||
try:
|
||||
return self[v]
|
||||
except KeyError:
|
||||
return d
|
||||
|
||||
class UnInitializedItchat(object):
|
||||
def _raise_error(self, *args, **kwargs):
|
||||
logger.warning('An itchat instance is called before initialized')
|
||||
def __getattr__(self, value):
|
||||
return self._raise_error
|
||||
|
||||
class ContactList(list):
|
||||
''' when a dict is append, init function will be called to format that dict '''
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(ContactList, self).__init__(*args, **kwargs)
|
||||
self.__setstate__(None)
|
||||
@property
|
||||
def core(self):
|
||||
return getattr(self, '_core', lambda: fakeItchat)() or fakeItchat
|
||||
@core.setter
|
||||
def core(self, value):
|
||||
self._core = ref(value)
|
||||
def set_default_value(self, initFunction=None, contactClass=None):
|
||||
if hasattr(initFunction, '__call__'):
|
||||
self.contactInitFn = initFunction
|
||||
if hasattr(contactClass, '__call__'):
|
||||
self.contactClass = contactClass
|
||||
def append(self, value):
|
||||
contact = self.contactClass(value)
|
||||
contact.core = self.core
|
||||
if self.contactInitFn is not None:
|
||||
contact = self.contactInitFn(self, contact) or contact
|
||||
super(ContactList, self).append(contact)
|
||||
def __deepcopy__(self, memo):
|
||||
r = self.__class__([copy.deepcopy(v) for v in self])
|
||||
r.contactInitFn = self.contactInitFn
|
||||
r.contactClass = self.contactClass
|
||||
r.core = self.core
|
||||
return r
|
||||
def __getstate__(self):
|
||||
return 1
|
||||
def __setstate__(self, state):
|
||||
self.contactInitFn = None
|
||||
self.contactClass = User
|
||||
def __str__(self):
|
||||
return '[%s]' % ', '.join([repr(v) for v in self])
|
||||
def __repr__(self):
|
||||
return '<%s: %s>' % (self.__class__.__name__.split('.')[-1],
|
||||
self.__str__())
|
||||
|
||||
class AbstractUserDict(AttributeDict):
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(AbstractUserDict, self).__init__(*args, **kwargs)
|
||||
@property
|
||||
def core(self):
|
||||
return getattr(self, '_core', lambda: fakeItchat)() or fakeItchat
|
||||
@core.setter
|
||||
def core(self, value):
|
||||
self._core = ref(value)
|
||||
def update(self):
|
||||
return ReturnValue({'BaseResponse': {
|
||||
'Ret': -1006,
|
||||
'ErrMsg': '%s can not be updated' % \
|
||||
self.__class__.__name__, }, })
|
||||
def set_alias(self, alias):
|
||||
return ReturnValue({'BaseResponse': {
|
||||
'Ret': -1006,
|
||||
'ErrMsg': '%s can not set alias' % \
|
||||
self.__class__.__name__, }, })
|
||||
def set_pinned(self, isPinned=True):
|
||||
return ReturnValue({'BaseResponse': {
|
||||
'Ret': -1006,
|
||||
'ErrMsg': '%s can not be pinned' % \
|
||||
self.__class__.__name__, }, })
|
||||
def verify(self):
|
||||
return ReturnValue({'BaseResponse': {
|
||||
'Ret': -1006,
|
||||
'ErrMsg': '%s do not need verify' % \
|
||||
self.__class__.__name__, }, })
|
||||
def get_head_image(self, imageDir=None):
|
||||
return self.core.get_head_img(self.userName, picDir=imageDir)
|
||||
def delete_member(self, userName):
|
||||
return ReturnValue({'BaseResponse': {
|
||||
'Ret': -1006,
|
||||
'ErrMsg': '%s can not delete member' % \
|
||||
self.__class__.__name__, }, })
|
||||
def add_member(self, userName):
|
||||
return ReturnValue({'BaseResponse': {
|
||||
'Ret': -1006,
|
||||
'ErrMsg': '%s can not add member' % \
|
||||
self.__class__.__name__, }, })
|
||||
def send_raw_msg(self, msgType, content):
|
||||
return self.core.send_raw_msg(msgType, content, self.userName)
|
||||
def send_msg(self, msg='Test Message'):
|
||||
return self.core.send_msg(msg, self.userName)
|
||||
def send_file(self, fileDir, mediaId=None):
|
||||
return self.core.send_file(fileDir, self.userName, mediaId)
|
||||
def send_image(self, fileDir, mediaId=None):
|
||||
return self.core.send_image(fileDir, self.userName, mediaId)
|
||||
def send_video(self, fileDir=None, mediaId=None):
|
||||
return self.core.send_video(fileDir, self.userName, mediaId)
|
||||
def send(self, msg, mediaId=None):
|
||||
return self.core.send(msg, self.userName, mediaId)
|
||||
def search_member(self, name=None, userName=None, remarkName=None, nickName=None,
|
||||
wechatAccount=None):
|
||||
return ReturnValue({'BaseResponse': {
|
||||
'Ret': -1006,
|
||||
'ErrMsg': '%s do not have members' % \
|
||||
self.__class__.__name__, }, })
|
||||
def __deepcopy__(self, memo):
|
||||
r = self.__class__()
|
||||
for k, v in self.items():
|
||||
r[copy.deepcopy(k)] = copy.deepcopy(v)
|
||||
r.core = self.core
|
||||
return r
|
||||
def __str__(self):
|
||||
return '{%s}' % ', '.join(
|
||||
['%s: %s' % (repr(k),repr(v)) for k,v in self.items()])
|
||||
def __repr__(self):
|
||||
return '<%s: %s>' % (self.__class__.__name__.split('.')[-1],
|
||||
self.__str__())
|
||||
def __getstate__(self):
|
||||
return 1
|
||||
def __setstate__(self, state):
|
||||
pass
|
||||
|
||||
class User(AbstractUserDict):
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(User, self).__init__(*args, **kwargs)
|
||||
self.__setstate__(None)
|
||||
def update(self):
|
||||
r = self.core.update_friend(self.userName)
|
||||
if r:
|
||||
update_info_dict(self, r)
|
||||
return r
|
||||
def set_alias(self, alias):
|
||||
return self.core.set_alias(self.userName, alias)
|
||||
def set_pinned(self, isPinned=True):
|
||||
return self.core.set_pinned(self.userName, isPinned)
|
||||
def verify(self):
|
||||
return self.core.add_friend(**self.verifyDict)
|
||||
def __deepcopy__(self, memo):
|
||||
r = super(User, self).__deepcopy__(memo)
|
||||
r.verifyDict = copy.deepcopy(self.verifyDict)
|
||||
return r
|
||||
def __setstate__(self, state):
|
||||
super(User, self).__setstate__(state)
|
||||
self.verifyDict = {}
|
||||
self['MemberList'] = fakeContactList
|
||||
|
||||
class MassivePlatform(AbstractUserDict):
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(MassivePlatform, self).__init__(*args, **kwargs)
|
||||
self.__setstate__(None)
|
||||
def __setstate__(self, state):
|
||||
super(MassivePlatform, self).__setstate__(state)
|
||||
self['MemberList'] = fakeContactList
|
||||
|
||||
class Chatroom(AbstractUserDict):
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(Chatroom, self).__init__(*args, **kwargs)
|
||||
memberList = ContactList()
|
||||
userName = self.get('UserName', '')
|
||||
refSelf = ref(self)
|
||||
def init_fn(parentList, d):
|
||||
d.chatroom = refSelf() or \
|
||||
parentList.core.search_chatrooms(userName=userName)
|
||||
memberList.set_default_value(init_fn, ChatroomMember)
|
||||
if 'MemberList' in self:
|
||||
for member in self.memberList:
|
||||
memberList.append(member)
|
||||
self['MemberList'] = memberList
|
||||
@property
|
||||
def core(self):
|
||||
return getattr(self, '_core', lambda: fakeItchat)() or fakeItchat
|
||||
@core.setter
|
||||
def core(self, value):
|
||||
self._core = ref(value)
|
||||
self.memberList.core = value
|
||||
for member in self.memberList:
|
||||
member.core = value
|
||||
def update(self, detailedMember=False):
|
||||
r = self.core.update_chatroom(self.userName, detailedMember)
|
||||
if r:
|
||||
update_info_dict(self, r)
|
||||
self['MemberList'] = r['MemberList']
|
||||
return r
|
||||
def set_alias(self, alias):
|
||||
return self.core.set_chatroom_name(self.userName, alias)
|
||||
def set_pinned(self, isPinned=True):
|
||||
return self.core.set_pinned(self.userName, isPinned)
|
||||
def delete_member(self, userName):
|
||||
return self.core.delete_member_from_chatroom(self.userName, userName)
|
||||
def add_member(self, userName):
|
||||
return self.core.add_member_into_chatroom(self.userName, userName)
|
||||
def search_member(self, name=None, userName=None, remarkName=None, nickName=None,
|
||||
wechatAccount=None):
|
||||
with self.core.storageClass.updateLock:
|
||||
if (name or userName or remarkName or nickName or wechatAccount) is None:
|
||||
return None
|
||||
elif userName: # return the only userName match
|
||||
for m in self.memberList:
|
||||
if m.userName == userName:
|
||||
return copy.deepcopy(m)
|
||||
else:
|
||||
matchDict = {
|
||||
'RemarkName' : remarkName,
|
||||
'NickName' : nickName,
|
||||
'Alias' : wechatAccount, }
|
||||
for k in ('RemarkName', 'NickName', 'Alias'):
|
||||
if matchDict[k] is None:
|
||||
del matchDict[k]
|
||||
if name: # select based on name
|
||||
contact = []
|
||||
for m in self.memberList:
|
||||
if any([m.get(k) == name for k in ('RemarkName', 'NickName', 'Alias')]):
|
||||
contact.append(m)
|
||||
else:
|
||||
contact = self.memberList[:]
|
||||
if matchDict: # select again based on matchDict
|
||||
friendList = []
|
||||
for m in contact:
|
||||
if all([m.get(k) == v for k, v in matchDict.items()]):
|
||||
friendList.append(m)
|
||||
return copy.deepcopy(friendList)
|
||||
else:
|
||||
return copy.deepcopy(contact)
|
||||
def __setstate__(self, state):
|
||||
super(Chatroom, self).__setstate__(state)
|
||||
if not 'MemberList' in self:
|
||||
self['MemberList'] = fakeContactList
|
||||
|
||||
class ChatroomMember(AbstractUserDict):
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(AbstractUserDict, self).__init__(*args, **kwargs)
|
||||
self.__setstate__(None)
|
||||
@property
|
||||
def chatroom(self):
|
||||
r = getattr(self, '_chatroom', lambda: fakeChatroom)()
|
||||
if r is None:
|
||||
userName = getattr(self, '_chatroomUserName', '')
|
||||
r = self.core.search_chatrooms(userName=userName)
|
||||
if isinstance(r, dict):
|
||||
self.chatroom = r
|
||||
return r or fakeChatroom
|
||||
@chatroom.setter
|
||||
def chatroom(self, value):
|
||||
if isinstance(value, dict) and 'UserName' in value:
|
||||
self._chatroom = ref(value)
|
||||
self._chatroomUserName = value['UserName']
|
||||
def get_head_image(self, imageDir=None):
|
||||
return self.core.get_head_img(self.userName, self.chatroom.userName, picDir=imageDir)
|
||||
def delete_member(self, userName):
|
||||
return self.core.delete_member_from_chatroom(self.chatroom.userName, self.userName)
|
||||
def send_raw_msg(self, msgType, content):
|
||||
return ReturnValue({'BaseResponse': {
|
||||
'Ret': -1006,
|
||||
'ErrMsg': '%s can not send message directly' % \
|
||||
self.__class__.__name__, }, })
|
||||
def send_msg(self, msg='Test Message'):
|
||||
return ReturnValue({'BaseResponse': {
|
||||
'Ret': -1006,
|
||||
'ErrMsg': '%s can not send message directly' % \
|
||||
self.__class__.__name__, }, })
|
||||
def send_file(self, fileDir, mediaId=None):
|
||||
return ReturnValue({'BaseResponse': {
|
||||
'Ret': -1006,
|
||||
'ErrMsg': '%s can not send message directly' % \
|
||||
self.__class__.__name__, }, })
|
||||
def send_image(self, fileDir, mediaId=None):
|
||||
return ReturnValue({'BaseResponse': {
|
||||
'Ret': -1006,
|
||||
'ErrMsg': '%s can not send message directly' % \
|
||||
self.__class__.__name__, }, })
|
||||
def send_video(self, fileDir=None, mediaId=None):
|
||||
return ReturnValue({'BaseResponse': {
|
||||
'Ret': -1006,
|
||||
'ErrMsg': '%s can not send message directly' % \
|
||||
self.__class__.__name__, }, })
|
||||
def send(self, msg, mediaId=None):
|
||||
return ReturnValue({'BaseResponse': {
|
||||
'Ret': -1006,
|
||||
'ErrMsg': '%s can not send message directly' % \
|
||||
self.__class__.__name__, }, })
|
||||
def __setstate__(self, state):
|
||||
super(ChatroomMember, self).__setstate__(state)
|
||||
self['MemberList'] = fakeContactList
|
||||
|
||||
def wrap_user_dict(d):
|
||||
userName = d.get('UserName')
|
||||
if '@@' in userName:
|
||||
r = Chatroom(d)
|
||||
elif d.get('VerifyFlag', 8) & 8 == 0:
|
||||
r = User(d)
|
||||
else:
|
||||
r = MassivePlatform(d)
|
||||
return r
|
||||
|
||||
fakeItchat = UnInitializedItchat()
|
||||
fakeContactList = ContactList()
|
||||
fakeChatroom = Chatroom()
|
||||
Reference in New Issue
Block a user