使用条件
- 如果是 I/O 密集型任务,并且 I/O 操作很慢、需要很多任务/线程协同完成,使用 asyncio
- 如果是 I/O 密集型任务,但 I/O 操作很快、只需要有限数量的任务/线程,使用多线程即可
- 如果是 CPU 密集型任务,使用多进程
asyncio
https://docs.python.org/zh-cn/3/library/asyncio.html https://docs.python.org/zh-cn/3/library/asyncio-task.html
协程
通过 async / await 语法进行声明,是编写 asyncio 应用的推荐方式
import asyncio
async def main():
    """Print 'hello', pause for one second, then print 'world'."""
    print('hello')
    # Awaiting the sleep suspends this coroutine and hands control back
    # to the event loop for the duration of the pause.
    await asyncio.sleep(1)
    print('world')
# asyncio.run() starts an event loop, runs main() to completion, then closes the loop.
asyncio.run(main())
注意:简单地调用一个协程并不会使其被调度执行
>>> main()
<coroutine object main at 0x1053bb7c8>
要真正运行一个协程,asyncio 提供了三种主要机制:
- asyncio.run()
- 等待一个协程
```python
import asyncio
import time
async def say_after(delay, what):
    """Print *what* after waiting *delay* seconds.

    Args:
        delay: Number of seconds to sleep before printing.
        what: The object to print once the delay has elapsed.

    Returns:
        None.
    """
    # The original had both statements fused onto the ``def`` line, which
    # is a syntax error; each statement now sits on its own line.
    await asyncio.sleep(delay)
    print(what)
async def main():
    """Await two ``say_after`` coroutines one after the other.

    Prints a start timestamp, awaits ``say_after(1, 'hello')`` and then
    ``say_after(2, 'world')``, and finally prints a finish timestamp.
    Because each coroutine is awaited directly, the second one only
    starts after the first has finished.
    """
    # The original used typographic quotes around this f-string, which is
    # not valid Python; plain ASCII quotes are restored here.
    print(f"started at {time.strftime('%X')}")

    await say_after(1, 'hello')
    await say_after(2, 'world')

    print(f"finished at {time.strftime('%X')}")
# Entry point of the example: run main() on a fresh event loop.
asyncio.run(main())
- [asyncio.create_task()](https://docs.python.org/zh-cn/3/library/asyncio-task.html#asyncio.create_task)
```python
async def main():
    """Run two ``say_after`` calls as concurrent tasks and time them.

    Prints a start timestamp, waits for both tasks, then prints a finish
    timestamp.
    """
    # create_task() wraps each coroutine in a Task and schedules it on
    # the running event loop.
    hello_task = asyncio.create_task(say_after(1, 'hello'))
    world_task = asyncio.create_task(say_after(2, 'world'))

    print(f"started at {time.strftime('%X')}")

    # Wait until both tasks are completed; since they overlap this should
    # take around 2 seconds (the longer of the two delays).
    await hello_task
    await world_task

    print(f"finished at {time.strftime('%X')}")
并发运行任务
asyncio.create_task
async def parse_url(url):
    """Print *url* and return it unchanged.

    The body contains no real parsing; it only echoes its argument.
    """
    print(url)
    return url
async def main(url_list):
    """Process every URL concurrently using ``asyncio.create_task``.

    Args:
        url_list: Iterable of URLs; each one is passed to ``parse_url``.

    Returns:
        None. The individual results are awaited but not collected.
    """
    # The original built a list of bare coroutine objects and awaited them
    # in a loop, which runs them strictly one after another. Wrapping each
    # coroutine in create_task() schedules all of them on the event loop
    # up front, so they actually run concurrently.
    tasks = [asyncio.create_task(parse_url(url)) for url in url_list]
    for task in tasks:
        await task
# Sample input for the example, then run main() on a fresh event loop.
url_list = ['a', 'b', 'c']
asyncio.run(main(url_list))
asyncio.gather
async def main(url_list):
    """Run ``parse_url`` over *url_list* with ``asyncio.gather``.

    Prints the list of results and also returns it; gather() keeps the
    results in the same order as the inputs.
    """
    results = await asyncio.gather(*map(parse_url, url_list))
    print(results)
    return results
# Sample input for the example, then run main() on a fresh event loop.
url_list = ['a', 'b', 'c']
asyncio.run(main(url_list))
限制并发数
async with sem
import time
import random
import asyncio
import datetime
async def square(n, semaphore, sec=2):
    """Simulate a slow job that squares *n* while holding *semaphore*.

    Args:
        n: The number to square.
        semaphore: An ``asyncio.Semaphore`` shared by all jobs; it caps
            how many jobs can be inside the simulated work at once.
        sec: Seconds to sleep to simulate work. Defaults to 2, the value
            that used to be hard-coded. Pass e.g.
            ``random.randint(1, 3)`` to get varying durations.

    Returns:
        A string of the form ``'<n> * <n> = <n squared>'``.
    """
    async with semaphore:
        print(datetime.datetime.now(), n, f'start, sleep {sec}s')
        await asyncio.sleep(sec)
        print(datetime.datetime.now(), n, 'done')
        return f'{n} * {n} = {n * n}'
async def main(numbers, njobs):
    """Square every item of *numbers*, with concurrency capped at *njobs*.

    A single semaphore initialised to *njobs* is handed to every
    ``square`` coroutine. The results are gathered and printed one per
    line.
    """
    limiter = asyncio.Semaphore(njobs)
    pending = [square(number, limiter) for number in numbers]

    print(f'run with njobs: {njobs}')
    outcomes = await asyncio.gather(*pending)

    print('collect results ...')
    for outcome in outcomes:
        print(outcome)
if __name__ == "__main__":
    # Demo run: square the numbers 1..10 with njobs=3 and report the
    # elapsed wall-clock time.
    numbers = range(1, 11)
    njobs = 3

    start = time.time()
    asyncio.run(main(numbers, njobs))
    elapsed = time.time() - start
    print(f'time used: {elapsed:.1f} seconds')
在类中使用
import time
import asyncio
import subprocess
import click
from simple_loggers import SimpleLogger
class Du_Path_Size:
    """Measure the disk usage of several paths concurrently with ``du``.

    Each path is checked by its own ``du -cs -B1`` child process. At most
    ``njobs`` child processes run at the same time.

    Args:
        path_list: Iterable of paths to measure.
        njobs: Maximum number of ``du`` processes running concurrently.
    """

    def __init__(self, path_list, njobs=4):
        self.njobs = njobs
        self.path_list = path_list
        self.logger = SimpleLogger('GetPathSize')

    @staticmethod
    def _parse_total(stdout):
        """Return the first field of the last output line, or None if there is none."""
        lines = stdout.strip().splitlines()
        if not lines:
            return None
        fields = lines[-1].split()
        return fields[0] if fields else None

    async def run_du_cmd(self, path):
        """Run ``du -cs -B1`` on *path* without blocking the event loop.

        Args:
            path: The path to measure. It is passed to ``du`` verbatim as
                a single argument; no shell is involved, so spaces and
                shell metacharacters in the path are safe and no glob
                expansion takes place.

        Returns:
            A ``(path, size, error)`` tuple. ``size`` is the first field
            of the last line printed by ``du`` (a string), or ``None``
            when ``du`` printed nothing. ``error`` is ``None`` on success,
            otherwise the stripped stderr text, or the OS error message
            if ``du`` could not be started.
        """
        self.logger.debug(f'check path: {path}')
        try:
            # An asyncio subprocess is used instead of subprocess.run():
            # the latter blocks the whole event loop until du exits, which
            # would serialise every check regardless of the semaphore.
            # '--' stops option parsing so a path starting with '-' is
            # not mistaken for a du option.
            proc = await asyncio.create_subprocess_exec(
                'du', '-cs', '-B1', '--', str(path),
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
            )
        except OSError as exc:
            # du is missing or not executable: report it as a per-path
            # error rather than aborting all the other checks.
            return path, None, str(exc)

        stdout, stderr = await proc.communicate()

        error = None
        if proc.returncode != 0:
            error = stderr.decode('utf8', errors='replace').strip()

        size = self._parse_total(stdout.decode('utf8', errors='replace'))
        return path, size, error

    async def get_path_size(self, path, semaphore):
        """Run ``run_du_cmd`` for *path* while holding *semaphore*.

        Args:
            path: The path to measure.
            semaphore: ``asyncio.Semaphore`` limiting concurrent checks.

        Returns:
            The ``(path, size, error)`` tuple from ``run_du_cmd``.
        """
        async with semaphore:
            # Awaiting the coroutine directly is enough; wrapping it in a
            # task only to await it immediately adds nothing.
            return await self.run_du_cmd(path)

    async def get_results(self):
        """Measure every path in ``path_list``.

        Returns:
            A list of ``(path, size, error)`` tuples in the same order as
            ``path_list``; an empty list when there are no paths.
        """
        semaphore = asyncio.Semaphore(self.njobs)
        checks = (self.get_path_size(path, semaphore) for path in self.path_list)
        return await asyncio.gather(*checks)
@click.command()
@click.argument('path_list', nargs=-1, required=True)
@click.option('-j', '--jobs', help='the number of Concurrency', type=int, default=1)
def main(**kwargs):
    # Command-line entry point: measure every PATH_LIST argument with
    # Du_Path_Size, print one "path size error" line per path, and log
    # the elapsed time.
    # NOTE: this is a comment rather than a docstring on purpose, because
    # click would display a docstring as the command's --help text.
    print(kwargs)

    began = time.time()
    checker = Du_Path_Size(kwargs['path_list'], njobs=kwargs['jobs'])

    for path, size, error in asyncio.run(checker.get_results()):
        print(path, size, error)

    checker.logger.info(f'time used: {time.time() - began:.1f}s')


if __name__ == "__main__":
    main()