参考:https://blog.csdn.net/weixin_41929524/article/details/81707053
https://www.jianshu.com/p/3882ea7b9cc9
R的并行化计算其实没有改变其整个并行环境,而是先启动N个附属进程,然后将数据分割成N块在N个核上进行运行,等待附属进程结束返回结果。
有的时候,我们使用R 总是感觉速度不够快,而实际上有很大一部分的程序是可以通过多线程进行并行运算的。
可以参见:
使用包 parallel
。它其实就是将本来的apply 家族替换成了perApply,但采用了多线程。
1)常用函数
detectCores() 检查当前的可用核数
clusterExport() 配置当前环境
makeCluster() 分配核数
stopCluster() 关闭集群
parLapply() lapply()函数的并行版本
我们首先可以通过detectCores()
获得当前电脑可用核心数:
detectCores()
4
比如我可怜的小mac 只有4个心脏。
接着我们配置一下,初始化分配给R 的核心数:
no_cores <- detectCores() - 2
cl <- makeCluster(no_cores)
接着我们就可以使用lapply()函数的并行版本parLapply。
我们可以先比较一下lapply 和parLapply 的运行速度:
> system.time(lapply(1:1000000, function(x) c(x, x**2, x**3)))
用户 系统 流逝
1.672 0.025 1.777
> system.time(parLapply(cl, 1:1000000, function(x) c(x, x**2, x**3)))
用户 系统 流逝
0.447 0.086 1.785
需要注意,使用parLapply 函数时,需要指定参数cl。
2)一般操作
参考:https://www.bioinfo-scrounger.com/archives/577/
其一般操作就是先在开头添加分配核数的语句makeCluster()
,接下来调用parApply
方法,最后结束以后,需要使用stopCluster(cl)
结束并行。
3)变量作用域
局部调用
在调用时,分配的核心相当于新的环境。我们必须要在parAapply
函数内部重新调用值或者加载包。
比如在函数中加载:
library(parallel)
cl <- makeCluster(3)
x = c()
for (i in 1:600){
x[i] <- paste(letters[sample(1:26, 3)], collapse='')
}
test_function <- function(x) {
library(stringr)
return(str_to_upper(x))
}
result <- parLapply(cl, x, test_function)
final <- do.call('c',result)
stopCluster(cl)
如果是在外部加载,则会报错:
library(stringr)
test_function <- function(x) {
return(str_to_upper(x))
}
result <- parLapply(cl, x, test_function)
# error
Error in checkForRemoteErrors(val) :
3 nodes produced errors; first error: 没有"str_to_upper"这个函数
同样,如果是在函数中调用了外部的变量也是:
a <- 2
test_function <- function(x) {
return(x[a])
}
result <- parLapply(cl, x, test_function)
# output
Error in checkForRemoteErrors(val) :
3 nodes produced errors; first error: 找不到对象'a'
对于环境中的变量,可以使用clusterExport 加载,而包可以使用clusterEvalQ 加载:
clusterExport(cl, "a")
clusterEvalQ(cl, library(stringr))
全局配置
我们还可以直接在开启核心时就进行配置:
cl <- makeCluster(3, type="FORK") # mac & linux
Parallel Socket Cluster (PSOCK) # WIN
这一选项从而当你并行运行的时候可以包含所有环境变量。
但对于包中的函数,还是需要专门的使用clusterEvalQ 加载。
4)小建议
- 运行完毕后释放内存
stopCluster(cl)