In Project Reactor, after you create a Mono
or Flux
chain, nothing happens until it is being subscribed. This tutorial shows you how to use subscribe
method on Mono
and Flux
, including what parameters can be passed to the method.
By using subscribe
, it triggers data flow through the chain. The code after the chain can be executed immediately without waiting for the chain to be completed.
Flux.range(1, 10).delayElements(Duration.ofSeconds(1))
.subscribe();
System.out.println("Below chain");
If you run the code, "Below chain" will be printed immediately without waiting for the chain to be completed which should take at least 10 seconds. If you want to wait for the chain to be completed, you can use block
instead.
Below are the methods you can use
// 1
Disposable subscribe()
// 2
Disposable subscribe(Consumer<? super T> consumer)
// 3
Disposable subscribe(@Nullable Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer)
// 4
Disposable subscribe(@Nullable Consumer<? super T> consumer, @Nullable Consumer<? super Throwable> errorConsumer, @Nullable Runnable completeConsumer)
// 5
Disposable subscribe(@Nullable Consumer<? super T> consumer, @Nullable Consumer<? super Throwable> errorConsumer, @Nullable Runnable completeConsumer, @Nullable
Consumer<? super Subscription> subscriptionConsumer)
// 6
abstract void subscribe(CoreSubscriber<? super T> var1)
Those methods are applicable on both Mono
and Flux
. This tutorial only gives examples on Flux
as you can easily do the same on Mono
.
If you only want to make the subscription happens without processing the result, the simplest way is using the first method which takes no argument.
Flux.range(1, 5)
.subscribe();
If you need the process the result, you can pass a Consumer
(a lambda) as the first argument. It will be invoked on each value.
Flux.range(1, 5)
.subscribe(result -> System.out.println(result));
The Consumer
above will not be invoked when an error signal received. To handle error, you can pass a second Consumer
which takes Throwable
as the argument.
Flux.range(1, 5)
.map(i -> {
if (i < 5) return i;
throw new RuntimeException("Error");
})
.subscribe(
result -> System.out.println(result),
error -> System.err.println("Error: " + error)
);
After the Flux completes, it will throw a complete signal that can be handled by passing a third argument.
Flux.range(1, 5)
.subscribe(
result -> System.out.println(result),
error -> System.err.println("Error: " + error),
() -> System.out.println("Done")
);
You can also pass a fourth argument which is a Consumer
that will be invoked on subscribe signal.
Flux.range(1, 5)
.subscribe(
result -> System.out.println(result),
error -> System.err.println("Error: " + error),
() -> System.out.println("Done"),
sub -> sub.request(2) // Up to 2 elements from source
);
The example above only requests up to 2 elements from the source. That causes completed signal not generated and the lambda on the third argument will never be called.
Alternatively, you can pass a CoreSubscriber
instead. Inside, you can override its methods to control what to do when a signal received. You can override hookOnSubscribe
, hookOnNext
, hookOnComplete
, hookOnError
, and hookFinally
.
Flux.range(1, 5)
.subscribe(new BaseSubscriber() {
@Override
public void hookOnSubscribe(Subscription subscription) {
System.out.println("Subscribe");
request(5);
}
@Override
public void hookOnNext(Integer value) {
System.out.println("Value:" + value);
}
@Override
public void hookOnComplete() {
System.out.println("Complete");
}
@Override
public void hookOnError(Throwable throwable) {
System.out.println("Error: " + throwable);
}
@Override
public void hookFinally(SignalType signalType) {
System.out.println("SignalType: " + signalType);
}
});