Spring Integration Filters (XML Messages with XPath): Lab-7

XML is popular in many messaging systems, including Spring Integration. XML describes
the data while also carrying the data. When dealing with many messages, it can be
inconvenient to convert the XML messages to objects, or even to strings, and parse them to
find the relevant ones. Spring Integration comes with a built-in XPath filter that lets
you define a filter with an XPath expression to select only the messages of interest.
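
To make the idea of a boolean XPath expression concrete, here is a minimal sketch that uses only the JDK's javax.xml.xpath API, not Spring Integration. The class name XPathFilterSketch and the method accepts are hypothetical names chosen for illustration; the sketch only shows how an expression such as //country='USA' evaluates to true or false for a given XML string, which is the kind of decision a filter needs to make.

```java
import java.io.StringReader;
import javax.xml.xpath.XPath;
import javax.xml.xpath.XPathConstants;
import javax.xml.xpath.XPathExpressionException;
import javax.xml.xpath.XPathFactory;
import org.xml.sax.InputSource;

// Hypothetical illustration class; it is not part of Spring Integration.
public class XPathFilterSketch {

    // Returns true when the XPath expression evaluates to boolean true
    // for the given XML text.
    public static boolean accepts(String xml, String expression)
            throws XPathExpressionException {
        XPath xpath = XPathFactory.newInstance().newXPath();
        return (Boolean) xpath.evaluate(
                expression,
                new InputSource(new StringReader(xml)),
                XPathConstants.BOOLEAN);
    }

    public static void main(String[] args) throws Exception {
        String usa = "<shiporder><shipto><country>USA</country></shipto></shiporder>";
        String norway = "<shiporder><shipto><country>Norway</country></shipto></shiporder>";

        System.out.println(accepts(usa, "//country='USA'"));    // prints "true"
        System.out.println(accepts(norway, "//country='USA'")); // prints "false"
    }
}
```

The comparison //country='USA' is true as soon as any country element in the document has the text USA, so the position of the element inside the order does not matter.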

You will learn more about transformers in an upcoming tutorial. There are many types of
transformer message endpoints. Spring Integration provides one called a file-to-string
transformer. This transformer turns a file message (that is, an SI message with a file as its
payload) into a string message (an SI message with a string as its payload). The string in the
resulting message is the contents of the file.
Why is this important? In order for an XPath filter to accept or reject the XML in a message,
it must first be able to access the XML contents rather than the file that holds the XML.
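
Conceptually, turning a file payload into a string payload amounts to reading the file and decoding its bytes with a chosen charset. The following plain-JDK sketch illustrates that step; it is not Spring's implementation, and the class name FileToStringSketch and the method fileToString are hypothetical. UTF-8 is assumed here because the configuration in this lab sets charset="UTF-8" on the transformer.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical illustration class; it is not part of Spring Integration.
public class FileToStringSketch {

    // Reads the whole file and decodes its bytes as UTF-8 (assumed charset).
    public static String fileToString(Path file) throws IOException {
        return new String(Files.readAllBytes(file), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws IOException {
        // Write a small XML document to a temporary file, then read it back.
        Path tmp = Files.createTempFile("shiporder", ".xml");
        Files.write(tmp,
                "<shiporder><country>USA</country></shiporder>"
                        .getBytes(StandardCharsets.UTF_8));

        System.out.println(fileToString(tmp));
        Files.delete(tmp);
    }
}
```

Once the payload is a string like this, an XPath expression can be evaluated against the XML text itself instead of against a file handle.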

spring-integration-xml-xpath-filter.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:int="http://www.springframework.org/schema/integration"
    xmlns:int-file="http://www.springframework.org/schema/integration/file"
    xmlns:int-mail="http://www.springframework.org/schema/integration/mail"
    xmlns:int-xml="http://www.springframework.org/schema/integration/xml"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:int-stream="http://www.springframework.org/schema/integration/stream"
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
        http://www.springframework.org/schema/integration/stream http://www.springframework.org/schema/integration/stream/spring-integration-stream.xsd
        http://www.springframework.org/schema/integration/file http://www.springframework.org/schema/integration/file/spring-integration-file.xsd
        http://www.springframework.org/schema/integration/xml http://www.springframework.org/schema/integration/xml/spring-integration-xml.xsd">

    <!-- Adapter for reading files -->
    <int-file:inbound-channel-adapter id="producer-file-adapter"
        channel="inboundChannel" directory="/Users/dpq/Desktop/techWorrk/fileInbound"
        prevent-duplicates="true">
        <int:poller fixed-rate="5000" />
    </int-file:inbound-channel-adapter>

    <int:channel id="inboundChannel" />

    <int-file:file-to-string-transformer
        id="file-2-string-transformer" input-channel="inboundChannel"
        output-channel="xml-inboundChannel" charset="UTF-8" />

    <int:channel id="xml-inboundChannel" />

    <int-xml:xpath-filter id="xpathFilter" input-channel="xml-inboundChannel" match-type="exact"
        output-channel="outboundChannel" xpath-expression-ref="filterXpathExp" />

    <int-xml:xpath-expression id="filterXpathExp" expression="//country='USA'" />

    <int:channel id="outboundChannel" />

    <!-- Adapter for writing files -->
    <int-file:outbound-channel-adapter
        channel="outboundChannel" id="consumer-file-adapter"
        directory="/Users/dpq/Desktop/techWorrk/fileOutbound" />

    <int:poller id="defaultPoller" default="true"
        max-messages-per-poll="5" fixed-rate="200" />
</beans>

XmlXPathFilterStartup.java

package com.dpq.filter.containers;

import org.springframework.context.support.ClassPathXmlApplicationContext;

public class XmlXPathFilterStartup {

    public static void main(String[] args) throws InterruptedException {
        // Loading the context starts the file poller and the rest of the flow.
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
                "spring-integration-xml-xpath-filter.xml");
        // Close the context cleanly when the JVM is stopped (for example with Ctrl+C).
        context.registerShutdownHook();
        System.out.println("Container started.....");

        // Block the main thread forever instead of spinning in an empty while(true) loop.
        Thread.currentThread().join();
    }
}

Below are a few XML files that will be pushed into '/Users/dpq/Desktop/techWorrk/fileInbound'.

shiporder1.xml

<?xml version="1.0" encoding="UTF-8"?>
<shiporder orderid="889923"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:noNamespaceSchemaLocation="shiporder.xsd">
  <orderperson>John Smith</orderperson>
  <shipto>
    <name>George Blatt</name>
    <address>Elm St</address>
    <city>New Orleans</city>
    <country>USA</country>
  </shipto>
  <item>
    <title>Hide your heart</title>
    <quantity>2</quantity>
    <price>19.80</price>
  </item>
</shiporder>

shiporder2.xml

<?xml version="1.0" encoding="UTF-8"?>
<shiporder orderid="889923"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:noNamespaceSchemaLocation="shiporder.xsd">
  <orderperson>John Smith</orderperson>
  <shipto>
    <name>Ola Nordmann</name>
    <address>Langgt 23</address>
    <city>4000 Stavanger</city>
    <country>Norway</country>
  </shipto>
  <item>
    <title>Empire Burlesque</title>
    <note>Special Edition</note>
    <quantity>1</quantity>
    <price>10.90</price>
  </item>
  <item>
    <title>Hide your heart</title>
    <quantity>1</quantity>
    <price>9.90</price>
  </item>
</shiporder>

shiporder3.xml

<?xml version="1.0" encoding="UTF-8"?>
<shiporder orderid="889923"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:noNamespaceSchemaLocation="shiporder.xsd">
  <shipto>
    <name>Ola Nordmann</name>
    <address>Langgt 23</address>
    <city>4000 Stavanger</city>
    <country>Norway</country>
  </shipto>
  <item>
    <title>Empire Burlesque</title>
    <note>Special Edition</note>
    <quantity>1</quantity>
    <price>10.90</price>
  </item>
  <item>
    <title>Hide your heart</title>
    <quantity>1</quantity>
    <price>9.90</price>
  </item>
</shiporder>

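If we push all of the XML files above into the inbound folder, the accepted messages end up in the outbound folder '/Users/dpq/Desktop/techWorrk/fileOutbound'. With the expression //country='USA', only the message from shiporder1.xml is accepted, because its country element has the value 'USA'. The messages from shiporder2.xml and shiporder3.xml, whose country value is 'Norway', are rejected by the filter and are not written to the outbound folder.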
