在上一篇文章中,我讨论了什么是多线程,并给出了使用doSNOW
, doParallel
和foreach
构造与Yandex.Direct API一起使用R语言时的实现示例。
本文是续篇,但可以看作是R中多线程的脱机指南。第一部分收到的评论提示我写它(在此特别感谢Alexey_mosc , SatCat , Ananiev_Genrih ),在其中给了我一些软件包,这些软件包代表了一种更现代的方法R中实现多线程的实现,我们将在后面讨论。

目录内容
挑战赛
例如,我们以先前出版物中考虑的问题为例。 在多线程模式下,从4个Yandex.Direct广告帐户中收集关键字列表。
要使用Yandex.Direct API,我们将使用ryandexdirect
包。 链接的官方文档在上,但是对于所描述任务的实现,我们仅需要2个功能:
- yadirAuth-Yandex.Direct API中的授权;
yadirGetKeyWords
从广告帐户下载关键字列表。
不仅仅是我选择了下载关键字的过程,事实是,这是Yandex.Direct API中最冗长的操作之一。 其次,在所有帐户中,关键字的数量是不同的,因此,对于每个帐户,完成此操作的时间将非常不同,在我们的情况下为1到20秒。
准备工作
最初,您需要安装本文讨论的所有软件包,为此,您可以使用下面的代码。
代码1:安装软件包 # install.packages("ryandexdirect") install.packages("tictoc") install.packages("rbenchmark") install.packages("dplyr") install.packages("purrr") install.packages("future") install.packages("promises") install.packages("furrr") install.packages("future.apply")
为了使包功能可用,必须使用library
命令将其连接。 为了方便起见,我将在每个给定的代码示例中分别连接所有必需的软件包。
我们创建一个由Yandex.Direct登录名组成的向量,稍后我们将从中请求关键字:
代码2:创建登录向量 logins <- c("login1", "login2", "login3", "login4")
要使用Yandex.Direct API,首先需要在每个帐户下进行授权,为此您可以使用以下设计:
代码3:Yandex.Direct API中的授权 lapply(logins, function(l) { yadirAuth(Login = l)})
运行上述代码后,将打开一个浏览器以使用每个帐户进行授权。 您确认ryandexdirect
允许访问您的广告材料。 您将被重定向到需要复制验证码的页面。 通过将其输入R控制台,完成授权过程。 在创建引导程序登录名时为指定的每个登录名重复此操作。
在授权过程中,某些用户可能会因重定向到第三方资源而感到困惑,但是这样做对您的帐户没有任何危险,我在文章“使用R包与广告系统API一起使用有多安全”中对此主题进行了详细描述。
接下来,我们将考虑实现所描述任务的几个示例。 每种方法都将从示例代码及其进一步说明开始。 我认为此选项最易于感知。
串行处理解决方案示例,sapply函数和purrr包

在上一篇文章中 ,我引用了以for
循环为例的解决方案。 因为我们考虑使用语法类似于循环的foreach
包进行多线程处理,所以此示例适用于此处,尽管R的用户不欢迎使用循环。
我们将在本文中考虑的程序包更加让人联想到apply系列的语法功能;因此,我将举一个使用它们的串行模式解决方案的示例。
sapply
功能
为了估计命令的执行时间,在每种考虑的方法中,我们将使用tictoc
软件包。
代码4:使用sapply函数的顺序模式下的示例解决方案 library(tictoc) library(dplyr) tic() # kw.sapply <- sapply( logins, # , function(x) # # { yadirGetKeyWords(Login = x) %>% mutate(login = x) }, simplify = FALSE # ) toc() # # result.sapply <- do.call("rbind", kw.sapply)
39.36 sec elapsed
时间: 39.36 sec elapsed
首先, apply
系列功能的语法不像循环的语法那样易读,但实际上一切都很简单。
sapply(X, FUN)
其中:
- X-一个对象,我们将对其元素进行迭代并在每次迭代中依次使用,在
for
循环中,它看起来像这样: for(i in X)
; - FUN-一个函数,在该函数中,我们将依次替换对象X的每个元素,如果我们使用
for
进行类比,则这是循环的主体。
在代码示例4中 ,先前创建的登录向量被传递给X参数。 登录向量的每个元素依次作为传递给FUN参数的匿名函数function(x) { yadirGetKeyWords(Login = x) %>% mutate(login = x) }
唯一参数传递。
即 sapply
将执行FUN中指定的功能4次,将登录名一个接一个地替换,并以包含4个元素的列表(类list的对象)的形式返回结果。 每个元素都是一个表,其中包含在每次迭代中从帐户收到的关键字列表。
yadirGetKeyWords(Login = "login1") %>% mutate(login = "login1")
yadirGetKeyWords(Login = "login2") %>% mutate(login = "login2")
yadirGetKeyWords(Login = "login3") %>% mutate(login = "login3")
yadirGetKeyWords(Login = "login4") %>% mutate(login = "login4")
使用sapply
获得的对象具有以下结构:
summary(kw.sapply)
Length Class Mode login1 19 data.frame list login2 19 data.frame list login3 19 data.frame list login4 19 data.frame list
在本示例的最后,命令result.sapply <- do.call("rbind", kw.sapply)
将kw.sapply列表的所有4个元素组合到一个result.sapply帧中。
# A tibble: 6,804 x 1 result.sapply$Id $Keyword $AdGroupId $CampaignId $ServingStatus $State <dbl> <fct> <dbl> <int> <fct> <fct> 1 15164230566 ~ 3597453985 39351725 ELIGIBLE ON 2 15164230567 ~ 3597453985 39351725 ELIGIBLE ON 3 15164230568 ~ 3597453985 39351725 ELIGIBLE ON 4 15164230569 ~ 3597453985 39351725 ELIGIBLE ON 5 15164230570 ~ 3597453985 39351725 ELIGIBLE ON 6 15164230571 ~ 3597453985 39351725 ELIGIBLE ON 7 15164230572 ~ 3597453985 39351725 ELIGIBLE ON 8 15164230573 ~ 3597453985 39351725 ELIGIBLE ON 9 15164230574 ~ 3597453985 39351725 ELIGIBLE ON 10 15164230575 ~ 3597453985 39351725 ELIGIBLE ON # ... with 6,794 more rows, and 13 more variables: $Status <fct>, # $StrategyPriority <fct>, $StatisticsSearchImpressions <int>, # $StatisticsSearchClicks <int>, $StatisticsNetworkImpressions <int>, # $StatisticsNetworkClicks <lgl>, $UserParam1 <chr>, $UserParam2 <chr>, # $ProductivityValue <lgl>, $ProductivityReferences <lgl>, $Bid <dbl>, # $ContextBid <dbl>, $login <chr>
除了sapply
, *apply
系列功能还包括: apply
, lapply
, vapply
, mapply
等。
purrr
包装
代码5:使用purrr包函数的示例解决方案 library(purrr) library(dplyr) library(tictoc) tic() # result.purrr <- map_df( logins, # , ~ # function(.x) { yadirGetKeyWords(Login = .x) %>% mutate(login = .x) } ) toc() #
35.46 sec elapsed
时间: 35.46 sec elapsed
purrr
程序包是Headley Wickham编写的tidyverse
库核心的一部分。
在含义和语法上,该软件包的主要功能与sapply
非常相似,其主要优点如下:
- 函数分为
map
, map2
, pmap
, walk
等族,同一族中包含的单独函数以不同格式返回结果: chr , dbl , int , df等; map2
系列的功能map2
您同时迭代两个对象的元素(迭代)。pmap
系列的功能pmap
您同时pmap
任意数量的对象的元素。 您可以将表传递给.l参数(sapply中X参数的类似物)的输入,该表的每一列都将包含用于迭代的值,并将其依次替换为.f中传递的同一函数的参数( sapply) 。
在什么情况下,我们需要迭代几个对象的元素。 例如,您使用多个代理帐户,想要从中获取关键字列表的广告帐户分散在它们之间。 在这种情况下,您可以根据代理商帐户的名称创建一个向量,并对其进行迭代,这与您对广告帐户的登录名进行排序的方式并行。
代码6:使用多个代理帐户的示例 library(purrr) # agencies <- c("agency1", NA, "agency2", "agency1") # # result.pmap2 <- map2_df(.x = logins, .y = agencies, ~ { yadirGetKeyWords(Login = .x, AgencyAccount = .y) %>% mutate(login = .x) })
现在想象一下这样一种情况,当您使用不同的帐户登录时,您将带有凭据的文件保存在不同的文件夹中,然后需要立即遍历三个对象:广告帐户的登录,代理帐户的登录,带有凭据的文件的存储路径。 可以在帮助下完成。 pmap
系列pmap
。
代码7:pmap函数示例 library(purrr) # , # TokenPath <- c("C:\\proj1\\tokens", "C:\\yandex\\token", "C:\\yandex\\token", "C:\\my_yandex_acoount") # pmap.result <- pmap_df(list(Login = logins, AgencyAccount = agencies, TokenPath = TokenPath), yadirGetKeyWords)
因此,执行功能map_df
, map2_df
和pmap_df
是日期框架,并且在使用它们时, sapply
使用sapply
( do.call("rbind", kw.sapply)
)的示例的最后一步。
代码变得更加紧凑,执行速度更快,但是,上述方法sapply
和purrr
从每个帐户顺序收集了关键字。 因此,此操作的总执行时间是来自所有四个帐户的数据收集持续时间的总和。
时间[总计] = 时间[登录1] +时间[登录2] +时间[登录3] +时间[登录4]
解决从Yandex.Direct收集关键字的任务的多线程选项

因此,如果您已经阅读了第一篇文章 ,那么您将知道多线程操作模式具有以下功能:
- 每个线程均在具有干净工作环境的单独R会话中启动。
- 出于相同的原因,在单独的运行过程中,默认情况下不会传输先前连接的数据包。
导出在工作环境中创建的对象以及在每种方法中连接程序包的实现方式有所不同,然后我们将对其进行更详细的考虑。
parallel
包装
该软件包最初包含在2.14.0版的R软件包中,并且直到今天,R本身也都附带了该软件包。
代码8:通过并行包解决问题的示例 library(parallel) library(tictoc) # cl <- makeCluster(4) # clusterExport(cl = cl, varlist = "logins") # , # , ryandexdirect clusterEvalQ(cl = cl, { library(ryandexdirect) library(dplyr) } ) tic() # parallel.kw <- parSapplyLB(cl = cl, # X = logins, # , FUN = function(x) { # # X yadirGetKeyWords(Login = x) %>% mutate(login = x) }, simplify = F) # toc() # # stopCluster(cl) # result.parallel <- dplyr::bind_rows(parallel.kw)
16.75 sec elapsed
时间: 16.75 sec elapsed
让我们尝试解析代码8 。 makeCluster
函数创建一个由4个进程组成的集群。 我们可以使用clusterExport
函数将对象从主要工作环境导出到创建的集群,为此,我们需要使用其参数:
- cl-要将对象导出到的群集
- varlist-一个文本向量,其中包含要导出到每个集群进程的对象的名称。
在每个群集节点上连接正确的程序包的一种方法是使用clusterEvalQ
函数。 在我们的示例中,我们使用它来连接程序包,但是您可以在clusterEvalQ
内编写任何R代码,它将在每个群集节点的开始处启动。 该函数的参数非常明显,您需要指定集群以及将在其中执行的命令。
parSapplyLB
是sapply
函数的并行版本,在群集节点之间具有负载平衡,它们也使用它,但是您需要使用cl参数指定群集。
同时,还有*apply
系列函数的其他并行版本: parLapply
, parSapply
, parApply
等。
parSapply
与parSapply
不同之处仅在于,它在群集节点上没有负载平衡。
stopCluster
函数用于停止创建的集群。
最后一个命令dplyr::bind_rows(parallel.kw)
我们将使用parSapplyLB
获得的parallel.kw对象parSapplyLB
到一个表中。
对于Linux, parallel
具有单独的功能: mclapply
, mcmapply
, mcMap
。 通常在此操作系统中,命令执行速度更快,并且代码变得更紧凑。
代码9:针对Linux使用mclapply的解决方案 library(parallel) library(tictic) library(dplyr) library(ryandexdirect) tic() mclapply.kw <- mclapply(logins, FUN = function(x) { # # X yadirGetKeyWords(Login = x) %>% mutate(login = x) }, mc.cores = 4) toc()
使用这些功能时,无需使用makeCluster
启动集群。 您使用mc.cores参数指定的节点数。 也不需要连接包和导出对象;这些操作是自动执行的。
future
包装
R中异步编程的最现代方法之一。
可以在future
的帮助下并行解决我们的问题的代码非常复杂,难以理解。 因此,让我们通过一个简单的示例来分析其工作,我们将从一个帐户中请求关键字列表。
代码10:使用Future包的最简单示例 library(future) # plan(multiprocess) # # future.kw <- future({yadirGetKeyWords(Login = logins[4])}, packages = "ryandexdirect", globals = "logins") # resolved(future.kw) # future.result.1 <- value(future.kw)
让我们尝试找出Code 10示例。 plan
功能允许您设置和更改给定表达式的执行模式,以下是主要的表达式:
- 顺序 -这是通常的R操作模式;命令在当前会话中顺序执行;
- 多会话 -并行模式,命令将在当前计算机的后台运行会话中执行,而不会阻塞您的工作会话;
- cluster-并行模式,命令将在当前或远程计算机上执行,类似于在
parallel
程序包中实现的方式。
整个future
程序包都基于在后台进程中执行命令而不会阻塞当前会话。 运行命令的执行将遵循具有相同名称的功能future
,因此当我们运行命令时:
future({yadirGetKeyWords(Login = logins[4])}, packages = "ryandexdirect", globals = "logins")
我们当前在R中的会话没有被阻塞,并且该命令在后台执行,运行另一个R会话。
您可以使用resolved
函数来检查给定表达式执行过程的当前状态。 最后, value
函数用于获取future
执行的结果。 如果您在并行运行的会话中早于future
运行value
函数,则当前工作会话将被阻塞,直到并行会话表达式完成。
最先进的工作示例是将Future与promises
结合使用。
代码11:共享`future`和`promise`包的示例 library(future) library(promises) # plan(multiprocess) # # future.kw <- future({suppressMessages( yadirGetKeyWords(Login = logins[4]))}, packages = "ryandexdirect", globals = "logins") %...>% # future, # nrow() %...>% paste("words loaded") %...>% print()
promises
软件包提供了一组管道运算符,可以完美地补充future
功能。
在后台的Code 11示例中,我们开始从一个广告帐户下载关键字的过程。 此外,没有阻塞工作会话的管道运算符%...>%
等待future
,并执行其余操作。 由于执行了代码, future
工作完成后,指定帐户中的关键字数量将显示在控制台中:
[1] "1855 words loaded"
在文章的结尾,将展示一堆关于future
和promises
的更具说明性的示例。
默认情况下, future
包本身会将整个工作空间导出到每个并行运行的会话,但是您自己可以使用globals参数指定要导出的对象列表。
要将软件包连接到future
应该将包含其名称的向量传递给packages参数。
现在回到我们的任务,以下并行模式的代码示例将从4个帐户中加载关键字列表:
代码12:使用future包解决问题的示例 library(future) library(tictoc) # plan("multisession", workers = 4) tic() # futs <- lapply(logins, # function(i) # # future({ yadirGetKeyWords(Login = i) %>% mutate(login = i) }, packages = c("ryandexdirect", "dplyr"))) completed <- sapply(futs, resolved) # kw <- lapply(futs, value) # toc() # # result.future <- dplyr::bind_rows(kw)
前置时间: 14.83 sec elapsed
要从引导登录中列出的所有广告帐户中以多线程模式下载关键字列表,您需要在后台运行单独的Future。 在代码示例12中,我们使用lapply
函数实现了这一点。
努力工作的结果是future
推出的清单。 您可以使用sapply(futs, resolved)
命令检查每个命令的状态,该命令将返回逻辑向量,其中TRUE表示future
实现,而FALSE表示future
正在进行中。
为了从每个future
获得结果,在他们的工作完成之后,我们使用lapply(futs, value)
命令。
: result.future <- dplyr::bind_rows(kw)
.
future
, (
future
), .
future.apply
future.apply
future
, .
13: future.apply library(future.apply) library(tictoc) # plan("multisession", workers = 4) tic() # kw.future.apply <- future_sapply(logins, # , function(x) { # yadirGetKeyWords(Login = x) %>% mutate(login = x) }, simplify = FALSE, # # future.packages = c("ryandexdirect", "dplyr"), future.globals = TRUE ) toc() #
: 17.28 sec elapsed
13 , future.apply
future
, .
4 : plan("multisession", workers = 4)
.
future_sapply
logins . 即 , , sapply
, .
future_sapply
future.packages . future.globals . , .
furrr
future
. purrr
, furrr
.
14: furrr library(furrr) library(tictoc) # cl <- parallel::makeCluster(4) plan(cluster, workers = cl) tic() # furrr.kw <- future_map(logins, ~ # function(.x) yadirGetKeyWords(Login = .x) %>% mutate(login = .x), .options = future_options(packages = c("ryandexdirect", "dplyr"), globals = c())) toc() # # result.furrr <-dplyr::bind_rows(furrr.kw)
: 15.45 sec elapsed
furrr
purrr
. purrr
, .
.options . .options future_options
, .
14 packages globals :
.options = future_options(packages = c("ryandexdirect", "dplyr"), globals = c())
rbenchmark
.
, future
promises
. .
, 20 4 () .
= (T[1] * 20) + (T[2] * 20) + (T[N] * 20)
15: future promises library(furrr) library(parallel) library(dplyr) library(future) library(ryandexdirect) library(tictoc) library(rbenchmark) # logins <- c("login1", "login2", "login3", "login4") # # par par.furrr <- function(logins) { cl <- parallel::makeCluster(4) plan(cluster, workers = cl) furrr.kw <- future_map(logins, ~ yadirGetKeyWords(Login = .x) %>% mutate(login = .x), .options = future_options(packages = c("ryandexdirect", "dplyr"), globals = c())) result.furrr <-dplyr::bind_rows(furrr.kw) } par.future <- function(logins) { plan("multisession", workers = 4) futs <- lapply(logins, function(i) future({ yadirGetKeyWords(Login = i) %>% mutate(login = i) }, packages = c("ryandexdirect", "dplyr"))) completed <- sapply(futs, resolved) kw <- lapply(futs, value) result.future <- dplyr::bind_rows(kw) } par.future.apply <- function(logins) { plan("multisession", workers = 4) kw.future.apply <- future_sapply(logins, function(x) { yadirGetKeyWords(Login = x) %>% mutate(login = x) }, simplify = FALSE, future.packages = c("ryandexdirect", "dplyr"), future.globals = TRUE ) result.future.apply <- dplyr::bind_rows(kw.future.apply) } par.parallel <- function(logins) { cl <- parallel::makeCluster(4) clusterExport(cl = cl, varlist = "logins") clusterEvalQ(cl = cl, { library(ryandexdirect) library(dplyr) } ) parallel.kw <- parSapplyLB(cl = cl, X = logins, FUN = function(x) { yadirGetKeyWords(Login = x) %>% mutate(login = x) }, simplify = F) stopCluster(cl) result.parallel <- dplyr::bind_rows(parallel.kw) } # seq seq.apply <- function(logins) { kw.sapply <- sapply( logins, function(x) { yadirGetKeyWords(Login = x) %>% mutate(login = x) }, simplify = FALSE ) result.sapply <- do.call("rbind", kw.sapply) } seq.purrr <- function(logins) { kw.purrr <- map_df( logins, ~ { yadirGetKeyWords(Login = .x) %>% mutate(login = .x) } ) result.purrr <- do.call("rbind", kw.purrr) } # rbenchmark # future + promises # , # plan(list(tweak(multisession, workers = 2), tweak(multisession, workers = 4))) tic() speed.test <- future({ # within(benchmark(furrr = par.furrr(logins), future = par.future(logins), future.apply = par.future.apply(logins), parallel = par.parallel(logins), apply = seq.apply(logins), purrr = seq.purrr(logins), replications = c(20), columns = c('test', 'replications', 'elapsed'), order = c('elapsed', 'test')), { average = round(elapsed/replications, 2) }) }, packages = c("dplyr", "ryandexdirect", "rbenchmark", "parallel", "purrr", "future", "promises", "furrr", "future.apply"), globals = c("logins", "par.furrr", "par.future", "par.future.apply", "par.parallel", "seq.apply", "seq.purrr")) %...>% print() %...T>% toc() message("My Session is not blocked")
3370 , .. .
. , future
, promises
, .
, . "My Session is not blocked", , , .. .
promises
:
%...>%
— %>%
, . 即 , resolved
, future
, value
. , print
.%...T>%
— %T>%
, , . , , , .. .. print
, , .- %...T!% — .
15 plan
tweak
( plan(list(tweak(multisession, workers = 2), tweak(multisession, workers = 4)))
), , 2 , future
4 .
:
My Session is not blocked test replications elapsed average 4 parallel 20 393.02 19.65 1 furrr 20 402.09 20.10 2 future 20 431.19 21.56 3 future.apply 20 432.29 21.61 5 apply 20 847.77 42.39 6 purrr 20 864.19 43.21 3370.55 sec elapsed

, parallel
, . furrr
, future
future.apply
.
1 , , . , API . .
, 4 , .
结论
R, API.
, API . " R , 1" .
:
- doSNOW / doParallel + foreach
- future + promises
- future.apply / furrr
- parallel
, , .
, R .