diff --git a/src/java/org/jivesoftware/multiplexer/ClientSession.java b/src/java/org/jivesoftware/multiplexer/ClientSession.java index 15f6bf6..8fd4bc6 100644 --- a/src/java/org/jivesoftware/multiplexer/ClientSession.java +++ b/src/java/org/jivesoftware/multiplexer/ClientSession.java @@ -237,7 +237,7 @@ // Deliver stanza to client if (conn != null && !conn.isClosed()) { try { - conn.deliver(stanza); + conn.deliver(stanza.asXML()); } catch (Exception e) { Log.error(LocaleUtils.getLocalizedString("admin.error"), e); diff --git a/src/java/org/jivesoftware/multiplexer/Connection.java b/src/java/org/jivesoftware/multiplexer/Connection.java index a5aa158..d468de2 100644 --- a/src/java/org/jivesoftware/multiplexer/Connection.java +++ b/src/java/org/jivesoftware/multiplexer/Connection.java @@ -11,8 +11,6 @@ package org.jivesoftware.multiplexer; -import org.dom4j.Element; - import java.net.InetAddress; import java.net.UnknownHostException; @@ -90,14 +88,14 @@ * Delivers the packet to this connection without checking the recipient. * The method essentially calls socket.send(packet.getWriteBuffer()). * - * @param doc the packet to deliver. + * @param stanza the stanza to deliver. */ - public void deliver(Element doc); + public void deliver(String stanza); /** * Delivers raw text to this connection. This is a very low level way for sending * XML stanzas to the client. This method should not be used unless you have very - * good reasons for not using {@link #deliver(Element)}.

+ * good reasons for not using {@link #deliver(String)}.

* * This method avoids having to get the writer of this connection and mess directly * with the writer. Therefore, this method ensures a correct delivery of the stanza diff --git a/src/java/org/jivesoftware/multiplexer/ConnectionManager.java b/src/java/org/jivesoftware/multiplexer/ConnectionManager.java index 1c7ccf7..a3e49e3 100644 --- a/src/java/org/jivesoftware/multiplexer/ConnectionManager.java +++ b/src/java/org/jivesoftware/multiplexer/ConnectionManager.java @@ -287,9 +287,11 @@ // Customize Executor that will be used by processors to process incoming stanzas ExecutorThreadModel threadModel = ExecutorThreadModel.getInstance("client"); int eventThreads = JiveGlobals.getIntProperty("xmpp.processor.threads.standard", 16); - Executor eventExecutor = new ThreadPoolExecutor( - eventThreads + 1, eventThreads + 1, 60, TimeUnit.SECONDS, new LinkedBlockingQueue() ); - threadModel.setExecutor(eventExecutor); + ThreadPoolExecutor eventExecutor = (ThreadPoolExecutor)threadModel.getExecutor(); + eventExecutor.setCorePoolSize(eventThreads + 1); + eventExecutor.setMaximumPoolSize(eventThreads + 1); + eventExecutor.setKeepAliveTime(60, TimeUnit.SECONDS); + socketAcceptor.getDefaultConfig().setThreadModel(threadModel); // Add the XMPP codec filter socketAcceptor.getFilterChain().addFirst("xmpp", new ProtocolCodecFilter(new XMPPCodecFactory())); @@ -336,9 +338,11 @@ // Customize thread model for c2s (old ssl port) ExecutorThreadModel threadModel = ExecutorThreadModel.getInstance("client_ssl"); int eventThreads = JiveGlobals.getIntProperty("xmpp.processor.threads.ssl", 16); - Executor eventExecutor = new ThreadPoolExecutor( - eventThreads + 1, eventThreads + 1, 60, TimeUnit.SECONDS, new LinkedBlockingQueue() ); - threadModel.setExecutor(eventExecutor); + ThreadPoolExecutor eventExecutor = (ThreadPoolExecutor)threadModel.getExecutor(); + eventExecutor.setCorePoolSize(eventThreads + 1); + eventExecutor.setMaximumPoolSize(eventThreads + 1); + eventExecutor.setKeepAliveTime(60, TimeUnit.SECONDS); + sslSocketAcceptor.getDefaultConfig().setThreadModel(threadModel); // Add the XMPP codec filter sslSocketAcceptor.getFilterChain().addFirst("xmpp", new ProtocolCodecFilter(new XMPPCodecFactory())); diff --git a/src/java/org/jivesoftware/multiplexer/ConnectionWorkerThread.java b/src/java/org/jivesoftware/multiplexer/ConnectionWorkerThread.java index 0e2fbea..db3aa30 100644 --- a/src/java/org/jivesoftware/multiplexer/ConnectionWorkerThread.java +++ b/src/java/org/jivesoftware/multiplexer/ConnectionWorkerThread.java @@ -13,7 +13,6 @@ import com.jcraft.jzlib.JZlib; import com.jcraft.jzlib.ZInputStream; -import org.dom4j.DocumentFactory; import org.dom4j.Element; import org.dom4j.io.XMPPPacketReader; import org.jivesoftware.multiplexer.net.DNSUtil; @@ -54,7 +53,6 @@ */ public static final int DEFAULT_MULTIPLEX_PORT = 5262; - private static DocumentFactory docFactory = DocumentFactory.getInstance(); // Sequence and random number generator used for creating unique IQ ID's. private static int sequence = 0; private static Random random = new Random(); @@ -388,16 +386,14 @@ * @param streamID the stream ID assigned by the connection manager to the new session. */ public void clientSessionCreated(String streamID) { - Element iq = docFactory.createDocument().addElement("iq"); - iq.addAttribute("type", "set"); - iq.addAttribute("to", serverName); - iq.addAttribute("from", jidAddress); - iq.addAttribute("id", String.valueOf(random.nextInt(1000) + "-" + sequence++)); - Element child = iq.addElement("session", "http://jabber.org/protocol/connectionmanager"); - child.addAttribute("id", streamID); - child.addElement("create"); + StringBuilder sb = new StringBuilder(100); + sb.append(""); // Forward the notification to the server - connection.deliver(iq); + connection.deliver(sb.toString()); } /** @@ -406,16 +402,14 @@ * @param streamID the stream ID assigned by the connection manager to the closed session. */ public void clientSessionClosed(String streamID) { - Element iq = docFactory.createDocument().addElement("iq"); - iq.addAttribute("type", "set"); - iq.addAttribute("to", serverName); - iq.addAttribute("from", jidAddress); - iq.addAttribute("id", String.valueOf(random.nextInt(1000) + "-" + sequence++)); - Element child = iq.addElement("session", "http://jabber.org/protocol/connectionmanager"); - child.addAttribute("id", streamID); - child.addElement("close"); + StringBuilder sb = new StringBuilder(100); + sb.append(""); // Forward the notification to the server - connection.deliver(iq); + connection.deliver(sb.toString()); } /** @@ -427,16 +421,14 @@ * longer available session. */ public void deliveryFailed(Element stanza, String streamID) { - Element iq = docFactory.createDocument().addElement("iq"); - iq.addAttribute("type", "set"); - iq.addAttribute("to", serverName); - iq.addAttribute("from", jidAddress); - iq.addAttribute("id", String.valueOf(random.nextInt(1000) + "-" + sequence++)); - Element child = iq.addElement("session", "http://jabber.org/protocol/connectionmanager"); - child.addAttribute("id", streamID); - child.addElement("failed").add(stanza.createCopy()); + StringBuilder sb = new StringBuilder(100); + sb.append("").append(stanza.asXML()).append(""); // Send notification to the server - connection.deliver(iq); + connection.deliver(sb.toString()); } public void run() { @@ -470,14 +462,17 @@ * @param stanza the original client stanza that is going to be wrapped. * @param streamID the stream ID assigned by the connection manager to the client session. */ - public void deliver(Element stanza, String streamID) { + public void deliver(String stanza, String streamID) { // Wrap the stanza - Element wrapper = docFactory.createDocument().addElement("route"); - wrapper.addAttribute("to", serverName); - wrapper.addAttribute("from", jidAddress); - wrapper.addAttribute("streamid", streamID); - wrapper.add(stanza.createCopy()); + StringBuilder sb = new StringBuilder(80); + sb.append(""); + sb.append(stanza); + sb.append(""); + // Forward the wrapped stanza to the server - connection.deliver(wrapper); + connection.deliver(sb.toString()); } } diff --git a/src/java/org/jivesoftware/multiplexer/PacketRouter.java b/src/java/org/jivesoftware/multiplexer/PacketRouter.java index 9301e51..7e15cb0 100644 --- a/src/java/org/jivesoftware/multiplexer/PacketRouter.java +++ b/src/java/org/jivesoftware/multiplexer/PacketRouter.java @@ -11,8 +11,6 @@ package org.jivesoftware.multiplexer; -import org.dom4j.Element; - /** * A router that handles incoming packets. Packets will be routed to their * corresponding handler. A router is much like a forwarded with some logic @@ -25,8 +23,8 @@ /** * Routes the given packet based on its type. * - * @param doc The packet to route. + * @param stanza The stanza to route. * @param streamID The ID of the client's stream. */ - void route(Element doc, String streamID); + void route(String stanza, String streamID); } diff --git a/src/java/org/jivesoftware/multiplexer/ServerPacketHandler.java b/src/java/org/jivesoftware/multiplexer/ServerPacketHandler.java index 4e5fdc5..b015b05 100644 --- a/src/java/org/jivesoftware/multiplexer/ServerPacketHandler.java +++ b/src/java/org/jivesoftware/multiplexer/ServerPacketHandler.java @@ -147,7 +147,7 @@ error.addElement("unexpected-request") .addAttribute("xmlns", "urn:ietf:params:xml:ns:xmpp-stanzas"); // Bounce the failed IQ packet - connectionManager.getServerSurrogate().send(reply, streamID); + connectionManager.getServerSurrogate().send(reply.asXML(), streamID); } } } @@ -195,7 +195,7 @@ reply.addAttribute("type", "result"); reply.addAttribute("to", connectionManager.getServerName()); reply.addAttribute("from", jidAddress); - connection.deliver(reply); + connection.deliver(reply.asXML()); } } diff --git a/src/java/org/jivesoftware/multiplexer/ServerSurrogate.java b/src/java/org/jivesoftware/multiplexer/ServerSurrogate.java index d509af7..e53f31a 100644 --- a/src/java/org/jivesoftware/multiplexer/ServerSurrogate.java +++ b/src/java/org/jivesoftware/multiplexer/ServerSurrogate.java @@ -179,7 +179,7 @@ * @param stanza the stanza to send to the server. * @param streamID the stream ID assigned by the connection manager to the session. */ - public void send(Element stanza, String streamID) { + public void send(String stanza, String streamID) { threadPool.execute(new RouteTask(streamID, stanza)); } diff --git a/src/java/org/jivesoftware/multiplexer/net/ClientConnectionHandler.java b/src/java/org/jivesoftware/multiplexer/net/ClientConnectionHandler.java index 79e74bf..d1a7e2b 100644 --- a/src/java/org/jivesoftware/multiplexer/net/ClientConnectionHandler.java +++ b/src/java/org/jivesoftware/multiplexer/net/ClientConnectionHandler.java @@ -14,6 +14,7 @@ import org.apache.mina.common.IoSession; import org.jivesoftware.multiplexer.spi.ClientFailoverDeliverer; import org.jivesoftware.util.JiveGlobals; +import org.xmlpull.v1.XmlPullParserException; /** * ConnectionHandler that knows which subclass of {@link StanzaHandler} should @@ -23,7 +24,7 @@ */ public class ClientConnectionHandler extends ConnectionHandler { - StanzaHandler createStanzaHandler(NIOConnection connection) { + StanzaHandler createStanzaHandler(NIOConnection connection) throws XmlPullParserException { return new ClientStanzaHandler(router, serverName, connection); } diff --git a/src/java/org/jivesoftware/multiplexer/net/ClientStanzaHandler.java b/src/java/org/jivesoftware/multiplexer/net/ClientStanzaHandler.java index 249c574..9d345e6 100644 --- a/src/java/org/jivesoftware/multiplexer/net/ClientStanzaHandler.java +++ b/src/java/org/jivesoftware/multiplexer/net/ClientStanzaHandler.java @@ -25,7 +25,8 @@ */ class ClientStanzaHandler extends StanzaHandler { - public ClientStanzaHandler(PacketRouter router, String serverName, Connection connection) { + public ClientStanzaHandler(PacketRouter router, String serverName, Connection connection) + throws XmlPullParserException { super(router, serverName, connection); } diff --git a/src/java/org/jivesoftware/multiplexer/net/ConnectionHandler.java b/src/java/org/jivesoftware/multiplexer/net/ConnectionHandler.java index 54cd641..20f2ea6 100644 --- a/src/java/org/jivesoftware/multiplexer/net/ConnectionHandler.java +++ b/src/java/org/jivesoftware/multiplexer/net/ConnectionHandler.java @@ -19,8 +19,13 @@ import org.jivesoftware.multiplexer.PacketRouter; import org.jivesoftware.multiplexer.spi.ServerRouter; import org.jivesoftware.util.Log; +import org.xmlpull.v1.XmlPullParser; +import org.xmlpull.v1.XmlPullParserException; +import org.xmlpull.v1.XmlPullParserFactory; import java.io.IOException; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; /** * A ConnectionHandler is responsible for creating new sessions, destroying sessions and delivering @@ -40,6 +45,21 @@ protected static PacketRouter router = new ServerRouter(); protected static String serverName = ConnectionManager.getInstance().getServerName(); + private static Map parsers = new ConcurrentHashMap(); + /** + * Reuse the same factory for all the connections. + */ + private static XmlPullParserFactory factory = null; + + static { + try { + factory = XmlPullParserFactory.newInstance(MXParser.class.getName(), null); + factory.setNamespaceAware(true); + } + catch (XmlPullParserException e) { + Log.error("Error creating a parser factory", e); + } + } public void sessionOpened(IoSession session) throws Exception { // Create a new XML parser for the new connection. The parser will be used by the XMPPDecoder filter. @@ -87,9 +107,21 @@ //System.out.println("RCVD: " + message); // Get the stanza handler for this session StanzaHandler handler = (StanzaHandler) session.getAttribute(HANDLER); + // Get the parser to use to process stanza. For optimization there is going + // to be a parser for each running thread. Each Filter will be executed + // by the Executor placed as the first Filter. So we can have a parser associated + // to each Thread + int hashCode = Thread.currentThread().hashCode(); + XmlPullParser parser = parsers.get(hashCode); + if (parser == null) { + parser = factory.newPullParser(); + parser.setFeature(XmlPullParser.FEATURE_PROCESS_NAMESPACES, true); + parsers.put(hashCode, parser); + } + // Let the stanza handler process the received stanza try { - handler.process( (String) message); + handler.process( (String) message, parser); } catch (Exception e) { Log.error("Closing connection due to error while processing message: " + message, e); Connection connection = (Connection) session.getAttribute(CONNECTION); @@ -99,7 +131,7 @@ abstract NIOConnection createNIOConnection(IoSession session); - abstract StanzaHandler createStanzaHandler(NIOConnection connection); + abstract StanzaHandler createStanzaHandler(NIOConnection connection) throws XmlPullParserException; /** * Returns the max number of seconds a connection can be idle (both ways) before diff --git a/src/java/org/jivesoftware/multiplexer/net/NIOConnection.java b/src/java/org/jivesoftware/multiplexer/net/NIOConnection.java index 73fc333..dc0e842 100644 --- a/src/java/org/jivesoftware/multiplexer/net/NIOConnection.java +++ b/src/java/org/jivesoftware/multiplexer/net/NIOConnection.java @@ -17,21 +17,25 @@ import org.apache.mina.filter.CompressionFilter; import org.apache.mina.filter.SSLFilter; import org.dom4j.Element; -import org.dom4j.io.OutputFormat; +import org.dom4j.io.XMPPPacketReader; import org.jivesoftware.multiplexer.Connection; import org.jivesoftware.multiplexer.ConnectionCloseListener; import org.jivesoftware.multiplexer.PacketDeliverer; import org.jivesoftware.multiplexer.Session; import org.jivesoftware.util.JiveGlobals; import org.jivesoftware.util.Log; -import org.jivesoftware.util.XMLWriter; +import org.xmlpull.v1.XmlPullParserException; +import org.xmlpull.v1.XmlPullParserFactory; import javax.net.ssl.KeyManager; import javax.net.ssl.SSLContext; import javax.net.ssl.TrustManager; +import java.io.StringReader; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.UnknownHostException; +import java.nio.charset.Charset; +import java.nio.charset.CharsetEncoder; import java.security.KeyStore; import java.util.HashMap; import java.util.Map; @@ -50,6 +54,10 @@ * The utf-8 charset for decoding and encoding XMPP packet streams. */ public static final String CHARSET = "UTF-8"; + /** + * Reuse the same factory for all the connections. + */ + private static XmlPullParserFactory factory = null; private Session session; private IoSession ioSession; @@ -77,11 +85,22 @@ * Compression policy currently in use for this connection. */ private CompressionPolicy compressionPolicy = CompressionPolicy.disabled; + private CharsetEncoder encoder; + static { + try { + factory = XmlPullParserFactory.newInstance(MXParser.class.getName(), null); + factory.setNamespaceAware(true); + } + catch (XmlPullParserException e) { + Log.error("Error creating a parser factory", e); + } + } public NIOConnection(IoSession session, PacketDeliverer packetDeliverer) { this.ioSession = session; this.backupDeliverer = packetDeliverer; + encoder = Charset.forName(CHARSET).newEncoder(); } public boolean validate() { @@ -186,9 +205,17 @@ return ioSession.getFilterChain().contains("tls"); } - public void deliver(Element doc) { + public void deliver(String stanza) { if (isClosed()) { - backupDeliverer.deliver(doc); + XMPPPacketReader xmppReader = new XMPPPacketReader(); + xmppReader.setXPPFactory(factory); + try { + xmppReader.read(new StringReader(stanza)); + Element doc = xmppReader.parseDocument().getRootElement(); + backupDeliverer.deliver(doc); + } catch (Exception e) { + Log.error("Error parsing stanza: " + stanza, e); + } } else { ByteBuffer buffer = ByteBuffer.allocate(4096); @@ -196,9 +223,7 @@ boolean errorDelivering = false; try { - XMLWriter xmlSerializer = new XMLWriter(buffer.asOutputStream(), new OutputFormat()); - xmlSerializer.write(doc); - xmlSerializer.flush(); + buffer.putString(stanza, encoder); if (flashClient) { buffer.put((byte) '\0'); } @@ -214,7 +239,15 @@ close(); // Retry sending the packet again. Most probably if the packet is a // Message it will be stored offline - backupDeliverer.deliver(doc); + XMPPPacketReader xmppReader = new XMPPPacketReader(); + xmppReader.setXPPFactory(factory); + try { + xmppReader.read(new StringReader(stanza)); + Element doc = xmppReader.parseDocument().getRootElement(); + backupDeliverer.deliver(doc); + } catch (Exception e) { + Log.error("Error parsing stanza: " + stanza, e); + } } } } diff --git a/src/java/org/jivesoftware/multiplexer/net/SocketConnection.java b/src/java/org/jivesoftware/multiplexer/net/SocketConnection.java index 043d31d..f8bc672 100644 --- a/src/java/org/jivesoftware/multiplexer/net/SocketConnection.java +++ b/src/java/org/jivesoftware/multiplexer/net/SocketConnection.java @@ -14,16 +14,16 @@ import com.jcraft.jzlib.JZlib; import com.jcraft.jzlib.ZOutputStream; import org.dom4j.Element; +import org.dom4j.io.XMPPPacketReader; import org.jivesoftware.multiplexer.*; import org.jivesoftware.util.JiveGlobals; import org.jivesoftware.util.LocaleUtils; import org.jivesoftware.util.Log; +import org.xmlpull.v1.XmlPullParserException; +import org.xmlpull.v1.XmlPullParserFactory; import javax.net.ssl.SSLSession; -import java.io.BufferedWriter; -import java.io.IOException; -import java.io.OutputStreamWriter; -import java.io.Writer; +import java.io.*; import java.net.InetAddress; import java.net.Socket; import java.nio.channels.Channels; @@ -49,6 +49,10 @@ * The utf-8 charset for decoding and encoding XMPP packet streams. */ public static final String CHARSET = "UTF-8"; + /** + * Reuse the same factory for all the connections. + */ + private static XmlPullParserFactory factory = null; private static Map instances = new ConcurrentHashMap(); @@ -82,7 +86,6 @@ private Session session; private boolean secure; private boolean compressed; - private org.jivesoftware.util.XMLWriter xmlSerializer; private boolean flashClient = false; private int majorVersion = 1; private int minorVersion = 0; @@ -101,6 +104,16 @@ */ private CompressionPolicy compressionPolicy = CompressionPolicy.disabled; + static { + try { + factory = XmlPullParserFactory.newInstance(MXParser.class.getName(), null); + factory.setNamespaceAware(true); + } + catch (XmlPullParserException e) { + Log.error("Error creating a parser factory", e); + } + } + public static Collection getInstances() { return instances.keySet(); } @@ -129,7 +142,6 @@ writer = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream(), CHARSET)); } this.backupDeliverer = backupDeliverer; - xmlSerializer = new XMLSocketWriter(writer, this); instances.put(this, ""); } @@ -158,7 +170,6 @@ tlsStreamHandler.start(); // Use new wrapped writers writer = new BufferedWriter(new OutputStreamWriter(tlsStreamHandler.getOutputStream(), CHARSET)); - xmlSerializer = new XMLSocketWriter(writer, this); } } @@ -170,13 +181,11 @@ ZOutputStream out = new ZOutputStream(socket.getOutputStream(), JZlib.Z_BEST_COMPRESSION); out.setFlushMode(JZlib.Z_PARTIAL_FLUSH); writer = new BufferedWriter(new OutputStreamWriter(out, CHARSET)); - xmlSerializer = new XMLSocketWriter(writer, this); } else { ZOutputStream out = new ZOutputStream(tlsStreamHandler.getOutputStream(), JZlib.Z_BEST_COMPRESSION); out.setFlushMode(JZlib.Z_PARTIAL_FLUSH); writer = new BufferedWriter(new OutputStreamWriter(out, CHARSET)); - xmlSerializer = new XMLSocketWriter(writer, this); } } catch (IOException e) { // TODO Would be nice to still be able to throw the exception and not catch it here @@ -494,9 +503,17 @@ } } - public void deliver(Element doc) { + public void deliver(String stanza) { if (isClosed()) { - backupDeliverer.deliver(doc); + XMPPPacketReader xmppReader = new XMPPPacketReader(); + xmppReader.setXPPFactory(factory); + try { + xmppReader.read(new StringReader(stanza)); + Element doc = xmppReader.parseDocument().getRootElement(); + backupDeliverer.deliver(doc); + } catch (Exception e) { + Log.error("Error parsing stanza: " + stanza, e); + } } else { boolean errorDelivering = false; @@ -504,11 +521,11 @@ try { requestWriting(); allowedToWrite = true; - xmlSerializer.write(doc); + writer.write(stanza); if (flashClient) { writer.write('\0'); } - xmlSerializer.flush(); + writer.flush(); } catch (Exception e) { Log.debug("Error delivering packet" + "\n" + this.toString(), e); @@ -523,7 +540,15 @@ close(); // Retry sending the packet again. Most probably if the packet is a // Message it will be stored offline - backupDeliverer.deliver(doc); + XMPPPacketReader xmppReader = new XMPPPacketReader(); + xmppReader.setXPPFactory(factory); + try { + xmppReader.read(new StringReader(stanza)); + Element doc = xmppReader.parseDocument().getRootElement(); + backupDeliverer.deliver(doc); + } catch (Exception e) { + Log.error("Error parsing stanza: " + stanza, e); + } } } } diff --git a/src/java/org/jivesoftware/multiplexer/net/StanzaHandler.java b/src/java/org/jivesoftware/multiplexer/net/StanzaHandler.java index 33a0950..0d4ef72 100644 --- a/src/java/org/jivesoftware/multiplexer/net/StanzaHandler.java +++ b/src/java/org/jivesoftware/multiplexer/net/StanzaHandler.java @@ -11,9 +11,8 @@ package org.jivesoftware.multiplexer.net; -import org.dom4j.DocumentException; import org.dom4j.Element; -import org.dom4j.io.SAXReader; +import org.dom4j.io.XMPPPacketReader; import org.jivesoftware.multiplexer.*; import org.jivesoftware.util.Log; import org.jivesoftware.util.StringUtils; @@ -67,8 +66,6 @@ */ private PacketRouter router; - private SAXReader reader; - static { try { factory = XmlPullParserFactory.newInstance(MXParser.class.getName(), null); @@ -85,17 +82,15 @@ * @param router the router for sending packets that were read. * @param serverName the name of the server this socket is working for. * @param connection the connection being read. + * @throws org.xmlpull.v1.XmlPullParserException */ - public StanzaHandler(PacketRouter router, String serverName, Connection connection) { + public StanzaHandler(PacketRouter router, String serverName, Connection connection) throws XmlPullParserException { this.serverName = serverName; this.router = router; this.connection = connection; - // Reader is associated with a new XMPPPacketReader - reader = new SAXReader(); - reader.setEncoding(CHARSET); } - public void process(String stanza) throws Exception { + public void process(String stanza, XmlPullParser parser) throws Exception { boolean initialStream = stanza.startsWith(STREAM_START); if (!sessionCreated || initialStream) { @@ -106,7 +101,6 @@ // Found an stream:stream tag... if (!sessionCreated) { sessionCreated = true; - MXParser parser = (MXParser) factory.newPullParser(); parser.setInput(new StringReader(stanza)); createSession(parser); } else if (startedSASL && session.getStatus() == Session.STATUS_AUTHENTICATED) { @@ -119,23 +113,22 @@ return; } - // Create DOM object from received stanza - Element doc; - try { - doc = reader.read(new StringReader(stanza)).getRootElement(); - } catch (DocumentException e) { - if (stanza.equals("")) { - session.close(); - return; - } - // Throw the exception. This will close the connection - throw e; - } - if (doc == null) { - // No document found. + // Verify if end of stream was requested + if (stanza.equals("")) { + session.close(); return; } - String tag = doc.getName(); + // Reset XPP parser with new stanza + parser.setInput(new StringReader(stanza)); + parser.next(); + String tag = parser.getName(); + // Verify that XML stanza is valid (i.e. well-formed) + boolean valid = validateStanza(stanza, parser); + + if (!valid) { + session.close(); + return; + } if ("starttls".equals(tag)) { // Negotiate TLS if (negotiateTLS()) { @@ -148,31 +141,42 @@ // User is trying to authenticate using SASL startedSASL = true; // Forward packet to the server - process(doc); + route(stanza); } else if ("compress".equals(tag)) { // Client is trying to initiate compression - if (compressClient(doc)) { + if (compressClient(stanza)) { // Compression was successful so open a new stream and offer // resource binding and session establishment (to client sessions only) waitingCompressionACK = true; } } else { - process(doc); + route(stanza); } } - protected void process(Element doc) { - if (doc == null) { - return; + private boolean validateStanza(String stanza, XmlPullParser parser) { + // TODO Detect when XML stanza is not complete + try { + int eventType = parser.getEventType(); + while (eventType != XmlPullParser.END_DOCUMENT) { + eventType = parser.next(); + } + } catch (Exception e) { + Log.error("Error parsing XML stanza: " + stanza, e); + return false; } + return true; + } + + private void route(String stanza) { // Ensure that connection was secured if TLS was required if (connection.getTlsPolicy() == Connection.TLSPolicy.required && !connection.isSecure()) { closeNeverSecuredConnection(); return; } - router.route(doc, session.getStreamID()); + router.route(stanza, session.getStreamID()); } /** @@ -257,12 +261,11 @@ * is already using compression or if client requested to use compression but this feature * is disabled. * - * @param doc the element sent by the client requesting compression. Compression method is + * @param stanza the XML stanza sent by the client requesting compression. Compression method is * included. * @return true if it was possible to use compression. - * @throws IOException if an error occurs while starting using compression. */ - private boolean compressClient(Element doc) { + private boolean compressClient(String stanza) { String error = null; if (connection.getCompressionPolicy() == Connection.CompressionPolicy.disabled) { // Client requested compression but this feature is disabled @@ -277,6 +280,18 @@ Log.warn("Client requested compression and connection is already compressed. Closing " + "connection : " + connection); } else { + XMPPPacketReader xmppReader = new XMPPPacketReader(); + xmppReader.setXPPFactory(factory); + Element doc; + try { + xmppReader.read(new StringReader(stanza)); + doc = xmppReader.parseDocument().getRootElement(); + } catch (Exception e) { + Log.error("Error parsing compression stanza: " + stanza, e); + connection.close(); + return false; + } + // Check that the requested method is supported String method = doc.elementText("method"); if (!"zlib".equals(method)) { @@ -375,6 +390,9 @@ * If the connection remains open, the XPP will be set to be ready for the * first packet. A call to next() should result in an START_TAG state with * the first packet in the stream. + * @param xpp + * @throws java.io.IOException + * @throws org.xmlpull.v1.XmlPullParserException */ protected void createSession(XmlPullParser xpp) throws XmlPullParserException, IOException { for (int eventType = xpp.getEventType(); eventType != XmlPullParser.START_TAG;) { @@ -472,6 +490,9 @@ * Creates the appropriate {@link Session} subclass based on the specified namespace. * * @param namespace the namespace sent in the stream element. eg. jabber:client. + * @param serverName + * @param xpp + * @param connection * @return the created session or null. * @throws org.xmlpull.v1.XmlPullParserException */ diff --git a/src/java/org/jivesoftware/multiplexer/net/http/HttpSessionManager.java b/src/java/org/jivesoftware/multiplexer/net/http/HttpSessionManager.java index 2a9e795..eeffc32 100644 --- a/src/java/org/jivesoftware/multiplexer/net/http/HttpSessionManager.java +++ b/src/java/org/jivesoftware/multiplexer/net/http/HttpSessionManager.java @@ -10,12 +10,12 @@ */ package org.jivesoftware.multiplexer.net.http; +import org.dom4j.*; +import org.jivesoftware.multiplexer.ConnectionManager; +import org.jivesoftware.multiplexer.ServerSurrogate; +import org.jivesoftware.multiplexer.Session; import org.jivesoftware.util.JiveGlobals; import org.jivesoftware.util.Log; -import org.jivesoftware.multiplexer.ServerSurrogate; -import org.jivesoftware.multiplexer.ConnectionManager; -import org.jivesoftware.multiplexer.Session; -import org.dom4j.*; import java.util.*; @@ -203,7 +203,7 @@ session.addConnection(connection, isPoll); for (Element packet : elements) { - serverSurrogate.send(packet, session.getStreamID()); + serverSurrogate.send(packet.asXML(), session.getStreamID()); } return connection; diff --git a/src/java/org/jivesoftware/multiplexer/spi/ClientFailoverDeliverer.java b/src/java/org/jivesoftware/multiplexer/spi/ClientFailoverDeliverer.java index 1defb17..f68f47c 100644 --- a/src/java/org/jivesoftware/multiplexer/spi/ClientFailoverDeliverer.java +++ b/src/java/org/jivesoftware/multiplexer/spi/ClientFailoverDeliverer.java @@ -11,10 +11,10 @@ package org.jivesoftware.multiplexer.spi; +import org.dom4j.Element; +import org.jivesoftware.multiplexer.ConnectionManager; import org.jivesoftware.multiplexer.PacketDeliverer; import org.jivesoftware.multiplexer.ServerSurrogate; -import org.jivesoftware.multiplexer.ConnectionManager; -import org.dom4j.Element; /** * Deliverer to use when a stanza received from the server failed to be forwarded @@ -50,7 +50,7 @@ error.addElement("unexpected-request") .addAttribute("xmlns", "urn:ietf:params:xml:ns:xmpp-stanzas"); // Bounce the failed IQ packet - serverSurrogate.send(reply, streamID); + serverSurrogate.send(reply.asXML(), streamID); } } } diff --git a/src/java/org/jivesoftware/multiplexer/spi/ServerRouter.java b/src/java/org/jivesoftware/multiplexer/spi/ServerRouter.java index dd0a656..431a9be 100644 --- a/src/java/org/jivesoftware/multiplexer/spi/ServerRouter.java +++ b/src/java/org/jivesoftware/multiplexer/spi/ServerRouter.java @@ -11,10 +11,9 @@ package org.jivesoftware.multiplexer.spi; -import org.jivesoftware.multiplexer.PacketRouter; import org.jivesoftware.multiplexer.ConnectionManager; +import org.jivesoftware.multiplexer.PacketRouter; import org.jivesoftware.multiplexer.ServerSurrogate; -import org.dom4j.Element; /** * Packet router that will route all traffic to the server. @@ -29,7 +28,7 @@ serverSurrogate = ConnectionManager.getInstance().getServerSurrogate(); } - public void route(Element stanza, String streamID) { + public void route(String stanza, String streamID) { serverSurrogate.send(stanza, streamID); } } diff --git a/src/java/org/jivesoftware/multiplexer/task/RouteTask.java b/src/java/org/jivesoftware/multiplexer/task/RouteTask.java index ba8b95d..4ccbd9c 100644 --- a/src/java/org/jivesoftware/multiplexer/task/RouteTask.java +++ b/src/java/org/jivesoftware/multiplexer/task/RouteTask.java @@ -11,9 +11,8 @@ package org.jivesoftware.multiplexer.task; -import org.jivesoftware.multiplexer.ConnectionWorkerThread; import org.jivesoftware.multiplexer.ClientSession; -import org.dom4j.Element; +import org.jivesoftware.multiplexer.ConnectionWorkerThread; /** * Task that forwards client packets to the server. @@ -22,9 +21,9 @@ */ public class RouteTask extends ClientTask { - private Element stanza; + private String stanza; - public RouteTask(String streamID, Element stanza) { + public RouteTask(String streamID, String stanza) { super(streamID); this.stanza = stanza; }