Swift Combine Alternative to Rx Observable.Create

I think I found a way to mimic Observable.create using a PassthroughSubject in Combine. Here is the helper I made:

import Combine

// Callbacks the subscribe closure uses to push events downstream.
struct AnyObserver<Output, Failure: Error> {
    let onNext: ((Output) -> Void)
    let onError: ((Failure) -> Void)
    let onComplete: (() -> Void)
}

// Wraps the cleanup work to run when the subscription is cancelled.
struct Disposable {
    let dispose: () -> Void
}

extension AnyPublisher {
    static func create(subscribe: @escaping (AnyObserver<Output, Failure>) -> Disposable) -> Self {
        let subject = PassthroughSubject<Output, Failure>()
        var disposable: Disposable?
        return subject
            .handleEvents(receiveSubscription: { subscription in
                // Start the work when a subscriber attaches and forward its events to the subject.
                disposable = subscribe(AnyObserver(
                    onNext: { output in subject.send(output) },
                    onError: { failure in subject.send(completion: .failure(failure)) },
                    onComplete: { subject.send(completion: .finished) }
                ))
            }, receiveCancel: { disposable?.dispose() })
            .eraseToAnyPublisher()
    }
}

And here is how it looks in usage:

func loadWidgets() -> AnyPublisher<[Widget], Error> {
    AnyPublisher.create { observer in
        let loadTask = WidgetLoader.request("allWidgets", completion: { widgets in
            observer.onNext(widgets)
            observer.onComplete()
        }, error: { error in
            observer.onError(error)
        })
        return Disposable {
            loadTask.cancel()
        }
    }
}

Creating a Combine publisher like RxSwift's Observable.create for an Alamofire request

You can use Future to connect responseObject's callback to a Combine Publisher. I don't have Alamofire handy for testing, but I think the following should work:

func doRequest<T: Mappable>(request: URLRequestConvertible) -> AnyPublisher<T, AFError> {
    return Future { promise in
        self.session
            .request(request)
            .validate()
            .responseObject { (response: AFDataResponse<T>) in
                promise(response.result)
            }
    }.eraseToAnyPublisher()
}

Note that this is somewhat simpler than the RxSwift version because promise takes a Result directly, so we don't have to switch over response.result.

A Future is sort of a “lukewarm” publisher. It is like a hot observable because it executes its body immediately and only once, so it starts the Alamofire request immediately. It is also like a cold observable, because every subscriber eventually receives a value or an error (assuming you eventually call promise). The Future only executes its body once, but it caches the Result you pass to promise.
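To make the "runs once, caches the result" behaviour concrete, here is a small sketch that does not need Alamofire. The function name futureDemo and the value 42 are made up for illustration; the Future resolves synchronously so the effect is easy to observe.

```swift
import Combine

// Returns how many times the Future's body ran and what two subscribers received.
func futureDemo() -> (bodyRuns: Int, received: [Int]) {
    var bodyRuns = 0
    var received: [Int] = []
    var cancellables = Set<AnyCancellable>()

    // The body runs here, at creation time, before anyone subscribes.
    let future = Future<Int, Never> { promise in
        bodyRuns += 1
        promise(.success(42))
    }

    // Each subscriber gets the cached result; the body is not run again.
    future.sink { received.append($0) }.store(in: &cancellables)
    future.sink { received.append($0) }.store(in: &cancellables)

    return (bodyRuns, received)
}
```

Calling futureDemo() should report a single run of the body even though two subscribers each receive the value.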

You can create a truly cold publisher by wrapping the Future in a Deferred:

func doRequest<T: Mappable>(request: URLRequestConvertible) -> AnyPublisher<T, AFError> {
    return Deferred {
        Future { promise in
            self.session
                .request(request)
                .validate()
                .responseObject { (response: AFDataResponse<T>) in
                    promise(response.result)
                }
        }
    }.eraseToAnyPublisher()
}

Deferred calls its body to create a new inner Publisher every time you subscribe to it. So each time you subscribe, you'll create a new Future that will immediately start a new Alamofire request. This is useful if you want to use the retry operator, as in this question.
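Here is a rough sketch of how Deferred and retry interact, again without Alamofire. FlakyError and retryDemo are hypothetical names, and the "request" is faked: it fails on the first two attempts and succeeds on the third.

```swift
import Combine

struct FlakyError: Error {}

// Returns the number of times the Future body ran and the values delivered.
func retryDemo() -> (attempts: Int, values: [String]) {
    var attempts = 0
    var values: [String] = []
    var cancellables = Set<AnyCancellable>()

    // Deferred builds a fresh Future for every subscription,
    // so each retry re-runs the body instead of replaying a cached failure.
    let request = Deferred {
        Future<String, FlakyError> { promise in
            attempts += 1
            if attempts < 3 {
                promise(.failure(FlakyError()))
            } else {
                promise(.success("ok"))
            }
        }
    }

    request
        .retry(2) // up to two re-subscriptions after the initial attempt
        .sink(receiveCompletion: { _ in }, receiveValue: { values.append($0) })
        .store(in: &cancellables)

    return (attempts, values)
}
```

Without the Deferred wrapper, retry would resubscribe to the same Future and just receive the cached failure again.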

Alternative for BehaviorRelay in Swift Combine

As mentioned in this Cheat Sheet, an RxSwift BehaviorSubject is the same as the Combine CurrentValueSubject. A BehaviorRelay is a simple wrapper around a BehaviorSubject and could easily be recreated in Combine, but doesn't exist natively.
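A minimal sketch of what such a wrapper could look like. This is my own guess at a reasonable shape, not an official API: the type name and the accept method simply mirror the RxSwift ones, and relayDemo is only there to show usage.

```swift
import Combine

/// Hypothetical Combine counterpart of BehaviorRelay: it holds a current value,
/// replays it to new subscribers, and exposes no way to complete or fail.
final class BehaviorRelay<Output>: Publisher {
    typealias Failure = Never

    private let subject: CurrentValueSubject<Output, Never>

    /// The most recently accepted value.
    var value: Output { subject.value }

    init(value: Output) {
        subject = CurrentValueSubject(value)
    }

    /// Publishes a new value to all subscribers.
    func accept(_ newValue: Output) {
        subject.send(newValue)
    }

    func receive<S: Subscriber>(subscriber: S) where S.Input == Output, S.Failure == Never {
        subject.receive(subscriber: subscriber)
    }
}

// Usage: the subscriber gets the initial value first, then each accepted value.
func relayDemo() -> [Int] {
    let relay = BehaviorRelay(value: 0)
    var seen: [Int] = []
    let subscription = relay.sink { seen.append($0) }
    relay.accept(1)
    subscription.cancel()
    return seen
}
```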

Swift Combine Chunk Operator

What you are looking for is the buffer operator in the ReactiveX world.

There is no built-in buffer operator (in the ReactiveX sense) in Combine. The built-in buffer seems to be more like bufferCount in ReactiveX.
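For comparison, plain count-based chunking can be done with Combine's collect(_:) operator, which groups elements into arrays of a fixed size. A small sketch (chunkDemo is just a name made up for the example):

```swift
import Combine

// Splits a finite sequence of values into arrays of at most two elements.
func chunkDemo() -> [[Int]] {
    var chunks: [[Int]] = []
    var cancellables = Set<AnyCancellable>()

    [1, 2, 3, 4, 5].publisher
        .collect(2) // emit an array every two elements; the remainder is emitted on completion
        .sink { chunks.append($0) }
        .store(in: &cancellables)

    return chunks
}
```

That covers fixed-size chunks, but not chunks delimited by another publisher, which is what the rest of this answer is about.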

I found this answer by Daniel T, which recreates the buffer operator in RxSwift, and also this cheatsheet, which tells you how to port RxSwift to Combine.

However, the answer by Daniel T uses Observable.create, which isn't available in Combine. I looked a bit deeper, and found this other answer, that recreates Observable.create in Combine.

Combining the three things I've found (no pun intended), this is what I came up with:

import Combine
import Foundation // for NSRecursiveLock, used by the buffer operator

// -------------------------------------------------
// from https://stackoverflow.com/a/61035663/5133585
struct AnyObserver<Output, Failure: Error> {
    let onNext: ((Output) -> Void)
    let onError: ((Failure) -> Void)
    let onCompleted: (() -> Void)
}

struct Disposable {
    let dispose: () -> Void
}

extension AnyPublisher {
    static func create(subscribe: @escaping (AnyObserver<Output, Failure>) -> Disposable) -> Self {
        let subject = PassthroughSubject<Output, Failure>()
        var disposable: Disposable?
        return subject
            .handleEvents(receiveSubscription: { subscription in
                disposable = subscribe(AnyObserver(
                    onNext: { output in subject.send(output) },
                    onError: { failure in subject.send(completion: .failure(failure)) },
                    onCompleted: { subject.send(completion: .finished) }
                ))
            }, receiveCancel: { disposable?.dispose() })
            .eraseToAnyPublisher()
    }
}
// -------------------------------------------------

// -------------------------------------------------
// adapted from https://stackoverflow.com/a/43413167/5133585
extension Publisher {

    /// Collects elements from the source sequence until the boundary sequence fires. Then it emits the elements as an array and begins collecting again.
    func buffer<T: Publisher, U>(_ boundary: T) -> AnyPublisher<[Output], Failure> where T.Output == U {
        return AnyPublisher.create { observer in
            var buffer: [Output] = []
            // Guards the buffer, since the source and the boundary may fire on different threads.
            let lock = NSRecursiveLock()
            // Every boundary value flushes the current buffer downstream.
            let boundaryDisposable = boundary.sink(receiveCompletion: { _ in
            }, receiveValue: { _ in
                lock.lock(); defer { lock.unlock() }
                observer.onNext(buffer)
                buffer = []
            })
            let disposable = self.sink(receiveCompletion: { (event) in
                lock.lock(); defer { lock.unlock() }
                switch event {
                case .finished:
                    // Emit whatever is left before completing.
                    observer.onNext(buffer)
                    observer.onCompleted()
                case .failure(let error):
                    observer.onError(error)
                    buffer = []
                }
            }) { (element) in
                lock.lock(); defer { lock.unlock() }
                buffer.append(element)
            }
            return Disposable {
                disposable.cancel()
                boundaryDisposable.cancel()
            }
        }
    }
}
// -------------------------------------------------

Creating observable from another observable

I don't know where Observable comes from; there's no class like that in Foundation. Combine has observables, but they're not called that there; they're various types of publishers. Are you using another library like RxSwift?

For the sake of getting you closer to answering the question, let's imagine that fetchUserData returns an array instead.

map needs to take a function that will transform the input from [String:Any] to User. In your case, something like:

fetchUserData(...).compactMap { dict in
    // I am making "username" and "email" up below,
    // you did not mention which keys would exist in the dictionary.
    if let username = dict["username"] as? String,
       let email = dict["email"] as? String {
        return User(email: email, username: username)
    } else {
        return nil
    }
}

I used compactMap, which does the same thing as map, except that when you return an optional (User? in this case), it removes the nil entries.

The reactive framework you're using will have similar calls to do the same thing on Observable, on Publisher, etc. They will also allow you to throw an error instead of returning nil, but they all handle it differently (Combine includes the error type in the type of the Publisher, for example).
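For example, in Combine the same idea might look roughly like the sketch below. Everything here is hypothetical scaffolding: the User struct, the MissingField error, the sample rows, and the "username" / "email" keys are all made up, since the question did not show them. The first function drops bad entries with compactMap; the second throws with tryMap, which turns the publisher's failure type into Error.

```swift
import Combine

struct User: Equatable {
    let email: String
    let username: String
}

struct MissingField: Error {}

// Hypothetical sample data; the second row has no "email" key.
func sampleRows() -> [[String: Any]] {
    return [
        ["username": "ada", "email": "ada@example.com"],
        ["username": "bob"]
    ]
}

func parse(_ dict: [String: Any]) -> User? {
    guard let username = dict["username"] as? String,
          let email = dict["email"] as? String else { return nil }
    return User(email: email, username: username)
}

// compactMap on a publisher: rows that fail to parse are silently dropped.
func compactMapDemo() -> [User] {
    var users: [User] = []
    var cancellables = Set<AnyCancellable>()
    sampleRows().publisher
        .compactMap { parse($0) }
        .sink { users.append($0) }
        .store(in: &cancellables)
    return users
}

// tryMap on a publisher: the first row that fails to parse ends the stream with an error.
func tryMapDemo() -> (users: [User], failed: Bool) {
    var users: [User] = []
    var failed = false
    var cancellables = Set<AnyCancellable>()
    sampleRows().publisher
        .tryMap { dict -> User in
            guard let user = parse(dict) else { throw MissingField() }
            return user
        }
        .sink(receiveCompletion: { completion in
            if case .failure = completion { failed = true }
        }, receiveValue: { users.append($0) })
        .store(in: &cancellables)
    return (users, failed)
}
```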

How do I create an observable of an array from an array of observables?

You can use .merge() to combine the array of observables into a single observable, and then use .toArray() to get them as a list in a single event.

For RxSwift 3+ use:

let arrayOfObservables: [Observable<E>] = ...
let singleObservable: Observable<E> = Observable.from(arrayOfObservables).merge()
let wholeSequence: Observable<[E]> = singleObservable.toArray()

For previous versions:

let arrayOfObservables: [Observable<E>] = ...
let singleObservable: Observable<E> = arrayOfObservables.toObservable().merge()
let wholeSequence: Observable<[E]> = singleObservable.toArray()
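If you are on Combine rather than RxSwift, I believe the closest equivalent is Publishers.MergeMany followed by collect(). A small sketch, using Just publishers as stand-ins for the real ones (mergeDemo is a made-up name):

```swift
import Combine

// Merges an array of publishers and gathers everything they emit into one array.
func mergeDemo() -> [Int] {
    var all: [Int] = []
    var cancellables = Set<AnyCancellable>()

    let arrayOfPublishers = [Just(1), Just(2), Just(3)]

    Publishers.MergeMany(arrayOfPublishers)
        .collect() // emits a single array once every upstream has finished
        .sink { all = $0 }
        .store(in: &cancellables)

    return all
}
```

As with toArray(), nothing is emitted until all of the merged publishers have completed, and the order of elements follows arrival order rather than the order of the input array.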

Swift Combine - Multiple Observable (CurrentValueSubject or PassthroughSubject) into one but wait for update from all

Zip3 seems to be what you are looking for:

Publishers
    .Zip3(var1, var2, var3)
    .sink(receiveValue: { var1, var2, var3 in
        print("Printed!")
    })
    .store(in: &subscriptions)

From the docs of zip:

Use zip(_:_:) to return a new publisher that combines the elements from two additional publishers to publish a tuple to the downstream. The returned publisher waits until all three publishers have emitted an event, then delivers the oldest unconsumed event from each publisher as a tuple to the subscriber.
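Here is a small sketch of that pairing behaviour with three PassthroughSubjects. The subject names and values are made up; the point is that nothing is delivered until every publisher has contributed a value, and values are matched up oldest first.

```swift
import Combine

// Returns the tuples delivered by Zip3, each flattened into an array for easy comparison.
func zipDemo() -> [[Int]] {
    let first = PassthroughSubject<Int, Never>()
    let second = PassthroughSubject<Int, Never>()
    let third = PassthroughSubject<Int, Never>()
    var emitted: [[Int]] = []
    var cancellables = Set<AnyCancellable>()

    Publishers.Zip3(first, second, third)
        .sink { triple in emitted.append([triple.0, triple.1, triple.2]) }
        .store(in: &cancellables)

    first.send(1)
    first.send(2)
    second.send(10)  // nothing delivered yet: third has not produced a value
    third.send(100)  // delivers (1, 10, 100)
    second.send(20)
    third.send(200)  // delivers (2, 20, 200)

    return emitted
}
```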

See a visualisation on RxMarbles!


