This tutorial shows how to use Project Reactor's ReplayProcessor.
A Reactor FluxProcessor can emit multiple elements. What if some elements have already been emitted and you want late subscribers to receive them as well? ReplayProcessor does exactly that. This tutorial shows several ways to create a ReplayProcessor, each with different replay behavior.
Using cacheLast
The static method cacheLast creates a ReplayProcessor that keeps only the last element, which is replayed to late subscribers.
public static <T> ReplayProcessor<T> cacheLast()
Example:
ReplayProcessor<Integer> processor = ReplayProcessor.cacheLast();
Flux.range(1, 5)
    .doOnNext(integer -> {
        processor.onNext(integer);
    })
    .doOnComplete(() -> {
        processor.subscribe(x -> System.out.println("A: " + x));
        processor.subscribe(x -> System.out.println("B: " + x));
        processor.subscribe(x -> System.out.println("C: " + x));
    })
    .subscribe();
Output:
A: 5
B: 5
C: 5
Using cacheLastOrDefault
Like cacheLast, but it replays a default value if the publisher hasn't emitted any element yet.
public static <T> ReplayProcessor<T> cacheLastOrDefault(@Nullable T value)
Example:
ReplayProcessor<Integer> processor = ReplayProcessor.cacheLastOrDefault(-1);
processor.subscribe(x -> System.out.println("A: " + x));
processor.subscribe(x -> System.out.println("B: " + x));
processor.subscribe(x -> System.out.println("C: " + x));
Output:
A: -1
B: -1
C: -1
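The default value only matters until the first real element arrives. The following is a minimal plain-Java sketch of that rule, not Reactor's implementation: the class LastValueModel and its methods are hypothetical and merely mimic what a late subscriber would be replayed.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the cacheLastOrDefault replay rule (not Reactor code).
class LastValueModel<T> {
    private T last; // the default at first, then the most recent element

    LastValueModel(T defaultValue) {
        this.last = defaultValue;
    }

    // Mimics processor.onNext(value): the new element replaces whatever was stored.
    void onNext(T value) {
        last = value;
    }

    // Mimics what a late subscriber would be replayed on subscription.
    List<T> replay() {
        List<T> replayed = new ArrayList<>();
        if (last != null) {
            replayed.add(last);
        }
        return replayed;
    }

    public static void main(String[] args) {
        LastValueModel<Integer> model = new LastValueModel<>(-1);
        System.out.println(model.replay()); // [-1]
        model.onNext(1);
        model.onNext(2);
        System.out.println(model.replay()); // [2]
    }
}
```

Under this model, a subscriber that arrives before any element sees the default, while one that arrives later sees only the most recent element.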
Using create
There are several static methods named create with different signatures. The one without parameters creates a new ReplayProcessor that replays an unbounded number of elements; Queues.SMALL_BUFFER_SIZE is only used as the size of its internal buffer, not as a limit on the history.
public static <E> ReplayProcessor<E> create()
Example:
ReplayProcessor<Integer> processor = ReplayProcessor.create();
Flux.range(1, 5)
    .doOnNext(integer -> {
        processor.onNext(integer);
    })
    .doOnComplete(() -> {
        processor.subscribe(x -> System.out.println("A: " + x));
        processor.subscribe(x -> System.out.println("B: " + x));
        processor.subscribe(x -> System.out.println("C: " + x));
    })
    .subscribe();
Output:
A: 1
A: 2
A: 3
A: 4
A: 5
B: 1
B: 2
B: 3
B: 4
B: 5
C: 1
C: 2
C: 3
C: 4
C: 5
The second has a parameter named historySize. It creates a processor that replays up to historySize elements.
public static <E> ReplayProcessor<E> create(int historySize)
Example:
ReplayProcessor<Integer> processor = ReplayProcessor.create(2);
Flux.range(1, 5)
    .doOnNext(integer -> {
        processor.onNext(integer);
    })
    .doOnComplete(() -> {
        processor.subscribe(x -> System.out.println("A: " + x));
        processor.subscribe(x -> System.out.println("B: " + x));
        processor.subscribe(x -> System.out.println("C: " + x));
    })
    .subscribe();
Output:
A: 4
A: 5
B: 4
B: 5
C: 4
C: 5
The last one has two parameters, historySize and unbounded. The created processor replays up to historySize elements if unbounded is set to false. If unbounded is set to true, it stores and replays an unbounded number of elements.
public static <E> ReplayProcessor<E> create(int historySize, boolean unbounded)
Example with unbounded set to false:
ReplayProcessor<Integer> processor = ReplayProcessor.create(2, false);
Flux.range(1, 5)
    .doOnNext(integer -> {
        processor.onNext(integer);
    })
    .doOnComplete(() -> {
        processor.subscribe(x -> System.out.println("A: " + x));
        processor.subscribe(x -> System.out.println("B: " + x));
        processor.subscribe(x -> System.out.println("C: " + x));
    })
    .subscribe();
Output:
A: 4
A: 5
B: 4
B: 5
C: 4
C: 5
Example with unbounded set to true:
ReplayProcessor<Integer> processor = ReplayProcessor.create(2, true);
Flux.range(1, 5)
    .doOnNext(integer -> {
        processor.onNext(integer);
    })
    .doOnComplete(() -> {
        processor.subscribe(x -> System.out.println("A: " + x));
        processor.subscribe(x -> System.out.println("B: " + x));
        processor.subscribe(x -> System.out.println("C: " + x));
    })
    .subscribe();
Output:
A: 1
A: 2
A: 3
A: 4
A: 5
B: 1
B: 2
B: 3
B: 4
B: 5
C: 1
C: 2
C: 3
C: 4
C: 5
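To see why the bounded variant replays only the newest elements, it can help to model the history as a queue that evicts its oldest entry. This is a plain-Java sketch under that assumption, not Reactor's actual buffer; the class HistoryModel is hypothetical and only reproduces the retention rule described above.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical model of the create(historySize, unbounded) retention rule (not Reactor code).
class HistoryModel<T> {
    private final Deque<T> buffer = new ArrayDeque<>();
    private final int historySize;
    private final boolean unbounded;

    HistoryModel(int historySize, boolean unbounded) {
        this.historySize = historySize;
        this.unbounded = unbounded;
    }

    // Mimics processor.onNext(value): store the element, then evict the oldest if over the limit.
    void onNext(T value) {
        buffer.addLast(value);
        if (!unbounded && buffer.size() > historySize) {
            buffer.removeFirst();
        }
    }

    // Mimics what a late subscriber would be replayed, oldest first.
    List<T> replay() {
        return new ArrayList<>(buffer);
    }

    public static void main(String[] args) {
        HistoryModel<Integer> bounded = new HistoryModel<>(2, false);
        HistoryModel<Integer> unboundedModel = new HistoryModel<>(2, true);
        for (int i = 1; i <= 5; i++) {
            bounded.onNext(i);
            unboundedModel.onNext(i);
        }
        System.out.println(bounded.replay());        // [4, 5]
        System.out.println(unboundedModel.replay()); // [1, 2, 3, 4, 5]
    }
}
```

The two printed lists correspond to the outputs of the unbounded = false and unbounded = true examples.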
Using createTimeout
createTimeout creates a processor that keeps received elements only for the given duration. It tags each element with a timestamp provided by Schedulers.parallel(). Once an element's age exceeds the given duration, it is dropped and no longer replayed.
public static <T> ReplayProcessor<T> createTimeout(Duration maxAge)
To make the effect visible, the example below adds a one-second delay between emitted elements:
ReplayProcessor<Integer> processor = ReplayProcessor.createTimeout(Duration.ofSeconds(3));
Flux.range(1, 5)
    .delayElements(Duration.ofSeconds(1))
    .doOnNext(integer -> {
        processor.onNext(integer);
    })
    .doOnComplete(() -> {
        processor.subscribe(x -> System.out.println("A: " + x));
        processor.subscribe(x -> System.out.println("B: " + x));
        processor.subscribe(x -> System.out.println("C: " + x));
    })
    .blockLast();
Output:
A: 3
A: 4
A: 5
B: 3
B: 4
B: 5
C: 3
C: 4
C: 5
There is another variant of createTimeout that allows you to pass a specific scheduler (instead of using Schedulers.parallel()).
public static <T> ReplayProcessor<T> createTimeout(Duration maxAge, Scheduler scheduler)
Here's an example of how to pass a specific scheduler. The output should be the same.
ReplayProcessor<Integer> processor = ReplayProcessor.createTimeout(Duration.ofSeconds(3), Schedulers.newElastic("myScheduler"));
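The scheduler in this variant acts as the time source for the timestamps. As a rough illustration of why the clock is pluggable, here is a plain-Java sketch in which a LongSupplier stands in for the scheduler's clock. The class TimeoutModel is hypothetical, it is not Reactor code, and the strict "age below maxAge" comparison is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongSupplier;

// Hypothetical model of the createTimeout retention rule (not Reactor code).
class TimeoutModel<T> {
    private static final class Timed<V> {
        final V value;
        final long timestampMillis;

        Timed(V value, long timestampMillis) {
            this.value = value;
            this.timestampMillis = timestampMillis;
        }
    }

    private final List<Timed<T>> buffer = new ArrayList<>();
    private final long maxAgeMillis;
    private final LongSupplier clock; // plays the role of the scheduler's clock

    TimeoutModel(long maxAgeMillis, LongSupplier clock) {
        this.maxAgeMillis = maxAgeMillis;
        this.clock = clock;
    }

    // Mimics processor.onNext(value): tag the element with the current time.
    void onNext(T value) {
        buffer.add(new Timed<>(value, clock.getAsLong()));
    }

    // Mimics what a late subscriber would be replayed at the current time.
    List<T> replay() {
        long now = clock.getAsLong();
        List<T> replayed = new ArrayList<>();
        for (Timed<T> timed : buffer) {
            // Assumption: an element is kept only while its age is strictly below maxAge.
            if (now - timed.timestampMillis < maxAgeMillis) {
                replayed.add(timed.value);
            }
        }
        return replayed;
    }

    public static void main(String[] args) {
        long[] fakeNow = {0}; // a manually advanced clock, so no real waiting is needed
        TimeoutModel<Integer> model = new TimeoutModel<>(3000, () -> fakeNow[0]);
        for (int i = 1; i <= 5; i++) {
            fakeNow[0] += 1000; // one "second" passes before each element
            model.onNext(i);
        }
        System.out.println(model.replay()); // [3, 4, 5]
    }
}
```

Because the clock is injected, the sketch runs instantly yet yields the same three elements as the timed example.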
Using createSizeAndTimeout
createSizeAndTimeout applies two rules, a maximum number of elements and a maximum age, to determine which elements are replayed to late subscribers. The processor keeps at most the given number of elements, and each of them must be younger than the given maximum age. Like createTimeout, it uses timestamps from Schedulers.parallel().
public static <T> ReplayProcessor<T> createSizeAndTimeout(int size, Duration maxAge)
In the example below, there is a one-second delay between elements. Although maxAge is set to 3 seconds, the subscribers receive only two elements because size is set to 2.
ReplayProcessor<Integer> processor = ReplayProcessor.createSizeAndTimeout(2, Duration.ofSeconds(3));
Flux.range(1, 5)
    .delayElements(Duration.ofSeconds(1))
    .doOnNext(integer -> {
        processor.onNext(integer);
    })
    .doOnComplete(() -> {
        processor.subscribe(x -> System.out.println("A: " + x));
        processor.subscribe(x -> System.out.println("B: " + x));
        processor.subscribe(x -> System.out.println("C: " + x));
    })
    .blockLast();
Output:
A: 4
A: 5
B: 4
B: 5
C: 4
C: 5
In the example below, the subscribers receive only the last three elements although size is set to 4. That's because the maxAge duration is only 3 seconds.
ReplayProcessor<Integer> processor = ReplayProcessor.createSizeAndTimeout(4, Duration.ofSeconds(3));
Flux.range(1, 5)
    .delayElements(Duration.ofSeconds(1))
    .doOnNext(integer -> {
        processor.onNext(integer);
    })
    .doOnComplete(() -> {
        processor.subscribe(x -> System.out.println("A: " + x));
        processor.subscribe(x -> System.out.println("B: " + x));
        processor.subscribe(x -> System.out.println("C: " + x));
    })
    .blockLast();
Output:
A: 3
A: 4
A: 5
B: 3
B: 4
B: 5
C: 3
C: 4
C: 5
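The interplay of the two rules in both examples can be sketched in plain Java. This is only an illustrative model, not Reactor's buffer: the class SizeAndTimeoutModel and its replay method are hypothetical, the emission times are idealized to exact seconds, and the strict age comparison is an assumption.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the createSizeAndTimeout retention rules (not Reactor code).
class SizeAndTimeoutModel {
    // Returns the elements a late subscriber would see at time nowMillis.
    // values.get(i) is assumed to have been emitted at emittedAtMillis[i], in ascending order.
    static List<Integer> replay(List<Integer> values, long[] emittedAtMillis,
                                int size, long maxAgeMillis, long nowMillis) {
        List<Integer> young = new ArrayList<>();
        for (int i = 0; i < values.size(); i++) {
            // Rule 1 (age), an assumption: keep only elements strictly younger than maxAge.
            if (nowMillis - emittedAtMillis[i] < maxAgeMillis) {
                young.add(values.get(i));
            }
        }
        // Rule 2 (size): of the survivors, keep only the newest `size` elements.
        int from = Math.max(0, young.size() - size);
        return new ArrayList<>(young.subList(from, young.size()));
    }

    public static void main(String[] args) {
        List<Integer> values = List.of(1, 2, 3, 4, 5);
        long[] emittedAt = {1000, 2000, 3000, 4000, 5000}; // one element per second
        System.out.println(replay(values, emittedAt, 2, 3000, 5000)); // [4, 5]
        System.out.println(replay(values, emittedAt, 4, 3000, 5000)); // [3, 4, 5]
    }
}
```

In the first call the size limit is the tighter constraint; in the second the age limit is, which matches the two outputs shown above.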
It also has a variant that uses a specific scheduler for the timestamp.
public static <T> ReplayProcessor<T> createSizeAndTimeout(int size, Duration maxAge, Scheduler scheduler)
Here's an example of how to use the variant with the scheduler parameter.
ReplayProcessor<Integer> processor = ReplayProcessor.createSizeAndTimeout(
    4,
    Duration.ofSeconds(3),
    Schedulers.newElastic("myScheduler")
);