/**
* $RCSfile$
* $Revision: 3187 $
* $Date: 2005-12-11 13:34:34 -0300 (Sun, 11 Dec 2005) $
*
* Copyright (C) 2006 Jive Software. All rights reserved.
*
* This software is published under the terms of the GNU Public License (GPL),
* a copy of which is included in this distribution.
*/
package org.jivesoftware.multiplexer.net;
import org.dom4j.Element;
import org.dom4j.io.XMPPPacketReader;
import org.jivesoftware.multiplexer.*;
import org.jivesoftware.util.Log;
import org.jivesoftware.util.StringUtils;
import org.xmlpull.v1.XmlPullParser;
import org.xmlpull.v1.XmlPullParserException;
import org.xmlpull.v1.XmlPullParserFactory;
import java.io.IOException;
import java.net.Socket;
/**
* A SocketReader creates the appropriate {@link Session} based on the defined namespace in the
* stream element and will then keep reading and routing the received packets.<p>
*
* This class was copied from Wildfire. PacketInterceptors were removed. Session concept was
* removed.
*
* @author Gaston Dombiak
*/
public abstract class SocketReader implements Runnable, SocketStatistic {
/**
* The utf-8 charset for decoding and encoding Jabber packet streams.
*/
private static String CHARSET = "UTF-8";
/**
* Reuse the same factory for all the connections.
*/
private static XmlPullParserFactory factory = null;
/**
* Session associated with the socket reader.
*/
protected Session session;
/**
* Reference to the physical connection.
*/
protected SocketConnection connection;
/**
* Server name for which we are attending clients.
*/
protected String serverName;
/**
* Router used to route incoming packets to the correct channels.
*/
private PacketRouter router;
/**
* Specifies whether the socket is using blocking or non-blocking connections.
*/
private SocketReadingMode readingMode;
XMPPPacketReader reader = null;
protected boolean open;
static {
try {
factory = XmlPullParserFactory.newInstance(MXParser.class.getName(), null);
}
catch (XmlPullParserException e) {
Log.error("Error creating a parser factory", e);
}
}
/**
* Creates a dedicated reader for a socket.
*
* @param router the router for sending packets that were read.
* @param serverName the name of the server this socket is working for.
* @param socket the socket to read from.
* @param connection the connection being read.
* @param useBlockingMode true means that the server will use a thread per connection.
*/
public SocketReader(PacketRouter router, String serverName,
Socket socket, SocketConnection connection, boolean useBlockingMode) {
this.serverName = serverName;
this.router = router;
this.connection = connection;
connection.setSocketStatistic(this);
// Reader is associated with a new XMPPPacketReader
reader = new XMPPPacketReader();
reader.setXPPFactory(factory);
// Set the blocking reading mode to use
if (useBlockingMode) {
readingMode = new BlockingReadingMode(socket, this);
}
else {
//TODO readingMode = new NonBlockingReadingMode(socket, this);
}
}
/**
* A dedicated thread loop for reading the stream and sending incoming
* packets to the appropriate router.
*/
public void run() {
readingMode.run();
}
/**
* Notification message indicating that a client needs to response to a SASL
* challenge.
*/
public void clientChallenged() {
readingMode.clientChallenged();
}
/**
* Notification message indicating that sasl authentication has finished. The
* <tt>success</tt> parameter indicates whether authentication was successful or not.
*
* @param success true when authentication was successful.
*/
public void clientAuthenticated(boolean success) {
readingMode.clientAuthenticated(success);
}
protected void process(Element doc) throws Exception {
if (doc == null) {
return;
}
// Ensure that connection was secured if TLS was required
if (connection.getTlsPolicy() == Connection.TLSPolicy.required &&
!connection.isSecure()) {
closeNeverSecuredConnection();
return;
}
router.route(doc, session.getStreamID());
}
public long getLastActive() {
return reader.getLastActive();
}
/**
* Returns a name that identifies the type of reader and the unique instance.
*
* @return a name that identifies the type of reader and the unique instance.
*/
abstract String getName();
/**
* Close the connection since TLS was mandatory and the entity never negotiated TLS. Before
* closing the connection a stream error will be sent to the entity.
*/
void closeNeverSecuredConnection() {
// Set the not_authorized error
StreamError error = new StreamError(StreamError.Condition.not_authorized);
// Deliver stanza
connection.deliverRawText(error.toXML());
// Close the underlying connection
connection.close();
// Log a warning so that admins can track this case from the server side
Log.warn("TLS was required by the server and connection was never secured. " +
"Closing connection : " + connection);
}
/**
* Uses the XPP to grab the opening stream tag and create an active session
* object. The session to create will depend on the sent namespace. In all
* cases, the method obtains the opening stream tag, checks for errors, and
* either creates a session or returns an error and kills the connection.
* If the connection remains open, the XPP will be set to be ready for the
* first packet. A call to next() should result in an START_TAG state with
* the first packet in the stream.
*/
protected void createSession() throws XmlPullParserException, IOException {
XmlPullParser xpp = reader.getXPPParser();
for (int eventType = xpp.getEventType(); eventType != XmlPullParser.START_TAG;) {
eventType = xpp.next();
}
// Check that the TO attribute of the stream header matches the server name or a valid
// subdomain. If the value of the 'to' attribute is not valid then return a host-unknown
// error and close the underlying connection.
String host = reader.getXPPParser().getAttributeValue("", "to");
if (validateHost() && isHostUnknown(host)) {
StringBuilder sb = new StringBuilder(250);
sb.append("<?xml version='1.0' encoding='");
sb.append(CHARSET);
sb.append("'?>");
// Append stream header
sb.append("<stream:stream ");
sb.append("from=\"").append(serverName).append("\" ");
sb.append("id=\"").append(StringUtils.randomString(5)).append("\" ");
sb.append("xmlns=\"").append(xpp.getNamespace(null)).append("\" ");
sb.append("xmlns:stream=\"").append(xpp.getNamespace("stream")).append("\" ");
sb.append("version=\"1.0\">");
// Set the host_unknown error
StreamError error = new StreamError(StreamError.Condition.host_unknown);
sb.append(error.toXML());
// Deliver stanza
connection.deliverRawText(sb.toString());
// Close the underlying connection
connection.close();
// Log a warning so that admins can track this cases from the server side
Log.warn("Closing session due to incorrect hostname in stream header. Host: " + host +
". Connection: " + connection);
}
// Create the correct session based on the sent namespace. At this point the server
// may offer the client to secure the connection. If the client decides to secure
// the connection then a <starttls> stanza should be received
else if (!createSession(xpp.getNamespace(null))) {
// No session was created because of an invalid namespace prefix so answer a stream
// error and close the underlying connection
StringBuilder sb = new StringBuilder(250);
sb.append("<?xml version='1.0' encoding='");
sb.append(CHARSET);
sb.append("'?>");
// Append stream header
sb.append("<stream:stream ");
sb.append("from=\"").append(serverName).append("\" ");
sb.append("id=\"").append(StringUtils.randomString(5)).append("\" ");
sb.append("xmlns=\"").append(xpp.getNamespace(null)).append("\" ");
sb.append("xmlns:stream=\"").append(xpp.getNamespace("stream")).append("\" ");
sb.append("version=\"1.0\">");
// Include the bad-namespace-prefix in the response
StreamError error = new StreamError(StreamError.Condition.bad_namespace_prefix);
sb.append(error.toXML());
connection.deliverRawText(sb.toString());
// Close the underlying connection
connection.close();
// Log a warning so that admins can track this cases from the server side
Log.warn("Closing session due to bad_namespace_prefix in stream header. Prefix: " +
xpp.getNamespace(null) + ". Connection: " + connection);
}
}
private boolean isHostUnknown(String host) {
if (host == null) {
// Answer false since when using server dialback the stream header will not
// have a TO attribute
return false;
}
if (serverName.equals(host)) {
// requested host matched the server name
return false;
}
return true;
}
/**
* Returns the stream namespace. (E.g. jabber:client, jabber:server, etc.).
*
* @return the stream namespace.
*/
abstract String getNamespace();
/**
* Returns true if the value of the 'to' attribute in the stream header should be
* validated. If the value of the 'to' attribute is not valid then a host-unknown error
* will be returned and the underlying connection will be closed.
*
* @return true if the value of the 'to' attribute in the initial stream header should be
* validated.
*/
abstract boolean validateHost();
/**
* Notification message indicating that the SocketReader is shutting down. The thread will
* stop reading and processing new requests. Subclasses may want to redefine this message
* for releasing any resource they might need.
*/
protected void shutdown() {
}
/**
* Creates the appropriate {@link Session} subclass based on the specified namespace.
*
* @param namespace the namespace sent in the stream element. eg. jabber:client.
* @return the created session or null.
* @throws XmlPullParserException
* @throws IOException
*/
abstract boolean createSession(String namespace) throws XmlPullParserException, IOException;
}