誰が興味を持ちますか?
今日の原子炉はスタイリッシュでファッショナブルで若々しいです。なぜ私たちの多くがリアクティブプログラミングを実践しているのですか?この質問に明確に答えることができる人はほとんどいません。良い-あなたがあなたの利益を理解しているなら、悪い-原子炉が与えられたように組織によって課されているなら。 「FOR」引数のほとんどは、マイクロサービスアーキテクチャの使用です。これにより、マイクロサービスは相互に頻繁かつ多くの通信を行う必要があります。通信の場合、ほとんどの場合、HTTPインタラクションが選択されます。 HTTPには軽量のWebサーバーが必要ですが、最初に何が思い浮かびますか? Tomcat。ここでは、セッションの最大数の制限に問題があり、それを超えるとWebサーバーは要求を拒否し始めます(ただし、この制限を達成するのはそれほど簡単ではありません)。ここで、原子炉が救助に来ますが、これはそのような制限によって制限されることはなく、たとえば、箱から出して反応性で動作するWebサーバーとしてのNetty。リアクティブWebサーバーがあるため、リアクティブWebクライアント(SpringWebClientまたはReactiveFeign)が必要です。また、クライアントがリアクティブであるため、このすべての恐怖がビジネスロジックに浸透し、MonoとFluxが親友になります(最初はそこにありますが)ただ嫌いです:))
ビジネスタスクの中には、大量のデータを処理する深刻な手順が頻繁にあり、それらにもリアクターを使用する必要があります。ここで驚きが始まります。原子炉の調理方法がわからない場合、多くの問題が発生する可能性があります。サーバー上のファイル記述子の制限を超えている、非ブロッキングコードの速度が制御されていないため、OutOfMemoryなど、今日お話しします。同僚と私は、原子炉を制御する方法を理解するのに問題があるために多くの困難を経験しましたが、私たちを殺さないものはすべて私たちを賢くします!
ブロッキングコードと非ブロッキングコード
, . , . , - , - . , , .
- HTTP , , . Reactive Feign Playtika, Spring Boot + WebFlux + Eureka .
-: , , reactive, - :) Hibernate + PostgreSQL - , JavaMail - , IBMMQ - . , , MongoDB - . , , , (Thread.sleep() / Socket.read() ), - . ? , . 2 :
. BlockHound ( )
, , :
Schedulers.boundedElastic()
.publishOn
&subscribeOn
, , !
1
@Test
fun testLevel1() {
val result = Mono.just("")
.map { "123" }
.block()
assertEquals("123", result)
}
, reactor . ? Mono.just
:) map
"123" block
subscribe
.
block
, , , .block
RestController
, .
2
fun nonBlockingMethod1sec(data: String)
= data.toMono().delayElement(Duration.ofMillis(1000))
@Test
fun testLevel2() {
val result = nonBlockingMethod1sec("Hello world")
.flatMap { nonBlockingMethod1sec(it) }
.block()
assertEquals("Hello world", result)
}
, nonBlockingMethod1sec
, - . - , , .
3
fun collectTasks() = (0..99)
@Test
fun testLevel3() {
val result = nonBlockingMethod1sec("Hello world")
.flatMap { businessContext ->
collectTasks()
.toFlux()
.map {
businessContext + it
}
.collectList()
}
.block()!!
assertEquals(collectTasks().toList().size, result.size)
}
- Flux
! collectTasks
, , Flux
- . map. collectList
.
, . " ", .
4
fun collectTasks() = (0..100)
@Test
fun testLevel4() {
val result = nonBlockingMethod1sec("Hello world")
.flatMap { businessContext ->
collectTasks().toFlux()
.flatMap {
Mono.deferContextual { reactiveContext ->
val hash = businessContext + it + reactiveContext["requestId"]
hash.toMono()
}
}.collectList()
}
.contextWrite { it.put("requestId", UUID.randomUUID().toString()) }
.block()!!
assertEquals(collectTasks().toList().size, result.size)
}
. (15)
, (10)
. .
5
fun collectTasks() = (0..1000)
fun doSomethingNonBlocking(data: String)
= data.toMono().delayElement(Duration.ofMillis(1000))
fun doSomethingBlocking(data: String): String {
Thread.sleep(1000); return data
}
val pool = Schedulers.newBoundedElastic(10, Int.MAX_VALUE, "test-pool")
private val logger = getLogger()
@Test
fun testLevel5() {
val counter = AtomicInteger(0)
val result = nonBlockingMethod1sec("Hello world")
.flatMap { _ ->
collectTasks().toFlux()
.parallel()
.runOn(pool)
.flatMap {
Mono.deferContextual { _ ->
doSomethingNonBlocking(it.toString())
.doOnRequest { logger.info("Added task in pool ${counter.incrementAndGet()}") }
.doOnNext { logger.info("Non blocking code finished ${counter.get()}") }
.map { doSomethingBlocking(it) }
.doOnNext { logger.info("Removed task from pool ${counter.decrementAndGet()}") }
}
}.sequential()
.collectList()
}
.block()!!
assertEquals(collectTasks().toList().size, result.size)
}
! , . : doSomethingNonBlocking
(3)
& doSomethingBlocking
(6)
- , . (10)
, (15)
. parallel
(19)
sequential
(29)
. (20)
. , , doOnRequest
( ), doOnNext
( ). - , .
"", , . - , , . , , . , Flux .
. . , ? 100 , 1 , 1 , 10 ? ( senior reactor developer :))
12 . :) , 100 10 , 10 . , . " " .
(26) .map { doSomethingBlocking(it) }
. , , ?
2 ! 1 " " 1 . 100 . 10 ? ? .
collectTasks()
... 1000? 15000? ?
2 ! 1 " " 1 . . . ?
?
? ? ? 30000 , , , , ( web-client feign, ?) , , SSH . , , " ".
. Thread Pool & Reactor
- , - X , X , - . ? :) .
thread pool - . - , .

reactor! ?

, , . ? epoll , . . , , . , " ?", , . . , - , 500 -, . ! , , Schedulers.boundedElastic()
.
"", ?
!
, , , , , , 4-8 production 32 .
parallel
parallelism

parallelism
, rails ( , , ). Prefetch .
parallelism , .
flatMap
( Flux) , maxConcurrency

maxConcurrency
, Integer.MAX_VALUE
( . ?
, , ( http ), ! .

.
:
parallel (parallelism)
flatMap (maxConcurrency)
, .
- * Integer.MAX_VALUE *
, 5 5 . !
val result = nonBlockingMethod1sec("Hello world")
.flatMap { _ ->
collectTasks().toFlux()
.parallel(1)
.runOn(pool, 1)
.flatMap({
Mono.deferContextual { _ ->
doSomethingNonBlocking(it.toString())
}
}, false, 1, 1)
.sequential()
.collectList()
}
.block()!!
, ?
Thread Pool
? . - , , ! ? , :)
, Schedulers.parallel() ? =)
( parallel, ), , , .
. , , , . , , production . .
, round-robin, .

production , , , .

collectList()
, , 1 . , , .
concatMap
flatMap
( , )
, ( )
, ( )
prefetch
( !)
prefetch
flatMap
& runOn
, , , . - 256. 1, "work stealing", , , , .

それが私にとってすべてです。あなたのコメントやコメントを読むのは興味深いでしょう。私は100%真実であるとは思いませんが、すべての結果はSpring Boot + Project Reactor3.4の実際の例によって裏付けられています。ありがとうございます!