The framework is split into two parts: a task-allocation module and a processing-algorithm module. The task-allocation module reads the input file and divides it into chunks based on the number of records and the number of CPU cores, automatically building a param_dict whose entries map each task name to its slice of the data. The chunks are then handed to a process pool, and each worker runs the processing algorithm on its own chunk, which is much faster than processing everything in a single process. For example, with 10 records on a 4-core machine the step is 2, so task0 through task3 each receive 2 records and task4 picks up the remaining 2.
import datetime
import json
import multiprocessing as mp

def get_taged_tokenized(name, json_data):
    # Processing-algorithm module: tag/tokenize every record in this chunk.
    possessed_data = []
    for idx in range(len(json_data)):
        # Replace this with the actual tagging/tokenizing of json_data[idx].
        possessed_data.append(json_data[idx])
    return {name: possessed_data}
def multi_process_tag(name):
    num_cores = int(mp.cpu_count())
    print("This machine has " + str(num_cores) + " cores")
    pool = mp.Pool(num_cores)
    param_dict = {}
    with open('./data/' + name + '.json', 'r', encoding='utf8') as fp:
        json_data = json.load(fp)
    # Task-allocation module: split the records into one chunk per core,
    # plus a final chunk that picks up any remainder.
    start = 0
    end = len(json_data)
    step = int((end - start) / num_cores)
    print("per Task Step: ", step)
    for i in range(num_cores):
        param_dict['task{}'.format(i)] = json_data[start:start + step]
        start = start + step
    param_dict['task{}'.format(num_cores)] = json_data[start:]
    start_t = datetime.datetime.now()
    # Submit every chunk asynchronously, then block on the results.
    results = [pool.apply_async(get_taged_tokenized, args=(task_name, param))
               for task_name, param in param_dict.items()]
    results = [p.get() for p in results]
    pool.close()
    pool.join()
    # Merge the per-task dicts back into a single flat list.
    total = 0
    my_result = []
    for i in results:
        for j in i:
            total += len(i[j])
            my_result.extend(i[j])
    print(total, len(my_result))
    end_t = datetime.datetime.now()
    elapsed_sec = (end_t - start_t).total_seconds()
    print("Multiprocess run took: " + "{:.2f}".format(elapsed_sec) + " seconds")
    with open('./data/' + name + '_tokenized_taged.json', 'w', encoding='utf-8') as f:
        json.dump(my_result, f, ensure_ascii=False)
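
On platforms that start worker processes with spawn (Windows, recent macOS), the multiprocessing entry point must be protected by a __main__ guard, otherwise the workers re-import and re-execute the module. A minimal usage sketch, assuming an input file ./data/corpus.json exists (the name corpus is only an illustration):

if __name__ == '__main__':
    # Required for multiprocessing on spawn-based platforms.
    multi_process_tag('corpus')  # hypothetical dataset name; reads ./data/corpus.json

pool.map over the chunks would give the same result with less bookkeeping; apply_async is used here so that each chunk keeps its task name attached to its result.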