こんにちは、Khabrovites。今日はRxJavaについてお話します。彼女についてワゴンと小さなカートが書かれていることは知っていますが、共有する価値のある興味深い点がいくつかあるように思えます。最初に、RxJavaをAndroidアプリケーション用のVIPERアーキテクチャと一緒に使用する方法と、それを使用する「従来の」方法について説明します。その後、RxJavaの主な機能を確認し、スケジューラーの動作について詳しく見ていきましょう。すでにスナックを買いだめしている場合は、猫の下で歓迎します。
誰にでも合うアーキテクチャ
RxJavaはReactiveXコンセプトの実装であり、Netflixによって作成されました。彼らのブログには、なぜそれをしたのか、どのような問題を解決したのかについての一連の記事があります。リンク(1、2)は記事の最後にあります。 Netflixは、サーバー側(バックエンド)でRxJavaを使用して、1つの大きな要求の処理を並列化しました。彼らはバックエンドでRxJavaを使用する方法を提案しましたが、このアーキテクチャはさまざまなタイプのアプリケーション(モバイル、デスクトップ、バックエンドなど)の作成に適しています。 Netflix開発者は、サービスレイヤーの各メソッドがObservableを返すように、サービスレイヤーでRxJavaを使用しました。重要なのは、Observableの要素を同期的および非同期的に配信できるということです。これにより、メソッドは、値をすぐに同期して返すかどうかを自分で決定できます(たとえば、キャッシュで利用可能な場合)または最初にこれらの値を(たとえば、データベースまたはリモートサービスから)取得し、非同期で返します。いずれの場合も、制御はメソッドを呼び出した直後に戻ります(データの有無にかかわらず)。
/**
* , ,
* , ,
* callback `onNext()`
*/
public Observable<T> getProduct(String name) {
if (productInCache(name)) {
// ,
return Observable.create(observer -> {
observer.onNext(getProductFromCache(name));
observer.onComplete();
});
} else {
//
return Observable.<T>create(observer -> {
try {
//
T product = getProductFromRemoteService(name);
//
observer.onNext(product);
observer.onComplete();
} catch (Exception e) {
observer.onError(e);
}
})
// Observable IO
// /
.subscribeOn(Schedulers.io());
}
}
このアプローチでは、クライアント(この場合はコントローラー)用に1つの不変のAPIとさまざまな実装を取得します。クライアントは常に同じ方法でObservableと対話します。値が同期的に受信されるかどうかはまったく問題ではありません。同時に、APIの実装は、クライアントとの対話にまったく影響を与えることなく、同期から非同期に変更できます。このアプローチでは、マルチスレッドを編成する方法を完全に忘れて、ビジネスタスクの実装に集中できます。
このアプローチは、バックエンドのサービスレイヤーだけでなく、アーキテクチャMVC、MVP、MVVMなどにも適用できます。たとえば、MVPの場合、さまざまなソースへのデータの受信と保存を担当するInteractorクラスを作成し、すべてを作成できます。そのメソッドはObservableを返しました。それらはモデルとの相互作用のための契約になります。これにより、PresenterはRxJavaで利用可能なオペレーターの全機能を活用できるようになります。
さらに進んでPresenterをリアクティブAPIにすることもできますが、そのためには、すべてのビューが同時にPresenterから退会できるようにする退会メカニズムを正しく実装する必要があります。
次に、このアプローチが拡張MVPであるVIPERアーキテクチャにどのように適用されるかの例を見てみましょう。Observableへのサブスクリプションはメモリリークを生成するため、Observableシングルトンオブジェクトを作成できないことも覚えておく価値があります。
AndroidとVIPERの経験
現在および新しいAndroidプロジェクトのほとんどでは、VIPERアーキテクチャを使用しています。彼女がすでに使用されているプロジェクトの1つに参加したときに、彼女に会いました。 iOSに目を向けているのかと聞かれたとき、驚いたのを覚えています。 「AndroidプロジェクトのIOS?」と思いました。一方、VIPERはiOSの世界からやって来たもので、実際にはMVPのより構造化されたモジュラーバージョンです。 VIPERはこの記事(3)で非常によく書かれています。
最初は、すべてが正常に見えました:正しく分割され、オーバーロードされたレイヤーではなく、各レイヤーには独自の責任領域、明確なロジックがあります。しかし、しばらくすると、1つの欠点が現れ始め、プロジェクトが成長して変化するにつれて、干渉し始めました。
事実、私たちは記事の同僚と同じようにInteractorを使用しました。 Interactorは、「ネットワークから製品をダウンロードする」や「IDでデータベースから製品を取得する」などの小さな使用例を実装し、ワークフローでアクションを実行します。内部的には、InteractorはObservableを使用して操作を実行します。 Interactorを「実行」して結果を取得するには、ユーザーはObserverEntityインターフェイスをonNext、onError、およびonCompleteメソッドとともに実装し、パラメーターとともにexecute(params、ObserverEntity)メソッドに渡します。
あなたはおそらくすでに問題に気づいているでしょう-インターフェースの構造。実際には、3つの方法すべてが必要になることはめったになく、多くの場合、そのうちの1つまたは2つを使用します。このため、空のメソッドがコードに表示される場合があります。もちろん、インターフェイスのすべてのメソッドをデフォルトとしてマークすることはできますが、そのようなメソッドは、インターフェイスに新しい機能を追加するために必要です。さらに、すべてのメソッドがオプションであるインターフェイスがあるのは奇妙です。たとえば、インターフェイスを継承し、必要なメソッドをオーバーライドする抽象クラスを作成することもできます。または、最後に、1つから3つの機能インターフェイスを受け入れるexecute(params、ObserverEntity)メソッドのオーバーロードバージョンを作成します。この問題はコードの読みやすさには悪影響を及ぼしますが、幸いなことに、非常に簡単に解決できます。しかし、彼女だけではありません。
saveProductInteractor.execute(product, new ObserverEntity<Void>() {
@Override
public void onNext(Void aVoid) {
// ,
//
}
@Override
public void onError(Throwable throwable) {
//
// -
}
@Override
public void onComplete() {
//
// -
}
});
空のメソッドに加えて、より厄介な問題があります。 Interactorを使用して何らかのアクションを実行しますが、ほとんどの場合、このアクションだけがアクションではありません。たとえば、データベースから製品を取得し、レビューとその写真を取得して、すべてを別の場所に保存し、最後に別の画面に移動することができます。ここでは、各アクションは前のアクションに依存しており、Interactorsを使用すると、コールバックの巨大なチェーンが発生します。これは、追跡するのが非常に面倒な場合があります。
private void checkProduct(int id, Locale locale) {
getProductByIdInteractor.execute(new TypesUtil.Pair<>(id, locale), new ObserverEntity<Product>() {
@Override
public void onNext(Product product) {
getProductInfo(product);
}
@Override
public void onError(Throwable throwable) {
// -
}
@Override
public void onComplete() {
}
});
}
private void getProductInfo(Product product) {
getReviewsByProductIdInteractor.execute(product.getId(), new ObserverEntity<List<Review>>() {
@Override
public void onNext(List<Review> reviews) {
product.setReviews(reviews);
saveProduct(productInfo);
}
@Override
public void onError(Throwable throwable) {
// -
}
@Override
public void onComplete() {
// -
}
});
getImageForProductInteractor.execute(product.getId(), new ObserverEntity<Image>() {
@Override
public void onNext(Image image) {
product.setImage(image);
saveProduct(product);
}
@Override
public void onError(Throwable throwable) {
// -
}
@Override
public void onComplete() {
}
});
}
private void saveProduct(Product product) {
saveProductInteractor.execute(product, new ObserverEntity<Void>() {
@Override
public void onNext(Void aVoid) {
}
@Override
public void onError(Throwable throwable) {
// -
}
@Override
public void onComplete() {
goToSomeScreen();
}
});
}
さて、このマカロニはどうですか?同時に、単純なビジネスロジックと単一のネストがありますが、より複雑なコードで何が起こるか想像してみてください。また、メソッドを再利用して、Interactorに異なるスケジューラーを適用することも困難になります。
解決策は驚くほど簡単です。このアプローチはObservableの動作を模倣しようとしているように感じますが、それは間違っており、奇妙な制約自体を作成しますか?前に言ったように、このコードは既存のプロジェクトから取得しました。このレガシーコードを修正するときは、Netflixの人たちが私たちに遺したアプローチを使用します。毎回ObserverEntityを実装する代わりに、InteractorがObservableを返すようにしましょう。
private Observable<Product> getProductById(int id, Locale locale) {
return getProductByIdInteractor.execute(new TypesUtil.Pair<>(id, locale));
}
private Observable<Product> getProductInfo(Product product) {
return getReviewsByProductIdInteractor.execute(product.getId())
.map(reviews -> {
product.set(reviews);
return product;
})
.flatMap(product -> {
getImageForProductInteractor.execute(product.getId())
.map(image -> {
product.set(image);
return product;
})
});
}
private Observable<Product> saveProduct(Product product) {
return saveProductInteractor.execute(product);
}
private doAll(int id, Locale locale) {
//
getProductById (id, locale)
//
.flatMap(product -> getProductInfo(product))
//
.flatMap(product -> saveProduct(product))
//
.ignoreElements()
//
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
//
.subscribe(() -> goToSomeScreen(), throwable -> handleError());
}
出来上がり!そのため、面倒で扱いにくい恐怖を取り除くだけでなく、RxJavaのパワーをPresenterにもたらしました。
中心となる概念
機能的リアクティブプログラミング(以下、FRP)を使用してRxJavaの概念を説明しようとする方法をよく見ました。実際、それはこのライブラリとは何の関係もありません。FRPは、動的に変化する継続的な意味(動作)、継続的な時間、および表記上のセマンティクスに関するものです。記事の最後に、いくつかの興味深いリンクがあります(4、5、6、7)。
RxJavaは、コアコンセプトとしてリアクティブプログラミングと機能プログラミングを使用しています。リアクティブプログラミングは、オブザーバーオブジェクトがオブザーバーオブジェクトに情報を順次転送し、オブザーバーオブジェクトがこの情報が発生したときに自動的に(非同期に)受信するようにすることとして説明できます。
機能プログラミングは、純粋な機能、つまり外部状態を使用または変更しない機能の概念を使用します。それらは、出力を取得するために入力に完全に依存しています。純粋な関数に副作用がないため、ある関数の結果を別の関数への入力パラメーターとして使用できます。これにより、無制限の機能チェーンを構成できます。
これらの2つの概念を、GoF ObserverおよびIteratorパターンとともに組み合わせると、非同期データストリームを作成し、非常に便利な機能を豊富に備えて処理することができます。また、同期、メモリの不整合、スレッドのオーバーラップなどの問題を考慮することなく、マルチスレッドを非常に簡単に、そして最も重要なことに安全に使用することができます。
RxJavaの3匹のクジラ
RxJavaが構築されている主な3つのコンポーネントは、Observable、演算子、およびスケジューラです。RxJavaで
Observableは、リアクティブパラダイムの実装を担当します。オブザーバブルは、データストリームの概念と変更の伝播の両方を実装しているため、ストリームと呼ばれることがよくあります。Observableは、Gang of Fourの2つのパターンであるObserverとIteratorを組み合わせることにより、リアクティブパラダイムを実現するタイプです。Observableは、Iterableにある2つの欠落しているセマンティクスをObserverに追加します。
- 使用可能なデータがこれ以上ないことをプロデューサーがコンシューマーに通知する機能(Iterableのforeachループは終了し、戻るだけです。この場合、ObservableはonCompleateメソッドを呼び出します)。
- エラーが発生し、Observableが要素を発行できなくなったことをプロデューサーがコンシューマーに通知する機能(反復中にエラーが発生した場合、Iterableは例外をスローします。ObservableはオブザーバーでonErrorを呼び出して終了します)。
Iterableが「プル」アプローチを使用する場合、つまり、コンシューマーがプロデューサーに値を要求し、その値が到着するまでスレッドがブロックする場合、Observableはその「プッシュ」に相当します。これは、プロデューサーが値を利用可能になったときにのみコンシューマーに値を送信することを意味します。
ObservableはRxJavaの始まりにすぎません。それはあなたが非同期的に値をフェッチすることを可能にします、しかし本当の力は「リアクティブエクステンション」(したがってReactiveX)-オペレーターに付属していますこれにより、Observableによって放出される要素のシーケンスを変換、結合、および作成できます。ここで、機能パラダイムが純粋な機能で前面に出てきます。オペレーターはこの概念を最大限に活用します。もちろん、自分で作成しない限り、副作用を恐れることなく、Observableが放出する一連の要素を安全に操作できます。オペレーターは、スレッドの安全性、低レベルのスレッド制御、同期、メモリの不整合エラー、スレッドオーバーレイなどの問題を心配することなく、マルチスレッドを許可します。豊富な機能を備えているため、さまざまなデータを簡単に操作できます。これにより、非常に強力なツールが得られます。覚えておくべき主なことは、オペレーターはObservable自体ではなく、Observableによって発行されたアイテムを変更することです。オブザーバブルは、作成されてから変更されることはありません。スレッドと演算子について考えるときは、チャートで考えるのが最善です。問題を解決する方法がわからない場合は、考えて、利用可能な演算子のリスト全体を見て、もう一度考えてください。
リアクティブプログラミングの概念自体は非同期ですが(マルチスレッドと混同しないでください)、デフォルトでは、Observableのすべての要素は、subscribe()メソッドが呼び出されたのと同じスレッドで同期的にサブスクライバーに配信されます。同じ非同期を導入するには、別の実行スレッドでonNext(T)、onError(Throwable)、onComplete()メソッドを自分で呼び出すか、スケジューラーを使用する必要があります。通常、誰もが自分の行動を分析するので、その構造を見てみましょう。
プランナー独自のAPIの背後にある並列処理のソースからユーザーを抽象化します。これらは、スレッド、イベントループ、Executorなど、基盤となる同時実行メカニズム(実装)に関係なく、特定のプロパティを提供することを保証します。スケジューラーはデーモンスレッドを使用します。これは、Observableオペレーター内で何らかの計算が行われた場合でも、プログラムは実行のメインスレッドの終了とともに終了することを意味します。
RxJavaには、特定の目的に適したいくつかの標準スケジューラがあります。それらはすべて抽象スケジューラクラスを拡張し、ワーカーを管理するための独自のロジックを実装します。たとえば、ComputationSchedulerは、作成時にワーカーのプールを形成します。ワーカーの数は、プロセッサスレッドの数と同じです。次に、ComputationSchedulerはワーカーを使用して実行可能なタスクを実行します。 scheduleDirect()メソッドとschedulePeriodicallyDirect()メソッドを使用して、Runnableをスケジューラーに渡すことができます。どちらの方法でも、スケジューラーはプールから次のワーカーを取得し、Runnableをそれに渡します。
ワーカーはスケジューラー内にあり、いくつかの同時実行スキームの1つを使用して実行可能オブジェクト(タスク)を実行するエンティティです。言い換えると、スケジューラーはRunnableを取得し、実行のためにワーカーに渡します。また、他のワーカーやスケジューラ自体とは関係なく、スケジューラから独立してワーカーを取得し、1つ以上のRunnableを彼に転送することもできます。ワーカーがタスクを受け取ると、それをキューに入れます。ワーカーは、タスクが送信された順序で順番に実行されることを保証しますが、保留中のタスクによって順序が乱される可能性があります。たとえば、ComputationSchedulerでは、ワーカーはシングルスレッドのScheduledExecutorServiceを使用して実装されます。
したがって、任意の並列処理スキームを実装できる抽象的なワーカーがあります。このアプローチには、モジュール性、柔軟性、1つのAPI、さまざまな実装など、多くの利点があります。ExecutorServiceでも同様のアプローチが見られました。さらに、Observableとは別のスケジューラーを使用できます。
結論
RxJavaは非常に強力なライブラリであり、多くのアーキテクチャでさまざまな方法で使用できます。使い方は既存のものに限らないので、必ず自分に合わせてみてください。ただし、SOLID、DRY、およびその他の設計原則について覚えておいてください。また、経験を同僚と共有することを忘れないでください。この記事から何か新しくて面白いことを学ぶことができたと思います。じゃあね!