风过空庭,字句正徐来。
本站隐私政策/Privacy状态监控
@Do1e关于邮箱GitHubBangumi照片墙Scholar
友链Blogroll影汛创新
© 2024/11/22 ~ 2026/3/17 Do1e. | RSS 订阅 | 站点地图 | |
只有特别的你正在浏览本站~
Powered by Mix Space&
白い
.
| 苏ICP备2024146330号-1 | 苏公网安备32011302322471号
纸白微明,未成篇章。

哔哩哔哩漫画签到与福利券兑换

/ ,
139
AI·GEN

AI 生成的摘要

哔哩哔哩漫画签到与福利券兑换

  • Loading...
  • Loading...
  • Loading...
  • Loading...
  • Loading...
  • 前置说明

    既然都搭建博客了,感觉可以在这里把写过的一些杂七杂八的简单代码放上来,都比较简单就不打算在Github上给每一个都单独建一个仓库了。
    这里先更新一个“哔哩哔哩漫画签到与福利券兑换”,未来有空还可以再写一些。

    动机

    哔哩哔哩漫画每天签到能领取一定的积分,积分可以兑换成可购买漫画的福利券。
    不过最优惠的100积分只在每天0点发放一定数量。

    为此我基于NoneBot2和bilibili-api,并参考SocialSisterYi/bilibili-API-collect写了两个插件,分别用于每天签到领积分和每天0点抢购福利券,目前用下来稳定性极高。

    NoneBot2用起来还是比较简单的,我也在上面开发了一些插件,有空也可以分享出来。

    实现的效果如下:

    似乎成功率太高了完全不需要推送
    以防万一还是推送一下吧

    代码

    get_sessdata.py

    用于获取并刷新登录信息

    CodeBlock Loading...

    其中cookie_path是一个json文件,包含以下5个key:

    1. sessdata
    2. buvid3
    3. bili_jct
    4. actimevalue
    5. dedeuserid

    可以使用浏览器InPrivate模式登录后在cookie中获取,详见bilibili-api文档

    checkin.py

    用于简单的NoneBot2插件,可以在给QQ机器人发送/漫画签到手动执行,也会在每天指定的时间定时运行并将结果反馈。
    我这里没有开发成通用的形式,让所有的QQ机器人好友使用,因此仅支持发送给QQ机器人管理员。
    当然你要用的话也可以考虑改为直接使用cron定时运行,再想别的办法进行消息推送或者不推送。

    CodeBlock Loading...

    exchange.py

    用于在每天0点兑换的NoneBot2插件。实际上在23点59分开始运行并进行预处理,之后检测系统时间,于0点准时兑换,因此请确保系统时间准确,定时同步时间:

    CodeBlock Loading...

    NoneBot2逻辑和上面的签到插件类似,查看exchange_func函数了解兑换逻辑即可,代码如下:

    CodeBlock Loading...
    PYTHON
    from bilibili_api import Credential, sync
    import json
    import logging
    
    def get_sessdata(cookie_path: str) -> str:
        with open(cookie_path, 'r') as f:
            cookie = json.load(f)
        credential = Credential(**cookie)
        if sync(credential.check_refresh()):
            logging.info('Cookie needs to be refreshed.')
            sync(credential.refresh())
            cookie = credential.get_cookies()
            cookie = {k.lower(): v for k, v in cookie.items()}
            with open(cookie_path, 'w') as f:
                json.dump(cookie, f, indent=4)
            logging.info(f'Cookie refreshed.\n\t{cookie}')
        return cookie['sessdata']
    
    
    PYTHON
    from nonebot import get_driver, on_command, get_bot, require
    from nonebot.log import logger
    from nonebot.adapters.onebot.v11 import Bot, Event
    from nonebot.typing import T_State
    from nonebot.permission import SUPERUSER
    require("nonebot_plugin_apscheduler")
    from nonebot_plugin_apscheduler import scheduler
    import requests
    import time
    import traceback
    
    from .get_sessdata import get_sessdata
    
    checkin = on_command("漫画签到", priority=5, permission=SUPERUSER) # 指定仅允许管理员使用
    
    async def checkin_func():
        url = "https://manga.bilibili.com/twirp/activity.v1.Activity/ClockIn?platform=android"
        headers = {
            'user-agent': "Dalvik/2.1.0 (Linux; U; Android 14; 23046RP50C Build/UKQ1.230804.001) 6.8.5 os/android model/23046RP50C mobi_app/android_comic build/36608060 channel/xiaomi innerVer/36608060 osVer/14 network/2",
            'accept': 'application/json, text/plain, */*',
            'accept-encoding': 'gzip, deflate, br',
        }
        # 此处隐去我的cookie_path
        headers.update({'cookie': f'SESSDATA={get_sessdata(config.cookie_path)}'})
        logger.info(f'headers: {headers}')
    
        try:
            resp = requests.post(url, headers=headers)
            resp.raise_for_status()
            retdata = resp.json()
            if retdata['code'] == 0:
                msg = 'Bilibili漫画签到成功'
                logger.success(msg)
            else:
                msg = 'Bilibili漫画签到失败,' + retdata['msg']
                logger.error(msg)
        except Exception as e:
            msg = 'Bilibili漫画签到失败,' + str(e)
            logger.error(traceback.format_exc())
        return msg
    
    @checkin.handle()
    async def handle_checkin(bot: Bot, event: Event, state: T_State):
        msg = await checkin_func()
        await checkin.finish(msg)
    
    # 此处设置每天签到的时间
    @scheduler.scheduled_job('cron', hour=14, minute=57, id='manga_checkin')
    async def handle_checkin_scheduler():
        msg = await checkin_func()
        bot = get_bot()
        # 定时消息仅发送给管理员
        await bot.send_private_msg(user_id=next(iter(bot.config.superusers)), message=msg)
    
    
    timedatectl set-ntp true
    
    PYTHON
    from nonebot import get_driver, on_command, get_bot, require
    from nonebot.log import logger
    from nonebot.adapters.onebot.v11 import Bot, Event
    from nonebot.typing import T_State
    from nonebot.permission import SUPERUSER
    require("nonebot_plugin_apscheduler")
    from nonebot_plugin_apscheduler import scheduler
    import requests
    import time
    import traceback
    
    from .get_sessdata import get_sessdata
    
    exchange = on_command("漫画兑换", priority=5, permission=SUPERUSER) # 指定仅允许管理员使用
    
    
    async def exchange_func():
        point_of_target = 100 # 仅兑换100积分的福利券
        type_of_target = 7    # 仅兑换漫画福利券,避免其他类型的商品
        get_point_url = 'https://manga.bilibili.com/twirp/pointshop.v1.Pointshop/GetUserPoint'
        list_url = 'https://manga.bilibili.com/twirp/pointshop.v1.Pointshop/ListProduct'
        exchange_url = 'https://manga.bilibili.com/twirp/pointshop.v1.Pointshop/Exchange'
        headers = {
            'user-agent': "Dalvik/2.1.0 (Linux; U; Android 14; 23046RP50C Build/UKQ1.230804.001) 6.8.5 os/android model/23046RP50C mobi_app/android_comic build/36608060 channel/xiaomi innerVer/36608060 osVer/14 network/2",
            'accept': 'application/json, text/plain, */*',
            'accept-encoding': 'gzip, deflate, br',
        }
        # 此处隐去我的cookie_path
        headers.update({'cookie': f'SESSDATA={get_sessdata(config.cookie_path)}'})
        logger.info(f'headers: {headers}')
    
        def get_point(session):
            try:
                resp = session.post(get_point_url)
                resp.raise_for_status()
                return resp.json()['data']['point']
            except Exception as e:
                logger.error(traceback.format_exc())
                return None
    
        def get_target_id(session):
            try:
                resp = session.post(list_url)
                resp.raise_for_status()
                if resp.json()['code'] != 0:
                    raise RuntimeError('Failed to get product list.')
                products = resp.json()['data']
                for product in products:
                    if product['real_cost'] == point_of_target and product['type'] == type_of_target:
                        return product['id']
                return -1
            except Exception as e:
                logger.error(traceback.format_exc())
            return None
    
        def exchange(session, product_id):
            try:
                resp = session.post(exchange_url, json={'product_id': product_id, 'product_num': 1, 'point': point_of_target})
                resp.raise_for_status()
                return resp.json()
            except Exception as e:
                logger.error(traceback.format_exc())
                return None
    
        session = requests.Session()
        session.headers.update(headers)
    
        # 获取剩余点数,小于目标点数则退出
        point, retry = None, 10
        while point is None and retry > 0:
            point = get_point(session)
            retry -= 1
        if point is None:
            msg = '获取剩余点数失败'
            logger.error(msg)
            return msg
        point = int(point)
        logger.info(f'当前剩余点数: {point}')
        if point < point_of_target:
            msg = f'剩余{point},点数不足'
            logger.error(msg)
            return msg
    
        # 获取目标商品id
        product_id, retry = None, 10
        while product_id is None and retry > 0:
            product_id = get_target_id(session)
            retry -= 1
        if product_id is None:
            msg = '获取目标商品id失败'
            logger.error(msg)
            return msg
        logger.info(f'目标商品id: {product_id}')
    
        # 兑换
        retry, success_times = 100, 0
        while retry > 0:
            if time.strftime("%H", time.localtime()) == '23':
                while time.strftime("%H", time.localtime()) != '00':
                    pass # 等待到0点
            resp = exchange(session, product_id)
            if resp is None:
                retry -= 1
                continue
            if resp['code'] == 2:
                msg = '兑换失败,' + resp['msg']
                break
            if resp['code'] == 1 and resp['msg'] == '积分不足':
                msg = '兑换失败,' + resp['msg']
                break
            if resp['code'] != 0:
                msg = '兑换失败,' + resp['msg']
                retry -= 1
                continue
            else:
                success_times += 1
                msg = f'兑换成功{success_times}次'
            time.sleep(0.05)
        if success_times > 0:
            msg = f'兑换成功{success_times}次'
        logger.info(msg)
        return msg
    
    @exchange.handle()
    async def handle_exchange(bot: Bot, event: Event, state: T_State):
        msg = await exchange_func()
        await exchange.finish(msg)
    
    @scheduler.scheduled_job('cron', hour='23', minute='59', id='manga_exchange')
    async def handle_exchange_scheduler():
        msg = await exchange_func()
        bot = get_bot()
        await bot.send_private_msg(user_id=next(iter(bot.config.superusers)), message=msg)