RxJava 常见使用场景
发布于:2016-03-11 21:34 最后更新于:2016-03-11 21:34
分类: Android 标签: Android

摘要: 这篇文章收集了 RxJava 常见的使用场景以及示例。

使用场景

1. 后台线程取数据(网络请求、文件访问等),主线程展示(界面更新)[ subscribeOn() , observeOn() ]

Observable.just(1, 2, 3, 4)
o.subscribeOn(Schedulers.io()) // 指定 subscribe() 发生在 IO 线程
o.observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回调发生在主线程
o.subscribe(new Action1<Integer>() {
o@Override
opublic void call(Integer number) {
oLog.d(tag, "number:" + number);
o}
o});

2. 获取数据时先检查缓存 [ concat() ]

例:先检查内存中是否有缓存,再检查是否有文件缓存,最后从网络获取。如果找到了缓存,就不会执行后面的操作。

final Observable<String> memory = Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        if (memoryCache != null) {
            subscriber.onNext(memoryCache);
        } else {
            subscriber.onCompleted();
        }
    }
});
Observable<String> disk = Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        String cachePref = rxPreferences.getString("cache").get();
        if (!TextUtils.isEmpty(cachePref)) {
            subscriber.onNext(cachePref);
        } else {
            subscriber.onCompleted();
        }
    }
});
Observable<String> network = Observable.just("network");

Observable.concat(memory, disk, network)
        .first()
        .subscribeOn(Schedulers.newThread())
        .subscribe(s -> {
            memoryCache = "memory";
            System.out.println("--------------subscribe: " + s);
        });

3. 等待多个请求完成后再进行操作,或将多个请求返回的数据合并 [ merge() ]

Observable<String> observable1 = DemoUtils.createObservable1().subscribeOn(Schedulers.newThread());
Observable<String> observable2 = DemoUtils.createObservable2().subscribeOn(Schedulers.newThread());

Observable.merge(observable1, observable2)
        .subscribeOn(Schedulers.newThread())
        .subscribe(System.out::println);

4. 一个请求依赖另一个请求返回的数据(消除嵌套回调)

例:登陆后根据拿到的token获取消息列表

NetworkService.getToken("username", "password")
        .flatMap(s -> NetworkService.getMessage(s))
        .subscribe(s -> {
            System.out.println("message: " + s);
        });

5. 轮询请求 [ schedulePerodically() ]

Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(final Subscriber<? super String> observer) {

        Schedulers.newThread().createWorker()
                .schedulePeriodically(new Action0() {
                    @Override
                    public void call() {
                        observer.onNext(doNetworkCallAndGetStringResult());
                    }
                }, INITIAL_DELAY, POLLING_INTERVAL, TimeUnit.MILLISECONDS);
    }
}).subscribe(new Action1<String>() {
    @Override
    public void call(String s) {
        log.d(tag,"polling….”);
    }
});

6. 界面按钮防止连续点击 [ throttleFirst() ]

RxView.clicks(findViewById(R.id.btn_throttle))
        .throttleFirst(1, TimeUnit.SECONDS)
        .subscribe(aVoid -> {
            System.out.println("click");
        });

7. 响应式界面

例:勾选了某个 checkbox ,自动更新对应的 preference

SharedPreferences preferences = PreferenceManager.getDefaultSharedPreferences(this);
RxSharedPreferences rxPreferences = RxSharedPreferences.create(preferences);

Preference<Boolean> checked = rxPreferences.getBoolean("checked", true);

CheckBox checkBox = (CheckBox) findViewById(R.id.cb_test);
RxCompoundButton.checkedChanges(checkBox)
        .subscribe(checked.asAction());

8. 减少频繁的请求 [ debounce() ]

例:带有自动联想的搜索框,避免每输入或删除一个字就做一次联想

RxTextView.textChangeEvents(inputEditText)
        .debounce(400, TimeUnit.MILLISECONDS)
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Observer<TextViewTextChangeEvent>() {
            @Override
            public void onCompleted() {
                log.d(tag,"onComplete");
            }

            @Override
            public void onError(Throwable e) {
                log.d(tag,"Error");
            }

            @Override
            public void onNext(TextViewTextChangeEvent onTextChangeEvent) {
                log.d(tag,format("Searching for %s", onTextChangeEvent.text().toString()));
            }
        });

9. 定时操作[ timer() ]、周期性操作 [ interval() ]

例:每隔 2 秒输出日志“hello world”

Observable.interval(2, TimeUnit.SECONDS)  
 .subscribe(new Observer<Long>() {
     @Override
     public void onCompleted() {
         log.d("completed");
     }

     @Override
     public void onError(Throwable e) {
         log.e("error");
     }

     @Override
     public void onNext(Long number) {
         log.d("hello world");
     }
 }); 

10. 数据变换 ,例如过滤掉不符合条件的数据[ filter() ]、去掉重复的数据[ distinct() ]、取出前几个数据[ take() ]等

Observable.just("1", "2", "2", "3", "4", "5")
        .map(Integer::parseInt)
        .filter(s -> s > 1)
        .distinct()
        .take(3)
        .reduce((integer, integer2) -> integer.intValue() + integer2.intValue())
        .subscribe(System.out::println);

RxJava-Android-Samples

RxJava-Android-Samples:https://github.com/kaushikgopal/RxJava-Android-Samples

这个仓库收集了一些使用 RxJava 开发 Android 应用的真实存在的、有用的例子,可以帮助了解 RxJava 在 Android 开发中的应用场景。

评论 登录后评论

没有评论