diff --git a/src/java/org/jivesoftware/multiplexer/net/ClientConnectionHandler.java b/src/java/org/jivesoftware/multiplexer/net/ClientConnectionHandler.java new file mode 100644 index 0000000..79e74bf --- /dev/null +++ b/src/java/org/jivesoftware/multiplexer/net/ClientConnectionHandler.java @@ -0,0 +1,38 @@ +/** + * $RCSfile$ + * $Revision: $ + * $Date: $ + * + * Copyright (C) 2006 Jive Software. All rights reserved. + * + * This software is published under the terms of the GNU Public License (GPL), + * a copy of which is included in this distribution. + */ + +package org.jivesoftware.multiplexer.net; + +import org.apache.mina.common.IoSession; +import org.jivesoftware.multiplexer.spi.ClientFailoverDeliverer; +import org.jivesoftware.util.JiveGlobals; + +/** + * ConnectionHandler that knows which subclass of {@link StanzaHandler} should + * be created and how to build and configure a {@link NIOConnection}. + * + * @author Gaston Dombiak + */ +public class ClientConnectionHandler extends ConnectionHandler { + + StanzaHandler createStanzaHandler(NIOConnection connection) { + return new ClientStanzaHandler(router, serverName, connection); + } + + NIOConnection createNIOConnection(IoSession session) { + return new NIOConnection(session, new ClientFailoverDeliverer()); + } + + int getMaxIdleTime() { + // Return 30 minuntes + return JiveGlobals.getIntProperty("xmpp.client.idle", 30 * 60 * 1000) / 1000; + } +} diff --git a/src/java/org/jivesoftware/multiplexer/net/ClientStanzaHandler.java b/src/java/org/jivesoftware/multiplexer/net/ClientStanzaHandler.java new file mode 100644 index 0000000..249c574 --- /dev/null +++ b/src/java/org/jivesoftware/multiplexer/net/ClientStanzaHandler.java @@ -0,0 +1,49 @@ +/** + * $RCSfile$ + * $Revision: $ + * $Date: $ + * + * Copyright (C) 2006 Jive Software. All rights reserved. + * + * This software is published under the terms of the GNU Public License (GPL), + * a copy of which is included in this distribution. + */ + +package org.jivesoftware.multiplexer.net; + +import org.jivesoftware.multiplexer.ClientSession; +import org.jivesoftware.multiplexer.Connection; +import org.jivesoftware.multiplexer.PacketRouter; +import org.jivesoftware.util.JiveGlobals; +import org.xmlpull.v1.XmlPullParser; +import org.xmlpull.v1.XmlPullParserException; + +/** + * Handler of XML stanzas sent by clients. + * + * @author Gaston Dombiak + */ +class ClientStanzaHandler extends StanzaHandler { + + public ClientStanzaHandler(PacketRouter router, String serverName, Connection connection) { + super(router, serverName, connection); + } + + String getNamespace() { + return "jabber:client"; + } + + boolean validateHost() { + return JiveGlobals.getBooleanProperty("xmpp.client.validate.host",false); + } + + boolean createSession(String namespace, String serverName, XmlPullParser xpp, Connection connection) + throws XmlPullParserException { + if ("jabber:client".equals(namespace)) { + // The connected client is a regular client so create a ClientSession + session = ClientSession.createSession(serverName, xpp, connection); + return true; + } + return false; + } +} diff --git a/src/java/org/jivesoftware/multiplexer/net/ConnectionHandler.java b/src/java/org/jivesoftware/multiplexer/net/ConnectionHandler.java new file mode 100644 index 0000000..54cd641 --- /dev/null +++ b/src/java/org/jivesoftware/multiplexer/net/ConnectionHandler.java @@ -0,0 +1,111 @@ +/** + * $RCSfile$ + * $Revision: $ + * $Date: $ + * + * Copyright (C) 2006 Jive Software. All rights reserved. + * + * This software is published under the terms of the GNU Public License (GPL), + * a copy of which is included in this distribution. + */ + +package org.jivesoftware.multiplexer.net; + +import org.apache.mina.common.IdleStatus; +import org.apache.mina.common.IoHandlerAdapter; +import org.apache.mina.common.IoSession; +import org.jivesoftware.multiplexer.Connection; +import org.jivesoftware.multiplexer.ConnectionManager; +import org.jivesoftware.multiplexer.PacketRouter; +import org.jivesoftware.multiplexer.spi.ServerRouter; +import org.jivesoftware.util.Log; + +import java.io.IOException; + +/** + * A ConnectionHandler is responsible for creating new sessions, destroying sessions and delivering + * received XML stanzas to the proper StanzaHandler. + * + * @author Gaston Dombiak + */ +public abstract class ConnectionHandler extends IoHandlerAdapter { + + /** + * The utf-8 charset for decoding and encoding Jabber packet streams. + */ + static final String CHARSET = "UTF-8"; + static final String XML_PARSER = "XML-PARSER"; + private static final String HANDLER = "HANDLER"; + private static final String CONNECTION = "CONNECTION"; + + protected static PacketRouter router = new ServerRouter(); + protected static String serverName = ConnectionManager.getInstance().getServerName(); + + public void sessionOpened(IoSession session) throws Exception { + // Create a new XML parser for the new connection. The parser will be used by the XMPPDecoder filter. + XMLLightweightParser parser = new XMLLightweightParser(CHARSET); + session.setAttribute(XML_PARSER, parser); + // Create a new NIOConnection for the new session + NIOConnection connection = createNIOConnection(session); + session.setAttribute(CONNECTION, connection); + session.setAttribute(HANDLER, createStanzaHandler(connection)); + // Set the max time a connection can be idle before closing it + int idleTime = getMaxIdleTime(); + if (idleTime > 0) { + session.setIdleTime(IdleStatus.BOTH_IDLE, idleTime); + } + } + + public void sessionClosed(IoSession session) throws Exception { + // Get the connection for this session + Connection connection = (Connection) session.getAttribute(CONNECTION); + // Inform the connection that it was closed + connection.close(); + } + + public void sessionIdle(IoSession session, IdleStatus status) throws Exception { + // Get the connection for this session + Connection connection = (Connection) session.getAttribute(CONNECTION); + // Close idle connection + if (Log.isDebugEnabled()) { + Log.debug("Closing connection that has been idle: " + connection); + } + connection.close(); + } + + public void exceptionCaught(IoSession session, Throwable cause) throws Exception { + if (cause instanceof IOException) { + // TODO Verify if there were packets pending to be sent and decide what to do with them + Log.debug(cause); + } + else { + Log.error(cause); + } + } + + public void messageReceived(IoSession session, Object message) throws Exception { + //System.out.println("RCVD: " + message); + // Get the stanza handler for this session + StanzaHandler handler = (StanzaHandler) session.getAttribute(HANDLER); + // Let the stanza handler process the received stanza + try { + handler.process( (String) message); + } catch (Exception e) { + Log.error("Closing connection due to error while processing message: " + message, e); + Connection connection = (Connection) session.getAttribute(CONNECTION); + connection.close(); + } + } + + abstract NIOConnection createNIOConnection(IoSession session); + + abstract StanzaHandler createStanzaHandler(NIOConnection connection); + + /** + * Returns the max number of seconds a connection can be idle (both ways) before + * being closed.

+ * + * @return the max number of seconds a connection can be idle. + */ + abstract int getMaxIdleTime(); +} diff --git a/src/java/org/jivesoftware/multiplexer/net/NIOConnection.java b/src/java/org/jivesoftware/multiplexer/net/NIOConnection.java new file mode 100644 index 0000000..73fc333 --- /dev/null +++ b/src/java/org/jivesoftware/multiplexer/net/NIOConnection.java @@ -0,0 +1,360 @@ +/** + * $RCSfile$ + * $Revision: $ + * $Date: $ + * + * Copyright (C) 2006 Jive Software. All rights reserved. + * + * This software is published under the terms of the GNU Public License (GPL), + * a copy of which is included in this distribution. + */ + +package org.jivesoftware.multiplexer.net; + +import org.apache.mina.common.ByteBuffer; +import org.apache.mina.common.IoFilterChain; +import org.apache.mina.common.IoSession; +import org.apache.mina.filter.CompressionFilter; +import org.apache.mina.filter.SSLFilter; +import org.dom4j.Element; +import org.dom4j.io.OutputFormat; +import org.jivesoftware.multiplexer.Connection; +import org.jivesoftware.multiplexer.ConnectionCloseListener; +import org.jivesoftware.multiplexer.PacketDeliverer; +import org.jivesoftware.multiplexer.Session; +import org.jivesoftware.util.JiveGlobals; +import org.jivesoftware.util.Log; +import org.jivesoftware.util.XMLWriter; + +import javax.net.ssl.KeyManager; +import javax.net.ssl.SSLContext; +import javax.net.ssl.TrustManager; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.UnknownHostException; +import java.security.KeyStore; +import java.util.HashMap; +import java.util.Map; + +/** + * Implementation of {@link Connection} inteface specific for NIO connections when using + * the MINA framework.

+ * + * MINA project can be found at here. + * + * @author Gaston Dombiak + */ +public class NIOConnection implements Connection { + + /** + * The utf-8 charset for decoding and encoding XMPP packet streams. + */ + public static final String CHARSET = "UTF-8"; + + private Session session; + private IoSession ioSession; + + final private Map listeners = + new HashMap(); + + /** + * Deliverer to use when the connection is closed or was closed when delivering + * a packet. + */ + private PacketDeliverer backupDeliverer; + private boolean flashClient = false; + private int majorVersion = 1; + private int minorVersion = 0; + private String language = null; + + // TODO Uso el #checkHealth???? + /** + * TLS policy currently in use for this connection. + */ + private TLSPolicy tlsPolicy = TLSPolicy.optional; + + /** + * Compression policy currently in use for this connection. + */ + private CompressionPolicy compressionPolicy = CompressionPolicy.disabled; + + + public NIOConnection(IoSession session, PacketDeliverer packetDeliverer) { + this.ioSession = session; + this.backupDeliverer = packetDeliverer; + } + + public boolean validate() { + if (isClosed()) { + return false; + } + deliverRawText(" "); + return !isClosed(); + } + + public void registerCloseListener(ConnectionCloseListener listener, Object handbackMessage) { + if (isClosed()) { + listener.onConnectionClose(handbackMessage); + } + else { + listeners.put(listener, handbackMessage); + } + } + + public void removeCloseListener(ConnectionCloseListener listener) { + listeners.remove(listener); + } + + public InetAddress getInetAddress() throws UnknownHostException { + return ((InetSocketAddress) ioSession.getRemoteAddress()).getAddress(); + } + + public PacketDeliverer getPacketDeliverer() { + return backupDeliverer; + } + + public void close() { + boolean wasClosed = false; + synchronized (this) { + if (!isClosed()) { + try { + deliverRawText(""); + } catch (Exception e) { + // Ignore + } + closeConnection(); + wasClosed = true; + } + } + if (wasClosed) { + notifyCloseListeners(); + } + } + + public void systemShutdown() { + deliverRawText(""); + close(); + } + + /** + * Forces the connection to be closed immediately no matter if closing the socket takes + * a long time. This method should only be called from {@link SocketSendingTracker} when + * sending data over the socket has taken a long time and we need to close the socket, discard + * the connection and its ioSession. + */ + private void forceClose() { + closeConnection(); + // Notify the close listeners so that the SessionManager can send unavailable + // presences if required. + notifyCloseListeners(); + } + + private void closeConnection() { + ioSession.close(); + } + + /** + * Notifies all close listeners that the connection has been closed. + * Used by subclasses to properly finish closing the connection. + */ + private void notifyCloseListeners() { + synchronized (listeners) { + for (ConnectionCloseListener listener : listeners.keySet()) { + try { + listener.onConnectionClose(listeners.get(listener)); + } + catch (Exception e) { + Log.error("Error notifying listener: " + listener, e); + } + } + } + } + + public void init(Session owner) { + session = owner; + } + + public boolean isClosed() { + if (session == null) { + return !ioSession.isConnected(); + } + return session.getStatus() == Session.STATUS_CLOSED; + } + + public boolean isSecure() { + return ioSession.getFilterChain().contains("tls"); + } + + public void deliver(Element doc) { + if (isClosed()) { + backupDeliverer.deliver(doc); + } + else { + ByteBuffer buffer = ByteBuffer.allocate(4096); + buffer.setAutoExpand(true); + + boolean errorDelivering = false; + try { + XMLWriter xmlSerializer = new XMLWriter(buffer.asOutputStream(), new OutputFormat()); + xmlSerializer.write(doc); + xmlSerializer.flush(); + if (flashClient) { + buffer.put((byte) '\0'); + } + buffer.flip(); + //System.out.println("SENT: " + doc.asXML()); + ioSession.write(buffer); + } + catch (Exception e) { + Log.debug("Error delivering packet" + "\n" + this.toString(), e); + errorDelivering = true; + } + if (errorDelivering) { + close(); + // Retry sending the packet again. Most probably if the packet is a + // Message it will be stored offline + backupDeliverer.deliver(doc); + } + } + } + + public void deliverRawText(String text) { + if (!isClosed()) { + ByteBuffer buffer = ByteBuffer.allocate(text.length()); + buffer.setAutoExpand(true); + + boolean errorDelivering = false; + try { + //Charset charset = Charset.forName(CHARSET); + //buffer.putString(text, charset.newEncoder()); + buffer.put(text.getBytes(CHARSET)); + if (flashClient) { + buffer.put((byte) '\0'); + } + buffer.flip(); + //System.out.println("SENT: " + text); + ioSession.write(buffer); + } + catch (Exception e) { + Log.debug("Error delivering raw text" + "\n" + this.toString(), e); + errorDelivering = true; + } + if (errorDelivering) { + close(); + } + } + } + + public void startTLS(boolean clientMode, String remoteServer) throws Exception { + KeyStore ksKeys = SSLConfig.getKeyStore(); + String keypass = SSLConfig.getKeyPassword(); + + KeyStore ksTrust = SSLConfig.getTrustStore(); + String trustpass = SSLConfig.getTrustPassword(); + + // KeyManager's decide which key material to use. + KeyManager[] km = SSLJiveKeyManagerFactory.getKeyManagers(ksKeys, keypass); + + // TrustManager's decide whether to allow connections. + TrustManager[] tm = SSLJiveTrustManagerFactory.getTrustManagers(ksTrust, trustpass); + // TODO Set proper value when s2s is supported + boolean needClientAuth = false; + if (clientMode || needClientAuth) { + // Check if we can trust certificates presented by the server + tm = new TrustManager[]{new ServerTrustManager(remoteServer, ksTrust)}; + } + + SSLContext tlsContext = SSLContext.getInstance("TLS"); + + tlsContext.init(km, tm, null); + + SSLFilter filter = new SSLFilter(tlsContext); + filter.setUseClientMode(clientMode); + if (needClientAuth) { + // Only REQUIRE client authentication if we are fully verifying certificates + if (JiveGlobals.getBooleanProperty("xmpp.server.certificate.verify", true) && + JiveGlobals.getBooleanProperty("xmpp.server.certificate.verify.chain", true) && + !JiveGlobals + .getBooleanProperty("xmpp.server.certificate.accept-selfsigned", false)) + { + filter.setNeedClientAuth(true); + } + else { + // Just indicate that we would like to authenticate the client but if client + // certificates are self-signed or have no certificate chain then we are still + // good + filter.setWantClientAuth(true); + } + } + + ioSession.getFilterChain().addAfter("org.apache.mina.common.ExecutorThreadModel", "tls", filter); + ioSession.setAttribute(SSLFilter.DISABLE_ENCRYPTION_ONCE, Boolean.TRUE); + if (!clientMode) { + // Indicate the client that the server is ready to negotiate TLS + deliverRawText(""); + } + } + + public void startCompression() { + IoFilterChain chain = ioSession.getFilterChain(); + String baseFilter = "org.apache.mina.common.ExecutorThreadModel"; + if (chain.contains("tls")) { + baseFilter = "tls"; + } + chain.addAfter(baseFilter, "compression", new CompressionFilter(CompressionFilter.COMPRESSION_MAX)); + } + + public boolean isFlashClient() { + return flashClient; + } + + public void setFlashClient(boolean flashClient) { + this.flashClient = flashClient; + } + + public int getMajorXMPPVersion() { + return majorVersion; + } + + public int getMinorXMPPVersion() { + return minorVersion; + } + + public void setXMPPVersion(int majorVersion, int minorVersion) { + this.majorVersion = majorVersion; + this.minorVersion = minorVersion; + } + + public String getLanguage() { + return language; + } + + public void setLanaguage(String language) { + this.language = language; + } + + public boolean isCompressed() { + return ioSession.getFilterChain().contains("compression"); + } + + public CompressionPolicy getCompressionPolicy() { + return compressionPolicy; + } + + public void setCompressionPolicy(CompressionPolicy compressionPolicy) { + this.compressionPolicy = compressionPolicy; + } + + public TLSPolicy getTlsPolicy() { + return tlsPolicy; + } + + public void setTlsPolicy(TLSPolicy tlsPolicy) { + this.tlsPolicy = tlsPolicy; + } + + public String toString() { + return super.toString() + " MINA Session: " + ioSession; + } +} diff --git a/src/java/org/jivesoftware/multiplexer/net/StanzaHandler.java b/src/java/org/jivesoftware/multiplexer/net/StanzaHandler.java new file mode 100644 index 0000000..33a0950 --- /dev/null +++ b/src/java/org/jivesoftware/multiplexer/net/StanzaHandler.java @@ -0,0 +1,480 @@ +/** + * $RCSfile$ + * $Revision: $ + * $Date: $ + * + * Copyright (C) 2006 Jive Software. All rights reserved. + * + * This software is published under the terms of the GNU Public License (GPL), + * a copy of which is included in this distribution. + */ + +package org.jivesoftware.multiplexer.net; + +import org.dom4j.DocumentException; +import org.dom4j.Element; +import org.dom4j.io.SAXReader; +import org.jivesoftware.multiplexer.*; +import org.jivesoftware.util.Log; +import org.jivesoftware.util.StringUtils; +import org.xmlpull.v1.XmlPullParser; +import org.xmlpull.v1.XmlPullParserException; +import org.xmlpull.v1.XmlPullParserFactory; + +import java.io.IOException; +import java.io.StringReader; + +/** + * A StanzaHandler is the main responsible for handling incoming stanzas. Some stanzas like startTLS + * are totally managed by this class. Other stanzas are just forwarded to the server. + * + * @author Gaston Dombiak + */ +abstract class StanzaHandler { + /** + * The utf-8 charset for decoding and encoding Jabber packet streams. + */ + protected static String CHARSET = "UTF-8"; + private static final String STREAM_START = " + return; + } + // Found an stream:stream tag... + if (!sessionCreated) { + sessionCreated = true; + MXParser parser = (MXParser) factory.newPullParser(); + parser.setInput(new StringReader(stanza)); + createSession(parser); + } else if (startedSASL && session.getStatus() == Session.STATUS_AUTHENTICATED) { + startedSASL = false; + saslSuccessful(); + } else if (waitingCompressionACK) { + waitingCompressionACK = false; + compressionSuccessful(); + } + return; + } + + // Create DOM object from received stanza + Element doc; + try { + doc = reader.read(new StringReader(stanza)).getRootElement(); + } catch (DocumentException e) { + if (stanza.equals("")) { + session.close(); + return; + } + // Throw the exception. This will close the connection + throw e; + } + if (doc == null) { + // No document found. + return; + } + String tag = doc.getName(); + if ("starttls".equals(tag)) { + // Negotiate TLS + if (negotiateTLS()) { + tlsNegotiated(); + } else { + connection.close(); + session = null; + } + } else if ("auth".equals(tag)) { + // User is trying to authenticate using SASL + startedSASL = true; + // Forward packet to the server + process(doc); + } else if ("compress".equals(tag)) { + // Client is trying to initiate compression + if (compressClient(doc)) { + // Compression was successful so open a new stream and offer + // resource binding and session establishment (to client sessions only) + waitingCompressionACK = true; + } + } else { + process(doc); + } + } + + protected void process(Element doc) { + if (doc == null) { + return; + } + + // Ensure that connection was secured if TLS was required + if (connection.getTlsPolicy() == Connection.TLSPolicy.required && + !connection.isSecure()) { + closeNeverSecuredConnection(); + return; + } + router.route(doc, session.getStreamID()); + } + + /** + * Tries to secure the connection using TLS. If the connection is secured then reset + * the parser to use the new secured reader. But if the connection failed to be secured + * then send a stanza and close the connection. + * + * @return true if the connection was secured. + */ + private boolean negotiateTLS() { + if (connection.getTlsPolicy() == Connection.TLSPolicy.disabled) { + // Set the not_authorized error + StreamError error = new StreamError(StreamError.Condition.not_authorized); + // Deliver stanza + connection.deliverRawText(error.toXML()); + // Close the underlying connection + connection.close(); + // Log a warning so that admins can track this case from the server side + Log.warn("TLS requested by initiator when TLS was never offered by server. " + + "Closing connection : " + connection); + return false; + } + // Client requested to secure the connection using TLS. Negotiate TLS. + try { + connection.startTLS(false, null); + } + catch (Exception e) { + Log.error("Error while negotiating TLS", e); + connection.deliverRawText(""); + connection.close(); + return false; + } + return true; + } + + /** + * TLS negotiation was successful so open a new stream and offer the new stream features. + * The new stream features will include available SASL mechanisms and specific features + * depending on the session type such as auth for Non-SASL authentication and register + * for in-band registration. + */ + private void tlsNegotiated() { + // Offer stream features including SASL Mechanisms + StringBuilder sb = new StringBuilder(620); + sb.append(geStreamHeader()); + sb.append(""); + // Include available SASL Mechanisms + sb.append(ConnectionManager.getInstance().getServerSurrogate().getSASLMechanisms(session)); + // Include specific features such as auth and register for client sessions + String specificFeatures = session.getAvailableStreamFeatures(); + if (specificFeatures != null) { + sb.append(specificFeatures); + } + sb.append(""); + connection.deliverRawText(sb.toString()); + } + + /** + * After SASL authentication was successful we should open a new stream and offer + * new stream features such as resource binding and session establishment. Notice that + * resource binding and session establishment should only be offered to clients (i.e. not + * to servers or external components) + */ + private void saslSuccessful() { + StringBuilder sb = new StringBuilder(420); + sb.append(geStreamHeader()); + sb.append(""); + + // Include specific features such as resource binding and session establishment + // for client sessions + String specificFeatures = session.getAvailableStreamFeatures(); + if (specificFeatures != null) { + sb.append(specificFeatures); + } + sb.append(""); + connection.deliverRawText(sb.toString()); + } + + /** + * Start using compression but first check if the connection can and should use compression. + * The connection will be closed if the requested method is not supported, if the connection + * is already using compression or if client requested to use compression but this feature + * is disabled. + * + * @param doc the element sent by the client requesting compression. Compression method is + * included. + * @return true if it was possible to use compression. + * @throws IOException if an error occurs while starting using compression. + */ + private boolean compressClient(Element doc) { + String error = null; + if (connection.getCompressionPolicy() == Connection.CompressionPolicy.disabled) { + // Client requested compression but this feature is disabled + error = ""; + // Log a warning so that admins can track this case from the server side + Log.warn("Client requested compression while compression is disabled. Closing " + + "connection : " + connection); + } else if (connection.isCompressed()) { + // Client requested compression but connection is already compressed + error = ""; + // Log a warning so that admins can track this case from the server side + Log.warn("Client requested compression and connection is already compressed. Closing " + + "connection : " + connection); + } else { + // Check that the requested method is supported + String method = doc.elementText("method"); + if (!"zlib".equals(method)) { + error = ""; + // Log a warning so that admins can track this case from the server side + Log.warn("Requested compression method is not supported: " + method + + ". Closing connection : " + connection); + } + } + + if (error != null) { + // Deliver stanza + connection.deliverRawText(error); + return false; + } else { + // Indicate client that he can proceed and compress the socket + connection.deliverRawText(""); + + // Start using compression + connection.startCompression(); + return true; + } + } + + /** + * After compression was successful we should open a new stream and offer + * new stream features such as resource binding and session establishment. Notice that + * resource binding and session establishment should only be offered to clients (i.e. not + * to servers or external components) + */ + private void compressionSuccessful() { + StringBuilder sb = new StringBuilder(340); + sb.append(geStreamHeader()); + sb.append(""); + // Include SASL mechanisms only if client has not been authenticated + if (session.getStatus() != Session.STATUS_AUTHENTICATED) { + // Include available SASL Mechanisms + sb.append(ConnectionManager.getInstance().getServerSurrogate().getSASLMechanisms( + session)); + } + // Include specific features such as resource binding and session establishment + // for client sessions + String specificFeatures = session.getAvailableStreamFeatures(); + if (specificFeatures != null) { + sb.append(specificFeatures); + } + sb.append(""); + connection.deliverRawText(sb.toString()); + } + + private String geStreamHeader() { + StringBuilder sb = new StringBuilder(200); + sb.append(""); + if (connection.isFlashClient()) { + sb.append(""); + return sb.toString(); + } + + /** + * Close the connection since TLS was mandatory and the entity never negotiated TLS. Before + * closing the connection a stream error will be sent to the entity. + */ + void closeNeverSecuredConnection() { + // Set the not_authorized error + StreamError error = new StreamError(StreamError.Condition.not_authorized); + // Deliver stanza + connection.deliverRawText(error.toXML()); + // Close the underlying connection + connection.close(); + // Log a warning so that admins can track this case from the server side + Log.warn("TLS was required by the server and connection was never secured. " + + "Closing connection : " + connection); + } + + /** + * Uses the XPP to grab the opening stream tag and create an active session + * object. The session to create will depend on the sent namespace. In all + * cases, the method obtains the opening stream tag, checks for errors, and + * either creates a session or returns an error and kills the connection. + * If the connection remains open, the XPP will be set to be ready for the + * first packet. A call to next() should result in an START_TAG state with + * the first packet in the stream. + */ + protected void createSession(XmlPullParser xpp) throws XmlPullParserException, IOException { + for (int eventType = xpp.getEventType(); eventType != XmlPullParser.START_TAG;) { + eventType = xpp.next(); + } + + // Check that the TO attribute of the stream header matches the server name or a valid + // subdomain. If the value of the 'to' attribute is not valid then return a host-unknown + // error and close the underlying connection. + String host = xpp.getAttributeValue("", "to"); + if (validateHost() && isHostUnknown(host)) { + StringBuilder sb = new StringBuilder(250); + sb.append(""); + // Append stream header + sb.append(""); + // Set the host_unknown error + StreamError error = new StreamError(StreamError.Condition.host_unknown); + sb.append(error.toXML()); + // Deliver stanza + connection.deliverRawText(sb.toString()); + // Close the underlying connection + connection.close(); + // Log a warning so that admins can track this cases from the server side + Log.warn("Closing session due to incorrect hostname in stream header. Host: " + host + + ". Connection: " + connection); + } + + // Create the correct session based on the sent namespace. At this point the server + // may offer the client to secure the connection. If the client decides to secure + // the connection then a stanza should be received + else if (!createSession(xpp.getNamespace(null), serverName, xpp, connection)) { + // No session was created because of an invalid namespace prefix so answer a stream + // error and close the underlying connection + StringBuilder sb = new StringBuilder(250); + sb.append(""); + // Append stream header + sb.append(""); + // Include the bad-namespace-prefix in the response + StreamError error = new StreamError(StreamError.Condition.bad_namespace_prefix); + sb.append(error.toXML()); + connection.deliverRawText(sb.toString()); + // Close the underlying connection + connection.close(); + // Log a warning so that admins can track this cases from the server side + Log.warn("Closing session due to bad_namespace_prefix in stream header. Prefix: " + + xpp.getNamespace(null) + ". Connection: " + connection); + } + } + + private boolean isHostUnknown(String host) { + if (host == null) { + // Answer false since when using server dialback the stream header will not + // have a TO attribute + return false; + } + if (serverName.equals(host)) { + // requested host matched the server name + return false; + } + return true; + } + + /** + * Returns the stream namespace. (E.g. jabber:client, jabber:server, etc.). + * + * @return the stream namespace. + */ + abstract String getNamespace(); + + /** + * Returns true if the value of the 'to' attribute in the stream header should be + * validated. If the value of the 'to' attribute is not valid then a host-unknown error + * will be returned and the underlying connection will be closed. + * + * @return true if the value of the 'to' attribute in the initial stream header should be + * validated. + */ + abstract boolean validateHost(); + + /** + * Creates the appropriate {@link Session} subclass based on the specified namespace. + * + * @param namespace the namespace sent in the stream element. eg. jabber:client. + * @return the created session or null. + * @throws org.xmlpull.v1.XmlPullParserException + */ + abstract boolean createSession(String namespace, String serverName, XmlPullParser xpp, Connection connection) + throws XmlPullParserException; +} diff --git a/src/java/org/jivesoftware/multiplexer/net/XMLLightweightParser.java b/src/java/org/jivesoftware/multiplexer/net/XMLLightweightParser.java new file mode 100644 index 0000000..c8183a0 --- /dev/null +++ b/src/java/org/jivesoftware/multiplexer/net/XMLLightweightParser.java @@ -0,0 +1,270 @@ +/** + * $RCSfile$ + * $Revision: $ + * $Date: $ + * + * Copyright (C) 2006 Jive Software. All rights reserved. + * + * This software is published under the terms of the GNU Public License (GPL), + * a copy of which is included in this distribution. + */ + +package org.jivesoftware.multiplexer.net; + +import org.apache.mina.common.ByteBuffer; + +import java.nio.CharBuffer; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.List; + +/** + * This is a Light-Weight XML Parser. + * It read data from a channel and collect data until data are available in + * the channel. + * When a message is complete you can retrieve messages invoking the method + * getMsgs() and you can invoke the method areThereMsgs() to know if at least + * an message is presents. + * + * @author Daniele Piras + * @author Gaston Dombiak + */ +class XMLLightweightParser { + // Chars that rappresent CDATA section start + protected static char[] CDATA_START = {'<', '!', '[', 'C', 'D', 'A', 'T', 'A', '['}; + // Chars that rappresent CDATA section end + protected static char[] CDATA_END = {']', ']', '>'}; + + // Buffer with all data retrieved + protected StringBuilder buffer = new StringBuilder(); + + // ---- INTERNAL STATUS ------- + // Initial status + protected static final int INIT = 0; + // Status used when the first tag name is retrieved + protected static final int HEAD = 2; + // Status used when robot is inside the xml and it looking for the tag conclusion + protected static final int INSIDE = 3; + // Status used when a '<' is found and try to find the conclusion tag. + protected static final int PRETAIL = 4; + // Status used when the ending tag is equal to the head tag + protected static final int TAIL = 5; + // Status used when robot is inside the main tag and found an '/' to check '/>'. + protected static final int VERIFY_CLOSE_TAG = 6; + // Status used when you are inside a parameter + protected static final int INSIDE_PARAM_VALUE = 7; + // Status used when you are inside a cdata section + protected static final int INSIDE_CDATA = 8; + + + // Current robot status + protected int status = XMLLightweightParser.INIT; + + // Index to looking for a CDATA section start or end. + protected int cdataOffset = 0; + + // Number of chars that machs with the head tag. If the tailCount is equal to + // the head length so a close tag is found. + protected int tailCount = 0; + // Indicate the starting point in the buffer for the next message. + protected int startLastMsg = 0; + // Flag used to discover tag in the form . + protected boolean insideRootTag = false; + // Object conteining the head tag + protected StringBuilder head = new StringBuilder(5); + // List with all finished messages found. + protected List msgs = new ArrayList(); + + protected boolean insideChildrenTag = false; + + ByteBuffer byteBuffer; + Charset encoder; + + public XMLLightweightParser(String charset) { + encoder = Charset.forName(charset); + } + + /* + * true if the parser has found some complete xml message. + */ + public boolean areThereMsgs() { + return (msgs.size() > 0); + } + + /* + * @return an array with all messages found + */ + public String[] getMsgs() { + String[] res = new String[msgs.size()]; + for (int i = 0; i < res.length; i++) { + res[i] = msgs.get(i); + } + msgs.clear(); + invalidateBuffer(); + return res; + } + + /* + * Method use to re-initialize the buffer + */ + protected void invalidateBuffer() { + if (buffer.length() > 0) { + String str = buffer.substring(startLastMsg); + buffer.delete(0, buffer.length()); + buffer.append(str); + buffer.trimToSize(); + } + startLastMsg = 0; + } + + + /* + * Method that add a message to the list and reinit parser. + */ + protected void foundMsg(String msg) { + // Add message to the complete message list + if (msg != null) { + msgs.add(msg); + } + // Move the position into the buffer + status = XMLLightweightParser.INIT; + tailCount = 0; + cdataOffset = 0; + head.setLength(0); + insideRootTag = false; + insideChildrenTag = false; + } + + /* + * Main reading method + */ + public void read(ByteBuffer byteBuffer) throws Exception { + int readByte = byteBuffer.remaining(); + + invalidateBuffer(); + CharBuffer charBuffer = encoder.decode(byteBuffer.buf()); + //charBuffer.flip(); + char[] buf = charBuffer.array(); + + buffer.append(buf); + // Robot. + char ch; + for (int i = 0; i < readByte; i++) { + //ch = rawByteBuffer[ i ]; + ch = buf[i]; + if (status == XMLLightweightParser.TAIL) { + // Looking for the close tag + if (ch == head.charAt(tailCount)) { + tailCount++; + if (tailCount == head.length()) { + // Close tag found! + // Calculate the correct start,end position of the message into the buffer + int end = buffer.length() - readByte + (i + 1); + String msg = buffer.substring(startLastMsg, end); + // Add message to the list + foundMsg(msg); + startLastMsg = end; + } + } else { + tailCount = 0; + status = XMLLightweightParser.INSIDE; + } + } else if (status == XMLLightweightParser.PRETAIL) { + if (ch == XMLLightweightParser.CDATA_START[cdataOffset]) { + cdataOffset++; + if (cdataOffset == XMLLightweightParser.CDATA_START.length) { + status = XMLLightweightParser.INSIDE_CDATA; + cdataOffset = 0; + continue; + } + } else { + cdataOffset = 0; + status = XMLLightweightParser.INSIDE; + } + if (ch == '/') { + status = XMLLightweightParser.TAIL; + } + } else if (status == XMLLightweightParser.VERIFY_CLOSE_TAG) { + if (ch == '>') { + // Found a tag in the form + int end = buffer.length() - readByte + (i + 1); + String msg = buffer.substring(startLastMsg, end); + // Add message to the list + foundMsg(msg); + startLastMsg = end; + } else { + status = XMLLightweightParser.INSIDE; + } + } else if (status == XMLLightweightParser.INSIDE_PARAM_VALUE) { + + if (ch == '"') { + status = XMLLightweightParser.INSIDE; + continue; + } + } else if (status == XMLLightweightParser.INSIDE_CDATA) { + if (ch == XMLLightweightParser.CDATA_END[cdataOffset]) { + cdataOffset++; + if (cdataOffset == XMLLightweightParser.CDATA_END.length) { + status = XMLLightweightParser.INSIDE; + cdataOffset = 0; + continue; + } + } else { + cdataOffset = 0; + } + } else if (status == XMLLightweightParser.INSIDE) { + if (ch == XMLLightweightParser.CDATA_START[cdataOffset]) { + cdataOffset++; + if (cdataOffset == XMLLightweightParser.CDATA_START.length) { + status = XMLLightweightParser.INSIDE_CDATA; + cdataOffset = 0; + continue; + } + } else { + cdataOffset = 0; + } + if (ch == '"') { + status = XMLLightweightParser.INSIDE_PARAM_VALUE; + } else if (ch == '>') { + if (insideRootTag && + ("stream:stream>".equals(head.toString()) || ("?xml>".equals(head.toString())))) { + // Found closing stream:stream + int end = buffer.length() - readByte + (i + 1); + String msg = buffer.substring(startLastMsg, end); + foundMsg(msg); + startLastMsg = end; + } + insideRootTag = false; + } else if (ch == '<') { + status = XMLLightweightParser.PRETAIL; + insideChildrenTag = true; + } else if (ch == '/' && insideRootTag && !insideChildrenTag) { + status = XMLLightweightParser.VERIFY_CLOSE_TAG; + } + } else if (status == XMLLightweightParser.HEAD) { + if (ch == ' ' || ch == '>') { + // Append > to head to facility the research of + head.append(">"); + status = XMLLightweightParser.INSIDE; + insideRootTag = true; + insideChildrenTag = false; + continue; + } + head.append(ch); + + } else if (status == XMLLightweightParser.INIT) { + if (ch != ' ' && ch != '\r' && ch != '\n' && ch != '<') { + invalidateBuffer(); + return; + } + if (ch == '<') { + status = XMLLightweightParser.HEAD; + } + } + } + if (head.length() > 0 && "/stream:stream>".equals(head.toString())) { + // Found closing stream:stream + foundMsg(""); + } + } +} diff --git a/src/java/org/jivesoftware/multiplexer/net/XMPPCodecFactory.java b/src/java/org/jivesoftware/multiplexer/net/XMPPCodecFactory.java new file mode 100644 index 0000000..4c9117d --- /dev/null +++ b/src/java/org/jivesoftware/multiplexer/net/XMPPCodecFactory.java @@ -0,0 +1,40 @@ +/** + * $RCSfile$ + * $Revision: $ + * $Date: $ + * + * Copyright (C) 2006 Jive Software. All rights reserved. + * + * This software is published under the terms of the GNU Public License (GPL), + * a copy of which is included in this distribution. + */ + +package org.jivesoftware.multiplexer.net; + +import org.apache.mina.filter.codec.ProtocolCodecFactory; +import org.apache.mina.filter.codec.ProtocolDecoder; +import org.apache.mina.filter.codec.ProtocolEncoder; + +/** + * Factory that specifies the encode and decoder to use for parsing XMPP stanzas. + * + * @author Gaston Dombiak + */ +public class XMPPCodecFactory implements ProtocolCodecFactory { + + private final XMPPEncoder encoder; + private final XMPPDecoder decoder; + + public XMPPCodecFactory() { + encoder = new XMPPEncoder(); + decoder = new XMPPDecoder(); + } + + public ProtocolEncoder getEncoder() throws Exception { + return encoder; + } + + public ProtocolDecoder getDecoder() throws Exception { + return decoder; + } +} diff --git a/src/java/org/jivesoftware/multiplexer/net/XMPPDecoder.java b/src/java/org/jivesoftware/multiplexer/net/XMPPDecoder.java new file mode 100644 index 0000000..8bb09c1 --- /dev/null +++ b/src/java/org/jivesoftware/multiplexer/net/XMPPDecoder.java @@ -0,0 +1,45 @@ +/** + * $RCSfile$ + * $Revision: $ + * $Date: $ + * + * Copyright (C) 2006 Jive Software. All rights reserved. + * + * This software is published under the terms of the GNU Public License (GPL), + * a copy of which is included in this distribution. + */ + +package org.jivesoftware.multiplexer.net; + +import org.apache.mina.common.ByteBuffer; +import org.apache.mina.common.IoSession; +import org.apache.mina.filter.codec.CumulativeProtocolDecoder; +import org.apache.mina.filter.codec.ProtocolDecoderOutput; + +/** + * Decoder class that parses ByteBuffers and generates XML stanzas. Generated + * stanzas are then passed to the next filters. + * + * @author Gaston Dombiak + */ +public class XMPPDecoder extends CumulativeProtocolDecoder { + + protected boolean doDecode(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) + throws Exception { + if (in.remaining() < 4) { + return false; + } + // Get the XML light parser from the IoSession + XMLLightweightParser parser = + (XMLLightweightParser) session.getAttribute(ConnectionHandler.XML_PARSER); + // Parse as many stanzas as possible from the received data + parser.read(in); + + if (parser.areThereMsgs()) { + for (String stanza : parser.getMsgs()) { + out.write(stanza); + } + } + return true; + } +} diff --git a/src/java/org/jivesoftware/multiplexer/net/XMPPEncoder.java b/src/java/org/jivesoftware/multiplexer/net/XMPPEncoder.java new file mode 100644 index 0000000..b32f8eb --- /dev/null +++ b/src/java/org/jivesoftware/multiplexer/net/XMPPEncoder.java @@ -0,0 +1,32 @@ +/** + * $RCSfile$ + * $Revision: $ + * $Date: $ + * + * Copyright (C) 2006 Jive Software. All rights reserved. + * + * This software is published under the terms of the GNU Public License (GPL), + * a copy of which is included in this distribution. + */ + +package org.jivesoftware.multiplexer.net; + +import org.apache.mina.common.IoSession; +import org.apache.mina.filter.codec.ProtocolEncoderAdapter; +import org.apache.mina.filter.codec.ProtocolEncoderOutput; + +/** + * Encoder that does nothing. We are already writing ByteBuffers so there is no need + * to encode them.

+ * + * This class exists as a counterpart of {@link XMPPDecoder}. Unlike that class this class does nothing. + * + * @author Gaston Dombiak + */ +public class XMPPEncoder extends ProtocolEncoderAdapter { + + public void encode(IoSession session, Object message, ProtocolEncoderOutput out) + throws Exception { + // Ignore. Do nothing. Content being sent is already a bytebuffer (of strings) + } +} diff --git a/test/org/jivesoftware/multiplexer/net/XMLLightweightParserTest.java b/test/org/jivesoftware/multiplexer/net/XMLLightweightParserTest.java new file mode 100644 index 0000000..ba2862f --- /dev/null +++ b/test/org/jivesoftware/multiplexer/net/XMLLightweightParserTest.java @@ -0,0 +1,154 @@ +/** + * $RCSfile$ + * $Revision: $ + * $Date: $ + * + * Copyright (C) 2006 Jive Software. All rights reserved. + * + * This software is published under the terms of the GNU Public License (GPL), + * a copy of which is included in this distribution. + */ + +package org.jivesoftware.multiplexer.net; + +import junit.framework.TestCase; +import org.apache.mina.common.ByteBuffer; +import org.dom4j.Element; +import org.dom4j.io.SAXReader; + +import java.io.StringReader; +import java.nio.charset.Charset; + +/** + * Simple test of XMLLightweightParser. + * + * @author Gaston Dombiak + */ +public class XMLLightweightParserTest extends TestCase { + + private final static String CHARSET = "UTF-8"; + + private XMLLightweightParser parser; + private ByteBuffer in; + + public void testHeader() throws Exception { + String msg1 = + ""; + in.putString(msg1, Charset.forName(CHARSET).newEncoder()); + in.flip(); + // Fill parser with byte buffer content and parse it + parser.read(in); + // Make verifications + assertTrue("Stream header is not being correctly parsed", parser.areThereMsgs()); + assertEquals("Wrong stanza was parsed", msg1, parser.getMsgs()[0]); + } + + public void testHeaderWithXMLVersion() throws Exception { + String msg1 = ""; + String msg2 = ""; + in.putString(msg1 + msg2, Charset.forName(CHARSET).newEncoder()); + in.flip(); + // Fill parser with byte buffer content and parse it + parser.read(in); + // Make verifications + assertTrue("Stream header is not being correctly parsed", parser.areThereMsgs()); + String[] values = parser.getMsgs(); + assertEquals("Wrong number of parsed stanzas", 2, values.length); + assertEquals("Wrong stanza was parsed", msg1, values[0]); + assertEquals("Wrong stanza was parsed", msg2, values[1]); + } + + public void testStanzas() throws Exception { + String msg1 = ""; + String msg2 = ""; + String msg3 = ""; + String msg4 = ""; + String msg5 = ""; + String msg6 = ""; + in.putString(msg1, Charset.forName(CHARSET).newEncoder()); + in.putString(msg2, Charset.forName(CHARSET).newEncoder()); + in.putString(msg3, Charset.forName(CHARSET).newEncoder()); + in.putString(msg4, Charset.forName(CHARSET).newEncoder()); + in.putString(msg5, Charset.forName(CHARSET).newEncoder()); + in.putString(msg6, Charset.forName(CHARSET).newEncoder()); + in.flip(); + // Fill parser with byte buffer content and parse it + parser.read(in); + // Make verifications + assertTrue("Stream header is not being correctly parsed", parser.areThereMsgs()); + String[] values = parser.getMsgs(); + assertEquals("Wrong number of parsed stanzas", 6, values.length); + assertEquals("Wrong stanza was parsed", msg1, values[0]); + assertEquals("Wrong stanza was parsed", msg2, values[1]); + assertEquals("Wrong stanza was parsed", msg3, values[2]); + assertEquals("Wrong stanza was parsed", msg4, values[3]); + assertEquals("Wrong stanza was parsed", msg5, values[4]); + assertEquals("Wrong stanza was parsed", msg6, values[5]); + } + + public void testCompleteStanzas() throws Exception { + String msg1 = ""; + String msg2 = ""; + String msg3 = ""; + String msg4 = ""; + String msg5 = ""; + String msg6 = ""; + String msg7 = ""; + in.putString(msg1, Charset.forName(CHARSET).newEncoder()); + in.putString(msg2, Charset.forName(CHARSET).newEncoder()); + in.putString(msg3, Charset.forName(CHARSET).newEncoder()); + in.putString(msg4, Charset.forName(CHARSET).newEncoder()); + in.putString(msg5, Charset.forName(CHARSET).newEncoder()); + in.putString(msg6, Charset.forName(CHARSET).newEncoder()); + in.putString(msg7, Charset.forName(CHARSET).newEncoder()); + in.flip(); + // Fill parser with byte buffer content and parse it + parser.read(in); + // Make verifications + assertTrue("Stream header is not being correctly parsed", parser.areThereMsgs()); + String[] values = parser.getMsgs(); + assertEquals("Wrong number of parsed stanzas", 7, values.length); + assertEquals("Wrong stanza was parsed", msg1, values[0]); + assertEquals("Wrong stanza was parsed", msg2, values[1]); + assertEquals("Wrong stanza was parsed", msg3, values[2]); + assertEquals("Wrong stanza was parsed", msg4, values[3]); + assertEquals("Wrong stanza was parsed", msg5, values[4]); + assertEquals("Wrong stanza was parsed", msg6, values[5]); + assertEquals("Wrong stanza was parsed", msg7, values[6]); + } + + public void testIQ() throws Exception { + String iq = + ""; + in.putString(iq, Charset.forName(CHARSET).newEncoder()); + in.flip(); + // Fill parser with byte buffer content and parse it + parser.read(in); + // Make verifications + assertTrue("Stream header is not being correctly parsed", parser.areThereMsgs()); + String parsedIQ = parser.getMsgs()[0]; + assertEquals("Wrong stanza was parsed", iq, parsedIQ); + + SAXReader reader = new SAXReader(); + reader.setEncoding(CHARSET); + Element doc = reader.read(new StringReader(parsedIQ)).getRootElement(); + assertNotNull("Failed to parse IQ stanza", doc); + + } + + protected void setUp() throws Exception { + super.setUp(); + // Create parser + parser = new XMLLightweightParser(CHARSET); + // Crete byte buffer and append text + in = ByteBuffer.allocate(4096); + in.setAutoExpand(true); + } + + + protected void tearDown() throws Exception { + super.tearDown(); + // Release byte buffer + in.release(); + } +}