配色: 字号:
RxJava 常用操作符
2016-09-19 | 阅:  转:  |  分享 
  
RxJava常用操作符

1Observable的创建



1.1from()



转换集合为一个每次发射集合中一个元素的Observable对象。可用来遍历集合。



方法列表:



publicstaticObservablefrom(Futurefuture)



publicstaticObservablefrom(Futurefuture,longtimeout,TimeUnitunit)



publicstaticObservablefrom(Futurefuture,Schedulerscheduler)



publicstaticObservablefrom(Iterableiterable)



publicstaticObservablefrom(T[]array)



栗子:



//1.遍历集合

Observableobservable=Observable.from(newString[]{"hello","hi"});

1

2

//2.使用Future创建Observable,Future表示一个异步计算的结果。

FutureTaskfutureTask=newFutureTask(newCallable(){

@Override

publicStringcall()throwsException{

//TODO执行异步操作并返回数据

return"hihi";

}

});



Scheduler.Workerworker=Schedulers.io().createWorker();

worker.schedule(newAction0(){

@Override

publicvoidcall(){

futureTask.run();

}

});



Observableobservable=Observable.from(futureTask);



1.2just()



转换一个或多个Object为依次发射这些Object的Observable对象。



方法列表:



publicstaticObservablejust(finalTvalue)



publicstaticObservablejust(Tt1,Tt2)



publicstaticObservablejust(Tt1,Tt2,Tt3)



publicstaticObservablejust(Tt1,Tt2,Tt3,Tt4)



publicstaticObservablejust(Tt1,Tt2,Tt3,Tt4,Tt5)



publicstaticObservablejust(Tt1,Tt2,Tt3,Tt4,Tt5,Tt6)



publicstaticObservablejust(Tt1,Tt2,Tt3,Tt4,Tt5,Tt6,Tt7)



publicstaticObservablejust(Tt1,Tt2,Tt3,Tt4,Tt5,Tt6,Tt7,Tt8)



publicstaticObservablejust(Tt1,Tt2,Tt3,Tt4,Tt5,Tt6,Tt7,Tt8,Tt9)



publicstaticObservablejust(Tt1,Tt2,Tt3,Tt4,Tt5,Tt6,Tt7,Tt8,Tt9,Tt10)



栗子:



Observableobservable=Observable.just("hello");



//使用just()遍历几个元素

Observableobservable=Observable.just("hello","hi","...");



//使用from()方法遍历,效果和just()一样。

String[]stringArrs=newString[]{"hello","hi","..."};

Observableobservable=Observable.from(stringArrs);



just()方法可传入1~10个参数,也就说当元素个数小于等于10的时候既可以使用just()也可以使用from(),否则只能用from()方法。



1.3create()



返回一个在被OnSubscribe订阅时执行特定方法的Observable对象。



方法列表:



publicstaticObservablecreate(OnSubscribef)



@BetapublicstaticObservablecreate(SyncOnSubscribesyncOnSubscribe)



@ExperimentalpublicstaticObservablecreate(AsyncOnSubscribeasyncOnSubscribe)



栗子:



Observable.OnSubscribeonSubscribe=newObservable.OnSubscribe(){

@Override

publicvoidcall(Subscribersubscriber){

//onNext()方法可执行多次

subscribe.onNext("hello");

subscribe.onCompleted();

}

};

Observableobservable=Observable.create(onSubscribe);



此方法不常用,大多数时候都是使用just()、form()等方法,如上面那串代码就可以写成:



Observableobservable=Observable.just("hello");

1

1.4interval()



返回一个每隔指定的时间间隔就发射一个序列号的Observable对象,可用来做倒计时等操作。



方法列表:



publicstaticObservableinterval(longinterval,TimeUnitunit)



publicstaticObservableinterval(longinterval,TimeUnitunit,Schedulerscheduler)



publicstaticObservableinterval(longinitialDelay,longperiod,TimeUnitunit)



publicstaticObservableinterval(longinitialDelay,longperiod,TimeUnitunit,Schedulerscheduler)



栗子:



//每隔1s发送一个序列号,序列号从0开始,每次累加1。

Observableobservable=Observable.interval(1,TimeUnit.SECONDS);

1

2

1.5timer()



创建一个在指定延迟时间后发射一条数据(固定值:0)的Observable对象,可用来做定时操作。



方法列表:



publicstaticObservabletimer(longdelay,TimeUnitunit)



publicstaticObservabletimer(longdelay,TimeUnitunit,Schedulerscheduler)



栗子:



//定时3s

Observableobservable=Observable.timer(3,TimeUnit.SECONDS);

1

2

1.6range()



创建一个发射指定范围内的连续整数的Observable对象。



方法列表:



publicstaticObservablerange(intstart,intcount)



publicstaticObservablerange(intstart,intcount,Schedulerscheduler)



栗子:



//依次发射5、6、7

Observableobservable=Observable.range(5,3);

1

2

1.7empty()



创建一个不发射任何数据就发出onCompleted()通知的Observable对象。



方法列表:



publicstaticObservableempty()

栗子:



//发出一个onCompleted()通知

Observableobservable=Observable.empty();

1

2

1.8error()



创建不发射任何数据就发出onError通知的Observable对象。



方法列表:



publicstaticObservableerror(Throwableexception)

栗子:



//发出一个onError()通知

Observableobservable=Observable.error(newThrowable("message"));

1

2

1.9never()



创建一个不发射任何数据和通知的Observable对象。



方法列表:



publicstaticObservablenever()

栗子:



Observableobservable=Observable.never();

1

1.10defer()



在订阅的时候才会创建Observable对象;每一次订阅都创建一个新的Observable对象。



方法列表:



publicstaticObservabledefer(Func0>observableFactory)

栗子:



Observableobservable=Observable.defer(newFunc0>(){

@Override

publicObservablecall(){

returnObservable.just("string");

}

});



2重做



2.1repeat()



使Observable对象在发出onNext()通知之后重复发射数据。重做结束才会发出onComplete()通知,若重做过程中出现异常则会中断并发出onError()通知。



方法列表:



publicfinalObservablerepeat()



publicfinalObservablerepeat(finallongcount)



publicfinalObservablerepeat(Schedulerscheduler)



publicfinalObservablerepeat(finallongcount,Schedulerscheduler)



栗子:



Observableobservable=Observable.just("string");

//无限重复执行

observable.repeat();

//重复执行5次

observable.repeat(5);



2.2repeatWhen()



使Observable对象在发出onNext()通知之后有条件的重复发射数据。重做结束才会发出onCompleted()通知,若重做过程中出现异常则会中断并发出onError()通知。



方法列表:



publicfinalObservablerepeatWhen(finalFunc1,?extendsObservable>notificationHandler)



publicfinalObservablerepeatWhen(finalFunc1,?extendsObservable>?>notificationHandler,Schedulerscheduler)



栗子:



observable.repeatWhen(newFunc1,Observable>(){

@Override

publicObservablecall(Observableobservable){

//重复3次,每次间隔1s

returnobservable.zipWith(Observable.range(1,3),newFunc2(){

@Override

publicIntegercall(VoidaVoid,Integerinteger){

returninteger;

}

}).flatMap(integer->Observable.timer(1,TimeUnit.SECONDS));

}

});



3重试



3.1retry()



在执行Observable对象的序列出现异常时,不直接发出onError()通知,而是重新订阅该Observable对象,直到重做过程中未出现异常,则会发出onNext()和onCompleted()通知;若重做过程中也出现异常,则会继续重试,直到达到重试次数上限,超出次数后发出最新的onError()通知。



方法列表:



publicfinalObservableretry()



publicfinalObservableretry(finallongcount)



publicfinalObservableretry(Func2predicate)



栗子:



Observableobservable=Observable.create(newObservable.OnSubscribe(){

@Override

publicvoidcall(Subscribersubscriber){

System.out.println(".......");

inta=1/0;

subscriber.onNext(a);

subscriber.onCompleted();

}

});

//无限次的重试

observable.retry();

//重试3次

observable.retry(3);

//使用谓语函数决定是否重试

observable.retry(newFunc2(){

@Override

publicBooleancall(Integerinteger,Throwablethrowable){

//参数integer是订阅的次数;参数throwable是抛出的异常

//返回值为true表示重试,返回值为false表示不重试

returnfalse;

}

});



3.2retryWhen()



作用:有条件的执行重试。



方法列表:



publicfinalObservableretryWhen(finalFunc1,?extendsObservable>notificationHandler)



publicfinalObservableretryWhen(finalFunc1,?extendsObservable>notificationHandler,Schedulerscheduler)



栗子:



//重试3次,每次间隔1s

observable.retryWhen(newFunc1,Observable>(){

@Override

publicObservablecall(Observableobservable){

returnobservable.zipWith(Observable.range(1,3),newFunc2(){

@Override

publicObjectcall(Throwablethrowable,Integerinteger){

returninteger;

}

}).flatMap(newFunc1>(){

@Override

publicObservablecall(Objecto){

returnObservable.timer(1,TimeUnit.SECONDS);

}

});

}

});



4变换



4.1map()



把源Observable发射的元素应用于指定的函数,并发送该函数的结果。



方法列表:



publicfinalObservablemap(Func1func)

栗子:



Observable.just(2)

.map(newFunc1(){

@Override

publicStringcall(Integerinteger){

returnString.valueOf(String.format("原始数据的两倍为:%s",integer2));

}

});



4.2flatMap()



转换源Observable对象为另一个Observable对象。



方法列表:



publicfinalObservableflatMap(Func1>func)



@BetapublicfinalObservableflatMap(Func1>func,intmaxConcurrent)



publicfinalObservableflatMap(Func1>onNext,Func1>onError,Func0>onCompleted)



@BetapublicfinalObservableflatMap(Func1>onNext,Func1>onError,Func0>onCompleted,intmaxConcurrent)



publicfinalObservableflatMap(finalFunc1>collectionSewww.shanxiwang.netlector,finalFunc2resultSelector)



@BetapublicfinalObservableflatMap(finalFunc1>collectionSelector,finalFunc2resultSelector,intmaxConcurrent)



栗子:



Observable.just(2)

.flatMap(newFunc1>(){

@Override

publicObservablecall(Integerinteger){

//转换为一个定时integer秒的Observable对象

returnObservable.timer(integer,TimeUnit.SECONDS);

}

});



5过滤



5.1filter()



只发射满足指定谓词的元素。



方法列表:



publicfinalObservablefilter(Func1predicate)

栗子:



Observable.just(-1,-2,0,1,2)

.filter(newFunc1(){

@Override

publicBooleancall(Integerinteger){

returninteger>0;

}

});



5.2first()



返回一个仅仅发射源Observable发射的第一个[满足指定谓词的]元素的Observable,如果如果源Observable为空,则会抛出一个NoSuchElementException。



方法列表:



publicfinalObservablefirst()



publicfinalObservablefirst(Func1predicate)



栗子:



//发射第一个元素

Observable.just(-1,-2,0,1,2).first();



//发射满足条件的第一个元素

Observable.just(-1,-2,0,1,2)

.first(newFunc1(){

@Override

publicBooleancall(Integerinteger){

returninteger>0;

}

});



//会抛出NoSuchElementException异常

Observable.empty().first();



5.3last()



返回一个仅仅发射源Observable发射的倒数第一个[满足指定谓词的]元素的Observable,如果如果源Observable为空,则会抛出一个NoSuchElementException。



方法列表:



publicfinalObservablelast()



publicfinalObservablelast(Func1predicate)



栗子:



//发射倒数第一个元素

Observable.just(-1,-2,0,1,2).first();



//发射满足条件的倒数第一个元素

Observable.just(-1,-2,0,1,2)

.first(newFunc1(){

@Override

publicBooleancall(Integerinteger){

returninteger<0;

}

});



//会抛出NoSuchElementException异常

Observable.empty().last();



5.4skip()



跳过前面指定数量或指定时间内的元素,只发射后面的元素。



方法列表:



publicfinalObservableskip(intcount)



publicfinalObservableskip(longtime,TimeUnitunit)



publicfinalObservableskip(longtime,TimeUnitunit,Schedulerscheduler)



栗子:



Observable.just(-1,-2,0,1,2)

.skip(2)//跳过前两条数据

1

2

5.5skipLast()



跳过前面指定数量或指定时间内的元素,只发射后面的元素。指定时间时会延迟源Observable发射的任何数据。



方法列表:



publicfinalObservableskipLast(intcount)



publicfinalObservableskipLast(longtime,TimeUnitunit)



publicfinalObservableskipLast(longtime,TimeUnitunit,Schedulerscheduler)



栗子:



Observable.just(-1,-2,0,1,2)

.skip(2)//跳过后两条数据

1

2

5.6take()



只发射前面指定数量或指定时间内的元素。



方法列表:



publicfinalObservabletake(finalintcount)



publicfinalObservabletake(longtime,TimeUnitunit)



publicfinalObservabletake(longtime,TimeUnitunit,Schedulerscheduler)



栗子:



Observable.just(-1,-2,0,1,2).take(3);//只发射前三条数据

1

5.7takeLast()



只发射后面指定数量或指定时间内的元素。指定时间时会延迟源Observable发射的任何数据。



方法列表:



publicfinalObservabletakeLast(finalintcount)



publicfinalObservabletakeLast(intcount,longtime,TimeUnitunit)



publicfinalObservabletakeLast(intcount,longtime,TimeUnitunit,Schedulerscheduler)



publicfinalObservabletakeLast(longtime,TimeUnitunit)



publicfinalObservabletakeLast(longtime,TimeUnitunit,Schedulerscheduler)



栗子:



Observable.just(-1,-2,0,1,2).takeLast(3);//只发射后三条数据

1

5.8sample()



定期发射Observable发射的最后一条数据。



方法列表:



publicfinalObservablesample(longperiod,TimeUnitunit)



publicfinalObservablesample(longperiod,TimeUnitunit,Schedulerscheduler)



publicfinalObservablesample(Observablesampler)



栗子:



Observable.interval(300,TimeUnit.MILLISECONDS)

.sample(2,TimeUnit.SECONDS)



5.9elementAt()



只发射指定索引的元素。



方法列表:



publicfinalObservableelementAt(intindex)

栗子:



Observable.just(-1,-2,0,1,2).elementAt(2);//发射索引为2的数据

1

5.10elementAtOrDefault()



只发射指定索引的元素,若该索引对应的元素不存在,则发射默认值。



方法列表:



publicfinalObservableelementAtOrDefault(intindex,TdefaultValue)

栗子:



Observable.just(-1,-2,0,1,2).elementAtOrDefault(9,-5);//发射索引为9的数据,若不存在,则发射-5

1

5.11ignoreElements()



不发射任何数据,直接发出onCompleted()通知。



方法列表:



publicfinalObservableignoreElements()

栗子:



Observable.just(-1,-2,0,1,2).ignoreElements()

1

5.12distinct()



过滤重复的元素,过滤规则是:只允许还没有发射过的元素通过。



方法列表:



publicfinalObservabledistinct()



publicfinalObservabledistinct(Func1keySelector)



栗子:



//直接过滤

Observable.just(-1,-2,0,1,2,1).distinct();



//通过生成的key值过滤

Observable.just(-1,-2,0,1,2,1).distinct(newFunc1(){

@Override

publicIntegercall(Integerinteger){

//随机生成key

returninteger(int)(Math.random()10);

}

});



5.13debounce()



源Observable每产生结果后,如果在规定的间隔时间内没有产生新的结果,则发射这个结果,否则会忽略这个结果。该操作符会过滤掉发射速率过快的数据项。



方法列表:



publicfinalObservabledebounce(longtimeout,TimeUnitunit)



publicfinalObservabledebounce(longtimeout,TimeUnitunit,Schedulerscheduler)



publicfinalObservabledebounce(Func1>debounceSelector)



栗子:



Observableobservable=Observable.create(newObservable.OnSubscribe(){

@Override

publicvoidcall(Subscribersubscriber){

try{

//产生结果的间隔时间分别为100、200、300...900毫秒

for(inti=1;i<10;i++){

subscriber.onNext(i);

Thread.sleep(i100);

}

subscriber.onCompleted();

}catch(Exceptione){

subscriber.onError(e);

}

}

});

observable.debounce(400,TimeUnit.MILLISECONDS)//超时时间为400毫秒



该栗子产生结果为:依次打印5、6、7、8。



附:功能实现



延时遍历



//遍历

ObservabletraverseObservable=Observable.just(3,4,5,6);

//计时

ObservableintervalObservable=Observable.interval(1,TimeUnit.SECONDS);



Func2func2=newFunc2(){

@Override

publicIntegercall(LongaLong,Integerinteger){

returninteger;

}

};



intervalObservable.zipWith(traverseObservable,func2)

.toBlocking()

.subscribe(newSubscriber(){

@Override

publicvoidonCompleted(){

System.out.println("onCompleted");

}



@Override

publicvoidonError(Throwablee){

e.printStackTrace();

}



@Override

publicvoidonNext(Integerinteger){

System.out.println(integer);

}

});



倒计时



intstartTime=10;



Observable.interval(0,1,TimeUnit.SECONDS)

.take(startTime+1)//接收startTime+1次

.map(newFunc1(){

@Override

publicLongcall(Longtime){

//123...转换为...321

returnstartTime-time;

}

})

.toBlocking()

.subscribe(newSubscriber(){

@Override

publicvoidonCompleted(){

System.out.println("倒计时结束");

}



@Override

publicvoidonError(Throwablee){

System.out.println("倒计时出现异常");

e.printStackTrace();

}



@Override

publicvoidonNext(LongaLong){

System.out.println(String.format("倒计时:%ss",aLong));

}

});

献花(0)
+1
(本文系网络学习天...首藏)