跳转至

utils

pylibob 辅助函数。

LifespanManager

生命周期管理器。

Source code in src/pylibob/utils.py
class LifespanManager:
    """Registry of startup and shutdown hooks.

    Hooks are called without arguments and their result is awaited.
    They run one at a time, in the order they were registered.
    """

    def __init__(self) -> None:
        # Both lists keep hooks in registration order.
        self._startup_funcs: list[L_FUNC] = []
        self._shutdown_funcs: list[L_FUNC] = []

    def on_startup(self, func: L_FUNC) -> L_FUNC:
        """Register ``func`` as a startup hook.

        ``func`` is returned unchanged, so this method can also be
        applied as a decorator.
        """
        lifespan_logger.debug(f"添加 startup 生命周期函数: {func}")
        hooks = self._startup_funcs
        hooks.append(func)
        return func

    def on_shutdown(self, func: L_FUNC) -> L_FUNC:
        """Register ``func`` as a shutdown hook.

        ``func`` is returned unchanged, so this method can also be
        applied as a decorator.
        """
        lifespan_logger.debug(f"添加 shutdown 生命周期函数: {func}")
        hooks = self._shutdown_funcs
        hooks.append(func)
        return func

    async def startup(self) -> None:
        """Await every startup hook, in registration order.

        An exception raised by a hook propagates to the caller and the
        remaining hooks are not run.
        """
        # Iterating an empty list is a no-op, so no emptiness check is needed.
        for hook in self._startup_funcs:
            lifespan_logger.debug(f"执行 startup 生命周期函数: {hook}")
            await hook()

    async def shutdown(self) -> None:
        """Await every shutdown hook, in registration order.

        An exception raised by a hook propagates to the caller and the
        remaining hooks are not run.
        """
        # Iterating an empty list is a no-op, so no emptiness check is needed.
        for hook in self._shutdown_funcs:
            lifespan_logger.debug(f"执行 shutdown 生命周期函数: {hook}")
            await hook()

on_shutdown(func)

注册 shutdown 生命周期函数

Parameters:

Name Type Description Default
func L_FUNC

shutdown 生命周期函数

required
Source code in src/pylibob/utils.py
def on_shutdown(self, func: L_FUNC) -> L_FUNC:
    """Register ``func`` as a shutdown hook.

    The hook is appended to ``self._shutdown_funcs`` and returned
    unchanged, so this method can also be applied as a decorator.
    """
    lifespan_logger.debug(f"添加 shutdown 生命周期函数: {func}")
    hooks = self._shutdown_funcs
    hooks.append(func)
    return func

on_startup(func)

注册 startup 生命周期函数

Parameters:

Name Type Description Default
func L_FUNC

startup 生命周期函数

required
Source code in src/pylibob/utils.py
def on_startup(self, func: L_FUNC) -> L_FUNC:
    """Register ``func`` as a startup hook.

    The hook is appended to ``self._startup_funcs`` and returned
    unchanged, so this method can also be applied as a decorator.
    """
    lifespan_logger.debug(f"添加 startup 生命周期函数: {func}")
    hooks = self._startup_funcs
    hooks.append(func)
    return func

shutdown() async

执行 shutdown 生命周期。

Source code in src/pylibob/utils.py
async def shutdown(self) -> None:
    """Await every shutdown hook, in registration order.

    An exception raised by a hook propagates to the caller and the
    remaining hooks are not run.
    """
    # Iterating an empty collection is a no-op, so no emptiness check is
    # needed before the loop.
    for hook in self._shutdown_funcs:
        lifespan_logger.debug(f"执行 shutdown 生命周期函数: {hook}")
        await hook()

startup() async

执行 startup 生命周期。

Source code in src/pylibob/utils.py
async def startup(self) -> None:
    """Await every startup hook, in registration order.

    An exception raised by a hook propagates to the caller and the
    remaining hooks are not run.
    """
    # Iterating an empty collection is a no-op, so no emptiness check is
    # needed before the loop.
    for hook in self._startup_funcs:
        lifespan_logger.debug(f"执行 startup 生命周期函数: {hook}")
        await hook()

TaskManager

异步任务管理器。

Source code in src/pylibob/utils.py
class TaskManager:
    def __init__(self):
        self.tasks = set()

    async def task(self, func, result=None, *args, **kwargs) -> Any | None:
        task_logger.debug(f"添加任务: {func}({args}, {kwargs})")
        task = asyncio.create_task(func(*args, **kwargs))
        self.tasks.add(task)
        try:
            return await task
        except asyncio.CancelledError:
            return result
        finally:
            self.tasks.remove(task)

    def task_nowait(self, func, *args, **kwargs):
        task_logger.debug(f"添加任务(nowait): {func}({args}, {kwargs})")
        task = asyncio.create_task(func(*args, **kwargs))
        self.tasks.add(task)
        task.add_done_callback(self.tasks.remove)

    def cancel_all(self):
        for _task in self.tasks:
            if not _task.done():
                _task.cancel()

cancel_all()

取消所有任务。

Source code in src/pylibob/utils.py
def cancel_all(self):
    """Request cancellation of every task in ``self.tasks`` that is not done."""
    unfinished = (tracked for tracked in self.tasks if not tracked.done())
    for tracked in unfinished:
        tracked.cancel()

task(func, result=None, *args, **kwargs) async

运行任务。

Parameters:

Name Type Description Default
func Callable[..., Any]

任务函数

required
result Any | None

任务被取消的默认返回值

None
*args Any

任务函数的参数

()
**kwargs Any

任务函数的关键字参数

{}
Source code in src/pylibob/utils.py
async def task(self, func, result=None, *args, **kwargs) -> Any | None:
    """Run ``func(*args, **kwargs)`` as a tracked task and wait for it.

    Args:
        func: Callable returning a coroutine to schedule.
        result: Value returned if the wait ends with
            ``asyncio.CancelledError``.
        *args: Positional arguments forwarded to ``func``.
        **kwargs: Keyword arguments forwarded to ``func``.

    Returns:
        The task's result, or ``result`` on cancellation.
    """
    task_logger.debug(f"添加任务: {func}({args}, {kwargs})")
    running = asyncio.create_task(func(*args, **kwargs))
    self.tasks.add(running)
    try:
        outcome = await running
    except asyncio.CancelledError:
        outcome = result
    finally:
        # Stop tracking the task however the wait ended.
        self.tasks.remove(running)
    return outcome

task_nowait(func, *args, **kwargs)

无等待添加任务。

Parameters:

Name Type Description Default
func Callable[..., Any]

任务函数

required
*args Any

任务函数的参数

()
**kwargs Any

任务函数的关键字参数

{}
Source code in src/pylibob/utils.py
def task_nowait(self, func, *args, **kwargs):
    """Schedule ``func(*args, **kwargs)`` as a tracked task without waiting.

    The new task is added to ``self.tasks`` and removes itself from that
    set through a done-callback.
    """
    task_logger.debug(f"添加任务(nowait): {func}({args}, {kwargs})")
    background = asyncio.create_task(func(*args, **kwargs))
    tracked = self.tasks
    tracked.add(background)
    background.add_done_callback(tracked.remove)

TypingType

Bases: Enum

类型标注类型。

Source code in src/pylibob/utils.py
class TypingType(Enum):
    """Category that ``analytic_typing`` assigns to a handler parameter."""

    # Any annotation that is neither ``Bot`` nor an ``Annotated[...]`` form.
    NORMAL = auto()
    # The annotation is the ``Bot`` class itself.
    BOT = auto()
    # The annotation is an ``Annotated[...]`` form.
    ANNOTATED = auto()

analytic_typing(func)

分析动作响应器类型。

类型信息
  • 0: 参数名称
  • 1: 参数类型
  • 2: 参数默认值(若为空则为 inspect.Parameter.empty)
  • 3: 参数的类型标注类型

Parameters:

Name Type Description Default
func ActionHandler

动作响应器

required

Returns:

Type Description
list[tuple[str, type, Any, TypingType]]

一个含有类型信息的列表

Source code in src/pylibob/utils.py
def analytic_typing(
    func: ActionHandler,
) -> list[tuple[str, type, Any, TypingType]]:
    """Describe every parameter of an action handler.

    Each entry is ``(name, annotation, default, typing_type)``:

    * ``Bot``-annotated parameters are tagged ``TypingType.BOT`` and always
      report ``inspect.Parameter.empty`` as their default.
    * ``Annotated[...]`` parameters are tagged ``TypingType.ANNOTATED``;
      their first metadata item must be a ``str``.
    * Everything else is tagged ``TypingType.NORMAL``.

    Raises:
        TypeError: If a parameter has no annotation, or if the first
            metadata item of an ``Annotated`` parameter is not a ``str``.
    """
    is_bound_method = inspect.ismethod(func)
    entries: list[tuple[str, type, Any, TypingType]] = []
    for name, parameter in get_signature(func).parameters.items():
        annotation = parameter.annotation
        if annotation is inspect.Parameter.empty:
            raise TypeError(f"Parameter `{name}` has no annotation")
        if is_bound_method and parameter.name == "self":
            continue

        if annotation is Bot:
            # The declared default is deliberately ignored for Bot parameters.
            category, default = TypingType.BOT, inspect.Parameter.empty
        elif get_origin(annotation) is Annotated:
            if not isinstance(annotation.__metadata__[0], str):
                raise TypeError(
                    f"The first metadata of Annotated of param `{parameter!s}`"
                    " is not str",
                )
            category, default = TypingType.ANNOTATED, parameter.default
        else:
            category, default = TypingType.NORMAL, parameter.default
        entries.append((name, annotation, default, category))
    return entries

authorize(access_token, request)

对请求进行鉴权。

若 access_token 为 None,则视为无访问密钥。

Parameters:

Name Type Description Default
access_token str | None

访问密钥

required
request HTTPConnection

请求

required

Returns:

Type Description
bool

鉴权是否通过

Source code in src/pylibob/utils.py
def authorize(access_token: str | None, request: HTTPConnection) -> bool:
    """Check whether a request carries the expected access token.

    The token is looked for in two places, in this order:

    1. the ``Authorization`` header, which must equal
       ``"Bearer <access_token>"``;
    2. the ``access_token`` URL query parameter, which must equal
       ``access_token``.

    Args:
        access_token: The configured token. ``None`` means no token is
            required and every request is accepted.
        request: The incoming connection; only ``request.headers`` and
            ``request.query_params`` are read.

    Returns:
        ``True`` if the request is authorized, ``False`` otherwise.
    """
    import hmac  # local import keeps this function self-contained

    if access_token is None:
        return True

    # Compare secrets with hmac.compare_digest rather than ``==`` so the
    # comparison time does not reveal how much of a guessed token matched.
    # compare_digest only accepts ASCII ``str``, hence the encoding to bytes;
    # ``surrogatepass`` means no ``str`` can fail to encode.
    expected = access_token.encode("utf-8", "surrogatepass")

    # First look at the Authorization header.
    header_token = request.headers.get("Authorization")
    if header_token and hmac.compare_digest(
        header_token.encode("utf-8", "surrogatepass"),
        b"Bearer " + expected,
    ):
        return True

    # Otherwise fall back to the access_token URL query parameter.
    query_token = request.query_params.get("access_token")
    return bool(
        query_token
        and hmac.compare_digest(
            query_token.encode("utf-8", "surrogatepass"),
            expected,
        ),
    )

detect_content_type(type_)

根据 MIME Type 选中 Content-Type。

若无此类型则返回 None

Parameters:

Name Type Description Default
type_ str

MIME Type

required

Returns:

Type Description
ContentType | None

Content-Type

Source code in src/pylibob/utils.py
def detect_content_type(type_: str) -> ContentType | None:
    """Map a MIME type string to its ``ContentType`` member.

    Returns ``None`` when ``ContentType(type_)`` raises ``ValueError``,
    i.e. when no member has that value.
    """
    try:
        matched = ContentType(type_)
    except ValueError:
        return None
    else:
        return matched

evaluate_forwardref(type_, globalns, localns)

解析 ForwardRef。

Parameters:

Name Type Description Default
type_ ForwardRef

ForwardRef

required
globalns Any

当前全局作用域

required
localns Any

当前局部作用域

required

Returns:

Type Description
Any

解析后的类型

Source code in src/pylibob/utils.py
def evaluate_forwardref(
    type_: ForwardRef,
    globalns: Any,
    localns: Any,
) -> Any:
    """Resolve a ``typing.ForwardRef`` to the object it names.

    Args:
        type_: The forward reference to evaluate.
        globalns: Global namespace used for the lookup.
        localns: Local namespace used for the lookup.

    Returns:
        Whatever the private ``ForwardRef._evaluate`` produces for the
        reference in the given namespaces.
    """
    import sys  # local import: only needed for the version switch below

    # ``_evaluate`` is private and its signature has changed over time.
    # Through Python 3.12.3 it is ``(globalns, localns, recursive_guard)``.
    # From 3.12.4 on, the third positional slot is ``type_params`` and
    # ``recursive_guard`` is keyword-only, so three positional arguments
    # raise TypeError there.  Passing ``recursive_guard`` by keyword works
    # on both.  The cast hides the private-attribute access from type
    # checkers.
    evaluate = cast(Any, type_)._evaluate  # noqa: SLF001
    if sys.version_info >= (3, 12, 4):
        # Pass type_params explicitly; omitting it is deprecated.
        return evaluate(
            globalns,
            localns,
            type_params=(),
            recursive_guard=set(),
        )
    return evaluate(globalns, localns, recursive_guard=set())

get_signature(call)

获取函数签名。

Parameters:

Name Type Description Default
call Callable[..., Any]

函数

required

Returns:

Type Description
Signature

函数签名

Source code in src/pylibob/utils.py
def get_signature(call: Callable[..., Any]) -> inspect.Signature:
    """Build a signature for ``call`` with annotations run through ``get_annotation``.

    Every parameter keeps its name, kind and default; only the annotation
    is replaced by ``get_annotation(param, globalns)``, where ``globalns``
    is ``call.__globals__`` (or an empty dict when that attribute is
    missing).  The returned signature is built from the parameters alone,
    so it carries no return annotation.
    """
    # NOTE(review): get_annotation is defined elsewhere; presumably it
    # resolves string / forward-reference annotations -- confirm there.
    globalns = getattr(call, "__globals__", {})
    original = inspect.signature(call)

    typed_params = []
    for param in original.parameters.values():
        resolved = get_annotation(param, globalns)
        typed_params.append(
            inspect.Parameter(
                name=param.name,
                kind=param.kind,
                default=param.default,
                annotation=resolved,
            ),
        )
    return inspect.Signature(typed_params)