In this article we will go through very basic & simple examples of backpressure handling in RxJava 2. For synchronous, async & parallel processing refer this article.
Backpressure
During asynchronous processing, if subscriber is consuming data slower than publisher, this situation is called as backpressure. RxJava provides ways to handle the Backpressure gracefully.
Observable – In case of Observable, there is unbounded buffer i.e. infinite buffer. So all the data published/emitted is stored in memory & made sure that subscriber receives that. But if published data is very very huge, then it might cause OutOfMemory error eventually. Observable does not provide graceful ways to handle this backpressure.
Flowable – Flowable provides graceful ways to handle backpressure. In next section, we will look at different strategies offered by Flowable. Flowable also has default buffer of 128 values.
Backpressure Strategy: DROP
DROP strategy drops the most recent onNext value if the downstream can’t keep up because its too slow. There are also ways provided to consume dropped values and handle them separately.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 |
package com.itsallbinary.rxjava.tutorial; import io.reactivex.BackpressureStrategy; import io.reactivex.Flowable; import io.reactivex.schedulers.Schedulers; public class RxJavaAsyncWithBackpressureDropExample { public static void main(String[] args) throws InterruptedException { Flowable<Object> flowableAsyncBackp = Flowable.create(emitter -> { // Publish 1000 numbers for (int i = 0; i < 1000; i++) { System.out.println(Thread.currentThread().getName() + " | Publishing = " + i); emitter.onNext(i); } // When all values or emitted, call complete. emitter.onComplete(); }, BackpressureStrategy.DROP) .onBackpressureDrop(i -> System.out.println(Thread.currentThread().getName() + " | DROPPED = " + i)); flowableAsyncBackp.subscribeOn(Schedulers.newThread()).observeOn(Schedulers.single()).subscribe(i -> { // Process received value. System.out.println(Thread.currentThread().getName() + " | Received = " + i); // 500 mills delay to simulate slow subscriber Thread.sleep(500); }); /* * Notice above - * * BackpressureStrategy.DROP - If subscriber can't keep up with values, then * drop the values. * * subscribeOn & observeOn - Put subscriber & publishers on different threads. */ // Since publisher & subscriber run on different thread than main thread, keep // main thread active for 100 seconds. Thread.sleep(100000); } } |
In below output you can see that till 128 (default buffer size), values were successfully published & then values starts dropping. Subscriber also received 128 values successfully.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 |
RxNewThreadScheduler-1 | Publishing = 0 RxSingleScheduler-1 | Received = 0 RxNewThreadScheduler-1 | Publishing = 1 RxNewThreadScheduler-1 | Publishing = 2 RxNewThreadScheduler-1 | Publishing = 3 RxNewThreadScheduler-1 | Publishing = 4 RxNewThreadScheduler-1 | Publishing = 5 . . RxNewThreadScheduler-1 | Publishing = 126 RxNewThreadScheduler-1 | Publishing = 127 RxNewThreadScheduler-1 | Publishing = 128 RxNewThreadScheduler-1 | DROPPED = 128 RxNewThreadScheduler-1 | Publishing = 129 RxNewThreadScheduler-1 | DROPPED = 129 RxNewThreadScheduler-1 | Publishing = 130 RxNewThreadScheduler-1 | DROPPED = 130 RxNewThreadScheduler-1 | Publishing = 131 RxNewThreadScheduler-1 | DROPPED = 131 . . RxNewThreadScheduler-1 | Publishing = 997 RxNewThreadScheduler-1 | DROPPED = 997 RxNewThreadScheduler-1 | Publishing = 998 RxNewThreadScheduler-1 | DROPPED = 998 RxNewThreadScheduler-1 | Publishing = 999 RxNewThreadScheduler-1 | DROPPED = 999 RxSingleScheduler-1 | Received = 1 RxSingleScheduler-1 | Received = 2 RxSingleScheduler-1 | Received = 3 RxSingleScheduler-1 | Received = 4 RxSingleScheduler-1 | Received = 5 . . RxSingleScheduler-1 | Received = 103 RxSingleScheduler-1 | Received = 104 RxSingleScheduler-1 | Received = 105 RxSingleScheduler-1 | Received = 106 . . RxSingleScheduler-1 | Received = 121 RxSingleScheduler-1 | Received = 122 RxSingleScheduler-1 | Received = 123 RxSingleScheduler-1 | Received = 124 RxSingleScheduler-1 | Received = 125 RxSingleScheduler-1 | Received = 126 RxSingleScheduler-1 | Received = 127 |
Backpressure Strategy: LATEST
LATEST strategy keeps only the latest onNext value, overwriting any previous value if the downstream can’t keep up because its too slow.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 |
package com.itsallbinary.rxjava.tutorial; import io.reactivex.BackpressureStrategy; import io.reactivex.Flowable; import io.reactivex.schedulers.Schedulers; public class RxJavaAsyncWithBackpressureLatestExample { public static void main(String[] args) throws InterruptedException { Flowable<Object> flowableAsyncBackp = Flowable.create(emitter -> { // Publish 1000 numbers for (int i = 0; i < 1000; i++) { System.out.println(Thread.currentThread().getName() + " | Publishing = " + i); emitter.onNext(i); Thread.sleep(10); } // When all values or emitted, call complete. emitter.onComplete(); }, BackpressureStrategy.LATEST); flowableAsyncBackp.subscribeOn(Schedulers.newThread()).observeOn(Schedulers.single()).subscribe(i -> { // Process received value. System.out.println(Thread.currentThread().getName() + " | Received = " + i); // 100 mills delay to simulate slow subscriber Thread.sleep(100); }, e -> { // Process error System.err.println(Thread.currentThread().getName() + " | Error = " + e.getMessage()); }); /* * Notice above - * * BackpressureStrategy.LATEST - Overwrites values if subscriber can't keep up. * * subscribeOn & observeOn - Put subscriber & publishers on different threads. */ // Since publisher & subscriber run on different thread than main thread, keep // main thread active for 100 seconds. Thread.sleep(100000); } } |
In below output you can see that publishing of all 999 values seems to have gone fine. Subscriber also started asynchronously receiving values. But you can see subscriber directly received 923 after 127. This means that after 127 (default buffer of 128), all values were replaced with latest & finally last values of 923 & above remained in buffer & received by subscriber.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 |
RxNewThreadScheduler-1 | Publishing = 0 RxSingleScheduler-1 | Received = 0 RxNewThreadScheduler-1 | Publishing = 1 RxNewThreadScheduler-1 | Publishing = 2 RxNewThreadScheduler-1 | Publishing = 3 RxNewThreadScheduler-1 | Publishing = 4 RxNewThreadScheduler-1 | Publishing = 5 RxNewThreadScheduler-1 | Publishing = 6 RxNewThreadScheduler-1 | Publishing = 7 RxNewThreadScheduler-1 | Publishing = 8 RxNewThreadScheduler-1 | Publishing = 9 RxSingleScheduler-1 | Received = 1 RxNewThreadScheduler-1 | Publishing = 10 RxNewThreadScheduler-1 | Publishing = 11 . . RxNewThreadScheduler-1 | Publishing = 989 RxNewThreadScheduler-1 | Publishing = 990 RxSingleScheduler-1 | Received = 103 RxNewThreadScheduler-1 | Publishing = 991 RxNewThreadScheduler-1 | Publishing = 992 RxNewThreadScheduler-1 | Publishing = 993 RxNewThreadScheduler-1 | Publishing = 994 RxNewThreadScheduler-1 | Publishing = 995 RxNewThreadScheduler-1 | Publishing = 996 RxNewThreadScheduler-1 | Publishing = 997 RxNewThreadScheduler-1 | Publishing = 998 RxNewThreadScheduler-1 | Publishing = 999 RxSingleScheduler-1 | Received = 104 RxSingleScheduler-1 | Received = 105 RxSingleScheduler-1 | Received = 106 RxSingleScheduler-1 | Received = 107 . . RxSingleScheduler-1 | Received = 125 RxSingleScheduler-1 | Received = 126 RxSingleScheduler-1 | Received = 127 RxSingleScheduler-1 | Received = 923 RxSingleScheduler-1 | Received = 924 RxSingleScheduler-1 | Received = 925 RxSingleScheduler-1 | Received = 926 . . RxSingleScheduler-1 | Received = 996 RxSingleScheduler-1 | Received = 997 RxSingleScheduler-1 | Received = 998 RxSingleScheduler-1 | Received = 999 |
Backpressure Strategy: ERROR
ERROR strategy throws MissingBackpressureException in case the downstream can’t keep up due to slowness. Publisher can handle exception & make sure to call onError handle so that subscriber can do handling on subscriber side for such error scenarios.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 |
package com.itsallbinary.rxjava.tutorial; import io.reactivex.BackpressureStrategy; import io.reactivex.Flowable; import io.reactivex.exceptions.MissingBackpressureException; import io.reactivex.schedulers.Schedulers; public class RxJavaAsyncWithBackpressureErrorExample { public static void main(String[] args) throws InterruptedException { Flowable<Object> flowableAsyncBackp = Flowable.create(emitter -> { // Publish 1000 numbers for (int i = 0; i < 1000; i++) { System.out.println(Thread.currentThread().getName() + " | Publishing = " + i); // BackpressureStrategy.ERROR will cause MissingBackpressureException when // subscriber can't keep up. So handle exception & call error handler. try { emitter.onNext(i); } catch (MissingBackpressureException exception) { emitter.onError(exception); } } // When all values or emitted, call complete. emitter.onComplete(); }, BackpressureStrategy.ERROR); flowableAsyncBackp.subscribeOn(Schedulers.newThread()).observeOn(Schedulers.single()).subscribe(i -> { // Process received value. System.out.println(Thread.currentThread().getName() + " | Received = " + i); }, e -> { // Process error System.err.println(Thread.currentThread().getName() + " | Error = " + e.getClass().getSimpleName() + " " + e.getMessage()); }); /* * Notice above - * * BackpressureStrategy.ERROR - Throws MissingBackpressureException is * subscriber can't keep up. * * subscribeOn & observeOn - Put subscriber & publishers on different threads. */ // Since publisher & subscriber run on different thread than main thread, keep // main thread active for 100 seconds. Thread.sleep(100000); } } |
You can see in below output that publishing & subscribing started on different threads. Subscriber received values till 314 & then onError handler was called due to MissingBackpressureException. After that subscriber stopped.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 |
RxNewThreadScheduler-1 | Publishing = 0 RxNewThreadScheduler-1 | Publishing = 1 RxSingleScheduler-1 | Received = 0 RxNewThreadScheduler-1 | Publishing = 2 RxNewThreadScheduler-1 | Publishing = 3 RxNewThreadScheduler-1 | Publishing = 4 RxSingleScheduler-1 | Received = 1 RxNewThreadScheduler-1 | Publishing = 5 RxSingleScheduler-1 | Received = 2 RxNewThreadScheduler-1 | Publishing = 6 . . RxSingleScheduler-1 | Received = 308 RxSingleScheduler-1 | Received = 309 RxSingleScheduler-1 | Received = 310 RxSingleScheduler-1 | Received = 311 RxSingleScheduler-1 | Received = 312 RxSingleScheduler-1 | Received = 313 RxSingleScheduler-1 | Received = 314 RxSingleScheduler-1 | Error = MissingBackpressureException create: could not emit value due to lack of requests RxNewThreadScheduler-1 | Publishing = 321 RxNewThreadScheduler-1 | Publishing = 322 RxNewThreadScheduler-1 | Publishing = 323 RxNewThreadScheduler-1 | Publishing = 324 . . RxNewThreadScheduler-1 | Publishing = 990 RxNewThreadScheduler-1 | Publishing = 991 RxNewThreadScheduler-1 | Publishing = 992 RxNewThreadScheduler-1 | Publishing = 993 RxNewThreadScheduler-1 | Publishing = 994 RxNewThreadScheduler-1 | Publishing = 995 RxNewThreadScheduler-1 | Publishing = 996 RxNewThreadScheduler-1 | Publishing = 997 RxNewThreadScheduler-1 | Publishing = 998 RxNewThreadScheduler-1 | Publishing = 999 |
Backpressure Strategy: MISSING
With MISSING strategy, as name suggests there is no buffering or dropping. Subscriber must handle overflow else they will receive error.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 |
package com.itsallbinary.rxjava.tutorial; import io.reactivex.BackpressureStrategy; import io.reactivex.Flowable; import io.reactivex.exceptions.MissingBackpressureException; import io.reactivex.schedulers.Schedulers; public class RxJavaAsyncWithBackpressureMissingExample { public static void main(String[] args) throws InterruptedException { Flowable<Object> flowableAsyncBackp = Flowable.create(emitter -> { // Publish 1000 numbers for (int i = 0; i < 1000; i++) { System.out.println(Thread.currentThread().getName() + " | Publishing = " + i); // BackpressureStrategy.MISSING will cause MissingBackpressureException // eventually try { emitter.onNext(i); } catch (MissingBackpressureException exception) { emitter.onError(exception); } } // When all values or emitted, call complete. emitter.onComplete(); }, BackpressureStrategy.MISSING); flowableAsyncBackp.subscribeOn(Schedulers.newThread()).observeOn(Schedulers.single()).subscribe(i -> { // Process received value. System.out.println(Thread.currentThread().getName() + " | Received = " + i); }, e -> { // Process error System.err.println(Thread.currentThread().getName() + " | Error = " + e.getClass().getSimpleName() + " " + e.getMessage()); }); /* * Notice above - * * subscribeOn & observeOn - Put subscriber & publishers on different threads. */ // Since publisher & subscriber run on different thread than main thread, keep // main thread active for 100 seconds. Thread.sleep(100000); } } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 |
RxNewThreadScheduler-1 | Publishing = 0 RxNewThreadScheduler-1 | Publishing = 1 RxNewThreadScheduler-1 | Publishing = 2 RxNewThreadScheduler-1 | Publishing = 3 RxNewThreadScheduler-1 | Publishing = 4 . . RxNewThreadScheduler-1 | Publishing = 108 RxNewThreadScheduler-1 | Publishing = 109 RxSingleScheduler-1 | Received = 0 RxSingleScheduler-1 | Received = 1 RxSingleScheduler-1 | Received = 2 RxSingleScheduler-1 | Received = 3 RxSingleScheduler-1 | Received = 4 . . RxNewThreadScheduler-1 | Publishing = 143 RxNewThreadScheduler-1 | Publishing = 144 RxNewThreadScheduler-1 | Publishing = 145 RxSingleScheduler-1 | Received = 16 RxSingleScheduler-1 | Received = 17 RxSingleScheduler-1 | Received = 18 RxSingleScheduler-1 | Received = 19 . . RxNewThreadScheduler-1 | Publishing = 188 RxNewThreadScheduler-1 | Publishing = 189 RxNewThreadScheduler-1 | Publishing = 190 RxNewThreadScheduler-1 | Publishing = 191 RxNewThreadScheduler-1 | Publishing = 192RxSingleScheduler-1 | Error = MissingBackpressureException Queue is full?! RxNewThreadScheduler-1 | Publishing = 193 RxNewThreadScheduler-1 | Publishing = 194 RxNewThreadScheduler-1 | Publishing = 195 . . RxNewThreadScheduler-1 | Publishing = 995 RxNewThreadScheduler-1 | Publishing = 996 RxNewThreadScheduler-1 | Publishing = 997 RxNewThreadScheduler-1 | Publishing = 998 RxNewThreadScheduler-1 | Publishing = 999 |
Backpressure Strategy: BUFFER
With BUFFER strategy, as name suggests all values are buffered so that subscriber can receive all values. As per program below, buffer is infinite, so if published values are large in count & subscriber is too slow, then there is chance of out of memory just like Observable.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 |
package com.itsallbinary.rxjava.tutorial; import io.reactivex.BackpressureStrategy; import io.reactivex.Flowable; import io.reactivex.schedulers.Schedulers; public class RxJavaAsyncWithBackpressureBufferExample { public static void main(String[] args) throws InterruptedException { Flowable<Object> flowableAsyncBackp = Flowable.create(emitter -> { // Publish 1000 numbers for (int i = 0; i < 999_000_000; i++) { System.out.println(Thread.currentThread().getName() + " | Publishing = " + i); emitter.onNext(i); // Thread.sleep(10); } // When all values or emitted, call complete. emitter.onComplete(); }, BackpressureStrategy.BUFFER); flowableAsyncBackp.subscribeOn(Schedulers.newThread()).observeOn(Schedulers.newThread()).subscribe(i -> { // Process received value. System.out.println(Thread.currentThread().getName() + " | Received = " + i); // 500 mills delay to simulate slow subscriber Thread.sleep(100); }, e -> { // Process error System.err.println(Thread.currentThread().getName() + " | Error = " + e.getClass().getSimpleName() + " " + e.getMessage()); }); /* * Notice above - * * subscribeOn & observeOn - Put subscriber & publishers on different threads. */ // Since publisher & subscriber run on different thread than main thread, keep // main thread active for 100 seconds. Thread.sleep(1000000); } } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 |
RxNewThreadScheduler-2 | Publishing = 0 RxNewThreadScheduler-2 | Publishing = 1 RxNewThreadScheduler-1 | Received = 0 RxNewThreadScheduler-2 | Publishing = 2 RxNewThreadScheduler-2 | Publishing = 3 RxNewThreadScheduler-2 | Publishing = 4 RxNewThreadScheduler-2 | Publishing = 5 . . RxNewThreadScheduler-2 | Publishing = 5372 RxNewThreadScheduler-2 | Publishing = 5373 RxNewThreadScheduler-2 | Publishing = 5374 RxNewThreadScheduler-1 | Received = 1 RxNewThreadScheduler-2 | Publishing = 5375 RxNewThreadScheduler-2 | Publishing = 5376 . . RxNewThreadScheduler-2 | Publishing = 16866 RxNewThreadScheduler-2 | Publishing = 16867 RxNewThreadScheduler-1 | Received = 3 RxNewThreadScheduler-2 | Publishing = 16868 RxNewThreadScheduler-2 | Publishing = 16869 . . RxNewThreadScheduler-2 | Publishing = 21101 RxNewThreadScheduler-2 | Publishing = 21102 RxNewThreadScheduler-1 | Received = 4 RxNewThreadScheduler-2 | Publishing = 21103 RxNewThreadScheduler-2 | Publishing = 21104 . . RxNewThreadScheduler-2 | Publishing = 1015248 RxNewThreadScheduler-2 | Publishing = 1015249 RxNewThreadScheduler-1 | Received = 163 RxNewThreadScheduler-2 | Publishing = 1015250 RxNewThreadScheduler-2 | Publishing = 1015251 . . |