Java 9 Reactive Streams
Reactive Streams is about asynchronous processing of a stream of data, so there has to be a Publisher and a Subscriber. The Publisher publishes the stream of data and the Subscriber consumes it.

Sometimes we have to transform the data between the Publisher and the Subscriber. A Processor is the entity that sits between the end publisher and the subscriber and transforms the data received from the publisher into a form the subscriber can understand. We can have a chain of processors.

As the image above shows, a Processor works as both a Subscriber and a Publisher.
Java 9 Flow API
Java 9 Flow API implements the Reactive Streams Specification. The Flow API combines the Iterator and Observer patterns. Iterator works on a pull model, where the application pulls items from the source, whereas Observer works on a push model and reacts when an item is pushed from the source to the application.

A Java 9 Flow API subscriber can request N items once it is subscribed to the publisher. The items are then pushed from the publisher to the subscriber until the requested demand is met, there are no more items left to push, or some error occurs.
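The request-N model described above can be sketched as follows. This is a minimal illustration, not part of the original example: the BatchSubscriber class, its BATCH_SIZE constant and the run helper are hypothetical names, and the batch size of 2 is an arbitrary choice.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

// Hypothetical subscriber that signals demand in batches of BATCH_SIZE items.
final class BatchSubscriber implements Flow.Subscriber<Integer> {
    static final int BATCH_SIZE = 2;

    final List<Integer> received = new ArrayList<>();
    final CountDownLatch done = new CountDownLatch(1);
    private Flow.Subscription subscription;
    private int remainingInBatch;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        remainingInBatch = BATCH_SIZE;
        subscription.request(BATCH_SIZE); // initial demand: N items
    }

    @Override
    public void onNext(Integer item) {
        received.add(item);
        if (--remainingInBatch == 0) {    // batch used up: ask for the next N
            remainingInBatch = BATCH_SIZE;
            subscription.request(BATCH_SIZE);
        }
    }

    @Override
    public void onError(Throwable error) {
        done.countDown();
    }

    @Override
    public void onComplete() {
        done.countDown();
    }

    // Publishes 1..5 and returns what the subscriber received.
    static List<Integer> run() {
        BatchSubscriber subscriber = new BatchSubscriber();
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (int i = 1; i <= 5; i++) {
                publisher.submit(i);
            }
        } // close() signals onComplete once the buffered items are delivered
        try {
            subscriber.done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return subscriber.received;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The publisher never pushes more than the outstanding demand, so the subscriber decides the pace by choosing when to call request again.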
Java 9 Flow API Classes and Interfaces
Let’s have a quick look at Flow API classes and interfaces.
- java.util.concurrent.Flow: This is the main class of the Flow API. It encapsulates all the important interfaces of the Flow API. It is a final class, so we can’t extend it.
- java.util.concurrent.Flow.Publisher: This is a functional interface. Every publisher has to implement its subscribe method, which adds the given subscriber so that it can receive messages.
- java.util.concurrent.Flow.Subscriber: Every subscriber has to implement this interface. The methods in the subscriber are invoked in strict sequential order. There are four methods in this interface:
  - onSubscribe: This is the first method to get invoked when the subscriber is subscribed to receive messages from the publisher. Usually we invoke subscription.request here to start receiving items from the publisher.
  - onNext: This method gets invoked when an item is received from the publisher. This is where we implement our business logic to process the stream and then request more data from the publisher.
  - onError: This method is invoked when an irrecoverable error occurs. We can do cleanup tasks in this method, such as closing a database connection.
  - onComplete: This method gets invoked when the publisher is closed and no more items will be produced. It is not invoked after onError, so it signals successful completion only. We can use it to send a notification that the stream was processed successfully.
- java.util.concurrent.Flow.Subscription: This is used to create an asynchronous, non-blocking link between publisher and subscriber. The subscriber invokes its request method to demand items from the publisher. It also has a cancel method to cancel the subscription, i.e. to close the link between publisher and subscriber.
- java.util.concurrent.Flow.Processor: This interface extends both Publisher and Subscriber. It is used to transform messages between publisher and subscriber.
- java.util.concurrent.SubmissionPublisher: A Publisher implementation that asynchronously issues submitted items to current subscribers until it is closed. It uses the Executor framework. We will use this class in the reactive stream examples to add subscribers and then submit items to them.
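To make the relationship between these interfaces concrete, here is a small sketch of a hand-written publisher. Because Publisher has a single abstract method, a lambda can implement it. The just helper and the SingleItemPublisherDemo class are hypothetical names used for illustration only. The publisher runs synchronously on the caller's thread and is not meant to be a fully specification-compliant implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

final class SingleItemPublisherDemo {

    // Publisher has one abstract method, so a lambda can implement subscribe().
    // Hypothetical helper: emits one value, then completes, on the caller's thread.
    static <T> Flow.Publisher<T> just(T value) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private boolean emitted;
            private boolean cancelled;

            @Override
            public void request(long n) {
                if (emitted || cancelled) {
                    return;
                }
                emitted = true;
                if (n <= 0) {
                    subscriber.onError(new IllegalArgumentException("non-positive request"));
                    return;
                }
                subscriber.onNext(value);
                if (!cancelled) {
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() {
                cancelled = true;
            }
        });
    }

    // Subscribes a recording subscriber and returns the signals it saw, in order.
    static List<String> run() {
        List<String> events = new ArrayList<>();
        just("hello").subscribe(new Flow.Subscriber<String>() {
            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                events.add("onSubscribe");
                subscription.request(1);
            }

            @Override
            public void onNext(String item) {
                events.add("onNext:" + item);
            }

            @Override
            public void onError(Throwable error) {
                events.add("onError");
            }

            @Override
            public void onComplete() {
                events.add("onComplete");
            }
        });
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The recorded signals follow the order described in the list above: onSubscribe first, then onNext for each requested item, then a single terminal signal.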
Java 9 Reactive Stream Example
Let’s start with a simple example where we will implement Flow API Subscriber interface and use SubmissionPublisher to create publisher and send messages.
Stream Data
Let’s say we have an Employee class that will be used to create the stream message to be sent from publisher to subscriber.
package com.dpq.reactive.beans;

public class Employee {

    private int id;
    private String name;

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public Employee(int i, String s) {
        this.id = i;
        this.name = s;
    }

    public Employee() {
    }

    @Override
    public String toString() {
        return "[id="+id+",name="+name+"]";
    }
}
We also have a utility class to create a list of employees for our example.
package com.dpq.reactive_streams;

import java.util.ArrayList;
import java.util.List;

import com.dpq.reactive.beans.Employee;

public class EmpHelper {

    public static List<Employee> getEmps() {
        Employee e1 = new Employee(1, "Pankaj");
        Employee e2 = new Employee(2, "David");
        Employee e3 = new Employee(3, "Lisa");
        Employee e4 = new Employee(4, "Ram");
        Employee e5 = new Employee(5, "Anupam");

        List<Employee> emps = new ArrayList<>();
        emps.add(e1);
        emps.add(e2);
        emps.add(e3);
        emps.add(e4);
        emps.add(e5);

        return emps;
    }
}
Subscriber
package com.dpq.reactive_streams;

import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;

import com.dpq.reactive.beans.Employee;

public class MySubscriber implements Subscriber<Employee> {

    private Subscription subscription;

    // volatile: updated on the publisher's delivery thread, read by the main thread
    private volatile int counter = 0;

    @Override
    public void onSubscribe(Subscription subscription) {
        System.out.println("Subscribed");
        this.subscription = subscription;
        this.subscription.request(1); //requesting data from publisher
        System.out.println("onSubscribe requested 1 item");
    }

    @Override
    public void onNext(Employee item) {
        System.out.println("Processing Employee "+item);
        counter++;
        this.subscription.request(1);
    }

    @Override
    public void onError(Throwable e) {
        System.out.println("Some error happened");
        e.printStackTrace();
    }

    @Override
    public void onComplete() {
        System.out.println("All Processing Done");
    }

    public int getCounter() {
        return counter;
    }
}
- Subscription variable to keep a reference so that a request can be made in the onNext method.
- counter variable to keep count of the number of items processed. Notice that its value is increased in the onNext method. This will be used in our main method to wait for execution to finish before ending the main thread.
- Subscription request is invoked in the onSubscribe method to start the processing. Also notice that it’s called again in the onNext method after processing the item, demanding the next item to process from the publisher.
- onError and onComplete don’t do much here, but in a real-world scenario they should be used to perform corrective measures when an error occurs or to clean up resources when processing completes successfully.
Reactive Stream Test Program
We will use SubmissionPublisher as Publisher for our examples, so let’s look at the test program for our reactive stream implementation.
package com.dpq.reactive_streams;

import java.util.List;
import java.util.concurrent.SubmissionPublisher;

import com.dpq.reactive.beans.Employee;

public class MyReactiveApp {

    public static void main(String args[]) throws InterruptedException {

        // Create Publisher
        SubmissionPublisher<Employee> publisher = new SubmissionPublisher<>();

        // Register Subscriber
        MySubscriber subs = new MySubscriber();
        publisher.subscribe(subs);

        List<Employee> emps = EmpHelper.getEmps();

        // Publish items
        System.out.println("Publishing Items to Subscriber");
        emps.stream().forEach(i -> publisher.submit(i));

        // logic to wait till processing of all messages are over
        while (emps.size() != subs.getCounter()) {
            Thread.sleep(10);
        }

        // close the Publisher
        publisher.close();

        System.out.println("Exiting the app");
    }
}
The most important parts of the above code are the calls to the publisher’s subscribe and submit methods. We should always close the publisher to avoid any memory leaks.
We get the following output when the above program is executed.
Subscribed
Publishing Items to Subscriber
onSubscribe requested 1 item
Processing Employee [id=1,name=Pankaj]
Processing Employee [id=2,name=David]
Processing Employee [id=3,name=Lisa]
Processing Employee [id=4,name=Ram]
Processing Employee [id=5,name=Anupam]
Exiting the app
All Processing Done
Note that if the main method does not wait until all the items are processed, we will get unwanted results: items are delivered asynchronously, so the program may end before the subscriber has processed them.
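One alternative to polling a counter is to let the publisher tell us when processing is over. The sketch below assumes we are free to replace the custom subscriber with SubmissionPublisher's consume method, which returns a CompletableFuture that completes when the publisher is closed and all items have been consumed. The WaitForCompletionDemo class and its run method are hypothetical names.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.SubmissionPublisher;

final class WaitForCompletionDemo {

    // Publishes three names and blocks until every one of them was consumed.
    static List<String> run() {
        List<String> processed = new CopyOnWriteArrayList<>();
        CompletableFuture<Void> done;
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            // consume() subscribes internally and applies the function to each item
            done = publisher.consume(processed::add);
            List.of("Pankaj", "David", "Lisa").forEach(publisher::submit);
        } // close() lets the future complete after the last item is consumed
        done.join(); // wait without a sleep loop
        return processed;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

With a hand-written subscriber, a similar effect can be achieved by counting down a CountDownLatch in onComplete and onError and awaiting it in the main method.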
Message Transformation Example
Processor is used to transform the message between a publisher and subscriber. Let’s say we have another subscriber which is expecting a different type of message to process. Let’s say this new message type is Freelancer.
package com.dpq.reactive.beans;

public class Freelancer extends Employee {

    private int fid;

    public int getFid() {
        return fid;
    }

    public void setFid(int fid) {
        this.fid = fid;
    }

    public Freelancer(int id, int fid, String name) {
        super(id, name);
        this.fid = fid;
    }

    @Override
    public String toString() {
        return "[id="+super.getId()+",name="+super.getName()+",fid="+fid+"]";
    }
}
We have a new subscriber to consume Freelancer stream data.
package com.dpq.reactive_streams;

import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;

import com.dpq.reactive.beans.Freelancer;

public class MyFreelancerSubscriber implements Subscriber<Freelancer> {

    private Subscription subscription;

    // volatile: updated on the publisher's delivery thread, read by the main thread
    private volatile int counter = 0;

    @Override
    public void onSubscribe(Subscription subscription) {
        System.out.println("Subscribed for Freelancer");
        this.subscription = subscription;
        this.subscription.request(1); //requesting data from publisher
        System.out.println("onSubscribe requested 1 item for Freelancer");
    }

    @Override
    public void onNext(Freelancer item) {
        System.out.println("Processing Freelancer "+item);
        counter++;
        this.subscription.request(1);
    }

    @Override
    public void onError(Throwable e) {
        System.out.println("Some error happened in MyFreelancerSubscriber");
        e.printStackTrace();
    }

    @Override
    public void onComplete() {
        System.out.println("All Processing Done for MyFreelancerSubscriber");
    }

    public int getCounter() {
        return counter;
    }
}
Processor
The important part is the implementation of the Processor interface. Since we want to utilize the SubmissionPublisher, we will extend it and use it wherever applicable.
package com.dpq.reactive_streams;

import java.util.concurrent.Flow.Processor;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.SubmissionPublisher;
import java.util.function.Function;

import com.dpq.reactive.beans.Employee;
import com.dpq.reactive.beans.Freelancer;

public class MyProcessor extends SubmissionPublisher<Freelancer> implements Processor<Employee, Freelancer> {

    private Subscription subscription;
    private Function<Employee,Freelancer> function;

    public MyProcessor(Function<Employee,Freelancer> function) {
        super();
        this.function = function;
    }

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1);
    }

    @Override
    public void onNext(Employee emp) {
        submit((Freelancer) function.apply(emp));
        subscription.request(1);
    }

    @Override
    public void onError(Throwable e) {
        e.printStackTrace();
    }

    @Override
    public void onComplete() {
        System.out.println("Done");
    }
}
- Function will be used to convert an Employee object to a Freelancer object.
- We convert the incoming Employee message to a Freelancer message in the onNext method and then use the SubmissionPublisher submit method to send it to the subscriber.
- Since Processor works as both subscriber and publisher, we can create a chain of processors between end publishers and subscribers.
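The same idea can be written generically. The sketch below is a variation on the processor above, not a replacement for it: TransformProcessor is a hypothetical name, and unlike MyProcessor it forwards the terminal signals downstream by closing itself in onComplete and onError, so the caller does not have to close the processor separately.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.function.Function;

// Hypothetical generic processor: subscribes to T items, publishes R items.
final class TransformProcessor<T, R> extends SubmissionPublisher<R>
        implements Flow.Processor<T, R> {

    private final Function<? super T, ? extends R> function;
    private Flow.Subscription subscription;

    TransformProcessor(Function<? super T, ? extends R> function) {
        this.function = function;
    }

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1);
    }

    @Override
    public void onNext(T item) {
        submit(function.apply(item)); // publish the transformed item downstream
        subscription.request(1);
    }

    @Override
    public void onError(Throwable error) {
        closeExceptionally(error);    // forward the failure downstream
    }

    @Override
    public void onComplete() {
        close();                      // forward completion downstream
    }

    // Chains source -> processor -> consumer and returns the transformed items.
    static List<String> run() {
        List<String> results = new CopyOnWriteArrayList<>();
        TransformProcessor<Integer, String> processor =
                new TransformProcessor<Integer, String>(id -> "fid=" + (id + 100));
        CompletableFuture<Void> done = processor.consume(results::add);
        try (SubmissionPublisher<Integer> source = new SubmissionPublisher<>()) {
            source.subscribe(processor);
            List.of(1, 2, 3).forEach(source::submit);
        }
        done.join(); // completes once the processor has closed itself
        return results;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Because completion travels through the chain, several such processors can be linked one after another and only the first publisher needs to be closed.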
Message Transformation Test
package com.dpq.reactive_streams;

import java.util.List;
import java.util.concurrent.SubmissionPublisher;

import com.dpq.reactive.beans.Employee;
import com.dpq.reactive.beans.Freelancer;

public class MyReactiveAppWithProcessor {

    public static void main(String[] args) throws InterruptedException {

        // Create End Publisher
        SubmissionPublisher<Employee> publisher = new SubmissionPublisher<>();

        // Create Processor
        MyProcessor transformProcessor = new MyProcessor(s -> {
            return new Freelancer(s.getId(), s.getId() + 100, s.getName());
        });

        //Create End Subscriber
        MyFreelancerSubscriber subs = new MyFreelancerSubscriber();

        //Create chain of publisher, processor and subscriber
        publisher.subscribe(transformProcessor); // publisher to processor
        transformProcessor.subscribe(subs); // processor to subscriber

        List<Employee> emps = EmpHelper.getEmps();

        // Publish items
        System.out.println("Publishing Items to Subscriber");
        emps.stream().forEach(i -> publisher.submit(i));

        // Logic to wait for messages processing to finish
        while (emps.size() != subs.getCounter()) {
            Thread.sleep(10);
        }

        // Closing publishers
        publisher.close();
        transformProcessor.close();

        System.out.println("Exiting the app");
    }
}
Read the comments in the program to properly understand it. The most important change is the creation of the publisher-processor-subscriber chain. We get the following output when the above program is executed.
Subscribed for Freelancer
Publishing Items to Subscriber
onSubscribe requested 1 item for Freelancer
Processing Freelancer [id=1,name=Pankaj,fid=101]
Processing Freelancer [id=2,name=David,fid=102]
Processing Freelancer [id=3,name=Lisa,fid=103]
Processing Freelancer [id=4,name=Ram,fid=104]
Processing Freelancer [id=5,name=Anupam,fid=105]
Exiting the app
All Processing Done for MyFreelancerSubscriber
Done
Cancel Subscription
We can use the Subscription cancel method to stop receiving messages in the subscriber. Note that if we cancel the subscription, the subscriber will not receive the onComplete or onError signal.
Here is sample code where the subscriber consumes only 3 messages and then cancels the subscription.
@Override
public void onNext(Employee item) {
    System.out.println("Processing Employee "+item);
    counter++;
    if(counter==3) {
        this.subscription.cancel();
        return;
    }
    this.subscription.request(1);
}
Note that in this case, our logic to halt the main thread until all the messages are processed will go into an infinite loop, because the counter never reaches the number of published items. We can add some additional logic for this scenario, maybe a shared flag that records whether the subscriber has stopped processing or canceled the subscription.
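One possible form of that additional logic is sketched below, assuming a CountDownLatch instead of a global variable. The CancellingSubscriber class, its limit parameter and the finished latch are hypothetical names. The latch is released when the subscriber cancels, completes or fails, so the main thread can wait on it without knowing how many items will be processed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

// Hypothetical subscriber that cancels after `limit` items and reports that it stopped.
final class CancellingSubscriber implements Flow.Subscriber<Integer> {
    final List<Integer> received = new ArrayList<>();
    final CountDownLatch finished = new CountDownLatch(1);
    private final int limit;
    private Flow.Subscription subscription;

    CancellingSubscriber(int limit) {
        this.limit = limit;
    }

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1);
    }

    @Override
    public void onNext(Integer item) {
        received.add(item);
        if (received.size() == limit) {
            subscription.cancel();
            finished.countDown(); // no onComplete will arrive, so signal here
            return;
        }
        subscription.request(1);
    }

    @Override
    public void onError(Throwable error) {
        finished.countDown();
    }

    @Override
    public void onComplete() {
        finished.countDown();
    }

    // Publishes 1..5 to a subscriber that stops after three items.
    static List<Integer> run() {
        CancellingSubscriber subscriber = new CancellingSubscriber(3);
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (int i = 1; i <= 5; i++) {
                publisher.submit(i);
            }
            subscriber.finished.await(); // returns on cancel, completion or error
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return subscriber.received;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The wait no longer depends on the number of published items, so it ends whether the subscriber reads the whole stream or stops early.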
Back Pressure
When the publisher produces messages at a much faster rate than the subscriber consumes them, back pressure builds up. In the Flow API, the subscriber signals how much it can handle through Subscription.request, and the publisher must not push more items than were requested. The API does not prescribe what the publisher should do with the surplus, though, so we have to devise our own strategy to deal with it, such as fine-tuning the subscriber or reducing the message-producing rate.
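As one example of such a strategy, SubmissionPublisher offers a non-blocking offer method that drops an item when a subscriber's buffer is full, instead of blocking the way submit does. The sketch below is an illustration under stated assumptions: DropOnOverflowDemo and StalledSubscriber are hypothetical names, the buffer capacity of 4 is arbitrary, and the subscriber never requests anything so that the buffer is guaranteed to fill up.

```java
import java.util.concurrent.Flow;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.SubmissionPublisher;

final class DropOnOverflowDemo {

    // Stand-in for a stalled consumer: it subscribes but never signals demand.
    static final class StalledSubscriber implements Flow.Subscriber<Integer> {
        @Override
        public void onSubscribe(Flow.Subscription subscription) { }

        @Override
        public void onNext(Integer item) { }

        @Override
        public void onError(Throwable error) { }

        @Override
        public void onComplete() { }
    }

    // Offers `total` items and returns {accepted, dropped}.
    static int[] run(int total) {
        int accepted = 0;
        int dropped = 0;
        // Small per-subscriber buffer (an arbitrary value chosen for this sketch).
        try (SubmissionPublisher<Integer> publisher =
                 new SubmissionPublisher<>(ForkJoinPool.commonPool(), 4)) {
            publisher.subscribe(new StalledSubscriber());
            for (int i = 0; i < total; i++) {
                // offer() does not block; the handler returns false to decline a retry.
                int result = publisher.offer(i, (subscriber, item) -> false);
                if (result < 0) {
                    dropped++;   // a negative result reports dropped deliveries
                } else {
                    accepted++;
                }
            }
        }
        return new int[] {accepted, dropped};
    }

    public static void main(String[] args) {
        int[] counts = run(100);
        System.out.println("accepted=" + counts[0] + ", dropped=" + counts[1]);
    }
}
```

Dropping is only one option. The drop handler can also return true to retry the item once, and the timed variant of offer waits for buffer space for a bounded time before giving up.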
Summary
Java 9 Flow API is a good move towards reactive programming and towards creating asynchronous, non-blocking applications. However, creating a truly reactive application is possible only when all the system’s APIs support it.