Swift Combine: What Are Those Multicast Functions for and How to Use Them

Understanding share() in Combine

The problem is that you are not using a pipeline where it makes a difference. Consider this example (based on a Cocoa With Love article), where a second subscriber comes online after the publisher has been publishing for some time:

let pub1 = Timer.publish(every: 1, on: .main, in: .default)
let c1 = pub1.connect()
let scan = Publishers.Scan(upstream: pub1, initialResult: 0) { (a, b) -> Int in
    a + 1
}
scan.sink { print("a:", $0) }.store(in: &storage)
delay(3) {
    scan.sink { print("b:", $0) }.store(in: &self.storage)
}

The point is that there is only one scan, and it has already produced 1, 2, 3 when, after a delay, another subscriber comes along. What will that subscriber get? Will it just pick up where we are now? No. We get this:

a: 1
a: 2
a: 3
a: 4
b: 1
a: 5
b: 2
a: 6
b: 3
...

So in effect we start all over again with our second subscription, because the publisher is a struct: the second subscriber gets a new copy of the pipeline, with its own state. But if we promote the publisher to a class by applying share(), we get completely different results:

let pub1 = Timer.publish(every: 1, on: .main, in: .default)
let c1 = pub1.connect()
let scan = Publishers.Scan(upstream: pub1, initialResult: 0) { (a, b) -> Int in
    a + 1
}
let scan2 = scan.share() // <--
scan2.sink { print("a:", $0) }.store(in: &storage)
delay(3) {
    scan2.sink { print("b:", $0) }.store(in: &self.storage)
}

Now we get this:

a: 1
a: 2
a: 3
a: 4
b: 4
a: 5
b: 5
a: 6
b: 6
a: 7
b: 7

Obviously that's a very significant difference. You can see the same sort of thing if your publisher is a Subject, because that's a class, not a struct.
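
The same difference can be reproduced without a timer. The following is a minimal sketch, not taken from the original answer: it assumes a PassthroughSubject as the upstream so that values can be sent by hand, and the names ticks1, ticks2, unshared, shared and the four result arrays are made up for illustration.

```swift
import Combine

var storage = Set<AnyCancellable>()

// Without share(): every subscription to the scan pipeline keeps its own running count.
let ticks1 = PassthroughSubject<Void, Never>()
let unshared = ticks1.scan(0) { count, _ in count + 1 }
var unsharedA: [Int] = []
var unsharedB: [Int] = []

unshared.sink { unsharedA.append($0) }.store(in: &storage)
ticks1.send()
ticks1.send()
unshared.sink { unsharedB.append($0) }.store(in: &storage) // late subscriber
ticks1.send()

// With share(): there is a single subscription to scan, and both subscribers see its count.
let ticks2 = PassthroughSubject<Void, Never>()
let shared = ticks2.scan(0) { count, _ in count + 1 }.share()
var sharedA: [Int] = []
var sharedB: [Int] = []

shared.sink { sharedA.append($0) }.store(in: &storage)
ticks2.send()
ticks2.send()
shared.sink { sharedB.append($0) }.store(in: &storage) // late subscriber
ticks2.send()

print("unshared:", unsharedA, unsharedB) // unshared: [1, 2, 3] [1]
print("shared:", sharedA, sharedB)       // shared: [1, 2, 3] [3]
```

The late subscriber to the unshared pipeline starts counting from 1 again, while the late subscriber to the shared pipeline picks up at the current count.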

Swift: Create a multi-function multicast delegate

You need to change the signature of invokeDelegates to take a closure of type (MyProtocol) -> (), and then you need to pass each delegate to the closure.

protocol MyProtocol {
    func method1()
    func method2()
    func method3()
}

class TestClass {
    var delegates = [MyProtocol]()

    func invokeDelegates(delegateMethod: (MyProtocol) -> ()) {
        for delegate in delegates {
            delegateMethod(delegate)
        }
    }
}

The closure should just invoke the appropriate delegate method on its argument. Swift can infer the argument and return types of the closure, and you can use the shorthand $0 to refer to the argument, so the closure can be quite short:

let tester = TestClass()
tester.invokeDelegates(delegateMethod: { $0.method1() })

On the other hand, you could just use Collection.forEach directly on the delegates array (if it's accessible) and skip the invokeDelegates method:

tester.delegates.forEach { $0.method1() }
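
To see the multicast in action end to end, here is a small self-contained sketch. RecordingDelegate is a hypothetical class invented for this example (it is not part of the question); it simply records which methods were called on it, so the effect of each invokeDelegates call can be inspected.

```swift
protocol MyProtocol {
    func method1()
    func method2()
    func method3()
}

// Hypothetical delegate that records which methods were called on it.
final class RecordingDelegate: MyProtocol {
    private(set) var calls: [String] = []
    func method1() { calls.append("method1") }
    func method2() { calls.append("method2") }
    func method3() { calls.append("method3") }
}

class TestClass {
    var delegates = [MyProtocol]()

    // Calls the given closure once per registered delegate.
    func invokeDelegates(delegateMethod: (MyProtocol) -> ()) {
        for delegate in delegates {
            delegateMethod(delegate)
        }
    }
}

let first = RecordingDelegate()
let second = RecordingDelegate()
let tester = TestClass()
tester.delegates = [first, second]

tester.invokeDelegates { $0.method1() }
tester.invokeDelegates { $0.method3() }

print(first.calls)  // ["method1", "method3"]
print(second.calls) // ["method1", "method3"]
```

Each call reaches every delegate in the array, and the caller decides which protocol method to invoke by choosing the closure.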

Swift Combine: subsequent Publisher that consumes other Publishers (using CombineLatest) doesn't fire

I got this question answered on the Swift forums (https://forums.swift.org/t/crash-in-swiftui-app-using-combine-was-using-published-in-conjunction-with-state-in-swiftui/26628/9) by the very friendly and helpful Nanu Jogi, who is not on Stack Overflow.

It is rather straightforward. Add this line:

    .receive(on: RunLoop.main) // run on main thread

to validatedCredentials, so that it looks like this:

var validatedCredentials: AnyPublisher<(String, String)?, Never> {
    return Publishers.CombineLatest(validatedEMail, validatedPassword)
        .receive(on: RunLoop.main) // <-- run on main thread
        .map { validatedEMail, validatedPassword in
            print("validatedEMail: \(validatedEMail ?? "not set"), validatedPassword: \(validatedPassword ?? "not set")")

            guard let eMail = validatedEMail, let password = validatedPassword else { return nil }

            return (eMail, password)
        }
        .eraseToAnyPublisher()
}

That is all that is needed.

Here is the whole code once more for reference (updated for Xcode 11.0 beta 5 (11M382q)):

//
// RegistrationView.swift
// Combine-Beta-Feedback
//
// Created by Lars Sonchocky-Helldorf on 09.07.19.
// Copyright © 2019 Lars Sonchocky-Helldorf. All rights reserved.
//

import SwiftUI
import Combine

struct RegistrationView : View {
    @ObservedObject var registrationModel = RegistrationModel()

    @State private var registrationButtonDisabled = true

    @State private var validatedEMail: String = ""
    @State private var validatedPassword: String = ""

    var body: some View {
        Form {
            Section {
                TextField("Enter your EMail", text: $registrationModel.eMail)
                SecureField("Enter a Password", text: $registrationModel.password)
                SecureField("Enter the Password again", text: $registrationModel.passwordRepeat)
                Button(action: registrationButtonAction) {
                    Text("Create Account")
                }
                .disabled($registrationButtonDisabled.wrappedValue)
                .onReceive(self.registrationModel.validatedCredentials) { newValidatedCredentials in
                    self.registrationButtonDisabled = (newValidatedCredentials == nil)
                }
            }

            Section {
                Text("Validated EMail: \(validatedEMail)")
                    .onReceive(self.registrationModel.validatedEMail) { newValidatedEMail in
                        self.validatedEMail = newValidatedEMail ?? "EMail invalid"
                    }
                Text("Validated Password: \(validatedPassword)")
                    .onReceive(self.registrationModel.validatedPassword) { newValidatedPassword in
                        self.validatedPassword = newValidatedPassword ?? "Passwords too short or don't match"
                    }
            }
        }
        .navigationBarTitle(Text("Sign Up"))
    }

    func registrationButtonAction() {

    }
}

class RegistrationModel : ObservableObject {

    @Published var eMail: String = ""
    @Published var password: String = ""
    @Published var passwordRepeat: String = ""

    var validatedEMail: AnyPublisher<String?, Never> {
        return $eMail
            .debounce(for: 0.5, scheduler: RunLoop.main)
            .removeDuplicates()
            .map { username in
                return Future { promise in
                    print("username: \(username)")
                    self.usernameAvailable(username) { available in
                        promise(.success(available ? username : nil))
                    }
                }
            }
            .switchToLatest()
            .eraseToAnyPublisher()
    }

    var validatedPassword: AnyPublisher<String?, Never> {
        return Publishers.CombineLatest($password, $passwordRepeat)
            .debounce(for: 0.5, scheduler: RunLoop.main)
            .map { password, passwordRepeat in
                print("password: \(password), passwordRepeat: \(passwordRepeat)")
                guard password == passwordRepeat, password.count > 5 else { return nil }
                return password
            }
            .eraseToAnyPublisher()
    }

    var validatedCredentials: AnyPublisher<(String, String)?, Never> {
        return Publishers.CombineLatest(validatedEMail, validatedPassword)
            .receive(on: RunLoop.main)
            .map { validatedEMail, validatedPassword in
                print("validatedEMail: \(validatedEMail ?? "not set"), validatedPassword: \(validatedPassword ?? "not set")")
                guard let eMail = validatedEMail, let password = validatedPassword else { return nil }
                return (eMail, password)
            }
            .eraseToAnyPublisher()
    }

    func usernameAvailable(_ username: String, completion: (Bool) -> Void) {
        let isValidEMailAddress: Bool = NSPredicate(format:"SELF MATCHES %@", "[A-Z0-9a-z._%+-]+@[A-Za-z0-9.-]+\\.[A-Za-z]{2,64}").evaluate(with: username)

        completion(isValidEMailAddress)
    }
}

#if DEBUG
struct RegistrationView_Previews : PreviewProvider {
    static var previews: some View {
        RegistrationView()
    }
}
#endif

How can I trigger a process after a returned publisher would be subscribed?

You can write your own type that conforms to Publisher and wraps a PassthroughSubject. In your implementation, you can start the background process when you get a subscription.

public struct MyPublisher: Publisher {
    public typealias Output = Data
    public typealias Failure = Error

    public func receive<Downstream: Subscriber>(subscriber: Downstream)
        where Downstream.Input == Output, Downstream.Failure == Failure
    {
        let subject = PassthroughSubject<Output, Failure>()
        subject.subscribe(subscriber)

        startBackgroundProcess(subject: subject)
    }

    private func startBackgroundProcess(subject: PassthroughSubject<Output, Failure>) {
        DispatchQueue.global(qos: .utility).async {
            print("background process running")
            subject.send(Data())
            subject.send(completion: .finished)
        }
    }
}

Note that this publisher starts a new background process for each subscriber. That is a common implementation. For example, URLSession.DataTaskPublisher issues a new request for each subscriber. If you want multiple subscribers to share the output of a single request, you can use the .multicast operator, add multiple subscribers, and then .connect() the multicast publisher to start the background process once:

let pub = MyPublisher().multicast { PassthroughSubject() }
pub.sink(...).store(in: &tickets) // first subscriber
pub.sink(...).store(in: &tickets) // second subscriber
pub.connect().store(in: &tickets) // start the background process
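
The effect of multicast and connect() can be checked in isolation. The sketch below is an illustration under assumptions, not the answer's code: instead of MyPublisher it uses Deferred wrapping a Just as a stand-in for "a publisher that starts work when it is subscribed", and startCount, work, firstValues and secondValues are names made up for this example.

```swift
import Combine

var startCount = 0 // how many times the stand-in "process" was started

// Deferred runs its closure once per subscription, like a publisher
// that kicks off work whenever somebody subscribes.
let work = Deferred { () -> Just<Int> in
    startCount += 1
    return Just(42)
}

var tickets = Set<AnyCancellable>()
var firstValues: [Int] = []
var secondValues: [Int] = []

let pub = work.multicast { PassthroughSubject<Int, Never>() }
pub.sink { firstValues.append($0) }.store(in: &tickets)  // first subscriber
pub.sink { secondValues.append($0) }.store(in: &tickets) // second subscriber
let startsBeforeConnect = startCount                     // nothing has started yet
pub.connect().store(in: &tickets)                        // subscribes to `work` once

print(startsBeforeConnect, startCount, firstValues, secondValues) // 0 1 [42] [42]
```

Subscribing to work directly with two sinks would run the closure twice; with multicast it runs once, and only when connect() is called.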

Combine: Chain requests with dependency, keep both responses

You are not going to be able to chain the requests the way you are trying to and still capture all the results.

Think of it this way. By chaining Combine operators you're constructing a pipeline. You can decide what to put into the input to the pipeline, and you can dump whatever comes out of the output of the pipeline into a sink where you can see the results, but you can't go through the sides of the pipe to see the intermediate values (at least not without cutting a window in the pipe, which we'll get to).

Here's your code:

let places = callAPI.places()
let firstPlace = places.compactMap { $0.first }
let weather = firstPlace.flatMap { place in
    callAPI.weather(latitude: place.latitude, longitude: place.longitude)
}

let token = weather.sink(receiveCompletion: { _ in },
                         receiveValue: { print($0) })

Those variables each hold a piece of the pipeline (not the values that will flow through the pipe), and you're screwing the pipeline together, putting longer and longer pieces in each variable.

To make the whole pipeline a bit more obvious, I would write it like this:

let cancellable = callAPI.places()
    .compactMap { $0.first }
    .flatMap { place in
        callAPI.weather(latitude: place.latitude, longitude: place.longitude)
    }
    .sink(receiveCompletion: { _ in },
          receiveValue: { print($0) })

(note that might not compile as is... I pulled it together in the answer editor)

When you chain the operators directly, it's obvious that you don't have any opportunity to catch intermediate results. For your pipeline, the stuff that goes into the pipeline comes from the network. You catch the stuff flowing out of the pipeline in a sink. But notice how you only get to look at the "stuff" flowing through the pipeline in closures that are part of the pipeline itself.

Now, if you really want to cut a window into the pipeline to pull out intermediate stuff, you need one of those closures that can push the value out of the pipeline. In this case, to get at the array of Places you might do it using handleEvents. It would look something like this:

var allPlaces: [Place]?

callAPI.places()
    .handleEvents(receiveOutput: { allPlaces = $0 })
    .compactMap { $0.first }
    ...

In this code, you catch the receiveOutput event and sneak the result out into a nearby variable.

handleEvents, in my opinion, is one of those "Great Power, Great Responsibility" operators. In this case it will let you do what you are asking to do, but I'm not sure you should do it.

The whole point of chaining operators together is that the resulting pipeline "should" be free of side-effects. In this case handleEvents is being used to explicitly introduce a side-effect (setting the allPlaces variable). Essentially this is, in my opinion, a code smell that suggests you may need to rethink your design.
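
One possible redesign, which is not from the original answer, is to keep the intermediate value inside the pipeline by pairing it with the dependent result, so that the sink receives both. The sketch below is hedged accordingly: Place, Weather, and the free functions places() and weather(latitude:longitude:) are hypothetical stand-ins for the question's callAPI, stubbed with Just so that the example runs without a network.

```swift
import Combine

// Hypothetical models and API stubs standing in for the question's callAPI.
struct Place {
    let name: String
    let latitude: Double
    let longitude: Double
}

struct Weather {
    let summary: String
}

func places() -> AnyPublisher<[Place], Never> {
    Just([Place(name: "Oslo", latitude: 59.91, longitude: 10.75)])
        .eraseToAnyPublisher()
}

func weather(latitude: Double, longitude: Double) -> AnyPublisher<Weather, Never> {
    Just(Weather(summary: "cloudy")).eraseToAnyPublisher()
}

var results: [(Place, Weather)] = []

let cancellable = places()
    .compactMap { $0.first }
    .flatMap { place in
        // Pair the place with its weather so both travel down the pipe together.
        weather(latitude: place.latitude, longitude: place.longitude)
            .map { (place, $0) }
    }
    .sink { pair in results.append(pair) }

print(results.map { "\($0.0.name): \($0.1.summary)" }) // ["Oslo: cloudy"]
```

Because the place is captured by the inner map, no variable outside the pipeline has to be mutated, which avoids the side effect that handleEvents introduces.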


