Correct Way of Throwing Exceptions with Reactor

There are several convenient ways to throw an exception in Reactor:

Handle your element using Flux/Mono.handle

One operator that simplifies handling an element which may result in an error or an empty stream is handle.

The following code shows how we can use it in order to solve our problem:

Mono.just(userId)
    .map(repo::findById)
    .handle((user, sink) -> {
        if (!isValid(user)) {
            sink.error(new InvalidUserException());
        } else if (isSendable(user)) {
            sink.next(user);
        } else {
            // just ignore the element
        }
    })

As we can see, the handle operator takes a BiConsumer<T, SynchronousSink<R>> to process each element. The first parameter is the element from upstream; the second is a SynchronousSink, which lets us supply at most one element downstream synchronously. This lets a single operator produce different outcomes for an element. If the element is invalid, we can call error on the same SynchronousSink, which cancels the upstream and sends an onError signal downstream. We can also "filter" with handle: if the BiConsumer returns without supplying an element, Reactor treats that as filtering and requests another element from upstream. Finally, if the element is valid, we can call SynchronousSink#next to propagate it downstream, optionally mapping it first, so handle also works as a map operator. Moreover, the operator has no performance impact, so it is safe to use for complex element processing such as validation or sending errors downstream.

Throw using concatMap + Mono.error

One option for throwing an exception during mapping is to replace map with concatMap. In essence, concatMap does almost the same thing as flatMap. The only difference is that concatMap allows only one substream at a time. That restriction simplifies the internal implementation a lot and does not hurt performance. So we can use the following code to throw an exception in a more functional way:

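Mono.just(userId)
    .map(repo::findById)
    // Mono itself has no concatMap; with a single element, Mono.flatMap
    // plays the same role (there is only ever one substream)
    .flatMap(user -> {
        if (!isValid(user)) {
            return Mono.error(new InvalidUserException());
        }
        return Mono.just(user);
    })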

In the sample above, if the user is invalid, we return the exception using Mono.error. We can do the same for a Flux using Flux.error:

Flux.just(userId1, userId2, userId3)
    .map(repo::findById)
    .concatMap(user -> {
        if (!isValid(user)) {
            return Flux.error(new InvalidUserException());
        }
        return Mono.just(user);
    })

Note that in both cases we return a cold stream with at most one element. Reactor has a couple of optimizations that improve performance when the returned stream is a cold scalar stream. Thus, it is recommended to use Flux/Mono concatMap with .just, .empty or .error as the result when we need a more complex mapping that could otherwise end in return null or throw new ....

Attention! Never check an incoming element for null. Reactor will never deliver a null value to you, since that would violate the Reactive Streams spec (see Rule 2.13). Thus, if repo.findById returns null, Reactor will raise a NullPointerException for you and deliver it as an onError signal.
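To illustrate the null rule, here is a minimal sketch, assuming reactor-core is on the classpath. The class NullMappingDemo and its findById helper are hypothetical stand-ins for repo.findById, not part of the original answer. It contrasts a null-returning mapper inside map with wrapping the same call in Mono.justOrEmpty:

```java
import reactor.core.publisher.Mono;

public class NullMappingDemo {

    // Hypothetical lookup standing in for repo.findById: returns null for unknown ids.
    static String findById(int id) {
        return id == 1 ? "alice" : null;
    }

    // A null result inside map becomes an onError(NullPointerException) signal.
    static Mono<String> lookupWithMap(int id) {
        return Mono.just(id).map(NullMappingDemo::findById);
    }

    // Mono.justOrEmpty turns a null result into an empty stream instead.
    static Mono<String> lookup(int id) {
        return Mono.just(id).flatMap(i -> Mono.justOrEmpty(findById(i)));
    }

    public static void main(String[] args) {
        lookupWithMap(2).subscribe(
            v -> System.out.println("value: " + v),
            e -> System.out.println("error: " + e.getClass().getSimpleName()));

        lookup(2).subscribe(
            v -> System.out.println("value: " + v),
            e -> System.out.println("error: " + e.getClass().getSimpleName()),
            () -> System.out.println("completed empty"));
    }
}
```

If an empty stream is the intended meaning of "not found", the justOrEmpty variant combines naturally with switchIfEmpty.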

Wait, why is concatMap better than flatMap?

In essence, flatMap is designed to merge elements from multiple substreams that execute at the same time. It assumes asynchronous streams underneath, which may process data on multiple threads or represent several network calls. Those expectations shape the implementation considerably: flatMap must handle data from multiple streams (threads), which means concurrent data structures; it must enqueue elements while another stream is being drained, which means allocating a queue for each substream; and it must do all of this without violating the Reactive Streams specification rules, which makes the implementation really complex. Given all that, and given that we are only replacing a plain, synchronous map with a more convenient way of throwing an exception via Flux/Mono.error (which does not change the synchronicity of execution), we do not need such a complex operator. We can use the much simpler concatMap, which is designed for asynchronous handling of a single stream at a time and has a couple of optimizations for scalar, cold streams.

Throw an exception using switchIfEmpty

Another approach, for throwing an exception when the result is empty, is the switchIfEmpty operator. The following code demonstrates it:

Mono.just(userId)
    .flatMap(repo::findById)
    .switchIfEmpty(Mono.error(new UserNotFoundException()))

As we can see, in this case repo::findById must return a Mono of User. If no User instance is found, the resulting stream is empty, and Reactor subscribes to the alternative Mono passed to switchIfEmpty.

Throw your exception as is (e.g. in your map, filter and other similar operators)

It could be considered less readable code or bad practice (my own opinion), but with Project Reactor you can throw your exception as is (e.g. .map(v -> { throw ... })). In some ways, doing so can violate the Reactive Streams specification, at least semantically: your operator is, under the hood, a Subscriber in a chain of Subscribers, so throwing an exception in a lambda maps to throwing an exception from the onNext method, which violates the spec's rule 2.13. However, since Reactor catches the thrown exception for you and propagates it as an onError signal to your downstream, doing so is not prohibited.

Takeaways

  1. Use the handle operator for complex element processing.
  2. Use concatMap + Mono.error when you need to throw an exception during mapping; this technique is most suitable for asynchronous element processing.
  3. Use flatMap + Mono.error when you already have a flatMap in place.
  4. Null as a return value is forbidden, so instead of null in your downstream map you will get an unexpected onError with a NullPointerException.
  5. Use switchIfEmpty whenever you need to send an error signal because calling some function resulted in an empty stream.

Spring Reactor: How to throw an exception when a publisher emits a value?

To propagate an exception when a publisher emits a value, you can use one of several operators that operate on emitted values.

Some examples:

fluxOrMono.flatMap(next -> Mono.error(new IllegalArgumentException()))
fluxOrMono.map(next -> { throw new IllegalArgumentException(); })
fluxOrMono.doOnNext(next -> { throw new IllegalArgumentException(); })
fluxOrMono.handle((next, sink) -> sink.error(new IllegalArgumentException()))

Throwing an exception vs Mono.error() in Spring WebFlux

As @Joao already stated, the recommended way to deal with an error is to call the error method on a Publisher (Mono.error/Flux.error).

I would like to show you an example in which the traditional throw does not work as you may expect:

public void testErrorHandling() {
    Flux.just("a", "b", "c")
        .flatMap(e -> performAction()
            .onErrorResume(t -> {
                System.out.println("Error occurred");
                return Mono.empty();
            }))
        .subscribe();
}

Mono<Void> performAction() {
    throw new RuntimeException();
}

The onErrorResume operator will never be executed because the exception is thrown before Mono is assembled.
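To make the difference concrete, here is a minimal sketch, assuming reactor-core is on the classpath. The class AssemblyErrorDemo and the methods throwing, signalling and countHandled are hypothetical names introduced for illustration. It counts how often the inner onErrorResume runs when the action throws directly versus when it returns Mono.error:

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class AssemblyErrorDemo {

    // Throws while the Mono is being created, so no Mono ever exists.
    static Mono<Void> throwing() {
        throw new RuntimeException("boom");
    }

    // Returns a Mono that signals the error once it is subscribed to.
    static Mono<Void> signalling() {
        return Mono.error(new RuntimeException("boom"));
    }

    // Runs the action for three elements and counts inner onErrorResume calls.
    static int countHandled(Supplier<Mono<Void>> action) {
        AtomicInteger handled = new AtomicInteger();
        Flux.just("a", "b", "c")
            .flatMap(e -> action.get()
                .onErrorResume(t -> {
                    handled.incrementAndGet();
                    return Mono.empty();
                }))
            // Swallow whatever escapes the inner handler so blockLast() returns normally.
            .onErrorResume(t -> Mono.empty())
            .blockLast();
        return handled.get();
    }

    public static void main(String[] args) {
        System.out.println("throwing:   " + countHandled(AssemblyErrorDemo::throwing));
        System.out.println("signalling: " + countHandled(AssemblyErrorDemo::signalling));
    }
}
```

With throwing(), the exception escapes the flatMap lambda itself, so the inner handler never sees it and the counter stays at 0. With signalling(), each of the three inner Monos reports its error to the inner onErrorResume, so the counter reaches 3.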

Spring Reactor: throw Exception if NOT empty

Try using the Hooks.onOperatorDebug() hook to get a better debugging experience.

Correct way to use hasElement (assuming that find_SomePojo never returns null)

Mono<Boolean> monoPresent = find_SomePojo(accountId, contentIdExtn)
    .filter(i -> i.getId() != null)
    .hasElement();

return monoPresent.flatMap(isPresent -> {
    if (isPresent) {
        // the error Mono must be returned, otherwise it is never subscribed
        return Mono.error(new SomeException(ITEM_ALREADY_EXISTS));
    } else {
        // assumes SomePojoRepo.save returns a Mono
        return SomePojoRepo.save(reqPojo);
    }
}).then();

Sidenote

There is a common misconception about what a Mono actually is. It does not hold any data; it is just a fragment of a pipeline that transmits the signals and data flowing through it. Therefore, the line System.out.println("monoPresent="+monoPresent.toString()); makes no sense, because it just prints the hasElement() decorator around the existing pipeline. The internal name of this decorator is MonoHasElement, and that is what gets printed no matter what it would emit (true/false).

Correct ways to print signals (and the data transmitted along with them) are:
Mono.log(), Mono.doOnEach/doOnNext(System.out::println) or System.out.println("monoPresent="+monoPresent.block());. Beware of the third one: it blocks the whole thread until data is emitted, so use it only if you know what you are doing.

Example with Monos printing to play with:

   Mono<String> abc = Mono.just("abc").delayElement(Duration.ofSeconds(99999999));

System.out.println(abc); //this will print MonoDelayElement instantly
System.out.println(abc.block()); //this will print 'abc', if you are patient enough ;^)
abc.subscribe(System.out::println); //this will also print 'abc' after 99999999 seconds, but without blocking current thread

Reactor proper way of throwing a common error from multiple reactive methods

Instead of relying on a single Mono<Integer> errorMono = Mono.error(() -> new Exception("Empty"));, you can create a specific Mono.error for each function, each with its own message, like the following:

private Mono<Integer> function1(Integer input) {
return Mono.justOrEmpty(input)
.doOnNext(i -> System.out.println("Function 1 " + i))
.switchIfEmpty(Mono.error(() -> new Exception("function 1")));
}

private Mono<Integer> function2(Integer input) {
return Mono.justOrEmpty(input)
.doOnNext(i -> System.out.println("Function 2 " + i))
.switchIfEmpty(Mono.error(() -> new Exception("function 2")));
}

private Mono<Integer> function3(Integer input) {
return Mono.justOrEmpty(input)
.doOnNext(i -> System.out.println("Function 3 " + i))
.switchIfEmpty(Mono.error(() -> new Exception("function 3")));
}

Then you will be able to see in the stack trace which function is throwing your error:

java.lang.Exception: function 1
at com.github.felipegutierrez.explore.advance.CustomExceptionTest.lambda$function1$2(CustomExceptionTest.java:30)
...
java.lang.Exception: function 2
at com.github.felipegutierrez.explore.advance.CustomExceptionTest.lambda$function2$4(CustomExceptionTest.java:36)
...
java.lang.Exception: function 3
at com.github.felipegutierrez.explore.advance.CustomExceptionTest.lambda$function3$6(CustomExceptionTest.java:42)

How to throw an exception properly when doing Flux processing?

You are attempting to map from a List<TestData> to either a List<Integer> or a Flux<?> (error), which makes the desired result type ambiguous. Returning a reactive type in a mapping function is generally not desired (you'd want to do that in a flatmapping function).

(side note: even if you were in a flatMap, it wouldn't work either because at that point you're in Mono API due to collectList, so Mono.flatMap expects a Mono result to the Function).

Note that the map operator catches exceptions thrown from the lambda and turns them into an onError signal, so technically you could replace the Flux.error with a throw.

Otherwise, you'd need to turn the map into a flatMap and the Flux.error into a Mono.error, for the reasons stated above.
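The flatMap variant can be sketched as follows, assuming reactor-core is on the classpath. The question's TestData type is not shown here, so this hypothetical CollectListErrorDemo uses String elements and an "empty list is an error" rule purely for illustration:

```java
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class CollectListErrorDemo {

    // After collectList we are in the Mono API, so the function passed to
    // flatMap must return a Mono: Mono.error on failure, Mono.just on success.
    static Mono<List<Integer>> lengths(Flux<String> source) {
        return source
            .map(String::length)
            .collectList()
            .flatMap(list -> {
                if (list.isEmpty()) {
                    // hypothetical validation rule, chosen only for the example
                    return Mono.error(new IllegalStateException("no elements"));
                }
                return Mono.just(list);
            });
    }

    public static void main(String[] args) {
        lengths(Flux.just("a", "bcd")).subscribe(
            list -> System.out.println("lengths: " + list),
            e -> System.out.println("error: " + e.getMessage()));

        lengths(Flux.empty()).subscribe(
            list -> System.out.println("lengths: " + list),
            e -> System.out.println("error: " + e.getMessage()));
    }
}
```

Both branches of the lambda return a Mono of the same element type, which removes the ambiguity between a plain value and a reactive type.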

Exceptions thrown in a flatMap are ignored by the onErrorResume operator

A method that creates and returns a Mono should not throw an exception in this way. Since the exception is thrown before the Mono is assembled (created), the subsequent operators inside the flatMap cannot take effect: they need a Mono to operate on.

If you have no control over the processEvent() method and cannot fix its behaviour, you can wrap the call in Mono.defer, which ensures that even errors raised during assembly are propagated through the Mono inside the flatMap:

Flux.range(0, 5)
    .doOnNext(event -> log.info("Processing - {}", event))
    .flatMap(event -> Mono.defer(() -> processEvent(event))
        .doOnSuccess(result -> log.info("Processed - {}", event))
        .onErrorResume(t -> handleError(t, event)))
    .doOnError(t -> log.error("Exception propagated", t))

private Mono<Void> processEvent(Object object) {
    throw new RuntimeException("test");
}

Note that inside other intermediate operators such as map or doOnNext, you are free to throw exceptions in the ugly way: Reactor can transform them into proper error signals, since at that point a Mono is already in progress.


