Swift Combine alternative to Rx Observable.create
I think I found a way to mimic Observable.create
using a PassthroughSubject
in Combine
. Here is the helper I made:
struct AnyObserver<Output, Failure: Error> {
let onNext: ((Output) -> Void)
let onError: ((Failure) -> Void)
let onComplete: (() -> Void)
}
struct Disposable {
let dispose: () -> Void
}
extension AnyPublisher {
static func create(subscribe: @escaping (AnyObserver<Output, Failure>) -> Disposable) -> Self {
let subject = PassthroughSubject<Output, Failure>()
var disposable: Disposable?
return subject
.handleEvents(receiveSubscription: { subscription in
disposable = subscribe(AnyObserver(
onNext: { output in subject.send(output) },
onError: { failure in subject.send(completion: .failure(failure)) },
onComplete: { subject.send(completion: .finished) }
))
}, receiveCancel: { disposable?.dispose() })
.eraseToAnyPublisher()
}
}
And here is how it looks in usage:
func loadWidgets() -> AnyPublisher<[Widget], Error> {
AnyPublisher.create { observer in
let loadTask = WidgetLoader.request("allWidgets", completion: { widgets in
observer.onNext(widgets)
observer.onComplete()
}, error: { error in
observer.onError(error)
})
return Disposable {
loadTask.cancel()
}
}
}
Creating a Combine's publisher like RxSwift's Observable.Create for an Alamofire request
You can use Future
to connect responseObject
's callback to a Combine Publisher
. I don't have Alamofire handy for testing, but I think the following should work:
func doRequest<T: Mappable>(request: URLRequestConvertible) -> AnyPublisher<T, AFError> {
return Future { promise in
self.session
.request(request)
.validate()
.responseObject { (response: AFDataResponse<T>) in
promise(response.result)
}
}.eraseToAnyPublisher()
}
Note that this is somewhat simpler than the RxSwift version because promise
takes a Result
directly, so we don't have to switch over response.result
.
A Future
is sort of a “lukewarm” publisher. It is like a hot observable because it executes its body immediately and only once, so it starts the Alamofire request immediately. It is also like a cold observable, because every subscriber eventually receives a value or an error (assuming you eventually call promise
). The Future
only executes its body once, but it caches the Result
you pass to promise
.
You can create a truly cold publisher by wrapping the Future
in a Deferred
:
func doRequest<T: Mappable>(request: URLRequestConvertible) -> AnyPublisher<T, AFError> {
return Deferred {
Future { promise in
self.session
.request(request)
.validate()
.responseObject { (response: AFDataResponse<T>) in
promise(response.result) }
}
}.eraseToAnyPublisher()
}
Deferred
calls its body to create a new inner Publisher
every time you subscribe to it. So each time you subscribe, you'll create a new Future
that will immediately start a new Alamofire request. This is useful if you want to use the retry
operator, as in this question.
Alternative for BehaviorRelay in Swift Combine
As mentioned in this Cheat Sheet, an RxSwift BehaviorSubject is the same as the Combine CurrentValueSubject. A BehaviorRelay is a simple wrapper around a BehaviorSubject and could easily be recreated in Combine, but doesn't exist natively.
Swift Combine Chunk Operator
What you are looking for is the buffer
operator in the ReactiveX world.
There is no built in buffer
operator (in the ReactiveX sense) in Combine. The built-in buffer
is seems to be more like a bufferCount
in ReactiveX.
I found this answer by Daniel T, which recreates the buffer
operator in RxSwift, and also this cheatsheet, which tells you how to port RxSwift to Combine.
However, the answer by Daniel T uses Observable.create
, which isn't available in Combine. I looked a bit deeper, and found this other answer, that recreates Observable.create
in Combine.
Combining the three things I've found (no pun intended), this is what I came up with:
// -------------------------------------------------
// from https://stackoverflow.com/a/61035663/5133585
struct AnyObserver<Output, Failure: Error> {
let onNext: ((Output) -> Void)
let onError: ((Failure) -> Void)
let onCompleted: (() -> Void)
}
struct Disposable {
let dispose: () -> Void
}
extension AnyPublisher {
static func create(subscribe: @escaping (AnyObserver<Output, Failure>) -> Disposable) -> Self {
let subject = PassthroughSubject<Output, Failure>()
var disposable: Disposable?
return subject
.handleEvents(receiveSubscription: { subscription in
disposable = subscribe(AnyObserver(
onNext: { output in subject.send(output) },
onError: { failure in subject.send(completion: .failure(failure)) },
onCompleted: { subject.send(completion: .finished) }
))
}, receiveCancel: { disposable?.dispose() })
.eraseToAnyPublisher()
}
}
// -------------------------------------------------
// -------------------------------------------------
// adapted from https://stackoverflow.com/a/43413167/5133585
extension Publisher {
/// collects elements from the source sequence until the boundary sequence fires. Then it emits the elements as an array and begins collecting again.
func buffer<T: Publisher, U>(_ boundary: T) -> AnyPublisher<[Output], Failure> where T.Output == U {
return AnyPublisher.create { observer in
var buffer: [Output] = []
let lock = NSRecursiveLock()
let boundaryDisposable = boundary.sink(receiveCompletion: {
_ in
}, receiveValue: {_ in
lock.lock(); defer { lock.unlock() }
observer.onNext(buffer)
buffer = []
})
let disposable = self.sink(receiveCompletion: { (event) in
lock.lock(); defer { lock.unlock() }
switch event {
case .finished:
observer.onNext(buffer)
observer.onCompleted()
case .failure(let error):
observer.onError(error)
buffer = []
}
}) { (element) in
lock.lock(); defer { lock.unlock() }
buffer.append(element)
}
return Disposable {
disposable.cancel()
boundaryDisposable.cancel()
}
}
}
}
// -------------------------------------------------
Creating observable from another observable
I don't know where Observable
comes from, there's no class like that in Foundation. Combine has observables, but they're not called that there, they're various types of publishers. Are you using another library like RxSwift?
For the sake of getting you closer to answering the question, let's imagine that fetchUserData returns an array instead.
map
needs to take a function that will transform the input from [String:Any]
to User
. In your case, something like
fetchUserData(...).compactMap { dict in
// I am making "username" and "email" up below,
// you did not mention which keys would exist in the dictionary.
if let username = dict["username"] as? String,
let email = dict["email"] as? String {
return User(email:email, username:username)
} else {
return nil
}
}
I used compactMap
, which does the same thing as map
, except that when you return an optional (User?
in this case), it removes the nil entries.
The reactive framework you're using will have similar calls to do the same thing on Observable, on Publisher, etc. They will also allow you to throw an error instead of returning nil, but they all handle it differently (Combine includes the error type in the type of the Publisher, for example).
How do I create an observable of an array from an array of observables?
You can use .merge()
to combine the array of observables into a single observable, and then use .toArray()
to get them as a list in a single event.
For RxSwift 3+ use:
let arrayOfObservables: [Observable<E>] = ...
let singleObservable: Observable<E> = Observable.from(arrayOfObservables).merge()
let wholeSequence: Observable<[E]> = singleObservable.toArray()
For previous versions:
let arrayOfObservables: [Observable<E>] = ...
let singleObservable: Observable<E> = arrayOfObservables.toObservable().merge()
let wholeSequence: Observable<[E]> = singleObservable.toArray()
Swift Combine - Multiple Observable (CurrentValueSubject or PassthroughSubject) into one but wait for update from all
Zip3
seems to be what you are looking for:
Publishers
.Zip3(var1, var2, var3)
.sink(receivedValue: { var1, var2, var3 in
print("Printed!")
})
.store(in: &subscriptions)
From the docs of zip
:
Use
zip(_:_:)
to return a new publisher that combines the elements from two additional publishers to publish a tuple to the downstream. The returned publisher waits until all three publishers have emitted an event, then delivers the oldest unconsumed event from each publisher as a tuple to the subscriber.
See a visualisation on RxMarbles!
Related Topics
Synchronize Properties in Swift 3 Using Gcd
Insert, Update and Delete Animations with Foreach in Swiftui
How to Use Swift Playground to Display Nsview with Some Drawing
How to Make a Swift Framework Submodule Really Private
Swift - Converting from Unsafepointer<Uint8> with Length to String
Differencein Approach to Create Dispatchqueue Swift3
Uitableviewcell Height Auto Layout Not Working on iOS 10
How to Convert Copaquepointer in Swift to Some Type (Cgcontext? in Particular)
Swift, Pass Data Back from Popover to View Controller
Using Uiactivityindicatorview with Uiwebview in Swift
Making Nsdecimalnumber Codable
Why Avplayer Downloading First Instead Live Streaming
Closure Containing a Declaration Cannot Be Used with Function Builder 'Viewbuilder'