diff --git a/ConnectionManager.iml b/ConnectionManager.iml index 83a2df3..00c30d7 100644 --- a/ConnectionManager.iml +++ b/ConnectionManager.iml @@ -111,15 +111,6 @@ - - - - - - - - - @@ -153,24 +144,6 @@ - - - - - - - - - - - - - - - - - - @@ -195,11 +168,9 @@ - - - + \ No newline at end of file diff --git a/build/build.xml b/build/build.xml index 66edf1f..596f61b 100644 --- a/build/build.xml +++ b/build/build.xml @@ -166,6 +166,7 @@ + @@ -174,6 +175,9 @@ + + + @@ -214,6 +218,7 @@ debug="on" source="1.5" target="1.5" + encoding="UTF-8" > diff --git a/src/java/org/jivesoftware/multiplexer/ClientSession.java b/src/java/org/jivesoftware/multiplexer/ClientSession.java index 03ce976..fbd8262 100644 --- a/src/java/org/jivesoftware/multiplexer/ClientSession.java +++ b/src/java/org/jivesoftware/multiplexer/ClientSession.java @@ -22,6 +22,7 @@ import org.dom4j.Element; import org.jivesoftware.multiplexer.spi.ClientFailoverDeliverer; +import org.jivesoftware.multiplexer.task.ClientTask; import org.jivesoftware.util.LocaleUtils; import org.jivesoftware.util.Log; import org.xmlpull.v1.XmlPullParser; @@ -29,6 +30,9 @@ import java.net.InetAddress; import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.concurrent.AbstractExecutorService; +import java.util.concurrent.atomic.AtomicBoolean; /** * Session that represents a client to server connection. @@ -306,4 +310,39 @@ public boolean isClosed() { return status == STATUS_CLOSED; } + + private final AtomicBoolean sessionCreatedOnServer = new AtomicBoolean( + false); + + private ArrayList pendingTasks = new ArrayList(); + + public boolean isSessionCreatedOnServer() { + return sessionCreatedOnServer.get(); + } + + public void pendClientTask(ClientTask task, AbstractExecutorService executor) { + if (sessionCreatedOnServer.get()) { + executor.execute(task); + } else { + synchronized (sessionCreatedOnServer) { + if (sessionCreatedOnServer.get()) { + executor.execute(task); + } else { + pendingTasks.add(task); + } + } + } + } + + public void onSessionCreatedOnServer(AbstractExecutorService executor) { + synchronized (sessionCreatedOnServer) { + if (!sessionCreatedOnServer.get()) { + for (ClientTask task : pendingTasks) { + executor.execute(task); + } + pendingTasks = null; + } + sessionCreatedOnServer.compareAndSet(false, true); + } + } } diff --git a/src/java/org/jivesoftware/multiplexer/ServerPacketHandler.java b/src/java/org/jivesoftware/multiplexer/ServerPacketHandler.java index 15bb6fd..4774fec 100644 --- a/src/java/org/jivesoftware/multiplexer/ServerPacketHandler.java +++ b/src/java/org/jivesoftware/multiplexer/ServerPacketHandler.java @@ -95,7 +95,26 @@ } } } else if ("result".equals(type)) { - if (Log.isDebugEnabled()) { + boolean handled = false; + Element wrapper = stanza.element("session"); + if (wrapper != null) { + String streamID = wrapper.attributeValue("id"); + if (wrapper.element("create") != null) { + //we got session create response + Session session=Session.getSession(streamID); + if(session!=null){ + handled = true; + ClientSession cs=(ClientSession)session; + cs.onSessionCreatedOnServer(ConnectionManager.getInstance().getServerSurrogate().getThreadPool()); + if (Log.isDebugEnabled()) { + Log.debug("Session created on server with streamID: " + streamID); + } + }else{ + Log.warn("Can't get session with streamId=" + streamID); + } + } + } + if (Log.isDebugEnabled() && !handled) { Log.debug("IQ stanza of type RESULT was discarded: " + stanza.asXML()); } } else if ("error".equals(type)) { diff --git a/src/java/org/jivesoftware/multiplexer/ServerSurrogate.java b/src/java/org/jivesoftware/multiplexer/ServerSurrogate.java index 8e5c3d0..3efdc9d 100644 --- a/src/java/org/jivesoftware/multiplexer/ServerSurrogate.java +++ b/src/java/org/jivesoftware/multiplexer/ServerSurrogate.java @@ -192,7 +192,13 @@ * @param streamID the stream ID assigned by the connection manager to the session. */ public void send(String stanza, String streamID) { - threadPool.execute(new RouteTask(streamID, stanza)); + RouteTask task = new RouteTask(streamID, stanza); + ClientSession session = (ClientSession) Session.getSession(streamID); + if (session == null || session.isSessionCreatedOnServer()) { + threadPool.execute(new RouteTask(streamID, stanza)); + } else { + session.pendClientTask(task, threadPool); + } } /** @@ -401,4 +407,8 @@ return t; } } + + AbstractExecutorService getThreadPool(){ + return threadPool; + } } diff --git a/src/java/org/jivesoftware/multiplexer/Session.java b/src/java/org/jivesoftware/multiplexer/Session.java index c50dae8..7307ac1 100644 --- a/src/java/org/jivesoftware/multiplexer/Session.java +++ b/src/java/org/jivesoftware/multiplexer/Session.java @@ -90,6 +90,7 @@ } public static void removeSession(String streamID) { + StreamIDFactory.releaseId(streamID); sessions.remove(streamID); } diff --git a/src/java/org/jivesoftware/multiplexer/StreamIDFactory.java b/src/java/org/jivesoftware/multiplexer/StreamIDFactory.java index fb9eb8c..f3bf2d5 100644 --- a/src/java/org/jivesoftware/multiplexer/StreamIDFactory.java +++ b/src/java/org/jivesoftware/multiplexer/StreamIDFactory.java @@ -21,6 +21,8 @@ package org.jivesoftware.multiplexer; import java.util.Random; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ThreadLocalRandom; /** * A basic stream ID factory that produces id's using java.util.Random @@ -32,15 +34,23 @@ * @author Gaston Dombiak */ public class StreamIDFactory { - /** - * The random number to use, someone with Java can predict stream IDs if they can guess the current seed * - */ - Random random = new Random(); + + private static final ConcurrentHashMap usingStreamIDs = new ConcurrentHashMap(); String managerName = ConnectionManager.getInstance().getName(); + private Random rand = new Random(); + public String createStreamID() { - return managerName + Integer.toHexString(random.nextInt()); + String streamID; + do { + streamID = managerName + Integer.toHexString(rand.nextInt()); + } while(usingStreamIDs.putIfAbsent(streamID, Boolean.TRUE) == null); + return streamID; + } + + public static void releaseId(String streamId){ + usingStreamIDs.remove(streamId); } } diff --git a/src/java/org/jivesoftware/multiplexer/net/MXParser.java b/src/java/org/jivesoftware/multiplexer/net/MXParser.java index 6c66adb..9e8153b 100644 --- a/src/java/org/jivesoftware/multiplexer/net/MXParser.java +++ b/src/java/org/jivesoftware/multiplexer/net/MXParser.java @@ -44,8 +44,8 @@ private long lastHeartbeat = 0; @Override - protected int nextImpl() - throws XmlPullParserException, IOException + protected int nextImpl() + throws XmlPullParserException, IOException { text = null; pcEnd = pcStart = 0; @@ -182,7 +182,7 @@ // } } else { throw new XmlPullParserException( - "unexpected character in markup "+printable(ch), this, null); + "unexpected character in markup "+printable(ch), this, null); } } else if(ch == '?') { parsePI(); @@ -202,7 +202,7 @@ return eventType = parseStartTag(); } else { throw new XmlPullParserException( - "unexpected character in markup "+printable(ch), this, null); + "unexpected character in markup "+printable(ch), this, null); } // do content comapctation if it makes sense!!!! @@ -223,8 +223,8 @@ entityRefName = newString(buf, posStart, posEnd - posStart); } throw new XmlPullParserException( - "could not resolve entity named '"+printable(entityRefName)+"'", - this, null); + "could not resolve entity named '"+printable(entityRefName)+"'", + this, null); } //int entStart = posStart; //int entEnd = posEnd; @@ -289,7 +289,7 @@ } } else if(seenBracketBracket && ch == '>') { throw new XmlPullParserException( - "characters ]]> are not allowed in content", this, null); + "characters ]]> are not allowed in content", this, null); } else { if(seenBracket) { seenBracketBracket = seenBracket = false; @@ -363,27 +363,50 @@ reader = oldReader; inputEncoding = oldEncoding; } - - /** - * Makes sure that each individual character is a valid XML character. - * - * Note that when MXParser is being modified to handle multibyte chars correctly, this method needs to change (as - * then, there are more codepoints to check). - */ + + private boolean highSurrogateSeen = false; + + /** + * Makes sure that each individual character is a valid XML character. + * + * Note that when MXParser is being modified to handle multibyte chars correctly, this method needs to change (as + * then, there are more codepoints to check). + * + */ @Override protected char more() throws IOException, XmlPullParserException { - final char codePoint = super.more(); // note - this does NOT return a codepoint now, but simply a (single byte) character! - if ((codePoint == 0x0) || // 0x0 is not allowed, but flash clients insist on sending this as the very first character of a stream. We should stop allowing this codepoint after the first byte has been parsed. - (codePoint == 0x9) || - (codePoint == 0xA) || - (codePoint == 0xD) || - ((codePoint >= 0x20) && (codePoint <= 0xD7FF)) || - ((codePoint >= 0xE000) && (codePoint <= 0xFFFD)) || - ((codePoint >= 0x10000) && (codePoint <= 0x10FFFF))) { - return codePoint; - } - - throw new XmlPullParserException("Illegal XML character: " + Integer.parseInt(codePoint+"", 16)); + final char codePoint = super.more(); // note - this does NOT return a codepoint now, but simply a (double byte) character! + boolean validCodepoint = false; + boolean isLowSurrogate = Character.isLowSurrogate(codePoint); + if ((codePoint == 0x0) || // 0x0 is not allowed, but flash clients insist on sending this as the very first character of a stream. We should stop allowing this codepoint after the first byte has been parsed. + (codePoint == 0x9) || + (codePoint == 0xA) || + (codePoint == 0xD) || + ((codePoint >= 0x20) && (codePoint <= 0xD7FF)) || + ((codePoint >= 0xE000) && (codePoint <= 0xFFFD))) { + validCodepoint = true; + } + else if (highSurrogateSeen) { + if (isLowSurrogate) { + validCodepoint = true; + } else { + throw new XmlPullParserException("High surrogate followed by non low surrogate '0x" + String.format("%x", (int) codePoint) + "'"); + } + } + else if (isLowSurrogate) { + throw new XmlPullParserException("Low surrogate '0x " + String.format("%x", (int) codePoint) + " without preceeding high surrogate"); + } + else if (Character.isHighSurrogate(codePoint)) { + highSurrogateSeen = true; + // Return here so that highSurrogateSeen is not reset + return codePoint; + } + // Always reset high surrogate seen + highSurrogateSeen = false; + if (validCodepoint) + return codePoint; + + throw new XmlPullParserException("Illegal XML character '0x" + String.format("%x", (int) codePoint) + "'"); } }