diff --git a/build/lib/merge/dom4j.jar b/build/lib/merge/dom4j.jar index c8c4dbb..d6b29e7 100644 --- a/build/lib/merge/dom4j.jar +++ b/build/lib/merge/dom4j.jar Binary files differ diff --git a/src/java/org/jivesoftware/multiplexer/ConnectionManager.java b/src/java/org/jivesoftware/multiplexer/ConnectionManager.java index cd73acd..962e9a9 100644 --- a/src/java/org/jivesoftware/multiplexer/ConnectionManager.java +++ b/src/java/org/jivesoftware/multiplexer/ConnectionManager.java @@ -26,6 +26,7 @@ import org.jivesoftware.multiplexer.net.ClientConnectionHandler; import org.jivesoftware.multiplexer.net.SSLConfig; import org.jivesoftware.multiplexer.net.SocketSendingTracker; +import org.jivesoftware.multiplexer.net.StalledSessionsFilter; import org.jivesoftware.multiplexer.net.XMPPCodecFactory; import org.jivesoftware.multiplexer.net.http.HttpBindManager; import org.jivesoftware.util.*; @@ -316,6 +317,8 @@ socketAcceptor.getDefaultConfig().setThreadModel(threadModel); // Add the XMPP codec filter socketAcceptor.getFilterChain().addFirst("xmpp", new ProtocolCodecFilter(new XMPPCodecFactory())); + // Kill sessions whose outgoing queues keep growing and fail to send traffic + socketAcceptor.getFilterChain().addAfter("xmpp", "outCap", new StalledSessionsFilter()); try { // Listen on a specific network interface if it has been set. diff --git a/src/java/org/jivesoftware/multiplexer/ServerPacketReader.java b/src/java/org/jivesoftware/multiplexer/ServerPacketReader.java index 0f05fd4..3f64ef4 100644 --- a/src/java/org/jivesoftware/multiplexer/ServerPacketReader.java +++ b/src/java/org/jivesoftware/multiplexer/ServerPacketReader.java @@ -18,6 +18,7 @@ import org.jivesoftware.util.Log; import java.io.IOException; +import java.util.Queue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -79,8 +80,17 @@ shutdown(); } else { - // Queue task that process incoming stanzas - threadPool.execute(new ProcessStanzaTask(packetsHandler, doc)); + // If this element belongs to a session, queue it so that it can + // be processed in the correct order. + Session session = getSession(doc); + if( session != null ) { + Queue sessionQueue = session.getStanzaQueue(); + sessionQueue.add(doc); + threadPool.execute(new ProcessSessionQueueTask(packetsHandler,session)); + } else { + // Queue task that process incoming stanzas not related to a specific streamID + threadPool.execute(new ProcessStanzaTask(packetsHandler, doc)); + } } } catch (IOException e) { @@ -108,6 +118,26 @@ } /** + * @param stanza The stanza to find the session for using the streamid or id attribute + * @return the session associated with the given stanza, if any + */ + private Session getSession(Element stanza) { + if( "route".equals(stanza.getName())){ + String streamID = stanza.attributeValue("streamid"); + return(Session.getSession(streamID)); + } + else { + Element wrapper = stanza.element("session"); + if( wrapper != null ) { + String streamID = wrapper.attributeValue("id"); + return(Session.getSession(streamID)); + } else { + return(null); + } + } + } + + /** * Task that processes incoming stanzas from the server. */ private class ProcessStanzaTask implements Runnable { @@ -129,4 +159,43 @@ handler.handle(stanza); } } + + /** + * Task that processes a Session's stanza queue. This guarantees + * that stanzas are processed in the same order that they are received + */ + private class ProcessSessionQueueTask implements Runnable { + /** + * The session + */ + private final Session session; + + /** + * Actual object responsible for handling incoming traffic. + */ + private final ServerPacketHandler handler; + + public ProcessSessionQueueTask(ServerPacketHandler handler, Session session) { + this.session = session; + this.handler = handler; + } + + /** + * Process all the stanzas currently in the queue for this session + */ + public void run() { + // Synchronize on the session here to ensure that all stanzas + // for a given session get processed in order. This can be sub-optimal + // since we might block if multiple threads are processing stanzas + // for the same client, but the handler.handle() call should be quick, + // and correctness seems more important here. + synchronized(session) { + Element stanza = session.getStanzaQueue().poll(); + while( stanza != null ){ + handler.handle(stanza); + stanza = session.getStanzaQueue().poll(); + } + } + } + } } diff --git a/src/java/org/jivesoftware/multiplexer/Session.java b/src/java/org/jivesoftware/multiplexer/Session.java index 1f964fa..a406831 100644 --- a/src/java/org/jivesoftware/multiplexer/Session.java +++ b/src/java/org/jivesoftware/multiplexer/Session.java @@ -15,7 +15,9 @@ import java.util.Date; import java.util.Map; +import java.util.Queue; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; /** * The session represents a connection between the server and a client (c2s) or @@ -63,6 +65,8 @@ private String serverName; private Date startDate = new Date(); + + private Queue stanzaQueue = new ConcurrentLinkedQueue(); /** * Map of existing sessions. A session is added just after the initial stream header @@ -160,6 +164,13 @@ public Date getCreationDate() { return startDate; } + + /** + * @return the current stanza queue for this session + */ + public Queue getStanzaQueue() { + return stanzaQueue; + } /** * Returns a text with the available stream features. Each subclass may return different @@ -193,4 +204,4 @@ } public abstract boolean isClosed(); -} \ No newline at end of file +} diff --git a/src/java/org/jivesoftware/multiplexer/net/StalledSessionsFilter.java b/src/java/org/jivesoftware/multiplexer/net/StalledSessionsFilter.java new file mode 100644 index 0000000..9ac30ea --- /dev/null +++ b/src/java/org/jivesoftware/multiplexer/net/StalledSessionsFilter.java @@ -0,0 +1,51 @@ +/** + * $RCSfile: StalledSessionsFilter.java,v $ + * $Revision: $ + * $Date: $ + * + * Copyright (C) 2005-2008 Jive Software. All rights reserved. + * + * This software is published under the terms of the GNU Public License (GPL), + * a copy of which is included in this distribution, or a commercial license + * agreement with Jive. + */ + +package org.jivesoftware.multiplexer.net; + +import org.apache.mina.common.IoFilterAdapter; +import org.apache.mina.common.IoSession; +import org.jivesoftware.util.JiveGlobals; +import org.jivesoftware.util.Log; + +import java.io.IOException; +import java.util.Date; + +/** + * MINA filter that will close sessions that are failing to read outgoing traffic + * and whose outgoing queue is around 5MB. Use the system property session.stalled.cap + * to set the max number of bytes allowed in the outgoing queue of a session before considering + * it stalled. + * + * @author Gaston Dombiak + */ +public class StalledSessionsFilter extends IoFilterAdapter { + private static final int bytesCap = JiveGlobals.getIntProperty("session.stalled.cap", 5242880); + + public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) + throws Exception { + // Get number of pending requests + int pendingBytes = session.getScheduledWriteBytes(); + if (pendingBytes > bytesCap) { + // Get last time we were able to send something to the connected client + long writeTime = session.getLastWriteTime(); + int pendingRequests = session.getScheduledWriteRequests(); + Log.debug("About to kill session with pendingBytes: " + pendingBytes + " pendingWrites: " + + pendingRequests + " lastWrite: " + new Date(writeTime) + "session: " + session); + // Close the session and throw an exception + session.close(); + throw new IOException("Closing session that seems to be stalled. Preventing OOM"); + } + // Call next filter (everything is fine) + super.filterWrite(nextFilter, session, writeRequest); + } +} \ No newline at end of file diff --git a/src/java/org/jivesoftware/multiplexer/net/StanzaHandler.java b/src/java/org/jivesoftware/multiplexer/net/StanzaHandler.java index 7a36597..18d8301 100644 --- a/src/java/org/jivesoftware/multiplexer/net/StanzaHandler.java +++ b/src/java/org/jivesoftware/multiplexer/net/StanzaHandler.java @@ -14,6 +14,7 @@ import org.dom4j.Element; import org.dom4j.io.XMPPPacketReader; import org.jivesoftware.multiplexer.*; +import org.jivesoftware.multiplexer.net.http.FlashCrossDomainServlet; import org.jivesoftware.util.Log; import org.jivesoftware.util.StringUtils; import org.xmlpull.v1.XmlPullParser; @@ -97,8 +98,15 @@ boolean initialStream = stanza.startsWith(" - return; + // Allow requests for flash socket policy files directly on the client listener port + if (stanza.startsWith("")) { + connection.deliverRawText(FlashCrossDomainServlet.getCrossDomainString() + '\0'); + return; + } + else { + // Ignore + return; + } } // Found an stream:stream tag... if (!sessionCreated) { diff --git a/src/java/org/jivesoftware/multiplexer/net/http/FlashCrossDomainServlet.java b/src/java/org/jivesoftware/multiplexer/net/http/FlashCrossDomainServlet.java index d7871a2..ed6fa8b 100644 --- a/src/java/org/jivesoftware/multiplexer/net/http/FlashCrossDomainServlet.java +++ b/src/java/org/jivesoftware/multiplexer/net/http/FlashCrossDomainServlet.java @@ -44,6 +44,11 @@ protected void doGet(HttpServletRequest httpServletRequest, HttpServletResponse response) throws ServletException, IOException { + response.setContentType("text/xml"); + response.getOutputStream().write(getCrossDomainString().getBytes()); + } + + public static String getCrossDomainString() { StringBuilder builder = new StringBuilder(); builder.append(CROSS_DOMAIN_TEXT); getPortList(builder); @@ -51,11 +56,10 @@ getSecure(builder); builder.append(CROSS_DOMAIN_END_TEXT); builder.append("\n"); - response.setContentType("text/xml"); - response.getOutputStream().write(builder.toString().getBytes()); + return(builder.toString()); } - private StringBuilder getPortList(StringBuilder builder) { + private static StringBuilder getPortList(StringBuilder builder) { boolean multiple = false; if(ConnectionManager.getInstance().getClientListenerPort() > 0) { builder.append(ConnectionManager.getInstance().getClientListenerPort()); @@ -89,7 +93,7 @@ return builder; } - private StringBuilder getSecure(StringBuilder builder) { + private static StringBuilder getSecure(StringBuilder builder) { if (JiveGlobals.getBooleanProperty(CROSS_DOMAIN_SECURE_ENABLED,CROSS_DOMAIN_SECURE_DEFAULT)) { builder.append("true"); } else {