商业转载请联系作者获得授权,非商业转载请注明出处。 目录 3.8 ignoreElements & ignoreElement(忽略元素) 3.11 throttleFirst & throttleLast & throttleWithTimeout & throttleLatest 5.10 concatMapCompletableDelayError() 5.11 flattenAsFlowable & flattenAsObservable 1 前言1.1 用操作符组合Observable对于ReactiveX来说,Observable和Observer仅仅是个开始,它们本身不过是标准观察者模式的一些轻量级扩展,目的是为了更好的处理事件序列。 ReactiveX真正强大的地方在于它的操作符,操作符让你可以变换、组合、操纵和处理Observable发射的数据。 Rx的操作符让你可以用声明式的风格组合异步操作序列,它拥有回调的所有效率优势,同时又避免了典型的异步系统中嵌套回调的缺点。 下面是常用的操作符列表:
这些操作符并不全都是ReactiveX的核心组成部分,有一些是语言特定的实现或可选的模块。 1.2 操作符分类ReactiveX的每种编程语言的实现都实现了一组操作符的集合。不同的实现之间有很多重叠的部分,也有一些操作符只存在特定的实现中。每种实现都倾向于用那种编程语言中他们熟悉的上下文中相似的方法给这些操作符命名。 本文首先会给出ReactiveX的核心操作符列表和对应的文档链接,后面还有一个决策树用于帮助你根据具体的场景选择合适的操作符。最后有一个语言特定实现的按字母排序的操作符列表。 如果你想实现你自己的操作符,可以参考这里: 1.2.1 创建操作1.1.2 变换操作这些操作符可用于对Observable发射的数据进行变换,详细解释可以看每个操作符的文档
1.1.3 过滤操作1.1.4 组合操作组合操作符用于将多个Observable组合成一个单一的Observable
1.1.5 错误处理1.1.6 辅助操作1.1.7 条件和布尔操作这些操作符可用于单个或多个数据项,也可用于Observable
1.1.8 算术和聚合操作1.1.9 连接操作1.1.10 转换操作1.1.11 操作符决策树对RxJava而言,操作符的相关内容Rxjava3 or 2 其实没什么改动,大部分Rxjava2的操作符都没变,即使有所变动,也只是包名或类名的改动。上面常见的操作符也可以直接从文档中查看用法,下面总结一些常用的操作符进行。 2 创建操作符
注意:interval()、timer()、delay()的区别
2.1 create()创建Observable最原始的方式,onNext/onComplete/onError方法可完全自由控制。在Rxjava3文档级教程:入门到掌握 (一 基本用法 )中,被观察者的创建基本都用的这种方式,不再重写赘述。 2.2 from()String[] stringArray = {"a", "b", "c"};Observable.fromArray(stringArray);Observable.fromArray("a", "b", "c");Observable.fromArray(1, 2, 3, 4); fromIterable方法参数为实现Iterable接口的类,如List/Map/Set等集合类。 String[] strings = {"a", "b", "c"}; List<String> listString = Arrays.asList(strings); Observable.fromIterable(listString); 2.3 just()just重载了多个参数数量不同的方法,最大可带10个参数,just实际上同样是调用的fromArray方法; Observable.just(1, 2, 3, 4).subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Exception {Log.i("lucas", interger + "");}}); 2.4 defer()defer确保了 2.5 range()//发送从10开始的整数,发送4个(发到13)Observable.range(10, 4).subscribe(integer -> Log.i("lucas", ""+integer));//发送从10开始的长整型数,发送6个(发到15)Observable.rangeLong(10, 6).subscribe(integer -> Log.i("lucas", ""+integer)); 2.6 interval()interval用于定时发送 //每3秒发个自增整数Observable.interval(3, TimeUnit.SECONDS);//初始延时1秒,每3秒发一个自增整数Observable.interval(1, 3, TimeUnit.SECONDS);//初始延时2秒,后每1秒发一个从10开始的整数,发5个(发到14)停止Observable.intervalRange(10, 5, 2, 1, TimeUnit.SECONDS); 2.7 repeat()repeat操作符可以重复发送指定次数的某个事件流,repeat操作符默认在trampoline调度器上执行,repeat默认重复次数为Long.MAX_VALUE,可使用重载方法指定次数以及使用repeatUntil指定条件。 //一直重复Observable.fromArray(1, 2, 3, 4).repeat();//重复发送5次Observable.fromArray(1, 2, 3, 4).repeat(5);//重复发送直到符合条件时停止重复Observable.fromArray(1, 2, 3, 4).repeatUntil(new BooleanSupplier() {@Overridepublic boolean getAsBoolean() throws Exception {//自定判断条件,为true即可停止,默认为falsereturn false;}}); 2.8 timer()timer用于延时发送。 //延时3秒后,发送一个整数0Observable.timer(3, TimeUnit.SECONDS); 3 过滤操作符过滤操作符主要是指对数据源进行选择和过滤的常用操作符。
3.1 skip / skipLast可以作用于Flowable,Observable,表示源发射数据前,跳过多少个。skipLast(n)操作表示从流的尾部跳过n个元素。 Observable<Integer> source = Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);source.skipLast(4).subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Throwable {Log.w("TAG","onNext--->"+ integer);}});// 结果:1 2 3 4 5 6//Lambda写法Observable<Integer> source = Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);source.skip(4).subscribe(i->{Log.w("TAG","onNext--->"+ i);});// 结果:5 6 7 8 9 10 3.2 debounce(去抖动)可作用于Flowable,Observable。在Android开发,通常为了防止用户重复点击而设置标记位,而通过RxJava的debounce操作符可以有效达到该效果。在规定时间内,用户重复点击只有最后一次有效, Observable<String> source = Observable.create(emitter -> {emitter.onNext("A");Thread.sleep(1_500);emitter.onNext("B");Thread.sleep(500);emitter.onNext("C");Thread.sleep(250);emitter.onNext("D");Thread.sleep(2000);emitter.onNext("E");emitter.onComplete();});source.subscribeOn(Schedulers.io()).debounce(1, TimeUnit.SECONDS).blockingSubscribe(item -> Log.w("TAG","onNext--->"+ item),Throwable::printStackTrace,() -> Log.w("TAG","onNext--->"+ "onComplete" ));//结果2020-04-03 16:08:47.520 30559-30559/com.ysalliance.getfan.myapplication W/TAG: onNext--->A2020-04-03 16:08:49.777 30559-30559/com.ysalliance.getfan.myapplication W/TAG: onNext--->D2020-04-03 16:08:50.776 30559-30559/com.ysalliance.getfan.myapplication W/TAG: onNext--->E2020-04-03 16:08:50.776 30559-30559/com.ysalliance.getfan.myapplication W/TAG: onNext--->onComplete 上文代码中,数据源以一定的时间间隔发送A,B,C,D,E。操作符debounce的时间设为1秒,发送A后1.5秒并没有发射其他数据,所以A能成功发射。发射B后,在1秒之内,又发射了C和D,在D之后的2秒才发射E,所有B、C都失效,只有D有效;而E之后已经没有其他数据流了,所有E有效。 3.3 distinct(去重)可作用于Flowable,Observable,去掉数据源重复的数据。 distinctUntilChanged()去掉相邻重复数据。 Observable.just(2, 3, 4, 4, 2, 1).distinct().subscribe(System.out::print);// 打印:2 3 4 1Observable.just(1, 1, 2, 1, 2, 3, 3, 4).distinctUntilChanged().subscribe(System.out::print);//打印:1 2 1 2 3 4 3.4 elementAt(获取指定位置元素)可作用于Flowable,Observable,从数据源获取指定位置的元素,从0开始。 elementAtOrError:指定元素的位置超过数据长度,则发射异常。 Observable.just(2,4,3,1,5,8).elementAt(0).subscribe(integer ->Log.d("TAG","elmentAt->"+integer));打印:2Observable<String> source = Observable.just("Kirk", "Spock", "Chekov", "Sulu");Single<String> element = source.elementAtOrError(4);element.subscribe(name -> System.out.println("onSuccess will not be printed!"),error -> System.out.println("onError: " + error));打印:onSuccess will not be printed! 3.5 filter(过滤)可作用于 Flowable,Observable,Maybe,Single。在filter中返回表示发射该元素,返回false表示过滤该数据。 Observable.just(1, 2, 3, 4, 5, 6).filter(x -> x % 2 == 0).subscribe(System.out::print);打印:2 4 6 3.6 first(第一个)作用于 Flowable,Observable。发射数据源第一个数据,如果没有则发送默认值。 Observable<String> source = Observable.just("A", "B", "C");Single<String> firstOrDefault = source.first("D");firstOrDefault.subscribe(System.out::println);打印:AObservable<String> emptySource = Observable.empty();Single<String> firstOrError = emptySource.firstOrError();firstOrError.subscribe(element -> System.out.println("onSuccess will not be printed!"),error -> System.out.println("onError: " + error));打印:onError: java.util.NoSuchElementException 和firstElement的区别是first返回的是Single,而firstElement返回Maybe。firstOrError在没有数据会返回异常。 3.7 last(最后一个)last、lastElement、lastOrError与fist、firstElement、firstOrError相对应。 Observable<String> source = Observable.just("A", "B", "C");Single<String> lastOrDefault = source.last("D");lastOrDefault.subscribe(System.out::println);//打印:CObservable<String> source = Observable.just("A", "B", "C");Maybe<String> last = source.lastElement();last.subscribe(System.out::println);//打印:CObservable<String> emptySource = Observable.empty();Single<String> lastOrError = emptySource.lastOrError();lastOrError.subscribe(element -> System.out.println("onSuccess will not be printed!"),error -> System.out.println("onError: " + error));// 打印:onError: java.util.NoSuchElementException 3.8 ignoreElements & ignoreElement(忽略元素)ignoreElements 作用于Flowable、Observable。ignoreElement作用于Maybe、Single。两者都是忽略掉数据,不发射任何数据,返回完成或者错误时间。 这里关注下intervalRange的用法,以下面这个例子说明:从1开始输出5个数据,延迟1秒执行,每隔1秒执行一次: Single<Long> source = Single.timer(1, TimeUnit.SECONDS);Completable completable = source.ignoreElement();completable.doOnComplete(() -> System.out.println("Done!")).blockingAwait();// 1秒后打印:Donde!Observable<Long> source = Observable.intervalRange(1, 5, 1, 1, TimeUnit.SECONDS);Completable completable = source.ignoreElements();completable.doOnComplete(() -> System.out.println("Done!")).blockingAwait();// 六秒后打印:Done! 3.9 ofType(过滤类型)作用于Flowable、Observable、Maybe,过滤选择类型。 Observable<Number> numbers = Observable.just(1, 4.0, 3, 2.71, 2f, 7);Observable<Integer> integers = numbers.ofType(Integer.class);integers.subscribe((Integer x) -> System.out.print(x+" "));//打印:1 3 7 3.10 sample作用于Flowable、Observable,在一个周期内发射最新的数据。sample操作符会在指定的事件内从数据项中采集所需要的数据。 Observable<String> source = Observable.create(emitter -> {emitter.onNext("A");Thread.sleep(500);emitter.onNext("B");Thread.sleep(200);emitter.onNext("C");Thread.sleep(800);emitter.onNext("D");Thread.sleep(600);emitter.onNext("E");emitter.onComplete();});source.subscribeOn(Schedulers.io()).sample(1, TimeUnit.SECONDS).blockingSubscribe(item -> System.out.print(item+" "),Throwable::printStackTrace,() -> System.out.print("onComplete"));// 打印: C D onComplete 与debounce的区别是,sample是以时间为周期的发射,一秒又一秒内的最新数据。而debounce是最后一个有效数据开始。 3.11 throttleFirst & throttleLast & throttleWithTimeout & throttleLatest作用于Flowable、Observable。throttleFirst是指定周期内第一个数据,throttleLast与smaple一致。throttleWithTimeout与debounce一致。 Observable<String> source = Observable.create(emitter -> {emitter.onNext("A");Thread.sleep(500);emitter.onNext("B");Thread.sleep(200);emitter.onNext("C");Thread.sleep(800);emitter.onNext("D");Thread.sleep(600);emitter.onNext("E");emitter.onComplete();});source.subscribeOn(Schedulers.io()).throttleFirst(1, TimeUnit.SECONDS).blockingSubscribe(item -> System.out.print(item+" "),Throwable::printStackTrace,() -> System.out.print(" onComplete"));//打印:A D onCompletesource.subscribeOn(Schedulers.io()).throttleLast(1, TimeUnit.SECONDS).blockingSubscribe(item -> System.out.print(item+" "),Throwable::printStackTrace,() -> System.out.print(" onComplete"));// 打印:C D onComplete throttleLatest:如果源的第一个数据总会被发射,然后开始周期计时,此时的效果就会跟throttleLast一致。 Observable<String> source = Observable.create(emitter -> {emitter.onNext("A");Thread.sleep(500);emitter.onNext("B");Thread.sleep(200);emitter.onNext("C");Thread.sleep(200);emitter.onNext("D");Thread.sleep(400);emitter.onNext("E");Thread.sleep(400);emitter.onNext("F");Thread.sleep(400);emitter.onNext("G");Thread.sleep(2000);emitter.onComplete();});source.subscribeOn(Schedulers.io()).throttleLatest(1, TimeUnit.SECONDS).blockingSubscribe(item -> Log.e("RxJava",item),Throwable::printStackTrace,() -> Log.e("RxJava","finished"));//打印 A D F G RxJava","finished 3.12 take & takeLast作用于Flowable、Observable。take发射前n个元素。takeLast发射后n个元素。 Observable<Integer> source = Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);source.take(4).subscribe(System.out::print);//打印:1 2 3 4source.takeLast(4).subscribe(System.out::println);//打印:7 8 9 10 3.13 timeout(超时)作用于Flowable、Observable、Maybe、Single、Completabl。后一个数据发射未在前一个元素发射后规定时间内发射则返回超时异常。 Observable<String> source = Observable.create(emitter -> {emitter.onNext("A");Thread.sleep(800);emitter.onNext("B");Thread.sleep(400);emitter.onNext("C");Thread.sleep(1200);emitter.onNext("D");emitter.onComplete();});source.timeout(1, TimeUnit.SECONDS).subscribe(item -> System.out.println("onNext: " + item),error -> System.out.println("onError: " + error),() -> System.out.println("onComplete will not be printed!"));// 打印:// onNext: A// onNext: B// onNext: C// onError: java.util.concurrent.TimeoutException:The source did not signal an event for 1 secondsand has been terminated. 3.14 merge/concatmerge操作符可以合并两个事件流,如果在merge操作符上增加延时发送的操作,那么就会导致其发射的数据项是无序的,会跟着发射的时间点进行合并。虽然是将两个事件流合并成一个事件流进行发射,但在最终的一个事件流中,发射出来的却是两次数据流。 merge和concat的区别:merge():合并后发射的数据项是无序的,concat():合并后发射的数据项是有序的。 Observable<String> just1 = Observable.just("A", "B", "C");Observable<String> just2 = Observable.just("1", "2", "3");Observable.merge(just1, just2).subscribe(new Consumer<Serializable>() {@Overridepublic void accept(Serializable serializable) throws Exception {Log.i("lucas", "" + serializable.toString() );}});//打印结果2020-04-04 17:52:20.734 4372-4372/com.ysalliance.getfan.myapplication I/lucas: A2020-04-04 17:52:20.734 4372-4372/com.ysalliance.getfan.myapplication I/lucas: B2020-04-04 17:52:20.734 4372-4372/com.ysalliance.getfan.myapplication I/lucas: C2020-04-04 17:52:20.734 4372-4372/com.ysalliance.getfan.myapplication I/lucas: 12020-04-04 17:52:20.734 4372-4372/com.ysalliance.getfan.myapplication I/lucas: 22020-04-04 17:52:20.734 4372-4372/com.ysalliance.getfan.myapplication I/lucas: 3 3.15 zip()zip操作符是将两个数据流进行指定的函数规则合并。 Observable<String> just1 = Observable.just("A", "B", "C");Observable<String> just2 = Observable.just("1", "2", "3");Observable.zip(just1, just2, new BiFunction<String, String, String>() {@Overridepublic String apply(String s, String s2) throws Exception {return s + s2;}}).subscribe(new Consumer<String>() {@Overridepublic void accept(String s) throws Exception {Log.i("lucas", "" + s );}});//打印结果2020-04-04 17:55:08.905 4744-4744/com.ysalliance.getfan.myapplication I/lucas: A12020-04-04 17:55:08.905 4744-4744/com.ysalliance.getfan.myapplication I/lucas: B22020-04-04 17:55:08.905 4744-4744/com.ysalliance.getfan.myapplication I/lucas: C3 3.16 startWith()startWith操作符是将另一个数据流合并到原数据流的开头。 Observable<String> just1 = Observable.just("A", "B", "C");Observable<String> just2 = Observable.just("1", "2", "3");just1.startWith(just2).subscribe(new Consumer<String>() {@Overridepublic void accept(String s) throws Exception {Log.i("lucas", "" + s );}});//打印结果2020-04-04 17:57:22.155 4917-4917/com.ysalliance.getfan.myapplication I/lucas: 12020-04-04 17:57:22.155 4917-4917/com.ysalliance.getfan.myapplication I/lucas: 22020-04-04 17:57:22.156 4917-4917/com.ysalliance.getfan.myapplication I/lucas: 32020-04-04 17:57:22.156 4917-4917/com.ysalliance.getfan.myapplication I/lucas: A2020-04-04 17:57:22.156 4917-4917/com.ysalliance.getfan.myapplication I/lucas: B2020-04-04 17:57:22.156 4917-4917/com.ysalliance.getfan.myapplication I/lucas: C 3.17 join()join操作符是有时间期限的合并操作符。 Observable<String> just1 = Observable.just("A", "B", "C");Observable<Long> just2 = Observable.interval(1, TimeUnit.SECONDS);just1.join(just2, new Function<String, ObservableSource<Long>>() {@Overridepublic ObservableSource<Long> apply(String s) throws Exception {return Observable.timer(3, TimeUnit.SECONDS);}}, new Function<Long, ObservableSource<Long>>() {@Overridepublic ObservableSource<Long> apply(Long l) throws Exception {return Observable.timer(8, TimeUnit.SECONDS);}}, new BiFunction<String, Long, String>() {@Overridepublic String apply(String s, Long l) throws Exception {return s + l;}}).subscribe(new Consumer<String>() {@Overridepublic void accept(String s) throws Exception {Log.i("lucas", "" + s );}});//打印结果2020-04-04 18:04:43.751 6042-6109/com.ysalliance.getfan.myapplication I/lucas: A02020-04-04 18:04:43.752 6042-6109/com.ysalliance.getfan.myapplication I/lucas: B02020-04-04 18:04:43.752 6042-6109/com.ysalliance.getfan.myapplication I/lucas: C02020-04-04 18:04:44.750 6042-6109/com.ysalliance.getfan.myapplication I/lucas: A12020-04-04 18:04:44.751 6042-6109/com.ysalliance.getfan.myapplication I/lucas: B12020-04-04 18:04:44.751 6042-6109/com.ysalliance.getfan.myapplication I/lucas: C1 join操作符有三个函数需要设置 第一个函数:规定just2的过期期限 4 连接/组合操作符通过连接操作符,可以将多个被观察数据(数据源)连接在一起。 4.1 startWith()可作用于Flowable、Observable。将指定数据源合并在另外数据源的开头。 Observable<String> name = Observable.just("My", "name");Observable<String> name2 = Observable.just("is", "Lucas","!");name2.startWith(name).subscribe(item -> Log.d("Lucas",item));//打印:2020-04-03 16:35:49.493 31609-31609/com.ysalliance.getfan.myapplication D/Lucas: My2020-04-03 16:35:49.493 31609-31609/com.ysalliance.getfan.myapplication D/Lucas: name2020-04-03 16:35:49.493 31609-31609/com.ysalliance.getfan.myapplication D/Lucas: is2020-04-03 16:35:49.493 31609-31609/com.ysalliance.getfan.myapplication D/Lucas: Lucas2020-04-03 16:35:49.493 31609-31609/com.ysalliance.getfan.myapplication D/Lucas: ! 4.2 merge / mergeWith可作用所有数据源类型,用于合并多个数据源到一个数据源。 Observable<String> name = Observable.just("My", "name");Observable<String> name2 = Observable.just("is", "Lucas","!");Observable.merge(name,name2).subscribe(v -> Log.d("lucas", v));name.mergeWith(name2).subscribe(v -> Log.d("lucas",v));//打印:2020-04-03 16:39:41.934 32212-32212/com.ysalliance.getfan.myapplication D/lucas: My2020-04-03 16:39:41.934 32212-32212/com.ysalliance.getfan.myapplication D/lucas: name2020-04-03 16:39:41.934 32212-32212/com.ysalliance.getfan.myapplication D/lucas: is2020-04-03 16:39:41.935 32212-32212/com.ysalliance.getfan.myapplication D/lucas: Lucas2020-04-03 16:39:41.935 32212-32212/com.ysalliance.getfan.myapplication D/lucas: !2020-04-03 16:39:41.937 32212-32212/com.ysalliance.getfan.myapplication D/lucas: My2020-04-03 16:39:41.937 32212-32212/com.ysalliance.getfan.myapplication D/lucas: name2020-04-03 16:39:41.937 32212-32212/com.ysalliance.getfan.myapplication D/lucas: is2020-04-03 16:39:41.937 32212-32212/com.ysalliance.getfan.myapplication D/lucas: Lucas2020-04-03 16:39:41.937 32212-32212/com.ysalliance.getfan.myapplication D/lucas: ! merge在合并数据源时,如果一个合并发生异常后会立即调用观察者的onError方法,并停止合并。可通过mergeDelayError操作符,将发生的异常留到最后处理。 Observable<String> name = Observable.just("My", "name");Observable<String> name2 = Observable.just("is", "Lucas","!");Observable<String> error = Observable.error(new NullPointerException("Error!"));Observable.mergeDelayError(name,error,name2).subscribe(v -> Log.d("lucas",v), e->Log.d("lucas",e.getMessage()));//打印:2020-04-03 16:42:07.030 32391-32391/com.ysalliance.getfan.myapplication D/lucas: My2020-04-03 16:42:07.030 32391-32391/com.ysalliance.getfan.myapplication D/lucas: name2020-04-03 16:42:07.033 32391-32391/com.ysalliance.getfan.myapplication D/lucas: is2020-04-03 16:42:07.034 32391-32391/com.ysalliance.getfan.myapplication D/lucas: Lucas2020-04-03 16:42:07.034 32391-32391/com.ysalliance.getfan.myapplication D/lucas: !2020-04-03 16:42:07.034 32391-32391/com.ysalliance.getfan.myapplication D/lucas: Error! 4.3 zip()可作用于Flowable、Observable、Maybe、Single。将多个数据源的数据一个一个的合并在一起哇。当其中一个数据源发射完事件之后,若其他数据源还有数据未发射完毕,也会停止。 Observable<String> name = Observable.just("My", "name");Observable<String> name2 = Observable.just("is", "Lucas", "!", "haha!");name.zipWith(name2, (first, last) -> first + "-" + last).subscribe(item -> Log.d("lucas", item));//打印:2020-04-03 16:44:59.127 32616-32616/com.ysalliance.getfan.myapplication D/lucas: My-is2020-04-03 16:44:59.128 32616-32616/com.ysalliance.getfan.myapplication D/lucas: name-Lucas 4.4 combineLatest()public static String[] str = {"A", "B", "C", "D", "E"};public void combineLatest() {Observable<String> just1 = Observable.interval(1, TimeUnit.SECONDS).map(new Function<Long, String>() {@Overridepublic String apply(Long aLong) throws Exception {return str[(int) (aLong % 5)];}});Observable<Long> just2 = Observable.interval(1, TimeUnit.SECONDS);Observable.combineLatest(just1, just2, new BiFunction<String, Long, String>() {@Overridepublic String apply(String s, Long l) throws Exception {return s + l;}}).subscribe(new Consumer<String>() {@Overridepublic void accept(String s) throws Exception {System.out.println("onNext=" + s);}});}//输出onNext=A0onNext=B0onNext=B1onNext=C1onNext=C2onNext=D2onNext=D3onNext=E3onNext=E4onNext=A4onNext=A5 可作用于Flowable, Observable。在结合不同数据源时,发射速度快的数据源最新item与较慢的相结合。 如下时间线,Observable-1发射速率快,发射了65,Observable-2才发射了C, 那么两者结合就是C5。 4.5 switchOnNext()一个发射多个小数据源的数据源,这些小数据源发射数据的时间发生重复时,取最新的数据源。 5 变换/转换操作符变换操作符用于变化数据源的数据,并转化为新的数据源。
5.1 map()map利用Function进行类型转换的例子: Observable.just("1", "2", "3").map(new Function<String, Integer>() {@Overridepublic Integer apply(String s) throws Exception {return Integer.valueOf(s) * 100;}}).subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Exception {Log.i("lucas", ""+integer);}}); 在实际编码中,我们还是常用Lambda表达式进行代码的精简优化,但是掌握Lambda的前提是,你还得会写非Lambda的表达,否则长时间下来可能会忘记原来的类写法: Observable.just("1", "2", "3").map( (String s) -> Integer.valueOf(s) * 100).subscribe(integer -> Log.i("lucas", ""+integer)); 可以看到,使用Lambda表达式后只有三行,简洁! 5.2 flatMap() / concatMap()flatMap操作符将数据流进行类型转换,然后将新的数据流传递给新的事件流进行分发,这里通过模拟请求登录的延时操作进行说明 public static class UserParams {public UserParams(String username, String password) {this.username = username;this.password = password;}public String username;public String password;}Observable.just(new UserParams("lucas", "123123")).flatMap(new Function<UserParams, ObservableSource<String>>() {@Overridepublic ObservableSource<String> apply(UserParams userParams) throws Exception {return Observable.just(userParams.username + "登录成功").delay(2, TimeUnit.SECONDS);}}).subscribe(new Consumer<String>() {@Overridepublic void accept(String s) throws Exception {Log.i("lucas", "" + s );}}); Lambda表达式优化: Observable.just(new UserParams("lucas", "123123")).flatMap((UserParams userParams)->Observable.just(userParams.username + "登录成功").delay(2, TimeUnit.SECONDS)).subscribe(s -> Log.i("lucas", "" + s )); concatMap与flatMap的区别: concatMap是有序的,flatMap是无序的。 再举个组合的例子: Observable.just("A", "B", "C").flatMap(a -> {return Observable.intervalRange(1, 3, 0, 1, TimeUnit.SECONDS).map(b -> '(' + a + ", " + b + ')');}).blockingSubscribe( v -> {Log.d("lucas", v+" " );});//打印结果2020-04-03 22:05:58.363 12414-12414/com.ysalliance.getfan.myapplication D/lucas: (A, 1)2020-04-03 22:05:58.365 12414-12414/com.ysalliance.getfan.myapplication D/lucas: (B, 1)2020-04-03 22:05:58.367 12414-12414/com.ysalliance.getfan.myapplication D/lucas: (C, 1)2020-04-03 22:05:59.355 12414-12414/com.ysalliance.getfan.myapplication D/lucas: (A, 2)2020-04-03 22:05:59.362 12414-12414/com.ysalliance.getfan.myapplication D/lucas: (B, 2)2020-04-03 22:05:59.362 12414-12414/com.ysalliance.getfan.myapplication D/lucas: (C, 2)2020-04-03 22:06:00.357 12414-12414/com.ysalliance.getfan.myapplication D/lucas: (A, 3)2020-04-03 22:06:00.361 12414-12414/com.ysalliance.getfan.myapplication D/lucas: (B, 3)2020-04-03 22:06:00.363 12414-12414/com.ysalliance.getfan.myapplication D/lucas: (C, 3) 5.3 groupBy()groupBy操作符可以将发射出来的数据项进行分组,并将分组后的数据项保存在具有key-value映射的事件流中。groupBy具体的分组规则由groupBy操作符传递进来的函数参数Function所决定的,它可以将key和value按照Function的返回值进行分组,返回一个具有分组规则的事件流GroupedObservable,注意这里分组出来的事件流是按照原始事件流的顺序输出的,我们可以通过sorted()对数据项进行排序,然后输出有序的数据流。 Observable.just("java", "c", "c++", "python", "javaScript", "android").groupBy(new Function<String, Character>() {@Overridepublic Character apply(String s) throws Exception {return s.charAt(0);//按首字母分组}}).subscribe(new Consumer<GroupedObservable<Character, String>>() {@Overridepublic void accept(final GroupedObservable<Character, String> characterStringGroupedObservable) throws Exception {//排序后,直接订阅输出key和valuecharacterStringGroupedObservable.sorted().subscribe(new Consumer<String>() {@Overridepublic void accept(String s) throws Exception {Log.i("lucas", "onNext= key:" + characterStringGroupedObservable.getKey() + " value:" + s);}});}});//打印结果2020-04-04 16:36:13.766 32460-32460/com.ysalliance.getfan.myapplication I/lucas: onNext= key:p value:python2020-04-04 16:36:13.766 32460-32460/com.ysalliance.getfan.myapplication I/lucas: onNext= key:a value:android2020-04-04 16:36:13.766 32460-32460/com.ysalliance.getfan.myapplication I/lucas: onNext= key:c value:c2020-04-04 16:36:13.766 32460-32460/com.ysalliance.getfan.myapplication I/lucas: onNext= key:c value:c++2020-04-04 16:36:13.766 32460-32460/com.ysalliance.getfan.myapplication I/lucas: onNext= key:j value:java2020-04-04 16:36:13.766 32460-32460/com.ysalliance.getfan.myapplication I/lucas: onNext= key:j value:javaScript Observable<String> animals = Observable.just("Tiger", "Elephant", "Cat", "Chameleon", "Frog", "Fish", "Turtle", "Flamingo");animals.groupBy(animal -> animal.charAt(0), String::toUpperCase).concatMapSingle(Observable::toList).subscribe(System.out::println);// prints:// [TIGER, TURTLE]// [ELEPHANT]// [CAT, CHAMELEON]// [FROG, FISH, FLAMINGO] 5.4 scan()scan操作符会对发射的数据和上一轮发射的数据进行函数处理,并返回的数据供下一轮使用,持续这个过程来产生剩余的数据流。其应用场景有简单的累加计算,判断所有数据的最小值等。 Observable.just(2, 4, 1, 9).scan(new BiFunction<Integer, Integer, Integer>() {@Overridepublic Integer apply(Integer integer, Integer integer2) throws Exception {return integer < integer2 ? integer : integer2;}}).subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer item) throws Exception {Log.i("lucas", "" + item );}});//打印结果2020-04-04 16:14:10.931 30301-30301/com.ysalliance.getfan.myapplication I/lucas: 22020-04-04 16:14:10.931 30301-30301/com.ysalliance.getfan.myapplication I/lucas: 22020-04-04 16:14:10.931 30301-30301/com.ysalliance.getfan.myapplication I/lucas: 12020-04-04 16:14:10.931 30301-30301/com.ysalliance.getfan.myapplication I/lucas: 1 Lambda表达式写法: Observable.just(2, 4, 1, 9).scan((Integer integer, Integer integer2) -> integer < integer2 ? integer : integer2).subscribe(item -> Log.i("lucas", "" + item )); 带初始值的聚合叠加: Observable.just(1, 2, 3).scan(10, (x, y) -> x + y).subscribe(item -> Log.i("lucas", "" + item ));// prints:2020-04-04 16:23:06.590 31451-31451/com.ysalliance.getfan.myapplication I/lucas: 102020-04-04 16:23:06.590 31451-31451/com.ysalliance.getfan.myapplication I/lucas: 112020-04-04 16:23:06.590 31451-31451/com.ysalliance.getfan.myapplication I/lucas: 132020-04-04 16:23:06.590 31451-31451/com.ysalliance.getfan.myapplication I/lucas: 16 5.5 buffer()buffer操作符可以将发射出来的数据流,在给定的缓存池中进行缓存,当缓存池中的数据项溢满时,则将缓存池的数据项进行输出,重复上述过程,直到将发射出来的数据全部发射出去。如果发射出来的数据不够缓存池的大小,则按照当前发射出来的数量进行输出。如果对buffer操作符设置了skip参数,则buffer每次缓存池溢满时,会跳过指定的skip数据项,然后再进行缓存和输出。 Observable.just(1, 2, 3, 4, 5, 6, 7, 8).buffer(3).subscribe(new Consumer<List<Integer>>() {@Overridepublic void accept(List<Integer> integers) throws Exception {Log.i("lucas", "" + integers.toString() );}});//打印结果2020-04-04 16:40:41.744 32722-32722/com.ysalliance.getfan.myapplication I/lucas: [1, 2, 3]2020-04-04 16:40:41.745 32722-32722/com.ysalliance.getfan.myapplication I/lucas: [4, 5, 6]2020-04-04 16:40:41.745 32722-32722/com.ysalliance.getfan.myapplication I/lucas: [7, 8] 5.6 window()window操作符和buffer操作符在功能上实现的效果是一样的,但window操作符最大区别在于同样是缓存一定数量的数据项,window操作符最终发射出来的是新的事件流integerObservable,而buffer操作符发射出来的是新的数据流,也就是说,window操作符发射出来新的事件流中的数据项,还可以经过Rxjava其他操作符进行处理。 Observable.just(1, 2, 3, 4).window(2, 1).subscribe(new Consumer<Observable<Integer>>() {@Overridepublic void accept(Observable<Integer> integerObservable) throws Exception {integerObservable.subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Exception {Log.i("lucas", "" + integer );}});}});//打印结果2020-04-04 16:49:54.681 1475-1475/com.ysalliance.getfan.myapplication I/lucas: 12020-04-04 16:49:54.681 1475-1475/com.ysalliance.getfan.myapplication I/lucas: 22020-04-04 16:49:54.682 1475-1475/com.ysalliance.getfan.myapplication I/lucas: 22020-04-04 16:49:54.682 1475-1475/com.ysalliance.getfan.myapplication I/lucas: 32020-04-04 16:49:54.682 1475-1475/com.ysalliance.getfan.myapplication I/lucas: 32020-04-04 16:49:54.682 1475-1475/com.ysalliance.getfan.myapplication I/lucas: 42020-04-04 16:49:54.682 1475-1475/com.ysalliance.getfan.myapplication I/lucas: 4 Observable.range(1, 4)// Create windows containing at most 2 items, and skip 3 items before starting a new window..window(2).flatMapSingle(window -> {return window.map(String::valueOf).reduce(new StringJoiner(", ", "[", "]"), StringJoiner::add);}).subscribe(System.out::println);// prints:// [1, 2]// [3, 4] 5.7 cast()作用于Flowable、Observable、Maybe、Single。将数据元素转型成其他类型,转型失败会抛出异常。 Observable<Number> numbers = Observable.just(1, 4.0, 3f, 7, 12, 4.6, 5);numbers.filter((Number x) -> Integer.class.isInstance(x)).cast(Integer.class).subscribe((Integer x) -> System.out.println(x));// 打印:// 1// 7// 12// 5 5.8 concatMapDelayError与concatMap作用相同,只是将过程发送的所有错误延迟到最后处理。 Observable.intervalRange(1, 3, 0, 1, TimeUnit.SECONDS).concatMapDelayError(x -> {if (x.equals(1L)) return Observable.error(new IOException("Something went wrong!"));else return Observable.just(x, x * x);}).blockingSubscribe(x -> System.out.println("onNext: " + x),error -> System.out.println("onError: " + error.getMessage()));// prints:// onNext: 2// onNext: 4// onNext: 3// onNext: 9// onError: Something went wrong! 5.9 concatMapCompletable()作用于Flowable、Observable。与contactMap类似,不过应用于函数后,返回的是CompletableSource。订阅一次并在所有CompletableSource对象完成时返回一个Completable对象。 Observable<Integer> source = Observable.just(2, 1, 3);Completable completable = source.concatMapCompletable(x -> {return Completable.timer(x, TimeUnit.SECONDS).doOnComplete(() -> System.out.println("Info: Processing of item \"" + x + "\" completed"));});completable.doOnComplete(() -> System.out.println("Info: Processing of all items completed")).blockingAwait();// prints:// Info: Processing of item "2" completed// Info: Processing of item "1" completed// Info: Processing of item "3" completed// Info: Processing of all items completed 5.10 concatMapCompletableDelayError()与concatMapCompletable作用相同,只是将过程发送的所有错误延迟到最后处理。 Observable<Integer> source = Observable.just(2, 1, 3);Completable completable = source.concatMapCompletableDelayError(x -> {if (x.equals(2)) {return Completable.error(new IOException("Processing of item \"" + x + "\" failed!"));} else {return Completable.timer(1, TimeUnit.SECONDS).doOnComplete(() -> System.out.println("Info: Processing of item \"" + x + "\" completed"));}});completable.doOnError(error -> System.out.println("Error: " + error.getMessage())).onErrorComplete().blockingAwait();// prints:// Info: Processing of item "1" completed// Info: Processing of item "3" completed// Error: Processing of item "2" failed! 5.11 flattenAsFlowable & flattenAsObservable作用于Maybe、Single,将其转化为Flowable,或Observable。 Single<Double> source = Single.just(2.0);Flowable<Double> flowable = source.flattenAsFlowable(x -> {return List.of(x, Math.pow(x, 2), Math.pow(x, 3));});flowable.subscribe(x -> System.out.println("onNext: " + x));// prints:// onNext: 2.0// onNext: 4.0// onNext: 8.0 6 处理操作符
6.1 onErrorReturn()作用于Flowable、Observable、Maybe、Single。但调用数据源的onError函数后会回到该函数,可对错误进行处理,然后返回值,会调用观察者onNext()继续执行,执行完调用onComplete()函数结束所有事件的发射。 Single.just("2A").map(v -> Integer.parseInt(v, 10)).onErrorReturn(error -> {if (error instanceof NumberFormatException) return 0;else throw new IllegalArgumentException();}).subscribe(v->Log.d("lucas", v+" " ),error -> System.err.println("onError should not be printed!"));//2020-04-03 22:13:56.573 13238-13238/com.ysalliance.getfan.myapplication D/lucas: 0 6.2 onErrorReturnItem()与onErrorReturn类似,onErrorReturnItem不对错误进行处理,直接返回一个值。 Single.just("2A").map(v -> Integer.parseInt(v, 10)).onErrorReturnItem(0).subscribe(v->Log.d("lucas", v+" " ),error -> System.err.println("onError should not be printed!"));//2020-04-03 22:12:53.757 13100-13100/com.ysalliance.getfan.myapplication D/lucas: 0 6.3 onExceptionResumeNext()可作用于Flowable、Observable、Maybe。 onErrorReturn发生异常时,回调onComplete()函数后不再往下执行,而onExceptionResumeNext则是要在处理异常的时候返回一个数据源,然后继续执行,如果返回null,则调用观察者的onError()函数。 Observable.create((ObservableOnSubscribe<Integer>) e -> {e.onNext(1);e.onNext(2);e.onNext(3);e.onError(new NullPointerException());e.onNext(4);}).onErrorResumeNext(throwable -> {Log.d("lucas", "onErrorResumeNext ");return Observable.just(4);}).subscribe(new Observer<Integer>() {@Overridepublic void onSubscribe(Disposable d) {Log.d("lucas", "onSubscribe ");}@Overridepublic void onNext(Integer integer) {Log.d("lucas", "onNext " + integer);}@Overridepublic void onError(Throwable e) {Log.d("lucas", "onError ");}@Overridepublic void onComplete() {Log.d("lucas", "onComplete ");}});//运行结果2020-04-03 22:15:58.668 13456-13456/com.ysalliance.getfan.myapplication D/lucas: onSubscribe2020-04-03 22:15:58.672 13456-13456/com.ysalliance.getfan.myapplication D/lucas: onNext 12020-04-03 22:15:58.672 13456-13456/com.ysalliance.getfan.myapplication D/lucas: onNext 22020-04-03 22:15:58.673 13456-13456/com.ysalliance.getfan.myapplication D/lucas: onNext 32020-04-03 22:15:58.673 13456-13456/com.ysalliance.getfan.myapplication D/lucas: onErrorResumeNext2020-04-03 22:15:58.674 13456-13456/com.ysalliance.getfan.myapplication D/lucas: onNext 42020-04-03 22:15:58.674 13456-13456/com.ysalliance.getfan.myapplication D/lucas: onComplete 6.4 retry()可作用于所有的数据源,当发生错误时,数据源重复发射item,直到没有异常或者达到所指定的次数。 Observable.create((ObservableOnSubscribe<Integer>) e -> {e.onNext(1);e.onNext(2);if (first){first=false;e.onError(new NullPointerException());}}).retry(9).subscribe(new Observer<Integer>() {@Overridepublic void onSubscribe(Disposable d) {Log.d("lucas", "onSubscribe ");}@Overridepublic void onNext(Integer integer) {Log.d("lucas", "onNext " + integer);}@Overridepublic void onError(Throwable e) {Log.d("lucas", "onError ");}@Overridepublic void onComplete() {Log.d("lucas", "onComplete ");}});//打印结果2020-04-03 22:18:28.605 13647-13647/com.ysalliance.getfan.myapplication D/lucas: onSubscribe2020-04-03 22:18:28.607 13647-13647/com.ysalliance.getfan.myapplication D/lucas: onNext 12020-04-03 22:18:28.607 13647-13647/com.ysalliance.getfan.myapplication D/lucas: onNext 22020-04-03 22:18:28.607 13647-13647/com.ysalliance.getfan.myapplication D/lucas: onNext 12020-04-03 22:18:28.607 13647-13647/com.ysalliance.getfan.myapplication D/lucas: onNext 2
6.5 retryUntil()作用于Flowable、Observable、Maybe。与retry类似,但发生异常时,返回值是false表示继续执行(重复发射数据),true不再执行,但会调用onError方法。 Observable.create((ObservableOnSubscribe<Integer>) e -> {e.onNext(1);e.onNext(2);e.onError(new NullPointerException());e.onNext(3);e.onComplete();}).retryUntil(() -> true).subscribe(new Observer<Integer>() {@Overridepublic void onSubscribe(Disposable d) {Log.d("lucas", "onSubscribe ");}@Overridepublic void onNext(Integer integer) {Log.d("lucas", "onNext " + integer);}@Overridepublic void onError(Throwable e) {Log.d("lucas", "onError ");}@Overridepublic void onComplete() {Log.d("lucas", "onComplete ");}});//打印结果2020-04-03 22:22:01.625 13905-13905/com.ysalliance.getfan.myapplication D/lucas: onSubscribe2020-04-03 22:22:01.627 13905-13905/com.ysalliance.getfan.myapplication D/lucas: onNext 12020-04-03 22:22:01.627 13905-13905/com.ysalliance.getfan.myapplication D/lucas: onNext 22020-04-03 22:22:01.627 13905-13905/com.ysalliance.getfan.myapplication D/lucas: onError 6.6 retryWhen()retryWhen操作符和retry操作符相似,区别在于retryWhen将错误Throwable传递给了函数进行处理并产生新的事件流进行处理。 private static int retryCount = 0;private static int maxRetries = 2;public void retryWhen(){Observable.create(new ObservableOnSubscribe<Integer>() {@Overridepublic void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {for (int i = 1; i < 5; i++) {if (i == 4) {e.onError(new Exception("onError crash"));}e.onNext(i);}}}).retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {@Overridepublic ObservableSource<?> apply(Observable<Throwable> throwableObservable) throws Exception {return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() {@Overridepublic ObservableSource<?> apply(Throwable throwable) throws Exception {if (++retryCount <= maxRetries) {// When this Observable calls onNext, the original Observable will be retried (i.e. re-subscribed).System.out.println("get error, it will try after " + 1 + " seconds, retry count " + retryCount);return Observable.timer(1, TimeUnit.SECONDS);}return Observable.error(throwable);}});}}).onErrorReturn(new Function<Throwable, Integer>() {@Overridepublic Integer apply(Throwable throwable) throws Exception {return -1;}}).subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Exception {System.out.println("onNext=" + integer);}}, new Consumer<Throwable>() {@Overridepublic void accept(Throwable throwable) throws Exception {System.out.println("onError");}}, new Action() {@Overridepublic void run() throws Exception {System.out.println("onComplete");}});}//结果onNext=1onNext=2onNext=3get error, it will try after 1 seconds, retry count 1onNext=1onNext=2onNext=3get error, it will try after 1 seconds, retry count 2onNext=1onNext=2onNext=3onNext=-1onComplete 补充1:Rxjava3包结构的变动:Rxjava3组件位于 io.reactivex.rxjava3包(rxjava1有 rx,rxjava2是 io.reactivex)。 这使得Rxjava3可以和早期版本一起使用。 此外,RxJava 的核心类型(Flowable、 Observer 等)已经被移动到 io.reactivex.rxjava3.core。
补充2:在RxJava1.0中,有的人会使用CompositeSubscription来收集Subscription,来统一取消订阅,现在在RxJava2.0中,由于subscribe()方法现在返回void,那怎么办呢? 其实在RxJava2.0中,Flowable提供了subscribeWith这个方法可以返回当前订阅的观察者,并且通过ResourceSubscriber DisposableSubscriber等观察者来提供 Disposable的接口。 所以,如果想要达成RxJava1.0的效果,现在应该是这样做: CompositeDisposable composite = new CompositeDisposable(); composite.add(Flowable.range(1, 8).subscribeWith(subscriber)); 这个subscriber 应该是 ResourceSubscriber 或者 DisposableSubscriber 的实例。 参考文章:因为写RxJava系列的文章时进行了很多阅读和参考,因此不分一二三等,将全系列的参考引用统一如下: RxJava3 Wiki:https://github.com/ReactiveX/RxJava/wiki RxJava3官方github:https://github.com/ReactiveX/RxJava/wiki/What's-different-in-3.0 ReactiveX文档中文翻译:https://mcxiaoke./rxdocs/content/operators/Creating-Observables.html single:http:///documentation/single.html 操作符系列讲的很好的文章:https://blog.csdn.net/weixin_42046829/article/details/104836592 基础介绍:https://blog.csdn.net/weixin_42046829/article/details/104833751 RxJava3的一些简介:https:///post/5d1eeffe6fb9a07f0870b4e8 观察者被观察者入门RxJava的一篇好文章:https:///post/580103f20e3dd90057fc3e6d 关于背压一个很好的介绍:https:///post/582d413c8ac24700619cceed RxLifecycle:https://github.com/trello/RxLifecycle 刚哥平台的挺好很全:RxJava2 只看这一篇文章就够了https:///post/5b17560e6fb9a01e2862246f |
|