Simple publish/subscribe App
This section provides a walkthrough of a simple WebSphere MQ JMS application.
Here is the complete example. Its individual sections are discussed afterward.
/**
 * Basic pub/sub example
 *
 * A TopicConnectionFactory object is retrieved from LDAP; this is used
 * to create a TopicConnection. The TopicConnection is used to create
 * a TopicSession, which creates two publishers and two subscribers.
 * Both publishers publish to a topic; both subscribers then receive.
 */
import javax.jms.*;               // JMS interfaces
import javax.naming.*;            // Used for JNDI lookup of
import javax.naming.directory.*;  // administered objects
import java.io.*;                 // Java IO classes
import java.util.*;               // Java Util classes

class PubSubSample {
  // using LDAP
  // These fields are static because they are used from the static main method
  private static String icf = "com.sun.jndi.ldap.LdapCtxFactory";             // initial context factory
  private static String url = "ldap://server.company.com/o=company_us,c=us";  // url
  private static String tcfLookup = "cn=testTCF";  // TopicConnectionFactory (TCF) lookup
  private static String tLookup = "cn=testT";      // topic lookup

  // Pub/Sub objects used by this program
  private static TopicConnectionFactory fact = null;
  private static Topic topic = null;

  public static void main(String args[]) {
    // Initialise JNDI properties
    Hashtable env = new Hashtable();
    env.put( Context.INITIAL_CONTEXT_FACTORY, icf );
    env.put( Context.PROVIDER_URL, url );
    env.put( Context.REFERRAL, "throw" );

    Context ctx = null;
    try {
      System.out.print( "Initialising JNDI... " );
      ctx = new InitialDirContext( env );
      System.out.println( "Done!" );
    } catch ( NamingException nx ) {
      System.out.println( "ERROR: " + nx );
      System.exit(-1);
    }

    // Lookup TCF
    try {
      System.out.print( "Obtaining TCF from JNDI... " );
      fact = (TopicConnectionFactory)ctx.lookup( tcfLookup );
      System.out.println( "Done!" );
    } catch ( NamingException nx ) {
      System.out.println( "ERROR: " + nx );
      System.exit(-1);
    }

    // Lookup Topic
    try {
      System.out.print( "Obtaining topic T from JNDI... " );
      topic = (Topic)ctx.lookup( tLookup );
      System.out.println( "Done!" );
    } catch ( NamingException nx ) {
      System.out.println( "ERROR: " + nx );
      System.exit(-1);
    }

    try {
      ctx.close();
    } catch ( NamingException nx ) {
      // Just ignore an exception on closing the context
    }

    try {
      // Create connection
      TopicConnection conn = fact.createTopicConnection();
      // Start connection
      conn.start();
      // Session
      TopicSession sess = conn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
      // Create a topic dynamically
      Topic t = sess.createTopic("myTopic");
      // Publisher
      TopicPublisher pub = sess.createPublisher(t);
      // Subscriber
      TopicSubscriber sub = sess.createSubscriber(t);
      // Publisher
      TopicPublisher pubA = sess.createPublisher(topic);
      // Subscriber
      TopicSubscriber subA = sess.createSubscriber(topic);

      // Publish "Hello World"
      TextMessage hello = sess.createTextMessage();
      hello.setText("Hello World");
      pub.publish(hello);
      hello.setText("Hello World 2");
      pubA.publish(hello);

      // Receive message
      TextMessage m = (TextMessage) sub.receive();
      System.out.println("Message Text = " + m.getText());
      m = (TextMessage) subA.receive();
      System.out.println("Message Text = " + m.getText());

      // Close publishers and subscribers
      pub.close();
      pubA.close();
      sub.close();
      subA.close();

      // Close session and connection
      sess.close();
      conn.close();

      System.exit(0);
    } catch ( JMSException je ) {
      System.out.println("ERROR: " + je);
      System.out.println("LinkedException: " + je.getLinkedException());
      System.exit(-1);
    }
  }
}
Import required packages
The import statements for an application using WebSphere MQ classes for Java Message Service must include at least the following:
import javax.jms.*;               // JMS interfaces
import javax.naming.*;            // Used for JNDI lookup of
import javax.naming.directory.*;  // administered objects
Obtain or create JMS objects
The next step is to obtain or create a number of JMS objects:
- Obtain a TopicConnectionFactory
- Create a TopicConnection
- Create a TopicSession
- Obtain a Topic from JNDI
- Create TopicPublishers and TopicSubscribers
Many of these processes are similar to those that are used for point-to-point, as shown in the following:
- Obtain a TopicConnectionFactory
- The preferred way to do this is to use JNDI lookup, to maintain portability of the application code. The following code initializes a JNDI context:
String icf = "com.sun.jndi.ldap.LdapCtxFactory";             // initial context factory
String url = "ldap://server.company.com/o=company_us,c=us";  // url

// Initialise JNDI properties
java.util.Hashtable env = new java.util.Hashtable();
env.put( Context.INITIAL_CONTEXT_FACTORY, icf );
env.put( Context.PROVIDER_URL, url );
env.put( Context.REFERRAL, "throw" );

Context ctx = null;
try {
  System.out.print( "Initialising JNDI... " );
  ctx = new InitialDirContext( env );
  System.out.println( "Done!" );
} catch ( NamingException nx ) {
  System.out.println( "ERROR: " + nx );
  System.exit(-1);
}
- Note:
- Change the icf and url variables to suit your installation and your JNDI service provider.
The properties required by JNDI initialization are in a Hashtable, which is passed to the InitialDirContext constructor. If this connection fails, an exception is thrown to indicate that the administered objects required later in the application are not available.
Obtain a TopicConnectionFactory using a lookup key that the administrator has defined:
// LOOKUP TCF
try {
  System.out.print( "Obtaining TCF from JNDI... " );
  fact = (TopicConnectionFactory)ctx.lookup( tcfLookup );
  System.out.println( "Done!" );
} catch ( NamingException nx ) {
  System.out.println( "ERROR: " + nx );
  System.exit(-1);
}
If a JNDI namespace is not available, you can create a TopicConnectionFactory at runtime. You create a new com.ibm.mq.jms.MQTopicConnectionFactory as described in Creating factories at runtime.
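The JNDI properties used above can be collected in a small helper so that the provider details live in one place. The following is a minimal sketch that uses only JDK classes; the class name JndiEnvSketch and the method buildEnv are hypothetical (they are not part of WebSphere MQ), and the factory class and URL are the placeholder values from the example, which you would replace for your installation.

```java
import java.util.Hashtable;
import javax.naming.Context;

// Hypothetical helper (not part of WebSphere MQ): collects the JNDI
// properties that the example passes to the InitialDirContext constructor.
class JndiEnvSketch {

    static Hashtable<String, Object> buildEnv(String icf, String url) {
        Hashtable<String, Object> env = new Hashtable<>();
        env.put(Context.INITIAL_CONTEXT_FACTORY, icf); // JNDI service provider class
        env.put(Context.PROVIDER_URL, url);            // location of the namespace
        env.put(Context.REFERRAL, "throw");            // same referral setting as the example
        return env;
    }

    public static void main(String[] args) {
        Hashtable<String, Object> env = buildEnv(
            "com.sun.jndi.ldap.LdapCtxFactory",
            "ldap://server.company.com/o=company_us,c=us");
        // No connection is made here; new InitialDirContext(env) would do that.
        System.out.println(env.get(Context.PROVIDER_URL));
    }
}
```

The returned table can be passed directly to the InitialDirContext constructor, as in the example. Keeping the construction separate from the connection attempt also makes the settings easy to check without a directory server.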
- Create a TopicConnection
- This is created from the TopicConnectionFactory object. Connections are always created in a stopped state and must be started with the following code:
// create connection
TopicConnection conn = fact.createTopicConnection();
// start connection
conn.start();
- Create a TopicSession
- This is created using the createTopicSession method of the TopicConnection. The method takes two parameters: one to specify whether the session is transacted, and one to specify the acknowledgement mode:
TopicSession sess = conn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
- Obtain a Topic
- This object can be obtained from JNDI, for use with TopicPublishers and TopicSubscribers that are created later. The following code retrieves a Topic:
Topic topic = null;
try {
  System.out.print( "Obtaining topic T from JNDI... " );
  topic = (Topic)ctx.lookup( tLookup );
  System.out.println( "Done!" );
} catch ( NamingException nx ) {
  System.out.println( "ERROR: " + nx );
  System.exit(-1);
}
If a JNDI namespace is not available, you can create a Topic at runtime, as described in Creating topics at runtime.
The following code creates a Topic at runtime:
// topic
Topic t = sess.createTopic("myTopic");
- Create consumers and producers of publications
- Depending on the nature of the JMS client application that you write, a subscriber, a publisher, or both must be created. Use the createPublisher and createSubscriber methods as follows:
// publisher
TopicPublisher pub = sess.createPublisher(t);
// subscriber
TopicSubscriber sub = sess.createSubscriber(t);
// publisher
TopicPublisher pubA = sess.createPublisher(topic);
// subscriber
TopicSubscriber subA = sess.createSubscriber(topic);
Publish messages
The TopicPublisher object, pub, is used to publish messages, rather like a QueueSender is used in the point-to-point domain. The following fragment creates a TextMessage using the session, and then publishes the message:
// publish "hello world"
TextMessage hello = sess.createTextMessage();
hello.setText("Hello World");
pub.publish(hello);
hello.setText("Hello World 2");
pubA.publish(hello);
Receive subscriptions
Subscribers must be able to read the publications that are delivered to them, as in the following code:
// receive message
TextMessage m = (TextMessage) sub.receive();
System.out.println("Message Text = " + m.getText());
m = (TextMessage) subA.receive();
System.out.println("Message Text = " + m.getText());
This fragment of code performs a get-with-wait, which means that the receive call blocks until a message is available. Alternative versions of the receive call are available (such as receiveNoWait). For details, see TopicSubscriber.
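JMS message consumers offer three receive styles: receive() blocks until a message arrives, receive(long timeout) waits for at most the given number of milliseconds, and receiveNoWait() returns immediately; the last two return null when no message is available. Because running real JMS code needs a broker, the following sketch only illustrates these semantics by analogy, using java.util.concurrent.BlockingQueue from the JDK. The class ReceiveStylesSketch and its methods are hypothetical and are not JMS API.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Analogy only: a BlockingQueue stands in for a subscriber's message stream.
class ReceiveStylesSketch {
    private final BlockingQueue<String> messages = new LinkedBlockingQueue<>();

    // Stands in for a publication arriving at the subscriber.
    void deliver(String text) { messages.offer(text); }

    // Like receive(): blocks until a message is available.
    String receive() throws InterruptedException { return messages.take(); }

    // Like receive(timeout): waits up to timeoutMillis, then returns null.
    String receive(long timeoutMillis) throws InterruptedException {
        return messages.poll(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    // Like receiveNoWait(): returns null immediately if nothing is waiting.
    String receiveNoWait() { return messages.poll(); }

    public static void main(String[] args) throws InterruptedException {
        ReceiveStylesSketch sub = new ReceiveStylesSketch();
        System.out.println(sub.receiveNoWait());  // nothing delivered yet
        sub.deliver("Hello World");
        System.out.println(sub.receive());        // a message is already waiting
    }
}
```

In a real application, a caller of the timed or no-wait variants must check for null before casting the result to TextMessage.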
Close down unwanted resources
It is important to free up all the resources used by the application when it terminates. Use the close() method on objects that can be closed (publishers, subscribers, sessions, and connections):
// close publishers and subscribers
pub.close();
pubA.close();
sub.close();
subA.close();
// close session and connection
sess.close();
conn.close();
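The close calls in the example run only if everything before them succeeds. One way to make cleanup happen even when an exception is thrown part-way through is to close in a finally block, in the reverse order of creation. The sketch below shows that pattern with a hypothetical stand-in class, Res, instead of real JMS objects (which need a broker to run); in a JMS application the close() calls can themselves throw JMSException and would need similar handling.

```java
import java.util.ArrayList;
import java.util.List;

class CloseOrderSketch {
    // Hypothetical stand-in for a connection, session, publisher, or subscriber.
    static class Res {
        final String name;
        final List<String> log;
        Res(String name, List<String> log) { this.name = name; this.log = log; }
        void close() { log.add(name); }  // records the order in which resources close
    }

    // Creates resources, optionally fails mid-way, and always closes what was created.
    static List<String> run(boolean failWhilePublishing) {
        List<String> closed = new ArrayList<>();
        Res conn = null, sess = null, pub = null;
        try {
            conn = new Res("conn", closed);
            sess = new Res("sess", closed);
            pub = new Res("pub", closed);
            if (failWhilePublishing) {
                throw new IllegalStateException("simulated publish failure");
            }
        } catch (IllegalStateException e) {
            // Report the error, then fall through to the cleanup below.
            System.out.println("ERROR: " + e.getMessage());
        } finally {
            // Close in reverse order of creation, skipping anything not created.
            if (pub != null) pub.close();
            if (sess != null) sess.close();
            if (conn != null) conn.close();
        }
        return closed;
    }

    public static void main(String[] args) {
        System.out.println(run(true));
    }
}
```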
TopicConnectionFactory administered objects
In the example, the TopicConnectionFactory object is obtained from the JNDI namespace. The TopicConnectionFactory in this case is an administered object that has been created and administered using the JMSAdmin tool. Use this method of obtaining TopicConnectionFactory objects because it keeps the application code portable.
The TopicConnectionFactory in the example is testTCF in JMSAdmin. Create testTCF in JMSAdmin before running the application. You must also create a Topic in JMSAdmin; see Topic administered objects.
To create a TopicConnectionFactory object, invoke the JMSAdmin tool, as described in Invoking the administration tool, and execute one of the following commands, depending on the type of connection you want to make to the broker:
- Bindings connection
InitCtx> def tcf(testTCF) transport(bind)
or, because this is the default transport type for TopicConnectionFactory objects:
InitCtx> def tcf(testTCF)
This creates a TopicConnectionFactory with default settings for bindings transport, connecting to the default queue manager.
- Client connection
InitCtx> def tcf(testTCF) transport(client)
This creates a TopicConnectionFactory with default settings for the client transport type, connecting to localhost, on port 1414, using channel SYSTEM.DEF.SVRCONN.
- Direct TCP/IP connection to WebSphere MQ Event Broker
InitCtx> def tcf(testTCF) transport(direct)
This creates a TopicConnectionFactory to make direct connections to a WebSphere MQ Event Broker, connecting to localhost on port 1506.
Topic administered objects
In the example, one of the Topic objects has been obtained from the JNDI namespace. This Topic is an administered object that has been created and administered in the JMSAdmin tool. Use this method of obtaining Topic objects because it keeps the application code portable.
To run the example application above, create the Topic called testT in JMSAdmin before running the application.
To create a Topic object, invoke the JMSAdmin tool, as described in Invoking the administration tool, and execute one of the following commands, depending on the type of connection you want to make to the broker:
- Compatibility mode, or MQSeries Publish/Subscribe (SupportPac MA0C)
InitCtx> def t(testT) bver(V1) topic(test/topic)
- Native mode, or direct to WebSphere MQ Event Broker
InitCtx> def t(testT) bver(V2) topic(test/topic)