How to Create a Custom Chain in Swift Combine

How to create a custom chain in Swift Combine?

Your requestAuthorization should return the publisher:

func requestAuthorization(for type: LocationAPI.AuthorizationType = .whenInUse) -> AnyPublisher<Bool, Never> {
    // start authorization flow

    return authorizationPublisher
}

I would also recommend that you use a CurrentValueSubject instead of a PassthroughSubject so you can always get the "current" status when subscribing to the authorizationPublisher.
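
To make the difference concrete, here is a minimal sketch of the CurrentValueSubject approach. The `AuthorizationStatus` enum, the `LocationAuthorizer` class, and the `didChange(to:)` hook are hypothetical names invented for illustration, and the `for:` parameter is left out for brevity; a real implementation would feed the subject from the location manager's delegate callbacks.

```swift
import Combine

// Hypothetical stand-in for the system authorization status.
enum AuthorizationStatus { case notDetermined, authorized, denied }

final class LocationAuthorizer {
    // CurrentValueSubject stores the latest value and replays it to each new subscriber.
    private let statusSubject = CurrentValueSubject<AuthorizationStatus, Never>(.notDetermined)

    var authorizationPublisher: AnyPublisher<Bool, Never> {
        statusSubject
            .map { $0 == .authorized }
            .removeDuplicates()
            .eraseToAnyPublisher()
    }

    func requestAuthorization() -> AnyPublisher<Bool, Never> {
        // A real implementation would start the authorization flow here.
        return authorizationPublisher
    }

    // Hypothetical hook that a delegate callback would call.
    func didChange(to status: AuthorizationStatus) {
        statusSubject.send(status)
    }
}

let authorizer = LocationAuthorizer()
var received: [Bool] = []
let cancellable = authorizer.requestAuthorization()
    .sink { received.append($0) }

authorizer.didChange(to: .authorized)
print(received) // [false, true]
```

The subscriber gets `false` immediately on subscription, because the subject replays its current value, and `true` once the status changes. With a PassthroughSubject the first value would be missing.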

Combine: how to chain and then recombine one-to-many network queries

After the existing pipeline, you should first flatMap to Publishers.Sequence:

.flatMap(\.publisher)

This turns your publisher from one that publishes arrays of things into one that publishes the individual array elements.
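
As a small offline illustration of that flattening step, the sketch below uses `Just([1, 2, 3])` as an assumed stand-in for the upstream that publishes one array:

```swift
import Combine

var elements: [Int] = []

// Just([1, 2, 3]) publishes a single array; flatMap(\.publisher) re-publishes
// each element of that array as its own value.
let flattening = Just([1, 2, 3])
    .flatMap(\.publisher)
    .sink { elements.append($0) }

print(elements) // [1, 2, 3]
```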

Then do another flatMap to the URL session data task publisher, with all the steps to extract otherData attached. Note that the very last step is where we associate each id with its otherData:

.flatMap { id in
    URLSession.shared.dataTaskPublisher(
        // as an example
        for: URL(string: "https://example.com/?id=\(id)")!
    ).receive(on: apiQueue)
    .map(\.data)
    .decode(type: Foo.self, decoder: JSONDecoder())
    .map { (id, $0.otherData) } // associate id with otherData
}

Then you can collect() to turn it into a publisher that emits a single array once all the inner requests have completed.

Full version:

// this is of type AnyPublisher<[(Int, Int)], Error>
let _ = URLSession.shared
    .dataTaskPublisher(for: url)
    .receive(on: apiQueue)
    .map(\.data)
    .decode(type: MyMainResultType.self, decoder: JSONDecoder())
    .map { $0.results.map { $0.id } }
    .flatMap(\.publisher)
    .flatMap { id in
        URLSession.shared.dataTaskPublisher(
            // as an example
            for: URL(string: "https://example.com/?id=\(id)")!
        ).receive(on: apiQueue)
        .map(\.data)
        .decode(type: Foo.self, decoder: JSONDecoder())
        .map { (id, $0.otherData) } // associate id with otherData
    }
    .collect()
    .eraseToAnyPublisher()
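
The same fan-out and recombine shape can be tried without a network. In this sketch `fetchOtherData(for:)` is a hypothetical stand-in for the per-id request, and `Just([1, 2, 3])` stands in for the decoded list of ids:

```swift
import Combine

// Hypothetical per-id request; a real one would use dataTaskPublisher and decode.
func fetchOtherData(for id: Int) -> AnyPublisher<Int, Error> {
    Just(id * 100)
        .setFailureType(to: Error.self)
        .eraseToAnyPublisher()
}

var pairs: [(Int, Int)] = []
let cancellable = Just([1, 2, 3])                  // stands in for the decoded ids
    .setFailureType(to: Error.self)
    .flatMap { $0.publisher.setFailureType(to: Error.self) }
    .flatMap { id in
        fetchOtherData(for: id).map { (id, $0) }   // keep the id next to its result
    }
    .collect()                                     // one array after all inner publishers finish
    .sink(receiveCompletion: { _ in },
          receiveValue: { pairs = $0 })

print(pairs)
```

With real requests the inner publishers finish in whatever order the responses arrive, so the collected array is not guaranteed to follow the order of the ids. That is the reason for carrying the id along with each result.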

How to create and emit custom event with Combine and Swift?

Here is a solution using PassthroughSubject. For now it is the shortest and cleanest one I could produce. Tested with multiple subscribers, it works as expected.

class MainClass {
    private let sequenceManager: SequenceManager = .init()
    private var bucket: Set<AnyCancellable> = []

    func subscribeMe() {
        //Here is how we use our publisher
        sequenceManager
            .publisher(for: .onSequenceUpdate)
            .receive(on: DispatchQueue.main) //Optionally
            .sink { (addingItems, removingItems) in
                //Handle new data here
            }
            .store(in: &bucket)
    }
}

//Object we want to publish its changes
class SequenceManager {
    private let sequencePublisher = UpdatedSequencePublisher()

    private func didUpdate(addingItems: [String], removingItems: [String]) {
        //Here we publish changes
        sequencePublisher.publish(adding: addingItems, removing: removingItems)
    }
}

//Our publisher stuff
extension SequenceManager {
    struct UpdatedSequencePublisher: Publisher {
        typealias Output = (adding: [String], removing: [String])
        typealias Failure = Never
        private let passThroughSubject = PassthroughSubject<Output, Failure>()

        func receive<S>(subscriber: S) where S : Subscriber, Failure == S.Failure, Output == S.Input {
            passThroughSubject.receive(subscriber: subscriber)
        }

        func publish(adding: [String], removing: [String]) {
            passThroughSubject.send((adding, removing))
        }
    }

    func publisher(for event: Event) -> UpdatedSequencePublisher {
        switch event {
        case .onSequenceUpdate:
            return sequencePublisher
        }
    }

    enum Event {
        case onSequenceUpdate
    }
}
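
If a dedicated Publisher type is more than you need, a shorter variant is to keep the subject private and expose it type-erased. This is a sketch of that alternative, not the code above: `SimpleSequenceManager` and `sequenceUpdates` are hypothetical names, and `didUpdate` is made internal only so the example can trigger an update.

```swift
import Combine

final class SimpleSequenceManager {
    typealias Update = (adding: [String], removing: [String])

    // Only this class can send; everyone else sees a read-only publisher.
    private let updates = PassthroughSubject<Update, Never>()

    var sequenceUpdates: AnyPublisher<Update, Never> {
        updates.eraseToAnyPublisher()
    }

    func didUpdate(addingItems: [String], removingItems: [String]) {
        updates.send((adding: addingItems, removing: removingItems))
    }
}

let manager = SimpleSequenceManager()
var bucket = Set<AnyCancellable>()
var log: [String] = []

// Two independent subscribers on the same publisher.
manager.sequenceUpdates
    .sink { log.append("first: +\($0.adding.count) -\($0.removing.count)") }
    .store(in: &bucket)
manager.sequenceUpdates
    .sink { log.append("second: +\($0.adding.count) -\($0.removing.count)") }
    .store(in: &bucket)

manager.didUpdate(addingItems: ["a", "b"], removingItems: ["c"])
print(log.sorted())
```

Both subscribers receive the update. The sketch makes no assumption about which of the two is called first.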

Swift combine chain requests

The general idea is to use a flatMap for chaining, which is what you did, but if you also need the original value, you can return a Zip publisher (with a .zip operator) that puts two results into a tuple.

One of the publishers is the second request, and the other should just emit the value. You can typically do this with Just(v), but you have to make sure that its failure type (which is Never) matches with the other publisher. You can match its failure type with .setFailureType(to:):

publisher1
    .flatMap { one in
        Just(one).setFailureType(to: Error.self) // error has to match publisher2
            .zip(publisher2(with: one))
    }
    .sink(receiveCompletion: { completion in
        // ...
    }, receiveValue: { (one, two) in
        // ...
    })

Alternatively, you can use Result.Publisher which would infer the error (but might look somewhat odd):

.flatMap { one in
    Result.Publisher(.success(one))
        .zip(publisher2(with: one))
}

So, in your case it's going to be something like this:

URLSession.shared.dataTaskPublisher(for: url)
    .map(\.data)
    .decode(type: Schedule.self, decoder: JSONDecoder())
    .flatMap {
        Result.Publisher(.success($0))
            .zip(self.fetchLiveFeed($0.dates.first?.games.first?.link ?? ""))
    }
    .sink(receiveCompletion: { completion in
        // ...
    }, receiveValue: { (schedule, live) in
        // ...
    })
    .store(in: &cancellables)
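
To experiment with the flatMap plus zip pattern without a network, here is a minimal self-contained sketch. `publisher1` and `publisher2(with:)` are hypothetical stand-ins built from `Just`; in real code they would be the two requests.

```swift
import Combine

// Hypothetical second request: depends on the first value and may fail.
func publisher2(with one: Int) -> AnyPublisher<String, Error> {
    Just("detail for \(one)")
        .setFailureType(to: Error.self)
        .eraseToAnyPublisher()
}

// Hypothetical first request.
let publisher1 = Just(7).setFailureType(to: Error.self)

var pairs: [(Int, String)] = []
let cancellable = publisher1
    .flatMap { one in
        Just(one)
            .setFailureType(to: Error.self) // must match publisher2's failure type
            .zip(publisher2(with: one))     // emits (one, two) as a tuple
    }
    .sink(receiveCompletion: { _ in },
          receiveValue: { pairs.append($0) })

print(pairs) // [(7, "detail for 7")]
```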

Swift Combine Nested Publishers

You can use Publishers.MergeMany and collect() for this:

let decoder = JSONDecoder()
decoder.keyDecodingStrategy = .convertFromSnakeCase

func loadTrailerLinks() -> AnyPublisher<[LPLinkMetadata], Error> {
    // Download data
    URLSession.shared.dataTaskPublisher(for: URL(string: "Doesn't matter")!)
        .tryMap() { element -> Data in
            guard let httpResponse = element.response as? HTTPURLResponse,
                  httpResponse.statusCode == 200 else {
                throw URLError(.badServerResponse)
            }
            return element.data
        }
        .decode(type: TrailerVideoResult.self, decoder: decoder)
        // Convert the TrailerVideoResult to a MergeMany publisher, which merges the
        // [AnyPublisher<LPLinkMetadata, Never>] into a single publisher with output
        // type LPLinkMetadata
        .flatMap {
            Publishers.MergeMany(
                $0.results
                    .filter { $0.site == "YouTube" }
                    .compactMap { URL(string: "https://www.youtube.com/watch?v=\($0.key)") }
                    .map(fetchMetaData)
            )
            // Change the error type from Never to Error
            .setFailureType(to: Error.self)
        }
        // Collect all the LPLinkMetadata and then publish a single result of
        // [LPLinkMetadata]
        .collect()
        .eraseToAnyPublisher()
}

Access used values in a combine-operator-chain

The answer that Jake wrote and deleted is correct. I don't know why he deleted it; maybe he felt he couldn't support it with example code. But the answer is exactly right. If you need the initial str value later in the pipeline, it is up to you to keep passing it down through every step. You typically do that by passing a tuple of values at each stage, so that the string makes it far enough down the chain to be retrieved. This is a very common strategy in Combine programming.

For a simple example, take a look at the Combine code in the central section of this article:

https://www.biteinteractive.com/swift-5-5-asynchronous-looping-with-async-await/

As I say in the article:

You’ll observe that, as opposed to GCD where local variables just magically “fall through” to nested completion handlers at a lower level of scope, every step in a Combine pipeline must explicitly pass down all information that may be needed by a later step. This can result in some rather ungainly values working their way down the pipeline, often in the form of a tuple, as I’ve illustrated here.

But I don’t regard that as a problem. On the contrary, being explicit about what’s passing down the pipeline seems to me to be a gain in clarity.

To illustrate further, here's a rewrite of your pseudo-code; this is real code, you can run it and try it out:

class ViewController: UIViewController {
    var storage = Set<AnyCancellable>()
    override func viewDidLoad() {
        super.viewDidLoad()
        let strings = ["a", "b", "c"]
        let pipeline = strings.publisher
            .map { str -> (String, URL) in
                let url = URL(string: "https://www.apeth.com/pep/manny.jpg")!
                return (str, url)
            }
            .flatMap { (str, url) -> AnyPublisher<(String, (data: Data, response: URLResponse)), URLError> in
                let sess = URLSession.shared.dataTaskPublisher(for: url)
                    .eraseToAnyPublisher()
                let just = Just(str).setFailureType(to: URLError.self)
                    .eraseToAnyPublisher()
                let zip = just.zip(sess).eraseToAnyPublisher()
                return zip
            }
            .map { (str, result) -> (String, Data) in
                (str, result.data)
            }
            .sink { comp in print(comp) } receiveValue: { (str, data) in print(str, data) }
        pipeline.store(in: &storage)
    }
}

That's not the only way to express the pipeline, but it does work, and it should give you a starting point.

Swift Combine: combining two calls to create a new output

You may want to look at the various combine and merge calls. For example, using combineLatest:

struct ContentView: View {

    @State var result: CustomData?
    @State var cancellable: AnyCancellable?

    var body: some View {
        VStack {
            if let result {
                Text("result: \(result.data1) \(result.data2)")
            }
        }
        .onAppear {
            cancellable =
                getInt()
                    .combineLatest(getString())
                    .tryMap {
                        CustomData(data1: $0, data2: $1)
                    }
                    .sink(receiveCompletion: { err in
                        //
                    }, receiveValue: { customData in
                        self.result = customData
                    })
        }
    }

    struct CustomData {
        let data1: Int
        let data2: String
    }

    func getInt() -> AnyPublisher<Int, Error> {
        return Just(Int.random(in: 0...100))
            .setFailureType(to: Error.self)
            .eraseToAnyPublisher()
    }

    func getString() -> AnyPublisher<String, Error> {
        return Just("\(Int.random(in: 200...400))")
            .setFailureType(to: Error.self)
            .eraseToAnyPublisher()
    }
}

Swift Combine: How to create a single publisher from a list of publishers?

Essentially, in your specific situation you're looking at something like this:

func createIngredients(ingredients: [Ingredient]) -> AnyPublisher<[CreateIngredientMutation.Data], Error> {
Publishers.MergeMany(ingredients.map(createIngredient(ingredient:)))
.collect()
.eraseToAnyPublisher()
}

This 'collects' all the elements produced by the upstream publishers and – once they have all completed – produces an array with all the results and finally completes itself.

Bear in mind, if one of the upstream publishers fails, or produces more than one result, the number of elements in the array may not match the number of upstream publishers, so you may need additional operators to mitigate this depending on your situation.
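
One possible mitigation, sketched under assumptions: wrap each publisher's outcome in a Result so that a single failure becomes a value instead of terminating the merged stream. `create(_:)` is a hypothetical stand-in for createIngredient(ingredient:), and "salt" is made to fail on purpose.

```swift
import Combine

struct CreateError: Error {}

// Hypothetical stand-in for createIngredient(ingredient:).
func create(_ name: String) -> AnyPublisher<String, Error> {
    if name == "salt" {
        return Fail(error: CreateError()).eraseToAnyPublisher()
    }
    return Just("created \(name)")
        .setFailureType(to: Error.self)
        .eraseToAnyPublisher()
}

// Each inner publisher reports success or failure as a Result value,
// so the merged publisher itself can never fail.
func createAll(_ names: [String]) -> AnyPublisher<[Result<String, Error>], Never> {
    Publishers.MergeMany(names.map { name in
        create(name)
            .map { Result<String, Error>.success($0) }
            .catch { Just(Result<String, Error>.failure($0)) }
    })
    .collect()
    .eraseToAnyPublisher()
}

var results: [Result<String, Error>] = []
let cancellable = createAll(["flour", "salt", "sugar"])
    .sink { results = $0 }

print(results.count) // 3: two successes and one failure
```

If a publisher might emit more than one value, adding `.first()` to each inner publisher before merging keeps the count at one element per publisher.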

The more generic answer, with a way you can test it using the EntwineTest framework:

import XCTest
import Combine
import EntwineTest

final class MyTests: XCTestCase {

    func testCreateArrayFromArrayOfPublishers() {

        typealias SimplePublisher = Just<Int>

        // we'll create our 'list of publishers' here. Each publisher emits a single
        // Int and then completes successfully – using the `Just` publisher.
        let publishers: [SimplePublisher] = [
            SimplePublisher(1),
            SimplePublisher(2),
            SimplePublisher(3),
        ]

        // we'll turn our array of publishers into a single merged publisher
        let publisherOfPublishers = Publishers.MergeMany(publishers)

        // Then we `collect` all the individual publisher elements results into
        // a single array
        let finalPublisher = publisherOfPublishers.collect()

        // Let's test what we expect to happen, will happen.
        // We'll create a scheduler to run our test on
        let testScheduler = TestScheduler()

        // Then we'll start a test. Our test will subscribe to our publisher
        // at a virtual time of 200, and cancel the subscription at 900
        let testableSubscriber = testScheduler.start { finalPublisher }

        // we're expecting that, immediately upon subscription, our results will
        // arrive. This is because we're using `just` type publishers which
        // dispatch their contents as soon as they're subscribed to
        XCTAssertEqual(testableSubscriber.recordedOutput, [
            (200, .subscription), // we're expecting to subscribe at 200
            (200, .input([1, 2, 3])), // then receive an array of results immediately
            (200, .completion(.finished)), // the `collect` operator finishes immediately after completion
        ])
    }
}

Chaining Combine.Publisher and calling completion when finished

If the publishers depend on each other, you should chain with .flatMap. If not, you could use .append instead. Either way, if all of these publishers are one-shot publishers (they all send a completion after one value), then the chain will be torn down in good order when all have fired.

Example 1:

Just(1)
    .flatMap { Just(($0,2)) }
    .flatMap { Just(($0.0, $0.1, 3)) }
    .sink(receiveCompletion: {print($0)},
          receiveValue: {print($0)})
    .store(in:&storage)
// (1,2,3) as a tuple, then finished

Example 2:

Just(1).append(Just(2).append(Just(3)))
    .sink(receiveCompletion: {print($0)},
          receiveValue: {print($0)})
    .store(in:&storage)
// 1, then 2, then 3, then finished

