In this article, we will go through basic & simple examples of backpressure handling in Project Reactor (by Pivotal). For synchronous, asynchronous & parallel processing, refer to this article.
Backpressure / Overflow
During asynchronous processing, if the subscriber consumes data slower than the publisher produces it, the situation is called backpressure or overflow. In Reactor code it is referred to as overflow, and Reactor provides several strategies to handle it gracefully.
By default, Flux uses a small buffer of size 256, which we will see in the examples below.
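Before looking at Reactor itself, the core problem can be sketched in plain Java, without Reactor. This is only a conceptual illustration (the class name and queue are mine, not Reactor internals): a bounded queue of 256 slots stands in for Flux's default small buffer, and `offer()` returning false marks the point where a backpressure strategy must decide what to do.

```java
import java.util.concurrent.ArrayBlockingQueue;

public class BoundedBufferSketch {
    public static void main(String[] args) {
        // A bounded queue of 256 slots stands in for Flux's default small buffer.
        ArrayBlockingQueue<Integer> buffer = new ArrayBlockingQueue<>(256);

        int accepted = 0;
        for (int i = 0; i < 1000; i++) {
            // offer() returns false once the buffer is full - the point
            // where an overflow strategy has to decide what to do.
            if (buffer.offer(i)) {
                accepted++;
            }
        }
        System.out.println("Accepted before overflow: " + accepted);
    }
}
```

Running this prints `Accepted before overflow: 256`, mirroring the "magic number" 256 you will see in every output below.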
Backpressure (Overflow) Strategy: DROP
The DROP strategy drops the incoming next value if the downstream can't keep up because it is too slow. Reactor also provides a hook (onBackpressureDrop) to consume the dropped values and handle them separately.
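In plain-Java terms (again a conceptual sketch with illustrative names, not Reactor's implementation), DROP means: if the bounded buffer is full, discard the value and hand it to a drop callback, much like `onBackpressureDrop` does.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;

public class DropStrategySketch {
    public static void main(String[] args) {
        ArrayBlockingQueue<Integer> buffer = new ArrayBlockingQueue<>(256);
        List<Integer> dropped = new ArrayList<>();

        for (int i = 0; i < 1000; i++) {
            // DROP in spirit: if the buffer is full, discard the value
            // and hand it to a drop callback (like onBackpressureDrop).
            if (!buffer.offer(i)) {
                dropped.add(i); // drop callback
            }
        }
        System.out.println("Buffered = " + buffer.size() + ", dropped = " + dropped.size());
    }
}
```

This prints `Buffered = 256, dropped = 744`: the first 256 values survive, the remaining 744 are routed to the drop handler.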
package com.itsallbinary.reactor;

import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink.OverflowStrategy;
import reactor.core.scheduler.Schedulers;

public class ReactorAsyncWithBackpressureDropExample {

    public static void main(String[] args) throws InterruptedException {

        Flux<Object> fluxAsyncBackp = Flux.create(emitter -> {

            // Publish 1000 numbers.
            for (int i = 0; i < 1000; i++) {
                System.out.println(Thread.currentThread().getName() + " | Publishing = " + i);
                emitter.next(i);
            }
            // When all values are emitted, call complete.
            emitter.complete();

        }, OverflowStrategy.DROP)
                .onBackpressureDrop(i -> System.out.println(Thread.currentThread().getName() + " | DROPPED = " + i));

        fluxAsyncBackp.subscribeOn(Schedulers.elastic()).publishOn(Schedulers.elastic()).subscribe(i -> {
            // Process received value.
            System.out.println(Thread.currentThread().getName() + " | Received = " + i);
            // 500 millis delay to simulate a slow subscriber.
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        /*
         * Notice above -
         *
         * OverflowStrategy.DROP - If subscriber can't keep up with values, then drop
         * the values.
         *
         * subscribeOn & publishOn - Put subscriber & publisher on different threads.
         */

        // Since publisher & subscriber run on threads other than the main thread,
        // keep the main thread alive for 100 seconds.
        Thread.sleep(100000);
    }
}
In the output below you can see that values up to 256 (the default buffer size) were published & buffered successfully, after which values start getting dropped. The subscriber also received those 256 values successfully.
elastic-3 | Publishing = 0
elastic-3 | Publishing = 1
elastic-3 | Publishing = 2
elastic-3 | Publishing = 3
elastic-3 | Publishing = 4
elastic-3 | Publishing = 5
elastic-3 | Publishing = 6
elastic-2 | Received = 0
elastic-3 | Publishing = 7
elastic-3 | Publishing = 8
elastic-3 | Publishing = 9
elastic-3 | Publishing = 10
.
.
elastic-3 | Publishing = 254
elastic-3 | Publishing = 255
elastic-3 | Publishing = 256
elastic-3 | DROPPED = 256
elastic-3 | Publishing = 257
elastic-3 | DROPPED = 257
elastic-3 | Publishing = 258
elastic-3 | DROPPED = 258
.
.
elastic-3 | Publishing = 996
elastic-3 | DROPPED = 996
elastic-3 | Publishing = 997
elastic-3 | DROPPED = 997
elastic-3 | Publishing = 998
elastic-3 | DROPPED = 998
elastic-3 | Publishing = 999
elastic-3 | DROPPED = 999
elastic-2 | Received = 1
elastic-2 | Received = 2
elastic-2 | Received = 3
elastic-2 | Received = 4
elastic-2 | Received = 5
.
.
elastic-2 | Received = 252
elastic-2 | Received = 253
elastic-2 | Received = 254
elastic-2 | Received = 255
Backpressure (Overflow) Strategy: LATEST
The LATEST strategy keeps only the latest next value, overwriting any previous pending value if the downstream can't keep up because it is too slow.
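The difference from DROP can be sketched in plain Java (a conceptual illustration with names of my choosing, not Reactor internals): instead of discarding overflow values, a single slot is overwritten, so only the most recent overflow value survives.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.atomic.AtomicReference;

public class LatestStrategySketch {
    public static void main(String[] args) {
        ArrayBlockingQueue<Integer> buffer = new ArrayBlockingQueue<>(256);
        // Once the buffer is full, only the single most recent value survives.
        AtomicReference<Integer> latest = new AtomicReference<>();

        for (int i = 0; i < 1000; i++) {
            if (!buffer.offer(i)) {
                latest.set(i); // overwrite: earlier overflow values are lost
            }
        }
        // A subscriber would drain 0..255 from the buffer, then get the latest value.
        System.out.println("Latest overflow value = " + latest.get());
    }
}
```

This prints `Latest overflow value = 999`, which is exactly the jump from 255 straight to 999 visible in the Reactor output below.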
package com.itsallbinary.reactor;

import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink.OverflowStrategy;
import reactor.core.scheduler.Schedulers;

public class ReactorAsyncWithBackpressureLatestExample {

    public static void main(String[] args) throws InterruptedException {

        Flux<Object> fluxAsyncBackp = Flux.create(emitter -> {

            // Publish 1000 numbers.
            for (int i = 0; i < 1000; i++) {
                System.out.println(Thread.currentThread().getName() + " | Publishing = " + i);
                emitter.next(i);
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e1) {
                    e1.printStackTrace();
                }
            }
            // When all values are emitted, call complete.
            emitter.complete();

        }, OverflowStrategy.LATEST);

        fluxAsyncBackp.subscribeOn(Schedulers.elastic()).publishOn(Schedulers.elastic()).subscribe(i -> {
            // Process received value.
            System.out.println(Thread.currentThread().getName() + " | Received = " + i);
            // 100 millis delay to simulate a slow subscriber.
            try {
                Thread.sleep(100);
            } catch (InterruptedException e1) {
                e1.printStackTrace();
            }
        }, e -> {
            // Process error.
            System.err.println(Thread.currentThread().getName() + " | Error = " + e.getMessage());
        });

        /*
         * Notice above -
         *
         * OverflowStrategy.LATEST - Overwrites values if subscriber can't keep up.
         *
         * subscribeOn & publishOn - Put subscriber & publisher on different threads.
         */

        // Since publisher & subscriber run on threads other than the main thread,
        // keep the main thread alive for 100 seconds.
        Thread.sleep(100000);
    }
}
In the output below you can see that publishing of all 1000 values (0 to 999) went through fine. The subscriber also started receiving values asynchronously, but notice that it jumped directly from 255 to 999. This means that once the default buffer of 256 filled up, each overflowing value was overwritten by the next one, and the subscriber finally received only the last value, 999.
elastic-3 | Publishing = 0
elastic-2 | Received = 0
elastic-3 | Publishing = 1
elastic-3 | Publishing = 2
elastic-3 | Publishing = 3
elastic-3 | Publishing = 4
elastic-3 | Publishing = 5
elastic-3 | Publishing = 6
elastic-3 | Publishing = 7
elastic-3 | Publishing = 8
elastic-3 | Publishing = 9
elastic-2 | Received = 1
elastic-3 | Publishing = 10
.
.
elastic-3 | Publishing = 199
elastic-3 | Publishing = 200
elastic-2 | Received = 21
elastic-3 | Publishing = 201
elastic-3 | Publishing = 202
elastic-3 | Publishing = 203
elastic-3 | Publishing = 204
elastic-3 | Publishing = 205
elastic-3 | Publishing = 206
elastic-3 | Publishing = 207
elastic-3 | Publishing = 208
elastic-3 | Publishing = 209
elastic-3 | Publishing = 210
elastic-2 | Received = 22
elastic-3 | Publishing = 211
.
.
elastic-3 | Publishing = 998
elastic-3 | Publishing = 999
elastic-2 | Received = 104
elastic-2 | Received = 105
elastic-2 | Received = 106
.
.
elastic-2 | Received = 250
elastic-2 | Received = 251
elastic-2 | Received = 252
elastic-2 | Received = 253
elastic-2 | Received = 254
elastic-2 | Received = 255
elastic-2 | Received = 999
Backpressure (Overflow) Strategy: ERROR
The ERROR strategy raises an OverflowException in case the downstream can't keep up. The stream terminates with this error, so the subscriber should register an error handler to deal with such scenarios on its side.
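Again as a plain-Java sketch (illustrative names only, not Reactor's actual mechanism): the first failed `offer` terminates the stream with an error signal instead of dropping or overwriting.

```java
import java.util.concurrent.ArrayBlockingQueue;

public class ErrorStrategySketch {
    public static void main(String[] args) {
        ArrayBlockingQueue<Integer> buffer = new ArrayBlockingQueue<>(256);
        String error = null;

        for (int i = 0; i < 1000; i++) {
            if (!buffer.offer(i)) {
                // ERROR in spirit: the first overflow terminates the stream
                // with an error signal to the subscriber.
                error = "OverflowException at value " + i;
                break;
            }
        }
        System.out.println("Buffered = " + buffer.size() + " | " + error);
    }
}
```

This prints `Buffered = 256 | OverflowException at value 256`: the subscriber can still drain the 256 buffered values, then its error handler fires, which matches the Reactor output below.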
package com.itsallbinary.reactor;

import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink.OverflowStrategy;
import reactor.core.scheduler.Schedulers;

public class ReactorAsyncWithBackpressureErrorExample {

    public static void main(String[] args) throws InterruptedException {

        Flux<Object> fluxAsyncBackp = Flux.create(emitter -> {

            // Publish 1000 numbers.
            for (int i = 0; i < 1000; i++) {
                System.out.println(Thread.currentThread().getName() + " | Publishing = " + i);
                // OverflowStrategy.ERROR will cause an OverflowException when the
                // subscriber can't keep up, delivered to the subscriber's error handler.
                emitter.next(i);
            }
            // When all values are emitted, call complete.
            emitter.complete();

        }, OverflowStrategy.ERROR);

        fluxAsyncBackp.subscribeOn(Schedulers.elastic()).publishOn(Schedulers.elastic()).subscribe(i -> {
            // Process received value.
            System.out.println(Thread.currentThread().getName() + " | Received = " + i);
        }, e -> {
            // Process error.
            System.err.println(Thread.currentThread().getName() + " | Error = " + e.getClass().getSimpleName()
                    + " " + e.getMessage());
        });

        /*
         * Notice above -
         *
         * OverflowStrategy.ERROR - Throws OverflowException if subscriber can't keep up.
         *
         * subscribeOn & publishOn - Put subscriber & publisher on different threads.
         */

        // Since publisher & subscriber run on threads other than the main thread,
        // keep the main thread alive for 100 seconds.
        Thread.sleep(100000);
    }
}
You can see in the output below that publishing & subscribing started on different threads. The subscriber received values up to 255, then its error handler was called with an OverflowException and the subscription terminated. All subsequently published values were dropped (onNextDropped).
[DEBUG] (main) Using Console logging
elastic-3 | Publishing = 0
elastic-3 | Publishing = 1
elastic-2 | Received = 0
elastic-3 | Publishing = 2
elastic-2 | Received = 1
elastic-3 | Publishing = 3
elastic-2 | Received = 2
elastic-3 | Publishing = 4
elastic-2 | Received = 3
elastic-3 | Publishing = 5
.
.
elastic-3 | Publishing = 129
elastic-2 | Received = 128
elastic-2 | Received = 129
elastic-3 | Publishing = 130
elastic-3 | Publishing = 131
elastic-3 | Publishing = 132
.
.
elastic-3 | Publishing = 230
elastic-3 | Publishing = 231
elastic-2 | Received = 142
elastic-2 | Received = 143
elastic-2 | Received = 144
.
.
elastic-2 | Received = 189
elastic-2 | Received = 190
elastic-3 | Publishing = 232
elastic-3 | Publishing = 233
.
.
elastic-3 | Publishing = 254
elastic-3 | Publishing = 255
elastic-3 | Publishing = 256
elastic-2 | Received = 192
elastic-2 | Received = 193
elastic-2 | Received = 194
.
.
elastic-2 | Received = 254
elastic-2 | Received = 255
elastic-2 | Error = OverflowException The receiver is overrun by more signals than expected (bounded queue...)
elastic-3 | Publishing = 257
[DEBUG] (elastic-3) onNextDropped: 257
elastic-3 | Publishing = 258
[DEBUG] (elastic-3) onNextDropped: 258
elastic-3 | Publishing = 259
[DEBUG] (elastic-3) onNextDropped: 259
.
.
elastic-3 | Publishing = 995
[DEBUG] (elastic-3) onNextDropped: 995
elastic-3 | Publishing = 996
[DEBUG] (elastic-3) onNextDropped: 996
elastic-3 | Publishing = 997
[DEBUG] (elastic-3) onNextDropped: 997
elastic-3 | Publishing = 998
[DEBUG] (elastic-3) onNextDropped: 998
elastic-3 | Publishing = 999
[DEBUG] (elastic-3) onNextDropped: 999
Backpressure (Overflow) Strategy: IGNORE
The IGNORE strategy, as the name suggests, completely ignores downstream backpressure requests and just pushes values. The subscriber must be able to handle the overflow, otherwise it will receive an OverflowException error.
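A rough plain-Java sketch of what "ignoring backpressure" means (purely illustrative, not Reactor's implementation): the producer emits without ever consulting how much the downstream requested, so it eventually overruns the downstream's queue and an error is raised.

```java
public class IgnoreStrategySketch {
    public static void main(String[] args) {
        long requested = 256;   // capacity the downstream can actually absorb
        long delivered = 0;
        String error = null;

        for (int i = 0; i < 1000; i++) {
            // IGNORE in spirit: emit without checking the request count.
            delivered++;
            if (delivered > requested) {
                error = "OverflowException: source doesn't respect backpressure";
                break;
            }
        }
        System.out.println(error);
    }
}
```

This prints `OverflowException: source doesn't respect backpressure`, the same failure mode that shows up in the real output below once the downstream queue fills.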
package com.itsallbinary.reactor;

import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink.OverflowStrategy;
import reactor.core.scheduler.Schedulers;

public class ReactorAsyncWithBackpressureIgnoreExample {

    public static void main(String[] args) throws InterruptedException {

        Flux<Object> fluxAsyncBackp = Flux.create(emitter -> {

            // Publish 1000 numbers.
            for (int i = 0; i < 1000; i++) {
                System.out.println(Thread.currentThread().getName() + " | Publishing = " + i);
                // OverflowStrategy.IGNORE will eventually cause an OverflowException
                // once the downstream queue overflows.
                emitter.next(i);
            }
            // When all values are emitted, call complete.
            emitter.complete();

        }, OverflowStrategy.IGNORE);

        fluxAsyncBackp.subscribeOn(Schedulers.elastic()).publishOn(Schedulers.elastic()).subscribe(i -> {
            // Process received value.
            System.out.println(Thread.currentThread().getName() + " | Received = " + i);
        }, e -> {
            // Process error.
            System.err.println(Thread.currentThread().getName() + " | Error = " + e.getClass().getSimpleName()
                    + " " + e.getMessage());
        });

        /*
         * Notice above -
         *
         * subscribeOn & publishOn - Put subscriber & publisher on different threads.
         */

        // Since publisher & subscriber run on threads other than the main thread,
        // keep the main thread alive for 100 seconds.
        Thread.sleep(100000);
    }
}
[DEBUG] (main) Using Console logging
elastic-3 | Publishing = 0
elastic-3 | Publishing = 1
elastic-2 | Received = 0
elastic-3 | Publishing = 2
elastic-2 | Received = 1
.
.
elastic-2 | Received = 98
elastic-2 | Received = 99
elastic-2 | Received = 100
elastic-2 | Received = 101
elastic-2 | Received = 102
.
.
elastic-3 | Publishing = 428
elastic-3 | Publishing = 429
elastic-3 | Publishing = 430
elastic-2 | Received = 192
elastic-2 | Received = 193
elastic-2 | Received = 194
.
.
elastic-2 | Received = 371
elastic-2 | Received = 372
elastic-2 | Received = 373
elastic-3 | Publishing = 431
elastic-3 | Publishing = 432
elastic-3 | Publishing = 433
.
.
elastic-2 | Received = 638
elastic-2 | Received = 639
elastic-2 | Received = 640
elastic-3 | Publishing = 642
[DEBUG] (elastic-3) onNextDropped: 642
elastic-3 | Publishing = 643
elastic-2 | Error = OverflowException Queue is full: Reactive Streams source doesn't respect backpressure
[DEBUG] (elastic-3) onNextDropped: 643
elastic-3 | Publishing = 644
[DEBUG] (elastic-3) onNextDropped: 644
elastic-3 | Publishing = 645
[DEBUG] (elastic-3) onNextDropped: 645
.
.
elastic-3 | Publishing = 998
[DEBUG] (elastic-3) onNextDropped: 998
elastic-3 | Publishing = 999
[DEBUG] (elastic-3) onNextDropped: 999
Backpressure (Overflow) Strategy: BUFFER
With the BUFFER strategy, as the name suggests, all values are buffered so that the subscriber can eventually receive all of them. In the program below the buffer is unbounded, so if a very large number of values is published & the subscriber is too slow, there is a real chance of running out of memory, just like with RxJava's Observable.
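The memory risk can be sketched in plain Java (conceptual only, names are mine): with an unbounded queue nothing is ever rejected, so the backlog grows with the gap between producer and consumer.

```java
import java.util.concurrent.LinkedBlockingQueue;

public class BufferStrategySketch {
    public static void main(String[] args) {
        // Unbounded queue: nothing is ever rejected, so memory use grows
        // with the gap between producer and consumer speeds.
        LinkedBlockingQueue<Integer> buffer = new LinkedBlockingQueue<>();

        for (int i = 0; i < 1_000_000; i++) {
            buffer.offer(i); // always succeeds; the queue just keeps growing
        }
        System.out.println("Buffered = " + buffer.size());
    }
}
```

This prints `Buffered = 1000000`: every value is retained, which is exactly why an unbounded BUFFER with a fast publisher and slow subscriber risks OutOfMemoryError.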
package com.itsallbinary.reactor;

import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink.OverflowStrategy;
import reactor.core.scheduler.Schedulers;

public class ReactorAsyncWithBackpressureBufferExample {

    public static void main(String[] args) throws InterruptedException {

        Flux<Object> fluxAsyncBackp = Flux.create(emitter -> {

            // Publish a very large number of values to stress the unbounded buffer.
            for (int i = 0; i < 999_000_000; i++) {
                System.out.println(Thread.currentThread().getName() + " | Publishing = " + i);
                emitter.next(i);
            }
            // When all values are emitted, call complete.
            emitter.complete();

        }, OverflowStrategy.BUFFER);

        fluxAsyncBackp.subscribeOn(Schedulers.elastic()).publishOn(Schedulers.elastic()).subscribe(i -> {
            // Process received value.
            System.out.println(Thread.currentThread().getName() + " | Received = " + i);
            // 100 millis delay to simulate a slow subscriber.
            try {
                Thread.sleep(100);
            } catch (InterruptedException e1) {
                e1.printStackTrace();
            }
        }, e -> {
            // Process error.
            System.err.println(Thread.currentThread().getName() + " | Error = " + e.getClass().getSimpleName()
                    + " " + e.getMessage());
        });

        /*
         * Notice above -
         *
         * subscribeOn & publishOn - Put subscriber & publisher on different threads.
         */

        // Since publisher & subscriber run on threads other than the main thread,
        // keep the main thread alive.
        Thread.sleep(1000000);
    }
}
[DEBUG] (main) Using Console logging
elastic-3 | Publishing = 0
elastic-3 | Publishing = 1
elastic-3 | Publishing = 2
elastic-3 | Publishing = 3
elastic-2 | Received = 0
elastic-3 | Publishing = 4
elastic-3 | Publishing = 5
elastic-3 | Publishing = 6
.
.
elastic-3 | Publishing = 22
elastic-3 | Publishing = 23
.
.
elastic-3 | Publishing = 4333
elastic-3 | Publishing = 4334
elastic-2 | Received = 1
elastic-3 | Publishing = 4335
elastic-3 | Publishing = 4336
.
.
elastic-3 | Publishing = 10841
elastic-3 | Publishing = 10842
elastic-2 | Received = 2
elastic-3 | Publishing = 10843
elastic-3 | Publishing = 10844
.
.
elastic-3 | Publishing = 344458
elastic-3 | Publishing = 344459
elastic-3 | Publishing = 344460
elastic-2 | Received = 50
elastic-3 | Publishing = 344461
elastic-3 | Publishing = 344462
.
.
elastic-3 | Publishing = 344482
elastic-3 | Publishing = 344483
elastic-3 | Publishing = 344484
.
.