How to Loop Over The Output of a Publisher with Combine

How can I loop over the output of a publisher with Combine?

Hmm.. It doesn't look like there is a Publishers.ZipMany that accepts a collection of publishers, so instead I merged the stories and collected them instead. Ideally this would collect them in the correct order but I haven't tested that and the documentation is still somewhat sparse across Combine.

func fetchStories(feed: FeedType) -> AnyPublisher<[Story], Error> {
fetchStoryIds(feed: feed)
.flatMap { ids -> AnyPublisher<[Story], Error> in
let stories = ids.map { self.fetchStory(id: $0) }
return Publishers.MergeMany(stories)
.collect()
.eraseToAnyPublisher()
}
.eraseToAnyPublisher()
}

If you are open to external code this is a gist implementation of ZipMany that will preserve the order:
https://gist.github.com/mwahlig/725fe5e78e385093ba53e6f89028a41c

Although I would think that such a thing would exist in the framework.

Combine framework: how to process each element of array asynchronously before proceeding

With your latest edit and this comment below:

I literally am asking is there a Combine equivalent of "don't proceed to the next step until this step, involving multiple asynchronous steps, has finished"

I think this pattern can be achieved with .flatMap to an array publisher (Publishers.Sequence), which emits one-by-one and completes, followed by whatever per-element async processing is needed, and finalized with a .collect, which waits for all elements to complete before proceeding

So, in code, assuming we have these functions:

func getFoos() -> AnyPublisher<[Foo], Error>
func getPartials(for: Foo) -> AnyPublisher<[Partial], Error>
func getMoreInfo(for: Partial, of: Foo) -> AnyPublisher<MoreInfo, Error>

We can do the following:

getFoos()
.flatMap { fooArr in
fooArr.publisher.setFailureType(to: Error.self)
}

// per-foo element async processing
.flatMap { foo in

getPartials(for: foo)
.flatMap { partialArr in
partialArr.publisher.setFailureType(to: Error.self)
}

// per-partial of foo async processing
.flatMap { partial in

getMoreInfo(for: partial, of: foo)
// build completed partial with more info
.map { moreInfo in
var newPartial = partial
newPartial.moreInfo = moreInfo
return newPartial
}
}
.collect()
// build completed foo with all partials
.map { partialArr in
var newFoo = foo
newFoo.partials = partialArr
return newFoo
}
}
.collect()

(Deleted the old answer)

How to observe array's new values with combine?

The publisher method of the array does exactly that - emits every element of the array and then completes, because the array at the time of calling the publisher has a finite number of elements.

If you want to be notified every time the arrays changes (not just when something is appended to it, but on any change) then create a @Published var for the array and attach the observer to it:

@Published var arr = [1,2,3]

cancellable = $arr
.sink { completion in print("Completed with \(completion)")
} receiveValue: { val in
print("Received value \(val)")
}

arr.append(4)

The output will be:

Received value [1, 2, 3]
Received value [1, 2, 3, 4]

But looks like what you are really looking for is listening to a stream of numbers emitted one at a time. Then define a @Published var of that type and listen to it. You'll get called every time the var changes:

@Published var value = 1

cancellable = $value
.sink { completion in print("Completed with \(completion)")
} receiveValue: { val in
print("Received value \(val)")
}

value = 2
value = 3
value = 4

The output will be:

Received value 1
Received value 2
Received value 3
Received value 4

Swift Combine: How to create a single publisher from a list of publishers?

Essentially, in your specific situation you're looking at something like this:

func createIngredients(ingredients: [Ingredient]) -> AnyPublisher<[CreateIngredientMutation.Data], Error> {
Publishers.MergeMany(ingredients.map(createIngredient(ingredient:)))
.collect()
.eraseToAnyPublisher()
}

This 'collects' all the elements produced by the upstream publishers and – once they have all completed – produces an array with all the results and finally completes itself.

Bear in mind, if one of the upstream publishers fails – or produces more than one result – the number of elements may not match the number of subscribers, so you may need additional operators to mitigate this depending on your situation.

The more generic answer, with a way you can test it using the EntwineTest framework:

import XCTest
import Combine
import EntwineTest

final class MyTests: XCTestCase {

func testCreateArrayFromArrayOfPublishers() {

typealias SimplePublisher = Just<Int>

// we'll create our 'list of publishers' here. Each publisher emits a single
// Int and then completes successfully – using the `Just` publisher.
let publishers: [SimplePublisher] = [
SimplePublisher(1),
SimplePublisher(2),
SimplePublisher(3),
]

// we'll turn our array of publishers into a single merged publisher
let publisherOfPublishers = Publishers.MergeMany(publishers)

// Then we `collect` all the individual publisher elements results into
// a single array
let finalPublisher = publisherOfPublishers.collect()

// Let's test what we expect to happen, will happen.
// We'll create a scheduler to run our test on
let testScheduler = TestScheduler()

// Then we'll start a test. Our test will subscribe to our publisher
// at a virtual time of 200, and cancel the subscription at 900
let testableSubscriber = testScheduler.start { finalPublisher }

// we're expecting that, immediately upon subscription, our results will
// arrive. This is because we're using `just` type publishers which
// dispatch their contents as soon as they're subscribed to
XCTAssertEqual(testableSubscriber.recordedOutput, [
(200, .subscription), // we're expecting to subscribe at 200
(200, .input([1, 2, 3])), // then receive an array of results immediately
(200, .completion(.finished)), // the `collect` operator finishes immediately after completion
])
}
}

How to combine 2 publishers and erase values to Void?

You're looking for Merge instead of CombineLatest. Your code for this would look a bit like the following:

var hasChangedPublisher: AnyPublisher<Void, Never> {
preferences.publisher
.merge(state.$permissionStatus)
.map({ _ in
return () // transform to Void
})
.eraseToAnyPublisher()
}

Swift Combine Nested Publishers

You can use Publishers.MergeMany and collect() for this:

let decoder = JSONDecoder()
decoder.keyDecodingStrategy = .convertFromSnakeCase

func loadTrailerLinks() -> AnyPublisher<[LPLinkMetadata], Error> {
// Download data
URLSession.shared.dataTaskPublisher(for: URL(string: "Doesn't matter")!)
.tryMap() { element -> Data in
guard let httpResponse = element.response as? HTTPURLResponse,
httpResponse.statusCode == 200 else {
throw URLError(.badServerResponse)
}
return element.data
}
.decode(type: TrailerVideoResult.self, decoder: decoder)
// Convert the TrailerVideoResult to a MergeMany publisher, which merges the
// [AnyPublisher<LPLinkMetadata, Never>] into a single publisher with output
// type LPLinkMetadata
.flatMap {
Publishers.MergeMany(
$0.results
.filter { $0.site == "YouTube" }
.compactMap { URL(string: "https://www.youtube.com/watch?v=\($0.key)") }
.map(fetchMetaData)
)
// Change the error type from Never to Error
.setFailureType(to: Error.self)
}
// Collect all the LPLinkMetadata and then publish a single result of
// [LPLinkMetadata]
.collect()
.eraseToAnyPublisher()
}

iOS Swift Combine: Emit Publisher with single value

The main thing we can do to tighten up your code is to use .eraseToAnyPublisher() instead of AnyPublisher.init everywhere. This is the only real nitpick I have with your code. Using AnyPublisher.init is not idiomatic, and is confusing because it adds an extra layer of nested parentheses.

Aside from that, we can do a few more things. Note that what you wrote (aside from not using .eraseToAnyPublisher() appropriately) is fine, especially for an early version. The following suggestions are things I would do after I have gotten a more verbose version past the compiler.

We can use Optional's flatMap method to transform user.imageURL into a URL. We can also let Swift infer the Result type parameters, because we're using Result in a return statement so Swift knows the expected types. Hence:

func downloadUserProfilePhoto(user: User) -> AnyPublisher<UIImage?, StoreError> {
guard let url = user.imageURL.flatMap({ URL(string: $0) }) else {
return Result.Publisher(nil).eraseToAnyPublisher()
}

We can use mapError instead of catch. The catch operator is general: you can return any Publisher from it as long as the Success type matches. But in your case, you're just discarding the incoming failure and returning a constant failure, so mapError is simpler:

    return NetworkService.getData(url: url)
.mapError { _ in .cantDownloadProfileImage }

We can use the dot shortcut here because this is part of the return statement. Because it's part of the return statement, Swift deduces that the mapError transform must return a StoreError. So it knows where to look for the meaning of .cantDownloadProfileImage.

The flatMap operator requires the transform to return a fixed Publisher type, but it doesn't have to return AnyPublisher. Because you are using Result<UIImage?, StoreError>.Publisher in all paths out of flatMap, you don't need to wrap them in AnyPublisher. In fact, we don't need to specify the return type of the transform at all if we change the transform to use Optional's map method instead of a guard statement:

        .flatMap({ data in
UIImage(data: data)
.map { Result.Publisher($0) }
?? Result.Publisher(.cantDownloadProfileImage)
})
.eraseToAnyPublisher()

Again, this is part of the return statement. That means Swift can deduce the Output and Failure types of the Result.Publisher for us.

Also note that I put parentheses around the transform closure because doing so makes Xcode indent the close brace properly, to line up with .flatMap. If you don't wrap the closure in parens, Xcode lines up the close brace with the return keyword instead. Ugh.

Here it is all together:

func downloadUserProfilePhoto(user: User) -> AnyPublisher<UIImage?, StoreError> {
guard let url = user.imageURL.flatMap({ URL(string: $0) }) else {
return Result.Publisher(nil).eraseToAnyPublisher()
}
return NetworkService.getData(url: url)
.mapError { _ in .cantDownloadProfileImage }
.flatMap({ data in
UIImage(data: data)
.map { Result.Publisher($0) }
?? Result.Publisher(.cantDownloadProfileImage)
})
.eraseToAnyPublisher()
}


Related Topics



Leave a reply



Submit