"""Download every video posted by a Douyin user, given a pasted share link.

Flow: extract the short URL from the pasted share text, follow its redirect to
obtain the user's ``sec_uid``, create a directory named after the user's
nickname (and ``chdir`` into it), page through the user's post list, then
download all videos with a small pool of worker threads.
"""
import os
import re
from queue import Queue
from threading import Thread

import requests

# Mobile user agent sent with every HTTP request made by this script.
DEFAULT_HEADERS = {
    "user-agent": "Mozilla/5.0 (Linux; Android 6.0; Nexus 5 Build/MRA58N) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4389.114 Mobile Safari/537.36"
}

# Seconds to wait for a connection / for each read before giving up, so a
# stalled server cannot hang a worker (or the main thread) forever.
REQUEST_TIMEOUT = 30

# Size of the chunks written to disk while streaming a video.
DOWNLOAD_CHUNK_SIZE = 64 * 1024

# Number of concurrent download threads.
WORKER_COUNT = 10

# Characters replaced with "_" when building file and directory names.
_INVALID_NAME_CHARS = re.compile(r"[/\\:*?\"<>|]")


class Demo(Thread):
    """Worker thread that downloads the videos it takes from a shared queue."""

    def __init__(self, queue):
        """Create a worker bound to *queue*, a ``Queue`` of video-info dicts."""
        super().__init__()
        self.queue = queue
        self.headers = dict(DEFAULT_HEADERS)

    def run(self):
        """Process queue items forever, reporting (not propagating) failures."""
        while True:
            video_info = self.queue.get()
            try:
                self.parse(video_info)
            except Exception as exc:
                # An uncaught exception would end this thread; once every
                # worker had died, Queue.join() in the producer would block
                # forever. Report the failure and keep consuming instead.
                print("下载失败: {}: {}".format(video_info.get("aweme_id", "?"), exc))
            finally:
                self.queue.task_done()

    def parse(self, video_info):
        """Download one video into the current working directory.

        The file is named ``"<sort_name> <desc>.mp4"``; when the sanitised
        description is empty, the ``aweme_id`` is used in its place. A video
        whose file already exists is skipped.

        Raises:
            KeyError, IndexError: *video_info* lacks an expected field or URL.
            requests.RequestException: the download failed.
            OSError: the file could not be written.
        """
        file_name = _INVALID_NAME_CHARS.sub("_", video_info['desc'])
        if not file_name:
            file_name = video_info["aweme_id"]
        file_name = video_info["sort_name"] + " " + file_name

        target_path = file_name + ".mp4"
        if os.path.exists(target_path):
            print("文件存在,{}.mp4".format(file_name))
            return

        video_url = video_info['video']['play_addr']['url_list'][0]

        # Stream into a temporary ".part" file and rename only on success, so
        # an interrupted or failed download never leaves a truncated ".mp4"
        # that a later run would mistake for a finished one.
        partial_path = target_path + ".part"
        try:
            with requests.get(url=video_url, headers=self.headers,
                              stream=True, timeout=REQUEST_TIMEOUT) as response:
                response.raise_for_status()
                with open(partial_path, 'wb') as f:
                    for chunk in response.iter_content(chunk_size=DOWNLOAD_CHUNK_SIZE):
                        f.write(chunk)
            os.replace(partial_path, target_path)
        except Exception:
            if os.path.exists(partial_path):
                os.remove(partial_path)
            raise
        print('下载成功: {}.mp4'.format(file_name))


class DouYin:
    """Collect and download all videos of the user behind a share link."""

    def __init__(self, share_url=None):
        """Store the share text, prompting on stdin when none is supplied.

        Args:
            share_url: Pasted share text containing a URL, for example
                ``'在抖音,记录美好生活! https://v.douyin.com/ekkTsYw/'``.
                When ``None`` (the default) the text is read with ``input``.
        """
        if share_url is None:
            share_url = input('粘贴分享链接:')
        self.share_url = share_url
        self.headers = dict(DEFAULT_HEADERS)
        self.sort_num = 1
        self.video_list = []
        self.queue = Queue()

    def parse_share_url(self):
        """Return the first URL found in the pasted share text.

        Raises:
            ValueError: the text contains no URL.
        """
        match = re.search(r'[a-z]+://\S+', self.share_url, re.I | re.M)
        if match is None:
            raise ValueError("no URL found in share text: {!r}".format(self.share_url))
        return match.group(0)

    def get_sec_uid(self):
        """Resolve the share URL's redirect and return the ``sec_uid`` value.

        Raises:
            ValueError: the response carries no redirect, or the redirect
                target has no ``sec_uid`` query parameter.
        """
        url = self.parse_share_url()
        response = requests.get(url, headers=self.headers,
                                allow_redirects=False, timeout=REQUEST_TIMEOUT)
        location = response.headers.get('location')
        if not location:
            raise ValueError("share URL did not redirect: {}".format(url))
        # Match up to the next "&", "#" or end of string, so the parameter is
        # found even when it is the last one in the query.
        match = re.search(r'sec_uid=([^&#]+)', location)
        if match is None:
            raise ValueError("sec_uid not found in redirect target: {}".format(location))
        return match.group(1)

    def init_save_dir(self, sec_uid):
        """Create a directory named after the user's nickname and enter it."""
        userinfo = requests.get(
            url='https://www.iesdouyin.com/web/api/v2/user/info/?sec_uid={}'.format(sec_uid),
            headers=self.headers,
            timeout=REQUEST_TIMEOUT,
        ).json()
        nickname = userinfo['user_info']['nickname']
        # A nickname may contain path separators or other characters that are
        # not valid in a directory name; fall back to sec_uid if it is empty.
        save_path = _INVALID_NAME_CHARS.sub("_", nickname) or sec_uid
        os.makedirs(save_path, exist_ok=True)
        os.chdir(save_path)  # later downloads are written relative to this directory

    def get_video_list(self, sec_uid, max_cursor=0):
        """Append every post of *sec_uid* to ``self.video_list``.

        Pages are requested starting at *max_cursor* until a page comes back
        with an empty ``aweme_list``.
        """
        # Iterative rather than recursive: one stack frame per page would hit
        # the recursion limit for users with a long posting history.
        while True:
            url = "https://www.iesdouyin.com/web/api/v2/aweme/post/?sec_uid={}&count=21&max_cursor={}".format(sec_uid, max_cursor)
            data = requests.get(url, headers=self.headers, timeout=REQUEST_TIMEOUT).json()
            if not data["aweme_list"]:
                break
            self.video_list.extend(data["aweme_list"])
            print("下一页", url)
            next_cursor = data["max_cursor"]
            if next_cursor == max_cursor:
                # The cursor did not advance; requesting again would fetch the
                # same page forever.
                break
            max_cursor = next_cursor

    def run(self):
        """Fetch the user's video list and download it with worker threads."""
        sec_uid = self.get_sec_uid()
        self.init_save_dir(sec_uid)
        self.get_video_list(sec_uid)

        for _ in range(WORKER_COUNT):
            worker = Demo(self.queue)
            worker.daemon = True  # do not keep the process alive after join()
            worker.start()

        # Number the videos in the reverse of the order the API listed them.
        self.video_list.reverse()
        for video_info in self.video_list:
            video_info["sort_name"] = "[{}]".format(self.sort_num)
            self.sort_num += 1
            self.queue.put(video_info)
        self.queue.join()


if __name__ == '__main__':
    DouYin().run()