RxJava(三)-我准备起飞了,你呢?

RxJava(三)-我准备起飞了,你呢?

通过上一篇《Rx系列之RxJava操作符》,相信已经能够熟练的使用一些基本的操作符了。但是对于我们大家而言,其实最传统的命令式编程已经是我们顺手就可以拈来的,但是,现在用响应式编程,突然发现:卧槽,这个地方用响应式怎么写,这样写对么?估计很多人才开始接触RxJava的时候应该都有这样的疑虑。不用担心,这一篇就给大家讲讲RxJava到底该怎么用,在什么情况下用!

RxJava的使用场景

眼尖的小伙伴,可能已经发现,在上一篇中,很多那么重要的操作符怎么都没讲!哈哈哈,答案在这里。好废话不多说,来看看Rxjava到底在哪些情况下可以使用。

动态搜索的场景

我们先来看一个动态搜索的场景:

搜索

假设,我要在这进行网络搜索,那么,我就要在这里面进行网络访问,如果是输入完成之后点击确定进行搜索还好,但是如果是动态收索呢?只要搜索框中的搜索内容一改变,那么是不是就要进行网络请求呢?那这样就不是那么友好了。为了解决这样的问题,rxjava为我们提供了一个很好的解决方案:

  • 使用debounce作为textSearch
  • debounce()函数过滤掉由Observable发射的速率过快的数据;如果在一个指定的时间间隔过去了仍旧没有发射一个,那么它将发射最后的那个。*
    debounce()使用TimeUnit对象指定时间间隔。
    是不是感觉棒棒哒,昂,不管你喜不喜欢,反正我是爱死它了。

来看一下示意图
debounce示意图
由上图我们可以看出,在比较密集的数据(2,3,4,5)发射之后,其实最终只是发射5。

附上代码:

RxTextView.textChanges(editText)
.debounce(5000,TimeUnit.MILLISECONDS)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<CharSequence>() {
@Override
public void onCompleted() {
Log.d(TAG, "onCompleted: onCompleted");
}

@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: onError");
}

@Override
public void onNext(CharSequence charSequence) {
Log.d(TAG, "onNext: "+charSequence.toString());
}
});

在这5s内,我输入了2,3,4,5(出最后一个5,其他输入之后就删除哈),但是最后得到的结果却是:

onNext: 5

注意:这个操作符会会接着最后一项数据发射原始Observable的onCompleted通知,即使这个通知发生在你指定的时间窗口内(从最后一项数据的发射算起)。也就是说,onCompleted通知不会触发限流。

在上面可能会存在一个疑问,那就是
RxTextView.textChanges(editText)
这是个什么东西?你丫怎么没讲,哈哈,这个呀,要在后面的Rx系列中单独来讲,所以不要着急,暂时说一下这个的功能,这个RxTextView.textChanges(editText)其实是RxBinding里面的一个对控件的操作,其功能就跟TextWatcher一样,就是对数据的变更进行监听,所以上面的数据变化之后5s后将数据发射出去。嗯嗯,到这里就把动态搜索场景讲解了。

缓存检测场景

在请求取数据的处理过程中,我们的操作一般是这样一个原理:

  • ** 首先检查内存是否有缓存**
  • 然后检查文件缓存中是否有
  • 最后才从网络中取
    任何一步一旦发现数据后面的操作都不执行
    在rxjava中为我们提供了两个解决这个问题的操作符,分别是: concatfirst

concat
不交错的发射两个或多个Observable
concat操作符连接多个Observable的输出,就好像它们是一个Observable,第一个Observable发射的所有数据在第二个Observable发射的任何数据前面,以此类推。直到前面一个Observable终止,Concat
才会订阅额外的一个Observable

请注意上面所说的“就好像它们是一个Observable”,其实并不是一个Observable,是前面一个停止之后才会订阅下一个,所以说他们并不是一个,请君注意咯。

concat示意图
如上所示,就是将两个Observable连接起来了。
还有一个实例方法concatWith,它是和concat等价的:Observable.concat(a,b)==a.concatWith(b)

来看一下是不是这个样子的:

Subscriber<Integer> subscriber = new Subscriber<Integer>() {
@Override
public void onCompleted() {
Log.d(TAG, "onCompleted: onCompleted");
}

@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: onError");
}

@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
}
};

Observable a = Observable.just(1, 2, 3, 4, 5);
Observable b = Observable.just(6, 7, 8, 9, 10);

Observable.concat(a, b)
.subscribe(subscriber);

然后我们得到:

onNext: 1
 ...
onNext: 10
onCompleted: onCompleted

这时估计就会有人说了:你不是说这个操作符其实是将两个订阅连接起来了嘛!那么,为什么只是在最后打印了onCompleted,在onNext: 5后面不是也应该打印一个吗?
我们都知道观察者和被观察者之间,是由订阅建立关系的,那么对于被观察者来说,确实我发射了两个数据源,但是对于观察者来说,我不知道你有几个数据源,我的职责就只是,数据发射过来后,我打印而已。所以,只有当onNext没有接收到数据时,才会调用onCompleted

最后对这个操作符,再补充一点:如果当第一个Observable a抛异常,那么将不会继续执行后面的Observable b了。
如果想测试请将上面的

Observable a = Observable.just(1, 2, 3, 4, 5);

变成

Observable a = Observable.just(1, 2, 3, 4, new RuntimeException());

进行测试。

first
只发射第一项(或者满足某个条件的第一项)数据
first示意图
由上图我们可以看出,这个只要第一项满足条件,后面的将不会再进行发射,所以只是得到了1这个数字。

Subscriber<Integer> subscriber = new Subscriber<Integer>() {
@Override
public void onCompleted() {
Log.d(TAG, "onCompleted: onCompleted");
}

@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: onError");
}

@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
}
};

Observable a = Observable.just(1, 2, 3, 4, 5);
a.first().subscribe(subscriber);

得到结果:

onNext: 1
onCompleted: onCompleted

这个应该很容易就看出来了。就是只是打印了第一个数据!
在这儿必须为大家区别一个操作符:single(),这个操作符也是只打印一个数据的,但是single()和first()最大的区别在于:前者只会发射一个数据,不能发射多个,否则会报错;而first确实满足条件的那一个。
如下:

Observable a = Observable.just(1);
a.single().subscribe(subscriber);

估计到这儿应该已经有人知道了上面的3个步骤改真没写了,来我们来看看代码:

final Observable<String> memory = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
if (memoryCache != null) {
subscriber.onNext(memoryCache);
} else {
subscriber.onCompleted();
}
}
});
Observable<String> disk = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
String cachePref = rxPreferences.getString("cache").get();
if (!TextUtils.isEmpty(cachePref)) {
subscriber.onNext(cachePref);
} else {
subscriber.onCompleted();
}
}
});

Observable<String> network = Observable.just("network");

//依次检查memory、disk、network
Observable
.concat(memory, disk, network)
.first()
.subscribeOn(Schedulers.newThread())
.subscribe(s -> {
memoryCache = "memory";
System.out.println("--------------subscribe: " + s);
});

现在看上面的代码是不是就知道它在干什么了,是不是很简单!这个缓存检测场景就讲到这里。

输入合法场景

在某些时候,我们需要所以的输入都合法后,我们的某些按钮才亮起来,或者才能点击,如下图:
输入合法示意图

在这个场景中,我们得掌握两个操作符:skipcombineLatest

skip
抑制Observable发射的前N项数据
skip示意图

从上图可以看到,总共发射了4个数据,只有最后两个发射出去了,这就是skip(2)的作用。

Observable.just(1,2,3,4).skip(2).subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
Log.d(TAG, "onCompleted: onCompleted");
}

@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: onError");
}

@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: "+integer);
}
});

得到如下结果:

onNext: 3
onNext: 4
onCompleted: onCompleted

combineLatest
当多个Observables中的任何一个发射了数据时,使用一个函数结合每个Observable发射的最近数据项,并且基于这个函数的结果发射数据。
CombineLatest在原始的Observable中任意一个发射了数据时发射一条数据。当原始Observables的任何一个发射了一条数据时,CombineLatest
使用一个函数结合它们最近发射的数据,然后发射这个函数的返回值。
一开始看到这句话,我又懵b了,这tm几个意思?我们先来看看这个场景的实现代码,然后再解释:

    private void combineLatestEvent() {

Observable<CharSequence> usernameObservable = RxTextView.textChanges(mUsername).skip(1);
Observable<CharSequence> emailObservable = RxTextView.textChanges(mEmail).skip(1);
Observable<CharSequence> passwordObservable = RxTextView.textChanges(mPassword).skip(1);

Subscription subscription = Observable.combineLatest(usernameObservable, emailObservable,
passwordObservable,
new Func3<CharSequence, CharSequence, CharSequence, Boolean>() {
@Override
public Boolean call(CharSequence userName, CharSequence email, CharSequence
password) {

boolean isUserNameValid = !TextUtils.isEmpty(userName) && (userName
.toString().length() > 2 && userName.toString().length() < 9);

if (!isUserNameValid) {
mUsername.setError("用户名无效");
}


boolean isEmailValid = !TextUtils.isEmpty(email) && Patterns
.EMAIL_ADDRESS.matcher(email).matches();

if (!isEmailValid) {
mEmail.setError("邮箱无效");
}

boolean isPasswordValid = !TextUtils.isEmpty(password) && (password
.toString().length() >5 && password.toString().length() < 11);

if (!isPasswordValid) {
mPassword.setError("密码无效");
}


return isUserNameValid && isEmailValid && isPasswordValid;
}
})
.subscribe(getObserver());
}


private Observer<Boolean> getObserver() {
return new Observer<Boolean>() {
@Override
public void onCompleted() {

}

@Override
public void onError(Throwable e) {

}

@Override
public void onNext(Boolean aBoolean) {
//更改注册按钮是否可用的状态
register.setEnabled(aBoolean);
}
};
}

这个场景,有3个edittext,分别是mUsername,mEmail,mPassword,通过输入合法的内容进行判定注册按钮是否亮起来。
当我点击其中的任何一个进行编写的时候,就会发射数据,发射的是什么?是我们编辑的内容吗?其实不是的,发射的是结合Func3这个方法的返回值,在这里这个返回值是Boolean型的。返回了boolean型之后,就可以在观察者里面设置注册按钮是否亮起来。现在再看上面那句高深莫测的话,是不是简单多了!

这儿可能有人有疑问了:这3个edittext为什么要使用skip(1)呢?
答案其实很简答啊,那就是当我们不写skip(1)的时候,edittext中没有输入任何值的时候,会把它当作第一个数据进行发射,虽然发射的是个空数据,但是还是会发射啊!
奥偶,这个场景解释完了!

数据过期场景

其实这个场景可以和上面的数据缓存检测场景进行合并:在缓存检测场景中,我们知道,如果memory中没有数据,就从disk上面寻找,然后再是网络请求,那么,问题来了,如果我们的memory中一直有数据,但是网络数据已经变更了,又由于缓存检测原则的只要有一个有数据就不会进行网络请求了,这就会造成我们显示的数据一直是一个旧数据。

哦豁
那这个该怎么办呢?
解决方法有如下两个:

  • 采用定时进行清除本地缓存数据
  • 采用过滤操作符

我们先来看看第一种,如果是进行定时做本地数据清空的话,那么就会用到,我们一个轮询的操作符Interval
创建一个按固定时间间隔发射整数序列的Observable
Interval通俗的讲,就是每隔一段时间过后做什么事情!上一篇已经讲过了,所以这里就不详细讲解了,直接上代码:

 Observable.interval(3, TimeUnit.SECONDS).subscribe(new Observer<Long>() {
...
@Override
public void onNext(Long aLong) {
//清除缓存操作
}
});

很多人可能会想到,那既然,我能够用清除本地缓存的方法,那么能不能用,每隔一段时间进行请求,让请求的结果与本地缓存进行合并呢?
答案是肯定的,来看如下代码:

Observable.create(new Observable.OnSubscribe<String>() {  
@Override
public void call(final Subscriber<? super String> observer) {

Schedulers.newThread().createWorker()
.schedulePeriodically(new Action0() {
@Override
public void call() {
observer.onNext(doNetworkCallAndGetStringResult());
}
}, INITIAL_DELAY, POLLING_INTERVAL, TimeUnit.MILLISECONDS);
}
}).subscribe(new Action1<String>() {
@Override
public void call(String s) {

}
})

这个就是使用schedulePeriodically做轮询请求

这样造成每过一定时间我们就会,清除缓存或者网络请求。读到这儿,是不是感觉这个方法真烂,哈哈哈,不着急,我们不是还有第二种方法嘛!来接着看第二种方法

  • 采用过滤操作符
    其实这个操作符我们已经讲过了,那就是操作符first,回顾一下上面的代码,就是我们的first就是保证,众多的数据,有一个符合条件就发射数据,后面的都将不执行。我们的是否需要更新的条件不加在这里,就没天理咯!
Observable source = Observable
.concat(memory, disk, network)
.first(new Func1() {
@Override public Boolean call(Data data) {
return data.isUpToDate();
}
});

哇偶,这个操作符完美的解决了如上的问题!那你丫的还将那么多,呵呵,我只是给大家讲解操作符的使用场景而已,那个适合哪个场景,取决你们自己咯!

这一篇主要讲解的内容的就到这儿了,下面还有一些其他的场景,就简单的介绍一下。

其他的场景

合并两个数据源场景

使用merge合并两个数据源,代码如下:

Observable.merge(getInfoFromFile(), getInfoFromNet())  
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
Log.d(TAG, "onCompleted: onCompleted");
}

@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: onError");
}

@Override
public void onNext(String data) {
Log.d(TAG, "onNext: only one ! ");
});

Retrofit结合RxJava场景

这个场景的话,大家可以查看扔物线大神写的给 Android 开发者的 RxJava 详解,其中讲解到了这个场景的结合!

就操作符使用场景这一块而言,大概就讲解这么多,如果大家有其他的使用场景,我们可以一起交流哦。感谢大家的支持,谢谢!


 
You forgot to set the qrcode for Alipay. Please set it in _config.yml.
You forgot to set the qrcode for Wechat. Please set it in _config.yml.
You forgot to set the business and currency_code for Paypal. Please set it in _config.yml.
You forgot to set the url Patreon. Please set it in _config.yml.
Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×