RxJava的简单使用
Table of Contents
参考文章: https://www.jianshu.com/p/031745744bfa
1. 准备
安装依赖
compile 'io.reactivex.rxjava2:rxjava:2.0.2'
compile 'io.reactivex.rxjava2:rxandroid:2.0.2'
2. 使用
创建一个按钮点击被观察者,且规定发射出来的 sourse 的类型为 String
Observable.create(new ObservableOnSubscribe<String>() {})
实现订阅方法,给 mainButton2 按钮绑定点击事件,然后将 mainEdit1 输入框的文字发射出去,并且在 emitter 取消的时候将 mainButton2 的监听移除,防止内存泄露
public void subscribe(final ObservableEmitter<String> emitter) throws Exception {
mainButton2.setOnClickListener(new View.OnClickListener() {
@Override
public void onClick(View view) {
emitter.onNext(mainEdit1.getText().toString());
}
});
emitter.setCancellable(new Cancellable() {
@Override
public void cancel() throws Exception {
mainButton2.setOnClickListener(null);
}
});
}
创建观察者,且订阅按钮点击被观察者
searchTextObservable
.subscribe(new Consumer<String>() {
@Override
public void accept(String result) throws Exception {
showResult(result);
}
});
}
模拟耗时任务,睡眠 2s,返回只包含输入框文字的数组
private static class SearchEngine {
ArrayList<String> strings;
private List<String> search(String txt) {
strings = new ArrayList<>();
strings.add(txt);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return strings;
}
}
运用 filter 操作符实现 Predicate 接口进行条件判定过滤,运用 debounce 操作符设定发射间隔
buttonClickObservable
.filter(new Predicate<String>() {
@Override
public boolean test(@NonNull String s) throws Exception {
return s.length() > 1;
}
})
.debounce(2, TimeUnit.SECONDS);
再创建一个输入文字改变被观察者
private Observable<String> createTextChangeObservable() {
Observable<String> textChangeObservable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(final ObservableEmitter<String> emitter) throws Exception {
final TextWatcher watcher = new TextWatcher() {
@Override
public void beforeTextChanged(CharSequence s, int start, int count, int after) {}
@Override
public void afterTextChanged(Editable s) {}
@Override
public void onTextChanged(CharSequence s, int start, int before, int count) {
emitter.onNext(s.toString());
}
};
mainEdit1.addTextChangedListener(watcher);
emitter.setCancellable(new Cancellable() {
@Override
public void cancel() throws Exception {
mainEdit1.removeTextChangedListener(watcher);
}
});
}
});
return textChangeObservable
.filter(new Predicate<String>() {
@Override
public boolean test(@NonNull String s) throws Exception {
return s.length() > 1;
}
})
.debounce(2, TimeUnit.SECONDS);
}
运用 merge 操作符将以上两个被观察者联合起来
Observable<String> searchTextObservable = createButtonClickObservable();
Observable<String> textChangeObservable = createTextChangeObservable();
Observable<String> observable = Observable.merge(searchTextObservable,textChangeObservable);
切换线程 先切换到主线程,将进度条显示出来 再切换到子线程,进行耗时的搜索工作 再切换到主线程,隐藏进度条且将搜索结果显示出来
mDisposable = observable
.observeOn(AndroidSchedulers.mainThread())
.doOnNext(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
showProgress();
}
})
.observeOn(Schedulers.io())
.map(new Function<String, List<String>>() {
@Override
public List<String> apply(@NonNull String query) throws Exception {
return mSearchEngine.search(query);
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<List<String>>() {
@Override
public void accept(List<String> result) throws Exception {
hideProgress();
showResult(result);
}
});
在 onStop 中解除订阅
@Override
protected void onStop() {
super.onStop();
if (!mDisposable.isDisposed()) {
mDisposable.dispose();
}
}
- RxJava 的 Subject Subject = Observable + Observer,Subject 可以是一个 Observable 同时也可以是一个 Observer
四种 Subject:
- PublishSubject PublishSubject 仅会向 Observer 释放在订阅之后 Observable 释放的数据。
- BehaviorSubject 当 Observer 订阅了一个 BehaviorSubject,它一开始就会释放 Observable 最近释放的一个数据对象,当还没有任何数据释放时,它则是一个默认值。接下来就会释放 Observable 释放的所有数据。如果 Observable 因异常终止,BehaviorSubject 将不会向后续的 Observer 释放数据,但是会向 Observer 传递一个异常通知。
- ReplaySubject 不管 Observer 何时订阅 ReplaySubject,ReplaySubject 会向所有 Observer 释放 Observable 释放过的数据。 有不同类型的 ReplaySubject,它们是用来限定 Replay 的范围,例如设定 Buffer 的具体大小,或者设定具体的时间范围。 如果使用 ReplaySubject 作为 Observer,注意不要在多个线程中调用 onNext、onComplete 和 onError 方法,因为这会导致顺序错乱,这个是违反了 Observer 规则的。
- AsyncSubject AsyncSubject 仅释放 Observable 释放的最后一个数据,并且仅在 Observable 完成之后。然而如果当 Observable 因为异常而终止,AsyncSubject 将不会释放任何数据,但是会向 Observer 传递一个异常通知。