How to emit items, one by one, from Collection with a delay in RxSwift
Well... after some R&D this is a tested working version for my question.
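// With RxSwift 5 and later, pass a DispatchTimeInterval such as .seconds(1) instead of RxTimeInterval(delay).
Observable.zip(
    Observable.from(allDogs),
    Observable<Int>.interval(RxTimeInterval(delay), scheduler: MainScheduler.instance)
).subscribe(onNext: { (dog, index) in
    print(dog.name)
})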
Delay iteration of a collection in RxSwift
This is a common confusion. As you have learned, delay is applied to every element equally, so all of them are delayed by five seconds; it does not put a five-second delay between each event. (What you want is for the first event to happen immediately, the second at five seconds, the third at ten, and so on.)
Once you realize that what you are really trying to do is put a delay between each event, the solution becomes clearer. Or at least, the reason why just putting a delay on the from operator isn't working should be clearer.
The solution is to use one of the flatMap variants:
let array = [1, 2, 3, 4, 5]
Observable.from(array)
    .concatMap { Observable.empty().delay(.seconds(5), scheduler: MainScheduler.instance).startWith($0) }
    .subscribe { (intValue) in
        print("onNext() \(intValue)")
    }
The line Observable.empty().delay(.seconds(5), scheduler: MainScheduler.instance).startWith($0) will immediately emit the value, but then wait five seconds before emitting a completed event.
The concatMap operator calls the closure on each incoming event and concatenates the resulting Observables, so that each following Observable isn't subscribed to until the previous one completes.
Learn more about this in my article, The Many Faces of FlatMap
An alternative solution, based on the comments that Sandeep Bhandari wrote on the question, would be to use an interval to constrain when the from operator can emit values. Something like this:
let array = [1, 2, 3, 4, 5]
Observable.zip(
    Observable.from(array),
    Observable<Int>.interval(.seconds(5), scheduler: MainScheduler.instance)
        .take(while: { $0 != array.count })
)
    .map { $0.0 } // keep the array element, drop the interval tick
    .subscribe { (intValue) in
        print("onNext() \(intValue)")
    }
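To make the difference in timing concrete, here is a small sketch that only does the arithmetic. It does not use RxSwift at all, and the three helper names are made up for illustration. It assumes the five-element array and five-second pause from the examples above, and that interval fires its first tick after one full period.

```swift
// Hypothetical helpers: they only compute emission times in seconds and do not use RxSwift.

/// delay(seconds): every element is shifted by the same amount.
func timesWithDelay(count: Int, seconds: Int) -> [Int] {
    return Array(repeating: seconds, count: count)
}

/// concatMap + startWith: element n is emitted once the n previous pauses have elapsed.
func timesWithConcatMap(count: Int, seconds: Int) -> [Int] {
    return (0..<count).map { $0 * seconds }
}

/// zip with interval: element n waits for tick n, and the first tick fires after one period.
func timesWithZipInterval(count: Int, seconds: Int) -> [Int] {
    return (0..<count).map { ($0 + 1) * seconds }
}

print(timesWithDelay(count: 5, seconds: 5))       // [5, 5, 5, 5, 5]
print(timesWithConcatMap(count: 5, seconds: 5))   // [0, 5, 10, 15, 20]
print(timesWithZipInterval(count: 5, seconds: 5)) // [5, 10, 15, 20, 25]
```

So the two working approaches space the values identically; the only difference is whether the first value arrives immediately or after the first pause.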
Combine: publish elements of a sequence with some delay
Using the idea from the answer linked by Alexander in comments, you can create a publisher that emits a value every 0.5 seconds using Timer.publish(every:on:in:), then zip that together with your Array.publisher to make your downstream publisher emit a value every time both of your publishers have emitted a new value.
Publishers.Zip takes the n-th element of each of its upstream publishers and only emits when both of its upstreams have reached n emitted values. Hence, by zipping together a publisher that only emits its values at 0.5 second intervals with your original publisher that emits all of its values immediately, you space the values 0.5 seconds apart.
import Combine
import Foundation

let delayPublisher = Timer.publish(every: 0.5, on: .main, in: .default).autoconnect()
let delayedValuesPublisher = Publishers.Zip(myCollection.publisher, delayPublisher)
// Keep a reference to the subscription; releasing it cancels the pipeline.
let subscription = delayedValuesPublisher.sink { print($0.0) }
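The pairing rule is easier to see with plain sequences. The sketch below uses the standard library's zip, not Combine, as a synchronous analogy: the values and tickTimes arrays are invented for illustration, and it assumes the timer's first tick arrives after one 0.5 second interval.

```swift
// Analogy only: stdlib zip on arrays, standing in for Publishers.Zip on publishers.
let values = ["a", "b", "c"]

// Times (in seconds) at which a 0.5 s timer would tick; more ticks than values.
let tickTimes = (1...10).map { Double($0) * 0.5 }

// The n-th value is paired with the n-th tick; zip stops when the shorter side runs out.
let schedule = zip(values, tickTimes).map { pair in (value: pair.0, time: pair.1) }

for entry in schedule {
    print("\(entry.value) at \(entry.time)s")
}
// a at 0.5s
// b at 1.0s
// c at 1.5s
```

Each value is released only when its matching tick exists, which is why the fast side ends up paced by the slow side.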
RxJava delay for each item of list emitted
One way to do it is to use zip to combine your observable with an interval observable to delay the output.
// Uses the RxJava 1.x API (toBlocking()).
long timeNow = System.currentTimeMillis();
Observable.zip(
        Observable.range(1, 5)
            .groupBy(n -> n % 5)
            .flatMap(g -> g.toList()),
        Observable.interval(50, TimeUnit.MILLISECONDS),
        (obs, timer) -> obs)
    .doOnNext(item -> {
        System.out.println(System.currentTimeMillis() - timeNow);
        System.out.println(item);
        System.out.println(" ");
    }).toList().toBlocking().first();
How to put delay in RxSwift?
You can use Observable.timer along with Observable.zip.
Something like:
Observable<MyType>.zip(
    myObservable,
    Observable<Int>.timer(RxTimeInterval.seconds(5), scheduler: MainScheduler.instance),
    resultSelector: { myItem, _ in return myItem }
)
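The result selector is there to discard the value produced by the timer. Note that timer without a period emits only once, so this delays a single element rather than spacing out a whole sequence.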
RxSwift, count how many equal consecutive Equatable items are emitted
You can write your own operator. Here's an example implementation. I create a new observable that subscribes to self's events. Any time self has a new element, the .next case in the switch is hit, which does the run-length bookkeeping. Whenever a different element, an error, or a completion is encountered, the grouping is emitted.
extension ObservableType where Self.E: Equatable { // RxSwift 5 and later name this associated type Element
    func runLengthEncode() -> Observable<(element: E, count: Int)> {
        return Observable.create { observer in
            // Per-subscription state, so separate subscribers don't share a run.
            var lastGrouping: (element: E, count: Int)? = nil
            return self.subscribe { event in
                switch event {
                case .next(let currentElement):
                    if let currentGrouping = lastGrouping {
                        if currentGrouping.element == currentElement {
                            lastGrouping = (element: currentElement, count: currentGrouping.count + 1)
                        } else { // This run ended, a new element was encountered.
                            lastGrouping = (element: currentElement, count: 1) // start a new grouping
                            observer.on(.next(currentGrouping)) // emit the completed grouping
                        }
                    } else {
                        lastGrouping = (element: currentElement, count: 1)
                    }
                case .error(let error):
                    if let lastGrouping = lastGrouping { observer.on(.next(lastGrouping)) } // Emit the last unemitted grouping.
                    observer.on(.error(error))
                case .completed:
                    if let lastGrouping = lastGrouping { observer.on(.next(lastGrouping)) } // Emit the last unemitted grouping.
                    observer.on(.completed)
                }
            }
        }
    }
}
You can also implement a complementary run-length decoding operator:
extension ObservableType {
    func runLengthDecode<Element>() -> Observable<Element>
        where Self.E == (element: Element, count: Int) {
        return Observable.create { observer in
            return self.subscribe { event in
                switch event {
                case .next((element: let element, count: let count)):
                    // 0..<count (rather than 1...count) avoids a crash when count is 0.
                    for _ in 0..<count {
                        observer.on(.next(element))
                    }
                case .error(let error): observer.on(.error(error))
                case .completed: observer.on(.completed)
                }
            }
        }
    }
}
Test case:
let numbers = Observable<Int>.from([
    0, 0,
    3, 3, 3, 3, 3,
    2,
    0, 0, 0,
    6, 6,
])
let runLengthEncoded = numbers.runLengthEncode()
runLengthEncoded.subscribe { print($0) }
let runLengthDecoded = runLengthEncoded.runLengthDecode()
runLengthDecoded.subscribe { print($0) }
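If you want to check which groupings to expect from that input without pulling in RxSwift, the same bookkeeping can be sketched on a plain array. These free functions are illustrative stand-ins, not the operators above, and they share only the names.

```swift
// Array-based sketch of the same run-length bookkeeping (no RxSwift involved).
func runLengthEncode<T: Equatable>(_ items: [T]) -> [(element: T, count: Int)] {
    var result: [(element: T, count: Int)] = []
    for item in items {
        if let last = result.last, last.element == item {
            // Same element as the current run: extend it.
            result[result.count - 1].count += 1
        } else {
            // A different element (or the very first one): start a new run.
            result.append((element: item, count: 1))
        }
    }
    return result
}

func runLengthDecode<T>(_ groups: [(element: T, count: Int)]) -> [T] {
    // Expand each grouping back into `count` copies of its element.
    return groups.flatMap { Array(repeating: $0.element, count: $0.count) }
}

let numbers = [0, 0, 3, 3, 3, 3, 3, 2, 0, 0, 0, 6, 6]
let encoded = runLengthEncode(numbers)
let decoded = runLengthDecode(encoded)

print(encoded.map { "\($0.element)x\($0.count)" }) // ["0x2", "3x5", "2x1", "0x3", "6x2"]
print(decoded == numbers)                           // true
```

The observable version should emit the same five groupings in the same order, followed by a completed event.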
RXSwift Zip operator N items via an enumerable
It can handle more than 8 if you use it on CollectionType.
Here's the method definition:
extension CollectionType where Generator.Element : ObservableType {
    public func zip<R>(resultSelector: [Generator.Element.E] throws -> R) -> Observable<R> {
        return ZipCollectionType(sources: self, resultSelector: resultSelector)
    }
}
So instead of using it like this:
Observable.zip(o1, o2, o3, o4, o5, o6, o7, o8, o9) { ... }
Use it like this:
[o1, o2, o3, o4, o5, o6, o7, o8, o9].zip { ... }
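For a feel of what the result selector receives in the collection form, here is a rough array-based analogy. zipAll is a made-up helper, not part of RxSwift: it hands the selector one array holding the n-th element of every source, however many sources there are.

```swift
// Hypothetical helper: an array analogy of zipping N sources through one result selector.
func zipAll<T, R>(_ sources: [[T]], resultSelector: ([T]) -> R) -> [R] {
    // Like zip, stop as soon as the shortest source runs out.
    guard let shortest = sources.map({ $0.count }).min() else { return [] }
    return (0..<shortest).map { index in
        resultSelector(sources.map { $0[index] })
    }
}

let sums = zipAll([[1, 2, 3], [10, 20, 30], [100, 200, 300]]) { row in
    row.reduce(0, +)
}
print(sums) // [111, 222, 333]
```

Because the selector takes an array instead of a fixed list of parameters, the number of sources is not limited by the arity of the overloads.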
How to make one Observable sequence wait for another to complete before emitting?
A couple of ways I can think of:
import {take, publish} from 'rxjs/operators'
import {concat} from 'rxjs'

// Method one: concat subscribes to two only after one completes
var one = someObservable.pipe(take(1));
var two = someOtherObservable.pipe(take(1));
concat(one, two).subscribe(function() {/*do something */});

// Method two, if they need to be separate for some reason:
// publish() holds two back until connect() is called when one completes
var one = someObservable.pipe(take(1));
var two = someOtherObservable.pipe(take(1), publish());
two.subscribe(function(){/*do something */});
one.subscribe(function(){/*do something */}, null, two.connect.bind(two));