使用条件
- 如果是 I/O 密集型,并且 I/O 操作很慢、需要很多任务/线程协同完成,使用 asyncio
- 如果是 I/O 密集型,但 I/O 操作很快、只需要有限数量的任务/线程,使用多线程即可
- 如果是 CPU 密集型,使用多进程
asyncio
https://docs.python.org/zh-cn/3/library/asyncio.html https://docs.python.org/zh-cn/3/library/asyncio-task.html
协程
通过 async / await 语法进行声明,是编写 asyncio 应用的推荐方式
import asyncio


async def main():
    """Print 'hello', wait one second, then print 'world'."""
    print('hello')
    # Awaiting asyncio.sleep() suspends this coroutine for the given delay.
    await asyncio.sleep(1)
    print('world')


if __name__ == "__main__":
    # asyncio.run() is the entry point that actually executes the coroutine;
    # the guard keeps an import of this snippet from starting the event loop.
    asyncio.run(main())
注意:简单地调用一个协程并不会使其被调度执行
>>> main()
<coroutine object main at 0x1053bb7c8>
要真正运行一个协程,asyncio 提供了三种主要机制:
- asyncio.run()
- 等待一个协程

```python
import asyncio
import time

async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)

async def main():
    print(f"started at {time.strftime('%X')}")

    await say_after(1, 'hello')
    await say_after(2, 'world')

    print(f"finished at {time.strftime('%X')}")

asyncio.run(main())
```
- [asyncio.create_task()](https://docs.python.org/zh-cn/3/library/asyncio-task.html#asyncio.create_task)

```python
async def main():
    task1 = asyncio.create_task(say_after(1, 'hello'))
    task2 = asyncio.create_task(say_after(2, 'world'))

    print(f"started at {time.strftime('%X')}")

    # Wait until both tasks are completed (should take
    # around 2 seconds.)
    await task1
    await task2

    print(f"finished at {time.strftime('%X')}")
```
并发运行任务
asyncio.create_task
async def parse_url(url):
    """Print *url* and return it unchanged (placeholder for real work)."""
    print(url)
    return url


async def main(url_list):
    """Run ``parse_url`` for every URL concurrently via ``asyncio.create_task``.

    Args:
        url_list: Iterable of URLs to process.
    """
    # create_task() schedules each coroutine on the event loop immediately.
    # Bare coroutine objects would only start when awaited, i.e. one after
    # another, which is not concurrent.
    tasks = [asyncio.create_task(parse_url(url)) for url in url_list]
    for task in tasks:
        await task


if __name__ == "__main__":
    url_list = ['a', 'b', 'c']
    asyncio.run(main(url_list))
asyncio.gather
async def main(url_list):
    """Run ``parse_url`` for every URL concurrently and collect the results.

    Args:
        url_list: Iterable of URLs to process.

    Returns:
        list: One result per URL, in the same order as *url_list*.
    """
    # gather() schedules the coroutines concurrently and returns their
    # results in the order the awaitables were passed in.
    results = await asyncio.gather(*(parse_url(url) for url in url_list))
    print(results)
    return results


if __name__ == "__main__":
    url_list = ['a', 'b', 'c']
    asyncio.run(main(url_list))
限制并发数
async with sem
import time
import random
import asyncio
import datetime


async def square(n, semaphore, sec=2):
    """Return the text ``'n * n = <n squared>'`` after a simulated slow step.

    Args:
        n: Number to square.
        semaphore: ``asyncio.Semaphore`` shared by all jobs. A job waits here
            until a slot is free, which caps how many jobs run at once.
        sec: Seconds to sleep while holding the slot. Pass for example
            ``random.randint(1, 3)`` to simulate uneven workloads.

    Returns:
        str: The formatted result.
    """
    async with semaphore:
        print(datetime.datetime.now(), n, f'start, sleep {sec}s')
        await asyncio.sleep(sec)
        print(datetime.datetime.now(), n, 'done')
        return f'{n} * {n} = {n * n}'


async def main(numbers, njobs):
    """Square every item of *numbers* with at most *njobs* jobs in flight.

    Args:
        numbers: Iterable of numbers to square.
        njobs: Initial value of the semaphore, i.e. the concurrency limit.
    """
    semaphore = asyncio.Semaphore(njobs)
    jobs = [square(number, semaphore) for number in numbers]
    print(f'run with njobs: {njobs}')
    # gather() returns results in the same order as the awaitables passed in.
    results = await asyncio.gather(*jobs)
    print('collect results ...')
    for res in results:
        print(res)


if __name__ == "__main__":
    numbers = range(1, 11)
    njobs = 3
    start = time.time()
    asyncio.run(main(numbers, njobs))
    print(f'time used: {time.time() - start :.1f} seconds')
在类中使用
import time
import asyncio
import subprocess

import click
from simple_loggers import SimpleLogger


class Du_Path_Size(object):
    """Measure the disk usage of several paths with ``du``, a few at a time.

    Args:
        path_list: Paths handed to ``du``.
        njobs: Maximum number of ``du`` processes running concurrently.
    """

    def __init__(self, path_list, njobs=4):
        self.njobs = njobs
        self.path_list = path_list
        self.logger = SimpleLogger('GetPathSize')

    async def run_du_cmd(self, path):
        """Run ``du -cs -B1`` on *path* without blocking the event loop.

        Returns:
            tuple: ``(path, size, error)``. ``size`` is the first field of the
            last line printed by ``du`` (a string), or ``None`` when ``du``
            printed nothing. ``error`` is the stripped stderr text when ``du``
            exits non-zero, or the OS error message when ``du`` cannot be
            started, otherwise ``None``.
        """
        self.logger.debug(f'check path: {path}')
        try:
            # An asyncio subprocess is awaited, so other paths keep being
            # processed meanwhile; a blocking subprocess.run() call inside a
            # coroutine would stall the whole event loop.
            # The argument list (no shell) keeps paths with spaces or shell
            # metacharacters intact; '--' stops a path starting with '-'
            # from being parsed as an option.
            proc = await asyncio.create_subprocess_exec(
                'du', '-cs', '-B1', '--', path,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
            )
        except OSError as exc:
            # du itself could not be launched (e.g. it is not installed).
            return path, None, str(exc)
        stdout, stderr = await proc.communicate()

        error = stderr.decode('utf8').strip() if proc.returncode != 0 else None
        output = stdout.decode('utf8').strip()
        # Guard against empty output instead of raising IndexError.
        size = output.split('\n')[-1].split()[0] if output else None
        return path, size, error

    async def get_path_size(self, path, semaphore):
        """Run ``run_du_cmd`` for *path* once *semaphore* grants a slot."""
        async with semaphore:
            return await self.run_du_cmd(path)

    async def get_results(self):
        """Return the ``(path, size, error)`` tuples for every path, in order."""
        semaphore = asyncio.Semaphore(self.njobs)
        jobs = [self.get_path_size(path, semaphore) for path in self.path_list]
        return await asyncio.gather(*jobs)


# NOTE: no docstring on the command below -- click would print it as --help text.
@click.command()
@click.argument('path_list', nargs=-1, required=True)
@click.option('-j', '--jobs', help='the number of Concurrency', type=int, default=1)
def main(**kwargs):
    # Command-line entry point: report the size of each given path.
    print(kwargs)
    start = time.time()
    dps = Du_Path_Size(kwargs['path_list'], njobs=kwargs['jobs'])
    results = asyncio.run(dps.get_results())
    for path, size, error in results:
        print(path, size, error)
    dps.logger.info(f'time used: {time.time() - start:.1f}s')


if __name__ == "__main__":
    main()
