Combine Framework Serialize Async Operations

Combine framework serialize async operations

I've only briefly tested this, but at first pass it appears that each request waits for the previous request to finish before starting.

I'm posting this solution in search of feedback. Please be critical if this isn't a good solution.

extension Collection where Element: Publisher {

    func serialize() -> AnyPublisher<Element.Output, Element.Failure>? {
        // If the collection is empty, we can't just create an arbitrary publisher
        // so we return nil to indicate that we had nothing to serialize.
        if isEmpty { return nil }

        // We know at this point that it's safe to grab the first publisher.
        let first = self.first!

        // If there was only a single publisher then we can just return it.
        if count == 1 { return first.eraseToAnyPublisher() }

        // We're going to build up the output starting with the first publisher.
        var output = first.eraseToAnyPublisher()

        // We iterate over the rest of the publishers (skipping over the first.)
        for publisher in self.dropFirst() {
            // We build up the output by appending the next publisher.
            output = output.append(publisher).eraseToAnyPublisher()
        }

        return output
    }
}


A more concise version of this solution (provided by @matt):

extension Collection where Element: Publisher {
    func serialize() -> AnyPublisher<Element.Output, Element.Failure>? {
        guard let start = self.first else { return nil }
        return self.dropFirst().reduce(start.eraseToAnyPublisher()) {
            $0.append($1).eraseToAnyPublisher()
        }
    }
}
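
For example, here is a minimal usage sketch, assuming an array of data task publishers (the URLs and the cancellables set are just placeholders):

import Combine
import Foundation

var cancellables = Set<AnyCancellable>()

// Hypothetical usage: three requests that should run strictly one after another.
let urls = ["https://example.com/1", "https://example.com/2", "https://example.com/3"]
    .compactMap(URL.init(string:))
let requests = urls.map { URLSession.shared.dataTaskPublisher(for: $0) }

if let serialized = requests.serialize() {
    serialized
        .sink(receiveCompletion: { print("completion:", $0) },
              receiveValue: { print("received", $0.data.count, "bytes") })
        .store(in: &cancellables)
}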

Swift Combine framework multiple async request responses into one

There's no need to return a Future publisher. A Future is one specific publisher, but as far as a downstream is concerned, a publisher is defined only by its output and failure types. Instead, return an AnyPublisher<ResultType3, Error>.

Zip is a publisher that waits for a result from every upstream before emitting a combined value. This is probably what you need (more on this later). This is how your function could look:

func combinedApiRequests() -> AnyPublisher<ResultType3, Error> {
    Publishers.Zip(apiRequest1, apiRequest2)
        .map { transform(res1: $0, res2: $1) }
        .eraseToAnyPublisher()
}

There is also the CombineLatest publisher. For the first result from each upstream, it behaves the same as Zip, but for subsequent results it differs. In your case, it doesn't matter since Future is a one-shot publisher, but if the upstream publishers emitted multiple values, then you'd have to decide for your specific use case whether to use Zip, which always waits for all upstreams to emit a value before it emits a combined value, or CombineLatest, which emits with each new upstream value, combining it with the latest values from the other upstreams.
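
For a quick feel for the difference, here is a minimal sketch using two subjects in place of your requests (all names and values are purely illustrative):

import Combine

let first = PassthroughSubject<Int, Never>()
let second = PassthroughSubject<String, Never>()

// Keep references so the subscriptions stay alive.
let zipped = first.zip(second)
    .sink { print("zip:", $0) }
let latest = first.combineLatest(second)
    .sink { print("combineLatest:", $0) }

first.send(1)
second.send("a")   // zip: (1, "a")            combineLatest: (1, "a")
first.send(2)      // zip waits for second     combineLatest: (2, "a")
second.send("b")   // zip: (2, "b")            combineLatest: (2, "b")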

How to replicate PromiseKit-style chained async flow using Combine + Swift

This is not a real answer to your whole question — only to the part about how to get started with Combine. I'll demonstrate how to chain two asynchronous operations using the Combine framework:

    print("start")
Future<Bool,Error> { promise in
delay(3) {
promise(.success(true))
}
}
.handleEvents(receiveOutput: {_ in print("finished 1")})
.flatMap {_ in
Future<Bool,Error> { promise in
delay(3) {
promise(.success(true))
}
}
}
.handleEvents(receiveOutput: {_ in print("finished 2")})
.sink(receiveCompletion: {_ in}, receiveValue: {_ in print("done")})
.store(in:&self.storage) // storage is a persistent Set<AnyCancellable>

First of all, the answer to your question about persistence is: the final subscriber must persist, and the way to do this is using the .store method. Typically you'll have a Set<AnyCancellable> as a property, as here, and you'll just call .store as the last thing in the pipeline to put your subscriber in there.

Next, in this pipeline I'm using .handleEvents just to give myself some printout as the pipeline moves along. Those are just diagnostics and wouldn't exist in a real implementation. All the print statements are purely so we can talk about what's happening here.

So what does happen?

start
finished 1 // 3 seconds later
finished 2 // 3 seconds later
done

So you can see we've chained two asynchronous operations, each of which takes 3 seconds.

How did we do it? We started with a Future, which must call its incoming promise callback with a Result when its work finishes. After that, we used .flatMap to produce another Future and put it into operation, doing the same thing again.

So the result is not beautiful (like PromiseKit) but it is a chain of async operations.

Before Combine, we'd probably have done this with some sort of Operation / OperationQueue dependency, which would work fine but would have even less of the direct legibility of PromiseKit.
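
For comparison, a rough sketch of that pre-Combine approach (the operations here are placeholders, and genuinely asynchronous work would need an asynchronous Operation subclass rather than a plain BlockOperation):

let queue = OperationQueue()
let step1 = BlockOperation { print("finished 1") }
let step2 = BlockOperation { print("finished 2") }
step2.addDependency(step1) // step2 won't start until step1 has finished
queue.addOperations([step1, step2], waitUntilFinished: false)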

Slightly more realistic

Having said all that, here's a slightly more realistic rewrite:

var storage = Set<AnyCancellable>()
func async1(_ promise:@escaping (Result<Bool,Error>) -> Void) {
    delay(3) {
        print("async1")
        promise(.success(true))
    }
}
func async2(_ promise:@escaping (Result<Bool,Error>) -> Void) {
    delay(3) {
        print("async2")
        promise(.success(true))
    }
}
override func viewDidLoad() {
    print("start")
    Future<Bool,Error> { promise in
        self.async1(promise)
    }
    .flatMap {_ in
        Future<Bool,Error> { promise in
            self.async2(promise)
        }
    }
    .sink(receiveCompletion: {_ in}, receiveValue: {_ in print("done")})
    .store(in:&self.storage) // storage is a persistent Set<AnyCancellable>
}

As you can see, the idea is that our Future publishers simply have to pass on the promise callback; they don't actually have to be the ones who call it. A promise callback can thus be called anywhere, and we won't proceed until then.

You can thus readily see how to replace the artificial delay with a real asynchronous operation that somehow has hold of this promise callback and can call it when it completes. Also, my promise Result types are purely artificial, but again you can see how they might be used to communicate something meaningful down the pipeline. When I say promise(.success(true)), that causes true to pop out the end of the pipeline; we are disregarding that here, but it could instead be a downright useful value of some sort, possibly even the next Future.
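
For instance, here is a purely illustrative variation of the pipeline above in which the first future's value is handed to the second step instead of being thrown away:

Future<Bool,Error> { promise in
    self.async1(promise)
}
.flatMap { firstResult in
    Future<Bool,Error> { promise in
        // firstResult is whatever async1 reported; a real app could branch on it here
        print("first step said:", firstResult)
        self.async2(promise)
    }
}
.sink(receiveCompletion: {_ in}, receiveValue: { print("done:", $0) })
.store(in:&self.storage)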

(Note also that we could insert .receive(on: DispatchQueue.main) at any point in the chain to ensure that what follows immediately is started on the main thread.)

Slightly neater

It also occurs to me that we could make the syntax neater, perhaps a little closer to PromiseKit's lovely simple chain, by moving our Future publishers off into constants. If you do that, though, you should probably wrap them in Deferred publishers to prevent premature evaluation. So for example:

var storage = Set<AnyCancellable>()
func async1(_ promise:@escaping (Result<Bool,Error>) -> Void) {
    delay(3) {
        print("async1")
        promise(.success(true))
    }
}
func async2(_ promise:@escaping (Result<Bool,Error>) -> Void) {
    delay(3) {
        print("async2")
        promise(.success(true))
    }
}
override func viewDidLoad() {
    print("start")
    let f1 = Deferred { Future<Bool,Error> { promise in
        self.async1(promise)
    }}
    let f2 = Deferred { Future<Bool,Error> { promise in
        self.async2(promise)
    }}
    // this is now extremely neat-looking
    f1.flatMap {_ in f2 }
        .receive(on: DispatchQueue.main)
        .sink(receiveCompletion: {_ in}, receiveValue: {_ in print("done")})
        .store(in:&self.storage) // storage is a persistent Set<AnyCancellable>
}

Combine framework: how to process each element of array asynchronously before proceeding

With your latest edit and this comment below:

I literally am asking is there a Combine equivalent of "don't proceed to the next step until this step, involving multiple asynchronous steps, has finished"

I think this pattern can be achieved with .flatMap to an array publisher (Publishers.Sequence), which emits the elements one by one and then completes, followed by whatever per-element async processing is needed, and finalized with a .collect, which waits for all elements to complete before proceeding.

So, in code, assuming we have these functions:

func getFoos() -> AnyPublisher<[Foo], Error>
func getPartials(for: Foo) -> AnyPublisher<[Partial], Error>
func getMoreInfo(for: Partial, of: Foo) -> AnyPublisher<MoreInfo, Error>

We can do the following:

getFoos()
    .flatMap { fooArr in
        fooArr.publisher.setFailureType(to: Error.self)
    }

    // per-foo element async processing
    .flatMap { foo in

        getPartials(for: foo)
            .flatMap { partialArr in
                partialArr.publisher.setFailureType(to: Error.self)
            }

            // per-partial of foo async processing
            .flatMap { partial in

                getMoreInfo(for: partial, of: foo)
                    // build completed partial with more info
                    .map { moreInfo in
                        var newPartial = partial
                        newPartial.moreInfo = moreInfo
                        return newPartial
                    }
            }
            .collect()
            // build completed foo with all partials
            .map { partialArr in
                var newFoo = foo
                newFoo.partials = partialArr
                return newFoo
            }
    }
    .collect()
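
To actually run it, the chain above still needs a subscriber at the end; a minimal sketch, assuming a cancellables property of type Set<AnyCancellable>:

    // The final .collect() delivers the fully populated [Foo] array as a single value.
    .sink(receiveCompletion: { completion in
        if case .failure(let error) = completion { print("failed:", error) }
    }, receiveValue: { foos in
        print("got \(foos.count) completed foos")
    })
    .store(in: &cancellables)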


How to schedule a synchronous sequence of asynchronous calls in Combine?

After experimenting for a while in a playground, I believe I found a solution, but if you have a better idea, please share. The solution is to add the maxPublishers parameter to flatMap and set the value to .max(1):

Publishers.Sequence(sequence: urls)
    .flatMap(maxPublishers: .max(1)) { url in   // <<<<--- here
        Future<Result, Error> { callback in
            myCall { data, error in
                if let data = data {
                    callback(.success(data))
                } else if let error = error {
                    callback(.failure(error))
                }
            }
        }
    }
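
For completeness, here is a self-contained sketch of the same back-pressure idea, substituting dataTaskPublisher for the myCall wrapper (the URLs are placeholders). Because of .max(1), the next inner publisher isn't even subscribed until the current one completes, so the requests run strictly one at a time:

import Combine
import Foundation

let urls = ["https://example.com/a", "https://example.com/b"]
    .compactMap(URL.init(string:))

let cancellable = urls.publisher
    .setFailureType(to: Error.self)
    .flatMap(maxPublishers: .max(1)) { url in
        URLSession.shared.dataTaskPublisher(for: url)
            .map { $0.data }
            .mapError { $0 as Error }
    }
    .sink(receiveCompletion: { print("all done:", $0) },
          receiveValue: { print("received", $0.count, "bytes") })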

Translating async method into Combine

Assuming you’ve refactored readTokenFromKeyChain, decrypt, and fetchToken to return AnyPublisher<String, Error> themselves, you can then do:

func getToken() -> AnyPublisher<String, Error> {
    readTokenFromKeyChain()
        .flatMap { self.tokenCryptoHelper.decrypt(encryptedToken: $0) }
        .catch { _ in self.fetchToken() }
        .receive(on: DispatchQueue.main)
        .eraseToAnyPublisher()
}

That will read the keychain; if that succeeds, it will decrypt the token, and if it fails, it will call fetchToken. And having done all of that, it will make sure the final result is delivered on the main queue.


I think that's the right general pattern. Now, let's talk about that dispatchQueue: frankly, I'm not sure I'm seeing anything here that warrants running on a background thread, but let's imagine you wanted to kick this off on a background queue. In that case, your readTokenFromKeyChain might dispatch its work to that queue:

func readTokenFromKeyChain() -> AnyPublisher<String, Error> {
    dispatchQueue.publisher { promise in
        let query: [CFString: Any] = [
            kSecReturnData: true,
            kSecClass: kSecClassGenericPassword,
            kSecAttrAccount: "token",
            kSecAttrService: Bundle.main.bundleIdentifier!]

        var extractedData: AnyObject?
        let status = SecItemCopyMatching(query as CFDictionary, &extractedData)

        if
            status == errSecSuccess,
            let retrievedData = extractedData as? Data,
            let string = String(data: retrievedData, encoding: .utf8)
        {
            promise(.success(string))
        } else {
            promise(.failure(TokenError.failure))
        }
    }
}

By the way, that's using a simple little method, publisher, that I added to DispatchQueue:

extension DispatchQueue {
    /// Creates a publisher whose work is dispatched asynchronously on this queue.
    /// - Parameter block: The work to perform; call the supplied promise when it finishes.

    func publisher<Output, Failure: Error>(_ block: @escaping (Future<Output, Failure>.Promise) -> Void) -> AnyPublisher<Output, Failure> {
        Future<Output, Failure> { promise in
            self.async { block(promise) }
        }.eraseToAnyPublisher()
    }
}

For the sake of completeness, this is a sample fetchToken implementation:

func fetchToken() -> AnyPublisher<String, Error> {
    let request = ...

    return URLSession.shared
        .dataTaskPublisher(for: request)
        .map { $0.data }
        .decode(type: ResponseObject.self, decoder: JSONDecoder())
        .map { $0.payload.token }
        .eraseToAnyPublisher()
}

Using operation queues with combine framework

Combine is just another asynchronous pattern, but doesn’t supplant operation queues (or dispatch queues). Just as GCD and operation queues happily coexist in our code bases, the same is true with Combine.

  • GCD is great at easy-to-write, yet still highly performant, code for dispatching tasks to various queues. So if you have something that might risk blocking the main thread, GCD makes it really easy to dispatch that work to a background thread and then dispatch a completion block back to the main thread. It also handles timers on background threads, data synchronization, highly optimized parallelized code, etc.

  • Operation queues are great for higher-level tasks (especially those that are, themselves, asynchronous). You can take these pieces of work, wrap them up in discrete objects (for nice separation of responsibilities) and the operation queues manage execution, cancelation, and constrained concurrency, quite elegantly.

  • Combine shines at writing concise, declarative, composable, asynchronous event handling code. It excels at writing code that outlines how, for example, one’s UI should reflect some event (network task, notification, even UI updates).

This is obviously an oversimplification, but those are a few of the strengths of the various frameworks. There is definitely overlap among the three, but each has its place.

Combine Framework stream transformation ['a','b','c'] - 'a' then 'b' then 'c'

You can use flatMap to create a publisher that, for each element the upstream publisher publishes, transforms that element into a publisher and publishes what that publisher publishes instead.

Suppose cvs is a CurrentValueSubject<[String], Never>, you can get a publisher that publishes each string of the array that cvs publishes by doing:

cvs.flatMap(\.publisher)

Note that \.publisher here refers to the publisher property on sequences, which is a publisher that publishes each element separately.
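
A minimal sketch of that in action (the names are illustrative):

import Combine

let cvs = CurrentValueSubject<[String], Never>(["a", "b", "c"])

// Keep a reference so the subscription stays alive.
let cancellable = cvs
    .flatMap(\.publisher)
    .sink { print($0) } // prints "a", then "b", then "c"

cvs.send(["d", "e"])    // then prints "d", then "e"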


