Distribute changes between peer Java virtual machines
The LogSequence and LogElement objects distribute changes between peer JVMs and communicate the changes that have occurred in an eXtreme Scale transaction with an ObjectGridEventListener plug-in.
For more information about how Java™ Message Service (JMS) can be used to distribute transactional changes, see Use JMS to distribute transaction changes.
A prerequisite is that the ObjectGrid instance must be cached by the ObjectGridManager. See createObjectGrid methods for more information. The cacheInstance boolean value must be set to true.
You are not required to implement this mechanism yourself. eXtreme Scale includes a built-in peer-to-peer replication mechanism that provides this function. See Configure peer-to-peer replication with JMS.
The objects provide a means for an application to publish changes that have occurred in an ObjectGrid using a message transport to peer ObjectGrids in remote Java virtual machines and then apply those changes on that JVM. The LogSequenceTransformer class is critical to enabling this support. This article examines how to write a listener using a Java Message Service (JMS) messaging system for propagating the messages.
To that end, eXtreme Scale supports transmitting LogSequences that result from an eXtreme Scale transaction commit across WebSphere Application Server cluster members with an IBM-provided plug-in. This function is not enabled by default, but can be configured to be operational. However, when either the consumer or producer is not a WebSphere Application Server, using an external JMS messaging system might be required.
Implement the mechanism
The LogSequenceTransformer class and the ObjectGridEventListener, LogSequence, and LogElement APIs allow any reliable publish-and-subscribe messaging system to be used to distribute the changes and to filter the maps that are distributed. The snippets in this topic show how to use these APIs with JMS to build a peer-to-peer ObjectGrid shared by applications that are hosted on a diverse set of platforms sharing a common message transport.
Initialize the plug-in
The ObjectGrid calls the initialize method of the plug-in, which is part of the ObjectGridEventListener interface contract, when the ObjectGrid starts. The initialize method must obtain its JMS resources, including connections, sessions, and publishers, and start the thread that is the JMS listener.
The following example shows the initialize method:
initialize method example
public void initialize(Session session) {
    mySession = session;
    myGrid = session.getObjectGrid();
    try {
        if (mode == null) {
            throw new ObjectGridRuntimeException("No mode specified");
        }
        if (userid != null) {
            connection = topicConnectionFactory.createTopicConnection(userid, password);
        } else {
            connection = topicConnectionFactory.createTopicConnection();
        }

        // need to start the connection to receive messages.
        connection.start();

        // the jms session is not transactional (false).
        jmsSession = connection.createTopicSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
        if (topic == null) {
            if (topicName == null) {
                throw new ObjectGridRuntimeException("Topic not specified");
            } else {
                topic = jmsSession.createTopic(topicName);
            }
        }
        publisher = jmsSession.createPublisher(topic);

        // start the listener thread.
        listenerRunning = true;
        listenerThread = new Thread(this);
        listenerThread.start();
    } catch (Throwable e) {
        throw new ObjectGridRuntimeException("Cannot initialize", e);
    }
}
The code to start the thread uses a Java 2 Platform, Standard Edition (Java SE) thread. If you are running a WebSphere Application Server v6.x or a WebSphere Application Server v5.x Enterprise server, use the asynchronous bean application programming interface (API) to start this daemon thread. You can also use the common APIs. Following is an example replacement snippet showing the same action using a work manager:
// start the listener thread.
listenerRunning = true;
workManager.startWork(this, true);
The plug-in must also implement the Work interface instead of the Runnable interface. You also need to add a release method to set the listenerRunning variable to false. The plug-in must be provided with a WorkManager instance in its constructor or by injection if using an Inversion of Control (IoC) container.
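The shape of the work-manager variant described above can be sketched with plain Java. Because the asynchronous bean Work interface is only available inside the application server, this sketch declares a hypothetical stand-in interface, StoppableWork, that is a Runnable with a release method. The ListenerWork class, the in-memory queue that stands in for the JMS subscriber, and the runWith helper are all assumptions made for illustration; they are not eXtreme Scale or WebSphere APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the Work contract: a Runnable that can be asked to stop.
interface StoppableWork extends Runnable {
    void release();
}

// Illustrative listener; the queue stands in for a JMS topic subscriber.
final class ListenerWork implements StoppableWork {
    private final BlockingQueue<String> inbox = new LinkedBlockingQueue<>();
    private final List<String> applied = new ArrayList<>();
    private volatile boolean listenerRunning = true;

    // Called to stop the loop; run() notices the flag on its next iteration.
    @Override
    public void release() {
        listenerRunning = false;
    }

    @Override
    public void run() {
        try {
            while (listenerRunning) {
                // Wait with a timeout so the flag is rechecked regularly.
                String message = inbox.poll(100, TimeUnit.MILLISECONDS);
                if (message != null) {
                    synchronized (applied) {
                        applied.add(message);
                    }
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    void deliver(String message) {
        inbox.add(message);
    }

    List<String> appliedMessages() {
        synchronized (applied) {
            return new ArrayList<>(applied);
        }
    }

    // Runs the listener on a plain thread, waits until the message is
    // applied, then releases the listener and waits for the thread to end.
    static List<String> runWith(String message) {
        ListenerWork work = new ListenerWork();
        work.deliver(message);
        Thread thread = new Thread(work);
        thread.start();
        try {
            while (work.appliedMessages().isEmpty()) {
                Thread.sleep(10);
            }
            work.release();
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return work.appliedMessages();
    }
}
```

In a real plug-in, the work manager rather than a plain thread would run the object, and release would be called by the container when the work needs to stop.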
Transmit the changes
The following is a sample transactionEnd method for publishing the local changes that are made to an ObjectGrid. This sample uses JMS, although you can use any message transport that is capable of reliable publish-and-subscribe messaging.
transactionEnd method example
// This method is synchronized to make sure the
// messages are published in the order the transactions
// were committed. If we started publishing the messages
// in parallel then the receivers could corrupt the Map
// as deletes may arrive before inserts etc.
public synchronized void transactionEnd(String txid, boolean isWriteThroughEnabled,
        boolean committed, Collection changes) {
    try {
        // must be write through and committed.
        if (isWriteThroughEnabled && committed) {
            // write the sequences to a byte []
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            ObjectOutputStream oos = new ObjectOutputStream(bos);
            if (publishMaps.isEmpty()) {
                // serialize the whole collection
                LogSequenceTransformer.serialize(changes, oos, this, mode);
            } else {
                // filter LogSequences based on publishMaps contents
                Collection publishChanges = new ArrayList();
                Iterator iter = changes.iterator();
                while (iter.hasNext()) {
                    LogSequence ls = (LogSequence) iter.next();
                    if (publishMaps.contains(ls.getMapName())) {
                        publishChanges.add(ls);
                    }
                }
                LogSequenceTransformer.serialize(publishChanges, oos, this, mode);
            }
            // make an object message for the changes
            oos.flush();
            ObjectMessage om = jmsSession.createObjectMessage(bos.toByteArray());
            // set properties
            om.setStringProperty(PROP_TX, txid);
            om.setStringProperty(PROP_GRIDNAME, myGrid.getName());
            // transmit it.
            publisher.publish(om);
        }
    } catch (Throwable e) {
        throw new ObjectGridRuntimeException("Cannot push changes", e);
    }
}
This method uses several instance variables:
- jmsSession variable: A JMS session that is used to publish messages. It is created when the plug-in initializes.
- mode variable: The distribution mode.
- publishMaps variable: A set that contains the name of each map with changes to publish. If the variable is empty, then all the maps are published.
- publisher variable: A TopicPublisher object that is created during the plug-in initialize method.
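The publishMaps rule (an empty set publishes every map, otherwise only the named maps are published) and the step of turning the selected changes into a byte array can be tried out in isolation. The following is a minimal sketch under stated assumptions: MapChange is a hypothetical stand-in for a LogSequence, and plain Java serialization is swapped in for LogSequenceTransformer, so none of the class or method names below come from eXtreme Scale.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;

final class PublishFilterSketch {

    // Hypothetical stand-in for a LogSequence: one change tagged with its map name.
    static final class MapChange implements Serializable {
        private static final long serialVersionUID = 1L;
        final String mapName;
        final String entry;

        MapChange(String mapName, String entry) {
            this.mapName = mapName;
            this.entry = entry;
        }
    }

    // An empty publishMaps set means "publish every map".
    static List<MapChange> filter(Collection<MapChange> changes, Set<String> publishMaps) {
        List<MapChange> selected = new ArrayList<>();
        for (MapChange change : changes) {
            if (publishMaps.isEmpty() || publishMaps.contains(change.mapName)) {
                selected.add(change);
            }
        }
        return selected;
    }

    // Serializes the selected changes into the byte[] that would become the message body.
    static byte[] toBytes(List<MapChange> changes) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(new ArrayList<>(changes));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bos.toByteArray();
    }

    // Reverses toBytes on the receiving side.
    @SuppressWarnings("unchecked")
    static List<MapChange> fromBytes(byte[] raw) {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(raw))) {
            return (List<MapChange>) ois.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Filtering before serialization keeps changes for unlisted maps out of the message entirely, which is the same order of operations that the sample transactionEnd method uses.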
Receive and apply update messages
The following is the run method. This method runs in a loop until the application stops the loop. Each loop iteration attempts to receive a JMS message and apply it to the ObjectGrid.
JMS message run method example
private synchronized boolean isListenerRunning() {
    return listenerRunning;
}

public void run() {
    try {
        System.out.println("Listener starting");
        // get a jms session for receiving the messages.
        // Non transactional.
        TopicSession myTopicSession;
        myTopicSession = connection.createTopicSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);

        // get a subscriber for the topic, true indicates don't receive
        // messages transmitted using publishers
        // on this connection. Otherwise, we'd receive our own updates.
        TopicSubscriber subscriber = myTopicSession.createSubscriber(topic, null, true);
        System.out.println("Listener started");
        while (isListenerRunning()) {
            ObjectMessage om = (ObjectMessage) subscriber.receive(2000);
            if (om != null) {
                // Use Session that was passed in on the initialize...
                // very important to use no write through here
                mySession.beginNoWriteThrough();
                byte[] raw = (byte[]) om.getObject();
                ByteArrayInputStream bis = new ByteArrayInputStream(raw);
                ObjectInputStream ois = new ObjectInputStream(bis);
                // inflate the LogSequences
                Collection collection = LogSequenceTransformer.inflate(ois, myGrid);
                Iterator iter = collection.iterator();
                while (iter.hasNext()) {
                    // process each map's changes according to the mode when
                    // the LogSequence was serialized
                    LogSequence seq = (LogSequence) iter.next();
                    mySession.processLogSequence(seq);
                }
                mySession.commit();
            } // if there was a message
        } // while loop

        // close the connection
        connection.close();
    } catch (IOException e) {
        System.out.println("IO Exception: " + e);
    } catch (JMSException e) {
        System.out.println("JMS Exception: " + e);
    } catch (ObjectGridException e) {
        System.out.println("ObjectGrid exception: " + e);
        System.out.println("Caused by: " + e.getCause());
    } catch (Throwable e) {
        System.out.println("Exception : " + e);
    }
    System.out.println("Listener stopped");
}
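Read together, the run method and the sample transactionEnd method suggest why the no-write-through session matters: transactionEnd publishes only when write-through is enabled, so changes that are applied from a remote message are not sent back out. The following sketch illustrates that reading with a hypothetical in-memory PeerSketch class. It uses no JMS or eXtreme Scale APIs, and the direct method call between peers is an assumption that stands in for the topic.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory peer; not an eXtreme Scale class.
final class PeerSketch {
    final Map<String, String> data = new HashMap<>();
    final List<PeerSketch> peers = new ArrayList<>();
    int published = 0;

    // Local commit: write-through is enabled, so the change is published.
    void commitLocal(String key, String value) {
        data.put(key, value);
        transactionEnd(true, true, key, value);
    }

    // Remote change: applied with write-through disabled, so it is not republished.
    void applyRemote(String key, String value) {
        data.put(key, value);
        transactionEnd(false, true, key, value);
    }

    // Mirrors the guard in the sample: publish only committed, write-through transactions.
    private void transactionEnd(boolean isWriteThroughEnabled, boolean committed,
                                String key, String value) {
        if (isWriteThroughEnabled && committed) {
            published++;
            for (PeerSketch peer : peers) {
                peer.applyRemote(key, value);
            }
        }
    }
}
```

With two peers that list each other, a local commit on one peer reaches the other exactly once. If applyRemote passed true for write-through instead, each peer would republish what it received and the two peers would forward the same change to each other without end.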