Using Just with Flatmap Produce Failure Mismatch. Combine

Using Just with flatMap produce Failure mismatch. Combine

There is a special operator, setFailureType(to:). You can use it to override the failure type with whatever error type you need.

/// Wraps `request` in a `Just`, forwards it to `RequestManager.request(_:)`
/// and returns the resulting stream type-erased.
///
/// `Just` has `Never` as its failure type, so `setFailureType(to:)` is applied
/// first to make it match the `Error` failure type of the inner publisher.
///
/// - Parameter request: The URL request handed to `RequestManager`.
/// - Returns: A publisher of the response data, failing with `Error`.
func request(request: URLRequest) -> AnyPublisher<Data, Error> {
    let seed = Just(request).setFailureType(to: Error.self)

    return seed
        .flatMap { pending in
            RequestManager.request(pending) // returns AnyPublisher<Data, Error>
        }
        .eraseToAnyPublisher()
}

https://developer.apple.com/documentation/combine/just/3343941-setfailuretype

How to flatMap two Publishers with different Failure types in Combine

When you have a publisher with Never as failure type, you can use setFailureType(to:) to match the failure type of another publisher. Note that this method can only be used when the failure type is Never, according to the doc. When you have an actual failure type you can convert the error with mapError(_:). So you can do something like this:

/// Subscribes to `dataTask` for every value emitted by `trigger` and maps the
/// outcome into the view-model domain.
///
/// - Parameters:
///   - trigger: A never-failing publisher whose values start the work.
///   - dataTask: The publisher whose output and failure are converted.
/// - Returns: A publisher of `AnotherType` that fails with `ViewModelError`.
func viewModel(trigger: AnyPublisher<Void, Never>,
               dataTask: AnyPublisher<SomeType, DataTaskError>) -> AnyPublisher<AnotherType, ViewModelError> {
    // Convert the task up front: Publisher<SomeType, DataTaskError>
    //   -> mapError -> Publisher<SomeType, ViewModelError>
    //   -> map      -> Publisher<AnotherType, ViewModelError>
    let converted = dataTask
        .mapError { _ in ViewModelError.bang }
        .map { _ in AnotherType() }

    return trigger
        .setFailureType(to: ViewModelError.self) // Publisher<Void, ViewModelError>
        .flatMap { _ in converted }
        .eraseToAnyPublisher()
}

Combine flatMap returning no expected contextual result type

There are several issues in your flatMap. You need to call setFailureType and eraseToAnyPublisher on the Just inside the flatMap and then also call eraseToAnyPublisher on the flatMap itself.

You should also use named closure arguments when working with nested closures - you have a sorted(by:) nested inside flatMap.

/// Fetches the worldwide statistics and trims `countries` down to the three
/// entries with the highest `totalConfirmed` value.
///
/// - Returns: A publisher emitting the trimmed response, failing with `ErrorType`.
func getWorldwideData() -> AnyPublisher<WorldwideResponseItem, ErrorType> {
    let url = RestEndpoints.worldwideStats.endpoint()
    let publisher: AnyPublisher<WorldwideResponseItem, ErrorType> = RestManager().fetchData(url: url)

    return publisher
        .flatMap { response -> AnyPublisher<WorldwideResponseItem, ErrorType> in
            // Closure parameters are constants, so writing to `response.countries`
            // directly is rejected when the response is a value type. Mutate a
            // local copy instead; for a reference type this touches the same object.
            var trimmed = response
            trimmed.countries = Array(
                response.countries
                    // Named arguments keep the nested closure readable.
                    .sorted { lhs, rhs in lhs.totalConfirmed > rhs.totalConfirmed }
                    .prefix(3)
            )
            // `Just` cannot fail, so lift its failure type to match the declared one.
            return Just(trimmed).setFailureType(to: ErrorType.self).eraseToAnyPublisher()
        }
        .eraseToAnyPublisher()
}

However, you shouldn't be using flatMap in the first place. You are just mapping an output to a different value, so you can simply use map, which simplifies the closure quite a lot.

/// Fetches the worldwide statistics and trims `countries` down to the three
/// entries with the highest `totalConfirmed` value, using a plain `map`.
///
/// - Returns: A publisher emitting the trimmed response, failing with `ErrorType`.
func getWorldwideData() -> AnyPublisher<WorldwideResponseItem, ErrorType> {
    let url = RestEndpoints.worldwideStats.endpoint()
    let publisher: AnyPublisher<WorldwideResponseItem, ErrorType> = RestManager().fetchData(url: url)

    return publisher
        .map { response -> WorldwideResponseItem in
            // Closure parameters are constants, so writing to `response.countries`
            // directly is rejected when the response is a value type. Mutate a
            // local copy instead; for a reference type this touches the same object.
            var trimmed = response
            trimmed.countries = Array(
                response.countries
                    // Named arguments keep the nested closure readable.
                    .sorted { lhs, rhs in lhs.totalConfirmed > rhs.totalConfirmed }
                    .prefix(3)
            )
            return trimmed
        }
        .eraseToAnyPublisher()
}

Combine FlatMap duplicates for every new successful upstream

FlatMap's closure returns a publisher for every upstream value. That returned publisher can be emitting multiple values. So, in your example, every second a FlatMap returns a new Timer publisher that keeps publishing values every second.

Instead, use the Just publisher to emit each upstream value and apply the throwing transformation (and its error handling) to it inside flatMap:

Timer.publish(every: 1, on: .main, in: .default)
    .autoconnect()
    .flatMap { tick in
        // Wrap each tick in its own `Just`, so the throwing step and its
        // `catch` apply to this inner publisher only.
        Just(tick)
            .tryMap { _ throws -> String? in "\(Date())" }
            // Replace a thrown error with `nil` ...
            .catch { _ in Just(nil) }
            // ... and drop that `nil` before it reaches downstream.
            .filter { stamp in stamp != nil }
    }

It might be simpler for your example to use compactMap and use try? for throwing calls, instead of the flatMap approach:

Timer.publish(every: 1, on: .main, in: .default)
    .autoconnect()
    // `try?` turns a thrown error into `nil`, which `compactMap` then discards.
    .compactMap { tick in try? someThrowingFunction(tick) }

FlatMap with Generic ReturnType using Combine

I took your code and boiled it down to just the Combine parts. I could not reproduce the issue you are describing. I'll post that code below. I recommend you start simplifying your code a bit at a time to see if that helps. Factoring out the Auth and Facebook token code seems like a good candidate to start with. Another good debugging technique might be to put in more explicit type declarations to make sure your closures are taking and returning what you expect. (just the other day I had a map that I thought I was applying to an Array when I was really mapping over Optional).

Here's the playground:

import UIKit
import Combine

/// Produces a publisher that emits one fixed `URLRequest` for
/// `https://www.apple.com`.
///
/// - Parameter baseURL: Accepted for signature compatibility; this stub does
///   not read it.
/// - Returns: A type-erased, deferred `Future` that always succeeds.
func asURLRequest(baseURL: String) -> AnyPublisher<URLRequest, Error> {
    let deferred = Deferred {
        Future<URLRequest, Error> { fulfill in
            // The literal is a known-good URL, hence the force unwrap.
            let request = URLRequest(url: URL(string: "https://www.apple.com")!)
            fulfill(.success(request))
        }
    }
    return deferred.eraseToAnyPublisher()
}

/// Builds a request and hands it to a `NetworkDispatcher`.
struct APIClient {
    /// Dispatcher that performs the request. Declared non-optional because
    /// `init` always supplies a value, so an implicitly unwrapped optional
    /// would only add a way to crash.
    var networkDispatcher: NetworkDispatcher

    /// - Parameter networkDispatcher: Dispatcher to use; defaults to a fresh
    ///   `NetworkDispatcher()`.
    init(networkDispatcher: NetworkDispatcher = NetworkDispatcher()) {
        self.networkDispatcher = networkDispatcher
    }

    /// Creates the request via `asURLRequest(baseURL:)`, logs it, and forwards
    /// it to the dispatcher.
    ///
    /// - Returns: A publisher of the response data, failing with `Error`.
    func dispatch() -> AnyPublisher<Data, Error> {
        return asURLRequest(baseURL: "Boo!")
            .flatMap { (request: URLRequest) -> AnyPublisher<Data, Error> in
                print("Request Received. \(String(describing: request))")
                return self.networkDispatcher.dispatch(request: request)
            }
            .eraseToAnyPublisher()
    }
}

/// Builds the error thrown for a non-success HTTP response.
///
/// - Parameter code: The HTTP status code; stored as the error's `code` so
///   callers can tell which status triggered the failure.
/// - Returns: An `NSError` in the `"Bad Things"` domain.
func httpError(_ code: Int) -> Error {
    return NSError(domain: "Bad Things", code: code, userInfo: nil)
}

/// Logs an error with `debugPrint` and passes it through untouched.
///
/// - Parameter failure: The error to log.
/// - Returns: The same error that was passed in.
func handleError(_ failure: Error) -> Error {
    debugPrint(failure)
    return failure
}

/// Performs URL requests and surfaces their data through Combine.
struct NetworkDispatcher {
    /// Session used for every request. Declared non-optional because `init`
    /// always supplies a value, so an implicitly unwrapped optional is not needed.
    let urlSession: URLSession

    /// - Parameter urlSession: Session to use; defaults to `URLSession.shared`.
    public init(urlSession: URLSession = .shared) {
        self.urlSession = urlSession
    }

    /// Runs `request` and emits the response body.
    ///
    /// A response that is an `HTTPURLResponse` with a status outside 200...299
    /// is turned into the error built by `httpError(_:)`. Every failure is
    /// passed through `handleError(_:)` before it reaches the subscriber.
    ///
    /// - Parameter request: The request to execute.
    /// - Returns: A publisher of the response data, failing with `Error`.
    func dispatch(request: URLRequest) -> AnyPublisher<Data, Error> {
        return urlSession
            .dataTaskPublisher(for: request)
            .tryMap { data, response -> Data in
                if let response = response as? HTTPURLResponse,
                   !(200...299).contains(response.statusCode) {
                    throw httpError(response.statusCode)
                }

                // Return Response data
                return data
            }
            .mapError { error in
                handleError(error)
            }
            .eraseToAnyPublisher()
    }
}

// Playground driver: dispatch one request and log everything it produces.
let apiClient = APIClient()
var cancellables: [AnyCancellable] = []

apiClient.dispatch()
    .print()
    .receive(on: DispatchQueue.main)
    .sink(
        receiveCompletion: { completion in
            debugPrint(completion)

            // Only a failure needs extra handling; a normal finish is ignored.
            if case .failure(let error) = completion {
                // Handle API response errors here (WKNetworkRequestError)
                print("##### Error loading data: \(error)")
            }
        },
        receiveValue: { payload in
            debugPrint(payload)
        }
    )
    // Keep the subscription alive for as long as `cancellables` exists.
    .store(in: &cancellables)

Premature completion of publisher in flatMap in Combine

Your use of DispatchQueue.global() is the problem. The values.publisher sends all of its outputs, and its completion, downstream to the delay operator as soon as values.publisher receives the subscription. The delay operator schedules six blocks (five for the output numbers and one for the completion) to run on DispatchQueue.global() 0.1 seconds later.

DispatchQueue.global() is a concurrent queue. This means that it can run any number of blocks simultaneously. So there is no guarantee which of the six scheduled blocks will finish first.

It is in general a bad idea to use a concurrent queue as a Combine scheduler. You should use a serial queue instead. Here's a simple example:

var cancel = values.publisher
    // A queue created with only a label is serial, so the delayed outputs and
    // the completion run one after another.
    .delay(for: 0.1, scheduler: DispatchQueue(label: "my queue"))
    .print()
    .flatMap { number in
        [number].publisher.first()
    }
    .sink(
        receiveCompletion: { outcome in
            print("Received Completion: \(outcome)")
        },
        receiveValue: { number in
            print("Received Value: \(number)")
        }
    )

But you probably want to create the queue once and store it in a property, so you can use it with multiple publishers.

If you actually want the outputs to be delivered on the main queue (perhaps because you're going to use them to update a view), you should just use DispatchQueue.main.

Chaining with flatMap

Hi @cole for this operation you don't need either the merge or the zip, because you are not subscribing to two publishers, you are attempting to do an action after your first publisher emitted an event.

For this you only need `map` and `.handleEvents`, in my opinion.

So let's try to enhance your code: we want to update both movies and videos separately, but we still need videos to be dependent on movies.

First we will create the publisher to request the movies:

// Build the GET request for the "upcoming movies" endpoint.
let upcomingMoviesURL = URL(string: "https://api.themoviedb.org/3/movie/upcoming?api_key=API_KEY&language=en-US&page=1")!
var request = URLRequest(url: upcomingMoviesURL)
request.httpMethod = "GET"

// Fetch the payload, keep only its data and decode it as `TMDb`.
let publisher = URLSession.shared
    .dataTaskPublisher(for: request)
    .map { payload in payload.data }
    .decode(type: TMDb.self, decoder: JSONDecoder())

Now we enhance this publisher handling by assigning movies:

// Build the GET request for the "upcoming movies" endpoint.
let upcomingMoviesURL = URL(string: "https://api.themoviedb.org/3/movie/upcoming?api_key=API_KEY&language=en-US&page=1")!
var request = URLRequest(url: upcomingMoviesURL)
request.httpMethod = "GET"

// NOTE: despite its name, `publisher` holds the value returned by `sink`
// (the subscription), not a publisher.
let publisher = URLSession.shared
    .dataTaskPublisher(for: request)
    .map { payload in payload.data }
    .decode(type: TMDb.self, decoder: JSONDecoder())
    .sink(
        receiveCompletion: { completion in print(completion) },
        // Store the decoded results as the current movie list.
        receiveValue: { response in self.movies = response.results }
    )

Now we will add .handleEvents in order to iterate through our movies and create all the publishers that emit video events, appending each result to the videos array:

// Build the GET request for the "upcoming movies" endpoint.
var request = URLRequest(url: URL(string: "https://api.themoviedb.org/3/movie/upcoming?api_key=API_KEY&language=en-US&page=1")!)
request.httpMethod = "GET"

// The chain ends in `store(in:)`, which returns nothing, so there is no value
// worth binding to a constant here.
URLSession.shared.dataTaskPublisher(for: request)
    .map { $0.data }
    .decode(type: TMDb.self, decoder: JSONDecoder())
    // `handleEvents` is a publisher operator, so it has to sit before `sink`:
    // `sink` returns a cancellable, which has no operators to chain.
    .handleEvents(receiveOutput: { [weak self] movies in
        guard let self = self else { return }
        // Start from an empty list, then request the videos of every movie.
        self.videos = [Videos]()
        for movie in movies.results {
            self.fetchVideos(movie.id)
        }
    })
    .sink(receiveCompletion: { print($0) },
          receiveValue: { [weak self] response in self?.movies = response.results })
    .store(in: &cancellables)

Now, for the last step, let's update fetchVideos accordingly:

/// Requests the videos of one movie and appends the decoded result to `videos`.
///
/// The subscription is stored in `cancellables`; the completion is printed.
///
/// - Parameter id: Movie identifier interpolated into the request URL.
func fetchVideos(_ id: Int) {
    let endpoint = URL(string: "https://api.themoviedb.org/3/movie/\(id)/videos?api_key=API_KEY&language=en-US")!

    URLSession.shared.dataTaskPublisher(for: endpoint)
        .mapError { $0 as Error }
        .map { payload in payload.data }
        .decode(type: Videos.self, decoder: JSONDecoder())
        .sink(
            receiveCompletion: { completion in print(completion) },
            receiveValue: { [weak self] fetched in
                // Nothing to do if the owner has gone away in the meantime.
                self?.videos.append(fetched)
            }
        )
        .store(in: &cancellables)
}


Related Topics



Leave a reply



Submit