multiprocessing.Pool.apply_async用法
apply_async(func[, args[, kwds[, callback]]])
A variant of the apply() method which returns a result object. If callback is specified then it should be a callable which accepts a single argument. When the result becomes ready callback is applied to it (unless the call failed). callback should complete immediately since otherwise the thread which handles the results will get blocked.
说明:
- callback函数应该接受一个参数(func返回的结果),如需多个参数可通过functools.partial构造偏函数
- callback函数应该可以立即完成,否则会阻塞负责处理结果的线程
- 若func执行时抛出异常,callback不会被调用;异常只能通过apply_async返回的结果对象的get()方法取得
#!/usr/bin/env python
# -*- encoding: utf8 -*-
"""Locate the sequencing data files of every sample in a sample list.

The input is either a ``.jl`` file (one JSON object per line) or a ``.json``
file (a JSON array of objects).  For every sample whose project directory is
handled on this host, the script looks for its RawData ``*fq.gz``,
QC ``*.clean.fq.gz``, Mapping ``*final.bam`` and Mutation ``*flt.vcf.gz``
files and writes one tab-separated row per sample to the output file.
"""
import os
import sys
import json
import time
import socket
import logging
from functools import partial
from multiprocessing import Pool

try:
    from commands import getoutput  # Python 2
except ImportError:
    # Python 3 equivalent: shell command, stderr merged, trailing newline stripped.
    from subprocess import getoutput

if sys.version_info[0] == 2:
    # Python 2 only: make implicit str/unicode conversions use UTF-8 instead of ASCII.
    reload(sys)  # noqa: F821 -- builtin on Python 2
    sys.setdefaultencoding('utf-8')

logging.basicConfig(
    format='[%(asctime)s %(funcName)s %(levelname)s %(processName)s] %(message)s',
    datefmt='%Y%m%d %H:%M:%S',
    level=logging.DEBUG)
logger = logging.getLogger(__name__)


def get_sample_context(infile):
    """Yield the sample dicts stored in *infile*.

    ``.jl`` files hold one JSON object per line (blank lines are skipped);
    ``.json`` files hold a JSON array of objects.  Any other extension yields
    nothing and logs a warning.
    """
    logger.info('infile: {}'.format(infile))
    if infile.endswith('.jl'):
        with open(infile) as f:
            for line in f:
                line = line.strip()
                # A blank (e.g. trailing) line would make json.loads raise.
                if line:
                    yield json.loads(line)
    elif infile.endswith('.json'):
        with open(infile) as f:
            for context in json.load(f):
                yield context
    else:
        logger.warning('unsupported infile type (expect .jl or .json): {}'.format(infile))


def check_path(path, context, key=None):
    """Expand the shell glob *path* with ``ls -Ld`` and report what exists.

    :param path: path or shell glob pattern to look up.
    :param context: sample dict; only used for logging when raw data is missing.
    :param key: name of the data item being checked (e.g. ``'raw_path'``).
    :returns: the matching paths joined by ``','``, or ``'.'`` when nothing matches.
    """
    # NOTE(review): *path* is derived from the input sample list and is passed
    # to the shell unquoted (quoting would disable the glob); only run this on
    # trusted sample lists.
    cmd = 'ls -Ld {}'.format(path)
    logger.debug('checking path: {}'.format(path))
    res = getoutput(cmd).strip()
    if 'ls:' in res or not res:
        logger.debug('path not exists: {}'.format(path))
        # Raw data missing locally: the cloud (OSS) copy is not checked here;
        # only the identifying fields are logged.  .get() keeps a sample that
        # lacks these fields from failing on a log line.
        if key == 'raw_path':
            logger.debug('check rawdata on oss ...')
            logger.info('{} {} {}'.format(
                context.get('projectid'), context.get('novoid'), context.get('lane')))
        return '.'
    logger.info('\033[32mfind path: {}\033[0m'.format(res))
    return ','.join(res.split('\n'))


def check_context(context, title, num):
    """Look up the data files of one sample and format its output row.

    :param context: sample dict; must provide ``projpath`` and ``samplename``
        plus every column named in *title* that is not a data-path key.
    :param title: output column names, in order.
    :param num: index of the sample in the input (used for logging only).
    :returns: tab-separated row, or ``None`` when the project directory is
        missing or none of the data files were found.
    """
    logger.debug('\033[36mdealing with line: {}\033[0m'.format(num))
    # Check the project directory first; none of the data files can exist without it.
    if check_path(context['projpath'], context) == '.':
        return None

    patterns = (
        ('raw_path', '{projpath}/RawData/{samplename}/{samplename}*fq.gz'),
        ('clean_path', '{projpath}/QC/{samplename}/{samplename}*.clean.fq.gz'),
        ('bam_path', '{projpath}/Mapping/{samplename}.{samplename}/{samplename}*final.bam'),
        ('gvcf_path', '{projpath}/Mutation/{samplename}.*/{samplename}*flt.vcf.gz'),
    )
    data_path = {}
    for key, pattern in patterns:
        data_path[key] = check_path(pattern.format(**context), context, key=key)

    # '.' marks a missing item; only emit a row when at least one file exists.
    if any(found != '.' for found in data_path.values()):
        row = '\t'.join('{%s}' % column for column in title)
        return row.format(**dict(context, **data_path))
    return None


def write_output(out, data):
    """Write *data* plus a newline to *out*; empty or ``None`` data is skipped."""
    if data:
        logger.info('write 1 line')
        out.write(data + '\n')


def _is_wanted(context, hostname):
    """Return True when *context* has all required fields and belongs on *hostname*."""
    required = (context.get('projpath'), context.get('diseasename'),
                context.get('familyid'), context.get('seqsty'))
    if not all(required):
        return False
    # 'NJPROJ' projects are processed only on hosts whose name contains 'nj',
    # every other project only on hosts whose name does not.
    return ('NJPROJ' in context['projpath']) == ('nj' in hostname)


def main():
    """Check every wanted sample and write the result table.

    Uses the module-level ``args`` dict created in the ``__main__`` block.
    """
    start_time = time.time()
    jobs = args['jobs']
    pool = None
    if jobs > 1:
        logger.info('run {jobs} jobs in parallel'.format(**args))
        # Only start worker processes when they are actually used.
        pool = Pool(jobs)
    hostname = socket.gethostname()
    title = ['familyid', 'samplename', 'sex', 'diseasename', 'disnorm', 'seqsty',
             'raw_path', 'clean_path', 'bam_path', 'gvcf_path']
    sample_context = get_sample_context(args['infile'])
    pending = []
    with open(args['outfile'], 'w') as out:
        out.write('\t'.join(title) + '\n')
        for num, context in enumerate(sample_context):
            # A JSON null disease name is treated like a missing one instead of crashing.
            context['diseasename'] = (context.get('diseasename') or '').strip()
            if not _is_wanted(context, hostname):
                continue
            if pool is not None:
                result = pool.apply_async(check_context,
                                          args=(context, title, num),
                                          callback=partial(write_output, out))
                pending.append((num, result))
            else:
                write_output(out, check_context(context, title, num))
        if pool is not None:
            # Wait for the workers while `out` is still open: the callbacks write to it.
            pool.close()
            pool.join()
            # apply_async never calls the callback for a failed task, so report
            # worker exceptions here instead of dropping them silently.
            for num, result in pending:
                try:
                    result.get()
                except Exception:
                    logger.exception('failed to check line: {}'.format(num))
    logger.info('time used: {:.1f}s'.format(time.time() - start_time))


if __name__ == '__main__':
    import argparse
    parser = argparse.ArgumentParser(
        prog='sample_stat',
        description=__doc__,
        formatter_class=argparse.RawTextHelpFormatter)
    parser.add_argument('infile', help='the samplelist.json or samplelist.jl file', nargs='?')
    parser.add_argument('-j', '--jobs', help='run n jobs in parallel[%(default)s]',
                        type=int, default=4)
    parser.add_argument('-o', '--outfile', help='the output filename[%(default)s]',
                        default='out.xls')
    args = vars(parser.parse_args())
    if not args['infile']:
        # Fail with a usage message before the output file is created/truncated.
        parser.error('the infile argument is required')
    main()
