How to connect HornetQ message broker with RESTful service JAX-RS Jersey
Your web service interface needs to offer two functions: submit and poll.
- The client calls submit, which in turn sends the request as a JMS message on the incoming queue and returns the message ID. submit returns after that.
- The request processing runs asynchronously. For example, a message-driven bean (MDB) listens on the incoming queue, processes the message, and puts the result on an outgoing queue. It sets the correlation ID of the result message to the message ID of the incoming request. The correlation ID is required to connect the request and its result.
- The client calls the poll function with the message ID (as returned by submit) as an argument. poll checks the result queue using a JMS message selector on the correlation ID. It returns either "not yet completed" or the result. The client may need to call poll multiple times to get the result.
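The submit/poll flow above can be sketched without a broker. The following is a minimal in-memory illustration, not JMS code: a ConcurrentHashMap keyed by correlation ID stands in for the outgoing queue plus message selector, and a single-threaded executor stands in for the MDB. The class name AsyncRequestService and the upper-casing "processing" step are made up for the example.

```java
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// In-memory stand-in for the submit/poll pattern (hypothetical names, no JMS involved).
class AsyncRequestService {
    // Plays the role of the outgoing queue, keyed by correlation ID.
    private final Map<String, String> results = new ConcurrentHashMap<>();
    // Plays the role of the message-driven bean working off the incoming queue.
    private final ExecutorService worker = Executors.newSingleThreadExecutor();

    // Accepts a request, hands it to the worker, and returns its ID without waiting.
    String submit(String request) {
        String messageId = UUID.randomUUID().toString();
        worker.execute(() -> {
            String result = request.toUpperCase(); // placeholder for real processing
            results.put(messageId, result);        // correlation ID = request message ID
        });
        return messageId;
    }

    // Returns the result if it is ready, or empty for "not yet completed".
    Optional<String> poll(String messageId) {
        return Optional.ofNullable(results.remove(messageId));
    }

    // Stops accepting work and waits briefly for pending requests to finish.
    void shutdown() {
        worker.shutdown();
        try {
            worker.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        AsyncRequestService service = new AsyncRequestService();
        String id = service.submit("hello");
        Optional<String> result = service.poll(id);
        while (!result.isPresent()) { // the client polls until the result is there
            Thread.sleep(10);
            result = service.poll(id);
        }
        System.out.println(result.get()); // prints HELLO
        service.shutdown();
    }
}
```

With a real broker, submit would send to the incoming queue and poll would read from the result queue; the map lookup here only mimics that selection by correlation ID.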
Notes:
- A JMS message selector is a filter (like a WHERE clause in a database query). It is required in this scenario to identify the result for a given request.
- See the Javadoc of javax.jms.Message.setJMSCorrelationID.
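To make the selector note concrete, here is a small sketch of how poll might build the selector string. JMS selectors use SQL-92 style string literals, so a single quote inside the ID is doubled. The class and method names are hypothetical; the commented lines indicate where the string would be used, assuming an open javax.jms.Session and a result queue.

```java
// Builds a JMS message selector that matches one correlation ID (hypothetical helper).
class CorrelationSelector {
    static String forCorrelationId(String messageId) {
        // Selector string literals are single-quoted; an embedded quote is doubled.
        String escaped = messageId.replace("'", "''");
        return "JMSCorrelationID = '" + escaped + "'";
    }

    public static void main(String[] args) {
        String selector = forCorrelationId("ID:host-1234-1");
        System.out.println(selector); // prints JMSCorrelationID = 'ID:host-1234-1'

        // With a real broker (assumes `session` and `resultQueue` already exist):
        // MessageConsumer consumer = session.createConsumer(resultQueue, selector);
        // Message result = consumer.receiveNoWait(); // null means "not yet completed"
        // consumer.close();
    }
}
```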
Where should I close my activemq connection (java, jersey)
Implement a ServletContextListener (http://tomcat.apache.org/tomcat-7.0-doc/servletapi/javax/servlet/ServletContextListener.html). Open the connection in contextInitialized and close it in contextDestroyed:
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.servlet.ServletContext;
import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.log4j.Logger;

public class MyContextListener implements ServletContextListener {
    public static final String ACTIVE_MQ_SESSION = "ActiveMQSession";
    public static final String ACTIVE_MQ_PRODUCER = "ActiveMQProducer";

    private static final Logger logger = Logger.getLogger(MyContextListener.class);
    private static final int ackMode = Session.AUTO_ACKNOWLEDGE;
    private static final boolean transacted = false;
    // Adjust to your broker, e.g. tcp://localhost:61616 for a standalone ActiveMQ.
    private static final String brokerUrl = "vm://localhost:61616";

    private Connection connection;
    private Session session;
    private MessageProducer producer;

    // Called once when the web application stops: release the JMS resources.
    @Override
    public void contextDestroyed(ServletContextEvent sce) {
        try {
            // Any of these may be null if contextInitialized failed part-way.
            if (producer != null) {
                producer.close();
            }
            if (session != null) {
                session.close();
            }
            if (connection != null) {
                connection.close();
            }
        } catch (JMSException e) {
            logger.warn("tearDown()", e);
        }
    }

    // Called once when the web application starts: open the JMS resources
    // and publish them as servlet context attributes.
    @Override
    public void contextInitialized(ServletContextEvent sce) {
        ActiveMQConnectionFactory connectionFactory =
                new ActiveMQConnectionFactory(brokerUrl);
        try {
            connection = connectionFactory.createConnection();
            connection.start();
            session = connection.createSession(transacted, ackMode);
            Destination destination = session.createQueue("queue");
            producer = session.createProducer(destination);
            ServletContext sc = sce.getServletContext();
            sc.setAttribute(ACTIVE_MQ_SESSION, session);
            sc.setAttribute(ACTIVE_MQ_PRODUCER, producer);
        } catch (JMSException e) {
            // Pass the exception so the cause ends up in the log.
            logger.warn("setup() failed to setup connection brokerUrl="
                    + brokerUrl, e);
        }
    }
}
Register the listener in web.xml:
<web-app...>
<listener>
<listener-class>package.MyContextListener</listener-class>
</listener>
</web-app>
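and then the resource class (from where you use the producer and session):
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.servlet.ServletContext;
import javax.ws.rs.Consumes;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;

// JAX-RS annotations such as @PUT only take effect on resource classes,
// not on HttpServlet subclasses, so this is written as a Jersey resource.
@Path("/messages") // placeholder path, adjust as needed
public class MessageResource {

    // Injected by the JAX-RS runtime when deployed in a servlet container.
    @Context
    private ServletContext servletContext;

    @PUT
    @Consumes(MediaType.TEXT_PLAIN)
    public void putString(String myString) throws JMSException {
        MessageProducer producer = (MessageProducer) servletContext
                .getAttribute(MyContextListener.ACTIVE_MQ_PRODUCER);
        Session session = (Session) servletContext.getAttribute(
                MyContextListener.ACTIVE_MQ_SESSION);
        if (session == null || producer == null) {
            // The listener could not set up the connection at startup.
            throw new IllegalStateException("JMS connection is not available");
        }
        // A JMS Session is not thread-safe, so serialize access to the shared one.
        synchronized (session) {
            TextMessage txtmessage = session.createTextMessage(myString);
            producer.send(txtmessage);
        }
    }
}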
Difference between JMS and Web Service
JMS is an API which abstracts messaging middleware, like ActiveMQ or IBM MQSeries.
Messaging middleware has a store-and-forward paradigm and asynchronous message passing, while web services tend to promote a synchronous procedure-call paradigm. In distributed systems, where a lot can go wrong, dealing with things asynchronously tends to focus the mind on what you need to do when part of the system is unavailable or performing poorly, and the code needed to deal with that tends to be a lot less complicated.
Clustering becomes trivial if you have multiple servers listening on the same queue: parallelism and load balancing come for free in this case.
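The load-balancing point can be illustrated without a broker. In this sketch a BlockingQueue stands in for the JMS queue and worker threads stand in for the servers listening on it; each message is taken by exactly one worker. The class name, method name, and stop marker are made up for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

// Competing consumers on one queue (in-memory stand-in, hypothetical names).
class CompetingConsumers {
    private static final String STOP = "__STOP__"; // sentinel that tells a worker to exit

    // Runs `workers` threads against one queue and reports which worker handled
    // each message. Messages are assumed to be unique, since they are used as map keys.
    static Map<String, String> process(List<String> messages, int workers) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(messages);
        for (int i = 0; i < workers; i++) {
            queue.add(STOP); // one stop marker per worker, queued after the real messages
        }
        Map<String, String> handledBy = new ConcurrentHashMap<>();
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < workers; i++) {
            String name = "worker-" + i;
            Thread t = new Thread(() -> {
                try {
                    // take() hands each message to exactly one of the waiting workers
                    for (String m = queue.take(); !m.equals(STOP); m = queue.take()) {
                        handledBy.put(m, name);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads.add(t);
            t.start();
        }
        for (Thread t : threads) {
            try {
                t.join(); // wait until every worker has seen its stop marker
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        return handledBy;
    }

    public static void main(String[] args) {
        Map<String, String> handledBy = process(Arrays.asList("a", "b", "c", "d"), 2);
        handledBy.forEach((message, worker) -> System.out.println(message + " -> " + worker));
    }
}
```

Which worker handles which message varies from run to run; adding capacity means starting more workers, with no change to the sender.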
Personally I find JMS much easier to work with and more robust and reliable than web services, but the messaging middleware must support all platforms you want to use. If all the components that need to talk to each other are under your control, I would give messaging middleware with a JMS interface serious consideration.
If the other party is external, then web services probably rule, and in that case you could think of using a thin layer to convert the external web service to an internal message-passing infrastructure so you still have most of the advantages.
If it is "just slapping a remote API on a webapp", then of course it does not pay to set up asynchronous messaging either.
Failover support for activemq REST api
From the REST client's perspective, you'll need to either abstract the requests behind a proxy URL or handle failover of the client connection yourself using third-party libraries (see HttpClient failover support, etc.).
If you go with the abstraction approach, then consider doing one of the following to proxy the requests to AMQ and provide failover support:
- use the Camel load balancer, which supports endpoint failover
- use a third-party load balancer (Apache mod_proxy, perlbal, etc.)
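If failover is handled in the client rather than by a proxy, the core logic is to try each endpoint in order until one succeeds. A minimal sketch follows; the helper name, the generic attempt function, and the broker URLs in the usage example are assumptions for illustration, and a real client would plug its HTTP call into the attempt function.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Tries a list of endpoints in order and returns the first successful response
// (hypothetical helper, not part of ActiveMQ or any HTTP library).
class FailoverClient {
    static <R> R callWithFailover(List<String> endpoints, Function<String, R> attempt) {
        RuntimeException last = null;
        for (String endpoint : endpoints) {
            try {
                return attempt.apply(endpoint); // first endpoint that answers wins
            } catch (RuntimeException e) {
                last = e; // remember the failure and move on to the next endpoint
            }
        }
        throw new IllegalStateException("all endpoints failed", last);
    }

    public static void main(String[] args) {
        // Placeholder URLs; the lambda simulates the first broker being down.
        List<String> brokers = Arrays.asList(
                "http://broker-a.example:8161/api/message",
                "http://broker-b.example:8161/api/message");
        String response = callWithFailover(brokers, url -> {
            if (url.contains("broker-a")) {
                throw new IllegalStateException("connection refused");
            }
            return "sent via " + url;
        });
        System.out.println(response);
    }
}
```

This only covers connection-level failures signalled as exceptions; retry limits, timeouts, and duplicate sends after a partial failure would still need handling.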