Combine sink: ignore receiveValue, only completion is needed
CurrentValueSubject
seems a confusing choice, because that will send an initial value (of Void
) when you first subscribe to it.
You could make things less ambiguous by using Future
, which will send one-and-only-one value, when it's done.
To get around having to receive values you don't care about, you can flip the situation round and use an output type of Result<Void, Error>
and a failure type of Never
. When processing your network request, you can then fulfil the promise with .failure(error)
or .success(())
, and deal with it in sink:
let pub = Future<Result<Void, Error>, Never> {
promise in
// Do something asynchronous
DispatchQueue.main.asyncAfter(deadline: .now() + 1) {
promise(.success(.success(())))
//or
//promise(.success(.failure(error)))
}
}.eraseToAnyPublisher()
// somewhere else...
pub.sink {
switch $0 {
case .failure(let error):
print("Whoops \(error)")
case .success:
print("Yay")
}
}
You're swapping ugly code at one end of the chain for ugly code at the other, but if that's hidden away behind AnyPublisher
and you're concerned with correct usage, that seems the way to go. Consumers can see exactly what to expect from looking at the output and error types, and don't have to deal with them in separate closures.
Combine sink only receives outputs after they have all been published
The problem is that you are running your worker on the same thread where you are collecting the results.
Because you are doing a receive(on:)
on the main DispatchQueue each value that passes through receive(on:)
is roughly equivalent to putting a new block on the main queue to be executed when the queue is free.
Your worker fires up, executing synchronously on the main queue. While it's running, the main queue is tied up and not available for other work.
As the worker does its thing, it is publishing results to the subject, and as part of the publisher pipeline receive(on:)
queues up the delivery of each result to the main queue, waiting for that queue to be free. The critical point, however, is that the main queue won't be free to handle those blocks, and report results, until the worker is done because the worker itself is tying up the main queue.
So none of your results are reported until after all the work is one.
I suspect what you want to do is run your work in a different context, off the main thread, so that it can complete asynchronously and only report the results on the main thread.
Here's a playground, based on your code, that does that:
import UIKit
import Combine
import PlaygroundSupport
class JobWorker {
private var subject = CurrentValueSubject<Double, Never>(0)
var publisher: AnyPublisher<Double, Never> {
get { subject.eraseToAnyPublisher() }
}
func doWork() async {
do {
for subtask in 1...5 {
guard !Task.isCancelled else { break }
print("doing task \(subtask)")
self.incrementProgress(by: 20)
try await Task.sleep(nanoseconds: 1 * NSEC_PER_SEC)
}
} catch is CancellationError {
print("The Tasks were cancelled")
} catch {
print("An unexpected error occured")
}
}
private func incrementProgress(by: Double) {
subject.value = subject.value + by;
if subject.value >= 100 {
subject.send(completion: .finished)
}
}
}
let worker = JobWorker()
let subscription = worker.publisher
.print()
.receive(on: DispatchQueue.main)
.sink { _ in
print("done")
} receiveValue: { value in
print("New Value Received \(value)")
}
Task {
await worker.doWork()
}
PlaygroundPage.current.needsIndefiniteExecution = true
I made your doWork
function an async
function so I could execute it from an independent Task
. I also added a delay because it makes the asynchronous nature of the code a bit easier to see.
In the "main thread, I create a JobWorker
and subscribe to its publisher, but to do the work I create a task and run doWork
in that separate task. Progress is reported in the main thread, but the work is being done (and completed) in a different execution context.
Publisher sink never runs completion
It turned out that the issue was that the controller was deinitialized early and therefore the promise could not respond to the observer. So it had nothing to do with Combine or the code I posted, but with the way I initialized the controller.
Prevent sink receiveValue closure from being called immediately
You can use the dropFirst
operator. With no argument, it drops just the first output from upstream.
let someSubscriber = foo.$bar
.dropFirst()
.sink { value in
print("value is \(value)")
}
To address issues raised in the comments:
Each implementation of Publisher
can decide what to do with each new subscriber. It is a policy decision, not generally a design deficiency. Different Publisher
s make different decisions. Here are some examples:
PassthroughSubject
doesn't immediately publish anything.CurrentValueSubject
immediately publishes its current value.NSObject.KeyValueObservingPublisher
immediately publishes the current value of the observed property if and only if it is created with the.initial
option.Published.Publisher
(which is the type you get for an@Published
property) publishes the current value of the property immediately.
Execute two promises in sequence in Swift / Combine
In Combine, Futures are just specialised publishers. Given you deal with publishers, you would do something like this:
let cancellable = firstCall()
.tryMap { result in
try result.get()
}
.flatMap { _ in
secondCall()
}
.tryMap { result in
try result.get()
}
.sink { completion in
print(completion)
} receiveValue: { _ in
print("reveiveValue")
}
You can write it more concisely, however, I would suggest to simplify your Output and Error types first, as already pointed out in the comments:
For example:
func firstCall2() -> Future<Void, Error> {
return Future { promise in
promise(.success(()))
//promise(.failure(MyError()))
}
}
func secondCall2() -> Future<Void, Error> {
return Future { promise in
promise(.success(()))
}
}
and then:
let cancellable2 = firstCall2()
.map(secondCall2)
.sink { completion in
print(completion)
} receiveValue: { _ in
print("reveiveValue")
}
How to stop storing AnyCancellable after Swift Combine Sink has received at least one value?
If you just need to keep it alive until the sink is called once, you can just create a temporary variable
var cancellable: AnyCancellable?
cancellable = $locationState.sink { locationState in
if let locationState = locationState {
invokeCallbackWithLocationState(locationState)
}
cancellable = nil
}
This will retain the AnyCancellable
long enough (because the closure retains the reference)
Related Topics
Using UIdropinteractiondelegate and Movies
How to Decode a Utf16 String into a Unicode Character
How Are Swift Enums Implemented Internally
Filter Array of Objects with Multiple Criteria and Types in Swift
Getting Common Data from Two Different Types of Array
How to Make a Function to Accept Any Enum Types That Have a Rawvalue of String
Cannot Assign to Value: 'self' Is Immutable
Multiline Editable Text Field in Swiftui
Accessing Bundle of Main Application While Running Xctests
Carthage Update Error: "Github API Request Failed: Bad Credentials"
Save/Get UIcolor from Userdefaults
Using Index from Foreach in Other Array
Ibdesignable View Not Rendering
How to Disable The Automatic Activation of an Arcoachingoverlayview