import os
import re
from queue import Queue
from threading import Thread
import requests
class Demo(Thread):
def __init__(self, queue):
Thread.__init__(self)
self.queue = queue
self.headers = {
"user-agent": "Mozilla/5.0 (Linux; Android 6.0; Nexus 5 Build/MRA58N) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4389.114 Mobile Safari/537.36"
}
def run(self):
while True:
video_info = self.queue.get()
try:
self.parse(video_info)
finally:
self.queue.task_done()
def parse(self, video_info):
file_name = re.sub(r"[/\\:*?\"<>|]", "_", video_info['desc'])
if not file_name:
file_name = video_info["aweme_id"]
file_name = video_info["sort_name"] + " " + file_name
if os.path.exists(file_name + ".mp4"):
print("文件存在,{}.mp4".format(file_name))
return
video_url = video_info['video']['play_addr']['url_list'][0]
with open(file_name + ".mp4", 'wb') as f:
f.write(requests.get(url=video_url, headers=self.headers).content)
print('下载成功: {}.mp4'.format(file_name))
class DouYin:
def __init__(self):
self.share_url = input('粘贴分享链接:')
# self.share_url = '在抖音,记录美好生活! https://v.douyin.com/ekkTsYw/'
self.headers = {
"user-agent": "Mozilla/5.0 (Linux; Android 6.0; Nexus 5 Build/MRA58N) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4389.114 Mobile Safari/537.36"
}
self.sort_num = 1
self.video_list = []
self.queue = Queue()
# 解析粘贴内容里的url
def parse_share_url(self):
return re.findall('[a-z]+://[\S]+', self.share_url, re.I | re.M)[0]
def get_sec_uid(self):
url = self.parse_share_url()
location = requests.get(url, headers=self.headers, allow_redirects=False).headers['location']
return re.findall('sec_uid=(.*?)&', location)[0]
# 创建用户名目录
def init_save_dir(self, sec_uid):
userinfo = requests.get(url='https://www.iesdouyin.com/web/api/v2/user/info/?sec_uid={}'.format(sec_uid), headers=self.headers).json()
save_path = userinfo['user_info']['nickname']
if not os.path.exists(save_path):
os.mkdir(save_path)
os.chdir(save_path) # 变更工作目录
def get_video_list(self, sec_uid, max_cursor=0):
url = "https://www.iesdouyin.com/web/api/v2/aweme/post/?sec_uid={}&count=21&max_cursor={}".format(sec_uid, max_cursor)
# yield
resp = requests.get(url, headers=self.headers)
data = resp.json()
if data["aweme_list"]:
for video_info in data["aweme_list"]:
self.video_list.append(video_info)
print("下一页", url)
self.get_video_list(sec_uid, data["max_cursor"])
def run(self):
sec_uid = self.get_sec_uid()
self.init_save_dir(sec_uid)
self.get_video_list(sec_uid)
for x in range(10):
worker = Demo(self.queue)
worker.daemon = True
worker.start()
self.video_list.reverse()
for video_info in self.video_list:
video_info["sort_name"] = "[{}]".format(self.sort_num)
self.sort_num += 1
self.queue.put(video_info)
self.queue.join()
if __name__ == '__main__':
DouYin().run()