Skip to content
This repository has been archived by the owner on May 28, 2020. It is now read-only.

Commit

Permalink
Use publish instead of doOnNext to make 2 behavior happen together
Browse files Browse the repository at this point in the history
  • Loading branch information
Yaoda Wang committed Apr 13, 2017
1 parent 147ee9e commit 9a6d091
Show file tree
Hide file tree
Showing 2 changed files with 116 additions and 81 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -114,19 +114,21 @@ public int compare(Package o1, Package o2) {
// Return the cached packages.
return packagesLocalDataSource
.getPackages()
.flatMap(new Function<List<Package>, ObservableSource<List<Package>>>() {
.publish(new Function<Observable<List<Package>>, ObservableSource<List<Package>>>() {
@Override
public ObservableSource<List<Package>> apply(List<Package> packages) throws Exception {
return Observable
.fromIterable(packages)
.doOnNext(new Consumer<Package>() {
@Override
public void accept(Package aPackage) throws Exception {
cachedPackages.put(aPackage.getNumber(), aPackage);
}
})
.toList()
.toObservable();
public ObservableSource<List<Package>> apply(Observable<List<Package>> listObservable) throws Exception {
listObservable.flatMapIterable(new Function<List<Package>, Iterable<Package>>() {
@Override
public Iterable<Package> apply(List<Package> packages) throws Exception {
return packages;
}
}).subscribe(new Consumer<Package>() {
@Override
public void accept(Package aPackage) throws Exception {
cachedPackages.put(aPackage.getNumber(), aPackage);
}
});
return listObservable;
}
});
}
Expand Down Expand Up @@ -185,13 +187,16 @@ public void deletePackage(@NonNull String packageId) {
public Observable<List<Package>> refreshPackages() {
return packagesRemoteDataSource
.refreshPackages()
.flatMap(new Function<List<Package>, ObservableSource<List<Package>>>() {
@Override
public ObservableSource<List<Package>> apply(List<Package> packages) throws Exception {

return Observable
.fromIterable(packages)
.doOnNext(new Consumer<Package>() {
.publish(new Function<Observable<List<Package>>, ObservableSource<List<Package>>>() {
@Override
public ObservableSource<List<Package>> apply(Observable<List<Package>> listObservable) throws Exception {
listObservable.flatMapIterable(new Function<List<Package>, Iterable<Package>>() {
@Override
public Iterable<Package> apply(List<Package> packages) throws Exception {
return packages;
}
})
.subscribe(new Consumer<Package>() {
@Override
public void accept(Package aPackage) throws Exception {
Package p = cachedPackages.get(aPackage.getNumber());
Expand All @@ -201,11 +206,10 @@ public void accept(Package aPackage) throws Exception {
p.setReadable(true);
}
}
})
.toList()
.toObservable();
}
});
});
return listObservable;
}
});
}

/**
Expand All @@ -218,16 +222,22 @@ public void accept(Package aPackage) throws Exception {
@Override
public Observable<Package> refreshPackage(@NonNull final String packageId) {
return packagesRemoteDataSource.refreshPackage(packageId)
.doOnNext(new Consumer<Package>() {
.publish(new Function<Observable<Package>, ObservableSource<Package>>() {
@Override
public void accept(Package aPackage) throws Exception {
Package pkg = cachedPackages.get(aPackage.getNumber());
if (pkg != null) {
pkg.setData(aPackage.getData());
pkg.setReadable(true);
}
public ObservableSource<Package> apply(Observable<Package> packageObservable) throws Exception {
packageObservable.subscribe(new Consumer<Package>() {
@Override
public void accept(Package aPackage) throws Exception {
Package pkg = cachedPackages.get(aPackage.getNumber());
if (pkg != null) {
pkg.setData(aPackage.getData());
pkg.setReadable(true);
}
}

});
return packageObservable;
}

});
}

Expand Down Expand Up @@ -311,13 +321,19 @@ private Package getPackageWithNumber(@NonNull String packNumber) {
private Observable<Package> getPackageWithNumberFromLocalRepository(@NonNull final String packNumber) {
return packagesLocalDataSource
.getPackage(packNumber)
.doOnNext(new Consumer<Package>() {
.publish(new Function<Observable<Package>, ObservableSource<Package>>() {
@Override
public void accept(Package aPackage) throws Exception {
if (cachedPackages == null) {
cachedPackages = new LinkedHashMap<>();
}
cachedPackages.put(packNumber, aPackage);
public ObservableSource<Package> apply(Observable<Package> packageObservable) throws Exception {
packageObservable.subscribe(new Consumer<Package>() {
@Override
public void accept(Package aPackage) throws Exception {
if (cachedPackages == null) {
cachedPackages = new LinkedHashMap<>();
}
cachedPackages.put(packNumber, aPackage);
}
});
return packageObservable;
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import android.support.annotation.Nullable;

import java.util.List;
import java.util.concurrent.Callable;

import io.github.marktony.espresso.data.Package;
import io.github.marktony.espresso.data.source.PackagesDataSource;
Expand Down Expand Up @@ -98,22 +99,33 @@ public void deletePackage(@NonNull String packageId) {
public Observable<List<Package>> refreshPackages() {
// It is necessary to build a new realm instance
// in a different thread.
Realm realm = Realm.getInstance(new RealmConfiguration.Builder()
.deleteRealmIfMigrationNeeded()
.name(DATABASE_NAME)
.build());

return Observable.fromIterable(realm.copyFromRealm(realm.where(Package.class).findAll()))
.subscribeOn(Schedulers.io())
.flatMap(new Function<Package, ObservableSource<Package>>() {
return Observable.fromCallable(new Callable<List<Package>>() {
@Override
public List<Package> call() throws Exception {
Realm realm = Realm.getInstance(new RealmConfiguration.Builder()
.deleteRealmIfMigrationNeeded()
.name(DATABASE_NAME)
.build());
return realm.copyFromRealm(realm.where(Package.class).findAll());
}
}).publish(new Function<Observable<List<Package>>, ObservableSource<List<Package>>>() {
@Override
public ObservableSource<List<Package>> apply(Observable<List<Package>> listObservable) throws Exception {
listObservable.flatMapIterable(new Function<List<Package>, Iterable<Package>>() {
@Override
public ObservableSource<Package> apply(Package aPackage) throws Exception {
// A nested request.
return refreshPackage(aPackage.getNumber());
public Iterable<Package> apply(List<Package> packages) throws Exception {
return packages;
}
})
.toList()
.toObservable();
}).subscribe(new Consumer<Package>() {
@Override
public void accept(Package aPackage) throws Exception {
refreshPackage(aPackage.getNumber());
}
});
return listObservable;
}
});
}

/**
Expand All @@ -122,7 +134,8 @@ public ObservableSource<Package> apply(Package aPackage) throws Exception {
* @return The observable package of latest status.
*/
@Override
public Observable<Package> refreshPackage(@NonNull String packageId) {
public Observable<Package> refreshPackage(@NonNull
final String packageId) {
// It is necessary to build a new realm instance
// in a different thread.
Realm realm = Realm.getInstance(new RealmConfiguration.Builder()
Expand All @@ -145,37 +158,43 @@ public boolean test(Package aPackage) throws Exception {
}
})
.subscribeOn(Schedulers.io())
.doOnNext(new Consumer<Package>() {
.publish(new Function<Observable<Package>, ObservableSource<Package>>() {
@Override
public void accept(Package aPackage) throws Exception {

// To avoid the server error or other problems
// making the data in database being dirty.
if (aPackage != null && aPackage.getData() != null) {
// It is necessary to build a new realm instance
// in a different thread.
Realm rlm = Realm.getInstance(new RealmConfiguration.Builder()
.deleteRealmIfMigrationNeeded()
.name(DATABASE_NAME)
.build());

// Only when the origin data is null or the origin
// data's size is less than the latest data's size
// set the package unread new(readable = true).
if (p.getData() == null || aPackage.getData().size() > p.getData().size()) {
p.setReadable(true);
p.setPushable(true);
p.setState(aPackage.getState());
public ObservableSource<Package> apply(Observable<Package> packageObservable) throws Exception {
packageObservable.subscribe(new Consumer<Package>() {
@Override
public void accept(Package aPackage) throws Exception {

// To avoid the server error or other problems
// making the data in database being dirty.
if (aPackage != null && aPackage.getData() != null) {
// It is necessary to build a new realm instance
// in a different thread.
Realm rlm = Realm.getInstance(new RealmConfiguration.Builder()
.deleteRealmIfMigrationNeeded()
.name(DATABASE_NAME)
.build());

// Only when the origin data is null or the origin
// data's size is less than the latest data's size
// set the package unread new(readable = true).
if (p.getData() == null || aPackage.getData().size() > p.getData().size()) {
p.setReadable(true);
p.setPushable(true);
p.setState(aPackage.getState());
}

p.setData(aPackage.getData());
// DO NOT forget to begin a transaction.
rlm.beginTransaction();
rlm.copyToRealmOrUpdate(p);
rlm.commitTransaction();

rlm.close();
}
}

p.setData(aPackage.getData());
// DO NOT forget to begin a transaction.
rlm.beginTransaction();
rlm.copyToRealmOrUpdate(p);
rlm.commitTransaction();

rlm.close();
}
});
return packageObservable;
}
});
}
Expand Down

0 comments on commit 9a6d091

Please sign in to comment.