Como acelerar o trabalho com a API da linguagem R usando computação paralela, usando o exemplo da API Yandex.Direct (parte 2)

No último artigo, falei sobre o que é multithreading e dei exemplos de sua implementação na linguagem R ao trabalhar com a API Yandex.Direct usando os doParallel , doParallel e foreach .


Este artigo é uma continuação, mas pode ser considerado um guia offline para multithreading em R. Fui solicitado a escrevê-lo pelos comentários recebidos na primeira parte (aqui, um agradecimento especial a Alexey_mosc , SatCat , Ananiev_Genrih ), no qual recebi vários pacotes que representam uma abordagem mais moderna implementações de multithreading em R, falaremos sobre elas mais tarde.


Multithreading


Conteúdo



Desafio


Como exemplo, consideramos o problema considerado em uma publicação anterior , ou seja, no modo multithread, colete uma lista de palavras-chave de 4 contas de publicidade Yandex.Direct.


Para trabalhar com a API Yandex.Direct, usaremos o pacote ryandexdirect . A documentação oficial está no link , mas para a implementação da tarefa descrita, precisamos de apenas 2 funções:


  • yadirAuth - autorização na API Yandex.Direct;
  • yadirGetKeyWords - faça o download de uma lista de palavras-chave de contas de anúncios.

Não escolhi apenas o processo de download de palavras-chave, o fato é que essa é uma das operações mais demoradas na API Yandex.Direct. Em segundo lugar, em todas as contas, o número de palavras-chave é diferente; portanto, o tempo para concluir esta operação para cada conta será muito diferente, no nosso caso, de 1 a 20 segundos.


Preparação


Inicialmente, você precisa instalar todos os pacotes discutidos neste artigo, para isso você pode usar o código abaixo.


Código 1: Instalando Pacotes
 #    install.packages("ryandexdirect") install.packages("tictoc") install.packages("rbenchmark") install.packages("dplyr") install.packages("purrr") install.packages("future") install.packages("promises") install.packages("furrr") install.packages("future.apply") 

Para que as funções do pacote estejam disponíveis, você deve conectá-lo usando o comando library . Por conveniência, conectarei separadamente todos os pacotes necessários em cada exemplo de código.


Criamos um vetor que consiste em logins Yandex.Direct, dos quais posteriormente solicitaremos palavras-chave:


Código 2: Criando um vetor de logon
 logins <- c("login1", "login2", "login3", "login4") 

Para trabalhar com a API Yandex.Direct, primeiro você precisa passar pela autorização em cada conta. Para isso, você pode usar o seguinte design:


Código 3: autorização na API Yandex.Direct
 lapply(logins, function(l) { yadirAuth(Login = l)}) 

Depois de executar o código acima, um navegador será aberto para autorização em cada conta. Você confirma a permissão do ryandexdirect para acessar seus materiais de publicidade. Você será redirecionado para a página em que precisa copiar o código de verificação. Ao inseri-lo no console do R, conclua o processo de autorização. Esta operação é repetida para cada logon especificado ao criar os logons de vetor.


Alguns usuários, durante o processo de autorização, podem ficar confusos com o redirecionamento para um recurso de terceiros, mas não há perigo para sua conta. Descrevi esse tópico com mais detalhes no artigo "Quão seguro é usar pacotes R para trabalhar com APIs de sistemas de publicidade" .


Além disso, consideraremos vários exemplos da implementação da tarefa descrita. Cada um dos quais começará com um código de exemplo e sua explicação adicional. Eu acho que essa opção será mais conveniente para a percepção.


Exemplo de solução de processamento serial, função sapply e pacote purrr



No último artigo , citei uma solução usando o loop for como exemplo. Como consideramos o multithreading usando o pacote foreach , cuja sintaxe se assemelha a loops, este exemplo foi apropriado lá, embora o uso de loops não seja bem-vindo pelos usuários de R.


Os pacotes que consideraremos neste artigo são mais remanescentes das funções da família apply na sintaxe; portanto, darei um exemplo de solução em modo serial usando-os.


Função sapply


Para estimar o tempo de execução dos comandos, em cada uma das abordagens consideradas, usaremos o pacote tictoc .

Código 4: Exemplo de solução no modo seqüencial usando a função sapply
 library(tictoc) library(dplyr) tic() #   kw.sapply <- sapply( logins, #  ,     function(x) #        #     { yadirGetKeyWords(Login = x) %>% mutate(login = x) }, simplify = FALSE #     ) toc() #   #       result.sapply <- do.call("rbind", kw.sapply) 

Tempo de 39.36 sec elapsed : 39.36 sec elapsed


A princípio, a sintaxe das funções da família apply não é tão fácil de ler quanto a sintaxe dos loops, mas, na verdade, tudo é bastante simples.


sapply(X, FUN)


Onde:


  • X - Um objeto cujos elementos iremos iterar e usar a cada iteração, em um loop for, era assim: for(i in X) ;
  • FUN - Uma função na qual substituiremos cada elemento do objeto X por sua vez, se fizermos uma analogia com for , então esse é o corpo do loop.

No exemplo de código 4 , o vetor de logons criado anteriormente é passado para o argumento X. Cada elemento do vetor logins é passado por sua vez, como o único argumento para a função anônima function(x) { yadirGetKeyWords(Login = x) %>% mutate(login = x) } que foi passada para o argumento FUN .


I.e. sapply executará a função especificada no FUN 4 vezes, substituindo os logins nela uma a uma e retornando o resultado na forma de uma lista (objeto da lista de classes) composta por 4 elementos. Cada elemento é uma tabela com uma lista de palavras-chave recebidas da conta a cada iteração.


  1. yadirGetKeyWords(Login = "login1") %>% mutate(login = "login1")
  2. yadirGetKeyWords(Login = "login2") %>% mutate(login = "login2")
  3. yadirGetKeyWords(Login = "login3") %>% mutate(login = "login3")
  4. yadirGetKeyWords(Login = "login4") %>% mutate(login = "login4")

O objeto obtido usando sapply possui a seguinte estrutura:


 summary(kw.sapply) 

  Length Class Mode login1 19 data.frame list login2 19 data.frame list login3 19 data.frame list login4 19 data.frame list 

No final deste exemplo, o comando result.sapply <- do.call("rbind", kw.sapply) combina todos os 4 elementos da lista kw.sapply em um quadro result.sapply .


 # A tibble: 6,804 x 1 result.sapply$Id $Keyword $AdGroupId $CampaignId $ServingStatus $State <dbl> <fct> <dbl> <int> <fct> <fct> 1 15164230566 ~ 3597453985 39351725 ELIGIBLE ON 2 15164230567  ~ 3597453985 39351725 ELIGIBLE ON 3 15164230568  ~ 3597453985 39351725 ELIGIBLE ON 4 15164230569 ~ 3597453985 39351725 ELIGIBLE ON 5 15164230570 ~ 3597453985 39351725 ELIGIBLE ON 6 15164230571  ~ 3597453985 39351725 ELIGIBLE ON 7 15164230572 ~ 3597453985 39351725 ELIGIBLE ON 8 15164230573  ~ 3597453985 39351725 ELIGIBLE ON 9 15164230574 ~ 3597453985 39351725 ELIGIBLE ON 10 15164230575 ~ 3597453985 39351725 ELIGIBLE ON # ... with 6,794 more rows, and 13 more variables: $Status <fct>, # $StrategyPriority <fct>, $StatisticsSearchImpressions <int>, # $StatisticsSearchClicks <int>, $StatisticsNetworkImpressions <int>, # $StatisticsNetworkClicks <lgl>, $UserParam1 <chr>, $UserParam2 <chr>, # $ProductivityValue <lgl>, $ProductivityReferences <lgl>, $Bid <dbl>, # $ContextBid <dbl>, $login <chr> 

Além do sapply , a família de funções *apply inclui: apply , lapply , vapply , mapply e outros.


Pacote purrr


Código 5: Exemplo de solução usando as funções do pacote purrr
 library(purrr) library(dplyr) library(tictoc) tic() #   result.purrr <- map_df( logins, #  ,     ~ #   function(.x) { yadirGetKeyWords(Login = .x) %>% mutate(login = .x) } ) toc() #   

Tempo de 35.46 sec elapsed : 35.46 sec elapsed


O pacote purrr faz parte do núcleo da biblioteca tidyverse , de autoria de Headley Wickham.


Em significado e sintaxe, as principais funções do pacote são muito semelhantes ao sapply , sua principal vantagem é a seguinte:


  • As funções são divididas em famílias map , pmap , pmap , walk etc., funções separadas incluídas na mesma família retornam o resultado em diferentes formatos: chr , dbl , int , df , etc;
  • As funções da família map2 sobre elementos (iterar) simultaneamente de dois objetos;
  • As funções da família pmap simultaneamente os elementos de qualquer número de objetos. Você pode passar uma tabela para a entrada do argumento .l (um análogo do argumento X em sapply) , cada coluna contendo os valores pelos quais você iterará e que serão substituídos por sua vez nos argumentos da mesma função passada em .f (o analógico FUN de sapply) .

Em que situação precisamos iterar sobre elementos de vários objetos. Por exemplo, você trabalha com várias contas de agente e as contas de publicidade das quais deseja obter uma lista de palavras-chave estão espalhadas entre elas. Nesse caso, você pode criar um vetor a partir dos nomes das contas dos agentes e iterá-lo, paralelamente à forma como classifica os logins das contas de publicidade.


Código 6: Exemplo de trabalho com várias contas de agente
 library(purrr) #      agencies <- c("agency1", NA, "agency2", "agency1") #      #         result.pmap2 <- map2_df(.x = logins, .y = agencies, ~ { yadirGetKeyWords(Login = .x, AgencyAccount = .y) %>% mutate(login = .x) }) 

Agora imagine a situação em que, ao fazer login em contas diferentes, você salvou o arquivo com as credenciais em pastas diferentes e precisa iterar imediatamente em três objetos: logins de contas de publicidade, logon de contas de agente, o caminho no qual o arquivo com credenciais é armazenado. Isso pode ser feito com ajuda. pmap família pmap .


Código 7: exemplo de função pmap
 library(purrr) #  ,       #      TokenPath <- c("C:\\proj1\\tokens", "C:\\yandex\\token", "C:\\yandex\\token", "C:\\my_yandex_acoount") #   pmap.result <- pmap_df(list(Login = logins, AgencyAccount = agencies, TokenPath = TokenPath), yadirGetKeyWords) 

Assim, o resultado da execução das funções map_df , map2_df e pmap_df é o período e, ao usá-las, a última etapa do exemplo com sapply ( do.call("rbind", kw.sapply) ) não é necessária.


O código tornou-se mais compacto e executado um pouco mais rápido, mas, no entanto, as duas abordagens descritas, sapply e purrr , coletam palavras-chave de cada conta sequencialmente. Portanto, o tempo total de execução dessa operação é igual à soma das durações da coleta de dados das quatro contas.


Tempo [total] = Tempo [login1] + Tempo [login2] + Tempo [login3] + Tempo [login4]


Opções multithread para resolver a tarefa de coletar palavras-chave do Yandex.Direct



Portanto, se você já leu o primeiro artigo , sabe que o modo de operação multithread possui vários recursos:


  • Cada encadeamento inicia em uma sessão R separada com um ambiente de trabalho limpo.
  • Pelo mesmo motivo, em um processo em execução separado, os pacotes conectados anteriormente não são transmitidos por padrão.

A exportação de objetos criados em um ambiente de trabalho e a conexão de pacotes em cada abordagem são implementadas de maneira diferente; então, nós os consideraremos com mais detalhes.


Pacote parallel


Este pacote foi incluído pela primeira vez no pacote R na versão 2.14.0 e até hoje vem com o próprio R.


Código 8: Exemplo de solução para o problema através do pacote paralelo
 library(parallel) library(tictoc) #   cl <- makeCluster(4) #      clusterExport(cl = cl, varlist = "logins") #  ,      #  ,       ryandexdirect clusterEvalQ(cl = cl, { library(ryandexdirect) library(dplyr) } ) tic() #   parallel.kw <- parSapplyLB(cl = cl, #   X = logins, # ,     FUN = function(x) { #      #      X yadirGetKeyWords(Login = x) %>% mutate(login = x) }, simplify = F) #    toc() #     #   stopCluster(cl) #      result.parallel <- dplyr::bind_rows(parallel.kw) 

Tempo de 16.75 sec elapsed : 16.75 sec elapsed


Vamos tentar analisar o código 8 . A função makeCluster cria um cluster de 4 processos. Podemos exportar objetos do nosso ambiente de trabalho principal para o cluster criado usando a função clusterExport ; para isso, precisamos usar seus argumentos:


  • cl - Cluster para o qual exportaremos objetos
  • varlist - um vetor de texto que contém os nomes dos objetos a serem exportados para cada processo de cluster.

Uma maneira de conectar os pacotes corretos em cada nó do cluster é usar a função clusterEvalQ . Em nosso exemplo, nós o usamos para conectar pacotes, mas você pode escrever qualquer código R dentro de clusterEvalQ , e ele será iniciado no início de cada nó do cluster. Os argumentos para esta função são bastante óbvios, você precisa especificar o cluster e os comandos que serão executados nele.


parSapplyLB é uma versão paralela da função sapply com balanceamento de carga entre os nós do cluster, eles também a usam, mas é necessário especificar o cluster com o argumento cl .


Também em parallel existem outras versões paralelas das funções da família *apply : parLapply , parSapply , parApply , etc.


parSapply difere do parSapplyLB apenas no fato de não possuir balanceamento de carga nos nós do cluster.


A função stopCluster é usada para parar o cluster criado.


O último comando, dplyr::bind_rows(parallel.kw) combinamos o objeto parallel.kw obtido usando parSapplyLB em uma tabela.


Para Linux, o parallel possui funções separadas: mclapply , mcmapply , mcMap . Muitas vezes, neste sistema operacional, os comandos são executados mais rapidamente e o código se torna mais compacto.


Código 9: Solução usando mclapply para Linux
 library(parallel) library(tictic) library(dplyr) library(ryandexdirect) tic() mclapply.kw <- mclapply(logins, FUN = function(x) { #      #      X yadirGetKeyWords(Login = x) %>% mutate(login = x) }, mc.cores = 4) toc() 

Ao usar essas funções, não há necessidade de iniciar o cluster usando o makeCluster . o número de nós que você especifica usando o argumento mc.cores . Também não é necessário conectar pacotes e exportar objetos; essas operações são executadas automaticamente.


Pacote future


Uma das abordagens mais modernas para programação assíncrona em R.


Um código que, paralelamente, resolverá nosso problema com a ajuda do future é bastante complicado de entender. Portanto, vamos analisar seu trabalho em um exemplo mais simples, solicitaremos uma lista de palavras-chave de uma conta.


Código 10: O exemplo mais simples de uso do pacote futuro
 library(future) #    plan(multiprocess) #      #    future.kw <- future({yadirGetKeyWords(Login = logins[4])}, packages = "ryandexdirect", globals = "logins") #     resolved(future.kw) #     future.result.1 <- value(future.kw) 

Vamos tentar descobrir o exemplo do código 10 . A função de plan permite definir e alterar o modo de execução das expressões fornecidas, eis as principais:


  • sequencial - Este é o modo usual de operação R. Os comandos são executados sequencialmente na sessão atual;
  • multisessão - modo paralelo, os comandos serão executados nas sessões em execução em segundo plano na máquina atual, enquanto sua sessão de trabalho não será bloqueada;
  • cluster - modo paralelo, os comandos serão executados na máquina atual ou remota, semelhante à maneira como é implementada no pacote parallel .

Todo o pacote future é baseado na execução de comandos em processos em segundo plano sem bloquear a sessão atual. Executar a execução dos comandos segue a função com o mesmo nome future , portanto, quando executamos o comando:


 future({yadirGetKeyWords(Login = logins[4])}, packages = "ryandexdirect", globals = "logins") 

Nossa sessão atual em R não é bloqueada e o comando é executado em segundo plano, executando outra sessão R.


Você pode verificar o estado atual do processo de execução de uma determinada expressão usando a função resolved . Finalmente, a função value é usada para obter o resultado da execução future . Se você executar a função value mais cedo do que sua execução future em uma sessão paralela, a sessão de trabalho atual será bloqueada até que a expressão da sessão paralela seja concluída.


O exemplo de trabalho mais avançado é o uso do future conjunto com promises .


Código 11: Exemplo de compartilhamento de pacotes de `futuro` e` promessas`
 library(future) library(promises) #    plan(multiprocess) #      #    future.kw <- future({suppressMessages( yadirGetKeyWords(Login = logins[4]))}, packages = "ryandexdirect", globals = "logins") %...>% #     future, #      nrow() %...>% paste("words loaded") %...>% print() 

O pacote promises fornece um conjunto de operadores de pipeline que complementam perfeitamente a funcionalidade future .


No exemplo do Código 11 , em segundo plano, iniciamos o processo de download de palavras-chave de uma conta de publicidade. Além disso, o operador do pipeline %...>% sem bloquear a sessão de trabalho aguarda future do future e executa as operações restantes. Como resultado da execução do código, após a conclusão de trabalhos future , o número de palavras-chave da conta especificada será exibido no console:


 [1] "1855 words loaded" 

No final do artigo, um exemplo mais ilustrativo de um monte de future e promises será demonstrado.

Por padrão, o próprio pacote future exporta todo o espaço de trabalho para cada sessão em execução paralela, mas você mesmo pode especificar uma lista de objetos para exportação usando o argumento global .


Para conectar pacotes ao future deve passar um vetor contendo seus nomes para o argumento packages .


Agora, voltando à nossa tarefa, o seguinte exemplo de código no modo paralelo carregará uma lista de palavras-chave de 4 contas:


Código 12: Um exemplo de solução de um problema usando o pacote futuro
 library(future) library(tictoc) #   plan("multisession", workers = 4) tic() #   futs <- lapply(logins, #      function(i) #        #   future({ yadirGetKeyWords(Login = i) %>% mutate(login = i) }, packages = c("ryandexdirect", "dplyr"))) completed <- sapply(futs, resolved) #    kw <- lapply(futs, value) #    toc() #    #     result.future <- dplyr::bind_rows(kw) 

Tempo de 14.83 sec elapsed : 14.83 sec elapsed


Para baixar uma lista de palavras-chave no modo multithread de todas as contas de publicidade listadas nos logins de vetores , é necessário executar um future separado em segundo plano. No exemplo de código 12, implementamos isso usando a função lapply .


O resultado do trabalho lapply é uma lista de future lançados. Você pode verificar o status de cada um usando o sapply(futs, resolved) , que retornará um vetor lógico em que TRUE significa que o future cumprido e FALSE que o future está em andamento no momento.


Para obter resultados de cada future , após a conclusão do trabalho, usamos o lapply(futs, value) .


: result.future <- dplyr::bind_rows(kw) .


future


, (
future ), .


future.apply

future.apply future , .


13: future.apply
 library(future.apply) library(tictoc) #    plan("multisession", workers = 4) tic() #   kw.future.apply <- future_sapply(logins, #    ,   function(x) { #     yadirGetKeyWords(Login = x) %>% mutate(login = x) }, simplify = FALSE, #    #   future.packages = c("ryandexdirect", "dplyr"), future.globals = TRUE ) toc() #    

: 17.28 sec elapsed


13 , future.apply future , .


4 : plan("multisession", workers = 4) .


future_sapply logins . I.e. , , sapply , .


future_sapply future.packages . future.globals . , .


furrr


future . purrr , furrr .


14: furrr
 library(furrr) library(tictoc) #   cl <- parallel::makeCluster(4) plan(cluster, workers = cl) tic() #   furrr.kw <- future_map(logins, ~ #   function(.x) yadirGetKeyWords(Login = .x) %>% mutate(login = .x), .options = future_options(packages = c("ryandexdirect", "dplyr"), globals = c())) toc() #    #      result.furrr <-dplyr::bind_rows(furrr.kw) 

: 15.45 sec elapsed


furrr purrr . purrr , .


.options . .options future_options , .


14 packages globals :


 .options = future_options(packages = c("ryandexdirect", "dplyr"), globals = c()) 


rbenchmark .


, future promises . .


, 20 4 () .


= (T[1] * 20) + (T[2] * 20) + (T[N] * 20)


15: future promises
 library(furrr) library(parallel) library(dplyr) library(future) library(ryandexdirect) library(tictoc) library(rbenchmark) #   logins <- c("login1", "login2", "login3", "login4") #        #        par par.furrr <- function(logins) { cl <- parallel::makeCluster(4) plan(cluster, workers = cl) furrr.kw <- future_map(logins, ~ yadirGetKeyWords(Login = .x) %>% mutate(login = .x), .options = future_options(packages = c("ryandexdirect", "dplyr"), globals = c())) result.furrr <-dplyr::bind_rows(furrr.kw) } par.future <- function(logins) { plan("multisession", workers = 4) futs <- lapply(logins, function(i) future({ yadirGetKeyWords(Login = i) %>% mutate(login = i) }, packages = c("ryandexdirect", "dplyr"))) completed <- sapply(futs, resolved) kw <- lapply(futs, value) result.future <- dplyr::bind_rows(kw) } par.future.apply <- function(logins) { plan("multisession", workers = 4) kw.future.apply <- future_sapply(logins, function(x) { yadirGetKeyWords(Login = x) %>% mutate(login = x) }, simplify = FALSE, future.packages = c("ryandexdirect", "dplyr"), future.globals = TRUE ) result.future.apply <- dplyr::bind_rows(kw.future.apply) } par.parallel <- function(logins) { cl <- parallel::makeCluster(4) clusterExport(cl = cl, varlist = "logins") clusterEvalQ(cl = cl, { library(ryandexdirect) library(dplyr) } ) parallel.kw <- parSapplyLB(cl = cl, X = logins, FUN = function(x) { yadirGetKeyWords(Login = x) %>% mutate(login = x) }, simplify = F) stopCluster(cl) result.parallel <- dplyr::bind_rows(parallel.kw) } #          seq seq.apply <- function(logins) { kw.sapply <- sapply( logins, function(x) { yadirGetKeyWords(Login = x) %>% mutate(login = x) }, simplify = FALSE ) result.sapply <- do.call("rbind", kw.sapply) } seq.purrr <- function(logins) { kw.purrr <- map_df( logins, ~ { yadirGetKeyWords(Login = .x) %>% mutate(login = .x) } ) result.purrr <- do.call("rbind", kw.purrr) } #       rbenchmark #   future + promises #  ,       #          plan(list(tweak(multisession, workers = 2), tweak(multisession, workers = 4))) tic() speed.test <- future({ #          within(benchmark(furrr = par.furrr(logins), future = par.future(logins), future.apply = par.future.apply(logins), parallel = par.parallel(logins), apply = seq.apply(logins), purrr = seq.purrr(logins), replications = c(20), columns = c('test', 'replications', 'elapsed'), order = c('elapsed', 'test')), { average = round(elapsed/replications, 2) }) }, packages = c("dplyr", "ryandexdirect", "rbenchmark", "parallel", "purrr", "future", "promises", "furrr", "future.apply"), globals = c("logins", "par.furrr", "par.future", "par.future.apply", "par.parallel", "seq.apply", "seq.purrr")) %...>% print() %...T>% toc() message("My Session is not blocked") 

3370 , .. .


. , future , promises , .


, . "My Session is not blocked", , , .. .


promises :


  • %...>%%>% , . I.e. , resolved , future , value . , print .
  • %...T>%%T>% , , . , , , .. .. print , , .
  • %...T!% — .

15 plan tweak ( plan(list(tweak(multisession, workers = 2), tweak(multisession, workers = 4))) ), , 2 , future 4 .


:


 My Session is not blocked test replications elapsed average 4 parallel 20 393.02 19.65 1 furrr 20 402.09 20.10 2 future 20 431.19 21.56 3 future.apply 20 432.29 21.61 5 apply 20 847.77 42.39 6 purrr 20 864.19 43.21 3370.55 sec elapsed 

imagem


, parallel , . furrr , future future.apply .


1 , , . , API . .


, 4 , .


Conclusão


R, API.


, API . " R , 1" .


:


  • doSNOW / doParallel + foreach
  • future + promises
  • future.apply / furrr
  • parallel

, , .


, R .

Source: https://habr.com/ru/post/pt448404/


All Articles