Combine sink: ignore receiveValue, only completion is needed

CurrentValueSubject seems a confusing choice here, because it sends its current value (of Void) to every new subscriber as soon as it subscribes.

You could make things less ambiguous by using Future, which will send one-and-only-one value, when it's done.

To get around having to receive values you don't care about, you can flip the situation round and use an output type of Result<Void, Error> and a failure type of Never. When processing your network request, you can then fulfil the promise with .failure(error) or .success(()), and deal with it in sink:

    let pub = Future<Result<Void, Error>, Never> { promise in
        // Do something asynchronous
        DispatchQueue.main.asyncAfter(deadline: .now() + 1) {
            promise(.success(.success(())))
            // or
            // promise(.success(.failure(error)))
        }
    }.eraseToAnyPublisher()

    // somewhere else...
    // Keep the returned AnyCancellable alive; if it is discarded,
    // the subscription is cancelled before the promise is fulfilled.
    let cancellable = pub.sink {
        switch $0 {
        case .failure(let error):
            print("Whoops \(error)")
        case .success:
            print("Yay")
        }
    }

You're swapping ugly code at one end of the chain for ugly code at the other, but if that's hidden away behind AnyPublisher and you're concerned with correct usage, that seems the way to go. Consumers can see exactly what to expect from looking at the output and error types, and don't have to deal with them in separate closures.
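
If you already have a publisher that can fail, the same flip can be done with an adapter instead of rewriting the Future. This is only a sketch: asResult() is a hypothetical helper name and DemoError is a made-up error type, neither comes from Combine itself.

```swift
import Combine

// Made-up error type for the demo.
struct DemoError: Error {}

extension Publisher {
    /// Hypothetical helper: folds a failure into the output,
    /// so the resulting publisher has a failure type of Never.
    func asResult() -> AnyPublisher<Result<Output, Failure>, Never> {
        map { Result<Output, Failure>.success($0) }
            .catch { Just(Result<Output, Failure>.failure($0)) }
            .eraseToAnyPublisher()
    }
}

// Subscribes with a single closure and records what arrived.
func describe<P: Publisher>(_ publisher: P) -> [String] where P.Output == Void {
    var log: [String] = []
    let cancellable = publisher.asResult().sink { result in
        switch result {
        case .success: log.append("Yay")
        case .failure: log.append("Whoops")
        }
    }
    cancellable.cancel()
    return log
}

let okLog = describe(Just(()).setFailureType(to: DemoError.self))
let failLog = describe(Fail<Void, DemoError>(error: DemoError()))
print(okLog, failLog)
```

Both publishers here deliver synchronously, so the single sink closure sees a .success in the first case and a .failure in the second, with no receiveCompletion closure needed.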

Combine sink only receives outputs after they have all been published

The problem is that you are running your worker on the same thread where you are collecting the results.

Because you are doing a receive(on:) on the main DispatchQueue, each value that passes through receive(on:) is roughly equivalent to putting a new block on the main queue, to be executed when the queue is free.

Your worker fires up, executing synchronously on the main queue. While it's running, the main queue is tied up and not available for other work.

As the worker does its thing, it is publishing results to the subject, and as part of the publisher pipeline receive(on:) queues up the delivery of each result to the main queue, waiting for that queue to be free. The critical point, however, is that the main queue won't be free to handle those blocks, and report results, until the worker is done because the worker itself is tying up the main queue.

So none of your results are reported until after all the work is done.
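
The effect can be reproduced without touching the main queue. The sketch below uses a private serial queue (the label "demo.serial" and the function name are my own, purely for illustration) in place of the main queue: the "worker" occupies the queue while it sends, so every delivery scheduled by receive(on:) has to wait behind it.

```swift
import Combine
import Foundation

func runBlockedQueueDemo() -> [String] {
    // Stand-in for the main queue: a serial queue we can safely block.
    let queue = DispatchQueue(label: "demo.serial")
    let subject = PassthroughSubject<Int, Never>()
    var log: [String] = []   // only touched while running on `queue`

    let cancellable = subject
        .receive(on: queue)
        .sink { log.append("received \($0)") }

    // The "worker" runs on the same serial queue it publishes to.
    queue.sync {
        for value in 1...3 {
            log.append("sent \(value)")
            subject.send(value)   // delivery is enqueued behind this block
        }
    }

    // Wait for the enqueued deliveries to drain before reading the log.
    queue.sync {}
    cancellable.cancel()
    return log
}

let blockedLog = runBlockedQueueDemo()
print(blockedLog)
```

All three "sent" entries land in the log before any "received" entry, which is the same ordering you are seeing on the main queue.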

I suspect what you want to do is run your work in a different context, off the main thread, so that it can complete asynchronously and only report the results on the main thread.

Here's a playground, based on your code, that does that:

    import UIKit
    import Combine
    import PlaygroundSupport

    class JobWorker {
        private let subject = CurrentValueSubject<Double, Never>(0)
        var publisher: AnyPublisher<Double, Never> {
            subject.eraseToAnyPublisher()
        }

        func doWork() async {
            do {
                for subtask in 1...5 {
                    guard !Task.isCancelled else { break }

                    print("doing task \(subtask)")
                    self.incrementProgress(by: 20)

                    // Simulate slow work so the asynchronous delivery is visible
                    try await Task.sleep(nanoseconds: 1 * NSEC_PER_SEC)
                }
            } catch is CancellationError {
                print("The Tasks were cancelled")
            } catch {
                print("An unexpected error occurred")
            }
        }

        private func incrementProgress(by amount: Double) {
            subject.value += amount
            if subject.value >= 100 {
                subject.send(completion: .finished)
            }
        }
    }

    let worker = JobWorker()
    let subscription = worker.publisher
        .print()
        .receive(on: DispatchQueue.main)
        .sink { _ in
            print("done")
        } receiveValue: { value in
            print("New Value Received \(value)")
        }

    // Run the work in its own task so the main queue stays free
    Task {
        await worker.doWork()
    }

    PlaygroundPage.current.needsIndefiniteExecution = true

I made your doWork function an async function so I could execute it from an independent Task. I also added a delay because it makes the asynchronous nature of the code a bit easier to see.

On the main thread, I create a JobWorker and subscribe to its publisher, but to do the work I create a task and run doWork in that separate task. Progress is reported on the main thread, but the work is being done (and completed) in a different execution context.

Publisher sink never runs completion

It turned out that the issue was that the controller was deinitialized early and therefore the promise could not respond to the observer. So it had nothing to do with Combine or the code I posted, but with the way I initialized the controller.
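
I can't reproduce the original setup, but the general failure mode looks something like the sketch below. Everything in it is hypothetical (the Controller class, the events subject, the log closure): the point is only that once the object owning the AnyCancellable goes away, the subscription is cancelled and neither values nor the completion reach the sink.

```swift
import Combine

// Hypothetical owner of a subscription, standing in for the controller.
final class Controller {
    private var cancellables = Set<AnyCancellable>()

    init(events: PassthroughSubject<String, Never>, log: @escaping (String) -> Void) {
        events
            .sink(receiveCompletion: { _ in log("completed") },
                  receiveValue: { log("value \($0)") })
            .store(in: &cancellables)
    }
}

func runDeinitDemo() -> [String] {
    var log: [String] = []
    let events = PassthroughSubject<String, Never>()

    // The array holds the only strong reference to the controller.
    var liveControllers = [Controller(events: events) { log.append($0) }]

    events.send("a")               // delivered: the controller is alive
    liveControllers.removeAll()    // controller deinitialized, subscription cancelled
    events.send("b")               // nobody is listening any more
    events.send(completion: .finished)
    return log
}

let deinitLog = runDeinitDemo()
print(deinitLog)
```

Only the first value is logged; the completion sent after the controller is gone never reaches the sink.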

Prevent sink receiveValue closure from being called immediately

You can use the dropFirst operator. With no argument, it drops just the first output from upstream.

    let someSubscriber = foo.$bar
        .dropFirst()
        .sink { value in
            print("value is \(value)")
        }

To address issues raised in the comments:

Each implementation of Publisher can decide what to do with each new subscriber. It is a policy decision, not generally a design deficiency. Different Publishers make different decisions. Here are some examples:

  • PassthroughSubject doesn't immediately publish anything.
  • CurrentValueSubject immediately publishes its current value.
  • NSObject.KeyValueObservingPublisher immediately publishes the current value of the observed property if and only if it is created with the .initial option.
  • Published.Publisher (which is the type you get for an @Published property) publishes the current value of the property immediately.
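
To see three of these policies side by side, here is a small sketch. The Foo class with its bar property mirrors the foo.$bar snippet above, but its definition and the function name are my own assumptions for the demo.

```swift
import Combine

// Assumed shape of the object behind `foo.$bar`.
final class Foo {
    @Published var bar = 0
}

func runSubscriptionPolicyDemo() -> (onSubscribe: [String], afterChange: [String]) {
    var log: [String] = []
    var cancellables = Set<AnyCancellable>()

    let passthrough = PassthroughSubject<Int, Never>()
    let current = CurrentValueSubject<Int, Never>(1)
    let foo = Foo()

    passthrough.sink { log.append("passthrough \($0)") }.store(in: &cancellables)
    current.sink { log.append("current \($0)") }.store(in: &cancellables)
    foo.$bar.sink { log.append("published \($0)") }.store(in: &cancellables)
    foo.$bar.dropFirst().sink { log.append("dropFirst \($0)") }.store(in: &cancellables)

    // Whatever was delivered purely as a result of subscribing.
    let onSubscribe = log

    log.removeAll()
    foo.bar = 5                    // a real change reaches both $bar subscribers
    let afterChange = log

    cancellables.removeAll()
    return (onSubscribe, afterChange)
}

let policyDemo = runSubscriptionPolicyDemo()
print(policyDemo.onSubscribe)
print(policyDemo.afterChange)
```

On subscription only the CurrentValueSubject and the plain $bar subscriber hear anything. The dropFirst() subscriber stays quiet until bar actually changes.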

Execute two promises in sequence in Swift / Combine

In Combine, Futures are just specialised publishers. Given you deal with publishers, you would do something like this:

    let cancellable = firstCall()
        .tryMap { result in
            try result.get()
        }
        .flatMap { _ in
            secondCall()
        }
        .tryMap { result in
            try result.get()
        }
        .sink { completion in
            print(completion)
        } receiveValue: { _ in
            print("receiveValue")
        }

You can write it more concisely. However, I would suggest simplifying your Output and Error types first, as already pointed out in the comments.

For example:

    func firstCall2() -> Future<Void, Error> {
        return Future { promise in
            promise(.success(()))
            // promise(.failure(MyError()))
        }
    }

    func secondCall2() -> Future<Void, Error> {
        return Future { promise in
            promise(.success(()))
        }
    }

and then:

    let cancellable2 = firstCall2()
        .flatMap { _ in secondCall2() }
        .sink { completion in
            print(completion)
        } receiveValue: { _ in
            print("receiveValue")
        }
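
To check that the second call really only starts once the first has succeeded, you can log when each Future body runs. This is a sketch under my own assumptions: step(_:fails:) and StepError are hypothetical stand-ins for the two calls and MyError, and both futures resolve synchronously to keep the demo short.

```swift
import Combine

// Made-up error type for the demo.
struct StepError: Error {}

func runChain(firstFails: Bool) -> [String] {
    var log: [String] = []

    // Hypothetical stand-in for firstCall2() / secondCall2():
    // records that its body ran, then resolves immediately.
    func step(_ name: String, fails: Bool = false) -> Future<Void, Error> {
        Future { promise in
            log.append("\(name) started")
            promise(fails ? .failure(StepError()) : .success(()))
        }
    }

    let cancellable = step("first", fails: firstFails)
        .flatMap { _ in step("second") }   // only created after the first value arrives
        .sink { completion in
            switch completion {
            case .finished: log.append("finished")
            case .failure: log.append("failed")
            }
        } receiveValue: { _ in
            log.append("receiveValue")
        }
    cancellable.cancel()
    return log
}

let successLog = runChain(firstFails: false)
let failureLog = runChain(firstFails: true)
print(successLog)
print(failureLog)
```

When the first step fails, the flatMap closure is never called, so the second Future is never even created and the sink goes straight to the failure completion.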

How to stop storing AnyCancellable after Swift Combine Sink has received at least one value?

If you just need to keep it alive until the sink is called once, you can create a temporary variable:

    var cancellable: AnyCancellable?
    cancellable = $locationState.sink { locationState in
        if let locationState = locationState {
            invokeCallbackWithLocationState(locationState)
        }
        cancellable = nil
    }

This retains the AnyCancellable long enough, because the closure holds a reference to it.
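
Here is the same pattern in a form you can run on its own. It is a sketch that assumes a publisher which does not emit during subscription (a PassthroughSubject here, with a function name of my own choosing), so the first call to the closure happens after cancellable has been assigned.

```swift
import Combine

func runOneShotDemo() -> [Int] {
    let subject = PassthroughSubject<Int, Never>()
    var received: [Int] = []

    var cancellable: AnyCancellable?
    cancellable = subject.sink { value in
        received.append(value)
        // Dropping the last reference cancels the subscription.
        cancellable = nil
    }

    subject.send(1)   // delivered, then the subscription releases itself
    subject.send(2)   // no subscriber left
    return received
}

let oneShot = runOneShotDemo()
print(oneShot)
```

The second value is never delivered, because the closure released the AnyCancellable while handling the first one.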


