Implementing Reconnection with URLSession Publisher and Combine

Receive task progress from `URLSession.dataTaskPublisher`

No, you're not missing anything. To retrieve the progress information you would need to construct your own Future and not use the built-in data task publisher.
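As a rough sketch of what that could look like: the helper below (`dataTaskFuture` is a made-up name, not part of Foundation or Combine) wraps a plain data task in a Future and hands back a parent Progress that mirrors the task's own `progress` object. It assumes the server reports a content length; without one, the fraction may stay at zero until the task finishes.

```swift
import Combine
import Foundation

/// Hypothetical helper: wraps a data task in a Future and exposes a Progress
/// object that tracks the task once it has been created.
func dataTaskFuture(
    for request: URLRequest,
    session: URLSession = .shared
) -> (result: Future<(data: Data, response: URLResponse), Error>, progress: Progress) {
    // Parent progress with a single unit of work, fulfilled by the task's own progress.
    let progress = Progress(totalUnitCount: 1)

    let result = Future<(data: Data, response: URLResponse), Error> { promise in
        let task = session.dataTask(with: request) { data, response, error in
            if let data = data, let response = response {
                promise(.success((data: data, response: response)))
            } else {
                promise(.failure(error ?? URLError(.unknown)))
            }
        }
        // URLSessionTask vends a Progress object; attach it as a child of ours.
        progress.addChild(task.progress, withPendingUnitCount: 1)
        task.resume()
    }

    return (result, progress)
}
```

You could then observe `progress.publisher(for: \.fractionCompleted)` next to the Future's result, since Progress supports key-value observing for that property.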

Intermittent results using URLSession with Combine

Put a print statement in your receiveCompletion: closure and you will learn why your code doesn't print sometimes.

.sink(
    receiveCompletion: { completion in
        print(completion)
    },
    receiveValue: { clues in
        print(clues)
    }
)

I saw this when I did it:

failure(Swift.DecodingError.valueNotFound(Swift.Int, Swift.DecodingError.Context(codingPath: [_JSONKey(stringValue: "Index 2", intValue: 2), CodingKeys(stringValue: "value", intValue: nil)], debugDescription: "Expected Int value but found null instead.", underlyingError: nil)))
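The error says that one array element has `"value": null` where an Int was expected. One possible fix, assuming your model looks roughly like the hypothetical `Clue` below (the `question` field is invented for the example), is to declare the property as optional so that null decodes to nil:

```swift
import Foundation

// Hypothetical model. `value` is optional because the API may send null for it.
struct Clue: Decodable {
    let question: String
    let value: Int?
}

// Decodes an array of clues; a null or missing "value" becomes nil instead of an error.
func decodeClues(from data: Data) throws -> [Clue] {
    try JSONDecoder().decode([Clue].self, from: data)
}
```

With this change the element that used to abort decoding is kept, and you decide later how to treat a clue without a value.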

Background URLSession + Combine?

URLSession.DataTaskPublisher is built on top of URLSessionDataTask and sets a completion handler on the task. So you cannot use DataTaskPublisher with a background session.

You can find the source code of DataTaskPublisher in the Swift project repo. Here are the relevant lines:

let task = p.session.dataTask(
    with: p.request,
    completionHandler: handleResponse(data:response:error:)
)

URLSession dataTaskPublisher with retry as MJPEG stream

URLSession.DataTaskPublisher isn't suitable for this purpose. It will only emit at most one output. You can look at its implementation to understand why: it creates a URLSessionDataTask with a completion handler. The task only calls the completion handler once, when it has received the complete response.

If you want to receive the data as it arrives, you need to create an object that conforms to URLSessionDataDelegate and use that object as the delegate of your URLSession. Apple doesn't provide a Publisher that does this. If you want a Publisher that provides data incrementally, you'll have to write it yourself.
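A minimal sketch of such an object might look like the following. `StreamingDataSession` and its members are invented names, it handles one request per instance, and it does no MJPEG frame parsing; it only forwards each chunk of bytes as it arrives.

```swift
import Combine
import Foundation

/// Hypothetical wrapper: forwards data chunks from the session delegate to a subject.
final class StreamingDataSession: NSObject, URLSessionDataDelegate {
    private let subject = PassthroughSubject<Data, Error>()

    /// Emits every chunk the delegate receives. Subscribe before calling start(_:).
    var chunks: AnyPublisher<Data, Error> { subject.eraseToAnyPublisher() }

    private lazy var session = URLSession(
        configuration: .default,
        delegate: self,
        delegateQueue: nil
    )

    func start(_ request: URLRequest) {
        session.dataTask(with: request).resume()
    }

    /// The session keeps a strong reference to its delegate until it is invalidated.
    func stop() {
        session.invalidateAndCancel()
    }

    // Called repeatedly while the response body arrives.
    func urlSession(_ session: URLSession, dataTask: URLSessionDataTask, didReceive data: Data) {
        subject.send(data)
    }

    // Called once when the task ends, with an error if it failed.
    func urlSession(_ session: URLSession, task: URLSessionTask, didCompleteWithError error: Error?) {
        if let error = error {
            subject.send(completion: .failure(error))
        } else {
            subject.send(completion: .finished)
        }
    }
}
```

You would subscribe to `chunks`, call `start(_:)` with your request, and split the incoming bytes into frames yourself. Values are delivered on the session's delegate queue, so hop to the main queue before touching UI.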

How can I continue URLSession dataTaskPublisher or another Publisher after error?

Publishing a failure always ends a subscription. Since you want to continue publishing after an error, you cannot publish your error as a failure. You must instead change your publisher's output type. The standard library provides Result, and that's what you should use.

func makeStatusPublisher() -> AnyPublisher<Result<StatusResponse, Error>, Never> {
    let timer = Timer
        .publish(every: 30, tolerance: 10, on: .main, in: .common)
        .autoconnect()
        .map { _ in true } // Map to the same Output type as the notification publisher so the two can be merged.

    let notes = NotificationCenter.default
        .publisher(for: UIApplication.willEnterForegroundNotification)
        .map { _ in true }

    return timer.merge(with: notes)
        .flatMap({ _ in
            statusResponsePublisher()
                .map { Result.success($0) }
                .catch { Just(Result.failure($0)) }
        })
        .eraseToAnyPublisher()
}

This publisher emits either .success(response) or .failure(error) periodically, and never completes with a failure.
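To see the effect in isolation, here is a small self-contained sketch that replaces the network call with stand-ins: `SampleError`, `trigger`, and the fake request are all invented for the demonstration. A failed request shows up as a `.failure` value, and the next trigger still produces a result.

```swift
import Combine

struct SampleError: Error {}

// true simulates a request that succeeds, false one that fails.
let trigger = PassthroughSubject<Bool, Never>()
var log: [String] = []

let statusSubscription = trigger
    .flatMap { shouldSucceed -> AnyPublisher<Result<Int, Error>, Never> in
        // Stand-in for a real request publisher.
        let request: AnyPublisher<Int, Error> = shouldSucceed
            ? Just(1).setFailureType(to: Error.self).eraseToAnyPublisher()
            : Fail<Int, Error>(error: SampleError()).eraseToAnyPublisher()

        return request
            .map { Result<Int, Error>.success($0) }
            .catch { Just(Result<Int, Error>.failure($0)) }
            .eraseToAnyPublisher()
    }
    .sink { result in
        switch result {
        case .success(let value):
            log.append("success \(value)")
        case .failure:
            log.append("failure")
        }
    }

trigger.send(false) // the inner failure is delivered as a value
trigger.send(true)  // the outer subscription is still alive
// log is now ["failure", "success 1"]
```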

However, you should ask yourself, what happens if the user switches apps repeatedly? Or what if the API request takes more than 30 seconds to complete? (Or both?) You'll get multiple requests running simultaneously, and the responses will be handled in the order they arrive, which might not be the order in which the requests were sent.

One way to fix this would be to use flatMap(maxPublishers: .max(1)) { ... }, which makes flatMap ignore timer and notification signals while it's got a request outstanding. But it would perhaps be even better for it to start a new request on each signal, and cancel the prior request. Change flatMap to map followed by switchToLatest for that behavior:

func makeStatusPublisher2() -> AnyPublisher<Result<StatusResponse, Error>, Never> {
    let timer = Timer
        .publish(every: 30, tolerance: 10, on: .main, in: .common)
        .autoconnect()
        .map { _ in true } // Map to the same Output type as the notification publisher so the two can be merged.

    let notes = NotificationCenter.default
        .publisher(for: UIApplication.willEnterForegroundNotification)
        .map { _ in true }

    return timer.merge(with: notes)
        .map({ _ in
            statusResponsePublisher()
                .map { Result<StatusResponse, Error>.success($0) }
                .catch { Just(Result<StatusResponse, Error>.failure($0)) }
        })
        .switchToLatest()
        .eraseToAnyPublisher()
}
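If you want to convince yourself that switchToLatest really drops the older request, the following sketch simulates two overlapping requests with subjects. The names `triggers`, `pendingRequests`, and `received` are made up for the demonstration, and the subjects stand in for real network publishers.

```swift
import Combine

let triggers = PassthroughSubject<Int, Never>()
// One subject per simulated in-flight request, keyed by trigger id.
var pendingRequests: [Int: PassthroughSubject<String, Never>] = [:]
var received: [String] = []

let latestSubscription = triggers
    .map { id -> AnyPublisher<String, Never> in
        let request = PassthroughSubject<String, Never>()
        pendingRequests[id] = request
        return request.eraseToAnyPublisher()
    }
    .switchToLatest()
    .sink { received.append($0) }

triggers.send(1) // first request starts
triggers.send(2) // second request starts, the first one is cancelled

pendingRequests[1]?.send("response 1") // arrives late and is ignored
pendingRequests[2]?.send("response 2")
// received is now ["response 2"]
```

With a real data task publisher, cancelling the inner subscription also cancels the underlying URLSessionDataTask, so the stale request does not keep running.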

Error is not triggering on url session using combine

It seems you have a misunderstanding regarding when a URLSession.DataTaskPublisher should fail with an error. A data task only fails with an error in case there is a network error (such as no internet connection, SSL error, etc).

Inputting an incorrect username or password is not a network error and hence will not result in the data task throwing an error. Depending on your backend implementation, it might result in an error status code (not in the 200..<300 range) and an error response in the body of the response.

To check the status code of the HTTPURLResponse and throw an error in case it's incorrect, you can use tryMap on the dataTaskPublisher.

Here's how you can define convenience methods on URLSession.DataTaskPublisher that handle the status code of the HTTPURLResponse and throw an error in case it's incorrect.

enum NetworkingError: Error {
    case decoding(DecodingError)
    case incorrectStatusCode(Int)
    case network(URLError)
    case nonHTTPResponse
    case unknown(Error)
}

extension Publisher {
    func mapErrorToNetworkingError() -> AnyPublisher<Output, NetworkingError> {
        mapError { error -> NetworkingError in
            switch error {
            case let decodingError as DecodingError:
                return .decoding(decodingError)
            case let networkingError as NetworkingError:
                return networkingError
            case let urlError as URLError:
                return .network(urlError)
            default:
                return .unknown(error)
            }
        }
        .eraseToAnyPublisher()
    }
}

extension URLSession.DataTaskPublisher {
    func emptyBodyResponsePublisher() -> AnyPublisher<Void, NetworkingError> {
        httpResponseValidator()
            .map { _ in Void() }
            .eraseToAnyPublisher()
    }
}

extension URLSession.DataTaskPublisher {
    func httpResponseValidator() -> AnyPublisher<Output, NetworkingError> {
        tryMap { data, response in
            guard let httpResponse = response as? HTTPURLResponse else { throw NetworkingError.nonHTTPResponse }
            let statusCode = httpResponse.statusCode
            guard (200..<300).contains(statusCode) else { throw NetworkingError.incorrectStatusCode(statusCode) }
            return (data, httpResponse)
        }
        .mapErrorToNetworkingError()
    }

    func httpResponseValidatorDataPublisher() -> AnyPublisher<Data, NetworkingError> {
        httpResponseValidator()
            .map(\.data)
            .eraseToAnyPublisher()
    }

    func jsonDecodingPublisher<T: Decodable>(type: T.Type) -> AnyPublisher<T, NetworkingError> {
        httpResponseValidatorDataPublisher()
            .decode(type: T.self, decoder: JSONDecoder())
            .mapErrorToNetworkingError()
    }
}

And then you can simplify your performOperation function as below and it will throw an error in case the status code of the response is not in the expected range.

private func performOperation<T: Decodable>(requestUrl: URLRequest, responseType: T.Type) -> AnyPublisher<T, NetworkingError> {
    URLSession.shared.dataTaskPublisher(for: requestUrl)
        .jsonDecodingPublisher(type: T.self)
}

How to have a publisher emit only to the last subscriber in Combine

You are right. Unfortunately, there is no way to list or track the subscribers of a publisher. To solve your problem, you have to implement a custom publisher. There are two options: either implement a custom publisher by conforming to the Publisher protocol, which Apple advises against, or build one from existing types, as Apple recommends. I have prepared an example of the second option.

The logic is simple. We create a class that wraps a PassthroughSubject (a CurrentValueSubject would also work). The class exposes the usual send and sink methods and forwards them to the wrapped subject. In the sink method we store each returned subscription, but before adding a new subscription to the Set we go through all the cached subscriptions and cancel them. This way only the most recent subscription stays active.


import Combine

// The subscriptions will be cached in the publisher.
// To avoid strong references, I use the WeakBox recommendation from the Swift forum.
struct WeakBox<T: AnyObject & Hashable>: Hashable {
    weak var item: T?

    func hash(into hasher: inout Hasher) {
        hasher.combine(item)
    }
}

class MyPublisher<T, E: Error> {
    private let subject = PassthroughSubject<T, E>()
    private var subscriptions = Set<WeakBox<AnyCancellable>>()

    deinit {
        subscriptions.removeAll()
    }

    public func send(_ input: T) {
        subject.send(input)
    }

    public func send(completion: Subscribers.Completion<E>) {
        subject.send(completion: completion)
    }

    public func sink(
        receiveCompletion receivedCompletion: @escaping (Subscribers.Completion<E>) -> Void,
        receiveValue receivedValue: @escaping (T) -> Void
    ) -> AnyCancellable {
        let subscription = subject
            .sink(receiveCompletion: { completion in
                receivedCompletion(completion)
            }, receiveValue: { value in
                receivedValue(value)
            })

        // Cancel previous subscriptions.
        subscriptions.forEach { $0.item?.cancel() }

        // Add new subscription.
        subscriptions.insert(WeakBox(item: subscription))

        return subscription
    }
}

I tested the class in Playground as follows.

let publisher = MyPublisher<Int, Never>()

let firstSubscription = publisher
    .sink(receiveCompletion: { completion in
        print("1st subscription completion \(completion)")
    }, receiveValue: { value in
        print("1st subscription value \(value)")
    })

let secondSubscription = publisher
    .sink(receiveCompletion: { completion in
        print("2nd subscription completion \(completion)")
    }, receiveValue: { value in
        print("2nd subscription value \(value)")
    })

let thirdSubscription = publisher
    .sink(receiveCompletion: { completion in
        print("3rd subscription completion \(completion)")
    }, receiveValue: { value in
        print("3rd subscription value \(value)")
    })

publisher.send(123)

Console output:

3rd subscription value 123

If you comment out the line subscriptions.forEach { $0.item?.cancel() }, then you get:

3rd subscription value 123
1st subscription value 123
2nd subscription value 123

I hope this helps.


