diff --git a/src/java/org/jivesoftware/multiplexer/ClientSession.java b/src/java/org/jivesoftware/multiplexer/ClientSession.java index 03ce976..fbd8262 100644 --- a/src/java/org/jivesoftware/multiplexer/ClientSession.java +++ b/src/java/org/jivesoftware/multiplexer/ClientSession.java @@ -22,6 +22,7 @@ import org.dom4j.Element; import org.jivesoftware.multiplexer.spi.ClientFailoverDeliverer; +import org.jivesoftware.multiplexer.task.ClientTask; import org.jivesoftware.util.LocaleUtils; import org.jivesoftware.util.Log; import org.xmlpull.v1.XmlPullParser; @@ -29,6 +30,9 @@ import java.net.InetAddress; import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.concurrent.AbstractExecutorService; +import java.util.concurrent.atomic.AtomicBoolean; /** * Session that represents a client to server connection. @@ -306,4 +310,39 @@ public boolean isClosed() { return status == STATUS_CLOSED; } + + private final AtomicBoolean sessionCreatedOnServer = new AtomicBoolean( + false); + + private ArrayList pendingTasks = new ArrayList(); + + public boolean isSessionCreatedOnServer() { + return sessionCreatedOnServer.get(); + } + + public void pendClientTask(ClientTask task, AbstractExecutorService executor) { + if (sessionCreatedOnServer.get()) { + executor.execute(task); + } else { + synchronized (sessionCreatedOnServer) { + if (sessionCreatedOnServer.get()) { + executor.execute(task); + } else { + pendingTasks.add(task); + } + } + } + } + + public void onSessionCreatedOnServer(AbstractExecutorService executor) { + synchronized (sessionCreatedOnServer) { + if (!sessionCreatedOnServer.get()) { + for (ClientTask task : pendingTasks) { + executor.execute(task); + } + pendingTasks = null; + } + sessionCreatedOnServer.compareAndSet(false, true); + } + } } diff --git a/src/java/org/jivesoftware/multiplexer/ServerPacketHandler.java b/src/java/org/jivesoftware/multiplexer/ServerPacketHandler.java index 15bb6fd..4774fec 100644 --- a/src/java/org/jivesoftware/multiplexer/ServerPacketHandler.java +++ b/src/java/org/jivesoftware/multiplexer/ServerPacketHandler.java @@ -95,7 +95,26 @@ } } } else if ("result".equals(type)) { - if (Log.isDebugEnabled()) { + boolean handled = false; + Element wrapper = stanza.element("session"); + if (wrapper != null) { + String streamID = wrapper.attributeValue("id"); + if (wrapper.element("create") != null) { + //we got session create response + Session session=Session.getSession(streamID); + if(session!=null){ + handled = true; + ClientSession cs=(ClientSession)session; + cs.onSessionCreatedOnServer(ConnectionManager.getInstance().getServerSurrogate().getThreadPool()); + if (Log.isDebugEnabled()) { + Log.debug("Session created on server with streamID: " + streamID); + } + }else{ + Log.warn("Can't get session with streamId=" + streamID); + } + } + } + if (Log.isDebugEnabled() && !handled) { Log.debug("IQ stanza of type RESULT was discarded: " + stanza.asXML()); } } else if ("error".equals(type)) { diff --git a/src/java/org/jivesoftware/multiplexer/ServerSurrogate.java b/src/java/org/jivesoftware/multiplexer/ServerSurrogate.java index 8e5c3d0..3efdc9d 100644 --- a/src/java/org/jivesoftware/multiplexer/ServerSurrogate.java +++ b/src/java/org/jivesoftware/multiplexer/ServerSurrogate.java @@ -192,7 +192,13 @@ * @param streamID the stream ID assigned by the connection manager to the session. */ public void send(String stanza, String streamID) { - threadPool.execute(new RouteTask(streamID, stanza)); + RouteTask task = new RouteTask(streamID, stanza); + ClientSession session = (ClientSession) Session.getSession(streamID); + if (session == null || session.isSessionCreatedOnServer()) { + threadPool.execute(new RouteTask(streamID, stanza)); + } else { + session.pendClientTask(task, threadPool); + } } /** @@ -401,4 +407,8 @@ return t; } } + + AbstractExecutorService getThreadPool(){ + return threadPool; + } }