When using Project Reactor, you may need to combine the results of more than one publisher, either Mono or Flux. In this tutorial, I'm going to show you some built-in Reactor methods for doing so, including the differences between those methods and an example for each.
Combining Flux Publishers
For this tutorial, we are going to create these Flux publishers:
// Emits 1, 2, 3
Flux<Integer> numbers1 = Flux.range(1, 3);
// Emits 4, 5
Flux<Integer> numbers2 = Flux.range(4, 2);
// Emits "Woolha", "dot", "com"
Flux<String> strings = Flux.fromIterable(Arrays.asList("Woolha", "dot", "com"));
Using concat
concat forwards the elements emitted by the sources downstream. It subscribes to the sources sequentially, waiting for one source to complete before subscribing to the next. The parameter is a varargs, so you can pass as many sources as you want, as long as the calling method stays within Java's 64 KB bytecode limit per method.
Flux.concat(numbers1, numbers2)
.subscribe(System.out::println);
Output:
1
2
3
4
5
Using concatWith
concatWith works like concat, but it is an instance method and only accepts one argument.
numbers1.concatWith(numbers2)
.subscribe(System.out::println);
Output:
1
2
3
4
5
Using merge
merge subscribes to the sources eagerly and combines them into a single interleaved sequence. To make the difference between concat and merge easy to see, add a delay between element emissions.
Flux.merge(
numbers1.delayElements(Duration.ofMillis(150L)),
numbers2.delayElements(Duration.ofMillis(100L))
)
.subscribe(System.out::println);
Output:
4
1
5
2
3
Using mergeSequential
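Like concat, mergeSequential produces an ordered sequence: all elements of the first source, then all elements of the second, and so on. However, it subscribes to the sources eagerly, as merge does.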
Flux.mergeSequential(
numbers1.delayElements(Duration.ofMillis(150L)),
numbers2.delayElements(Duration.ofMillis(100L))
)
.subscribe(System.out::println);
Output:
1
2
3
4
5
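To check that concat subscribes to the next source only after the previous one completes, while mergeSequential subscribes to everything up front, you can record when each source is subscribed. The following is a minimal sketch rather than part of the original examples: the class name SubscriptionOrderDemo, the trace helper and the 20 ms delay are assumptions made for illustration.

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.BinaryOperator;
import reactor.core.publisher.Flux;

class SubscriptionOrderDemo {

    // Builds two delayed sources that record their lifecycle events,
    // combines them with the given operator and waits until the result completes.
    static List<String> trace(BinaryOperator<Flux<Integer>> combiner) {
        List<String> events = new CopyOnWriteArrayList<>();
        Flux<Integer> a = Flux.range(1, 3)
            .delayElements(Duration.ofMillis(20))
            .doOnSubscribe(s -> events.add("subscribe A"))
            .doOnComplete(() -> events.add("complete A"));
        Flux<Integer> b = Flux.range(4, 2)
            .delayElements(Duration.ofMillis(20))
            .doOnSubscribe(s -> events.add("subscribe B"));
        combiner.apply(a, b).blockLast();
        return events;
    }

    public static void main(String[] args) {
        // concat: B is subscribed only after A has completed
        System.out.println(trace((a, b) -> Flux.concat(a, b)));
        // prints [subscribe A, complete A, subscribe B]

        // mergeSequential: B is subscribed right away, before A completes
        System.out.println(trace((a, b) -> Flux.mergeSequential(a, b)));
        // prints [subscribe A, subscribe B, complete A]
    }
}
```

In both cases the elements still arrive downstream as 1, 2, 3, 4, 5. Only the subscription timing differs.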
Using mergeDelayError
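mergeDelayError is a variation of merge. If a source emits an error, the error is delayed until all the other sources have been processed. The first argument is the prefetch, which is the number of elements requested from each source at a time.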
Flux.mergeDelayError(1,
numbers1.delayElements(Duration.ofMillis(150L)),
numbers2.delayElements(Duration.ofMillis(100L))
)
.subscribe(System.out::println);
Output:
4
1
5
2
3
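The sources above never fail, so the output is the same as with merge. To see the delayed error, one of the sources has to fail. Here is a small sketch under assumed names (MergeDelayErrorDemo, signals and failing are made up for this example) that records every signal received from both variants.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;

class MergeDelayErrorDemo {

    // Subscribes to the given Flux and records every signal it receives.
    // All sources used here are synchronous, so the log is complete on return.
    static List<String> signals(Flux<Integer> flux) {
        List<String> log = new ArrayList<>();
        flux.subscribe(
            value -> log.add("next " + value),
            error -> log.add("error " + error.getMessage()),
            () -> log.add("complete"));
        return log;
    }

    // A hypothetical source that emits 1 and 2, then fails.
    static Flux<Integer> failing() {
        return Flux.just(1, 2)
            .concatWith(Flux.error(new IllegalStateException("boom")));
    }

    public static void main(String[] args) {
        Flux<Integer> healthy = Flux.range(4, 2);

        // merge: the error ends the sequence at once, 4 and 5 are never emitted
        System.out.println(signals(Flux.merge(failing(), healthy)));
        // prints [next 1, next 2, error boom]

        // mergeDelayError: the remaining source is drained before the error
        System.out.println(signals(Flux.mergeDelayError(1, failing(), healthy)));
        // prints [next 1, next 2, next 4, next 5, error boom]
    }
}
```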
Using mergeWith
mergeWith is similar to merge, but it is an instance method and only accepts one argument.
numbers1.delayElements(Duration.ofMillis(150L)).mergeWith(numbers2.delayElements(Duration.ofMillis(100L)))
.subscribe(System.out::println);
Output:
4
1
5
2
3
Using mergeOrderedWith
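mergeOrderedWith is similar to mergeWith, but you pass a comparator that decides which element is emitted next. At each step it compares the next pending element of each source and emits the smallest one, so the result is fully sorted only if each source is already sorted.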
numbers1.delayElements(Duration.ofMillis(150L)).mergeOrderedWith(
numbers2.delayElements(Duration.ofMillis(100L)),
Comparator.naturalOrder()
)
.subscribe(System.out::println);
Output:
1
2
3
4
5
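The effect of the comparator is easier to see when it is changed. The sketch below is only an illustration: the class MergeOrderedDemo and the merged helper are assumed names, and the delays are left out because they do not affect the order. Note that recent Reactor versions mark mergeOrderedWith as deprecated in favor of mergeComparingWith.

```java
import java.util.Comparator;
import java.util.List;
import reactor.core.publisher.Flux;

class MergeOrderedDemo {

    // Merges the two number sources from this tutorial with the given
    // comparator and collects the emitted elements into a list.
    static List<Integer> merged(Comparator<Integer> comparator) {
        Flux<Integer> numbers1 = Flux.range(1, 3);
        Flux<Integer> numbers2 = Flux.range(4, 2);
        return numbers1.mergeOrderedWith(numbers2, comparator)
            .collectList()
            .block();
    }

    public static void main(String[] args) {
        // Natural order: the smaller pending element always wins
        System.out.println(merged(Comparator.naturalOrder())); // [1, 2, 3, 4, 5]

        // Reverse order: 4 beats 1, then 5 beats 1, then numbers1 is drained
        System.out.println(merged(Comparator.reverseOrder())); // [4, 5, 1, 2, 3]
    }
}
```

The second result is not sorted in descending order. The operator only compares the elements currently waiting at the head of each source, so it does not reorder elements within a source.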
Using zip
zip waits for every source to emit one element and then combines those elements into a single output. It keeps doing this until any source completes, so each combination always contains one element from every source.
Flux.zip(numbers1, numbers2).subscribe(System.out::println);
Output:
[1,4]
[2,5]
If you are combining two sources, you can pass a BiFunction as the third argument to control how each pair is combined.
Flux.zip(
numbers1,
numbers2,
(a, b) -> a + b
)
.subscribe(System.out::println);
Output:
5
7
The code above sums the elements of the first and second sources. The first source has three elements, while the second has only two. Therefore, only two combinations are produced, because the third element of the first source has no counterpart in the second source to complete a combination.
If you need to combine more than two sources, look at the example below. The result is a tuple (Tuple2, Tuple3, ..., Tuple8) depending on the number of publishers. You can use tuple.getTn() (replace n with the index, which starts from 1) to get the result of the n-th publisher.
Flux.zip(
numbers1,
numbers2,
strings
)
.flatMap(tuple -> {
System.out.println(tuple.getT1());
System.out.println(tuple.getT2());
System.out.println(tuple.getT3());
System.out.println("-----------");
return Mono.empty();
})
.subscribe();
Output:
1
4
Woolha
-----------
2
5
dot
-----------
The approach above only supports up to eight sources. To combine more than eight sources, you can pass an Iterable along with a combinator function that receives the values as an Object[] array.
ArrayList<Flux<Integer>> publishers = new ArrayList<>();
publishers.add(numbers1);
publishers.add(numbers2);
publishers.add(numbers1);
Flux.zip(
publishers,
arr -> {
System.out.println(arr[0]);
System.out.println(arr[1]);
System.out.println(arr[2]);
System.out.println("-----------");
return Mono.empty();
}
).subscribe();
Output:
1
4
1
-----------
2
5
2
-----------
Using zipWith
zipWith is similar to zip, but it is an instance method and only supports one other source to zip with.
numbers1.zipWith(numbers2, (a, b) -> a * b)
.subscribe(System.out::println);
Output:
4
10
Using combineLatest
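combineLatest combines the latest element of each source. Once every source has emitted at least one element, it emits a new combination each time any source emits. If there are two sources, you can use a BiFunction as shown below.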
Flux.combineLatest(
numbers2,
numbers1,
(a, b) -> a + b
)
.subscribe(System.out::println);
Output:
6
7
8
Because both sources here emit synchronously, the first source (numbers2) emits all of its values before the second one is subscribed. The last element of the first source, 5, is then combined with each element of the second source.
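With sources that emit over time, the behavior is more interesting than in the synchronous example above. The following sketch drives two sources by hand through Reactor's Sinks API so that the emission order is explicit. The class name CombineLatestDemo and the values are assumptions chosen for illustration, and the Sinks API requires Reactor 3.4 or later.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

class CombineLatestDemo {

    // Emits values into two sinks in a fixed order and returns
    // every value produced by combineLatest.
    static List<Integer> run() {
        Sinks.Many<Integer> left = Sinks.many().unicast().onBackpressureBuffer();
        Sinks.Many<Integer> right = Sinks.many().unicast().onBackpressureBuffer();
        List<Integer> results = new ArrayList<>();

        Flux.combineLatest(left.asFlux(), right.asFlux(), (a, b) -> a + b)
            .subscribe(results::add);

        left.tryEmitNext(1);   // nothing emitted yet: right has no value
        right.tryEmitNext(10); // 1 + 10
        left.tryEmitNext(2);   // 2 + 10
        right.tryEmitNext(20); // 2 + 20
        left.tryEmitComplete();
        right.tryEmitComplete();
        return results;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [11, 12, 22]
    }
}
```

Every new element is paired with the most recent element of the other source, which is why 10 and 2 are each used twice.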
If there are more than two sources, you need to pass a Function that accepts one parameter of type Object[].
Flux.combineLatest(
numbers1,
numbers2,
numbers1,
(arr) -> (int) arr[0] + (int) arr[1] + (int) arr[2]
)
.subscribe(System.out::println);
Output:
9
10
11
It takes the latest values from the first and second sources (3 and 5), then combines them with each value from the third source.
The approach above supports up to six sources. If you need more than that, you can pass an Iterable instead.
ArrayList<Flux<Integer>> publishers = new ArrayList<>();
publishers.add(numbers1);
publishers.add(numbers2);
Flux.combineLatest(
publishers,
(arr) -> (int) arr[0] + (int) arr[1]
)
.subscribe(System.out::println);
Output:
7
8
Combining Mono Publishers
The examples in this section use these Mono publishers:
Mono<String> firstMono = Mono.just("one");
Mono<String> secondMono = Mono.just("two");
Mono<Integer> thirdMono = Mono.just(3);
Using concatWith
You can concatenate a Mono with another Mono using the concatWith instance method. The result is a Flux.
firstMono.concatWith(secondMono)
.subscribe(System.out::println);
Output:
one
two
Using mergeWith
This instance method combines two Mono publishers and emits their elements in the order they arrive. The result is a Flux.
firstMono.mergeWith(secondMono)
.subscribe(System.out::println);
Output:
one
two
To see the difference between mergeWith and concatWith, you can add a delay to the first Mono.
firstMono.delayElement(Duration.ofSeconds(5)).mergeWith(secondMono)
.subscribe(System.out::println);
Output:
two
one
Using zip
zip aggregates the sources by waiting for all Mono publishers to emit their results. If you only pass two sources, you can use a BiFunction to combine the results.
Mono.zip(firstMono, secondMono, (a, b) -> a + b)
.subscribe(System.out::println);
Output:
onetwo
Below is an example of how to combine more than two sources. The result is a tuple (Tuple2, Tuple3, ..., Tuple8) depending on the number of publishers. You can use tuple.getTn() (replace n with the index, which starts from 1) to get the result of the n-th publisher.
Mono.zip(
firstMono,
secondMono,
thirdMono
)
.flatMap(tuple -> {
System.out.println(tuple.getT1());
System.out.println(tuple.getT2());
System.out.println(tuple.getT3());
return Mono.empty();
})
.subscribe();
Output:
one
two
3
You can have up to eight sources with the approach above. If you need to combine more sources, you can pass an Iterable instead.
ArrayList<Mono<String>> publishers = new ArrayList<>();
publishers.add(firstMono);
publishers.add(secondMono);
Mono.zip(
publishers,
arr -> {
System.out.println(arr[0]);
System.out.println(arr[1]);
return Mono.empty();
}
).subscribe();
Output:
one
two
Using zipDelayError
zipDelayError is similar to zip, but if an error occurs, it is delayed until all sources have been processed.
The usage is also similar, except that it doesn't support combining two sources with a BiFunction as the third parameter (as in the first Mono.zip example).
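As a rough illustration of the difference, the sketch below zips a failing Mono with a slow one and records whether the slow one got the chance to emit. The class name ZipDelayErrorDemo, the run helper and the 50 ms delay are assumptions for this example only.

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import reactor.core.publisher.Mono;

class ZipDelayErrorDemo {

    // Zips a failing Mono with a slow one and reports what was observed.
    // Uses Mono.zipDelayError when delayError is true, Mono.zip otherwise.
    static String run(boolean delayError) {
        List<String> events = new CopyOnWriteArrayList<>();
        Mono<String> failing = Mono.error(new IllegalStateException("boom"));
        Mono<String> slow = Mono.delay(Duration.ofMillis(50))
            .map(tick -> "two")
            .doOnNext(value -> events.add("slow emitted"));

        Mono<?> zipped = delayError
            ? Mono.zipDelayError(failing, slow)
            : Mono.zip(failing, slow);
        try {
            zipped.block();
            return "completed " + events;
        } catch (IllegalStateException e) {
            return "failed with " + e.getMessage() + " after " + events;
        }
    }

    public static void main(String[] args) {
        // zip: fails immediately, the slow source is cancelled
        System.out.println(run(false)); // failed with boom after []

        // zipDelayError: waits for the slow source, then fails
        System.out.println(run(true));  // failed with boom after [slow emitted]
    }
}
```

Both variants end with the same error. The difference is that zipDelayError lets the remaining sources finish first, which matters when they have side effects you want to run.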