No último artigo, falei sobre o que é multithreading e dei exemplos de sua implementação na linguagem R ao trabalhar com a API Yandex.Direct usando os doParallel
, doParallel
e foreach
.
Este artigo é uma continuação, mas pode ser considerado um guia offline para multithreading em R. Fui solicitado a escrevê-lo pelos comentários recebidos na primeira parte (aqui, um agradecimento especial a Alexey_mosc , SatCat , Ananiev_Genrih ), no qual recebi vários pacotes que representam uma abordagem mais moderna implementações de multithreading em R, falaremos sobre elas mais tarde.

Conteúdo
Desafio
Como exemplo, consideramos o problema considerado em uma publicação anterior , ou seja, no modo multithread, colete uma lista de palavras-chave de 4 contas de publicidade Yandex.Direct.
Para trabalhar com a API Yandex.Direct, usaremos o pacote ryandexdirect
. A documentação oficial está no link , mas para a implementação da tarefa descrita, precisamos de apenas 2 funções:
yadirAuth
- autorização na API Yandex.Direct;yadirGetKeyWords
- faça o download de uma lista de palavras-chave de contas de anúncios.
Não escolhi apenas o processo de download de palavras-chave, o fato é que essa é uma das operações mais demoradas na API Yandex.Direct. Em segundo lugar, em todas as contas, o número de palavras-chave é diferente; portanto, o tempo para concluir esta operação para cada conta será muito diferente, no nosso caso, de 1 a 20 segundos.
Preparação
Inicialmente, você precisa instalar todos os pacotes discutidos neste artigo, para isso você pode usar o código abaixo.
Código 1: Instalando Pacotes # install.packages("ryandexdirect") install.packages("tictoc") install.packages("rbenchmark") install.packages("dplyr") install.packages("purrr") install.packages("future") install.packages("promises") install.packages("furrr") install.packages("future.apply")
Para que as funções do pacote estejam disponíveis, você deve conectá-lo usando o comando library
. Por conveniência, conectarei separadamente todos os pacotes necessários em cada exemplo de código.
Criamos um vetor que consiste em logins Yandex.Direct, dos quais posteriormente solicitaremos palavras-chave:
Código 2: Criando um vetor de logon logins <- c("login1", "login2", "login3", "login4")
Para trabalhar com a API Yandex.Direct, primeiro você precisa passar pela autorização em cada conta. Para isso, você pode usar o seguinte design:
Código 3: autorização na API Yandex.Direct lapply(logins, function(l) { yadirAuth(Login = l)})
Depois de executar o código acima, um navegador será aberto para autorização em cada conta. Você confirma a permissão do ryandexdirect
para acessar seus materiais de publicidade. Você será redirecionado para a página em que precisa copiar o código de verificação. Ao inseri-lo no console do R, conclua o processo de autorização. Esta operação é repetida para cada logon especificado ao criar os logons de vetor.
Alguns usuários, durante o processo de autorização, podem ficar confusos com o redirecionamento para um recurso de terceiros, mas não há perigo para sua conta. Descrevi esse tópico com mais detalhes no artigo "Quão seguro é usar pacotes R para trabalhar com APIs de sistemas de publicidade" .
Além disso, consideraremos vários exemplos da implementação da tarefa descrita. Cada um dos quais começará com um código de exemplo e sua explicação adicional. Eu acho que essa opção será mais conveniente para a percepção.
Exemplo de solução de processamento serial, função sapply e pacote purrr

No último artigo , citei uma solução usando o loop for
como exemplo. Como consideramos o multithreading usando o pacote foreach
, cuja sintaxe se assemelha a loops, este exemplo foi apropriado lá, embora o uso de loops não seja bem-vindo pelos usuários de R.
Os pacotes que consideraremos neste artigo são mais remanescentes das funções da família apply na sintaxe; portanto, darei um exemplo de solução em modo serial usando-os.
Função sapply
Para estimar o tempo de execução dos comandos, em cada uma das abordagens consideradas, usaremos o pacote tictoc
.
Código 4: Exemplo de solução no modo seqüencial usando a função sapply library(tictoc) library(dplyr) tic() # kw.sapply <- sapply( logins, # , function(x) # # { yadirGetKeyWords(Login = x) %>% mutate(login = x) }, simplify = FALSE # ) toc() # # result.sapply <- do.call("rbind", kw.sapply)
Tempo de 39.36 sec elapsed
: 39.36 sec elapsed
A princípio, a sintaxe das funções da família apply
não é tão fácil de ler quanto a sintaxe dos loops, mas, na verdade, tudo é bastante simples.
sapply(X, FUN)
Onde:
- X - Um objeto cujos elementos iremos iterar e usar a cada iteração, em um loop for, era assim:
for(i in X)
; - FUN - Uma função na qual substituiremos cada elemento do objeto X por sua vez, se fizermos uma analogia com
for
, então esse é o corpo do loop.
No exemplo de código 4 , o vetor de logons criado anteriormente é passado para o argumento X. Cada elemento do vetor logins é passado por sua vez, como o único argumento para a função anônima function(x) { yadirGetKeyWords(Login = x) %>% mutate(login = x) }
que foi passada para o argumento FUN .
I.e. sapply
executará a função especificada no FUN 4 vezes, substituindo os logins nela uma a uma e retornando o resultado na forma de uma lista (objeto da lista de classes) composta por 4 elementos. Cada elemento é uma tabela com uma lista de palavras-chave recebidas da conta a cada iteração.
yadirGetKeyWords(Login = "login1") %>% mutate(login = "login1")
yadirGetKeyWords(Login = "login2") %>% mutate(login = "login2")
yadirGetKeyWords(Login = "login3") %>% mutate(login = "login3")
yadirGetKeyWords(Login = "login4") %>% mutate(login = "login4")
O objeto obtido usando sapply
possui a seguinte estrutura:
summary(kw.sapply)
Length Class Mode login1 19 data.frame list login2 19 data.frame list login3 19 data.frame list login4 19 data.frame list
No final deste exemplo, o comando result.sapply <- do.call("rbind", kw.sapply)
combina todos os 4 elementos da lista kw.sapply em um quadro result.sapply .
# A tibble: 6,804 x 1 result.sapply$Id $Keyword $AdGroupId $CampaignId $ServingStatus $State <dbl> <fct> <dbl> <int> <fct> <fct> 1 15164230566 ~ 3597453985 39351725 ELIGIBLE ON 2 15164230567 ~ 3597453985 39351725 ELIGIBLE ON 3 15164230568 ~ 3597453985 39351725 ELIGIBLE ON 4 15164230569 ~ 3597453985 39351725 ELIGIBLE ON 5 15164230570 ~ 3597453985 39351725 ELIGIBLE ON 6 15164230571 ~ 3597453985 39351725 ELIGIBLE ON 7 15164230572 ~ 3597453985 39351725 ELIGIBLE ON 8 15164230573 ~ 3597453985 39351725 ELIGIBLE ON 9 15164230574 ~ 3597453985 39351725 ELIGIBLE ON 10 15164230575 ~ 3597453985 39351725 ELIGIBLE ON # ... with 6,794 more rows, and 13 more variables: $Status <fct>, # $StrategyPriority <fct>, $StatisticsSearchImpressions <int>, # $StatisticsSearchClicks <int>, $StatisticsNetworkImpressions <int>, # $StatisticsNetworkClicks <lgl>, $UserParam1 <chr>, $UserParam2 <chr>, # $ProductivityValue <lgl>, $ProductivityReferences <lgl>, $Bid <dbl>, # $ContextBid <dbl>, $login <chr>
Além do sapply
, a família de funções *apply
inclui: apply
, lapply
, vapply
, mapply
e outros.
Pacote purrr
Código 5: Exemplo de solução usando as funções do pacote purrr library(purrr) library(dplyr) library(tictoc) tic() # result.purrr <- map_df( logins, # , ~ # function(.x) { yadirGetKeyWords(Login = .x) %>% mutate(login = .x) } ) toc() #
Tempo de 35.46 sec elapsed
: 35.46 sec elapsed
O pacote purrr
faz parte do núcleo da biblioteca tidyverse
, de autoria de Headley Wickham.
Em significado e sintaxe, as principais funções do pacote são muito semelhantes ao sapply
, sua principal vantagem é a seguinte:
- As funções são divididas em famílias
map
, pmap
, pmap
, walk
etc., funções separadas incluídas na mesma família retornam o resultado em diferentes formatos: chr , dbl , int , df , etc; - As funções da família
map2
sobre elementos (iterar) simultaneamente de dois objetos; - As funções da família
pmap
simultaneamente os elementos de qualquer número de objetos. Você pode passar uma tabela para a entrada do argumento .l (um análogo do argumento X em sapply) , cada coluna contendo os valores pelos quais você iterará e que serão substituídos por sua vez nos argumentos da mesma função passada em .f (o analógico FUN de sapply) .
Em que situação precisamos iterar sobre elementos de vários objetos. Por exemplo, você trabalha com várias contas de agente e as contas de publicidade das quais deseja obter uma lista de palavras-chave estão espalhadas entre elas. Nesse caso, você pode criar um vetor a partir dos nomes das contas dos agentes e iterá-lo, paralelamente à forma como classifica os logins das contas de publicidade.
Código 6: Exemplo de trabalho com várias contas de agente library(purrr) # agencies <- c("agency1", NA, "agency2", "agency1") # # result.pmap2 <- map2_df(.x = logins, .y = agencies, ~ { yadirGetKeyWords(Login = .x, AgencyAccount = .y) %>% mutate(login = .x) })
Agora imagine a situação em que, ao fazer login em contas diferentes, você salvou o arquivo com as credenciais em pastas diferentes e precisa iterar imediatamente em três objetos: logins de contas de publicidade, logon de contas de agente, o caminho no qual o arquivo com credenciais é armazenado. Isso pode ser feito com ajuda. pmap
família pmap
.
Código 7: exemplo de função pmap library(purrr) # , # TokenPath <- c("C:\\proj1\\tokens", "C:\\yandex\\token", "C:\\yandex\\token", "C:\\my_yandex_acoount") # pmap.result <- pmap_df(list(Login = logins, AgencyAccount = agencies, TokenPath = TokenPath), yadirGetKeyWords)
Assim, o resultado da execução das funções map_df
, map2_df
e pmap_df
é o período e, ao usá-las, a última etapa do exemplo com sapply
( do.call("rbind", kw.sapply)
) não é necessária.
O código tornou-se mais compacto e executado um pouco mais rápido, mas, no entanto, as duas abordagens descritas, sapply
e purrr
, coletam palavras-chave de cada conta sequencialmente. Portanto, o tempo total de execução dessa operação é igual à soma das durações da coleta de dados das quatro contas.
Tempo [total] = Tempo [login1] + Tempo [login2] + Tempo [login3] + Tempo [login4]
Opções multithread para resolver a tarefa de coletar palavras-chave do Yandex.Direct

Portanto, se você já leu o primeiro artigo , sabe que o modo de operação multithread possui vários recursos:
- Cada encadeamento inicia em uma sessão R separada com um ambiente de trabalho limpo.
- Pelo mesmo motivo, em um processo em execução separado, os pacotes conectados anteriormente não são transmitidos por padrão.
A exportação de objetos criados em um ambiente de trabalho e a conexão de pacotes em cada abordagem são implementadas de maneira diferente; então, nós os consideraremos com mais detalhes.
Pacote parallel
Este pacote foi incluído pela primeira vez no pacote R na versão 2.14.0 e até hoje vem com o próprio R.
Código 8: Exemplo de solução para o problema através do pacote paralelo library(parallel) library(tictoc) # cl <- makeCluster(4) # clusterExport(cl = cl, varlist = "logins") # , # , ryandexdirect clusterEvalQ(cl = cl, { library(ryandexdirect) library(dplyr) } ) tic() # parallel.kw <- parSapplyLB(cl = cl, # X = logins, # , FUN = function(x) { # # X yadirGetKeyWords(Login = x) %>% mutate(login = x) }, simplify = F) # toc() # # stopCluster(cl) # result.parallel <- dplyr::bind_rows(parallel.kw)
Tempo de 16.75 sec elapsed
: 16.75 sec elapsed
Vamos tentar analisar o código 8 . A função makeCluster
cria um cluster de 4 processos. Podemos exportar objetos do nosso ambiente de trabalho principal para o cluster criado usando a função clusterExport
; para isso, precisamos usar seus argumentos:
- cl - Cluster para o qual exportaremos objetos
- varlist - um vetor de texto que contém os nomes dos objetos a serem exportados para cada processo de cluster.
Uma maneira de conectar os pacotes corretos em cada nó do cluster é usar a função clusterEvalQ
. Em nosso exemplo, nós o usamos para conectar pacotes, mas você pode escrever qualquer código R dentro de clusterEvalQ
, e ele será iniciado no início de cada nó do cluster. Os argumentos para esta função são bastante óbvios, você precisa especificar o cluster e os comandos que serão executados nele.
parSapplyLB
é uma versão paralela da função sapply
com balanceamento de carga entre os nós do cluster, eles também a usam, mas é necessário especificar o cluster com o argumento cl .
Também em parallel
existem outras versões paralelas das funções da família *apply
: parLapply
, parSapply
, parApply
, etc.
parSapply
difere do parSapplyLB
apenas no fato de não possuir balanceamento de carga nos nós do cluster.
A função stopCluster
é usada para parar o cluster criado.
O último comando, dplyr::bind_rows(parallel.kw)
combinamos o objeto parallel.kw obtido usando parSapplyLB
em uma tabela.
Para Linux, o parallel
possui funções separadas: mclapply
, mcmapply
, mcMap
. Muitas vezes, neste sistema operacional, os comandos são executados mais rapidamente e o código se torna mais compacto.
Código 9: Solução usando mclapply para Linux library(parallel) library(tictic) library(dplyr) library(ryandexdirect) tic() mclapply.kw <- mclapply(logins, FUN = function(x) { # # X yadirGetKeyWords(Login = x) %>% mutate(login = x) }, mc.cores = 4) toc()
Ao usar essas funções, não há necessidade de iniciar o cluster usando o makeCluster
. o número de nós que você especifica usando o argumento mc.cores . Também não é necessário conectar pacotes e exportar objetos; essas operações são executadas automaticamente.
Pacote future
Uma das abordagens mais modernas para programação assíncrona em R.
Um código que, paralelamente, resolverá nosso problema com a ajuda do future
é bastante complicado de entender. Portanto, vamos analisar seu trabalho em um exemplo mais simples, solicitaremos uma lista de palavras-chave de uma conta.
Código 10: O exemplo mais simples de uso do pacote futuro library(future) # plan(multiprocess) # # future.kw <- future({yadirGetKeyWords(Login = logins[4])}, packages = "ryandexdirect", globals = "logins") # resolved(future.kw) # future.result.1 <- value(future.kw)
Vamos tentar descobrir o exemplo do código 10 . A função de plan
permite definir e alterar o modo de execução das expressões fornecidas, eis as principais:
- sequencial - Este é o modo usual de operação R. Os comandos são executados sequencialmente na sessão atual;
- multisessão - modo paralelo, os comandos serão executados nas sessões em execução em segundo plano na máquina atual, enquanto sua sessão de trabalho não será bloqueada;
- cluster - modo paralelo, os comandos serão executados na máquina atual ou remota, semelhante à maneira como é implementada no pacote
parallel
.
Todo o pacote future
é baseado na execução de comandos em processos em segundo plano sem bloquear a sessão atual. Executar a execução dos comandos segue a função com o mesmo nome future
, portanto, quando executamos o comando:
future({yadirGetKeyWords(Login = logins[4])}, packages = "ryandexdirect", globals = "logins")
Nossa sessão atual em R não é bloqueada e o comando é executado em segundo plano, executando outra sessão R.
Você pode verificar o estado atual do processo de execução de uma determinada expressão usando a função resolved
. Finalmente, a função value
é usada para obter o resultado da execução future
. Se você executar a função value
mais cedo do que sua execução future
em uma sessão paralela, a sessão de trabalho atual será bloqueada até que a expressão da sessão paralela seja concluída.
O exemplo de trabalho mais avançado é o uso do future
conjunto com promises
.
Código 11: Exemplo de compartilhamento de pacotes de `futuro` e` promessas` library(future) library(promises) # plan(multiprocess) # # future.kw <- future({suppressMessages( yadirGetKeyWords(Login = logins[4]))}, packages = "ryandexdirect", globals = "logins") %...>% # future, # nrow() %...>% paste("words loaded") %...>% print()
O pacote promises
fornece um conjunto de operadores de pipeline que complementam perfeitamente a funcionalidade future
.
No exemplo do Código 11 , em segundo plano, iniciamos o processo de download de palavras-chave de uma conta de publicidade. Além disso, o operador do pipeline %...>%
sem bloquear a sessão de trabalho aguarda future
do future
e executa as operações restantes. Como resultado da execução do código, após a conclusão de trabalhos future
, o número de palavras-chave da conta especificada será exibido no console:
[1] "1855 words loaded"
No final do artigo, um exemplo mais ilustrativo de um monte de future
e promises
será demonstrado.
Por padrão, o próprio pacote future
exporta todo o espaço de trabalho para cada sessão em execução paralela, mas você mesmo pode especificar uma lista de objetos para exportação usando o argumento global .
Para conectar pacotes ao future
deve passar um vetor contendo seus nomes para o argumento packages .
Agora, voltando à nossa tarefa, o seguinte exemplo de código no modo paralelo carregará uma lista de palavras-chave de 4 contas:
Código 12: Um exemplo de solução de um problema usando o pacote futuro library(future) library(tictoc) # plan("multisession", workers = 4) tic() # futs <- lapply(logins, # function(i) # # future({ yadirGetKeyWords(Login = i) %>% mutate(login = i) }, packages = c("ryandexdirect", "dplyr"))) completed <- sapply(futs, resolved) # kw <- lapply(futs, value) # toc() # # result.future <- dplyr::bind_rows(kw)
Tempo de 14.83 sec elapsed
: 14.83 sec elapsed
Para baixar uma lista de palavras-chave no modo multithread de todas as contas de publicidade listadas nos logins de vetores , é necessário executar um future
separado em segundo plano. No exemplo de código 12, implementamos isso usando a função lapply
.
O resultado do trabalho lapply
é uma lista de future
lançados. Você pode verificar o status de cada um usando o sapply(futs, resolved)
, que retornará um vetor lógico em que TRUE significa que o future
cumprido e FALSE que o future
está em andamento no momento.
Para obter resultados de cada future
, após a conclusão do trabalho, usamos o lapply(futs, value)
.
: result.future <- dplyr::bind_rows(kw)
.
future
, (
future
), .
future.apply
future.apply
future
, .
13: future.apply library(future.apply) library(tictoc) # plan("multisession", workers = 4) tic() # kw.future.apply <- future_sapply(logins, # , function(x) { # yadirGetKeyWords(Login = x) %>% mutate(login = x) }, simplify = FALSE, # # future.packages = c("ryandexdirect", "dplyr"), future.globals = TRUE ) toc() #
: 17.28 sec elapsed
13 , future.apply
future
, .
4 : plan("multisession", workers = 4)
.
future_sapply
logins . I.e. , , sapply
, .
future_sapply
future.packages . future.globals . , .
furrr
future
. purrr
, furrr
.
14: furrr library(furrr) library(tictoc) # cl <- parallel::makeCluster(4) plan(cluster, workers = cl) tic() # furrr.kw <- future_map(logins, ~ # function(.x) yadirGetKeyWords(Login = .x) %>% mutate(login = .x), .options = future_options(packages = c("ryandexdirect", "dplyr"), globals = c())) toc() # # result.furrr <-dplyr::bind_rows(furrr.kw)
: 15.45 sec elapsed
furrr
purrr
. purrr
, .
.options . .options future_options
, .
14 packages globals :
.options = future_options(packages = c("ryandexdirect", "dplyr"), globals = c())
rbenchmark
.
, future
promises
. .
, 20 4 () .
= (T[1] * 20) + (T[2] * 20) + (T[N] * 20)
15: future promises library(furrr) library(parallel) library(dplyr) library(future) library(ryandexdirect) library(tictoc) library(rbenchmark) # logins <- c("login1", "login2", "login3", "login4") # # par par.furrr <- function(logins) { cl <- parallel::makeCluster(4) plan(cluster, workers = cl) furrr.kw <- future_map(logins, ~ yadirGetKeyWords(Login = .x) %>% mutate(login = .x), .options = future_options(packages = c("ryandexdirect", "dplyr"), globals = c())) result.furrr <-dplyr::bind_rows(furrr.kw) } par.future <- function(logins) { plan("multisession", workers = 4) futs <- lapply(logins, function(i) future({ yadirGetKeyWords(Login = i) %>% mutate(login = i) }, packages = c("ryandexdirect", "dplyr"))) completed <- sapply(futs, resolved) kw <- lapply(futs, value) result.future <- dplyr::bind_rows(kw) } par.future.apply <- function(logins) { plan("multisession", workers = 4) kw.future.apply <- future_sapply(logins, function(x) { yadirGetKeyWords(Login = x) %>% mutate(login = x) }, simplify = FALSE, future.packages = c("ryandexdirect", "dplyr"), future.globals = TRUE ) result.future.apply <- dplyr::bind_rows(kw.future.apply) } par.parallel <- function(logins) { cl <- parallel::makeCluster(4) clusterExport(cl = cl, varlist = "logins") clusterEvalQ(cl = cl, { library(ryandexdirect) library(dplyr) } ) parallel.kw <- parSapplyLB(cl = cl, X = logins, FUN = function(x) { yadirGetKeyWords(Login = x) %>% mutate(login = x) }, simplify = F) stopCluster(cl) result.parallel <- dplyr::bind_rows(parallel.kw) } # seq seq.apply <- function(logins) { kw.sapply <- sapply( logins, function(x) { yadirGetKeyWords(Login = x) %>% mutate(login = x) }, simplify = FALSE ) result.sapply <- do.call("rbind", kw.sapply) } seq.purrr <- function(logins) { kw.purrr <- map_df( logins, ~ { yadirGetKeyWords(Login = .x) %>% mutate(login = .x) } ) result.purrr <- do.call("rbind", kw.purrr) } # rbenchmark # future + promises # , # plan(list(tweak(multisession, workers = 2), tweak(multisession, workers = 4))) tic() speed.test <- future({ # within(benchmark(furrr = par.furrr(logins), future = par.future(logins), future.apply = par.future.apply(logins), parallel = par.parallel(logins), apply = seq.apply(logins), purrr = seq.purrr(logins), replications = c(20), columns = c('test', 'replications', 'elapsed'), order = c('elapsed', 'test')), { average = round(elapsed/replications, 2) }) }, packages = c("dplyr", "ryandexdirect", "rbenchmark", "parallel", "purrr", "future", "promises", "furrr", "future.apply"), globals = c("logins", "par.furrr", "par.future", "par.future.apply", "par.parallel", "seq.apply", "seq.purrr")) %...>% print() %...T>% toc() message("My Session is not blocked")
3370 , .. .
. , future
, promises
, .
, . "My Session is not blocked", , , .. .
promises
:
%...>%
— %>%
, . I.e. , resolved
, future
, value
. , print
.%...T>%
— %T>%
, , . , , , .. .. print
, , .- %...T!% — .
15 plan
tweak
( plan(list(tweak(multisession, workers = 2), tweak(multisession, workers = 4)))
), , 2 , future
4 .
:
My Session is not blocked test replications elapsed average 4 parallel 20 393.02 19.65 1 furrr 20 402.09 20.10 2 future 20 431.19 21.56 3 future.apply 20 432.29 21.61 5 apply 20 847.77 42.39 6 purrr 20 864.19 43.21 3370.55 sec elapsed

, parallel
, . furrr
, future
future.apply
.
1 , , . , API . .
, 4 , .
Conclusão
R, API.
, API . " R , 1" .
:
- doSNOW / doParallel + foreach
- future + promises
- future.apply / furrr
- parallel
, , .
, R .