How to emit items, one by one, from Collection with a delay in RxSwift

Well... after some R&D this is a tested working version for my question.

Observable.zip(
    Observable.from(allDogs),
    Observable<Int>.interval(RxTimeInterval(delay), scheduler: MainScheduler.instance)
)
.subscribe(onNext: { (dog, index) in
    print(dog.name)
})

Delay iteration of a collection in RxSwift

This is a common confusion. As you have learned, delay is applied to every element equally, so all of them are delayed by five seconds, rather than a five-second delay being put between each event. (What you want is for the first event to happen immediately, the second at five seconds, the third at ten, and so on.)

Once you realize that what you are really trying to do is put a delay between each event, the solution becomes clearer. Or at least it becomes clearer why just putting a delay on the from operator isn't working.
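
The difference can be checked without RxSwift at all. The following plain-Swift sketch only models the arithmetic of when each element would arrive, assuming a source that emits everything at t = 0. The function names delayedTimes and spacedTimes are made up for this illustration and are not RxSwift API.

```swift
// Plain-Swift model of arrival times, in seconds. No RxSwift involved;
// both function names are hypothetical and exist only for this sketch.

/// `delay` shifts every element by the same amount. A synchronous source
/// emits all elements at t = 0, so all of them arrive together at `delay`.
func delayedTimes(count: Int, delay: Int) -> [Int] {
    return Array(repeating: delay, count: count)
}

/// Putting a gap between elements instead: element n arrives at n * gap.
func spacedTimes(count: Int, gap: Int) -> [Int] {
    return (0..<count).map { $0 * gap }
}

print(delayedTimes(count: 5, delay: 5)) // [5, 5, 5, 5, 5]
print(spacedTimes(count: 5, gap: 5))    // [0, 5, 10, 15, 20]
```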

The solution is to use one of the flatMap variants:

let array = [1, 2, 3, 4, 5]
Observable.from(array)
    .concatMap { Observable.empty().delay(.seconds(5), scheduler: MainScheduler.instance).startWith($0) }
    // subscribe(onNext:) so the closure receives the Int itself rather than the wrapping Event
    .subscribe(onNext: { intValue in
        print("onNext() \(intValue)")
    })

The line Observable.empty().delay(.seconds(5), scheduler: MainScheduler.instance).startWith($0) will immediately emit the value, but then wait five seconds before emitting a completed event.

The concatMap operator calls the closure on each incoming event and concatenates the resulting Observables, so that each following Observable isn't subscribed to until the previous one completes.
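
To see why this produces the spacing, here is a small plain-Swift model of that sequencing rule. It does not use RxSwift; Inner and concatTimes are hypothetical names for this sketch, and each inner sequence is assumed to behave like the one above (emit on subscription, complete five seconds later).

```swift
// Plain-Swift model of concatMap's sequencing; no RxSwift involved.
// `Inner` and `concatTimes` are hypothetical names for this sketch.

/// One inner sequence: it emits `value` as soon as it is subscribed to
/// (the `startWith`) and completes `completesAfter` seconds later
/// (the delayed `empty`).
struct Inner {
    let value: Int
    let completesAfter: Int
}

/// Each inner sequence is subscribed to only when the previous one completes,
/// so its start time is the sum of the earlier completion delays.
func concatTimes(_ inners: [Inner]) -> [(time: Int, value: Int)] {
    var start = 0
    var timeline: [(time: Int, value: Int)] = []
    for inner in inners {
        timeline.append((time: start, value: inner.value))
        start += inner.completesAfter
    }
    return timeline
}

let timeline = concatTimes([1, 2, 3, 4, 5].map { Inner(value: $0, completesAfter: 5) })
for entry in timeline {
    print("t=\(entry.time)s value=\(entry.value)")
}
// t=0s value=1
// t=5s value=2
// t=10s value=3
// t=15s value=4
// t=20s value=5
```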

Learn more about this in my article, The Many Faces of FlatMap


An alternative solution based on the comments that Sandeep Bhandari wrote on the question would be to use an interval to constrain when the from operator can emit values. Something like this:

let array = [1, 2, 3, 4, 5]
Observable.zip(
    Observable.from(array),
    Observable<Int>.interval(.seconds(5), scheduler: MainScheduler.instance).take(while: { $0 != array.count })
)
    .map { $0.0 } // keep the array element, drop the interval's tick count
    .subscribe(onNext: { intValue in
        print("onNext() \(intValue)")
    })

Combine: publish elements of a sequence with some delay

Using the idea from the answer linked by Alexander in comments, you can create a publisher that emits a value every 0.5 seconds using Timer.publish(every:on:in:), then zip that together with your Array.publisher to make your downstream publisher emit a value every time both of your publishers have emitted a new value.

Publishers.Zip takes the n-th element of each of its upstream publishers and only emits once both upstreams have emitted n values. Zipping a publisher that emits only at 0.5-second intervals with your original publisher, which emits all of its values immediately, therefore makes the values arrive one at a time, 0.5 seconds apart.

let delayPublisher = Timer.publish(every: 0.5, on: .main, in: .default).autoconnect()
let delayedValuesPublisher = Publishers.Zip(myCollection.publisher, delayPublisher)
let subscription = delayedValuesPublisher.sink { print($0.0) }
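
The pairing rule described above can be modelled in plain Swift, without Combine, to see when each zipped value would arrive. This is only a sketch of the timing arithmetic: zipTimes is a made-up helper, and it assumes both lists of arrival times are given in order.

```swift
// Plain-Swift model of zip timing; no Combine or RxSwift involved.
// `zipTimes` is a hypothetical helper for this sketch.

/// The n-th output needs the n-th element of both inputs, so it fires at
/// the later of the two arrival times. Output stops with the shorter input.
func zipTimes(_ a: [Double], _ b: [Double]) -> [Double] {
    return zip(a, b).map { max($0, $1) }
}

let valueTimes = [0.0, 0.0, 0.0, 0.0]             // a collection emits everything at once
let tickTimes = (1...10).map { Double($0) * 0.5 } // timer ticks at 0.5, 1.0, 1.5, ...
print(zipTimes(valueTimes, tickTimes))            // [0.5, 1.0, 1.5, 2.0]
```

Note that in this model the first value also waits for the first tick, unlike the concatMap approach, where the first value is emitted immediately.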

RxJava delay for each item of list emitted

One way to do it is to use zip to combine your observable with an Interval observable to delay the output.

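// Assumed to be captured just before the chain starts; it is the reference
// point for the elapsed-time printout below.
long timeNow = System.currentTimeMillis();

Observable.zip(
        Observable.range(1, 5)
                .groupBy(n -> n % 5)
                .flatMap(g -> g.toList()),
        Observable.interval(50, TimeUnit.MILLISECONDS),
        (obs, timer) -> obs)
        .doOnNext(item -> {
            System.out.println(System.currentTimeMillis() - timeNow);
            System.out.println(item);
            System.out.println(" ");
        }).toList().toBlocking().first();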

How to put delay in RxSwift?

You can use Observable.timer along with Observable.zip.

Something like:

Observable<MyType>.zip(
    myObservable,
    Observable<Int>.timer(RxTimeInterval.seconds(5), scheduler: MainScheduler.instance),
    resultSelector: { myItem, _ in return myItem }
)

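The result selector is there to ignore the value produced by the timer. Note that timer with only a due time emits a single value, so this zip produces at most one element: it delays the first item of myObservable by five seconds. To space out every item of a longer sequence, zip with interval instead.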

RxSwift, count how many equal consecutive Equatable items are emitted

You can write your own operator. Here's an example implementation. I create a new observable that subscribes to self's events. Any time self has a new element, the .next case in the switch is hit, which does the run-length bookkeeping. Whenever a different element, an error, or a completion is encountered, the grouping is emitted.

// Written against RxSwift 4 naming; on RxSwift 5 and later the associated type is Element instead of E.
extension ObservableType where Self.E: Equatable {
    func runLengthEncode() -> Observable<(element: E, count: Int)> {
        return Observable.create { observer in
            // Declared inside create so that every subscription tracks its own run.
            // Outside of it, a second subscription would start with the first one's leftover grouping.
            var lastGrouping: (element: E, count: Int)? = nil

            return self.subscribe { event in
                switch event {
                case .next(let currentElement):
                    if let currentGrouping = lastGrouping {
                        if currentGrouping.element == currentElement {
                            lastGrouping = (element: currentElement, count: currentGrouping.count + 1)
                        }
                        else { // This run ended, a new element was encountered.
                            lastGrouping = (element: currentElement, count: 1) // start a new grouping
                            observer.on(.next(currentGrouping)) // emit the completed grouping
                        }
                    } else {
                        lastGrouping = (element: currentElement, count: 1)
                    }

                case .error(let error):
                    if let lastGrouping = lastGrouping { observer.on(.next(lastGrouping)) } // Emit the last unemitted grouping.
                    observer.on(.error(error))

                case .completed:
                    if let lastGrouping = lastGrouping { observer.on(.next(lastGrouping)) } // Emit the last unemitted grouping.
                    observer.on(.completed)
                }
            }
        }
    }
}

You can also implement a complementary run-length decoding operator:

extension ObservableType {
    func runLengthDecode<Element>() -> Observable<Element>
        where Self.E == (element: Element, count: Int) {
        return Observable.create { observer in
            return self.subscribe { event in
                switch event {
                case .next((element: let element, count: let count)):
                    // 1...count would trap on a count of zero; this emits nothing in that case.
                    for _ in 0..<max(0, count) {
                        observer.on(.next(element))
                    }

                case .error(let error): observer.on(.error(error))
                case .completed: observer.on(.completed)
                }
            }
        }
    }
}

Test case:

let numbers = Observable<Int>.from([
    0, 0,
    3, 3, 3, 3, 3,
    2,
    0, 0, 0,
    6, 6,
])

let runLengthEncoded = numbers.runLengthEncode()
runLengthEncoded.subscribe { print($0) }

let runLengthDecoded = runLengthEncoded.runLengthDecode()
runLengthDecoded.subscribe { print($0) }
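
For reference, the same grouping logic can be run on a plain array to see which groupings the test input should produce. This is a sketch that does not use RxSwift; the two free functions below are array counterparts written for this illustration, not part of the operators above.

```swift
// Array counterparts of the two operators; plain Swift, no RxSwift.

/// Collapse consecutive equal elements into (element, count) pairs.
func runLengthEncode<T: Equatable>(_ input: [T]) -> [(element: T, count: Int)] {
    var groups: [(element: T, count: Int)] = []
    for value in input {
        if let last = groups.last, last.element == value {
            groups[groups.count - 1].count += 1 // same run, bump the count
        } else {
            groups.append((element: value, count: 1)) // a new run starts
        }
    }
    return groups
}

/// Expand every (element, count) pair back into `count` copies of the element.
func runLengthDecode<T>(_ groups: [(element: T, count: Int)]) -> [T] {
    return groups.flatMap { Array(repeating: $0.element, count: $0.count) }
}

let numbers = [0, 0, 3, 3, 3, 3, 3, 2, 0, 0, 0, 6, 6]
let encoded = runLengthEncode(numbers)
for group in encoded {
    print("\(group.element) x \(group.count)")
}
// 0 x 2
// 3 x 5
// 2 x 1
// 0 x 3
// 6 x 2
print(runLengthDecode(encoded) == numbers) // true
```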

RxSwift Zip operator N items via an enumerable

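The overloads of Observable.zip that take individual observables stop at 8 sources, but zip can handle more than 8 if you use it on CollectionType. (The definition below is from a Swift 2 era release; as far as I know, current RxSwift versions offer the same thing as a static Observable.zip overload that takes a collection.)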

Here's the method definition:

extension CollectionType where Generator.Element : ObservableType {
    public func zip<R>(resultSelector: [Generator.Element.E] throws -> R) -> Observable<R> {
        return ZipCollectionType(sources: self, resultSelector: resultSelector)
    }
}

So instead of using it like this:

Observable.zip(o1, o2, o3, o4, o5, o6, o7, o8, o9) { ... }

Use it like this:

[o1, o2, o3, o4, o5, o6, o7, o8, o9].zip { ... }
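
What the result selector receives in the collection form is an array holding the n-th element of every source. A sketch with plain arrays standing in for the observables shows the shape; zipAll is a made-up helper for this illustration, not RxSwift API.

```swift
// Plain-Swift model of zipping N sources; arrays stand in for observables.
// `zipAll` is a hypothetical helper for this sketch.

/// Row n combines the n-th element of every source, and the result is
/// as long as the shortest source.
func zipAll<T>(_ sources: [[T]]) -> [[T]] {
    guard let shortest = sources.map({ $0.count }).min() else { return [] }
    return (0..<shortest).map { n in sources.map { $0[n] } }
}

// Nine sources with three elements each: [1, 10, 100], [2, 20, 200], ...
let sources = (1...9).map { k in [k, k * 10, k * 100] }
print(zipAll(sources)[0]) // [1, 2, 3, 4, 5, 6, 7, 8, 9]
```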

How to make one Observable sequence wait for another to complete before emitting?

A couple ways I can think of

import {take, publish} from 'rxjs/operators'
import {concat} from 'rxjs'

//Method one

var one = someObservable.pipe(take(1));
var two = someOtherObservable.pipe(take(1));
concat(one, two).subscribe(function() {/*do something */});

//Method two, if they need to be separate for some reason
var one = someObservable.pipe(take(1));
var two = someOtherObservable.pipe(take(1), publish());
two.subscribe(function(){/*do something */});
one.subscribe(function(){/*do something */}, null, two.connect.bind(two));

