Using Combine's Future to Replicate Async Await in Swift

Future is a Publisher. To chain Publishers, use .flatMap.

However, there is no need to chain futures in your use case, because there is only one asynchronous operation, namely the call to requestAccess. If you want to encapsulate the result of an operation that might throw an error, like your fetchContacts, what you want to return is not a Future but a Result.

To illustrate, I'll create a possible pipeline that does what you describe. Throughout the discussion, I'll first show some code, then discuss that code, in that order.

First, I'll prepare some methods we can call along the way:

func checkAccess() -> Result<Bool, Error> {
    Result<Bool, Error> {
        let status = CNContactStore.authorizationStatus(for: .contacts)
        switch status {
        case .authorized: return true
        case .notDetermined: return false
        default:
            enum NoPoint: Error { case userRefusedAuthorization }
            throw NoPoint.userRefusedAuthorization
        }
    }
}

In checkAccess, we look to see whether we have authorization. There are only two cases of interest; either we are authorized, in which case we can proceed to access our contacts, or we are not determined, in which case we can ask the user for authorization. The other possibilities are of no interest: we know we have no authorization and we cannot request it. So I characterize the result, as I said earlier, as a Result:

  • .success(true) means we have authorization

  • .success(false) means we don't have authorization but we can ask for it

  • .failure means we don't have authorization and there is no point going on; I make this a custom Error so we can throw it in our pipeline and thus complete the pipeline prematurely.
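
To make the three outcomes concrete without touching the Contacts framework, here is a minimal sketch. The Status enum and AccessError are hypothetical stand-ins for the real authorization status and the custom error; only the shape of the mapping comes from the discussion above.

```swift
// Hypothetical stand-ins, not part of the Contacts framework.
enum Status { case authorized, notDetermined, denied }
enum AccessError: Error { case refused }

func checkAccess(_ status: Status) -> Result<Bool, Error> {
    // Result's init(catching:) runs the closure and captures either the
    // returned value (.success) or the thrown error (.failure).
    Result<Bool, Error> {
        switch status {
        case .authorized: return true       // we can proceed
        case .notDetermined: return false   // we can still ask the user
        case .denied: throw AccessError.refused
        }
    }
}
```

Calling get() on the returned Result rethrows the error, so a caller can recover all three outcomes from the single return value.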

OK, on to the next function.

func requestAccessFuture() -> Future<Bool, Error> {
    Future<Bool, Error> { promise in
        CNContactStore().requestAccess(for: .contacts) { ok, err in
            if let err = err {
                promise(.failure(err))
            } else {
                promise(.success(ok)) // will be true
            }
        }
    }
}

requestAccessFuture embodies the only asynchronous operation, namely requesting access from the user. So I generate a Future. There are only two possibilities: either we will get an error or we will get a Bool that is true. There are no circumstances under which we get no error but a false Bool. So I either call the promise's failure with the error or I call its success with the Bool, which I happen to know will always be true.

func getMyEmailAddresses() -> Result<[CNLabeledValue<NSString>], Error> {
    Result<[CNLabeledValue<NSString>], Error> {
        let pred = CNContact.predicateForContacts(matchingName: "John Appleseed")
        let jas = try CNContactStore().unifiedContacts(matching: pred, keysToFetch: [
            CNContactFamilyNameKey as CNKeyDescriptor,
            CNContactGivenNameKey as CNKeyDescriptor,
            CNContactEmailAddressesKey as CNKeyDescriptor
        ])
        guard let ja = jas.first else {
            enum NotFound: Error { case oops }
            throw NotFound.oops
        }
        return ja.emailAddresses
    }
}

getMyEmailAddresses is just a sample operation accessing the contacts. Such an operation can throw, so I express it once again as a Result.

Okay, now we're ready to build the pipeline! Here we go.

self.checkAccess().publisher

Our call to checkAccess yields a Result. But a Result has a publisher! So that publisher is the start of our chain. If the Result didn't get an error, this publisher will emit a Bool value. If it did get an error, the publisher will throw it down the pipeline.
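
As a small illustration of that behaviour, the sketch below subscribes to a Result's publisher and records what arrives. The events(of:) helper and DemoError are names made up for this example; the only API relied on is Result's publisher property and sink.

```swift
import Combine

enum DemoError: Error { case failed }   // hypothetical error for the demo

// Subscribes to the Result's publisher and records the events it delivers.
func events(of result: Result<Bool, Error>) -> [String] {
    var log: [String] = []
    let subscription = result.publisher.sink(
        receiveCompletion: { completion in
            switch completion {
            case .finished: log.append("finished")
            case .failure: log.append("failure")
            }
        },
        receiveValue: { value in log.append("value \(value)") }
    )
    subscription.cancel()
    return log
}
```

A success yields one value followed by a normal completion; a failure yields no value at all, only the failure completion.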

.flatMap { (gotAccess: Bool) -> AnyPublisher<Bool, Error> in
    if gotAccess {
        let just = Just(true).setFailureType(to: Error.self).eraseToAnyPublisher()
        return just
    } else {
        let req = self.requestAccessFuture().eraseToAnyPublisher()
        return req
    }
}

This is the only interesting step along the pipeline. We receive a Bool. If it is true, we have no work to do; but if it is false, we need to get our Future and publish it. The way you publish a publisher is with .flatMap; so if gotAccess is false, we fetch our Future and return it. But what if gotAccess is true? We still have to return a publisher, and it needs to be of the same type as our Future. It doesn't actually have to be a Future, because we can erase to AnyPublisher. But it must be of the same types, namely Bool and Error.

So we create a Just and return it. In particular, we return Just(true), to indicate that we are authorized. But we have to jump through some hoops to map the error type to Error, because a Just's error type is Never. I do that by applying setFailureType(to:).

Okay, the rest is easy.

.receive(on: DispatchQueue.global(qos: .userInitiated))

We jump onto a background thread, so that we can talk to the contact store without blocking the main thread.

.compactMap { (auth: Bool) -> Result<[CNLabeledValue<NSString>], Error>? in
    if auth {
        return self.getMyEmailAddresses()
    }
    return nil
}

If we receive true at this point, we are authorized, so we call getMyEmailAddresses and return the result, which, you recall, is a Result. If we receive false, we want to do nothing; but we are not allowed to return nothing from map, so we use compactMap instead, which allows us to return nil to mean "do nothing". And if we got an error instead of a Bool, the error will just pass on down the pipeline unchanged.
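
The "return nil to mean do nothing" behaviour of compactMap can be seen in isolation with a sketch like this one. The passed(_:) helper and the "authorized" string are invented for the illustration.

```swift
import Combine

// Emits a marker for every true flag and silently drops every false one.
func passed(_ flags: [Bool]) -> [String] {
    var output: [String] = []
    let subscription = flags.publisher
        .compactMap { auth -> String? in
            auth ? "authorized" : nil   // nil is filtered out, not forwarded
        }
        .sink { output.append($0) }
    subscription.cancel()
    return output
}
```

Only the true values produce output downstream; the false ones simply disappear from the stream.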

.receive(on: DispatchQueue.main)
.sink(receiveCompletion: { completion in
    if case let .failure(err) = completion {
        print("error:", err)
    }
}, receiveValue: { result in
    if case let .success(emails) = result {
        print("got emails:", emails)
    }
})

We've finished, so it remains only to get ready to receive the error or the emails (wrapped in a Result) that have come down the pipeline. I do this, by way of illustration, simply by getting back onto the main thread and printing out what comes down the pipeline at us.


This description doesn't seem quite enough to give some readers the idea, so I've posted an actual example project at https://github.com/mattneub/CombineAuthorization.

How to replicate PromiseKit-style chained async flow using Combine + Swift

This is not a real answer to your whole question — only to the part about how to get started with Combine. I'll demonstrate how to chain two asynchronous operations using the Combine framework:

print("start")
Future<Bool,Error> { promise in
    delay(3) {
        promise(.success(true))
    }
}
.handleEvents(receiveOutput: {_ in print("finished 1")})
.flatMap {_ in
    Future<Bool,Error> { promise in
        delay(3) {
            promise(.success(true))
        }
    }
}
.handleEvents(receiveOutput: {_ in print("finished 2")})
.sink(receiveCompletion: {_ in}, receiveValue: {_ in print("done")})
.store(in:&self.storage) // storage is a persistent Set<AnyCancellable>

First of all, the answer to your question about persistence is: the final subscriber must persist, and the way to do this is using the .store method. Typically you'll have a Set<AnyCancellable> as a property, as here, and you'll just call .store as the last thing in the pipeline to put your subscriber in there.
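
To see why the subscriber has to persist, here is a minimal sketch using a PassthroughSubject in place of the Futures. The deliveredValues() function is a made-up wrapper for the demonstration; releasing the stored AnyCancellable is what cancels the subscription.

```swift
import Combine

func deliveredValues() -> [Int] {
    let subject = PassthroughSubject<Int, Never>()
    var received: [Int] = []
    var storage = Set<AnyCancellable>()

    subject.sink { received.append($0) }.store(in: &storage)
    subject.send(1)        // delivered: the subscriber is retained by storage
    storage.removeAll()    // releasing the AnyCancellable cancels the subscription
    subject.send(2)        // not delivered
    return received
}
```

The same thing happens if the result of sink is never stored at all: the subscription is torn down before any asynchronous value can arrive.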

Next, in this pipeline I'm using .handleEvents just to give myself some printout as the pipeline moves along. Those are just diagnostics and wouldn't exist in a real implementation. All the print statements are purely so we can talk about what's happening here.

So what does happen?

start
finished 1 // 3 seconds later
finished 2 // 3 seconds later
done

So you can see we've chained two asynchronous operations, each of which takes 3 seconds.

How did we do it? We started with a Future, which must call its incoming promise method with a Result as a completion handler when it finishes. After that, we used .flatMap to produce another Future and put it into operation, doing the same thing again.

So the result is not beautiful (like PromiseKit) but it is a chain of async operations.

Before Combine, we'd probably have done this with some sort of Operation / OperationQueue dependency, which would work fine but would have even less of the direct legibility of PromiseKit.

Slightly more realistic

Having said all that, here's a slightly more realistic rewrite:

var storage = Set<AnyCancellable>()
func async1(_ promise: @escaping (Result<Bool,Error>) -> Void) {
    delay(3) {
        print("async1")
        promise(.success(true))
    }
}
func async2(_ promise: @escaping (Result<Bool,Error>) -> Void) {
    delay(3) {
        print("async2")
        promise(.success(true))
    }
}
override func viewDidLoad() {
    print("start")
    Future<Bool,Error> { promise in
        self.async1(promise)
    }
    .flatMap {_ in
        Future<Bool,Error> { promise in
            self.async2(promise)
        }
    }
    .sink(receiveCompletion: {_ in}, receiveValue: {_ in print("done")})
    .store(in:&self.storage) // storage is a persistent Set<AnyCancellable>
}

As you can see, the idea is that our Future publishers simply have to pass on the promise callback; they don't actually have to be the ones who call it. A promise callback can thus be called anywhere, and we won't proceed until then.

You can thus readily see how to replace the artificial delay with a real asynchronous operation that somehow has hold of this promise callback and can call it when it completes. Also my promise Result types are purely artificial, but again you can see how they might be used to communicate something meaningful down the pipeline. When I say promise(.success(true)), that causes true to pop out the end of the pipeline; we are disregarding that here, but it could be instead a downright useful value of some sort, possibly even the next Future.

(Note also that we could insert .receive(on: DispatchQueue.main) at any point in the chain to ensure that what follows immediately is started on the main thread.)

Slightly neater

It also occurs to me that we could make the syntax neater, perhaps a little closer to PromiseKit's lovely simple chain, by moving our Future publishers off into constants. If you do that, though, you should probably wrap them in Deferred publishers to prevent premature evaluation. So for example:

var storage = Set<AnyCancellable>()
func async1(_ promise: @escaping (Result<Bool,Error>) -> Void) {
    delay(3) {
        print("async1")
        promise(.success(true))
    }
}
func async2(_ promise: @escaping (Result<Bool,Error>) -> Void) {
    delay(3) {
        print("async2")
        promise(.success(true))
    }
}
override func viewDidLoad() {
    print("start")
    let f1 = Deferred { Future<Bool,Error> { promise in
        self.async1(promise)
    }}
    let f2 = Deferred { Future<Bool,Error> { promise in
        self.async2(promise)
    }}
    // this is now extremely neat-looking
    f1.flatMap {_ in f2 }
        .receive(on: DispatchQueue.main)
        .sink(receiveCompletion: {_ in}, receiveValue: {_ in print("done")})
        .store(in:&self.storage) // storage is a persistent Set<AnyCancellable>
}
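
The "premature evaluation" that Deferred prevents can be demonstrated with a short sketch. The evaluationOrder() function and its log strings are invented for the illustration: a bare Future runs its closure as soon as it is created, while a Deferred one waits for a subscriber.

```swift
import Combine

func evaluationOrder() -> [String] {
    var log: [String] = []

    // A Future starts its work immediately, at creation time.
    let eager = Future<Int, Never> { promise in
        log.append("eager body ran")
        promise(.success(1))
    }
    // Deferred postpones creating the Future until someone subscribes.
    let deferred = Deferred {
        Future<Int, Never> { promise in
            log.append("deferred body ran")
            promise(.success(2))
        }
    }

    log.append("before subscribing")
    let a = eager.sink { _ in }
    let b = deferred.sink { _ in }
    a.cancel()
    b.cancel()
    return log
}
```

The eager body is logged before any subscription exists, which is exactly what you don't want when the Futures are stored in constants ahead of the chain.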

Translating async method into Combine

Assuming you’ve refactored readTokenFromKeyChain, decrypt, and fetchToken to return AnyPublisher<String, Error> themselves, you can then do:

func getToken() -> AnyPublisher<String, Error> {
    readTokenFromKeyChain()
        .flatMap { self.tokenCryptoHelper.decrypt(encryptedToken: $0) }
        .catch { _ in self.fetchToken() }
        .receive(on: DispatchQueue.main)
        .eraseToAnyPublisher()
}

That reads the keychain and, if the read succeeds, decrypts the value. If either step fails, it calls fetchToken instead. Having done all of that, it makes sure the final result is delivered on the main queue.
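
The fallback behaviour of .catch can be tried out with synchronous stand-ins. In this sketch readCached, fetchFresh, and token(cached:) are hypothetical replacements for the keychain read, the network fetch, and getToken, and a simple map stands in for the decrypt step.

```swift
import Combine

enum TokenError: Error { case missing }

// Hypothetical stand-in for the keychain read: fails when nothing is cached.
func readCached(_ cached: String?) -> AnyPublisher<String, Error> {
    if let cached = cached {
        return Just(cached).setFailureType(to: Error.self).eraseToAnyPublisher()
    }
    return Fail<String, Error>(error: TokenError.missing).eraseToAnyPublisher()
}

// Hypothetical stand-in for the network fetch.
func fetchFresh() -> AnyPublisher<String, Error> {
    Just("fresh").setFailureType(to: Error.self).eraseToAnyPublisher()
}

func token(cached: String?) -> AnyPublisher<String, Error> {
    readCached(cached)
        .map { $0.uppercased() }       // stands in for the decrypt step
        .catch { _ in fetchFresh() }   // any upstream failure switches to the fallback
        .eraseToAnyPublisher()
}
```

When a cached value exists it flows through the transform; when the read fails, the subscriber never sees the error and receives the fallback value instead.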


I think that’s the right general pattern. Now, let's talk about that dispatchQueue. Frankly, I’m not sure I’m seeing anything here that warrants running on a background thread, but let’s imagine you wanted to kick this off on a background queue. Then your readTokenFromKeyChain might dispatch that work to a background queue:

func readTokenFromKeyChain() -> AnyPublisher<String, Error> {
    dispatchQueue.publisher { promise in
        let query: [CFString: Any] = [
            kSecReturnData: true,
            kSecClass: kSecClassGenericPassword,
            kSecAttrAccount: "token",
            kSecAttrService: Bundle.main.bundleIdentifier!]

        var extractedData: AnyObject?
        let status = SecItemCopyMatching(query as CFDictionary, &extractedData)

        if
            status == errSecSuccess,
            let retrievedData = extractedData as? Data,
            let string = String(data: retrievedData, encoding: .utf8)
        {
            promise(.success(string))
        } else {
            promise(.failure(TokenError.failure))
        }
    }
}

By the way, that’s using a simple little method, publisher, that I added to DispatchQueue:

extension DispatchQueue {
    /// Dispatch block asynchronously
    /// - Parameter block: Block
    func publisher<Output, Failure: Error>(_ block: @escaping (Future<Output, Failure>.Promise) -> Void) -> AnyPublisher<Output, Failure> {
        Future<Output, Failure> { promise in
            self.async { block(promise) }
        }.eraseToAnyPublisher()
    }
}

For the sake of completeness, this is a sample fetchToken implementation:

func fetchToken() -> AnyPublisher<String, Error> {
    let request = ...

    return URLSession.shared
        .dataTaskPublisher(for: request)
        .map { $0.data }
        .decode(type: ResponseObject.self, decoder: JSONDecoder())
        .map { $0.payload.token }
        .eraseToAnyPublisher()
}

Executing a task asynchronously using Combine with Swift

Your sleep(3) call runs on the main thread, which means that it blocks any other operations, including the code that prints the "Started" text.

I won't ramble about how bad it is to block the main thread; that is well known. But it is the reason you see the behaviour you asked about.

I don't see any thread switching code in your question, so if you wish to achieve some kind of asynchronicity, then you can either go with Rob's solution of using dispatch(after:), or do the locomotion (the sleep) on another thread:

func execute(with payload: TInput) -> AnyPublisher<TOutput, Never> {
    // eraseToAnyPublisher() already produces an AnyPublisher,
    // so no extra AnyPublisher(...) wrapper is needed.
    Future<TOutput, Never> { promise in
        DispatchQueue.global().async {
            promise(.success(self.task!))
        }
    }
    .eraseToAnyPublisher()
}

Async Future Promise not working in Swift 5

Dealing with async functions can be tricky. You are getting an empty array because you are returning too early in loadUserFromFirebase. Try this approach (untested) using the old-style closure:

func loadUserFromFirebase(groupUserIds: [String], handler: @escaping ([groupMate]) -> Void) { // <-- here
    var groupmateID = 0
    var groupMates: [groupMate] = [] // element type must match what is appended and handed to handler
    print("groupUserIds: \(groupUserIds)")

    for groupUserUID in groupUserIds {
        print("groupUserUID: \(groupUserUID)")

        let future = Future<groupMate, Never> { promise in
            self.ref.collection("Users").document(groupUserUID).getDocument() { (friendDocument, err) in
                if let err = err {
                    print("Error getting documents \(err)")
                } else {
                    print("friendDocument: \(String(describing: friendDocument?.data()))")

                    let groupUsername = (friendDocument?.data()?["username"]) as! String
                    let groupUID = (friendDocument?.data()?["uid"]) as! String
                    let groupName = (friendDocument?.data()?["name"]) as! String
                    let groupPic = (friendDocument?.data()?["imageurl"]) as! String

                    promise(.success(groupMate(id: groupmateID, uid: groupUID, name: groupName, username: groupUsername, pic: groupPic)))
                }
                groupmateID += 1
            }
        }

        future.sink(receiveCompletion: { completion in
            print("in receiveCompletion")
            print(1, completion)
            switch completion {
            case .failure(let error):
                print(3, error)
                handler([]) // <-- here
                return // <-- here
            case .finished:
                break
            }
        },
        receiveValue: {
            print("in receiveValue")
            groupMates.append($0)
            print(groupMates)
            handler(groupMates) // <-- here
        })
        // Assumption: the enclosing class has a stored property
        // `var cancellables = Set<AnyCancellable>()`. Without storing the
        // subscription it is cancelled before Firebase can respond.
        .store(in: &self.cancellables)
    }
    // <-- do not return here
}


func creategroup(groupName: String) {
    addedTogroupUsers.append(self.uid)

    // -- here wait until you get the data
    loadUserFromFirebase(groupUserIds: addedTogroupUsers) { groupMates in

        let groupData: [String: Any] = [
            "groupName": "\(groupName)",
            "groupmates": groupMates // <-- here
        ]

        print("groupMates are \(groupMates)")

        var groupref: DocumentReference? = nil
        groupref = self.ref.collection("groups").addDocument(data: groupData) { err in
            if let err = err {
                print("Error adding document: \(err)")
            } else {
                print("Document added with ID: \(groupref!.documentID)")
                for addedgroupUser in self.addedTogroupUsers {
                    self.ref.collection("Users").document(addedgroupUser).updateData([
                        "groups": FieldValue.arrayUnion([groupref!.documentID])
                    ])
                }
            }
        }
        print("groupName is \(groupName) and addedTogroup are \(self.addedTogroupUsers)")
    }
}

Note: if you are targeting iOS 15 or macOS 12, you are far better served by the Swift 5.5 async/await and Task features. They really work well.

EDIT: trying to return all results

func loadUserFromFirebase(groupUserIds: [String], handler: @escaping ([groupMate]) -> Void) {
    var groupmateID = 0
    var groupMates: [groupMate] = [] // element type must match the collected values
    print("groupUserIds: \(groupUserIds)")

    var arr: [Future<groupMate, Never>] = []

    for groupUserUID in groupUserIds {
        print("groupUserUID: \(groupUserUID)")

        let future = Future<groupMate, Never> { promise in
            self.ref.collection("Users").document(groupUserUID).getDocument() { (friendDocument, err) in
                if let err = err {
                    print("Error getting documents \(err)")
                } else {
                    print("friendDocument: \(String(describing: friendDocument?.data()))")

                    let groupUsername = (friendDocument?.data()?["username"]) as! String
                    let groupUID = (friendDocument?.data()?["uid"]) as! String
                    let groupName = (friendDocument?.data()?["name"]) as! String
                    let groupPic = (friendDocument?.data()?["imageurl"]) as! String

                    promise(.success(groupMate(id: groupmateID, uid: groupUID, name: groupName, username: groupUsername, pic: groupPic)))
                }
                groupmateID += 1
            }
        }
        arr.append(future)
    }

    Publishers.MergeMany(arr)
        .collect()
        .sink { _ in
            print("-----> merging ")
        } receiveValue: { value in
            print("-----> value: \(value)")
            groupMates = value // <--- maybe append?
            print(groupMates)
            handler(groupMates)
        }
        // Assumption: the enclosing class has a stored property
        // `var cancellables = Set<AnyCancellable>()`. A Set local to this
        // function is released on return, which cancels the subscription.
        .store(in: &self.cancellables)
}
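
The MergeMany plus collect combination can be checked in isolation with Futures that fulfil immediately. In this sketch collectAll(_:) and the id * 10 payload are made up for the illustration; they stand in for the per-user Firebase lookups.

```swift
import Combine

func collectAll(_ ids: [Int]) -> [[Int]] {
    var batches: [[Int]] = []
    // One Future per id; each fulfils right away in this demo.
    let futures = ids.map { id in
        Future<Int, Never> { promise in promise(.success(id * 10)) }
    }
    let subscription = Publishers.MergeMany(futures)
        .collect()                      // waits for every Future to finish
        .sink { batches.append($0) }    // then emits a single array
    subscription.cancel()
    return batches
}
```

Because collect only emits once every upstream publisher has completed, a Future whose promise is never called (for example on an error path) would keep the whole array from ever being delivered.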

Swift Combine - Async calls

With some adjustment for the value and error types you use, this should work. First you ask for the profile, then you unwrap it; if it is nil, you throw an error, which is sent to the sink as a failure.
If the profile is present, you call connect.

getUserProfile()
    .tryMap { userDTO -> UserProfileDTO in
        if let id = userDTO {
            return id
        }
        throw MyError.noProfileDT
    }
    .flatMap { id in
        connect(id)
    }
    .sink {
        //.....
    }
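
Here is a runnable sketch of the same unwrap-then-chain shape, using synchronous stand-ins. The profile(_:), connect(_:), and run(_:) functions and ProfileError are hypothetical; a plain Int? plays the role of the optional profile.

```swift
import Combine

enum ProfileError: Error { case noProfile }

// Hypothetical stand-ins for getUserProfile() and connect(_:).
func profile(_ id: Int?) -> AnyPublisher<Int?, Error> {
    Just(id).setFailureType(to: Error.self).eraseToAnyPublisher()
}
func connect(_ id: Int) -> AnyPublisher<String, Error> {
    Just("connected \(id)").setFailureType(to: Error.self).eraseToAnyPublisher()
}

func run(_ id: Int?) -> [String] {
    var log: [String] = []
    let subscription = profile(id)
        .tryMap { maybeID -> Int in
            // Throwing here ends the pipeline with a failure completion.
            guard let id = maybeID else { throw ProfileError.noProfile }
            return id
        }
        .flatMap { connect($0) }
        .sink(
            receiveCompletion: { completion in
                if case .failure = completion { log.append("failed") }
            },
            receiveValue: { log.append($0) }
        )
    subscription.cancel()
    return log
}
```

With a value present the second step runs; with nil, connect is never called and the sink only sees the failure.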

