出版物は非常に短いです。多くの人は、Rでの並列計算は非常に難しく、現在のタスクには適用できないと考えています。
はいといいえ。理論、ハードウェア、およびあらゆる種類の詳細を意図的に調べない場合は、ほぼ普遍的なレシピの「3と1/2」を描くことができます。これらの例は、生産的なタスクに意図的に類似しており、合成の線の去勢されたカップルではありません。
これは、以前の一連の出版物の続きです。
中古パッケージ
パッケージの読み込み
library(tidyverse)
library(magrittr)
library(stringi)
library(glue)
library(dqrng)
library(iterators)
library(future)
library(foreach)
library(doFuture)
library(tictoc)
library(futile.logger)
library(lgr) # `lgr`
library(hrbrthemes)
並列化パターン
パターン1。tidyverse計算の並列化
状況。の多くのパイプラインを含むスクリプトがありtidyverse
ます。
タスクの例。数の二乗和の平均を計算してみましょう。並列計算の効率を向上させるには、スレッド間のデータ転送量を減らすことが重要です。furrrパッケージを使用しています。
`tidyverse`パイプライン
registerDoFuture()
# future::plan(multiprocess)
workers <- parallel::detectCores() - 1
future::plan(multisession, workers = workers)
num_row <- 1:10^6
ff_seq <- function(x) x^2
ff_par <- function(x) mean(x^2)
tic(" ")
lst1 <- num_row %>%
purrr::map_dbl(ff_seq) %>%
mean()
toc()
tic(" , 1")
lst2 <- num_row %>%
furrr::future_map_dbl(ff_seq) %>%
mean()
toc()
tic(" , 2")
lst2 <- num_row %>%
split(cut(seq_along(.), workers, labels = FALSE)) %>%
furrr::future_map_dbl(ff_par) %>%
mean()
toc()
当然、結果はすべてが実行されるハードウェアプラットフォームとOSによって異なります。テスト実行では、次のレイアウトがあります。
: 7.23 sec elapsed
, 1: 3.43 sec elapsed
, 2: 0.64 sec elapsed
WindowsとLinuxは、並列化の方法がまったく異なります。実稼働環境のLinuxは、Windowsよりも強く推奨されます。
パターン2。ローカル手動並列化
. . , . , %<-%
.
# , 20 10^5
nn <- 10^5
tic("Generating sample data.frame")
df <- 100 %>%
# stri_rand_strings(length = 10, pattern = "[a-z]") %>%
sample(10^4:10^5, .) %>%
sample(20 * nn, replace = TRUE) %>%
matrix(byrow = TRUE, ncol = 20) %>%
as_tibble(.name_repair = "universal") %>%
mutate(user_id = as.character(sample(1:as.integer(nn/10), n(), replace = TRUE))) %>%
#
mutate(ver = sample(1:20, n(), replace = TRUE)) %>%
select(user_id, ver, everything())
toc()
#
demo_fpath <- here::here("temp", "demo_data.xlsx")
openxlsx::write.xlsx(df, demo_fpath, asTable = TRUE)
plan(multisession, workers = parallel::detectCores() - 2)
# plan(sequential)
# https://github.com/HenrikBengtsson/future
# , 2
tic(" ")
tic(" ")
res_lst <- list()
for (j in 1:6) {
res_lst[[j]] <- { readxl::read_excel(demo_fpath) %>% head(5)}
}
toc()
seq_df <- bind_rows(res_lst)
toc()
tic(" ")
tic(" ")
df1 %<-% { readxl::read_excel(demo_fpath) %>% head(5)}
df2 %<-% { readxl::read_excel(demo_fpath) %>% head(5)}
df3 %<-% { readxl::read_excel(demo_fpath) %>% head(5)}
df4 %<-% { readxl::read_excel(demo_fpath) %>% head(5)}
df5 %<-% { readxl::read_excel(demo_fpath) %>% head(5)}
df6 %<-% { readxl::read_excel(demo_fpath) %>% head(5)}
toc()
par_df <- bind_rows(df1, df2, df3, df4, df5, df6)
toc()
all_equal(seq_df, par_df)
. . :
: 46.23 sec elapsed
: 37.82 sec elapsed
3.
. , , .
.
. $C_n^k$
. .
.
#
flog_logname <- here::here("log", "job_futile.log")
lgr_logname <- here::here("log", "job_lgr.log")
initLogging <- function(log_file){
lgr <- get_logger_glue("logger")
lgr$set_propagate(FALSE)
lgr$set_threshold("all")
lgr$set_appenders(list(
console = AppenderConsole$new(
threshold = "info"
),
file = AppenderFile$new(
file = log_file,
threshold = "all"
)
))
lgr
}
invisible(flog.appender(appender.tee(flog_logname)))
invisible(flog.threshold(INFO))
lgr <- initLogging(lgr_logname)
"Start batch processing" %T>%
flog.info() %T>%
lgr$info()
#
# https://github.com/HenrikBengtsson/doFuture
# https://cran.r-project.org/web/packages/future/vignettes/future-1-overview.html
registerDoFuture()
# future::plan(multiprocess)
future::plan(multisession, workers = parallel::detectCores())
# future::plan(sequential)
# plan(future.callr::callr)
tic("Batch processing")
start_time <- Sys.time()
foreach(it = iter(jobs_tbl, by = "row"), .export = c("start_time"),
# .packages = 'futile.logger',
.verbose = FALSE, .inorder = FALSE, .errorhandling = "remove") %dopar% {
start <- Sys.time() - start_time
#
flog.appender(appender.tee(flog_logname))
lgr <- initLogging(lgr_logname)
res <- arrangements::npermutations(k = it$k, n = it$n, bigz = TRUE)
# https://www.jottr.org/2020/11/06/future-1.20.1-the-future-just-got-a-bit-brighter/
message(" message from thread")
glue("Step {it$idx_str} finished. RAM used {capture.output(pryr::mem_used())}.",
"PID: {Sys.getpid()}",
"Elapsed {round(difftime(Sys.time(), start_time, units = 'mins'), digits = 2)} min(s) ----------->",
.sep = " ") %T>%
flog.info() %T>%
lgr$info()
#
return(list(pid = Sys.getpid(), start = start, finish = Sys.time() - start_time))
} -> output_lst
flog.info("Foreach finished")
checkmate::assertList(output_lst, any.missing = FALSE, null.ok = FALSE, min.len = 1)
output_tbl <- dplyr::bind_rows(output_lst)
# rm(output_lst)
# --------------
future::plan(sequential)
gc(reset = TRUE, full = TRUE)
flog.info(capture.output(toc()))
() () . windows.
.
#
output_tbl %>%
mutate_at("pid", as.factor) %>%
mutate_at(vars(start, finish), as.numeric) %>%
ggplot(aes(start, pid, colour = pid)) +
geom_point(size = 3, alpha = .7) +
geom_point(aes(x=finish), shape = 4, size = 3, colour = "black") +
geom_vline(aes(xintercept = start, colour = pid), lty = "dashed", alpha = 0.7) +
ggthemes::scale_fill_tableau("Tableau 10") +
theme_ipsum_rc() +
xlim(c(0, 5))
, , , . , « »:
(worker) . (, , …), . () .
, core - 1, . , reduce , . .
.
, . , , ( , , API ..). .
, . .
外部システムからの長い同期リクエストに関連する多くのタスク(一般的な代表はREST API / Webスクレイピング)の場合、使用可能なコアよりもはるかに多くの計算機を作成できます。スタンバイモードでは、ほとんどの場合、ハングします。適切なバックエンドを構成することにより、個別のOSプロセスとして実行できます。
registerDoFuture();
plan(future.callr::callr).
これはレシピの残りの1/2です。
以前の出版物- 「エンタープライズ環境でRソリューションを運用する際のニュアンス?」..。