How to Process an Array of Tasks Asynchronously with Swift Combine

Combine offers extensions around URLSession to handle network requests. If you really need to integrate with OperationQueue-based networking, then Future is a fine candidate: you can run multiple Futures and collect their results at some point. Otherwise, I'd really suggest looking at the URLSession extensions for Combine:

import Combine
import Foundation

struct User: Codable {
    var username: String
}

let requestURL = URL(string: "https://example.com/")!
let publisher = URLSession.shared.dataTaskPublisher(for: requestURL)
    .map { $0.data }
    .decode(type: User.self, decoder: JSONDecoder())
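
For the callback-based case mentioned above, a minimal sketch of wrapping a completion-handler API in a Future might look like the following. The names `legacyLoadUsername` and `loadUsername` are hypothetical stand-ins for existing networking code, and the stub calls back immediately so that the example runs without a network. Keep in mind that a Future starts its work as soon as it is created, not when it is subscribed to.

```swift
import Combine

// Hypothetical callback-based API standing in for legacy networking code.
func legacyLoadUsername(id: Int, completion: @escaping (Result<String, Error>) -> Void) {
    completion(.success("user-\(id)"))
}

// Wrap the callback API in a Future so it can take part in a Combine pipeline.
func loadUsername(id: Int) -> Future<String, Error> {
    Future { promise in
        legacyLoadUsername(id: id) { result in
            promise(result)
        }
    }
}

var names: [String] = []

// Run several Futures and collect their results into a single array.
let cancellable = Publishers.MergeMany([1, 2, 3].map(loadUsername(id:)))
    .collect()
    .sink(receiveCompletion: { _ in }, receiveValue: { names = $0 })
```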

Regarding running a batch of requests, it's possible to use Publishers.MergeMany, e.g.:

struct User: Codable {
    var username: String
}

let userIds = [1, 2, 3]

// Keep the returned AnyCancellable alive for as long as the requests should run.
let subscriber = Just(userIds)
    .setFailureType(to: Error.self)
    .flatMap { (values) -> Publishers.MergeMany<AnyPublisher<User, Error>> in
        let tasks = values.map { (userId) -> AnyPublisher<User, Error> in
            let requestURL = URL(string: "https://jsonplaceholder.typicode.com/users/\(userId)")!

            return URLSession.shared.dataTaskPublisher(for: requestURL)
                .map { $0.data }
                .decode(type: User.self, decoder: JSONDecoder())
                .eraseToAnyPublisher()
        }
        return Publishers.MergeMany(tasks)
    }
    .collect()
    .sink(receiveCompletion: { (completion) in
        if case .failure(let error) = completion {
            print("Got error: \(error.localizedDescription)")
        }
    }, receiveValue: { (allUsers) in
        print("Got users:")
        allUsers.forEach { print("\($0)") }
    })

In the example above I use collect to gather all results, which postpones emitting a value to the sink until all of the network requests have finished successfully. However, you can drop the collect and receive each User one by one as the network requests complete.
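
To see the difference without touching the network, here is a small sketch. `fetchName(for:)` is a hypothetical stand-in for a request that emits one value and then finishes.

```swift
import Combine

// Hypothetical stand-in for a network request: emits one value, then finishes.
func fetchName(for userId: Int) -> AnyPublisher<String, Never> {
    Just("user-\(userId)").eraseToAnyPublisher()
}

let requests = [1, 2, 3].map(fetchName(for:))

var oneByOne: [String] = []
var batches: [[String]] = []

// Without collect(): the sink is called once per finished request.
let streaming = Publishers.MergeMany(requests)
    .sink { oneByOne.append($0) }

// With collect(): the sink is called once, after every request has finished.
let batched = Publishers.MergeMany(requests)
    .collect()
    .sink { batches.append($0) }
```

With real requests the one-by-one values arrive in completion order, so code that depends on input order should not rely on the order of delivery.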

Combine framework: how to process each element of array asynchronously before proceeding

With your latest edit and this comment below:

I literally am asking is there a Combine equivalent of "don't proceed to the next step until this step, involving multiple asynchronous steps, has finished"

I think this pattern can be achieved by using .flatMap to turn the array into an array publisher (Publishers.Sequence), which emits the elements one by one and then completes, followed by whatever per-element async processing is needed, and finishing with .collect, which waits for all elements to complete before proceeding.

So, in code, assuming we have these functions:

func getFoos() -> AnyPublisher<[Foo], Error>
func getPartials(for: Foo) -> AnyPublisher<[Partial], Error>
func getMoreInfo(for: Partial, of: Foo) -> AnyPublisher<MoreInfo, Error>

We can do the following:

getFoos()
    .flatMap { fooArr in
        fooArr.publisher.setFailureType(to: Error.self)
    }

    // per-foo element async processing
    .flatMap { foo in

        getPartials(for: foo)
            .flatMap { partialArr in
                partialArr.publisher.setFailureType(to: Error.self)
            }

            // per-partial of foo async processing
            .flatMap { partial in

                getMoreInfo(for: partial, of: foo)
                    // build completed partial with more info
                    .map { moreInfo in
                        var newPartial = partial
                        newPartial.moreInfo = moreInfo
                        return newPartial
                    }
            }
            .collect()
            // build completed foo with all partials
            .map { partialArr in
                var newFoo = foo
                newFoo.partials = partialArr
                return newFoo
            }
    }
    .collect()
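
To try the pattern end to end, here is a self-contained sketch. The `Foo`, `Partial`, and `MoreInfo` definitions and the `Just`-based function bodies are assumptions made up for illustration; only the three function signatures come from the question.

```swift
import Combine

// Hypothetical model types; the real ones are not shown in the question.
struct MoreInfo { let text: String }
struct Partial { let id: Int; var moreInfo: MoreInfo? = nil }
struct Foo { let id: Int; var partials: [Partial] = [] }

// Stubbed "async" calls that answer immediately.
func getFoos() -> AnyPublisher<[Foo], Error> {
    Just([Foo(id: 1), Foo(id: 2)])
        .setFailureType(to: Error.self).eraseToAnyPublisher()
}
func getPartials(for foo: Foo) -> AnyPublisher<[Partial], Error> {
    Just([Partial(id: foo.id * 10), Partial(id: foo.id * 10 + 1)])
        .setFailureType(to: Error.self).eraseToAnyPublisher()
}
func getMoreInfo(for partial: Partial, of foo: Foo) -> AnyPublisher<MoreInfo, Error> {
    Just(MoreInfo(text: "foo \(foo.id), partial \(partial.id)"))
        .setFailureType(to: Error.self).eraseToAnyPublisher()
}

var completedFoos: [Foo] = []

let cancellable = getFoos()
    .flatMap { foos in foos.publisher.setFailureType(to: Error.self) }
    .flatMap { foo -> AnyPublisher<Foo, Error> in
        getPartials(for: foo)
            .flatMap { partials in partials.publisher.setFailureType(to: Error.self) }
            .flatMap { partial -> AnyPublisher<Partial, Error> in
                getMoreInfo(for: partial, of: foo)
                    .map { moreInfo -> Partial in
                        var newPartial = partial
                        newPartial.moreInfo = moreInfo
                        return newPartial
                    }
                    .eraseToAnyPublisher()
            }
            .collect() // wait for every partial of this foo
            .map { partials -> Foo in
                var newFoo = foo
                newFoo.partials = partials
                return newFoo
            }
            .eraseToAnyPublisher()
    }
    .collect() // wait for every foo
    .sink(receiveCompletion: { _ in }, receiveValue: { completedFoos = $0 })
```

The sink receives a single array, and only after every nested step has finished.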

Combine framework serialize async operations

I've only briefly tested this, but at first pass it appears that each request waits for the previous request to finish before starting.

I'm posting this solution in search of feedback. Please be critical if this isn't a good solution.

extension Collection where Element: Publisher {

    func serialize() -> AnyPublisher<Element.Output, Element.Failure>? {
        // If the collection is empty, we can't just create an arbitrary publisher
        // so we return nil to indicate that we had nothing to serialize.
        if isEmpty { return nil }

        // We know at this point that it's safe to grab the first publisher.
        let first = self.first!

        // If there was only a single publisher then we can just return it.
        if count == 1 { return first.eraseToAnyPublisher() }

        // We're going to build up the output starting with the first publisher.
        var output = first.eraseToAnyPublisher()

        // We iterate over the rest of the publishers (skipping over the first.)
        for publisher in self.dropFirst() {
            // We build up the output by appending the next publisher.
            output = output.append(publisher).eraseToAnyPublisher()
        }

        return output
    }
}


A more concise version of this solution (provided by @matt):

extension Collection where Element: Publisher {
    func serialize() -> AnyPublisher<Element.Output, Element.Failure>? {
        guard let start = self.first else { return nil }
        return self.dropFirst().reduce(start.eraseToAnyPublisher()) {
            $0.append($1).eraseToAnyPublisher()
        }
    }
}
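
A short usage sketch, repeating the concise extension so that it runs on its own. The `Just` publishers are placeholders for real requests; because `append` only subscribes to the next publisher once the previous one has finished, the values arrive in collection order.

```swift
import Combine

extension Collection where Element: Publisher {
    func serialize() -> AnyPublisher<Element.Output, Element.Failure>? {
        guard let start = self.first else { return nil }
        return self.dropFirst().reduce(start.eraseToAnyPublisher()) {
            $0.append($1).eraseToAnyPublisher()
        }
    }
}

var received: [Int] = []

// Placeholder publishers; each one emits a single value and finishes.
let cancellable = [Just(1), Just(2), Just(3)].serialize()?
    .sink { received.append($0) }

// An empty collection has nothing to serialize, so the result is nil.
let nothing = [Just<Int>]().serialize()
```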

Swift map from array of objects to array of async functions and await them all

The standard pattern is TaskGroup. Add your tasks for the individual images, and then await in a for loop, map, or, in this case, reduce:

func fetchAllTheAvatars(people: [Person]) async -> [Person.ID: UIImage] {
    await withTaskGroup(of: (Person.ID, UIImage?).self) { group in
        for person in people {
            group.addTask { await (person.id, person.fetchAvatar()) }
        }

        return await group.reduce(into: [Person.ID: UIImage]()) { (dictionary, result) in
            if let image = result.1 {
                dictionary[result.0] = image
            }
        }
    }
}

Note that because the order is not guaranteed, and because some of your Person instances may not return an image, my implementation returns an efficient, order-independent structure (i.e., a dictionary).

Needless to say, the above assumes that you make Person conform to Identifiable:

struct Person: Identifiable {
    let id = UUID()
    let name: String

    func fetchAvatar() async -> UIImage? { … }
}
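
If the caller does need results in input order, one option is to map the original array through the dictionary afterwards. This is a minimal sketch of that approach. The `Member` type and `fetchInitial()` are hypothetical and stand in for `Person` and `fetchAvatar()`, so that the example runs without UIKit.

```swift
// Hypothetical model standing in for Person.
struct Member: Identifiable, Sendable {
    let id: Int
    let name: String

    // Hypothetical async work; returns nil when there is nothing to fetch.
    func fetchInitial() async -> Character? { name.first }
}

// Same TaskGroup pattern as above: run all fetches concurrently,
// then reduce the results into an order-independent dictionary.
func fetchAllInitials(members: [Member]) async -> [Member.ID: Character] {
    await withTaskGroup(of: (Member.ID, Character?).self) { group in
        for member in members {
            group.addTask { (member.id, await member.fetchInitial()) }
        }

        return await group.reduce(into: [Member.ID: Character]()) { dictionary, result in
            if let initial = result.1 {
                dictionary[result.0] = initial
            }
        }
    }
}

// Restore input order by walking the original array and skipping missing entries.
func orderedInitials(members: [Member]) async -> [Character] {
    let byID = await fetchAllInitials(members: members)
    return members.compactMap { byID[$0.id] }
}
```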

How to schedule a synchronous sequence of asynchronous calls in Combine?

After experimenting for a while in a playground, I believe I found a solution, but if you have a better idea, please share. The solution is to pass the maxPublishers parameter to flatMap and set it to .max(1), so that flatMap subscribes to only one inner publisher at a time:

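urls.publisher
    .setFailureType(to: Error.self)
    .flatMap(maxPublishers: .max(1)) { url in // <<<<--- here
        // Result is presumably the caller's own output type, not Swift's generic Result.
        Future<Result, Error> { callback in
            myCall { data, error in
                if let data = data {
                    callback(.success(data))
                } else if let error = error {
                    callback(.failure(error))
                }
                // If both are nil, the promise is never fulfilled and the pipeline stalls.
            }
        }
    }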
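
The effect of .max(1) can be observed without real network calls. In this sketch the two PassthroughSubjects are hypothetical stand-ins for asynchronous calls, and the `started` array records when flatMap asks for each one.

```swift
import Combine

// Each subject plays the role of one asynchronous call that we finish by hand.
let subjects = [PassthroughSubject<String, Never>(), PassthroughSubject<String, Never>()]
var started: [Int] = []
var received: [String] = []

let cancellable = [0, 1].publisher
    .flatMap(maxPublishers: .max(1)) { index -> AnyPublisher<String, Never> in
        started.append(index) // records when this "call" is kicked off
        return subjects[index].eraseToAnyPublisher()
    }
    .sink { received.append($0) }

// Only the first call has been started so far.
let startedBeforeFirstFinished = started

subjects[0].send("first done")
subjects[0].send(completion: .finished)

// The second call starts only after the first one has completed.
let startedAfterFirstFinished = started

subjects[1].send("second done")
subjects[1].send(completion: .finished)
```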

