データ収集を自動化するためのKotlinでの並列クエリ

みなさん、こんにちは!私の仕事では、自動化にKotlinをよく使用します。私の活動はプログラミングに直接関係していませんが、Kotlinは作業タスクを大幅に簡素化します。





最近、分析を行うためにかなり大きなサイズのデータ​​を収集する必要があったので、データを取得してExcelに保存するための小さなスクリプトを作成することにしました。最後の点で問題はありませんでした-私はApachePOIについて読み、公式ドキュメントからいくつかの例を取り、自分で修正しました。インターネットリクエストについても同じことは言えません。





ソースはjsonのバッチで返され、これらの「バッチ」をなんとかしてすばやく収集し、テキストに変換してテーブルをファイルに書き込む必要がありました。





非同期方式

単純な非同期化から始めることにしました。HttpUrlConnectionを少し調べた後、それが属する場所に送信し、JavaのHttpClientに置き換えました。





テストのために、私はサービスhttps://jsonplaceholder.typicode.com/を利用しましたこれは、あるおなじみの開発者によって提案されました。重複してテストを開始しないように、変数へのコメントでJsonを発行するリンクを保存しました。





const val URL = "https://jsonplaceholder.typicode.com/comments"
      
      



機能は準備ができていて、機能していました。データが届きました。





fun getDataAsync(url: String): String? {
    val httpClient = HttpClient.newBuilder()
        .build()
    val httpRequest = HttpRequest.newBuilder()
        .uri(URI.create(link)).build()

    return httpClient.sendAsync(httpRequest, BodyHandlers.ofString())
        .join().body()
}
      
      



今度は仕事のスピードをチェックする必要がありました。measureTimeMillisを装備して、コードを実行しました。





val asyncTime = measureTimeMillis { 
    val res = (1..10)
        .toList()
        .map {getDataAsync("$URL/$it")}
    res.forEach { println(it) }
}
println("   $asyncTime ")
      
      



すべてが正常に機能しましたが、もっと速くしたかったのです。インターネットを少し掘り下げた後、タスクを並行して実行するソリューションに出くわしました。





パラレルマップ

, . , , .





suspend fun <A, B> Iterable<A>.pmap(f: suspend (A) -> B): List<B> =
    coroutineScope {
        map { async { f(it) } }.awaitAll()
    }
      
      



, ( Iterable) pmap, . A. async , .awaitAll() . suspend, .





, , - .





val parmapTime = measureTimeMillis {
    runBlocking {
        val res = (1..10)
            .toList()
            .pmap { getDataAsync("$URL/$it") }
        println(mapResult)
    }
}
println(" pmap $parmapTime ")
      
      



- 1523, . map async, .





Parallel Map v 2.0

, , .





suspend fun <T, V> Iterable<T>.parMap(func: suspend (T) -> V): Iterable<V> =
    coroutineScope {
        map { element -> 
            async(Dispatchers.IO) { func(element) } 
        }.awaitAll() 
    }

val parMapTime = measureTimeMillis {
    runBlocking {
        val res = (1..10)
            .toList()
            .parMap { getDataAsync("$URL/$it") }
    }
    println(res)
}
println(" map  $parMapTime ")
      
      



Dispatchers.IO 2 ~ 610 . ! ( , excel ..) . , - .





Java ParallelStream

, stackowerflow parallelStream. , IDEA.





val javaParallelTime = measureTimeMillis { 
    val res = (1..10).toList()
        .parallelStream()
        .map { getDataAsync("$URL/$it") }
    res.forEach { println(it) }
}
println("Java parallelSrtream  $javaParallelTime ")
      
      



, . , . stream . , , , "" , Json.





, - , async . .





結果は下の表で見ることができます。私自身は、asyncを待つことにしましたもちろん、主にエラー処理が簡単なためです。そして、ここでコルーチンを超える必要はありません。





方法





時間(ミリ秒)





非同期方式





1487





WebからのPmapの実装





1523年





私のオプションはparallelMapです





610





Java.parallelStream





578





将来的には、これを小さなライブラリに配置して個人的な目的で使用することも考えられます。もちろん、十分な可能性がある限り、すべてを「ヒンズー教のコード」から人間のコードに書き直します。そして、それをすべてvdsにアップロードします。





私の経験が誰かに役立つことを願っています。建設的な批判やアドバイスをいただければ幸いです。ありがとうございます








All Articles