是什麼讓我們閱讀RxJava源碼如此艱難?

鴻洋2019-06-12 17:27:15

本文作者


作者:遺失的美好yxj2

鏈接:

https://juejin.im/post/5cce6fb05188254177317fdc

本文由作者授權發佈。


這是一篇需要用心去感受的文章,快讀的閱讀可能效果不好喲。


1
概述


你是不是看過了很多分析Rxjava源碼的文章,但依舊無法在心中勾勒出Rxjava原理的樣貌。


是什麼讓我們閱讀Rxjava源碼變得如此艱難?


是Rxjava的代碼封裝,以及各種細節問題的解決。


本文我把Rxjava的各種封裝、抽象統統剝去,只專注於基本的事件變換。在理解了事件變換大概是做了件什麼事情時,再去看源碼,考慮一些其它問題就會更加容易。說明:這是一篇Rxjava源碼分析的入門文章。旨在讓讀者腦中有個概念Rxjava最主要乾了件什麼事情,幾個常用操作符的主要原理。今後再去看其它源碼分析文章或源碼能夠更容易理解。因此本文先不去考慮Rxjava源碼中複雜的抽象封裝,線程間通信,onComplete、onError、dispose等方法,僅專注於“onNext”的最基本調用方式。


項目源碼

https://github.com/OliverY/RxjavaYxj


本文目錄:


  • 手寫Rxjava核心代碼,create,nullMap(核心)操作符

  • map,observeOn,subscribeOn,flatMap操作符

  • 響應式編程思想的理解


手寫Rxjava核心代碼,create,nullMap操作符


2
手寫Rxjava核心代碼,create,nullMap操作符


Create操作符


我們先來看一個最簡單調用


MainActivity.java

Observable.create(new Observable() {
    @Override
    public void subscribe(Observer observer) {
        observer.onNext("hello");
        observer.onNext("world");
        observer.onComplete();
    }
}).subscribe(new Observer() {
    @Override
    public void onNext(String s) {
        Log.e("yxj",s);
    }

    @Override
    public void onComplete() {
        Log.e("yxj","onComplete");
    }
});          


Observable.java

public abstract class ObservableT> {

    public abstract void subscribe(Observer observer);

    public static  Observable create(Observable observable){
        return observable;
    }

}       


Observer.java

public interface ObserverT> {

    void onNext(T t);
    void onComplete();
}


本篇文章我把Observable稱為“節點”,Observer稱為“處理者”,一是因為我被觀察者、被觀察者、誰訂閱誰給繞暈了,更重要的是我覺得這個名稱比較符合Rxjava的設計思想。


Observable調用create方法創建一個自己,重寫subscribe方法說:如果 我有一個處理者Observer,我就把“hello”,“world”交給它處理。


Observable調用了subscribe方法,真的找到了Observer。於是兌現承諾,完成整個調用邏輯。


這裡是“如果”有處理者,需要subscribe方法被調用時,“如果”才成立。Rxjava就是建立在一系列的“如果”(回調)操作上的。


3
“nullMap”操作符(核心)


1.創建一個observable
2.調用空map操作符做變換
3.交給observer處理

MainActivity.java

Observable.create(new Observable() {
            @Override
            public void subscribe(Observer observer) {
                observer.onNext("hello");
                observer.onNext("world");
                observer.onComplete();
            }
        })
        .nullMap()
        .subscribe(new Observer() {
            @Override
            public void onNext(String s) {
                Log.e("yxj",s);
            }

            @Override
            public void onComplete() {
                Log.e("yxj","onComplete");
            }
        });


nullMap()等價於 下面這段代碼。


即把上個節點的數據不做任何修改的傳遞給下一節點的map操作

.map(new FunctionString, String>() {
    @Override
    public String apply(String s) throws Exception {
        return s;
    }
})

"nullMap"操作符在Rxjava源碼裡並不存在,是我方便大家理解Rxjava運行機制寫出來的。


 因為nullMap操作是一個 base變換操作,map,flatMap,subscribeOn,observeOn操作符都是在nullMap上修改而來。所以Rxjava的變換的基礎就是nullMap操作符。


Observable.java
// 這就是Rxjava的變換核心

public Observable nullMap() {

        return new Observable() {
            @Override
            public void subscribe(final Observer observerC) {

                Observer observerB = new Observer() {
                    @Override
                    public void onNext(T t) {
                        observerC.onNext(t);
                    }

                    @Override
                    public void onComplete() {
                        observerC.onComplete();
                    }
                };
                Observable.this.subscribe(observerB);
            }
        };
    }


“nullMap”操作符做了件什麼事情:


  1. 上一個節點Observable A調用nullMap(),在內部new一個新的節點Observable B。

  2. 節點B重寫subscribe方法,說"如果"自己有操作者Observer C,就new一個操作者Observer B,然後讓節點A subscribe 操作者B。

  3. 節點A subscribe 操作者B,讓操作者B執行onNext方法。操作者B的onNext方法內部,調用了操作者C的onNext。從而完成了整個調用。


請注意2中的”如果“。意味著,當節點B中的subscribe方法沒有被調用的時候,2,3步驟都不會執行(他們都是回調),沒有Observer B,節點A也不會調用subscribe方法。 


接下來分兩種情況:


  1. 節點B調用了subscribe方法,則執行2,3,完成整個流程。

  2. 節點B調用nullMap,從新走一遍1,2,3步驟,相當於節點B把任務交給了下一個節點C。


概況一下就是:


Observable每調用一次操作符,其實就是創建一個新的Observable。新Observable內部通過subscribe方法“逆向的”與上一Observable關聯。在新Observable中的new出來的Observer內的onNext方法中做了和下一個Observer之間的關聯。


圖文詳細解說nullMap整體調用過程


第一階段



上圖代表節點還未最終subscribe一個Observer,圖中


步驟1節點A調用map方法,在內部創建了一個新的節點B


這一階段:主要就是節點與節點之間做連接,之間有各種“如果”(回調)的承諾。節點B這時候對節點A做了個承諾:“如果”我有處理者Observer C,那我就內部new一個 Observer B給你(節點A)用”。節點B中的操作者Observer B內部做了與Observer C的銜接工作。


第二階段:逆向subscribe



這一階段是subscribe方法被調用,傳入了最終的Observer。


圖中步驟2、3


步驟2. 節點B調用subscribe方法,找到處理者Observer C


步驟3. 節點B兌現對節點A的承諾:如果我有處理者Observer C,那我就內部new一個 Observer B給你(節點A)用”,這裡的Observable.this == Observable A。


這一階段:是把原來各個節點的“如果”一一兌現的過程,從最末一個Observable的subscribe方法開始,按節點順序逆向的兌現承諾。每個subscribe方法內部都會新建一個Observer,然後用上一個節點Observable來subcriber這個Observer。這是一個逆序的過程.


第三階段:運行業務


圖中步驟4,5

步驟 4:節點A調用subscribe,讓Observer B調用onNext方法傳入“hello”,“world”數據

步驟 5:在Observer B的onNext()方法中,通知ObserverC調用onNext方法


這一階段:是通過各個節點的Observer順序執行具體的業務操作的過程,只有這個階段是與具體業務相關的階段。


大家可以先思考一下,如果是一個普通的map(Function function),這個變換髮生在哪?


答案是:在第三階段中,Observer B的內部的onNext方法中。


整個Rxjava就是這9行核心變換代碼了。如果以上不是特別理解我非常建議你繼續看完剩下的部分,再回過頭來看一遍第一部分。


map,observeOn,subscribeOn,flatMap操作符


接下來讓我們看看這4個操作符,僅僅是在nullMap中做了小改動而已。


https://github.com/OliverY/RxjavaYxj/blob/master/app/src/main/java/com/yxj/rxjavayxj/rxjava/Observable.java


map操作符


Observable.java

public  Observable map(final Function function) {

        return new Observable() {
            @Override
            public void subscribe(final Observer observer1) {
                Observable.this.subscribe(new Observer() {
                    @Override
                    public void onNext(T t) {
                        R r = function.apply(t); // 僅僅在這裡加了變換操作
                        observer1.onNext(r);
                    }

                    @Override
                    public void onComplete() {
                        observer1.onComplete();
                    }
                });
            }
        };
    }


和“nullMap”相比,僅僅加了一行代碼function.apply() 方法的調用。


observeOn操作符


Observable.java

public Observable observeOn() 
{
        return new Observable() {
            @Override
            public void subscribe(final Observer observer) {
                Observable.this.subscribe(new Observer() {
                    @Override
                    public void onNext(final T t) {
                //模擬切換到主線程(通常上個節點是運行在子線程的情況)
                        handler.post(new Runnable() {
                            @Override
                            public void run() {
                                observer.onNext(t);
                            }
                        });
                    }

                    @Override
                    public void onComplete() {

                    }
                });
            }
        };
    }


與“nullMap”相比,修改了最內部的onNext方法執行所在的線程。Rxjava源碼會更加靈活,observerOn方法參數讓你可以指定切換到的線程,其實就是傳入了一個線程調度器,用於指定observer.onNext()方法要在哪個線程執行。


原理是一樣的。


我這裡就簡寫,直接寫了切換到主線程,這你肯定能看明白。


subscribeOn操作符


Observable.java

public Observable subscribeOn() 
{
        return new Observable() {
            @Override
            public void subscribe(final Observer observer) {

                new Thread() {
                    @Override
                    public void run() {
                    // 這裡簡寫了,沒有new Observer做中轉,github上有完整代碼
                        Observable.this.subscribe(observer);
                    }
                }.start();
            }
        };
    }


將上一個節點切換到新的線程,修改了Observable.this.subscribe()運行的線程,Observable.this指的是調用subscribeOn()的Observable,即上一個節點。


因此subscribeOn操作符修改了上一個節點的運行所在的線程。


flatMap操作符


public  Observable flatMap(final Function> function) {

        return new Observable() {
            @Override
            public void subscribe(final Observer observer) {
                Observable.this.subscribe(new Observer() {
                    @Override
                    public void onNext(T t) {
                        try {
                            Observable observable = function.apply(t);
                            observable.subscribe(observer);
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }

                    @Override
                    public void onComplete() {

                    }
                });
            }
        };

    }


flatmap和map極為相似,只不過function.apply()的返回值是一個Observable。


Observable是一個節點,既可以用來封裝異步操作,也可以用來封裝同步操作(封裝同步操作 == map操作符)。所以這樣就可以很方便的寫出一個 耗時1操作 —> 耗時2操作 —> 耗時3操作...的操作


到這裡相信大家已經對Rxjava怎樣運行,幾個常見的操作符內部基本原理有了初步的理解,本文的目的就已經達到了。


在之後看Rxjava源碼或者其它分析文章時,就能少受各種變換的干擾。


接下來就可以思考Rxjava是如何對各個Observable做封裝,線程之間如何通信,onComplete、onError、dispose等方法如何實現了。


4
響應式編程的思想


響應式編程是一種面向數據流和變化傳播的編程範式。


直接看這句話其實不太容易理解。讓我們換個說法,實際編程中是什麼會干擾我們,使我們無法專注於數據流和變化傳播呢?


答案是:異步,它會讓我們的代碼形成嵌套,不夠順序化。


因為異步,我們的業務邏輯會寫成回調嵌套的形式,導致過一段時間看自己代碼看不懂,語義化不強,不是按著順序一個節點一個節點的往下執行的。


Rxjava將所有的業務操作變成一步一步,每一步不管你是同步、異步,統統用一個節點包裹起來,節點與節點之間是同步調用的關係。如此,整個代碼的節點都是按順序執行的。


限於作者個人水平有限,本文部分表述難免有不對之處,請留言指出,相互交流。


最後推薦一下我做的網站,玩Android: wanandroid.com ,包含詳盡的知識體系、好用的工具,還有本公眾號文章合集,歡迎體驗和收藏!



推薦閱讀

我在一個群分享Android  好像被我分享得沒人說話了... 3期

ViewPager2重大更新,支持offscreenPageLimit

Android  值得你深入的內容  | 5 期


掃一掃 關注我的公眾號

如果你想要跟大家分享你的文章,歡迎投稿~


┏(^0^)┛明天見!

https://weiwenku.net/d/200850189