Rxswift Merge Different Kind of Observables

RxSwift merge different kind of Observables

To merge them, they need to have the same type for their Element.

So, one option is to throw away their type information and cast to AnyObject. Now they can be merged:

let stringSubject = PublishSubject<String>()
let stringObservable = stringSubject.asObservable().map { $0 as AnyObject }

let intSubject = PublishSubject<Int>()
let intObservable = intSubject.asObservable().map { $0 as AnyObject }

Observable.of(stringObservable, intObservable).merge()
.subscribeNext { print($0) }
.addDisposableTo(disposeBag)

stringSubject.onNext("a")
stringSubject.onNext("b")
intSubject.onNext(1)
intSubject.onNext(2)
stringSubject.onNext("c")

Output:

a

b

1

2

c

Another option would be to wrap then in an enum:

enum Container {
case S(String)
case I(Int)
}

let stringSubject = PublishSubject<String>()
let stringObservable = stringSubject.asObservable().map { Container.S($0) }

let intSubject = PublishSubject<Int>()
let intObservable = intSubject.asObservable().map { Container.I($0) }

Observable.of(stringObservable, intObservable).merge()
.subscribeNext { e in
switch e {
case .S(let str):
print("next element is a STRING: \(str)")
case .I(let int):
print("next element is an INT: \(int)")
}
}
.addDisposableTo(disposeBag)

stringSubject.onNext("a")
stringSubject.onNext("b")
intSubject.onNext(1)
intSubject.onNext(2)
stringSubject.onNext("c")

Output:

next element is a STRING: a

next element is a STRING: b

next element is an INT: 1

next element is an INT: 2

next element is a STRING: c

As for the other operators that can combine Observables of varying types (like zip and combineLatest), none work quite like merge. However, check those out. They might be better suited to your requirements.

RxSwift: Combining different types of observables and mapping result

Your comment describes a different use-case than what the question describes...

updatedValue is changed with every key strike, isChanged is called only when update button is tapped while savedValue is orignal value.

The above implies that you want something like:

func example<Value>(savedValue: Value, isChanged: Observable<Void>, updatedValue: Observable<Value>) -> Observable<Value> {
isChanged
.withLatestFrom(updatedValue)
.startWith(savedValue)
}

The above will emit the savedValue, then emit whatever was last emitted by updatedValue every time isChanged emits. I suggest you change the name of isChanged to something else since it isn't a Bool.

Meanwhile, the question implies that you want something more like:

func exampleʹ<Value>(savedValue: Value, isChanged: Observable<Bool>, updatedValue: Observable<Value>) -> Observable<Value> {
isChanged
.withLatestFrom(updatedValue) { $0 ? savedValue : $1 }
}

The above will also emit a value every time isChanged emits a value. It will emit savedValue whenever isChanged emits false and updatedValue whenever isChanged emits true.


If savedValue is an Observable (maybe from a network request or a DB get) then the code would look more like this:

func example<Value>(isChanged: Observable<Void>, savedValue: Observable<Value>, updatedValue: Observable<Value>) -> Observable<Value> {
savedValue
.concat(
isChanged.withLatestFrom(updatedValue)
)
}

func exampleʹ<Value>(isChanged: Observable<Bool>, savedValue: Observable<Value>, updatedValue: Observable<Value>) -> Observable<Value> {
isChanged
.withLatestFrom(Observable.combineLatest(savedValue, updatedValue)) { $0 ? $1.0 : $1.1 }
}

Chaining RxSwift observable with different type

Try combineLatest operator. You can combine multiple observables:

let data = Observable.combineLatest(fetchDevices, fetchRooms, fetchSections) 
{ devices, rooms, sections in
return AllModels(sections: sections, rooms: rooms, devices:devices)
}
.distinctUntilChanged()
.shareReplay(1)

And then, you subscribe to it:

data.subscribe(onNext: {models in 
// do something with your AllModels object
})
.disposed(by: bag)

Combine two single responses RXSwift

If think merge operator is exactly what you want, it will wait until you get data from both of your requests, and then it will merge them into one array, which you can modify with map. And after that you can convert it to the single.

So it looks like that:

Observable.merge(responseUpcoming.asObservable(), responsePast.asObservable()).map { $0.toModel() }.asSingle()

Combining multiple different observables into one observable

combineLatest can accept more than 2 parameters!

Try:

Observable.combineLatest(inType.asObservable(), inFilterRetailers.asObservable(), inFilterColors.asObservable(), inFilterPriceRanges.asObservable())
{ type, filterRetailers, filterColors, filterPriceRanges in
// do something with your data
}
.distinctUntilChanged()
.shareReplay(1)

How to combine multi Observable in RxSwift

create a new Observable to emit an event when a download is completed

Look at the method imageObservable for a possible implementation.

create a new Observable to emit only completed event when all download
is completed

The zip (documentation) operator might be what you are looking for.

import RxSwift
import UIKit

enum CustomError: Error {
case someError
}

class Executer {

let disposeBag = DisposeBag()

func execute() {
let imageURLs = [
URL(string: "http://via.placeholder.com/350x150")!,
URL(string: "http://via.placeholder.com/350x150")!
]
let imageObservables = imageURLs.map { self.imageObservable(url: $0) }
Observable
.zip(imageObservables) // wait for all image requests to finish
.subscribe(onNext: { images in
// here you have every single image in the 'images' array
images.forEach { print($0) }
})
.disposed(by: disposeBag)
}

// wrap 'URLSession' datatask into an observable
func imageObservable(url: URL) -> Observable<UIImage> {
return Observable.create { observer in
URLSession
.shared
.dataTask(with: url, completionHandler: { (data, response, error) -> Void in
if let error = error {
observer.onError(error)
return
}
guard let data = data, let image = UIImage(data: data) else {
observer.onError(CustomError.someError)
return
}
observer.onNext(image)
observer.onCompleted()
})
.resume()
return Disposables.create()
}
}
}

This answer might also be relevant for you.

Merging two notification observers in RxSwift

.merge() combines multiple Observables so you'll want to do appActiveNotifications.toObservable() then call .merge() on it

Edit:
Or as the example in the RxSwift's playground, you can use Observable.of() then use .merge() on it; like so:

let a = NSNotificationCenter.defaultCenter().rx_notification(UIApplicationWillEnterForegroundNotification)
let b = NSNotificationCenter.defaultCenter().rx_notification(Constants.AppRuntimeCallIncomingNotification)

Observable.of(a, b)
.merge()
.takeUntil(self.rx_deallocated)
.subscribeNext() { [weak self] _ in
// notification handling
}.addDisposableTo(disposeBag)


Related Topics



Leave a reply



Submit