多线程百度url爬取工具:
import requests
import re
import threading
from bs4 import BeautifulSoup as bs
from queue import Queue
import datetime
class urlcollector(threading.Thread):
    """Worker thread: pulls Baidu search-result page URLs from a shared queue,
    extracts the outbound result links and prints those that answer HTTP 200."""

    def __init__(self, queue):
        """Store the shared work queue of search-result page URLs."""
        threading.Thread.__init__(self)
        self._queue = queue

    def run(self):
        # Drain the queue until it is empty. empty() is racy in general, but the
        # queue is fully populated before any worker starts, so get() cannot block
        # indefinitely here.
        while not self._queue.empty():
            url = self._queue.get()
            self.spider(url)

    def spider(self, url):
        """Fetch one search-result page and probe each result link it contains."""
        headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:90.0) Gecko/20100101 Firefox/90.0"}
        # timeout added: the original call had none, so a stalled connection
        # could hang the worker thread forever.
        r = requests.get(url=url, headers=headers, timeout=8)
        soup = bs(r.content, 'lxml')
        # Baidu wraps organic results in <a data-click="..."> anchors with no class.
        links = soup.find_all('a', attrs={"data-click": re.compile('.'), "class": None})
        for link in links:
            url_real = link['href']
            try:
                probe = requests.get(url_real, headers=headers, timeout=8)
                if probe.status_code == 200:
                    print(url_real)
            except Exception as e:
                # Best-effort probe: report the failure and keep scanning.
                print(e)
def main(keyword):
    """Search Baidu for `keyword` across all result pages and verify result
    links concurrently with a pool of urlcollector threads."""
    start_time = datetime.datetime.now()
    queue_ = Queue()
    threads = []
    thread_num = 8
    # Baidu paginates with pn=0,10,20,...; 760 covers the maximum ~76 pages.
    for page in range(0, 760, 10):
        # BUG FIX: the original template was "wd=%s & pn=%s" — the literal
        # spaces around '&' corrupt the query string, so pn was never applied.
        queue_.put("https://www.baidu.com/s?wd=%s&pn=%s" % (keyword, page))
    # Instantiate the worker pool (the original reused the loop variable `t`
    # as both counter and thread object, shadowing it).
    for _ in range(thread_num):
        threads.append(urlcollector(queue_))
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    end_time = datetime.datetime.now()
    # Same output as the original's %-and-.format hybrid, written consistently.
    print("{}进程耗时:{}秒".format(thread_num, end_time - start_time))

if __name__ == '__main__':
    main("吴亦凡")
多线程网站目录扫描:
import requests
import sys
from queue import Queue
import threading
# Suppress urllib3's InsecureRequestWarning, since requests below use verify=False.
requests.packages.urllib3.disable_warnings()
class DirScan(threading.Thread):
    """Worker thread: pops candidate URLs from a shared queue and reports the
    ones that respond with HTTP 200."""

    # Request settings are loop-invariant — build them once instead of
    # re-creating both dicts for every URL, as the original did.
    HEADERS = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.107 Safari/537.36"}
    # NOTE(review): all HTTPS traffic is forced through a local proxy
    # (presumably Burp on 127.0.0.1:8080); the scan fails if nothing is
    # listening there — confirm this is intended.
    PROXIES = {"https": "http://127.0.0.1:8080"}

    def __init__(self, queue):
        """Store the shared work queue of candidate URLs."""
        threading.Thread.__init__(self)
        self._queue = queue

    def run(self):
        # Drain the queue; it is fully populated before the workers start.
        while not self._queue.empty():
            url = self._queue.get()
            try:
                r = requests.get(url=url, headers=self.HEADERS, timeout=4,
                                 proxies=self.PROXIES, verify=False)
                if r.status_code == 200:  # path exists on the target
                    print("[*]" + url)
            except Exception:
                # BUG FIX: the original called exit() here, which raised
                # SystemExit on the first failed request, killed the worker and
                # left the rest of the queue unscanned. Skip and continue.
                continue
def start(url, ext, count):
    """Scan `url` for every path listed in ./<ext>.txt using `count` worker threads.

    `count` may be a string (it comes straight from sys.argv).
    """
    queue_ = Queue()
    # Build candidate URLs from the wordlist. 'with' guarantees the file is
    # closed (the original opened it and never closed it); the original's
    # redundant "[0:]" slice after rstrip is dropped.
    with open('./%s.txt' % ext, 'r', encoding='UTF-8') as wordlist:
        for line in wordlist:
            queue_.put(url + line.rstrip('\n'))
    thread_count = int(count)
    threads = [DirScan(queue_) for _ in range(thread_count)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
if __name__ == '__main__':
    # usage: python web_scanner.py https://www.baidu.com php_dic 20
    if len(sys.argv) == 4:
        start(sys.argv[1], sys.argv[2], sys.argv[3])
    else:
        print("瓜皮,参数没传够")
        sys.exit(-1)
要注意路径的拼接是否正确。
c段web地址扫描:
import requests
import time
import threading
import sys
from queue import Queue
from IPy import IP
from bs4 import BeautifulSoup
# Suppress urllib3's InsecureRequestWarning, since requests below use verify=False.
requests.packages.urllib3.disable_warnings()
# Shared request headers: present a desktop Firefox UA to the scanned hosts.
headers={'User-Agent':"Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:90.0) Gecko/20100101 Firefox/90.0"}
# HTTPS traffic is routed through a local proxy (presumably Burp on 8080) —
# NOTE(review): confirm a proxy is actually listening, or requests will fail.
proxies={"https":"http://127.0.0.1:8080"}
class DirScan(threading.Thread):
    """Worker thread: pulls candidate http://ip:port URLs from a shared queue
    and prints the <title> of any host serving a page with HTTP 200."""

    def __init__(self, queue):
        """Store the shared work queue of candidate URLs."""
        threading.Thread.__init__(self)
        self.queue_ = queue

    def run(self):
        # Hoisted out of the per-URL loop: setting the module-wide retry count
        # once is enough (the original reassigned it for every URL).
        requests.adapters.DEFAULT_RETRIES = 5
        session = requests.session()
        session.keep_alive = False  # close connections instead of pooling them
        while not self.queue_.empty():
            url = self.queue_.get()
            try:
                # BUG FIX: the original built the session but still called
                # requests.get(), so keep_alive=False never took effect.
                r = session.get(url=url, timeout=6, headers=headers,
                                proxies=proxies, verify=False)
                r.encoding = r.apparent_encoding
                if r.status_code == 200:  # host serves a web page
                    soup = BeautifulSoup(r.text, 'lxml')
                    for head in soup.find_all('head'):
                        title = head.find("title")
                        # A missing <title> raises AttributeError on .string,
                        # which (as in the original) falls through to except.
                        if title.string is not None:
                            print("[*] Web Service Found!%s" % url + "\t" + title.string)
            except Exception:
                # Best-effort scan: unreachable or malformed hosts are skipped.
                pass
def create(ips):
    """Build a queue of 'http://<addr>:<port>' URLs covering every address of
    the given network.

    `ips` is a network spec such as "35.229.181.0/24". Host bits are masked
    off (strict=False), matching the original IPy make_net=True behaviour.
    Uses the stdlib `ipaddress` module instead of the third-party IPy package.
    """
    import ipaddress  # stdlib replacement for the IPy dependency
    queue_ = Queue()
    network = ipaddress.ip_network(ips, strict=False)
    ports = ["80"]  # extend to scan more ports per host
    # Iterating an ip_network yields every address, including the network and
    # broadcast addresses — the same set IPy's IP(ips) iteration produced.
    for addr in network:
        for port in ports:
            queue_.put('http://' + str(addr) + ":" + port)
    return queue_
def main(ips):
    """Scan every address of the given C-segment with a pool of DirScan workers."""
    work = create(ips)
    pool_size = 100
    workers = [DirScan(work) for _ in range(pool_size)]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()
if __name__ == "__main__":
    if len(sys.argv) == 2:
        # Time the scan of the network given on the command line.
        started = time.time()
        main(sys.argv[1])
        print(time.time() - started)
        sys.exit()
    else:
        # No argument supplied: scan a hard-coded demo network instead.
        # print("Usage:%s 192.168.1.1/24"%(sys.argv[0]))
        main("35.229.181.0/24")
        sys.exit(-1)