Swift Combine: How to Create a Single Publisher from a List of Publishers

Swift Combine: How to create a single publisher from a list of publishers?

Essentially, in your specific situation you're looking at something like this:

func createIngredients(ingredients: [Ingredient]) -> AnyPublisher<[CreateIngredientMutation.Data], Error> {
    Publishers.MergeMany(ingredients.map(createIngredient(ingredient:)))
        .collect()
        .eraseToAnyPublisher()
}

This 'collects' all the elements produced by the upstream publishers and – once they have all completed – produces an array with all the results and finally completes itself.

Bear in mind that if one of the upstream publishers fails, the merged publisher fails too and no array is delivered. If a publisher produces more than one result (or none at all), the number of elements will not match the number of publishers. Depending on your situation, you may need additional operators to mitigate this.
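One way to handle the failure case, as a minimal sketch: wrap each publisher's outcome in a Result before merging, so that a single failure does not discard the other results. The helper name collectResults is hypothetical (it is not part of Combine or of the original answer), and the sketch assumes each publisher is meant to yield at most one value. MergeMany delivers values in arrival order, so the array is not guaranteed to follow the order of the input.

```swift
import Combine

// Hypothetical helper, not a Combine API: collects one Result per upstream
// publisher instead of failing the whole pipeline on the first error.
func collectResults<P: Publisher>(
    _ publishers: [P]
) -> AnyPublisher<[Result<P.Output, P.Failure>], Never> {
    Publishers.MergeMany(
        publishers.map { publisher in
            publisher
                .first() // take at most one value from each upstream
                .map { Result<P.Output, P.Failure>.success($0) }
                // turn a failure into an ordinary value
                .catch { Just(Result<P.Output, P.Failure>.failure($0)) }
        }
    )
    .collect()
    .eraseToAnyPublisher()
}
```

A publisher that completes without emitting anything still contributes no element, so the count can differ from the input even with this helper.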

The more generic answer, with a way you can test it using the EntwineTest framework:

import XCTest
import Combine
import EntwineTest

final class MyTests: XCTestCase {

    func testCreateArrayFromArrayOfPublishers() {

        typealias SimplePublisher = Just<Int>

        // We'll create our 'list of publishers' here. Each publisher emits a single
        // Int and then completes successfully, using the `Just` publisher.
        let publishers: [SimplePublisher] = [
            SimplePublisher(1),
            SimplePublisher(2),
            SimplePublisher(3),
        ]

        // We'll turn our array of publishers into a single merged publisher
        let publisherOfPublishers = Publishers.MergeMany(publishers)

        // Then we `collect` the elements from all the individual publishers into
        // a single array
        let finalPublisher = publisherOfPublishers.collect()

        // Let's test that what we expect to happen does happen.
        // We'll create a scheduler to run our test on
        let testScheduler = TestScheduler()

        // Then we'll start a test. Our test will subscribe to our publisher
        // at a virtual time of 200, and cancel the subscription at 900
        let testableSubscriber = testScheduler.start { finalPublisher }

        // We're expecting that, immediately upon subscription, our results will
        // arrive. This is because we're using `Just` publishers, which
        // emit their contents as soon as they're subscribed to
        XCTAssertEqual(testableSubscriber.recordedOutput, [
            (200, .subscription), // we're expecting to subscribe at 200
            (200, .input([1, 2, 3])), // then receive an array of results immediately
            (200, .completion(.finished)), // `collect` finishes right after emitting the array
        ])
    }
}
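If you would rather not pull in a test framework, the same behaviour can be observed with a plain sink, as a rough sketch. This relies on Just emitting synchronously on subscription; with asynchronous publishers you would need an expectation or a scheduler instead. The variable names are illustrative only.

```swift
import Combine

var received: [[Int]] = []
var didFinish = false

// Merge three single-value publishers and gather their values into one array.
let cancellable = Publishers.MergeMany([Just(1), Just(2), Just(3)])
    .collect()
    .sink(
        receiveCompletion: { _ in didFinish = true },
        receiveValue: { received.append($0) }
    )

print(received, didFinish) // [[1, 2, 3]] true
```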

Swift Combine: Run a list of publishers one after the other, and publish the first non-nil element

The approach is almost the same as your original one, but you need to restrict flatMap to run at most one publisher at a time with the maxPublishers parameter:

publishers
    .publisher
    .flatMap(maxPublishers: .max(1), { $0 })
    .compactMap { $0 } // Remove all nil values and unwrap the non-nil ones.
    .first()
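Here is a self-contained sketch of that pipeline you can paste into a playground. The lookup function and the started array are hypothetical, added only so you can inspect which publishers were actually subscribed to; Deferred postpones the work until subscription.

```swift
import Combine

var started: [String] = []

// Hypothetical lookup: records its name when subscribed to, then emits one optional value.
func lookup(_ name: String, _ value: Int?) -> AnyPublisher<Int?, Never> {
    Deferred { () -> Just<Int?> in
        started.append(name)
        return Just(value)
    }
    .eraseToAnyPublisher()
}

let publishers = [lookup("a", nil), lookup("b", 7), lookup("c", 9)]

var result: Int?
let cancellable = publishers
    .publisher
    .flatMap(maxPublishers: .max(1)) { $0 } // run one inner publisher at a time
    .compactMap { $0 }                       // drop nil values
    .first()                                 // finish on the first non-nil value
    .sink { result = $0 }

print(result as Any) // Optional(7)
print(started)       // shows which lookups were subscribed to
```

Because first() cancels its upstream once it has a value, the later lookups should not need to run at all, which is the point of limiting flatMap to one publisher at a time.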

Combine publishers: notify when ANY of the publishers changes a value

Instead of using Publishers.MergeMany as in your linked question, you want to use .combineLatest(_:) on your first publisher, like so:

import SwiftUI
import Combine
import Foundation

struct Credentials {
    let username: String
    let password: String

    init(username: String = "",
         password: String = "") {
        self.username = username
        self.password = password
    }
}

final class ViewModel: ObservableObject {
    @Published var username = ""
    @Published var password = ""

    @Published var credentials = Credentials()

    private var cancellable: Cancellable? = nil

    init() {
        // [weak self] avoids a retain cycle between the view model and its subscription
        cancellable = $username.combineLatest($password).sink { [weak self] tuple in
            self?.credentials = Credentials(username: tuple.0, password: tuple.1)
        }

        credentials = Credentials(username: username, password: password)
    }
}

(It's been a while, so this code may not compile as written, but hopefully you can see where this is going.)
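To see the behaviour in isolation, here is a small sketch that uses CurrentValueSubject in place of the @Published properties (an assumption made to keep the example short). combineLatest emits a new pair whenever either side changes, once both sides have produced a value.

```swift
import Combine

let username = CurrentValueSubject<String, Never>("")
let password = CurrentValueSubject<String, Never>("")

var seen: [String] = []
let cancellable = username
    .combineLatest(password)
    .sink { pair in seen.append("\(pair.0):\(pair.1)") }

username.send("alice")  // only the username changed
password.send("secret") // only the password changed

print(seen) // [":", "alice:", "alice:secret"]
```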

How to make a Publisher from array of Publishers?

As long as you don't insist too strongly on the "array" part, combineLatest or zip is correct for the Combine framework too. The difference between them is whether you want the overall value emitted each time to contain the oldest (zip) or the newest (combineLatest) contribution from each publisher.

Your example doesn't actually give enough information to tell which of those you want. To know which you want, you'd need to say what should happen when you say e.g.:

obs1.onNext(5) 
obs1.onNext(6)
obs2.onNext(10)

Be that as it may, Combine is Swift, so it uses a tuple, not an array, to express the totality of values. But if, as in your example, all the publishers have the same output and failure types, then an array can easily be substituted by mapping. See How to zip more than 4 publishers for an example turning zip into an array processor. Exactly the same technique works for combineLatest.
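The difference is easy to check with the sequence of sends above, translated to Combine. This is a minimal sketch using two PassthroughSubject instances; the zipped and latest arrays are illustrative names.

```swift
import Combine

let obs1 = PassthroughSubject<Int, Never>()
let obs2 = PassthroughSubject<Int, Never>()

var zipped: [[Int]] = []
var latest: [[Int]] = []

let zipToken = obs1.zip(obs2).sink { pair in zipped.append([pair.0, pair.1]) }
let latestToken = obs1.combineLatest(obs2).sink { pair in latest.append([pair.0, pair.1]) }

obs1.send(5)
obs1.send(6)
obs2.send(10)

print(zipped) // [[5, 10]]  zip pairs the oldest unmatched values
print(latest) // [[6, 10]]  combineLatest pairs the newest values
```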

So, here's an actual working example; to make the example more general than yours, I used three publishers instead of two:

class ViewController: UIViewController {
    let obs1 = PassthroughSubject<Int,Never>()
    let obs2 = PassthroughSubject<Int,Never>()
    let obs3 = PassthroughSubject<Int,Never>()
    var storage = Set<AnyCancellable>()
    override func viewDidLoad() {
        super.viewDidLoad()

        let arr = [obs1, obs2, obs3]
        let pub = arr.dropFirst().reduce(into: AnyPublisher(arr[0].map{[$0]})) {
            res, ob in
            res = res.combineLatest(ob) {
                i1, i2 -> [Int] in
                return i1 + [i2]
            }.eraseToAnyPublisher()
        }
        pub.sink { print($0) }.store(in: &storage)
        obs1.send(5)
        obs2.send(10)
        obs3.send(1) // [5, 10, 1]
        obs1.send(12) // [12, 10, 1]
        obs3.send(20) // [12, 10, 20]
    }
}
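The same fold can be written with zip and wrapped in a reusable function. This is only a sketch: zipAll is a hypothetical name, not a Combine API, and it assumes all publishers share the same Output and Failure types.

```swift
import Combine

// Hypothetical helper: zips any number of same-typed publishers into a
// publisher of arrays by folding over the array with zip.
func zipAll<P: Publisher>(_ publishers: [P]) -> AnyPublisher<[P.Output], P.Failure> {
    guard let first = publishers.first else {
        // Nothing to zip: complete immediately without emitting.
        return Empty<[P.Output], P.Failure>().eraseToAnyPublisher()
    }
    let seed = first.map { [$0] }.eraseToAnyPublisher()
    return publishers.dropFirst().reduce(seed) { partial, next in
        partial.zip(next) { $0 + [$1] }.eraseToAnyPublisher()
    }
}

let a = PassthroughSubject<Int, Never>()
let b = PassthroughSubject<Int, Never>()
let c = PassthroughSubject<Int, Never>()

var output: [[Int]] = []
let cancellable = zipAll([a, b, c]).sink { output.append($0) }

a.send(1)
b.send(2)
c.send(3)
print(output) // [[1, 2, 3]]
```

Each step erases to AnyPublisher so that the accumulator keeps a single type across the fold; that costs a little type information but keeps the helper readable.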

Swift Combine transform an array of publishers

The general approach is to flatMap the array into a Sequence publisher of individual values, then do any async operations on each value (another flatMap), then collect the results:

HKHealthStore()
    .workouts(HKObjectQueryNoLimit)
    .flatMap { workouts in workouts.publisher }
    .flatMap { workout in
        workoutDetails(workout)
    }
    .collect()
    .sink(
        receiveCompletion: { ... },
        receiveValue: { arrayOfWorkoutDetails in
            // arrayOfWorkoutDetails is of type [WorkoutDetails]
        })
    .store(in: &bag)
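The same shape, reduced to something you can run without HealthKit. In this sketch, details(for:) is a hypothetical stand-in for the asynchronous per-item call and Just([1, 2, 3]) stands in for the publisher that delivers the array.

```swift
import Combine

// Hypothetical stand-in for an asynchronous per-item lookup.
func details(for id: Int) -> AnyPublisher<String, Never> {
    Just("details-\(id)").eraseToAnyPublisher()
}

var collected: [String] = []
let cancellable = Just([1, 2, 3])
    .flatMap { ids in ids.publisher }   // [Int] becomes a stream of Int
    .flatMap { id in details(for: id) } // one lookup per value
    .collect()                          // back to a single array
    .sink { collected = $0 }

print(collected) // ["details-1", "details-2", "details-3"]
```

With genuinely asynchronous lookups the results arrive in completion order, so the collected array is not guaranteed to follow the order of the input.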

How can I wait until all Combine publishers finished their jobs in Swift?

One possible approach is a view model: in this class, merge the publishers and set a flag in the receiveCompletion: closure of sink, which runs only once every merged publisher has finished.

class ViewModel: ObservableObject {

    @Published var isFinished = false
    let pub1 = ["one", "two", "three", "four"].publisher
    let pub2 = ["five", "six", "seven", "eight"].publisher

    private var subscriptions = Set<AnyCancellable>()

    init() {
        pub1
            .sink { print($0) }
            .store(in: &subscriptions)
        pub2
            .sink { print($0) }
            .store(in: &subscriptions)

        // The merged publisher completes only after both pub1 and pub2 have completed
        pub1.merge(with: pub2)
            .sink(receiveCompletion: { [weak self] _ in
                self?.isFinished = true
            }, receiveValue: { _ in })
            .store(in: &subscriptions)
    }
}

struct SwiftUIView: View {
    @StateObject private var model = ViewModel()
    var body: some View {
        if model.isFinished {
            Text("Hello, World!")
        } else {
            ProgressView()
        }
    }
}
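Array publishers complete immediately, which hides the timing. The following sketch uses two PassthroughSubject instances (an assumption, chosen so completion can be triggered by hand) to show that the merged completion fires only after the last publisher finishes.

```swift
import Combine

let first = PassthroughSubject<String, Never>()
let second = PassthroughSubject<String, Never>()

var isFinished = false
let cancellable = first.merge(with: second)
    .sink(receiveCompletion: { _ in isFinished = true },
          receiveValue: { _ in })

first.send(completion: .finished)
let afterFirst = isFinished  // false: `second` is still running

second.send(completion: .finished)
let afterSecond = isFinished // true: every merged publisher has finished

print(afterFirst, afterSecond) // false true
```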

Swift Combine - combining publishers without waiting for all publishers to emit first element

You can use prepend(…) to prepend values to the beginning of a publisher.

Here's a version of your code that will prepend nil to both publishers.

let timer = Timer.publish(every: 10, on: .current, in: .common).autoconnect()
let anotherPub: AnyPublisher<Int, Never> = Just(10).delay(for: 5, scheduler: RunLoop.main).eraseToAnyPublisher()

// Keep the returned cancellable alive, otherwise the subscription is cancelled immediately
let cancellable = Publishers.CombineLatest(
    timer.map(Optional.init).prepend(nil),
    anotherPub.map(Optional.init).prepend(nil)
)
.filter { $0 != nil || $1 != nil } // Drop only the initial event, where both values are nil
.sink(receiveValue: { (timer, val) in
    print("Hello! \(timer) \(val)")
})
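To check the idea without waiting on timers, here is a sketch with two PassthroughSubject instances standing in for the real publishers (an assumption for illustration). Each side is made optional and seeded with nil, so combineLatest can emit as soon as either side produces a real value.

```swift
import Combine

let numbers = PassthroughSubject<Int, Never>()
let letters = PassthroughSubject<String, Never>()

var events: [String] = []
let cancellable = numbers.map { $0 as Int? }.prepend(nil)
    .combineLatest(letters.map { $0 as String? }.prepend(nil))
    .filter { pair in pair.0 != nil || pair.1 != nil } // skip the seed event (nil, nil)
    .sink { pair in
        let number = pair.0.map(String.init) ?? "-"
        let letter = pair.1 ?? "-"
        events.append("\(number) \(letter)")
    }

numbers.send(1)   // emitted even though `letters` has not produced a value yet
letters.send("a")

print(events) // ["1 -", "1 a"]
```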

How to chain together two Combine publishers in Swift and keep the cancellable object

If I understand correctly, you want to link two publishers but with the option to break that link at some point in the future.

I would try using sink on the inputPublisher, since that function gives me a cancellable, and then a PassthroughSubject, since I wasn't able to figure out how to pass the value from sink directly to outputPublisher.

It would look something like this:

static func connect(inputPublisher: Published<String>.Publisher, outputPublisher: inout Published<String>.Publisher) -> AnyCancellable {
    let passthrough = PassthroughSubject<String, Never>()
    passthrough.assign(to: &outputPublisher)
    let cancellable = inputPublisher.sink { string in
        passthrough.send(string)
    }
    return cancellable
}
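If you have access to the objects that own the properties, rather than only their publishers, a possible alternative is to write to the destination property directly from sink. This is only a sketch: Source and Destination are hypothetical classes, and this connect(_:to:) is a different function from the one above.

```swift
import Combine

// Hypothetical owners of the two @Published properties.
final class Source { @Published var text = "" }
final class Destination { @Published var text = "" }

// Forwards every value of source.text to destination.text until the
// returned cancellable is cancelled or released.
func connect(_ source: Source, to destination: Destination) -> AnyCancellable {
    source.$text.sink { [weak destination] value in
        destination?.text = value
    }
}

let source = Source()
let destination = Destination()
let link = connect(source, to: destination)

source.text = "hello"
let whileLinked = destination.text // "hello"

link.cancel() // break the link
source.text = "ignored"
let afterCancel = destination.text // still "hello"
```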

Disclaimer: I wrote this in a playground and it compiles, but I didn't actually run it.


