From 9a6d09147e159247b3bcb9063fd86e6cfcec9579 Mon Sep 17 00:00:00 2001 From: Yaoda Wang Date: Thu, 13 Apr 2017 12:57:34 +0200 Subject: [PATCH] Use publish instead of doOnNext to make 2 behavior happen together --- .../data/source/PackagesRepository.java | 92 ++++++++------- .../remote/PackagesRemoteDataSource.java | 105 +++++++++++------- 2 files changed, 116 insertions(+), 81 deletions(-) diff --git a/mobile/src/main/java/io/github/marktony/espresso/data/source/PackagesRepository.java b/mobile/src/main/java/io/github/marktony/espresso/data/source/PackagesRepository.java index 32f8a37..02dba62 100644 --- a/mobile/src/main/java/io/github/marktony/espresso/data/source/PackagesRepository.java +++ b/mobile/src/main/java/io/github/marktony/espresso/data/source/PackagesRepository.java @@ -114,19 +114,21 @@ public int compare(Package o1, Package o2) { // Return the cached packages. return packagesLocalDataSource .getPackages() - .flatMap(new Function, ObservableSource>>() { + .publish(new Function>, ObservableSource>>() { @Override - public ObservableSource> apply(List packages) throws Exception { - return Observable - .fromIterable(packages) - .doOnNext(new Consumer() { - @Override - public void accept(Package aPackage) throws Exception { - cachedPackages.put(aPackage.getNumber(), aPackage); - } - }) - .toList() - .toObservable(); + public ObservableSource> apply(Observable> listObservable) throws Exception { + listObservable.flatMapIterable(new Function, Iterable>() { + @Override + public Iterable apply(List packages) throws Exception { + return packages; + } + }).subscribe(new Consumer() { + @Override + public void accept(Package aPackage) throws Exception { + cachedPackages.put(aPackage.getNumber(), aPackage); + } + }); + return listObservable; } }); } @@ -185,13 +187,16 @@ public void deletePackage(@NonNull String packageId) { public Observable> refreshPackages() { return packagesRemoteDataSource .refreshPackages() - .flatMap(new Function, ObservableSource>>() { - @Override - public ObservableSource> apply(List packages) throws Exception { - - return Observable - .fromIterable(packages) - .doOnNext(new Consumer() { + .publish(new Function>, ObservableSource>>() { + @Override + public ObservableSource> apply(Observable> listObservable) throws Exception { + listObservable.flatMapIterable(new Function, Iterable>() { + @Override + public Iterable apply(List packages) throws Exception { + return packages; + } + }) + .subscribe(new Consumer() { @Override public void accept(Package aPackage) throws Exception { Package p = cachedPackages.get(aPackage.getNumber()); @@ -201,11 +206,10 @@ public void accept(Package aPackage) throws Exception { p.setReadable(true); } } - }) - .toList() - .toObservable(); - } - }); + }); + return listObservable; + } + }); } /** @@ -218,16 +222,22 @@ public void accept(Package aPackage) throws Exception { @Override public Observable refreshPackage(@NonNull final String packageId) { return packagesRemoteDataSource.refreshPackage(packageId) - .doOnNext(new Consumer() { + .publish(new Function, ObservableSource>() { @Override - public void accept(Package aPackage) throws Exception { - Package pkg = cachedPackages.get(aPackage.getNumber()); - if (pkg != null) { - pkg.setData(aPackage.getData()); - pkg.setReadable(true); - } + public ObservableSource apply(Observable packageObservable) throws Exception { + packageObservable.subscribe(new Consumer() { + @Override + public void accept(Package aPackage) throws Exception { + Package pkg = cachedPackages.get(aPackage.getNumber()); + if (pkg != null) { + pkg.setData(aPackage.getData()); + pkg.setReadable(true); + } + } + + }); + return packageObservable; } - }); } @@ -311,13 +321,19 @@ private Package getPackageWithNumber(@NonNull String packNumber) { private Observable getPackageWithNumberFromLocalRepository(@NonNull final String packNumber) { return packagesLocalDataSource .getPackage(packNumber) - .doOnNext(new Consumer() { + .publish(new Function, ObservableSource>() { @Override - public void accept(Package aPackage) throws Exception { - if (cachedPackages == null) { - cachedPackages = new LinkedHashMap<>(); - } - cachedPackages.put(packNumber, aPackage); + public ObservableSource apply(Observable packageObservable) throws Exception { + packageObservable.subscribe(new Consumer() { + @Override + public void accept(Package aPackage) throws Exception { + if (cachedPackages == null) { + cachedPackages = new LinkedHashMap<>(); + } + cachedPackages.put(packNumber, aPackage); + } + }); + return packageObservable; } }); } diff --git a/mobile/src/main/java/io/github/marktony/espresso/data/source/remote/PackagesRemoteDataSource.java b/mobile/src/main/java/io/github/marktony/espresso/data/source/remote/PackagesRemoteDataSource.java index 82a2e72..ec542ab 100644 --- a/mobile/src/main/java/io/github/marktony/espresso/data/source/remote/PackagesRemoteDataSource.java +++ b/mobile/src/main/java/io/github/marktony/espresso/data/source/remote/PackagesRemoteDataSource.java @@ -20,6 +20,7 @@ import android.support.annotation.Nullable; import java.util.List; +import java.util.concurrent.Callable; import io.github.marktony.espresso.data.Package; import io.github.marktony.espresso.data.source.PackagesDataSource; @@ -98,22 +99,33 @@ public void deletePackage(@NonNull String packageId) { public Observable> refreshPackages() { // It is necessary to build a new realm instance // in a different thread. - Realm realm = Realm.getInstance(new RealmConfiguration.Builder() - .deleteRealmIfMigrationNeeded() - .name(DATABASE_NAME) - .build()); - return Observable.fromIterable(realm.copyFromRealm(realm.where(Package.class).findAll())) - .subscribeOn(Schedulers.io()) - .flatMap(new Function>() { + return Observable.fromCallable(new Callable>() { + @Override + public List call() throws Exception { + Realm realm = Realm.getInstance(new RealmConfiguration.Builder() + .deleteRealmIfMigrationNeeded() + .name(DATABASE_NAME) + .build()); + return realm.copyFromRealm(realm.where(Package.class).findAll()); + } + }).publish(new Function>, ObservableSource>>() { + @Override + public ObservableSource> apply(Observable> listObservable) throws Exception { + listObservable.flatMapIterable(new Function, Iterable>() { @Override - public ObservableSource apply(Package aPackage) throws Exception { - // A nested request. - return refreshPackage(aPackage.getNumber()); + public Iterable apply(List packages) throws Exception { + return packages; } - }) - .toList() - .toObservable(); + }).subscribe(new Consumer() { + @Override + public void accept(Package aPackage) throws Exception { + refreshPackage(aPackage.getNumber()); + } + }); + return listObservable; + } + }); } /** @@ -122,7 +134,8 @@ public ObservableSource apply(Package aPackage) throws Exception { * @return The observable package of latest status. */ @Override - public Observable refreshPackage(@NonNull String packageId) { + public Observable refreshPackage(@NonNull + final String packageId) { // It is necessary to build a new realm instance // in a different thread. Realm realm = Realm.getInstance(new RealmConfiguration.Builder() @@ -145,37 +158,43 @@ public boolean test(Package aPackage) throws Exception { } }) .subscribeOn(Schedulers.io()) - .doOnNext(new Consumer() { + .publish(new Function, ObservableSource>() { @Override - public void accept(Package aPackage) throws Exception { - - // To avoid the server error or other problems - // making the data in database being dirty. - if (aPackage != null && aPackage.getData() != null) { - // It is necessary to build a new realm instance - // in a different thread. - Realm rlm = Realm.getInstance(new RealmConfiguration.Builder() - .deleteRealmIfMigrationNeeded() - .name(DATABASE_NAME) - .build()); - - // Only when the origin data is null or the origin - // data's size is less than the latest data's size - // set the package unread new(readable = true). - if (p.getData() == null || aPackage.getData().size() > p.getData().size()) { - p.setReadable(true); - p.setPushable(true); - p.setState(aPackage.getState()); + public ObservableSource apply(Observable packageObservable) throws Exception { + packageObservable.subscribe(new Consumer() { + @Override + public void accept(Package aPackage) throws Exception { + + // To avoid the server error or other problems + // making the data in database being dirty. + if (aPackage != null && aPackage.getData() != null) { + // It is necessary to build a new realm instance + // in a different thread. + Realm rlm = Realm.getInstance(new RealmConfiguration.Builder() + .deleteRealmIfMigrationNeeded() + .name(DATABASE_NAME) + .build()); + + // Only when the origin data is null or the origin + // data's size is less than the latest data's size + // set the package unread new(readable = true). + if (p.getData() == null || aPackage.getData().size() > p.getData().size()) { + p.setReadable(true); + p.setPushable(true); + p.setState(aPackage.getState()); + } + + p.setData(aPackage.getData()); + // DO NOT forget to begin a transaction. + rlm.beginTransaction(); + rlm.copyToRealmOrUpdate(p); + rlm.commitTransaction(); + + rlm.close(); + } } - - p.setData(aPackage.getData()); - // DO NOT forget to begin a transaction. - rlm.beginTransaction(); - rlm.copyToRealmOrUpdate(p); - rlm.commitTransaction(); - - rlm.close(); - } + }); + return packageObservable; } }); }