# ===== Web crawler template (爬虫模板) =====
# pip install requests
# pip install bs4
# pip install pycryptodome
# pip install lxml
import requests,re,time,os
from bs4 import BeautifulSoup
from Crypto.Cipher import AES
from lxml import etree
from utils import func
# Log in to 17k.com, then fetch the bookshelf with the session's cookies.
# NOTE(review): the credentials are hard-coded; consider loading them from
# the environment or from a file that is not committed.
url = "https://passport.17k.com/ck/user/login"
# A dict makes requests form-encode the body and set the
# application/x-www-form-urlencoded Content-Type; a plain str is sent as-is
# without that header.
data = {"loginName": "13271473920", "password": "qqqqqqq1"}
session = requests.session()
head = {
    # presumably a random User-Agent string; confirm against utils.func
    "User-Agent": func.random_ua(),
}
# Example proxy mapping. It is not passed to the requests below.
proxies = {
    "https": "https://218.45.56.8:3125",
}
# The login response stores the identity cookie on the session.
with session.post(url, data=data, headers=head, timeout=10) as res:
    url = "https://user.17k.com/ck/author/shelf?page=1&appKey=2406394919"
    # The session already carries the cookie, so the GET needs no body.
    with session.get(url, headers=head, timeout=10) as res2:
        print(res2.json())  # the content we actually want: the bookshelf
        print(res.cookies)
# # --------------------------BeautifulSoup解析 html拿值
# soup = BeautifulSoup(res.text, "html.parser") # html.parser指定文件类型为html,防止警告信息
# table = soup.find("div", class_="tbl-body") # find(标签,属性=值)
# table = table.find_all("th")
# for i in range(0, len(table) - 1):
# print(table[i].text)
# # --------------------------re正则拿值
# # ?P<guang> 起一个别名 re.S 让.具有换行的功能
# obj = re.compile(r'<span style="font-size:\d+px;">(?P<guang>.*?)</span>', re.S)
# res = obj.finditer(res.text)
# for i in res:
# print(i.group('guang'))
# # --------------------------xpath解析 XHR拿值
# tree = etree.HTML(res.text)
# result = tree.xpath("/html/body/ul/li[1]/a/text()") # 指定li下标取a标签文本
# result = tree.xpath('/html/body/ul/ol/li/a/@href') # 取指定a标签href的内容
# result = tree.xpath('/html/body/ul/ol/li/a[@href="feiji"]/text()') # 取指定a标签href属性为feiji的文本
# # --------------------------AES解密
# def dec_ts(name, key):
# aes = AES.new(key=key, IV=b"0000000000000000", mode=AES.MODE_CBC)
# with open(f"noval/{name}", mode="rb") as f1,\
# open(f"noval/temp_{name}", mode="wb") as f2:
# bs = f1.read() # 从源文件读取内容
# f2.write(aes.decrypt(bs)) # 把解密好的内容写入文件
# print(f"{name}处理完毕")
# # --------------------------os合并文件
# def merge_ts():
# # mac: cat 1.ts 2.ts 3.ts > xxx.mp4
# # windows: copy /b 1.ts+2.ts+3.ts xxx.mp4
# lsts = []
# with open("noval/越狱第一季第一集_second_m3u8.txt", mode="r", encoding="utf-8") as f:
# for line in f:
# if line.startswith("#"):
# continue
# line = line.strip()
# lsts.append(f"noval/temp_{line}")
#
# s = "+".join(lsts) # 1.ts+2.ts+3.ts
# os.system(f"copy /b {s} movie.mp4")
# print("搞定!")
# ===== Single-threaded async coroutine template (单线程异步协程模板) =====
import aiohttp # pip install aiohttp
import aiofiles # pip install aiofiles
import time
import asyncio
# Image URLs to download.
urls = [
    "http://kr.shanghai-jiuxin.com/file/2022/0511/f92746daee951aea129ca7b8bafbdb97.jpg",
    "http://kr.shanghai-jiuxin.com/file/2022/0511/d93db47964abd557794ccd68d8a8ab16.jpg",
    "http://kr.shanghai-jiuxin.com/file/2022/0511/8d647c98d9f59ea4ae38a0ba1e568bc3.jpg",
    "http://kr.shanghai-jiuxin.com/file/2022/0511/0d69db8b2f5ad317c39bfd4ab1033064.jpg",
]
async def download(url):
    """Download ``url`` and save it in the current directory.

    The file is named after the last path segment of ``url``.

    :param url: address of the file to fetch
    :raises aiohttp.ClientResponseError: if the server answers with an
        HTTP error status
    """
    name = url.rsplit("/", 1)[1]  # text after the last "/"
    print("开始准备下载")
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            # Fail instead of saving an HTTP error page under the image's name.
            resp.raise_for_status()
            # Reading the body is asynchronous, so it has to be awaited.
            payload = await resp.content.read()
    # Open the output file only once there is something to write, so a
    # failed request does not leave an empty file behind.
    async with aiofiles.open(name, mode="wb") as f:
        await f.write(payload)
    print("下载完成")
async def main():
    """Start one download per entry in ``urls`` and wait for all of them.

    Each coroutine is wrapped in a task before waiting: passing bare
    coroutines to ``asyncio.wait`` is rejected on Python 3.11+.
    """
    tasks = [asyncio.create_task(download(url)) for url in urls]
    if tasks:  # asyncio.wait() raises ValueError on an empty collection
        await asyncio.wait(tasks)
if __name__ == '__main__':
    # Time the whole batch. perf_counter() is monotonic, so the difference
    # is not affected by system clock adjustments.
    t1 = time.perf_counter()
    # asyncio.run() creates the event loop, runs main() and closes the loop.
    asyncio.run(main())
    t2 = time.perf_counter()
    print(t2 - t1)
# ===== Thread pool (线程池) =====
# 1. 如何提取单个页面的数据
# 2. 上线程池,多个页面同时抓取
import requests
from lxml import etree
import csv
from concurrent.futures import ThreadPoolExecutor
# Shared output file and CSV writer.
# newline="" is what the csv module asks for: the writer emits its own
# "\r\n" row terminators, and without it text mode on Windows would turn
# them into "\r\r\n" (a blank line after every row).
f = open("data.csv", mode="w", encoding="utf-8", newline="")
csvwriter = csv.writer(f)
def download_one_page(url):
    """Scrape one listing page and write its table rows to the shared CSV.

    :param url: address of the page to fetch
    :raises requests.RequestException: on a network error, a timeout or an
        HTTP error status
    :raises IndexError: if the page has no table at the expected XPath
    """
    # Fetch the page source. The timeout keeps a stalled server from
    # blocking a pool worker forever.
    resp = requests.get(url, timeout=10)
    resp.raise_for_status()
    html = etree.HTML(resp.text)
    table = html.xpath("/html/body/div[2]/div[4]/div[1]/table")[0]
    # Every <tr> except the first one (same as table.xpath("./tr")[1:]).
    trs = table.xpath("./tr[position()>1]")
    for tr in trs:
        txt = tr.xpath("./td/text()")
        # Light clean-up: drop backslashes and slashes from every cell.
        txt = [item.replace("\\", "").replace("/", "") for item in txt]
        # Store the row in the file.
        csvwriter.writerow(txt)
    print(url, "提取完毕!")
if __name__ == '__main__':
    try:
        # Create the thread pool. Leaving the with-block waits for every
        # submitted task to finish.
        with ThreadPoolExecutor(50) as t:
            for i in range(1, 200):  # pages 1..199 (original note: 199 * 20 = 3980)
                # Hand the download task to the pool.
                t.submit(download_one_page, f"http://www.xinfadi.com.cn/marketanalysis/0/list/{i}.shtml")
    finally:
        # Flush buffered rows and release the shared CSV file.
        f.close()
    print("全部下载完毕!")
# ===== Selenium browser automation (selenium模拟) =====
# pip install selenium
from selenium.webdriver import Chrome # 操作chrom浏览器
from selenium.webdriver.common.action_chains import ActionChains # 事件链 基于页面中的其他框架操作
from selenium.webdriver.chrome.options import Options # 一些启动参数配置
from selenium.webdriver.common.keys import Keys # 按键
from selenium.webdriver.support.select import Select # 处理select下拉列表
from chaojiying import Chaojiying_Client
import time
from selenium.webdriver.common.by import By  # locator strategies for find_element

# Initialise the Chaojiying captcha-solving client.
# NOTE(review): the account credentials are hard-coded; consider reading
# them from the environment instead.
chaojiying = Chaojiying_Client('13271473920', 'qqqqqqq1', '933391')
# # ------------------------------------- If the site detects automation, hide navigator.webdriver
# # ------------------------------------- 1. Chrome older than 88: inject JS into the page when the browser starts
# web = Chrome()
# web.execute_cdp_cmd("Page.addScriptToEvaluateOnNewDocument", {
#     "source": """
#     navigator.webdriver = undefined
#     Object.defineProperty(navigator, 'webdriver', {
#         get: () => undefined
#     })
#     """
# })
# web.get("https://www.baidu.com")
# # ------------------------------------- 2. Chrome 88 or newer
option = Options()
# # option.add_experimental_option('excludeSwitches', ['enable-automation'])
option.add_argument('--disable-blink-features=AutomationControlled')
# # ------------------------------------- Headless (background) mode
# option.add_argument("--headless")
# option.add_argument("--disable-gpu")
web = Chrome(options=option)
web.get("https://kyfw.12306.cn/otn/resources/login.html")
time.sleep(2)
# find_element(By.XPATH, ...) replaces find_element_by_xpath(), which was
# removed in Selenium 4.3.
web.find_element(By.XPATH, '/html/body/div[2]/div[2]/ul/li[2]/a').click()
time.sleep(3)
# ------------------------------------- Grab the captcha image from the page
verify_img_element = web.find_element(By.XPATH, '//*[@id="J-loginImg"]')
verify_img = verify_img_element.screenshot_as_png
# ------------------------------------- Let Chaojiying solve the captcha
dic = chaojiying.PostPic(verify_img, 9004)
result = dic['pic_str']  # x1,y1|x2,y2|x3,y3
rs_list = result.split("|")
for rs in rs_list:  # x1,y1
    p_temp = rs.split(",")
    x = int(p_temp[0])
    y = int(p_temp[1])
    # Move the mouse to the position, then click. An ActionChains only
    # queues the steps; nothing happens until perform() is called.
    # NOTE(review): Selenium 4.3+ appears to measure this offset from the
    # element's centre rather than its top-left corner; confirm against the
    # installed version, since x and y are used here unchanged.
    ActionChains(web).move_to_element_with_offset(verify_img_element, x, y).click().perform()
    time.sleep(1)
# ------------------------------------- Type the user name and password
web.find_element(By.XPATH, '//*[@id="J-userName"]').send_keys("123456789")
web.find_element(By.XPATH, '//*[@id="J-password"]').send_keys("12346789")
# web.find_element(By.XPATH, '//*[@id="search_input"]').send_keys("python", Keys.ENTER)
# ------------------------------------- Click the login button
web.find_element(By.XPATH, '//*[@id="J-login"]').click()
time.sleep(5)
# ------------------------------------- Drag the slider
btn = web.find_element(By.XPATH, '//*[@id="nc_1_n1z"]')
ActionChains(web).drag_and_drop_by_offset(btn, 300, 0).perform()
# # -------------------------------------切换窗口
# web.switch_to.window(web.window_handles[-1])
# # 在新窗口中提取内容
# job_detail = web.find_element_by_xpath('//*[@id="job_detail"]/dd[2]/div').text
# print(job_detail)
# # 关掉子窗口
# web.close()
# # 变更selenium的窗口视角. 回到原来的窗口中
# web.switch_to.window(web.window_handles[0])
# # -------------------------------------页面中遇到了 iframe
# web.get("https://www.91kanju.com/vod-play/541-2-1.html")
# # 处理iframe的话. 必须先拿到iframe. 然后切换视角到iframe . 再然后才可以拿数据
# iframe = web.find_element_by_xpath('//*[@id="player_iframe"]')
# web.switch_to.frame(iframe) # 切换到iframe
# # web.switch_to.default_content() # 切换回原页面
# tx = web.find_element_by_xpath('//*[@id="main"]/h3[1]').text
# print(tx)
# # -------------------------------------修改 select下拉框选中项
# sel_el = web.find_element_by_xpath('//*[@id="OptionDate"]')
# # 对元素进行包装, 包装成下拉菜单
# sel = Select(sel_el)
# sel.select_by_index(i) # 按照索引进行切换
# ===== Guess the age: loops and conditionals (猜年龄 循环判断) =====
import random
def guess_age(age, ask=input, say=print):
    """Run the guessing game for the secret ``age``.

    The player gets three guesses per round. After the third miss they are
    asked whether to start a new round ("Y") or quit ("Q"); the question is
    repeated until one of the two commands is entered.

    :param age: the number to guess
    :param ask: callable that takes a prompt and returns the player's reply
    :param say: callable that shows one message to the player
    :return: True if the age was guessed, False if the player quit
    """
    misses = 0
    while True:
        try:
            guess = int(ask("请输入要猜的年龄:"))
        except ValueError:
            # Non-numeric input: ask again without using up a guess.
            say("请输入正确的数字")
            continue
        if guess == age:
            say("恭喜你猜对了")
            return True
        misses += 1
        if misses < 3:
            say("猜错了,请重新猜一猜")
            continue
        say("猜3次上限了")
        # Keep asking until a valid command arrives, so the reply to a
        # repeated prompt is acted on rather than dropped.
        while True:
            command = ask("请输入Y继续或Q退出:")
            if command == "Y":
                misses = 0
                break
            if command == "Q":
                say("我要退出了")
                return False
            say("请输入正确的命令")


if __name__ == "__main__":
    age = random.randint(10, 20)
    print("随机年龄是:%d" % age)
    guess_age(age)
# ===== BMI classification: arithmetic operators (BMI判断 加减乘除) =====
def bmi_category(bmi):
    """Return the weight-category label for a BMI value.

    The ranges are contiguous and half-open, so every value falls into
    exactly one category: below 18.5, [18.5, 25), [25, 28), [28, 32), and
    32 or above.

    :param bmi: body-mass index
    :return: the label to show to the user
    """
    if bmi < 18.5:
        return "体重过轻"
    if bmi < 25:
        return "体重正常"
    if bmi < 28:
        return "体重过重"
    if bmi < 32:
        return "体重肥胖"
    return "严重肥胖"


if __name__ == "__main__":
    # BMI formula: weight divided by the square of the height.
    # NOTE(review): the thresholds presumably expect metres and kilograms;
    # the prompts do not say which units to enter. Confirm.
    h = float(input("请输入你的身高:"))
    t = float(input("请输入你的体重:"))
    s = t / (h ** 2)
    print(bmi_category(s))
# ===== Find the values that occur only once in a collection: loops (求出一个集合中不重复的值 循环) =====
def l(list1=None):
    """Print and return the values that occur exactly once in ``list1``.

    :param list1: iterable of hashable values; when omitted, a built-in
        sample list is used
    :return: list of the values that occur exactly once, in the iteration
        order of a set of the distinct values
    """
    from collections import Counter  # local import keeps the snippet self-contained

    if list1 is None:
        list1 = [1, 1, 2, 2, 3, 4, 4, 5, 5, 5, 3, 3, 4, 4, 7, 8, 9, 9, 0]
    # Count every value in one pass instead of rescanning the whole input
    # once per distinct value.
    counts = Counter(list1)
    singles = [value for value in set(counts) if counts[value] == 1]
    for value in singles:
        print("出现不重复的值:{}".format(value))
    return singles


l()
# ===== Two-player battle: classes, objects and magic methods (双人对战 类和对象的魔术方法调用) =====
import random
import time
class Person:
    """A fighter in the two-player battle.

    :param name: the fighter's name
    :param blood: the fighter's starting amount of blood
    """

    def __init__(self, name, blood):
        # Instance attributes, available to every method of the object.
        self.name = name
        self.blood = blood

    def tong(self, dr):
        """Stab ``dr``: take 10 blood from it and print the outcome."""
        dr.blood -= 10
        print(f"{self.name}捅了{dr.name}一刀,{dr.name}掉10点血,{dr.name}剩余{dr.blood}点血")

    def kan(self, dr):
        """Slash ``dr``: take 20 blood from it and print the outcome."""
        dr.blood -= 20
        print(f"{self.name}砍了{dr.name}一刀,{dr.name}掉20点血,{dr.name}剩余{dr.blood}点血")

    def chi(self, dr):
        """Give ``dr`` 10 blood and print the outcome.

        The one that is healed is ``dr``, not the object the method is
        called on.
        """
        dr.blood += 10
        print(f"{dr.name}吃了一颗药,加10点血,{dr.name}剩余{dr.blood}点血")

    def __str__(self):
        return f"{self.name}还剩下{self.blood}血"
xm = Person('西门', 100)
ygc = Person('叶孤城', 100)
# Die face n (1..6) selects entry n - 1: the bound method to call and the
# fighter it is called on.
actions = (
    (xm.tong, ygc),
    (xm.kan, ygc),
    (xm.chi, ygc),
    (ygc.tong, xm),
    (ygc.kan, xm),
    (ygc.chi, xm),
)
# The fight goes on until one fighter's blood drops to zero or below.
while xm.blood > 0 and ygc.blood > 0:
    sj = random.randint(1, 6)
    act, target = actions[sj - 1]
    act(target)
    print(ygc, xm)
    print("*" * 40)
    time.sleep(1)
print("游戏结束", ygc, xm)