"""Kook 适配器消息。"""
import json
from io import StringIO
from dataclasses import dataclass
from typing_extensions import override, deprecated
from typing import ( # type: ignore
Any,
Dict,
Type,
Tuple,
Union,
Mapping,
Iterable,
Optional,
cast,
)
from iamai.log import logger
from iamai.message import Message, MessageSegment
from .exceptions import UnsupportedMessageType, UnsupportedMessageOperation
__all__ = [
"T_KookMSG",
"KookMessage",
"KookMessageSegment",
"escape_kmarkdown",
"unescape_kmarkdown",
]
T_KookMSG = Union[str, Mapping, Iterable[Mapping], "KookMessageSegment", "KookMessage"]
ESCAPE_CHAR = "!()*-.:>[\]`~"
msg_type_map = {
"text": 1,
"image": 2,
"video": 3,
"file": 4,
"audio": 8,
"kmarkdown": 9,
"card": 10,
}
rev_msg_type_map = {code: msg_type for msg_type, code in msg_type_map.items()}
# 根据协议消息段类型显示消息段内容
segment_text = {
"text": "[文字]",
"image": "[图片]",
"video": "[视频]",
"file": "[文件]",
"audio": "[音频]",
"kmarkdown": "[KMarkdown消息]",
"card": "[卡片消息]",
}
[docs]
class KookMessage(Message["KookMessageSegment"]):
"""
Kook v3 协议 Message 适配。
"""
@property
def _message_segment_class(self) -> Type["KookMessageSegment"]:
return KookMessageSegment
def _str_to_message_segment(self, msg) -> "KookMessageSegment":
return KookMessageSegment(type="text", data={"content": msg})
def _mapping_to_message_segment(self, msg: Mapping) -> "KookMessageSegment":
return KookMessageSegment(type=msg["type"], data=msg.get("content") or {})
[docs]
class KookMessageSegment(MessageSegment["KookMessage"]):
"""Kook 消息字段。"""
"""
Kook 协议 MessageSegment 适配。具体方法参考协议消息段类型或源码。
https://developer.kookapp.cn/doc/event/message
"""
@property
def _message_class(self) -> Type["KookMessage"]:
return KookMessage
def __str__(self) -> str:
if self.type in ["text", "kmarkdown"]:
return str(self.data["content"])
elif self.type == "at":
return str(f"@{self.data['user_name']}")
else:
return segment_text.get(self.type, "[未知类型消息]")
[docs]
@classmethod
@deprecated("用 KMarkdown 语法 (met)用户id/here/all(met) 代替")
def at(cls, user_id: str) -> "KookMessageSegment":
return KookMessageSegment.KMarkdown(f"(met){user_id}(met)", user_id)
[docs]
@classmethod
def text(cls, text: str) -> "KookMessageSegment":
return cls(type="text", data={"content": text})
[docs]
@classmethod
def image(cls, file_key: str) -> "KookMessageSegment":
return cls(type="image", data={"file_key": file_key})
[docs]
@classmethod
def video(cls, file_key: str, title: Optional[str] = None) -> "KookMessageSegment":
return cls(
type="video",
data={
"file_key": file_key,
"title": title,
},
)
[docs]
@classmethod
def file(cls, file_key: str, title: Optional[str] = None) -> "KookMessageSegment":
return cls(
"file",
{
"file_key": file_key,
"title": title,
},
)
[docs]
@classmethod
def audio(
cls,
file_key: str,
title: Optional[str] = None,
cover_file_key: Optional[str] = None,
) -> "KookMessageSegment":
return cls(
type="audio",
data={
"file_key": file_key,
"title": title,
"cover_file_key": cover_file_key,
},
)
[docs]
@classmethod
def KMarkdown(
cls, content: str, raw_content: Optional[str] = None
) -> "KookMessageSegment":
"""
构造KMarkdown消息段
@param content: KMarkdown消息内容(语法参考:https://developer.kookapp.cn/doc/kmarkdown)
@param raw_content: (可选)消息段的纯文本内容
"""
if raw_content is None:
raw_content = ""
return cls(
type="kmarkdown", data={"content": content, "raw_content": raw_content}
)
[docs]
@classmethod
def Card(cls, content: Any) -> "KookMessageSegment":
"""
构造卡片消息
@param content: KMarkdown消息内容(语法参考:https://developer.kookapp.cn/doc/cardmessage)
"""
if not isinstance(content, str):
content = json.dumps(content)
return cls(type="card", data={"content": content})
[docs]
@classmethod
def quote(cls, msg_id: str) -> "KookMessageSegment":
return cls(type="quote", data={"msg_id": msg_id})
def _convert_to_card_message(msg: KookMessage) -> KookMessageSegment:
cards = []
modules = []
for seg in msg:
if seg.type == "card":
if len(modules) != 0:
cards.append(
{"type": "card", "theme": "none", "size": "lg", "modules": modules}
)
modules = []
cards.extend(json.loads(seg.data["content"]))
elif seg.type == "text":
modules.append(
{
"type": "section",
"text": {"type": "plain-text", "content": seg.data["content"]},
}
)
elif seg.type == "kmarkdown":
modules.append(
{
"type": "section",
"text": {"type": "kmarkdown", "content": seg.data["content"]},
}
)
elif seg.type == "image":
modules.append(
{
"type": "container",
"elements": [{"type": "image", "src": seg.data["file_key"]}],
}
)
elif seg.type in ("audio", "video", "file"):
mod = {
"type": seg.type,
"src": seg.data["file_key"],
}
if seg.data.get("title") is not None:
mod["title"] = seg.data["title"]
if seg.data.get("cover_file_key") is not None:
mod["cover"] = seg.data["cover_file_key"]
modules.append(mod)
else:
raise UnsupportedMessageType(seg.type)
if len(modules) != 0:
cards.append(
{"type": "card", "theme": "none", "size": "lg", "modules": modules}
)
return KookMessageSegment.Card(cards)
@dataclass
class MessageSerializer:
"""
Kook 协议 Message 序列化器。
"""
message: KookMessage
def serialize(self, for_send: bool = True) -> Tuple[int, str]:
if len(self.message) != 1:
self.message = self.message.copy()
self.message.reduce() # type: ignore
if len(self.message) != 1:
# 转化为卡片消息发送
return MessageSerializer(KookMessage(_convert_to_card_message(self.message))).serialize() # type: ignore
msg_type = self.message[0].type
msg_type_code = msg_type_map[msg_type]
# bot 发送消息只支持"text", "kmarkdown", "card"
# 经测试还支持"image", "video", "file"
if msg_type in ("text", "kmarkdown", "card"):
return msg_type_code, self.message[0].data["content"]
elif msg_type in ("image", "video", "file"):
return msg_type_code, self.message[0].data["file_key"]
elif msg_type == "audio":
if not for_send:
return msg_type_code, self.message[0].data["file_key"]
else:
# 转化为卡片消息发送
return MessageSerializer(
KookMessage(_convert_to_card_message(self.message))
).serialize()
else:
raise UnsupportedMessageType(msg_type)
@dataclass
class MessageDeserializer:
"""
Kook 协议 Message 反序列化器。
"""
type_code: int
data: Dict
def __post_init__(self):
self.type = rev_msg_type_map.get(self.type_code, "")
def deserialize(self) -> KookMessage:
if self.type == "text":
return KookMessage(KookMessageSegment.text(self.data["content"]))
elif self.type == "image":
return KookMessage(KookMessageSegment.image(self.data["content"]))
elif self.type == "video":
return KookMessage(
KookMessageSegment.video(self.data["attachments"]["url"])
)
elif self.type == "file":
return KookMessage(KookMessageSegment.file(self.data["attachments"]["url"]))
elif self.type == "kmarkdown":
content = self.data["content"]
raw_content = self.data["extra"]["kmarkdown"]["raw_content"]
unescaped = unescape_kmarkdown(content)
is_plain_text = unescaped.strip() == raw_content
if not is_plain_text:
return KookMessage(KookMessageSegment.KMarkdown(content, raw_content))
raw_content = unescaped
return KookMessage(KookMessageSegment.text(raw_content))
elif self.type == "card":
return KookMessage(KookMessageSegment.Card(self.data["content"]))
else:
return KookMessage(KookMessageSegment(self.type, self.data))
[docs]
def escape_kmarkdown(content: str):
"""
将文本中的kmarkdown标识符进行转义
"""
with StringIO() as f:
for c in content:
if c in ESCAPE_CHAR:
f.write("\\")
f.write(c)
return f.getvalue()
[docs]
def unescape_kmarkdown(content: str):
"""
去除kmarkdown中的转义字符
"""
with StringIO() as f:
i = 0
while i < len(content):
if content[i] == "\\":
if i + 1 < len(content) and content[i + 1] in ESCAPE_CHAR:
f.write(content[i + 1])
i += 2
continue
f.write(content[i])
i += 1
return f.getvalue()