diff --git a/src/java/org/jivesoftware/multiplexer/ClientSession.java b/src/java/org/jivesoftware/multiplexer/ClientSession.java index 3a4b0ff..9c6221b 100644 --- a/src/java/org/jivesoftware/multiplexer/ClientSession.java +++ b/src/java/org/jivesoftware/multiplexer/ClientSession.java @@ -299,4 +299,8 @@ ConnectionManager.getInstance().getServerSurrogate().clientSessionClosed(getStreamID()); } } + + public boolean isClosed() { + return status == STATUS_CLOSED; + } } diff --git a/src/java/org/jivesoftware/multiplexer/ServerPacketHandler.java b/src/java/org/jivesoftware/multiplexer/ServerPacketHandler.java index 620a38c..4e5fdc5 100644 --- a/src/java/org/jivesoftware/multiplexer/ServerPacketHandler.java +++ b/src/java/org/jivesoftware/multiplexer/ServerPacketHandler.java @@ -124,7 +124,7 @@ Element stanza = (Element) route.elementIterator().next(); // Get the session that matches the requested stream ID Session session = Session.getSession(streamID); - if (session != null) { + if (session != null && !session.isClosed()) { // Deliver the wrapped stanza to the client session.deliver(stanza); } diff --git a/src/java/org/jivesoftware/multiplexer/ServerSurrogate.java b/src/java/org/jivesoftware/multiplexer/ServerSurrogate.java index a614f21..d509af7 100644 --- a/src/java/org/jivesoftware/multiplexer/ServerSurrogate.java +++ b/src/java/org/jivesoftware/multiplexer/ServerSurrogate.java @@ -156,7 +156,7 @@ * * @param streamID the stream ID assigned by the connection manager to the session. */ - void clientSessionClosed(final String streamID) { + public void clientSessionClosed(final String streamID) { threadPool.execute(new CloseSessionTask(streamID)); } diff --git a/src/java/org/jivesoftware/multiplexer/Session.java b/src/java/org/jivesoftware/multiplexer/Session.java index d20eeeb..a674933 100644 --- a/src/java/org/jivesoftware/multiplexer/Session.java +++ b/src/java/org/jivesoftware/multiplexer/Session.java @@ -76,7 +76,7 @@ sessions.put(streamID, session); } - protected static void removeSession(String streamID) { + public static void removeSession(String streamID) { sessions.remove(streamID); } @@ -191,4 +191,5 @@ return answer; } + public abstract boolean isClosed(); } \ No newline at end of file diff --git a/src/java/org/jivesoftware/multiplexer/net/http/HttpBindException.java b/src/java/org/jivesoftware/multiplexer/net/http/HttpBindException.java new file mode 100644 index 0000000..f2e6f06 --- /dev/null +++ b/src/java/org/jivesoftware/multiplexer/net/http/HttpBindException.java @@ -0,0 +1,31 @@ +/** + * $RCSfile: $ + * $Revision: $ + * $Date: $ + * + * Copyright (C) 2006 Jive Software. All rights reserved. + * This software is the proprietary information of Jive Software. Use is subject to license terms. + */ +package org.jivesoftware.multiplexer.net.http; + +/** + * + */ +public class HttpBindException extends Exception { + private boolean shouldCloseSession; + private int httpError; + + public HttpBindException(String message, boolean shouldCloseSession, int httpError) { + super(message); + this.shouldCloseSession = shouldCloseSession; + this.httpError = httpError; + } + + public int getHttpError() { + return httpError; + } + + public boolean shouldCloseSession() { + return shouldCloseSession; + } +} diff --git a/src/java/org/jivesoftware/multiplexer/net/http/HttpBindServlet.java b/src/java/org/jivesoftware/multiplexer/net/http/HttpBindServlet.java index 56ccc04..06bfe8d 100644 --- a/src/java/org/jivesoftware/multiplexer/net/http/HttpBindServlet.java +++ b/src/java/org/jivesoftware/multiplexer/net/http/HttpBindServlet.java @@ -51,7 +51,8 @@ @Override protected void doPost(HttpServletRequest request, HttpServletResponse response) - throws ServletException, IOException { + throws ServletException, IOException + { if (isContinuation(request, response)) { return; } @@ -77,7 +78,7 @@ String sid = node.attributeValue("sid"); // We have a new session if (sid == null) { - createNewSession(response, node); + createNewSession(request, response, node); } else { handleSessionRequest(sid, request, response, node); @@ -85,7 +86,8 @@ } private boolean isContinuation(HttpServletRequest request, HttpServletResponse response) - throws IOException { + throws IOException + { HttpConnection connection = (HttpConnection) request.getAttribute("request-connection"); if (connection == null) { return false; @@ -96,7 +98,8 @@ private void handleSessionRequest(String sid, HttpServletRequest request, HttpServletResponse response, Element rootNode) - throws IOException { + throws IOException + { long rid = getLongAttribue(rootNode.attributeValue("rid"), -1); if (rid <= 0) { response.sendError(HttpServletResponse.SC_BAD_REQUEST, "Body missing RID (Request ID)"); @@ -111,28 +114,42 @@ return; } synchronized (session) { - HttpConnection connection = sessionManager.forwardRequest(rid, session, rootNode); + HttpConnection connection; + try { + connection = sessionManager.forwardRequest(rid, session, + request.isSecure(), rootNode); + } + catch (HttpBindException e) { + response.sendError(e.getHttpError(), e.getMessage()); + if(e.shouldCloseSession()) { + session.close(); + } + return; + } connection.setContinuation(ContinuationSupport.getContinuation(request, connection)); request.setAttribute("request-connection", connection); respond(response, connection); } } - private void createNewSession(HttpServletResponse response, Element rootNode) - throws IOException { + private void createNewSession(HttpServletRequest request, HttpServletResponse response, + Element rootNode) + throws IOException + { long rid = getLongAttribue(rootNode.attributeValue("rid"), -1); if (rid <= 0) { response.sendError(HttpServletResponse.SC_BAD_REQUEST, "Body missing RID (Request ID)"); return; } - HttpConnection connection = new HttpConnection(rid); + HttpConnection connection = new HttpConnection(rid, request.isSecure()); connection.setSession(sessionManager.createSession(rootNode, connection)); respond(response, connection); } private void respond(HttpServletResponse response, HttpConnection connection) - throws IOException { + throws IOException + { byte[] content; try { content = connection.getDeliverable().getBytes("utf-8"); diff --git a/src/java/org/jivesoftware/multiplexer/net/http/HttpConnection.java b/src/java/org/jivesoftware/multiplexer/net/http/HttpConnection.java index 08f12c2..54013e2 100644 --- a/src/java/org/jivesoftware/multiplexer/net/http/HttpConnection.java +++ b/src/java/org/jivesoftware/multiplexer/net/http/HttpConnection.java @@ -25,9 +25,11 @@ private HttpSession session; private Continuation continuation; private boolean isClosed; + private boolean isSecure = false; - public HttpConnection(long requestID) { - this.requestId = requestID; + public HttpConnection(long requestId, boolean isSecure) { + this.requestId = requestId; + this.isSecure = isSecure; } public boolean validate() { @@ -55,7 +57,7 @@ } public boolean isSecure() { - return false; + return isSecure; } /** diff --git a/src/java/org/jivesoftware/multiplexer/net/http/HttpSession.java b/src/java/org/jivesoftware/multiplexer/net/http/HttpSession.java index 8222cb7..5018542 100644 --- a/src/java/org/jivesoftware/multiplexer/net/http/HttpSession.java +++ b/src/java/org/jivesoftware/multiplexer/net/http/HttpSession.java @@ -11,6 +11,7 @@ package org.jivesoftware.multiplexer.net.http; import org.jivesoftware.multiplexer.*; +import org.jivesoftware.multiplexer.spi.ClientFailoverDeliverer; import org.dom4j.Element; import org.dom4j.DocumentHelper; @@ -31,17 +32,31 @@ private String language; private final Queue connectionQueue = new LinkedList(); private final List pendingElements = new ArrayList(); - + private boolean isSecure; + private int maxPollingInterval; + private long lastPoll = -1; + private Set listeners = new HashSet(); + private boolean isClosed; + private int inactivityTimeout; protected HttpSession(String serverName, String streamID) { super(serverName, null, streamID); } - void addConnection(HttpConnection connection, boolean isPoll) { + void addConnection(HttpConnection connection, boolean isPoll) throws HttpBindException { if(connection == null) { throw new IllegalArgumentException("Connection cannot be null."); } + if(isPoll) { + checkPollingInterval(); + } + + if(isSecure && !connection.isSecure()) { + throw new HttpBindException("Session was started from secure connection, all " + + "connections on this session must be secured.", false, 403); + } + connection.setSession(this); if(pendingElements.size() > 0) { createDeliverable(pendingElements); @@ -57,14 +72,51 @@ connectionQueue.offer(connection); } + private void checkPollingInterval() throws HttpBindException { + long time = System.currentTimeMillis(); + if(lastPoll > 0 && ((lastPoll - time) / 1000) < maxPollingInterval) { + throw new HttpBindException("Too frequent polling", true, 403); + } + lastPoll = time; + } + public String getAvailableStreamFeatures() { return null; } - public void close() { + public synchronized void close() { + close(false); } - public void close(boolean isServerShuttingDown) { + public synchronized void close(boolean isServerShuttingDown) { + if(isClosed) { + return; + } + isClosed = true; + + if(pendingElements.size() > 0) { + failDelivery(); + } + + Collection listeners = + new HashSet(this.listeners); + this.listeners.clear(); + for(SessionCloseListener listener : listeners) { + listener.sessionClosed(this); + } + } + + private void failDelivery() { + ClientFailoverDeliverer deliverer = new ClientFailoverDeliverer(); + deliverer.setStreamID(getStreamID()); + for(Element element : pendingElements) { + deliverer.deliver(element); + } + pendingElements.clear(); + } + + public synchronized boolean isClosed() { + return isClosed; } public synchronized void deliver(Element stanza) { @@ -158,9 +210,46 @@ * Sets the max interval within which a client can send polling requests. If more than one * request occurs in the interval the session will be terminated. * - * @param pollingInterval time in seconds a client needs to wait before sending polls to the + * @param maxPollingInterval time in seconds a client needs to wait before sending polls to the * server, a negative int indicates that there is no limit. */ - public void setMaxPollingInterval(int pollingInterval) { + public void setMaxPollingInterval(int maxPollingInterval) { + this.maxPollingInterval = maxPollingInterval; + } + + /** + * Sets whether the initial request on the session was secure. + * + * @param isSecure true if the initial request was secure and false if it wasn't. + */ + protected void setSecure(boolean isSecure) { + this.isSecure = isSecure; + } + + /** + * Returns true if all connections on this session should be secured, and false if + * they should not. + * + * @return true if all connections on this session should be secured, and false if + * they should not. + */ + public boolean isSecure() { + return isSecure; + } + + public void addSessionCloseListener(SessionCloseListener listener) { + listeners.add(listener); + } + + public void removeSessionCloseListener(SessionCloseListener listener) { + listeners.remove(listener); + } + + public void setInactivityTimeout(int inactivityTimeout) { + this.inactivityTimeout = inactivityTimeout; + } + + public int getInactivityTimeout() { + return inactivityTimeout; } } diff --git a/src/java/org/jivesoftware/multiplexer/net/http/HttpSessionManager.java b/src/java/org/jivesoftware/multiplexer/net/http/HttpSessionManager.java index b1b6807..aff80e8 100644 --- a/src/java/org/jivesoftware/multiplexer/net/http/HttpSessionManager.java +++ b/src/java/org/jivesoftware/multiplexer/net/http/HttpSessionManager.java @@ -16,7 +16,7 @@ import org.jivesoftware.multiplexer.Session; import org.dom4j.Element; -import java.util.List; +import java.util.*; /** * @@ -31,7 +31,7 @@ * the thread is blocked while sending data (because the socket is closed) then the clean up * thread will close the socket anyway. */ - private static long inactivityTimeout; + private static int inactivityTimeout; /** * The connection manager MAY limit the number of simultaneous requests the client makes with @@ -54,6 +54,7 @@ private String serverName; private ServerSurrogate serverSurrogate; + private InactivityTimer timer = new InactivityTimer(); static { // Set the default read idle timeout. If none was set then assume 30 minutes @@ -93,7 +94,9 @@ HttpSession session = createSession(serverName); session.setWait(wait); session.setHold(hold); + session.setSecure(connection.isSecure()); session.setMaxPollingInterval(pollingInterval); + session.setInactivityTimeout(inactivityTimeout); // Store language and version information in the connection. session.setLanaguage(language); try { @@ -103,11 +106,11 @@ /* This won't happen here. */ } + timer.reset(session); return session; } private HttpSession createSession(String serverName) { - ServerSurrogate serverSurrogate = ConnectionManager.getInstance().getServerSurrogate(); // Create a ClientSession for this user. String streamID = Session.idFactory.createStreamID(); HttpSession session = new HttpSession(serverName, streamID); @@ -115,6 +118,12 @@ HttpSession.addSession(streamID, session); // Send to the server that a new client session has been created serverSurrogate.clientSessionCreated(streamID); + session.addSessionCloseListener(new SessionCloseListener() { + public void sessionClosed(Session session) { + HttpSession.removeSession(session.getStreamID()); + serverSurrogate.clientSessionClosed(session.getStreamID()); + } + }); return session; } @@ -130,7 +139,7 @@ } } - private String createSessionCreationResponse(HttpSession session) { + private String createSessionCreationResponse(HttpSession session) { StringBuilder builder = new StringBuilder(); builder.append(""); @@ -152,11 +162,15 @@ return builder.toString(); } - public HttpConnection forwardRequest(long rid, HttpSession session, Element rootNode) { + public HttpConnection forwardRequest(long rid, HttpSession session, boolean isSecure, + Element rootNode) throws HttpBindException + { + timer.reset(session); + //noinspection unchecked List elements = rootNode.elements(); boolean isPoll = elements.size() <= 0; - HttpConnection connection = new HttpConnection(rid); + HttpConnection connection = new HttpConnection(rid, isSecure); session.addConnection(connection, isPoll); for (Element packet : elements) { @@ -165,4 +179,40 @@ return connection; } + + private class InactivityTimer extends Timer { + private Map sessionMap + = new HashMap(); + + public void reset(HttpSession session) { + InactivityTimeoutTask task = sessionMap.remove(session.getStreamID()); + if(task != null) { + session.removeSessionCloseListener(task); + task.cancel(); + } + if(session.isClosed()) { + return; + } + task = new InactivityTimeoutTask(session); + schedule(task, session.getInactivityTimeout() * 1000); + session.addSessionCloseListener(task); + sessionMap.put(session.getStreamID(), task); + } + } + + private class InactivityTimeoutTask extends TimerTask implements SessionCloseListener { + private Session session; + + public InactivityTimeoutTask(Session session) { + this.session = session; + } + + public void run() { + session.close(); + } + + public void sessionClosed(Session session) { + this.cancel(); + } + } } diff --git a/src/java/org/jivesoftware/multiplexer/net/http/SessionCloseListener.java b/src/java/org/jivesoftware/multiplexer/net/http/SessionCloseListener.java new file mode 100644 index 0000000..8d724fb --- /dev/null +++ b/src/java/org/jivesoftware/multiplexer/net/http/SessionCloseListener.java @@ -0,0 +1,18 @@ +/** + * $RCSfile: $ + * $Revision: $ + * $Date: $ + * + * Copyright (C) 2006 Jive Software. All rights reserved. + * This software is the proprietary information of Jive Software. Use is subject to license terms. + */ +package org.jivesoftware.multiplexer.net.http; + +import org.jivesoftware.multiplexer.Session; + +/** + * + */ +public interface SessionCloseListener { + public void sessionClosed(Session session); +}