A simple publish/subscribe application

 


This section provides a walkthrough of a simple WebSphere MQ JMS application.

Here is the complete example. The individual parts are discussed in the sections that follow it.

/**
 * Basic pub/sub example
 *
 * A TopicConnectionFactory object is retrieved from LDAP; this is used
 * to create a TopicConnection. The TopicConnection is used to create
 * a TopicSession, which creates two publishers and two subscribers.
 * Each publisher publishes to its topic; each subscriber then receives.
 */
 
import javax.jms.*;                 // JMS interfaces
import javax.naming.*;              // Used for JNDI lookup of
import javax.naming.directory.*;    //    administered objects
 
import java.io.*;                   // Java IO classes
import java.util.*;                 // Java Util classes
 
 
class PubSubSample 
{
 
    // using LDAP
    private static String icf = "com.sun.jndi.ldap.LdapCtxFactory";  // initial context factory
    private static String url = "ldap://server.company.com/o=company_us,c=us";  // url

    private static String tcfLookup = "cn=testTCF";  // TopicConnectionFactory (TCF) lookup
    private static String tLookup   = "cn=testT";    // topic lookup

    // Pub/Sub objects used by this program; static because main() uses them directly
    private static TopicConnectionFactory fact = null;
    private static Topic topic = null;
 
    public static void main(String args[])
    {
        // Initialise JNDI properties
        Hashtable env = new Hashtable();
        env.put( Context.INITIAL_CONTEXT_FACTORY, icf );
        env.put( Context.PROVIDER_URL, url );
        env.put( Context.REFERRAL, "throw" );
 
        Context ctx = null;
        try {
            System.out.print( "Initialising JNDI... " );
            ctx = new InitialDirContext( env );
            System.out.println( "Done!" );
        } catch ( NamingException nx ) {
            System.out.println( "ERROR: " + nx );
            System.exit(-1);
        }
 
        // Lookup TCF
        try {
            System.out.print( "Obtaining TCF from JNDI... " );
            fact = (TopicConnectionFactory)ctx.lookup( tcfLookup );
            System.out.println( "Done!" );
        } catch ( NamingException nx ) {
            System.out.println( "ERROR: " + nx );
            System.exit(-1);
        }
        // Lookup Topic
        try {
            System.out.print( "Obtaining topic T from JNDI... " );
            topic = (Topic)ctx.lookup( tLookup );
            System.out.println( "Done!" );
        } catch ( NamingException nx ) {
            System.out.println( "ERROR: " + nx );
            System.exit(-1);
        }
 
 
        try {
            ctx.close();
        } catch ( NamingException nx ) {
            // Just ignore an exception on closing the context
        }
 
 
        try {
            // Create connection
            TopicConnection conn = fact.createTopicConnection();
            // Start connection
            conn.start();
 
            // Session
            TopicSession sess = conn.createTopicSession(false,
                                        Session.AUTO_ACKNOWLEDGE);
 
            // Create a topic dynamically
            Topic t = sess.createTopic("myTopic");
 
            // Publisher
            TopicPublisher pub = sess.createPublisher(t);
            // Subscriber
            TopicSubscriber sub = sess.createSubscriber(t);
            // Publisher
            TopicPublisher pubA = sess.createPublisher(topic);
            // Subscriber
            TopicSubscriber subA = sess.createSubscriber(topic);
 
 
            // Publish "Hello World"
            TextMessage  hello = sess.createTextMessage();
            hello.setText("Hello World");
            pub.publish(hello);
            hello.setText("Hello World 2");
            pubA.publish(hello);
 
 
            // Receive message
            TextMessage m = (TextMessage) sub.receive();
            System.out.println("Message Text = " + m.getText());
            m = (TextMessage) subA.receive();
            System.out.println("Message Text = " + m.getText());
 
            // Close publishers and subscribers
            pub.close();
            pubA.close();
            sub.close();
            subA.close();
 
            // Close session and connection
            sess.close();
            conn.close();
 
            System.exit(0);
 
        }
 
        catch ( JMSException je ) {
            System.out.println("ERROR: " + je);
            System.out.println("LinkedException: " +
                                       je.getLinkedException());
            System.exit(-1);
        }
    }
}

 

Import required packages

The import statements for an application using WebSphere MQ classes for Java Message Service must include at least the following:

import javax.jms.*;                // JMS interfaces
import javax.naming.*;             // Used for JNDI lookup of
import javax.naming.directory.*;   //    administered objects

 

Obtain or create JMS objects

The next step is to obtain or create a number of JMS objects:

  1. Obtain a TopicConnectionFactory

  2. Create a TopicConnection

  3. Create a TopicSession

  4. Obtain a Topic from JNDI

  5. Create TopicPublishers and TopicSubscribers

Many of these steps are similar to those used in the point-to-point domain. Each step is described in the following:

Obtain a TopicConnectionFactory
The preferred way to do this is to use JNDI lookup, to maintain portability of the application code. The following code initializes a JNDI context:
String icf = "com.sun.jndi.ldap.LdapCtxFactory";  // initial context factory
String url = "ldap://server.company.com/o=company_us,c=us";  // url

// Initialise JNDI properties
java.util.Hashtable env = new java.util.Hashtable();
env.put( Context.INITIAL_CONTEXT_FACTORY, icf );
env.put( Context.PROVIDER_URL, url );
env.put( Context.REFERRAL, "throw" );

Context ctx = null;
try {
      System.out.print( "Initialising JNDI... " );
      ctx = new InitialDirContext( env );
      System.out.println( "Done!" );
} catch ( NamingException nx ) {
      System.out.println( "ERROR: " + nx );
      System.exit(-1);
}

Note:
Change the icf and url variables to suit your installation and your JNDI service provider.

The properties required by JNDI initialization are in a Hashtable, which is passed to the InitialDirContext constructor. If this connection fails, an exception is thrown to indicate that the administered objects required later in the application are not available.
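
As an illustration of the note above, the following sketch builds the same JNDI environment from values that can be overridden without editing the code. The class name JndiEnv and the system property names sample.jndi.icf and sample.jndi.url are assumptions made for this illustration; they are not part of the sample application or of WebSphere MQ.

```java
import java.util.Hashtable;
import javax.naming.Context;

// Sketch only: JndiEnv and its method names are hypothetical helpers.
class JndiEnv {
    // Defaults copied from the sample; replace them to suit your installation.
    static final String DEFAULT_ICF = "com.sun.jndi.ldap.LdapCtxFactory";
    static final String DEFAULT_URL = "ldap://server.company.com/o=company_us,c=us";

    // Builds the environment that is passed to the InitialDirContext constructor.
    static Hashtable<String, String> build(String icf, String url) {
        Hashtable<String, String> env = new Hashtable<>();
        env.put(Context.INITIAL_CONTEXT_FACTORY, icf);
        env.put(Context.PROVIDER_URL, url);
        env.put(Context.REFERRAL, "throw");
        return env;
    }

    // Reads overrides from system properties (the property names are assumptions).
    static Hashtable<String, String> fromSystemProperties() {
        return build(System.getProperty("sample.jndi.icf", DEFAULT_ICF),
                     System.getProperty("sample.jndi.url", DEFAULT_URL));
    }
}
```

The resulting Hashtable can be passed to the InitialDirContext constructor in place of env, for example new InitialDirContext( JndiEnv.fromSystemProperties() ).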

Obtain a TopicConnectionFactory using a lookup key that the administrator has defined:

// LOOKUP TCF
try {
    System.out.print( "Obtaining TCF from JNDI... " );
    fact = (TopicConnectionFactory)ctx.lookup( tcfLookup );
    System.out.println( "Done!" );
} catch ( NamingException nx ) {
    System.out.println( "ERROR: " + nx );
    System.exit(-1);
}

If a JNDI namespace is not available, you can create a TopicConnectionFactory at runtime. You create a new com.ibm.mq.jms.MQTopicConnectionFactory as described in Creating factories at runtime.

Create a TopicConnection
A TopicConnection is created from the TopicConnectionFactory object. Connections are always created in a stopped state, and no messages are delivered to subscribers until the connection is started, as in the following code:
// create connection
TopicConnection conn = fact.createTopicConnection();
// start connection
conn.start();

Create a TopicSession
A TopicSession is created using the createTopicSession method of the TopicConnection. This method takes two parameters: one to signify whether the session is transacted, and one to specify the acknowledgement mode:
TopicSession sess = conn.createTopicSession(false,
                                Session.AUTO_ACKNOWLEDGE);

Obtain a Topic
This object can be obtained from JNDI, for use with TopicPublishers and TopicSubscribers that are created later. The following code retrieves a Topic:
Topic topic = null;
try {
    System.out.print( "Obtaining topic T from JNDI... " );
    topic = (Topic)ctx.lookup( tLookup );
    System.out.println( "Done!" );
}
catch ( NamingException nx ) {
    System.out.println( "ERROR: " + nx );
    System.exit(-1);
}

If a JNDI namespace is not available, you can create a Topic at runtime, as described in Creating topics at runtime.

The following code creates a Topic at runtime:

// topic
Topic t = sess.createTopic("myTopic");

Create consumers and producers of publications
Depending on the nature of the JMS client application that you write, a subscriber, a publisher, or both must be created. Use the createPublisher and createSubscriber methods as follows:
// publisher
TopicPublisher pub = sess.createPublisher(t);
// subscriber
TopicSubscriber sub = sess.createSubscriber(t);
// publisher
TopicPublisher pubA = sess.createPublisher(topic);
// subscriber
TopicSubscriber subA = sess.createSubscriber(topic);

 

Publish messages

The TopicPublisher object, pub, is used to publish messages, rather like a QueueSender is used in the point-to-point domain. The following fragment creates a TextMessage using the session, and then publishes the message:

// publish "hello world"
TextMessage  hello = sess.createTextMessage();
hello.setText("Hello World");
pub.publish(hello);
hello.setText("Hello World 2");
pubA.publish(hello);

 

Receive subscriptions

Subscribers must be able to read the messages that are published on the topics to which they subscribe, as in the following code:

// receive message
TextMessage m = (TextMessage) sub.receive();
System.out.println("Message Text = " + m.getText());
m = (TextMessage) subA.receive();
System.out.println("Message Text = " + m.getText());

This fragment of code performs a get-with-wait, which means that the receive call blocks until a message is available. Alternative versions of the receive call are available (such as receiveNoWait). For details, see TopicSubscriber.
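
The difference between the three receive variants can be sketched without a JMS provider. The following is an analogy that uses only JDK classes: ReceiveModes is a hypothetical stand-in built on java.util.concurrent.BlockingQueue, not a JMS class. It mirrors the JMS behaviour in which receive() blocks, receive(timeout) returns null when the timeout (in milliseconds) expires, a timeout of zero never expires, and receiveNoWait() returns null immediately if no message is available.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// JDK-only analogy; ReceiveModes is a hypothetical stand-in for a subscriber.
class ReceiveModes {
    private final BlockingQueue<String> inbox = new LinkedBlockingQueue<>();

    // Stands in for a publication arriving at the subscriber.
    void deliver(String text) {
        inbox.add(text);
    }

    // Like receive(): blocks until a message is available.
    String receive() {
        return receive(0);
    }

    // Like receive(timeout): waits up to timeoutMillis, then returns null.
    // As in JMS, a timeout of zero means wait indefinitely.
    String receive(long timeoutMillis) {
        try {
            return timeoutMillis == 0
                    ? inbox.take()
                    : inbox.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return null;
        }
    }

    // Like receiveNoWait(): returns at once, or null if nothing is waiting.
    String receiveNoWait() {
        return inbox.poll();
    }
}
```

In an application, the timed and no-wait variants require a null check before the result is cast to TextMessage, because they can return without a message.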

 

Close down unwanted resources

It is important to free up all the resources used by the application when it terminates. Use the close() method on objects that can be closed (publishers, subscribers, sessions, and connections):

// close publishers and subscribers
pub.close();
pubA.close();
sub.close();
subA.close();
 
// close session and connection
sess.close();
conn.close();
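
In the complete example, the close() calls run only when no exception occurs, because the catch block exits the program directly. One common way to release resources on both paths is a finally block. The following sketch shows the pattern with stand-in objects so that it runs without a JMS provider; FakeResource and CloseInFinally are hypothetical names, not JMS classes.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in for a JMS object that has a close() method (hypothetical).
class FakeResource {
    private final String name;
    private final List<String> log;

    FakeResource(String name, List<String> log) {
        this.name = name;
        this.log = log;
    }

    void close() {
        log.add("closed " + name);
    }
}

class CloseInFinally {
    // Creates resources, optionally fails during the work, and always
    // closes whatever was created, in reverse order of creation.
    static List<String> run(boolean failDuringWork) {
        List<String> log = new ArrayList<>();
        FakeResource conn = null;
        FakeResource sess = null;
        FakeResource pub = null;
        try {
            conn = new FakeResource("connection", log);
            sess = new FakeResource("session", log);
            pub = new FakeResource("publisher", log);
            if (failDuringWork) {
                throw new IllegalStateException("publish failed");
            }
            log.add("published");
        } catch (IllegalStateException e) {
            log.add("error: " + e.getMessage());
        } finally {
            if (pub != null) pub.close();
            if (sess != null) sess.close();
            if (conn != null) conn.close();
        }
        return log;
    }
}
```

With real JMS objects, close() can throw a JMSException, so each call in the finally block needs its own try and catch. Closing a connection also closes the sessions, publishers, and subscribers that were created from it.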

 

TopicConnectionFactory administered objects

In the example, the TopicConnectionFactory object is obtained from a JNDI namespace. The TopicConnectionFactory in this case is an administered object that has been created and administered using the JMSAdmin tool. Use this method of obtaining TopicConnectionFactory objects because it ensures code portability.

The TopicConnectionFactory in the example is testTCF in JMSAdmin. Create testTCF in JMSAdmin before running the application. You must also create a Topic in JMSAdmin; see Topic administered objects.

To create a TopicConnectionFactory object, invoke the JMSAdmin tool, as described in Invoking the administration tool, and execute one of the following commands, depending on the type of connection you want to make to the broker:

Bindings connection
InitCtx> def tcf(testTCF) transport(bind)

or, because this is the default transport type for TopicConnectionFactory objects:

InitCtx> def tcf(testTCF)

This creates a TopicConnectionFactory with default settings for bindings transport, connecting to the default queue manager.

Client connection
InitCtx> def tcf(testTCF) transport(client)

This creates a TopicConnectionFactory with default settings for the client transport type, connecting to localhost, on port 1414, using channel SYSTEM.DEF.SVRCONN.

Direct TCP/IP connection to WebSphere MQ Event Broker
InitCtx> def tcf(testTCF) transport(direct)

This creates a TopicConnectionFactory to make direct connections to a WebSphere MQ Event Broker, connecting to localhost on port 1506.

 

Topic administered objects

In the example, one of the Topic objects is obtained from a JNDI namespace. This Topic is an administered object that has been created and administered in the JMSAdmin tool. Use this method of obtaining Topic objects because it ensures code portability.

To run the example application above, first create the Topic called testT in JMSAdmin.

To create a Topic object, invoke the JMSAdmin tool, as described in Invoking the administration tool, and execute one of the following commands, depending on the type of connection you want to make to the broker:

Compatibility mode, or MQSeries Publish/Subscribe (SupportPac MA0C)
InitCtx> def t(testT) bver(V1) topic(test/topic)

Native mode, or direct to WebSphere MQ Event Broker
InitCtx> def t(testT) bver(V2) topic(test/topic)

 
