Reactor - Retry Execution with retryWhen Examples

Sometimes you need to retry a method call if it fails with an error. In reactive programming with Project Reactor, conventional for and while loops are not the right tool for this. Instead, Project Reactor provides the retry and retryWhen operators.

How retry works

First, you have to understand how retry and retryWhen work. Below is a simple method that returns a Mono. It returns an error on every call except the 100th.

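  import reactor.core.publisher.Mono;

  public class MyClass {

      // Counts how many times myMethod has been called.
      static int x = 0;

      // Not private, because the examples below call it as myClass.myMethod().
      public Mono<String> myMethod() {
          System.out.println("execute myMethod");

          x++;

          // Succeed only on the 100th call.
          if (x == 100) {
              return Mono.just("Value");
          }

          return Mono.error(new Exception());
      }
  }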

retry and retryWhen work by re-subscribing to the Mono if it signals an error and the retry conditions are fulfilled. However, re-subscribing doesn't call the method that produced the Mono again.

This is an example that doesn't work:

  myMethod()
      .retryWhen( ... )
      .block();

If retry or retryWhen is applied directly to the result of myMethod, it re-subscribes to the Mono that myMethod already returned, but myMethod itself is not executed again. So how can we make myMethod run on each retry? There are a few ways. First, you can wrap the call in Mono.defer:

  Mono.defer(() -> myClass.myMethod())
      .retryWhen( ... )
      .block();

If the method returns Flux, use Flux.defer. Flux has no block() method, so use blockLast() (or blockFirst()) to wait for the result.

  Flux.defer(() -> myClass.myMethod())
      .retryWhen( ... )
      .blockLast();

When the Mono created by Mono.defer is re-subscribed, the lambda is re-evaluated, which means myMethod is executed again.
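The difference between re-subscribing and re-invoking can be illustrated without Reactor. The following is only a plain-Java analogy, not Reactor code, and DeferSketch, flaky, eager, and retry are hypothetical names introduced for illustration. Here eager plays the role of calling myMethod() directly (one call, whose outcome is replayed), while passing the method reference itself plays the role of Mono.defer.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Plain-Java analogy (no Reactor involved): an eagerly computed outcome is
// fixed once, while a Supplier re-runs the method every time it is asked.
class DeferSketch {
    static final AtomicInteger calls = new AtomicInteger();

    // Hypothetical stand-in for myMethod: fails until the third call.
    static String flaky() {
        int n = calls.incrementAndGet();
        if (n < 3) {
            throw new IllegalStateException("attempt " + n + " failed");
        }
        return "Value";
    }

    // Calls the source once, right now, and returns a Supplier that only
    // replays that outcome (similar to retrying myMethod() without defer).
    static <T> Supplier<T> eager(Supplier<T> source) {
        try {
            T value = source.get();
            return () -> value;
        } catch (RuntimeException e) {
            return () -> { throw e; };
        }
    }

    // Asks the supplier again after each failure, up to maxAttempts calls in total.
    static <T> T retry(Supplier<T> source, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        RuntimeException last = null;
        for (int i = 0; i < maxAttempts; i++) {
            try {
                return source.get();
            } catch (RuntimeException e) {
                last = e;
            }
        }
        throw last;
    }
}
```

Starting from a fresh counter, retry(DeferSketch::flaky, 5) calls flaky three times and returns "Value", whereas retry(eager(DeferSketch::flaky), 5) calls it only once and fails with the same exception on every attempt.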

Another way is to use Mono.fromSupplier and then flatten the result.

  Mono.fromSupplier(myClass::myMethod).flatMap(Function.identity())
      .retryWhen( ... )
      .block();

Using retry

Setting maximum retry

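Here's the most basic usage of retry, which only sets the maximum number of retries. With retry(MAX_RETRY), the source is subscribed to at most MAX_RETRY + 1 times: the initial attempt plus MAX_RETRY retries.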

  Mono.defer(() -> myClass.myMethod())
      .retry(MAX_RETRY)
      .doOnSuccess(System.out::println).block();

Using Custom Retry Conditions

If you need custom logic, you can pass a Predicate instead. The source is retried only if the predicate returns true. For example, you can control which kinds of errors are retried and how many times.

  Mono.defer(() -> myClass.myMethod())
      .retry(e -> {
          e.printStackTrace();
          return e.getClass().equals(PartnerUnavailableException.class) && x < MAX_RETRY;
      })
      .doOnSuccess(System.out::println).block();

Using retryWhen

Retry Any Error

  Mono.defer(myClass::myMethod)
      .retryWhen(Retry.any().retryMax(MAX_RETRY))
      .doOnSuccess(System.out::println).block();

The example above retries on any error thrown by myMethod. The Retry class used here is reactor.retry.Retry from the reactor-extra addon. To limit the number of retries, use retryMax. If you don't specify retryMax, the default value is 9223372036854775807L (Long.MAX_VALUE), which is practically unlimited.

Retry for Specific Error

If you want to retry only for certain errors, you can use anyOf with a list of exception classes separated by commas.

  Mono.defer(myClass::myMethod)
      .retryWhen(Retry.anyOf(MyCustomException.class, MyAnotherCustomException.class).retryMax(MAX_RETRY))
      .doOnSuccess(System.out::println).block();

Alternatively, you can use allBut to retry on every error except the specified ones.

  Mono.defer(myClass::myMethod)
      .retryWhen(Retry.allBut(MyCustomException.class, MyAnotherCustomException.class).retryMax(MAX_RETRY))
      .doOnSuccess(System.out::println).block();
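The filtering idea behind anyOf and allBut can be sketched in plain Java as a pair of predicates over the error. This is not the library's implementation: ErrorFilterSketch is a hypothetical name, and the sketch assumes that subclasses of a listed exception also match (it uses Class.isInstance), which should be checked against the library's behavior before relying on it.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Plain-Java sketch of "retry only for these errors" and "retry for all but these".
class ErrorFilterSketch {

    // True when the error is an instance of at least one listed class
    // (assumption: subclasses count as a match).
    @SafeVarargs
    static Predicate<Throwable> anyOf(Class<? extends Throwable>... types) {
        List<Class<? extends Throwable>> list = Arrays.asList(types);
        return error -> list.stream().anyMatch(type -> type.isInstance(error));
    }

    // True when the error matches none of the listed classes.
    @SafeVarargs
    static Predicate<Throwable> allBut(Class<? extends Throwable>... types) {
        return anyOf(types).negate();
    }
}
```

For example, anyOf(IOException.class) accepts a FileNotFoundException under the subclass assumption, while allBut(IOException.class) rejects it and accepts everything else.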

Setting Backoff

You can set backoff to create some gap between each retry.

  Mono.defer(myClass::myMethod)
      .retryWhen(Retry.any().retryMax(MAX_RETRY).fixedBackoff(Duration.ofSeconds(1)))
      .doOnSuccess(System.out::println).block();

The example above sets a fixed backoff of 1 second. Besides fixedBackoff, the following options are available:

  • .fixedBackoff(Duration backoffInterval)
  • .noBackoff()
  • .randomBackoff(Duration firstBackoff, Duration maxBackoff)
  • .exponentialBackoff(Duration firstBackoff, Duration maxBackoff)
  • .exponentialBackoffWithJitter(Duration firstBackoff, Duration maxBackoff)
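To make the difference between the fixed and exponential options concrete, here is a minimal plain-Java sketch of how the delay before each retry could be computed. BackoffSketch and its methods are hypothetical names, and the doubling factor is an assumption made for illustration; the library's exact formula (and its jitter calculation) may differ.

```java
import java.time.Duration;

// Plain-Java sketch of backoff schedules; not the library's implementation.
class BackoffSketch {

    // Fixed backoff: the same delay before every retry.
    static Duration fixed(Duration interval, int retry) {
        return interval;
    }

    // Exponential backoff: first, 2 * first, 4 * first, ... capped at max.
    // retry is 1-based; the doubling factor is an assumption.
    static Duration exponential(Duration first, Duration max, int retry) {
        long cap = max.toMillis();
        long millis = Math.min(first.toMillis(), cap);
        for (int i = 1; i < retry && millis < cap; i++) {
            // Jump straight to the cap when doubling would exceed it.
            millis = (millis > cap / 2) ? cap : millis * 2;
        }
        return Duration.ofMillis(millis);
    }
}
```

With a first backoff of 100 ms and a maximum of 1 second, this sketch yields delays of 100, 200, 400, 800, and then 1000 ms for every later retry.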

Using zipWith

  Mono.defer(() -> myClass.myMethod())
      .retryWhen(companion -> companion
              .doOnNext(System.out::println)
              .zipWith(Flux.range(1, MAX_RETRY), (error, index) -> {
                  System.out.println("Retry #" + index);
                  if (index < MAX_RETRY) {
                      return index;
                  }
  
                  throw Exceptions.propagate(error);
              })
              .flatMap(index -> Mono.delay(Duration.ofMillis(100)))
      )
      .doOnSuccess(System.out::println).block();

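With zipWith, you can track each retry and access the original error. In the example above, the original error is rethrown once the retry limit is reached. Because the combinator only continues while index < MAX_RETRY, the method is retried at most MAX_RETRY - 1 times.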
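The counting in the zipWith example above can be traced with an ordinary loop. This is a plain-Java analogy under stated assumptions, not Reactor code: ZipRetrySketch and run are hypothetical names, the counter stands in for the values emitted by Flux.range(1, MAX_RETRY), and the delay between attempts is left out.

```java
import java.util.concurrent.Callable;

// Plain-Java analogy of pairing each error with the next index from 1..maxRetry.
class ZipRetrySketch {

    static <T> T run(Callable<T> task, int maxRetry) throws Exception {
        int index = 0;
        while (true) {
            try {
                return task.call();
            } catch (Exception error) {
                index++; // the next value taken from the 1..maxRetry range
                if (index >= maxRetry) {
                    // Same outcome as the "throw Exceptions.propagate(error)" branch.
                    throw error;
                }
                // Otherwise loop around, which corresponds to a retry.
            }
        }
    }
}
```

With maxRetry set to 3, a task that always fails is called three times in total (the first attempt plus two retries) before the original error is rethrown.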

You can also implement exponential back-off by using the retry index returned from zipWith.

  Mono.defer(() -> myClass.myMethod())
      .retryWhen(companion -> companion
              .doOnNext(System.out::println)
              .zipWith(Flux.range(1, MAX_RETRY), (error, index) -> {
                  System.out.println("Retry #" + index);
                  if (index < MAX_RETRY) {
                      return index;
                  }
    
                  throw Exceptions.propagate(error);
              })
              // Doubles the delay on each retry: 100 ms, 200 ms, 400 ms, ...
              .flatMap(index -> Mono.delay(Duration.ofMillis(100L << (index - 1))))
      )
      .doOnSuccess(System.out::println).block();