The framework has two main parts: a task-distribution module and a processing-algorithm module.

The task-distribution module reads the input file and splits it into chunks based on the number of records and the number of CPU cores. It then builds a param_dict automatically, converting each range of record indices in param_dict into the corresponding chunk of data, and finally hands the chunks to a process pool, where the processing algorithm handles each one. This is much faster than processing everything in a single process.
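Both modules are shown below: get_taged_tokenized is the processing-algorithm stub that each worker runs on its chunk, and multi_process_tag does the splitting, dispatching, and merging of results.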
```python
import datetime
import json
import multiprocessing as mp

def get_taged_tokenized(name, json_data):
    """Processing-algorithm module: tag/tokenize one chunk of records."""
    possessed_data = []
    for idx in range(len(json_data)):
        pass  # per-record processing goes here
    # The aggregation loop below expects each worker to return a dict
    # keyed by its task name.
    return {name: possessed_data}

def multi_process_tag(name):
    """Task-distribution module: split the input and farm it out to a pool."""
    num_cores = int(mp.cpu_count())
    print("This machine has " + str(num_cores) + " cores")
    pool = mp.Pool(num_cores)
    param_dict = {}
    with open('./data/' + name + '.json', 'r', encoding='utf8') as fp:
        json_data = json.load(fp)
    # Split the records into num_cores evenly sized chunks,
    # plus one trailing chunk for the remainder.
    start = 0
    end = len(json_data)
    step = int((end - start) / num_cores)
    print("per Task Step: ", step)
    for i in range(num_cores):
        param_dict['task{}'.format(i)] = json_data[start:start + step]
        start = start + step
    param_dict['task{}'.format(num_cores)] = json_data[start:]
    start_t = datetime.datetime.now()
    # Dispatch one async task per chunk, then collect the results.
    results = [pool.apply_async(get_taged_tokenized, args=(task_name, param))
               for task_name, param in param_dict.items()]
    results = [p.get() for p in results]
    pool.close()
    pool.join()
    # Merge the per-task dicts back into one flat list.
    total = 0
    my_result = []
    for i in results:
        for j in i:
            total += len(i[j])
            my_result.extend(i[j])
    print(total, len(my_result))
    end_t = datetime.datetime.now()
    elapsed_sec = (end_t - start_t).total_seconds()
    print("Multiprocess run took: " + "{:.2f}".format(elapsed_sec) + " seconds")
    with open('./data/' + name + '_tokenized_taged.json', 'w', encoding='utf-8') as f:
        json.dump(my_result, f, ensure_ascii=False)
```
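One caveat: because multiprocessing may spawn fresh interpreter processes (the default on Windows, and on recent macOS Pythons), the pool must be created behind an `if __name__ == '__main__'` guard, or each worker will re-execute the module. A minimal usage sketch, assuming a hypothetical input file `./data/train.json`:

```python
if __name__ == '__main__':
    # 'train' is a hypothetical dataset name: this reads ./data/train.json
    # and writes ./data/train_tokenized_taged.json.
    multi_process_tag('train')
```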
