Rxjava Android How to Use the Zip Operator

Rxjava Android how to use the Zip operator

Zip operator strictly pairs emitted items from observables. It waits for both (or more) items to arrive then merges them. So yes this would be suitable for your needs.

I would use Func2 to chain the result from the first two observables.
Notice this approach would be simpler if you use Retrofit since its api interface may return an observable. Otherwise you would need to create your own observable.

// assuming each observable returns response in the form of String
Observable<String> movOb = Observable.create(...);
// if you use Retrofit
Observable<String> picOb = RetrofitApiManager.getService().uploadPic(...),
Observable.zip(movOb, picOb, new Func2<String, String, MyResult>() {
@Override
public MyResult call(String movieUploadResponse, String picUploadResponse) {
// analyze both responses, upload them to another server
// and return this method with a MyResult type
return myResult;
}
}
)
// continue chaining this observable with subscriber
// or use it for something else

Observable zip in rxJava2 for Android

Change it to

Java implementation:

Flowable.zip(
Flowable.just("1"),
Flowable.just("2"),
Flowable.just("3"),
Flowable.just("4"),
new Function4<String, String, String, String, MyResult >() {
@Override
public MyResult apply(t1: String, t2: String, t3: String, t4: String) {
// return MyResult
}
})

Kotlin implementation:

Flowable.zip(
Flowable.just("1"),
Flowable.just("2"),
Flowable.just("3"),
Flowable.just("4"),
object : Function4<String, String, String, String, MyResult > {
override fun apply(t1: String, t2: String, t3: String, t4: String): MyResult {
// return MyResult
}
})

Is it possible to use Zip more than 2 Observables using RxJava 2.x?

Actually,

It supports, I didn't return value, and IDE's error message was misleading.

Observable.zip(
getData(),
getOtherData(),
getTemplate(),
(o1,o2,o3)->{
return null;
});

Zip more than 9 Observables in rxJava

    Observable one = Observable.zip(
orderRepository.getOne(54, "id"),
orderRepository.getTwo(54, "id"),
// Etc up to five (I think)
Function zipper
);

Observable two = Observable.zip(
one, orderRepository.getSix(54, "id"),
orderRepository.getSeven(54, "id"),
// Etc up to five,
Function zipper
);

Rinse and repeat until you get all your observables zipped together.

RxJava - Zip operator - handle network error (skip)

I haven't tested the below code but you probably wouldn't want to use a zip ideally because that has to have equal amount of responses from all upstream subscriptions (Have 3 Single's Zipped then all 3 need to emit 1 event to get a response)

Generally it's better to have a state object and use scanWith.

That said one way is to use an Optional style object, combined with onErrorReturnItem

e.g:

data class CombinedResult(
val responseOne: ResponseOne? = null,
val responseTwo: List<ResponseTwo>? = null
)
val firstSingle: Single<Optional<RequestOne>> = requestOne.onErrorReturnItem(Optional.empty())
val secondSingle: Single<Optional<Request = requestTwo.onErrorReturnItem(Optional.empty())

Single.zip(fistSingle, secondSingle, BiFunction { r1: Optional<RequestOne>, r2: Optional<List<RequestTwo>> ->
return@BiFunction CombinedResult(r1.value, r2.value)
})

You can create your own optional if your not sure what that should look like:

class Optional<T>(val value: T?) {
companion object {
fun <T> empty() = Optional<T>()
fun <T> from(value: T?) = Optional(value)
}
}

I took this Java Optional class from Guava, but Guava is huge so I would only use that unless it's in your project already.

@GwtCompatible(serializable = true)
public abstract class Optional<T> implements Serializable {
/**
* Returns an {@code Optional} instance with no contained reference.
*
* <p><b>Comparison to {@code java.util.Optional}:</b> this method is equivalent to Java 8's
* {@code Optional.empty}.
*/
public static <T> Optional<T> absent() {
return Absent.withType();
}

/**
* Returns an {@code Optional} instance containing the given non-null reference. To have {@code
* null} treated as {@link #absent}, use {@link #fromNullable} instead.
*
* <p><b>Comparison to {@code java.util.Optional}:</b> no differences.
*
* @throws NullPointerException if {@code reference} is null
*/
public static <T> Optional<T> of(T reference) {
return new Present<T>(checkNotNull(reference));
}

/**
* If {@code nullableReference} is non-null, returns an {@code Optional} instance containing that
* reference; otherwise returns {@link Optional#absent}.
*
* <p><b>Comparison to {@code java.util.Optional}:</b> this method is equivalent to Java 8's
* {@code Optional.ofNullable}.
*/
public static <T> Optional<T> fromNullable(@NullableDecl T nullableReference) {
return (nullableReference == null) ? Optional.<T>absent() : new Present<T>(nullableReference);
}

Optional() {}

/**
* Returns {@code true} if this holder contains a (non-null) instance.
*
* <p><b>Comparison to {@code java.util.Optional}:</b> no differences.
*/
public abstract boolean isPresent();

/**
* Returns the contained instance, which must be present. If the instance might be absent, use
* {@link #or(Object)} or {@link #orNull} instead.
*
* <p><b>Comparison to {@code java.util.Optional}:</b> when the value is absent, this method
* throws {@link IllegalStateException}, whereas the Java 8 counterpart throws {@link
* java.util.NoSuchElementException NoSuchElementException}.
*
* @throws IllegalStateException if the instance is absent ({@link #isPresent} returns {@code
* false}); depending on this <i>specific</i> exception type (over the more general {@link
* RuntimeException}) is discouraged
*/
public abstract T get();

/**
* Returns the contained instance if it is present; {@code defaultValue} otherwise. If no default
* value should be required because the instance is known to be present, use {@link #get()}
* instead. For a default value of {@code null}, use {@link #orNull}.
*
* <p>Note about generics: The signature {@code public T or(T defaultValue)} is overly
* restrictive. However, the ideal signature, {@code public <S super T> S or(S)}, is not legal
* Java. As a result, some sensible operations involving subtypes are compile errors:
*
* <pre>{@code
* Optional<Integer> optionalInt = getSomeOptionalInt();
* Number value = optionalInt.or(0.5); // error
*
* FluentIterable<? extends Number> numbers = getSomeNumbers();
* Optional<? extends Number> first = numbers.first();
* Number value = first.or(0.5); // error
* }</pre>
*
* <p>As a workaround, it is always safe to cast an {@code Optional<? extends T>} to {@code
* Optional<T>}. Casting either of the above example {@code Optional} instances to {@code
* Optional<Number>} (where {@code Number} is the desired output type) solves the problem:
*
* <pre>{@code
* Optional<Number> optionalInt = (Optional) getSomeOptionalInt();
* Number value = optionalInt.or(0.5); // fine
*
* FluentIterable<? extends Number> numbers = getSomeNumbers();
* Optional<Number> first = (Optional) numbers.first();
* Number value = first.or(0.5); // fine
* }</pre>
*
* <p><b>Comparison to {@code java.util.Optional}:</b> this method is similar to Java 8's {@code
* Optional.orElse}, but will not accept {@code null} as a {@code defaultValue} ({@link #orNull}
* must be used instead). As a result, the value returned by this method is guaranteed non-null,
* which is not the case for the {@code java.util} equivalent.
*/
public abstract T or(T defaultValue);

/**
* Returns this {@code Optional} if it has a value present; {@code secondChoice} otherwise.
*
* <p><b>Comparison to {@code java.util.Optional}:</b> this method has no equivalent in Java 8's
* {@code Optional} class; write {@code thisOptional.isPresent() ? thisOptional : secondChoice}
* instead.
*/
public abstract Optional<T> or(Optional<? extends T> secondChoice);

/**
* Returns the contained instance if it is present; {@code supplier.get()} otherwise.
*
* <p><b>Comparison to {@code java.util.Optional}:</b> this method is similar to Java 8's {@code
* Optional.orElseGet}, except when {@code supplier} returns {@code null}. In this case this
* method throws an exception, whereas the Java 8 method returns the {@code null} to the caller.
*
* @throws NullPointerException if this optional's value is absent and the supplier returns {@code
* null}
*/
@Beta
public abstract T or(Supplier<? extends T> supplier);

/**
* Returns the contained instance if it is present; {@code null} otherwise. If the instance is
* known to be present, use {@link #get()} instead.
*
* <p><b>Comparison to {@code java.util.Optional}:</b> this method is equivalent to Java 8's
* {@code Optional.orElse(null)}.
*/
@NullableDecl
public abstract T orNull();

/**
* Returns an immutable singleton {@link Set} whose only element is the contained instance if it
* is present; an empty immutable {@link Set} otherwise.
*
* <p><b>Comparison to {@code java.util.Optional}:</b> this method has no equivalent in Java 8's
* {@code Optional} class. However, this common usage:
*
* <pre>{@code
* for (Foo foo : possibleFoo.asSet()) {
* doSomethingWith(foo);
* }
* }</pre>
*
* ... can be replaced with:
*
* <pre>{@code
* possibleFoo.ifPresent(foo -> doSomethingWith(foo));
* }</pre>
*
* @since 11.0
*/
public abstract Set<T> asSet();

/**
* If the instance is present, it is transformed with the given {@link Function}; otherwise,
* {@link Optional#absent} is returned.
*
* <p><b>Comparison to {@code java.util.Optional}:</b> this method is similar to Java 8's {@code
* Optional.map}, except when {@code function} returns {@code null}. In this case this method
* throws an exception, whereas the Java 8 method returns {@code Optional.absent()}.
*
* @throws NullPointerException if the function returns {@code null}
* @since 12.0
*/
public abstract <V> Optional<V> transform(Function<? super T, V> function);

/**
* Returns {@code true} if {@code object} is an {@code Optional} instance, and either the
* contained references are {@linkplain Object#equals equal} to each other or both are absent.
* Note that {@code Optional} instances of differing parameterized types can be equal.
*
* <p><b>Comparison to {@code java.util.Optional}:</b> no differences.
*/
@Override
public abstract boolean equals(@NullableDecl Object object);

/**
* Returns a hash code for this instance.
*
* <p><b>Comparison to {@code java.util.Optional}:</b> this class leaves the specific choice of
* hash code unspecified, unlike the Java 8 equivalent.
*/
@Override
public abstract int hashCode();

/**
* Returns a string representation for this instance.
*
* <p><b>Comparison to {@code java.util.Optional}:</b> this class leaves the specific string
* representation unspecified, unlike the Java 8 equivalent.
*/
@Override
public abstract String toString();

/**
* Returns the value of each present instance from the supplied {@code optionals}, in order,
* skipping over occurrences of {@link Optional#absent}. Iterators are unmodifiable and are
* evaluated lazily.
*
* <p><b>Comparison to {@code java.util.Optional}:</b> this method has no equivalent in Java 8's
* {@code Optional} class; use {@code
* optionals.stream().filter(Optional::isPresent).map(Optional::get)} instead.
*
* @since 11.0 (generics widened in 13.0)
*/
@Beta
public static <T> Iterable<T> presentInstances(
final Iterable<? extends Optional<? extends T>> optionals) {
checkNotNull(optionals);
return new Iterable<T>() {
@Override
public Iterator<T> iterator() {
return new AbstractIterator<T>() {
private final Iterator<? extends Optional<? extends T>> iterator =
checkNotNull(optionals.iterator());

@Override
protected T computeNext() {
while (iterator.hasNext()) {
Optional<? extends T> optional = iterator.next();
if (optional.isPresent()) {
return optional.get();
}
}
return endOfData();
}
};
}
};
}

private static final long serialVersionUID = 0;
}

Rxjava zip operator filter second Observable by checking data with first Observable android

You can apply this filtering with reactive approach using a map and a list, first collect all categories to a map, and all services to a list, zip them together, and then filter the services list according to categories map:

Observable<HashMap<Integer, CategoryData>> categoriesMapObservable =
obsCategory
.flatMapIterable(CategoryModel::getData)
.reduce(new HashMap<>(),
(map, categoryData) -> {
map.put(categoryData.getCategoryTypeId(), categoryData);
return map;
}
);

Observable<List<ServiceData>> serviceListObservable = obsService
.map(ServiceModel::getData);

Observable obsCombined =
Observable.zip(
categoriesMapObservable
.subscribeOn(Schedulers.io()),
serviceListObservable
.subscribeOn(Schedulers.io()),
Pair::new
)
.flatMap(hashMapListPair -> {
HashMap<Integer, CategoryData> categoriesMap = hashMapListPair.first;
return Observable.from(hashMapListPair.second)
.filter(serviceData -> categoriesMap.containsKey(serviceData.getCategoryTypeId()))
.toList();
}, (hashMapListPair, serviceDataList) -> new Pair<>(hashMapListPair.first.values(), serviceDataList));

the output result depends on you , here I apply at the end a selector of flatMap() that will create a Pair of Collection of CategoryData and a filtered list of ServiceData, you can of course create whatever custom Object you need for that.

I'm not sure you're gaining much from this, it's seems more efficient from complexity perspective, assuming HashMap is O(1), where categories are N, and services are M, you have here N + M (N constructing the map, M iterating the list and querying the map), while your naive implementation will be N x M.

as for code complexity, i'm not sure it worth it, you can apply your logic at the end of the zip for filtering, or use some library that might be doing filter more efficiently.

P.S the observerOn(AndroidSchedulers.mainThread() is unnecessary so I removed it.

RxJava : How to handle error with zip operator ?

You can use onErrorResumeNext to return some Observable or onErrorReturn to return some default value to zip, like:

Observable.zip(
responseOneObservable
.onErrorReturn(new Func1<Throwable, ResponseOne>() {
@Override
public ResponseOne call(final Throwable throwable) {
return new ResponseOne();
}
}),
responseTwoObservable
.onErrorReturn(new Func1<Throwable, ResponseTwo>() {
@Override
public ResponseTwo call(final Throwable throwable) {
return new ResponseTwo();
}
}),
...

See onError handling for more info.


UPDATE: With RxJava 2.0 you must use Function instead of Func1:

import io.reactivex.functions.Function;
...
Observable.zip(
responseOneObservable
.onErrorReturn(new Function<Throwable, ResponseOne>() {
@Override
public ResponseOne apply(@NonNull final Throwable throwable) {
return new ResponseOne();
}
}),
responseTwoObservable
.onErrorReturn(new Function<Throwable, ResponseTwo>() {
@Override
public ResponseTwo apply(@NonNull final Throwable throwable) {
return new ResponseTwo();
}
}),
...

Or using lambdas:

Observable.zip(
responseOneObservable
.onErrorReturn(throwable -> new ResponseOne()),
responseTwoObservable
.onErrorReturn(throwable -> new ResponseTwo()),
...

Or using Kotlin:

Observable.zip(
responseOneObservable
.onErrorReturn { ResponseOne() },
responseTwoObservable
.onErrorReturn { ResponseTwo() },
...

Zip operator in RxJava is not working with Retrofit

For the .zip() operator to emit anything, all zipped observables have to emit at least once. If one of your observables emits an error, or does not emit at all, you will never receive an onNext event.

  • For checking for error emissions, add logging or breakpoints into your onError within subscribe
  • For checking for missing emissions, you can add doOnNext and doOnCompleted calls with logging after all your zipped Observables and see which one does not emit

Cheers!



Related Topics



Leave a reply



Submit