Block()/Blockfirst()/Blocklast() Are Blocking Error When Calling Bodytomono After Exchange()

block()/blockFirst()/blockLast() are blocking error when calling bodyToMono AFTER exchange()

First, a few things that will help you understand the code snippet solving this use case.

  1. You should never call a blocking method within a method that returns a reactive type; you will block one of the few threads of your application and it is very bad for the application
  2. Anyway as of Reactor 3.2, blocking within a reactive pipeline throws an error
  3. Calling subscribe, as suggested in the comments, is not a good idea either. It is more or less like starting that job as a task in a separate thread. You'll get a callback when it's done (the subscribe methods can be given lambdas), but you're in fact decoupling your current pipeline with that task. In this case, the client HTTP response could be closed and resources cleaned before you get a chance to read the full response body to write it to a file
  4. If you don't want to buffer the whole response in memory, Spring provides DataBuffer (think ByteBuffer instances that can be pooled).
  5. You can call block if the method you're implementing is itself blocking (returning void for example), for example in a test case.

Here's a code snippet that you could use to do this:

Mono<Void> fileWritten = WebClient.create().post()
.uri(uriBuilder -> uriBuilder.path("/file/").build())
.exchange()
.flatMap(response -> {
if (MediaType.APPLICATION_JSON_UTF8.equals(response.headers().contentType().get())) {
Mono<NoPayloadResponseDto> dto = response.bodyToMono(NoPayloadResponseDto.class);
return createErrorFile(dto);
}
else {
Flux<DataBuffer> body = response.bodyToFlux(DataBuffer.class);
return createSpreadsheet(body);
}
});
// Once you get that Mono, you should give plug it into an existing
// reactive pipeline, or call block on it, depending on the situation

As you can see, we're not blocking anywhere and methods dealing with I/O are returning Mono<Void>, which is the reactive equivalent of a done(error) callback that signals when things are done and if an error happened.

Since I'm not sure what the createErrorFile method should do, I've provided a sample for createSpreadsheet that just writes the body bytes to a file. Note that since databuffers might be recycled/pooled, we need to release them once we're done.

private Mono<Void> createSpreadsheet(Flux<DataBuffer> body) {
try {
Path file = //...
WritableByteChannel channel = Files.newByteChannel(file, StandardOpenOption.WRITE);
return DataBufferUtils.write(body, channel).map(DataBufferUtils::release).then();
} catch (IOException exc) {
return Mono.error(exc);
}
}

With this implementation, your application will hold a few DataBuffer instances in memory at a given time (the reactive operators are prefetching values for performance reasons) and will write bytes as they come in a reactive fashion.

How to get value out of mono

If you are using reactive programming, try to avoid blocking calls and use already existing features.

public Mono<String> getVal() {
return webClient.get()
.uri("/service")
.retrieve()
.bodyToMono(String.class)
.map(str -> {
// You have your string unwrapped here
return str;
});
}

block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-server-epoll-18

The reactive approach is used in non-blocking applications. You choose to use reactive because you don't want to block, and still you do it.

The solution is simple. You should never call block in a reactive application.

return movieMono.block();

From your createMovie you should return a Mono<Movie> and then keep chaining on. But since you have not provided your implementation of who is calling createMovie i can't help you more than this.

Getting exception while doing block() on Mono object I got back from ReactiveMongoRepository object

Blocking is bad, since it ties up a thread waiting for a response. It's very bad in a reactive framework which has few threads at its disposal, and is designed so that none of them should be unnecessarily blocked.

This is the very thing that reactive frameworks are designed to avoid, so in this case it simply stops you doing it:

block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-4

Your new code, in contrast, works asynchronously. The thread isn't blocked, as nothing actually happens until the repository returns a value (and then the lambda that you passed to savedQuote.subscribe() is executed, printing out you result to the console.)

However, the new code still isn't optimal / normal from a reactive streams perspective, as you're doing all your logic in your subscribe method. The normal thing to do is to us a series of flatMap/map calls to transform the items in the stream, and use doOnNext() for side effects (such as printing out a value):

stockQuoteClient.getQuoteStream()
.log("quote-monitor-service")
.flatMap(quoteRepository::insert)
.doOnNext(result -> System.out.println("I saved a quote! Id :: " + result.getId())))
.subscribe();

If you're doing any serious amount of work with reactor / reactive streams, it would be worth reading up on them in general. They're very powerful for non-blocking work, but they do require a different way of thinking (and coding) than more "standard" Java.



Related Topics



Leave a reply



Submit