KotlinCoroutinesによるJavaリアクターの処理

現在の仕事では、Reactorで書き込みます。テクノロジーはかっこいいですが、いつものようにたくさんあります。面倒なこともあれば、コードの書き込みと読み取りが難しいこと、そしてThreadLocalは本当の惨事です。Kotlin Coroutinesに切り替えると、どのような問題が解消され、逆にどのような問題が追加されるかを確認することにしました。





患者カード

私はこの記事のために小さなプロジェクトを書き、仕事で遭遇した問題を再現しました。メインコードはこちらです。アルゴリズムは意図的にそれを別々のメソッドに分解しなかったので、問題はよりよく見られます。





アルゴリズムについて一言で言えば: 





ある口座から別の口座に送金し、送金の事実に関する取引を記録します。 





変換は同一であるため、トランザクションがすでにデータベースにある場合は、すべてが正常であることをクライアントに返信します。トランザクションを挿入すると、DataIntegrityViolationExceptionがスローされる場合があります。これは、トランザクションがすでに存在することも意味します。





ネガティブにならないように、+ Optimisticロックコードのチェックがあります。これは、アカウントの競合的な更新を許可しません。それを機能させるには、再試行と追加のエラー処理が必要です。





アルゴリズム自体が気に入らない人のために

プロジェクトのアルゴリズムは、問題を再現するために選択されたものであり、効率的でアーキテクチャ的に正しいものではありません。1つのトランザクションの代わりに、半導体を挿入する必要があります。楽観的なロックはまったく必要ありません(代わりに、sqlでアカウントの陽性をチェックします)。select+ insertをupsertに置き換える必要があります。





患者の苦情

  1. Stacktrace .





  2. , . 





  3. - flatMap.





  4. .





  5. Mono.empty().





  6. , - , traceId. ( , ThreadLocal , SpringSecurity)





  7. .





  8. api .





PR Java Kotlin. 









com.fasterxml.jackson.module:jackson-module-kotlin data org.jetbrains.kotlin.plugin.spring open .





suspend fun transfer(@RequestBody request: TransferRequest)



public Mono<Void> transfer(@RequestBody TransferRequest request)







suspend fun save(account: Account): Account



Mono<Account> save(Account account);



, , suspend , , Reactor .





runBlocking { … }



, suspend .





Retry kotlin-retry. , , ( PR).





, , . -.





:





public Mono<Void> transfer(String transactionKey, long fromAccountId,
                           long toAccountId, BigDecimal amount) {
  return transactionRepository.findByUniqueKey(transactionKey)
    .map(Optional::of)
    .defaultIfEmpty(Optional.empty())
    .flatMap(withMDC(foundTransaction -> {
      if (foundTransaction.isPresent()) {
        log.warn("retry of transaction " + transactionKey);
        return Mono.empty();
      }
      return accountRepository.findById(fromAccountId)
        .switchIfEmpty(Mono.error(new AccountNotFound()))
        .flatMap(fromAccount -> accountRepository.findById(toAccountId)
          .switchIfEmpty(Mono.error(new AccountNotFound()))
          .flatMap(toAccount -> {
            var transactionToInsert = Transaction.builder()
              .amount(amount)
              .fromAccountId(fromAccountId)
              .toAccountId(toAccountId)
              .uniqueKey(transactionKey)
              .build();
            var amountAfter = fromAccount.getAmount().subtract(amount);
            if (amountAfter.compareTo(BigDecimal.ZERO) < 0) {
              return Mono.error(new NotEnoghtMoney());
            }
            return transactionalOperator.transactional(
              transactionRepository.save(transactionToInsert)
                .onErrorResume(error -> {
                  //transaction was inserted on parallel transaction,
                  //we may return success response
                  if (error instanceof DataIntegrityViolationException
             && error.getMessage().contains("TRANSACTION_UNIQUE_KEY")) {
                    return Mono.empty();
                  } else {
                    return Mono.error(error);
                  }
                })
                .then(accountRepository.transferAmount(
                  fromAccount.getId(), fromAccount.getVersion(), 
                  amount.negate()
                ))
                .then(accountRepository.transferAmount(
                  toAccount.getId(), toAccount.getVersion(), amount
                ))
            );
          }));
    }))
    .retryWhen(Retry.backoff(3, Duration.ofMillis(1))
      .filter(OptimisticLockException.class::isInstance)
      .onRetryExhaustedThrow((__, retrySignal) -> retrySignal.failure())
    )
    .onErrorMap(
      OptimisticLockException.class,
      e -> new ResponseStatusException(
        BANDWIDTH_LIMIT_EXCEEDED,
        "limit of OptimisticLockException exceeded", e
      )
    )
    .onErrorResume(withMDC(e -> {
      log.error("error on transfer", e);
      return Mono.error(e);
    }));
}
      
      



:





suspend fun transfer(transactionKey: String, fromAccountId: Long,
                     toAccountId: Long, amount: BigDecimal) {
  try {
    try {
      retry(limitAttempts(3) + filter { it is OptimisticLockException }) {
        val foundTransaction = transactionRepository
          .findByUniqueKey(transactionKey)
        if (foundTransaction != null) {
          logger.warn("retry of transaction $transactionKey")
          return@retry
        }

        val fromAccount = accountRepository.findById(fromAccountId)
          ?: throw AccountNotFound()
        val toAccount = accountRepository.findById(toAccountId)
          ?: throw AccountNotFound()

        if (fromAccount.amount - amount < BigDecimal.ZERO) {
          throw NotEnoghtMoney()
        }
        val transactionToInsert = Transaction(
          amount = amount,
          fromAccountId = fromAccountId,
          toAccountId = toAccountId,
          uniqueKey = transactionKey
        )
        transactionalOperator.executeAndAwait {
          try {
            transactionRepository.save(transactionToInsert)
          } catch (e: DataIntegrityViolationException) {
            if (e.message?.contains("TRANSACTION_UNIQUE_KEY") != true) {
              throw e;
            }
          }

          accountRepository.transferAmount(
            fromAccount.id!!, fromAccount.version, amount.negate()
          )
          accountRepository.transferAmount(
            toAccount.id!!, toAccount.version, amount
          )
        }
      }
    } catch (e: OptimisticLockException) {
      throw ResponseStatusException(
        BANDWIDTH_LIMIT_EXCEEDED, 
        "limit of OptimisticLockException exceeded", e
      )
    }
  } catch (e: Exception) {
    logger.error(e) { "error on transfer" }
    throw e;
  }
}
      
      



Stacktraces

, . 





:





o.s.w.s.ResponseStatusException: 509 BANDWIDTH_LIMIT_EXCEEDED "limit of OptimisticLockException exceeded"; nested exception is c.g.c.v.r.OptimisticLockException
	at c.g.c.v.r.services.Ledger.lambda$transfer$5(Ledger.java:75)
	...
Caused by: c.g.c.v.r.OptimisticLockException: null
	at c.g.c.v.r.repos.AccountRepositoryImpl.lambda$transferAmount$0(AccountRepositoryImpl.java:27)
	at r.c.p.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:125)
  ...
      
      



:





error on transfer o.s.w.s.ResponseStatusException: 509 BANDWIDTH_LIMIT_EXCEEDED "limit of OptimisticLockException exceeded"; nested exception is c.g.c.v.r.OptimisticLockException
	at c.g.c.v.r.services.Ledger.transfer$suspendImpl(Ledger.kt:70)
	at c.g.c.v.r.services.Ledger$transfer$1.invokeSuspend(Ledger.kt)
	...
Caused by: c.g.c.v.r.OptimisticLockException: null
	at c.g.c.v.r.repos.AccountRepositoryImpl.transferAmount(AccountRepositoryImpl.kt:24)
	...
	at c.g.c.v.r.services.Ledger$transfer$3$1.invokeSuspend(Ledger.kt:65)
	at c.g.c.v.r.services.Ledger$transfer$3$1.invoke(Ledger.kt)
	at o.s.t.r.TransactionalOperatorExtensionsKt$executeAndAwait$2$1.invokeSuspend(TransactionalOperatorExtensions.kt:30)
	(Coroutine boundary)
	at o.s.t.r.TransactionalOperatorExtensionsKt.executeAndAwait(TransactionalOperatorExtensions.kt:31)
	at c.g.c.v.r.services.Ledger$transfer$3.invokeSuspend(Ledger.kt:56)
	at com.github.michaelbull.retry.RetryKt$retry$3.invokeSuspend(Retry.kt:38)
	at c.g.c.v.r.services.Ledger.transfer$suspendImpl(Ledger.kt:35)
	at c.g.c.v.r.controllers.LedgerController$transfer$2$1.invokeSuspend(LedgerController.kt:20)
	at c.g.c.v.r.controllers.LedgerController$transfer$2.invokeSuspend(LedgerController.kt:19)
	at kotlin.reflect.full.KCallables.callSuspend(KCallables.kt:55)
	at o.s.c.CoroutinesUtils$invokeSuspendingFunction$mono$1.invokeSuspend(CoroutinesUtils.kt:64)
	(Coroutine creation stacktrace)
	at k.c.i.IntrinsicsKt__IntrinsicsJvmKt.createCoroutineUnintercepted(IntrinsicsJvm.kt:122)
	at k.c.i.CancellableKt.startCoroutineCancellable(Cancellable.kt:30)
	...
Caused by: c.g.c.v.r.OptimisticLockException: null
	at c.g.c.v.r.repos.AccountRepositoryImpl.transferAmount(AccountRepositoryImpl.kt:24)
	...
	at c.g.c.v.r.services.Ledger$transfer$3$1.invokeSuspend(Ledger.kt:65)
	at c.g.c.v.r.services.Ledger$transfer$3$1.invoke(Ledger.kt)
	at o.s.t.r.TransactionalOperatorExtensionsKt$executeAndAwait$2$1.invokeSuspend(TransactionalOperatorExtensions.kt:30)
	...

      
      



, ( , ).





Java . , . . . Kotlin .





, - . ? . , - traceId (thread name ) .





Kotlin , , . ( : ).





flatMap. - try catch, .





:





return accountRepository.findById(fromAccountId)
  .switchIfEmpty(Mono.error(new AccountNotFound()))
  .flatMap(fromAccount -> accountRepository.findById(toAccountId)
    .switchIfEmpty(Mono.error(new AccountNotFound()))
    .flatMap(toAccount -> {
      ...
    })
      
      



:





val fromAccount = accountRepository.findById(fromAccountId)
  ?: throw AccountNotFound()
val toAccount = accountRepository.findById(toAccountId)
  ?: throw AccountNotFound()
...
      
      



try catch, .





:





return transactionRepository.findByUniqueKey(transactionKey)
  ...
  .onErrorMap(
    OptimisticLockException.class,
    e -> new ResponseStatusException(
      BANDWIDTH_LIMIT_EXCEEDED, 
      "limit of OptimisticLockException exceeded", e
    )
  )
      
      



:





try {
  val foundTransaction = transactionRepository
    .findByUniqueKey(transactionKey)
  ...
} catch (e: OptimisticLockException) {
  throw ResponseStatusException(
    BANDWIDTH_LIMIT_EXCEEDED, 
    "limit of OptimisticLockException exceeded", e
  )
}
      
      



throw, . Reactor :





.flatMap(foo -> {
  if (foo.isEmpty()) { 
    return Mono.error(new IllegalStateException());
  } else {
    return Mono.just(foo);
  }
})
      
      



, , . - .





Mono.empty()

. null . ¨C5C. 





Ide , mono . . , - .





Kotlin not null , , . nullable - .





:





:





return transactionRepository.findByUniqueKey(transactionKey)
  .map(Optional::of)
  .defaultIfEmpty(Optional.empty())
  .flatMap(foundTransaction -> {
    if (foundTransaction.isPresent()) {
      log.warn("retry of transaction " + transactionKey);
      return Mono.empty();
    }
...
      
      



:





val foundTransaction = transactionRepository
  .findByUniqueKey(transactionKey)
if (foundTransaction != null) {
  logger.warn("retry of transaction $transactionKey")
  return@retry
}
...
      
      



, - Reactor, .





, traceId . ThreadLocal , MDC ( ). ?





. Reactor Coroutines immutable, MDC ( ).





Java , traceId :





@Component
public class TraceIdFilter implements WebFilter {
  @Override
  public Mono<Void> filter(
    ServerWebExchange exchange, WebFilterChain chain
  ) {
    var traceId = Optional.ofNullable(
      exchange.getRequest().getHeaders().get("X-B3-TRACEID")
    )
      .orElse(Collections.emptyList())
      .stream().findAny().orElse(UUID.randomUUID().toString());
    return chain.filter(exchange)
      .contextWrite(context ->
        LoggerHelper.addEntryToMDCContext(context, "traceId", traceId)
      );
  }
}
      
      



, - , traceId MDC:





public static <T, R> Function<T, Mono<R>> withMDC(
  Function<T, Mono<R>> block
) {
  return value -> Mono.deferContextual(context -> {
    Optional<Map<String, String>> mdcContext = context
      .getOrEmpty(MDC_ID_KEY);
    if (mdcContext.isPresent()) {
      try {
        MDC.setContextMap(mdcContext.get());
        return block.apply(value);
      } finally {
        MDC.clear();
      }
    } else {
      return block.apply(value);
    }
  });
}
      
      



, Mono. .. , Mono. :





.onErrorResume(withMDC(e -> {
  log.error("error on transfer", e);
  return Mono.error(e);
}))
      
      



Kotlin . , traceId MDC:





@Component
class TraceIdFilter : WebFilter {
  override fun filter(
    exchange: ServerWebExchange, chain: WebFilterChain
  ): Mono<Void> {
    val traceId = exchange.request.headers["X-B3-TRACEID"]?.first() 
    MDC.put("traceId", traceId ?: UUID.randomUUID().toString())
    return chain.filter(exchange)
  }
}
      
      



withContext(MDCContext()) { … }







, MDC traceId. .





Java Reactor , : , , breakpoints ...





: stepOver, , ( ). 





, suspend . issue. , , Java Reactor evaluate , .





, , .





:





return Mono.zip(
  transactionRepository.findByUniqueKey(transactionKey)
    .map(Optional::of)
    .defaultIfEmpty(Optional.empty()),
  accountRepository.findById(fromAccountId)
    .switchIfEmpty(Mono.error(new AccountNotFound())),
  accountRepository.findById(toAccountId)
    .switchIfEmpty(Mono.error(new AccountNotFound())),
).flatMap(withMDC(fetched -> {
  var foundTransaction = fetched.getT1();
  var fromAccount = fetched.getT2();
  var toAccount = fetched.getT3();
  if (foundTransaction.isPresent()) {
    log.warn("retry of transaction " + transactionKey);
    return Mono.empty();
  }
  ...
}
      
      



:





coroutineScope {
  val foundTransactionAsync = async {
    logger.info("async fetch of transaction $transactionKey")
    transactionRepository.findByUniqueKey(transactionKey)
  }
  val fromAccountAsync = async { 
    accountRepository.findById(fromAccountId) 
  }
  val toAccountAsync = async { 
    accountRepository.findById(toAccountId) 
  }

  if (foundTransactionAsync.await() != null) {
    logger.warn("retry of transaction $transactionKey")
    return@retry
  }

  val fromAccount = fromAccountAsync.await() ?: throw AccountNotFound()
  val toAccount = toAccountAsync.await() ?: throw AccountNotFound()
  ...
}
      
      



Kotlin “ ”, “ ” Reactor.





, -. Reactor , . - foundTransactionAsync.await(). , transactionRepository.findByUniqueKey() , , accountRepository.findById() ( ).





. , Reactor :





coroutineScope {
  val foundTransactionAsync = async {
    logger.info("async fetch of transaction $transactionKey")
    transactionRepository.findByUniqueKey(transactionKey)
  }
  val fromAccountAsync = async {
    accountRepository.findById(fromAccountId)
  }
  val toAccountAsync = async {
    accountRepository.findById(toAccountId)
  }

  if (foundTransactionAsync.await() != null) {
    logger.warn("retry of transaction $transactionKey")
    return@retry
  }

  val transactionToInsert = Transaction(
    amount = amount,
    fromAccountId = fromAccountId,
    toAccountId = toAccountId,
    uniqueKey = transactionKey
  )
  transactionalOperator.executeAndAwait {
    try {
      transactionRepository.save(transactionToInsert)
    } catch (e: DataIntegrityViolationException) {
      if (e.message?.contains("TRANSACTION_UNIQUE_KEY") != true) {
        throw e;
      }
    }
    val fromAccount = fromAccountAsync.await() ?: throw AccountNotFound()
    val toAccount = toAccountAsync.await() ?: throw AccountNotFound()
    if (fromAccount.amount - amount < BigDecimal.ZERO) {
      throw NotEnoghtMoney()
    }

    accountRepository.transferAmount(
      fromAccount.id!!, fromAccount.version, amount.negate()
    )
    accountRepository.transferAmount(
      toAccount.id!!, toAccount.version, amount
    )
  }
}
      
      



. .. , . , , ( ).





, , .





context scope

, :





  1. scope. , , .





  2. context. .





Spring , :





@PutMapping("/transfer")
suspend fun transfer(@RequestBody request: TransferRequest) {
  coroutineScope {
    withContext(MDCContext()) {
      ledger.transfer(request.transactionKey, request.fromAccountId, 
                      request.toAccountId, request.amount)
    }
  }
}
      
      



, regexp , . - .





AOP suspend

, , . aspect suspend .





私は最終的にその側面を書くことができました。しかし、これがどのように機能するかを説明するには、別の記事が必要です。





アスペクトを書くためのより適切な方法があることを願っています(私はこれに貢献しようとします)。





治療評価

すべての問題が消えました。新しいものがいくつか追加されましたが、それは許容範囲です。





私は、コルチンが急速に発達していると言わなければなりません、そして私はそれらとのより良い仕事を期待するだけです。





JetBrainsチームが開発者の問題に注意を払っていたことがわかります。私の知る限り、たとえば、約1年前には、デバッグやスタクトレーシングにまだ問題がありました。





最も重要なのは、コルチンを使用する場合、Reactorとその強力なAPIのすべての機能を覚えておく必要がないことです。コードを書くだけです。








All Articles