Jersey REST Web Service with ActiveMQ Middleware Integration

How to connect HornetQ message broker with RESTful service JAX-RS Jersey

Your web service interface needs to offer two operations: submit and poll.

  • The client calls submit, which sends the request as a JMS message to the incoming queue and returns the message ID right away.
  • The request processing runs asynchronously. For example, a message-driven bean (MDB) listens on the incoming queue, processes the message, and puts the result on an outgoing queue. It sets the correlation ID of the result message to the message ID of the incoming request. The correlation ID is required to connect the request and its result.
  • The client calls poll with the message ID (as returned by submit) as an argument. poll checks the result queue using a JMS message selector on the correlation ID and returns either "not yet completed" or the result. The client may need to call poll several times before the result is available.
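
The flow in the list above can be sketched without a broker. The following is only an in-memory stand-in, not JMS code: a map plays the role of the outgoing queue, a single worker thread plays the role of the MDB, and the names SubmitPollSketch, submit, poll and shutdown are hypothetical. The "processing" step is a placeholder that prefixes the request text.

```java
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// In-memory stand-in for the submit/poll flow; a real service would use JMS queues.
final class SubmitPollSketch {
    // Plays the role of the outgoing queue: results keyed by correlation ID.
    private final Map<String, String> results = new ConcurrentHashMap<>();
    // Plays the role of the MDB: processes requests off the caller's thread.
    private final ExecutorService worker = Executors.newSingleThreadExecutor();

    // Schedules the request for processing and returns its message ID at once.
    String submit(String request) {
        String messageId = UUID.randomUUID().toString();
        worker.execute(() -> results.put(messageId, "processed: " + request));
        return messageId;
    }

    // Returns the result if it is ready, or empty for "not yet completed".
    // A result is handed out once, like a message consumed from a queue.
    Optional<String> poll(String messageId) {
        return Optional.ofNullable(results.remove(messageId));
    }

    // Stops the worker after it has finished the requests already submitted.
    void shutdown() {
        worker.shutdown();
        try {
            worker.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        SubmitPollSketch service = new SubmitPollSketch();
        String id = service.submit("hello");
        Optional<String> result = service.poll(id);
        while (!result.isPresent()) { // the client polls until the result arrives
            Thread.sleep(10);
            result = service.poll(id);
        }
        System.out.println(result.get());
        service.shutdown();
    }
}
```

In a real deployment the map would be replaced by the outgoing queue, and poll would read from it with a message selector on the correlation ID.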

Notes:

  • A JMS message selector is a filter (like a where clause in a database): It is required in this scenario to identify the result for a given request.
  • See the Javadoc of javax.jms.Message.setJMSCorrelationID.
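
As a small illustration of the selector mentioned in the notes, the helper below builds the selector string for one correlation ID. The class and method names are hypothetical, and the message ID in main is made up. Selector string literals use single quotes, with an embedded quote written as two single quotes.

```java
// Builds a JMS message selector that matches a single correlation ID.
final class CorrelationSelector {
    private CorrelationSelector() {}

    static String forMessageId(String messageId) {
        // Double any embedded single quote so the string literal stays well formed.
        String escaped = messageId.replace("'", "''");
        return "JMSCorrelationID = '" + escaped + "'";
    }

    public static void main(String[] args) {
        // With a JMS session, this string would be the second argument of
        // session.createConsumer(resultQueue, selector).
        System.out.println(forMessageId("ID:host-1234-1:1:1:1"));
    }
}
```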

Where should I close my ActiveMQ connection (Java, Jersey)

Implement the ServletContextListener interface (http://tomcat.apache.org/tomcat-7.0-doc/servletapi/javax/servlet/ServletContextListener.html):

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.servlet.ServletContext;
import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.log4j.Logger;

public class MyContextListener implements ServletContextListener {
    public static final String ACTIVE_MQ_SESSION = "ActiveMQSession";
    public static final String ACTIVE_MQ_PRODUCER = "ActiveMQProducer";

    private final Logger logger = Logger.getLogger(getClass());
    private static final int ackMode = Session.AUTO_ACKNOWLEDGE;
    private static final boolean transacted = false;

    // The vm:// transport connects to a broker embedded in the same JVM;
    // for a standalone broker use a tcp:// URL such as tcp://localhost:61616.
    private static final String brokerUrl = "vm://localhost:61616";

    private Connection connection;
    private Session session;
    private MessageProducer producer;

    @Override
    public void contextDestroyed(ServletContextEvent sce) {
        // Close in reverse order of creation; any of these may be null if setup failed.
        try {
            if (producer != null) {
                producer.close();
            }
            if (session != null) {
                session.close();
            }
            if (connection != null) {
                connection.close();
            }
        } catch (JMSException e) {
            logger.warn("tearDown()", e);
        }
    }

    @Override
    public void contextInitialized(ServletContextEvent sce) {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerUrl);

        try {
            connection = connectionFactory.createConnection();
            connection.start();
            session = connection.createSession(transacted, ackMode);
            Destination destination = session.createQueue("queue");
            producer = session.createProducer(destination);

            // Publish the JMS objects so request handlers can look them up.
            ServletContext sc = sce.getServletContext();
            sc.setAttribute(ACTIVE_MQ_SESSION, session);
            sc.setAttribute(ACTIVE_MQ_PRODUCER, producer);
        } catch (JMSException e) {
            // Log the exception too, not just the URL.
            logger.warn("setup() failed to setup connection brokerUrl=" + brokerUrl, e);
        }
    }

}

Register the listener in web.xml:

<web-app...>
    <listener>
        <listener-class>package.MyContextListener</listener-class>
    </listener>
</web-app>

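and then the JAX-RS resource (from where you use the producer and session):

import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.servlet.ServletContext;
import javax.ws.rs.Consumes;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;

// JAX-RS annotations are ignored on a plain HttpServlet, so this is a resource class.
// The path value is an assumption; the original does not specify one.
@Path("/message")
public class MessageResource {

    // Injected by Jersey; gives access to the attributes set by MyContextListener.
    @Context
    private ServletContext servletContext;

    @PUT
    @Consumes(MediaType.TEXT_PLAIN)
    public void putString(String myString) throws JMSException {
        MessageProducer producer = (MessageProducer) servletContext
                .getAttribute(MyContextListener.ACTIVE_MQ_PRODUCER);
        Session session = (Session) servletContext
                .getAttribute(MyContextListener.ACTIVE_MQ_SESSION);
        if (session == null || producer == null) {
            throw new IllegalStateException("JMS session is not available");
        }
        // A JMS Session and its producers are not safe for concurrent use,
        // so serialize access across request threads.
        synchronized (session) {
            TextMessage txtmessage = session.createTextMessage(myString);
            producer.send(txtmessage);
        }
    }
}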

Difference between JMS and Web Service

JMS is an API which abstracts messaging middleware, like ActiveMQ or IBM MQSeries.

Messaging middleware follows a store-and-forward paradigm with asynchronous message passing, while web services tend to promote a synchronous procedure-call paradigm. In distributed systems, where a lot can go wrong, handling things asynchronously forces you to think about what happens when part of the system is unavailable or performing poorly, and the code needed to deal with that tends to be much less complicated.

Clustering becomes trivial if you have multiple servers listening on the same queue: parallelism and load balancing come for free in that case.
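
The "multiple listeners on one queue" idea can be illustrated in plain Java. This is only a sketch: a BlockingQueue stands in for the broker queue and threads stand in for the servers, and the names CompetingConsumersSketch and drain are hypothetical. Each message is taken by exactly one consumer, which is where the load balancing comes from.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;

// In-memory stand-in for several servers consuming from the same queue.
final class CompetingConsumersSketch {
    // Starts `consumers` threads on one shared queue and returns
    // "consumer-N:message" entries showing who handled what.
    static List<String> drain(List<String> messages, int consumers) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(messages);
        List<String> handled = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(consumers);
        for (int i = 0; i < consumers; i++) {
            String name = "consumer-" + i;
            new Thread(() -> {
                String message;
                // poll() removes the head atomically, so no message is handled twice.
                while ((message = queue.poll()) != null) {
                    handled.add(name + ":" + message);
                }
                done.countDown();
            }).start();
        }
        try {
            done.await(); // wait until every consumer has seen the queue empty
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for consumers", e);
        }
        return handled;
    }

    public static void main(String[] args) {
        // The assignment of messages to consumers varies from run to run.
        for (String entry : drain(Arrays.asList("m1", "m2", "m3", "m4", "m5"), 3)) {
            System.out.println(entry);
        }
    }
}
```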

Personally, I find JMS much easier to work with, and more robust and reliable, than web services, but the messaging middleware must support all the platforms you want to use. If all the components that need to talk to each other are under your control, I would give messaging middleware with a JMS interface serious consideration.

If the other party is external, then web services probably rule. In that case you could use a thin layer to convert the external web service into an internal message-passing infrastructure, so you keep most of the advantages.

If it is "just slapping a remote API on a webapp", then of course it does not pay to set up asynchronous messaging either.

Failover support for ActiveMQ REST API

From the REST client's perspective, you'll need to either put the requests behind an abstraction (a proxy URL) or handle failover of the client connection yourself using 3rd party libraries (see HttpClient failover support, etc.).

If you go with the abstraction approach, then consider doing one of the following to proxy the requests to AMQ and provide failover support...

  • use the Camel load balancer, which supports endpoint failover
  • use a 3rd party load balancer (Apache mod_proxy, perlbal, etc)
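
If you handle failover in the client instead, the core of it is trying each broker endpoint in turn. The sketch below shows only that loop, under stated assumptions: the HTTP call is passed in as a function so that any client library can be plugged in, the names FailoverSketch and callWithFailover are hypothetical, and the broker URLs in main are made up.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Client-side failover: try each endpoint in order until one call succeeds.
final class FailoverSketch {
    static <T> T callWithFailover(List<String> endpoints, Function<String, T> call) {
        RuntimeException last = null;
        for (String endpoint : endpoints) {
            try {
                return call.apply(endpoint);
            } catch (RuntimeException e) {
                last = e; // remember the failure and move on to the next endpoint
            }
        }
        throw new IllegalStateException("all endpoints failed", last);
    }

    public static void main(String[] args) {
        // Hypothetical broker URLs; a real client would issue an HTTP request here.
        List<String> brokers = Arrays.asList(
                "http://broker-a:8161/api/message",
                "http://broker-b:8161/api/message");
        String reply = callWithFailover(brokers, url -> {
            if (url.contains("broker-a")) {
                throw new IllegalStateException("connection refused"); // simulated outage
            }
            return "sent via " + url;
        });
        System.out.println(reply);
    }
}
```

A production client would usually also add timeouts and some back-off between attempts, which this sketch leaves out.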

