O uso de R para tarefas utilitárias

Uma boa ferramenta + a disponibilidade de habilidades para trabalhar com ela, alcançada através da prática, permite que você resolva de maneira fácil e elegante muitas tarefas atípicas "semelhantes". Abaixo estão alguns exemplos semelhantes. Estou certo de que muitos podem expandir esta lista.


É uma continuação de publicações anteriores .


Análise de Log de Aplicativo


Bastante popular é a tarefa de realizar cálculos analíticos com base nos logs do aplicativo. Por exemplo, realize uma análise das ações do usuário e estime indicadores de previsão ou teste hipóteses. Você pode seguir a versão clássica e aumentar a pilha ELK ou algo semelhante (recentemente, o Splunk abandonou os sistemas disponíveis na Rússia). Mas você pode pensar um pouco e rapidamente fazer tudo no R. Rapidamente em todos os sentidos, tanto na implementação quanto no tempo de processamento.


Mas existem vários recursos ao resolver um problema semelhante:


  1. Normalmente, os arquivos de log são gravados no formato log4j clássico: registro de data e hora, importância, tipo de subsistema, corpo da mensagem.
  2. O registro de data e hora pode conter eventos com uma resolução de milissegundos, que deve ser preservada para a precisão das análises subsequentes. Milissegundos podem gravar sem cumprir a ISO 8601.
  3. O corpo da mensagem é uma entidade praticamente não estruturada. Os desenvolvedores escrevem tudo o que consideram necessário lá, sem se limitar a nenhum formato de apresentação.
  4. Às vezes, o corpo da mensagem é multilinhas, por exemplo, a saída da pilha de chamadas java ou o pacote de troca entre sistemas xml. É necessário reconstruir gravações com várias linhas em uma (um marcador de data e hora é um sinal do início da gravação).
  5. Vários atriutes podem ser externos ao conteúdo e precisam ser obtidos de uma maneira diferente; por exemplo, o ID do objeto pode ser codificado no nome do arquivo de log.
  6. Os logs na forma de arquivos podem ter vários megabytes ou centenas de gigabytes.
  7. A tarefa é muito bem paralela.

De fato, a tarefa pode ser dividida em 2 etapas:


  • pré-processamento de dados brutos;
  • análises subsequentes.

O conteúdo da última etapa é determinado pela área de assunto e pelas tarefas de negócios, R é ideal para esta etapa. Muitas pessoas não sabem, mas o primeiro passo também pode ser resolvido facilmente com R. Além disso, dependendo do tamanho dos arquivos de log, um resultado de pré-processamento parcialmente estruturado adequado para análises adicionais pode ser adicionado aos arquivos e ao banco de dados. Terabytes moem um ou dois.


Apenas um exemplo de código:
 library(readr) library(tidyverse) library(magrittr) library(stringi) library(fs) library(glue) library(RClickhouse) library(DBI) library(anytime) library(tictoc) library(iterators) library(foreach) library(doParallel) library(futile.logger) library(re2r) library(data.table) library(future) library(doFuture) common_logname <- "DEV_log_parser.log" table_name <- "DEV_LOGS" flog.appender(appender.file(common_logname)) flog.threshold(INFO) flog.info("Start batch processing") oneTimeProcessing <- function(f_iter, log_type = c("app", "system")) { log_type <- match.arg(log_type) checkmate::assertNames(names(f_iter), permutation.of = c("fname", "short_fname", "location", "wk", "size", "id")) cfg <- list(app = list(db_table = "DEV_APP_LOGS"), system = list(db_table = "DEV_LOGS")) #   data <- readr::read_lines(file = f_iter$fname, progress = FALSE) log_df <- setDT(tibble::enframe(data, name = NULL)) %>% .[, log_line_start := re2r::re2_detect(value, pattern = "^\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}", parallel = F)] %>% .[, log_line_number := cumsum(log_line_start)] %>% .[, body := stri_c(value, collapse = "\n"), by = log_line_number] %>% .[, `:=`(value = NULL, log_line_start = NULL, log_line_number = NULL)] %>% tibble::as_tibble() %>% #  body = character(0)      0  #      POSIXct tidyr::extract(col = "body", into = c("timestamp", "tz", "level", "module", "class", "message"), # tz   (  DEV),      ( DEV) regex = "^(\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}:\\d+([+-]\\d+)?) (.*?) <(.*?)> \\[(.*?)\\] (?s:(.*))$", case_insensitive = TRUE, ignore.case = TRUE) %>% #     ISO         (   ?) #  ISO 8601 (https://en.wikipedia.org/wiki/ISO_8601) mutate_at("timestamp", re2r::re2_replace, # tz   (  DEV),      ( DEV) pattern = "(.*) (\\d{2}:\\d{2}:\\d{2}):(\\d+([+-]\\d+)?)", replacement = "\\1T\\2.\\3") %>% mutate_at("timestamp", lubridate::as_datetime, tz = "Europe/Moscow") %>% #    mutate(location = f_iter$location, wk = f_iter$wk) # TRUNCATE  CH    ,           #    CH, ms    (timestamp %% 1) conn <- DBI::dbConnect(RClickhouse::clickhouse(), host = "10.0.0.1", db = "DEV_LOGS") # m <- DBI::dbExecute(conn, glue("ALTER TABLE {table_name}")) write_res <- log_df %>% mutate(ms = (as.numeric(timestamp) %% 1) * 1000) %>% select(location, wk, timestamp, ms, level, module, class, message) %>% #            DBI::dbWriteTable(conn, cfg[[log_type]][["db_table"]], ., append = TRUE) DBI::dbDisconnect(conn) #       res <- tibble::tibble(id = f_iter$id, lines = nrow(log_df), min_t = min(log_df$timestamp), max_t = max(log_df$timestamp), write_res) rm(data, log_df) return(res) } #    tic("Batch processing") #    gc(full = TRUE) nworkers <- parallel::detectCores() - 1 registerDoFuture() # future::plan(multiprocess) # future::plan(multisession) future::plan(multisession, workers = nworkers) # future::plan(sequential) #  ~  #      CH #   ------------------ fnames_tbl <- here::here("raw_data") %>% fs::dir_ls(recurse = TRUE, glob = "*dev_app*.gz") %>% enframe(name = "fname") %>% #         mutate(short_fname = as.character(fs::path_rel(fname, start = "./raw_data"))) %>% select(-value) %>% mutate(size = fs::file_size(fname)) %>% tidyr::extract(col = "short_fname", into = c("location", "wk"), regex = "^([^/]+)/wk(\\d+)", remove = FALSE) %>% arrange(size) %>% mutate(id = paste(format(row_number(), justify = "r", width = 4), "/", n())) %>% #   ~ N  mutate(chunk = (row_number() %% nworkers + 1)) %>% #    ,  dopar    arrange(chunk) start_time <- Sys.time() stat_list <- foreach(it = iter(fnames_tbl, by = "row"), .export = c("start_time"), .verbose = TRUE, .inorder = FALSE, .errorhandling = "remove") %dopar% { #   flog.appender(appender.file(common_logname)) # flog.info(capture.output(gc(verbose = TRUE))) res <- oneTimeProcessing(it, log_type = "app") flog.info(glue("Step {it$id} finished.", "Elapsed {round(difftime(Sys.time(), start_time, units = 'mins'), digits = 2)} min(s) ----------->", .sep = " ")) return(res) } flog.info("Load finished") #    -------------- #    ,    future::plan(sequential) gc(reset = TRUE, full = TRUE) flog.info(capture.output(toc())) #     ------------- logstat_tbl <- stat_list %>% dplyr::bind_rows() %>% #    left_join(fnames_tbl, by = "id") %>% #          mutate(delta_t = as.numeric(difftime(max_t, min_t, units = "mins"))) %>% arrange(min_t) write_delim(logstat_tbl, here::here("output", "DEV_parse_stat.csv.gz"), delim = ";") # ,     ? if(nrow(logstat_tbl) < nrow(fnames_tbl)){ flog.error("!!!!!!! Not all workers were executed successfully !!!!!!!!!") } 

Este exemplo de código contém conceitos básicos como paralelização, processamento de tempo considerando milissegundos, salvando no banco de dados, contabilizando registros de várias linhas, resumindo os resultados do trabalho, usando atributos externos, comparações preliminares e escolhendo as funções e pacotes re2r ( re2r , por exemplo; isso A biblioteca do Google para trabalhar com os regulares é a mais rápida e é muito usada, onde, use o mesmo ClickHouse mencionado no código { bencmark , alguns operadores podem ter o ILV fechado}). Mas o código não afirma ser o ideal, pois é apenas uma ação única no pré-processamento de dados. Faz rápido e corretamente, bem, ok. Para outra tarefa semelhante, corrigimos, levando em consideração o resp. dados de entrada.


Será incrivelmente mais rápido em termos de tempo para obter o resultado final em outros idiomas? A questão está aberta. Versões paralelas com python , perl , awk não mostraram diferenças marcantes. É possível que o guru em python obtenha melhores resultados, mas não esqueça que esta é apenas uma tarefa periódica "única" que passa.


Restaurando a ordem nas fotos


Após uma viagem com vários dispositivos em mãos, é necessário coletar todas as fotos juntas e, de alguma forma, organizá-las antes do processamento. Uma das melhores opções é nomear os arquivos pela data da gravação ( YYYY-MM-DD hh_mm_ss ), garantindo assim a ordem da foto na seta do tempo. Os atributos Exif ajudam a resolver esse problema em uma etapa.


E isso também pode ser feito usando R em algumas "linhas". exifr e exifr para ajudar.


  • fez uma lista de arquivos;
  • atributos retirados;
  • arquivos copiados com renomeação acc. com os atributos certos.

Por uma questão de fato, a tarefa foi reduzida para a anterior, apenas os atributos são coletados não pelo nome do arquivo, mas por seus atributos exif e, no processamento, há simplesmente a cópia do arquivo com a renomeação. O esqueleto do roteiro e a lógica do trabalho permanecem inalterados.


Exemplo de código manual rápido:
 library(tidyverse) library(magrittr) library(stringi) library(lubridate) library(stringi) library(fs) library(glue) library(futile.logger) library(anytime) library(tictoc) library(bench) library(exifr) library(tictoc) input_path <- "S:/ " %>% fs::path_real() #       output_path <- "S:/ " %>% fs::path_real() i_fnames <- input_path %>% fs::dir_ls(recurse = TRUE, regexp = "(JPG|jpg)$") raw_df <- read_exif(i_fnames, tags = c("SourceFile", "Model", "DateTimeOriginal")) %>% #      base64, ,    mutate(tmp = sub("^base64:(.*)", "\\1", SourceFile)) %>% mutate(i_fname = purrr::map_chr(tmp, ~rawToChar(jsonlite::base64_dec(.)))) %>% mutate(tm = anytime::anytime(DateTimeOriginal)) %>% select(i_fname, DateTimeOriginal, model = Model, tm) #         clean_df <- raw_df %>% mutate(timestamp = case_when( model == 'iPhone ...' ~ tm, model == 'Nikon ...' ~ tm - lubridate::minutes(56), model == 'Samsung ...' ~ tm - lubridate::minutes(62), TRUE ~ tm) ) %>% mutate_at("i_fname", fs::path_real) %>% mutate(fname = format(timestamp, format = '%Y-%m-%d %H_%M_%S')) %>% #  ,     (""),      mutate(fname = dplyr::coalesce(fname, fs::path_ext_remove(fs::path_file(i_fname)))) %>% #  ,         group_by(fname) %>% mutate(n = n(), idx = row_number()) %>% ungroup() %>% #        mutate(fname = case_when( n > 1 ~ stri_c(fname, '_', idx), TRUE ~ fname ) ) %>% mutate(o_fname = fs::path(!!output_path, paste0(fname, ".jpg"))) #     janitor::get_dupes(clean_df, o_fname) #   tic(" ") clean_df %$% # purrr::walk2(i_fname, o_fname, ~print(glue("{.x} -> {.y}"))) purrr::walk2(i_fname, o_fname, ~fs::file_copy(.x, .y, overwrite = TRUE)) toc() #              

Por que exifr ? Porque é um invólucro para o poderoso utilitário de plataforma cruzada ExifTool .


Talvez a tarefa pareça sintética, o que é difícil de argumentar, uma vez que existem muitos utilitários e GUIs diferentes para trabalhar com Exif e renomear, mas há uma nuance. Nem todos os dispositivos podem pegar o fuso horário alterado e ajustar a hora (câmeras, por exemplo, com que frequência o usuário da câmera define a hora exata?). Portanto, durante a renomeação, você também precisa alterar os carimbos de hora com base na fonte.


Conclusão


Existem muitos problemas semelhantes, muitos deles podem ser resolvidos com a ajuda de R também.


Publicação anterior - “Crianças, Matemática e R” .

Source: https://habr.com/ru/post/pt464849/


All Articles