Skip to content

订阅消息简单

字数统计:743 字
阅读时长:2 分钟

💡 提醒

想象一下以下场景,为了制作一个公告推送(或者是签到)功能,你需要制作一个命令,触发时用于先持久化保存该群/用户的信息(group_id/user_id)等,定时轮询信息源,有消息的时候就按照该群/用户的信息进行推送。

在这个场景下,因为GsCore的自带多平台/多账号适配,为了解决消息发送的问题,需要持久化的内容很多

例如:BOT,Bot平台ID(多平台),Bot自身ID(同平台多账号),user_id/group_id

而在发送的时候,需要传递的参数/代码量也过于繁琐,详见定时任务

此时则需要一个封装好的方法,可以高效的处理该类场景,下面为你介绍。

Interface

代码位置测试示例

流程概览

python
from gsuid_core.utils.database.models import Subscribe

# 订阅公告
GsCoreSubscribe.add_task("鸣朝公告", event)
# 订阅推送
GsCoreSubscribe.add_task("鸣朝推送", event)

# 发送主动消息
all_push_task: Optional[List[Subscribe]] = GsCoreSubscribe.get_task("鸣朝推送")
for subscribe in all_push_task:
    user_id = subscribe.user_id
    group_id = subscribe.group_id
    msg = process(user_id, group_id)  # 处理业务逻辑,并拿到消息
    subscribe.send(msg)  # 发送消息

一些重要的传参信息

python
async def add_subscribe(
        self,
        subscribe_type: Literal['session', 'single'],
        task_name: str,
        event: Event,
        extra_message: Optional[str] = None,
    ):
        '''📝简单介绍:

            该方法允许向数据库添加一个订阅信息的持久化保存
            注意`subscribe_type`参数必须为`session`或`single`
            `session`模式下, 订阅都将在每个有效的session(group或direct)内独立存在 (公告推送)
            `single`模式下, 同个session(group)可能同时存在多个订阅 (签到任务)

        🌱参数:

            🔹subscribe_type (`Literal['session', 'single']`):
                    'session'模式: 同个group/user下只存在一条订阅
                    'single'模式: 同个group下存在多条订阅, 同个user只存在一条订阅

            🔹task_name (`str`):
                    订阅名称

            🔹event (`Event`):
                    事件Event

            🔹extra_message (`Optional[str]`, 默认是 `None`):
                    额外想要保存的信息, 例如推送信息或者数值阈值

        🚀使用范例:

            `await GsCoreSubscribe.add_subscribe('single', '签到', event)`
        '''
        pass

订阅消息

python
from gsuid_core.subscribe import gs_subscribe
from gsuid_core.models import Event
from gsuid_core.bot import Bot
from gsuid_core.logger import logger

async def handle_subscribe(bot: Bot, ev: Event):
    await gs_subscribe.add_subscribe(
        'single',
        '订阅测试',
        ev,
        extra_message='测试',
    )
    data = await gs_subscribe.get_subscribe('订阅测试')
    logger.info(data)
    await bot.send('订阅成功!')

获取订阅角色/群聊&发送消息

python
from gsuid_core.subscribe import gs_subscribe
from gsuid_core.models import Event
from gsuid_core.bot import Bot
from gsuid_core.logger import logger

async def handle_get_subscribe(bot: Bot, ev: Event):
    # 可以拿到所有该类命名的订阅信息
    datas = await gs_subscribe.get_subscribe('订阅测试')
    # 进行循环
    if datas:
        # 发送
        for subscribe in datas:
            await subscribe.send(f'[订阅] {subscribe.extra_message}')
    await bot.send('查看订阅成功!')

删除订阅

python
from gsuid_core.subscribe import gs_subscribe
from gsuid_core.models import Event
from gsuid_core.bot import Bot
from gsuid_core.logger import logger

async def handle_unsubscribe(bot: Bot, ev: Event):
    # 执行删除
    await gs_subscribe.delete_subscribe('single', '订阅测试', ev)
    # 检查是否存在
    data = await gs_subscribe.get_subscribe('订阅测试')
    logger.info(data)
    await bot.send('取消订阅成功!')