This tutorial shows you how to use CacheMono
and CacheFlux
in Project Reactor, including how to use them with Caffeine.
If you're using Project Reactor, sometimes you may want to cache a Mono or a Flux. Fortunately, it already provides an opinionated caching helper called CacheMono
and CacheFlux
. They define a cache abstraction for storing and restoring a Mono
or a Flux
. This tutorial explains how to use those two and also how to integrate them with Caffeine
, a popular caching library.
Using CacheMono
Using CacheMono
basically consists of three steps. The first one is looking up the value from the source based on the given key. The second step is handling cache missing, which will be done if the first step results in a cache miss. The last step is writing the value to the source in case of cache miss.
There are different ways to use CacheMono
. One of which requires you to handle all the steps above (lookup value, handle cache misses, write value to the cache) manually. Alternatively, you can also provide a Map
and let Project Reactor handle the lookup and write value to the cache – you only need to handle cache misses.
Manually Handle Lookup and Write
Let's start with the first way in which you need to handle lookup and write data to the cache manually. First, you need to handle how to retrieve the value from the cache using the below lookup
method.
public static <KEY, VALUE> MonoCacheBuilderCacheMiss<KEY, VALUE> lookup(Function<KEY, Mono<Signal<? extends VALUE>>> reader, KEY key);
It requires you to pass a Function
as the first argument. The passed Function
needs to accept a key as the parameter and returns a Mono
. So, you need to pass a function which is responsible for retrieving a value based on the given key. You can use any source to store the value of each key. For example, you can use Reactor's Context
, Map
, Tree
, or any data structure. For the second argument, you need to pass the key of the value to be retrieved.
Using CacheMono
also requires you to handle cases when the given key cannot be found in the cache, usually known as cache miss. The return type of the first lookup method is MonoCacheBuilderCacheMiss
. You can use one of its onCacheMissResume
methods to handle cache misses.
MonoCacheBuilderCacheWriter<KEY, VALUE> onCacheMissResume(Supplier<Mono<VALUE>> otherSupplier);
MonoCacheBuilderCacheWriter<KEY, VALUE> onCacheMissResume(Mono<VALUE> other);
There are two onCacheMissResume
variants. The first one requires you to pass a Supplier
that returns a Mono
. It can be used if you don't need to pass any argument for generating the value. However, if you need to pass an argument (usually the key) for generating the value, you should use the variant that accepts a Mono
as the argument.
Another thing you need to handle is writing the data to the cache. The MonoCacheBuilderCacheWriter
class has a method called andWriteWith
. You need to call it and pass a BiFunction
that writes the value returned by onCacheMissResume
to the cache. The andWriteWith
method is only called on cache misses.
Mono<VALUE> andWriteWith(BiFunction<KEY, Signal<? extends VALUE>, Mono<Void>> writer)
With the above explanation, below is an example of how to use CacheMono
using the above lookup
method. In the example below, we are going to store the values in a Map<String, String>
.
final Map<String, String> mapStringCache = new HashMap<>();
Below are the methods that can be passed as onCacheMissResume
argument. The first one (with 0 parmeter) can be passed as a Supplier
, while the second one can be passed as a Mono
.
private Mono<String> handleCacheMiss() {
System.out.println("Cache miss!");
return Mono.just(ZonedDateTime.now().toString());
}
private Mono<String> handleCacheMiss(String key) {
System.out.println("Cache miss!");
return Mono.just(key + ": " + Instant.now().toString());
}
The code below is a complete chain that uses the lookup
, onCacheMissResume
, and andWriteWith
methods.
final Mono<String> cachedMono1 = CacheMono
.lookup(
k -> Mono.justOrEmpty(mapStringCache.get(key)).map(Signal::next),
key
)
// .onCacheMissResume(this::handleCacheMiss) // Uncomment this if you want to pass a Supplier
.onCacheMissResume(this.handleCacheMiss(key))
.andWriteWith((k, sig) -> Mono.fromRunnable(() ->
mapStringCache.put(k, Objects.requireNonNull(sig.get()))
));
Provide a Map
Another way to use CacheMono
is by providing a Map
. Using this way requires you to use one of the lookup
methods that accepts a Map
.
public static <KEY, VALUE> MonoCacheBuilderMapMiss<VALUE> lookup(Map<KEY, ? super Signal<? extends VALUE>> cacheMap, KEY key);
public static <KEY, VALUE> MonoCacheBuilderMapMiss<VALUE> lookup(Map<KEY, ? super Signal<? extends VALUE>> cacheMap, KEY key, Class<VALUE> valueClass);
The first method requires you to pass a Map
as the first argument and a key as the second argument. The key type of the Map
must be compatible with the type of the passed key. The Map
's value type must be a Signal<T>
. If you want to use the first method, you have to be able to provide the cache data representation in a Map<KEY, ? super Signal<? extends VALUE>>
. That means you can store any type of Signal
including next
, complete
, and error
.
You also need to handle cache misses. The MonoCacheBuilderMapMiss
also has onCacheMissResume
methods, as shown below. One of which accepts a Supplier
and the other accepts a Mono
. Below are the methods.
Mono<VALUE> onCacheMissResume(Supplier<Mono<VALUE>> otherSupplier)
Mono<VALUE> onCacheMissResume(Mono<VALUE> other)
If a given key doesn't exist in the Map
, the onCacheMissResume
method will be invoked. Unlike the previous lookup
method (which doesn't use Map
), the returned value will be stored to the Map
cache automatically. Therefore, it doesn't provide andWriteWith
method and you don't need to manually save the value to the cache.
Let's start with the example of using the first lookup method (the one with two parameters). First, you need to provide a Map
whose value type is Signal<? extends String>
.
final Map<String, Signal<? extends String>> mapStringSignalCache = new HashMap<>();
Below is the usage example.
final Mono<String> cachedMono2 = CacheMono
.lookup(mapStringSignalCache, key)
// .onCacheMissResume(this::handleCacheMiss) // Uncomment this if you want to pass a Supplier
.onCacheMissResume(this.handleCacheMiss(key));
The second method (the one with three parameters) is similar to the first one. The difference is it accepts a third argument whose type is Class<VALUE>
that indicates the generic class of the resulting Mono
. You can use this method if you want to cast the cached signal value to a given type (must be a subtype of the signal value type).
For the second lookup
method, we are going to create another Map
whose type value is Signal<?>
.
final Map<String, Signal<? extends Object>> mapObjectSignalCache = new HashMap<>();
Below is the usage example which passes String.class
as the third argument of the lookup
method. As a result the value is casted to String
. Be careful as it may throw ClassCastException
if the value cannot be casted to the given class.
final Mono<String> cachedMono3 = CacheMono
.lookup(mapObjectSignalCache, key, String.class)
// .onCacheMissResume(this::handleCacheMiss) // Uncomment this if you want to pass a Supplier
.onCacheMissResume(this.handleCacheMiss(key));
Using CacheFlux
The usage of CacheFlux
is similar to CacheMono
. You need to handle how to data lookup from cache, handle cache misses, and write data to the cache. The main difference is you need to work with a list of values or Signal
s. With CacheFlux
, you can also choose whether to manually handle the lookup and write process or provide a Map
, depending on the used lookup
method.
Manually Handle Lookup and Write
The first way is to manually handle lookup and write values to the cache, using the below lookup
method.
public static <KEY, VALUE> FluxCacheBuilderCacheMiss<KEY, VALUE> lookup(
Function<KEY, Mono<List<Signal<VALUE>>>> reader,
KEY key
)
You need to pass a function that accepts a key as the argument. Inside the passed function, you need to obtain the values from the cache based on the given key and return it as a Mono
of List
of Signal
(Mono<List<Signal<VALUE>>>
). If the key doesn't exist in the cache, you have to return an empty Mono
.
Next, you have to handle how to generate values on cache misses. The FluxCacheBuilderCacheMiss
class has two methods named onCacheMissResume
. You can choose to pass a Flux
or a Supplier
. The former should be used if you need to generate the value based on the given key.
FluxCacheBuilderCacheWriter<KEY, VALUE> onCacheMissResume(Supplier<Flux<VALUE>> otherSupplier);
FluxCacheBuilderCacheWriter<KEY, VALUE> onCacheMissResume(Flux<VALUE> other);
Lastly, you need to handle how to store the data generated by <onCacheMissResume
method by using FluxCacheBuilderCacheMiss
's onCacheMissResume
method.
Flux<VALUE> andWriteWith(BiFunction<KEY, List<Signal<VALUE>>, Mono<Void>> writer);
Let's start with the example. This time, we have a Map
whose key type is Integer
and value type is List<Integer>
.
final Map<Integer, List<Integer>> mapIntCache = new HashMap<>();
Below are the methods that can be passed as onCacheMissResume argument. The first one (with 0 parmeter) can be passed as a Supplier, while the other can be passed as a Mono.
private Flux<Integer> handleCacheMiss() {
System.out.println("Cache miss!");
final List<Integer> values = new ArrayList<>();
for (int i = 1; i <= 5; i++) {
values.add(i);
}
return Flux.fromIterable(values);
}
private Flux<Integer> handleCacheMiss(Integer key) {
System.out.println("Cache miss!");
final List<Integer> values = new ArrayList<>();
for (int i = 1; i <= 5; i++) {
values.add(i * key);
}
return Flux.fromIterable(values);
}
Below is a complete chain that uses the lookup
, onCacheMissResume
, and andWriteWith
methods.
final Flux<Integer> cachedFlux1 = CacheFlux
.lookup(
k -> {
if (mapIntCache.get(k) != null) {
Mono<List<Signal<Integer>>> res = Flux.fromIterable(mapIntCache.get(k))
.map(Signal::next)
.collectList();
return res;
} else {
return Mono.empty();
}
},
key
)
.onCacheMissResume(this::handleCacheMiss) // Uncomment this if you want to pass a Supplier
// .onCacheMissResume(() -> Flux.defer(() -> this.handleCacheMiss(key)))
.andWriteWith((k, sig) -> Mono.fromRunnable(() ->
mapCache.put(
k,
sig.stream()
.filter(signal -> signal.getType() == SignalType.ON_NEXT)
.map(Signal::get)
.collect(Collectors.toList())
)
));
Provide a Map
Another way to use CacheFlux
is by passing a Map
. CacheFlux
has another static lookup
method that allows you to pass a Map
.
public static <KEY, VALUE> FluxCacheBuilderMapMiss<VALUE> lookup(
Map<KEY, ? super List> cacheMap,
KEY key,
Class<VALUE> valueClass
)
The Map
has to be passed as the first argument. The key type of the Map
must be compatible with the type of the key passed as the second argument. The Map
's value type must be a List
or another type that extends a List
. Unfortunately, you cannot define a generic type for the List
. However, it doesn't mean you can pass any value as the element of the List
. The List
can only contain Project Reactor's Signal
. For the third argument, you have to pass a Class
which is used to cast each Flux
element.
Then, you need to handle cache misses by using one of the onCacheMissResume
methods. One of the methods requires you to pass a Supplier
, while the other requires you to pass a Flux
. You should use the latter if you need to generate the values based on the given key. When a cache miss occurs, the onCacehMissResume
method will be called and the resulting values will be stored to the Map
automatically.
Flux<VALUE> onCacheMissResume(Supplier<Flux<VALUE>> otherSupplier);
Flux<VALUE> onCacheMissResume(Flux<VALUE> other);
To use CacheFlux
by providing a Map
, you need to have a Map
whose value type is List
.
final Map<Integer, List> mapCache = new HashMap<>();
Below is the usage example.
final Flux<Integer> cachedFlux2 = CacheFlux
.lookup(
mapCache,
key,
Integer.class
)
.onCacheMissResume(this::handleCacheMiss);
// .onCacheMissResume(Flux.defer(() -> this.handleCacheMiss(key)));
return cachedFlux2
.doOnNext(res -> System.out.println("Value is " + res));
Using CacheMono
and CacheFlux
with Caffeine
If you need more advanced features for caching such as expiration time, most likely you'll use a cache library that provides the features you need. The CacheMono
and CacheFlux
can be used with any caching library, such as Caffeine. This tutorial doesn't explain how to use Caffeine
in detail, as it will be very long to explain it. We only focus on how to use it with CacheMono
and CacheFlux
. First of all, we need to create a Caffeine cache. Caffeine has some classes for creating caches, such as Cache
, AsyncCache
, and LoadingCache
. In this example, we are going to use the Cache
class.
final Cache<String, String> caffeineCache = Caffeine.newBuilder()
.expireAfterWrite(Duration.ofSeconds(30))
.recordStats()
.build();
As I have explained above, to use CacheMono
, you can provide a Map
whose value type is Reactor's Signal
. If not possible, you need to pass a Function
for looking up the value and handle write data to the cache manually. Caffeine has a method for converting the cache into a Map
, but the value type is not a Signal
, which means it's not compatible with the lookup methods that accept a Map
parameter. Therefore, you need to handle the lookup by passing a Function
and store the generated values to the Caffeine cache.
final Mono<String> cachedMonoCaffeine = CacheMono
.lookup(
k -> Mono.justOrEmpty(caffeineCache.getIfPresent(k)).map(Signal::next),
key
)
// .onCacheMissResume(this::handleCacheMiss) // Uncomment this if you want to pass a Supplier
.onCacheMissResume(this.handleCacheMiss(key))
.andWriteWith((k, sig) -> Mono.fromRunnable(() ->
caffeineCache.put(k, Objects.requireNonNull(sig.get()))
));
The usage for CaceFlux
is also similar. Below is another Caffeine cache whose value type is List<Integer>
.
final Cache<Integer, List<Integer>> caffeineCache = Caffeine.newBuilder()
.expireAfterWrite(Duration.ofSeconds(30))
.recordStats()
.build();
To use Caffeine cache with CacheFlux
, you need to use the lookup
method that accepts a Function
as the first parameter. The passed Function
is responsible to get the value from the cache or return an empty Mono
if the key is not present. You also need to store the values to the Caffeine cache.
final Flux<Integer> cachedFluxCaffeine = CacheFlux
.lookup(
k -> {
final List<Integer> cached = caffeineCache.getIfPresent(k);
if (cached == null) {
return Mono.empty();
}
return Mono.just(cached)
.flatMapMany(Flux::fromIterable)
.map(Signal::next)
.collectList();
},
key
)
// .onCacheMissResume(this::handleCacheMiss) // Uncomment this if you want to pass a Supplier
.onCacheMissResume(this.handleCacheMiss(key))
.andWriteWith((k, sig) -> Mono.fromRunnable(() ->
caffeineCache.put(
k,
sig.stream()
.filter(signal -> signal.getType() == SignalType.ON_NEXT)
.map(Signal::get)
.collect(Collectors.toList())
)
));
Summary
That's how to use CacheMono
and CacheFlux
in Project Reactor. Basically, there are two options: handle lookup and store values manually or provide a compatible Map
whose value type is a Signal
(for CacheMono
) or List<Signal>
(for CacheFlux
). The latter option is preferred if possible because it's simpler. CacheMono
and CacheFlux
can also be used with any caching library such as Caffeine. The full code of this tutorial is available on GitHub.