Newer
Older
Openfire-connectionmanager / src / java / org / jivesoftware / multiplexer / ServerPacketReader.java
/**
 * $RCSfile$
 * $Revision: $
 * $Date: $
 *
 * Copyright (C) 2006 Jive Software. All rights reserved.
 *
 * This software is published under the terms of the GNU Public License (GPL),
 * a copy of which is included in this distribution.
 */

package org.jivesoftware.multiplexer;

import org.dom4j.Element;
import org.dom4j.io.XMPPPacketReader;
import org.jivesoftware.multiplexer.net.SocketConnection;
import org.jivesoftware.util.JiveGlobals;
import org.jivesoftware.util.Log;

import java.io.IOException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Reads and processes stanzas sent from the server. Each connection to the server will
 * have an instance of this class. Read packets will be processed using a thread pool.
 * By default, the thread pool will have 5 processing threads. Configure the property
 * <tt>xmpp.manager.incoming.threads</tt> to change the number of processing threads
 * per connection to the server. 
 *
 * @author Gaston Dombiak
 */
class ServerPacketReader implements SocketStatistic {

    private boolean open = true;
    private XMPPPacketReader reader = null;

    /**
     * Pool of threads that will process incoming stanzas from the server.
     */
    private ThreadPoolExecutor threadPool;
    /**
     * Actual object responsible for handling incoming traffic.
     */
    private ServerPacketHandler packetsHandler;

    public ServerPacketReader(XMPPPacketReader reader, SocketConnection connection,
                              String address) {
        this.reader = reader;
        packetsHandler = new ServerPacketHandler(connection, address);
        init();
    }

    private void init() {
        // Create a pool of threads that will process incoming packets.
        int maxThreads = JiveGlobals.getIntProperty("xmpp.manager.incoming.threads", 5);
        if (maxThreads < 1) {
            // Ensure that the max number of threads in the pool is at least 1
            maxThreads = 1;
        }
        threadPool =
                new ThreadPoolExecutor(maxThreads, maxThreads, 60, TimeUnit.SECONDS,
                        new LinkedBlockingQueue<Runnable>(),
                        new ThreadPoolExecutor.CallerRunsPolicy());

        // Create a thread that will read and store DOM Elements.
        Thread thread = new Thread("Server Packet Reader") {
            @Override
			public void run() {
                while (open) {
                    Element doc;
                    try {
                        doc = reader.parseDocument().getRootElement();

                        if (doc == null) {
                            // Stop reading the stream since the remote server has sent an end of
                            // stream element and probably closed the connection.
                            shutdown();
                        }
                        else {
                            // Queue task that process incoming stanzas
                            threadPool.execute(new ProcessStanzaTask(packetsHandler, doc));
                        }
                    }
                    catch (IOException e) {
                        Log.debug("Finishing Incoming Server Stanzas Reader.", e);
                        shutdown();
                    }
                    catch (Exception e) {
                        Log.error("Finishing Incoming Server Stanzas Reader.", e);
                        shutdown();
                    }
                }
            }
        };
        thread.setDaemon(true);
        thread.start();
    }

    public long getLastActive() {
        return reader.getLastActive();
    }

    public void shutdown() {
        open = false;
        threadPool.shutdown();
    }

    /**
     * Task that processes incoming stanzas from the server.
     */
    private class ProcessStanzaTask implements Runnable {
        /**
         * Incoming stanza to process.
         */
        private Element stanza;
        /**
         * Actual object responsible for handling incoming traffic.
         */
        private ServerPacketHandler handler;

        public ProcessStanzaTask(ServerPacketHandler handler, Element stanza) {
            this.handler = handler;
            this.stanza = stanza;
        }

        public void run() {
            handler.handle(stanza);
        }
    }
}