Die Verwendung von R für Dienstprogrammaufgaben

Ein gutes Werkzeug + die Verfügbarkeit von Fähigkeiten, um damit zu arbeiten, was durch Übung erreicht wird, ermöglicht es Ihnen, viele verschiedene "ähnliche" atypische Aufgaben einfach und elegant zu lösen. Im Folgenden finden Sie einige ähnliche Beispiele. Ich bin sicher, dass viele diese Liste erweitern können.


Es ist eine Fortsetzung früherer Veröffentlichungen .


Anwendungsprotokollanalyse


Sehr beliebt ist die Aufgabe, analytische Berechnungen basierend auf Anwendungsprotokollen durchzuführen. Führen Sie beispielsweise eine Analyse der Benutzeraktionen durch und schätzen Sie Prognoseindikatoren oder testen Sie Hypothesen. Sie können der klassischen Version folgen und den ELK-Stack oder ähnliches erhöhen (kürzlich hat Splunk die in Russland verfügbaren Systeme eingestellt). Aber Sie können ein wenig nachdenken und schnell alles auf R tun. Schnell in jeder Hinsicht, sowohl in der Implementierung als auch in der Verarbeitungszeit.


Bei der Lösung eines ähnlichen Problems gibt es jedoch eine Reihe von Funktionen:


  1. In der Regel werden Protokolldateien im klassischen log4j Format geschrieben: Zeitstempel, Wichtigkeit, Typ des Subsystems, Nachrichtentext.
  2. Der Zeitstempel kann Ereignisse mit einer Millisekundenauflösung enthalten, die für die Genauigkeit nachfolgender Analysen erhalten bleiben müssen. Millisekunden können ohne Einhaltung von ISO 8601 schreiben.
  3. Der Nachrichtentext ist eine praktisch unstrukturierte Einheit. Entwickler schreiben dort alles, was sie für notwendig halten, ohne sich auf Präsentationsformate zu beschränken.
  4. Manchmal ist der Nachrichtentext mehrzeilig, z. B. die Ausgabe des Java-Aufrufstapels oder des XML-Intersystem-Austauschpakets. Es ist notwendig, mehrzeilige Datensätze zu einem zu rekonstruieren (eine Zeitstempelmarkierung ist ein Zeichen für den Beginn der Aufzeichnung).
  5. Eine Reihe von Attributen kann außerhalb des Inhalts liegen und muss auf andere Weise abgerufen werden. Beispielsweise kann die ID des Objekts im Namen der Protokolldatei codiert werden.
  6. Protokolle in Form von Dateien können mehrere Megabyte oder Hunderte von Gigabyte betragen.
  7. Die Aufgabe ist sehr gut parallel.

Tatsächlich kann die Aufgabe in zwei Schritte unterteilt werden:


  • Vorverarbeitung von Rohdaten;
  • nachfolgende Analyse.

Der Inhalt des letzten Schritts wird durch den Themenbereich und die Geschäftsaufgaben bestimmt, R ist für diesen Schritt ideal geeignet. Viele Menschen wissen es nicht, aber der erste Schritt kann auch ganz einfach mit R gelöst werden. Abhängig von der Größe der Protokolldateien kann ein teilweise strukturiertes Vorverarbeitungsergebnis, das für die weitere Analyse geeignet ist, sowohl zu Dateien als auch zur Datenbank hinzugefügt werden. Terabyte mahlen ein oder zwei.


Nur ein Beispielcode:
 library(readr) library(tidyverse) library(magrittr) library(stringi) library(fs) library(glue) library(RClickhouse) library(DBI) library(anytime) library(tictoc) library(iterators) library(foreach) library(doParallel) library(futile.logger) library(re2r) library(data.table) library(future) library(doFuture) common_logname <- "DEV_log_parser.log" table_name <- "DEV_LOGS" flog.appender(appender.file(common_logname)) flog.threshold(INFO) flog.info("Start batch processing") oneTimeProcessing <- function(f_iter, log_type = c("app", "system")) { log_type <- match.arg(log_type) checkmate::assertNames(names(f_iter), permutation.of = c("fname", "short_fname", "location", "wk", "size", "id")) cfg <- list(app = list(db_table = "DEV_APP_LOGS"), system = list(db_table = "DEV_LOGS")) #   data <- readr::read_lines(file = f_iter$fname, progress = FALSE) log_df <- setDT(tibble::enframe(data, name = NULL)) %>% .[, log_line_start := re2r::re2_detect(value, pattern = "^\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}", parallel = F)] %>% .[, log_line_number := cumsum(log_line_start)] %>% .[, body := stri_c(value, collapse = "\n"), by = log_line_number] %>% .[, `:=`(value = NULL, log_line_start = NULL, log_line_number = NULL)] %>% tibble::as_tibble() %>% #  body = character(0)      0  #      POSIXct tidyr::extract(col = "body", into = c("timestamp", "tz", "level", "module", "class", "message"), # tz   (  DEV),      ( DEV) regex = "^(\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}:\\d+([+-]\\d+)?) (.*?) <(.*?)> \\[(.*?)\\] (?s:(.*))$", case_insensitive = TRUE, ignore.case = TRUE) %>% #     ISO         (   ?) #  ISO 8601 (https://en.wikipedia.org/wiki/ISO_8601) mutate_at("timestamp", re2r::re2_replace, # tz   (  DEV),      ( DEV) pattern = "(.*) (\\d{2}:\\d{2}:\\d{2}):(\\d+([+-]\\d+)?)", replacement = "\\1T\\2.\\3") %>% mutate_at("timestamp", lubridate::as_datetime, tz = "Europe/Moscow") %>% #    mutate(location = f_iter$location, wk = f_iter$wk) # TRUNCATE  CH    ,           #    CH, ms    (timestamp %% 1) conn <- DBI::dbConnect(RClickhouse::clickhouse(), host = "10.0.0.1", db = "DEV_LOGS") # m <- DBI::dbExecute(conn, glue("ALTER TABLE {table_name}")) write_res <- log_df %>% mutate(ms = (as.numeric(timestamp) %% 1) * 1000) %>% select(location, wk, timestamp, ms, level, module, class, message) %>% #            DBI::dbWriteTable(conn, cfg[[log_type]][["db_table"]], ., append = TRUE) DBI::dbDisconnect(conn) #       res <- tibble::tibble(id = f_iter$id, lines = nrow(log_df), min_t = min(log_df$timestamp), max_t = max(log_df$timestamp), write_res) rm(data, log_df) return(res) } #    tic("Batch processing") #    gc(full = TRUE) nworkers <- parallel::detectCores() - 1 registerDoFuture() # future::plan(multiprocess) # future::plan(multisession) future::plan(multisession, workers = nworkers) # future::plan(sequential) #  ~  #      CH #   ------------------ fnames_tbl <- here::here("raw_data") %>% fs::dir_ls(recurse = TRUE, glob = "*dev_app*.gz") %>% enframe(name = "fname") %>% #         mutate(short_fname = as.character(fs::path_rel(fname, start = "./raw_data"))) %>% select(-value) %>% mutate(size = fs::file_size(fname)) %>% tidyr::extract(col = "short_fname", into = c("location", "wk"), regex = "^([^/]+)/wk(\\d+)", remove = FALSE) %>% arrange(size) %>% mutate(id = paste(format(row_number(), justify = "r", width = 4), "/", n())) %>% #   ~ N  mutate(chunk = (row_number() %% nworkers + 1)) %>% #    ,  dopar    arrange(chunk) start_time <- Sys.time() stat_list <- foreach(it = iter(fnames_tbl, by = "row"), .export = c("start_time"), .verbose = TRUE, .inorder = FALSE, .errorhandling = "remove") %dopar% { #   flog.appender(appender.file(common_logname)) # flog.info(capture.output(gc(verbose = TRUE))) res <- oneTimeProcessing(it, log_type = "app") flog.info(glue("Step {it$id} finished.", "Elapsed {round(difftime(Sys.time(), start_time, units = 'mins'), digits = 2)} min(s) ----------->", .sep = " ")) return(res) } flog.info("Load finished") #    -------------- #    ,    future::plan(sequential) gc(reset = TRUE, full = TRUE) flog.info(capture.output(toc())) #     ------------- logstat_tbl <- stat_list %>% dplyr::bind_rows() %>% #    left_join(fnames_tbl, by = "id") %>% #          mutate(delta_t = as.numeric(difftime(max_t, min_t, units = "mins"))) %>% arrange(min_t) write_delim(logstat_tbl, here::here("output", "DEV_parse_stat.csv.gz"), delim = ";") # ,     ? if(nrow(logstat_tbl) < nrow(fnames_tbl)){ flog.error("!!!!!!! Not all workers were executed successfully !!!!!!!!!") } 

Dieses Codebeispiel enthält grundlegende Konzepte wie Parallelisierung, Zeitverarbeitung unter Berücksichtigung von Millisekunden, Speichern in der Datenbank, Abrechnung mehrzeiliger Datensätze, Zusammenfassung der Arbeitsergebnisse, Verwendung externer Attribute, vorläufiges Benchmarking und Auswahl der optimalen Funktionen und Pakete (z. B. re2r ; dies; Die Google-Bibliothek für die Arbeit mit normalen Bibliotheken ist die schnellste und wird häufig verwendet, wenn Sie dasselbe ClickHouse verwenden, das im Code erwähnt ist ( bencmark , einige Betreiber haben möglicherweise ILV geschlossen). Der Code erhebt jedoch keinen Anspruch auf Idealheit, da es sich nur um eine einmalige Aktion zur Datenvorverarbeitung handelt. Es geht schnell und richtig, na gut. Für eine andere ähnliche Aufgabe korrigieren wir unter Berücksichtigung bzw. Eingabedaten.


Wird es zeitlich auffallend schneller sein, das Endergebnis in anderen Sprachen zu erhalten? Die Frage ist offen. Parallele Versionen mit python , perl , awk zeigten keine auffälligen Unterschiede. Es ist möglich, dass der Guru in python bessere Ergebnisse erzielt, aber vergessen Sie nicht, dass dies nur eine vorübergehende periodische "einmalige" Aufgabe ist.


Ordnung in Fotos wiederherstellen


Nach einer Reise mit mehreren Geräten müssen Sie alle Fotos zusammen sammeln und vor der weiteren Verarbeitung irgendwie anordnen. Eine der besten Optionen besteht darin, die Dateien nach dem Aufnahmedatum ( YYYY-MM-DD hh_mm_ss ) zu YYYY-MM-DD hh_mm_ss , um die Reihenfolge des Fotos auf dem Zeitpfeil sicherzustellen. Exif-Attribute helfen, dieses Problem in einem Schritt zu lösen.


Und dies kann auch mit R in „ein paar Zeilen“ erfolgen. exifr und exifr helfen.


  • machte eine Liste von Dateien;
  • Attribute herausgezogen;
  • kopierte Dateien mit Umbenennung gem. mit den richtigen Attributen.

Tatsächlich wurde die Aufgabe auf die vorherige reduziert, nur Attribute werden nicht nach dem Dateinamen, sondern nach den exif-Attributen erfasst, und bei der Verarbeitung wird die Datei einfach mit Umbenennen kopiert. Das Grundgerüst des Drehbuchs und die Logik der Arbeit bleiben unverändert.


Beispiel für einen schnellen Handcode:
 library(tidyverse) library(magrittr) library(stringi) library(lubridate) library(stringi) library(fs) library(glue) library(futile.logger) library(anytime) library(tictoc) library(bench) library(exifr) library(tictoc) input_path <- "S:/ " %>% fs::path_real() #       output_path <- "S:/ " %>% fs::path_real() i_fnames <- input_path %>% fs::dir_ls(recurse = TRUE, regexp = "(JPG|jpg)$") raw_df <- read_exif(i_fnames, tags = c("SourceFile", "Model", "DateTimeOriginal")) %>% #      base64, ,    mutate(tmp = sub("^base64:(.*)", "\\1", SourceFile)) %>% mutate(i_fname = purrr::map_chr(tmp, ~rawToChar(jsonlite::base64_dec(.)))) %>% mutate(tm = anytime::anytime(DateTimeOriginal)) %>% select(i_fname, DateTimeOriginal, model = Model, tm) #         clean_df <- raw_df %>% mutate(timestamp = case_when( model == 'iPhone ...' ~ tm, model == 'Nikon ...' ~ tm - lubridate::minutes(56), model == 'Samsung ...' ~ tm - lubridate::minutes(62), TRUE ~ tm) ) %>% mutate_at("i_fname", fs::path_real) %>% mutate(fname = format(timestamp, format = '%Y-%m-%d %H_%M_%S')) %>% #  ,     (""),      mutate(fname = dplyr::coalesce(fname, fs::path_ext_remove(fs::path_file(i_fname)))) %>% #  ,         group_by(fname) %>% mutate(n = n(), idx = row_number()) %>% ungroup() %>% #        mutate(fname = case_when( n > 1 ~ stri_c(fname, '_', idx), TRUE ~ fname ) ) %>% mutate(o_fname = fs::path(!!output_path, paste0(fname, ".jpg"))) #     janitor::get_dupes(clean_df, o_fname) #   tic(" ") clean_df %$% # purrr::walk2(i_fname, o_fname, ~print(glue("{.x} -> {.y}"))) purrr::walk2(i_fname, o_fname, ~fs::file_copy(.x, .y, overwrite = TRUE)) toc() #              

Warum exifr ? Weil es ein Wrapper für das leistungsstarke plattformübergreifende Dienstprogramm ExifTool .


Vielleicht sieht die Aufgabe synthetisch aus, was schwer zu diskutieren ist, da es viele verschiedene Dienstprogramme und GUIs für die Arbeit mit Exif und das Umbenennen gibt, aber es gibt eine Nuance. Nicht alle Geräte können die geänderte Zeitzone erfassen und die Zeit anpassen (Kameras, z. B. wie oft stellt der Benutzer der Kamera die genaue Zeit darauf ein?). Daher müssen Sie beim Umbenennen auch die Zeitstempel basierend auf der Quelle verschieben.


Fertigstellung


Es gibt viele ähnliche Probleme, von denen viele auch mit Hilfe von R gelöst werden können.


Vorherige Veröffentlichung - "Kinder, Mathematik und R" .

Source: https://habr.com/ru/post/de464849/


All Articles