In Project Reactor, you can build a chain of operators. If you call subscribe on the chain, the code after the chain does not wait for asynchronous operations in the chain to finish. If you want to wait for the chain to finish first, use the block method instead of subscribe. This tutorial shows how to use block and its variations on both Mono and Flux, including how to set a timeout.
In Reactor, data flows through a chain only once something subscribes to it. You can call subscribe to make that happen. Besides subscribe, you can also call block, which subscribes as well but blocks the calling thread until the chain produces its result.
To make this concrete, look at the example below.
Mono.delay(Duration.ofSeconds(2))
    .then(Mono.just("text"))
    .block();
System.out.println("After chain");
If you run the code, "After chain" is printed only after the chain completes, which takes about 2 seconds. The sections below show how to use block on Mono and Flux.
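For contrast, here is a minimal sketch that runs the same chain once with subscribe and once with block and records the order of events. The class name SubscribeVsBlock and its helper methods are hypothetical, the delay is shortened to 200 ms, and reactor-core is assumed to be on the classpath.

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import reactor.core.publisher.Mono;

// Hypothetical demo class; not part of Reactor.
public class SubscribeVsBlock {

    // Starts the chain with subscribe() and records the order of events.
    static List<String> withSubscribe() throws InterruptedException {
        List<String> events = new CopyOnWriteArrayList<>();
        Mono.delay(Duration.ofMillis(200))
            .then(Mono.just("text"))
            .subscribe(value -> events.add("chain: " + value)); // returns without waiting for the delay
        events.add("after chain");
        Thread.sleep(600); // keep the caller alive until the delayed value has arrived
        return events;
    }

    // Starts the chain with block() and records the order of events.
    static List<String> withBlock() {
        List<String> events = new CopyOnWriteArrayList<>();
        String value = Mono.delay(Duration.ofMillis(200))
            .then(Mono.just("text"))
            .block(); // the calling thread waits here for the value
        events.add("chain: " + value);
        events.add("after chain");
        return events;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(withSubscribe()); // [after chain, chain: text]
        System.out.println(withBlock());     // [chain: text, after chain]
    }
}
```

With subscribe, "after chain" is recorded before the value arrives; with block, the value comes first.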
Using block on Mono
Using .block()
This method subscribes to the Mono and blocks until the next signal is received. It returns the emitted value, or null if the Mono completes empty.
String result = Mono.just("text")
    .block(); // text
String resultEmpty = Mono.<String>empty()
    .block(); // null
You can also pass a Duration to limit how long block waits. If no signal arrives within that time, it throws IllegalStateException.
String result = Mono.delay(Duration.ofSeconds(2))
    .then(Mono.just("text"))
    .block(Duration.ofSeconds(1));
The code above throws an exception like the following (the exact wording of the message may vary between Reactor versions):
java.lang.IllegalStateException: Timeout on blocking read for 1000 MILLISECONDS
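If the caller should survive a timeout, one option is to catch the exception and return a fallback. The sketch below assumes reactor-core is on the classpath; the class BlockTimeoutDemo and the helpers slowText and blockOrFallback are hypothetical names introduced for illustration, and the durations are shortened.

```java
import java.time.Duration;
import reactor.core.publisher.Mono;

// Hypothetical demo class; not part of Reactor.
public class BlockTimeoutDemo {

    // Emits "text" after the given delay.
    static Mono<String> slowText(Duration delay) {
        return Mono.delay(delay).then(Mono.just("text"));
    }

    // Blocks for at most `timeout`; returns `fallback` on timeout or when the Mono is empty.
    static String blockOrFallback(Mono<String> source, Duration timeout, String fallback) {
        try {
            String value = source.block(timeout);
            return value != null ? value : fallback; // null means the Mono completed empty
        } catch (IllegalStateException e) {
            // block(Duration) reports a timeout as IllegalStateException.
            // Caveat: other illegal-state conditions would be swallowed here too.
            return fallback;
        }
    }

    public static void main(String[] args) {
        Mono<String> slow = slowText(Duration.ofMillis(500));
        System.out.println(blockOrFallback(slow, Duration.ofMillis(100), "fallback")); // fallback
        System.out.println(blockOrFallback(slow, Duration.ofSeconds(2), "fallback"));  // text
    }
}
```

Catching IllegalStateException broadly is a simplification; real code may want to inspect the exception before treating it as a timeout.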
Using .blockOptional()
It works like block, but wraps the value in a java.util.Optional instead of returning null. Use .isPresent() to check whether a value was emitted and .get() to read it.
Optional<String> result = Mono.just("text")
    .blockOptional();
System.out.println(result.isPresent()); // true
System.out.println(result.get()); // text
Optional<String> emptyResult = Mono.<String>empty()
    .blockOptional();
System.out.println(emptyResult.isPresent()); // false
System.out.println(emptyResult.get()); // throws NoSuchElementException
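Because calling .get() on an empty Optional throws, it is often safer to use the Optional's own fallback methods. The following is a small sketch assuming reactor-core is on the classpath; BlockOptionalDemo and valueOrDefault are hypothetical names used only for illustration.

```java
import java.util.Optional;
import reactor.core.publisher.Mono;

// Hypothetical demo class; not part of Reactor.
public class BlockOptionalDemo {

    // Blocks on the Mono and returns its value, or defaultValue if it completes empty.
    static String valueOrDefault(Mono<String> source, String defaultValue) {
        Optional<String> result = source.blockOptional();
        return result.orElse(defaultValue); // no exception when the Optional is empty
    }

    public static void main(String[] args) {
        System.out.println(valueOrDefault(Mono.just("text"), "default")); // text
        System.out.println(valueOrDefault(Mono.empty(), "default"));      // default

        // ifPresent runs the action only when a value was emitted.
        Mono.just("text")
            .blockOptional()
            .ifPresent(value -> System.out.println("Got " + value)); // Got text
    }
}
```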
You can also pass a duration to blockOptional. It likewise throws java.lang.IllegalStateException if the maximum waiting time elapses before any signal is received.
Mono.delay(Duration.ofSeconds(2))
    .blockOptional(Duration.ofSeconds(1)); // throws IllegalStateException after 1 second
Using block on Flux
Using .blockFirst()
This method subscribes to the Flux and blocks until the first value from upstream is received. It returns that value, or null if the Flux completes empty.
Flux.just("one", "two", "three")
    .delayElements(Duration.ofSeconds(2))
    .blockFirst();
System.out.println("After chain");
"After chain" is printed right after the first element is emitted, which here takes about 2 seconds.
Optionally, you can set a maximum waiting duration, after which blockFirst throws IllegalStateException.
Flux.just("one", "two", "three")
    .delayElements(Duration.ofSeconds(2))
    .blockFirst(Duration.ofSeconds(1)); // throws IllegalStateException: the first element needs about 2 seconds
System.out.println("After chain"); // not reached
Using .blockLast()
This method subscribes to the Flux and blocks until the last value from upstream is received, that is, until the Flux completes.
Flux.just("one", "two", "three")
    .delayElements(Duration.ofSeconds(2))
    .blockLast();
System.out.println("After chain");
You have to wait for 6 seconds (3 x 2 seconds) to see "After chain" printed.
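To compare blockFirst and blockLast side by side, the sketch below measures how long each call blocks and prints the value it returns. It assumes reactor-core is on the classpath; BlockFirstLastDemo and delayedWords are hypothetical names, and the delay is shortened to 200 ms so the demo finishes quickly. Measured times vary from run to run, so no exact figures are shown.

```java
import java.time.Duration;
import reactor.core.publisher.Flux;

// Hypothetical demo class; not part of Reactor.
public class BlockFirstLastDemo {

    // Emits "one", "two", "three", each delayed by the given duration.
    static Flux<String> delayedWords(Duration delay) {
        return Flux.just("one", "two", "three").delayElements(delay);
    }

    public static void main(String[] args) {
        Duration delay = Duration.ofMillis(200);

        long start = System.nanoTime();
        String first = delayedWords(delay).blockFirst(); // returns once "one" arrives
        long firstMs = (System.nanoTime() - start) / 1_000_000;

        start = System.nanoTime();
        String last = delayedWords(delay).blockLast(); // returns once the Flux completes
        long lastMs = (System.nanoTime() - start) / 1_000_000;

        // Expect roughly one delay for blockFirst and roughly three for blockLast.
        System.out.println("blockFirst -> " + first + " after ~" + firstMs + " ms");
        System.out.println("blockLast  -> " + last + " after ~" + lastMs + " ms");

        // Both methods return null when the Flux completes without emitting.
        System.out.println(Flux.<String>empty().blockFirst()); // null
    }
}
```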
Like blockFirst, blockLast also accepts a maximum waiting duration.
Flux.just("one", "two", "three")
    .delayElements(Duration.ofSeconds(2))
    .blockLast(Duration.ofSeconds(3)); // throws IllegalStateException: the chain needs about 6 seconds
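Note that the duration passed to blockLast covers the whole wait, not each element. A minimal sketch of this, assuming reactor-core is on the classpath and using shortened delays: BlockLastTimeoutDemo and lastOrTimeout are hypothetical names, and the "timed out" marker string is chosen only for the demo.

```java
import java.time.Duration;
import reactor.core.publisher.Flux;

// Hypothetical demo class; not part of Reactor.
public class BlockLastTimeoutDemo {

    // Returns the last element, or the marker "timed out" if the Flux
    // does not complete within `timeout`.
    static String lastOrTimeout(Duration elementDelay, Duration timeout) {
        try {
            return Flux.just("one", "two", "three")
                .delayElements(elementDelay)
                .blockLast(timeout);
        } catch (IllegalStateException e) {
            // Raised when the total wait exceeds the timeout.
            return "timed out";
        }
    }

    public static void main(String[] args) {
        // Three elements at 100 ms each need about 300 ms in total.
        System.out.println(lastOrTimeout(Duration.ofMillis(100), Duration.ofMillis(150))); // timed out
        System.out.println(lastOrTimeout(Duration.ofMillis(100), Duration.ofSeconds(2)));  // three
    }
}
```

In the first call the timeout is longer than a single element's delay but shorter than the total, so the call still times out.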