RxJava的简单使用

Table of Contents

参考文章: https://www.jianshu.com/p/031745744bfa

1. 准备

安装依赖

    compile 'io.reactivex.rxjava2:rxjava:2.0.2'
    compile 'io.reactivex.rxjava2:rxandroid:2.0.2'

2. 使用

创建一个按钮点击被观察者,且规定发射出来的 sourse 的类型为 String

    Observable.create(new ObservableOnSubscribe<String>() {})

实现订阅方法,给 mainButton2 按钮绑定点击事件,然后将 mainEdit1 输入框的文字发射出去,并且在 emitter 取消的时候将 mainButton2 的监听移除,防止内存泄露

    public void subscribe(final ObservableEmitter<String> emitter) throws Exception {
       mainButton2.setOnClickListener(new View.OnClickListener() {
            @Override
            public void onClick(View view) {
                emitter.onNext(mainEdit1.getText().toString());
            }
        });
        emitter.setCancellable(new Cancellable() {
            @Override
            public void cancel() throws Exception {
                mainButton2.setOnClickListener(null);
            }
        });
    }

创建观察者,且订阅按钮点击被观察者

    searchTextObservable
      .subscribe(new Consumer<String>() {
        @Override
        public void accept(String result) throws Exception {
          showResult(result);
        }
      });
    }

模拟耗时任务,睡眠 2s,返回只包含输入框文字的数组

    private static class SearchEngine {
        ArrayList<String> strings;
        private List<String> search(String txt) {
            strings = new ArrayList<>();
            strings.add(txt);
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return strings;
        }
    }

运用 filter 操作符实现 Predicate 接口进行条件判定过滤,运用 debounce 操作符设定发射间隔

    buttonClickObservable
        .filter(new Predicate<String>() {
            @Override
            public boolean test(@NonNull String s) throws Exception {
                return s.length() > 1;
            }
        })
        .debounce(2, TimeUnit.SECONDS);

再创建一个输入文字改变被观察者

    private Observable<String> createTextChangeObservable() {
        Observable<String> textChangeObservable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(final ObservableEmitter<String> emitter) throws Exception {
                final TextWatcher watcher = new TextWatcher() {
                    @Override
                    public void beforeTextChanged(CharSequence s, int start, int count, int after) {}

                    @Override
                    public void afterTextChanged(Editable s) {}

                    @Override
                    public void onTextChanged(CharSequence s, int start, int before, int count) {
                        emitter.onNext(s.toString());
                    }
                };

                mainEdit1.addTextChangedListener(watcher);

                emitter.setCancellable(new Cancellable() {
                    @Override
                    public void cancel() throws Exception {
                        mainEdit1.removeTextChangedListener(watcher);
                    }
                });
            }
        });

        return textChangeObservable
                .filter(new Predicate<String>() {
                    @Override
                    public boolean test(@NonNull String s) throws Exception {
                        return s.length() > 1;
                    }
                })
                .debounce(2, TimeUnit.SECONDS);
    }

运用 merge 操作符将以上两个被观察者联合起来

    Observable<String> searchTextObservable = createButtonClickObservable();
    Observable<String> textChangeObservable = createTextChangeObservable();
    Observable<String> observable = Observable.merge(searchTextObservable,textChangeObservable);

切换线程 先切换到主线程,将进度条显示出来 再切换到子线程,进行耗时的搜索工作 再切换到主线程,隐藏进度条且将搜索结果显示出来

    mDisposable = observable
                .observeOn(AndroidSchedulers.mainThread())
                .doOnNext(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        showProgress();
                    }
                })
                .observeOn(Schedulers.io())
                .map(new Function<String, List<String>>() {
                    @Override
                    public List<String> apply(@NonNull String query) throws Exception {
                        return mSearchEngine.search(query);
                    }
                })
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<List<String>>() {
                    @Override
                    public void accept(List<String> result) throws Exception {
                        hideProgress();
                        showResult(result);
                    }
                });

在 onStop 中解除订阅

    @Override
    protected void onStop() {
        super.onStop();
        if (!mDisposable.isDisposed()) {
            mDisposable.dispose();
        }
    }
  • RxJava 的 Subject Subject = Observable + Observer,Subject 可以是一个 Observable 同时也可以是一个 Observer

四种 Subject:

  1. PublishSubject PublishSubject 仅会向 Observer 释放在订阅之后 Observable 释放的数据。
  2. BehaviorSubject 当 Observer 订阅了一个 BehaviorSubject,它一开始就会释放 Observable 最近释放的一个数据对象,当还没有任何数据释放时,它则是一个默认值。接下来就会释放 Observable 释放的所有数据。如果 Observable 因异常终止,BehaviorSubject 将不会向后续的 Observer 释放数据,但是会向 Observer 传递一个异常通知。
  3. ReplaySubject 不管 Observer 何时订阅 ReplaySubject,ReplaySubject 会向所有 Observer 释放 Observable 释放过的数据。 有不同类型的 ReplaySubject,它们是用来限定 Replay 的范围,例如设定 Buffer 的具体大小,或者设定具体的时间范围。 如果使用 ReplaySubject 作为 Observer,注意不要在多个线程中调用 onNext、onComplete 和 onError 方法,因为这会导致顺序错乱,这个是违反了 Observer 规则的。
  4. AsyncSubject AsyncSubject 仅释放 Observable 释放的最后一个数据,并且仅在 Observable 完成之后。然而如果当 Observable 因为异常而终止,AsyncSubject 将不会释放任何数据,但是会向 Observer 传递一个异常通知。