How to Handle Errors in Spring-Webflux

How to handle errors in Reactive Spring webflux

Sharing my knowledge:

In total there are six methods for handling errors; five are discussed below:

  • onErrorReturn: returns a fallback value for the entire stream (Mono/Flux). E.g. if there’s a Flux of 10 elements and an error happens on element 3, then the rest (4, 5, 6, …) won’t be executed; the fallback value is emitted instead.

  • onErrorResume: returns a fallback in the form of a Mono/Flux for the entire stream (Mono/Flux). E.g. if there’s a Flux of 10 elements and an error happens on element 3, then the rest (4, 5, 6, …) won’t be executed; the fallback publisher is subscribed to instead.

  • onErrorContinue: consumes (error, data) and does NOT spill it over. It applies the consumer to the failing elements and leaves the downstream chain as it is for the good elements. E.g. if there’s a Flux of 10 elements and an error happens on element 3, then all elements (1 to 10) except 3 have normal execution, while element 3 is handled by the consumer passed to onErrorContinue.

  • doOnError: consumes the error and spills it over. Stops execution for further elements in the stream.

  • onErrorMap: converts one error into another. Stops execution for further elements in the stream.
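To make the "error on element 3" scenario concrete, here is a minimal sketch that runs the same failing Flux through onErrorReturn and through onErrorContinue. The class name FluxFallbackDemo and the helper failOnThree are made up for this illustration, and it assumes reactor-core is on the classpath.

```java
import java.util.List;
import reactor.core.publisher.Flux;

public class FluxFallbackDemo {

    // Hypothetical helper: fails on element 3, passes every other element through.
    static int failOnThree(int i) {
        if (i == 3) {
            throw new IllegalStateException("boom on " + i);
        }
        return i;
    }

    // onErrorReturn: the stream ends at the failing element and the fallback is emitted.
    static List<Integer> withOnErrorReturn() {
        return Flux.range(1, 10)
                .map(FluxFallbackDemo::failOnThree)
                .onErrorReturn(-1)
                .collectList()
                .block();
    }

    // onErrorContinue: the failing element is handed to the consumer, the rest flow on.
    static List<Integer> withOnErrorContinue() {
        return Flux.range(1, 10)
                .map(FluxFallbackDemo::failOnThree)
                .onErrorContinue((error, value) ->
                        System.out.println("skipped " + value + ": " + error.getMessage()))
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(withOnErrorReturn());   // [1, 2, -1]
        System.out.println(withOnErrorContinue()); // [1, 2, 4, 5, 6, 7, 8, 9, 10]
    }
}
```

block() is used here only to print the result in a standalone program; inside a WebFlux handler you would return the Flux or Mono instead.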

All five methods come in three variants:

  • Simple: takes the expected argument directly
  • With Exception: applies the expected argument only if the error matches the given exception class
  • With Predicate: applies the expected argument only if the predicate yields true
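The practical difference between the variants shows up when the filter does not match: the operator is skipped and the error keeps travelling downstream. The sketch below illustrates this with the "With Exception" variant; the class name VariantDemo and its helpers are assumptions made for this example.

```java
import reactor.core.publisher.Mono;

public class VariantDemo {

    // Source that always fails with ArithmeticException when subscribed to.
    static Mono<Integer> divideByZero() {
        return Mono.just(2).map(i -> i / 0);
    }

    // The class filter matches, so the fallback value 4 is emitted.
    static String matching() {
        return divideByZero()
                .onErrorReturn(ArithmeticException.class, 4)
                .map(n -> "value " + n)
                .onErrorResume(e -> Mono.just("error " + e.getClass().getSimpleName()))
                .block();
    }

    // The class filter does not match, so the ArithmeticException passes through
    // onErrorReturn untouched and is only caught by the final onErrorResume.
    static String nonMatching() {
        return divideByZero()
                .onErrorReturn(IllegalStateException.class, 4)
                .map(n -> "value " + n)
                .onErrorResume(e -> Mono.just("error " + e.getClass().getSimpleName()))
                .block();
    }

    public static void main(String[] args) {
        System.out.println(matching());    // value 4
        System.out.println(nonMatching()); // error ArithmeticException
    }
}
```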

Examples:

  1. onErrorReturn: return fallback value
@Test
public void onErrorReturnDirectly_Mono() {
    Mono.just(2)
        .map(i -> i/0) // will produce ArithmeticException
        .onErrorReturn(4)
        .subscribe(num -> log.info("Number: {}", num ));
}

@Test
public void onErrorReturnIfArithmeticException_Mono() {
    Mono.just(2)
        .map(i -> i/0) // will produce ArithmeticException
        .onErrorReturn(ArithmeticException.class, 4)
        .subscribe(num -> log.info("Number: {}", num ));
}

@Test
public void onErrorReturnIfPredicatePasses_Mono() {
    Mono.just(2)
        .map(i -> i/0) // will produce ArithmeticException
        .onErrorReturn(error -> error instanceof ArithmeticException, 4)
        .subscribe(num -> log.info("Number: {}", num ));
}

  2. onErrorResume: return a fallback value in the form of a Mono/Flux
@Test
public void onErrorResume_Mono() {
    Mono.just(2)
        .map(i -> i/0) // will produce ArithmeticException
        .onErrorResume(error -> Mono.just(4))
        .subscribe(num -> log.info("Number: {}", num ));
}

@Test
public void onErrorResumeIfArithmeticException_Mono() {
    Mono.just(2)
        .map(i -> i/0) // will produce ArithmeticException
        .onErrorResume(
            ArithmeticException.class,
            error -> Mono.just(4)
        )
        .subscribe(num -> log.info("Number: {}", num ));
}

@Test
public void onErrorResumeIfPredicatePasses_Mono() {
    Mono.just(2)
        .map(i -> i/0) // will produce ArithmeticException
        .onErrorResume(
            error -> error instanceof ArithmeticException,
            error -> Mono.just(4)
        )
        .subscribe(num -> log.info("Number: {}", num ));
}

  3. onErrorContinue: consumes (error, data) and does NOT spill it over.
@Test
public void onErrorContinue_Mono() {
    Mono.just(2)
        .map(i -> i/0) // will produce ArithmeticException
        .onErrorContinue((error, obj) -> log.info("error:[{}], obj:[{}]", error, obj ))
        .subscribe(num -> log.info("Number: {}", num ));
}

@Test
public void onErrorContinueIfArithmeticException_Mono() {
    Mono.just(2)
        .map(i -> i/0) // will produce ArithmeticException
        .onErrorContinue(
            ArithmeticException.class,
            (error, obj) -> log.info("error:[{}], obj:[{}]", error, obj )
        )
        .subscribe(num -> log.info("Number: {}", num ));
}

@Test
public void onErrorContinueIfPredicatePasses_Mono() {
    Mono.just(2)
        .map(i -> i/0) // will produce ArithmeticException
        .onErrorContinue(
            error -> error instanceof ArithmeticException,
            (error, obj) -> log.info("error:[{}], obj:[{}]", error, obj )
        )
        .subscribe(num -> log.info("Number: {}", num ));
}

  4. doOnError: consumes the error and spills it over
@Test
public void doOnError_Mono() {
    Mono.just(2)
        .map(i -> i/0) // will produce ArithmeticException
        .doOnError(error -> log.info("caught error"))
        .subscribe(num -> log.info("Number: {}", num ));
}

@Test
public void doOnErrorIfArithmeticException_Mono() {
    Mono.just(2)
        .map(i -> i/0) // will produce ArithmeticException
        .doOnError(
            ArithmeticException.class,
            error -> log.info("caught error")
        )
        .subscribe(num -> log.info("Number: {}", num ));
}

@Test
public void doOnErrorIfPredicatePasses_Mono() {
    Mono.just(2)
        .map(i -> i/0) // will produce ArithmeticException
        .doOnError(
            error -> error instanceof ArithmeticException,
            error -> log.info("caught error")
        )
        .subscribe(num -> log.info("Number: {}", num ));
}

  5. onErrorMap: convert one error into another
@Test
public void onErrorMap_Mono() {
    Mono.just(2)
        .map(i -> i/0) // will produce ArithmeticException
        .onErrorMap(error -> new RuntimeException("SomeMathException"))
        .subscribe(num -> log.info("Number: {}", num ));
}

@Test
public void onErrorMapIfArithmeticException_Mono() {
    Mono.just(2)
        .map(i -> i/0) // will produce ArithmeticException
        .onErrorMap(
            ArithmeticException.class,
            error -> new RuntimeException("SomeMathException")
        )
        .subscribe(num -> log.info("Number: {}", num ));
}

@Test
public void onErrorMapIfPredicatePasses_Mono() {
    Mono.just(2)
        .map(i -> i/0) // will produce ArithmeticException
        .onErrorMap(
            error -> error instanceof ArithmeticException,
            error -> new RuntimeException("SomeMathException")
        )
        .subscribe(num -> log.info("Number: {}", num ));
}
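Since doOnError and onErrorMap both let the error continue downstream, the subscriber still needs an error callback to see it. The following sketch, with the made-up class name SpillOverDemo, records what each stage observes; it is an illustration under that assumption, not part of the original tests.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Mono;

public class SpillOverDemo {

    // Returns the events observed along the pipeline, in order.
    static List<String> run() {
        List<String> events = new ArrayList<>();
        Mono.just(2)
                .map(i -> i / 0) // will produce ArithmeticException
                // Side effect only: the error is observed here, then passed on unchanged.
                .doOnError(e -> events.add("doOnError saw " + e.getClass().getSimpleName()))
                // The error is replaced by another one, which is still an error signal.
                .onErrorMap(ArithmeticException.class,
                        e -> new IllegalArgumentException("bad input", e))
                .subscribe(
                        n -> events.add("value " + n),
                        e -> events.add("subscriber got " + e.getClass().getSimpleName()));
        return events;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
        // doOnError saw ArithmeticException
        // subscriber got IllegalArgumentException
    }
}
```

Mono.just emits synchronously on the calling thread, which is why the list is already filled when subscribe returns.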

Exception Handling in Spring webflux

I thought when throw is used to throw an exception before the reactive
pipeline is assembled, it will not use onErrorResume()

Mono::doOnSuccess triggers at execution time, when the Mono completes successfully (the pipeline is already assembled).

Note that inside intermediate operators like doOnNext or map you are free to throw exceptions as Reactor can transform them into proper error signals since at that point a Mono is already in progress.
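The difference between an exception thrown before a Mono exists and one thrown inside an operator can be sketched as follows. The class AssemblyTimeDemo and the validate helper are hypothetical; Mono.defer is used here as one way to move the throwing call to subscription time so that onErrorResume can see the failure.

```java
import reactor.core.publisher.Mono;

public class AssemblyTimeDemo {

    // Hypothetical helper: throws before any Mono has been created.
    static Mono<String> validate(String name) {
        if (name.length() < 5) {
            throw new RuntimeException("Name is short");
        }
        return Mono.just(name.toLowerCase());
    }

    // The exception is thrown while the chain is being built, so onErrorResume
    // is never attached and the exception escapes as a plain Java exception.
    static String direct(String name) {
        try {
            return validate(name)
                    .onErrorResume(e -> Mono.just("fallback"))
                    .block();
        } catch (RuntimeException e) {
            return "escaped: " + e.getMessage();
        }
    }

    // Mono.defer delays the call until subscription and turns a thrown exception
    // into an error signal, which onErrorResume can then handle.
    static String deferred(String name) {
        return Mono.defer(() -> validate(name))
                .onErrorResume(e -> Mono.just("fallback"))
                .block();
    }

    public static void main(String[] args) {
        System.out.println(direct("Tim"));   // escaped: Name is short
        System.out.println(deferred("Tim")); // fallback
    }
}
```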

How can I achieve this if I'm using Mono.error(new RuntimeException("Name is short")) instead of throw new RuntimeException("Name is short")?

You can replace doOnSuccess and map with the handle operator:

return Mono.just("Tim")
    .handle((name, sink) -> {
        if (name.length() < 5) {
            // emit an error signal instead of throwing
            sink.error(new RuntimeException("Name is short"));
        } else {
            sink.next(name.toLowerCase());
        }
    });
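For completeness, here is a self-contained sketch of the handle approach with a caller that recovers from the error. The class HandleDemo and the method normalize are names invented for this example, and the explicit <String> type witness is just one way to help type inference.

```java
import reactor.core.publisher.Mono;

public class HandleDemo {

    // Validates and transforms in one step: either an error signal or a value is emitted.
    static Mono<String> normalize(String name) {
        return Mono.just(name).<String>handle((value, sink) -> {
            if (value.length() < 5) {
                sink.error(new RuntimeException("Name is short"));
            } else {
                sink.next(value.toLowerCase());
            }
        });
    }

    public static void main(String[] args) {
        // Long enough: the lowercased name is emitted.
        System.out.println(normalize("Timothy").block()); // timothy

        // Too short: the error signal is handled by onErrorResume.
        System.out.println(
                normalize("Tim")
                        .onErrorResume(e -> Mono.just("fallback: " + e.getMessage()))
                        .block()); // fallback: Name is short
    }
}
```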

How to globally handle errors thrown from WebFilter in Spring WebFlux?

Three steps are required to get full control over all exceptions thrown from the application's endpoint-handling code:

  1. Implement org.springframework.boot.web.reactive.error.ErrorWebExceptionHandler
  2. Annotate with @ControllerAdvice (or just @Component)
  3. Set @Priority less than 1 to let the custom handler run before the default one (WebFluxResponseStatusExceptionHandler)

The tricky part is where to get an instance implementing ServerResponse.Context to pass to ServerResponse.writeTo(exchange, context). I did not find a final answer, and comments are welcome. The internal Spring code always creates a new context instance for each writeTo invocation, although in all the cases I've managed to find the context instance is immutable. That is why I ended up using the same ResponseContextInstance for all responses. So far I have not detected any problems with this approach.



@ControllerAdvice
@Priority(0) /* should go before WebFluxResponseStatusExceptionHandler */
class CustomWebExceptionHandler : ErrorWebExceptionHandler {

    private val log = logger(CustomWebExceptionHandler::class)

    override fun handle(exchange: ServerWebExchange, ex: Throwable): Mono<Void> {
        log.error("handled ${ex.javaClass.simpleName}", ex)

        val sr = when (ex) {
            is FirstException -> handleFirst(ex)
            is SecondException -> handleSecond(ex)
            else -> defaultException(ex)
        }

        return sr.flatMap { it.writeTo(exchange, ResponseContextInstance) }.then()
    }

    private fun handleFirst(ex: FirstException): Mono<ServerResponse> {
        return ServerResponse
            .status(HttpStatus.INTERNAL_SERVER_ERROR)
            .bodyValue("first")
    }

    private fun handleSecond(ex: SecondException): Mono<ServerResponse> {
        return ServerResponse.status(HttpStatus.BAD_REQUEST).bodyValue("second")
    }

    // Hypothetical fallback (not shown in the original answer): anything else becomes a generic 500.
    private fun defaultException(ex: Throwable): Mono<ServerResponse> {
        return ServerResponse.status(HttpStatus.INTERNAL_SERVER_ERROR).bodyValue("unexpected error")
    }

    // Shared, immutable context reused for every response (see the note above).
    private object ResponseContextInstance : ServerResponse.Context {

        val strategies: HandlerStrategies = HandlerStrategies.withDefaults()

        override fun messageWriters(): List<HttpMessageWriter<*>> {
            return strategies.messageWriters()
        }

        override fun viewResolvers(): List<ViewResolver> {
            return strategies.viewResolvers()
        }
    }
}

What is the right way to handle errors in spring-webflux

Spring 5 provides a WebHandler, and in the JavaDoc, there's the line:

Use HttpWebHandlerAdapter to adapt a WebHandler to an HttpHandler. The WebHttpHandlerBuilder provides a convenient way to do that while also optionally configuring one or more filters and/or exception handlers.

Currently, the official documentation suggests that we should wrap the router function into an HttpHandler before booting up any server:

HttpHandler httpHandler = RouterFunctions.toHttpHandler(routerFunction);

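With the help of WebHttpHandlerBuilder, we can configure custom exception handlers. Note that webHandler(...) expects a WebHandler, so the router function is adapted with RouterFunctions.toWebHandler, and the handler must return a Mono<Void> rather than null:

HttpHandler httpHandler = WebHttpHandlerBuilder.webHandler(RouterFunctions.toWebHandler(routerFunction))
    .exceptionHandler((serverWebExchange, exception) -> {

        /* custom handling goes here */
        // Mono.error(exception) passes the error on to the next handler in the chain
        return Mono.error(exception);

    }).build();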

Handling errors from Spring WebClient in another method

Well, there are many ways to handle errors, it really depends on what you want to do in case of an error.

In your current setup, the solution is straightforward. First, NotificationException should extend RuntimeException, so that in case of an HTTP error .block() throws a NotificationException. It is good practice to declare it in the method signature, accompanied by a Javadoc entry.

In another method, you just need to catch the exception and do what you want with it.

/**
 * @param notification
 * @throws NotificationException in case of an HTTP error
 */
public void sendNotification(String notification) throws NotificationException {
    final WebClient webClient = WebClient.builder()
        .defaultHeader(CONTENT_TYPE, APPLICATION_JSON_VALUE)
        .build();
    webClient.post()
        .uri("http://localhost:9000/api")
        .body(BodyInserters.fromValue(notification))
        .retrieve()
        .onStatus(HttpStatus::isError, clientResponse -> Mono.error(NotificationException::new))
        .toBodilessEntity()
        .block();
    log.info("Notification delivered successfully");
}

public void someOtherMethod() {
    try {
        sendNotification("test");
    } catch (NotificationException e) {
        // Treat exception
    }
}

In a more reactive style, you could return a Mono and use onErrorResume().

public Mono<Void> sendNotification(String notification) {
    final WebClient webClient = WebClient.builder()
        .defaultHeader(CONTENT_TYPE, APPLICATION_JSON_VALUE)
        .build();
    return webClient.post()
        .uri("http://localhost:9000/api")
        .body(BodyInserters.fromValue(notification))
        .retrieve()
        .onStatus(HttpStatus::isError, clientResponse -> Mono.error(NotificationException::new))
        .bodyToMono(Void.class);
}

public void someOtherMethod() {
    sendNotification("test")
        .onErrorResume(NotificationException.class, ex -> {
            log.error(ex.getMessage());
            return Mono.empty();
        })
        .doOnSuccess(unused -> log.info("Notification delivered successfully"))
        .block();
}

Example of error handling calling Restful services with WebFlux

The correct way to handle this is via .onErrorResume, which lets you subscribe to a fallback publisher, chosen by a function, when any error occurs. You can look at the generated exception and return a custom fallback response.

You can do something like this:

Mono<ResponseObject> mono = webClient.post()
    .uri(url.toString())
    .header("Authorization", authToken)
    .bodyValue(contract)
    .exchangeToMono(response -> {
        if (response.statusCode().equals(HttpStatus.OK)) {
            return response.bodyToMono(ResponseObject.class);
        }
        else if (response.statusCode().is4xxClientError()) {
            return response.bodyToMono(ResponseObject.class);
        }
        else {
            Mono<WebClientResponseException> wcre = response.createException();
            // examine wcre and create custom ResponseObject

            ResponseObject customRO = new ResponseObject();
            customRO.setSuccess(false);
            customRO.setHttpStatus(response.rawStatusCode());
            // you can set more default properties in response here
            return Mono.just( customRO );
        }
    });

Moreover, you should avoid calling .block() in your request-handling code. Just make sure to return a Mono<ResponseObject> from your REST controller. If you want to examine the response before returning it to the client, you can do so in a .map() handler at the end of the pipeline (right after the .onErrorResume handler, if you use one), like this:

    .map(response -> {
        // examine content of response

        // in the end just return it
        return response;
    });

