多线程

  1. import threading
  2. import time
  3. from threading import current_thread
  4. # 定义线程方法
  5. def my_thread(arg1, arg2):
  6. print(threading.currentThread().getName(), 'start')
  7. print('%s %s' % (arg1, arg2))
  8. time.sleep(1)
  9. print(threading.currentThread().getName(), 'end')
  10. # 测试
  11. def test_func():
  12. for i in range(1, 6):
  13. t1 = threading.Thread(target=my_thread, args=(i, i + 1))
  14. t1.start()
  15. print(current_thread().getName(), 'end')
  16. class MyThread(threading.Thread):
  17. def run(self):
  18. print(current_thread().getName(), 'start')
  19. print('run')
  20. print(current_thread().getName(), 'stop')
  21. def test_func2():
  22. t1 = MyThread()
  23. t1.start()
  24. t1.join()
  25. print(current_thread().getName(), 'end')
  26. # test_func()
  27. test_func2()

Thread 方法

  • run(): 用以表示线程活动的方法。
  • start():启动线程活动。
  • join([time]): 等待至线程中止。这阻塞调用线程直至线程的join() 方法被调用中止-正常退出或者抛出未处理的异常-或者是可选的超时发生。
  • isAlive(): 返回线程是否活动的。
  • getName(): 返回线程名。
  • setName(): 设置线程名。

经典的消费者和生产者

  1. from threading import Thread, current_thread
  2. import time
  3. import random
  4. from queue import Queue
  5. queue = Queue(5)
  6. # 经典的消费者和生产者问题
  7. class ProducerThread(Thread):
  8. def run(self):
  9. name = current_thread().getName()
  10. nums = range(100)
  11. global queue
  12. while True:
  13. num = random.choice(nums)
  14. queue.put(num)
  15. print('生产者 %s 生产了数据 %s' % (name, num))
  16. t = random.randint(1, 3)
  17. time.sleep(t)
  18. print('生产者 %s 睡眠了 %s 秒' % (name, t))
  19. class ConsumerTheard(Thread):
  20. def run(self):
  21. name = current_thread().getName()
  22. global queue
  23. while True:
  24. num = queue.get()
  25. queue.task_done()
  26. print('消费者 %s 消耗了数据 %s' % (name, num))
  27. t = random.randint(1, 5)
  28. time.sleep(t)
  29. print('消费者 %s 睡眠了 %s 秒' % (name, t))
  30. def test_func():
  31. p1 = ProducerThread(name='p1')
  32. p1.start()
  33. p2 = ProducerThread(name='p2')
  34. p2.start()
  35. p3 = ProducerThread(name='p3')
  36. p3.start()
  37. c1 = ConsumerTheard(name='c1')
  38. c1.start()
  39. c2 = ConsumerTheard(name='c2')
  40. c2.start()
  41. test_func()

线程同步(锁)

  • 创建锁:lock = threading.Lock()
  • 加锁:lock.acquire()
  • 解锁:lock.release() ```python from queue import Queue import threading import time

exitFlag = 0

class myThread(threading.Thread): def init(self, threadID, name, q): threading.Thread.init(self) self.threadID = threadID self.name = name self.q = q

  1. def run(self):
  2. print("Starting " + self.name)
  3. process_data(self.name, self.q)
  4. print
  5. "Exiting " + self.name

def process_data(threadName, q): while not exitFlag: queueLock.acquire() if not workQueue.empty(): data = q.get() queueLock.release() print(“%s processing %s” % (threadName, data)) else: queueLock.release() time.sleep(1)

threadList = [“Thread-1”, “Thread-2”, “Thread-3”] nameList = [“One”, “Two”, “Three”, “Four”, “Five”] queueLock = threading.Lock() workQueue = Queue(10) threads = [] threadID = 1

创建新线程

for tName in threadList: thread = myThread(threadID, tName, workQueue) thread.start() threads.append(thread) threadID += 1

填充队列

queueLock.acquire() for word in nameList: workQueue.put(word) queueLock.release()

等待队列清空

while not workQueue.empty(): pass

通知线程是时候退出

exitFlag = 1

使用 json 方法等待所有线程完成

for t in threads: t.join() print(“Exiting Main Thread”)

  1. <a name="DEw6E"></a>
  2. ### 线程合并(join方法)
  3. 需要主线程要等待子线程运行完后,再退出可以使用 join 方法
  4. ```python
  5. # 创建的 thread 调用 join 确保子线程结束
  6. def test_func2():
  7. t1 = MyThread()
  8. t1.start()
  9. t1.join()
  10. print(current_thread().getName(), 'end')

线程间通信

从一个线程向另一个线程发送数据最安全的方式可能就是使用 queue 库中的队列了。创建一个被多个线程共享的 Queue 对象,这些线程通过使用 put() 和 get() 操作来向队列中添加或者删除元素。

Queue 方法

  • Queue.qsize() 返回队列的大小
  • Queue.empty() 如果队列为空,返回True,反之False
  • Queue.full() 如果队列满了,返回True,反之False
  • Queue.full 与 maxsize 大小对应
  • Queue.get([block[, timeout]])获取队列,timeout等待时间
  • Queue.get_nowait() 相当Queue.get(False)
  • Queue.put(item, block=True, timeout=None) 写入队列,timeout等待时间
  • Queue.put_nowait(item) 相当 Queue.put(item, False)
  • Queue.task_done() 在完成一项工作之后,Queue.task_done()函数向任务已经完成的队列发送一个信号
  • Queue.join() 实际上意味着等到队列为空,再执行别的操作

参考