Newer
Older
Openfire-connectionmanager / src / java / org / jivesoftware / multiplexer / net / BlockingReadingMode.java
@Gaston Dombiak Gaston Dombiak on 18 Jun 2006 10 KB Initial version. JM-666
/**
 * $RCSfile$
 * $Revision: $
 * $Date: $
 *
 * Copyright (C) 2006 Jive Software. All rights reserved.
 *
 * This software is published under the terms of the GNU Public License (GPL),
 * a copy of which is included in this distribution.
 */

package org.jivesoftware.multiplexer.net;

import com.jcraft.jzlib.JZlib;
import com.jcraft.jzlib.ZInputStream;
import org.dom4j.Element;
import org.jivesoftware.multiplexer.Connection;
import org.jivesoftware.multiplexer.Session;
import org.jivesoftware.util.LocaleUtils;
import org.jivesoftware.util.Log;
import org.xmlpull.v1.XmlPullParser;
import org.xmlpull.v1.XmlPullParserException;

import java.io.EOFException;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.Socket;
import java.net.SocketException;
import java.nio.channels.AsynchronousCloseException;

/**
 * Process incoming packets using a blocking model. Once a session has been created
 * an endless loop is used to process incoming packets. Packets are processed
 * sequentially.
 *
 * @author Gaston Dombiak
 */
class BlockingReadingMode extends SocketReadingMode {

    private Status saslStatus = Status.waitingServer;

    public BlockingReadingMode(Socket socket, SocketReader socketReader) {
        super(socket, socketReader);
    }

    /**
     * A dedicated thread loop for reading the stream and sending incoming
     * packets to the appropriate router.
     */
    public void run() {
        try {
            socketReader.reader.getXPPParser().setInput(new InputStreamReader(socket.getInputStream(),
                    CHARSET));

            // Read in the opening tag and prepare for packet stream
            try {
                socketReader.createSession();
            }
            catch (IOException e) {
                Log.debug("Error creating session", e);
                throw e;
            }

            // Read the packet stream until it ends
            if (socketReader.session != null) {
                readStream();
            }

        }
        catch (EOFException eof) {
            // Normal disconnect
        }
        catch (SocketException se) {
            // The socket was closed. The server may close the connection for several
            // reasons (e.g. user requested to remove his account). Do nothing here.
        }
        catch (AsynchronousCloseException ace) {
            // The socket was closed.
        }
        catch (XmlPullParserException ie) {
            // It is normal for clients to abruptly cut a connection
            // rather than closing the stream document. Since this is
            // normal behavior, we won't log it as an error.
            // Log.error(LocaleUtils.getLocalizedString("admin.disconnect"),ie);
        }
        catch (Exception e) {
            Log.warn(LocaleUtils.getLocalizedString("admin.error.stream") + ". Connection: " +
                    socketReader.connection, e);
        }
        finally {
            if (socketReader.session != null) {
                if (Log.isDebugEnabled()) {
                    Log.debug("Logging off " + socketReader.connection);
                }
                try {
                    socketReader.session.close();
                }
                catch (Exception e) {
                    Log.warn(LocaleUtils.getLocalizedString("admin.error.connection")
                            + "\n" + socket.toString());
                }
            }
            else {
                // Close and release the created connection
                socketReader.connection.close();
                Log.error(LocaleUtils.getLocalizedString("admin.error.connection")
                        + "\n" + socket.toString());
            }
            socketReader.shutdown();
        }
    }

    /**
     * Read the incoming stream until it ends.
     */
    private void readStream() throws Exception {
        socketReader.open = true;
        while (socketReader.open) {
            Element doc = socketReader.reader.parseDocument().getRootElement();
            if (doc == null) {
                // Stop reading the stream since the client has sent an end of
                // stream element and probably closed the connection.
                return;
            }
            String tag = doc.getName();
            if ("starttls".equals(tag)) {
                // Negotiate TLS
                if (negotiateTLS()) {
                    tlsNegotiated();
                }
                else {
                    socketReader.open = false;
                }
            }
            else if ("auth".equals(tag)) {
                // User is trying to authenticate using SASL
                if (authenticateClient(doc)) {
                    // SASL authentication was successful so open a new stream and offer
                    // resource binding and session establishment (to client sessions only)
                    saslSuccessful();
                }
            }
            else if ("compress".equals(tag))
            {
                // Client is trying to initiate compression
                if (compressClient(doc)) {
                    // Compression was successful so open a new stream and offer
                    // resource binding and session establishment (to client sessions only)
                    compressionSuccessful();
                }
            }
            else {
                socketReader.process(doc);
            }
        }
    }

    protected void tlsNegotiated() throws XmlPullParserException, IOException {
        XmlPullParser xpp = socketReader.reader.getXPPParser();
        // Reset the parser to use the new reader
        xpp.setInput(new InputStreamReader(
                socketReader.connection.getTLSStreamHandler().getInputStream(), CHARSET));
        // Skip new stream element
        for (int eventType = xpp.getEventType(); eventType != XmlPullParser.START_TAG;) {
            eventType = xpp.next();
        }
        super.tlsNegotiated();
    }

    protected boolean authenticateClient(Element doc) throws Exception {
        // Ensure that connection was secured if TLS was required
        if (socketReader.connection.getTlsPolicy() == Connection.TLSPolicy.required &&
                !socketReader.connection.isSecure()) {
            socketReader.closeNeverSecuredConnection();
            return false;
        }

        boolean isComplete = false;
        boolean success = false;
        while (!isComplete && !socketReader.connection.isClosed()) {
            // Forward stanza to the server
            socketReader.process(doc);
            // Wait 5 minutes to get a response from the server
            synchronized (this) {
                wait(5 * 60 * 1000);
            }
            // Raise an error if no response from the server was received
            if (saslStatus == Status.waitingServer) {
                throw new Exception("No answer was received from the server");
            }

            // If client was challenged then wait for client answer
            if (saslStatus == Status.needResponse) {
                doc = socketReader.reader.parseDocument().getRootElement();
                if (doc == null) {
                    // Nothing was read because the connection was closed or dropped
                    isComplete = true;
                }
            }
            else {
                success = socketReader.session.getStatus() == Session.STATUS_AUTHENTICATED;
                isComplete = true;
            }
        }
        return success;
    }

    /**
     * Notification message indicating that a client needs to response to a SASL
     * challenge.
     */
    void clientChallenged() {
        // Set that client needs to send response
        saslStatus = Status.needResponse;
        synchronized (this) {
            notify();
        }
    }

    /**
     * Notification message indicating that sasl authentication has finished. The
     * <tt>success</tt> parameter indicates whether authentication was successful or not.
     *
     * @param success true when authentication was successful.
     */
    void clientAuthenticated(boolean success) {
        // Set result of authentication process
        saslStatus = success ? Status.authenticated : Status.failed;
        synchronized (this) {
            notify();
        }
    }

    protected void saslSuccessful() throws XmlPullParserException, IOException {
        MXParser xpp = socketReader.reader.getXPPParser();
        // Reset the parser since a new stream header has been sent from the client
        xpp.resetInput();

        // Skip the opening stream sent by the client
        for (int eventType = xpp.getEventType(); eventType != XmlPullParser.START_TAG;) {
            eventType = xpp.next();
        }
        super.saslSuccessful();
    }

    protected boolean compressClient(Element doc) throws XmlPullParserException, IOException {
        boolean answer = super.compressClient(doc);
        if (answer) {
            XmlPullParser xpp = socketReader.reader.getXPPParser();
            // Reset the parser since a new stream header has been sent from the client
            if (socketReader.connection.getTLSStreamHandler() == null) {
                ZInputStream in = new ZInputStream(socket.getInputStream());
                in.setFlushMode(JZlib.Z_PARTIAL_FLUSH);
                xpp.setInput(new InputStreamReader(in, CHARSET));
            }
            else {
                ZInputStream in = new ZInputStream(
                        socketReader.connection.getTLSStreamHandler().getInputStream());
                in.setFlushMode(JZlib.Z_PARTIAL_FLUSH);
                xpp.setInput(new InputStreamReader(in, CHARSET));
            }
        }
        return answer;
    }

    protected void compressionSuccessful() throws XmlPullParserException, IOException {
        XmlPullParser xpp = socketReader.reader.getXPPParser();
        // Skip the opening stream sent by the client
        for (int eventType = xpp.getEventType(); eventType != XmlPullParser.START_TAG;) {
            eventType = xpp.next();
        }
        super.compressionSuccessful();
    }

    public enum Status {
        /**
         * Server needs to process sasl stanza and send its answer to the client.
         */
        waitingServer,
        /**
         * Entity needs to respond last challenge. Session is still negotiating
         * SASL authentication.
         */
        needResponse,
        /**
         * SASL negotiation has failed. The entity may retry a few times before the connection
         * is closed.
         */
        failed,
        /**
         * SASL negotiation has been successful.
         */
        authenticated
    }

}