Spring Integration Pollable Channels: Lab 2
Subscribable channels do not buffer messages; they deliver each message to every
subscriber. Pollable channels can buffer messages, and each message is delivered to a single
consumer. If there is more than one consumer, only one of them (the first to poll the
channel) receives a given message and the others never see it. In this step, you replace the
subscribable channel with a pollable channel to see the effect on the SI application.
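The difference can be sketched in plain Java without Spring. This is only an analogy built on JDK classes, not Spring Integration's implementation: the class name ChannelSemantics and its methods are hypothetical, a list of Consumer callbacks stands in for the subscribers of a subscribable channel, and a bounded BlockingQueue stands in for the pollable channel's buffer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Consumer;

// Hypothetical JDK-only analogy; not Spring Integration code.
class ChannelSemantics {

    // Publish-subscribe style: nothing is buffered, every subscriber is called.
    public static List<String> broadcast(String payload) {
        List<String> log = new ArrayList<>();
        List<Consumer<String>> subscribers = new ArrayList<>();
        subscribers.add(p -> log.add("consumer1 got " + p));
        subscribers.add(p -> log.add("consumer2 got " + p));
        for (Consumer<String> subscriber : subscribers) {
            subscriber.accept(payload);
        }
        return log;
    }

    // Pollable style: the message waits in a queue until one consumer takes it.
    public static List<String> pollOnceEach(String payload) {
        List<String> log = new ArrayList<>();
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(2);
        queue.offer(payload);          // buffered; nobody is notified
        String first = queue.poll();   // consumer1 polls and removes the message
        String second = queue.poll();  // consumer2 polls an empty queue and gets null
        log.add("consumer1 got " + first);
        log.add("consumer2 got " + second);
        return log;
    }

    public static void main(String[] args) {
        System.out.println(broadcast("hello"));    // [consumer1 got hello, consumer2 got hello]
        System.out.println(pollOnceEach("hello")); // [consumer1 got hello, consumer2 got null]
    }
}
```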
We will reuse the example from Lab 1 with one change: remove the subscribable channel
and add a pollable channel.
Remove channel:
<int:publish-subscribe-channel id="messageChannel" />
Add channel:
<int:channel id="messageChannel">
    <int:queue capacity="2"/>
</int:channel>
The complete XML then looks like this:
spring-integration-channels.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int-stream="http://www.springframework.org/schema/integration/stream"
       xsi:schemaLocation="
           http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
           http://www.springframework.org/schema/integration/stream http://www.springframework.org/schema/integration/stream/spring-integration-stream.xsd">

    <!-- message producer / a Spring Integration wrapped Java Standard input stream -->
    <int-stream:stdin-channel-adapter id="producer" channel="messageChannel" />

    <!-- a pair of message consumers / a pair of Spring Integration wrapped Java Standard output streams -->
    <int-stream:stdout-channel-adapter id="consumer1" channel="messageChannel" append-newline="true" />
    <int-stream:stdout-channel-adapter id="consumer2" channel="messageChannel" append-newline="true" />

    <int:poller id="defaultPoller" default="true" max-messages-per-poll="5" fixed-rate="200" />

    <!-- a pub/sub message channel -->
    <!--<int:publish-subscribe-channel id="messageChannel" />-->
    <int:channel id="messageChannel">
        <int:queue capacity="2"/>
    </int:channel>
</beans>
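The int:poller element in this configuration sets fixed-rate="200" and max-messages-per-poll="5": roughly, each consumer checks the channel every 200 milliseconds and takes at most five messages per check. A minimal JDK-only sketch of one such poll cycle follows. The names PollerSketch and pollCycle are hypothetical, and the real poller is scheduled and managed by Spring Integration rather than written by hand.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical JDK-only sketch of a single poll cycle; not Spring Integration code.
class PollerSketch {

    // Take up to maxMessagesPerPoll messages, stopping early once the queue is empty.
    public static List<String> pollCycle(BlockingQueue<String> queue, int maxMessagesPerPoll) {
        List<String> received = new ArrayList<>();
        for (int i = 0; i < maxMessagesPerPoll; i++) {
            String message = queue.poll(); // non-blocking; returns null when the queue is empty
            if (message == null) {
                break;
            }
            received.add(message);
        }
        return received;
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(2); // mirrors capacity="2"
        queue.offer("Deepak");
        queue.offer("Garv");
        System.out.println(pollCycle(queue, 5)); // [Deepak, Garv]
        System.out.println(pollCycle(queue, 5)); // [] because the next cycle finds nothing
    }
}
```

Because the queue holds at most two messages, a single cycle in this lab can never actually reach the limit of five.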
The main class loads the application context and, with it, all the beans:
Startup.java
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class Startup {

    @SuppressWarnings({ "resource", "unused" })
    // @SuppressWarnings({ "resource" })
    public static void main(String[] args) {
        // Loading the context starts the channel adapters and the poller.
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
                "/META-INF/spring/si-components.xml");
        // Keep the main thread alive so the application keeps running.
        while (true) {
        }
    }
}
Retest the application. With the pollable channel in place of the subscribable
channel, rerun the application.
The application is now running and waiting for your text input. In the Console view, again
enter some text and press the Enter key. The text you type on standard input is wrapped in a
message and placed on the pollable channel. This time, however, each message is delivered to
only one consumer, so the text is echoed only once. In the console output below, each line
appears twice: once as typed and once as the echo.
log4j:WARN No appenders could be found for logger (org.springframework.core.env.StandardEnvironment).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Contanier started…..
Deepak
Deepak
Garv
Garv
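With two consumers polling the same queue, every message is handed to exactly one of them. The following JDK-only sketch illustrates that. It is an analogy, not Spring Integration code, and the names CompetingConsumers and run are hypothetical. Which consumer wins a given message depends on thread timing, so no particular assignment is claimed.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;

// Hypothetical JDK-only analogy for two consumers on one pollable channel.
class CompetingConsumers {

    // Start two consumer threads on one queue and return everything they received.
    public static List<String> run(List<String> payloads) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(2);
        List<String> received = new CopyOnWriteArrayList<>();
        Runnable consumer = () -> {
            try {
                String message;
                // Keep polling; stop once nothing arrives for 300 ms.
                while ((message = queue.poll(300, TimeUnit.MILLISECONDS)) != null) {
                    received.add(Thread.currentThread().getName() + " got " + message);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        Thread c1 = new Thread(consumer, "consumer1");
        Thread c2 = new Thread(consumer, "consumer2");
        c1.start();
        c2.start();
        try {
            for (String payload : payloads) {
                queue.put(payload); // blocks while the queue is full
            }
            c1.join();
            c2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }

    public static void main(String[] args) {
        // Each payload is printed once; the consumer name next to it may vary between runs.
        run(Arrays.asList("Deepak", "Garv")).forEach(System.out::println);
    }
}
```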
The source code above can be found in the Git repository linked at the end of this lab.
The output makes it clear that the text is echoed only once, but it does not show how a pollable channel buffers messages. To see that, make the following changes.
Remove the producer and consumers. To see messages being buffered, the producer
and consumers have to change. By default, they publish and pull messages
automatically, so you never get a chance to see the messages stack up in the
queue associated with the channel. In this step, you use the Startup.java file to
publish messages onto the channel. With no consumer, you can watch the messages
stack up in (and fill) the pollable channel’s queue.
So we remove (or comment out) the producer and consumer channel adapters in the XML below:
spring-integration-channels.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int-stream="http://www.springframework.org/schema/integration/stream"
       xsi:schemaLocation="
           http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
           http://www.springframework.org/schema/integration/stream http://www.springframework.org/schema/integration/stream/spring-integration-stream.xsd">

    <!-- the stdin producer and the two stdout consumers have been removed -->

    <int:poller id="defaultPoller" default="true" max-messages-per-poll="5" fixed-rate="200" />

    <!-- a pub/sub message channel (left commented out, as in the previous step,
         so that only one channel uses the id "messageChannel") -->
    <!--<int:publish-subscribe-channel id="messageChannel" />-->
    <int:channel id="messageChannel">
        <int:queue capacity="2"/>
    </int:channel>
</beans>
Startup.java
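import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.integration.support.MessageBuilder;
// The two imports below assume Spring Integration 4.x or later; in 2.x and 3.x,
// Message and MessageChannel are in the org.springframework.integration package.
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;

public class Startup {

    @SuppressWarnings({ "resource", "unused" })
    // @SuppressWarnings({ "resource" })
    public static void main(String[] args) {
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
                "/META-INF/spring/si-components.xml");
        // Look up the pollable channel defined in the XML configuration.
        MessageChannel channel = context.getBean("messageChannel", MessageChannel.class);
        Message<String> message1 = MessageBuilder.withPayload("Hello world - one!").build();
        Message<String> message2 = MessageBuilder.withPayload("Hello world - two!").build();
        Message<String> message3 = MessageBuilder.withPayload("Hello world - three!").build();
        System.out.println("sending message1");
        channel.send(message1);
        System.out.println("sending message2");
        channel.send(message2);
        System.out.println("sending message3");
        // The queue holds only two messages and nothing consumes them, so this call waits.
        channel.send(message3);
        System.out.println("done sending messages");
    }
}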
Retest the application.
This time, the application does not require your input (the messages are added to
the pollable channel by the Startup code). Look in the Console view to see what is
happening.
log4j:WARN No appenders could be found for logger (org.springframework.core.env.StandardEnvironment).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Contanier started…..
sending message1
sending message2
sending message3
Notice that the console shows each message being sent, but not the final output (the
“done sending messages” text), and the application never ends. Why? The queue is only two
deep. The third send blocks, waiting for a consumer to take a message off the channel so that
the last message can be placed on it. Pollable channels are buffered, but only up to the
capacity you set.
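The same behaviour can be reproduced with a plain bounded queue. The following is a JDK-only sketch, assuming a BlockingQueue of capacity 2 behaves like the channel's queue; the names BoundedSendSketch and trySend are hypothetical. Instead of waiting forever, it gives up after a short timeout so the program can finish.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical JDK-only analogy for a queue channel with capacity="2".
class BoundedSendSketch {

    // Try to enqueue, waiting at most timeoutMillis; false means the queue stayed full.
    public static boolean trySend(BlockingQueue<String> queue, String message, long timeoutMillis) {
        try {
            return queue.offer(message, timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(2);
        System.out.println(trySend(queue, "Hello world - one!", 100));   // true
        System.out.println(trySend(queue, "Hello world - two!", 100));   // true
        System.out.println(trySend(queue, "Hello world - three!", 100)); // false: the queue is full
        queue.poll();                                                    // a consumer removes one message
        System.out.println(trySend(queue, "Hello world - three!", 100)); // true: space is free again
    }
}
```

In Spring Integration itself, MessageChannel also offers a send(message, timeout) overload that returns false when the message cannot be sent within the timeout, which would let Startup report a full queue instead of hanging.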
The source code is available in the Git repository below:
https://github.com/techstudioonline/spring-integration/tree/master/src