Java 9 Reactive Streams

Reactive Streams is about asynchronous processing of streams of data, so there is a Publisher and a Subscriber: the Publisher publishes the stream of data and the Subscriber consumes it.


Sometimes we have to transform the data between the Publisher and the Subscriber. A Processor is an entity that sits between the end publisher and subscriber and transforms the data received from the publisher so that the subscriber can understand it. We can also have a chain of processors.


A Processor therefore works both as a Subscriber (of the upstream publisher) and as a Publisher (to the downstream subscriber).

Java 9 Flow API

The Java 9 Flow API implements the Reactive Streams Specification. It is a combination of the Iterator and Observer patterns: Iterator works on a pull model, where the application pulls items from the source, whereas Observer works on a push model and reacts when an item is pushed from the source to the application.


A Flow API subscriber requests N items from the publisher it subscribes to (the pull part), and the publisher then pushes at most N items to it (the push part). Items keep flowing as the subscriber requests more, until there are no more items left to push or an error occurs.
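To make the request-then-push cycle concrete, here is a minimal sketch of a subscriber that asks for items in batches and only signals new demand once the current batch has been consumed. The names BatchingSubscriber and BatchDemo and the batch size of 2 are hypothetical, chosen only for illustration; SubmissionPublisher is the JDK class listed in the next section.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

// Hypothetical subscriber that requests `batchSize` items at a time and asks
// for the next batch only after the current one has been fully consumed.
class BatchingSubscriber<T> implements Flow.Subscriber<T> {
    private final int batchSize;
    private final List<T> received = new ArrayList<>();
    private final CompletableFuture<List<T>> done = new CompletableFuture<>();
    private Flow.Subscription subscription;
    private int remainingInBatch;

    BatchingSubscriber(int batchSize) {
        this.batchSize = batchSize;
    }

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        remainingInBatch = batchSize;
        subscription.request(batchSize); // pull: initial demand of N items
    }

    @Override
    public void onNext(T item) {
        received.add(item); // push: the publisher delivered one of the requested items
        if (--remainingInBatch == 0) {
            remainingInBatch = batchSize;
            subscription.request(batchSize); // batch consumed, signal demand for the next N
        }
    }

    @Override
    public void onError(Throwable t) {
        done.completeExceptionally(t);
    }

    @Override
    public void onComplete() {
        done.complete(received);
    }

    CompletableFuture<List<T>> result() {
        return done;
    }
}

class BatchDemo {
    static List<Integer> run() {
        BatchingSubscriber<Integer> subscriber = new BatchingSubscriber<>(2);
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (int i = 1; i <= 5; i++) {
                publisher.submit(i); // items wait in the publisher's buffer until requested
            }
        } // close(): onComplete is signalled after the buffered items are delivered
        return subscriber.result().join();
    }
}
```

Because the subscriber only calls request after consuming a whole batch, its outstanding demand never exceeds two items; everything else waits in SubmissionPublisher's buffer.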

Java 9 Flow API Classes and Interfaces

Let’s have a quick look at Flow API classes and interfaces.

  • java.util.concurrent.Flow: This is the main class of the Flow API. It encapsulates all the important interfaces of the Flow API as nested interfaces. It is a final class, so we can't extend it.
  • java.util.concurrent.Flow.Publisher: This is a functional interface, and every publisher has to implement its subscribe method, which adds the given subscriber so that it receives messages.
  • java.util.concurrent.Flow.Subscriber: Every subscriber has to implement this interface. The methods of a subscriber are invoked in strict sequential order. There are four methods in this interface:
    1. onSubscribe: The first method invoked when the subscriber is subscribed to receive messages from the publisher. Usually we invoke subscription.request here to start receiving items from the publisher.
    2. onNext: Invoked when an item is received from the publisher. This is where we implement the business logic that processes the stream and then request more data from the publisher.
    3. onError: Invoked when an irrecoverable error occurs. We can do cleanup tasks in this method, such as closing a database connection.
    4. onComplete: Invoked when the publisher will produce no more items and has been closed. Unlike a finally block, it is not invoked after onError. We can use it to send a notification that the stream was processed successfully.
  • java.util.concurrent.Flow.Subscription: Creates the asynchronous, non-blocking link between publisher and subscriber. The subscriber invokes its request method to demand items from the publisher. It also has a cancel method to cancel the subscription, i.e. to close the link between publisher and subscriber.
  • java.util.concurrent.Flow.Processor: This interface extends both Publisher and Subscriber and is used to transform messages between publisher and subscriber.
  • java.util.concurrent.SubmissionPublisher: A Publisher implementation that asynchronously issues submitted items to current subscribers until it is closed. It uses the Executor framework (by default, ForkJoinPool.commonPool()). We will use this class in the reactive stream examples to add subscribers and then submit items to them.
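The Publisher and Subscription contracts above can also be seen without SubmissionPublisher. The sketch below is a hypothetical, single-threaded ListPublisher written as a lambda (possible because Flow.Publisher is a functional interface). It emits items only while there is outstanding demand, honors cancel, and uses an emitting flag so that a request call made from inside onNext does not recurse. It is not thread-safe and skips several rules of the Reactive Streams specification (for example, an empty list only completes after the first request), so treat it as a teaching aid rather than a production publisher.

```java
import java.util.List;
import java.util.concurrent.Flow;

// Hypothetical, single-threaded publisher of a fixed list. Written as a lambda
// because Flow.Publisher declares a single abstract method, subscribe().
final class ListPublisher {
    static <T> Flow.Publisher<T> of(List<T> items) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private int index = 0;            // position of the next item to emit
            private long demand = 0;          // items requested but not yet delivered
            private boolean emitting = false; // true while the loop below is running
            private boolean done = false;     // set on completion, error or cancel

            @Override
            public void request(long n) {
                if (done) return;
                if (n <= 0) { // the spec treats non-positive requests as an error
                    done = true;
                    subscriber.onError(new IllegalArgumentException("request must be positive: " + n));
                    return;
                }
                // add to outstanding demand, capping at Long.MAX_VALUE instead of overflowing
                demand = (Long.MAX_VALUE - demand < n) ? Long.MAX_VALUE : demand + n;
                if (emitting) return; // called from inside onNext: the running loop uses the new demand
                emitting = true;
                while (!done && demand > 0 && index < items.size()) {
                    demand--;
                    subscriber.onNext(items.get(index++));
                }
                if (!done && index == items.size()) {
                    done = true;
                    subscriber.onComplete();
                }
                emitting = false;
            }

            @Override
            public void cancel() {
                done = true; // stops the emit loop; no further signals are sent
            }
        });
    }
}
```

A subscriber that calls request(1) in onSubscribe and again in each onNext, like MySubscriber below, receives every item followed by onComplete; one that calls cancel part-way through receives neither the remaining items nor onComplete.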

Java 9 Reactive Stream Example

Let’s start with a simple example where we implement the Flow API Subscriber interface and use SubmissionPublisher to create a publisher and send messages.

Stream Data

Let’s say we have an Employee class that will be used to create the stream message to be sent from publisher to subscriber.

```java
package com.dpq.reactive.beans;

public class Employee {

    private int id;
    private String name;

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public Employee(int i, String s) {
        this.id = i;
        this.name = s;
    }

    public Employee() {
    }

    @Override
    public String toString() {
        return "[id=" + id + ",name=" + name + "]";
    }
}
```

We also have a utility class to create a list of employees for our example.

```java
package com.dpq.reactive_streams;

import java.util.ArrayList;
import java.util.List;

import com.dpq.reactive.beans.Employee;

public class EmpHelper {

    public static List<Employee> getEmps() {

        Employee e1 = new Employee(1, "Pankaj");
        Employee e2 = new Employee(2, "David");
        Employee e3 = new Employee(3, "Lisa");
        Employee e4 = new Employee(4, "Ram");
        Employee e5 = new Employee(5, "Anupam");

        List<Employee> emps = new ArrayList<>();
        emps.add(e1);
        emps.add(e2);
        emps.add(e3);
        emps.add(e4);
        emps.add(e5);

        return emps;
    }
}
```

Subscriber

```java
package com.dpq.reactive_streams;

import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;

import com.dpq.reactive.beans.Employee;

public class MySubscriber implements Subscriber<Employee> {

    private Subscription subscription;

    // volatile: updated on the publisher's delivery thread, read by main's wait loop
    private volatile int counter = 0;

    @Override
    public void onSubscribe(Subscription subscription) {
        System.out.println("Subscribed");
        this.subscription = subscription;
        this.subscription.request(1); // requesting data from publisher
        System.out.println("onSubscribe requested 1 item");
    }

    @Override
    public void onNext(Employee item) {
        System.out.println("Processing Employee " + item);
        counter++; // only one onNext call runs at a time for a given subscriber
        this.subscription.request(1);
    }

    @Override
    public void onError(Throwable e) {
        System.out.println("Some error happened");
        e.printStackTrace();
    }

    @Override
    public void onComplete() {
        System.out.println("All Processing Done");
    }

    public int getCounter() {
        return counter;
    }
}
```
  • The subscription variable keeps a reference so that request can be called in the onNext method.
  • The counter variable keeps count of the number of items processed; notice that its value is increased in the onNext method. It will be used in our main method to wait for execution to finish before ending the main thread.
  • Subscription request is invoked in the onSubscribe method to start the processing. Also notice that it’s called again in the onNext method after processing an item, demanding the next item to process from the publisher.
  • onError and onComplete don’t do much here, but in a real-world scenario they should be used to take corrective measures when an error occurs or to clean up resources when processing completes successfully.

Reactive Stream Test Program

We will use SubmissionPublisher as the Publisher for our examples, so let’s look at the test program for our reactive stream implementation.

```java
package com.dpq.reactive_streams;

import java.util.List;
import java.util.concurrent.SubmissionPublisher;

import com.dpq.reactive.beans.Employee;

public class MyReactiveApp {

    public static void main(String[] args) throws InterruptedException {

        // Create Publisher
        SubmissionPublisher<Employee> publisher = new SubmissionPublisher<>();

        // Register Subscriber
        MySubscriber subs = new MySubscriber();
        publisher.subscribe(subs);

        List<Employee> emps = EmpHelper.getEmps();

        // Publish items
        System.out.println("Publishing Items to Subscriber");
        emps.stream().forEach(i -> publisher.submit(i));

        // logic to wait till processing of all messages are over
        while (emps.size() != subs.getCounter()) {
            Thread.sleep(10);
        }

        // close the Publisher
        publisher.close();

        System.out.println("Exiting the app");
    }
}
```

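The most important parts of the above code are the calls to the publisher’s subscribe and submit methods. We should always close the publisher when we are done with it: close() issues onComplete to the current subscribers and prevents any further items from being published.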

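We will get output similar to the following when the above program is executed. Because items are delivered asynchronously, the interleaving of lines can vary between runs.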

```
Subscribed
Publishing Items to Subscriber
onSubscribe requested 1 item
Processing Employee [id=1,name=Pankaj]
Processing Employee [id=2,name=David]
Processing Employee [id=3,name=Lisa]
Processing Employee [id=4,name=Ram]
Processing Employee [id=5,name=Anupam]
Exiting the app
All Processing Done
```

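Note that items are delivered asynchronously on executor threads. Without the logic that makes main wait until all items are processed, main may finish, and the JVM may exit, before the subscriber has processed every item, so we would get incomplete or unpredictable results.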
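One way to avoid the polling loop is to let the subscriber release a CountDownLatch from onComplete (or onError) and have main block on that latch. The sketch below assumes the publisher is closed as soon as all items are submitted, and relies on SubmissionPublisher delivering already-buffered items before it signals onComplete. LatchSubscriber and LatchDemo are hypothetical names, and the five-second timeout is an arbitrary safety net.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical subscriber that counts items and releases a latch when the
// stream terminates, so the caller can block instead of polling with sleep().
class LatchSubscriber<T> implements Flow.Subscriber<T> {
    final CountDownLatch finished = new CountDownLatch(1);
    final AtomicInteger processed = new AtomicInteger();
    private Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1);
    }

    @Override
    public void onNext(T item) {
        processed.incrementAndGet();
        subscription.request(1);
    }

    @Override
    public void onError(Throwable t) {
        t.printStackTrace();
        finished.countDown(); // terminal signal: release the waiting thread
    }

    @Override
    public void onComplete() {
        finished.countDown(); // terminal signal: release the waiting thread
    }
}

class LatchDemo {
    static int run() {
        LatchSubscriber<String> subscriber = new LatchSubscriber<>();
        SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
        publisher.subscribe(subscriber);
        for (String s : List.of("a", "b", "c", "d", "e")) {
            publisher.submit(s);
        }
        publisher.close(); // buffered items are delivered first, then onComplete
        try {
            if (!subscriber.finished.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("stream did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return subscriber.processed.get();
    }
}
```

Unlike the counter loop, this also ends the wait when the stream terminates with an error, although it still does not cover a subscriber that cancels its subscription.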

Message Transformation Example

A Processor is used to transform messages between a publisher and a subscriber. Let’s say we have another subscriber that expects a different type of message to process, and that this new message type is Freelancer.

```java
package com.dpq.reactive.beans;

public class Freelancer extends Employee {

    private int fid;

    public int getFid() {
        return fid;
    }

    public void setFid(int fid) {
        this.fid = fid;
    }

    public Freelancer(int id, int fid, String name) {
        super(id, name);
        this.fid = fid;
    }

    @Override
    public String toString() {
        return "[id=" + super.getId() + ",name=" + super.getName() + ",fid=" + fid + "]";
    }
}
```

We have a new subscriber to consume Freelancer stream data.

```java
package com.dpq.reactive_streams;

import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;

import com.dpq.reactive.beans.Freelancer; // Freelancer lives in a different package

public class MyFreelancerSubscriber implements Subscriber<Freelancer> {

    private Subscription subscription;

    // volatile: updated on the publisher's delivery thread, read by main's wait loop
    private volatile int counter = 0;

    @Override
    public void onSubscribe(Subscription subscription) {
        System.out.println("Subscribed for Freelancer");
        this.subscription = subscription;
        this.subscription.request(1); // requesting data from publisher
        System.out.println("onSubscribe requested 1 item for Freelancer");
    }

    @Override
    public void onNext(Freelancer item) {
        System.out.println("Processing Freelancer " + item);
        counter++; // only one onNext call runs at a time for a given subscriber
        this.subscription.request(1);
    }

    @Override
    public void onError(Throwable e) {
        System.out.println("Some error happened in MyFreelancerSubscriber");
        e.printStackTrace();
    }

    @Override
    public void onComplete() {
        System.out.println("All Processing Done for MyFreelancerSubscriber");
    }

    public int getCounter() {
        return counter;
    }
}
```

Processor

The important part is the implementation of the Processor interface. Since we want to reuse SubmissionPublisher, we extend it and use its methods wherever applicable.

```java
package com.dpq.reactive_streams;

import java.util.concurrent.Flow.Processor;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.SubmissionPublisher;
import java.util.function.Function;

import com.dpq.reactive.beans.Employee;
import com.dpq.reactive.beans.Freelancer;

public class MyProcessor extends SubmissionPublisher<Freelancer> implements Processor<Employee, Freelancer> {

    private Subscription subscription;
    private final Function<Employee, Freelancer> function;

    public MyProcessor(Function<Employee, Freelancer> function) {
        super();
        this.function = function;
    }

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1);
    }

    @Override
    public void onNext(Employee emp) {
        submit(function.apply(emp)); // transform and publish to downstream subscribers
        subscription.request(1);
    }

    @Override
    public void onError(Throwable e) {
        e.printStackTrace();
        closeExceptionally(e); // propagate the error downstream
    }

    @Override
    public void onComplete() {
        System.out.println("Done");
        close(); // propagate completion downstream (calling close() again later is harmless)
    }
}
```
  • The Function is used to convert an Employee object to a Freelancer object.
  • We convert the incoming Employee message to a Freelancer message in the onNext method and then use SubmissionPublisher’s submit method to send it to the subscriber.
  • Since a Processor works as both subscriber and publisher, we can create a chain of processors between the end publisher and subscriber.

Message Transformation Test

```java
package com.dpq.reactive_streams;

import java.util.List;
import java.util.concurrent.SubmissionPublisher;

import com.dpq.reactive.beans.Employee;
import com.dpq.reactive.beans.Freelancer;

public class MyReactiveAppWithProcessor {

    public static void main(String[] args) throws InterruptedException {
        // Create End Publisher
        SubmissionPublisher<Employee> publisher = new SubmissionPublisher<>();

        // Create Processor
        MyProcessor transformProcessor = new MyProcessor(
                s -> new Freelancer(s.getId(), s.getId() + 100, s.getName()));

        // Create End Subscriber
        MyFreelancerSubscriber subs = new MyFreelancerSubscriber();

        // Create chain of publisher, processor and subscriber
        publisher.subscribe(transformProcessor); // publisher to processor
        transformProcessor.subscribe(subs); // processor to subscriber

        List<Employee> emps = EmpHelper.getEmps();

        // Publish items
        System.out.println("Publishing Items to Subscriber");
        emps.stream().forEach(i -> publisher.submit(i));

        // Logic to wait for messages processing to finish
        while (emps.size() != subs.getCounter()) {
            Thread.sleep(10);
        }

        // Closing publishers
        publisher.close();
        transformProcessor.close();

        System.out.println("Exiting the app");
    }
}
```

Read the comments in the program to understand it properly; the most important change is the creation of the publisher-processor-subscriber chain. We will get output similar to the following when the above program is executed (the order of some lines can vary between runs).

```
Subscribed for Freelancer
Publishing Items to Subscriber
onSubscribe requested 1 item for Freelancer
Processing Freelancer [id=1,name=Pankaj,fid=101]
Processing Freelancer [id=2,name=David,fid=102]
Processing Freelancer [id=3,name=Lisa,fid=103]
Processing Freelancer [id=4,name=Ram,fid=104]
Processing Freelancer [id=5,name=Anupam,fid=105]
Exiting the app
All Processing Done for MyFreelancerSubscriber
Done
```

Cancel Subscription

We can use the Subscription’s cancel method to stop receiving messages in the subscriber. Note that once we cancel the subscription, the subscriber will not receive onComplete or onError signals.

Here is sample code where the subscriber consumes only 3 messages and then cancels the subscription.

```java
@Override
public void onNext(Employee item) {
    System.out.println("Processing Employee " + item);
    counter++;
    if (counter == 3) {
        this.subscription.cancel();
        return;
    }
    this.subscription.request(1);
}
```

Note that in this case our logic to halt the main thread until all messages are processed goes into an infinite loop, because the counter stops at 3 while the list contains 5 employees. We can add some additional logic for this scenario, for example a flag that the subscriber sets when it stops processing or cancels the subscription.
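A minimal sketch of that idea, assuming a CompletableFuture is used as the flag: the hypothetical LimitingSubscriber below cancels after a given number of items and completes a future itself at that point, because no onComplete or onError will follow the cancel. The same future is also completed from onComplete and onError, so the caller can wait on a single object whichever way the stream ends. LimitingSubscriber, CancelDemo and the limit of 3 are illustrative names and values.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

// Hypothetical subscriber that cancels after `limit` items and signals
// "stopped" itself, because no terminal signal follows a cancel.
class LimitingSubscriber<T> implements Flow.Subscriber<T> {
    private final int limit;
    private int count;
    private Flow.Subscription subscription;
    final CompletableFuture<Integer> stopped = new CompletableFuture<>();

    LimitingSubscriber(int limit) {
        this.limit = limit;
    }

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1);
    }

    @Override
    public void onNext(T item) {
        if (++count == limit) {
            subscription.cancel();   // no onComplete/onError will arrive after this
            stopped.complete(count); // so report the stop ourselves
            return;
        }
        subscription.request(1);
    }

    @Override
    public void onError(Throwable t) {
        stopped.completeExceptionally(t);
    }

    @Override
    public void onComplete() {
        stopped.complete(count); // the stream ended before the limit was reached
    }
}

class CancelDemo {
    static int run() {
        LimitingSubscriber<Integer> subscriber = new LimitingSubscriber<>(3);
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (int i = 1; i <= 5; i++) {
                publisher.submit(i);
            }
            return subscriber.stopped.join(); // returns on cancel or on termination
        }
    }
}
```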

Back Pressure

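When the publisher produces messages at a much faster rate than the subscriber consumes them, back pressure builds up. The Flow API handles this through demand signalling: the subscriber calls Subscription.request(n), and a publisher must not push more items than have been requested. SubmissionPublisher keeps items that have not been requested yet in a bounded buffer for each subscriber; when that buffer is full, submit blocks the producer, while offer can drop items (optionally after a timeout) and report each drop to an onDrop handler. Beyond that, we can devise our own strategy, such as fine-tuning the subscriber or reducing the rate at which messages are produced.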
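To see the buffering side of this, the hypothetical demo below gives SubmissionPublisher a per-subscriber buffer capacity of 4 (the maxBufferCapacity constructor argument, which the implementation may round up to a power of two) and a subscriber that has not requested anything yet. It then uses offer instead of submit, so once the buffer is full the onDrop handler runs and the item is dropped instead of blocking the producer. The exact split between buffered and dropped items depends on the implementation, so the sketch only reports the two counts; BackpressureDemo and its parameters are illustrative assumptions.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo: offer() drops items once a subscriber's bounded buffer is full.
class BackpressureDemo {
    /** Offers `total` items to a subscriber with no demand; returns {received, dropped}. */
    static int[] run(int total) {
        AtomicInteger received = new AtomicInteger();
        AtomicInteger dropped = new AtomicInteger();
        CompletableFuture<Flow.Subscription> subscribed = new CompletableFuture<>();
        CompletableFuture<Void> finished = new CompletableFuture<>();

        ExecutorService executor = Executors.newSingleThreadExecutor();
        // maxBufferCapacity = 4: only a few undelivered items are held per subscriber
        SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>(executor, 4);
        publisher.subscribe(new Flow.Subscriber<Integer>() {
            @Override public void onSubscribe(Flow.Subscription s) { subscribed.complete(s); } // no request yet
            @Override public void onNext(Integer item) { received.incrementAndGet(); }
            @Override public void onError(Throwable t) { finished.completeExceptionally(t); }
            @Override public void onComplete() { finished.complete(null); }
        });
        Flow.Subscription subscription = subscribed.join(); // subscribed, but demand is still 0

        for (int i = 0; i < total; i++) {
            // offer() does not block: when the buffer is full, onDrop runs and the item is dropped
            publisher.offer(i, (subscriber, item) -> {
                dropped.incrementAndGet();
                return false; // false means "do not retry"
            });
        }

        subscription.request(Long.MAX_VALUE); // the slow consumer finally drains its buffer
        publisher.close();                    // onComplete follows the buffered items
        finished.join();
        executor.shutdown();
        return new int[] {received.get(), dropped.get()};
    }
}
```

Replacing offer with submit in the loop would make the producer wait for buffer space instead of dropping; with no demand at all, that would block indefinitely, which is exactly the back pressure the subscriber is exerting.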

Summary

The Java 9 Flow API is a good move towards reactive programming and towards building asynchronous, non-blocking applications. However, creating a truly reactive application is possible only when every system and API involved supports it.
