Combine Framework: How to Process Each Element of Array Asynchronously Before Proceeding

With your latest edit and this comment below:

I literally am asking is there a Combine equivalent of "don't proceed to the next step until this step, involving multiple asynchronous steps, has finished"

I think this pattern can be achieved with a .flatMap to an array publisher (Publishers.Sequence), which emits the elements one by one and then completes, followed by whatever per-element async processing is needed, and finished off with a .collect, which waits for all elements to complete before proceeding.

So, in code, assuming we have these functions:

func getFoos() -> AnyPublisher<[Foo], Error>
func getPartials(for: Foo) -> AnyPublisher<[Partial], Error>
func getMoreInfo(for: Partial, of: Foo) -> AnyPublisher<MoreInfo, Error>

We can do the following:

getFoos()
    .flatMap { fooArr in
        fooArr.publisher.setFailureType(to: Error.self)
    }

    // per-foo element async processing
    .flatMap { foo in

        getPartials(for: foo)
            .flatMap { partialArr in
                partialArr.publisher.setFailureType(to: Error.self)
            }

            // per-partial of foo async processing
            .flatMap { partial in

                getMoreInfo(for: partial, of: foo)
                    // build completed partial with more info
                    .map { moreInfo in
                        var newPartial = partial
                        newPartial.moreInfo = moreInfo
                        return newPartial
                    }
            }
            .collect()
            // build completed foo with all partials
            .map { partialArr in
                var newFoo = foo
                newFoo.partials = partialArr
                return newFoo
            }
    }
    .collect()
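
To actually start the work, the composed pipeline still needs a subscriber. Here is a minimal sketch, assuming the chain above has been assigned to a completedFoos constant (a name introduced here purely for illustration):

import Combine

var cancellables = Set<AnyCancellable>()

// `completedFoos` stands in for the publisher built by the chain above,
// i.e. getFoos() ... .collect(), with output [Foo] and failure Error.
completedFoos
    .sink(
        receiveCompletion: { completion in
            if case .failure(let error) = completion {
                print("Failed: \(error)")
            }
        },
        receiveValue: { foos in
            // Runs only after every partial of every foo has finished.
            print("Got \(foos.count) completed foos")
        }
    )
    .store(in: &cancellables)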

(Deleted the old answer)

How to process an array of tasks asynchronously with Swift Combine

Combine offers extensions around URLSession for handling network requests, so unless you really need to integrate with OperationQueue-based networking, use those; if you do need OperationQueue, Future is a fine candidate. You can run multiple Futures and collect them at some point, but I'd really suggest looking at the URLSession extensions for Combine first.

struct User: Codable {
    var username: String
}

let requestURL = URL(string: "https://example.com/")!
let publisher = URLSession.shared.dataTaskPublisher(for: requestURL)
    .map { $0.data }
    .decode(type: User.self, decoder: JSONDecoder())
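
For completeness, a minimal sketch of subscribing to that publisher (keeping the returned cancellable alive is up to the caller):

let cancellable = publisher
    .sink(receiveCompletion: { completion in
        print("Completion: \(completion)")
    }, receiveValue: { user in
        print("Fetched user: \(user.username)")
    })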

Regarding running a batch of requests, it's possible to use Publishers.MergeMany, e.g.:

struct User: Codable {
    var username: String
}

let userIds = [1, 2, 3]

let subscriber = Just(userIds)
    .setFailureType(to: Error.self)
    .flatMap { (values) -> Publishers.MergeMany<AnyPublisher<User, Error>> in
        let tasks = values.map { (userId) -> AnyPublisher<User, Error> in
            let requestURL = URL(string: "https://jsonplaceholder.typicode.com/users/\(userId)")!

            return URLSession.shared.dataTaskPublisher(for: requestURL)
                .map { $0.data }
                .decode(type: User.self, decoder: JSONDecoder())
                .eraseToAnyPublisher()
        }
        return Publishers.MergeMany(tasks)
    }
    .collect()
    .sink(receiveCompletion: { (completion) in
        if case .failure(let error) = completion {
            print("Got error: \(error.localizedDescription)")
        }
    }, receiveValue: { (allUsers) in
        print("Got users:")
        allUsers.forEach { print("\($0)") }
    })

In the example above I use collect to gather all the results, which postpones emitting the value to the sink until all of the network requests have finished successfully. However, you can get rid of the collect and instead receive each User one by one as the network requests complete.
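
As a sketch of that variant (using the same User type and userIds as above), dropping collect delivers each decoded User to the sink as soon as its request finishes:

let perUserSubscriber = Just(userIds)
    .setFailureType(to: Error.self)
    .flatMap { (values) -> Publishers.MergeMany<AnyPublisher<User, Error>> in
        Publishers.MergeMany(
            values.map { userId -> AnyPublisher<User, Error> in
                let requestURL = URL(string: "https://jsonplaceholder.typicode.com/users/\(userId)")!
                return URLSession.shared.dataTaskPublisher(for: requestURL)
                    .map { $0.data }
                    .decode(type: User.self, decoder: JSONDecoder())
                    .eraseToAnyPublisher()
            }
        )
    }
    .sink(receiveCompletion: { completion in
        if case .failure(let error) = completion {
            print("Got error: \(error.localizedDescription)")
        }
    }, receiveValue: { user in
        // Called once per user, in whatever order the requests finish.
        print("Got user: \(user)")
    })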

Combine framework serialize async operations

I've only briefly tested this, but at first pass it appears that each request waits for the previous request to finish before starting.

I'm posting this solution in search of feedback. Please be critical if this isn't a good solution.

extension Collection where Element: Publisher {

    func serialize() -> AnyPublisher<Element.Output, Element.Failure>? {
        // If the collection is empty, we can't just create an arbitrary publisher
        // so we return nil to indicate that we had nothing to serialize.
        if isEmpty { return nil }

        // We know at this point that it's safe to grab the first publisher.
        let first = self.first!

        // If there was only a single publisher then we can just return it.
        if count == 1 { return first.eraseToAnyPublisher() }

        // We're going to build up the output starting with the first publisher.
        var output = first.eraseToAnyPublisher()

        // We iterate over the rest of the publishers (skipping over the first.)
        for publisher in self.dropFirst() {
            // We build up the output by appending the next publisher.
            output = output.append(publisher).eraseToAnyPublisher()
        }

        return output
    }
}


A more concise version of this solution (provided by @matt):

extension Collection where Element: Publisher {
    func serialize() -> AnyPublisher<Element.Output, Element.Failure>? {
        guard let start = self.first else { return nil }
        return self.dropFirst().reduce(start.eraseToAnyPublisher()) {
            $0.append($1).eraseToAnyPublisher()
        }
    }
}
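
A small usage sketch of serialize(), with three stand-in single-value publishers (the Just values here are only placeholders for real async work):

import Combine

let steps: [AnyPublisher<Int, Never>] = [
    Just(1).eraseToAnyPublisher(),
    Just(2).eraseToAnyPublisher(),
    Just(3).eraseToAnyPublisher()
]

// Each publisher is appended after the previous one finishes, so values
// arrive in order: 1, 2, 3, followed by a single completion.
let cancellable = steps.serialize()?
    .sink(receiveCompletion: { completion in
        print("Completed with \(completion)")
    }, receiveValue: { value in
        print("Received \(value)")
    })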

Swift Combine transform an array of publishers

The general approach is to flatMap the array into a Sequence publisher of individual values, then do any async operations on each value (another flatMap), then collect the results:

HKHealthStore()
    .workouts(HKObjectQueryNoLimit)
    .flatMap { workouts in workouts.publisher }
    .flatMap { workout in
        workoutDetails(workout)
    }
    .collect()
    .sink(
        receiveCompletion: { ... },
        receiveValue: { arrayOfWorkoutDetails in
            // arrayOfWorkoutDetails is of type [WorkoutDetails]
        })
    .store(in: &bag)

How to observe array's new values with combine?

The publisher property of the array does exactly that: it emits every element of the array and then completes, because the array has a finite number of elements at the time publisher is accessed.
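
For illustration, a minimal sketch of that behavior:

let cancellable = [1, 2, 3].publisher
    .sink(receiveCompletion: { completion in
        print("Completed with \(completion)")
    }, receiveValue: { value in
        print("Received value \(value)")
    })

// Received value 1
// Received value 2
// Received value 3
// Completed with finished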

If you want to be notified every time the array changes (not just when something is appended to it, but on any change), then create a @Published var for the array and attach the observer to it:

@Published var arr = [1, 2, 3]

cancellable = $arr
    .sink { completion in
        print("Completed with \(completion)")
    } receiveValue: { val in
        print("Received value \(val)")
    }

arr.append(4)

The output will be:

Received value [1, 2, 3]
Received value [1, 2, 3, 4]

But it looks like what you are really looking for is to listen to a stream of numbers emitted one at a time. In that case, define a @Published var of that type and listen to it; you'll get called every time the var changes:

@Published var value = 1

cancellable = $value
    .sink { completion in
        print("Completed with \(completion)")
    } receiveValue: { val in
        print("Received value \(val)")
    }

value = 2
value = 3
value = 4

The output will be:

Received value 1
Received value 2
Received value 3
Received value 4


Combine Framework stream transformation ['a','b','c'] -> 'a' then 'b' then 'c'

You can use flatMap to create a publisher that, for each element that the upstream publisher publishes, transforms the element into a publisher and publishes what that publisher publishes instead.

Suppose cvs is a CurrentValueSubject<[String], Never>; you can get a publisher that publishes each string of whatever array cvs publishes by doing:

cvs.flatMap(\.publisher)

Note that \.publisher here refers to the publisher property on sequences, which is a publisher that publishes each element separately.
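
A short sketch of this in action:

import Combine

let cvs = CurrentValueSubject<[String], Never>(["a", "b", "c"])

let cancellable = cvs
    .flatMap(\.publisher)
    .sink { value in
        print(value)    // prints "a", then "b", then "c" for the initial array
    }

cvs.send(["d", "e"])    // then prints "d", then "e"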

Inconsistent behavior of map function with Empty value in Supply and plain Array?

Why not do it like this...

my @strings = ["a", "1.24", "3", "def", "45", "0.23"];

my $supply = Supply.from-list(@strings);
my $compact = $supply.grep(*.Num.so);
$compact.tap(&say);

# 1.24
# 3
# 45
# 0.23

I agree that there seems to be a small divergence (related to Failure handling) vs. regular grep (no .so needed)...

say @strings.grep(*.Num);
#(1.24 3 45 0.23)

Swift Combine Nested Publishers

You can use Publishers.MergeMany and collect() for this:

let decoder = JSONDecoder()
decoder.keyDecodingStrategy = .convertFromSnakeCase

func loadTrailerLinks() -> AnyPublisher<[LPLinkMetadata], Error> {
    // Download data
    URLSession.shared.dataTaskPublisher(for: URL(string: "Doesn't matter")!)
        .tryMap() { element -> Data in
            guard let httpResponse = element.response as? HTTPURLResponse,
                  httpResponse.statusCode == 200 else {
                throw URLError(.badServerResponse)
            }
            return element.data
        }
        .decode(type: TrailerVideoResult.self, decoder: decoder)
        // Convert the TrailerVideoResult to a MergeMany publisher, which merges the
        // [AnyPublisher<LPLinkMetadata, Never>] into a single publisher with output
        // type LPLinkMetadata
        .flatMap {
            Publishers.MergeMany(
                $0.results
                    .filter { $0.site == "YouTube" }
                    .compactMap { URL(string: "https://www.youtube.com/watch?v=\($0.key)") }
                    .map(fetchMetaData)
            )
            // Change the error type from Never to Error
            .setFailureType(to: Error.self)
        }
        // Collect all the LPLinkMetadata and then publish a single result of
        // [LPLinkMetadata]
        .collect()
        .eraseToAnyPublisher()
}
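
The fetchMetaData function referenced above isn't shown here; a hypothetical sketch of what it could look like is a Future wrapping LPMetadataProvider that never fails (errors fall back to an empty LPLinkMetadata), which matches the Never-to-Error conversion in the pipeline:

import Combine
import LinkPresentation

// Hypothetical helper assumed by the pipeline above.
func fetchMetaData(_ url: URL) -> AnyPublisher<LPLinkMetadata, Never> {
    Future<LPLinkMetadata, Never> { promise in
        let provider = LPMetadataProvider()
        provider.startFetchingMetadata(for: url) { metadata, _ in
            _ = provider // keep the provider alive until the fetch completes
            let fallback = LPLinkMetadata()
            fallback.originalURL = url
            promise(.success(metadata ?? fallback))
        }
    }
    .eraseToAnyPublisher()
}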

Chaining array of AnyPublishers

First of all, let's clarify your question. Based on your description of wanting to create a chain of flatMap-ed publishers, what you must have is an array of closures (not publishers), each returning an AnyPublisher given a String parameter.

let pubs: [(String) -> AnyPublisher<String, Never>] = [
    publisher1, publisher2, publisher3
]

To chain them, you could use the reduce method of the array, starting with a Just publisher that emits the initial parameter:

let pipe = pubs.reduce(Just("").eraseToAnyPublisher()) { acc, next in
    acc.flatMap { next($0) }.eraseToAnyPublisher()
}
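
As a hypothetical sketch of the pieces involved, suppose the chained closures look like the functions below; subscribing to pipe then runs them strictly one after another, each receiving the previous closure's output:

import Combine

func publisher1(_ input: String) -> AnyPublisher<String, Never> {
    Just(input + "1").eraseToAnyPublisher()
}
func publisher2(_ input: String) -> AnyPublisher<String, Never> {
    Just(input + "2").eraseToAnyPublisher()
}
func publisher3(_ input: String) -> AnyPublisher<String, Never> {
    Just(input + "3").eraseToAnyPublisher()
}

let cancellable = pipe
    .sink { finalValue in
        // Each closure ran only after the previous one finished: prints "123"
        print(finalValue)
    }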

