import base64
import json
import re
import time
from binascii import b2a_hex

import requests
import rsa


class WeiboLogin(object):
    """Log in to weibo.com through the Sina SSO endpoints.

    The flow is: prelogin (fetch servertime / nonce / RSA public key),
    build the login form with the base64 username and RSA-encrypted
    password, POST it, then follow the ``location.replace(...)``
    redirects embedded in the returned pages.
    """

    # Public exponent used with the modulus returned by prelogin (0x10001).
    RSA_EXPONENT = int('10001', 16)

    # Human-readable reasons for the ``errno`` codes parsed by get_errno().
    ERRNO_MESSAGES = {
        4038: "登录次数过于频繁",
        4049: "请填写验证码",
        4010: "帐号尚未激活",
        4090: "此帐号未激活,请登录原帐号",
        5024: "请填写正确的微盾动态码",
        5025: "动态码有误,请重新输入",
        5: "尚未注册微博",
        101: "用户名或密码错误",
        4098: "您的帐号还没有设置密码,为方便登录请",
        9999: "当前网络超时,请稍后再试",
        2071: "您已开启登录保护,请扫码登录",
    }

    def __init__(self, username, pwd, timeout=10):
        """Store the credentials and prepare the HTTP session.

        Args:
            username: Account name; sent base64-encoded as ``su``.
            pwd: Plain-text password; sent RSA-encrypted as ``sp``.
            timeout: Seconds to wait on each HTTP request before
                ``requests`` raises a timeout error. ``None`` waits forever.
        """
        self.username = username
        self.pwd = pwd
        self.timeout = timeout
        self.url = "https://login.sina.com.cn/sso/login.php?client=ssologin.js(v1.4.19)"
        # Content-Length is intentionally not set here: requests computes it
        # from the encoded form body, so a hard-coded value is only misleading.
        # NOTE(review): 'br' is advertised below; requests can only decode
        # Brotli responses when a brotli package is installed - confirm.
        self.headers = {
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9',
            'Accept-Encoding': 'gzip, deflate, br',
            'Accept-Language': 'zh-CN,zh;q=0.9',
            'Cache-Control': 'no-cache',
            'Connection': 'keep-alive',
            'Content-Type': 'application/x-www-form-urlencoded',
            'Host': 'login.sina.com.cn',
            'Origin': 'https://weibo.com',
            'Pragma': 'no-cache',
            'Referer': 'https://weibo.com/',
            'Sec-Fetch-Dest': 'iframe',
            'Sec-Fetch-Mode': 'navigate',
            'Sec-Fetch-Site': 'cross-site',
            'Sec-Fetch-User': '?1',
            'Upgrade-Insecure-Requests': '1',
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/84.0.4147.135 Safari/537.36',
        }
        self.session = requests.Session()

    def get_prelogin(self, su):
        """Call the prelogin endpoint and extract the login parameters.

        Args:
            su: Base64-encoded username (see get_su()).

        Returns:
            Tuple ``(servertime, nonce, rsakv, pubkey, prelt, pcid)`` where
            ``prelt`` is the elapsed prelogin time in milliseconds minus
            the ``exectime`` reported in the response.

        Raises:
            ValueError: If the response contains no ``{...}`` payload or
                the payload is not valid JSON.
        """
        url = 'https://login.sina.com.cn/sso/prelogin.php'
        # Current timestamp in milliseconds.
        ts = str(int(time.time() * 1000))
        params = {
            'entry': 'weibo',
            'callback': 'sinaSSOController.preloginCallBack',
            'su': su,
            'rsakt': 'mod',
            'client': 'ssologin.js(v1.4.19)',
            '_': ts,
        }
        # Use the shared session so cookies set here reach the login POST.
        res = self.session.get(url, params=params, timeout=self.timeout)
        # The body is plain text wrapping a JSON object (callback style),
        # so pull the object out with a regex before decoding it.
        match = re.search(r'{.*?}', res.text)
        if match is None:
            raise ValueError('prelogin response contains no JSON payload')
        data_dict = json.loads(match.group())
        servertime = data_dict.get("servertime")
        nonce = data_dict.get("nonce")
        rsakv = data_dict.get("rsakv")
        pubkey = data_dict.get("pubkey")
        # Treat a missing exectime as 0 instead of failing on None arithmetic.
        exectime = int(data_dict.get("exectime") or 0)
        # prelt: time spent on prelogin, in milliseconds.
        prelt = int(time.time() * 1000) - int(ts) - exectime
        pcid = data_dict.get('pcid')
        return servertime, nonce, rsakv, pubkey, prelt, pcid

    def get_su(self, username):
        """Return *username* encoded with base64, as a str."""
        return base64.b64encode(username.encode()).decode()

    def get_sp(self, servertime, nonce, pubkey):
        """Return the RSA-encrypted password as a hex string.

        Args:
            servertime: ``servertime`` value from prelogin.
            nonce: ``nonce`` string from prelogin.
            pubkey: Hex string of the RSA modulus from prelogin.
        """
        rsa_pubkey = rsa.PublicKey(int(pubkey, 16), self.RSA_EXPONENT)
        # The plaintext is not the bare password: servertime and nonce are
        # prepended, separated by a tab and a newline.
        message = str(servertime) + '\t' + nonce + '\n' + self.pwd
        encrypted = rsa.encrypt(message.encode(), rsa_pubkey)
        # Hex representation of the encrypted bytes.
        return b2a_hex(encrypted).decode()

    def get_post_data(self):
        """Build the form fields submitted with the login POST."""
        su = self.get_su(self.username)
        servertime, nonce, rsakv, pubkey, prelt, pcid = self.get_prelogin(su)
        sp = self.get_sp(servertime, nonce, pubkey)
        form_data = {
            'entry': 'weibo',
            'gateway': '1',
            'from': '',
            'savestate': '0',
            'qrcode_flag': 'false',
            'useticket': '1',
            'pagerefer': '',
            'vsnf': '1',
            'su': su,
            'service': 'miniblog',
            'servertime': servertime,
            'nonce': nonce,
            'pwencode': 'rsa2',
            'rsakv': rsakv,
            'sp': sp,
            'sr': '1536*864',
            'encoding': 'UTF-8',
            'prelt': prelt,
            'url': 'https://weibo.com/ajaxlogin.php?framelogin=1&callback=parent.sinaSSOController.feedBackUrlCallBack',
            'returntype': 'META',
        }
        return form_data

    def get_redirect_url(self, res):
        """Extract the ``location.replace(...)`` target from a response.

        Returns:
            The URL string, or ``None`` when the page contains no such
            redirect (which indicates the login step went wrong).
        """
        pattern = r"location.replace\([\"\'](.*?)[\"\']\)"
        match = re.search(pattern, res.text)
        return match.group(1) if match else None

    def get_errno(self, url1, res):
        """Print and return the failure code found in a response.

        Args:
            url1: First redirect URL; printed for the user when no error
                payload can be parsed from *res*.
            res: Response whose text is searched for a ``{...}`` JSON object.

        Returns:
            The raw ``errno`` value from the payload, or ``None`` when the
            response holds no parseable JSON object.
        """
        payload = None
        match = re.search(r"{.*?}", res.text)
        if match:
            try:
                payload = json.loads(match.group())
            except ValueError:
                # Braces that are not JSON (e.g. inline script) - treat the
                # response like one without any error payload.
                payload = None

        if payload is None:
            print('重定向时请求头headers1中的cookie已过期, 请在浏览器中访问下面的url,复制请求头中的cookie添加到headers1')
            print(url1)
            return None

        errno = payload.get('errno')
        try:
            code = int(errno)
        except (TypeError, ValueError):
            # Missing or non-numeric errno: no message can be looked up.
            code = None
        errno_msg = self.ERRNO_MESSAGES.get(code)
        print(errno, errno_msg)
        return errno

    def _fallback_headers(self):
        """Return the headers used to re-request the first redirect URL."""
        return {
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9',
            'Accept-Encoding': 'gzip, deflate, br',
            'Accept-Language': 'zh-CN,zh;q=0.9',
            'Cache-Control': 'no-cache',
            'Connection': 'keep-alive',
            'Host': 'weibo.com',
            'Pragma': 'no-cache',
            'Sec-Fetch-Dest': 'document',
            'Sec-Fetch-Mode': 'navigate',
            'Sec-Fetch-Site': 'none',
            'Sec-Fetch-User': '?1',
            'Upgrade-Insecure-Requests': '1',
            # Left empty on purpose: get_errno() tells the user to paste a
            # browser cookie here when this request yields no error payload.
            'Cookie': '',
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/84.0.4147.135 Safari/537.36',
        }

    def get_login_res(self, form_data):
        """Submit the login form and follow the redirects to the home page.

        Posts *form_data*, follows the two ``location.replace`` redirects,
        then requests https://weibo.com and reports whether the logged-in
        home page title is present. The final page is written to
        ``weibo.html`` in the working directory.

        Args:
            form_data: Form fields as produced by get_post_data().
        """
        res = self.session.post(self.url, headers=self.headers,
                                data=form_data, timeout=self.timeout)
        url1 = self.get_redirect_url(res)
        if not url1:
            # No redirect in the login response: there is nothing to follow,
            # and requesting a missing URL would only raise inside requests.
            print('跳转失败')
            return

        res1 = self.session.get(url1, timeout=self.timeout)
        url2 = self.get_redirect_url(res1)
        if not url2:
            # An empty second redirect means the login was rejected; fetch
            # url1 again with document-style headers to read the error code.
            print('跳转失败')
            res1 = self.session.get(url1, headers=self._fallback_headers(),
                                    timeout=self.timeout)
            self.get_errno(url1, res1)
            return

        # Second redirect present: keep following it.
        print('跳转成功')
        self.session.get(url2, timeout=self.timeout)
        res3 = self.session.get('https://weibo.com', timeout=self.timeout)
        if '我的首页 微博-随时随地发现新鲜事' in res3.text:
            print('登入成功')
        else:
            print('登入失败')
        # NOTE(review): the original indentation was lost; the page is saved
        # here regardless of the outcome - confirm this was the intent.
        with open('weibo.html', 'w', encoding='utf-8') as f:
            f.write(res3.text)

    def run(self):
        """Build the login form, then perform the login request chain."""
        form_data = self.get_post_data()
        self.get_login_res(form_data)


if __name__ == '__main__':
    username = ''
    pwd = ''
    weibo_login = WeiboLogin(username, pwd)
    weibo_login.run()