/**
* $RCSfile$
* $Revision: $
* $Date: $
*
* Copyright (C) 2006 Jive Software. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.jivesoftware.multiplexer;
import org.dom4j.Element;
import org.dom4j.io.XMPPPacketReader;
import org.jivesoftware.multiplexer.net.SocketConnection;
import org.jivesoftware.util.JiveGlobals;
import org.jivesoftware.util.Log;
import java.io.IOException;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* Reads and processes stanzas sent from the server. Each connection to the server will
* have an instance of this class. Read packets will be processed using a thread pool.
* By default, the thread pool will have 5 processing threads. Configure the property
* <tt>xmpp.manager.incoming.threads</tt> to change the number of processing threads
* per connection to the server.
*
* @author Gaston Dombiak
*/
class ServerPacketReader implements SocketStatistic {

    /**
     * Flag checked by the reader thread before every read. It is cleared by
     * {@link #shutdown()}, which is public and may therefore be invoked from a thread
     * other than the reader thread; the field is volatile so that the read loop is
     * guaranteed to observe the change.
     */
    private volatile boolean open = true;

    /**
     * Source of the XML documents sent by the server.
     */
    private final XMPPPacketReader reader;

    /**
     * Pool of threads that will process incoming stanzas from the server.
     */
    private final ThreadPoolExecutor threadPool;

    /**
     * Actual object responsible for handling incoming traffic.
     */
    private final ServerPacketHandler packetsHandler;

    /**
     * Creates the reader, its processing thread pool and starts the daemon thread that
     * reads stanzas from the server.
     *
     * @param reader     the reader used to parse incoming documents.
     * @param connection the connection handed to the {@link ServerPacketHandler}.
     * @param address    the address handed to the {@link ServerPacketHandler}.
     */
    public ServerPacketReader(XMPPPacketReader reader, SocketConnection connection,
            String address) {
        this.reader = reader;
        this.packetsHandler = new ServerPacketHandler(connection, address);
        this.threadPool = createThreadPool();
        // Started last so that every field is assigned before the reader thread runs.
        startReaderThread();
    }

    /**
     * Creates the pool of threads that will process incoming packets. The pool size is
     * read from the property <tt>xmpp.manager.incoming.threads</tt> (default 5) and is
     * forced to be at least 1.
     *
     * @return the thread pool used to process incoming stanzas.
     */
    private static ThreadPoolExecutor createThreadPool() {
        int configured = JiveGlobals.getIntProperty("xmpp.manager.incoming.threads", 5);
        // Ensure that the max number of threads in the pool is at least 1
        int poolSize = Math.max(configured, 1);
        // The work queue is unbounded, so tasks are only rejected once the pool has been
        // shut down; CallerRunsPolicy discards tasks in that situation.
        return new ThreadPoolExecutor(poolSize, poolSize, 60, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(),
                new ThreadPoolExecutor.CallerRunsPolicy());
    }

    /**
     * Starts the daemon thread that reads DOM elements and queues them for processing.
     */
    private void startReaderThread() {
        Thread thread = new Thread("Server Packet Reader") {
            @Override
            public void run() {
                readStanzas();
            }
        };
        thread.setDaemon(true);
        thread.start();
    }

    /**
     * Reads documents until this reader is shut down. A <tt>null</tt> root element or any
     * exception thrown while reading or dispatching ends the loop by calling
     * {@link #shutdown()}.
     */
    private void readStanzas() {
        while (open) {
            try {
                Element doc = reader.parseDocument().getRootElement();
                if (doc == null) {
                    // Stop reading the stream since the remote server has sent an end of
                    // stream element and probably closed the connection.
                    shutdown();
                }
                else {
                    dispatch(doc);
                }
            }
            catch (IOException e) {
                Log.debug("Finishing Incoming Server Stanzas Reader.", e);
                shutdown();
            }
            catch (Exception e) {
                Log.error("Finishing Incoming Server Stanzas Reader.", e);
                shutdown();
            }
        }
    }

    /**
     * Hands a stanza over to the thread pool. Stanzas that belong to a session are first
     * added to that session's queue so that they can be processed in the order in which
     * they were received.
     *
     * @param doc the stanza that was just read.
     */
    private void dispatch(Element doc) {
        Session session = getSession(doc);
        if (session == null) {
            // Queue task that process incoming stanzas not related to a specific streamID
            threadPool.execute(new ProcessStanzaTask(packetsHandler, doc));
            return;
        }
        Queue<Element> sessionQueue = session.getStanzaQueue();
        sessionQueue.add(doc);
        threadPool.execute(new ProcessSessionQueueTask(packetsHandler, session));
    }

    public long getLastActive() {
        return reader.getLastActive();
    }

    /**
     * Stops the read loop and shuts down the processing thread pool. Tasks that were
     * already submitted to the pool are still executed.
     */
    public void shutdown() {
        open = false;
        threadPool.shutdown();
    }

    /**
     * @param stanza The stanza to find the session for using the streamid or id attribute
     * @return the session associated with the given stanza, if any
     */
    private Session getSession(Element stanza) {
        if ("route".equals(stanza.getName())) {
            return lookupSession(stanza.attributeValue("streamid"));
        }
        Element wrapper = stanza.element("session");
        if (wrapper == null) {
            return null;
        }
        return lookupSession(wrapper.attributeValue("id"));
    }

    /**
     * Looks up a session by stream ID, treating a missing ID as "no session".
     *
     * @param streamID the stream ID taken from the stanza, or <tt>null</tt> if the
     *                 attribute was absent.
     * @return the matching session or <tt>null</tt> if there is none.
     */
    private static Session lookupSession(String streamID) {
        // NOTE(review): Session.getSession is not visible here. If its backing map rejects
        // null keys, a stanza without the attribute would throw and shut down the whole
        // reader, so the lookup is skipped for a missing ID - confirm against Session.
        if (streamID == null) {
            return null;
        }
        return Session.getSession(streamID);
    }

    /**
     * Task that processes incoming stanzas from the server.
     */
    private static final class ProcessStanzaTask implements Runnable {
        /**
         * Incoming stanza to process.
         */
        private final Element stanza;
        /**
         * Actual object responsible for handling incoming traffic.
         */
        private final ServerPacketHandler handler;

        ProcessStanzaTask(ServerPacketHandler handler, Element stanza) {
            this.handler = handler;
            this.stanza = stanza;
        }

        public void run() {
            handler.handle(stanza);
        }
    }

    /**
     * Task that processes a Session's stanza queue. This guarantees
     * that stanzas are processed in the same order that they are received
     */
    private static final class ProcessSessionQueueTask implements Runnable {
        /**
         * The session
         */
        private final Session session;
        /**
         * Actual object responsible for handling incoming traffic.
         */
        private final ServerPacketHandler handler;

        ProcessSessionQueueTask(ServerPacketHandler handler, Session session) {
            this.session = session;
            this.handler = handler;
        }

        /**
         * Process all the stanzas currently in the queue for this session
         */
        public void run() {
            // Synchronize on the session here to ensure that all stanzas
            // for a given session get processed in order. This can be sub-optimal
            // since we might block if multiple threads are processing stanzas
            // for the same client, but the handler.handle() call should be quick,
            // and correctness seems more important here.
            synchronized (session) {
                for (Element stanza = session.getStanzaQueue().poll();
                        stanza != null;
                        stanza = session.getStanzaQueue().poll()) {
                    handler.handle(stanza);
                }
            }
        }
    }
}