mirror of
https://github.com/zhayujie/chatgpt-on-wechat.git
synced 2026-04-30 16:02:42 +08:00
396 lines
14 KiB
Python
396 lines
14 KiB
Python
"""
Weixin HTTP JSON API client.

Implements the ilink bot protocol:
- getUpdates (long-poll)
- sendMessage
- getUploadUrl
- getConfig
- sendTyping
- QR login (get_bot_qrcode / get_qrcode_status)

CDN media upload with AES-128-ECB encryption.
"""
|
|
|
|
import base64
|
|
import hashlib
|
|
import os
|
|
import random
|
|
import struct
|
|
import time
|
|
import uuid
|
|
|
|
import requests
|
|
|
|
from common.log import logger
|
|
|
|
# Default endpoints: the ilink bot HTTP API and the media CDN.
DEFAULT_BASE_URL = "https://ilinkai.weixin.qq.com"
CDN_BASE_URL = "https://novac2c.cdn.weixin.qq.com/c2c"

# Timeouts, in seconds (they are handed to ``requests`` as ``timeout=``).
DEFAULT_LONG_POLL_TIMEOUT = 35  # default for WeixinApi.get_updates
DEFAULT_API_TIMEOUT = 15  # default for ordinary API POSTs
QR_POLL_TIMEOUT = 35  # default for WeixinApi.poll_qr_status

# Value sent as the ``bot_type`` query parameter when requesting a login QR code.
BOT_TYPE = "3"
|
|
|
|
|
|
def _random_wechat_uin() -> str:
    """Return a random unsigned 32-bit integer, written in decimal and base64-encoded.

    A new value is drawn on every call.
    """
    uin_digits = str(random.randint(0, 0xFFFFFFFF))
    encoded = base64.b64encode(uin_digits.encode("utf-8"))
    return encoded.decode("utf-8")
|
|
|
|
|
|
def _build_headers(token: str = "") -> dict:
    """Build the HTTP headers for an ilink bot API request.

    Args:
        token: bot token; when non-empty it is added as a ``Bearer``
            ``Authorization`` header, otherwise that header is omitted.

    Returns:
        A new dict of headers containing a freshly generated ``X-WECHAT-UIN``.
    """
    auth_header = {"Authorization": f"Bearer {token}"} if token else {}
    return {
        "Content-Type": "application/json",
        "AuthorizationType": "ilink_bot_token",
        "X-WECHAT-UIN": _random_wechat_uin(),
        **auth_header,
    }
|
|
|
|
|
|
def _ensure_trailing_slash(url: str) -> str:
    """Return *url* unchanged if it ends with "/", otherwise with "/" appended."""
    if url.endswith("/"):
        return url
    return f"{url}/"
|
|
|
|
|
|
class WeixinApi:
    """Stateless HTTP client for the Weixin ilink bot API.

    The instance only stores the API base URL, the bot token and the CDN base
    URL; every method issues an independent ``requests`` call.
    """

    def __init__(self, base_url: str = DEFAULT_BASE_URL, token: str = "",
                 cdn_base_url: str = CDN_BASE_URL):
        """Remember the API base URL, bot token and CDN base URL for later calls."""
        self.base_url, self.token, self.cdn_base_url = base_url, token, cdn_base_url

    def _post(self, endpoint: str, body: dict, timeout: int = DEFAULT_API_TIMEOUT) -> dict:
        """POST *body* as JSON to *endpoint* (relative to ``base_url``).

        Returns:
            The decoded JSON response. If the request times out, the timeout is
            logged at debug level and ``{"ret": 0, "msgs": []}`` is returned
            instead of raising.

        Raises:
            Exception: any non-timeout failure (HTTP error status, connection
                problem, undecodable JSON) is logged and re-raised unchanged.
        """
        target = _ensure_trailing_slash(self.base_url) + endpoint
        request_headers = _build_headers(self.token)
        try:
            response = requests.post(target, json=body, headers=request_headers, timeout=timeout)
            response.raise_for_status()
            payload = response.json()
        except requests.exceptions.Timeout:
            logger.debug(f"[Weixin] API timeout: {endpoint}")
            return {"ret": 0, "msgs": []}
        except Exception as e:
            logger.error(f"[Weixin] API error {endpoint}: {e}")
            raise
        return payload

    # ── getUpdates (long-poll) ─────────────────────────────────────────

    def get_updates(self, get_updates_buf: str = "", timeout: int = DEFAULT_LONG_POLL_TIMEOUT) -> dict:
        """Long-poll ``getupdates`` for new messages.

        The HTTP request is given *timeout* + 5 seconds before it is abandoned.
        """
        request_body = {"get_updates_buf": get_updates_buf}
        return self._post("ilink/bot/getupdates", request_body, timeout=timeout + 5)

    # ── sendMessage ────────────────────────────────────────────────────

    @staticmethod
    def _with_caption(text: str, media_item: dict) -> list:
        """Return the item list for a media message: optional text item, then *media_item*."""
        caption = [{"type": 1, "text_item": {"text": text}}] if text else []
        return caption + [media_item]

    def send_text(self, to: str, text: str, context_token: str) -> dict:
        """Send a single text item (item type 1) to user *to*."""
        text_item = {"type": 1, "text_item": {"text": text}}
        return self._send_items(to, context_token, [text_item])

    def send_image_item(self, to: str, context_token: str,
                        encrypt_query_param: str, aes_key_b64: str,
                        ciphertext_size: int, text: str = "") -> dict:
        """Send an image item (type 2) referencing already-uploaded CDN media.

        When *text* is non-empty a text item is sent ahead of the image.
        """
        media_ref = {
            "encrypt_query_param": encrypt_query_param,
            "aes_key": aes_key_b64,
            "encrypt_type": 1,
        }
        image = {
            "type": 2,
            "image_item": {"media": media_ref, "mid_size": ciphertext_size},
        }
        return self._send_items(to, context_token, self._with_caption(text, image))

    def send_file_item(self, to: str, context_token: str,
                       encrypt_query_param: str, aes_key_b64: str,
                       file_name: str, file_size: int, text: str = "") -> dict:
        """Send a file item (type 4) referencing already-uploaded CDN media.

        *file_size* is transmitted as a string in the ``len`` field. When
        *text* is non-empty a text item is sent ahead of the file.
        """
        media_ref = {
            "encrypt_query_param": encrypt_query_param,
            "aes_key": aes_key_b64,
            "encrypt_type": 1,
        }
        attachment = {
            "type": 4,
            "file_item": {
                "media": media_ref,
                "file_name": file_name,
                "len": str(file_size),
            },
        }
        return self._send_items(to, context_token, self._with_caption(text, attachment))

    def send_video_item(self, to: str, context_token: str,
                        encrypt_query_param: str, aes_key_b64: str,
                        ciphertext_size: int, text: str = "") -> dict:
        """Send a video item (type 5) referencing already-uploaded CDN media.

        When *text* is non-empty a text item is sent ahead of the video.
        """
        media_ref = {
            "encrypt_query_param": encrypt_query_param,
            "aes_key": aes_key_b64,
            "encrypt_type": 1,
        }
        video = {
            "type": 5,
            "video_item": {"media": media_ref, "video_size": ciphertext_size},
        }
        return self._send_items(to, context_token, self._with_caption(text, video))

    def _send_items(self, to: str, context_token: str, items: list) -> dict:
        """POST a ``sendmessage`` request carrying *items* to user *to*.

        Every call gets a new 16-hex-character ``client_id``.
        """
        message = {
            "from_user_id": "",
            "to_user_id": to,
            "client_id": uuid.uuid4().hex[:16],
            "message_type": 2,  # BOT
            "message_state": 2,  # FINISH
            "item_list": items,
            "context_token": context_token,
        }
        return self._post("ilink/bot/sendmessage", {"msg": message})

    # ── getUploadUrl ───────────────────────────────────────────────────

    def get_upload_url(self, filekey: str, media_type: int, to_user_id: str,
                       rawsize: int, rawfilemd5: str, filesize: int,
                       aeskey: str) -> dict:
        """Call ``getuploadurl`` for a media upload; all arguments are forwarded as-is.

        ``no_need_thumb`` is always sent as ``True``.
        """
        request_body = {
            "filekey": filekey,
            "media_type": media_type,
            "to_user_id": to_user_id,
            "rawsize": rawsize,
            "rawfilemd5": rawfilemd5,
            "filesize": filesize,
            "aeskey": aeskey,
            "no_need_thumb": True,
        }
        return self._post("ilink/bot/getuploadurl", request_body)

    # ── getConfig / sendTyping ─────────────────────────────────────────

    def get_config(self, user_id: str, context_token: str = "") -> dict:
        """Call ``getconfig`` for *user_id* with a 10-second timeout."""
        request_body = {"ilink_user_id": user_id, "context_token": context_token}
        return self._post("ilink/bot/getconfig", request_body, timeout=10)

    def send_typing(self, user_id: str, typing_ticket: str, status: int = 1) -> dict:
        """Call ``sendtyping`` for *user_id* with a 10-second timeout."""
        request_body = {
            "ilink_user_id": user_id,
            "typing_ticket": typing_ticket,
            "status": status,
        }
        return self._post("ilink/bot/sendtyping", request_body, timeout=10)

    # ── QR Login ───────────────────────────────────────────────────────

    def fetch_qr_code(self) -> dict:
        """GET ``get_bot_qrcode`` and return the decoded JSON response.

        No auth headers are sent. HTTP errors and timeouts propagate to the caller.
        """
        root = _ensure_trailing_slash(self.base_url)
        response = requests.get(f"{root}ilink/bot/get_bot_qrcode?bot_type={BOT_TYPE}", timeout=15)
        response.raise_for_status()
        return response.json()

    def poll_qr_status(self, qrcode: str, timeout: int = QR_POLL_TIMEOUT) -> dict:
        """GET ``get_qrcode_status`` for *qrcode*.

        Returns:
            The decoded JSON response, or ``{"status": "wait"}`` if the request
            times out. Other request errors propagate to the caller.
        """
        root = _ensure_trailing_slash(self.base_url)
        status_url = f"{root}ilink/bot/get_qrcode_status?qrcode={requests.utils.quote(qrcode)}"
        try:
            response = requests.get(
                status_url, headers={"iLink-App-ClientVersion": "1"}, timeout=timeout)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.Timeout:
            return {"status": "wait"}
|
|
|
|
|
|
# ── AES-128-ECB helpers ─────────────────────────────────────────────
|
|
|
|
def _aes_ecb_encrypt(data: bytes, key: bytes) -> bytes:
    """Pad *data* to a multiple of 16 bytes and encrypt it with AES in ECB mode.

    Padding is always added: between 1 and 16 bytes, each holding the pad
    length, so input that is already block-aligned grows by a full block.
    """
    from Crypto.Cipher import AES

    fill = 16 - len(data) % 16
    padded = data + bytes([fill]) * fill
    return AES.new(key, AES.MODE_ECB).encrypt(padded)
|
|
|
|
|
|
def _strip_pkcs7_padding(plain: bytes) -> bytes:
    """Drop trailing padding whose length is given by the final byte (block size 16).

    Data is returned unchanged when it is empty or when the final byte is not
    a valid pad length (1..16), keeping the best-effort behaviour for payloads
    that are not padded as expected.
    """
    if not plain:
        return plain
    pad_len = plain[-1]
    # 0 has to be rejected explicitly: plain[:-0] is plain[:0], i.e. b"",
    # which would silently throw the whole payload away.
    if not 1 <= pad_len <= 16:
        return plain
    return plain[:-pad_len]


def _aes_ecb_decrypt(data: bytes, key: bytes) -> bytes:
    """Decrypt *data* with AES in ECB mode and remove the trailing padding.

    Args:
        data: ciphertext handed to the AES cipher as-is.
        key: raw AES key bytes.

    Returns:
        The plaintext with padding removed; if the final byte is not a valid
        pad length the decrypted bytes are returned untouched.
    """
    from Crypto.Cipher import AES

    cipher = AES.new(key, AES.MODE_ECB)
    return _strip_pkcs7_padding(cipher.decrypt(data))
|
|
|
|
|
|
def _file_md5(file_path: str) -> str:
    """Return the hexadecimal MD5 digest of the file at *file_path*.

    The file is read in 8192-byte chunks so it never has to fit in memory.
    """
    digest = hashlib.md5()
    with open(file_path, "rb") as stream:
        while True:
            piece = stream.read(8192)
            if not piece:
                break
            digest.update(piece)
    return digest.hexdigest()
|
|
|
|
|
|
def _md5_bytes(data: bytes) -> str:
    """Return the hexadecimal MD5 digest of *data*."""
    hasher = hashlib.md5()
    hasher.update(data)
    return hasher.hexdigest()
|
|
|
|
|
|
def _aes_ecb_padded_size(plaintext_size: int) -> int:
    """Return the size of *plaintext_size* bytes after PKCS7 padding to 16-byte blocks.

    At least one padding byte is always added, so an already aligned size
    grows by a whole block.
    """
    full_blocks = plaintext_size // 16
    return (full_blocks + 1) * 16
|
|
|
|
|
|
# Maximum number of attempts upload_media_to_cdn makes before giving up.
UPLOAD_MAX_RETRIES = 3
|
|
|
|
|
|
class _CdnClientError(RuntimeError):
    """The CDN answered an upload with a 4xx status; retrying would not help."""


def upload_media_to_cdn(api: WeixinApi, file_path: str, to_user_id: str,
                        media_type: int) -> dict:
    """
    Upload a local file to the Weixin CDN (matching official plugin protocol).

    The file is read fully into memory, encrypted with a random 16-byte AES
    key, then for each attempt an upload slot is requested via
    ``api.get_upload_url`` and the ciphertext is POSTed to the CDN. Failures
    are retried up to UPLOAD_MAX_RETRIES times with exponential backoff,
    except a 4xx answer from the CDN, which is raised immediately.

    Args:
        api: WeixinApi instance
        file_path: local file path
        to_user_id: target user id
        media_type: 1=IMAGE, 2=VIDEO, 3=FILE

    Returns:
        dict with keys: encrypt_query_param, aes_key_b64, ciphertext_size, raw_size

    Raises:
        RuntimeError: the CDN rejected the upload with a 4xx status, or a
            response lacked ``upload_param`` / ``x-encrypted-param`` on the
            final attempt.
        Exception: the last error seen (e.g. a ``requests`` exception) once
            all attempts are exhausted.
    """
    from urllib.parse import quote

    aes_key = os.urandom(16)
    aes_key_hex = aes_key.hex()

    with open(file_path, "rb") as f:
        raw_data = f.read()

    raw_size = len(raw_data)
    raw_md5 = _md5_bytes(raw_data)
    cipher_size = _aes_ecb_padded_size(raw_size)
    encrypted = _aes_ecb_encrypt(raw_data, aes_key)

    download_param = None
    last_error = None
    for attempt in range(1, UPLOAD_MAX_RETRIES + 1):
        # Every attempt uses its own random filekey.
        filekey = uuid.uuid4().hex
        try:
            resp = api.get_upload_url(
                filekey=filekey,
                media_type=media_type,
                to_user_id=to_user_id,
                rawsize=raw_size,
                rawfilemd5=raw_md5,
                filesize=cipher_size,
                aeskey=aes_key_hex,
            )
            upload_param = resp.get("upload_param", "")
            if not upload_param:
                raise RuntimeError(f"[Weixin] getUploadUrl returned no upload_param: {resp}")

            cdn_url = (f"{api.cdn_base_url}/upload"
                       f"?encrypted_query_param={quote(upload_param)}"
                       f"&filekey={quote(filekey)}")

            cdn_resp = requests.post(cdn_url, data=encrypted, headers={
                "Content-Type": "application/octet-stream",
                "Content-Length": str(len(encrypted)),
            }, timeout=120)
            if 400 <= cdn_resp.status_code < 500:
                err_msg = cdn_resp.headers.get("x-error-message", cdn_resp.text[:200])
                raise _CdnClientError(f"CDN client error {cdn_resp.status_code}: {err_msg}")
            cdn_resp.raise_for_status()
            download_param = cdn_resp.headers.get("x-encrypted-param", "")
            if not download_param:
                raise RuntimeError("CDN response missing x-encrypted-param header")
            logger.debug(f"[Weixin] CDN upload success attempt={attempt} filekey={filekey}")
            break
        except _CdnClientError:
            # A 4xx from the CDN is decided by exception type, not by matching
            # message text, so unrelated errors can never be mistaken for it.
            raise
        except Exception as e:
            last_error = e
            if attempt < UPLOAD_MAX_RETRIES:
                backoff = 2 ** attempt
                logger.warning(f"[Weixin] CDN upload attempt {attempt} failed, retrying in {backoff}s: {e}")
                time.sleep(backoff)
            else:
                logger.error(f"[Weixin] CDN upload failed after {UPLOAD_MAX_RETRIES} attempts: {e}")

    if not download_param:
        raise last_error or RuntimeError("CDN upload failed")

    # The key is reported as base64 of its *hex text*, not of the raw key bytes.
    aes_key_b64 = base64.b64encode(aes_key_hex.encode("utf-8")).decode("utf-8")

    return {
        "encrypt_query_param": download_param,
        "aes_key_b64": aes_key_b64,
        "ciphertext_size": cipher_size,
        "raw_size": raw_size,
    }
|
|
|
|
|
|
def _decode_aes_key(aes_key: str) -> bytes:
    """Turn an encoded AES key into its 16 raw bytes.

    Accepted formats, tried in this order:
      1) 32-char hex string → 16 raw bytes
      2) base64 string → decode → if 32 bytes, treat as hex-encoded → 16 raw bytes
      3) base64 string → decode → 16 raw bytes directly

    Raises:
        ValueError: the key matches none of the formats above.
    """
    try:
        key_bytes = bytes.fromhex(aes_key)
    except (ValueError, TypeError):
        key_bytes = b""
    if len(key_bytes) == 16:
        return key_bytes

    decoded = base64.b64decode(aes_key)
    if len(decoded) == 16:
        return decoded
    if len(decoded) != 32:
        raise ValueError(f"Invalid AES key length after base64 decode: {len(decoded)}")
    try:
        key_bytes = bytes.fromhex(decoded.decode("ascii"))
    except (ValueError, UnicodeDecodeError) as err:
        raise ValueError("Invalid AES key: 32 bytes but not valid hex") from err
    # fromhex skips whitespace, so 32 characters can still yield fewer than 16 bytes.
    if len(key_bytes) != 16:
        raise ValueError("Invalid AES key: 32 bytes but not valid hex")
    return key_bytes


def download_media_from_cdn(cdn_base_url: str, encrypt_query_param: str,
                            aes_key: str, save_path: str) -> str:
    """
    Download and decrypt a media file from Weixin CDN.

    Args:
        cdn_base_url: CDN base URL
        encrypt_query_param: encrypted query parameter from message
        aes_key: hex or base64 encoded AES key
        save_path: path to save decrypted file

    Returns:
        save_path on success

    Raises:
        requests.exceptions.RequestException: the download failed or returned
            an HTTP error status.
        ValueError: *aes_key* is not in a recognised format.
    """
    from urllib.parse import quote

    url = f"{cdn_base_url}/download?encrypted_query_param={quote(encrypt_query_param)}"
    resp = requests.get(url, timeout=60)
    resp.raise_for_status()

    key_bytes = _decode_aes_key(aes_key)
    decrypted = _aes_ecb_decrypt(resp.content, key_bytes)

    # A bare file name has an empty dirname, and os.makedirs("") raises
    # FileNotFoundError, so only create the directory when there is one.
    target_dir = os.path.dirname(save_path)
    if target_dir:
        os.makedirs(target_dir, exist_ok=True)
    with open(save_path, "wb") as f:
        f.write(decrypted)
    return save_path
|