Rx チェーンの初期化

みなさん、こんにちは。Android 開発者の Ivan と申します。今日は、RxJava2 での私の経験を共有し、チェーンの初期化がどのように行われるかを説明したいと思います。なぜ私はこれをまったく取り上げようと思ったのですか?他の開発者と話した後、このツールを使用するすべての人がその仕組みを理解しているわけではないことに気付きました。次に、RxJava2 でサブスクリプションがどのように配置され、すべての作業がどのような順序で初期化されるかを理解することにしました。これを説明する記事は 1 つも見つかりませんでした。これを踏まえて、ソース コードを調べてすべてがどのように機能するかを確認し、自分用に小さな虎の巻をスケッチしました。これがこの記事に発展しました。





この記事ではObservable



それが何であるかObserver



および RxJava2 で使用される他のすべてのエンティティーについては説明しませんこの記事を読むことにした場合は、すでにこの情報に精通していると思います。これらの概念にまだ慣れていない場合は、読む前に理解しておくことをお勧めします。





開始する方法は次のとおりです。





Grock * RxJava





RxJava 2 for Android の探索





最も単純なチェーンがどのように機能するかを見てみましょう。





Observable.just (1, 2, 3, 4, 5)
.map {…}
.filter {…}
.subscribe();
      
      



上に

まず、このチェーンで実行する各ステップについて簡単に説明します (ステップは上から下に向かって始まります)。





  • オブジェクトは just ステートメントで作成されますObservableFromArray







  • オブジェクトは map ステートメントObservableMap



    で作成されます。このステートメントは、just ステートメントで以前に作成されたオブジェクトへの参照をコンストラクターに取り込みます。





  • filter ObservableFilter



    , map, just.





  • Observable



    Observable



    subscribe()



    ( ObservableFilter



    filter) Observer



    , .





  • ObservableFilter.subscribe()



    ObservableFilter.subscribeActual()



    , Observer



    , filter, FilterObserver



    . Observer



    Observer



    ObservableFilter.subscribe()



    .





  • ObservableMap.subscribe()



    ObservableMap.subscribeActual()



    Observer,



    map, MapObserver



    , FilterObserver



    .





  • ObservableFromArray.subscribe()



    ObservableFromArray.subscribeActual()



    , onSubscribe()



    ObservableFromArray.subscribeActual()



    Observer



    ’.





  • onSubscribe()



    Observer



    ’ .





  • ObservableFromArray



    onNext()



    Observer



    ’.





上の図を視覚的に表現したもの。
.

, just()



null, fromArray(),



Observable



.





public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5) {
   ObjectHelper.requireNonNull(item1, "item1 is null");
   ObjectHelper.requireNonNull(item2, "item2 is null");
   ObjectHelper.requireNonNull(item3, "item3 is null");
   ObjectHelper.requireNonNull(item4, "item4 is null");
   ObjectHelper.requireNonNull(item5, "item5 is null");

   return fromArray(item1, item2, item3, item4, item5);
}
      
      



fromArray()



, .





public static <T> Observable<T> fromArray(T... items) {
   ObjectHelper.requireNonNull(items, "items is null");
   if (items.length == 0) {
       return empty();
   }
   if (items.length == 1) {
       return just(items[0]);
   }
   return RxJavaPlugins.onAssembly(new ObservableFromArray<T>(items));
}
      
      



ObservableFromArray



, .





onAssembly()



, - Observable



, , .





public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
   Function<? super Observable, ? extends Observable> f = onObservableAssembly;
   if (f != null) {
       return apply(f, source);
   }
   return source;
}
      
      



onAssembly()



Observable



- , :





RxJavaPlugins.setOnObservableAssembly(o -> {
	if (o instanceof ObservableFromArray) {
    	return new ObservableFromArray<>(new Integer[] { 4, 5, 6 });
	}
	return o;
});
 
Observable.just(1, 2, 3)
.filter(v -> v > 3)
.test()
.assertResult(4, 5, 6);
      
      



作成したばかりの ObservableFromArray
ObservableFromArray

map()



. , . null, ObservableMap



.





public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
   ObjectHelper.requireNonNull(mapper, "mapper is null");
   return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
      
      



, ObservableMap



mapper, , this (source). this ObservableFromArray



. ObservableMap



AbstractObservableWithUpstream



, source.





AbstractObservableWithUpstream



, Observable



.





onAssembly()



Observable







生成された ObservableMap でスキーマを更新
ObservableMap

filter()



. , , ObservableFilter



this ObservableMap



( ObservableFromArray



, ) .





public final Observable<T> filter(Predicate<? super T> predicate) {
   ObjectHelper.requireNonNull(predicate, "predicate is null");
   return RxJavaPlugins.onAssembly(new ObservableFilter<T>(this, predicate));
}
      
      



生成された ObservableFilter でスキーマを更新
ObservableFilter

subscribe()



, . onNext()



. subscribe()



ObservableFilter



, Observable



.





public final Disposable subscribe(Consumer<? super T> onNext) {
   return subscribe(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer());
}
      
      



null, LambdaObserver



.





public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
       Action onComplete, Consumer<? super Disposable> onSubscribe) {
   ObjectHelper.requireNonNull(onNext, "onNext is null");
   ObjectHelper.requireNonNull(onError, "onError is null");
   ObjectHelper.requireNonNull(onComplete, "onComplete is null");
   ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");

   LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);

   subscribe(ls);

   return ls;
}
      
      



, .





public final void subscribe(Observer<? super T> observer) {
   ObjectHelper.requireNonNull(observer, "observer is null");
   try {
       observer = RxJavaPlugins.onSubscribe(this, observer);

       ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");

       subscribeActual(observer);
   } catch (NullPointerException e) { 
     ......
 }
}
      
      



subscribeActual()



LambdaObserver



. subscribeActual()



ObservableFilter



. .





public void subscribeActual(Observer<? super T> observer) {
   source.subscribe(new FilterObserver<T>(observer, predicate));
}
      
      



FilterObserver



, LambdaObserver



, ObservableFilter



.





FilterObserver



BasicFuseableObserver



, onSubscribe()



. BasicFuseableObserver



, Observer



’. , 6 , FilterObserver



MapObserver



. BasicFuseableObserver.onSubscribe()



onSubscribe()



Observer



’, . :





public final void onSubscribe(Disposable d) {
   if (DisposableHelper.validate(this.upstream, d)) {
       this.upstream = d;
       if (d instanceof QueueDisposable) {
           this.qd = (QueueDisposable<T>)d;
       }
       if (beforeDownstream()) {

           downstream.onSubscribe(this);

           afterDownstream();
       }
   }
}
      
      



, ObservableFilter



FilterObserver



, source.subscribe()



. , source ObservableMap



, . ObservableMap



subscribe()



.





public final void subscribe(Observer<? super T> observer) {
   ObjectHelper.requireNonNull(observer, "observer is null");
   try {
       observer = RxJavaPlugins.onSubscribe(this, observer);

       ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");

       subscribeActual(observer);
   } catch (NullPointerException e) { 
     ......
 }
}
      
      



, subscribe()



subscribeActual()



, ObservableMap



. subscribeActual()



MapObserver



FilterObserver



mapper



’. 





public void subscribeActual(Observer<? super U> t) {
   source.subscribe(new MapObserver<T, U>(t, function));
}
      
      



public void subscribeActual(Observer<? super T> observer) {
   FromArrayDisposable<T> d = new FromArrayDisposable<T>(observer, array);

   observer.onSubscribe(d);

   if (d.fusionMode) {
       return;
   }
   d.run();
}
      
      



Observer



BasicFuseableObserver



, onSubscribe()



, Observer



, onSubscribe()



.





subscribeActual()



run()



, Observer



.





void run() {
   T[] a = array;
   int n = a.length;

   for (int i = 0; i < n && !isDisposed(); i++) {
       T value = a[i];
       if (value == null) {
           downstream.onError(new NullPointerException("The element at index " + i + " is null"));
           return;
       }
       downstream.onNext(value);
   }
   if (!isDisposed()) {
       downstream.onComplete();
   }
}
      
      



onNext()



Observer



’, onComplete()



onError()



, .





作成および購読プロセスの視覚的表現





Observable



callback’ Observer



, .





onSubscribe()



, doOnSubscribe()



.





3 :









  • Observable







  • Observer







したがって、演算子を使用する場合、各演算子は複数のオブジェクトにメモリを割り当て、「可能」という理由だけでチェーンに演算子を追加しないでください。





RxJava は強力なツールですが、その仕組みと用途を理解する必要があります。バックグラウンド スレッドでネットワーク リクエストを実行し、その結果をメイン スレッドで実行する必要があるだけなら、「大砲でスズメを撃つ」ようなものです。








All Articles