Rでの並列計算について少し

出版物は非常に短いです。多くの人は、Rでの並列計算は非常に難しく、現在のタスクには適用できないと考えています。





はいといいえ。理論、ハードウェア、およびあらゆる種類の詳細を意図的に調べない場合は、ほぼ普遍的なレシピの「3と1/2」を描くことができます。これらの例は、生産的なタスクに意図的に類似しており、合成の線の去勢されたカップルではありません。





これは、以前の一連の出版物の続きです





中古パッケージ

パッケージの読み込み
library(tidyverse)
library(magrittr)
library(stringi)
library(glue)

library(dqrng)

library(iterators)
library(future)
library(foreach)
library(doFuture)

library(tictoc)
library(futile.logger)
library(lgr) #      `lgr`

library(hrbrthemes)
      
      



並列化パターン

パターン1。tidyverse計算の並列化

状況。の多くのパイプラインを含むスクリプトがありtidyverse



ます。





タスクの例。数の二乗和の平均を計算してみましょう。並列計算の効率を向上させるには、スレッド間のデータ転送量を減らすことが重要です。furrrパッケージを使用してます





`tidyverse`パイプライン
registerDoFuture()
# future::plan(multiprocess)
workers <- parallel::detectCores() - 1
future::plan(multisession, workers = workers)

num_row <- 1:10^6

ff_seq <- function(x) x^2

ff_par <- function(x) mean(x^2)

tic(" ")
lst1 <- num_row %>%
  purrr::map_dbl(ff_seq) %>%
  mean()
toc()

tic(" ,  1")
lst2 <- num_row %>%
  furrr::future_map_dbl(ff_seq) %>%
  mean()
toc() 

tic(" ,  2")
lst2 <- num_row %>%
  split(cut(seq_along(.), workers, labels = FALSE)) %>%
  furrr::future_map_dbl(ff_par) %>%
  mean()
toc()
      
      



当然、結果はすべてが実行されるハードウェアプラットフォームとOSによって異なります。テスト実行では、次のレイアウトがあります。





 : 7.23 sec elapsed
 ,  1: 3.43 sec elapsed
 ,  2: 0.64 sec elapsed
      
      



WindowsとLinuxは、並列化の方法がまったく異なります。実稼働環境のLinuxは、Windowsよりも強く推奨されます。





パターン2。ローカル手動並列化

. . , . , %<-%



.





#  ,  20   10^5 
nn <- 10^5
tic("Generating sample data.frame")
df <- 100 %>%
  # stri_rand_strings(length = 10, pattern = "[a-z]") %>%
  sample(10^4:10^5, .) %>%
  sample(20 * nn, replace = TRUE) %>%
  matrix(byrow = TRUE, ncol = 20) %>%
  as_tibble(.name_repair = "universal") %>%
  mutate(user_id = as.character(sample(1:as.integer(nn/10), n(), replace = TRUE))) %>%
  #   
  mutate(ver = sample(1:20, n(), replace = TRUE)) %>%
  select(user_id, ver, everything())
toc()

#       
demo_fpath <- here::here("temp", "demo_data.xlsx")
openxlsx::write.xlsx(df, demo_fpath, asTable = TRUE)
      
      



plan(multisession, workers = parallel::detectCores() - 2)
# plan(sequential)
# https://github.com/HenrikBengtsson/future

# ,     2
tic("   ")
tic("  ")
res_lst <- list()
for (j in 1:6) {
  res_lst[[j]] <- { readxl::read_excel(demo_fpath) %>% head(5)}
}
toc()
seq_df <- bind_rows(res_lst)
toc()


tic("   ")
tic("  ")
df1 %<-% { readxl::read_excel(demo_fpath) %>% head(5)}
df2 %<-% { readxl::read_excel(demo_fpath) %>% head(5)}
df3 %<-% { readxl::read_excel(demo_fpath) %>% head(5)}
df4 %<-% { readxl::read_excel(demo_fpath) %>% head(5)}
df5 %<-% { readxl::read_excel(demo_fpath) %>% head(5)}
df6 %<-% { readxl::read_excel(demo_fpath) %>% head(5)}
toc()
par_df <- bind_rows(df1, df2, df3, df4, df5, df6)
toc()

all_equal(seq_df, par_df)
      
      



. . :





   : 46.23 sec elapsed
   : 37.82 sec elapsed
      
      



3.

. , , .





.

. $C_n^k$



. .





.




#  
flog_logname <- here::here("log", "job_futile.log")
lgr_logname <- here::here("log", "job_lgr.log")

initLogging <- function(log_file){

  lgr <- get_logger_glue("logger")

  lgr$set_propagate(FALSE)
  lgr$set_threshold("all")
  lgr$set_appenders(list(
    console = AppenderConsole$new(
      threshold = "info"
    ),
    file = AppenderFile$new(
      file = log_file,
      threshold = "all"
    )
  ))

  lgr  
}

invisible(flog.appender(appender.tee(flog_logname)))
invisible(flog.threshold(INFO))
lgr <- initLogging(lgr_logname)
      
      



"Start batch processing" %T>%
  flog.info() %T>%
  lgr$info()

#   
# https://github.com/HenrikBengtsson/doFuture
# https://cran.r-project.org/web/packages/future/vignettes/future-1-overview.html
registerDoFuture()
# future::plan(multiprocess)
future::plan(multisession, workers = parallel::detectCores())
# future::plan(sequential)
# plan(future.callr::callr)

tic("Batch processing")
start_time <- Sys.time()

foreach(it = iter(jobs_tbl, by = "row"), .export = c("start_time"), 
        # .packages = 'futile.logger',
        .verbose = FALSE, .inorder = FALSE, .errorhandling = "remove") %dopar% {

          start <- Sys.time() - start_time

          #    
          flog.appender(appender.tee(flog_logname))
          lgr <- initLogging(lgr_logname)

          res <- arrangements::npermutations(k = it$k, n = it$n, bigz = TRUE)

          # https://www.jottr.org/2020/11/06/future-1.20.1-the-future-just-got-a-bit-brighter/
          message("     message from thread")

          glue("Step {it$idx_str} finished. RAM used {capture.output(pryr::mem_used())}.",
               "PID: {Sys.getpid()}",
               "Elapsed {round(difftime(Sys.time(), start_time, units = 'mins'), digits = 2)} min(s) ----------->",
               .sep = " ") %T>%
            flog.info() %T>%
            lgr$info()

          #   
          return(list(pid = Sys.getpid(), start = start, finish = Sys.time() - start_time))
        } -> output_lst
flog.info("Foreach finished")

checkmate::assertList(output_lst, any.missing = FALSE, null.ok = FALSE, min.len = 1)
output_tbl <- dplyr::bind_rows(output_lst)
# rm(output_lst)

#    --------------
future::plan(sequential)
gc(reset = TRUE, full = TRUE)
flog.info(capture.output(toc()))
      
      



() () . windows.





.
#      
output_tbl %>%
  mutate_at("pid", as.factor) %>%
  mutate_at(vars(start, finish), as.numeric) %>%
  ggplot(aes(start, pid, colour = pid)) +
  geom_point(size = 3, alpha = .7) +
  geom_point(aes(x=finish), shape = 4, size = 3, colour = "black") +
  geom_vline(aes(xintercept = start, colour = pid), lty = "dashed", alpha = 0.7) +
  ggthemes::scale_fill_tableau("Tableau 10") +
  theme_ipsum_rc() +
  xlim(c(0, 5))
      
      



, , , . , « »:





  1. (worker) . (, , …), . () .





  2. , core - 1, . , reduce , . .





  3. .





  4. , . , , ( , , API ..). .





  5. , . .





  6. 外部システムからの長い同期リクエストに関連する多くのタスク(一般的な代表はREST API / Webスクレイピング)の場合、使用可能なコアよりもはるかに多くの計算機を作成できます。スタンバイモードでは、ほとんどの場合、ハングします。適切なバックエンドを構成することにより、個別のOSプロセスとして実行できます。registerDoFuture();



    plan(future.callr::callr).



    これはレシピの残りの1/2です。





以前の出版物- 「エンタープライズ環境でRソリューションを運用する際のニュアンス?」..。








All Articles