In Project Reactor, you can delay a Mono or Flux publisher by a given duration. This tutorial shows you how to do so.
An important thing to be aware of when you use Reactor's delay methods is that most of them run the delay on the parallel scheduler by default. The effect is similar to calling .publishOn(Schedulers.parallel()): signals emitted after the delay, and therefore the operators below the delay operator, run on a parallel thread. If you are not yet familiar with publishOn, it is worth reading about it first.
Let's take a look at the example below:
Mono.just("one")
    .delayElement(Duration.ofSeconds(2))
    .log()
    .block();
If you run the code, you will see that the onNext signal is received on a parallel thread. This happens even if you add publishOn or subscribeOn before the delay operator. Adding publishOn after the delay operator lets you switch execution to another scheduler of your choice. Alternatively, most delay operators accept a Scheduler as a second argument, which replaces the default parallel scheduler:
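// Schedulers.newElastic is deprecated in recent Reactor versions; a bounded elastic scheduler is used instead.
Scheduler scheduler = Schedulers.newBoundedElastic(10, 100, "es1");
Mono.just("one")
    .delayElement(Duration.ofSeconds(2), scheduler)
    .log()
    .block();
// Release the scheduler's threads once it is no longer needed.
scheduler.dispose();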
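To make the thread switch visible without reading log output, the following sketch records the name of the thread on which onNext is observed. The class name DelayThreadDemo, the helper onNextThread and the scheduler name "my-timer" are made up for illustration, and the source is deliberately subscribed on Schedulers.single() to show that an upstream subscribeOn does not change where the delayed signal arrives.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicReference;

import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

class DelayThreadDemo {

    // Blocks until the Mono completes and returns the thread that delivered onNext.
    static String onNextThread(Mono<String> delayed) {
        AtomicReference<String> name = new AtomicReference<>();
        delayed.doOnNext(v -> name.set(Thread.currentThread().getName()))
               .block();
        return name.get();
    }

    public static void main(String[] args) {
        // subscribeOn sits before the delay operator, so the delay still decides the thread.
        Mono<String> source = Mono.just("one").subscribeOn(Schedulers.single());

        Scheduler custom = Schedulers.newSingle("my-timer"); // hypothetical scheduler name
        try {
            // Default timer: the thread name is expected to start with "parallel".
            System.out.println(onNextThread(source.delayElement(Duration.ofMillis(200))));
            // Custom timer: the thread name is expected to start with "my-timer".
            System.out.println(onNextThread(source.delayElement(Duration.ofMillis(200), custom)));
        } finally {
            custom.dispose(); // newSingle creates a non-daemon thread, so release it
        }
    }
}
```

Disposing the custom scheduler matters here because its thread would otherwise keep the JVM alive after main returns.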
Delay on Mono
delay
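Mono.delay is a static factory method. It creates a Mono<Long> that emits 0 as its onNext signal once the given Duration has elapsed, then completes. Because it is static, calling it on an existing Mono does not delay that Mono's element; use delayElement for that.
Variations:
(Duration duration)
(Duration duration, Scheduler timer)
Example:
Mono.delay(Duration.ofSeconds(2))
    .log()
    .subscribe();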
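Since Mono.delay is a static factory that returns a Mono<Long>, it can help to see what it emits and how a value of your own can be attached to it. The sketch below is one way to do that, with thenReturn being just one of several possible choices; the class and method names are illustrative. It uses block() rather than subscribe() so that a standalone main does not exit before the timer fires (the default parallel scheduler uses daemon threads).

```java
import java.time.Duration;

import reactor.core.publisher.Mono;

class MonoDelayDemo {

    // Mono.delay emits 0L once the duration has elapsed.
    static Long tick() {
        return Mono.delay(Duration.ofMillis(200)).block();
    }

    // Replaces the tick with a chosen value after the delayed Mono completes.
    static String delayedValue() {
        return Mono.delay(Duration.ofMillis(200))
                   .thenReturn("one")
                   .block();
    }

    public static void main(String[] args) {
        System.out.println(tick());         // prints 0
        System.out.println(delayedValue()); // prints one
    }
}
```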
delayElement
It delays the Mono's element by the given Duration. An empty Mono or an error signal is not delayed.
Variations:
(Duration length)
(Duration length, Scheduler timer)
Example:
Mono.just("one")
    .delayElement(Duration.ofSeconds(2))
    .log()
    .subscribe();
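The difference between a Mono with a value and an empty Mono can be checked with a rough timing sketch like the one below. The class name and the elapsedMillis helper are made up for this illustration, and the measured numbers will vary from machine to machine.

```java
import java.time.Duration;

import reactor.core.publisher.Mono;

class DelayElementEmptyDemo {

    // Applies a one-second delayElement and measures how long block() takes.
    static long elapsedMillis(Mono<String> source) {
        long start = System.nanoTime();
        source.delayElement(Duration.ofSeconds(1)).block();
        return Duration.ofNanos(System.nanoTime() - start).toMillis();
    }

    public static void main(String[] args) {
        // Warm-up run so that class loading does not distort the measurements.
        elapsedMillis(Mono.empty());

        // The element is held back for about one second.
        System.out.println("with value: " + elapsedMillis(Mono.just("one")) + " ms");
        // The completion of an empty Mono is passed through without waiting.
        System.out.println("empty:      " + elapsedMillis(Mono.empty()) + " ms");
    }
}
```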
delaySubscription
It delays the subscription to the Mono by the given Duration.
Variations:
(Duration length)
(Duration length, Scheduler timer)
(Publisher<U> subscriptionDelay)
Example:
Mono.just("one")
    .delaySubscription(Duration.ofSeconds(2))
    .log()
    .subscribe();
The last variation works somewhat differently from the other two. It takes a Publisher as its parameter and delays the subscription until that Publisher emits a signal or completes.
Mono.just("one")
    .delaySubscription(Mono.delay(Duration.ofSeconds(5)))
    .log()
    .subscribe();
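What sets delaySubscription apart is that the source itself is not subscribed to until the delay is over. The following sketch records when the upstream subscription happens, using doOnSubscribe placed before delaySubscription. The class and method names are illustrative only.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicLong;

import reactor.core.publisher.Mono;

class DelaySubscriptionDemo {

    // Returns how many milliseconds passed before the source was subscribed to.
    static long millisUntilSourceSubscribed() {
        long start = System.nanoTime();
        AtomicLong subscribedAt = new AtomicLong();

        Mono.just("one")
            // Runs when the source is subscribed to, which is after the trigger fires.
            .doOnSubscribe(s -> subscribedAt.set(System.nanoTime()))
            .delaySubscription(Mono.delay(Duration.ofMillis(500)))
            .block();

        return Duration.ofNanos(subscribedAt.get() - start).toMillis();
    }

    public static void main(String[] args) {
        // Expected to be roughly 500 ms or slightly more.
        System.out.println("source subscribed after " + millisUntilSourceSubscribed() + " ms");
    }
}
```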
delayUntil
It delays the Mono's element by generating a trigger Publisher from that element and then waiting until the trigger Publisher terminates. If an error occurs in the source or in the trigger, it is propagated downstream.
Variations:
(Function<? super T, ? extends Publisher<?>> triggerProvider)
Example:
// The trigger completes after 5 seconds without blocking a thread; its value is ignored.
Mono.just("one")
    .delayUntil(a -> Mono.delay(Duration.ofSeconds(5)))
    .log()
    .subscribe();
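The error propagation mentioned above can be observed with a small sketch such as the following. It compares a trigger that completes normally with one that fails; the class name, the exception message and the "fallback" value are hypothetical and only serve the illustration.

```java
import java.time.Duration;

import reactor.core.publisher.Mono;

class DelayUntilErrorDemo {

    // The trigger completes normally, so the original element is relayed.
    static String withCompletingTrigger() {
        return Mono.just("one")
                   .delayUntil(v -> Mono.delay(Duration.ofMillis(200)))
                   .block();
    }

    // The trigger fails, so the error reaches downstream instead of the element.
    static String withFailingTrigger() {
        return Mono.just("one")
                   .delayUntil(v -> Mono.error(new IllegalStateException("trigger failed")))
                   .onErrorReturn("fallback")
                   .block();
    }

    public static void main(String[] args) {
        System.out.println(withCompletingTrigger()); // prints one
        System.out.println(withFailingTrigger());    // prints fallback
    }
}
```

Note that the value emitted by the trigger is never relayed; only its termination matters.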
Delay on Flux
delayElements
It delays each element of the Flux by the given Duration. This adds a gap between emissions and therefore lowers the number of elements emitted per second. An empty Flux or an immediate error signal is not delayed.
Variations:
(Duration length)
(Duration length, Scheduler timer)
Example:
Flux.just("one", "two", "three")
    .delayElements(Duration.ofSeconds(2))
    .log()
    .subscribe();
delaySequence
It shifts the whole Flux forward in time by the given Duration. The gap between elements stays the same as in the source, so the emission frequency is not affected; only the first element is visibly delayed relative to the subscription. An empty Flux or an immediate error signal is not delayed.
Variations:
(Duration length)
(Duration length, Scheduler timer)
Example:
Flux.just("one", "two", "three")
    .delaySequence(Duration.ofSeconds(2))
    .log()
    .subscribe();
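The contrast between delayElements and delaySequence shows up clearly when the total running time of the same three-element Flux is measured. The sketch below is a rough illustration with made-up class and method names; with a 300 ms delay, delayElements should take roughly 900 ms in total, while delaySequence should take roughly 300 ms.

```java
import java.time.Duration;

import reactor.core.publisher.Flux;

class DelaySequenceVsElementsDemo {

    // Blocks until the Flux completes and returns the elapsed time in milliseconds.
    static long elapsedMillis(Flux<String> flux) {
        long start = System.nanoTime();
        flux.blockLast();
        return Duration.ofNanos(System.nanoTime() - start).toMillis();
    }

    // Every element waits 300 ms after the previous one.
    static long withDelayElements() {
        return elapsedMillis(Flux.just("one", "two", "three")
                                 .delayElements(Duration.ofMillis(300)));
    }

    // The whole sequence is shifted by 300 ms; the elements stay back to back.
    static long withDelaySequence() {
        return elapsedMillis(Flux.just("one", "two", "three")
                                 .delaySequence(Duration.ofMillis(300)));
    }

    public static void main(String[] args) {
        System.out.println("delayElements: " + withDelayElements() + " ms");
        System.out.println("delaySequence: " + withDelaySequence() + " ms");
    }
}
```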
delaySubscription
It delays the subscription to the Flux by the given Duration.
Variations:
(Duration length)
(Duration length, Scheduler timer)
(Publisher<U> subscriptionDelay)
Example:
Flux.just("one", "two", "three")
    .delaySubscription(Duration.ofSeconds(2))
    .log()
    .subscribe();
The last variation takes a Publisher. It delays the subscription until the passed Publisher emits a signal or completes.
// The trigger emits its first element after 5 seconds, which is when the subscription happens.
Flux.just("one", "two", "three")
    .delaySubscription(Flux.just(1, 2, 3).delayElements(Duration.ofSeconds(5)))
    .log()
    .subscribe();
delayUntil
It delays each element of the Flux by generating a trigger Publisher for that element and holding the element back until its trigger Publisher terminates. If an error occurs in the source or in a trigger, it is propagated downstream.
Variations:
(Function<? super T, ? extends Publisher<?>> triggerProvider)
Example:
// Each element waits for its own 5-second trigger; no thread is blocked while waiting.
Flux.just("one", "two", "three")
    .delayUntil(a -> Mono.delay(Duration.ofSeconds(5)))
    .log()
    .subscribe();
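Because the trigger is built from the element itself, each element can wait for a different amount of time. The sketch below uses a hypothetical rule (100 ms per character) and an illustrative class name to show that the elements still arrive in their original order, since each one is relayed only after its own trigger has terminated.

```java
import java.time.Duration;
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class FluxDelayUntilDemo {

    static List<String> run() {
        return Flux.just("one", "two", "three")
                   // Hypothetical rule: the trigger for an element lasts 100 ms per character.
                   .delayUntil(s -> Mono.delay(Duration.ofMillis(100L * s.length())))
                   .collectList()
                   .block();
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [one, two, three]
    }
}
```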