diff --git a/src/java/org/jivesoftware/multiplexer/ClientSession.java b/src/java/org/jivesoftware/multiplexer/ClientSession.java
index 15f6bf6..8fd4bc6 100644
--- a/src/java/org/jivesoftware/multiplexer/ClientSession.java
+++ b/src/java/org/jivesoftware/multiplexer/ClientSession.java
@@ -237,7 +237,7 @@
// Deliver stanza to client
if (conn != null && !conn.isClosed()) {
try {
- conn.deliver(stanza);
+ conn.deliver(stanza.asXML());
}
catch (Exception e) {
Log.error(LocaleUtils.getLocalizedString("admin.error"), e);
diff --git a/src/java/org/jivesoftware/multiplexer/Connection.java b/src/java/org/jivesoftware/multiplexer/Connection.java
index a5aa158..d468de2 100644
--- a/src/java/org/jivesoftware/multiplexer/Connection.java
+++ b/src/java/org/jivesoftware/multiplexer/Connection.java
@@ -11,8 +11,6 @@
package org.jivesoftware.multiplexer;
-import org.dom4j.Element;
-
import java.net.InetAddress;
import java.net.UnknownHostException;
@@ -90,14 +88,14 @@
* Delivers the packet to this connection without checking the recipient.
* The method essentially calls socket.send(packet.getWriteBuffer())
.
*
- * @param doc the packet to deliver.
+ * @param stanza the stanza to deliver.
*/
- public void deliver(Element doc);
+ public void deliver(String stanza);
/**
* Delivers raw text to this connection. This is a very low level way for sending
* XML stanzas to the client. This method should not be used unless you have very
- * good reasons for not using {@link #deliver(Element)}.
+ * good reasons for not using {@link #deliver(String)}.
*
* This method avoids having to get the writer of this connection and mess directly
* with the writer. Therefore, this method ensures a correct delivery of the stanza
diff --git a/src/java/org/jivesoftware/multiplexer/ConnectionManager.java b/src/java/org/jivesoftware/multiplexer/ConnectionManager.java
index 1c7ccf7..a3e49e3 100644
--- a/src/java/org/jivesoftware/multiplexer/ConnectionManager.java
+++ b/src/java/org/jivesoftware/multiplexer/ConnectionManager.java
@@ -287,9 +287,11 @@
// Customize Executor that will be used by processors to process incoming stanzas
ExecutorThreadModel threadModel = ExecutorThreadModel.getInstance("client");
int eventThreads = JiveGlobals.getIntProperty("xmpp.processor.threads.standard", 16);
- Executor eventExecutor = new ThreadPoolExecutor(
- eventThreads + 1, eventThreads + 1, 60, TimeUnit.SECONDS, new LinkedBlockingQueue() );
- threadModel.setExecutor(eventExecutor);
+ ThreadPoolExecutor eventExecutor = (ThreadPoolExecutor)threadModel.getExecutor();
+ eventExecutor.setCorePoolSize(eventThreads + 1);
+ eventExecutor.setMaximumPoolSize(eventThreads + 1);
+ eventExecutor.setKeepAliveTime(60, TimeUnit.SECONDS);
+
socketAcceptor.getDefaultConfig().setThreadModel(threadModel);
// Add the XMPP codec filter
socketAcceptor.getFilterChain().addFirst("xmpp", new ProtocolCodecFilter(new XMPPCodecFactory()));
@@ -336,9 +338,11 @@
// Customize thread model for c2s (old ssl port)
ExecutorThreadModel threadModel = ExecutorThreadModel.getInstance("client_ssl");
int eventThreads = JiveGlobals.getIntProperty("xmpp.processor.threads.ssl", 16);
- Executor eventExecutor = new ThreadPoolExecutor(
- eventThreads + 1, eventThreads + 1, 60, TimeUnit.SECONDS, new LinkedBlockingQueue() );
- threadModel.setExecutor(eventExecutor);
+ ThreadPoolExecutor eventExecutor = (ThreadPoolExecutor)threadModel.getExecutor();
+ eventExecutor.setCorePoolSize(eventThreads + 1);
+ eventExecutor.setMaximumPoolSize(eventThreads + 1);
+ eventExecutor.setKeepAliveTime(60, TimeUnit.SECONDS);
+
sslSocketAcceptor.getDefaultConfig().setThreadModel(threadModel);
// Add the XMPP codec filter
sslSocketAcceptor.getFilterChain().addFirst("xmpp", new ProtocolCodecFilter(new XMPPCodecFactory()));
diff --git a/src/java/org/jivesoftware/multiplexer/ConnectionWorkerThread.java b/src/java/org/jivesoftware/multiplexer/ConnectionWorkerThread.java
index 0e2fbea..db3aa30 100644
--- a/src/java/org/jivesoftware/multiplexer/ConnectionWorkerThread.java
+++ b/src/java/org/jivesoftware/multiplexer/ConnectionWorkerThread.java
@@ -13,7 +13,6 @@
import com.jcraft.jzlib.JZlib;
import com.jcraft.jzlib.ZInputStream;
-import org.dom4j.DocumentFactory;
import org.dom4j.Element;
import org.dom4j.io.XMPPPacketReader;
import org.jivesoftware.multiplexer.net.DNSUtil;
@@ -54,7 +53,6 @@
*/
public static final int DEFAULT_MULTIPLEX_PORT = 5262;
- private static DocumentFactory docFactory = DocumentFactory.getInstance();
// Sequence and random number generator used for creating unique IQ ID's.
private static int sequence = 0;
private static Random random = new Random();
@@ -388,16 +386,14 @@
* @param streamID the stream ID assigned by the connection manager to the new session.
*/
public void clientSessionCreated(String streamID) {
- Element iq = docFactory.createDocument().addElement("iq");
- iq.addAttribute("type", "set");
- iq.addAttribute("to", serverName);
- iq.addAttribute("from", jidAddress);
- iq.addAttribute("id", String.valueOf(random.nextInt(1000) + "-" + sequence++));
- Element child = iq.addElement("session", "http://jabber.org/protocol/connectionmanager");
- child.addAttribute("id", streamID);
- child.addElement("create");
+ StringBuilder sb = new StringBuilder(100);
+ sb.append("");
// Forward the notification to the server
- connection.deliver(iq);
+ connection.deliver(sb.toString());
}
/**
@@ -406,16 +402,14 @@
* @param streamID the stream ID assigned by the connection manager to the closed session.
*/
public void clientSessionClosed(String streamID) {
- Element iq = docFactory.createDocument().addElement("iq");
- iq.addAttribute("type", "set");
- iq.addAttribute("to", serverName);
- iq.addAttribute("from", jidAddress);
- iq.addAttribute("id", String.valueOf(random.nextInt(1000) + "-" + sequence++));
- Element child = iq.addElement("session", "http://jabber.org/protocol/connectionmanager");
- child.addAttribute("id", streamID);
- child.addElement("close");
+ StringBuilder sb = new StringBuilder(100);
+ sb.append("");
// Forward the notification to the server
- connection.deliver(iq);
+ connection.deliver(sb.toString());
}
/**
@@ -427,16 +421,14 @@
* longer available session.
*/
public void deliveryFailed(Element stanza, String streamID) {
- Element iq = docFactory.createDocument().addElement("iq");
- iq.addAttribute("type", "set");
- iq.addAttribute("to", serverName);
- iq.addAttribute("from", jidAddress);
- iq.addAttribute("id", String.valueOf(random.nextInt(1000) + "-" + sequence++));
- Element child = iq.addElement("session", "http://jabber.org/protocol/connectionmanager");
- child.addAttribute("id", streamID);
- child.addElement("failed").add(stanza.createCopy());
+ StringBuilder sb = new StringBuilder(100);
+ sb.append("").append(stanza.asXML()).append("");
// Send notification to the server
- connection.deliver(iq);
+ connection.deliver(sb.toString());
}
public void run() {
@@ -470,14 +462,17 @@
* @param stanza the original client stanza that is going to be wrapped.
* @param streamID the stream ID assigned by the connection manager to the client session.
*/
- public void deliver(Element stanza, String streamID) {
+ public void deliver(String stanza, String streamID) {
// Wrap the stanza
- Element wrapper = docFactory.createDocument().addElement("route");
- wrapper.addAttribute("to", serverName);
- wrapper.addAttribute("from", jidAddress);
- wrapper.addAttribute("streamid", streamID);
- wrapper.add(stanza.createCopy());
+ StringBuilder sb = new StringBuilder(80);
+ sb.append("");
+ sb.append(stanza);
+ sb.append("");
+
// Forward the wrapped stanza to the server
- connection.deliver(wrapper);
+ connection.deliver(sb.toString());
}
}
diff --git a/src/java/org/jivesoftware/multiplexer/PacketRouter.java b/src/java/org/jivesoftware/multiplexer/PacketRouter.java
index 9301e51..7e15cb0 100644
--- a/src/java/org/jivesoftware/multiplexer/PacketRouter.java
+++ b/src/java/org/jivesoftware/multiplexer/PacketRouter.java
@@ -11,8 +11,6 @@
package org.jivesoftware.multiplexer;
-import org.dom4j.Element;
-
/**
* A router that handles incoming packets. Packets will be routed to their
* corresponding handler. A router is much like a forwarded with some logic
@@ -25,8 +23,8 @@
/**
* Routes the given packet based on its type.
*
- * @param doc The packet to route.
+ * @param stanza The stanza to route.
* @param streamID The ID of the client's stream.
*/
- void route(Element doc, String streamID);
+ void route(String stanza, String streamID);
}
diff --git a/src/java/org/jivesoftware/multiplexer/ServerPacketHandler.java b/src/java/org/jivesoftware/multiplexer/ServerPacketHandler.java
index 4e5fdc5..b015b05 100644
--- a/src/java/org/jivesoftware/multiplexer/ServerPacketHandler.java
+++ b/src/java/org/jivesoftware/multiplexer/ServerPacketHandler.java
@@ -147,7 +147,7 @@
error.addElement("unexpected-request")
.addAttribute("xmlns", "urn:ietf:params:xml:ns:xmpp-stanzas");
// Bounce the failed IQ packet
- connectionManager.getServerSurrogate().send(reply, streamID);
+ connectionManager.getServerSurrogate().send(reply.asXML(), streamID);
}
}
}
@@ -195,7 +195,7 @@
reply.addAttribute("type", "result");
reply.addAttribute("to", connectionManager.getServerName());
reply.addAttribute("from", jidAddress);
- connection.deliver(reply);
+ connection.deliver(reply.asXML());
}
}
diff --git a/src/java/org/jivesoftware/multiplexer/ServerSurrogate.java b/src/java/org/jivesoftware/multiplexer/ServerSurrogate.java
index d509af7..e53f31a 100644
--- a/src/java/org/jivesoftware/multiplexer/ServerSurrogate.java
+++ b/src/java/org/jivesoftware/multiplexer/ServerSurrogate.java
@@ -179,7 +179,7 @@
* @param stanza the stanza to send to the server.
* @param streamID the stream ID assigned by the connection manager to the session.
*/
- public void send(Element stanza, String streamID) {
+ public void send(String stanza, String streamID) {
threadPool.execute(new RouteTask(streamID, stanza));
}
diff --git a/src/java/org/jivesoftware/multiplexer/net/ClientConnectionHandler.java b/src/java/org/jivesoftware/multiplexer/net/ClientConnectionHandler.java
index 79e74bf..d1a7e2b 100644
--- a/src/java/org/jivesoftware/multiplexer/net/ClientConnectionHandler.java
+++ b/src/java/org/jivesoftware/multiplexer/net/ClientConnectionHandler.java
@@ -14,6 +14,7 @@
import org.apache.mina.common.IoSession;
import org.jivesoftware.multiplexer.spi.ClientFailoverDeliverer;
import org.jivesoftware.util.JiveGlobals;
+import org.xmlpull.v1.XmlPullParserException;
/**
* ConnectionHandler that knows which subclass of {@link StanzaHandler} should
@@ -23,7 +24,7 @@
*/
public class ClientConnectionHandler extends ConnectionHandler {
- StanzaHandler createStanzaHandler(NIOConnection connection) {
+ StanzaHandler createStanzaHandler(NIOConnection connection) throws XmlPullParserException {
return new ClientStanzaHandler(router, serverName, connection);
}
diff --git a/src/java/org/jivesoftware/multiplexer/net/ClientStanzaHandler.java b/src/java/org/jivesoftware/multiplexer/net/ClientStanzaHandler.java
index 249c574..9d345e6 100644
--- a/src/java/org/jivesoftware/multiplexer/net/ClientStanzaHandler.java
+++ b/src/java/org/jivesoftware/multiplexer/net/ClientStanzaHandler.java
@@ -25,7 +25,8 @@
*/
class ClientStanzaHandler extends StanzaHandler {
- public ClientStanzaHandler(PacketRouter router, String serverName, Connection connection) {
+ public ClientStanzaHandler(PacketRouter router, String serverName, Connection connection)
+ throws XmlPullParserException {
super(router, serverName, connection);
}
diff --git a/src/java/org/jivesoftware/multiplexer/net/ConnectionHandler.java b/src/java/org/jivesoftware/multiplexer/net/ConnectionHandler.java
index 54cd641..20f2ea6 100644
--- a/src/java/org/jivesoftware/multiplexer/net/ConnectionHandler.java
+++ b/src/java/org/jivesoftware/multiplexer/net/ConnectionHandler.java
@@ -19,8 +19,13 @@
import org.jivesoftware.multiplexer.PacketRouter;
import org.jivesoftware.multiplexer.spi.ServerRouter;
import org.jivesoftware.util.Log;
+import org.xmlpull.v1.XmlPullParser;
+import org.xmlpull.v1.XmlPullParserException;
+import org.xmlpull.v1.XmlPullParserFactory;
import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
/**
* A ConnectionHandler is responsible for creating new sessions, destroying sessions and delivering
@@ -40,6 +45,21 @@
protected static PacketRouter router = new ServerRouter();
protected static String serverName = ConnectionManager.getInstance().getServerName();
+ private static Map parsers = new ConcurrentHashMap();
+ /**
+ * Reuse the same factory for all the connections.
+ */
+ private static XmlPullParserFactory factory = null;
+
+ static {
+ try {
+ factory = XmlPullParserFactory.newInstance(MXParser.class.getName(), null);
+ factory.setNamespaceAware(true);
+ }
+ catch (XmlPullParserException e) {
+ Log.error("Error creating a parser factory", e);
+ }
+ }
public void sessionOpened(IoSession session) throws Exception {
// Create a new XML parser for the new connection. The parser will be used by the XMPPDecoder filter.
@@ -87,9 +107,21 @@
//System.out.println("RCVD: " + message);
// Get the stanza handler for this session
StanzaHandler handler = (StanzaHandler) session.getAttribute(HANDLER);
+ // Get the parser to use to process stanza. For optimization there is going
+ // to be a parser for each running thread. Each Filter will be executed
+ // by the Executor placed as the first Filter. So we can have a parser associated
+ // to each Thread
+ int hashCode = Thread.currentThread().hashCode();
+ XmlPullParser parser = parsers.get(hashCode);
+ if (parser == null) {
+ parser = factory.newPullParser();
+ parser.setFeature(XmlPullParser.FEATURE_PROCESS_NAMESPACES, true);
+ parsers.put(hashCode, parser);
+ }
+
// Let the stanza handler process the received stanza
try {
- handler.process( (String) message);
+ handler.process( (String) message, parser);
} catch (Exception e) {
Log.error("Closing connection due to error while processing message: " + message, e);
Connection connection = (Connection) session.getAttribute(CONNECTION);
@@ -99,7 +131,7 @@
abstract NIOConnection createNIOConnection(IoSession session);
- abstract StanzaHandler createStanzaHandler(NIOConnection connection);
+ abstract StanzaHandler createStanzaHandler(NIOConnection connection) throws XmlPullParserException;
/**
* Returns the max number of seconds a connection can be idle (both ways) before
diff --git a/src/java/org/jivesoftware/multiplexer/net/NIOConnection.java b/src/java/org/jivesoftware/multiplexer/net/NIOConnection.java
index 73fc333..dc0e842 100644
--- a/src/java/org/jivesoftware/multiplexer/net/NIOConnection.java
+++ b/src/java/org/jivesoftware/multiplexer/net/NIOConnection.java
@@ -17,21 +17,25 @@
import org.apache.mina.filter.CompressionFilter;
import org.apache.mina.filter.SSLFilter;
import org.dom4j.Element;
-import org.dom4j.io.OutputFormat;
+import org.dom4j.io.XMPPPacketReader;
import org.jivesoftware.multiplexer.Connection;
import org.jivesoftware.multiplexer.ConnectionCloseListener;
import org.jivesoftware.multiplexer.PacketDeliverer;
import org.jivesoftware.multiplexer.Session;
import org.jivesoftware.util.JiveGlobals;
import org.jivesoftware.util.Log;
-import org.jivesoftware.util.XMLWriter;
+import org.xmlpull.v1.XmlPullParserException;
+import org.xmlpull.v1.XmlPullParserFactory;
import javax.net.ssl.KeyManager;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
+import java.io.StringReader;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
+import java.nio.charset.Charset;
+import java.nio.charset.CharsetEncoder;
import java.security.KeyStore;
import java.util.HashMap;
import java.util.Map;
@@ -50,6 +54,10 @@
* The utf-8 charset for decoding and encoding XMPP packet streams.
*/
public static final String CHARSET = "UTF-8";
+ /**
+ * Reuse the same factory for all the connections.
+ */
+ private static XmlPullParserFactory factory = null;
private Session session;
private IoSession ioSession;
@@ -77,11 +85,22 @@
* Compression policy currently in use for this connection.
*/
private CompressionPolicy compressionPolicy = CompressionPolicy.disabled;
+ private CharsetEncoder encoder;
+ static {
+ try {
+ factory = XmlPullParserFactory.newInstance(MXParser.class.getName(), null);
+ factory.setNamespaceAware(true);
+ }
+ catch (XmlPullParserException e) {
+ Log.error("Error creating a parser factory", e);
+ }
+ }
public NIOConnection(IoSession session, PacketDeliverer packetDeliverer) {
this.ioSession = session;
this.backupDeliverer = packetDeliverer;
+ encoder = Charset.forName(CHARSET).newEncoder();
}
public boolean validate() {
@@ -186,9 +205,17 @@
return ioSession.getFilterChain().contains("tls");
}
- public void deliver(Element doc) {
+ public void deliver(String stanza) {
if (isClosed()) {
- backupDeliverer.deliver(doc);
+ XMPPPacketReader xmppReader = new XMPPPacketReader();
+ xmppReader.setXPPFactory(factory);
+ try {
+ xmppReader.read(new StringReader(stanza));
+ Element doc = xmppReader.parseDocument().getRootElement();
+ backupDeliverer.deliver(doc);
+ } catch (Exception e) {
+ Log.error("Error parsing stanza: " + stanza, e);
+ }
}
else {
ByteBuffer buffer = ByteBuffer.allocate(4096);
@@ -196,9 +223,7 @@
boolean errorDelivering = false;
try {
- XMLWriter xmlSerializer = new XMLWriter(buffer.asOutputStream(), new OutputFormat());
- xmlSerializer.write(doc);
- xmlSerializer.flush();
+ buffer.putString(stanza, encoder);
if (flashClient) {
buffer.put((byte) '\0');
}
@@ -214,7 +239,15 @@
close();
// Retry sending the packet again. Most probably if the packet is a
// Message it will be stored offline
- backupDeliverer.deliver(doc);
+ XMPPPacketReader xmppReader = new XMPPPacketReader();
+ xmppReader.setXPPFactory(factory);
+ try {
+ xmppReader.read(new StringReader(stanza));
+ Element doc = xmppReader.parseDocument().getRootElement();
+ backupDeliverer.deliver(doc);
+ } catch (Exception e) {
+ Log.error("Error parsing stanza: " + stanza, e);
+ }
}
}
}
diff --git a/src/java/org/jivesoftware/multiplexer/net/SocketConnection.java b/src/java/org/jivesoftware/multiplexer/net/SocketConnection.java
index 043d31d..f8bc672 100644
--- a/src/java/org/jivesoftware/multiplexer/net/SocketConnection.java
+++ b/src/java/org/jivesoftware/multiplexer/net/SocketConnection.java
@@ -14,16 +14,16 @@
import com.jcraft.jzlib.JZlib;
import com.jcraft.jzlib.ZOutputStream;
import org.dom4j.Element;
+import org.dom4j.io.XMPPPacketReader;
import org.jivesoftware.multiplexer.*;
import org.jivesoftware.util.JiveGlobals;
import org.jivesoftware.util.LocaleUtils;
import org.jivesoftware.util.Log;
+import org.xmlpull.v1.XmlPullParserException;
+import org.xmlpull.v1.XmlPullParserFactory;
import javax.net.ssl.SSLSession;
-import java.io.BufferedWriter;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.io.Writer;
+import java.io.*;
import java.net.InetAddress;
import java.net.Socket;
import java.nio.channels.Channels;
@@ -49,6 +49,10 @@
* The utf-8 charset for decoding and encoding XMPP packet streams.
*/
public static final String CHARSET = "UTF-8";
+ /**
+ * Reuse the same factory for all the connections.
+ */
+ private static XmlPullParserFactory factory = null;
private static Map instances =
new ConcurrentHashMap();
@@ -82,7 +86,6 @@
private Session session;
private boolean secure;
private boolean compressed;
- private org.jivesoftware.util.XMLWriter xmlSerializer;
private boolean flashClient = false;
private int majorVersion = 1;
private int minorVersion = 0;
@@ -101,6 +104,16 @@
*/
private CompressionPolicy compressionPolicy = CompressionPolicy.disabled;
+ static {
+ try {
+ factory = XmlPullParserFactory.newInstance(MXParser.class.getName(), null);
+ factory.setNamespaceAware(true);
+ }
+ catch (XmlPullParserException e) {
+ Log.error("Error creating a parser factory", e);
+ }
+ }
+
public static Collection getInstances() {
return instances.keySet();
}
@@ -129,7 +142,6 @@
writer = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream(), CHARSET));
}
this.backupDeliverer = backupDeliverer;
- xmlSerializer = new XMLSocketWriter(writer, this);
instances.put(this, "");
}
@@ -158,7 +170,6 @@
tlsStreamHandler.start();
// Use new wrapped writers
writer = new BufferedWriter(new OutputStreamWriter(tlsStreamHandler.getOutputStream(), CHARSET));
- xmlSerializer = new XMLSocketWriter(writer, this);
}
}
@@ -170,13 +181,11 @@
ZOutputStream out = new ZOutputStream(socket.getOutputStream(), JZlib.Z_BEST_COMPRESSION);
out.setFlushMode(JZlib.Z_PARTIAL_FLUSH);
writer = new BufferedWriter(new OutputStreamWriter(out, CHARSET));
- xmlSerializer = new XMLSocketWriter(writer, this);
}
else {
ZOutputStream out = new ZOutputStream(tlsStreamHandler.getOutputStream(), JZlib.Z_BEST_COMPRESSION);
out.setFlushMode(JZlib.Z_PARTIAL_FLUSH);
writer = new BufferedWriter(new OutputStreamWriter(out, CHARSET));
- xmlSerializer = new XMLSocketWriter(writer, this);
}
} catch (IOException e) {
// TODO Would be nice to still be able to throw the exception and not catch it here
@@ -494,9 +503,17 @@
}
}
- public void deliver(Element doc) {
+ public void deliver(String stanza) {
if (isClosed()) {
- backupDeliverer.deliver(doc);
+ XMPPPacketReader xmppReader = new XMPPPacketReader();
+ xmppReader.setXPPFactory(factory);
+ try {
+ xmppReader.read(new StringReader(stanza));
+ Element doc = xmppReader.parseDocument().getRootElement();
+ backupDeliverer.deliver(doc);
+ } catch (Exception e) {
+ Log.error("Error parsing stanza: " + stanza, e);
+ }
}
else {
boolean errorDelivering = false;
@@ -504,11 +521,11 @@
try {
requestWriting();
allowedToWrite = true;
- xmlSerializer.write(doc);
+ writer.write(stanza);
if (flashClient) {
writer.write('\0');
}
- xmlSerializer.flush();
+ writer.flush();
}
catch (Exception e) {
Log.debug("Error delivering packet" + "\n" + this.toString(), e);
@@ -523,7 +540,15 @@
close();
// Retry sending the packet again. Most probably if the packet is a
// Message it will be stored offline
- backupDeliverer.deliver(doc);
+ XMPPPacketReader xmppReader = new XMPPPacketReader();
+ xmppReader.setXPPFactory(factory);
+ try {
+ xmppReader.read(new StringReader(stanza));
+ Element doc = xmppReader.parseDocument().getRootElement();
+ backupDeliverer.deliver(doc);
+ } catch (Exception e) {
+ Log.error("Error parsing stanza: " + stanza, e);
+ }
}
}
}
diff --git a/src/java/org/jivesoftware/multiplexer/net/StanzaHandler.java b/src/java/org/jivesoftware/multiplexer/net/StanzaHandler.java
index 33a0950..0d4ef72 100644
--- a/src/java/org/jivesoftware/multiplexer/net/StanzaHandler.java
+++ b/src/java/org/jivesoftware/multiplexer/net/StanzaHandler.java
@@ -11,9 +11,8 @@
package org.jivesoftware.multiplexer.net;
-import org.dom4j.DocumentException;
import org.dom4j.Element;
-import org.dom4j.io.SAXReader;
+import org.dom4j.io.XMPPPacketReader;
import org.jivesoftware.multiplexer.*;
import org.jivesoftware.util.Log;
import org.jivesoftware.util.StringUtils;
@@ -67,8 +66,6 @@
*/
private PacketRouter router;
- private SAXReader reader;
-
static {
try {
factory = XmlPullParserFactory.newInstance(MXParser.class.getName(), null);
@@ -85,17 +82,15 @@
* @param router the router for sending packets that were read.
* @param serverName the name of the server this socket is working for.
* @param connection the connection being read.
+ * @throws org.xmlpull.v1.XmlPullParserException
*/
- public StanzaHandler(PacketRouter router, String serverName, Connection connection) {
+ public StanzaHandler(PacketRouter router, String serverName, Connection connection) throws XmlPullParserException {
this.serverName = serverName;
this.router = router;
this.connection = connection;
- // Reader is associated with a new XMPPPacketReader
- reader = new SAXReader();
- reader.setEncoding(CHARSET);
}
- public void process(String stanza) throws Exception {
+ public void process(String stanza, XmlPullParser parser) throws Exception {
boolean initialStream = stanza.startsWith(STREAM_START);
if (!sessionCreated || initialStream) {
@@ -106,7 +101,6 @@
// Found an stream:stream tag...
if (!sessionCreated) {
sessionCreated = true;
- MXParser parser = (MXParser) factory.newPullParser();
parser.setInput(new StringReader(stanza));
createSession(parser);
} else if (startedSASL && session.getStatus() == Session.STATUS_AUTHENTICATED) {
@@ -119,23 +113,22 @@
return;
}
- // Create DOM object from received stanza
- Element doc;
- try {
- doc = reader.read(new StringReader(stanza)).getRootElement();
- } catch (DocumentException e) {
- if (stanza.equals("")) {
- session.close();
- return;
- }
- // Throw the exception. This will close the connection
- throw e;
- }
- if (doc == null) {
- // No document found.
+ // Verify if end of stream was requested
+ if (stanza.equals("")) {
+ session.close();
return;
}
- String tag = doc.getName();
+ // Reset XPP parser with new stanza
+ parser.setInput(new StringReader(stanza));
+ parser.next();
+ String tag = parser.getName();
+ // Verify that XML stanza is valid (i.e. well-formed)
+ boolean valid = validateStanza(stanza, parser);
+
+ if (!valid) {
+ session.close();
+ return;
+ }
if ("starttls".equals(tag)) {
// Negotiate TLS
if (negotiateTLS()) {
@@ -148,31 +141,42 @@
// User is trying to authenticate using SASL
startedSASL = true;
// Forward packet to the server
- process(doc);
+ route(stanza);
} else if ("compress".equals(tag)) {
// Client is trying to initiate compression
- if (compressClient(doc)) {
+ if (compressClient(stanza)) {
// Compression was successful so open a new stream and offer
// resource binding and session establishment (to client sessions only)
waitingCompressionACK = true;
}
} else {
- process(doc);
+ route(stanza);
}
}
- protected void process(Element doc) {
- if (doc == null) {
- return;
+ private boolean validateStanza(String stanza, XmlPullParser parser) {
+ // TODO Detect when XML stanza is not complete
+ try {
+ int eventType = parser.getEventType();
+ while (eventType != XmlPullParser.END_DOCUMENT) {
+ eventType = parser.next();
+ }
+ } catch (Exception e) {
+ Log.error("Error parsing XML stanza: " + stanza, e);
+ return false;
}
+ return true;
+ }
+
+ private void route(String stanza) {
// Ensure that connection was secured if TLS was required
if (connection.getTlsPolicy() == Connection.TLSPolicy.required &&
!connection.isSecure()) {
closeNeverSecuredConnection();
return;
}
- router.route(doc, session.getStreamID());
+ router.route(stanza, session.getStreamID());
}
/**
@@ -257,12 +261,11 @@
* is already using compression or if client requested to use compression but this feature
* is disabled.
*
- * @param doc the element sent by the client requesting compression. Compression method is
+ * @param stanza the XML stanza sent by the client requesting compression. Compression method is
* included.
* @return true if it was possible to use compression.
- * @throws IOException if an error occurs while starting using compression.
*/
- private boolean compressClient(Element doc) {
+ private boolean compressClient(String stanza) {
String error = null;
if (connection.getCompressionPolicy() == Connection.CompressionPolicy.disabled) {
// Client requested compression but this feature is disabled
@@ -277,6 +280,18 @@
Log.warn("Client requested compression and connection is already compressed. Closing " +
"connection : " + connection);
} else {
+ XMPPPacketReader xmppReader = new XMPPPacketReader();
+ xmppReader.setXPPFactory(factory);
+ Element doc;
+ try {
+ xmppReader.read(new StringReader(stanza));
+ doc = xmppReader.parseDocument().getRootElement();
+ } catch (Exception e) {
+ Log.error("Error parsing compression stanza: " + stanza, e);
+ connection.close();
+ return false;
+ }
+
// Check that the requested method is supported
String method = doc.elementText("method");
if (!"zlib".equals(method)) {
@@ -375,6 +390,9 @@
* If the connection remains open, the XPP will be set to be ready for the
* first packet. A call to next() should result in an START_TAG state with
* the first packet in the stream.
+ * @param xpp
+ * @throws java.io.IOException
+ * @throws org.xmlpull.v1.XmlPullParserException
*/
protected void createSession(XmlPullParser xpp) throws XmlPullParserException, IOException {
for (int eventType = xpp.getEventType(); eventType != XmlPullParser.START_TAG;) {
@@ -472,6 +490,9 @@
* Creates the appropriate {@link Session} subclass based on the specified namespace.
*
* @param namespace the namespace sent in the stream element. eg. jabber:client.
+ * @param serverName
+ * @param xpp
+ * @param connection
* @return the created session or null.
* @throws org.xmlpull.v1.XmlPullParserException
*/
diff --git a/src/java/org/jivesoftware/multiplexer/net/http/HttpSessionManager.java b/src/java/org/jivesoftware/multiplexer/net/http/HttpSessionManager.java
index 2a9e795..eeffc32 100644
--- a/src/java/org/jivesoftware/multiplexer/net/http/HttpSessionManager.java
+++ b/src/java/org/jivesoftware/multiplexer/net/http/HttpSessionManager.java
@@ -10,12 +10,12 @@
*/
package org.jivesoftware.multiplexer.net.http;
+import org.dom4j.*;
+import org.jivesoftware.multiplexer.ConnectionManager;
+import org.jivesoftware.multiplexer.ServerSurrogate;
+import org.jivesoftware.multiplexer.Session;
import org.jivesoftware.util.JiveGlobals;
import org.jivesoftware.util.Log;
-import org.jivesoftware.multiplexer.ServerSurrogate;
-import org.jivesoftware.multiplexer.ConnectionManager;
-import org.jivesoftware.multiplexer.Session;
-import org.dom4j.*;
import java.util.*;
@@ -203,7 +203,7 @@
session.addConnection(connection, isPoll);
for (Element packet : elements) {
- serverSurrogate.send(packet, session.getStreamID());
+ serverSurrogate.send(packet.asXML(), session.getStreamID());
}
return connection;
diff --git a/src/java/org/jivesoftware/multiplexer/spi/ClientFailoverDeliverer.java b/src/java/org/jivesoftware/multiplexer/spi/ClientFailoverDeliverer.java
index 1defb17..f68f47c 100644
--- a/src/java/org/jivesoftware/multiplexer/spi/ClientFailoverDeliverer.java
+++ b/src/java/org/jivesoftware/multiplexer/spi/ClientFailoverDeliverer.java
@@ -11,10 +11,10 @@
package org.jivesoftware.multiplexer.spi;
+import org.dom4j.Element;
+import org.jivesoftware.multiplexer.ConnectionManager;
import org.jivesoftware.multiplexer.PacketDeliverer;
import org.jivesoftware.multiplexer.ServerSurrogate;
-import org.jivesoftware.multiplexer.ConnectionManager;
-import org.dom4j.Element;
/**
* Deliverer to use when a stanza received from the server failed to be forwarded
@@ -50,7 +50,7 @@
error.addElement("unexpected-request")
.addAttribute("xmlns", "urn:ietf:params:xml:ns:xmpp-stanzas");
// Bounce the failed IQ packet
- serverSurrogate.send(reply, streamID);
+ serverSurrogate.send(reply.asXML(), streamID);
}
}
}
diff --git a/src/java/org/jivesoftware/multiplexer/spi/ServerRouter.java b/src/java/org/jivesoftware/multiplexer/spi/ServerRouter.java
index dd0a656..431a9be 100644
--- a/src/java/org/jivesoftware/multiplexer/spi/ServerRouter.java
+++ b/src/java/org/jivesoftware/multiplexer/spi/ServerRouter.java
@@ -11,10 +11,9 @@
package org.jivesoftware.multiplexer.spi;
-import org.jivesoftware.multiplexer.PacketRouter;
import org.jivesoftware.multiplexer.ConnectionManager;
+import org.jivesoftware.multiplexer.PacketRouter;
import org.jivesoftware.multiplexer.ServerSurrogate;
-import org.dom4j.Element;
/**
* Packet router that will route all traffic to the server.
@@ -29,7 +28,7 @@
serverSurrogate = ConnectionManager.getInstance().getServerSurrogate();
}
- public void route(Element stanza, String streamID) {
+ public void route(String stanza, String streamID) {
serverSurrogate.send(stanza, streamID);
}
}
diff --git a/src/java/org/jivesoftware/multiplexer/task/RouteTask.java b/src/java/org/jivesoftware/multiplexer/task/RouteTask.java
index ba8b95d..4ccbd9c 100644
--- a/src/java/org/jivesoftware/multiplexer/task/RouteTask.java
+++ b/src/java/org/jivesoftware/multiplexer/task/RouteTask.java
@@ -11,9 +11,8 @@
package org.jivesoftware.multiplexer.task;
-import org.jivesoftware.multiplexer.ConnectionWorkerThread;
import org.jivesoftware.multiplexer.ClientSession;
-import org.dom4j.Element;
+import org.jivesoftware.multiplexer.ConnectionWorkerThread;
/**
* Task that forwards client packets to the server.
@@ -22,9 +21,9 @@
*/
public class RouteTask extends ClientTask {
- private Element stanza;
+ private String stanza;
- public RouteTask(String streamID, Element stanza) {
+ public RouteTask(String streamID, String stanza) {
super(streamID);
this.stanza = stanza;
}