Key-ValueAPIを介してApacheIgniteに大きなテーブルをすばやくロードする方法

しばらく前に、Apache Igniteプラットフォームが登場し、人気を博し始めました。インメモリコンピューティングは速度です。つまり、作業のすべての段階で、特にデータをロードするときに速度を確保する必要があります。



カットの下には、リレーショナルテーブルから分散型ApacheIgniteクラスターにデータをすばやくロードする方法の説明があります。クラスタのクライアントノードでのSQLクエリ結果セットの前処理と、map-reduceタスクを使用したクラスタ全体でのデータの分散について説明します。キャッシュと関連するリレーショナルテーブルについて説明し、テーブル行からカスタムオブジェクトを作成する方法、およびComputeTaskAdapterを使用して作成したオブジェクトをすばやく配置する方法を示します。すべてのコードは、FastDataLoadリポジトリで完全に確認できます



問題の歴史



このテキストは、GridGainWebサイトのインメモリコンピューティングブログにある私の投稿をロシア語に翻訳したものです。



そのため、ある会社は、コンピューティングをメモリ内クラスターに移動することで、低速のアプリケーションを高速化することにしました。計算の初期データはMSSQLにあります。計算結果はそこに置く必要があります。すでに大量のデータがあるため、クラスターは分散されており、アプリケーションのパフォーマンスは限界にあり、データ量は増加しています。ハードタイム制限が設定されています。



データを処理するための高速コードを作成する前に、データをすばやくロードする必要があります。Webを必死に検索すると、数千万行または数億行のテーブルに拡張できるコード例が明らかに不足していることがわかります。ダウンロード、コンパイル、およびデバッグの手順を実行できる例。これは一方でです。



, Apache Ignite / GridGain, . , . " ?", — , .

, .



(World Database)



, data collocation, . world.sql Apache Ignite.



CSV , — SQL :





countryCache country.csv. countryCache — code, — String, — Country, (name, continent, region).







, — , . Country , . org.h2.tools.Csv, CSV java.sql.ResultSet. Apache Ignite , SQL H2.



    // define countryCache
    IgniteCache<String,Country> cache = ignite.cache("countryCache");

    try (ResultSet rs = new Csv().read(csvFileName, null, null)) {
     while (rs.next()) {
      String code = rs.getString("Code");
      String name = rs.getString("Name");
      String continent = rs.getString("Continent");
      Country country = new Country(code,name,continent);
      cache.put(code,country);
     }
    }


. , , . - .



, . , .





Apache Ignite — -. , PARTITIONED - (partition) . ; , . -, affinity function, , .



ResultSet . , . .





, :



  • HashMap partition_number -> key -> Value

    Map<Integer, Map<String, Country>> result = new HashMap<>();
  • affinity function partition_number. cache.put() - HashMap partition_number

    try (ResultSet rs = new Csv().read(csvFileName, null, null)) {
     while (rs.next()) {
      String code = rs.getString("Code");
      String name = rs.getString("Name");
      String continent = rs.getString("Continent");
      Country country = new Country(code,name,continent);
      result.computeIfAbsent(affinity.partition(key), k -> new HashMap<>()).put(code,country);
     }
    }


ComputeTaskAdapter ComputeJobAdapter. ComputeJobAdapter 1024. , .



ComputeJobAdapter . , .



Compute Task,



, "ComputeTaskAdapter initiates the simplified, in-memory, map-reduce process". ComputeJobAdapter map — , . reduce — .



(RenewLocalCacheJob)



, RenewLocalCacheJob .



targetCache.putAll(addend);


RenewLocalCacheJob partition_number .



(AbstractLoadTask)



( loader) — AbstractLoadTask. . ( ), AbstractLoadTask TargetCacheKeyType. HashMap



    Map<Integer, Map<TargetCacheKeyType, BinaryObject>> result;


countryCache String. . AbstractLoadTask TargetCacheKeyType, BinaryObject. , — .



BinaryObject



— . , JVM, - . class definition , JAR- . Country



    IgniteCache<String, Country> countryCache;


, , classpath ClassNotFound.



. — classpath, :



  • JAR- ;
  • classpath ;
  • ;
  • .


BinaryObject () . :





  • IgniteCache<String, BinaryObject> countryCache;    
  • Country BinaryObject (. LoadCountries.java)

    Country country = new Country(code, name, .. );    
    BinaryObject binCountry = node.binary().toBinary(country);    
  • HashMap, BinaryObject

    Map<Integer, Map<String, BinaryObject>> result


, . , , ClassNotFoundException .



. .



Apache Ignite : .





default-config.xml — . :



  • GridGain CE Installing Using ZIP Archive. 8.7.10, FastDataLoad , ;
  • {gridgain}\config default-config.xml



    <bean id="grid.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
        <property name="peerClassLoadingEnabled" value="true"/>
    </bean>
  • , {gridgain}\bin ignite.bat. ; ;
  • , . ,

    [08:40:04] Topology snapshot [ver=2, locNode=d52b1db3, servers=2, clients=0, state=ACTIVE, CPUs=8, offheap=3.2GB, heap=2.0GB]


. , 8.7.25, pom.xml



    <gridgain.version>8.7.25</gridgain.version>




class org.apache.ignite.spi.IgniteSpiException: Local node and remote node have different version numbers (node will not join, Ignite does not support rolling updates, so versions must be exactly the same) [locBuildVer=8.7.25, rmtBuildVer=8.7.10]




, , map-reduce. — JAR-, compute task . Windows, Linux.

:



  • FastDataLoad;
  • ;

    mvn clean package
  • , .

    java -jar .\target\fastDataLoad.jar


main() LoadApp LoaderAgrument . map-reduce LoadCountries.

LoadCountries RenewLocalCacheJob , ( ).



#1





#2









country.csv , CountryCode . cityCache countryLanguageCache; , .





.



.



:



  • (SQL Server Management Studio):

    • — 44 686 837;
    • — 1.071 GB;
  • — 0H:1M:35S;
  • RenewLocalCacheJob reduce — 0H:0M:9S.


SQLクエリを実行するよりも、クラスタ全体にデータを分散する方が時間がかかりません。




All Articles