threading模块使用

  1. #对于io操作来说,多线程和多进程性能差别不大
  2. #1.通过Thread类实例化
  3. import time
  4. import threading
  5. def get_detail_html(url):
  6. print("get detail html started")
  7. time.sleep(2)
  8. print("get detail html end")
  9. def get_detail_url(url):
  10. print("get detail url started")
  11. time.sleep(4)
  12. print("get detail url end")
  13. #2. 通过继承Thread类来实现多线程
  14. class GetDetailHtml(threading.Thread):
  15. def __init__(self, name):
  16. super().__init__(name=name)
  17. def run(self):
  18. print("get detail html started")
  19. time.sleep(2)
  20. print("get detail html end")
  21. class GetDetailUrl(threading.Thread):
  22. def __init__(self, name):
  23. super().__init__(name=name)
  24. def run(self):
  25. print("get detail url started")
  26. time.sleep(4)
  27. print("get detail url end")
  28. if __name__ == "__main__":
  29. thread1 = GetDetailHtml("get_detail_html")
  30. thread2 = GetDetailUrl("get_detail_url")
  31. start_time = time.time()
  32. thread1.start()
  33. thread2.start()
  34. thread1.join()
  35. thread2.join()
  36. #当主线程退出的时候, 子线程kill掉
  37. print ("last time: {}".format(time.time()-start_time))

threading 控制线程数量

  1. #Semaphore 是用于控制进入数量的锁
  2. #例如文件读写:写一般只允许一个线程进行,读可以允许多个线程同时进行
  3. #做爬虫
  4. import threading
  5. import time
  6. class HtmlSpider(threading.Thread):
  7. def __init__(self, url, sem):
  8. super().__init__()
  9. self.url = url
  10. self.sem = sem
  11. def run(self):
  12. time.sleep(2)
  13. print("got html text success")
  14. # 释放锁
  15. self.sem.release()
  16. class UrlProducer(threading.Thread):
  17. def __init__(self, sem):
  18. super().__init__()
  19. self.sem = sem
  20. def run(self):
  21. for i in range(20):
  22. # 获取锁
  23. self.sem.acquire()
  24. html_thread = HtmlSpider("https://baidu.com/{}".format(i), self.sem) # HtmlSpider调用
  25. html_thread.start()
  26. if __name__ == "__main__":
  27. sem = threading.Semaphore(3) # 在20个任务中,保证一次只有3个线程同时工作
  28. url_producer = UrlProducer(sem)
  29. url_producer.start()

threading 线程同步

同步:让多个线程按确定的顺序访问共享资源(例如用锁保证同一时刻只有一个线程修改共享变量),从而避免数据竞争。

  1. from threading import Lock, RLock, Condition #可重入的锁
  2. #在同一个线程里面,可以连续调用多次acquire, 一定要注意acquire的次数要和release的次数相等
  3. total = 0
  4. lock = RLock() #RLock允许acquire重复不报错
  5. def add():
  6. #1. dosomething1
  7. #2. io操作
  8. # 1. dosomething3
  9. global lock
  10. global total
  11. for i in range(1000000):
  12. lock.acquire()
  13. lock.acquire()
  14. total += 1
  15. lock.release()
  16. lock.release()
  17. def desc():
  18. global total
  19. global lock
  20. for i in range(1000000):
  21. lock.acquire()
  22. total -= 1
  23. lock.release()
  24. import threading
  25. thread1 = threading.Thread(target=add)
  26. thread2 = threading.Thread(target=desc)
  27. thread1.start()
  28. thread2.start()
  29. #
  30. thread1.join()
  31. thread2.join()
  32. print(total)
  33. #1. 用锁会影响性能
  34. #2. 锁会引起死锁
  35. #死锁的情况 A(a,b)
  36. """
  37. A(a、b)
  38. acquire (a)
  39. acquire (b)
  40. B(a、b)
  41. acquire (a)
  42. acquire (b)
  43. """

threading 线程间通信 -共享变量加锁

线程间通信用于一个任务依赖另一个任务结果的场景,例如生产者-消费者模型。
使用共享变量而不加锁的话会出问题:
例如两个线程同时修改共享变量a(初值a=1):“读取a”和“写回a”是两步操作。如果两个线程在第一步同时读到a=1,一个执行+1、另一个执行-1,那么后写回的结果会覆盖先写回的结果:

  1. 线程1计算 1+1=2
  2. 线程2计算 1-1=0
  3. 最终结果可能是2或者0
  4. 而正确结果应该是1+1-1=1

    线程间通信

import time import threading from chapter11 import variables

from threading import Condition

1. 生产者在detail_url_list中已有10个url时就等待,保证detail_url_list中最多只有10个url

2. 当detail_url_list为空的时候,消费者就暂停

def get_detail_html(lock):
    """Consumer: repeatedly pop a URL from the shared list and "crawl" it.

    Runs forever. ``lock`` must be the same lock the producer uses to guard
    ``variables.detail_url_list``.
    """
    detail_url_list = variables.detail_url_list
    while True:
        # Check-and-pop must happen as one step under the lock; otherwise two
        # consumers could both see a single item and the second pop() fails.
        with lock:
            url = detail_url_list.pop() if detail_url_list else None
        if url is None:
            # Nothing to do yet: back off instead of spinning on the CPU.
            time.sleep(1)
            continue
        print("get detail html started")
        time.sleep(2)  # simulated download of `url`
        print("get detail html end")

def get_detail_url(lock):
    """Producer: each round, add 20 URLs to the shared list, keeping at most 10 pending.

    Runs forever. When the list already holds 10 URLs, the producer waits
    (re-checking once per second) until a consumer makes room, so no URL is
    dropped.
    """
    detail_url_list = variables.detail_url_list
    while True:
        print("get detail url started")
        time.sleep(4)  # simulated download of the list page
        for i in range(20):
            url = "http://projectsedu.com/{id}".format(id=i)
            while True:
                with lock:
                    if len(detail_url_list) < 10:
                        detail_url_list.append(url)
                        break
                # List is full: the lock is released here, so give consumers
                # time to pop before trying the same URL again.
                time.sleep(1)
        print("get detail url end")

1. 线程通信方式- 共享变量

if __name__ == "__main__":
    # A single lock, shared by the producer and all consumers, guards
    # variables.detail_url_list (thread communication via a shared variable).
    lock = threading.RLock()
    thread_detail_url = threading.Thread(target=get_detail_url, args=(lock,))
    for i in range(10):
        html_thread = threading.Thread(target=get_detail_html, args=(lock,))
        html_thread.start()
    start_time = time.time()
    # Without the producer, the consumers would never receive any work.
    thread_detail_url.start()
    # All threads are non-daemon infinite loops: the process keeps running
    # after this line, and the main thread exiting does NOT kill them.
    print("last time: {}".format(time.time() - start_time))
  1. <a name="eKpwg"></a>
  2. # threading 线程间通信 -queue
  3. queue.Queue是线程安全的,多个线程调用put()/get()时不需要额外加锁
  4. ```python
  5. #通过queue的方式进行线程间同步
  6. from queue import Queue
  7. import time
  8. import threading
  9. def get_detail_html(queue):
  10. #爬取文章详情页
  11. while True:
  12. url = queue.get()
  13. # for url in detail_url_list:
  14. print("get detail html started")
  15. time.sleep(2)
  16. print("get detail html end")
  17. def get_detail_url(queue):
  18. # 爬取文章列表页
  19. while True:
  20. print("get detail url started")
  21. time.sleep(4)
  22. for i in range(20):
  23. queue.put("http://projectsedu.com/{id}".format(id=i))
  24. print("get detail url end")
  25. #1. 线程通信方式- 共享变量
  26. if __name__ == "__main__":
  27. detail_url_queue = Queue(maxsize=1000) #超过数量会阻塞
  28. thread_detail_url = threading.Thread(target=get_detail_url, args=(detail_url_queue,))
  29. for i in range(10):
  30. html_thread = threading.Thread(target=get_detail_html, args=(detail_url_queue,))
  31. html_thread.start()
  32. # # thread2 = GetDetailUrl("get_detail_url")
  33. start_time = time.time()
  34. # thread_detail_url.start()
  35. # thread_detail_url1.start()
  36. #
  37. # thread1.join()
  38. # thread2.join()
  39. detail_url_queue.task_done()
  40. detail_url_queue.join()
  41. #当主线程退出的时候, 子线程kill掉
  42. print ("last time: {}".format(time.time()-start_time))

复杂的线程同步 condition条件变量

  1. import threading
  2. #条件变量, 用于复杂的线程间同步
  3. #下面只用Lock的写法无法让两个线程交替执行:通常是一个线程执行完之后,另一个线程才开始执行
  4. # class XiaoAi(threading.Thread):
  5. # def __init__(self, lock):
  6. # super().__init__(name="小爱")
  7. # self.lock = lock
  8. #
  9. # def run(self):
  10. # self.lock.acquire()
  11. # print("{} : 在 ".format(self.name))
  12. # self.lock.release()
  13. #
  14. # self.lock.acquire()
  15. # print("{} : 好啊 ".format(self.name))
  16. # self.lock.release()
  17. #
  18. # class TianMao(threading.Thread):
  19. # def __init__(self, lock):
  20. # super().__init__(name="天猫精灵")
  21. # self.lock = lock
  22. #
  23. # def run(self):
  24. #
  25. # self.lock.acquire()
  26. # print("{} : 小爱同学 ".format(self.name))
  27. # self.lock.release()
  28. #
  29. # self.lock.acquire()
  30. # print("{} : 我们来对古诗吧 ".format(self.name))
  31. # self.lock.release()
  32. #通过condition完成协同读诗
  33. class XiaoAi(threading.Thread):
  34. def __init__(self, cond):
  35. super().__init__(name="小爱")
  36. self.cond = cond
  37. def run(self):
  38. with self.cond:
  39. self.cond.wait()
  40. print("{} : 在 ".format(self.name))
  41. self.cond.notify() #通知另一个线程可以执行了
  42. self.cond.wait() #等待通知
  43. print("{} : 好啊 ".format(self.name))
  44. self.cond.notify()
  45. self.cond.wait()
  46. print("{} : 君住长江尾 ".format(self.name))
  47. self.cond.notify()
  48. self.cond.wait()
  49. print("{} : 共饮长江水 ".format(self.name))
  50. self.cond.notify()
  51. self.cond.wait()
  52. print("{} : 此恨何时已 ".format(self.name))
  53. self.cond.notify()
  54. self.cond.wait()
  55. print("{} : 定不负相思意 ".format(self.name))
  56. self.cond.notify()
  57. class TianMao(threading.Thread):
  58. def __init__(self, cond):
  59. super().__init__(name="天猫精灵")
  60. self.cond = cond
  61. def run(self):
  62. with self.cond:
  63. print("{} : 小爱同学 ".format(self.name))
  64. self.cond.notify()
  65. self.cond.wait()
  66. print("{} : 我们来对古诗吧 ".format(self.name))
  67. self.cond.notify()
  68. self.cond.wait()
  69. print("{} : 我住长江头 ".format(self.name))
  70. self.cond.notify()
  71. self.cond.wait()
  72. print("{} : 日日思君不见君 ".format(self.name))
  73. self.cond.notify()
  74. self.cond.wait()
  75. print("{} : 此水几时休 ".format(self.name))
  76. self.cond.notify()
  77. self.cond.wait()
  78. print("{} : 只愿君心似我心 ".format(self.name))
  79. self.cond.notify()
  80. self.cond.wait()
  81. if __name__ == "__main__":
  82. from concurrent import futures
  83. cond = threading.Condition()
  84. xiaoai = XiaoAi(cond)
  85. tianmao = TianMao(cond)
  86. #启动顺序很重要
  87. #在调用with cond之后才能调用wait或者notify方法
  88. #condition有两层锁, 一把底层锁会在线程调用了wait方法的时候释放, 上面的锁会在每次调用wait的时候分配一把并放入到cond的等待队列中,等到notify方法的唤醒
  89. xiaoai.start()
  90. tianmao.start()