diff --git a/src/java/org/jivesoftware/multiplexer/ClientSession.java b/src/java/org/jivesoftware/multiplexer/ClientSession.java index da4c0c5..3a4b0ff 100644 --- a/src/java/org/jivesoftware/multiplexer/ClientSession.java +++ b/src/java/org/jivesoftware/multiplexer/ClientSession.java @@ -22,9 +22,6 @@ import org.xmlpull.v1.XmlPullParser; import org.xmlpull.v1.XmlPullParserException; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - /** * Session that represents a client to server connection. * @@ -45,14 +42,6 @@ */ private static long idleTimeout; - private static StreamIDFactory idFactory = new StreamIDFactory(); - - /** - * Map of existing sessions. A session is added just after the initial stream header - * was processed. Key: stream ID, value: the session. - */ - private static Map sessions = - new ConcurrentHashMap(); /** * Socket reader that is processing incoming packets from the client. */ @@ -148,7 +137,7 @@ // Set the stream ID that identifies the client when forwarding traffic to a client fails ((ClientFailoverDeliverer) connection.getPacketDeliverer()).setStreamID(streamID); // Register that the new session is associated with the specified stream ID - sessions.put(streamID, session); + Session.addSession(streamID, session); // Send to the server that a new client session has been created serverSurrogate.clientSessionCreated(streamID); @@ -206,28 +195,6 @@ return session; } - /** - * Closes connections of connected clients since the server or the connection - * manager is being shut down. If the server is the one that is being shut down - * then the connection manager will keep running and will try to establish new - * connections to the server (on demand). - */ - public static void closeAll() { - for (ClientSession session : sessions.values()) { - session.close(true); - } - } - - /** - * Returns the session whose stream ID matches the specified stream ID. - * - * @param streamID the stream ID of the session to look for. - * @return the session whose stream ID matches the specified stream ID. - */ - public static ClientSession getSession(String streamID) { - return sessions.get(streamID); - } - public ClientSession(String serverName, Connection connection, String streamID) { super(serverName, connection, streamID); } @@ -327,7 +294,7 @@ // Changhe the status to closed status = STATUS_CLOSED; // Remove session from list of sessions - sessions.remove(getStreamID()); + removeSession(getStreamID()); // Tell the server that the client session has been closed ConnectionManager.getInstance().getServerSurrogate().clientSessionClosed(getStreamID()); } diff --git a/src/java/org/jivesoftware/multiplexer/ServerPacketHandler.java b/src/java/org/jivesoftware/multiplexer/ServerPacketHandler.java index c532592..620a38c 100644 --- a/src/java/org/jivesoftware/multiplexer/ServerPacketHandler.java +++ b/src/java/org/jivesoftware/multiplexer/ServerPacketHandler.java @@ -70,7 +70,7 @@ // Check if the server is informing us that we need to close a session if (wrapper.element("close") != null) { // Get the session that matches the requested stream ID - ClientSession session = ClientSession.getSession(streamID); + Session session = Session.getSession(streamID); if (session != null) { session.close(); } @@ -123,11 +123,12 @@ // Get the wrapped stanza Element stanza = (Element) route.elementIterator().next(); // Get the session that matches the requested stream ID - ClientSession session = ClientSession.getSession(streamID); + Session session = Session.getSession(streamID); if (session != null) { // Deliver the wrapped stanza to the client session.deliver(stanza); - } else { + } + else { // Inform the server that the wrapped stanza was not delivered String tag = stanza.getName(); if ("message".equals(tag)) { diff --git a/src/java/org/jivesoftware/multiplexer/ServerSurrogate.java b/src/java/org/jivesoftware/multiplexer/ServerSurrogate.java index 1617f95..a614f21 100644 --- a/src/java/org/jivesoftware/multiplexer/ServerSurrogate.java +++ b/src/java/org/jivesoftware/multiplexer/ServerSurrogate.java @@ -55,7 +55,7 @@ /** * Cache the SASL mechanisms supported by the server for client authentication */ - private String saslMechanisms; + private Element saslMechanisms; /** * Flag indicating if non-sasl authentication is supported by the server. */ @@ -146,7 +146,7 @@ * * @param streamID the stream ID assigned by the connection manager to the new session. */ - void clientSessionCreated(final String streamID) { + public void clientSessionCreated(final String streamID) { threadPool.execute(new NewSessionTask(streamID)); } @@ -190,6 +190,10 @@ * @return the SASL mechanisms supported by the server for client authentication. */ public String getSASLMechanisms(Session session) { + return saslMechanisms.asXML(); + } + + public Element getSASLMechanismsElement(Session session) { return saslMechanisms; } @@ -199,7 +203,7 @@ * @param mechanisms the SASL mechanisms supported by the server for client authentication. */ public void setSASLMechanisms(Element mechanisms) { - saslMechanisms = mechanisms.asXML(); + saslMechanisms = mechanisms.createCopy(); } /** diff --git a/src/java/org/jivesoftware/multiplexer/Session.java b/src/java/org/jivesoftware/multiplexer/Session.java index 27769db..d20eeeb 100644 --- a/src/java/org/jivesoftware/multiplexer/Session.java +++ b/src/java/org/jivesoftware/multiplexer/Session.java @@ -11,7 +11,11 @@ package org.jivesoftware.multiplexer; +import org.dom4j.Element; + import java.util.Date; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; /** * The session represents a connection between the server and a client (c2s) or @@ -28,7 +32,7 @@ /** * Version of the XMPP spec supported as MAJOR_VERSION.MINOR_VERSION (e.g. 1.0). */ - public static final int MAJOR_VERSION = 1; + public static final int MAJOR_VERSION = 1; public static final int MINOR_VERSION = 0; /** @@ -61,6 +65,44 @@ private Date startDate = new Date(); /** + * Map of existing sessions. A session is added just after the initial stream header + * was processed. Key: stream ID, value: the session. + */ + private static Map sessions = new ConcurrentHashMap(); + + public static StreamIDFactory idFactory = new StreamIDFactory(); + + public static void addSession(String streamID, Session session) { + sessions.put(streamID, session); + } + + protected static void removeSession(String streamID) { + sessions.remove(streamID); + } + + /** + * Returns the session whose stream ID matches the specified stream ID. + * + * @param streamID the stream ID of the session to look for. + * @return the session whose stream ID matches the specified stream ID. + */ + public static Session getSession(String streamID) { + return sessions.get(streamID); + } + + /** + * Closes connections of connected clients since the server or the connection + * manager is being shut down. If the server is the one that is being shut down + * then the connection manager will keep running and will try to establish new + * connections to the server (on demand). + */ + public static void closeAll() { + for (Session session : sessions.values()) { + session.close(true); + } + } + + /** * Creates a session with an underlying connection and permission protection. * * @param connection The connection we are proxying @@ -133,6 +175,10 @@ */ public abstract void close(); + public abstract void close(boolean isServerShuttingDown); + + public abstract void deliver(Element stanza); + public String toString() { return super.toString() + " status: " + status + " id: " + streamID; } diff --git a/src/java/org/jivesoftware/multiplexer/StreamIDFactory.java b/src/java/org/jivesoftware/multiplexer/StreamIDFactory.java index f466771..0bb0bba 100644 --- a/src/java/org/jivesoftware/multiplexer/StreamIDFactory.java +++ b/src/java/org/jivesoftware/multiplexer/StreamIDFactory.java @@ -22,7 +22,7 @@ * * @author Gaston Dombiak */ -class StreamIDFactory { +public class StreamIDFactory { /** * The random number to use, someone with Java can predict stream IDs if they can guess the current seed * */ diff --git a/src/java/org/jivesoftware/multiplexer/net/http/HttpConnection.java b/src/java/org/jivesoftware/multiplexer/net/http/HttpConnection.java index 4ce2827..50a9fc3 100644 --- a/src/java/org/jivesoftware/multiplexer/net/http/HttpConnection.java +++ b/src/java/org/jivesoftware/multiplexer/net/http/HttpConnection.java @@ -11,24 +11,21 @@ import org.jivesoftware.multiplexer.Connection; import org.dom4j.Element; -import java.net.InetAddress; -import java.net.UnknownHostException; - /** * */ -public class HttpConnection implements Connection { - private int majorVersion = 1; - private int minorVersion = 0; +public class HttpConnection { + private Connection.CompressionPolicy compressionPolicy; + private long requestId; + + public HttpConnection(long requestID) { + this.requestId = requestID; + } public boolean validate() { return false; } - public InetAddress getInetAddress() throws UnknownHostException { - return null; - } - public void close() { } @@ -46,34 +43,19 @@ public void deliver(Element doc) { } - public void deliverRawText(String text) { - } - - public boolean isFlashClient() { - return false; - } - - public int getMajorXMPPVersion() { - return majorVersion; - } - - public int getMinorXMPPVersion() { - return minorVersion; - } - - public String getLanguage() { - return null; - } - public boolean isCompressed() { return false; } - public CompressionPolicy getCompressionPolicy() { - return null; + public Connection.CompressionPolicy getCompressionPolicy() { + return compressionPolicy; } - public TLSPolicy getTlsPolicy() { - return null; + public void setCompressionPolicy(Connection.CompressionPolicy compressionPolicy) { + this.compressionPolicy = compressionPolicy; + } + + public long getRequestId() { + return requestId; } } diff --git a/src/java/org/jivesoftware/multiplexer/net/http/HttpSession.java b/src/java/org/jivesoftware/multiplexer/net/http/HttpSession.java index da625ff..52fb365 100644 --- a/src/java/org/jivesoftware/multiplexer/net/http/HttpSession.java +++ b/src/java/org/jivesoftware/multiplexer/net/http/HttpSession.java @@ -8,20 +8,28 @@ */ package org.jivesoftware.multiplexer.net.http; -import org.jivesoftware.multiplexer.Session; -import org.jivesoftware.multiplexer.Connection; +import org.jivesoftware.multiplexer.*; +import org.dom4j.Element; + +import java.util.Queue; +import java.util.LinkedList; /** * */ public class HttpSession extends Session { - /** - * Creates a session with an underlying connection and permission protection. - * - * @param connection The connection we are proxying - */ - public HttpSession(String serverName, Connection connection, String streamID) { - super(serverName, connection, streamID); + private int wait; + private int hold; + private String language; + private final Queue connectionQueue = new LinkedList(); + + + public HttpSession(String serverName, String streamID) { + super(serverName, null, streamID); + } + + void addConnection(HttpConnection connection) { + connectionQueue.offer(connection); } public String getAvailableStreamFeatures() { @@ -30,4 +38,64 @@ public void close() { } + + public void close(boolean isServerShuttingDown) { + } + + public void deliver(Element stanza) { + } + + + /** + * This attribute specifies the longest time (in seconds) that the connection manager is allowed + * to wait before responding to any request during the session. This enables the client to + * prevent its TCP connection from expiring due to inactivity, as well as to limit the delay + * before it discovers any network failure. + * + * @param wait the longest time it is permissible to wait for a response. + */ + public void setWait(int wait) { + this.wait = wait; + } + + /** + * This attribute specifies the longest time (in seconds) that the connection manager is allowed + * to wait before responding to any request during the session. This enables the client to + * prevent its TCP connection from expiring due to inactivity, as well as to limit the delay + * before it discovers any network failure. + * + * @return the longest time it is permissible to wait for a response. + */ + public int getWait() { + return wait; + } + + /** + * This attribute specifies the maximum number of requests the connection manager is allowed + * to keep waiting at any one time during the session. (For example, if a constrained client + * is unable to keep open more than two HTTP connections to the same HTTP server simultaneously, + * then it SHOULD specify a value of "1".) + * + * @param hold the maximum number of simultaneous waiting requests. + * + */ + public void setHold(int hold) { + this.hold = hold; + } + + /** + * This attribute specifies the maximum number of requests the connection manager is allowed + * to keep waiting at any one time during the session. (For example, if a constrained client + * is unable to keep open more than two HTTP connections to the same HTTP server simultaneously, + * then it SHOULD specify a value of "1".) + * + * @return the maximum number of simultaneous waiting requests + */ + public int getHold() { + return hold; + } + + public void setLanaguage(String language) { + this.language = language; + } } diff --git a/src/java/org/jivesoftware/multiplexer/net/http/HttpSessionManager.java b/src/java/org/jivesoftware/multiplexer/net/http/HttpSessionManager.java index 81b813d..6f5abda 100644 --- a/src/java/org/jivesoftware/multiplexer/net/http/HttpSessionManager.java +++ b/src/java/org/jivesoftware/multiplexer/net/http/HttpSessionManager.java @@ -8,8 +8,143 @@ */ package org.jivesoftware.multiplexer.net.http; +import org.jivesoftware.util.JiveGlobals; +import org.jivesoftware.multiplexer.ServerSurrogate; +import org.jivesoftware.multiplexer.ConnectionManager; +import org.jivesoftware.multiplexer.Session; +import org.dom4j.Element; +import org.dom4j.DocumentHelper; + /** * */ public class HttpSessionManager { + /** + * Milliseconds a connection has to be idle to be closed. Default is 30 minutes. Sending + * stanzas to the client is not considered as activity. We are only considering the connection + * active when the client sends some data or hearbeats (i.e. whitespaces) to the server. + * The reason for this is that sending data will fail if the connection is closed. And if + * the thread is blocked while sending data (because the socket is closed) then the clean up + * thread will close the socket anyway. + */ + private static long inactivityTimeout; + + /** + * The connection manager MAY limit the number of simultaneous requests the client makes with + * the 'requests' attribute. The RECOMMENDED value is "2". Servers that only support polling + * behavior MUST prevent clients from making simultaneous requests by setting the 'requests' + * attribute to a value of "1" (however, polling is NOT RECOMMENDED). In any case, clients MUST + * NOT make more simultaneous requests than specified by the connection manager. + */ + private static int maxRequests; + + /** + * The connection manager SHOULD include two additional attributes in the session creation + * response element, specifying the shortest allowable polling interval and the longest + * allowable inactivity period (both in seconds). Communication of these parameters enables + * the client to engage in appropriate behavior (e.g., not sending empty request elements more + * often than desired, and ensuring that the periods with no requests pending are + * never too long). + */ + private static int pollingInterval; + + static { + // Set the default read idle timeout. If none was set then assume 30 minutes + inactivityTimeout = JiveGlobals.getIntProperty("xmpp.httpbind.client.idle", 30 * 1000); + maxRequests = JiveGlobals.getIntProperty("xmpp.httpbind.client.requests.max", 2); + pollingInterval = JiveGlobals.getIntProperty("xmpp.httpbind.client.requests.polling", 5); + } + + public HttpSession getSession(String streamID) { + Session session = Session.getSession(streamID); + if(session instanceof HttpSession) { + return (HttpSession) session; + } + return null; + } + + public HttpSession createSession(String serverName, Element rootNode, HttpConnection connection) + { + // TODO Check if IP address is allowed to connect to the server + + // Default language is English ("en"). + String language = rootNode.attributeValue("xml:lang"); + if(language == null || "".equals(language)) { + language = "en"; + } + + int wait = getIntAttribute(rootNode.attributeValue("wait"), 60); + int hold = getIntAttribute(rootNode.attributeValue("hold"), 1); + + long rid = getLongAttribue(rootNode.attributeValue("rid"), -1); + + ServerSurrogate serverSurrogate = ConnectionManager.getInstance().getServerSurrogate(); + // Indicate the compression policy to use for this connection + connection.setCompressionPolicy(serverSurrogate.getCompressionPolicy()); + + HttpSession session = createSession(serverName); + session.setWait(wait); + session.setHold(hold); + // Store language and version information in the connection. + session.setLanaguage(language); + session.addConnection(connection); + + session.deliver(createSessionCreationResponse(session, serverSurrogate)); + + return session; + } + + private long getLongAttribue(String value, long defaultValue) { + if(value == null || "".equals(value)) { + return defaultValue; + } + try { + return Long.valueOf(value); + } + catch (Exception ex) { + return defaultValue; + } + } + + private HttpSession createSession(String serverName) { + ServerSurrogate serverSurrogate = ConnectionManager.getInstance().getServerSurrogate(); + // Create a ClientSession for this user. + String streamID = Session.idFactory.createStreamID(); + HttpSession session = new HttpSession(serverName, streamID); + // Register that the new session is associated with the specified stream ID + HttpSession.addSession(streamID, session); + // Send to the server that a new client session has been created + serverSurrogate.clientSessionCreated(streamID); + return session; + } + + private static int getIntAttribute(String value, int defaultValue) { + if(value == null || "".equals(value)) { + return defaultValue; + } + try { + return Integer.valueOf(value); + } + catch (Exception ex) { + return defaultValue; + } + } + + private static Element createSessionCreationResponse(HttpSession session, + ServerSurrogate serverSurrogate) + { + Element element = DocumentHelper.createElement("body"); + element.addAttribute("xmlns", "http://jabber.org/protocol/httpbind"); + element.addAttribute("authID", session.getStreamID()); + element.addAttribute("sid", session.getStreamID()); + element.addAttribute("secure", "true"); + element.addAttribute("requests", String.valueOf(maxRequests)); + element.addAttribute("inactivity", String.valueOf(inactivityTimeout)); + element.addAttribute("polling", String.valueOf(pollingInterval)); + element.addAttribute("wait", String.valueOf(session.getWait())); + + Element features = element.addElement("stream:features"); + features.appendContent(serverSurrogate.getSASLMechanismsElement(session)); + return element; + } } diff --git a/src/java/org/jivesoftware/multiplexer/spi/ServerFailoverDeliverer.java b/src/java/org/jivesoftware/multiplexer/spi/ServerFailoverDeliverer.java index 00d4677..6eef4d7 100644 --- a/src/java/org/jivesoftware/multiplexer/spi/ServerFailoverDeliverer.java +++ b/src/java/org/jivesoftware/multiplexer/spi/ServerFailoverDeliverer.java @@ -12,8 +12,8 @@ package org.jivesoftware.multiplexer.spi; import org.dom4j.Element; -import org.jivesoftware.multiplexer.ClientSession; import org.jivesoftware.multiplexer.PacketDeliverer; +import org.jivesoftware.multiplexer.Session; /** * Deliverer to use when a stanza received from a client failed to be forwarded @@ -42,7 +42,7 @@ error.addElement("internal-server-error") .addAttribute("xmlns", "urn:ietf:params:xml:ns:xmpp-stanzas"); // Get the session that matches the specified stream ID - ClientSession session = ClientSession.getSession(streamID); + Session session = Session.getSession(streamID); if (session != null) { // Bounce the failed packet session.deliver(reply);