Swift. Combine. How to Call a Publisher Block More Than Once When Retry

Swift. Combine. Is there any way to call a publisher block more than once when retry?

I managed to achieve the expected behaviour by utilising tryCatch() function and making another request: Link

The link contains two ways to achieve the same behaviour including Deferred {} mentioned above.

Swift combine retry only for some error types

Basically, you need a retryIf operator, so you can provide a closure to tell Combine which errors should be retried, and which not. I'm not aware of such an operator, but it's not hard to build one for yourself.

The idiomatic way is to extend the Publishers namespace with a new type for your operator, and then extend Publisher to add support for that operator so that yo can chain it along with other operators.

The implementation could look like this:

extension Publishers {
struct RetryIf<P: Publisher>: Publisher {
typealias Output = P.Output
typealias Failure = P.Failure

let publisher: P
let times: Int
let condition: (P.Failure) -> Bool

func receive<S>(subscriber: S) where S : Subscriber, Failure == S.Failure, Output == S.Input {
guard times > 0 else { return publisher.receive(subscriber: subscriber) }

publisher.catch { (error: P.Failure) -> AnyPublisher<Output, Failure> in
if condition(error) {
return RetryIf(publisher: publisher, times: times - 1, condition: condition).eraseToAnyPublisher()
} else {
return Fail(error: error).eraseToAnyPublisher()
}
}.receive(subscriber: subscriber)
}
}
}

extension Publisher {
func retry(times: Int, if condition: @escaping (Failure) -> Bool) -> Publishers.RetryIf<Self> {
Publishers.RetryIf(publisher: self, times: times, condition: condition)
}
}

Usage:

func createRequest(for message: Message) -> AnyPublisher<ResponseMessage, Error> {
Deferred {
Future<ResponseMessage, Error> { promise in
// future code
}
}
.retry(times: 3) { error in
if case let MessageBusError.messageError(responseError) = error, responseError.isRecoverable {
return true
}
return false
}
.eraseToAnyPublisher()
}

Note that I wrapped your Future within a Deferred one, otherwise the retry operator would be meaningless, as the closure will not be executed multiple times. More details about that behaviour here: Swift. Combine. Is there any way to call a publisher block more than once when retry?.


Alternatively, you can write the Publisher extension like this:

extension Publisher {
func retry(_ times: Int, if condition: @escaping (Failure) -> Bool) -> Publishers.RetryIf<Self> {
Publishers.RetryIf(publisher: self, times: times, condition: condition)
}

func retry(_ times: Int, unless condition: @escaping (Failure) -> Bool) -> Publishers.RetryIf<Self> {
retry(times, if: { !condition($0) })
}
}

, which enables some funky stuff, like this:

extension Error {
var isRecoverable: Bool { ... }
var isUnrecoverable: Bool { ... }
}

// retry at most 3 times while receiving recoverable errors
// bail out the first time when encountering an error that is
// not recoverable
somePublisher
.retry(3, if: \.isRecoverable)

// retry at most 3 times, bail out the first time when
// an unrecoverable the error is encountered
somePublisher
.retry(3, unless: \.isUnrecoverable)

Or even funkier, ruby-style:

extension Int {
var times: Int { self }
}

somePublisher
.retry(3.times, unless: \.isUnrecoverable)

Is there a way to call assign() more than once in Swift Combine?

UPDATE

If your deployment target is a 2020 system (like iOS 14+ or macOS 11+), you can use a different version of the assign operator to avoid retain cycles and to avoid storing cancellables:

init() {
external.XPublisher.assign(to: $PublishedX)
external.YPublisher.assign(to: $PublishedY)
external.XPublisher
.combineLatest(external.YPublisher) { $0 * $1 }
.assign(to: $PublishedProduct)
}

ORIGINAL

There is no built-in variant of assign(to:on:) that returns another Publisher instead of a Cancellable.

Just use multiple assigns:

class ProductViewModel: ObservableObject {
@Published var PublishedX: Int = 0
@Published var PublishedY: Int = 0
@Published var PublishedProduct: Int = 0

init() {
external.XPublisher
.assign(to: \.PublishedX, on: self)
.store(in: &tickets)
internal.YPublisher
.assign(to: \.PublishedY, on: self)
.store(in: &tickets)
external.XPublisher
.combineLatest(internal.YPublisher) { $0 * $1 }
.assign(to: \.PublishedProduct, on: self)
.store(in: &tickets)
}

private var tickets: [AnyCancellable] = []
}

Note that these subscriptions create retain cycles. Swift will not be able to destroy an instance of ProductViewModel until the tickets array is cleared. (This is not a property of my suggestion. Your original code also needs to store its subscription somewhere, else it will be cancelled immediately.)

Also, the existence of PublishedProduct is questionable. Why not just a computed property?

var product: Int { PublishedX * PublishedY }

Combine framework retry after delay?

It was a topic of conversation on the Using Combine project repo a while back - the whole thread: https://github.com/heckj/swiftui-notes/issues/164.

The long and short was we made an example that I think does what you want, although it does use catch:

let resultPublisher = upstreamPublisher.catch { error -> AnyPublisher<String, Error> in
return Publishers.Delay(upstream: upstreamPublisher,
interval: 3,
tolerance: 1,
scheduler: DispatchQueue.global())
// moving retry into this block reduces the number of duplicate requests
// In effect, there's the original request, and the `retry(2)` here will operate
// two additional retries on the otherwise one-shot publisher that is initiated with
// the `Publishers.Delay()` just above. Just starting this publisher with delay makes
// an additional request, so the total number of requests ends up being 4 (assuming all
// fail). However, no delay is introduced in this sequence if the original request
// is successful.
.retry(2)
.eraseToAnyPublisher()
}

This is referencing the a retry pattern I have in the book/online, which is basically what you describe (but wasn't what you asked about).

The person I was corresponding with on the issue provided a variant in that thread as an extension that might be interesting as well:

extension Publisher {
func retryWithDelay<T, E>()
-> Publishers.Catch<Self, AnyPublisher<T, E>> where T == Self.Output, E == Self.Failure
{
return self.catch { error -> AnyPublisher<T, E> in
return Publishers.Delay(
upstream: self,
interval: 3,
tolerance: 1,
scheduler: DispatchQueue.global()).retry(2).eraseToAnyPublisher()
}
}
}

Combine handle different type of publishers

So you have a network request, which in case of a successful request, returns a 200 response and an empty body, while in case of a form error, it returns a specific status code and an error in the response.

I would suggest keeping the Output type of your Publisher as Void, however, in case of a form error, decoding the error and throwing it as part of your APIError.

struct LoginError: Decodable {
let usernameError: String
let emailError: String
}

enum APIError: Error {
case failureStatus(code: Int)
case login(LoginError)
case nonHttpResponse(description: String)
case network(Error)
}

func emptyResult() -> AnyPublisher<Void, APIError> {
return URLSession.shared.dataTaskPublisher(for: urlRequest)
.print("EMPTY RESULT")
.tryMap { data, response in
guard let httpResponse = response as? HTTPURLResponse else { throw APIError.nonHttpResponse(description: "Not http response") }
let statusCode = httpResponse.statusCode

guard (200..<300).contains(statusCode) else {
if statusCode == 442 {
let loginError = try JSONDecoder().decode(LoginError.self, from: data)
throw APIError.login(loginError)
} else {
throw APIError.failureStatus(code: statusCode)
}
}
return Void()
}.mapError { error in
switch error {
case let apiError as APIError:
return apiError
default:
return .network(error)
}
}
.eraseToAnyPublisher()
}

Then you can handle the specific error by switching over the error in sink:

API.registration(name: name, email: email, password: password, schoolID: selectedSchool?.id ?? 0)
.print("Registration")
.receive(on: DispatchQueue.main)
.sink(receiveCompletion: { (completion) in
switch completion {
case let .failure(error):
switch error {
case .login(let loginError):
print("Login failed, \(loginError.emailError), \(loginError.usernameError)")
default:
print(error)
}
case .finished: break
}
}, receiveValue: { value in
print(value)
})
.store(in: &disposables)

Swift Combine. What is a correct way to return from a block of a timeout handler?

Not sure if this is what you want, but the best way I found to handle timeout on publishers with no failure (Failure == Never) is to force a specific error type and handle timeout error in the completion.

enum SomeKindOfPublisherError: Error {
case timeout
}

publisher
.setFailureType(to: SomeKindOfPublisherError.self)
.timeout(1, scheduler: backgroundQueue, customError: { .timeout })
.sink(receiveCompletion: {
switch $0 {
case .failure(let error):
// error is SomeKindOfPublisherError.timeout if timeout error occurs
print("failure: \(error)")
case .finished:
print("finished")
}
}, receiveValue: { print($0) })

If think it's strange that the timeout operator doesn't changes the publisher failure type to the custom error on its own, but this is some kind of workaround I found.

Swift combine recursive retry

If you have the original send(isRetry:completionBlock:), you can use Future to convert it to a publisher:

func send() -> AnyPublisher<URLSession.HTTPResponse, Error> {
Future { [weak self] promise in
self?.send(isRetry: false) { result in promise(result) }
}
.eraseToAnyPublisher()
}


Alternatively, Combine already has a .retry operator, so the entire thing could be made purely in Combine:

URLSession.shared.dataTaskPublisher(for: request)
.tryMap { data, response in
let response = response as! HTTPURLResponse
if response.statusCode == 401 {
throw CommunicationError.counterOutOfSync
} else {
return (response: response, data: data)
}
}
.retry(1)
.eraseToAnyPublisher()

This will retry once whenever there's any error (not just 401) from upstream. You can play around more to only retry under some conditions (e.g. see this answer)

Why does .delay break this short piece of code using Combine framework in swift

This happens because you do not store Cancellable returned after subscription. As soon as Cancellable is deallocated the whole subscription is cancelled.
Without delay everything works because subscriber is called immediatly, right after subscription.

Add property to your view:

@State var cancellable: AnyCancellable?

And save Cancellable returned after subscription:

cancellable = initialSequence.publisher

However your code won't add delay between each color change. All colors are sent immediatly -> you add delay to each event -> after 1 sec all colors are sent to subscriber :) .

Is there a way to detect when a publisher has a new subscriber? | Swift, Combine

If you want full control over subscriptions, you can create a custom Publisher and Subscription.

Publisher's func receive<S: Subscriber>(subscriber: S) method is the one that gets called when the publisher receives a new subscriber.

If you simply want to make a network request when this happens, you just need to create a custom Publisher and return a Future that wraps the network request from this method.

In general, you should use Future for one-off async events, PassthroughSubject is not the ideal Publisher to use for network requests.



Related Topics



Leave a reply



Submit