In this article we will go through some basic examples of the Java Flow API (java.util.concurrent.Flow), which was introduced in JDK 9. The examples show the different ways in which a publisher and a subscriber can interact.
The Java Flow API consists of four interfaces, Publisher, Subscriber, Subscription and Processor, which are based on the Reactive Streams specification.
Let's look at code examples that use these interfaces.
Java Flow API Synchronous
```java
package com.itsallbinary.jdk9.flow;

import java.util.concurrent.Flow.Publisher;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;

public class JDK9FlowSynchronousExample {

    public static void main(String[] args) {

        // Create publisher. This is deliberately simplified: it pushes values
        // directly and never calls onSubscribe or waits for request(n).
        Publisher<Integer> publisherSync = new Publisher<Integer>() {

            @Override
            public void subscribe(Subscriber<? super Integer> subscriber) {
                // Publish 100 numbers
                for (int i = 0; i < 100; i++) {
                    System.out.println(Thread.currentThread().getName() + " | Publishing = " + i);
                    // Publish or emit a value.
                    subscriber.onNext(i);
                }
                // When all values are emitted, call complete.
                subscriber.onComplete();
            }
        };

        // Create subscriber
        Subscriber<Integer> subscriberSync = new Subscriber<Integer>() {

            @Override
            public void onSubscribe(Subscription subscription) {
            }

            @Override
            public void onNext(Integer item) {
                // Process received value.
                System.out.println(Thread.currentThread().getName() + " | Received = " + item);
                // 100 ms delay to simulate a slow subscriber
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

            @Override
            public void onError(Throwable throwable) {
            }

            @Override
            public void onComplete() {
            }
        };

        // Subscriber subscribing to publisher
        publisherSync.subscribe(subscriberSync);
    }
}
```
The subscriber takes 100 ms to process each value. As the output shows, the publisher is blocked during that time: the next value is published only after the subscriber has finished processing the previous one. Both publishing and receiving happen on the same thread, the main thread. Note that this hand-written publisher is simplified for illustration: it never calls onSubscribe and ignores demand, which the Reactive Streams specification requires a real publisher to honor.
```
main | Publishing = 0
main | Received = 0
main | Publishing = 1
main | Received = 1
main | Publishing = 2
main | Received = 2
main | Publishing = 3
main | Received = 3
main | Publishing = 4
main | Received = 4
.
.
main | Publishing = 96
main | Received = 96
main | Publishing = 97
main | Received = 97
main | Publishing = 98
main | Received = 98
main | Publishing = 99
main | Received = 99
```
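The synchronous publisher above pushes values without handing out a Subscription. As a rough sketch of how a same-thread publisher could instead wait for request(n), the following uses a hypothetical RangePublisher class (not part of the JDK and not from the original example). It is single-threaded only and skips several corner cases of the specification, so treat it as an illustration rather than a compliant implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow.Publisher;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;

class DemandAwareSyncExample {

    // Hypothetical publisher: emits 0..count-1 on the caller's thread,
    // but only as many values as the subscriber has requested.
    static class RangePublisher implements Publisher<Integer> {
        private final int count;

        RangePublisher(int count) {
            this.count = count;
        }

        @Override
        public void subscribe(Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new Subscription() {
                private int next = 0;
                private long demand = 0;
                private boolean emitting = false;
                private boolean done = false;

                @Override
                public void request(long n) {
                    if (done) {
                        return;
                    }
                    if (n <= 0) {
                        done = true;
                        subscriber.onError(new IllegalArgumentException("request must be positive"));
                        return;
                    }
                    demand += n;
                    if (demand < 0) {
                        demand = Long.MAX_VALUE; // cap on overflow
                    }
                    if (emitting) {
                        return; // called from inside onNext; the outer loop keeps going
                    }
                    emitting = true;
                    while (!done && demand > 0 && next < count) {
                        demand--;
                        subscriber.onNext(next++);
                    }
                    emitting = false;
                    if (!done && next == count) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() {
                    done = true;
                }
            });
        }
    }

    // Subscribes with a one-at-a-time subscriber and records the signals it sees.
    static List<String> run(int count) {
        List<String> events = new ArrayList<>();
        new RangePublisher(count).subscribe(new Subscriber<Integer>() {
            private Subscription subscription;

            @Override
            public void onSubscribe(Subscription subscription) {
                this.subscription = subscription;
                events.add("subscribed");
                subscription.request(1);
            }

            @Override
            public void onNext(Integer item) {
                events.add("next " + item);
                subscription.request(1); // ask for the next value only when done
            }

            @Override
            public void onError(Throwable throwable) {
                events.add("error " + throwable.getMessage());
            }

            @Override
            public void onComplete() {
                events.add("completed");
            }
        });
        return events;
    }

    public static void main(String[] args) {
        run(3).forEach(System.out::println);
    }
}
```

The emitting flag is there because the subscriber calls request(1) from inside onNext; without it, each request would recurse into another emission loop instead of letting the outer loop continue.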
Java Flow API Asynchronous
```java
package com.itsallbinary.jdk9.flow;

import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.SubmissionPublisher;

public class JDK9FlowAsynchronousExample {

    public static void main(String[] args) throws InterruptedException {

        /*
         * Create an asynchronous publisher using
         * java.util.concurrent.SubmissionPublisher. This uses
         * ForkJoinPool.commonPool() for async delivery.
         */
        SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();

        // Create subscriber
        Subscriber<Integer> subscriber = new Subscriber<Integer>() {

            // Store subscription to request next value.
            private Subscription subscription;

            @Override
            public void onSubscribe(Subscription subscription) {
                System.out.println("Subscribed");
                // Store subscription
                this.subscription = subscription;
                subscription.request(1);
            }

            @Override
            public void onNext(Integer item) {
                // Process received value.
                System.out.println(Thread.currentThread().getName() + " | Received = " + item);
                // 100 ms delay to simulate a slow subscriber
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                // Processing of item is done so request next value.
                subscription.request(1);
            }

            @Override
            public void onError(Throwable throwable) {
                System.out.println(Thread.currentThread().getName() + " | ERROR = "
                        + throwable.getClass().getSimpleName() + " | " + throwable.getMessage());
            }

            @Override
            public void onComplete() {
                System.out.println("Completed");
            }
        };

        // Subscriber subscribing to publisher
        publisher.subscribe(subscriber);

        // Publish 500 numbers
        for (int i = 0; i < 500; i++) {
            System.out.println(Thread.currentThread().getName() + " | Publishing = " + i);
            publisher.submit(i);
        }

        // Close publisher once publishing is done
        publisher.close();

        // Since the subscriber runs on a different thread than the main thread,
        // keep the main thread active for 60 seconds.
        Thread.sleep(60000);
    }
}
```
In the output you can see that the thread names for publisher and subscriber are different, and that the publisher does not wait for the subscriber to finish processing. SubmissionPublisher has a default buffer size of 256, so the publisher runs without blocking until roughly 256 values are buffered. Once the buffer is full, submit() blocks, and the publisher can continue only as the subscriber frees up buffer slots one by one.
```
main | Publishing = 0
Subscribed
main | Publishing = 1
main | Publishing = 2
ForkJoinPool.commonPool-worker-3 | Received = 0
main | Publishing = 3
main | Publishing = 4
main | Publishing = 5
.
.
main | Publishing = 256
main | Publishing = 257
ForkJoinPool.commonPool-worker-3 | Received = 1
main | Publishing = 258
ForkJoinPool.commonPool-worker-3 | Received = 2
main | Publishing = 259
ForkJoinPool.commonPool-worker-3 | Received = 3
main | Publishing = 260
.
.
main | Publishing = 496
ForkJoinPool.commonPool-worker-3 | Received = 240
main | Publishing = 497
ForkJoinPool.commonPool-worker-3 | Received = 241
main | Publishing = 498
main | Publishing = 499
ForkJoinPool.commonPool-worker-3 | Received = 242
ForkJoinPool.commonPool-worker-3 | Received = 243
ForkJoinPool.commonPool-worker-3 | Received = 244
ForkJoinPool.commonPool-worker-3 | Received = 245
.
.
ForkJoinPool.commonPool-worker-3 | Received = 495
ForkJoinPool.commonPool-worker-3 | Received = 496
ForkJoinPool.commonPool-worker-3 | Received = 497
ForkJoinPool.commonPool-worker-3 | Received = 498
ForkJoinPool.commonPool-worker-3 | Received = 499
Completed
```
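The asynchronous example keeps the main thread alive with a fixed Thread.sleep. One possible alternative, sketched here with a hypothetical AwaitCompletionExample class, is SubmissionPublisher.consume(), which subscribes a plain Consumer and returns a CompletableFuture that completes once the publisher signals onComplete. The sketch also prints Flow.defaultBufferSize(), the source of the 256 mentioned above. The consumer here is fast, so this variant does not reproduce the slow-subscriber behaviour; it only shows how to wait for completion.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

class AwaitCompletionExample {

    // Publishes 0..count-1 and returns what the consumer received,
    // waiting for completion instead of sleeping for a fixed time.
    static List<Integer> publishAndWait(int count) {
        List<Integer> received = new CopyOnWriteArrayList<>();
        CompletableFuture<Void> done;

        // SubmissionPublisher is AutoCloseable; close() signals onComplete.
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            // consume() subscribes the given Consumer and returns a future
            // that completes when the publisher completes.
            done = publisher.consume(received::add);
            for (int i = 0; i < count; i++) {
                publisher.submit(i);
            }
        }

        done.join(); // block until every buffered item has been delivered
        return received;
    }

    public static void main(String[] args) {
        System.out.println("Default buffer size = " + Flow.defaultBufferSize());
        List<Integer> received = publishAndWait(500);
        System.out.println("Received " + received.size() + " items");
    }
}
```

With a hand-written Subscriber, the same effect could be had by counting down a CountDownLatch in onComplete and onError and awaiting it on the main thread.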
Java Flow API Backpressure handling (Buffer strategy)
During asynchronous processing, the subscriber may consume data much more slowly than the publisher produces it. Controlling the flow so that the subscriber is not overwhelmed is known as backpressure, and there are different ways to handle it gracefully. Here is an example in which the overflow is handled by buffering values.
```java
package com.itsallbinary.jdk9.flow;

import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.SubmissionPublisher;

public class JDK9FlowAsyncWithBackpressureBufferExample {

    public static void main(String[] args) throws InterruptedException {

        // Choose a power of 2. Otherwise SubmissionPublisher may round the value
        // up to the nearest power of 2.
        final int BUFFER = 16;

        // Create publisher with defined buffer size
        SubmissionPublisher<Integer> publisherBkp =
                new SubmissionPublisher<>(ForkJoinPool.commonPool(), BUFFER);

        // Create subscriber
        Subscriber<Integer> subscriberBkp = new Subscriber<Integer>() {

            // Store subscription to request next value.
            private Subscription subscription;

            @Override
            public void onSubscribe(Subscription subscription) {
                System.out.println("Subscribed");
                // Store subscription
                this.subscription = subscription;
                subscription.request(1);
            }

            @Override
            public void onNext(Integer item) {
                // Process received value.
                System.out.println(Thread.currentThread().getName() + " | Received = " + item);
                // 100 ms delay to simulate a slow subscriber
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                // Processing of item is done so request next value.
                subscription.request(1);
            }

            @Override
            public void onError(Throwable throwable) {
                System.out.println(Thread.currentThread().getName() + " | ERROR = "
                        + throwable.getClass().getSimpleName() + " | " + throwable.getMessage());
            }

            @Override
            public void onComplete() {
                System.out.println("Completed");
            }
        };

        // Subscriber subscribing to publisher
        publisherBkp.subscribe(subscriberBkp);

        // Publish 100 numbers
        for (int i = 0; i < 100; i++) {
            System.out.println(Thread.currentThread().getName() + " | Publishing = " + i);
            publisherBkp.submit(i);
        }

        // Close publisher once publishing is done
        publisherBkp.close();

        // Since the subscriber runs on a different thread than the main thread,
        // keep the main thread active for 100 seconds.
        Thread.sleep(100000);
    }
}
```
As the output below shows, the publisher was not blocked until the buffer was full (16 values) and kept publishing in non-blocking mode. Once the buffer was full, the publisher was blocked until earlier values had been consumed by the subscriber. Choose the buffer size carefully so that the publisher is not blocked under the backpressure you expect.
```
main | Publishing = 0
Subscribed
ForkJoinPool.commonPool-worker-3 | Received = 0
main | Publishing = 1
main | Publishing = 2
main | Publishing = 3
main | Publishing = 4
.
.
main | Publishing = 14
main | Publishing = 15
main | Publishing = 16
main | Publishing = 17
ForkJoinPool.commonPool-worker-3 | Received = 1
main | Publishing = 18
ForkJoinPool.commonPool-worker-3 | Received = 2
main | Publishing = 19
ForkJoinPool.commonPool-worker-3 | Received = 3
main | Publishing = 20
ForkJoinPool.commonPool-worker-3 | Received = 4
.
.
ForkJoinPool.commonPool-worker-3 | Received = 78
main | Publishing = 95
ForkJoinPool.commonPool-worker-3 | Received = 79
main | Publishing = 96
ForkJoinPool.commonPool-worker-3 | Received = 80
main | Publishing = 97
ForkJoinPool.commonPool-worker-3 | Received = 81
main | Publishing = 98
ForkJoinPool.commonPool-worker-3 | Received = 82
main | Publishing = 99
ForkJoinPool.commonPool-worker-3 | Received = 83
ForkJoinPool.commonPool-worker-3 | Received = 84
.
.
ForkJoinPool.commonPool-worker-3 | Received = 96
ForkJoinPool.commonPool-worker-3 | Received = 97
ForkJoinPool.commonPool-worker-3 | Received = 98
ForkJoinPool.commonPool-worker-3 | Received = 99
Completed
```
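To help with choosing a buffer size, SubmissionPublisher exposes getMaxBufferCapacity() and estimateMaximumLag(). The sketch below is an illustration under assumptions: the BufferInspectionExample and IdleSubscriber names are hypothetical, and the idle subscriber requests nothing so that submitted items stay in the buffer. It first checks how a non-power-of-2 capacity is rounded (the Javadoc only says the capacity may be rounded up), then reads the estimated lag after a few submissions.

```java
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.SubmissionPublisher;

class BufferInspectionExample {

    // Hypothetical subscriber that never requests anything,
    // so every submitted item remains buffered.
    static class IdleSubscriber implements Subscriber<Integer> {
        @Override
        public void onSubscribe(Subscription subscription) {
            // intentionally no request(n)
        }

        @Override
        public void onNext(Integer item) {
        }

        @Override
        public void onError(Throwable throwable) {
        }

        @Override
        public void onComplete() {
        }
    }

    // Returns the capacity the publisher actually uses for a requested size.
    static int actualCapacity(int requested) {
        try (SubmissionPublisher<Integer> publisher =
                new SubmissionPublisher<>(ForkJoinPool.commonPool(), requested)) {
            return publisher.getMaxBufferCapacity();
        }
    }

    // Submits a few items to an idle subscriber and reports the estimated lag,
    // i.e. items produced but not yet consumed.
    static int lagAfterSubmitting(int items) {
        final int capacity = 16;
        if (items > capacity) {
            // submit() would block forever because nothing drains the buffer
            throw new IllegalArgumentException("items must not exceed " + capacity);
        }
        try (SubmissionPublisher<Integer> publisher =
                new SubmissionPublisher<>(ForkJoinPool.commonPool(), capacity)) {
            publisher.subscribe(new IdleSubscriber());
            for (int i = 0; i < items; i++) {
                publisher.submit(i);
            }
            return publisher.estimateMaximumLag();
        }
    }

    public static void main(String[] args) {
        System.out.println("Requested 10, actual capacity = " + actualCapacity(10));
        System.out.println("Estimated maximum lag = " + lagAfterSubmitting(3));
    }
}
```

In a real application, logging estimateMaximumLag() from the publishing loop is one way to see how close the buffer gets to its capacity before deciding on a size.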
Java Flow API Backpressure handling (Drop strategy)
In this example, we will see another strategy for handling backpressure: dropping values if the subscriber is not able to consume them at the speed of the publisher. The offer() method also accepts a handler for dropped values so that they can be dealt with appropriately. In our example, the handler just calls the subscriber's onError so that the subscriber is aware of the dropped value, with a proper exception and message.
```java
package com.itsallbinary.jdk9.flow;

import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.SubmissionPublisher;

public class JDK9FlowAsyncWithBackpressureDropExample {

    public static void main(String[] args) throws InterruptedException {

        // Create publisher with the default buffer size (256)
        SubmissionPublisher<Integer> publisherBkpE = new SubmissionPublisher<>();

        // Create subscriber
        Subscriber<Integer> subscriberBkpE = new Subscriber<Integer>() {

            // Store subscription to request next value.
            private Subscription subscription;

            @Override
            public void onSubscribe(Subscription subscription) {
                System.out.println("Subscribed");
                // Store subscription
                this.subscription = subscription;
                subscription.request(1);
            }

            @Override
            public void onNext(Integer item) {
                // Process received value.
                System.out.println(Thread.currentThread().getName() + " | Received = " + item);
                // 100 ms delay to simulate a slow subscriber
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                // Processing of item is done so request next value.
                subscription.request(1);
            }

            @Override
            public void onError(Throwable throwable) {
                System.out.println(Thread.currentThread().getName() + " | ERROR = "
                        + throwable.getClass().getSimpleName() + " | " + throwable.getMessage());
            }

            @Override
            public void onComplete() {
                System.out.println("Completed");
            }
        };

        // Subscriber subscribing to publisher
        publisherBkpE.subscribe(subscriberBkpE);

        // Publish 500 numbers
        for (int i = 0; i < 500; i++) {
            System.out.println(Thread.currentThread().getName() + " | Publishing = " + i);
            // offer() does not block; the handler is invoked when a value would be dropped.
            publisherBkpE.offer(i, (s, a) -> {
                s.onError(new Exception("Can't handle backpressure any more. Dropping value " + a));
                // Returning true asks the publisher to retry the offer once.
                return true;
            });
        }

        // Close publisher once publishing is done
        publisherBkpE.close();

        // Since the subscriber runs on a different thread than the main thread,
        // keep the main thread active for 600 seconds.
        Thread.sleep(600000);
    }
}
```
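As you can see in the output, since SubmissionPublisher has a default buffer size of 256, values 0 to 256 were accepted in this run (presumably one value was already with the subscriber while the buffer held the rest). After that, values started getting dropped. The ERROR lines are printed on the main thread because the drop handler runs on the publishing thread. Towards the end of the output, you can see that the subscriber received only values 0 to 256; the remaining values were never received because they were dropped.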
```
main | Publishing = 0
Subscribed
main | Publishing = 1
main | Publishing = 2
.
.
main | Publishing = 8
main | Publishing = 9
ForkJoinPool.commonPool-worker-3 | Received = 0
main | Publishing = 10
.
.
main | Publishing = 254
main | Publishing = 255
main | Publishing = 256
main | Publishing = 257
main | ERROR = Exception | Can't handle backpressure any more. Dropping value 257
main | Publishing = 258
main | ERROR = Exception | Can't handle backpressure any more. Dropping value 258
main | Publishing = 259
main | ERROR = Exception | Can't handle backpressure any more. Dropping value 259
main | Publishing = 260
main | ERROR = Exception | Can't handle backpressure any more. Dropping value 260
.
.
main | Publishing = 494
main | ERROR = Exception | Can't handle backpressure any more. Dropping value 494
main | Publishing = 495
main | ERROR = Exception | Can't handle backpressure any more. Dropping value 495
main | Publishing = 496
main | ERROR = Exception | Can't handle backpressure any more. Dropping value 496
main | Publishing = 497
main | ERROR = Exception | Can't handle backpressure any more. Dropping value 497
main | Publishing = 498
main | ERROR = Exception | Can't handle backpressure any more. Dropping value 498
main | Publishing = 499
main | ERROR = Exception | Can't handle backpressure any more. Dropping value 499
ForkJoinPool.commonPool-worker-3 | Received = 1
ForkJoinPool.commonPool-worker-3 | Received = 2
ForkJoinPool.commonPool-worker-3 | Received = 3
ForkJoinPool.commonPool-worker-3 | Received = 4
.
.
ForkJoinPool.commonPool-worker-3 | Received = 252
ForkJoinPool.commonPool-worker-3 | Received = 253
ForkJoinPool.commonPool-worker-3 | Received = 254
ForkJoinPool.commonPool-worker-3 | Received = 255
ForkJoinPool.commonPool-worker-3 | Received = 256
Completed
```
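The drop example above reports drops through onError. The return value of offer() carries the same information: a negative result means the value was dropped, otherwise it is an estimate of the lag. The following sketch, using a hypothetical DropCountingExample class, counts drops that way. To keep the run predictable it assumes a small buffer and a subscriber that requests nothing until publishing has finished; with a capacity of 4 and 10 offers, I would expect about 4 accepted and 6 dropped, but treat those numbers as an expectation rather than a guarantee.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.atomic.AtomicInteger;

class DropCountingExample {

    // Hypothetical helper: returns {accepted, dropped, received}.
    static int[] run(int items, int bufferSize) {
        AtomicInteger received = new AtomicInteger();
        CountDownLatch subscribed = new CountDownLatch(1);
        CountDownLatch finished = new CountDownLatch(1);
        Subscription[] held = new Subscription[1];

        SubmissionPublisher<Integer> publisher =
                new SubmissionPublisher<>(ForkJoinPool.commonPool(), bufferSize);

        publisher.subscribe(new Subscriber<Integer>() {
            @Override
            public void onSubscribe(Subscription subscription) {
                // Request nothing yet so that the buffer fills up.
                held[0] = subscription;
                subscribed.countDown();
            }

            @Override
            public void onNext(Integer item) {
                received.incrementAndGet();
            }

            @Override
            public void onError(Throwable throwable) {
                finished.countDown();
            }

            @Override
            public void onComplete() {
                finished.countDown();
            }
        });

        int accepted = 0;
        int dropped = 0;
        for (int i = 0; i < items; i++) {
            // A null handler drops silently; a negative result reports the drop.
            int result = publisher.offer(i, null);
            if (result < 0) {
                dropped++;
            } else {
                accepted++;
            }
        }
        publisher.close();

        try {
            subscribed.await();
            // Now drain whatever made it into the buffer.
            held[0].request(Long.MAX_VALUE);
            finished.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new int[] {accepted, dropped, received.get()};
    }

    public static void main(String[] args) {
        int[] r = run(10, 4);
        System.out.println("accepted=" + r[0] + " dropped=" + r[1] + " received=" + r[2]);
    }
}
```

Counting on the publishing side keeps the subscriber's onError free for genuine failures, which may be preferable when a dropped value is an expected event rather than an error.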