In Project Reactor, we can use publishOn
and subscribeOn
to control on which scheduler operators in a chain should be executed. This tutorial gives you explanations and some examples for showing difference between publishOn
and subscribeOn
.
Reactor is concurrency-agnostic. Rather than enforcing a concurrency model, developers can control how the code should be executed in threads. We can create different kinds of schedulers in Reactor.
For this tutorial, to make it easy to know where the executions take place, we are going to use two schedulers with different names:
Scheduler schedulerA = Schedulers.newParallel("scheduler-a", 4);
Scheduler schedulerB = Schedulers.newParallel("scheduler-b", 4);
publishOn
Like other operators in general, publishOn
is applied in the middle of a chain. It affects subsequent operators after publishOn
- they will be executed on a thread picked from publishOn
's scheduler.
Flux.range(1, 2)
.map(i -> {
System.out.println(String.format("First map - (%s), Thread: %s", i, Thread.currentThread().getName()));
return i;
})
.publishOn(schedulerA)
.map(i -> {
System.out.println(String.format("Second map - (%s), Thread: %s", i, Thread.currentThread().getName()));
return i;
})
.blockLast();
The output of the code above is:
First map - (1), Thread: main
First map - (2), Thread: main
Second map - (1), Thread: scheduler-a-1
Second map - (2), Thread: scheduler-a-1
As you can see, only the second map
which is placed after publishOn
in the chain is executed on the scheduler. pubilshOn
doesn't affect any operator before it.
subscribeOn
subscribeOn is applied to the subscription process. If you place a subscribeOn
in a chain, it affects the source emission in the entire chain. However, operators after publishOn
do not affected as the execution will be switched to one of the threads from publishOn
's schedulers. Meanwhile, operators in the chain that's not affected by publishOn
will be executed on a thread picked from subscribeOn
's scheduler.
Flux.range(1, 2)
.map(i -> {
System.out.println(String.format("First map - (%s), Thread: %s", i, Thread.currentThread().getName()));
return i;
})
.subscribeOn(schedulerA)
.map(i -> {
System.out.println(String.format("Second map - (%s), Thread: %s", i, Thread.currentThread().getName()));
return i;
})
.blockLast();
Here's the output
First map - (1), Thread: scheduler-a-1
Second map - (1), Thread: scheduler-a-1
First map - (2), Thread: scheduler-a-1
Second map - (2), Thread: scheduler-a-1
More Advanced Examples
Multiple publishOn
Operators
If there is more than one publishOn in a chain, how will it behave? You can see on the following example and its output.
Flux.range(1, 2)
.map(i -> {
System.out.println(String.format("First map - (%s), Thread: %s", i, Thread.currentThread().getName()));
return i;
})
.publishOn(schedulerA)
.map(i -> {
System.out.println(String.format("Second map - (%s), Thread: %s", i, Thread.currentThread().getName()));
return i;
})
.publishOn(schedulerB)
.map(i -> {
System.out.println(String.format("Third map - (%s), Thread: %s", i, Thread.currentThread().getName()));
return i;
})
.blockLast();
Output:
First map - (1), Thread: main
First map - (2), Thread: main
Second map - (1), Thread: scheduler-a-2
Second map - (2), Thread: scheduler-a-2
Third map - (1), Thread: scheduler-b-1
Third map - (2), Thread: scheduler-b-1
The first publishOn
affects the subsequent operators after it, which means it should affect the second and third map
s. However, there is another publishOn
which affects the third map
s. The result shows us that the third map uses thread from scheduler B. We can conclude that if there is more than one preceding publishOn
operators, the nearest preceding publishOn
will be used.
Multiple subscribeOn
Operators
What if we have multiple subscribeOn
operators in a chain.
Flux.range(1, 2)
.map(i -> {
System.out.println(String.format("First map - (%s), Thread: %s", i, Thread.currentThread().getName()));
return i;
})
.subscribeOn(schedulerA)
.map(i -> {
System.out.println(String.format("Second map - (%s), Thread: %s", i, Thread.currentThread().getName()));
return i;
})
.subscribeOn(schedulerB)
.map(i -> {
System.out.println(String.format("Third map - (%s), Thread: %s", i, Thread.currentThread().getName()));
return i;
})
.blockLast();
The result is:
First map - (1), Thread: scheduler-a-2
Second map - (1), Thread: scheduler-a-2
Third map - (1), Thread: scheduler-a-2
First map - (2), Thread: scheduler-a-2
Second map - (2), Thread: scheduler-a-2
Third map - (2), Thread: scheduler-a-2
As you can see, all map
s are executed on a thread picked from scheduler A which is used by the first subscribeOn
. If you define multiple subscribeOn operators in a chain, it will use the first one.
Using Both publishOn
and subscribeOn
Now, we have a subscribeOn
followed by a publishOn
.
Flux.range(1, 2)
.map(i -> {
System.out.println(String.format("First map - (%s), Thread: %s", i, Thread.currentThread().getName()));
return i;
})
.subscribeOn(schedulerA)
.map(i -> {
System.out.println(String.format("Second map - (%s), Thread: %s", i, Thread.currentThread().getName()));
return i;
})
.publishOn(schedulerB)
.map(i -> {
System.out.println(String.format("Third map - (%s), Thread: %s", i, Thread.currentThread().getName()));
return i;
})
.blockLast();
Output:
First map - (1), Thread: scheduler-a-2
Second map - (1), Thread: scheduler-a-2
First map - (2), Thread: scheduler-a-2
Third map - (1), Thread: scheduler-b-1
Second map - (2), Thread: scheduler-a-2
Third map - (2), Thread: scheduler-b-1
Initially, subscribeOn
schedules all operators on schedulerA
. But the presence of publishOn
makes the third map
which is placed after it to use its scheduler.
If we switch the position of publishOn
and subscribeOn
in the code above, the second and third map
s will be exucuted by publishOn
's schedulers.
Nested Chain
Let's take a look at another example. This time, there is a nested chain.
Flux.range(1, 5)
.map(i -> {
System.out.println(String.format("First map - (%s), Thread: %s", i, Thread.currentThread().getName()));
return i;
})
.subscribeOn(schedulerA)
.map(i -> {
System.out.println(String.format("Second map - (%s), Thread: %s", i, Thread.currentThread().getName()));
return Flux
.range(1, 2)
.map(j -> {
System.out.println(String.format("First map - (%s.%s), Thread: %s", i, j, Thread.currentThread().getName()));
return j;
})
.subscribeOn(schedulerB)
.map(j -> {
System.out.println(String.format("Second map - (%s.%s), Thread: %s", i, j, Thread.currentThread().getName()));
return "value " + j;
}).subscribe();
})
.blockLast();
At first, the second outer map
runs on the scheduler A. But there is another chain insideit which uses scheduler B. Here's the output
First map - (1), Thread: scheduler-a-1
Second map - (1), Thread: scheduler-a-1
First map - (2), Thread: scheduler-a-1
Second map - (2), Thread: scheduler-a-1
First map - (1.1), Thread: scheduler-b-2
Second map - (1.1), Thread: scheduler-b-2
First map - (3), Thread: scheduler-a-1
Second map - (3), Thread: scheduler-a-1
First map - (2.1), Thread: scheduler-b-3
First map - (4), Thread: scheduler-a-1
Second map - (2.1), Thread: scheduler-b-3
Second map - (4), Thread: scheduler-a-1
First map - (3.1), Thread: scheduler-b-4
Second map - (3.1), Thread: scheduler-b-4
First map - (5), Thread: scheduler-a-1
First map - (4.1), Thread: scheduler-b-5
Second map - (5), Thread: scheduler-a-1
Second map - (4.1), Thread: scheduler-b-5
First map - (1.2), Thread: scheduler-b-2
First map - (3.2), Thread: scheduler-b-4
First map - (2.2), Thread: scheduler-b-3
Second map - (1.2), Thread: scheduler-b-2
Second map - (3.2), Thread: scheduler-b-4
Second map - (2.2), Thread: scheduler-b-3
First map - (5.1), Thread: scheduler-b-2
First map - (4.2), Thread: scheduler-b-5
Second map - (5.1), Thread: scheduler-b-2
Second map - (4.2), Thread: scheduler-b-5
First map - (5.2), Thread: scheduler-b-2
Second map - (5.2), Thread: scheduler-b-2
The inner chain is subscribed on scheduler B. So, if we have nested chain, we can control the inner chain to use different scheduler.
Conclusion
To determine the execution of operators in a chain if you use publishOn
or subscribeOn
, what you need to do are:
- First, find the topmost
subscribeOn
. If found, set all operators in the chain to use a thread picked from that its scheduler. - Then, find all
publishOn
operators from top to bottom. For each, set the subsequent operators below it to use a thread picked from its scheduler.
Please be aware that some operators may change the scheduler. For example, adding .delayElements
implicitly calls .publishOn(Schedulers.parallel())
.