RxSwift: Observable While a Button Holds Down

I was also looking for a solution to this question, and I got help from the RxSwift Slack channel.

let button = submitButton.rx_controlEvent([.TouchDown])
button
    .flatMapLatest { _ in
        Observable<Int64>.interval(0.1, scheduler: MainScheduler.instance)
            .takeUntil(self.submitButton.rx_controlEvent([.TouchUpInside]))
    }
    .subscribeNext { x in print("BOOM \(x)") }
    .addDisposableTo(disposeBag)

// prints BOOM 0 BOOM 1 BOOM 2 BOOM 3 BOOM 4 BOOM 5, one every 0.1 seconds

Also check the documentation for the interval operator. Thanks to @jari of the RxSwift Slack channel.

Handling button touch based on the Observable value

RxSwift (through RxCocoa) already has a ControlEvent that wraps the TouchUpInside event of UIButton. You can access it through .rx.tap:

button.rx.tap.subscribe(onNext: {
    self.button.setTitle("\(arc4random_uniform(100))", for: .normal)
    self.button.layer.borderColor = UIColor.random.cgColor // or your special color
})
.disposed(by: bag)

Result of 3 taps: (screenshot not preserved)

Update:

You can check other observables with .withLatestFrom(). An improved version of the example above:

let subject = BehaviorSubject<Bool>(value: true)

button.rx.tap.asDriver()
    .withLatestFrom(subject.asDriver(onErrorJustReturn: false))
    .drive(onNext: { value in
        if value {
            self.button.setTitle("\(arc4random_uniform(100))", for: .normal)
            self.button.layer.borderColor = UIColor.random().cgColor
        }
        subject.onNext(!value)
    })
    .disposed(by: bag)

So, in your case, you can pass Observable.combineLatest(...) to .withLatestFrom and apply your logic to the combined values.

RxSwift how to conditionally control when a button emits taps?

Here's something I wrote a while back and reproduced below. If you look at my gist (https://gist.github.com/danielt1263/1a70c4f7b8960d06bd7f1bfa81802cc3) you will see that I originally wrote it as a custom operator. Later I learned that the combination of built-in operators below does the same job.

If nothing else, looking back at the older revisions in the gist will give you a sense of how to write your own operators.

extension ObservableType {

    /**
     Filters the source observable sequence using a trigger observable sequence producing Bool values.
     Elements only go through the filter when the trigger has not completed and its last element was true. If either source or trigger errors, then the source errors.
     - parameter trigger: Triggering event sequence.
     - returns: Filtered observable sequence.
     */
    func filter(if trigger: Observable<Bool>) -> Observable<Element> {
        return withLatestFrom(trigger) { (myValue, triggerValue) -> (Element, Bool) in
            return (myValue, triggerValue)
        }
        .filter { (myValue, triggerValue) -> Bool in
            return triggerValue == true
        }
        .map { (myValue, triggerValue) -> Element in
            return myValue
        }
    }
}

If you want to change what the button does depending on the value of your control observable, set up two filters. Normally the filter only passes taps through when enableButtons last emitted true. Use a map to invert it in the second case and direct button taps down another path:

button.rx.tap.filter(if: enableButtons)
    .subscribe(onNext: { /* do one thing when enableButtons emits true */ })
    .disposed(by: bag)

button.rx.tap.filter(if: enableButtons.map { !$0 })
    .subscribe(onNext: { /* do other thing when enableButtons emits false */ })
    .disposed(by: bag)
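
The gating rule behind filter(if:) can be modelled in plain Swift, without RxSwift. This is only a sketch under stated assumptions: the Event enum and the gatedTaps function are hypothetical names, and one time-ordered array stands in for the two observables. As with withLatestFrom, taps that arrive before the trigger has emitted anything are dropped.

```swift
// Hypothetical model: one time-ordered stream holding both kinds of event.
enum Event {
    case trigger(Bool)   // a value emitted by the trigger (e.g. enableButtons)
    case tap(Int)        // a source element, tagged with an id for this example
}

// Returns the ids of the taps that would pass through the gate.
func gatedTaps(_ events: [Event]) -> [Int] {
    var latest: Bool? = nil   // nil until the trigger emits its first value
    var passed: [Int] = []
    for event in events {
        switch event {
        case .trigger(let value):
            latest = value
        case .tap(let id):
            // A tap passes only if the most recent trigger value was true.
            if latest == true { passed.append(id) }
        }
    }
    return passed
}

let timeline: [Event] = [.tap(1), .trigger(true), .tap(2), .trigger(false), .tap(3), .trigger(true), .tap(4)]
print(gatedTaps(timeline))   // prints [2, 4]
```

Tap 1 is dropped because the trigger has not emitted yet, and tap 3 is dropped because the latest trigger value is false. Mapping the trigger with { !$0 } would instead let only tap 3 through.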

RxSwift act while debounced

As far as I understand the question, the goal is to run some action for as long as debouncing/throttling is in effect.

It is relatively straightforward to do for throttle (i.e. emitting at most once per timeframe): use the .window() operator, hang the desired action onto it, and then use the result of .window() for the actual throttling.

With debounce (i.e. emitting once after upstream has not emitted for a given timeframe), it seems more complex, but it is probably also doable.
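
The two definitions above can be made concrete with a small plain-Swift model that works on event timestamps instead of real observables. It is a sketch only: throttled and debounced are hypothetical helper names, the input is assumed to be sorted, and the throttle shown is the simplest variant (the first event of each timeframe only), which corresponds to calling RxSwift's throttle with latest: false.

```swift
// Throttle, simplest form: let an event through only if at least `interval`
// seconds have passed since the last event that got through.
func throttled(_ times: [Double], interval: Double) -> [Double] {
    var result: [Double] = []
    for t in times {
        if let last = result.last, t - last < interval { continue }
        result.append(t)
    }
    return result
}

// Debounce: emit once per burst, `interval` seconds after the burst's last event.
func debounced(_ times: [Double], interval: Double) -> [Double] {
    var result: [Double] = []
    for (index, t) in times.enumerated() {
        let isLast = index == times.count - 1
        // An event ends a burst if the next one arrives too late to restart its timer.
        if isLast || times[index + 1] - t >= interval {
            result.append(t + interval)
        }
    }
    return result
}

let events = [0.0, 0.25, 0.5, 2.0, 2.25, 5.0]   // event times in seconds, sorted
print(throttled(events, interval: 1.0))   // prints [0.0, 2.0, 5.0]
print(debounced(events, interval: 1.0))   // prints [1.5, 3.25, 6.0]
```

In this model the period in which throttling is "active" runs from each throttled emission until interval seconds later, while for debounce it runs from the first event of a burst until the debounced emission.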

RxSwift - start and update count down timer

What you're trying to achieve kinda sounds like a state machine.

You could achieve it by splitting the timer actions into actual "Actions" and merging them based on the trigger (one being the manual "add two seconds", and the other the automated "reduce one second").

I haven't fully tested it, but this can be a good starting ground:

enum TimerAction {
    case tick
    case addTwoSeconds
}

let trigger = PublishRelay<Void>()

let timer = Observable<Int>
    .interval(.seconds(1), scheduler: MainScheduler.instance)
    .map { _ in TimerAction.tick }

let addSeconds = trigger.map { TimerAction.addTwoSeconds }

Observable
    .merge(timer, addSeconds)
    .scan(into: 15) { totalSeconds, action in
        totalSeconds += action == .addTwoSeconds ? 2 : -1
    }
    .takeUntil(.inclusive) { $0 == 0 }
    .subscribe()
    .disposed(by: disposeBag)

DispatchQueue.main.asyncAfter(deadline: .now() + 5) {
    trigger.accept(()) // increase the timer by two seconds after 5 seconds
}
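
To check the state-machine logic on its own, the accumulator can be run over a plain array of actions, with no schedulers involved. This is a sketch: the countdown function is a hypothetical helper, and the array stands in for the merged stream of ticks and button taps.

```swift
enum TimerAction {
    case tick
    case addTwoSeconds
}

// Applies the same update rule as the scan(into:) closure, collecting every
// intermediate value and stopping after the first 0 (like takeUntil(.inclusive)).
func countdown(from start: Int, applying actions: [TimerAction]) -> [Int] {
    var total = start
    var emitted: [Int] = []
    for action in actions {
        total += action == .addTwoSeconds ? 2 : -1
        emitted.append(total)
        if total == 0 { break }
    }
    return emitted
}

let actions: [TimerAction] = [.tick, .tick, .addTwoSeconds, .tick, .tick, .tick, .tick, .tick]
print(countdown(from: 3, applying: actions))   // prints [2, 1, 3, 2, 1, 0]
```

The last two ticks are never applied because the sequence ends as soon as the total reaches 0.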

How to properly convert a 3rd party library delegate into a RxSwift Observable

So after digging some more, it looks like this will do the trick with a required delegate method, updated for RxSwift 3.3.1. This is using their DelegateProxy system.

import RxSwift
import RxCocoa
import Lib

public final class RxLibDelegate: DelegateProxy, LibDelegate, DelegateProxyType {

    let _subject = PublishSubject<[LibResult]>()

    public static func currentDelegateFor(_ object: AnyObject) -> AnyObject? {
        let target = object as! Lib
        return target.delegate
    }

    public static func setCurrentDelegate(_ delegate: AnyObject?, toObject object: AnyObject) {
        let target = object as! Lib
        target.delegate = delegate as? LibDelegate
    }

    public func lib(_ lib: Lib, didFinishWithResult results: [LibResult]) {
        _subject.onNext(results)
        _subject.onCompleted()
    }
}

extension Lib {

    public var rx_delegate: DelegateProxy {
        // `proxyForDelegate` moved as compared to examples at:
        // https://samritchie.net/2016/05/12/rxswift-delegateproxy-with-required-methods/
        // https://medium.com/@maxofeden/rxswift-migrate-delegates-to-beautiful-observables-3e606a863048#.rksg2ckpj

        return RxLibDelegate.proxyForObject(self)
    }

    public var rx_libResults: Observable<[LibResult]> {
        // `proxyForDelegate` moved as compared to examples at:
        // https://samritchie.net/2016/05/12/rxswift-delegateproxy-with-required-methods/
        // https://medium.com/@maxofeden/rxswift-migrate-delegates-to-beautiful-observables-3e606a863048#.rksg2ckpj

        let proxy = RxLibDelegate.proxyForObject(self)
        return proxy._subject
    }
}

That's about 28 LOC. My original "wrapper" (see the updated version below), which I'm not sure is the best approach, is 21 LOC; six of one, half a dozen of the other?

In my particular case I only have 1 delegate method to worry about. If you were working with some functionality that had multiple delegates I think the DelegateProxy + extension methods would be a lot more practical and the better choice in that case.

Regarding my original wrapper attempt that used a Void observable, it appears to be perfectly acceptable to alter the stream with flatMapLatest, as shown in this answer about sending continual events while a button is pressed:

https://stackoverflow.com/a/39123102/1060314

import RxSwift
import RxCocoa

let button = submitButton.rx_controlEvent([.TouchDown])
button
    .flatMapLatest { _ in
        Observable<Int64>.interval(0.1, scheduler: MainScheduler.instance)
            .takeUntil(self.submitButton.rx_controlEvent([.TouchUpInside]))
    }
    .subscribeNext { x in print("BOOM \(x)") }
    .addDisposableTo(disposeBag)

// prints BOOM 0 BOOM 1 BOOM 2 BOOM 3 BOOM 4 BOOM 5, one every 0.1 seconds

Note that a new Observable is returned from flatMapLatest. The author cites the RxSwift slack channel, so I assume it is at least acceptable to do.

Here's an updated version of my wrapper version that I think might be a bit cleaner:

import RxSwift

public final class RxLibBridge: LibDelegate {

    let lib = Lib()
    let _source = PublishSubject<[LibResult]>()

    public init() {
        lib.delegate = self
    }

    public func asObservable() -> Observable<[LibResult]> {
        // create a cold observable to start
        // the Lib's async operation on subscribe.
        return Observable.just(())
            .do(onNext: {
                self.lib.startOperation()
            })
            .flatMapLatest { self._source }
    }

    // the lib's completion delegate method
    public func lib(_ lib: Lib, didFinishWithResult results: [LibResult]) {
        // grab the PublishSubject, emit the result and complete
        _source.onNext(results)
        _source.onCompleted()
    }
}

How do I make a resettable RxSwift Timer?

Update

In the comments, I was asked to justify why I suggested making a test for "new code". Part of the answer was that you should never accept the first draft of your code. As any composition teacher would tell you, don't turn in your first draft, spend some time refining what you write (with peer review if you can get it.) So given that and the fact that my tests missed one of the specifications, I was going to replace my initial answer with this more refined version, but I think it is instructive to keep the original so it can be compared to the refined answer.

In the below, you will see that I have updated the tests to accommodate the new specification and refined the code.

The fact that there is a flatMap in the function implies that there are two abstractions here. So I broke that out into a separate function.

The fact that I have enums with two cases implies that I could use a Bool instead and remove the switches.

import XCTest
import RxSwift
import RxTest

class rx_sandboxTests: XCTestCase {

    func testPause() {
        let scheduler = TestScheduler(initialClock: 0)
        let pause = scheduler.createColdObservable([.next(10, ()), .next(20, ())])
        let reset = scheduler.createColdObservable([.next(30, ())])
        let result = scheduler.start {
            isPaused(pause: pause.asObservable(), reset: reset.asObservable())
        }
        XCTAssertEqual(result.events, [.next(200, true), .next(210, false), .next(220, true)])
    }

    func testTimerStart() {
        let scheduler = TestScheduler(initialClock: 0)
        let pause = scheduler.createColdObservable([.next(10, ())])
        let reset = scheduler.createColdObservable([Recorded<Event<Void>>]())

        let result = scheduler.start {
            timer(initial: 10, pause: pause.asObservable(), reset: reset.asObservable(), scheduler: scheduler)
        }

        XCTAssertEqual(result.events, [.next(200, 10), .next(211, 9), .next(212, 8), .next(213, 7), .next(214, 6), .next(215, 5), .next(216, 4), .next(217, 3), .next(218, 2), .next(219, 1), .next(220, 0)])
    }

    func testPausedTimer() {
        let scheduler = TestScheduler(initialClock: 0)
        let pause = scheduler.createColdObservable([.next(10, ()), .next(13, ()), .next(20, ())])
        let reset = scheduler.createColdObservable([Recorded<Event<Void>>]())

        let result = scheduler.start {
            timer(initial: 4, pause: pause.asObservable(), reset: reset.asObservable(), scheduler: scheduler)
        }

        XCTAssertEqual(result.events, [.next(200, 4), .next(211, 3), .next(212, 2), .next(221, 1), .next(222, 0)])
    }

    func testResetBeforeStarting() {
        let scheduler = TestScheduler(initialClock: 0)
        let pause = scheduler.createColdObservable([.next(20, ())])
        let reset = scheduler.createColdObservable([.next(10, ())])

        let result = scheduler.start {
            timer(initial: 3, pause: pause.asObservable(), reset: reset.asObservable(), scheduler: scheduler)
        }

        XCTAssertEqual(result.events, [.next(200, 3), .next(221, 2), .next(222, 1), .next(223, 0)])
    }

    func testResetWhileRunning() {
        let scheduler = TestScheduler(initialClock: 0)
        let pause = scheduler.createColdObservable([.next(10, ()), .next(20, ())])
        let reset = scheduler.createColdObservable([.next(13, ())])

        let result = scheduler.start {
            timer(initial: 4, pause: pause.asObservable(), reset: reset.asObservable(), scheduler: scheduler)
        }

        XCTAssertEqual(result.events, [.next(200, 4), .next(211, 3), .next(212, 2), .next(213, 4), .next(221, 3), .next(222, 2), .next(223, 1), .next(224, 0)])
    }

    func testResetWhilePaused() {
        let scheduler = TestScheduler(initialClock: 0)
        let pause = scheduler.createColdObservable([.next(10, ()), .next(13, ()), .next(20, ())])
        let reset = scheduler.createColdObservable([.next(15, ())])

        let result = scheduler.start {
            timer(initial: 4, pause: pause.asObservable(), reset: reset.asObservable(), scheduler: scheduler)
        }

        XCTAssertEqual(result.events, [.next(200, 4), .next(211, 3), .next(212, 2), .next(215, 4), .next(221, 3), .next(222, 2), .next(223, 1), .next(224, 0)])
    }

    func testResetWhenEnded() {
        let scheduler = TestScheduler(initialClock: 0)
        let pause = scheduler.createColdObservable([.next(10, ()), .next(20, ())])
        let reset = scheduler.createColdObservable([.next(15, ())])

        let result = scheduler.start {
            timer(initial: 4, pause: pause.asObservable(), reset: reset.asObservable(), scheduler: scheduler)
        }

        XCTAssertEqual(result.events, [.next(200, 4), .next(211, 3), .next(212, 2), .next(213, 1), .next(214, 0), .next(215, 4), .next(221, 3), .next(222, 2), .next(223, 1), .next(224, 0)])
    }
}

func timer(initial: Int, pause: Observable<Void>, reset: Observable<Void>, scheduler: SchedulerType) -> Observable<Int> {
    let tick = isPaused(pause: pause, reset: reset)
        .flatMapLatest { $0 ? .empty() : Observable<Int>.interval(.seconds(1), scheduler: scheduler).take(initial) }

    return ticker(initial: initial, tick: tick, reset: reset)
}

func isPaused(pause: Observable<Void>, reset: Observable<Void>) -> Observable<Bool> {
    Observable.merge(pause.map { false }, reset.map { true })
        .scan(true) { $1 || !$0 }
        .startWith(true)
        .distinctUntilChanged()
}

func ticker<T>(initial: Int, tick: Observable<T>, reset: Observable<Void>) -> Observable<Int> {
    return Observable.merge(tick.map { _ in false }, reset.map { true })
        .scan(initial) { $1 ? initial : $0 - 1 }
        .startWith(initial)
        .filter { 0 <= $0 }
        .distinctUntilChanged()
}
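
The two scan closures are compact, so it may help to see the same folds in plain Swift, applied to arrays instead of observables. This is only a model under stated assumptions: pausedStates and tickerValues are hypothetical names, and each array element stands for one merged event, in the order it would arrive.

```swift
// Model of isPaused: `true` in the input stands for a reset, `false` for a pause tap.
// A pause tap toggles the state; a reset always forces "paused".
func pausedStates(_ inputs: [Bool]) -> [Bool] {
    var state = true                  // startWith(true): the timer starts out paused
    var emitted = [state]
    for isReset in inputs {
        state = isReset || !state     // same expression as the scan closure
        if state != emitted.last! {   // distinctUntilChanged
            emitted.append(state)
        }
    }
    return emitted
}

// Model of ticker: `true` stands for a reset, `false` for a one-second tick.
func tickerValues(initial: Int, inputs: [Bool]) -> [Int] {
    var value = initial
    var emitted = [initial]                         // startWith(initial)
    for isReset in inputs {
        value = isReset ? initial : value - 1       // same expression as the scan closure
        if value >= 0 && value != emitted.last! {   // filter, then distinctUntilChanged
            emitted.append(value)
        }
    }
    return emitted
}

// pause, pause, reset: the same values testPause expects
print(pausedStates([false, false, true]))   // prints [true, false, true]
// two ticks, a reset, then two more ticks
print(tickerValues(initial: 4, inputs: [false, false, true, false, false]))   // prints [4, 3, 2, 4, 3, 2]
```

In the first call the final reset produces no new value because the state is already true, which is why testPause expects only three events.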

Original Answer Follows:

I changed your pause from an Observable<Bool> to Observable<Void>. The Bool didn't make any sense because the reset can also cause a pause and that would conflict with the other observable.

Here's the complete code, including a test harness:

import XCTest
import RxSwift
import RxTest

class rx_sandboxTests: XCTestCase {

    func testTimerStart() {
        let scheduler = TestScheduler(initialClock: 0)
        let pause = scheduler.createColdObservable([.next(10, ())])
        let reset = scheduler.createColdObservable([Recorded<Event<Void>>]())

        let result = scheduler.start {
            timer(initial: 10, pause: pause.asObservable(), reset: reset.asObservable(), scheduler: scheduler)
        }

        XCTAssertEqual(result.events, [.next(211, 9), .next(212, 8), .next(213, 7), .next(214, 6), .next(215, 5), .next(216, 4), .next(217, 3), .next(218, 2), .next(219, 1), .next(220, 0)])
    }

    func testPause() {
        let scheduler = TestScheduler(initialClock: 0)
        let pause = scheduler.createColdObservable([.next(10, ()), .next(13, ()), .next(20, ())])
        let reset = scheduler.createColdObservable([Recorded<Event<Void>>]())

        let result = scheduler.start {
            timer(initial: 4, pause: pause.asObservable(), reset: reset.asObservable(), scheduler: scheduler)
        }

        XCTAssertEqual(result.events, [.next(211, 3), .next(212, 2), .next(221, 1), .next(222, 0)])
    }

    func testResetBeforeStarting() {
        let scheduler = TestScheduler(initialClock: 0)
        let pause = scheduler.createColdObservable([.next(20, ())])
        let reset = scheduler.createColdObservable([.next(10, ())])

        let result = scheduler.start {
            timer(initial: 3, pause: pause.asObservable(), reset: reset.asObservable(), scheduler: scheduler)
        }

        XCTAssertEqual(result.events, [.next(221, 2), .next(222, 1), .next(223, 0)])
    }

    func testResetWhileRunning() {
        let scheduler = TestScheduler(initialClock: 0)
        let pause = scheduler.createColdObservable([.next(10, ()), .next(20, ())])
        let reset = scheduler.createColdObservable([.next(13, ())])

        let result = scheduler.start {
            timer(initial: 4, pause: pause.asObservable(), reset: reset.asObservable(), scheduler: scheduler)
        }

        XCTAssertEqual(result.events, [.next(211, 3), .next(212, 2), .next(221, 3), .next(222, 2), .next(223, 1), .next(224, 0)])
    }

    func testResetWhilePaused() {
        let scheduler = TestScheduler(initialClock: 0)
        let pause = scheduler.createColdObservable([.next(10, ()), .next(13, ()), .next(20, ())])
        let reset = scheduler.createColdObservable([.next(15, ())])

        let result = scheduler.start {
            timer(initial: 4, pause: pause.asObservable(), reset: reset.asObservable(), scheduler: scheduler)
        }

        XCTAssertEqual(result.events, [.next(211, 3), .next(212, 2), .next(221, 3), .next(222, 2), .next(223, 1), .next(224, 0)])
    }

    func testResetWhenEnded() {
        let scheduler = TestScheduler(initialClock: 0)
        let pause = scheduler.createColdObservable([.next(10, ()), .next(20, ())])
        let reset = scheduler.createColdObservable([.next(15, ())])

        let result = scheduler.start {
            timer(initial: 4, pause: pause.asObservable(), reset: reset.asObservable(), scheduler: scheduler)
        }

        XCTAssertEqual(result.events, [.next(211, 3), .next(212, 2), .next(213, 1), .next(214, 0), .next(221, 3), .next(222, 2), .next(223, 1), .next(224, 0)])
    }
}

func timer(initial: Int, pause: Observable<Void>, reset: Observable<Void>, scheduler: SchedulerType) -> Observable<Int> {
    enum Action { case pause, reset, tick }
    let intent = Observable.merge(
        pause.map { Action.pause },
        reset.map { Action.reset }
    )

    let isPaused = intent
        .scan(true) { isPaused, action in
            switch action {
            case .pause:
                return !isPaused
            case .reset:
                return true
            case .tick:
                fatalError()
            }
        }
        .startWith(true)

    let tick = isPaused
        .flatMapLatest { $0 ? .empty() : Observable<Int>.interval(.seconds(1), scheduler: scheduler) }

    return Observable.merge(tick.map { _ in Action.tick }, reset.map { Action.reset })
        .scan(initial) { (current, action) -> Int in
            switch action {
            case .pause:
                fatalError()
            case .reset:
                return initial
            case .tick:
                return current == -1 ? -1 : current - 1
            }
        }
        .filter { 0 <= $0 && $0 < initial }
}

It's good to know how to test Rx code.

Is calling `disposed(by:)` necessary in any case?

No, it is not necessary to call .disposed(by:) in every case.

If you have an Observable that you know will eventually send a stop event, and you know that you want to keep listening to that observable until it does so, then there is no reason/need to dispose the subscription and therefore no need to insert the disposable into a dispose bag.


The reason .subscribe and its ilk return a Disposable is so that the calling code can end the subscription before the observable has completed. The calling code ends the subscription by calling dispose() on the disposable returned. Otherwise, the subscription will continue until the source observable sends a stop event (either completed or error.)

If the calling code doesn't dispose the subscription, and the source observable doesn't send a stop event, then the subscription will continue to operate even if all other code involved has lost all references to the objects involved in the subscription.

So for example, if you put this in a viewDidLoad:

_ = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance)
    .subscribe(onNext: { print($0) })

The code above will continue to print values long after the view controller that created it ceases to exist.

In the example you presented, the UIButton object will emit a completed event when it is deinitialized, so if you want to listen to the button right up until that happens, putting the disposable in a dispose bag isn't necessary.

Ignoring disposables means you need to be very cognizant as to which Observables complete and which ones don't, but if you have that understanding, you can ignore them. Just remember that the next developer down the line, or future you, won't have as good an understanding of the code as you do.
