/** * $RCSfile$ * $Revision: $ * $Date: $ * * Copyright (C) 2006 Jive Software. All rights reserved. * * This software is published under the terms of the GNU Public License (GPL), * a copy of which is included in this distribution. */ package org.jivesoftware.multiplexer.net; import com.jcraft.jzlib.JZlib; import com.jcraft.jzlib.ZInputStream; import org.dom4j.Element; import org.jivesoftware.multiplexer.Connection; import org.jivesoftware.multiplexer.Session; import org.jivesoftware.util.LocaleUtils; import org.jivesoftware.util.Log; import org.xmlpull.v1.XmlPullParser; import org.xmlpull.v1.XmlPullParserException; import java.io.EOFException; import java.io.IOException; import java.io.InputStreamReader; import java.net.Socket; import java.net.SocketException; import java.nio.channels.AsynchronousCloseException; /** * Process incoming packets using a blocking model. Once a session has been created * an endless loop is used to process incoming packets. Packets are processed * sequentially. * * @author Gaston Dombiak */ class BlockingReadingMode extends SocketReadingMode { private Status saslStatus = Status.waitingServer; public BlockingReadingMode(Socket socket, SocketReader socketReader) { super(socket, socketReader); } /** * A dedicated thread loop for reading the stream and sending incoming * packets to the appropriate router. */ public void run() { try { socketReader.reader.getXPPParser().setInput(new InputStreamReader(socket.getInputStream(), CHARSET)); // Read in the opening tag and prepare for packet stream try { socketReader.createSession(); } catch (IOException e) { Log.debug("Error creating session", e); throw e; } // Read the packet stream until it ends if (socketReader.session != null) { readStream(); } } catch (EOFException eof) { // Normal disconnect } catch (SocketException se) { // The socket was closed. The server may close the connection for several // reasons (e.g. user requested to remove his account). Do nothing here. } catch (AsynchronousCloseException ace) { // The socket was closed. } catch (XmlPullParserException ie) { // It is normal for clients to abruptly cut a connection // rather than closing the stream document. Since this is // normal behavior, we won't log it as an error. // Log.error(LocaleUtils.getLocalizedString("admin.disconnect"),ie); } catch (Exception e) { Log.warn(LocaleUtils.getLocalizedString("admin.error.stream") + ". Connection: " + socketReader.connection, e); } finally { if (socketReader.session != null) { if (Log.isDebugEnabled()) { Log.debug("Logging off " + socketReader.connection); } try { socketReader.session.close(); } catch (Exception e) { Log.warn(LocaleUtils.getLocalizedString("admin.error.connection") + "\n" + socket.toString()); } } else { // Close and release the created connection socketReader.connection.close(); Log.error(LocaleUtils.getLocalizedString("admin.error.connection") + "\n" + socket.toString()); } socketReader.shutdown(); } } /** * Read the incoming stream until it ends. */ private void readStream() throws Exception { socketReader.open = true; while (socketReader.open) { Element doc = socketReader.reader.parseDocument().getRootElement(); if (doc == null) { // Stop reading the stream since the client has sent an end of // stream element and probably closed the connection. return; } String tag = doc.getName(); if ("starttls".equals(tag)) { // Negotiate TLS if (negotiateTLS()) { tlsNegotiated(); } else { socketReader.open = false; } } else if ("auth".equals(tag)) { // User is trying to authenticate using SASL if (authenticateClient(doc)) { // SASL authentication was successful so open a new stream and offer // resource binding and session establishment (to client sessions only) saslSuccessful(); } } else if ("compress".equals(tag)) { // Client is trying to initiate compression if (compressClient(doc)) { // Compression was successful so open a new stream and offer // resource binding and session establishment (to client sessions only) compressionSuccessful(); } } else { socketReader.process(doc); } } } protected void tlsNegotiated() throws XmlPullParserException, IOException { XmlPullParser xpp = socketReader.reader.getXPPParser(); // Reset the parser to use the new reader xpp.setInput(new InputStreamReader( socketReader.connection.getTLSStreamHandler().getInputStream(), CHARSET)); // Skip new stream element for (int eventType = xpp.getEventType(); eventType != XmlPullParser.START_TAG;) { eventType = xpp.next(); } super.tlsNegotiated(); } protected boolean authenticateClient(Element doc) throws Exception { // Ensure that connection was secured if TLS was required if (socketReader.connection.getTlsPolicy() == Connection.TLSPolicy.required && !socketReader.connection.isSecure()) { socketReader.closeNeverSecuredConnection(); return false; } boolean isComplete = false; boolean success = false; while (!isComplete && !socketReader.connection.isClosed()) { // Forward stanza to the server socketReader.process(doc); // Wait 5 minutes to get a response from the server synchronized (this) { wait(5 * 60 * 1000); } // Raise an error if no response from the server was received if (saslStatus == Status.waitingServer) { throw new Exception("No answer was received from the server"); } // If client was challenged then wait for client answer if (saslStatus == Status.needResponse) { doc = socketReader.reader.parseDocument().getRootElement(); if (doc == null) { // Nothing was read because the connection was closed or dropped isComplete = true; } } else { success = socketReader.session.getStatus() == Session.STATUS_AUTHENTICATED; isComplete = true; } } return success; } /** * Notification message indicating that a client needs to response to a SASL * challenge. */ void clientChallenged() { // Set that client needs to send response saslStatus = Status.needResponse; synchronized (this) { notify(); } } /** * Notification message indicating that sasl authentication has finished. The * <tt>success</tt> parameter indicates whether authentication was successful or not. * * @param success true when authentication was successful. */ void clientAuthenticated(boolean success) { // Set result of authentication process saslStatus = success ? Status.authenticated : Status.failed; synchronized (this) { notify(); } } protected void saslSuccessful() throws XmlPullParserException, IOException { MXParser xpp = socketReader.reader.getXPPParser(); // Reset the parser since a new stream header has been sent from the client xpp.resetInput(); // Skip the opening stream sent by the client for (int eventType = xpp.getEventType(); eventType != XmlPullParser.START_TAG;) { eventType = xpp.next(); } super.saslSuccessful(); } protected boolean compressClient(Element doc) throws XmlPullParserException, IOException { boolean answer = super.compressClient(doc); if (answer) { XmlPullParser xpp = socketReader.reader.getXPPParser(); // Reset the parser since a new stream header has been sent from the client if (socketReader.connection.getTLSStreamHandler() == null) { ZInputStream in = new ZInputStream(socket.getInputStream()); in.setFlushMode(JZlib.Z_PARTIAL_FLUSH); xpp.setInput(new InputStreamReader(in, CHARSET)); } else { ZInputStream in = new ZInputStream( socketReader.connection.getTLSStreamHandler().getInputStream()); in.setFlushMode(JZlib.Z_PARTIAL_FLUSH); xpp.setInput(new InputStreamReader(in, CHARSET)); } } return answer; } protected void compressionSuccessful() throws XmlPullParserException, IOException { XmlPullParser xpp = socketReader.reader.getXPPParser(); // Skip the opening stream sent by the client for (int eventType = xpp.getEventType(); eventType != XmlPullParser.START_TAG;) { eventType = xpp.next(); } super.compressionSuccessful(); } public enum Status { /** * Server needs to process sasl stanza and send its answer to the client. */ waitingServer, /** * Entity needs to respond last challenge. Session is still negotiating * SASL authentication. */ needResponse, /** * SASL negotiation has failed. The entity may retry a few times before the connection * is closed. */ failed, /** * SASL negotiation has been successful. */ authenticated } }