Develop JMS Applications
Overview
To develop a J2EE application (servlet or enterprise bean) to use the JMS API directly for asynchronous messaging:
- Import JMS packages, including...
import javax.jms.*; import javax.naming.*;- Get an initial context.
try { ctx = new InitialContext(env); ...- Retrieve administered objects from the JNDI namespace. The javax.naming.Name)">InitialContext.lookup() method is used to retrieve administered objects (a queue connection factory and the queue destinations); for example, to receive a message from a queue
qcf = (QueueConnectionFactory)ctx.lookup( qcfName ); ... inQueue = (Queue)ctx.lookup( qnameIn ); ...- Create a connection to the messaging service provider. The connection provides access to the underlying transport, and is used to create sessions. The createQueueConnection() method on the factory object is used to create the connection
connection = qcf.createQueueConnection();The JMS specification defines that connections should be created in the stopped state. Until the connection starts, MessageConsumers that are associated with the connection cannot receive any messages. To start the connection, issue the following command
connection.start();- Create a session, for sending or receiving messages. The session provides a context for producing and consuming messages, including the methods used to create MessageProducers and MessageConsumers. The createQueueSession method is used on the connection to obtain a session. The method takes two parameters...
- A boolean that determines whether or not the session is transacted.
- A parameter that determines the acknowledge mode.
boolean transacted = false; session = connection.createQueueSession( transacted, Session.AUTO_ACKNOWLEDGE);In this example, the session is not transacted, and it should automatically acknowledge received messages. With these settings, a message is backed out only after a system error or if the application terminates unexpectedly.
The following points, as defined in the EJB specification, apply to these flags...
- The transacted flag passed on createQueueSession is ignored inside a global transaction and all work is performed as part of the transaction. Outside of a transaction the transacted flag is not used and, if set to true, the application should use session.commit() and session.rollback() to control the completion of the work. In an EJB2.0 module, if the transacted flag is set to true and outside of an XA transaction, then the session is involved in the WebSphere local transaction and the unresolved action attribute of the method applies to the JMS work.
- Clients cannot use using Message.acknowledge() to acknowledge messages. If a value of CLIENT_ACKNOWLEDGE is passed on the createxxxSession call, then messages are automatically acknowledged by the appserver and Message.acknowledge() is not used.
- Send a message.
- Create MessageProducers to create messages. For point-to-point messaging the MessageProducer is a QueueSender that is created by passing an output queue object (retrieved earlier) into the createSender method on the session. A QueueSender is normally created for a specific queue, so that all messages sent using that sender are sent to the same destination
QueueSender queueSender = session.createSender(inQueue);- Create the message. Use the session to create an empty message and add the data passed.
JMS provides several message types, each of which embodies some knowledge of its content. To avoid referencing the vendor-specific class names for the message types, methods are provided on the Session object for message creation.
In this example, a text message is created from the outString property
TextMessage outMessage = session.createTextMessage(outString);- Send the message.
To send the message, the message is passed to the send method on the QueueSender
queueSender.send(outMessage);
- Receive replies.
- Create a correlation ID to link the message sent with any replies. In this example, the client receives reply messages that are related to the message that it has sent, by using a provider-specific message ID in a JMSCorrelationID
messageID = outMessage.getJMSMessageID();The correlation ID is then used in a message selector, to select only messages that have that ID
String selector = "JMSCorrelationID = '"+messageID+"'";- Create a MessageReceiver to receive messages. For point-to-point the MessageReceiver is a QueueReceiver that is created by passing an input queue object (retrieved earlier) and the message selector into the createReceiver method on the session
QueueReceiver queueReceiver = session.createReceiver(outQueue, selector);- Retrieve the reply message. To retrieve a reply message, the receive method on the QueueReceiver is used
Message inMessage = queueReceiver.receive(2000);The parameter in the receive call is a timeout in milliseconds. This parameter defines how long the method should wait if there is no message available immediately. If you omit this parameter, the call blocks indefinitely. If you do not want any delay, use the receiveNoWait()method. In this example, the receive call returns when the message arrives, or after 2000ms, whichever is sooner.
- Act on the message received. When a message is received, you can act on it as needed by the business logic of the client. Some general JMS actions are to check that the message is of the correct type and extract the content of the message. To extract the content from the body of the message, it is necessary to cast from the generic Message class (which is the declared return type of the receive methods) to the more specific subclass, such as TextMessage. It is good practice always to test the message class before casting, so that unexpected errors can be handled gracefully.
In this example, the instanceof operator is used to check that the message received is of the TextMessage type. The message content is then extracted by casting ti the TextMessage subclass
if ( inMessage instanceof TextMessage ) ... String replyString = ((TextMessage) inMessage).getText();
- Closing down. If the application needs to create many short-lived JMS objects at the Session level or lower, it is important to close all the JMS resources used. To do this, you call the close() method on the various classes (QueueConnection, QueueSession, QueueSender, and QueueReceiver) when the resources are no longer required
queueReceiver.close(); ... queueSender.close(); ... session.close(); session = null; ... connection.close(); connection = null;- Publishing and subscribing messages. To use JMS Publish/Subscribe support instead of point-to-point messaging, the general actions are the same; for example, to create a session and connection. The exceptions are that topic resources are used instead of queue resources (such as TopicPublisher instead of QueueSender), as shown in the following example to publish a message
// Creating a TopicPublisher TopicPublisher pub = session.createPublisher(topic); ... pub.publish(outMessage); ... // Closing TopicPublisher pub.close();- Handling errors Any JMS runtime errors are reported by exceptions. The majority of methods in JMS throw JMSExceptions to indicate errors. It is good programming practice to catch these exceptions and display them on a suitable output.
Unlike normal Java exceptions, a JMSException can contain another exception embedded in it. The implementation of JMSException does not include the embedded exception in the output of its toString()method. Therefore, you need to check explicitly for an embedded exception and print it out, as shown in the following example
catch JMSException(je) { System.out.println("JMS failed with "+je); Exception le = je.getLinkedException(); if (le != null) { System.out.println("linked exception "+le); } }
JMS Enterprise Application Design
Applications refer to WAS JMS resources, such as queues and topics, using names bound into the JNDI namespace. An enterprise application can retrieve these objects without needing to know anything about their implementation, enabling the underlying messaging architecture to be changed without requiring changes to the enterprise application.
Connection factories are used to create connections with JMS providers for a specific JMS queue or topic destination, such as an MQ queue.
The appserver pools connections and sessions with the JMS provider to improve performance. This is independent from any WebSphere MQ connection pooling.
Do not cache session handles in stateless session beans that operate in transactions started by a client of the bean. Caching handles in this way causes the bean to be returned to the pool while the session is still involved in the transaction. Also, not cache non-durable subscribers due to the restriction mentioned above.
A non-durable subscriber can only be used in the same transactional context (for example, a global transaction or an unspecified transaction context) that existed when the subscriber was created.
JMS defines a generic view of a messaging that maps onto the underlying transport. An enterprise application that uses JMS, makes use of the following interfaces that are defined in Sun's javax.jms package...
Connection Provides access to the underlying transport, and is used to create Sessions. Session Provides a context for producing and consuming messages, including the methods used to create MessageProducers and MessageConsumers. MessageProducer Used to send messages. MessageConsumer Used to receive messages. The generic JMS interfaces are subclassed into the following more specific versions for Point-to-Point and Publish/Subscribe behavior:
- Point-to-Point
QueueConnection
QueueSession
QueueSender
QueueReceiver- Publish/Subscribe
TopicConnection
TopicSession
TopicSender
TopicReceiverThe section "J2EE.6.7 JMS 1.0 Requirements" of the J2EE specification gives a list of methods that must not be called in Web and EJB containers. This is enforced in IBM WAS by throwing a javax.jms.IllegalStateException.
The following points, as defined in the EJB specification, apply to the use of flags on createxxxSession calls...
- The transacted flag passed on createxxxSession is ignored inside a global transaction and all work is performed as part of the transaction. Outside of a transaction the transacted flag is not used and, if set to true, the application should use session.commit() and session.rollback() to control the completion of the work. In an EJB2.0 module, if the transacted flag is set to true and outside of an XA transaction, then the session is involved in the WebSphere local transaction and the unresolved action attribute of the method applies to the JMS work.
- Clients cannot use using Message.acknowledge() to acknowledge messages. If a value of CLIENT_ACKNOWLEDGE is passed on the createxxxSession call, then messages are automatically acknowledged by the appserver and Message.acknowledge() is not used.
Use the JMS message selector mechanism to select a subset of the messages on a queue so that this subset is returned by a receive call. The selector can refer to fields in the JMS message header and fields in the message properties.
When a message is received, you can act on it as needed by the business logic of the application. Some general JMS actions are to check that the message is of the correct type and extract the content of the message. To extract the content from the body of the message, you need to cast from the generic Message class (which is the declared return type of the receive methods) to the more specific subclass, such as TextMessage. It is good practice always to test the message class before casting, so that unexpected errors can be handled gracefully.
In this example, the instanceof operator is used to check that the message received is of the TextMessage type. The message content is then extracted by casting to the TextMessage subclass
if ( inMessage instanceof TextMessage ) ... String replyString = ((TextMessage) inMessage).getText();An alternative to making calls to QueueReceiver.receive() is to register a method that is called automatically when a suitable message is available; for example
... MyClass listener =new MyClass(); queueReceiver.setMessageListener(listener); //application continues with other application-specific behavior. ...When a message is available, it is retrieved by the onMessage() method on the listener object
import javax.jms.*; public class MyClass implements MessageListener { public void onMessage(Message message) { System.out.println("message is "+message); //application specific processing here ... } }A MessageListener can only be used in the client container. (The J2EE specification forbids the use of the JMS MessageListener mechanism for the asynchronous receipt of messages in the EJB and Web containers.)
For asynchronous message delivery, the application code cannot catch exceptions raised by failures to receive messages. This is because the application code does not make explicit calls to receive() methods. To cope with this situation, you can register an ExceptionListener, which is an instance of a class that implements the onException()method. When an error occurs, this method is called with the JMSException passed as its only parameter.
An alternative to developing your own JMS listener class, you can use a message-driven bean.
Take care when performing a JMS receive() from a server-side application component if that receive() invocation is waiting on a message produced by another application component that is deployed in the same server. Such a JMS receive() is synchronous, so blocks until the response message is received.
This type of application design can lead to the consumer/producer problem where the entire set of work threads can be exhausted by the receiving component, which has been blocked waiting for responses, leaving no available worker thread for which to dispatch the application component that would generate the response JMS message.
To illustrate this problem, picture a servlet and a message-driven bean deployed in the same server. When this servlet dispatches a request it sends a message to a queue which is serviced by the message-driven bean (that is, messages produced by the servlet are consumed by the message-driven bean's onMessage() method). The servlet subsequently issues a receive(), waiting for a reply on a temporary ReplyTo queue. The message-driven bean's onMessage() method performs a database query and sends back a reply to the servlet on the temporary queue. If a large number of servlet requests occur at once (relative to the number of server worker threads), then it is likely that all available server worker threads will be used to dispatch a servlet request, send a message, and wait for a reply. The appserver enters a deadly-embrace condition whereby no threads remain to process any of the message-driven beans that are now pending. Since the servlets are waiting in blocking recieves, the server hangs, likely leading to application failure.
Possible solutions are...
- Ensure that the number of worker threads (# of threads per server region * # of server regions per server) exceeds the number of concurrent dispatches of the application component doing the receive() so that there is always a worker thread available to dispatch the message producing component.
- Use an application topology that places the receiver application component in a separate server than the producer application component. While worker thread usage can still need to be carefully considered under such a deployment scenario, this separation ensures that there are always be threads that cannot be blocked by the message receiving component. There can be other interactions to consider, such as an appserver that has multiple applications installed.
- Refactor your application to do the message receives from a client component, which will not compete with the producer component for worker threads. Furthermore, the client component can do asynchronous (non-blocking) receives, which are prohibited from J2EE servers. So, for example, the example application above could be refactored to have a client sending messages to a queue and then waiting for a response from the MDB.
Develop a JMS client
JMS clients require that JMS resources such as queue connection factories and queue destination already exist.
For server-administered JMS objects, configure Resource Environment References. For client container-administered JMS objects, configure Resource References.
To use JMS, a typical JMS client program completes the following general steps:
- Import JMS packages.An enterprise application that uses JMS starts with a number of import statements for JMS; for example
import javax.naming.Context; import javax.naming.InitialContext; import javax.rmi.PortableRemoteObject; import javax.jms.*;- Get an initial context
try { ctx = new InitialContext(env); ...- Define the parameters that the client wants to use; for example, to identify the queue connection factory and to assemble a message to be sent.
public class JMSppSampleClient { public static void main(String[] args) throws JMSException, Exception { String messageID = null; String outString = null; String qcfName = "java:comp/env/jms/ConnectionFactory"; String qnameIn = "java:comp/env/jms/Q1"; String qnameOut = "java:comp/env/jms/Q2"; boolean verbose = false; QueueSession session = null; QueueConnection connection = null; Context ctx = null; QueueConnectionFactory qcf = null; Queue inQueue = null; Queue outQueue = null; ...- Retrieve administered objects from the JNDI namespace.The InitialContext.lookup() method is used to retrieve administered objects (a queue connection factory and the queue destinations)
qcf = (QueueConnectionFactory)ctx.lookup( qcfName ); ... inQueue = (Queue)ctx.lookup( qnameIn ); outQueue = (Queue)ctx.lookup( qnameOut ); ...- Create a connection to the messaging service provider.The connection provides access to the underlying transport, and is used to create sessions. The createQueueConnection() method on the factory object is used to create the connection
connection = qcf.createQueueConnection();The JMS specification defines that connections should be created in the stopped state. Until the connection starts, MessageConsumers that are associated with the connection cannot receive any messages. To start the connection, issue the following command
connection.start();- Create a session, for sending and receiving messages.The session provides a context for producing and consuming messages, including the methods used to create MessageProducers and MessageConsumers. The createQueueSession method is used on the connection to obtain a session. The method takes two parameters...
- A boolean that determines whether or not the session is transacted.
- A parameter that determines the acknowledge mode.
boolean transacted = false; session = connection.createQueueSession( transacted, Session.AUTO_ACKNOWLEDGE);In this example, the session is not transacted, and it should automatically acknowledge received messages. With these settings, a message is backed out only after a system error or if the client application terminates unexpectedly.
- Send the message.
- Create MessageProducers to create messages.For point-to-point the MessageProducer is a QueueSender that is created by passing an output queue object (retrieved earlier) into the createSender method on the session. A QueueSender is normally created for a specific queue, so that all messages sent using that sender are sent to the same destination
QueueSender queueSender = session.createSender(inQueue);- Create the message.Use the session to create an empty message and add the data passed.
JMS provides several message types, each of which embodies some knowledge of its content. To avoid referencing the vendor-specific class names for the message types, methods are provided on the Session object for message creation.
In this example, a text message is created from the outString property, which could be provided as an input parameter on invocation of the client program or constructed in some other way
TextMessage outMessage = session.createTextMessage(outString);- Send the message.
To send the message, the message is passed to the send method on the QueueSender
queueSender.send(outMessage);
- Receive replies.
- Create a correlation ID to link the message sent with any replies. In this example, the client receives reply messages that are related to the message that it has sent, by using a provider-specific message ID in a JMSCorrelationID
messageID = outMessage.getJMSMessageID();The correlation ID is then used in a message selector, to select only messages that have that ID
String selector = "JMSCorrelationID = '"+messageID+"'";- Create a MessageReceiver to receive messages.For point-to-point the MessageReceiver is a QueueReceiver that is created by passing an input queue object (retrieved earlier) and the message selector into the createReceiver method on the session
QueueReceiver queueReceiver = session.createReceiver(outQueue, selector);- Retrieve the reply message.To retrieve a reply message, the receive method on the QueueReceiver is used
Message inMessage = queueReceiver.receive(2000);The parameter in the receive call is a timeout in milliseconds. This parameter defines how long the method should wait if there is no message available immediately. If you omit this parameter, the call blocks indefinitely. If you do not want any delay, use the receiveNoWait()method. In this example, the receive call returns when the message arrives, or after 2000ms, whichever is sooner.
- Act on the message received.When a message is received, you can act on it as needed by the business logic of the client. Some general JMS actions are to check that the message is of the correct type and extract the content of the message. To extract the content from the body of the message, you need to cast from the generic Message class (which is the declared return type of the receive methods) to the more specific subclass, such as TextMessage. It is good practice always to test the message class before casting, so that unexpected errors can be handled gracefully.
In this example, the instanceof operator is used to check that the message received is of the TextMessage type. The message content is then extracted by casting to the TextMessage subclass
if ( inMessage instanceof TextMessage ) ... String replyString = ((TextMessage) inMessage).getText();
- Closing down.If the application needs to create many short-lived JMS objects at the Session level or lower, it is important to close all the JMS resources used. To do this, you call the close() method on the various classes (QueueConnection, QueueSession, QueueSender, and QueueReceiver) when the resources are no longer required
queueReceiver.close(); ... queueSender.close(); ... session.close(); session = null; ... connection.close(); connection = null;- Publishing and subscribing messages.To use publish/subscribe support instead of point-to-point messaging, the general client actions are the same; for example, to create a session and connection. The exceptions are that topic resources are used instead of queue resources (such as TopicPublisher instead of QueueSender), as shown in the following example to publish a message
// Creating a TopicPublisher TopicPublisher pub = session.createPublisher(topic); ... pub.publish(outMessage); ... // Closing TopicPublisher pub.close();- Handling errorsAny JMS runtime errors are reported by exceptions. The majority of methods in JMS throw JMSExceptions to indicate errors. It is good programming practice to catch these exceptions and display them on a suitable output.
Unlike normal Java exceptions, a JMSException can contain another exception embedded in it. The implementation of JMSException does not include the embedded exception in the output of its toString()method. Therefore, you need to check explicitly for an embedded exception and print it out, as shown in the following example
catch JMSException(je) { System.out.println("JMS failed with "+je); Exception le = je.getLinkedException(); if (le != null) { System.out.println("linked exception "+le); } }