001 package examples.jta.jmsjdbc;
002
003 import java.sql.Connection;
004 import java.sql.PreparedStatement;
005 import java.sql.SQLException;
006 import javax.ejb.CreateException;
007 import javax.jms.*;
008 import javax.naming.Context;
009 import javax.naming.InitialContext;
010 import javax.sql.DataSource;
011 import javax.transaction.UserTransaction;
012 import weblogic.ejb.GenericSessionBean;
013 import weblogic.ejbgen.*;
014 import weblogic.ejbgen.Session;
015
016 /**
017 * ReceiveInTxBean is a stateless session bean using bean-managed
018 * transaction. This EJBean illustrates distributed transactions
019 * and two phase commit across two XA resources: JMS queue and
020 * a Pointbase database. It begins a distributed transaction, receives
021 * messages from the JMS queue, updates the database, and then commits
022 * the distributed transaction.
023 *
024 * @author Copyright (c) 1999,2009, Oracle and/or its affiliates. All Rights Reserved.
025 */
026 @EnvEntries({
027 @EnvEntry(name = "queueName",
028 type = "java.lang.String",
029 value = "weblogic.examples.jms.exampleQueue"),
030 @EnvEntry(name = "queueConnFactoryName",
031 type = "java.lang.String",
032 value = "weblogic.examples.jms.QueueConnectionFactory"),
033 @EnvEntry(name = "tableName",
034 type = "java.lang.String",
035 value = "jtaSamples"),
036 @EnvEntry(name = "xaDataSrcName",
037 type = "java.lang.String",
038 value = "examples-dataSource-demoXAPool")
039 })
040 @FileGeneration(remoteClass = Constants.Bool.TRUE,
041 localHome = Constants.Bool.FALSE,
042 remoteHome = Constants.Bool.TRUE,
043 remoteClassName = "ReceiveInTx",
044 remoteHomeName = "ReceiveInTxHome",
045 localClass = Constants.Bool.FALSE)
046 @JarSettings(ejbClientJar = "jta_ejb_jmsjdbc_client.jar")
047 @JndiName(remote = "jta-jmsjdbc-ReceiveInTxHome")
048 @ResourceRefs({
049 @ResourceRef(name = "demoXADataSource",
050 jndiName = "examples-dataSource-demoXAPool",
051 type = "javax.sql.DataSource",
052 auth = ResourceRef.Auth.CONTAINER,
053 sharingScope = ResourceRef.SharingScope.SHAREABLE)
054 })
055 @Session(maxBeansInFreePool = "100",
056 initialBeansInFreePool = "0",
057 transTimeoutSeconds = "0",
058 type = Session.SessionType.STATELESS,
059 transactionType = Session.SessionTransactionType.BEAN,
060 ejbName = "ReceiveInTxEJB")
061 public class ReceiveInTxBean extends GenericSessionBean {
062
063 private static final boolean VERBOSE = true;
064
065 // You might also consider using WebLogic's log service
066 private void log(String s) {
067 if (VERBOSE) System.out.println(s);
068 }
069
070 /**
071 * This method is required by the EJB Specification,
072 * but is not used by this example.
073 */
074 public void ejbCreate() throws CreateException { }
075 public void ejbPostCreate() throws CreateException { }
076
077 /**
078 * This method implements the receiveMessages method in the ReceiveInTx
079 * remote interface.
080 *
081 * This method begins a distributed transaction, receives messages from
082 * the JMS queue, updates the database, and then commit the distributed
083 * transaction.
084 */
085 @RemoteMethod(transactionAttribute = Constants.TransactionAttribute.SUPPORTS)
086 public void receiveMessages() {
087 QueueConnection qcon = null;
088 QueueSession qsession = null;
089 QueueReceiver qreceiver = null;
090 Connection jcon = null;
091
092 try {
093 Context ictx = new InitialContext();
094
095 Context env = (Context) ictx.lookup("java:comp/env");
096 String queueConnFactoryName = (String)env.lookup("queueConnFactoryName");
097 String queueName = (String)env.lookup("queueName");
098 String xaDataSrcName = (String)env.lookup("xaDataSrcName");
099 String tableName = (String)env.lookup("tableName");
100
101 QueueConnectionFactory qconFactory =
102 (QueueConnectionFactory) ictx.lookup(queueConnFactoryName);
103 qcon = qconFactory.createQueueConnection();
104 qsession = qcon.createQueueSession(false,
105 javax.jms.Session.AUTO_ACKNOWLEDGE);
106 Queue queue = (Queue) ictx.lookup(queueName);
107 qreceiver = qsession.createReceiver(queue);
108 qcon.start();
109
110 DataSource xads = (DataSource) ictx.lookup(xaDataSrcName);
111
112 UserTransaction utx = getSessionContext().getUserTransaction();
113 utx.begin();
114 log("TRANSACTION BEGUN");
115 String msgText = null;
116 do {
117 msgText = queueReceive(qreceiver);
118 if (msgText.equalsIgnoreCase("quit")) {
119 utx.commit();
120 log("TRANSACTION COMMITTED");
121 } else {
122 updateDatabase(xads, tableName, msgText);
123 }
124 } while (msgText != null && !msgText.equals("quit"));
125
126 } catch (javax.naming.NamingException nex) {
127 log("Naming exception: " + nex);
128 } catch (javax.jms.JMSException jex) {
129 log("JMS exception: " + jex);
130 } catch (javax.transaction.NotSupportedException nse) {
131 log("TRANSACTION COULD NOT BEGIN DUE TO: " + nse);
132 } catch (javax.transaction.RollbackException rbe) {
133 log("TRANSACTION ROLLED BACK DUE TO: " + rbe);
134 } catch (javax.transaction.HeuristicRollbackException hre) {
135 log("TRANSACTION ROLLED BACK DUE TO: " + hre);
136 } catch (javax.transaction.HeuristicMixedException hme) {
137 log("TRANSACTION ROLLED BACK DUE TO: " + hme);
138 } catch (javax.transaction.SystemException se) {
139 log("TRANSACTION EXCEPTION: " + se);
140 } finally {
141 if (qreceiver != null) {
142 try { qreceiver.close(); } catch (JMSException ex) {}
143 }
144 if (qsession != null) {
145 try { qsession.close(); } catch (JMSException ex) {}
146 }
147 if (qcon != null) {
148 try { qcon.close(); } catch (JMSException ex) {}
149 }
150 }
151 }
152
153 /**
154 * Receives the message, converts it to a string, and returns it
155 */
156 private String queueReceive(QueueReceiver qr) {
157 String msgText = null;
158 try {
159 Message msg = qr.receive();
160 if (msg != null) {
161 if (msg instanceof TextMessage) {
162 msgText = ((TextMessage)msg).getText();
163 } else {
164 msgText = msg.toString();
165 }
166 log("Message Received: " + msgText);
167 }
168 } catch (JMSException jmse) {
169 log("Error receiving JMS message: " + jmse);
170 }
171 return msgText;
172 }
173
174 /**
175 * Adds String data to String tableName
176 */
177 private void updateDatabase(DataSource xads, String tableName, String data) {
178 Connection jcon = null;
179 PreparedStatement stmt = null;
180 try {
181 jcon = xads.getConnection();
182 String sql = "insert into " + tableName + " (data) values (?)";
183 stmt = jcon.prepareStatement(sql);
184 stmt.setString(1, data);
185 stmt.executeUpdate();
186 } catch (SQLException ex) {
187 log("Cannot update database:" + data);
188 } finally {
189 if (stmt != null) {
190 try { stmt.close(); } catch (SQLException ex) {}
191 }
192 if (jcon != null) {
193 try { jcon.close(); } catch (SQLException ex) {}
194 }
195 }
196 }
197 }
|