001 package examples.webapp.pubsub.stock;
002
003 import java.io.IOException;
004 import java.util.Arrays;
005
006 import org.apache.commons.httpclient.HostConfiguration;
007 import org.apache.commons.httpclient.HttpClient;
008 import org.apache.commons.httpclient.SimpleHttpConnectionManager;
009 import org.apache.commons.httpclient.UsernamePasswordCredentials;
010 import org.apache.commons.httpclient.auth.AuthScope;
011 import org.apache.commons.httpclient.cookie.CookiePolicy;
012 import org.apache.commons.httpclient.methods.PostMethod;
013 import org.apache.commons.logging.Log;
014 import org.apache.commons.logging.LogFactory;
015
016 import org.json.JSONArray;
017 import org.json.JSONObject;
018
019 public class HttpBasedClient implements Client {
020
021 private static final Log logger = LogFactory.getLog(HttpBasedClient.class);
022
023 private static final String USERNAME = "Publisher";
024 private static final String PASSWORD = "Publisher";
025
026 private HttpClient httpClient;
027 private String clientId = null;
028 private String root = "";
029
030 public HttpBasedClient(String protocol, String host, int port, String root) {
031 createHttpClient(protocol, host, port);
032 this.root = root;
033 }
034
035 public void publish(String[] channels, JSONObject[] data) throws IOException {
036 if(clientId == null) {
037 handshake();
038 connect();
039 }
040 reconnect(channels, data);
041 }
042
043 public void publish(String channel, JSONObject data) throws IOException {
044 publish(new String[]{channel}, new JSONObject[]{data});
045 }
046
047 public void close() {
048
049 }
050
051 private void createHttpClient(String protocol, String host, int port) {
052 httpClient = new HttpClient(new SimpleHttpConnectionManager());
053 HostConfiguration hostConfiguration = new HostConfiguration();
054 hostConfiguration.setHost(host, port, protocol);
055 httpClient.setHostConfiguration(hostConfiguration);
056 httpClient.getHttpConnectionManager().getParams().setConnectionTimeout(30000);
057 httpClient.getParams().setCookiePolicy(CookiePolicy.BROWSER_COMPATIBILITY);
058 httpClient.getParams().setAuthenticationPreemptive(true);
059 httpClient.getState().setCredentials(new AuthScope(host, port, AuthScope.ANY_REALM), new UsernamePasswordCredentials(USERNAME, PASSWORD));
060 }
061
062 private void handshake() throws IOException{
063 logger.info("Begin handshake ...");
064 PostMethod postMethod = new PostMethod(root);
065 JSONObject handshake = new JSONObject();
066 handshake.put(BayeuxConstants.FIELD_CHANNEL, BayeuxConstants.META_HANDSHAKE);
067 handshake.put(BayeuxConstants.FIELD_VERSION, "0.1");
068 handshake.put(BayeuxConstants.FIELD_MINIMUM_VERSION, "0.1");
069 handshake.put(BayeuxConstants.FIELD_SUPPORTED_CONNECTION_TYPES, Arrays.asList("long-polling"));
070 JSONArray handshakeRequest = new JSONArray();
071 handshakeRequest.put(handshake);
072 postMethod.setParameter("message", handshakeRequest.toString());
073 try {
074 httpClient.executeMethod(postMethod);
075 JSONArray handshakeResponse = new JSONArray(postMethod.getResponseBodyAsString());
076 JSONObject obj = handshakeResponse.getJSONObject(0);
077 clientId = obj.getString(BayeuxConstants.FIELD_CLIENT_ID);
078 if (isSuccessfulResponse(obj) && logger.isInfoEnabled()) {
079 logger.info("Handshake successed. Client ID is [" + clientId + "].");
080 }
081 } catch (IOException e) {
082 logger.info("Handshake failed.", e);
083 throw e;
084 }
085 }
086
087 private void connect() throws IOException {
088 logger.info("Begin connect ...");
089 PostMethod postMethod = new PostMethod(root);
090 JSONObject connect = new JSONObject();
091 connect.put(BayeuxConstants.FIELD_CHANNEL, BayeuxConstants.META_CONNECT);
092 connect.put(BayeuxConstants.FIELD_CLIENT_ID, clientId);
093 connect.put(BayeuxConstants.FIELD_CONNECTION_TYPE, "long-polling");
094 JSONArray connectRequest = new JSONArray();
095 connectRequest.put(connect);
096 postMethod.setParameter("message", connectRequest.toString());
097 try {
098 httpClient.executeMethod(postMethod);
099 JSONArray connectResponse = new JSONArray(postMethod.getResponseBodyAsString());
100 JSONObject obj = connectResponse.getJSONObject(0);
101 if (isSuccessfulResponse(obj)) {
102 logger.info("Connect successed.");
103 }
104 } catch (IOException e) {
105 logger.info("Connect failed.", e);
106 throw e;
107 }
108 }
109
110 private void reconnect(String[] channels, JSONObject[] data) throws IOException {
111 PostMethod postMethod = new PostMethod(root);
112 JSONObject reconnect = new JSONObject();
113 reconnect.put(BayeuxConstants.FIELD_CHANNEL, BayeuxConstants.META_RECONNECT);
114 reconnect.put(BayeuxConstants.FIELD_CLIENT_ID, clientId);
115 reconnect.put(BayeuxConstants.FIELD_CONNECTION_TYPE, "long-polling");
116 JSONArray inputArray = new JSONArray();
117 inputArray.put(reconnect);
118 for (int i=0; i<channels.length; i++) {
119 JSONObject publish = new JSONObject();
120 publish.put(BayeuxConstants.FIELD_CHANNEL, channels[i]);
121 publish.put(BayeuxConstants.FIELD_DATA, data[i]);
122 inputArray.put(publish);
123 }
124
125 postMethod.setParameter("message", inputArray.toString());
126
127 try {
128 httpClient.executeMethod(postMethod);
129 JSONArray publishResponse = new JSONArray(postMethod.getResponseBodyAsString());
130 JSONObject obj = publishResponse.getJSONObject(0);
131 if (isSuccessfulResponse(obj) && logger.isInfoEnabled()) {
132 logger.info("Published " + inputArray.toString() + ".");
133 }
134 } catch (IOException e) {
135 logger.info("Published failed.");
136 throw e;
137 }
138 }
139
140 private static boolean isSuccessfulResponse(JSONObject obj) {
141 return obj.getBoolean(BayeuxConstants.FIELD_SUCCESSFUL);
142 }
143 }
144
145 interface BayeuxConstants {
146
147 public static final String FIELD_CHANNEL = "channel";
148 public static final String FIELD_VERSION = "version";
149 public static final String FIELD_MINIMUM_VERSION = "minimumVersion";
150 public static final String FIELD_SUPPORTED_CONNECTION_TYPES = "supportedConnectionTypes";
151 public static final String FIELD_CONNECTION_TYPE = "connectionType";
152 public static final String FIELD_SUCCESSFUL = "successful";
153 public static final String FIELD_DATA = "data";
154 public static final String FIELD_CLIENT_ID = "clientId";
155 public static final String META = "/meta/";
156 public static final String META_CONNECT = "/meta/connect".intern();
157 public static final String META_HANDSHAKE = "/meta/handshake".intern();
158 public static final String META_RECONNECT = "/meta/reconnect".intern();
159 }
|