構造化ストリーミングは、Apache Spark2.0で最初に導入されました。このプラットフォームは、分散ストリーミングアプリケーションを構築するための最良の選択肢としての地位を確立しています。 SQL / Dataset / DataFrame APIと組み込みのSpark関数の統合により、開発者は、ストリーミング集約、ストリームストリーム結合、ウィンドウサポートなどの複雑な基本事項をはるかに簡単に実装できます。 Structured Streamingのリリース以来、Spark Streaming(DStreamなど)で行ったように、ストリーミング制御を改善することが開発者から人気のある要求になっています。 Apache Spark 3.0では、StructuredStreamingの新しいUIをリリースしました。
新しいUIStructured Streamingは、実用的な洞察と統計を使用してすべてのストリーミングジョブを監視する簡単な方法を提供し、デバッグ中の問題のトラブルシューティングを容易にし、リアルタイムメトリックで本番環境の可視性を向上させます。UIは、2セットの統計を表示します。1)ストリーミングクエリジョブに関する集約情報と2)入力レート、プロセスレート、入力行、バッチ期間、操作期間などのストリーミング要求に関する詳細な統計情報。
ストリーミングクエリジョブに関する集約情報
開発者がストリーミングSQLクエリを送信すると、[構造化ストリーミング]タブに表示されます。このタブには、アクティブなストリーミングクエリと完了したクエリの両方が含まれます。結果テーブルには、リクエスト名、ステータス、ID、runID、送信時間、リクエスト期間、最後のパケットIDなどのストリーミングリクエストに関する基本情報のほか、平均受信レートや平均処理レートなどの集約情報が表示されます。ストリーミング要求のステータスには、RUNNING、FINISHED、およびFAILEDの3つのタイプがあります。すべてのFINISHEDおよびFAILEDリクエストは、完了したストリーミングリクエストテーブルに一覧表示されます。 [エラー]列には、失敗した要求の例外の詳細が表示されます。
[IDの実行]リンクをクリックすると、ストリーミング要求の詳細な統計を表示できます。
詳細な統計情報
[統計]ページには、取り込み/処理速度、待ち時間、詳細な操作時間などの指標が表示されます。これらは、ストリーミングリクエストの状態を理解するのに役立ち、リクエスト処理の異常を簡単にデバッグできます。
次のメトリックが含まれています。
- 入力レート:データ到着の(すべてのソースにわたる)集計レート。
- プロセスレート:Sparkがデータを処理する(すべてのソースにわたる)集計レート。
- バッチ期間:各バッチの期間。
- 操作時間:さまざまな操作を実行するのにかかる時間(ミリ秒単位)。
監視対象のトランザクションは次のとおりです。
addBatch:ソースからマイクロバッチの入力データを読み取り、それらを処理し、同期するバッチの出力データを書き込むために費やされた時間。これは通常、マイクロバッチ時間のほとんどを占めます。getBatch:ソースから現在のマイクロパッケージの入力データを読み取る論理要求を準備するのにかかる時間。getOffset:ソースに新しい入力があるかどうかを尋ねるのに費やした時間。walCommit:メタデータログにオフセットを書き込みます。queryPlanning:実行計画を作成します。
リストされているすべての操作がUIに表示されるわけではないことに注意してください。データソースの種類によって操作が異なるため、リストされている操作の一部を1つのストリーミング要求で実行できます。
UIを使用したストリーミングパフォーマンスのトラブルシューティング
このセクションでは、新しいUI構造化ストリーミングが異常なことが起こっていることを示しているいくつかのケースを見ていきます。高レベルのデモリクエストは次のようになります。いずれの場合も、いくつかの前提条件を前提としています。
import java.util.UUID
val bootstrapServers = ...
val topics = ...
val checkpointLocation = "/tmp/temporary-" + UUID.randomUUID.toString
val lines = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", bootstrapServers)
.option("subscribe", topics)
.load()
.selectExpr("CAST(value AS STRING)")
.as[String]
val wordCounts = lines.flatMap(_.split(" ")).groupBy("value").count()
val query = wordCounts.writeStream
.outputMode("complete")
.format("console")
.option("checkpointLocation", checkpointLocation)
.start()
不十分な処理能力による待ち時間の増加
最初のケースでは、ApacheKafkaデータをできるだけ早く処理するリクエストを実行します。バッチごとに、ストリーミングジョブはKafkaで利用可能なすべてのデータを処理します。バーストデータを処理するのに処理能力が不十分な場合、待ち時間は急速に増加します。最も直感的な判断は、入力行とバッチ期間が直線的に増加することです。Input Rowsパラメーターは、ストリーミングジョブが1秒あたり最大8000回の書き込みを処理できることを指定します。しかし、現在の入力レートは1秒あたり約20,000レコードです。スレッドジョブに実行するリソースを増やすことも、プロデューサーに追いつくために必要なすべてのコンシューマーを処理するのに十分なパーティションを追加することもできます。
安定しているが待ち時間が長い
このケースは前のケースとどう違うのですか?次のスクリーンショットに示すように、レイテンシは増加しませんが、
安定したままです。プロセスレートは同じ入力レートで安定したままであることがわかりました。これは、ジョブの処理能力が入力データを処理するのに十分であることを意味します。ただし、各バッチの処理時間、つまり遅延は20秒のままです。待ち時間が長い主な理由は、各バッチのデータが多すぎることです。通常、このジョブの並列処理を増やすことで、待ち時間を短縮できます。Sparkタスク用にさらに10個のKafkaパーティションと10個のコアを追加した後、待ち時間は約5秒であることがわかりました。これは20秒よりもはるかに優れています。
トラブルシューティングに操作期間チャートを使用する
操作時間チャートには、さまざまな操作の実行に費やされた時間がミリ秒単位で表示されます。これは、各バッチのタイミングを理解し、トラブルシューティングを容易にするのに役立ちます。例として、Apache Sparkコミュニティのパフォーマンス改善作業「SPARK-30915:最新のバッチIDを探すときにメタデータログファイルを読み取らないようにする」を使用してみましょう。
この改善の前は、圧縮されたメタデータログが巨大になると、圧縮後の後続の各バッチは他のバッチよりも時間がかかりました。
コードを調べた後、圧縮されたログファイルの不要な読み取りが見つかり、修正されました。次の操作期間の図は、期待される効果を確認しています。
今後の計画
上に示したように、新しいUI構造化ストリーミングは、ストリーミングリクエストに関するより有用な情報を持つことで、開発者がストリーミングジョブをより適切に制御するのに役立ちます。初期バージョンとして、新しいUIはまだ開発中であり、将来のリリースで改善される予定です。以下を含むがこれらに限定されない、それほど遠くない将来に実装される可能性のあるいくつかの機能があります。
- ストリーミングリクエストの実行の詳細:遅延データ、ウォーターマーク、データ状態メトリックなど。
- Spark HistoryServerでの構造化ストリーミングUIのサポート。
- 異常な動作のより顕著な手がかり:遅延など。
新しいUIを試す
新しいDatabricksRuntime7.1のApacheSpark3.0でこの新しいSparkStreamingUIを試してください。Databricksノートブックを使用している場合、これにより、ノートブック内のストリーミング要求のステータスを監視し、要求を管理する簡単な方法も提供されます。無料のDatabricksアカウントにサインアップして、クレジット情報なしで数分で無料で開始できます。
DWHのデータ品質は、データウェアハウスの一貫性です。無料のウェビナー。
推奨読書:
データビルドツール、またはデータウェアハウスとスムージー
に共通するものDelta Lake Dive:Schema Enforcement and Evolution
High Speed Apache Parquet in Python with Apache Arrow