Use shared subscriptions in JMS 2.0
JMS 2.0 introduces the concept of shared subscriptions, where a single subscription is shared among multiple consumers, with only one of the consumers receiving a publication at any point in time. IBM MQ classes for JMS.
When we are developing a JMS application for IBM MQ Version 8.0 or later, you might need to consider the impact of this functionality on your queue manager.
The idea behind shared subscriptions is basically to share the load among multiple consumers. A durable subscription can also be shared among multiple consumers.
For example, assume there is a :
- Subscription SUB, subscribing to a topic FIFA2014/UPDATES to receive football match updates, being shared by three consumers C1, C2, and C3
- Producer P1 publishing on the FIFA2014/UPDATES topic
When a publication is made on FIFA2014/UPDATES, the publication will be received by only one of the three consumers (C1, or C2, or C3) but not all.
The following sample demonstrates the usage of shared subscriptions, and also demonstrates the usage of the additional API in JMS 2.0, Message.receiveBody(), to retrieve only the message body.
The sample creates three subscriber threads, which create a shared subscription to the FIFA2014/UPDATES topic, and one publisher thread.
package mqv91Samples; import javax.jms.JMSException; import com.ibm.msg.client.jms.JmsConnectionFactory; import com.ibm.msg.client.jms.JmsFactoryFactory; import com.ibm.msg.client.wmq.WMQConstants; import javax.jms.JMSContext; import javax.jms.Topic; import javax.jms.Queue; import javax.jms.JMSConsumer; import javax.jms.Message; import javax.jms.JMSProducer; /* * Implements both Subscriber and Publisher */ class SharedNonDurableSubscriberAndPublisher implements Runnable { private Thread t; private String threadName; SharedNonDurableSubscriberAndPublisher( String name){ threadName = name; System.out.println("Creating Thread:" + threadName ); } /* * Demonstrates shared non-durable subscription in JMS 2.0 */ private void sharedNonDurableSubscriptionDemo(){ JmsConnectionFactory cf = null; JMSContext msgContext = null; try { // Create Factory for WMQ JMS provider JmsFactoryFactory ff = JmsFactoryFactory.getInstance(WMQConstants.WMQ_PROVIDER); // Create connection factory cf = ff.createConnectionFactory(); // Set MQ properties cf.setStringProperty(WMQConstants.WMQ_QUEUE_MANAGER, "QM3"); cf.setIntProperty(WMQConstants.WMQ_CONNECTION_MODE, WMQConstants.WMQ_CM_BINDINGS); // Create message context msgContext = cf.createContext(); // Create a topic destination Topic fifaScores = msgContext.createTopic("/FIFA2014/UPDATES"); // Create a consumer. Subscription name specified, required for sharing of subscription. JMSConsumer msgCons = msgContext.createSharedConsumer(fifaScores, "FIFA2014SUBID"); // Loop around to receive publications while(true){ String msgBody=null; // Use JMS 2.0 receiveBody method as we are interested in message body only. msgBody = msgCons.receiveBody(String.class); if(msgBody != null){ System.out.println(threadName + " : " + msgBody); } } }catch(JMSException jmsEx){ System.out.println(jmsEx); } }/* * Publisher publishes match updates like current attendance in the stadium, goal score and ball possession by teams. */ private void matchUpdatePublisher(){ JmsConnectionFactory cf = null; JMSContext msgContext = null; int nederlandsGoals = 0; int chileGoals = 0; int stadiumAttendence = 23231; int switchIndex = 0; String msgBody = ""; int nederlandsHolding = 60; int chileHolding = 40; try { // Create Factory for WMQ JMS provider JmsFactoryFactory ff = JmsFactoryFactory.getInstance(WMQConstants.WMQ_PROVIDER); // Create connection factory cf = ff.createConnectionFactory(); // Set MQ properties cf.setStringProperty(WMQConstants.WMQ_QUEUE_MANAGER, "QM3"); cf.setIntProperty(WMQConstants.WMQ_CONNECTION_MODE, WMQConstants.WMQ_CM_BINDINGS); // Create message context msgContext = cf.createContext(); // Create a topic destination Topic fifaScores = msgContext.createTopic("/FIFA2014/UPDATES"); // Create publisher to publish updates from stadium JMSProducer msgProducer = msgContext.createProducer(); while(true){ // Send match updates switch(switchIndex){ // Attendance case 0: msgBody ="Stadium Attendence " + stadiumAttendence; stadiumAttendence += 314; break; // Goals case 1: msgBody ="SCORE: The Netherlands: " + nederlandsGoals + " - Chile:" + chileGoals; break; // Ball possession percentage case 2: msgBody ="Ball possession: The Netherlands: " + nederlandsHolding + "% - Chile: " + chileHolding + "%"; if((nederlandsHolding > 60) && (nederlandsHolding < 70)){ nederlandsHolding -= 2; chileHolding += 2; }else{ nederlandsHolding += 2; chileHolding -= 2; } break; } // Publish and wait for two seconds to publish next update msgProducer.send (fifaScores, msgBody); try{ Thread.sleep(2000); }catch(InterruptedException iex){ } // Increment and reset the index if greater than 2 switchIndex++; if(switchIndex > 2) switchIndex = 0; } }catch(JMSException jmsEx){ System.out.println(jmsEx); } } /* * (non-Javadoc) * @see java.lang.Runnable#run() */ public void run() { // If this is a publisher thread if(threadName == "PUBLISHER"){ matchUpdatePublisher(); }else{ // Create subscription and start receiving publications sharedNonDurableSubscriptionDemo(); } } // Start thread public void start (){ System.out.println("Starting " + threadName ); if (t == null) { t = new Thread (this, threadName); t.start (); } } }/* * Demonstrate JMS 2.0 Simplified API using IBM MQ v91 JMS Implementation */ public class Mqv91jms2Sample { public static void main(String[] args) { // TODO Auto-generated method stub // Create first subscriber and start SharedNonDurableSubscriberAndPublisher subOne = new SharedNonDurableSubscriberAndPublisher( "SUB1"); subOne.start(); // Create second subscriber and start SharedNonDurableSubscriberAndPublisher subTwo = new SharedNonDurableSubscriberAndPublisher( "SUB2"); subTwo.start(); // Create third subscriber and start SharedNonDurableSubscriberAndPublisher subThree = new SharedNonDurableSubscriberAndPublisher( "SUB3"); subThree.start(); // Create publisher and start SharedNonDurableSubscriberAndPublisher publisher = new SharedNonDurableSubscriberAndPublisher( "PUBLISHER"); publisher.start(); } }Parent topic: Use JMS 2.0 functionality
Related information