In this article we will go through a few basic examples of Project Reactor (by Pivotal) to understand the different ways in which a publisher & subscriber can interact to perform the desired operations.
Here is the Maven dependency for io.projectreactor – reactor-core
Reactor Synchronous
In this example, publishing & subscribing are synchronous. The publisher & subscriber both run on the same thread.
- create() – Creates a Flux, i.e. the publisher (emitter).
- subscribe() – Subscribes to the Flux & starts receiving the emitted values.
package com.itsallbinary.reactor;

import reactor.core.publisher.Flux;

public class ReactorSynchronousExample {

    public static void main(String[] args) {

        Flux<Object> fluxSync = Flux.create(emitter -> {
            // Publish 100 numbers
            for (int i = 0; i < 100; i++) {
                System.out.println(Thread.currentThread().getName() + " | Publishing = " + i);
                // Publish or emit a value.
                emitter.next(i);
            }
            // When all values are emitted, call complete.
            emitter.complete();
        });

        fluxSync.subscribe(i -> {
            // Process received value.
            System.out.println(Thread.currentThread().getName() + " | Received = " + i);
            // 100 ms delay to simulate a slow subscriber
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
    }
}
The subscriber takes 100 ms to process each value. As the output shows, the publisher is blocked during that time: the next value is published only once the subscriber has finished processing the previous one.
[DEBUG] (main) Using Console logging
main | Publishing = 0
main | Received = 0
main | Publishing = 1
main | Received = 1
main | Publishing = 2
main | Received = 2
main | Publishing = 3
main | Received = 3
main | Publishing = 4
main | Received = 4
main | Publishing = 5
main | Received = 5
.
.
main | Publishing = 95
main | Received = 95
main | Publishing = 96
main | Received = 96
main | Publishing = 97
main | Received = 97
main | Publishing = 98
main | Received = 98
main | Publishing = 99
main | Received = 99
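The strict Publishing/Received alternation can be modelled without Reactor at all. The following is a minimal plain-JDK sketch, not Reactor's implementation: it assumes the subscriber is just a callback that the emitting loop invokes directly. The class name SynchronousHandOffSketch and the run() method are hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntConsumer;

// Plain-JDK model (not Reactor code) of a synchronous publisher/subscriber pair.
class SynchronousHandOffSketch {

    // Emits 0..count-1 and returns a log of every step, tagged with the thread name.
    static List<String> run(int count) {
        List<String> log = new ArrayList<>();

        // The "subscriber" is only a callback; it runs on whichever thread calls it.
        IntConsumer subscriber = value ->
                log.add(Thread.currentThread().getName() + " | Received = " + value);

        for (int i = 0; i < count; i++) {
            log.add(Thread.currentThread().getName() + " | Publishing = " + i);
            // Direct method call: the loop cannot publish the next value
            // until the subscriber callback has returned.
            subscriber.accept(i);
        }
        return log;
    }

    public static void main(String[] args) {
        run(3).forEach(System.out::println);
    }
}
```

Because the hand-off is an ordinary method call, any delay inside the callback delays the next emission by the same amount, which is the behaviour visible in the output above.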
Reactor Asynchronous
In this example, we will put the publisher, i.e. the Flux, & the subscriber on different threads to make the interaction asynchronous.
- create() – Creates a Flux, i.e. the publisher (emitter).
- subscribeOn() – Tells the Flux to run the subscription, & with it the emitting code inside create(), on a thread from the given scheduler rather than the current thread, i.e. the main thread.
- publishOn() – Tells the Flux to deliver emitted values to the subscriber on a thread from the given scheduler, so the subscriber runs on a different thread than the emitter.
- subscribe() – Subscribes to the Flux & starts receiving the emitted values.
package com.itsallbinary.reactor;

import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

public class ReactorAsynchronousExample {

    public static void main(String[] args) throws InterruptedException {

        // Note: newer Reactor versions replace Schedulers.elastic() with
        // Schedulers.boundedElastic().
        Flux<Object> fluxAsync = Flux.create(emitter -> {
            // Publish 100 numbers
            for (int i = 0; i < 100; i++) {
                System.out.println(Thread.currentThread().getName() + " | Publishing = " + i);
                // Publish or emit a value with 10 ms delay
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                emitter.next(i);
            }
            // When all values are emitted, call complete.
            emitter.complete();
        }).subscribeOn(Schedulers.elastic()).publishOn(Schedulers.elastic());

        /*
         * Notice above - subscribeOn moves the emitting code to one scheduler thread &
         * publishOn delivers the values to the subscriber on another.
         */

        fluxAsync.subscribe(i -> {
            // Process received value.
            System.out.println(Thread.currentThread().getName() + " | Received = " + i);
            // 100 ms delay to simulate a slow subscriber
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        // Since publisher & subscriber run on different threads than the main thread,
        // keep the main thread active. The subscriber needs about 10 seconds
        // (100 values x 100 ms), so wait a little longer than that.
        Thread.sleep(12000);
    }
}
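You can see in the output that the thread names for the publisher & subscriber are different. Also, the publisher does not wait for the subscriber to finish processing: emitted values are queued for the subscriber thread, so the publisher runs well ahead of the slower subscriber.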
[DEBUG] (main) Using Console logging
elastic-3 | Publishing = 0
elastic-3 | Publishing = 1
elastic-2 | Received = 0
elastic-3 | Publishing = 2
elastic-3 | Publishing = 3
elastic-3 | Publishing = 4
elastic-3 | Publishing = 5
elastic-3 | Publishing = 6
elastic-3 | Publishing = 7
elastic-3 | Publishing = 8
elastic-3 | Publishing = 9
elastic-3 | Publishing = 10
elastic-2 | Received = 1
elastic-3 | Publishing = 11
.
.
elastic-3 | Publishing = 96
elastic-3 | Publishing = 97
elastic-2 | Received = 10
elastic-3 | Publishing = 98
elastic-3 | Publishing = 99
elastic-2 | Received = 11
elastic-2 | Received = 12
.
.
elastic-2 | Received = 95
elastic-2 | Received = 96
elastic-2 | Received = 97
elastic-2 | Received = 98
elastic-2 | Received = 99
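To see why the publisher can run ahead of the subscriber, it may help to model the hand-off with plain JDK executors. This is only a sketch of the idea, not how Reactor is implemented: it assumes one thread that emits and a second thread that drains a queue of pending values. The class name AsynchronousHandOffSketch and the thread names publisher-1 & subscriber-1 are hypothetical.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Plain-JDK model (not Reactor code) of an asynchronous publisher/subscriber pair.
class AsynchronousHandOffSketch {

    // Emits 0..count-1 on one thread, consumes on another, and returns the log.
    static List<String> run(int count) {
        List<String> log = new CopyOnWriteArrayList<>();
        ExecutorService publisher =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "publisher-1"));
        ExecutorService subscriber =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "subscriber-1"));

        publisher.submit(() -> {
            for (int i = 0; i < count; i++) {
                int value = i;
                log.add(Thread.currentThread().getName() + " | Publishing = " + value);
                // Queue the value for the subscriber thread and continue at once;
                // the executor's task queue plays the role of the buffer.
                subscriber.submit(() -> {
                    log.add(Thread.currentThread().getName() + " | Received = " + value);
                    pause(20); // simulate a slow subscriber
                });
            }
        });

        try {
            // Wait for the publisher first, then let the subscriber drain its queue.
            publisher.shutdown();
            publisher.awaitTermination(10, TimeUnit.SECONDS);
            subscriber.shutdown();
            subscriber.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            publisher.shutdownNow();
            subscriber.shutdownNow();
            Thread.currentThread().interrupt();
        }
        return log;
    }

    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        run(5).forEach(System.out::println);
    }
}
```

The exact interleaving of the two threads varies from run to run, but the values are always received in the order they were published, because a single subscriber thread drains the queue in FIFO order.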
Reactor Parallel processing
If some processing needs to be done on a large emitted data set, the processing can be spread over parallel threads & the results merged back afterwards, as shown in the example below.
- create() – Creates a Flux, i.e. the publisher (emitter).
- parallel() & runOn() – Converts the Flux to a ParallelFlux & tells it to run further processing on different threads than the current thread, i.e. the main thread.
- map() – Does the processing, here calculating the square of each number, on the ParallelFlux.
- sequential() – Merges the results of the parallel operation & converts the ParallelFlux back to a Flux.
- subscribe() – Subscribes to the Flux & starts receiving the emitted values.
package com.itsallbinary.reactor;

import reactor.core.publisher.Flux;
import reactor.core.publisher.ParallelFlux;
import reactor.core.scheduler.Schedulers;

public class ReactorParallelProcessingExample {

    public static void main(String[] args) throws InterruptedException {

        ParallelFlux<Object> fluxParallel = Flux.create(emitter -> {
            // Publish 100 numbers
            for (int i = 0; i < 100; i++) {
                emitter.next(i);
                // Publish or emit a value with 10 ms delay
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            // When all values are emitted, call complete.
            emitter.complete();
        }).parallel().runOn(Schedulers.parallel()).map(i -> {
            int number = (int) i;
            System.out.println(Thread.currentThread().getName() + " | Sending square of " + number);
            // Processing - Square all numbers & provide to subscribers.
            return number * number;
        });

        /*
         * Notice above - parallel() & runOn() combination puts further processing in
         * parallel threads.
         */

        // Now here we will merge all parallel threads back in round-robin order so all
        // squares are printed.
        fluxParallel.sequential()
                .subscribe(i -> System.out.println(Thread.currentThread().getName() + " | Received square = " + i));
    }
}
[DEBUG] (main) Using Console logging
parallel-1 | Sending square of 0
parallel-1 | Received square = 0
parallel-2 | Sending square of 1
parallel-2 | Received square = 1
parallel-3 | Sending square of 2
parallel-3 | Received square = 4
parallel-4 | Sending square of 3
parallel-4 | Received square = 9
.
.
parallel-4 | Sending square of 95
parallel-4 | Received square = 9025
parallel-1 | Sending square of 96
parallel-1 | Received square = 9216
parallel-2 | Sending square of 97
parallel-2 | Received square = 9409
parallel-3 | Sending square of 98
parallel-3 | Received square = 9604
parallel-4 | Sending square of 99
parallel-4 | Received square = 9801
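The split-process-merge pattern in the output above (value 0 on parallel-1, value 1 on parallel-2 & so on) can also be sketched with plain JDK executors. This is an illustrative model only, not Reactor's implementation: it assumes each "rail" is a single worker thread, values are dealt out round-robin, & results are collected in submission order. The class name ParallelRailsSketch, the squares() method, & the rail-N thread names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Plain-JDK model (not Reactor code) of splitting work across rails and merging it back.
class ParallelRailsSketch {

    // Squares 0..count-1 using the given number of single-threaded rails.
    static List<Integer> squares(int count, int rails) {
        List<ExecutorService> workers = new ArrayList<>();
        for (int r = 0; r < rails; r++) {
            String name = "rail-" + (r + 1);
            workers.add(Executors.newSingleThreadExecutor(task -> new Thread(task, name)));
        }
        try {
            List<Future<Integer>> pending = new ArrayList<>();
            for (int i = 0; i < count; i++) {
                int number = i;
                // Round-robin split: value i is processed on rail (i mod rails).
                pending.add(workers.get(i % rails).submit(() -> number * number));
            }
            // Merge: collect the results in submission order.
            List<Integer> merged = new ArrayList<>();
            for (Future<Integer> result : pending) {
                merged.add(result.get());
            }
            return merged;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for results", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("A rail failed", e.getCause());
        } finally {
            workers.forEach(ExecutorService::shutdown);
        }
    }

    public static void main(String[] args) {
        System.out.println(squares(8, 4));
    }
}
```

Collecting the futures in submission order keeps the merged list in input order, which makes the sketch easy to check; it says nothing about the ordering guarantees of sequential() itself.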