L'utilisation de R pour les tâches utilitaires

Un bon outil + la disponibilité des compétences pour travailler avec lui, qui est obtenue grâce à la pratique, vous permet de résoudre facilement et avec élégance de nombreuses tâches atypiques "similaires" différentes. Voici quelques exemples similaires. Je suis sûr que beaucoup peuvent étendre cette liste.


Il s'agit d'une continuation des publications précédentes .


Analyse du journal des applications


La tâche consistant à effectuer des calculs analytiques sur la base des journaux d'application est assez populaire. Par exemple, effectuez une analyse des actions des utilisateurs et estimez des indicateurs de prévision, ou testez des hypothèses. Vous pouvez suivre la version classique et augmenter la pile ELK ou similaire (récemment, Splunk a abandonné les systèmes disponibles en Russie). Mais vous pouvez réfléchir un peu et tout faire rapidement sur R. Rapidement dans tous les sens, à la fois dans la mise en œuvre et dans le temps de traitement.


Mais il existe un certain nombre de fonctionnalités pour résoudre un problème similaire:


  1. En règle générale, les fichiers journaux sont écrits au format log4j classique: horodatage, importance, type de sous-système, corps du message.
  2. L'horodatage peut contenir des événements avec une résolution en millisecondes, qui doivent être conservés pour la précision des analyses ultérieures. Les millisecondes peuvent écrire sans se conformer à ISO 8601.
  3. Le corps du message est une entité pratiquement non structurée. Les développeurs y écrivent tout ce qu'ils jugent nécessaire, sans se limiter à aucun format de présentation.
  4. Parfois, le corps du message est multiligne, par exemple, la sortie de la pile d'appels Java ou le package d'échange intersystèmes xml. Il est nécessaire de reconstruire des enregistrements multi-lignes en un seul (un marqueur d'horodatage est un signe du début de l'enregistrement).
  5. Plusieurs atriutes peuvent être externes au contenu et doivent être obtenus d'une manière différente, par exemple, l'id de l'objet peut être encodé dans le nom du fichier journal.
  6. Les journaux sous forme de fichiers peuvent être de plusieurs mégaoctets ou centaines de gigaoctets.
  7. La tâche est très bien parallèle.

En fait, la tâche peut être divisée en 2 étapes:


  • prétraitement des données brutes;
  • analyses ultérieures.

Le contenu de la dernière étape est déterminé par le domaine et les tâches commerciales, R est idéalement adapté à cette étape. Beaucoup de gens ne le savent pas, mais la première étape peut également être résolue assez facilement avec R. De plus, en fonction de la taille des fichiers journaux, un résultat de prétraitement partiellement structuré adapté à une analyse plus approfondie peut être ajouté aux fichiers ainsi qu'à la base de données. Les téraoctets en broient un ou deux.


Juste un exemple de code:
 library(readr) library(tidyverse) library(magrittr) library(stringi) library(fs) library(glue) library(RClickhouse) library(DBI) library(anytime) library(tictoc) library(iterators) library(foreach) library(doParallel) library(futile.logger) library(re2r) library(data.table) library(future) library(doFuture) common_logname <- "DEV_log_parser.log" table_name <- "DEV_LOGS" flog.appender(appender.file(common_logname)) flog.threshold(INFO) flog.info("Start batch processing") oneTimeProcessing <- function(f_iter, log_type = c("app", "system")) { log_type <- match.arg(log_type) checkmate::assertNames(names(f_iter), permutation.of = c("fname", "short_fname", "location", "wk", "size", "id")) cfg <- list(app = list(db_table = "DEV_APP_LOGS"), system = list(db_table = "DEV_LOGS")) #   data <- readr::read_lines(file = f_iter$fname, progress = FALSE) log_df <- setDT(tibble::enframe(data, name = NULL)) %>% .[, log_line_start := re2r::re2_detect(value, pattern = "^\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}", parallel = F)] %>% .[, log_line_number := cumsum(log_line_start)] %>% .[, body := stri_c(value, collapse = "\n"), by = log_line_number] %>% .[, `:=`(value = NULL, log_line_start = NULL, log_line_number = NULL)] %>% tibble::as_tibble() %>% #  body = character(0)      0  #      POSIXct tidyr::extract(col = "body", into = c("timestamp", "tz", "level", "module", "class", "message"), # tz   (  DEV),      ( DEV) regex = "^(\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}:\\d+([+-]\\d+)?) (.*?) <(.*?)> \\[(.*?)\\] (?s:(.*))$", case_insensitive = TRUE, ignore.case = TRUE) %>% #     ISO         (   ?) #  ISO 8601 (https://en.wikipedia.org/wiki/ISO_8601) mutate_at("timestamp", re2r::re2_replace, # tz   (  DEV),      ( DEV) pattern = "(.*) (\\d{2}:\\d{2}:\\d{2}):(\\d+([+-]\\d+)?)", replacement = "\\1T\\2.\\3") %>% mutate_at("timestamp", lubridate::as_datetime, tz = "Europe/Moscow") %>% #    mutate(location = f_iter$location, wk = f_iter$wk) # TRUNCATE  CH    ,           #    CH, ms    (timestamp %% 1) conn <- DBI::dbConnect(RClickhouse::clickhouse(), host = "10.0.0.1", db = "DEV_LOGS") # m <- DBI::dbExecute(conn, glue("ALTER TABLE {table_name}")) write_res <- log_df %>% mutate(ms = (as.numeric(timestamp) %% 1) * 1000) %>% select(location, wk, timestamp, ms, level, module, class, message) %>% #            DBI::dbWriteTable(conn, cfg[[log_type]][["db_table"]], ., append = TRUE) DBI::dbDisconnect(conn) #       res <- tibble::tibble(id = f_iter$id, lines = nrow(log_df), min_t = min(log_df$timestamp), max_t = max(log_df$timestamp), write_res) rm(data, log_df) return(res) } #    tic("Batch processing") #    gc(full = TRUE) nworkers <- parallel::detectCores() - 1 registerDoFuture() # future::plan(multiprocess) # future::plan(multisession) future::plan(multisession, workers = nworkers) # future::plan(sequential) #  ~  #      CH #   ------------------ fnames_tbl <- here::here("raw_data") %>% fs::dir_ls(recurse = TRUE, glob = "*dev_app*.gz") %>% enframe(name = "fname") %>% #         mutate(short_fname = as.character(fs::path_rel(fname, start = "./raw_data"))) %>% select(-value) %>% mutate(size = fs::file_size(fname)) %>% tidyr::extract(col = "short_fname", into = c("location", "wk"), regex = "^([^/]+)/wk(\\d+)", remove = FALSE) %>% arrange(size) %>% mutate(id = paste(format(row_number(), justify = "r", width = 4), "/", n())) %>% #   ~ N  mutate(chunk = (row_number() %% nworkers + 1)) %>% #    ,  dopar    arrange(chunk) start_time <- Sys.time() stat_list <- foreach(it = iter(fnames_tbl, by = "row"), .export = c("start_time"), .verbose = TRUE, .inorder = FALSE, .errorhandling = "remove") %dopar% { #   flog.appender(appender.file(common_logname)) # flog.info(capture.output(gc(verbose = TRUE))) res <- oneTimeProcessing(it, log_type = "app") flog.info(glue("Step {it$id} finished.", "Elapsed {round(difftime(Sys.time(), start_time, units = 'mins'), digits = 2)} min(s) ----------->", .sep = " ")) return(res) } flog.info("Load finished") #    -------------- #    ,    future::plan(sequential) gc(reset = TRUE, full = TRUE) flog.info(capture.output(toc())) #     ------------- logstat_tbl <- stat_list %>% dplyr::bind_rows() %>% #    left_join(fnames_tbl, by = "id") %>% #          mutate(delta_t = as.numeric(difftime(max_t, min_t, units = "mins"))) %>% arrange(min_t) write_delim(logstat_tbl, here::here("output", "DEV_parse_stat.csv.gz"), delim = ";") # ,     ? if(nrow(logstat_tbl) < nrow(fnames_tbl)){ flog.error("!!!!!!! Not all workers were executed successfully !!!!!!!!!") } 

Cet exemple de code contient des concepts de base tels que la parallélisation, le traitement du temps en tenant compte des millisecondes, l'enregistrement dans la base de données, la comptabilisation des enregistrements sur plusieurs lignes, la synthèse des résultats du travail, l'utilisation des attributs externes, l'analyse comparative préliminaire et le choix des fonctions et des packages re2r ( re2r , par exemple; ce la bibliothèque google pour travailler avec les bibliothèques normales est la plus rapide et est beaucoup utilisée où, prenez le même ClickHouse mentionné dans le code { bencmark , certains opérateurs peuvent avoir ILV fermé}). Mais le code ne prétend pas être idéal, car il ne s'agit que d'une action ponctuelle sur le prétraitement des données. Il le fait rapidement et correctement, eh bien, ok. Pour une autre tâche similaire, nous corrigeons, en tenant compte de resp. données d'entrée.


Sera-t-il incroyablement plus rapide en termes de temps pour obtenir le résultat final dans d'autres langues? La question est ouverte. Les versions parallèles avec python , perl , awk n'ont montré aucune différence frappante. Il est possible que le gourou en python obtienne de meilleurs résultats, mais n'oubliez pas que ce n'est qu'une tâche périodique "ponctuelle" qui passe.


Restauration de l'ordre dans les photos


Après un voyage avec plusieurs appareils à portée de main, vous devez collecter toutes les photos ensemble et les organiser avant de poursuivre le traitement. L'une des meilleures options est de nommer les fichiers par la date de prise de vue ( YYYY-MM-DD hh_mm_ss ), assurant ainsi l'ordre de la photo sur la flèche de temps. Les attributs Exif aident à résoudre ce problème en une seule étape.


Et cela peut également être fait en utilisant R dans un "couple de lignes". exifr et exifr pour vous aider.


  • fait une liste de fichiers;
  • retiré les attributs;
  • fichiers copiés avec renommer acc. avec les bons attributs.

En fait, la tâche a été réduite à la précédente, seuls les attributs sont collectés non pas par le nom du fichier, mais par ses attributs exif, et lors du traitement, il suffit de copier le fichier avec renommer. Le squelette du script et la logique du travail restent inchangés.


Exemple de code de main rapide:
 library(tidyverse) library(magrittr) library(stringi) library(lubridate) library(stringi) library(fs) library(glue) library(futile.logger) library(anytime) library(tictoc) library(bench) library(exifr) library(tictoc) input_path <- "S:/ " %>% fs::path_real() #       output_path <- "S:/ " %>% fs::path_real() i_fnames <- input_path %>% fs::dir_ls(recurse = TRUE, regexp = "(JPG|jpg)$") raw_df <- read_exif(i_fnames, tags = c("SourceFile", "Model", "DateTimeOriginal")) %>% #      base64, ,    mutate(tmp = sub("^base64:(.*)", "\\1", SourceFile)) %>% mutate(i_fname = purrr::map_chr(tmp, ~rawToChar(jsonlite::base64_dec(.)))) %>% mutate(tm = anytime::anytime(DateTimeOriginal)) %>% select(i_fname, DateTimeOriginal, model = Model, tm) #         clean_df <- raw_df %>% mutate(timestamp = case_when( model == 'iPhone ...' ~ tm, model == 'Nikon ...' ~ tm - lubridate::minutes(56), model == 'Samsung ...' ~ tm - lubridate::minutes(62), TRUE ~ tm) ) %>% mutate_at("i_fname", fs::path_real) %>% mutate(fname = format(timestamp, format = '%Y-%m-%d %H_%M_%S')) %>% #  ,     (""),      mutate(fname = dplyr::coalesce(fname, fs::path_ext_remove(fs::path_file(i_fname)))) %>% #  ,         group_by(fname) %>% mutate(n = n(), idx = row_number()) %>% ungroup() %>% #        mutate(fname = case_when( n > 1 ~ stri_c(fname, '_', idx), TRUE ~ fname ) ) %>% mutate(o_fname = fs::path(!!output_path, paste0(fname, ".jpg"))) #     janitor::get_dupes(clean_df, o_fname) #   tic(" ") clean_df %$% # purrr::walk2(i_fname, o_fname, ~print(glue("{.x} -> {.y}"))) purrr::walk2(i_fname, o_fname, ~fs::file_copy(.x, .y, overwrite = TRUE)) toc() #              

Pourquoi exifr ? Parce que c'est un wrapper pour le puissant utilitaire multiplateforme ExifTool .


Peut-être que la tâche semble synthétique, ce qui est difficile à discuter, car il existe de nombreux utilitaires et interfaces graphiques différents pour travailler avec Exif et renommer, mais il y a une nuance. Tous les appareils ne peuvent pas détecter le fuseau horaire modifié et ajuster l'heure (caméras, par exemple, à quelle fréquence l'utilisateur de la caméra définit-il l'heure exacte?), Donc lors du changement de nom, vous devez également modifier les horodatages en fonction de la source.


Achèvement


Il existe de nombreux problèmes similaires, beaucoup d'entre eux peuvent également être résolus à l'aide de R.


Publication précédente - «Enfants, mathématiques et R» .

Source: https://habr.com/ru/post/fr464849/


All Articles