How to Limit flatMap Concurrency in Combine While Still Processing All Source Events

How can you limit flatMap concurrency in Combine while still having every source event processed?

You have stumbled very beautifully right into the use case for the .buffer operator. Its purpose is to compensate for .flatMap backpressure by accumulating values that would otherwise be dropped.

I will illustrate with a completely artificial example:

import UIKit
import Combine

class ViewController: UIViewController {
    let sub = PassthroughSubject<Int,Never>()
    var storage = Set<AnyCancellable>()
    var timer: Timer!

    override func viewDidLoad() {
        super.viewDidLoad()
        sub
            // Allow at most 3 inner publishers to be active at once.
            .flatMap(maxPublishers: .max(3)) { i in
                return Just(i)
                    .delay(for: 3, scheduler: DispatchQueue.main)
                    .eraseToAnyPublisher()
            }
            .sink { print($0) }
            .store(in: &storage)

        // Emit an incrementing integer once per second.
        var count = 0
        self.timer = Timer.scheduledTimer(withTimeInterval: 1, repeats: true) { _ in
            count += 1
            self.sub.send(count)
        }
    }
}

So our publisher emits an incrementing integer every second, but our flatMap allows at most .max(3) inner publishers and each one takes 3 seconds to republish its value. The result is that we start to miss values:

1
2
3
5
6
7
9
10
11
...

The solution is to put a buffer in front of the flatMap. It needs to be large enough to hold any missed values long enough for them to be requested:

sub
    .buffer(size: 20, prefetch: .keepFull, whenFull: .dropOldest)
    .flatMap(maxPublishers: .max(3)) { i in

The result is that all the numeric values do in fact arrive at the sink. Of course, in real life we could still lose values if the buffer is not large enough to compensate for the disparity between the rate at which the publisher emits values and the rate at which the backpressuring flatMap republishes them. (Here the flatMap republishes three values every three seconds, which matches the one-value-per-second upstream rate, so a buffer of 20 comfortably absorbs the transient backlog; if the upstream permanently outpaced the flatMap, no finite buffer would suffice.)

How do you apply backpressure with Combine's buffer operator so that flatMap does not request unlimited demand upstream?

The issue appears to be a Combine bug, as pointed out here: using Publishers.Sequence causes the downstream operator to accumulate every value sent to it before proceeding.

A workaround is to type-erase the sequence publisher:

import Foundation
import Combine

let cancellable = (0..<1_000_000).publisher
    .eraseToAnyPublisher() // <---- the workaround
    .map(some_preprocessing) // placeholder that turns each value into a URLRequest
    .flatMap(maxPublishers: .max(32)) { request in
        URLSession.shared.dataTaskPublisher(for: request)
            .map(\.data)
            .catch { _ in
                return Just(Data())
            }
    }
    .sink { completion in
        print(completion)
    } receiveValue: { value in
        print(value)
    }

// Required in a command-line tool without a run loop
sleep(.max)

How can I use 'merge' to limit the concurrency of a list of observables, but only return once all observables have completed?

The issue with the posted solutions (while they gave me the concurrency flexibility) was that they didn't satisfy the condition that the whole operation emit only once, after every item had completed.

The working solution is as follows:

import { toArray, mergeMap } from "rxjs/operators";
import { of, from, Observable } from "rxjs";

export function limitedParallelObservableExecution<T>(
  listOfItems: Array<T>,
  observableMethod: (item: T) => Observable<any>,
  maxConcurrency: number = 4
): Observable<any> {
  if (listOfItems && listOfItems.length > 0) {
    const observableListOfItems: Observable<T> = from(listOfItems);
    return observableListOfItems.pipe(
      mergeMap(observableMethod, maxConcurrency),
      toArray()
    );
  } else {
    return of({});
  }
}

The strategy here is to:

1) Create an observable stream from the list of items

2) Pass the observable method into mergeMap, along with maxConcurrency

3) Use toArray() so that a single array is emitted only once all the observables have completed
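A hypothetical usage sketch (the userIds list and fetchUser function are made up for illustration, not part of the original answer): at most 3 inner observables run at a time, and the subscriber receives a single array once everything has completed.

import { of } from "rxjs";
import { delay } from "rxjs/operators";

const userIds = [1, 2, 3, 4, 5, 6, 7, 8];

// Simulated per-item work; in practice this would be an HTTP call or similar.
const fetchUser = (id: number) => of({ id, name: `user-${id}` }).pipe(delay(500));

limitedParallelObservableExecution(userIds, fetchUser, 3).subscribe(results => {
  // Fires exactly once, with all 8 results, after every inner observable completes.
  console.log(results);
});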

Message processing throttling/backpressure

This isn't strictly backpressure; it's just limiting concurrency. Here's an easy way to do it (ignore my possibly wrong syntax, I'm coding in a text area):

Rx.Observable.interval(1000)
  .flatMap (tick) ->
    # returns an `Observable`
    loadMessages()
  .map (message) ->
    # also returns an `Observable`, but it only does work when
    # someone first subscribes to it
    Rx.Observable.defer ->
      makeHttpRequest(message)
  .merge 10   # at a time
  .subscribe (result) ->
    console.info "Processed: ", result

In C#, the equivalent idea is that instead of SelectMany, you write Select(Defer(x)).Merge(n). Merge(int) subscribes to at most n Observables at a time and buffers the rest until later. The reason for the Defer is to make sure no work is done until Merge(n) actually subscribes to the deferred Observable.
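For readers on modern RxJS, the same idea can be sketched like this (hedged: loadMessages and makeHttpRequest are the placeholders from the pseudocode above, declared here only so the sketch type-checks):

import { interval, defer, Observable } from "rxjs";
import { map, mergeMap, mergeAll } from "rxjs/operators";

// Placeholders borrowed from the pseudocode above; signatures are assumed.
declare function loadMessages(): Observable<string>;
declare function makeHttpRequest(message: string): Observable<string>;

interval(1000).pipe(
  mergeMap(() => loadMessages()),                         // a stream of messages per tick
  map(message => defer(() => makeHttpRequest(message))),  // lazy: no request until subscribed
  mergeAll(10)                                            // at most 10 requests in flight
).subscribe(result => console.info("Processed:", result));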


