In this article, we will go through some very basic examples of RxJava 2 to understand the different ways in which a publisher & a subscriber interact to perform the desired operations.
The examples use the Maven dependency io.reactivex.rxjava2 – rxjava.
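A dependency block along the following lines would pull the library in. The groupId and artifactId are the ones named above; the version number is an assumption, since the version used for these examples is not shown, so substitute whichever 2.x release your project uses.

```xml
<dependency>
    <groupId>io.reactivex.rxjava2</groupId>
    <artifactId>rxjava</artifactId>
    <!-- Assumed version: the version used by the original examples is not shown -->
    <version>2.2.21</version>
</dependency>
```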
RxJava Synchronous
This is an example where publishing & subscribing are synchronous. The publisher & subscriber both run on the same thread.
- create() – Creates Observable i.e. publisher i.e. emitter.
- subscribe() – Subscribes to Observable i.e. publisher i.e. emitter & starts receiving emitted values.
    package com.itsallbinary.rxjava.tutorial;

    import io.reactivex.Observable;

    public class RxJavaSynchronousExample {

        public static void main(String[] args) {

            Observable<Object> observableSync = Observable.create(emitter -> {
                // Publish 100 numbers
                for (int i = 0; i < 100; i++) {
                    System.out.println(Thread.currentThread().getName() + " | Publishing = " + i);
                    // Publish or emit a value.
                    emitter.onNext(i);
                }
                // When all values are emitted, call complete.
                emitter.onComplete();
            });

            observableSync.subscribe(i -> {
                // Process received value.
                System.out.println(Thread.currentThread().getName() + " | Received = " + i);
                // 100 ms delay to simulate a slow subscriber
                Thread.sleep(100);
            });
        }
    }
The subscriber takes 100 ms to process each value. Because everything runs on the main thread, the publisher is blocked during that time, so the next value is published only after the subscriber has finished processing the previous one.
    main | Publishing = 0
    main | Received = 0
    main | Publishing = 1
    main | Received = 1
    main | Publishing = 2
    main | Received = 2
    main | Publishing = 3
    main | Received = 3
    main | Publishing = 4
    main | Received = 4
    .
    <Log skipped>
    .
    main | Publishing = 97
    main | Received = 97
    main | Publishing = 98
    main | Received = 98
    main | Publishing = 99
    main | Received = 99
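The strict publish/receive interleaving seen in the log can also be checked without timing or console output. The following is a minimal sketch, assuming RxJava 2 is on the classpath; the class name SyncInterleavingSketch and the helper run() are made up for this illustration. It records every publish & receive event in a single list, in the order in which they happen.

```java
import io.reactivex.Observable;

import java.util.ArrayList;
import java.util.List;

// Hypothetical helper class, not part of the original tutorial code.
class SyncInterleavingSketch {

    // Returns the publish/receive events in the order they occurred.
    static List<String> run() {
        List<String> events = new ArrayList<>();

        Observable<Integer> source = Observable.create(emitter -> {
            for (int i = 0; i < 3; i++) {
                events.add("publish " + i);
                // onNext() calls the subscriber directly on the current thread.
                emitter.onNext(i);
            }
            emitter.onComplete();
        });

        // No schedulers are involved, so this runs on the calling thread.
        source.subscribe(i -> events.add("receive " + i));
        return events;
    }

    public static void main(String[] args) {
        // prints [publish 0, receive 0, publish 1, receive 1, publish 2, receive 2]
        System.out.println(run());
    }
}
```

Each "receive" entry lands directly after its "publish" entry because onNext() does not return until the subscriber has handled the value.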
RxJava Asynchronous
In this example, we will put publisher i.e. observable & subscriber on different threads & make it asynchronous.
- create() – Creates Observable i.e. publisher i.e. emitter.
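- subscribeOn() – Tells the Observable which Scheduler to run the subscription on. The emitting code inside create() runs there, so the publisher runs on a thread other than the current thread i.e. main thread.
- observeOn() – Moves delivery of the emitted values to another Scheduler, so the subscriber receives them on a thread different from the publisher's thread.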
- subscribe() – Subscribes to Observable i.e. publisher i.e. emitter & starts receiving emitted values.
    package com.itsallbinary.rxjava.tutorial;

    import io.reactivex.Observable;
    import io.reactivex.schedulers.Schedulers;

    public class RxJavaAsynchronousExample {

        public static void main(String[] args) throws InterruptedException {

            Observable<Object> observableAsync = Observable.create(emitter -> {
                // Publish 100 numbers
                for (int i = 0; i < 100; i++) {
                    System.out.println(Thread.currentThread().getName() + " | Publishing = " + i);
                    // Publish or emit a value with 10 ms delay
                    Thread.sleep(10);
                    emitter.onNext(i);
                }
                // When all values are emitted, call complete.
                emitter.onComplete();
            }).subscribeOn(Schedulers.newThread()).observeOn(Schedulers.newThread());

            /*
             * Notice above - subscribeOn() runs the publisher/observable on one new
             * thread & observeOn() delivers the values to the subscriber on another.
             */

            observableAsync.subscribe(i -> {
                // Process received value.
                System.out.println(Thread.currentThread().getName() + " | Received = " + i);
                // 100 ms delay to simulate a slow subscriber
                Thread.sleep(100);
            });

            // Publisher & subscriber run on RxJava scheduler threads, which are daemon
            // threads, so keep the main thread alive until the slow subscriber has
            // handled all values (100 values x 100 ms is about 10 seconds).
            Thread.sleep(12000);
        }
    }
You can see in the output that the thread names for publisher & subscriber are different. Also, the publisher does not wait for the subscriber to finish processing.
    RxNewThreadScheduler-1 | Publishing = 0
    RxNewThreadScheduler-1 | Publishing = 1
    RxNewThreadScheduler-2 | Received = 0
    RxNewThreadScheduler-1 | Publishing = 2
    RxNewThreadScheduler-1 | Publishing = 3
    RxNewThreadScheduler-1 | Publishing = 4
    RxNewThreadScheduler-1 | Publishing = 5
    RxNewThreadScheduler-1 | Publishing = 6
    RxNewThreadScheduler-1 | Publishing = 7
    RxNewThreadScheduler-1 | Publishing = 8
    RxNewThreadScheduler-1 | Publishing = 9
    RxNewThreadScheduler-1 | Publishing = 10
    RxNewThreadScheduler-2 | Received = 1
    RxNewThreadScheduler-1 | Publishing = 11
    .
    .
    RxNewThreadScheduler-1 | Publishing = 96
    RxNewThreadScheduler-2 | Received = 10
    RxNewThreadScheduler-1 | Publishing = 97
    RxNewThreadScheduler-1 | Publishing = 98
    RxNewThreadScheduler-1 | Publishing = 99
    RxNewThreadScheduler-2 | Received = 11
    RxNewThreadScheduler-2 | Received = 12
    .
    .
    RxNewThreadScheduler-2 | Received = 97
    RxNewThreadScheduler-2 | Received = 98
    RxNewThreadScheduler-2 | Received = 99
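To see which operator controls which side, it can help to give subscribeOn() & observeOn() two different schedulers and record the thread names. The following is a minimal sketch, assuming RxJava 2 is on the classpath. The class name SchedulerThreadSketch, the helper run() and the choice of Schedulers.io() & Schedulers.computation() are made up for this illustration; they were picked only because their thread names are easy to tell apart.

```java
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical helper class, not part of the original tutorial code.
class SchedulerThreadSketch {

    // Returns { name of the emitting thread, name of the receiving thread }.
    static String[] run() {
        AtomicReference<String> publisherThread = new AtomicReference<>();
        AtomicReference<String> subscriberThread = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);

        Observable.<Integer>create(emitter -> {
                    // Runs on the scheduler given to subscribeOn().
                    publisherThread.set(Thread.currentThread().getName());
                    emitter.onNext(1);
                    emitter.onComplete();
                })
                .subscribeOn(Schedulers.io())
                .observeOn(Schedulers.computation())
                .subscribe(
                        // Runs on the scheduler given to observeOn().
                        value -> subscriberThread.set(Thread.currentThread().getName()),
                        Throwable::printStackTrace,
                        done::countDown);

        try {
            // Wait for completion instead of sleeping for a fixed time.
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new String[] { publisherThread.get(), subscriberThread.get() };
    }

    public static void main(String[] args) {
        String[] names = run();
        System.out.println("published on " + names[0]);
        System.out.println("received on  " + names[1]);
    }
}
```

The emitting thread name should start with RxCachedThreadScheduler (the io scheduler) and the receiving thread name with RxComputationThreadPool. The CountDownLatch is one possible alternative to a fixed Thread.sleep() for keeping the main thread alive until the stream completes.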
RxJava Parallel processing
If some processing needs to be done on a large emitted data set, the processing can be split across parallel threads & the results can be merged back after completion, as shown in the example below.
- create() – Creates Flowable i.e. publisher i.e. emitter.
- parallel() & runOn() – Converts the Flowable to a ParallelFlowable, which splits the values across several parallel "rails", & informs it to run further processing on threads other than the current thread i.e. main thread.
- map() – Does the processing, i.e. calculates the square of each number on its rail. The result is still a ParallelFlowable.
- sequential() – Merges parallel operation results & converts ParallelFlowable back to Flowable.
- blockingSubscribe() – Subscribes to Flowable & starts receiving emitted values in the same thread i.e. main thread.
    package com.itsallbinary.rxjava.tutorial;

    import io.reactivex.BackpressureStrategy;
    import io.reactivex.Flowable;
    import io.reactivex.parallel.ParallelFlowable;
    import io.reactivex.schedulers.Schedulers;

    public class RxJavaParallelProcessingExample {

        public static void main(String[] args) throws InterruptedException {

            ParallelFlowable<Object> flowableParallel = Flowable.create(emitter -> {
                // Publish 100 numbers
                for (int i = 0; i < 100; i++) {
                    // Publish or emit a value, then wait 10 ms
                    emitter.onNext(i);
                    Thread.sleep(10);
                }
                // When all values are emitted, call complete.
                emitter.onComplete();
            }, BackpressureStrategy.BUFFER).parallel().runOn(Schedulers.computation()).map(i -> {
                int number = (int) i;
                System.out.println(Thread.currentThread().getName() + " | Sending square of " + number);
                // Processing - Square all numbers & provide to subscribers.
                return number * number;
            });

            /*
             * Notice above - parallel() & runOn() combination puts further processing in
             * parallel threads.
             */

            // Now here we will merge all parallel threads back in round-robin order so all
            // squares are printed.
            flowableParallel.sequential().blockingSubscribe(
                    i -> System.out.println(Thread.currentThread().getName() + " | Received square = " + i));
        }
    }
    RxComputationThreadPool-1 | Sending square of 0
    RxComputationThreadPool-2 | Sending square of 1
    RxComputationThreadPool-3 | Sending square of 2
    RxComputationThreadPool-4 | Sending square of 3
    RxComputationThreadPool-1 | Sending square of 4
    RxComputationThreadPool-2 | Sending square of 5
    RxComputationThreadPool-3 | Sending square of 6
    RxComputationThreadPool-4 | Sending square of 7
    RxComputationThreadPool-1 | Sending square of 8
    .
    .
    RxComputationThreadPool-3 | Sending square of 98
    RxComputationThreadPool-4 | Sending square of 99
    main | Received square = 0
    main | Received square = 1
    main | Received square = 4
    main | Received square = 9
    main | Received square = 16
    main | Received square = 25
    main | Received square = 36
    .
    .
    main | Received square = 9409
    main | Received square = 9604
    main | Received square = 9801
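The same parallel() → runOn() → map() → sequential() chain can be written more compactly when the result is collected into a list. The following is a minimal sketch, assuming RxJava 2 is on the classpath. The class name ParallelSquaresSketch and the helper run() are made up for this illustration, and it swaps Flowable.create() for Flowable.range() and fixes the number of rails at 4 to keep the example short. Since sequential() is not documented to preserve the original order in every case, the sketch sorts the collected values before returning them.

```java
import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;

import java.util.Collections;
import java.util.List;

// Hypothetical helper class, not part of the original tutorial code.
class ParallelSquaresSketch {

    // Squares the numbers 1..10 on parallel rails and returns them sorted.
    static List<Integer> run() {
        List<Integer> squares = Flowable.range(1, 10)
                .parallel(4)                        // split into 4 rails (assumed value)
                .runOn(Schedulers.computation())    // each rail runs on a computation thread
                .map(n -> n * n)                    // processing happens per rail
                .sequential()                       // merge rails back into one Flowable
                .toList()
                .blockingGet();                     // block the caller until all values arrive

        // Sort explicitly rather than rely on the merge order.
        Collections.sort(squares);
        return squares;
    }

    public static void main(String[] args) {
        // prints [1, 4, 9, 16, 25, 36, 49, 64, 81, 100]
        System.out.println(run());
    }
}
```

Here blockingGet() plays the role that blockingSubscribe() plays in the example above: it keeps the calling thread waiting until the parallel work has finished.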
RxJava Async with Backpressure
During asynchronous processing, the subscriber may consume data much more slowly than the publisher emits it. This situation is called backpressure. RxJava provides ways to handle backpressure gracefully.
Observable – An Observable has no backpressure support. Values that the subscriber has not yet consumed are held in an unbounded buffer i.e. infinite buffer, so all the data published/emitted is stored & eventually delivered to the subscriber. But if the amount of published data is very large, this can eventually cause an OutOfMemoryError. Observable does not provide graceful ways to handle backpressure.
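The buffering behaviour of Observable can be observed with a fast publisher & a slow subscriber. The following is a minimal sketch, assuming RxJava 2 is on the classpath; the class name ObservableBufferingSketch, the helper run(), the count of 200 values and the 1 ms delay are made up for this illustration. The publisher emits everything at once, and every value still reaches the slow subscriber because the pending values wait in the buffer.

```java
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

import java.util.List;

// Hypothetical helper class, not part of the original tutorial code.
class ObservableBufferingSketch {

    // Returns every value that reached the slow subscriber side.
    static List<Integer> run() {
        return Observable.<Integer>create(emitter -> {
                    // Emit all values immediately, far faster than they are consumed.
                    for (int i = 0; i < 200; i++) {
                        emitter.onNext(i);
                    }
                    emitter.onComplete();
                })
                .observeOn(Schedulers.single())       // pending values queue up here
                .doOnNext(value -> Thread.sleep(1))   // simulate a slow subscriber
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        // prints values received = 200
        System.out.println("values received = " + run().size());
    }
}
```

Nothing is lost here, but only because 200 values fit in memory easily. With an endless or very large stream the same buffer would keep growing.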
Flowable – Flowable provides graceful ways to handle backpressure. In the next section, we will look at the different strategies offered by Flowable. Flowable also has a default buffer size of 128 values.
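As a small preview, the same fast-publisher, slow-subscriber setup behaves differently with a Flowable. The following is a minimal sketch, assuming RxJava 2 is on the classpath and that the rx2.buffer-size system property is not set. The class name FlowableDropSketch, the helper run(), the count of 1000 values and the 5 ms delay are made up for this illustration, and BackpressureStrategy.DROP is just one of the available strategies. The publisher emits 1000 values at once; observeOn() initially requests only one buffer's worth, and values emitted while there is no outstanding demand are dropped instead of being stored.

```java
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;

import java.util.List;

// Hypothetical helper class, not part of the original tutorial code.
class FlowableDropSketch {

    // Returns the values that actually reached the slow subscriber side.
    static List<Integer> run() {
        return Flowable.<Integer>create(emitter -> {
                    // Emit all values immediately, far faster than they are consumed.
                    for (int i = 0; i < 1000; i++) {
                        emitter.onNext(i);
                    }
                    emitter.onComplete();
                }, BackpressureStrategy.DROP)         // drop values nobody has requested
                .observeOn(Schedulers.single())       // requests Flowable.bufferSize() values up front
                .doOnNext(value -> Thread.sleep(5))   // simulate a slow subscriber
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println("default buffer size = " + Flowable.bufferSize());
        System.out.println("values received = " + run().size() + " of 1000");
    }
}
```

On a typical run only about the first 128 values are received, because the publisher has finished long before the subscriber asks for more. The exact count depends on timing, so treat it as an illustration rather than a guarantee.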
RxJava Basics with example | Backpressure – DROP, ERROR, LATEST, MISSING | Good for beginners