threading模块使用
#对于io操作来说,多线程和多进程性能差别不大
#1.通过Thread类实例化
import time
import threading
def get_detail_html(url):
    """Print start/end markers around a fixed 2-second pause.

    Args:
        url: Not used by the body.
    """
    print("get detail html started")
    # Fixed delay; presumably a stand-in for a blocking page download.
    time.sleep(2)
    print("get detail html end")
def get_detail_url(url):
    """Print start/end markers around a fixed 4-second pause.

    Args:
        url: Not used by the body.
    """
    print("get detail url started")
    # Fixed delay; presumably a stand-in for a blocking list-page download.
    time.sleep(4)
    print("get detail url end")
#2. 通过继承Thread来实现多线程
class GetDetailHtml(threading.Thread):
    """Thread whose work is defined by overriding ``run``.

    ``run`` prints a start marker, sleeps 2 seconds, and prints an end marker.
    """

    def __init__(self, name):
        """Create the thread with the given thread name."""
        super().__init__(name=name)

    def run(self):
        """Body executed in the new thread after ``start()`` is called."""
        print("get detail html started")
        time.sleep(2)
        print("get detail html end")
class GetDetailUrl(threading.Thread):
    """Thread whose work is defined by overriding ``run``.

    ``run`` prints a start marker, sleeps 4 seconds, and prints an end marker.
    """

    def __init__(self, name):
        """Create the thread with the given thread name."""
        super().__init__(name=name)

    def run(self):
        """Body executed in the new thread after ``start()`` is called."""
        print("get detail url started")
        time.sleep(4)
        print("get detail url end")
if __name__ == "__main__":
    # Run both Thread subclasses concurrently and report the wall-clock time.
    thread1 = GetDetailHtml("get_detail_html")
    thread2 = GetDetailUrl("get_detail_url")
    start_time = time.time()
    thread1.start()
    thread2.start()
    # Wait for both workers so the elapsed time below covers their full run.
    thread1.join()
    thread2.join()
    # Original note: child threads are killed when the main thread exits.
    # NOTE(review): that only holds for daemon threads; these two are
    # non-daemon and are joined explicitly above.
    print("last time: {}".format(time.time() - start_time))
threading 控制线程数量
#Semaphore 是用于控制进入数量的锁
#文件, 读、写, 写一般只是用于一个线程写,读可以允许有多个
#做爬虫
import threading
import time
class HtmlSpider(threading.Thread):
    """Worker thread that pauses, reports success, and releases a semaphore.

    The permit is expected to have been acquired by whoever created this
    thread; ``run`` gives it back when the work is finished.

    Args:
        url: Stored on the instance; ``run`` does not read it.
        sem: Object with a ``release()`` method (e.g. ``threading.Semaphore``).
        delay: Seconds to sleep before reporting success. Defaults to 2.
    """

    def __init__(self, url, sem, delay=2):
        super().__init__()
        self.url = url
        self.sem = sem
        self.delay = delay

    def run(self):
        """Sleep, print the success message, then release the permit."""
        try:
            time.sleep(self.delay)
            print("got html text success")
        finally:
            # Always hand the permit back, even if the work above raises;
            # otherwise the permit is lost and the acquirer can block forever.
            self.sem.release()
class UrlProducer(threading.Thread):
    """Thread that starts 20 ``HtmlSpider`` workers, gated by a semaphore.

    Args:
        sem: Semaphore shared with the spiders. One permit is acquired here
            before each spider is started, and the same semaphore is passed
            to the spider.
    """

    def __init__(self, sem):
        super().__init__()
        self.sem = sem

    def run(self):
        """Acquire a permit, then start one spider, 20 times over."""
        for i in range(20):
            # Blocks while no permit is available, which caps how many
            # spiders are in flight at once.
            self.sem.acquire()
            html_thread = HtmlSpider("https://baidu.com/{}".format(i), self.sem)
            html_thread.start()
if __name__ == "__main__":
    # Three permits: across the 20 tasks, at most 3 spiders work at a time.
    sem = threading.Semaphore(3)
    url_producer = UrlProducer(sem)
    url_producer.start()
threading 线程同步
同步= 同步执行
from threading import Lock, RLock, Condition #可重入的锁
#在同一个线程里面,可以连续调用多次acquire, 一定要注意acquire的次数要和release的次数相等
total = 0  # shared counter modified by add() and desc()
lock = RLock()  # RLock lets the same thread call acquire() repeatedly without error
def add():
    """Increment the module-level ``total`` one million times under ``lock``.

    Every iteration acquires the lock twice and releases it twice. The
    second acquire comes from the thread that already holds the lock, and
    each acquire is paired with one release.
    """
    # Placeholder steps from the original notes:
    # 1. do something
    # 2. I/O operation
    # 3. do something else
    global total
    for _ in range(1000000):
        lock.acquire()
        lock.acquire()  # nested acquire by the thread that already holds it
        total += 1
        lock.release()
        lock.release()
def desc():
    """Decrement the module-level ``total`` one million times under ``lock``."""
    global total
    for _ in range(1000000):
        # The read-modify-write of ``total`` happens only while the lock is held.
        lock.acquire()
        total -= 1
        lock.release()
import threading
# Run add() and desc() concurrently, wait for both, then show the counter.
thread1, thread2 = threading.Thread(target=add), threading.Thread(target=desc)
for worker in (thread1, thread2):
    worker.start()
for worker in (thread1, thread2):
    worker.join()
print(total)
#1. 用锁会影响性能
#2. 锁会引起死锁
#死锁的情况 A(a,b)
"""
A(a、b)
acquire (a)
acquire (b)
B(a、b)
acquire (a)
acquire (b)
"""
threading 线程间通信 -共享变量加锁
线程间通信用于:一个任务依赖另一个任务,生产者消费者关系
使用共享变量不加锁的话会出问题:
比如修改一个共享变量 a(假设 a=1):线程“读取变量”和“修改变量”是两步操作,如果两个线程在第一步同时读到相同的 a 值,再各自修改并写回,就会有一次修改被覆盖,导致修改后的 a 值不正确。
import time import threading from chapter11 import variables
from threading import Condition
1. 生产者当生产10个url以后就等待,保证detail_url_list中最多只有十个url
2. 当url_list为空的时候,消费者就暂停
def get_detail_html(lock):
    """Consume URLs from ``variables.detail_url_list`` forever.

    Each pass pops one URL while holding ``lock``, then does the simulated
    2-second fetch outside the lock. When the list is empty it sleeps for
    1 second before checking again.

    Args:
        lock: Lock guarding the shared list; must be usable in a ``with``
            statement (e.g. ``threading.Lock`` / ``threading.RLock``).
    """
    # Crawl article detail pages.
    detail_url_list = variables.detail_url_list
    while True:
        # Test and pop under one lock hold so another consumer cannot empty
        # the list between the check and the pop.
        with lock:
            # Presumably entries are URL strings; None marks "nothing popped".
            url = detail_url_list.pop() if detail_url_list else None
        if url is None:
            # Back off instead of spinning on an empty list.
            time.sleep(1)
            continue
        print("get detail html started")
        time.sleep(2)
        print("get detail html end")
def get_detail_url(lock):
    """Produce URLs into ``variables.detail_url_list`` forever.

    Each round sleeps 4 seconds and then appends 20 URLs, never letting the
    shared list grow beyond 10 entries. When the list is full the producer
    waits and retries the same URL rather than dropping it.

    Args:
        lock: Lock guarding the shared list; must be usable in a ``with``
            statement (e.g. ``threading.Lock`` / ``threading.RLock``).
    """
    # Crawl article list pages.
    detail_url_list = variables.detail_url_list
    while True:
        print("get detail url started")
        time.sleep(4)
        for i in range(20):
            url = "http://projectsedu.com/{id}".format(id=i)
            while True:
                with lock:
                    if len(detail_url_list) < 10:
                        detail_url_list.append(url)
                        break
                # List is full: sleep with the lock released so consumers
                # can drain it, then retry this URL.
                time.sleep(1)
        print("get detail url end")
1. 线程通信方式- 共享变量
if __name__ == "__main__":
    # Qualified through the threading module so this snippet does not depend
    # on a separate `from threading import RLock`.
    lock = threading.RLock()
    thread_detail_url = threading.Thread(target=get_detail_url, args=(lock,))
    # Ten consumer threads share the same lock.
    for i in range(10):
        html_thread = threading.Thread(target=get_detail_html, args=(lock,))
        html_thread.start()
    start_time = time.time()
    # NOTE(review): the producer thread is created but its start() call was
    # commented out in the original, so nothing here fills the list.
    # Confirm whether that is intentional.
    # thread_detail_url.start()
    # Original note: child threads are killed when the main thread exits.
    # NOTE(review): the consumer threads are non-daemon and loop forever, so
    # the process keeps running after this line prints.
    print("last time: {}".format(time.time() - start_time))
<a name="eKpwg"></a>
# threading 线程间通信 -queue
queue是线程安全的不需要加锁
```python
#通过queue的方式进行线程间同步
from queue import Queue
import time
import threading
def get_detail_html(queue):
    """Consume items from ``queue`` forever, simulating a 2-second fetch each.

    Args:
        queue: Object with ``get()`` and ``task_done()`` (e.g. ``queue.Queue``).
    """
    # Crawl article detail pages.
    while True:
        url = queue.get()  # blocks until an item is available
        try:
            print("get detail html started")
            time.sleep(2)
            print("get detail html end")
        finally:
            # Mark the item as processed; Queue.join() only returns once
            # every item that was put() has had a matching task_done().
            queue.task_done()
def get_detail_url(queue):
    """Produce URLs into ``queue`` forever, 20 per round.

    Each round prints a start marker, sleeps 4 seconds, puts 20 URLs on the
    queue, and prints an end marker.

    Args:
        queue: Object with a ``put()`` method (e.g. ``queue.Queue``).
    """
    # Crawl article list pages.
    while True:
        print("get detail url started")
        time.sleep(4)
        for i in range(20):
            queue.put("http://projectsedu.com/{id}".format(id=i))
        print("get detail url end")
#1. 线程通信方式- 共享变量
if __name__ == "__main__":
    # Bounded queue: put() blocks once 1000 items are waiting.
    detail_url_queue = Queue(maxsize=1000)
    thread_detail_url = threading.Thread(target=get_detail_url, args=(detail_url_queue,))
    for i in range(10):
        html_thread = threading.Thread(target=get_detail_html, args=(detail_url_queue,))
        html_thread.start()
    start_time = time.time()
    # NOTE(review): the producer thread is created but its start() call was
    # commented out in the original. Confirm whether that is intentional.
    # thread_detail_url.start()
    # join() blocks until every item that was put() has been matched by a
    # task_done() call. task_done() belongs after a get(); calling it here
    # with nothing fetched raises ValueError, so that call was removed.
    detail_url_queue.join()
    # Original note: child threads are killed when the main thread exits.
    # NOTE(review): that only holds for daemon threads; the threads started
    # above are non-daemon.
    print("last time: {}".format(time.time() - start_time))
复杂的线程同步 condition条件变量
import threading
#条件变量, 用于复杂的线程间同步
下面代码会一个线程执行完才能另一个线程执行
# class XiaoAi(threading.Thread):
# def __init__(self, lock):
# super().__init__(name="小爱")
# self.lock = lock
#
# def run(self):
# self.lock.acquire()
# print("{} : 在 ".format(self.name))
# self.lock.release()
#
# self.lock.acquire()
# print("{} : 好啊 ".format(self.name))
# self.lock.release()
#
# class TianMao(threading.Thread):
# def __init__(self, lock):
# super().__init__(name="天猫精灵")
# self.lock = lock
#
# def run(self):
#
# self.lock.acquire()
# print("{} : 小爱同学 ".format(self.name))
# self.lock.release()
#
# self.lock.acquire()
# print("{} : 我们来对古诗吧 ".format(self.name))
# self.lock.release()
#通过condition完成协同读诗
class XiaoAi(threading.Thread):
    """Second speaker in the condition-variable dialogue.

    For each of its lines the thread waits to be notified, prints the line,
    and then notifies the other speaker.

    Args:
        cond: Condition shared with the other speaker; used as a context
            manager and for ``wait()`` / ``notify()``.
    """

    # Lines spoken, in order. They are substituted into "{} : {} ", so the
    # printed text keeps the original trailing space.
    LINES = ("在", "好啊", "君住长江尾", "共饮长江水", "此恨何时已", "定不负相思意")

    def __init__(self, cond):
        super().__init__(name="小爱")
        self.cond = cond

    def run(self):
        """Wait, speak, notify — once per line, holding ``cond`` throughout."""
        with self.cond:
            for line in self.LINES:
                self.cond.wait()  # wait for the other speaker's notification
                print("{} : {} ".format(self.name, line))
                self.cond.notify()  # tell the other speaker it may continue
class TianMao(threading.Thread):
    """First speaker in the condition-variable dialogue.

    For each of its lines the thread prints the line, notifies the other
    speaker, and then waits for the reply.

    Args:
        cond: Condition shared with the other speaker; used as a context
            manager and for ``wait()`` / ``notify()``.
    """

    # Lines spoken, in order. They are substituted into "{} : {} ", so the
    # printed text keeps the original trailing space.
    LINES = (
        "小爱同学",
        "我们来对古诗吧",
        "我住长江头",
        "日日思君不见君",
        "此水几时休",
        "只愿君心似我心",
    )

    def __init__(self, cond):
        super().__init__(name="天猫精灵")
        self.cond = cond

    def run(self):
        """Speak, notify, wait — once per line, holding ``cond`` throughout."""
        with self.cond:
            for line in self.LINES:
                print("{} : {} ".format(self.name, line))
                self.cond.notify()  # hand the turn to the other speaker
                self.cond.wait()  # wait for the reply before continuing
if __name__ == "__main__":
    cond = threading.Condition()
    xiaoai = XiaoAi(cond)
    tianmao = TianMao(cond)
    # Start order matters: XiaoAi begins by waiting and TianMao begins by
    # notifying, so XiaoAi is started first.
    # NOTE(review): start order alone does not guarantee XiaoAi has reached
    # wait() before TianMao's first notify(); if it has not, that
    # notification is lost and both threads block.
    # wait() and notify() may only be called after entering `with cond`.
    # Original note: a Condition has two layers of locks. The underlying lock
    # is released when a thread calls wait(); a second lock is allocated on
    # each wait() call and placed in the condition's waiting queue until
    # notify() wakes it.
    xiaoai.start()
    tianmao.start()