示例:获取直播间弹幕和礼物

  1. from bilibili_api import live, sync
  2. room = live.LiveDanmaku(22544798)
  3. @room.on('DANMU_MSG')
  4. async def on_danmaku(event):
  5. # 收到弹幕
  6. print(event)
  7. @room.on('SEND_GIFT')
  8. async def on_gift(event):
  9. # 收到礼物
  10. print(event)
  11. sync(room.connect())

示例:简易录播

  1. from bilibili_api import live, sync
  2. import aiohttp
  3. async def main():
  4. # 初始化
  5. room = live.LiveRoom(3)
  6. # 获取直播流链接
  7. stream_info = await room.get_room_play_url()
  8. url = stream_info['durl'][0]['url']
  9. async with aiohttp.ClientSession() as sess:
  10. # 设置 UA 和 Referer 头以绕过验证
  11. async with sess.get(url, headers={"User-Agent": "Mozilla/5.0", "Referer": "https://www.bilibili.com/"}) as resp:
  12. # 以二进制追加方式打开文件以存储直播流
  13. with open('live.flv', 'ab') as f:
  14. while True:
  15. # 循环读取最高不超过 1024B 的数据
  16. chunk = await resp.content.read(1024)
  17. if not chunk:
  18. # 无更多数据,退出循环
  19. print('无更多数据')
  20. break
  21. print(f'接收到数据 {len(chunk)}B')
  22. # 写入数据
  23. f.write(chunk)
  24. sync(main())

示例:直播间赠送金瓜子礼物

  1. from bilibili_api import live, sync, Credential
  2. SESSDATA = ""
  3. BILI_JCT = ""
  4. BUVID3 = ""
  5. # 自己的uid
  6. self_uid = 0
  7. # 实例化 Credential 类
  8. credential = Credential(sessdata=SESSDATA, bili_jct=BILI_JCT, buvid3=BUVID3)
  9. room = live.LiveRoom(22544798, credential)
  10. # 获取礼物列表
  11. gift_config = sync(live.get_gift_config())
  12. for gift in gift_config['list']:
  13. if gift['name'] == "牛哇牛哇":
  14. # 赠送礼物 "牛哇牛哇" 1个
  15. res = sync(room.send_gift_gold(uid=self_uid, gift_id=gift['id'], gift_num=1, price=gift['price']))
  16. print(res)
  17. break
  18. else:
  19. print('礼物 牛哇牛哇 不存在')