The framework consists of two main parts:
    one is the task-distribution module,
    the other is the processing-algorithm module.

    The task-distribution module reads the input file and splits the records according to the number of CPU cores.
    It then builds a param_dict automatically, mapping each task to its slice of the records,
    and finally hands the slices to a process pool, which runs the processing algorithm on each one.

    This is much faster than processing everything in a single process.
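    To make the splitting step concrete, here is a minimal standalone sketch of the chunking logic used below (the 10 dummy records and 3 cores are made-up values for illustration):

    # Split a list of records into one slice per core; any remainder
    # becomes one extra task.
    records = list(range(10))   # stand-in for json_data: 10 dummy records
    num_cores = 3

    step = len(records) // num_cores   # 3 records per task here
    param_dict = {}
    start = 0
    for i in range(num_cores):
        param_dict['task{}'.format(i)] = records[start:start + step]
        start += step
    param_dict['task{}'.format(num_cores)] = records[start:]  # remainder task

    # param_dict == {'task0': [0, 1, 2], 'task1': [3, 4, 5],
    #                'task2': [6, 7, 8], 'task3': [9]}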

    import datetime
    import json
    import multiprocessing as mp

    def get_taged_tokenized(name, json_data):
        possessed_data = []
        for idx in range(len(json_data)):
            pass  # the actual tagging/tokenizing of json_data[idx] goes here
        # Return the chunk's results keyed by the task name; this is the
        # shape the merging loop in multi_process_tag expects.
        return {name: possessed_data}

    def multi_process_tag(name):
        num_cores = int(mp.cpu_count())
        print("This machine has: " + str(num_cores) + " cores")
        pool = mp.Pool(num_cores)
        param_dict = {}
        with open('./data/' + name + '.json', 'r', encoding='utf8') as fp:
            json_data = json.load(fp)
        # Split the records evenly across the cores; the remainder, if any,
        # becomes one extra task (it is empty when the division is exact).
        start = 0
        end = len(json_data)
        step = int((end - start) / num_cores)
        print("per Task Step: ", step)
        for i in range(num_cores):
            param_dict['task{}'.format(i)] = json_data[start:start + step]
            start = start + step
        param_dict['task{}'.format(num_cores)] = json_data[start:]
        start_t = datetime.datetime.now()
        # Submit one async task per chunk, then block until every result
        # arrives (task_name/param are renamed so the loop variable does not
        # shadow the function's own name parameter).
        results = [pool.apply_async(get_taged_tokenized, args=(task_name, param))
                   for task_name, param in param_dict.items()]
        results = [p.get() for p in results]
        pool.close()
        pool.join()
        # Merge the per-task dicts back into one flat list.
        total = 0
        my_result = []
        for i in results:
            for j in i:
                # print(j, len(i[j]))
                total += len(i[j])
                my_result.extend(i[j])
        print(total, len(my_result))
        end_t = datetime.datetime.now()
        elapsed_sec = (end_t - start_t).total_seconds()
        print("Multiprocess computation took: " + "{:.2f}".format(elapsed_sec) + " seconds")
        with open('./data/' + name + '_tokenized_taged.json', 'w', encoding='utf-8') as f:
            json.dump(my_result, f, ensure_ascii=False)
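    A minimal usage sketch follows; the dataset name 'corpus' is hypothetical and assumes ./data/corpus.json exists. The __main__ guard matters: on platforms that spawn worker processes (e.g. Windows), multiprocessing re-imports the module, so the entry point must be guarded.

    if __name__ == '__main__':
        # 'corpus' is a made-up name: reads ./data/corpus.json and
        # writes ./data/corpus_tokenized_taged.json
        multi_process_tag('corpus')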