Chain Network Calls Sequentially in Combine


The general idea is to use flatMap for chaining, which is what you did; but if you also need the original value, you can return a Zip publisher (via the .zip operator) that puts both results into a tuple.

One of the zipped publishers is the second request, and the other should simply emit the original value. You can usually do that with Just(v), but you have to make sure that its failure type (which is Never) matches the other publisher's. You can adjust the failure type with .setFailureType(to:):

publisher1
    .flatMap { one in
        Just(one).setFailureType(to: Error.self) // failure type has to match publisher2's
            .zip(publisher2(with: one))
    }
    .sink(receiveCompletion: { completion in
        // ...
    }, receiveValue: { (one, two) in
        // ...
    })

Alternatively, you can use Result.Publisher, which infers the error type (though it might look somewhat odd):

.flatMap { one in
    Result.Publisher(.success(one))
        .zip(publisher2)
}

So, in your case it's going to be something like this:

URLSession.shared.dataTaskPublisher(for: url)
    .map(\.data)
    .decode(type: Schedule.self, decoder: JSONDecoder())
    .flatMap {
        Result.Publisher(.success($0))
            .zip(self.fetchLiveFeed($0.dates.first?.games.first?.link ?? ""))
    }
    .sink(receiveCompletion: { completion in
        // ...
    }, receiveValue: { (schedule, live) in
        // ...
    })
    .store(in: &cancellables)

How to merge multiple network calls’ responses into an array? With or without Combine?

Here's a great way to do it with the Combine operators:

func fetchCodes() -> AnyPublisher<[String], Error> { fatalError() }
func fetchImage(forCode: String) -> AnyPublisher<UIImage, Error> { fatalError() }

func example() -> AnyPublisher<[String: UIImage], Error> {
    let codesAndImages = keysAndValues(fetchImage(forCode:))
    return fetchCodes()
        .flatMap { codes in
            combineLatest(codesAndImages(codes))
        }
        .map(Dictionary.init(uniqueKeysWithValues:))
        .eraseToAnyPublisher()
}

func keysAndValues<A, B>(_ get: @escaping (A) -> AnyPublisher<B, Error>) -> ([A]) -> [AnyPublisher<(A, B), Error>] {
    { xs in
        xs.map { x in
            Just(x).setFailureType(to: Error.self)
                .zip(get(x))
                .eraseToAnyPublisher()
        }
    }
}

func combineLatest<A>(_ xs: [AnyPublisher<A, Error>]) -> AnyPublisher<[A], Error> {
    // Seed the reduce with Just([]), not Empty: Empty completes without ever
    // emitting a value, so combining with it would never produce anything.
    xs.reduce(Just([A]()).setFailureType(to: Error.self).eraseToAnyPublisher()) { state, x in
        state.combineLatest(x)
            .map { $0.0 + [$0.1] }
            .eraseToAnyPublisher()
    }
}

Or you could do it like this, which is flatter:

func example() -> AnyPublisher<[String: UIImage], Error> {
    let codes = fetchCodes()
        .share()
    let images = codes.flatMap { zipAll($0.map(fetchImage(forCode:))) }
    return Publishers.Zip(codes, images)
        .map { zip($0, $1) }
        .map(Dictionary.init(uniqueKeysWithValues:))
        .eraseToAnyPublisher()
}

func zipAll<A>(_ xs: [AnyPublisher<A, Error>]) -> AnyPublisher<[A], Error> {
    // Same as above: seed with Just([]) rather than Empty so the zip chain
    // can actually emit.
    xs.reduce(Just([A]()).setFailureType(to: Error.self).eraseToAnyPublisher()) { state, x in
        state.zip(x)
            .map { $0.0 + [$0.1] }
            .eraseToAnyPublisher()
    }
}

Without Combine it gets a bit trickier:

func fetchCodes(completion: @escaping (Result<[String], Error>) -> Void) { }
func fetchImage(forCode: String, completion: @escaping (Result<UIImage, Error>) -> Void) { }

func example(_ completion: @escaping (Result<[String: UIImage], Error>) -> Void) {
    fetchCodes { codesResult in
        switch codesResult {
        case let .success(codes):
            loadImages(codes: codes, completion)
        case let .failure(error):
            completion(.failure(error))
        }
    }
}

func loadImages(codes: [String], _ completion: @escaping (Result<[String: UIImage], Error>) -> Void) {
    var result = [String: UIImage]()
    var firstError: Error?
    let lock = NSLock()
    let group = DispatchGroup()
    for code in codes {
        group.enter()
        fetchImage(forCode: code) { imageResult in
            lock.lock()
            switch imageResult {
            case let .success(image):
                result[code] = image
            case let .failure(error):
                if firstError == nil { firstError = error }
            }
            lock.unlock()
            group.leave() // leave in both cases, or notify never fires
        }
    }
    group.notify(queue: .main) {
        // Call completion exactly once: report the first error, if any,
        // otherwise the assembled dictionary.
        if let error = firstError {
            completion(.failure(error))
        } else {
            completion(.success(result))
        }
    }
}

With the Combine version, if a fetchImage fails, the others are all canceled. With the callback version that is not the case: the other requests finish anyway, and the data they downloaded is thrown away.
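To see that cancellation behavior in isolation, here is a minimal, self-contained sketch (hypothetical publishers, not the question's code): one upstream never emits, the other fails immediately, and the failure both fails the pipeline and cancels the pending upstream's subscription.

```swift
import Combine

enum DemoError: Error { case boom }

var events = [String]()

// An upstream that never emits; we record when it gets cancelled.
let pending = PassthroughSubject<Int, DemoError>()
    .handleEvents(receiveCancel: { events.append("cancelled") })

// An upstream that fails as soon as it is subscribed to.
let failing = Fail<Int, DemoError>(error: .boom)

let cancellable = pending.zip(failing)
    .sink(receiveCompletion: { completion in
        if case .failure = completion { events.append("failed") }
    }, receiveValue: { _ in events.append("value") })
```

No value is ever delivered downstream; the failure propagates and the zipped sibling is torn down, which is exactly what happens to the in-flight image requests above.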

Combine: Chain requests with dependency, keep both responses

You are not going to be able to chain the requests the way you are trying to and still capture all the results.

Think of it this way. By chaining Combine operators you're constructing a pipeline. You can decide what to put into the input to the pipeline, and you can dump whatever comes out of the output of the pipeline into a sink where you can see the results, but you can't go through the sides of the pipe to see the intermediate values (at least not without cutting a window in the pipe which we'll get to).

Here's your code:

let places = callAPI.places()
let firstPlace = places.compactMap { $0.first }
let weather = firstPlace.flatMap { place in
    callAPI.weather(latitude: place.latitude, longitude: place.longitude)
}

let token = weather.sink(receiveCompletion: { _ in },
                         receiveValue: { print($0) })

Each of those variables holds a piece of the pipeline (not the values that will flow through the pipe); you're screwing sections together, storing a longer and longer assembly in each variable.

To make the whole pipeline more obvious, I would write it like this:

let cancellable = callAPI.places()
    .compactMap { $0.first }
    .flatMap { place in
        callAPI.weather(latitude: place.latitude, longitude: place.longitude)
    }
    .sink(receiveCompletion: { _ in },
          receiveValue: { print($0) })

(Note that this might not compile as is; I pulled it together in the answer editor.)

When you chain the operators directly, it's obvious that you don't have any opportunity to catch intermediate results. For your pipeline, the input comes from the network, and you catch the output in a sink. Notice that you only get to look at the "stuff" flowing through the pipeline in closures that are part of the pipeline itself.

Now, if you really want to cut a window into the pipeline to pull out intermediate stuff, you need one of those closures that can push the value out of the pipeline. In this case, to get at the array of Places you might do it using handleEvents. It would look something like this:

var allPlaces: [Place]?

callAPI.places()
    .handleEvents(receiveOutput: { allPlaces = $0 })
    .compactMap { $0.first }
    ...

In this code, you catch the receiveOutput event and sneak the result out into a nearby variable.

handleEvents, in my opinion, is one of those "Great Power, Great Responsibility" operators. In this case it will let you do what you are asking to do, but I'm not sure you should do it.

The whole point of chaining operators together is that the resulting pipeline "should" be free of side-effects. In this case handleEvents is being used to explicitly introduce a side-effect (setting the allPlaces variable). Essentially this is, in my opinion, a code smell that suggests you may need to rethink your design.
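One side-effect-free alternative is to carry the intermediate value down the pipeline in a tuple. Here's a minimal, self-contained sketch of that idea (the types and publishers are stand-ins, since the question's callAPI isn't shown):

```swift
import Combine

struct Place { let name: String }

// Stand-ins for the question's callAPI methods.
func places() -> AnyPublisher<[Place], Never> {
    Just([Place(name: "Oslo"), Place(name: "Bergen")]).eraseToAnyPublisher()
}
func weather(for place: Place) -> AnyPublisher<String, Never> {
    Just("Sunny in \(place.name)").eraseToAnyPublisher()
}

var storage = Set<AnyCancellable>()
var received: (count: Int, report: String)?

places()
    .compactMap { all in all.first.map { (all, $0) } } // keep the array alongside the first place
    .flatMap { all, first in
        weather(for: first).map { (all, $0) }          // re-attach the array to the weather result
    }
    .sink { all, report in
        received = (all.count, report)                 // both values arrive together, no side channel
    }
    .store(in: &storage)
```

The sink receives the full array and the weather together, with no variable outside the pipeline being mutated along the way.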

How to chain multiple API calls using dataTaskPublisher?

As suggested in https://stackoverflow.com/a/56786412/1271826, you can use collect():

func getAllData(for type: DataType) -> AnyPublisher<[MyData], Error> {
    getIDs(for: type)
        .flatMap { ids in
            Publishers.Sequence(sequence: ids.map { self.getData(with: $0) })
                .flatMap { $0 }
                .collect()
        }
        .eraseToAnyPublisher()
}
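As a tiny self-contained illustration of the collect() step (with stand-in publishers, not the question's getData): each value fans out into its own inner publisher via flatMap, and collect() gathers every emitted value into a single array before completing.

```swift
import Combine

var collected: [Int]?

let cancellable = [1, 2, 3].publisher
    .flatMap { id in Just(id * 10) } // stands in for a per-ID request
    .collect()                       // gather all inner values into one array
    .sink(receiveCompletion: { _ in },
          receiveValue: { collected = $0 })
```

Note that with real asynchronous requests, a plain flatMap doesn't guarantee the results arrive in the original order; if order matters, the maxPublishers trick in the next answer serializes the inner publishers.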

How to schedule a synchronous sequence of asynchronous calls in Combine?

After experimenting for a while in a playground, I believe I found a solution, but if you have a better idea, please share. The solution is to add the maxPublishers parameter to flatMap and set its value to .max(1):

Publishers.Sequence(sequence: urls)
    .flatMap(maxPublishers: .max(1)) { url in // <<<<--- .max(1) serializes the inner publishers
        Future<Result, Error> { callback in
            myCall { data, error in
                if let data = data {
                    callback(.success(data))
                } else if let error = error {
                    callback(.failure(error))
                }
            }
        }
    }

Improving compile time of long combine chain

So this would eventually compile if the chain was split out into its own function, e.g.:

func syncEndpoints(event: SyncUpdate?) -> AnyPublisher<SyncUpdate?, Error> {
    return syncEndpointOne
        .flatMap(syncEndpointTwo)
        .flatMap(syncEndpointThree)
        .flatMap(syncEndpointFour)
        .eraseToAnyPublisher()
}

And then:

Timer.publish(every: interval, on: .current, in: .common)
    .receive(on: DispatchQueue.global(qos: .background))
    .flatMap(syncEndpoints)
    ...

Chaining Combine.Publisher and calling completion when finished

If the publishers depend on each other, you should chain with .flatMap. If not, you could use .append instead. Either way, if all of these publishers are one-shot publishers (they all send a completion after one value), then the chain will be torn down in good order when all have fired.

Example 1:

Just(1)
    .flatMap { Just(($0, 2)) }
    .flatMap { Just(($0.0, $0.1, 3)) }
    .sink(receiveCompletion: { print($0) },
          receiveValue: { print($0) })
    .store(in: &storage)
// (1, 2, 3) as a tuple, then finished

Example 2:

Just(1).append(Just(2).append(Just(3)))
    .sink(receiveCompletion: { print($0) },
          receiveValue: { print($0) })
    .store(in: &storage)
// 1, then 2, then 3, then finished

Access used values in a combine-operator-chain

The answer that Jake wrote and deleted is correct. I don't know why he deleted it; maybe he felt he couldn't support it with example code. But the answer is exactly right. If you need the initial str value later in the pipeline, it is up to you to keep passing it down through every step. You typically do that by passing a tuple of values at each stage, so that the string makes it far enough down the chain to be retrieved. This is a very common strategy in Combine programming.

For a simple example, take a look at the Combine code in the central section of this article:

https://www.biteinteractive.com/swift-5-5-asynchronous-looping-with-async-await/

As I say in the article:

You’ll observe that, as opposed to GCD where local variables just magically “fall through” to nested completion handlers at a lower level of scope, every step in a Combine pipeline must explicitly pass down all information that may be needed by a later step. This can result in some rather ungainly values working their way down the pipeline, often in the form of a tuple, as I’ve illustrated here.

But I don’t regard that as a problem. On the contrary, being explicit about what’s passing down the pipeline seems to me to be a gain in clarity.

To illustrate further, here's a rewrite of your pseudo-code; this is real code, you can run it and try it out:

import UIKit
import Combine

class ViewController: UIViewController {
    var storage = Set<AnyCancellable>()
    override func viewDidLoad() {
        super.viewDidLoad()
        let strings = ["a", "b", "c"]
        let pipeline = strings.publisher
            .map { str -> (String, URL) in
                let url = URL(string: "https://www.apeth.com/pep/manny.jpg")!
                return (str, url)
            }
            .flatMap { (str, url) -> AnyPublisher<(String, (data: Data, response: URLResponse)), URLError> in
                let sess = URLSession.shared.dataTaskPublisher(for: url)
                    .eraseToAnyPublisher()
                let just = Just(str).setFailureType(to: URLError.self)
                    .eraseToAnyPublisher()
                let zip = just.zip(sess).eraseToAnyPublisher()
                return zip
            }
            .map { (str, result) -> (String, Data) in
                (str, result.data)
            }
            .sink { comp in print(comp) } receiveValue: { (str, data) in print(str, data) }
        pipeline.store(in: &storage)
    }
}

That's not the only way to express the pipeline, but it does work, and it should give you a starting point.


