diff --git a/ConnectionManager.iml b/ConnectionManager.iml
index 83a2df3..00c30d7 100644
--- a/ConnectionManager.iml
+++ b/ConnectionManager.iml
@@ -111,15 +111,6 @@
-
-
-
-
-
-
-
-
-
@@ -153,24 +144,6 @@
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
@@ -195,11 +168,9 @@
-
-
-
+
\ No newline at end of file
diff --git a/build/build.xml b/build/build.xml
index 66edf1f..596f61b 100644
--- a/build/build.xml
+++ b/build/build.xml
@@ -166,6 +166,7 @@
+
@@ -174,6 +175,9 @@
+
+
+
@@ -214,6 +218,7 @@
debug="on"
source="1.5"
target="1.5"
+ encoding="UTF-8"
>
diff --git a/src/java/org/jivesoftware/multiplexer/ClientSession.java b/src/java/org/jivesoftware/multiplexer/ClientSession.java
index 03ce976..fbd8262 100644
--- a/src/java/org/jivesoftware/multiplexer/ClientSession.java
+++ b/src/java/org/jivesoftware/multiplexer/ClientSession.java
@@ -22,6 +22,7 @@
import org.dom4j.Element;
import org.jivesoftware.multiplexer.spi.ClientFailoverDeliverer;
+import org.jivesoftware.multiplexer.task.ClientTask;
import org.jivesoftware.util.LocaleUtils;
import org.jivesoftware.util.Log;
import org.xmlpull.v1.XmlPullParser;
@@ -29,6 +30,9 @@
import java.net.InetAddress;
import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.concurrent.AbstractExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
/**
* Session that represents a client to server connection.
@@ -306,4 +310,39 @@
public boolean isClosed() {
return status == STATUS_CLOSED;
}
+
+ private final AtomicBoolean sessionCreatedOnServer = new AtomicBoolean(
+ false);
+
+ private ArrayList pendingTasks = new ArrayList();
+
+ public boolean isSessionCreatedOnServer() {
+ return sessionCreatedOnServer.get();
+ }
+
+ public void pendClientTask(ClientTask task, AbstractExecutorService executor) {
+ if (sessionCreatedOnServer.get()) {
+ executor.execute(task);
+ } else {
+ synchronized (sessionCreatedOnServer) {
+ if (sessionCreatedOnServer.get()) {
+ executor.execute(task);
+ } else {
+ pendingTasks.add(task);
+ }
+ }
+ }
+ }
+
+ public void onSessionCreatedOnServer(AbstractExecutorService executor) {
+ synchronized (sessionCreatedOnServer) {
+ if (!sessionCreatedOnServer.get()) {
+ for (ClientTask task : pendingTasks) {
+ executor.execute(task);
+ }
+ pendingTasks = null;
+ }
+ sessionCreatedOnServer.compareAndSet(false, true);
+ }
+ }
}
diff --git a/src/java/org/jivesoftware/multiplexer/ServerPacketHandler.java b/src/java/org/jivesoftware/multiplexer/ServerPacketHandler.java
index 15bb6fd..4774fec 100644
--- a/src/java/org/jivesoftware/multiplexer/ServerPacketHandler.java
+++ b/src/java/org/jivesoftware/multiplexer/ServerPacketHandler.java
@@ -95,7 +95,26 @@
}
}
} else if ("result".equals(type)) {
- if (Log.isDebugEnabled()) {
+ boolean handled = false;
+ Element wrapper = stanza.element("session");
+ if (wrapper != null) {
+ String streamID = wrapper.attributeValue("id");
+ if (wrapper.element("create") != null) {
+ //we got session create response
+ Session session=Session.getSession(streamID);
+ if(session!=null){
+ handled = true;
+ ClientSession cs=(ClientSession)session;
+ cs.onSessionCreatedOnServer(ConnectionManager.getInstance().getServerSurrogate().getThreadPool());
+ if (Log.isDebugEnabled()) {
+ Log.debug("Session created on server with streamID: " + streamID);
+ }
+ }else{
+ Log.warn("Can't get session with streamId=" + streamID);
+ }
+ }
+ }
+ if (Log.isDebugEnabled() && !handled) {
Log.debug("IQ stanza of type RESULT was discarded: " + stanza.asXML());
}
} else if ("error".equals(type)) {
diff --git a/src/java/org/jivesoftware/multiplexer/ServerSurrogate.java b/src/java/org/jivesoftware/multiplexer/ServerSurrogate.java
index 8e5c3d0..3efdc9d 100644
--- a/src/java/org/jivesoftware/multiplexer/ServerSurrogate.java
+++ b/src/java/org/jivesoftware/multiplexer/ServerSurrogate.java
@@ -192,7 +192,13 @@
* @param streamID the stream ID assigned by the connection manager to the session.
*/
public void send(String stanza, String streamID) {
- threadPool.execute(new RouteTask(streamID, stanza));
+ RouteTask task = new RouteTask(streamID, stanza);
+ ClientSession session = (ClientSession) Session.getSession(streamID);
+ if (session == null || session.isSessionCreatedOnServer()) {
+ threadPool.execute(new RouteTask(streamID, stanza));
+ } else {
+ session.pendClientTask(task, threadPool);
+ }
}
/**
@@ -401,4 +407,8 @@
return t;
}
}
+
+ AbstractExecutorService getThreadPool(){
+ return threadPool;
+ }
}
diff --git a/src/java/org/jivesoftware/multiplexer/Session.java b/src/java/org/jivesoftware/multiplexer/Session.java
index c50dae8..7307ac1 100644
--- a/src/java/org/jivesoftware/multiplexer/Session.java
+++ b/src/java/org/jivesoftware/multiplexer/Session.java
@@ -90,6 +90,7 @@
}
public static void removeSession(String streamID) {
+ StreamIDFactory.releaseId(streamID);
sessions.remove(streamID);
}
diff --git a/src/java/org/jivesoftware/multiplexer/StreamIDFactory.java b/src/java/org/jivesoftware/multiplexer/StreamIDFactory.java
index fb9eb8c..f3bf2d5 100644
--- a/src/java/org/jivesoftware/multiplexer/StreamIDFactory.java
+++ b/src/java/org/jivesoftware/multiplexer/StreamIDFactory.java
@@ -21,6 +21,8 @@
package org.jivesoftware.multiplexer;
import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadLocalRandom;
/**
* A basic stream ID factory that produces id's using java.util.Random
@@ -32,15 +34,23 @@
* @author Gaston Dombiak
*/
public class StreamIDFactory {
- /**
- * The random number to use, someone with Java can predict stream IDs if they can guess the current seed *
- */
- Random random = new Random();
+
+ private static final ConcurrentHashMap usingStreamIDs = new ConcurrentHashMap();
String managerName = ConnectionManager.getInstance().getName();
+ private Random rand = new Random();
+
public String createStreamID() {
- return managerName + Integer.toHexString(random.nextInt());
+ String streamID;
+ do {
+ streamID = managerName + Integer.toHexString(rand.nextInt());
+ } while(usingStreamIDs.putIfAbsent(streamID, Boolean.TRUE) == null);
+ return streamID;
+ }
+
+ public static void releaseId(String streamId){
+ usingStreamIDs.remove(streamId);
}
}
diff --git a/src/java/org/jivesoftware/multiplexer/net/MXParser.java b/src/java/org/jivesoftware/multiplexer/net/MXParser.java
index 6c66adb..9e8153b 100644
--- a/src/java/org/jivesoftware/multiplexer/net/MXParser.java
+++ b/src/java/org/jivesoftware/multiplexer/net/MXParser.java
@@ -44,8 +44,8 @@
private long lastHeartbeat = 0;
@Override
- protected int nextImpl()
- throws XmlPullParserException, IOException
+ protected int nextImpl()
+ throws XmlPullParserException, IOException
{
text = null;
pcEnd = pcStart = 0;
@@ -182,7 +182,7 @@
// }
} else {
throw new XmlPullParserException(
- "unexpected character in markup "+printable(ch), this, null);
+ "unexpected character in markup "+printable(ch), this, null);
}
} else if(ch == '?') {
parsePI();
@@ -202,7 +202,7 @@
return eventType = parseStartTag();
} else {
throw new XmlPullParserException(
- "unexpected character in markup "+printable(ch), this, null);
+ "unexpected character in markup "+printable(ch), this, null);
}
// do content comapctation if it makes sense!!!!
@@ -223,8 +223,8 @@
entityRefName = newString(buf, posStart, posEnd - posStart);
}
throw new XmlPullParserException(
- "could not resolve entity named '"+printable(entityRefName)+"'",
- this, null);
+ "could not resolve entity named '"+printable(entityRefName)+"'",
+ this, null);
}
//int entStart = posStart;
//int entEnd = posEnd;
@@ -289,7 +289,7 @@
}
} else if(seenBracketBracket && ch == '>') {
throw new XmlPullParserException(
- "characters ]]> are not allowed in content", this, null);
+ "characters ]]> are not allowed in content", this, null);
} else {
if(seenBracket) {
seenBracketBracket = seenBracket = false;
@@ -363,27 +363,50 @@
reader = oldReader;
inputEncoding = oldEncoding;
}
-
- /**
- * Makes sure that each individual character is a valid XML character.
- *
- * Note that when MXParser is being modified to handle multibyte chars correctly, this method needs to change (as
- * then, there are more codepoints to check).
- */
+
+ private boolean highSurrogateSeen = false;
+
+ /**
+ * Makes sure that each individual character is a valid XML character.
+ *
+ * Note that when MXParser is being modified to handle multibyte chars correctly, this method needs to change (as
+ * then, there are more codepoints to check).
+ *
+ */
@Override
protected char more() throws IOException, XmlPullParserException {
- final char codePoint = super.more(); // note - this does NOT return a codepoint now, but simply a (single byte) character!
- if ((codePoint == 0x0) || // 0x0 is not allowed, but flash clients insist on sending this as the very first character of a stream. We should stop allowing this codepoint after the first byte has been parsed.
- (codePoint == 0x9) ||
- (codePoint == 0xA) ||
- (codePoint == 0xD) ||
- ((codePoint >= 0x20) && (codePoint <= 0xD7FF)) ||
- ((codePoint >= 0xE000) && (codePoint <= 0xFFFD)) ||
- ((codePoint >= 0x10000) && (codePoint <= 0x10FFFF))) {
- return codePoint;
- }
-
- throw new XmlPullParserException("Illegal XML character: " + Integer.parseInt(codePoint+"", 16));
+ final char codePoint = super.more(); // note - this does NOT return a codepoint now, but simply a (double byte) character!
+ boolean validCodepoint = false;
+ boolean isLowSurrogate = Character.isLowSurrogate(codePoint);
+ if ((codePoint == 0x0) || // 0x0 is not allowed, but flash clients insist on sending this as the very first character of a stream. We should stop allowing this codepoint after the first byte has been parsed.
+ (codePoint == 0x9) ||
+ (codePoint == 0xA) ||
+ (codePoint == 0xD) ||
+ ((codePoint >= 0x20) && (codePoint <= 0xD7FF)) ||
+ ((codePoint >= 0xE000) && (codePoint <= 0xFFFD))) {
+ validCodepoint = true;
+ }
+ else if (highSurrogateSeen) {
+ if (isLowSurrogate) {
+ validCodepoint = true;
+ } else {
+ throw new XmlPullParserException("High surrogate followed by non low surrogate '0x" + String.format("%x", (int) codePoint) + "'");
+ }
+ }
+ else if (isLowSurrogate) {
+ throw new XmlPullParserException("Low surrogate '0x " + String.format("%x", (int) codePoint) + " without preceeding high surrogate");
+ }
+ else if (Character.isHighSurrogate(codePoint)) {
+ highSurrogateSeen = true;
+ // Return here so that highSurrogateSeen is not reset
+ return codePoint;
+ }
+ // Always reset high surrogate seen
+ highSurrogateSeen = false;
+ if (validCodepoint)
+ return codePoint;
+
+ throw new XmlPullParserException("Illegal XML character '0x" + String.format("%x", (int) codePoint) + "'");
}
}