将R用于实用程序任务

通过实践可以获得一个好的工具以及与之合作的技能,这使您可以轻松,优雅地解决许多不同的“类似”非典型任务。 以下是几个类似的示例。 我敢肯定,许多人都可以扩展此列表。


它是以前出版物的延续。


应用程序日志分析


非常流行的任务是基于应用程序日志进行分析计算。 例如,对用户操作进行分析并估计预测指标,或检验假设。 您可以遵循经典版本并提高ELK堆栈等(最近,Splunk退出了俄罗斯可用的系统)。 但是您可以稍作思考,快速完成R上的所有操作。无论是在实现上还是在处理时间上,都可以在各种意义上快速完成。


但是,解决类似问题时,有许多功能:


  1. 通常,日志文件以经典的log4j格式编写:时间戳,重要性,子系统类型,消息正文。
  2. 时间戳记可能包含毫秒级的事件,必须保留这些事件才能确保后续分析的准确性。 不符合ISO 8601的情况下可以写入毫秒。
  3. 消息的主体实际上是非结构化的实体。 开发人员在此处编写他们认为必要的所有内容,而不会局限于任何演示格式。
  4. 有时消息主体是多行的,例如java调用堆栈或xml系统间交换包的输出。 必须将多行记录重构为一个(时间戳标记是记录开始的标志)。
  5. 内容之外可能有许多atriutes,它们需要以不同的方式获得,例如,对象的id可以编码为日志文件的名称。
  6. 文件形式的日志可以是几MB或几百GB。
  7. 该任务非常并行。

实际上,该任务可以分为2个步骤:


  • 原始数据预处理;
  • 后续分析。

最后一步的内容由主题领域和业务任务确定,R非常适合此步骤。 许多人不知道,但是使用R也可以很轻松地解决第一步。此外,根据日志文件的大小,可以将适合进一步分析的部分结构化预处理结果添加到文件以及数据库中。 TB可以磨一两个。


只是一个示例代码:
 library(readr) library(tidyverse) library(magrittr) library(stringi) library(fs) library(glue) library(RClickhouse) library(DBI) library(anytime) library(tictoc) library(iterators) library(foreach) library(doParallel) library(futile.logger) library(re2r) library(data.table) library(future) library(doFuture) common_logname <- "DEV_log_parser.log" table_name <- "DEV_LOGS" flog.appender(appender.file(common_logname)) flog.threshold(INFO) flog.info("Start batch processing") oneTimeProcessing <- function(f_iter, log_type = c("app", "system")) { log_type <- match.arg(log_type) checkmate::assertNames(names(f_iter), permutation.of = c("fname", "short_fname", "location", "wk", "size", "id")) cfg <- list(app = list(db_table = "DEV_APP_LOGS"), system = list(db_table = "DEV_LOGS")) #   data <- readr::read_lines(file = f_iter$fname, progress = FALSE) log_df <- setDT(tibble::enframe(data, name = NULL)) %>% .[, log_line_start := re2r::re2_detect(value, pattern = "^\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}", parallel = F)] %>% .[, log_line_number := cumsum(log_line_start)] %>% .[, body := stri_c(value, collapse = "\n"), by = log_line_number] %>% .[, `:=`(value = NULL, log_line_start = NULL, log_line_number = NULL)] %>% tibble::as_tibble() %>% #  body = character(0)      0  #      POSIXct tidyr::extract(col = "body", into = c("timestamp", "tz", "level", "module", "class", "message"), # tz   (  DEV),      ( DEV) regex = "^(\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}:\\d+([+-]\\d+)?) (.*?) <(.*?)> \\[(.*?)\\] (?s:(.*))$", case_insensitive = TRUE, ignore.case = TRUE) %>% #     ISO         (   ?) #  ISO 8601 (https://en.wikipedia.org/wiki/ISO_8601) mutate_at("timestamp", re2r::re2_replace, # tz   (  DEV),      ( DEV) pattern = "(.*) (\\d{2}:\\d{2}:\\d{2}):(\\d+([+-]\\d+)?)", replacement = "\\1T\\2.\\3") %>% mutate_at("timestamp", lubridate::as_datetime, tz = "Europe/Moscow") %>% #    mutate(location = f_iter$location, wk = f_iter$wk) # TRUNCATE  CH    ,           #    CH, ms    (timestamp %% 1) conn <- DBI::dbConnect(RClickhouse::clickhouse(), host = "10.0.0.1", db = "DEV_LOGS") # m <- DBI::dbExecute(conn, glue("ALTER TABLE {table_name}")) write_res <- log_df %>% mutate(ms = (as.numeric(timestamp) %% 1) * 1000) %>% select(location, wk, timestamp, ms, level, module, class, message) %>% #            DBI::dbWriteTable(conn, cfg[[log_type]][["db_table"]], ., append = TRUE) DBI::dbDisconnect(conn) #       res <- tibble::tibble(id = f_iter$id, lines = nrow(log_df), min_t = min(log_df$timestamp), max_t = max(log_df$timestamp), write_res) rm(data, log_df) return(res) } #    tic("Batch processing") #    gc(full = TRUE) nworkers <- parallel::detectCores() - 1 registerDoFuture() # future::plan(multiprocess) # future::plan(multisession) future::plan(multisession, workers = nworkers) # future::plan(sequential) #  ~  #      CH #   ------------------ fnames_tbl <- here::here("raw_data") %>% fs::dir_ls(recurse = TRUE, glob = "*dev_app*.gz") %>% enframe(name = "fname") %>% #         mutate(short_fname = as.character(fs::path_rel(fname, start = "./raw_data"))) %>% select(-value) %>% mutate(size = fs::file_size(fname)) %>% tidyr::extract(col = "short_fname", into = c("location", "wk"), regex = "^([^/]+)/wk(\\d+)", remove = FALSE) %>% arrange(size) %>% mutate(id = paste(format(row_number(), justify = "r", width = 4), "/", n())) %>% #   ~ N  mutate(chunk = (row_number() %% nworkers + 1)) %>% #    ,  dopar    arrange(chunk) start_time <- Sys.time() stat_list <- foreach(it = iter(fnames_tbl, by = "row"), .export = c("start_time"), .verbose = TRUE, .inorder = FALSE, .errorhandling = "remove") %dopar% { #   flog.appender(appender.file(common_logname)) # flog.info(capture.output(gc(verbose = TRUE))) res <- oneTimeProcessing(it, log_type = "app") flog.info(glue("Step {it$id} finished.", "Elapsed {round(difftime(Sys.time(), start_time, units = 'mins'), digits = 2)} min(s) ----------->", .sep = " ")) return(res) } flog.info("Load finished") #    -------------- #    ,    future::plan(sequential) gc(reset = TRUE, full = TRUE) flog.info(capture.output(toc())) #     ------------- logstat_tbl <- stat_list %>% dplyr::bind_rows() %>% #    left_join(fnames_tbl, by = "id") %>% #          mutate(delta_t = as.numeric(difftime(max_t, min_t, units = "mins"))) %>% arrange(min_t) write_delim(logstat_tbl, here::here("output", "DEV_parse_stat.csv.gz"), delim = ";") # ,     ? if(nrow(logstat_tbl) < nrow(fnames_tbl)){ flog.error("!!!!!!! Not all workers were executed successfully !!!!!!!!!") } 

此代码示例包含基本概念,例如并行化,考虑毫秒的处理时间,保存到数据库,说明多行记录,总结工作结果,使用外部属性,初步基准测试以及选择最佳功能和程序包(例如re2r ;此示例与常规库配合使用的google库是最快的,并且在很多地方都使用,采用代码{ bencmark ,某些运营商可能已关闭ILV}中提到的同一ClickHouse。 但是该代码并不是理想的代码,因为它只是对数据预处理的一次性操作。 它可以快速正确地运行,好吧。 对于另一个类似的任务,我们考虑了响应,进行了更正。 输入数据。


在其他语言上获得最终结果的时间会大大加快吗? 问题是开放的。 与pythonperlawk并行的版本没有显着差异。 python中的专家可能会获得更好的结果,但是请不要忘记这只是一个周期性的“一次性”任务。


恢复照片中的顺序


旅途中手持多种设备后,您必须将所有照片收集在一起,并以某种方式整理它们,然后再进行进一步处理。 最好的选择之一是按拍摄日期( YYYY-MM-DD hh_mm_ss )命名文件,从而确保照片在时间箭头上的顺序。 Exif属性有助于一步解决此问题。


而且,也可以在“一对线”中使用R来完成此操作。 exifrexifr可以提供帮助。


  • 列出文件清单;
  • 拉出属性;
  • 使用重命名acc复制文件。 具有正确的属性。

实际上,该任务已简化为上一个任务,它不是通过文件名而是通过其exif属性来收集属性,并且在处理过程中只需重命名即可复制文件。 脚本的框架和工作逻辑保持不变。


快速手代码示例:
 library(tidyverse) library(magrittr) library(stringi) library(lubridate) library(stringi) library(fs) library(glue) library(futile.logger) library(anytime) library(tictoc) library(bench) library(exifr) library(tictoc) input_path <- "S:/ " %>% fs::path_real() #       output_path <- "S:/ " %>% fs::path_real() i_fnames <- input_path %>% fs::dir_ls(recurse = TRUE, regexp = "(JPG|jpg)$") raw_df <- read_exif(i_fnames, tags = c("SourceFile", "Model", "DateTimeOriginal")) %>% #      base64, ,    mutate(tmp = sub("^base64:(.*)", "\\1", SourceFile)) %>% mutate(i_fname = purrr::map_chr(tmp, ~rawToChar(jsonlite::base64_dec(.)))) %>% mutate(tm = anytime::anytime(DateTimeOriginal)) %>% select(i_fname, DateTimeOriginal, model = Model, tm) #         clean_df <- raw_df %>% mutate(timestamp = case_when( model == 'iPhone ...' ~ tm, model == 'Nikon ...' ~ tm - lubridate::minutes(56), model == 'Samsung ...' ~ tm - lubridate::minutes(62), TRUE ~ tm) ) %>% mutate_at("i_fname", fs::path_real) %>% mutate(fname = format(timestamp, format = '%Y-%m-%d %H_%M_%S')) %>% #  ,     (""),      mutate(fname = dplyr::coalesce(fname, fs::path_ext_remove(fs::path_file(i_fname)))) %>% #  ,         group_by(fname) %>% mutate(n = n(), idx = row_number()) %>% ungroup() %>% #        mutate(fname = case_when( n > 1 ~ stri_c(fname, '_', idx), TRUE ~ fname ) ) %>% mutate(o_fname = fs::path(!!output_path, paste0(fname, ".jpg"))) #     janitor::get_dupes(clean_df, o_fname) #   tic(" ") clean_df %$% # purrr::walk2(i_fname, o_fname, ~print(glue("{.x} -> {.y}"))) purrr::walk2(i_fname, o_fname, ~fs::file_copy(.x, .y, overwrite = TRUE)) toc() #              

为什么要exifr ? 因为它是功能强大的跨平台实用程序ExifTool的包装。


也许任务看起来像是合成的,这很难争论,因为有许多不同的实用程序和GUI用于Exif和重命名,但是有细微差别。 并非所有设备都能获取更改后的时区并调整时间(例如,摄像头的用户多久在其上设置准确的时间?),因此在重命名过程中,您还需要根据来源更改时间戳。


完成时间


有许多类似的问题,其中许多问题也可以借助R来解决。


先前的出版物- “儿童,数学与R”

Source: https://habr.com/ru/post/zh-CN464849/


All Articles