HttpBasedClient.java
001 package examples.webapp.pubsub.stock;
002 
003 import java.io.IOException;
004 import java.util.Arrays;
005 
006 import org.apache.commons.httpclient.HostConfiguration;
007 import org.apache.commons.httpclient.HttpClient;
008 import org.apache.commons.httpclient.SimpleHttpConnectionManager;
009 import org.apache.commons.httpclient.UsernamePasswordCredentials;
010 import org.apache.commons.httpclient.auth.AuthScope;
011 import org.apache.commons.httpclient.cookie.CookiePolicy;
012 import org.apache.commons.httpclient.methods.PostMethod;
013 import org.apache.commons.logging.Log;
014 import org.apache.commons.logging.LogFactory;
015 
016 import org.json.JSONArray;
017 import org.json.JSONObject;
018 
019 public class HttpBasedClient implements Client {
020 
021   private static final Log logger = LogFactory.getLog(HttpBasedClient.class);
022   
023   private static final String USERNAME = "Publisher";
024   private static final String PASSWORD = "Publisher";
025   
026   private HttpClient httpClient;
027   private String clientId = null;
028   private String root = "";
029   
030   public HttpBasedClient(String protocol, String host, int port, String root) {
031     createHttpClient(protocol, host, port);
032     this.root = root;
033   }
034   
035   public void publish(String[] channels, JSONObject[] datathrows IOException {
036     if(clientId == null) {
037       handshake();
038       connect();
039     }
040     reconnect(channels, data);
041   }
042   
043   public void publish(String channel, JSONObject datathrows IOException {
044     publish(new String[]{channel}new JSONObject[]{data});
045   }
046   
047   public void close() {
048 
049   }
050   
051   private void createHttpClient(String protocol, String host, int port) {
052     httpClient = new HttpClient(new SimpleHttpConnectionManager());
053     HostConfiguration hostConfiguration = new HostConfiguration();
054     hostConfiguration.setHost(host, port, protocol);
055     httpClient.setHostConfiguration(hostConfiguration);
056     httpClient.getHttpConnectionManager().getParams().setConnectionTimeout(30000);
057     httpClient.getParams().setCookiePolicy(CookiePolicy.BROWSER_COMPATIBILITY);  
058     httpClient.getParams().setAuthenticationPreemptive(true);
059     httpClient.getState().setCredentials(new AuthScope(host, port, AuthScope.ANY_REALM)new UsernamePasswordCredentials(USERNAME, PASSWORD));
060   }
061 
062   private void handshake() throws IOException{
063     logger.info("Begin handshake ...");
064     PostMethod postMethod = new PostMethod(root);
065     JSONObject handshake = new JSONObject();
066     handshake.put(BayeuxConstants.FIELD_CHANNEL, BayeuxConstants.META_HANDSHAKE);
067     handshake.put(BayeuxConstants.FIELD_VERSION, "0.1");
068     handshake.put(BayeuxConstants.FIELD_MINIMUM_VERSION, "0.1");
069     handshake.put(BayeuxConstants.FIELD_SUPPORTED_CONNECTION_TYPES, Arrays.asList("long-polling"));
070     JSONArray handshakeRequest = new JSONArray();
071     handshakeRequest.put(handshake);
072     postMethod.setParameter("message", handshakeRequest.toString());
073     try {
074       httpClient.executeMethod(postMethod);
075       JSONArray handshakeResponse = new JSONArray(postMethod.getResponseBodyAsString());
076       JSONObject obj = handshakeResponse.getJSONObject(0);
077       clientId = obj.getString(BayeuxConstants.FIELD_CLIENT_ID);
078       if (isSuccessfulResponse(obj&& logger.isInfoEnabled()) {
079         logger.info("Handshake successed. Client ID is [" + clientId + "].");
080       }    
081     catch (IOException e) {
082       logger.info("Handshake failed.", e);
083       throw e;
084     }
085   }
086   
087   private void connect() throws IOException {
088     logger.info("Begin connect ...");
089     PostMethod postMethod = new PostMethod(root);
090     JSONObject connect = new JSONObject();
091     connect.put(BayeuxConstants.FIELD_CHANNEL, BayeuxConstants.META_CONNECT);
092     connect.put(BayeuxConstants.FIELD_CLIENT_ID, clientId);
093     connect.put(BayeuxConstants.FIELD_CONNECTION_TYPE, "long-polling");
094     JSONArray connectRequest = new JSONArray();
095     connectRequest.put(connect);
096     postMethod.setParameter("message", connectRequest.toString());
097     try {
098       httpClient.executeMethod(postMethod);
099       JSONArray connectResponse = new JSONArray(postMethod.getResponseBodyAsString());
100       JSONObject obj = connectResponse.getJSONObject(0);
101       if (isSuccessfulResponse(obj)) {
102         logger.info("Connect successed.");
103       }      
104     catch (IOException e) {
105       logger.info("Connect failed.", e);
106       throw e;
107     }
108   
109   
110   private void reconnect(String[] channels, JSONObject[] datathrows IOException {
111     PostMethod postMethod = new PostMethod(root);
112     JSONObject reconnect = new JSONObject();
113     reconnect.put(BayeuxConstants.FIELD_CHANNEL, BayeuxConstants.META_RECONNECT);
114     reconnect.put(BayeuxConstants.FIELD_CLIENT_ID, clientId);
115     reconnect.put(BayeuxConstants.FIELD_CONNECTION_TYPE, "long-polling");
116     JSONArray inputArray = new JSONArray();
117     inputArray.put(reconnect);
118     for (int i=0; i<channels.length; i++) {
119       JSONObject publish = new JSONObject();
120       publish.put(BayeuxConstants.FIELD_CHANNEL, channels[i]);
121       publish.put(BayeuxConstants.FIELD_DATA, data[i]);
122       inputArray.put(publish);
123     }
124 
125     postMethod.setParameter("message", inputArray.toString());
126 
127     try {
128       httpClient.executeMethod(postMethod);
129       JSONArray publishResponse = new JSONArray(postMethod.getResponseBodyAsString());
130       JSONObject obj = publishResponse.getJSONObject(0);
131       if (isSuccessfulResponse(obj&& logger.isInfoEnabled()) {
132         logger.info("Published " + inputArray.toString() ".");
133       }      
134     }  catch (IOException e) {
135         logger.info("Published failed.");
136         throw e;
137     }
138   }  
139   
140   private static boolean isSuccessfulResponse(JSONObject obj) {
141     return obj.getBoolean(BayeuxConstants.FIELD_SUCCESSFUL);
142   }
143 }
144 
/**
 * JSON field names and meta-channel names used when building and reading Bayeux messages.
 *
 * <p>Fields of an interface are implicitly {@code public static final}, so the modifiers
 * are omitted.
 */
interface BayeuxConstants {

  /** Key holding the channel a message is addressed to. */
  String FIELD_CHANNEL = "channel";

  /** Key holding the protocol version sent in the handshake. */
  String FIELD_VERSION = "version";

  /** Key holding the minimum protocol version sent in the handshake. */
  String FIELD_MINIMUM_VERSION = "minimumVersion";

  /** Key holding the list of connection types offered in the handshake. */
  String FIELD_SUPPORTED_CONNECTION_TYPES = "supportedConnectionTypes";

  /** Key holding the connection type used by connect and reconnect messages. */
  String FIELD_CONNECTION_TYPE = "connectionType";

  /** Key of the boolean flag that marks a response as successful. */
  String FIELD_SUCCESSFUL = "successful";

  /** Key holding the payload of a publish message. */
  String FIELD_DATA = "data";

  /** Key holding the client id assigned during the handshake. */
  String FIELD_CLIENT_ID = "clientId";

  /** Common prefix of the meta-channel names below. */
  String META = "/meta/";

  // The intern() calls are kept from the original. Because of the method call these three
  // are not compile-time constant expressions, unlike the plain literals above.

  /** Meta channel for connect messages. */
  String META_CONNECT = "/meta/connect".intern();

  /** Meta channel for handshake messages. */
  String META_HANDSHAKE = "/meta/handshake".intern();

  /** Meta channel for reconnect messages. */
  String META_RECONNECT = "/meta/reconnect".intern();
}
159 }