Apple Combine Framework: How to Execute Multiple Publishers in Parallel and Wait for All of Them to Finish


You can run several operations in parallel by creating a collection of publishers, applying the flatMap operator so that all of them are subscribed to at once, and then applying collect to wait for all of the publishers to complete before continuing. Here's an example that you can run in a playground:

import Combine
import Foundation

func delayedPublisher<Value>(_ value: Value, delay after: Double) -> AnyPublisher<Value, Never> {
    let p = PassthroughSubject<Value, Never>()
    DispatchQueue.main.asyncAfter(deadline: .now() + after) {
        p.send(value)
        p.send(completion: .finished)
    }
    return p.eraseToAnyPublisher()
}

let myPublishers = [1, 2, 3]
    .map { delayedPublisher($0, delay: 1 / Double($0)).print("\($0)").eraseToAnyPublisher() }

let cancel = myPublishers
    .publisher
    .flatMap { $0 }
    .collect()
    .sink { result in
        print("result:", result)
    }

Here is the output:

1: receive subscription: (PassthroughSubject)
1: request unlimited
2: receive subscription: (PassthroughSubject)
2: request unlimited
3: receive subscription: (PassthroughSubject)
3: request unlimited
3: receive value: (3)
3: receive finished
2: receive value: (2)
2: receive finished
1: receive value: (1)
1: receive finished
result: [3, 2, 1]

Notice that the publishers are all immediately started (in their original order).

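The 1 / Double($0) delay causes the first publisher to take the longest to complete: publisher 1 waits 1 second, publisher 2 waits 0.5 seconds and publisher 3 waits about 0.33 seconds. Notice the order of the values at the end. Since the first publisher took the longest to complete, its value is the last item: collect gathers values in the order they arrive, not in the order of the original array.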
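If you need the results in the original array order rather than in completion order, one option is to tag each value with its index before flatMap and sort after collect. The sketch below uses a hypothetical helper named collectInInputOrder (not a Combine API). To keep it deterministic it drives PassthroughSubjects by hand instead of using timers, finishing them in reverse order just like the delayed publishers above.

```swift
import Combine

// Hypothetical helper: runs all publishers concurrently (flatMap), waits for
// all of them (collect), then restores the original order using each index.
func collectInInputOrder<P: Publisher>(_ publishers: [P]) -> AnyPublisher<[P.Output], P.Failure> {
    publishers.enumerated()
        .publisher
        .setFailureType(to: P.Failure.self)
        .flatMap { index, publisher in
            publisher.map { (index, $0) }  // tag every value with its position
        }
        .collect()
        .map { pairs in pairs.sorted { $0.0 < $1.0 }.map { $0.1 } }
        .eraseToAnyPublisher()
}

// Three subjects stand in for the delayed publishers; we finish them by hand.
let subjects = (0..<3).map { _ in PassthroughSubject<Int, Never>() }
var result: [Int] = []
let cancellable = collectInInputOrder(subjects).sink { result = $0 }

// Finish in reverse order, as the 1 / Double($0) delays did.
subjects[2].send(30); subjects[2].send(completion: .finished)
subjects[1].send(20); subjects[1].send(completion: .finished)
subjects[0].send(10); subjects[0].send(completion: .finished)
// result == [10, 20, 30]; plain flatMap + collect would give [30, 20, 10]
```

The trade-off is that nothing is emitted until the slowest publisher finishes, which is already the case with collect, so sorting afterwards costs little.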

Swift Combine - Multiple Observable (CurrentValueSubject or PassthroughSubject) into one but wait for update from all

Zip3 seems to be what you are looking for:

Publishers
    .Zip3(var1, var2, var3)
    .sink(receiveValue: { var1, var2, var3 in
        print("Printed!")
    })
    .store(in: &subscriptions)

From the docs of zip:

Use zip(_:_:) to return a new publisher that combines the elements from two additional publishers to publish a tuple to the downstream. The returned publisher waits until all three publishers have emitted an event, then delivers the oldest unconsumed event from each publisher as a tuple to the subscriber.

See a visualisation on RxMarbles!
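A minimal sketch of that waiting behaviour, assuming three hypothetical PassthroughSubjects of different types in place of var1, var2 and var3. Values sent to one subject are queued until every other subject has also produced a value.

```swift
import Combine

// Hypothetical stand-ins for var1, var2 and var3.
let numbers = PassthroughSubject<Int, Never>()
let words = PassthroughSubject<String, Never>()
let flags = PassthroughSubject<Bool, Never>()

var emitted: [String] = []
let subscription = Publishers.Zip3(numbers, words, flags)
    .sink { number, word, flag in
        emitted.append("\(number) \(word) \(flag)")
    }

numbers.send(1)    // nothing yet: words and flags have not emitted
words.send("one")  // still nothing: flags has not emitted
numbers.send(2)    // queued; zip pairs the oldest unconsumed values
flags.send(true)   // all three have a value: emits (1, "one", true)
words.send("two")
flags.send(false)  // emits (2, "two", false) using the queued 2
```

Because zip pairs values one-for-one, it emits only as often as its slowest upstream; if you instead want the latest value from each whenever any of them changes, combineLatest is the usual alternative.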

How to limit flatMap concurrency in Combine still having all source events processed?

You have stumbled right into the use case for the .buffer operator. Here its purpose is to compensate for .flatMap backpressure by accumulating values that would otherwise be dropped.

I will illustrate with a completely artificial example:

import UIKit
import Combine

class ViewController: UIViewController {
    let sub = PassthroughSubject<Int, Never>()
    var storage = Set<AnyCancellable>()
    var timer: Timer!

    override func viewDidLoad() {
        super.viewDidLoad()
        sub
            .flatMap(maxPublishers: .max(3)) { i in
                // each value takes 3 seconds to come back out of flatMap
                return Just(i)
                    .delay(for: 3, scheduler: DispatchQueue.main)
                    .eraseToAnyPublisher()
            }
            .sink { print($0) }
            .store(in: &storage)

        var count = 0
        // emit an incrementing integer every second
        self.timer = Timer.scheduledTimer(withTimeInterval: 1, repeats: true) { [weak self] _ in
            count += 1
            self?.sub.send(count)
        }
    }
}

So, our publisher is emitting an incremented integer every second, but our flatMap has .max(3) and takes 3 seconds to republish a value. The result is that we start to miss values:

1
2
3
5
6
7
9
10
11
...

The solution is to put a buffer in front of the flatMap. It needs to be large enough to hold any missed values long enough for them to be requested:

        sub
            .buffer(size: 20, prefetch: .keepFull, whenFull: .dropOldest)
            .flatMap(maxPublishers: .max(3)) { i in
                // ... rest of the pipeline unchanged

The result is that all the numeric values do in fact arrive at the sink. Of course, in real life we could still lose values if the buffer is not large enough to absorb the difference between the rate at which the publisher emits values and the rate at which the backpressuring flatMap requests them.
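The missing values can also be reproduced deterministically, without timers. In this sketch (deliveredValues and its gates are hypothetical names), flatMap is limited to .max(1), each inner publisher is a subject that we complete by hand, and three values are sent while the single slot is busy. PassthroughSubject drops values when there is no outstanding demand, so without a buffer only the first value survives; with a buffer in front of flatMap, all three arrive.

```swift
import Combine

// Sends 1, 2, 3 while flatMap's only slot is busy, then frees the slot
// repeatedly and reports which values reached the sink.
func deliveredValues(buffered: Bool) -> [Int] {
    let source = PassthroughSubject<Int, Never>()
    var gates: [PassthroughSubject<Void, Never>] = []  // one per in-flight inner publisher
    var received: [Int] = []

    let upstream: AnyPublisher<Int, Never> = buffered
        ? source.buffer(size: 20, prefetch: .keepFull, whenFull: .dropOldest).eraseToAnyPublisher()
        : source.eraseToAnyPublisher()

    let cancellable = upstream
        .flatMap(maxPublishers: .max(1)) { value -> AnyPublisher<Int, Never> in
            let gate = PassthroughSubject<Void, Never>()
            gates.append(gate)
            return gate.map { _ in value }.eraseToAnyPublisher()
        }
        .sink { received.append($0) }

    for value in 1...3 { source.send(value) }  // 2 and 3 arrive while 1 is in flight

    while !gates.isEmpty {
        let gate = gates.removeFirst()
        gate.send(())                     // the inner publisher emits its value
        gate.send(completion: .finished)  // slot freed: flatMap requests one more value
    }
    cancellable.cancel()
    return received
}

let withoutBuffer = deliveredValues(buffered: false)  // [1]
let withBuffer = deliveredValues(buffered: true)      // [1, 2, 3]
```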

Combine framework: how to process each element of array asynchronously before proceeding

With your latest edit and this comment below:

I literally am asking is there a Combine equivalent of "don't proceed to the next step until this step, involving multiple asynchronous steps, has finished"

I think this pattern can be achieved with .flatMap to an array publisher (Publishers.Sequence), which emits its elements one by one and then completes, followed by whatever per-element async processing is needed, and finalized with .collect, which waits for all elements to complete before proceeding.

So, in code, assuming we have these functions:

func getFoos() -> AnyPublisher<[Foo], Error>
func getPartials(for: Foo) -> AnyPublisher<[Partial], Error>
func getMoreInfo(for: Partial, of: Foo) -> AnyPublisher<MoreInfo, Error>

We can do the following:

getFoos()
    .flatMap { fooArr in
        fooArr.publisher.setFailureType(to: Error.self)
    }
    // per-foo element async processing
    .flatMap { foo in
        getPartials(for: foo)
            .flatMap { partialArr in
                partialArr.publisher.setFailureType(to: Error.self)
            }
            // per-partial of foo async processing
            .flatMap { partial in
                getMoreInfo(for: partial, of: foo)
                    // build completed partial with more info
                    .map { moreInfo -> Partial in
                        var newPartial = partial
                        newPartial.moreInfo = moreInfo
                        return newPartial
                    }
            }
            .collect()
            // build completed foo with all partials
            .map { partialArr -> Foo in
                var newFoo = foo
                newFoo.partials = partialArr
                return newFoo
            }
    }
    .collect()
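To see the shape of this pattern in isolation, here is a smaller sketch with hypothetical loadIDs() and loadDetail(for:) functions standing in for the real network calls. It uses Just and Future so that everything completes synchronously; real requests would complete later, but the pipeline is the same: flatMap to an array publisher, per-element work, then collect.

```swift
import Combine

// Hypothetical stand-ins for real asynchronous calls. Just and Future deliver
// immediately here, so the whole pipeline finishes synchronously.
func loadIDs() -> AnyPublisher<[Int], Error> {
    Just([1, 2, 3]).setFailureType(to: Error.self).eraseToAnyPublisher()
}

func loadDetail(for id: Int) -> AnyPublisher<String, Error> {
    Future<String, Error> { promise in
        promise(.success("detail-\(id)"))
    }
    .eraseToAnyPublisher()
}

var details: [String] = []
var finished = false

let cancellable = loadIDs()
    // 1. Turn the array into a stream of its elements.
    .flatMap { ids in ids.publisher.setFailureType(to: Error.self) }
    // 2. Per-element asynchronous work (completion order is not guaranteed).
    .flatMap { id in loadDetail(for: id) }
    // 3. Wait until every element has been processed, then emit one array.
    .collect()
    .sink(
        receiveCompletion: { completion in
            if case .finished = completion { finished = true }
        },
        receiveValue: { details = $0 }
    )
```

As with the parallel example, collect gathers results in completion order, so sort or index them if the original order matters.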


How to queue or wait until last Combine request done?

You can achieve this with the maxPublishers parameter of flatMap. Restricting it to .max(1) creates back pressure on the upstream: flatMap requests the next value only after the most recently produced publisher completes:

Publishers.Merge(
    publisher1
        .filter { $0 }
        .map { _ in },
    publisher2
)
.flatMap(maxPublishers: .max(1)) { // <- here
    scheduleNotifications()
}
.sink { print("Complete: \(Date())") }
.store(in: &cancellable)
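A sketch of the serialising effect, assuming a hypothetical fakeRequest(_:) in place of scheduleNotifications(). The upstream here is an array publisher, which only emits when asked, so each request starts only after the previous one has finished. Note that a PassthroughSubject upstream does not wait in the same way: values sent while the single slot is busy are dropped unless a buffer is placed in front of flatMap.

```swift
import Combine

// Hypothetical stand-in for scheduleNotifications(): records that it started
// and returns a subject we complete by hand to simulate the request finishing.
var started: [Int] = []
var gates: [PassthroughSubject<Void, Never>] = []

func fakeRequest(_ id: Int) -> AnyPublisher<Void, Never> {
    started.append(id)
    let gate = PassthroughSubject<Void, Never>()
    gates.append(gate)
    return gate.eraseToAnyPublisher()
}

var allDone = false
let cancellable = [1, 2, 3].publisher
    .flatMap(maxPublishers: .max(1)) { fakeRequest($0) }
    .sink(receiveCompletion: { _ in allDone = true }, receiveValue: { _ in })

let afterSubscribe = started  // only the first request has started
let first = gates[0]
first.send(completion: .finished)
let afterFirst = started      // the second starts once the first is done
let second = gates[1]
second.send(completion: .finished)
let third = gates[2]
third.send(completion: .finished)
```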

Why is sink never called in my combine pipeline?

dataTaskPublisher(for: urlRequest) will send values asynchronously. When program execution leaves your current scope, there are no more references to the AnyCancellable returned by sink, so ARC deallocates it, and deallocating an AnyCancellable cancels your pipeline before the network request has completed.

Your pipeline returns an AnyCancellable. Either assign it directly to an instance variable or call store(in:) on it at the end of your pipeline.

It's also worth mentioning that if you experiment with Combine pipelines in an Xcode playground, your pipelines may live longer than you'd expect, because the playground tries to be smart about holding on to references to make experimentation easier. See this answer for an async example you can run in a playground, even though it applies neither of the fixes I mentioned.
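A minimal sketch of both outcomes side by side, assuming a hypothetical Loader class in place of your view controller or view model and a PassthroughSubject in place of the network publisher. The subscription stored with store(in:) survives after start(listeningTo:) returns; the one whose AnyCancellable is discarded is cancelled immediately.

```swift
import Combine

// Hypothetical owner object (a view model or controller in a real app).
final class Loader {
    private var cancellables = Set<AnyCancellable>()
    private(set) var received: [Int] = []

    func start(listeningTo subject: PassthroughSubject<Int, Never>) {
        subject
            .sink { [weak self] in self?.received.append($0) }
            .store(in: &cancellables)  // lives as long as the Loader does
    }
}

let subject = PassthroughSubject<Int, Never>()

let loader = Loader()
loader.start(listeningTo: subject)

// The AnyCancellable returned here is discarded immediately, and deallocating
// it cancels the subscription, so this closure never runs.
var droppedValues: [Int] = []
_ = subject.sink { droppedValues.append($0) }

subject.send(1)  // arrives after start(listeningTo:) has returned
```

The [weak self] capture avoids a retain cycle between the Loader and the closure stored in its own cancellables set.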

Swift Combine: How to create a single publisher from a list of publishers?

Essentially, in your specific situation you're looking at something like this:

func createIngredients(ingredients: [Ingredient]) -> AnyPublisher<[CreateIngredientMutation.Data], Error> {
    Publishers.MergeMany(ingredients.map(createIngredient(ingredient:)))
        .collect()
        .eraseToAnyPublisher()
}

This 'collects' all the elements produced by the upstream publishers and – once they have all completed – produces an array with all the results and finally completes itself.

Bear in mind that if one of the upstream publishers fails, the merged publisher fails with that error and collect emits no array at all. If a publisher produces more than one result (or none), the number of elements will not match the number of ingredients. You may need additional operators to handle these cases, depending on your situation.
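A sketch of both the success and the failure case, assuming a hypothetical create(_:) function in place of createIngredient(ingredient:) that fails for an empty name. Just and Fail complete synchronously, so the results can be checked right after subscribing.

```swift
import Combine

// Hypothetical stand-in for createIngredient(ingredient:): an empty name fails.
struct CreateError: Error {}

func create(_ name: String) -> AnyPublisher<String, Error> {
    if name.isEmpty {
        return Fail<String, Error>(error: CreateError()).eraseToAnyPublisher()
    }
    return Just("created \(name)").setFailureType(to: Error.self).eraseToAnyPublisher()
}

func createAll(_ names: [String]) -> AnyPublisher<[String], Error> {
    Publishers.MergeMany(names.map(create))
        .collect()
        .eraseToAnyPublisher()
}

// All succeed: one array containing every result.
var created: [String] = []
let okSubscription = createAll(["salt", "pepper"])
    .sink(receiveCompletion: { _ in }, receiveValue: { created = $0 })

// One fails: the error is forwarded and collect never emits an array.
var collectedOnFailure: [String]? = nil
var failed = false
let badSubscription = createAll(["salt", ""])
    .sink(
        receiveCompletion: { completion in
            if case .failure = completion { failed = true }
        },
        receiveValue: { collectedOnFailure = $0 }
    )
```

If partial results are acceptable, one option is to map each publisher's failure to a placeholder value (for example with catch or replaceError(with:)) before merging, so that one failure no longer discards the whole array.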

The more generic answer, with a way you can test it using the EntwineTest framework:

import XCTest
import Combine
import EntwineTest

final class MyTests: XCTestCase {

    func testCreateArrayFromArrayOfPublishers() {

        typealias SimplePublisher = Just<Int>

        // we'll create our 'list of publishers' here. Each publisher emits a single
        // Int and then completes successfully – using the `Just` publisher.
        let publishers: [SimplePublisher] = [
            SimplePublisher(1),
            SimplePublisher(2),
            SimplePublisher(3),
        ]

        // we'll turn our array of publishers into a single merged publisher
        let publisherOfPublishers = Publishers.MergeMany(publishers)

        // Then we `collect` the results of all the individual publishers into
        // a single array
        let finalPublisher = publisherOfPublishers.collect()

        // Let's test that what we expect to happen will happen.
        // We'll create a scheduler to run our test on
        let testScheduler = TestScheduler()

        // Then we'll start a test. Our test will subscribe to our publisher
        // at a virtual time of 200, and cancel the subscription at 900
        let testableSubscriber = testScheduler.start { finalPublisher }

        // we're expecting that, immediately upon subscription, our results will
        // arrive. This is because we're using `Just` publishers, which
        // dispatch their contents as soon as they're subscribed to
        XCTAssertEqual(testableSubscriber.recordedOutput, [
            (200, .subscription),           // we're expecting to subscribe at 200
            (200, .input([1, 2, 3])),       // then receive an array of results immediately
            (200, .completion(.finished)),  // the `collect` operator finishes immediately after completion
        ])
    }
}

