Swift Combine How to Skip an Event

Update property without running combine pipeline

On further inspection, the answer came to me.
This is the case for class PassthroughSubject.

class ObserveableTest: ObservableObject {

private var cancellables: Set<AnyCancellable> = []

/// private(set) to restrict all changes to go through class functions
@Published private(set) var items: [Int] = []

/// A subject we can pass data into, when we wanna publish changes
private let listener = PassthroughSubject<[Int], Never>()

init() {
listener
.debounce(for: 0.6, scheduler: RunLoop.main)
.sink {
print($0)
}
.store(in: &cancellables)
}


func thisFunctionIsCalledByUserAndTriggersPipeline() {
items.append((0...1000000).randomElement()!)
listener.send(items) // publish to pipeline
}

func thisFunctionIsCalledByServerAndDoesNotTriggerPipeline() {
items.append((0...1000000).randomElement()!)
}

}

How do you apply a Combine operator only after the first message has been received?

In your particular case of debounce, you might prefer the behavior of throttle. It sends the first element immediately, and then sends no more than one element per interval.

Anyway, can you do it with Combine built-ins? Yes, with some difficulty. Should you? Maybe…

Here's a marble diagram of your goal:

marble diagram of modified debounce operator

Each time a value goes into the kennyc-debouncer, it starts a timer (represented by a shaded region). If a value arrives while the timer is running, the kennyc-debouncer saves the value and restarts the timer. When the timer expires, if any values arrived while the timer was running, the kennyc-debouncer emits the latest value immediately.

The scan operator allows us to keep state that we mutate each time an input arrives. We need to send two kinds of inputs into scan: the outputs from the upstream publisher, and timer firings. So let's define a type for those inputs:

fileprivate enum DebounceEvent<Value> {
case value(Value)
case timerFired
}

What kind of state do we need inside our scan transform? We definitely need the scheduler, the interval, and the scheduler options, so that we can set timers.

We also need a PassthroughSubject we can use to turn timer firings into inputs to the scan operator.

We can't actually cancel and restart a timer, so instead, when the timer fires, we'll see whether it should have been restarted. If so, we'll start another timer. So we need to know whether the timer is running, and what output to send when the timer fires, and the restart time for the timer if restarting is necessary.

Since scan's output is the entire state value, we also need the state to include the output value to send downstream, if any.

Here's the state type:

fileprivate struct DebounceState<Value, S: Scheduler> {
let scheduler: S
let interval: S.SchedulerTimeType.Stride
let options: S.SchedulerOptions?

let subject = PassthroughSubject<Void, Never>()

enum TimerState {
case notRunning
case running(PendingOutput?)

struct PendingOutput {
var value: Value
var earliestDeliveryTime: S.SchedulerTimeType
}
}

var output: Value? = nil
var timerState: TimerState = .notRunning
}

Now let's look at how to actually use scan with some other operators to implement the kennyc version of debounce:

extension Publisher {
func kennycDebounce<S: Scheduler>(
for dueTime: S.SchedulerTimeType.Stride,
scheduler: S,
options: S.SchedulerOptions? = nil
) -> AnyPublisher<Output, Failure>
{
let initialState = DebounceState<Output, S>(
scheduler: scheduler,
interval: dueTime,
options: options)
let timerEvents = initialState.subject
.map { _ in DebounceEvent<Output>.timerFired }
.setFailureType(to: Failure.self)
return self
.map { DebounceEvent.value($0) }
.merge(with: timerEvents)
.scan(initialState) { $0.updated(with: $1) }
.compactMap { $0.output }
.eraseToAnyPublisher()
}
}

We start by constructing the initial state for the scan operator.

Then, we create a publisher that turns the Void outputs of the state's PassthroughSubject into .timerFired events.

Finally, we construct our full pipeline, which has four stages:

  1. Turn the upstream outputs (from self) into .value events.

  2. Merge the value events with the timer events.

  3. Use scan to update the debouncing state with the value and timer events. The actual work is done in an updated(with:) method we'll add to DebounceState below.

  4. Map the full state down to just the value we want to pass downstream, and discard nulls (which happen when upstream events get suppressed by debouncing).

All that's left is to write the updated(with:) method. It looks at each incoming event's type (value or timerFired) and the state of the timer to decide what the new state should be and, if necessary, set a new timer.

extension DebounceState {
func updated(with event: DebounceEvent<Value>) -> DebounceState<Value, S> {
var answer = self
switch (event, timerState) {
case (.value(let value), .notRunning):
answer.output = value
answer.timerState = .running(nil)
scheduler.schedule(after: scheduler.now.advanced(by: interval), tolerance: .zero, options: options) { [subject] in subject.send() }
case (.value(let value), .running(_)):
answer.output = nil
answer.timerState = .running(.init(value: value, earliestDeliveryTime: scheduler.now.advanced(by: interval)))
case (.timerFired, .running(nil)):
answer.output = nil
answer.timerState = .notRunning
case (.timerFired, .running(.some(let pendingOutput))):
let now = scheduler.now
if pendingOutput.earliestDeliveryTime <= now {
answer.output = pendingOutput.value
answer.timerState = .notRunning
} else {
answer.output = nil
scheduler.schedule(after: pendingOutput.earliestDeliveryTime, tolerance: .zero, options: options) { [subject] in subject.send() }
}
case (.timerFired, .notRunning):
// Impossible!
answer.output = nil
}
return answer
}
}

Does it work? Let's test it:

import PlaygroundSupport
PlaygroundPage.current.needsIndefiniteExecution = true

let subject = PassthroughSubject<String, Never>()
let q = DispatchQueue.main
let start = DispatchTime.now()
let cfStart = CFAbsoluteTimeGetCurrent()
q.asyncAfter(deadline: start + .milliseconds(100)) { subject.send("A") }
// A should be delivered at start + 100ms.
q.asyncAfter(deadline: start + .milliseconds(200)) { subject.send("B") }
q.asyncAfter(deadline: start + .milliseconds(300)) { subject.send("C") }
// C should be delivered at start + 800ms.
q.asyncAfter(deadline: start + .milliseconds(1100)) { subject.send("D") }
// D should be delivered at start + 1100ms.
q.asyncAfter(deadline: start + .milliseconds(1800)) { subject.send("E") }
// E should be delivered at start + 1800ms.
q.asyncAfter(deadline: start + .milliseconds(1900)) { subject.send("F") }
q.asyncAfter(deadline: start + .milliseconds(2000)) { subject.send("G") }
// G should be delivered at start + 2500ms.

let ticket = subject
.kennycDebounce(for: .milliseconds(500), scheduler: q)
.sink {
print("\($0) \(((CFAbsoluteTimeGetCurrent() - cfStart) * 1000).rounded())") }

Output:

A 107.0
C 847.0
D 1167.0
E 1915.0
G 2714.0

I'm not sure why the later events are so delayed. It could just be playground side effects.

Why does .delay break this short piece of code using Combine framework in swift

This happens because you do not store Cancellable returned after subscription. As soon as Cancellable is deallocated the whole subscription is cancelled.
Without delay everything works because subscriber is called immediatly, right after subscription.

Add property to your view:

@State var cancellable: AnyCancellable?

And save Cancellable returned after subscription:

cancellable = initialSequence.publisher

However your code won't add delay between each color change. All colors are sent immediatly -> you add delay to each event -> after 1 sec all colors are sent to subscriber :) .

swift combine declarative syntax


Cleaning up the code first

Formatting

To start, reading/understanding this code would be much easier if it was formatted properly. So let's start with that:

[1, 2, 3]
.publisher
.map({ (val) in
return val * 3
})
.sink(
receiveCompletion: { completion in
switch completion {
case .failure(let error):
print("Something went wrong: \(error)")
case .finished:
print("Received Completion")
}
},
receiveValue: { value in
print("Received value \(value)")
}
)

Cleaning up the map expression

We can further clean up the map, by:

  1. Using an implicit return

    map({ (val) in
    return val * 3
    })
  2. Using an implicit return

    map({ (val) in
    val * 3
    })
  3. Remove unecessary brackets around param declaration

    map({ val in
    val * 3
    })
  4. Remove unecessary new-lines. Sometimes they're useful for visually seperating things, but this is a simple enough closure that it just adds uneeded noise

    map({ val in val * 3 })
  5. Use an implicit param, instead of a val, which is non-descriptive anyway

    map({ $0 * 3 })
  6. Use trailing closure syntax

    map { $0 * 3 }

Final result

with numbered lines, so I can refer back easily.

/*  1 */[1, 2, 3]
/* 2 */ .publisher
/* 3 */ .map { $0 * 3 }
/* 4 */ .sink(
/* 5 */ receiveCompletion: { completion in
/* 6 */ switch completion {
/* 7 */ case .failure(let error):
/* 8 */ print("Something went wrong: \(error)")
/* 9 */ case .finished:
/* 10 */ print("Received Completion")
/* 11 */ }
/* 12 */ },
/* 13 */ receiveValue: { value in
/* 14 */ print("Received value \(value)")
/* 15 */ }
/* 16 */ )

Going through it.

Line 1, [1, 2, 3]

Line 1 is an array literal. It's an expression, just like 1, "hi", true, someVariable or 1 + 1. An array like this doesn't need to be assigned to anything for it to be used.

Interestingly, that doesn't mean necessarily that it's an array. Instead, Swift has the ExpressibleByArrayLiteralProtocol. Any conforming type can be initialized from an array literal. For example, Set conforms, so you could write: let s: Set = [1, 2, 3], and you would get a Set containing 1, 2 and 3. In the absence of other type information (like the Set type annotation above, for example), Swift uses Array as the preferred array literal type.

Line 2, .publisher

Line 2 is calling the publisher property of the array literal. This returns a Sequence<Array<Int>, Never>. That isn't a regular Swift.Sequence, which is a non-generic protocol, but rather, it's found in the Publishers namespace (a case-less enum) of the Combine module. So its fully qualified type is Combine.Publishers.Sequence<Array<Int>, Never>.

It's a Publisher whose Output is Int, and whose Failure type is Never (i.e. an error isn't possible, since there is no way to create an instance of the Never type).

Line 3, map

Line 3 is calling the map instance function (a.k.a. method) of the Combine.Publishers.Sequence<Array<Int>, Never> value above. Everytime an element passed through this chain, it'll be transformed by the closure given to map.

  • 1 will go in, 3 will come out.
  • Then 2 will go in, and 6 will come out.
  • Finally 3 would go in, and 6 would come out.

The result of this expression so far is another Combine.Publishers.Sequence<Array<Int>, Never>

Line 4, sink(receiveCompletion:receiveValue:)

Line 4 is a call to Combine.Publishers.Sequence<Array<Int>, Never>.sink(receiveCompletion:receiveValue:). With two closure arguments.

  1. The { completion in ... } closure is provided as an argument to the parameter labelled receiveCompletion:
  2. The { value in ... } closure is provided as an argument to the parameter labelled receiveValue:

Sink is creating a new subscriber to the Subscription<Array<Int>, Never> value that we had above. When elements come through, the receiveValue closure will be called, and passed as an argument to its value parameter.

Eventually the publisher will complete, calling the receiveCompletion: closure. The argument to the completion param will be a value of type Subscribers.Completion, which is an enum with either a .failure(Failure) case, or a .finished case. Since the Failure type is Never, it's actually impossible to create a value of .failure(Never) here. So the completion will always be .finished, which would cause the print("Received Completion") to be called. The statement print("Something went wrong: \(error)") is dead code, which can never be reached.

Discussion on "declarative"

There's no single syntactic element that makes this code qualify as "declarative". A declarative style is a distinction from an "imperative" style. In an imperative style, your program consists of a series of imperatives, or steps to be completed, usually with a very rigid ordering.

In a declarative style, your program consists of a series of declarations. The details of what's necessary to fulfill those declarations is abstracted away, such as into libraries like Combine and SwiftUI. For example, in this case, you're declaring that print("Received value \(value)") of triple the number is to be printed whenever a number comes in from the [1, 2, 3].publisher. The publisher is a basic example, but you could imagine a publisher that's emitting values from a text field, where events are coming in an unknown times.

My favourite example for disguising imperative and declarative styles is using a function like Array.map(_:).

You could write:

var input: [InputType] = ...
var result = [ResultType]()

for element in input {
let transformedElement = transform(element)
result.append(result)
}

but there are a lot of issues:

  1. It's a lot of boiler-plate code you end up repeating all over your code base, with only subtle differences.
  2. It's trickier to read. Since for is such a general construct, many things are possible here. To find out exactly what happens, you need to look into more detail.
  3. You've missed an optimization opportunity, by not calling Array.reserveCapacity(_:). These repeated calls to append can reach the max capacity of an the result arrays's buffer. At that point:

    • a new larger buffer must be allocated
    • the existing elements of result need to be copied over
    • the old buffer needs to be released
    • and finally, the new transformedElement has to be added in

    These operations can get expensive. And as you add more and more elements, you can run out of capacity several times, causing multiple of these regrowing operations. By callined result.reserveCapacity(input.count), you can tell the array to allocate a perfectly sized buffer, up-front, so that no regrowing operations will be necessary.

  4. The result array has to be mutable, even though you might not ever need to mutate it after its construction.

This code could instead be written as a call to map:

let result = input.map(transform)

This has many benefits:

  1. Its shorter (though not always a good thing, in this case nothing is lost for having it be shorter)
  2. It's more clear. map is a very specific tool, that can only do one thing. As soon as you see map, you know that input.count == result.count, and that the result is an array of the output of the transform function/closure.
  3. It's optimized, internally map calls reserveCapacity, and it will never forget to do so.
  4. The result can be immutable.

Calling map is following a more declarative style of programming. You're not fiddling around with the details of array sizes, iteration, appending, or whatever. If you have input.map { $0 * $0 }, you're saying "I want the input's elements squared", the end. The implementation of map would have the for loop, appends, etc. necessary to do that. While it's implemented in an imperative style, the function abstracts that away, and lets you write code at higher levels of abstraction, where you're not mucking about with irrelevant things like for loops.

How to zip two publishers but to get newest values instead of default oldest behavior of zip?

Here is the final code that I came up with to match my exact requirement. I was reluctant to write a custom pub-sub or port combineLatestFrom from RxSwift to Combine.
Thanks to @matt for directing me to the right approach he answered here: Swift Combine operator with same functionality like `withLatestFrom` in the RxSwift Framework

import Combine
import Foundation

let pub1 = PassthroughSubject<Int, Never>()
let pub2 = PassthroughSubject<Bool, Never>()
var subscriptions = Set<AnyCancellable>()

pub2.map { value in (unique: UUID(), value: value) }
.combineLatest(pub1)
.removeDuplicates {
return $0.0.unique == $1.0.unique
}
.map { (tuple) in
return (tuple.0.1, tuple.1)
}
.sink { event in
print(event)
}
.store(in: &subscriptions)

pub1.send(1)
pub1.send(2)
pub1.send(2)
pub1.send(3)
pub2.send(true)
pub1.send(4)
pub1.send(5)
pub2.send(false)
pub2.send(true)

And the output is:
(true, 3)

(false, 5)

(true, 5)



Related Topics



Leave a reply



Submit