使用条件

协程

通过 async / await 语法进行声明,是编写 asyncio 应用的推荐方式

  1. import asyncio
  2. async def main():
  3. print('hello')
  4. await asyncio.sleep(1)
  5. print('world')
  6. asyncio.run(main())

注意:简单地调用一个协程并不会使其被调度执行

  1. >>> main()
  2. <coroutine object main at 0x1053bb7c8>

要真正运行一个协程,asyncio 提供了三种主要机制:

  • asyncio.run()
  • 等待一个协程 ```python import asyncio import time

async def say_after(delay, what): await asyncio.sleep(delay) print(what)

async def main(): print(f”started at {time.strftime(‘%X’)}”)

  1. await say_after(1, 'hello')
  2. await say_after(2, 'world')
  3. print(f"finished at {time.strftime('%X')}")

asyncio.run(main())

  1. - [asyncio.create_task()](https://docs.python.org/zh-cn/3/library/asyncio-task.html#asyncio.create_task)
  2. ```python
  3. async def main():
  4. task1 = asyncio.create_task(
  5. say_after(1, 'hello'))
  6. task2 = asyncio.create_task(
  7. say_after(2, 'world'))
  8. print(f"started at {time.strftime('%X')}")
  9. # Wait until both tasks are completed (should take
  10. # around 2 seconds.)
  11. await task1
  12. await task2
  13. print(f"finished at {time.strftime('%X')}")

并发运行任务

asyncio.create_task

  1. async def parse_url(url):
  2. print(url)
  3. return url
  4. async def main(url_list):
  5. tasks = [parse_url(url) for url in url_list]
  6. for task in tasks:
  7. await task
  8. url_list = ['a', 'b', 'c']
  9. asyncio.run(main(url_list))

asyncio.gather

  1. async def main(url_list):
  2. tasks = [parse_url(url) for url in url_list]
  3. L = await asyncio.gather(*tasks)
  4. print(L)
  5. return L
  6. url_list = ['a', 'b', 'c']
  7. asyncio.run(main(url_list))

限制并发数

async with sem

  1. import time
  2. import random
  3. import asyncio
  4. import datetime
  5. async def square(n, semaphore):
  6. async with semaphore:
  7. # sec = random.randint(1, 3)
  8. sec = 2
  9. print(datetime.datetime.now(), n, f'start, sleep {sec}s')
  10. await asyncio.sleep(sec)
  11. print(datetime.datetime.now(), n, 'done')
  12. return f'{n} * {n} = {n * n}'
  13. async def main(numbers, njobs):
  14. semaphore = asyncio.Semaphore(njobs)
  15. tasks = [square(path, semaphore) for path in numbers]
  16. print(f'run with njobs: {njobs}')
  17. results = await asyncio.gather(*tasks)
  18. print('collect results ...')
  19. for res in results:
  20. print(res)
  21. if __name__ == "__main__":
  22. numbers = range(1, 11)
  23. njobs = 3
  24. start = time.time()
  25. asyncio.run(main(numbers, njobs))
  26. print(f'time used: {time.time() - start :.1f} seconds')

在类中使用

  1. import time
  2. import asyncio
  3. import subprocess
  4. import click
  5. from simple_loggers import SimpleLogger
  6. class Du_Path_Size(object):
  7. def __init__(self, path_list, njobs=4):
  8. self.njobs = njobs
  9. self.path_list = path_list
  10. self.logger = SimpleLogger('GetPathSize')
  11. async def run_du_cmd(self, path):
  12. self.logger.debug(f'check path: {path}')
  13. cmd = f'du -cs -B1 {path}'
  14. res = subprocess.run(cmd, shell=True, capture_output=True, encoding='utf8')
  15. if res.returncode != 0:
  16. error = res.stderr.strip()
  17. else:
  18. error = None
  19. size = res.stdout.strip().split('\n')[-1].split()[0]
  20. return path, size, error
  21. async def get_path_size(self, path, semaphore):
  22. async with semaphore:
  23. return await asyncio.create_task(self.run_du_cmd(path))
  24. async def get_results(self):
  25. semaphore = asyncio.Semaphore(self.njobs)
  26. tasks = [self.get_path_size(path, semaphore) for path in self.path_list]
  27. return await asyncio.gather(*tasks)
  28. @click.command()
  29. @click.argument('path_list', nargs=-1, required=True)
  30. @click.option('-j', '--jobs', help='the number of Concurrency', type=int, default=1)
  31. def main(**kwargs):
  32. print(kwargs)
  33. start = time.time()
  34. dps = Du_Path_Size(kwargs['path_list'], njobs=kwargs['jobs'])
  35. results = asyncio.run(dps.get_results())
  36. for path, size, error in results:
  37. print(path, size, error)
  38. dps.logger.info(f'time used: {time.time() - start:.1f}s')
  39. if __name__ == "__main__":
  40. main()