"""Bot Module
The basic module of iamai, each iamai robot is a ``Bot()`` instance.
"""
import asyncio
import json
import os
import pkgutil
import signal
import sys
import threading
import time
from collections import defaultdict
from contextlib import AsyncExitStack
from itertools import chain
from pathlib import Path
from typing import (
Any,
Awaitable,
Callable,
Dict,
List,
Optional,
Set,
Tuple,
Type,
Union,
overload,
)
from gettext import GNUTranslations
from pydantic import ValidationError, create_model
from .adapter import Adapter
from .config import AdapterConfig, ConfigModel, MainConfig, PluginConfig
from .dependencies import solve_dependencies
from .event import Event
from .exceptions import (
GetEventTimeout,
LoadModuleError,
SkipException,
StopException,
)
from .log import logger
from .plugin import Plugin, PluginLoadType
from .i18n import setup_gettext
from .typing import AdapterHook, AdapterT, BotHook, EventHook, EventT
from .utils import (
ModulePathFinder,
get_classes_from_module_name,
is_config_class,
samefile,
wrap_get_func,
)
from .const import __version__
if sys.version_info >= (3, 11): # pragma: no cover
import tomllib
else: # pragma: no cover
import tomli as tomllib
__all__ = ["Bot"]
HANDLED_SIGNALS = (
signal.SIGINT, # Unix signal 2. Sent by Ctrl+C.
signal.SIGTERM, # Unix signal 15. Sent by `kill <pid>`.
)
[docs]
class Bot:
"""iamai ``Bot`` class object, defines the basic functionality for interacting.
Read and save configuration ``Config``, load adapters ``Adapter`` and plugins ``Plugin``, then distribute the events.
Attributes:
config: Bot configuration.
should_exit: Whether the bot should enter the ready-to-exit state.
adapters: List of currently loaded adapters.
plugins_priority_dict: Plugin priority dictionary.
plugin_state: Plugin status.
global_state: Global status.
"""
config: MainConfig
should_exit: asyncio.Event # pyright: ignore[reportUninitializedInstanceVariable]
adapters: List[Adapter[Any, Any]]
plugins_priority_dict: Dict[int, List[Type[Plugin[Any, Any, Any]]]]
plugin_state: Dict[str, Any]
global_state: Dict[Any, Any]
_condition: asyncio.Condition # Condition used to handle get # pyright: ignore[reportUninitializedInstanceVariable]
_current_event: Optional[Event[Any]] # Event currently pending
_restart_flag: bool # Restart flag
_module_path_finder: ModulePathFinder # Module metapath finder for finding plugins
_raw_config_dict: Dict[str, Any] # Original configuration dictionary
_adapter_tasks: Set[
"asyncio.Task[None]"
] # Adapter task collection, used to hold references to adapter tasks
_handle_event_tasks: Set[
"asyncio.Task[None]"
] # Event handling task, used to keep a reference to the adapter task
# The following properties are not cleared on reboot
# _config_file: Optional[str] # Configuration file
_config_dict: Optional[Dict[str, Any]] # Configuration dictionary
_hot_reload: bool # Hot-Reload
_extend_plugins: List[
Union[Type[Plugin[Any, Any, Any]], str, Path]
] # A list of plugins loaded programmatically using the ``load_plugins()`` method
_extend_plugin_dirs: List[
Path
] # List of plugin paths loaded programmatically using the ``load_plugins_from_dirs()`` method
_extend_adapters: List[
Union[Type[Adapter[Any, Any]], str]
] # A list of adapters loaded programmatically using the ``load_adapter()`` method
_bot_run_hooks: List[BotHook]
_bot_exit_hooks: List[BotHook]
_adapter_startup_hooks: List[AdapterHook]
_adapter_run_hooks: List[AdapterHook]
_adapter_shutdown_hooks: List[AdapterHook]
_event_preprocessor_hooks: List[EventHook]
_event_postprocessor_hooks: List[EventHook]
def __init__(
self,
*,
config_file: Optional[str] = "config.toml",
config_dict: Optional[Dict[str, Any]] = None,
hot_reload: bool = False,
) -> None:
"""Initialize iamai, read configuration files, create configurations, load adapters and plug-ins.
Args:
config_file: Configuration file, if not specified, the default ``config.toml`` will be used.
If specified as ``None``, the configuration file will not be loaded.
config_dict: Configuration dictionary, default is ``None``.
If a dictionary is specified, the ``config_file`` configuration will be ignored and the configuration file will no longer be read.
hot_reload: hot reload.
When enabled, plugin files in ``plugin_dir`` will be automatically checked for updates and automatically reloaded when updated.
"""
self.config = MainConfig()
self.plugins_priority_dict = defaultdict(list)
self.plugin_state = defaultdict(lambda: None)
self.global_state = {}
self.adapters = []
self._current_event = None
self._restart_flag = False
self._module_path_finder = ModulePathFinder()
self._raw_config_dict = {}
self._adapter_tasks = set()
self._handle_event_tasks = set()
self._config_file = config_file
self._config_dict = config_dict
self._hot_reload = hot_reload
self._extend_plugins = []
self._extend_plugin_dirs = []
self._extend_adapters = []
self._bot_run_hooks = []
self._bot_exit_hooks = []
self._adapter_startup_hooks = []
self._adapter_run_hooks = []
self._adapter_shutdown_hooks = []
self._event_preprocessor_hooks = []
self._event_postprocessor_hooks = []
sys.meta_path.insert(0, self._module_path_finder)
@property
def locale_dirs(self) -> List[Path]:
"""Get the locale directories of the bot."""
return [Path.cwd() / "locales"]
@property
def _(self) -> GNUTranslations:
# logger.warning(f"{self.config.bot.locale}")
return setup_gettext(languages=self.config.bot.locale)
@property
def locale(self) -> List[str]:
"""Get the current language of the bot."""
return list(self.config.bot.locale)
@property
def plugins(self) -> List[Type[Plugin[Any, Any, Any]]]:
"""List of currently loaded plugins."""
return list(chain(*self.plugins_priority_dict.values()))
[docs]
def run(self) -> None:
"""Run iamai, monitor and intercept system exit signals, and update robot configuration."""
self._restart_flag = True
while self._restart_flag:
self._restart_flag = False
asyncio.run(self._run())
if self._restart_flag:
self._load_plugins_from_dirs(*self._extend_plugin_dirs)
self._load_plugins(*self._extend_plugins)
self._load_adapters(*self._extend_adapters)
[docs]
def restart(self) -> None:
"""Exit and rerun iamai."""
logger.info(self._("Restarting iamai..."))
self._restart_flag = True
self.should_exit.set()
async def _run(self) -> None:
"""Run iamai."""
self.should_exit = asyncio.Event()
self._condition = asyncio.Condition()
# Monitor and intercept system exit signals to complete some aftermath work before closing the program
if threading.current_thread() is threading.main_thread(): # pragma: no cover
# Signals can only be processed in the main thread
try:
loop = asyncio.get_running_loop()
for sig in HANDLED_SIGNALS:
loop.add_signal_handler(sig, self._handle_exit)
except NotImplementedError:
# add_signal_handler is only available under Unix, below for Windows
for sig in HANDLED_SIGNALS:
signal.signal(sig, self._handle_exit)
# Load configuration file
self._reload_config_dict()
# Load plugins and adapters
self._load_plugins_from_dirs(*self.config.bot.plugin_dirs)
self._load_plugins(*self.config.bot.plugins)
self._load_adapters(*self.config.bot.adapters)
self._update_config()
# Run iamai
logger.info(self._("Running iamai..."))
logger.info(self._("Version: {version}").format(version=__version__))
hot_reload_task = None
if self._hot_reload: # pragma: no cover
hot_reload_task = asyncio.create_task(self._run_hot_reload())
for bot_run_hook_func in self._bot_run_hooks:
await bot_run_hook_func(self)
try:
for _adapter in self.adapters:
for adapter_startup_hook_func in self._adapter_startup_hooks:
await adapter_startup_hook_func(_adapter)
try:
await _adapter.startup()
except Exception as e:
self.error_or_exception(
self._("Startup adapter {_adapter!r} failed: {error}").format(
_adapte=_adapter, error=e
)
)
for _adapter in self.adapters:
for adapter_run_hook_func in self._adapter_run_hooks:
await adapter_run_hook_func(_adapter)
_adapter_task = asyncio.create_task(_adapter.safe_run())
self._adapter_tasks.add(_adapter_task)
_adapter_task.add_done_callback(self._adapter_tasks.discard)
await self.should_exit.wait()
if hot_reload_task is not None: # pragma: no cover
await hot_reload_task
finally:
for _adapter in self.adapters:
for adapter_shutdown_hook_func in self._adapter_shutdown_hooks:
await adapter_shutdown_hook_func(_adapter)
await _adapter.shutdown()
while self._adapter_tasks:
await asyncio.sleep(0)
for bot_exit_hook_func in self._bot_exit_hooks:
await bot_exit_hook_func(self)
self.adapters.clear()
self.plugins_priority_dict.clear()
self._module_path_finder.path.clear()
def _remove_plugin_by_path(
self, file: Path
) -> List[Type[Plugin[Any, Any, Any]]]: # pragma: no cover
"""Remove loaded plugins based on path."""
removed_plugins: List[Type[Plugin[Any, Any, Any]]] = []
for plugins in self.plugins_priority_dict.values():
_removed_plugins = list(
filter(
lambda x: x.__plugin_load_type__ != PluginLoadType.CLASS
and x.__plugin_file_path__ is not None
and samefile(x.__plugin_file_path__, file),
plugins,
)
)
removed_plugins.extend(_removed_plugins)
for plugin_ in _removed_plugins:
plugins.remove(plugin_)
logger.info(
self._(
'Succeeded to remove plugin "{plugin_.__name__}" from file "{file}"'.format(
plugin_=plugin_, file=file
)
)
)
return removed_plugins
async def _run_hot_reload(self) -> None: # pragma: no cover
"""Hot reload."""
try:
from watchfiles import Change, awatch
except ImportError:
logger.warning(
self._(
'Hot reload needs to install "watchfiles", try "pip install watchfiles"'
)
)
return
logger.info(self._("Hot reload is working!"))
async for changes in awatch(
*(
x.resolve()
for x in set(self._extend_plugin_dirs)
.union(self.config.bot.plugin_dirs)
.union(
{Path(self._config_file)}
if self._config_dict is None and self._config_file is not None
else set()
)
),
stop_event=self.should_exit,
):
# Processed in the order of Change.deleted, Change.modified, Change.added
# To ensure that when renaming occurs, deletions are processed first and then additions are processed
for change_type, file_ in sorted(changes, key=lambda x: x[0], reverse=True):
file = Path(file_)
# Change configuration file
if (
self._config_file is not None
and samefile(self._config_file, file)
and change_type == change_type.modified
):
logger.info(
self._('Reload config file "{self._config_file}"').format(
self=self
)
)
old_config = self.config
self._reload_config_dict()
if (
self.config.bot != old_config.bot
or self.config.adapter != old_config.adapter
):
self.restart()
continue
# Change plugin folder
if change_type == Change.deleted:
# Special handling for deletion operations
if file.suffix != ".py":
file = file / "__init__.py"
else:
if file.is_dir() and (file / "__init__.py").is_file():
# When a new directory is added and this directory contains the ``__init__.py`` file
# It means that what happens at this time is that a Python package is added, and the ``__init__.py`` file of this package is deemed to be added
file = file / "__init__.py"
if not (file.is_file() and file.suffix == ".py"):
continue
if change_type == Change.added:
logger.info(
self._("Hot reload: Added file: {file}").format(file=file)
)
self._load_plugins(
Path(file), plugin_load_type=PluginLoadType.DIR, reload=True
)
self._update_config()
continue
if change_type == Change.deleted:
logger.info(
self._("Hot reload: Deleted file: {file}").format(file=file)
)
self._remove_plugin_by_path(file)
self._update_config()
elif change_type == Change.modified:
logger.info(
self._("Hot reload: Modified file: {file}").format(file=file)
)
self._remove_plugin_by_path(file)
self._load_plugins(
Path(file), plugin_load_type=PluginLoadType.DIR, reload=True
)
self._update_config()
def _update_config(self) -> None:
"""Updated config to incorporate Config from Plugin and Adapter."""
def update_config(
source: Union[List[Type[Plugin[Any, Any, Any]]], List[Adapter[Any, Any]]],
name: str,
base: Type[ConfigModel],
) -> Tuple[Type[ConfigModel], ConfigModel]:
config_update_dict: Dict[str, Any] = {}
for i in source:
config_class = getattr(i, "Config", None)
if is_config_class(config_class):
default_value: Any
try:
default_value = config_class()
except ValidationError:
default_value = ...
config_update_dict[config_class.__config_name__] = (
config_class,
default_value,
)
config_model = create_model(name, **config_update_dict, __base__=base)
return config_model, config_model()
self.config = create_model(
"Config",
plugin=update_config(self.plugins, "PluginConfig", PluginConfig),
adapter=update_config(self.adapters, "AdapterConfig", AdapterConfig),
__base__=MainConfig,
)(**self._raw_config_dict)
# Update the level of logging
logger.remove()
logger.add(sys.stderr, level=self.config.bot.log.level)
def _reload_config_dict(self) -> None:
"""Reload the configuration file."""
self._raw_config_dict = {}
if self._config_dict is not None:
self._raw_config_dict = self._config_dict
elif self._config_file is not None:
try:
with Path(self._config_file).open("rb") as f:
if self._config_file.endswith(".json"):
self._raw_config_dict = json.load(f)
elif self._config_file.endswith(".toml"):
self._raw_config_dict = tomllib.load(f)
else:
self.error_or_exception(
self._("Read config file failed:"),
OSError(self._("Unable to determine config file type")),
)
except OSError as e:
self.error_or_exception(
self._("Can not open config file: {e}").format(e=e)
)
except (ValueError, json.JSONDecodeError, tomllib.TOMLDecodeError) as e:
self.error_or_exception(
self._("Read config file failed: {e}").format(e=e)
)
try:
self.config = MainConfig(**self._raw_config_dict)
except ValidationError as e:
self.config = MainConfig()
self.error_or_exception(self._("Config dict parse error: {e}").format(e=e))
self._update_config()
[docs]
def reload_plugins(self) -> None:
"""Manually reload all plugins."""
self.plugins_priority_dict.clear()
self._load_plugins(*self.config.bot.plugins)
self._load_plugins_from_dirs(*self.config.bot.plugin_dirs)
self._load_plugins(*self._extend_plugins)
self._load_plugins_from_dirs(*self._extend_plugin_dirs)
self._update_config()
def _handle_exit(self, *_args: Any) -> None: # pragma: no cover
"""When the robot receives the exit signal, it will handle it according to the situation."""
logger.info(self._("Stopping iamai..."))
if self.should_exit.is_set():
logger.warning(self._("Force Exit iamai..."))
sys.exit()
else:
self.should_exit.set()
[docs]
async def handle_event(
self,
current_event: Event[Any],
*,
handle_get: bool = True,
show_log: bool = True,
) -> None:
"""Called by the adapter object, distributes events to all plug-ins according to priority, and handles plug-in signals such as ``stop`` and ``skip``.
This method should not be called manually by the user.
Args:
current_event: The currently pending ``Event``.
handle_get: Whether the current event can be captured by the get method, the default is ``True``.
show_log: Whether to display in the log, the default is ``True``.
"""
if show_log:
logger.info(
self._(
"Adapter {current_event.adapter.name} received: {current_event!r}"
).format(current_event=current_event)
)
if handle_get:
_handle_event_task = asyncio.create_task(self._handle_event())
self._handle_event_tasks.add(_handle_event_task)
_handle_event_task.add_done_callback(self._handle_event_tasks.discard)
await asyncio.sleep(0)
async with self._condition:
self._current_event = current_event
self._condition.notify_all()
else:
_handle_event_task = asyncio.create_task(self._handle_event(current_event))
self._handle_event_tasks.add(_handle_event_task)
_handle_event_task.add_done_callback(self._handle_event_tasks.discard)
async def _handle_event(self, current_event: Optional[Event[Any]] = None) -> None:
if current_event is None:
async with self._condition:
await self._condition.wait()
assert self._current_event is not None
current_event = self._current_event
if current_event.__handled__:
return
for _hook_func in self._event_preprocessor_hooks:
await _hook_func(current_event)
for plugin_priority in sorted(self.plugins_priority_dict.keys()):
logger.debug(
self._(
"Checking for matching plugins with priority {plugin_priority!r}"
).format(plugin_priority=plugin_priority)
)
stop = False
for plugin in self.plugins_priority_dict[plugin_priority]:
try:
async with AsyncExitStack() as stack:
_plugin = await solve_dependencies(
plugin,
use_cache=True,
stack=stack,
dependency_cache={
Bot: self,
Event: current_event,
},
)
if _plugin.name not in self.plugin_state:
plugin_state = _plugin.__init_state__()
if plugin_state is not None:
self.plugin_state[_plugin.name] = plugin_state
if await _plugin.rule():
logger.info(
self._("Event will be handled by {_plugin!r}").format(
_plugin=_plugin
)
)
try:
await _plugin.handle()
finally:
if _plugin.block:
stop = True
except SkipException:
# The plug-in requires that it skips itself and continues the current event propagation
continue
except StopException:
# Plugin requires stopping current event propagation
stop = True
except Exception as e:
self.error_or_exception(
self._("Exception in plugin {plugin!r}").format(plugin=plugin),
e,
)
if stop:
break
for _hook_func in self._event_postprocessor_hooks:
await _hook_func(current_event)
logger.info(self._("Event Finished"))
@overload
async def get(
self,
func: Optional[Callable[[Event[Any]], Union[bool, Awaitable[bool]]]] = None,
*,
event_type: None = None,
adapter_type: None = None,
max_try_times: Optional[int] = None,
timeout: Optional[Union[int, float]] = None,
) -> Event[Any]: ...
@overload
async def get(
self,
func: Optional[Callable[[EventT], Union[bool, Awaitable[bool]]]] = None,
*,
event_type: None = None,
adapter_type: Type[Adapter[EventT, Any]],
max_try_times: Optional[int] = None,
timeout: Optional[Union[int, float]] = None,
) -> EventT: ...
@overload
async def get(
self,
func: Optional[Callable[[EventT], Union[bool, Awaitable[bool]]]] = None,
*,
event_type: Type[EventT],
adapter_type: Optional[Type[AdapterT]] = None,
max_try_times: Optional[int] = None,
timeout: Optional[Union[int, float]] = None,
) -> EventT: ...
[docs]
async def get(
self,
func: Optional[Callable[[Any], Union[bool, Awaitable[bool]]]] = None,
*,
event_type: Optional[Type[Event[Any]]] = None,
adapter_type: Optional[Type[Adapter[Any, Any]]] = None,
max_try_times: Optional[int] = None,
timeout: Optional[Union[int, float]] = None,
) -> Event[Any]:
"""Get events that meet the specified conditions. The coroutine will wait until the adapter receives events that meet the conditions, exceeds the maximum number of events, or times out.
Args:
func: Coroutine or function, the function will be automatically packaged as a coroutine for execution.
Requires an event to be accepted as a parameter and returns a Boolean value. Returns the current event when the coroutine returns ``True``.
When ``None`` is equivalent to the input coroutine returning true for any event, that is, returning the next event received by the adapter.
event_type: When specified, only events of the specified type are accepted, taking effect before the func condition. Defaults to ``None``.
adapter_type: When specified, only events generated by the specified adapter will be accepted, taking effect before the func condition. Defaults to ``None``.
max_try_times: Maximum number of events.
timeout: timeout period.
Returns:
Returns events that satisfy the condition of ``func``.
Raises:
GetEventTimeout: Maximum number of events exceeded or timeout.
"""
_func = wrap_get_func(func)
try_times = 0
start_time = time.time()
while not self.should_exit.is_set():
if max_try_times is not None and try_times > max_try_times:
break
if timeout is not None and time.time() - start_time > timeout:
break
async with self._condition:
if timeout is None:
await self._condition.wait()
else:
try:
await asyncio.wait_for(
self._condition.wait(),
timeout=start_time + timeout - time.time(),
)
except asyncio.TimeoutError:
break
if (
self._current_event is not None
and not self._current_event.__handled__
and (
event_type is None
or isinstance(self._current_event, event_type)
)
and (
adapter_type is None
or isinstance(self._current_event.adapter, adapter_type)
)
and await _func(self._current_event)
):
self._current_event.__handled__ = True
return self._current_event
try_times += 1
raise GetEventTimeout
def _load_plugin_class(
self,
plugin_class: Type[Plugin[Any, Any, Any]],
plugin_load_type: PluginLoadType,
plugin_file_path: Optional[str],
) -> None:
"""Load a plugin class"""
priority = getattr(plugin_class, "priority", None)
if isinstance(priority, int) and priority >= 0:
for _plugin in self.plugins:
if _plugin.__name__ == plugin_class.__name__:
logger.warning(
self._(
'Already have a same name plugin "{_plugin.__name__}"'
).format(_plugin=_plugin)
)
plugin_class.__plugin_load_type__ = plugin_load_type
plugin_class.__plugin_file_path__ = plugin_file_path
self.plugins_priority_dict[priority].append(plugin_class)
logger.info(
self._(
'Succeeded to load plugin "{plugin_class.__name__}" from class "{plugin_class!r}"'
).format(plugin_class=plugin_class)
)
else:
self.error_or_exception(
self._('Load plugin from class "{plugin_class!r}" failed:').format(
plugin_class=plugin_class
),
LoadModuleError(
self._(
'Plugin priority incorrect in the class "{plugin_class!r}"'
).format(plugin_class=plugin_class)
),
)
def _load_plugins_from_module_name(
self,
module_name: str,
*,
plugin_load_type: PluginLoadType,
reload: bool = False,
) -> None:
"""Load plugins from the given module."""
try:
plugin_classes = get_classes_from_module_name(
module_name, Plugin, reload=reload
)
except ImportError as e:
self.error_or_exception(
self._('Import module "{module_name}" failed:').format(
module_name=module_name
),
e,
)
else:
for plugin_class, module in plugin_classes:
self._load_plugin_class(
plugin_class, # type: ignore
plugin_load_type,
module.__file__,
)
def _load_plugins(
self,
*plugins: Union[Type[Plugin[Any, Any, Any]], str, Path],
plugin_load_type: Optional[PluginLoadType] = None,
reload: bool = False,
) -> None:
"""Load plugins.
Args:
*plugins: plug-in class, plug-in module name or plug-in module file path. Type can be ``Type[Plugin]``, ``str`` or ``pathlib.Path``.
If it is ``Type[Plugin]``, it will be loaded as a plug-in class.
If it is of type ``str``, it will be loaded as the plug-in module name, and the format is the same as the Python ``import`` statement.
For example: ``path.of.plugin``.
If it is of type ``pathlib.Path``, it will be loaded as the plug-in module file path.
For example: ``pathlib.Path("path/of/plugin")``.
plugin_load_type: Plug-in loading type, if it is ``None``, it will be automatically determined, otherwise the specified type will be used.
reload: Whether to reload the module.
"""
for plugin_ in plugins:
try:
if isinstance(plugin_, type) and issubclass(plugin_, Plugin):
self._load_plugin_class(
plugin_, plugin_load_type or PluginLoadType.CLASS, None
)
elif isinstance(plugin_, str):
logger.info(
self._('Loading plugins from module "{plugin_}"').format(
plugin_=plugin_
)
)
self._load_plugins_from_module_name(
plugin_,
plugin_load_type=plugin_load_type or PluginLoadType.NAME,
reload=reload,
)
elif isinstance(plugin_, Path):
logger.info(
self._('Loading plugins from path "{plugin_}"').format(
plugin_=plugin_
)
)
if not plugin_.is_file():
raise LoadModuleError( # noqa: TRY301
self._('The plugin path "{plugin_}" must be a file').format(
plugin_=plugin_
)
)
if plugin_.suffix != ".py":
raise LoadModuleError( # noqa: TRY301
self._('The path "{plugin_}" must endswith ".py"').format(
plugin_=plugin_
)
)
plugin_module_name = None
for path in self._module_path_finder.path:
try:
if plugin_.stem == "__init__":
if plugin_.resolve().parent.parent.samefile(Path(path)):
plugin_module_name = plugin_.resolve().parent.name
break
elif plugin_.resolve().parent.samefile(Path(path)):
plugin_module_name = plugin_.stem
break
except OSError:
continue
if plugin_module_name is None:
rel_path = plugin_.resolve().relative_to(Path().resolve())
if rel_path.stem == "__init__":
plugin_module_name = ".".join(rel_path.parts[:-1])
else:
plugin_module_name = ".".join(
rel_path.parts[:-1] + (rel_path.stem,)
)
self._load_plugins_from_module_name(
plugin_module_name,
plugin_load_type=plugin_load_type or PluginLoadType.FILE,
reload=reload,
)
else:
raise TypeError( # noqa: TRY301
self._("{plugin_} can not be loaded as plugin").format(
plugin_=plugin_
)
)
except Exception as e:
self.error_or_exception(
self._('Load plugin "{plugin_}" failed:').format(plugin_=plugin_), e
)
[docs]
def load_plugins(
self, *plugins: Union[Type[Plugin[Any, Any, Any]], str, Path]
) -> None:
"""Load the plugin.
Args:
*plugins: ``Plugin`` class, plugin module name or plug-in module file path.
Type can be ``Type[Plugin]``, ``str`` or ``pathlib.Path``.
If it is ``Type[Plugin]``, it will be loaded as a plug-in class.
If it is of type ``str``, it will be loaded as the plug-in module name, and the format is the same as the Python ``import`` statement.
For example: ``path.of.plugin``.
If it is of type ``pathlib.Path``, it will be loaded as the plug-in module file path.
For example: ``pathlib.Path("path/of/plugin")``.
"""
self._extend_plugins.extend(plugins)
return self._load_plugins(*plugins)
def _load_plugins_from_dirs(self, *dirs: Path) -> None:
"""Load plug-ins from the directory. Plug-ins in modules starting with ``_`` will not be imported. The path can be a relative path or an absolute path.
Args:
*dirs: Module paths that store modules containing plugins.
For example: ``pathlib.Path("path/of/plugins/")`` .
"""
dir_list = [str(x.resolve()) for x in dirs]
logger.info(
self._("Loading plugins from dirs {dir_list}").format(
dir_list=", ".join(map(str, dir_list))
)
)
self._module_path_finder.path.extend(dir_list)
for module_info in pkgutil.iter_modules(dir_list):
if not module_info.name.startswith("_"):
self._load_plugins_from_module_name(
module_info.name, plugin_load_type=PluginLoadType.DIR
)
[docs]
def load_plugins_from_dirs(self, *dirs: Path) -> None:
"""Load plug-ins from the directory. Plug-ins in modules starting with ``_`` will not be imported. The path can be a relative path or an absolute path.
Args:
*dirs: Module paths that store modules containing plugins.
For example: ``pathlib.Path("path/of/plugins/")`` .
"""
self._extend_plugin_dirs.extend(dirs)
self._load_plugins_from_dirs(*dirs)
def _load_adapters(self, *adapters: Union[Type[Adapter[Any, Any]], str]) -> None:
"""Load adapter.
Args:
*adapters: Adapter class or adapter name, type can be ``Type[Adapter]`` or ``str``.
If it is of type ``Type[Adapter]``, it will be loaded as an adapter class.
If it is of type ``str``, it will be loaded as the adapter module name, and the format is the same as the Python ``import`` statement.
For example: ``path.of.adapter``.
"""
for adapter_ in adapters:
adapter_object: Adapter[Any, Any]
try:
if isinstance(adapter_, type) and issubclass(adapter_, Adapter):
adapter_object = adapter_(self)
elif isinstance(adapter_, str):
adapter_classes = get_classes_from_module_name(adapter_, Adapter)
if not adapter_classes:
raise LoadModuleError( # noqa: TRY301
self._(
"Can not find Adapter class in the {adapter_} module"
).format(adapter_=adapter_)
)
if len(adapter_classes) > 1:
raise LoadModuleError( # noqa: TRY301
self._(
"More then one Adapter class in the {adapter_} module"
).format(adapter_=adapter_)
)
adapter_object = adapter_classes[0][0](self) # type: ignore
else:
raise TypeError( # noqa: TRY301
self._("{adapter_} can not be loaded as adapter").format(
adapter_=adapter_
)
)
except Exception as e:
self.error_or_exception(
self._('Load adapter "{adapter_}" failed:').format(
adapter_=adapter_
),
e,
)
continue
else:
self.adapters.append(adapter_object)
logger.info(
self._(
'Succeeded to load adapter "{adapter_object.__class__.__name__}" '
'from "{adapter_}"'
).format(adapter_object=adapter_object, adapter_=adapter_)
)
[docs]
def load_adapters(self, *adapters: Union[Type[Adapter[Any, Any]], str]) -> None:
"""Load adapter.
Args:
*adapters: Adapter class or adapter name, type can be ``Type[Adapter]`` or ``str``.
If it is of type ``Type[Adapter]``, it will be loaded as an adapter class.
If it is of type ``str``, it will be loaded as the adapter module name, and the format is the same as the Python ``import`` statement.
For example: ``path.of.adapter``.
"""
self._extend_adapters.extend(adapters)
self._load_adapters(*adapters)
@overload
def get_adapter(self, adapter: str) -> Adapter[Any, Any]: ...
@overload
def get_adapter(self, adapter: Type[AdapterT]) -> AdapterT: ...
[docs]
def get_adapter(
self, adapter: Union[str, Type[AdapterT]]
) -> Union[Adapter[Any, Any], AdapterT]:
"""Get the loaded adapter by name or adapter class.
Args:
adapter: adapter name or adapter class.
Returns:
The obtained adapter object.
Raises:
LookupError: No adapter object with this name found.
"""
for _adapter in self.adapters:
if isinstance(adapter, str):
if _adapter.name == adapter:
return _adapter
elif isinstance(_adapter, adapter):
return _adapter
raise LookupError(
self._('Can not find adapter named "{adapter}"').format(adapter=adapter)
)
[docs]
def get_plugin(self, name: str) -> Type[Plugin[Any, Any, Any]]:
"""Get the loaded plugin class by name.
Args:
name: plugin name
Returns:
The obtained plug-in class.
Raises:
LookupError: The plugin class with this name cannot be found.
"""
for _plugin in self.plugins:
if _plugin.__name__ == name:
return _plugin
raise LookupError(
self._('Can not find plugin named "{name}"').format(name=name)
)
[docs]
def error_or_exception(
self, message: str, exception: Exception
) -> None: # pragma: no cover
"""Output error or exception logs based on the current Bot configuration.
Args:
message: message.
exception: Exception.
"""
if self.config.bot.log.verbose_exception:
logger.exception(self._(message))
else:
logger.error(
"{message} {exception!r}".format(message=message, exception=exception)
)
[docs]
def bot_run_hook(self, func: BotHook) -> BotHook:
"""Register a function when Bot starts.
Args:
func: the registered function.
Returns:
The registered function.
"""
self._bot_run_hooks.append(func)
return func
[docs]
def bot_exit_hook(self, func: BotHook) -> BotHook:
"""Register a function when the Bot exits.
Args:
func: the registered function.
Returns:
The registered function.
"""
self._bot_exit_hooks.append(func)
return func
[docs]
def adapter_startup_hook(self, func: AdapterHook) -> AdapterHook:
"""Register a function during adapter initialization.
Args:
func: the registered function.
Returns:
The registered function.
"""
self._adapter_startup_hooks.append(func)
return func
[docs]
def adapter_run_hook(self, func: AdapterHook) -> AdapterHook:
"""Register an adapter runtime function.
Args:
func: the registered function.
Returns:
The registered function.
"""
self._adapter_run_hooks.append(func)
return func
[docs]
def adapter_shutdown_hook(self, func: AdapterHook) -> AdapterHook:
"""Register a function when the adapter is closed.
Args:
func: the registered function.
Returns:
The registered function.
"""
self._adapter_shutdown_hooks.append(func)
return func
[docs]
def event_preprocessor_hook(self, func: EventHook) -> EventHook:
"""Register an event preprocessing function.
Args:
func: the registered function.
Returns:
The registered function.
"""
self._event_preprocessor_hooks.append(func)
return func
[docs]
def event_postprocessor_hook(self, func: EventHook) -> EventHook:
"""Register a post-event processing function.
Args:
func: the registered function.
Returns:
The registered function.
"""
self._event_postprocessor_hooks.append(func)
return func