multiprocessing.Pool.apply_async
用法
apply_async(func[, args[, kwds[, callback]]])

A variant of the apply() method which returns a result object. If callback is specified then it should be a callable which accepts a single argument. When the result becomes ready, callback is applied to it (unless the call failed). callback should complete immediately since otherwise the thread which handles the results will get blocked.
说明:
- callback函数应该接受一个参数(即func的返回结果);如需传入多个参数,可通过
functools.partial
构造偏函数
- callback函数应该能够立即完成,否则会阻塞处理结果的线程(result handler thread),而不是任意"其他线程"
#!/usr/bin/env python
#-*- encoding: utf8 -*-
import os
import sys
import json
import time
import socket
import logging
import commands
from functools import partial
from multiprocessing import Pool
# Python 2 only: reload() re-exposes sys.setdefaultencoding, which
# site.py deletes at interpreter startup; this makes implicit
# str<->unicode conversions use UTF-8 instead of ASCII.
reload(sys)
sys.setdefaultencoding('utf-8')
# Module-wide logging: timestamped records that include the function
# and process name (useful when the multiprocessing pool is active).
logging.basicConfig(
    format=
    '[%(asctime)s %(funcName)s %(levelname)s %(processName)s] %(message)s',
    datefmt='%Y%m%d %H:%M:%S',
    level=logging.DEBUG)
logger = logging.getLogger(__name__)
def get_sample_context(infile):
    """Yield one sample-context dict per record from *infile*.

    Supports two formats, selected by file extension:
    ``.jl``   -- JSON-lines, one JSON object per line;
    ``.json`` -- a single JSON document containing a list of objects.
    """
    logger.info('infile: {}'.format(infile))
    if infile.endswith('.jl'):
        with open(infile) as fp:
            for raw in fp:
                yield json.loads(raw.strip())
    elif infile.endswith('.json'):
        with open(infile) as fp:
            for record in json.load(fp):
                yield record
def check_path(path, context, key=None):
    """Expand a glob *path* via the shell and return the matches.

    Runs ``ls -Ld <path>`` so the shell performs the glob expansion.
    Returns a comma-joined string of the matching paths, or ``'.'``
    when nothing matches.  For ``key == 'raw_path'`` a miss also logs
    the project identifiers so the raw data can be looked up on OSS.
    """
    logger.debug('checking path: {}'.format(path))
    output = commands.getoutput('ls -Ld {}'.format(path)).strip()
    # ls emits an 'ls: cannot access ...' message when nothing matches.
    if not output or 'ls:' in output:
        logger.debug('path not exists: {}'.format(path))
        if key == 'raw_path':
            # Raw data missing locally -- point at the cloud (OSS) copy.
            logger.debug('check rawdata on oss ...')
            logger.info('{projectid} {novoid} {lane}'.format(**context))
        return '.'
    logger.info('\033[32mfind path: {}\033[0m'.format(output))
    return ','.join(output.split('\n'))
def check_context(context, title, num):
    """Resolve the data paths for one sample and build its output row.

    Returns a tab-separated line (columns per *title*) when at least one
    of the four data paths exists, otherwise None.  *num* is the input
    line number, used only for logging.
    """
    logger.debug('\033[36mdealing with line: {}\033[0m'.format(num))
    # Bail out early when the project directory itself is missing.
    if check_path(context['projpath'], context) == '.':
        return None
    # Glob patterns for each data product, keyed by output column name.
    patterns = (
        ('raw_path',
         '{projpath}/RawData/{samplename}/{samplename}*fq.gz'),
        ('clean_path',
         '{projpath}/QC/{samplename}/{samplename}*.clean.fq.gz'),
        ('bam_path',
         '{projpath}/Mapping/{samplename}.{samplename}/{samplename}*final.bam'),
        ('gvcf_path',
         '{projpath}/Mutation/{samplename}.*/{samplename}*flt.vcf.gz'),
    )
    data_path = {}
    for key, pattern in patterns:
        data_path[key] = check_path(pattern.format(**context),
                                    context,
                                    key=key)
    # Emit the row only if at least one of the four paths was found
    # (i.e. not every value collapsed to '.').
    if set(data_path.values()) != set('.'):
        row = '\t'.join('{%s}' % col for col in title)
        return row.format(**dict(context, **data_path))
    return None
def write_output(out, data):
    """Append one result line to *out*; skip falsy (None/empty) results."""
    if not data:
        return
    logger.info('write 1 line')
    out.write(data + '\n')
def main():
    """Collect per-sample data paths and write them to a TSV file.

    Reads sample contexts from args['infile'], drops records missing any
    required field, enforces that NJ projects are only processed on 'nj'
    hosts (and vice versa), then resolves each sample's data paths --
    in parallel via a process pool when args['jobs'] > 1.
    """
    start_time = time.time()
    parallel = args['jobs'] > 1
    # Create the pool only when it will actually be used: the original
    # built Pool(args['jobs']) unconditionally, spawning worker
    # processes that were never closed/joined in the serial case.
    proc = Pool(args['jobs']) if parallel else None
    if parallel:
        logger.info('run {jobs} jobs in parallel'.format(**args))
    hostname = socket.gethostname()
    title = '''
        familyid samplename sex diseasename disnorm seqsty
        raw_path clean_path bam_path gvcf_path
    '''.split()
    sample_context = get_sample_context(args['infile'])
    with open(args['outfile'], 'w') as out:
        out.write('\t'.join(title) + '\n')
        for num, context in enumerate(sample_context):
            projpath = context.get('projpath')
            diseasename = context.get('diseasename', '').strip()
            familyid = context.get('familyid')
            seqsty = context.get('seqsty')
            context['diseasename'] = diseasename
            # Skip records missing any required field.
            if not all([projpath, diseasename, familyid, seqsty]):
                continue
            # NJ projects must be processed on an 'nj' host and
            # non-NJ projects on a non-'nj' host (XOR of the two tests,
            # equivalent to the original pair of continue branches).
            if ('NJPROJ' in projpath) != ('nj' in hostname):
                continue
            if parallel:
                # The callback runs in the pool's result-handler thread;
                # it must return quickly or further results are blocked.
                proc.apply_async(check_context,
                                 args=(context, title, num),
                                 callback=partial(write_output, out))
            else:
                write_output(out, check_context(context, title, num))
        if parallel:
            # Join inside the 'with' so callbacks finish before the
            # output file is closed.
            proc.close()
            proc.join()
    logger.info('time used: {:.1f}s'.format(time.time() - start_time))
if __name__ == '__main__':
    import argparse

    # Command-line interface; main() reads the parsed options through
    # the module-level 'args' dict.
    cli = argparse.ArgumentParser(
        prog='sample_stat',
        description=__doc__,
        formatter_class=argparse.RawTextHelpFormatter)
    cli.add_argument('infile',
                     nargs='?',
                     help='the samplelist.json or samplelist.jl file')
    cli.add_argument('-j',
                     '--jobs',
                     type=int,
                     default=4,
                     help='run n jobs in parallel[%(default)s]')
    cli.add_argument('-o',
                     '--outfile',
                     default='out.xls',
                     help='the output filename[%(default)s]')
    args = vars(cli.parse_args())
    main()