みなさん、こんにちは。Android 開発者の Ivan と申します。今日は、RxJava2 での私の経験を共有し、チェーンの初期化がどのように行われるかを説明したいと思います。なぜ私はこれをまったく取り上げようと思ったのですか?他の開発者と話した後、このツールを使用するすべての人がその仕組みを理解しているわけではないことに気付きました。次に、RxJava2 でサブスクリプションがどのように配置され、すべての作業がどのような順序で初期化されるかを理解することにしました。これを説明する記事は 1 つも見つかりませんでした。これを踏まえて、ソース コードを調べてすべてがどのように機能するかを確認し、自分用に小さな虎の巻をスケッチしました。これがこの記事に発展しました。
この記事ではObservable
、それが何であるか、Observer
および RxJava2 で使用される他のすべてのエンティティーについては説明しません。この記事を読むことにした場合は、すでにこの情報に精通していると思います。これらの概念にまだ慣れていない場合は、読む前に理解しておくことをお勧めします。
開始する方法は次のとおりです。
最も単純なチェーンがどのように機能するかを見てみましょう。
Observable.just (1, 2, 3, 4, 5)
.map {…}
.filter {…}
.subscribe();
上に
まず、このチェーンで実行する各ステップについて簡単に説明します (ステップは上から下に向かって始まります)。
オブジェクトは just ステートメントで作成されます
ObservableFromArray
。
オブジェクトは map ステートメント
ObservableMap
で作成されます。このステートメントは、just ステートメントで以前に作成されたオブジェクトへの参照をコンストラクターに取り込みます。
filter
ObservableFilter
, map, just.
Observable
’Observable
subscribe()
(ObservableFilter
filter)Observer
, .
ObservableFilter.subscribe()
ObservableFilter.subscribeActual()
,Observer
, filter,FilterObserver
.Observer
Observer
ObservableFilter.subscribe()
.
ObservableMap.subscribe()
ObservableMap.subscribeActual()
Observer,
map,MapObserver
,FilterObserver
.
ObservableFromArray.subscribe()
ObservableFromArray.subscribeActual()
,onSubscribe()
ObservableFromArray.subscribeActual()
Observer
’.
onSubscribe()
Observer
’ .
ObservableFromArray
onNext()
Observer
’.
, just()
null, fromArray(),
Observable
.
public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5) {
ObjectHelper.requireNonNull(item1, "item1 is null");
ObjectHelper.requireNonNull(item2, "item2 is null");
ObjectHelper.requireNonNull(item3, "item3 is null");
ObjectHelper.requireNonNull(item4, "item4 is null");
ObjectHelper.requireNonNull(item5, "item5 is null");
return fromArray(item1, item2, item3, item4, item5);
}
fromArray()
, .
public static <T> Observable<T> fromArray(T... items) {
ObjectHelper.requireNonNull(items, "items is null");
if (items.length == 0) {
return empty();
}
if (items.length == 1) {
return just(items[0]);
}
return RxJavaPlugins.onAssembly(new ObservableFromArray<T>(items));
}
ObservableFromArray
, .
onAssembly()
, - Observable
, , .
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
Function<? super Observable, ? extends Observable> f = onObservableAssembly;
if (f != null) {
return apply(f, source);
}
return source;
}
onAssembly()
Observable
- , :
RxJavaPlugins.setOnObservableAssembly(o -> {
if (o instanceof ObservableFromArray) {
return new ObservableFromArray<>(new Integer[] { 4, 5, 6 });
}
return o;
});
Observable.just(1, 2, 3)
.filter(v -> v > 3)
.test()
.assertResult(4, 5, 6);
map()
. , . null, ObservableMap
.
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
, ObservableMap
mapper, , this (source). this ObservableFromArray
. ObservableMap
AbstractObservableWithUpstream
, source.
AbstractObservableWithUpstream
, Observable
.
onAssembly()
Observable
.
filter()
. , , ObservableFilter
this ObservableMap
( ObservableFromArray
, ) .
public final Observable<T> filter(Predicate<? super T> predicate) {
ObjectHelper.requireNonNull(predicate, "predicate is null");
return RxJavaPlugins.onAssembly(new ObservableFilter<T>(this, predicate));
}
subscribe()
, . onNext()
. subscribe()
ObservableFilter
, Observable
.
public final Disposable subscribe(Consumer<? super T> onNext) {
return subscribe(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer());
}
null, LambdaObserver
.
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
Action onComplete, Consumer<? super Disposable> onSubscribe) {
ObjectHelper.requireNonNull(onNext, "onNext is null");
ObjectHelper.requireNonNull(onError, "onError is null");
ObjectHelper.requireNonNull(onComplete, "onComplete is null");
ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");
LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);
subscribe(ls);
return ls;
}
, .
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
subscribeActual(observer);
} catch (NullPointerException e) {
......
}
}
subscribeActual()
LambdaObserver
. subscribeActual()
ObservableFilter
. .
public void subscribeActual(Observer<? super T> observer) {
source.subscribe(new FilterObserver<T>(observer, predicate));
}
FilterObserver
, LambdaObserver
, ObservableFilter
.
FilterObserver
BasicFuseableObserver
, onSubscribe()
. BasicFuseableObserver
, Observer
’. , 6 , FilterObserver
MapObserver
. BasicFuseableObserver.onSubscribe()
onSubscribe()
Observer
’, . :
public final void onSubscribe(Disposable d) {
if (DisposableHelper.validate(this.upstream, d)) {
this.upstream = d;
if (d instanceof QueueDisposable) {
this.qd = (QueueDisposable<T>)d;
}
if (beforeDownstream()) {
downstream.onSubscribe(this);
afterDownstream();
}
}
}
, ObservableFilter
FilterObserver
, source.subscribe()
. , source ObservableMap
, . ObservableMap
subscribe()
.
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
subscribeActual(observer);
} catch (NullPointerException e) {
......
}
}
, subscribe()
subscribeActual()
, ObservableMap
. subscribeActual()
MapObserver
FilterObserver
mapper
’.
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}
public void subscribeActual(Observer<? super T> observer) {
FromArrayDisposable<T> d = new FromArrayDisposable<T>(observer, array);
observer.onSubscribe(d);
if (d.fusionMode) {
return;
}
d.run();
}
Observer
’ BasicFuseableObserver
, onSubscribe()
, Observer
, onSubscribe()
.
subscribeActual()
run()
, Observer
’.
void run() {
T[] a = array;
int n = a.length;
for (int i = 0; i < n && !isDisposed(); i++) {
T value = a[i];
if (value == null) {
downstream.onError(new NullPointerException("The element at index " + i + " is null"));
return;
}
downstream.onNext(value);
}
if (!isDisposed()) {
downstream.onComplete();
}
}
onNext()
Observer
’, onComplete()
onError()
, .
Observable
’ callback’ Observer
’, .
onSubscribe()
, doOnSubscribe()
.
3 :
Observable
Observer
したがって、演算子を使用する場合、各演算子は複数のオブジェクトにメモリを割り当て、「可能」という理由だけでチェーンに演算子を追加しないでください。
RxJava は強力なツールですが、その仕組みと用途を理解する必要があります。バックグラウンド スレッドでネットワーク リクエストを実行し、その結果をメイン スレッドで実行する必要があるだけなら、「大砲でスズメを撃つ」ようなものです。