"""iamai 协议适配器。
所有协议适配器都必须继承自 `Adapter` 基类。
"""
import os
from abc import ABC, abstractmethod
from typing import (
TYPE_CHECKING,
Any,
Awaitable,
Callable,
Generic,
Optional,
Type,
TypeVar,
Union,
final,
overload,
)
from iamai.event import Event
from iamai.typing import ConfigT, EventT
from iamai.utils import is_config_class
if TYPE_CHECKING:
from ..bot import Bot
__all__ = ["Adapter"]
if os.getenv("IAMAI_DEV") == "1": # pragma: no cover
# 当处于开发环境时,使用 pkg_resources 风格的命名空间包
__import__("pkg_resources").declare_namespace(__name__)
_EventT = TypeVar("_EventT", bound="Event[Any]")
[docs]
class Adapter(Generic[EventT, ConfigT], ABC):
"""协议适配器基类。
Attributes:
name: 适配器的名称。
bot: 当前的机器人对象。
"""
name: str
bot: "Bot"
Config: Type[ConfigT]
def __init__(self, bot: "Bot") -> None:
"""初始化。
Args:
bot: 当前机器人对象。
"""
if not hasattr(self, "name"):
self.name = self.__class__.__name__
self.bot: Bot = bot
self.handle_event = self.bot.handle_event
@property
def config(self) -> ConfigT:
"""适配器配置。"""
default: Any = None
config_class = getattr(self, "Config", None)
if is_config_class(config_class):
return getattr(
self.bot.config.adapter,
config_class.__config_name__,
default,
)
return default
[docs]
@final
async def safe_run(self) -> None:
"""附带有异常处理地安全运行适配器。"""
try:
await self.run()
except Exception as e:
self.bot.error_or_exception(
f"Run adapter {self.__class__.__name__} failed:", e
)
[docs]
@abstractmethod
async def run(self) -> None:
"""适配器运行方法,适配器开发者必须实现该方法。
适配器运行过程中保持保持运行,当此方法结束后, AliceBot 不会自动重新启动适配器。
"""
raise NotImplementedError
[docs]
async def startup(self) -> None:
"""在适配器开始运行前运行的方法,用于初始化适配器。
AliceBot 依次运行并等待所有适配器的 `startup()` 方法,待运行完毕后再创建 `run()` 任务。
"""
[docs]
async def shutdown(self) -> None:
"""在适配器结束运行时运行的方法,用于安全地关闭适配器。
AliceBot 在接收到系统的结束信号后依次运行并等待所有适配器的 `shutdown()` 方法。
当强制退出时此方法可能未被执行。
"""
@overload
async def get(
self,
func: Optional[Callable[[EventT], Union[bool, Awaitable[bool]]]] = None,
*,
event_type: None = None,
max_try_times: Optional[int] = None,
timeout: Optional[Union[int, float]] = None,
) -> EventT: ...
@overload
async def get(
self,
func: Optional[Callable[[_EventT], Union[bool, Awaitable[bool]]]] = None,
*,
event_type: Type[_EventT],
max_try_times: Optional[int] = None,
timeout: Optional[Union[int, float]] = None,
) -> _EventT: ...
[docs]
@final
async def get(
self,
func: Optional[Callable[[Any], Union[bool, Awaitable[bool]]]] = None,
*,
event_type: Any = None,
max_try_times: Optional[int] = None,
timeout: Optional[Union[int, float]] = None,
) -> Event[Any]:
"""获取满足指定条件的的事件,协程会等待直到适配器接收到满足条件的事件、超过最大事件数或超时。
类似 `Bot` 类的 `get()` 方法,但是隐含了判断产生事件的适配器是本适配器。
等效于 `Bot` 类的 `get()` 方法传入 adapter_type 为本适配器类型。
Args:
func: 协程或者函数,函数会被自动包装为协程执行。
要求接受一个事件作为参数,返回布尔值。
当协程返回 `True` 时返回当前事件。
当为 `None` 时相当于输入对于任何事件均返回真的协程,即返回适配器接收到的下一个事件。
event_type: 当指定时,只接受指定类型的事件,先于 func 条件生效。默认为 `None`。
max_try_times: 最大事件数。
timeout: 超时时间。
Returns:
返回满足 func 条件的事件。
Raises:
GetEventTimeout: 超过最大事件数或超时。
"""
return await self.bot.get(
func,
event_type=event_type,
adapter_type=type(self),
max_try_times=max_try_times,
timeout=timeout,
)