How to handle errors in reactive Spring WebFlux
Sharing my knowledge:
In total there are six methods for handling errors; five are discussed below:
onErrorReturn: returns a fallback value for the entire stream (Mono/Flux). E.g. if there's a Flux of 10 elements and an error happens on element 3, then the rest (4, 5, 6…) won't be processed; the fallback value is emitted in place of the error and the stream completes.
onErrorResume: returns a fallback in the form of a Mono/Flux for the entire stream (Mono/Flux). E.g. if there's a Flux of 10 elements and an error happens on element 3, then the rest (4, 5, 6…) won't be processed; the subscriber is switched to the fallback publisher instead.
onErrorContinue: consumes (error, data) and does NOT spill the error over. The consumer is invoked for the failing elements, and the downstream chain is left as it is for the good elements. E.g. if there's a Flux of 10 elements and an error happens on element 3, then all elements (1 to 10) except 3 are processed normally, while element 3 is dropped and handed, together with its error, to the consumer passed to onErrorContinue.
doOnError: consumes the error as a side effect and spills it over, so the error still reaches the subscriber. Stops execution for further elements in the stream.
onErrorMap: converts one error into another. Stops execution for further elements in the stream.
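The descriptions above talk about a Flux, so here is a minimal sketch of the three recovering operators on a multi-element stream. It assumes a Flux of 5 elements (instead of 10) that fails on element 3; the class name FluxErrorDemo and the fallback values -1 and -2 are made up for illustration.

```java
import java.util.List;
import reactor.core.publisher.Flux;

public class FluxErrorDemo {

    // Emits 1..5 and throws while mapping element 3.
    public static Flux<Integer> source() {
        return Flux.range(1, 5).map(i -> {
            if (i == 3) {
                throw new IllegalStateException("bad element " + i);
            }
            return i;
        });
    }

    // The error is replaced by a single fallback value; 4 and 5 are never processed.
    public static List<Integer> withOnErrorReturn() {
        return source().onErrorReturn(-1).collectList().block();
    }

    // The error is replaced by a fallback publisher; 4 and 5 are never processed.
    public static List<Integer> withOnErrorResume() {
        return source().onErrorResume(e -> Flux.just(-1, -2)).collectList().block();
    }

    // The failing element is handed to the consumer and the stream carries on with 4 and 5.
    public static List<Integer> withOnErrorContinue(List<Object> dropped) {
        return source()
                .onErrorContinue((error, element) -> dropped.add(element))
                .collectList()
                .block();
    }
}
```

With this source, onErrorReturn yields 1, 2, -1; onErrorResume yields 1, 2, -1, -2; and onErrorContinue yields 1, 2, 4, 5 while the dropped list receives 3.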
All five methods come in three variants:
- Simple: applies the given argument to any error
- With Exception: applies the given argument only if the error matches the exception class provided
- With Predicate: applies the given argument only if the predicate yields true
Examples:
- onErrorReturn: return fallback value
@Test
public void onErrorReturnDirectly_Mono() {
    Mono.just(2)
        .map(i -> i/0) // will produce ArithmeticException
        .onErrorReturn(4)
        .subscribe(num -> log.info("Number: {}", num));
}
@Test
public void onErrorReturnIfArithmeticException_Mono() {
    Mono.just(2)
        .map(i -> i/0) // will produce ArithmeticException
        .onErrorReturn(ArithmeticException.class, 4)
        .subscribe(num -> log.info("Number: {}", num));
}
@Test
public void onErrorReturnIfPredicatePasses_Mono() {
    Mono.just(2)
        .map(i -> i/0) // will produce ArithmeticException
        .onErrorReturn(error -> error instanceof ArithmeticException, 4)
        .subscribe(num -> log.info("Number: {}", num));
}
- onErrorResume: return fallback value in the form of a Mono/Flux
@Test
public void onErrorResume_Mono() {
    Mono.just(2)
        .map(i -> i/0) // will produce ArithmeticException
        .onErrorResume(error -> Mono.just(4))
        .subscribe(num -> log.info("Number: {}", num));
}
@Test
public void onErrorResumeIfArithmeticException_Mono() {
    Mono.just(2)
        .map(i -> i/0) // will produce ArithmeticException
        .onErrorResume(
            ArithmeticException.class,
            error -> Mono.just(4)
        )
        .subscribe(num -> log.info("Number: {}", num));
}
@Test
public void onErrorResumeIfPredicatePasses_Mono() {
    Mono.just(2)
        .map(i -> i/0) // will produce ArithmeticException
        .onErrorResume(
            error -> error instanceof ArithmeticException,
            error -> Mono.just(4)
        )
        .subscribe(num -> log.info("Number: {}", num));
}
- onErrorContinue: consumes (error, data) and does NOT spill it over.
@Test
public void onErrorContinue_Mono() {
    Mono.just(2)
        .map(i -> i/0) // will produce ArithmeticException
        .onErrorContinue((error, obj) -> log.info("error:[{}], obj:[{}]", error, obj))
        .subscribe(num -> log.info("Number: {}", num));
}
@Test
public void onErrorContinueIfArithmeticException_Mono() {
    Mono.just(2)
        .map(i -> i/0) // will produce ArithmeticException
        .onErrorContinue(
            ArithmeticException.class,
            (error, obj) -> log.info("error:[{}], obj:[{}]", error, obj)
        )
        .subscribe(num -> log.info("Number: {}", num));
}
@Test
public void onErrorContinueIfPredicatePasses_Mono() {
    Mono.just(2)
        .map(i -> i/0) // will produce ArithmeticException
        .onErrorContinue(
            error -> error instanceof ArithmeticException,
            (error, obj) -> log.info("error:[{}], obj:[{}]", error, obj)
        )
        .subscribe(num -> log.info("Number: {}", num));
}
- doOnError: consumes error and spills it over
@Test
public void doOnError_Mono() {
    Mono.just(2)
        .map(i -> i/0) // will produce ArithmeticException
        .doOnError(error -> log.info("caught error"))
        .subscribe(num -> log.info("Number: {}", num));
}
@Test
public void doOnErrorIfArithmeticException_Mono() {
    Mono.just(2)
        .map(i -> i/0) // will produce ArithmeticException
        .doOnError(
            ArithmeticException.class,
            error -> log.info("caught error")
        )
        .subscribe(num -> log.info("Number: {}", num));
}
@Test
public void doOnErrorIfPredicatePasses_Mono() {
    Mono.just(2)
        .map(i -> i/0) // will produce ArithmeticException
        .doOnError(
            error -> error instanceof ArithmeticException,
            error -> log.info("caught error")
        )
        .subscribe(num -> log.info("Number: {}", num));
}
- onErrorMap: convert one error into another
@Test
public void onErrorMap_Mono() {
    Mono.just(2)
        .map(i -> i/0) // will produce ArithmeticException
        .onErrorMap(error -> new RuntimeException("SomeMathException"))
        .subscribe(num -> log.info("Number: {}", num));
}
@Test
public void onErrorMapIfArithmeticException_Mono() {
    Mono.just(2)
        .map(i -> i/0) // will produce ArithmeticException
        .onErrorMap(
            ArithmeticException.class,
            error -> new RuntimeException("SomeMathException")
        )
        .subscribe(num -> log.info("Number: {}", num));
}
@Test
public void onErrorMapIfPredicatePasses_Mono() {
    Mono.just(2)
        .map(i -> i/0) // will produce ArithmeticException
        .onErrorMap(
            error -> error instanceof ArithmeticException,
            error -> new RuntimeException("SomeMathException")
        )
        .subscribe(num -> log.info("Number: {}", num));
}
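Since doOnError and onErrorMap both let an error travel on, it may help to see where that error ends up. The sketch below, with the made-up class name ErrorSignalDemo, passes a second lambda to subscribe so that the propagated error is received by the subscriber, and records the order of events in a list instead of logging them.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Mono;

public class ErrorSignalDemo {

    public static List<String> run() {
        List<String> events = new ArrayList<>();
        Mono.just(2)
            .map(i -> i / 0) // will produce ArithmeticException
            // Side effect only: the original error keeps flowing downstream.
            .doOnError(e -> events.add("doOnError saw " + e.getClass().getSimpleName()))
            // Replaces the error with another one; the stream still terminates with an error.
            .onErrorMap(ArithmeticException.class,
                    e -> new IllegalStateException("SomeMathException", e))
            .subscribe(
                num -> events.add("value " + num),
                err -> events.add("subscriber got "
                        + err.getClass().getSimpleName() + ": " + err.getMessage()));
        return events;
    }
}
```

No value is ever delivered here: doOnError observes the ArithmeticException first, then the subscriber's error callback receives the mapped IllegalStateException.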
Exception Handling in Spring webflux
I thought when throw is used to throw an exception before the reactive pipeline is assembled, it will not use onErrorResume()
Mono::doOnSuccess triggers at execution time, when the Mono completes successfully (the pipeline is already assembled). Note that inside intermediate operators like doOnNext or map you are free to throw exceptions, as Reactor can transform them into proper error signals since at that point a Mono is already in progress.
how can I achieve this if I'm using Mono.error(new RuntimeException("Name is short")) instead of throw new RuntimeException("Name is short")?
You can replace doOnSuccess and map with the handle operator:
return Mono.just("Tim")
    .handle((name, sink) -> {
        if (name.length() < 5) {
            sink.error(new RuntimeException("Name is short"));
        } else {
            sink.next(name.toLowerCase());
        }
    });
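To make the assembly-time versus execution-time distinction concrete, here is a small sketch under assumed names (AssemblyVsSignal, eager, lazy and the "fallback" value are all hypothetical). It compares a method that throws while the pipeline is being built with one that returns Mono.error, and shows Mono.defer as one way to turn the former into an error signal.

```java
import reactor.core.publisher.Mono;

public class AssemblyVsSignal {

    // Throws while the pipeline is being assembled, before any Mono exists.
    public static Mono<String> eager(String name) {
        if (name.length() < 5) {
            throw new RuntimeException("Name is short");
        }
        return Mono.just(name.toLowerCase());
    }

    // Reports the same problem as an error signal inside the pipeline.
    public static Mono<String> lazy(String name) {
        if (name.length() < 5) {
            return Mono.error(new RuntimeException("Name is short"));
        }
        return Mono.just(name.toLowerCase());
    }

    // onErrorResume is never reached: eager() throws before the operator is attached.
    public static String callEager(String name) {
        try {
            return eager(name).onErrorResume(e -> Mono.just("fallback")).block();
        } catch (RuntimeException e) {
            return "thrown: " + e.getMessage();
        }
    }

    // The error signal travels through the pipeline, so the fallback applies.
    public static String callLazy(String name) {
        return lazy(name).onErrorResume(e -> Mono.just("fallback")).block();
    }

    // Mono.defer runs eager() at subscription time and converts the throw into a signal.
    public static String callDeferred(String name) {
        return Mono.defer(() -> eager(name)).onErrorResume(e -> Mono.just("fallback")).block();
    }
}
```

For the name "Tim", callEager ends up in the catch block, while callLazy and callDeferred both produce the fallback value.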
How to globally handle errors thrown from WebFilter in Spring WebFlux?
Three steps are required to get full control over all exceptions thrown from the application's endpoint-handling code:
- Implement org.springframework.boot.web.reactive.error.ErrorWebExceptionHandler
- Annotate with @ControllerAdvice (or just @Component)
- Set @Priority less than 1 to let the custom handler run before the default one (WebFluxResponseStatusExceptionHandler)
The tricky part is where to get an instance implementing ServerResponse.Context for passing to ServerResponse.writeTo(exchange, context). I did not find a final answer, and comments are welcome. In the internal Spring code they always create a new context instance for each writeTo invocation, although in all the cases I've managed to find the context instance is immutable. That is why I ended up using the same ResponseContextInstance for all responses. So far no problems have been detected with this approach.
@ControllerAdvice
@Priority(0) /* should go before WebFluxResponseStatusExceptionHandler */
class CustomWebExceptionHandler : ErrorWebExceptionHandler {
    private val log = logger(CustomWebExceptionHandler::class)

    override fun handle(exchange: ServerWebExchange, ex: Throwable): Mono<Void> {
        log.error("handled ${ex.javaClass.simpleName}", ex)
        val sr = when (ex) {
            is FirstException -> handleFirst(ex)
            is SecondException -> handleSecond(ex)
            // defaultException(ex) is not shown here; it is expected to return a Mono<ServerResponse> like the handlers below
            else -> defaultException(ex)
        }
        return sr.flatMap { it.writeTo(exchange, ResponseContextInstance) }.then()
    }

    private fun handleFirst(ex: FirstException): Mono<ServerResponse> {
        return ServerResponse
            .status(HttpStatus.INTERNAL_SERVER_ERROR)
            .bodyValue("first")
    }

    private fun handleSecond(ex: SecondException): Mono<ServerResponse> {
        return ServerResponse.status(HttpStatus.BAD_REQUEST).bodyValue("second")
    }

    private object ResponseContextInstance : ServerResponse.Context {
        val strategies: HandlerStrategies = HandlerStrategies.withDefaults()

        override fun messageWriters(): List<HttpMessageWriter<*>> {
            return strategies.messageWriters()
        }

        override fun viewResolvers(): List<ViewResolver> {
            return strategies.viewResolvers()
        }
    }
}
what is the right way to handle errors in spring-webflux
Spring 5 provides a WebHandler, and in the JavaDoc, there's the line:
Use HttpWebHandlerAdapter to adapt a WebHandler to an HttpHandler. The WebHttpHandlerBuilder provides a convenient way to do that while also optionally configuring one or more filters and/or exception handlers.
Currently, the official documentation suggests that we should wrap the router function into an HttpHandler before booting up any server:
HttpHandler httpHandler = RouterFunctions.toHttpHandler(routerFunction);
With the help of WebHttpHandlerBuilder, we can configure custom exception handlers:
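HttpHandler httpHandler = WebHttpHandlerBuilder.webHandler(RouterFunctions.toWebHandler(routerFunction))
    // exceptionHandler(...) is the registration method on WebHttpHandlerBuilder in released Spring 5 versions
    .exceptionHandler((serverWebExchange, exception) -> {
        /* custom handling goes here */
        // Always return a Mono<Void>, never null; Mono.error(exception) passes the error on to the next handler
        return Mono.error(exception);
    }).build();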
Handling errors from Spring WebClient in another method
Well, there are many ways to handle errors; it really depends on what you want to do in case of an error.
In your current setup, the solution is straightforward: first, NotificationException should extend RuntimeException; thus, in case of an HTTP error, .block() will throw a NotificationException. It is good practice to add it to the signature of the method, accompanied by a Javadoc entry.
In another method, you just need to catch the exception and do what you want with it.
/**
 * @param notification the notification payload to send
 * @throws NotificationException in case of an HTTP error
 */
public void sendNotification(String notification) throws NotificationException {
    final WebClient webClient = WebClient.builder()
        .defaultHeader(CONTENT_TYPE, APPLICATION_JSON_VALUE)
        .build();
    webClient.post()
        .uri("http://localhost:9000/api")
        .body(BodyInserters.fromValue(notification))
        .retrieve()
        .onStatus(HttpStatus::isError, clientResponse -> Mono.error(NotificationException::new))
        .toBodilessEntity()
        .block();
    log.info("Notification delivered successfully");
}
public void someOtherMethod() {
    try {
        sendNotification("test");
    } catch (NotificationException e) {
        // Treat exception
    }
}
In a more reactive style, you could return a Mono and use onErrorResume().
public Mono<Void> sendNotification(String notification) {
    final WebClient webClient = WebClient.builder()
        .defaultHeader(CONTENT_TYPE, APPLICATION_JSON_VALUE)
        .build();
    return webClient.post()
        .uri("http://localhost:9000/api")
        .body(BodyInserters.fromValue(notification))
        .retrieve()
        .onStatus(HttpStatus::isError, clientResponse -> Mono.error(NotificationException::new))
        .bodyToMono(Void.class);
}
public void someOtherMethod() {
    sendNotification("test")
        // Log success before the fallback: once onErrorResume has recovered, the Mono completes normally
        .doOnSuccess(unused -> log.info("Notification delivered successfully"))
        .onErrorResume(NotificationException.class, ex -> {
            log.error(ex.getMessage());
            return Mono.empty();
        })
        .block();
}
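One detail worth checking when combining onErrorResume with doOnSuccess is operator order. The sketch below uses plain Reactor with a made-up class name (OperatorOrderDemo) and a Mono that always fails in place of the WebClient call; it records which callbacks fire in each ordering.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Mono;

public class OperatorOrderDemo {

    // Stand-in for a failing remote call (hypothetical).
    private static Mono<Void> failingCall() {
        return Mono.error(new IllegalStateException("boom"));
    }

    // doOnSuccess placed AFTER the fallback: it also fires for recovered errors.
    public static List<String> successAfterFallback() {
        List<String> events = new ArrayList<>();
        failingCall()
            .onErrorResume(IllegalStateException.class, ex -> {
                events.add("error: " + ex.getMessage());
                return Mono.empty();
            })
            .doOnSuccess(unused -> events.add("success"))
            .block();
        return events;
    }

    // doOnSuccess placed BEFORE the fallback: it only fires when the call itself succeeds.
    public static List<String> successBeforeFallback() {
        List<String> events = new ArrayList<>();
        failingCall()
            .doOnSuccess(unused -> events.add("success"))
            .onErrorResume(IllegalStateException.class, ex -> {
                events.add("error: " + ex.getMessage());
                return Mono.empty();
            })
            .block();
        return events;
    }
}
```

In the first ordering both the error and the success callbacks run, because the Mono.empty() fallback completes normally; in the second only the error callback runs.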
Example of error handling calling Restful services with WebFlux
The correct way of handling this is via .onErrorResume, which allows you to subscribe to a fallback publisher using a function when any error occurs. You can look at the generated exception and return a custom fallback response.
You can do something like this:
Mono<ResponseObject> mono = webClient.post()
    .uri(url.toString())
    .header("Authorization", authToken)
    .bodyValue(contract)
    .exchangeToMono(response -> {
        if (response.statusCode().equals(HttpStatus.OK)) {
            return response.bodyToMono(ResponseObject.class);
        }
        else if (response.statusCode().is4xxClientError()) {
            return response.bodyToMono(ResponseObject.class);
        }
        else {
            Mono<WebClientResponseException> wcre = response.createException();
            // examine wcre and create custom ResponseObject
            ResponseObject customRO = new ResponseObject();
            customRO.setSuccess(false);
            customRO.setHttpStatus(response.rawStatusCode());
            // you can set more default properties in response here
            return Mono.just(customRO);
        }
    });
Moreover, you should not be using .block() anywhere in your reactive request-handling code. Just make sure to return a Mono<ResponseObject> from your REST controller. If you want to examine the response before returning it to the client, you can do so in a .map() handler like this at the end of the pipeline (right after the .onErrorResume handler):
.map(response -> {
    // examine content of response
    // in the end just return it
    return response;
});
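The .onErrorResume step mentioned above is not shown in the snippet, so here is a minimal sketch of what it could look like. It does not use WebClient: ResponseObject and RemoteCallException are hypothetical stand-ins defined inside the example, and the status codes are illustrative.

```java
import reactor.core.publisher.Mono;

public class FallbackDemo {

    // Hypothetical response type standing in for the ResponseObject used above.
    public static final class ResponseObject {
        public final boolean success;
        public final int httpStatus;

        public ResponseObject(boolean success, int httpStatus) {
            this.success = success;
            this.httpStatus = httpStatus;
        }
    }

    // Hypothetical exception carrying the status of a failed remote call.
    public static final class RemoteCallException extends RuntimeException {
        public final int status;

        public RemoteCallException(int status) {
            super("remote call failed with status " + status);
            this.status = status;
        }
    }

    // Inspects the error and answers with a custom fallback response instead of failing.
    public static Mono<ResponseObject> withFallback(Mono<ResponseObject> call) {
        return call
                // Known failure type: reuse the status it carries.
                .onErrorResume(RemoteCallException.class,
                        ex -> Mono.just(new ResponseObject(false, ex.status)))
                // Anything else: generic fallback.
                .onErrorResume(ex -> Mono.just(new ResponseObject(false, 500)));
    }

    // Convenience for trying it out; a real controller would return the Mono instead of blocking.
    public static int statusFor(Mono<ResponseObject> call) {
        return withFallback(call).block().httpStatus;
    }
}
```

The typed onErrorResume comes first so that the more specific fallback wins; the untyped one only sees errors that the first did not recover.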