什么是进程(process)

进程是操作系统分配资源的最小单元,进程好比工厂的车间,代表cpu所能处理的单个任务,
任意时刻CPU总是运行一个进程,其他进程处于非运行状态;
进程的内存空间是共享的;

什么是线程(thread)

线程是操作系统调度的最小单元,线程好比车间的工人,一个进程可包括一个或多个线程,多线程协同完成一个任务;
多线程都可使用进程的共享空间;
线程安全相当于车间的厕所,一次只能进入一个人,防止他人同时进入的方法就是加上”🔒”,称为”互斥锁”;
容纳固定数目的线程,可通过给线程加上”信号量”(semaphore),保证多个线程不会相互冲突。

什么是协程(coroutine)

协程又称微线程,是一种用户态的轻量级线程,不是被操作系统内核所管理,而完全是由程序所控制(也就是在用户态执行)。
gevent的基本原理来自libev和libuv,本质上是一种时间驱动模型,如果代码中引入了带io阻塞的代码时,lib本身会自动完成上下文的切换。

  • 优点:
    1. 无需线程上下文切换的开销,线程数量越多,协程的性能优势越明显。
    2. 无需原子操作锁定及同步的开销,协程不需要多线程的锁机制,因为只有一个线程,不存在同时写变量冲突,在协程中控制共享资源不加锁,只需要判断状态。
  • 缺点:
    1. 无法利用多核资源,协程本质是单线程,不能同时将单CPU的多个核用上,协程需要进程配合使用才能运行在多CPU上。
    2. 进行阻塞操作会阻塞掉整个程序。

并行和并发

并行是指计算机系统中能同时执行两个或多个任务的计算方法,并行处理可同时工作于同一程序的不同方面。
并发是同一个时间段内有几个程序都在一个CPU中处于运行状态,但任一时刻只有一个程序在CPU上运行。
并发在于有处理多个任务的能力,不一定要同时;并行在于就是同时处理多个任务的能力,并行是并发的子集。

阻塞/非阻塞、同步/异步

在进程通信中,阻塞/非阻塞,同步/异步是同义词,但是需要区分对象是发送方还是接收方。
发送方阻塞/非阻塞(同步/异步)和接收方的阻塞/非阻塞(同步/异步)是互不影响的。
在IO系统调用层面,非阻塞IO系统调用异步IO系统调用存在一定差别,他们都不阻塞进程,但返回结果方式和内容是有所差别的,可能是完整的结果、也可以是不完整的,也可以是空值,但都属于非阻塞系统调用。

同步与异步关注的是消息通信机制:
1. 你打电话问书店老板有没有xxxx这本书,如果是同步通信机制,书店老板会说,你稍等,”我查一下”,然后开始查,等查好了(可能是5秒,也可能是一天)告诉你结果(返回结果)。
2. 异步通信机制,书店老板直接告诉你我查一下啊,查好了打电话给你,然后直接挂电话了(不返回结果)。然后查好了,他会主动打电话给你。在这里老板通过“回电”这种方式来回调 阻塞和非阻塞关注的是程序在等待调用结果(消息,返回值)时的状态: 阻塞调用是指调用结果返回之前,当前线程会被挂起,调用线程只有在得到结果之后才会返回。
非阻塞调用指在不能立刻得到结果之前,该调用不会阻塞当前线程。

  1. 你打电话问书店老板有没有xxx这本书,你如果是阻塞式调用,你会一直把自己“挂起”,直到得到这本书有没有的结果,如果是非阻塞式调用,你不管老板有没有告诉你,你自己先一边去玩了, 当然你也要偶尔过几分钟check一下老板有没有返回结果。
    在这里阻塞与非阻塞与是否同步异步无关。跟老板通过什么方式回答你结果无关

mulprocessing实现多进程

常用函数说明

  1. Process():通过Process方法创建进程,接收两个参数,target:一般指向函数名,args:需要向函数传递的参数

    1. start():开始进程
    2. join():阻塞主进程,等待子进程完成
    3. os.getpid():打印当前进程名称
  2. Pool类创建多进程

    1. apply_async(func[, args=()[, kwds={}[[, callback=None]]]):向进程池提交需要执行的函数及参数,非阻塞调用。
    2. map(func,iterable):使进程阻塞直到返回结果。
    3. map_async():同map函数,非阻塞调用。
    4. close():关闭进程池(pool),使其不再接受新的任务。
    5. terminate():结束工作进程,不再处理未处理的任务。
    6. join():主进程阻塞等待子进程的退出,在close()和terminate()之后使用 ```python import time import os from multiprocessing import Process, cpu_count, Pool

def long_time_task2(i): print(“子进程:{} - 任务: {}”.format(os.getpid(), i)) with open(‘process_text.txt’, ‘w’) as f: for j in range(100000): f.write(str(j) + ‘\n’)

if name == “main“: print(“当前cpu核数:{}”.format(cpu_count())) print(“当前母进程:{}”.format(os.getpid())) start = time.time()

  1. # 使用Pool
  2. p = Pool(cpu_count())
  3. for i in range(cpu_count()):
  4. p.apply_async(long_time_task2, args=(i, ))
  5. print("等待所有pool子进程完成")
  6. p.close()
  7. p.join()
  8. end = time.time()
  9. print("耗时:{}秒".format(end-start))
  1. <a name="V3X1f"></a>
  2. ####
  3. <a name="n0Uhk"></a>
  4. #### 多进程之间的数据共享
  5. 通过使用队列queue来实现不同进程间的通信或数据共享
  6. ```python
  7. from multiprocessing import Process, Queue
  8. import os, time, random
  9. def write(q):
  10. print("Process to write: {}".format(os.getpid()))
  11. for value in ["A", "B", "C"]:
  12. print("Put {} to queue...".format(value))
  13. q.put(value)
  14. time.sleep(random.random())
  15. def read(q):
  16. print("Process to read: {}".format(os.getpid()))
  17. while True:
  18. value = q.get(True)
  19. print("Get {} from queue.".format(value))
  20. if __name__ == "__main__":
  21. q = Queue()
  22. pw = Process(target=write, args=(q,))
  23. pr = Process(target=read, args=(q,))
  24. pw.start()
  25. pr.start()
  26. pw.join()
  27. pr.terminate()

threading实现多线程

  1. threading.currentThread():返回当前的线程变量
  2. threading.enumetate():返回正在运行的线程的list
  3. threading.activeCount():返回正在运行的线程数量
  4. Thread类处理线程

    run(): 用以表示线程活动的方法 start(): 启动线程活动 join(): 等待至线程终止 sAlive(): 返回线程是否活动 getName(): 返回线程名 setName(): 设置线程名 setDaemon: 将该线程声明为守护线程setDaemon(True),子线程会随着父线程的终止而终止;否则,子线程仍在运行中,主线程不会退出(但会向后执行),直到所有子线程运行结束。setDaemon()需要在start()前声明。

  1. import threading
  2. import time
  3. def long_time_task(i):
  4. print("当前子进程:{}- 任务 {}".format(threading.current_thread().name, i))
  5. # time.sleep(2)
  6. # print("结果: {}".format(8 ** 20))
  7. with open('thread_text.txt', 'w') as f:
  8. for j in range(100000):
  9. f.write(str(j) + '\n')
  10. if __name__ == "__main__":
  11. start = time.time()
  12. print("这是主线程:{}".format(threading.current_thread().name))
  13. thread_list = []
  14. for k in range(1, 10):
  15. t = threading.Thread(target=long_time_task, args=(k, ))
  16. thread_list.append(t)
  17. for i in thread_list:
  18. i.setDaemon(True)
  19. i.start()
  20. for i in thread_list:
  21. i.join()
  22. end = time.time()
  23. print("总共用时{}秒".format(end-start))

多线程之间的数据共享

  1. threading.lock()实现对线程中共享变量的锁定,确保每次只有一个线程能修改
  2. threading.lock().acquire():获得锁
  3. threading.lock().release():释放锁 ```python import threading from logzero import logger

class Account:

  1. def __init__(self):
  2. self.num = 1
  3. def add(self, lock):
  4. with lock:
  5. for i in range(100000):
  6. self.num += 1
  7. logger.info(f"add self.num: {self.num}")
  8. def delete(self, lock):
  9. with lock:
  10. for i in range(100000):
  11. self.num -= 1
  12. logger.info(f"delete self.num: {self.num}")

if name == “main“: account = Account() lock = threading.Lock()

  1. # 创建线程
  2. thread_num = 10
  3. thread_add_list = []
  4. thread_delete_list = []
  5. for i in range(thread_num):
  6. thread_add = threading.Thread(target=account.add, args=(lock, ), name="add")
  7. thread_add_list.append(thread_add)
  8. thread_delete = threading.Thread(target=account.delete, args=(lock,), name="Delete")
  9. thread_delete_list.append(thread_delete)
  10. # 启动线程
  11. for i in thread_add_list:
  12. i.start()
  13. i.join()
  14. for i in thread_delete_list:
  15. i.start()
  16. i.join()
  17. logger.info(f"num is {account.num}")

“””

[I 220112 13:15:08 threading_test_2:24] add self.num: 100001 [I 220112 13:15:08 threading_test_2:24] add self.num: 200001 [I 220112 13:15:08 threading_test_2:24] add self.num: 300001 [I 220112 13:15:08 threading_test_2:24] add self.num: 400001 [I 220112 13:15:08 threading_test_2:24] add self.num: 500001 [I 220112 13:15:08 threading_test_2:24] add self.num: 600001 [I 220112 13:15:08 threading_test_2:24] add self.num: 700001 [I 220112 13:15:08 threading_test_2:24] add self.num: 800001 [I 220112 13:15:08 threading_test_2:24] add self.num: 900001 [I 220112 13:15:08 threading_test_2:24] add self.num: 1000001 [I 220112 13:15:08 threading_test_2:30] delete self.num: 900001 [I 220112 13:15:08 threading_test_2:30] delete self.num: 800001 [I 220112 13:15:08 threading_test_2:30] delete self.num: 700001 [I 220112 13:15:08 threading_test_2:30] delete self.num: 600001 [I 220112 13:15:08 threading_test_2:30] delete self.num: 500001 [I 220112 13:15:08 threading_test_2:30] delete self.num: 400001 [I 220112 13:15:08 threading_test_2:30] delete self.num: 300001 [I 220112 13:15:08 threading_test_2:30] delete self.num: 200001 [I 220112 13:15:08 threading_test_2:30] delete self.num: 100001 [I 220112 13:15:08 threading_test_2:30] delete self.num: 1 [I 220112 13:15:08 threading_test_2:55] num is 1 “””

  1. <a name="IFvCH"></a>
  2. ####
  3. <a name="nTwld"></a>
  4. #### 队列实现线程安全的数据共享
  5. ```python
  6. from queue import Queue
  7. import time
  8. import threading
  9. import random
  10. # 生产者
  11. class Productor(threading.Thread):
  12. def __init__(self, name, queue):
  13. # threading.Thread.__init__(self, name=name)
  14. super(Productor, self).__init__(name=name)
  15. self.queue = queue
  16. def run(self):
  17. for i in range(1, 5):
  18. print("{} is producting {} to the queue".format(self.getName(), i))
  19. self.queue.put(i)
  20. time.sleep(random.randrange(10)/5)
  21. print("{} finished!".format(self.getName()))
  22. # 消费者
  23. class Consumer(threading.Thread):
  24. def __init__(self, name, queue):
  25. # threading.Thread.__init__(self, name=name)
  26. super(Consumer, self).__init__(name=name)
  27. self.queue = queue
  28. def run(self):
  29. for i in range(1, 5):
  30. val = self.queue.get()
  31. print("{} is consuming {} in the queue".format(self.getName(), val))
  32. time.sleep(random.randrange(10))
  33. print("{} finished!".format(self.getName()))
  34. def main():
  35. start = time.time()
  36. queue = Queue()
  37. producer = Productor('Producer', queue)
  38. consumer = Consumer('Consumer', queue)
  39. producer.start()
  40. consumer.start()
  41. producer.join()
  42. consumer.join()
  43. end = time.time()
  44. print("all threads finished!, 耗时 {} 秒".format(end-start))
  45. if __name__ == "__main__":
  46. main()
  47. -----------------------------------------------------------------------------
  48. """
  49. # @File : threading_test-1.py
  50. # @Time : 2021/01/11 23:19:59
  51. # @Author : wangshunzhe
  52. @desc: 多线程去消费一个队列的例子
  53. """
  54. import threading
  55. import time
  56. import queue
  57. # 下面来通过多线程来处理Queue里面的任务:
  58. def work(q):
  59. while True:
  60. if q.empty():
  61. return
  62. else:
  63. t = q.get()
  64. print("当前线程sleep {} 秒".format(t))
  65. time.sleep(t)
  66. def main():
  67. q = queue.Queue()
  68. for i in range(10):
  69. q.put(i) # 往队列里生成消息
  70. # 单线程
  71. # work(q)
  72. # 多线程
  73. thread_num = 10
  74. threads = []
  75. for i in range(thread_num):
  76. t = threading.Thread(target=work, args=(q,))
  77. # args需要输出的是一个元组,如果只有一个参数,后面加,表示元组,否则会报错
  78. threads.append(t)
  79. for i in range(thread_num):
  80. threads[i].setDaemon(True)
  81. threads[i].start()
  82. for i in range(thread_num):
  83. threads[i].join()
  84. if __name__ == "__main__":
  85. start = time.time()
  86. main()
  87. print('耗时:', time.time() - start)
  88. """
  89. 当前线程sleep 0 秒
  90. 当前线程sleep 1 秒
  91. 当前线程sleep 3 秒
  92. 当前线程sleep 4 秒
  93. 当前线程sleep 5 秒
  94. 当前线程sleep 6 秒
  95. 当前线程sleep 8 秒
  96. 当前线程sleep 2 秒
  97. 当前线程sleep 7 秒
  98. 当前线程sleep 9 秒
  99. 耗时: 0.0029397010803222656
  100. """

为啥队列是线程安全的?

因为队列Queue的put方法和get方法都是原子操作

tips:
对CPU密集型代码(循环操作)—多进程效率高
对IO密集型代码(文件操作、爬虫)—多线程效率高

原子操作

指不会被线程调度机制打断的操作,这种操作一旦开始,就一直运行到结束,中间不会切换到其他线程。
常见的原子操作:

  1. L.append(i)
  2. L1.extend(L2)
  3. x = L[i]
  4. x = L.pop()
  5. L[i:j] = L2
  6. L.sort()
  7. x = y
  8. x.field = y
  9. D[x] = y
  10. D1.update(D2)
  11. D.keys()

gevent实现协程

  1. from __future__ import print_function
  2. import gevent
  3. from gevent import monkey, sleep
  4. monkey.patch_all()
  5. import time
  6. import requests
  7. urls = [
  8. 'https://www.baidu.com/',
  9. 'https://kitten4.codemao.cn/'
  10. ]
  11. def print_head(url):
  12. start_time = time.time()
  13. print('Starting %s' % url)
  14. data = requests.get(url).text
  15. print('%s: %s bytes: %r' % (url, len(data), data[:50]))
  16. total_time = time.time() - start_time
  17. print('total time is %s' % total_time)
  18. # 生成Greenlet对象
  19. jobs = [gevent.spawn(print_head, _url) for _url in urls]
  20. # 使用joinall对Greenlet对象进行管控,等待所有greenlet对象以达到协程自动切换的目的
  21. # 返回所有已经join的greenlet对象列表
  22. gevent.joinall(jobs)
  23. """
  24. Starting https://www.baidu.com/
  25. Starting https://kitten4.codemao.cn/
  26. https://www.baidu.com/: 2443 bytes: '<!DOCTYPE html>\r\n<!--STATUS OK--><html> <head><met'
  27. total time is 0.12381315231323242
  28. https://kitten4.codemao.cn/: 15049 bytes: '<!doctype html><html><head><script>"use strict";\n\n'
  29. total time is 0.508491039276123
  30. """

gevent下的monkey机制

自动替换原来的thread、socket、time、multiprocessing等代码,全部变成gevent框架

  1. from gevent import monkey, sleep
  2. monkey.patch_all()
  3. import socket
  4. def print_monkey():
  5. print('obj', socket.socket)
  6. jobs = [gevent.spawn(print_monkey)]
  7. # 使用joinall对Greenlet对象进行管控,等待所有greenlet对象以达到协程自动切换的目的
  8. # 返回所有已经join的greenlet对象列表
  9. gevent.joinall(jobs)
  10. """
  11. obj <class 'gevent._socket3.socket'>
  12. """
  13. # 没有打补丁的情况下
  14. import socket
  15. def print_monkey():
  16. print('obj', socket.socket)
  17. jobs = [gevent.spawn(print_monkey)]
  18. # 使用joinall对Greenlet对象进行管控,等待所有greenlet对象以达到协程自动切换的目的
  19. # 返回所有已经join的greenlet对象列表
  20. gevent.joinall(jobs)
  21. """
  22. obj <class 'socket.socket'>
  23. """

gevent延时操作

  1. from gevent import monkey, sleep
  2. monkey.patch_all()
  3. import gevent
  4. def f1():
  5. for i in range(6, 11):
  6. print('this is ' + str(i))
  7. # 会主动调用切换函数
  8. gevent.sleep(2)
  9. def f2():
  10. for i in range(5):
  11. print('that is ' + str(i))
  12. t1 = gevent.spawn(f1)
  13. t2 = gevent.spawn(f2)
  14. gevent.wait([t1, t2])
  15. """
  16. this is 6
  17. that is 0
  18. that is 1
  19. that is 2
  20. that is 3
  21. that is 4
  22. this is 7
  23. this is 8
  24. this is 9
  25. this is 10
  26. """

gevent常用方法

gevent.spawn() 创建一个普通的Greenlet对象并切换
gevent.spawn_later(seconds=1) 延时创建一个普通的Greenlet对象并切换
gevent.getcurrent() 返回当前正在执行的greenlet
gevent.joinall(jobs) 将协程任务添加到事件循环,接收一个任务列表
gevent.wait() 替代join函数等待循环结束,,也可以传入协程对象列表
gevent.kill() 杀死一个协程
gevent.killall() 杀死一个协程列表里的所有协程
monkey.patch_all() 自动将python的一些标准模块替换成gevent框架

gevent中的组和池

  1. 组(group)是一个运行中greenlet集合,集合中的greenlet像一个组一样会被共同管理和调度
    1. Group().add():将greenlet添加到group中
    2. Group().map(func, iterable):由第二个参数控制迭代次数,返回可迭代对象执行结果列表
    3. Group().imap():返回一个可迭代对象 ```python def talk(msg): for i in range(2): print(msg)

g1 = gevent.spawn(talk, ‘bar’) g2 = gevent.spawn(talk, ‘foo’) print(g1) group = Group()

add方法,将greenlet添加到group中

group.add(g1) group.add(g2)

等待spawn完成,完成即从group里面去掉

group.join()

“””

bar bar foo foo “””

def talk_map(data): print(‘Size of group %s’ % len(group1)) print(‘Hello from Greenlet %s’ % id(getcurrent())) return data

group1 = Group()

返回可迭代对象执行结果列表

res = group1.map(talk_map, [1, 2, 3]) print(type(res)) print(res)

“”” Size of group 3 Hello from Greenlet 140581403301936 Size of group 3 Hello from Greenlet 140581403302224 Size of group 3 Hello from Greenlet 140581403302512

[1, 2, 3] “””

def talk_imap(data): print(‘Size of group %s’ % len(group2)) print(‘Hello from Greenlet %s’ % id(getcurrent())) return data

group2 = Group()

返回可迭代对象执行结果列表

res2 = group2.imap(talk_imap, range(5), maxsize=1) print(type(res2)) print(res2) for i in res2: print(i)

“””

Size of group 2 Hello from Greenlet 140290958927632 Size of group 2 Hello from Greenlet 140290958928496 0 1 Size of group 2 Hello from Greenlet 140290958928496 Size of group 2 Hello from Greenlet 140290958927632 2 3 Size of group 1 Hello from Greenlet 140290958927632 4 “””

  1. 2. 池(Pool)是一种用于处理需要限制并发性的动态数量
  2. 1. Pool().map(func, iterable):由第二个参数控制迭代次数,返回可迭代对象执行结果列表
  3. 2. Pool().imap():返回一个可迭代对象
  4. ```python
  5. pool = Pool(2)
  6. def hello_from(n):
  7. print('Size of pool %s' % len(pool))
  8. return 44
  9. # pool.map(hello_from, range(4))
  10. for i in pool.imap(hello_from, range(3)):
  11. print(i)
  12. """
  13. Size of pool 2
  14. Size of pool 2
  15. 44
  16. 44
  17. Size of pool 1
  18. 44
  19. """

gevent结合数据结构

队列queue是一个排序的数据集合,常用的方法如下:

put_nowait(item) 非阻塞的往队列放入数据,队列满则抛出full exception
get_nowait() 非阻塞的从队列读取数据,队列为空则抛出empty exception
put(block=True, timeout=None) 从队列放入数据,可选是否阻塞和超时时间
get(block=True, timeout=None) 从队列读取数据,可选是否阻塞和超时时间
peek(block=True, timeout=None) 与get()类似,但获取的数据不会从队列移除
peek_nowait() 类似get_nowait()
empty() 队列为空返回True
full() 队列已满返回True
qsize() 返回队列长度
  1. from gevent import monkey
  2. from gevent.queue import Queue
  3. monkey.patch_all()
  4. import gevent
  5. tasks = Queue()
  6. def worker(n):
  7. while not tasks.empty():
  8. task = tasks.get()
  9. print("Worker {} got task {}".format(n, task))
  10. gevent.sleep(0)
  11. print("没有数据了")
  12. def producer():
  13. for i in range(3):
  14. tasks.put(i)
  15. print("produce: {}".format(i))
  16. gevent.spawn(producer).join()
  17. gevent.joinall([
  18. gevent.spawn(worker('worker A')),
  19. gevent.spawn(worker('worker B')),
  20. gevent.spawn(worker('worker C')),
  21. ])
  22. """
  23. produce: 0
  24. produce: 1
  25. produce: 2
  26. Worker worker A got task 0
  27. Worker worker A got task 1
  28. Worker worker A got task 2
  29. 没有数据了
  30. 没有数据了
  31. 没有数据了
  32. """

python中的锁

GIL锁

全局解释器锁,每个线程在执行的时候都需要先获取GIL,保证同一个时刻只有一个线程可以执行代码,即说同一进程内的多个线程同一时间只能有一个运行

  • 释放GIL锁的时机:
    • 遇到IO操作,会造成CPU闲置,会释放GIL
    • 会有专门的triks计数,到达一定值会释放GIL锁,这时候线程之间会开始竞争GIL锁
  • GIL锁和互斥锁的关系:
    • 假设一个进程中有多线程运行,线程A获得GIL—>获得互斥锁lock,线程A在开始修改数据之前,遇到IO操作(数据读入内存或者内存输出的过程)
    • 线程A释放GIL,线程A、B开始竞争GIL锁
    • 假设线程B获得GIL锁—>因为有互斥锁lock,B无法修改数据,释放GIL锁
    • 假设线程A再次竞争到GIL锁,因为其占有互斥锁,开始修改数据—>释放互斥锁
    • 这时候线程B竞争到GIL和互斥锁后才能修改数据
  • 综上,cpython中,多线程比单线程性能快的原因是:遇到IO阻塞时,正在执行的线程会释放GIL锁,其他线程会利用这个空闲时间执行自己的代码
  • 关于计算密集型,使用多进程效率高,关于IO密集型,使用多线程效率高
  1. # 进程应用场景
  2. # 计算密集型:多进程效率高
  3. from multiprocessing import Process
  4. from threading import Thread
  5. import os,time
  6. def work():
  7. res=0
  8. for i in range(10000000):
  9. res*=i
  10. if __name__ == '__main__':
  11. l=[]
  12. print(os.cpu_count()) #本机为4核
  13. start=time.time()
  14. for i in range(4):
  15. p=Process(target=work) #耗时run time is 0.8030457496643066
  16. # p=Thread(target=work) #耗时run time is 2.134121894836426
  17. l.append(p)
  18. p.start()
  19. for p in l:
  20. p.join()
  21. stop=time.time() #
  22. print('run time is %s' %(stop-start))

使用进程:
image.png
使用线程:
image.png

同步锁

  1. import threading
  2. import time
  3. """同步锁"""
  4. lock = threading.Lock()
  5. """没有加锁的情况"""
  6. def func():
  7. global num
  8. num1 = num
  9. time.sleep(0.1) # 此处线程被释放,num值还没有修改
  10. num = num1 - 1
  11. num = 100
  12. l = []
  13. for i in range(100):
  14. t = threading.Thread(target=func, args=())
  15. t.start()
  16. l.append(t)
  17. for i in l:
  18. i.join()
  19. print(num) # 99
  20. """加锁的情况"""
  21. def func_1():
  22. global num3
  23. with lock:
  24. num2 = num3
  25. time.sleep(0.1)
  26. num3 = num2 - 1
  27. num3 = 100
  28. l2 = []
  29. for i in range(100):
  30. t = threading.Thread(target=func_1, args=())
  31. t.start()
  32. l2.append(t)
  33. for i in l2:
  34. i.join()
  35. print(num3) # 0

递归锁(重入锁)

为了支持同一个线程中多次请求同一资源,Python 提供了可重入锁(RLock)。这个RLock内部维护着一个锁(Lock)和一个计数器(counter)变量,counter 记录了acquire 的次数,从而使得资源可以被多次acquire。直到一个线程所有 acquire都被release(计数器counter变为0),其他的线程才能获得资源。

  1. """递归锁"""
  2. import time
  3. import threading
  4. r_lock = threading.RLock()
  5. class MyThread(threading.Thread):
  6. def __init__(self):
  7. threading.Thread.__init__(self)
  8. def run(self):
  9. self.fun_A()
  10. self.fun_B()
  11. def fun_A(self):
  12. r_lock.acquire()
  13. print("A加锁1", end='\t')
  14. r_lock.acquire()
  15. print('A加锁2', end='\t')
  16. time.sleep(0.2)
  17. r_lock.release()
  18. print('A释放1', end='\t')
  19. r_lock.release()
  20. print('A释放2')
  21. def fun_B(self):
  22. r_lock.acquire()
  23. print("B加锁1", end='\t')
  24. r_lock.acquire()
  25. print('B加锁2', end='\t')
  26. time.sleep(3)
  27. r_lock.release()
  28. print('B释放1', end='\t')
  29. r_lock.release()
  30. print('B释放2')
  31. if __name__ == '__main__':
  32. t1 = MyThread()
  33. t2 = MyThread()
  34. t1.start()
  35. t2.start()
  36. """
  37. A加锁1 A加锁2 A释放1 A释放2
  38. A加锁1 A加锁2 A释放1 A释放2
  39. B加锁1 B加锁2 B释放1 B释放2
  40. B加锁1 B加锁2 B释放1 B释放2
  41. """
  42. """注意观察程序的运行,当运行到程序B时,即使B休眠了3秒也不会切换线程"""

信号量

信号量是一个内部数据,它有一个内置的计数器,它标明当前的共享资源可以有多少线程同时读取。
信号量控制规则:当计数器大于0时,那么可以为线程分配资源权限;当计数器小于0时,未获得权限的线程会被挂起,直到其他线程释放资源。

  1. """信号量"""
  2. import random
  3. import threading
  4. import time
  5. # 创建信号量对象,信号量设置为3,需要有三个线程才启动
  6. semaphore = threading.Semaphore(3)
  7. def func():
  8. # 获取信号 -1
  9. if semaphore.acquire():
  10. print(threading.currentThread().getName() + '获得信号量')
  11. time.sleep(random.randint(1, 5))
  12. # 释放信号 +1
  13. semaphore.release()
  14. for i in range(10):
  15. t1 = threading.Thread(target=func)
  16. t1.start()
  17. """注意观察程序运行,开始只有3个线程获得了资源的权限,
  18. 后面当释放几个资源时就有几个获得资源权限"""

信号量被初始化为0,目的是同步两个或多个线程。线程必须并行运行,所以需要信号量同步。

  1. """信号量"""
  2. import random
  3. import threading
  4. import time
  5. # 同步两个不同线程,信号量被初始化为0
  6. semaphore = threading.Semaphore(0)
  7. def consumer():
  8. print("----等待producer运行----")
  9. semaphore.acquire() # 获取资源,信号量为0被挂起,等待信号量释放
  10. print("----consumer 结束--- 编号:%s" % item)
  11. def producer():
  12. global item
  13. time.sleep(3)
  14. item = random.randint(0, 100)
  15. print("producer运行编号:%s" % item)
  16. semaphore.release()
  17. if __name__ == "__main__":
  18. for i in range(0, 4):
  19. t1 = threading.Thread(target=producer)
  20. t2 = threading.Thread(target=consumer)
  21. t1.start()
  22. t2.start()
  23. t1.join()
  24. t2.join()
  25. print('程序终止')
  26. """
  27. 信号量被初始化为0,目的是同步两个或多个线程。
  28. 线程必须并行运行,所以需要信号量同步。
  29. 这种运用场景有时会用到,比较难理解,多运行示例仔细观察打印结果
  30. """

参考:https://zhuanlan.zhihu.com/p/37620890
参考:https://zhuanlan.zhihu.com/p/46368084