diff --git a/src/main/java/net/minecraft/server/MinecraftServer.java b/src/main/java/net/minecraft/server/MinecraftServer.java index f70297e..14fe75e 100644 --- a/src/main/java/net/minecraft/server/MinecraftServer.java +++ b/src/main/java/net/minecraft/server/MinecraftServer.java @@ -80,6 +80,7 @@ import org.apache.logging.log4j.Logger; import org.ultramine.server.ConfigurationHandler; import org.ultramine.server.WatchdogThread; +import org.ultramine.server.chunk.ChunkIOExecutor; import net.minecraftforge.common.DimensionManager; import net.minecraftforge.common.MinecraftForge; @@ -576,6 +577,10 @@ public void updateTimeLightAndEntities() { + theProfiler.startSection("ChunkIOExecutor"); + ChunkIOExecutor.tick(); + theProfiler.endSection(); + this.theProfiler.startSection("levels"); int i; diff --git a/src/main/java/net/minecraft/server/management/PlayerManager.java b/src/main/java/net/minecraft/server/management/PlayerManager.java index 7dbc76b..e4afb2e 100644 --- a/src/main/java/net/minecraft/server/management/PlayerManager.java +++ b/src/main/java/net/minecraft/server/management/PlayerManager.java @@ -2,7 +2,11 @@ import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.List; + +import org.ultramine.server.chunk.ChunkIOExecutor; + import net.minecraft.entity.player.EntityPlayerMP; import net.minecraft.network.Packet; import net.minecraft.network.play.server.S21PacketChunkData; @@ -285,11 +289,25 @@ private int flagsYAreasToUpdate; private long previousWorldTime; private static final String __OBFID = "CL_00001435"; + + private boolean loaded = false; + private Runnable loadedRunnable = new Runnable() + { + public void run() + { + PlayerInstance.this.loaded = true; + for(Object o : playersWatchingChunk) + { + EntityPlayerMP p = (EntityPlayerMP)o; + p.loadedChunks.add(chunkLocation); + } + } + }; public PlayerInstance(int par2, int par3) { this.chunkLocation = new ChunkCoordIntPair(par2, par3); - PlayerManager.this.getWorldServer().theChunkProviderServer.loadChunk(par2, par3); + getWorldServer().theChunkProviderServer.loadAsync(par2, par3, this.loadedRunnable); } public void addPlayer(EntityPlayerMP par1EntityPlayerMP) @@ -306,7 +324,10 @@ } this.playersWatchingChunk.add(par1EntityPlayerMP); - par1EntityPlayerMP.loadedChunks.add(this.chunkLocation); + if(loaded) + { + par1EntityPlayerMP.loadedChunks.add(this.chunkLocation); + } } } @@ -314,6 +335,20 @@ { if (this.playersWatchingChunk.contains(par1EntityPlayerMP)) { + + if (!this.loaded) + { + this.playersWatchingChunk.remove(par1EntityPlayerMP); + if(this.playersWatchingChunk.isEmpty()) + { + ChunkIOExecutor.dropQueuedChunkLoad(getWorldServer(), this.chunkLocation.chunkXPos, this.chunkLocation.chunkZPos, this.loadedRunnable); + long i = (long) this.chunkLocation.chunkXPos + 2147483647L | (long) this.chunkLocation.chunkZPos + 2147483647L << 32; + playerInstances.remove(i); + chunkWatcherWithPlayers.remove(this); + } + return; + } + Chunk chunk = PlayerManager.this.theWorldServer.getChunkFromChunkCoords(this.chunkLocation.chunkXPos, this.chunkLocation.chunkZPos); if (chunk.func_150802_k()) diff --git a/src/main/java/net/minecraft/world/chunk/storage/AnvilChunkLoader.java b/src/main/java/net/minecraft/world/chunk/storage/AnvilChunkLoader.java index 313988e..8a4feee 100644 --- a/src/main/java/net/minecraft/world/chunk/storage/AnvilChunkLoader.java +++ b/src/main/java/net/minecraft/world/chunk/storage/AnvilChunkLoader.java @@ -1,5 +1,8 @@ package net.minecraft.world.chunk.storage; +import gnu.trove.iterator.TIntObjectIterator; +import gnu.trove.map.hash.TIntObjectHashMap; + import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.File; @@ -31,14 +34,14 @@ import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.ultramine.server.chunk.ChunkHash; import cpw.mods.fml.common.FMLLog; public class AnvilChunkLoader implements IChunkLoader, IThreadedFileIO { private static final Logger logger = LogManager.getLogger(); - private List chunksToRemove = new ArrayList(); - private Set pendingAnvilChunksCoordinates = new HashSet(); + private final TIntObjectHashMap pendingSaves = new TIntObjectHashMap(); private Object syncLockObject = new Object(); public final File chunkSaveLocation; private static final String __OBFID = "CL_00000384"; @@ -50,38 +53,18 @@ public Chunk loadChunk(World par1World, int par2, int par3) throws IOException { - NBTTagCompound nbttagcompound = null; - ChunkCoordIntPair chunkcoordintpair = new ChunkCoordIntPair(par2, par3); - Object object = this.syncLockObject; + Object[] data = this.loadChunk__Async_CB(par1World, par2, par3); - synchronized (this.syncLockObject) + if (data != null) { - if (this.pendingAnvilChunksCoordinates.contains(chunkcoordintpair)) - { - for (int k = 0; k < this.chunksToRemove.size(); ++k) - { - if (((AnvilChunkLoader.PendingChunk)this.chunksToRemove.get(k)).chunkCoordinate.equals(chunkcoordintpair)) - { - nbttagcompound = ((AnvilChunkLoader.PendingChunk)this.chunksToRemove.get(k)).nbtTags; - break; - } - } - } + Chunk chunk = (Chunk) data[0]; + NBTTagCompound nbttagcompound = (NBTTagCompound) data[1]; + this.loadEntities(chunk, nbttagcompound.getCompoundTag("Level"), par1World); + MinecraftForge.EVENT_BUS.post(new ChunkDataEvent.Load(chunk, nbttagcompound)); + return chunk; } - if (nbttagcompound == null) - { - DataInputStream datainputstream = RegionFileCache.getChunkInputStream(this.chunkSaveLocation, par2, par3); - - if (datainputstream == null) - { - return null; - } - - nbttagcompound = CompressedStreamTools.read(datainputstream); - } - - return this.checkedReadChunkFromNBT(par1World, par2, par3, nbttagcompound); + return null; } protected Chunk checkedReadChunkFromNBT(World par1World, int par2, int par3, NBTTagCompound par4NBTTagCompound) @@ -138,20 +121,22 @@ synchronized (this.syncLockObject) { - if (this.pendingAnvilChunksCoordinates.contains(par1ChunkCoordIntPair)) - { - for (int i = 0; i < this.chunksToRemove.size(); ++i) - { - if (((AnvilChunkLoader.PendingChunk)this.chunksToRemove.get(i)).chunkCoordinate.equals(par1ChunkCoordIntPair)) - { - this.chunksToRemove.set(i, new AnvilChunkLoader.PendingChunk(par1ChunkCoordIntPair, par2NBTTagCompound)); - return; - } - } - } +// if (this.pendingAnvilChunksCoordinates.contains(par1ChunkCoordIntPair)) +// { +// for (int i = 0; i < this.chunksToRemove.size(); ++i) +// { +// if (((AnvilChunkLoader.PendingChunk)this.chunksToRemove.get(i)).chunkCoordinate.equals(par1ChunkCoordIntPair)) +// { +// this.chunksToRemove.set(i, new AnvilChunkLoader.PendingChunk(par1ChunkCoordIntPair, par2NBTTagCompound)); +// return; +// } +// } +// } - this.chunksToRemove.add(new AnvilChunkLoader.PendingChunk(par1ChunkCoordIntPair, par2NBTTagCompound)); - this.pendingAnvilChunksCoordinates.add(par1ChunkCoordIntPair); + int hash = ChunkHash.chunkToKey(par1ChunkCoordIntPair.chunkXPos, par1ChunkCoordIntPair.chunkZPos); + + pendingSaves.put(hash, new PendingChunk(par1ChunkCoordIntPair, par2NBTTagCompound)); + //this.pendingAnvilChunksCoordinates.add(par1ChunkCoordIntPair); ThreadedFileIOBase.threadedIOInstance.queueIO(this); } } @@ -163,13 +148,15 @@ synchronized (this.syncLockObject) { - if (this.chunksToRemove.isEmpty()) + if (this.pendingSaves.isEmpty()) { return false; } - pendingchunk = (AnvilChunkLoader.PendingChunk)this.chunksToRemove.remove(0); - this.pendingAnvilChunksCoordinates.remove(pendingchunk.chunkCoordinate); + TIntObjectIterator it = pendingSaves.iterator(); + it.advance(); + pendingchunk = it.value(); + it.remove(); } if (pendingchunk != null) @@ -377,6 +364,27 @@ chunk.setBiomeArray(par2NBTTagCompound.getByteArray("Biomes")); } + return chunk; + } + + static class PendingChunk + { + public final ChunkCoordIntPair chunkCoordinate; + public final NBTTagCompound nbtTags; + private static final String __OBFID = "CL_00000385"; + + public PendingChunk(ChunkCoordIntPair par1ChunkCoordIntPair, NBTTagCompound par2NBTTagCompound) + { + this.chunkCoordinate = par1ChunkCoordIntPair; + this.nbtTags = par2NBTTagCompound; + } + } + + + /* ======================================== ULTRAMINE START =====================================*/ + + public void loadEntities(Chunk chunk, NBTTagCompound par2NBTTagCompound, World par1World) + { NBTTagList nbttaglist1 = par2NBTTagCompound.getTagList("Entities", 10); if (nbttaglist1 != null) @@ -437,20 +445,51 @@ } } } - - return chunk; } + + public Object[] loadChunk__Async_CB(World par1World, int par2, int par3) + { + NBTTagCompound nbttagcompound = null; - static class PendingChunk - { - public final ChunkCoordIntPair chunkCoordinate; - public final NBTTagCompound nbtTags; - private static final String __OBFID = "CL_00000385"; + synchronized (this.syncLockObject) + { + + PendingChunk anvilchunkloaderpending = pendingSaves.get(ChunkHash.chunkToKey(par2, par3)); + + if(anvilchunkloaderpending != null) + { + nbttagcompound = anvilchunkloaderpending.nbtTags; + } + } - public PendingChunk(ChunkCoordIntPair par1ChunkCoordIntPair, NBTTagCompound par2NBTTagCompound) + if (nbttagcompound == null) + { + DataInputStream datainputstream = RegionFileCache.getChunkInputStream(this.chunkSaveLocation, par2, par3); + + if (datainputstream == null) + { + return null; + } + + try { - this.chunkCoordinate = par1ChunkCoordIntPair; - this.nbtTags = par2NBTTagCompound; + nbttagcompound = CompressedStreamTools.read(datainputstream); } - } + catch (IOException e) + { + e.printStackTrace(); + return null; + } + } + + if(nbttagcompound == null) return null; + Chunk chunk = checkedReadChunkFromNBT(par1World, par2, par3, nbttagcompound); + if(chunk == null) return null; + + Object[] data = new Object[2]; + data[0] = chunk; + data[1] = nbttagcompound; + + return data; + } } \ No newline at end of file diff --git a/src/main/java/net/minecraft/world/gen/ChunkProviderServer.java b/src/main/java/net/minecraft/world/gen/ChunkProviderServer.java index 28549d8..d5b1321 100644 --- a/src/main/java/net/minecraft/world/gen/ChunkProviderServer.java +++ b/src/main/java/net/minecraft/world/gen/ChunkProviderServer.java @@ -29,6 +29,7 @@ import net.minecraft.world.chunk.Chunk; import net.minecraft.world.chunk.EmptyChunk; import net.minecraft.world.chunk.IChunkProvider; +import net.minecraft.world.chunk.storage.AnvilChunkLoader; import net.minecraft.world.chunk.storage.IChunkLoader; import net.minecraftforge.common.DimensionManager; import net.minecraftforge.common.ForgeChunkManager; @@ -36,17 +37,18 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.ultramine.server.chunk.ChunkHash; +import org.ultramine.server.chunk.ChunkIOExecutor; public class ChunkProviderServer implements IChunkProvider { private static final Logger logger = LogManager.getLogger(); - private TIntSet chunksToUnload = new TIntHashSet(); + public TIntSet chunksToUnload = new TIntHashSet(); private Chunk defaultEmptyChunk; - private IChunkProvider currentChunkProvider; + public IChunkProvider currentChunkProvider; public IChunkLoader currentChunkLoader; public boolean loadChunkOnProvideRequest = true; - private TIntObjectMap loadedChunkHashMap = new TIntObjectHashMap(); - private WorldServer worldObj; + public TIntObjectMap loadedChunkHashMap = new TIntObjectHashMap(); + public WorldServer worldObj; private static final String __OBFID = "CL_00001436"; public ChunkProviderServer(WorldServer par1WorldServer, IChunkLoader par2IChunkLoader, IChunkProvider par3IChunkProvider) @@ -359,4 +361,20 @@ } public void recreateStructures(int par1, int par2) {} + + + /* ======================================== ULTRAMINE START =====================================*/ + + public void loadAsync(int x, int z, Runnable callback) //XXX + { + if(loadedChunkHashMap.containsKey(ChunkHash.chunkToKey(x, z))) + { + callback.run(); + return; + } + else + { + ChunkIOExecutor.queueChunkLoad(this.worldObj, (AnvilChunkLoader)currentChunkLoader, this, x, z, callback); + } + } } \ No newline at end of file diff --git a/src/main/java/net/minecraft/world/storage/ThreadedFileIOBase.java b/src/main/java/net/minecraft/world/storage/ThreadedFileIOBase.java index 36a51ab..020a959 100644 --- a/src/main/java/net/minecraft/world/storage/ThreadedFileIOBase.java +++ b/src/main/java/net/minecraft/world/storage/ThreadedFileIOBase.java @@ -39,14 +39,14 @@ ++this.savedIOCounter; } - try - { - Thread.sleep(this.isThreadWaiting ? 0L : 10L); - } - catch (InterruptedException interruptedexception1) - { - interruptedexception1.printStackTrace(); - } +// try +// { +// Thread.sleep(this.isThreadWaiting ? 0L : 10L); +// } +// catch (InterruptedException interruptedexception1) +// { +// interruptedexception1.printStackTrace(); +// } } if (this.threadedIOQueue.isEmpty()) diff --git a/src/main/java/org/ultramine/server/chunk/AsynchronousExecutor.java b/src/main/java/org/ultramine/server/chunk/AsynchronousExecutor.java new file mode 100644 index 0000000..69d9135 --- /dev/null +++ b/src/main/java/org/ultramine/server/chunk/AsynchronousExecutor.java @@ -0,0 +1,475 @@ +package org.ultramine.server.chunk; + +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; + +import org.apache.commons.lang3.Validate; + +/** + * Executes tasks using a multi-stage process executor. Synchronous executions + * are via {@link AsynchronousExecutor#finishActive()} or the + * {@link AsynchronousExecutor#get(Object)} methods.
  • Stage 1 creates the + * object from a parameter, and is usually called asynchronously.
  • Stage 2 + * takes the parameter and object from stage 1 and does any synchronous + * processing to prepare it.
  • Stage 3 takes the parameter and object from + * stage 1, as well as a callback that was registered, and performs any + * synchronous calculations. + * + * @param

    + * The type of parameter you provide to make the object that will be + * created. It should implement {@link Object#hashCode()} and + * {@link Object#equals(Object)} if you want to get the value early. + * @param + * The type of object you provide. This is created in stage 1, and + * passed to stage 2, 3, and returned if get() is called. + * @param + * The type of callback you provide. You may register many of these + * to be passed to the provider in stage 3, one at a time. + * @param + * A type of exception you may throw and expect to be handled by the + * main thread + * @author Wesley Wolfe (c) 2012, 2014 + */ +public final class AsynchronousExecutor +{ + + public static interface CallBackProvider extends ThreadFactory + { + + /** + * Normally an asynchronous call, but can be synchronous + * + * @param parameter + * parameter object provided + * @return the created object + */ + T callStage1(P parameter) throws E; + + /** + * Synchronous call + * + * @param parameter + * parameter object provided + * @param object + * the previously created object + */ + void callStage2(P parameter, T object) throws E; + + /** + * Synchronous call, called multiple times, once per registered callback + * + * @param parameter + * parameter object provided + * @param object + * the previously created object + * @param callback + * the current callback to execute + */ + void callStage3(P parameter, T object, C callback) throws E; + } + + @SuppressWarnings("rawtypes") + static final AtomicIntegerFieldUpdater STATE_FIELD = AtomicIntegerFieldUpdater.newUpdater(AsynchronousExecutor.Task.class, "state"); + + @SuppressWarnings({ "unchecked", "rawtypes" }) + private static boolean set(AsynchronousExecutor.Task $this, int expected, int value) + { + return STATE_FIELD.compareAndSet($this, expected, value); + } + + class Task implements Runnable + { + static final int PENDING = 0x0; + static final int STAGE_1_ASYNC = PENDING + 1; + static final int STAGE_1_SYNC = STAGE_1_ASYNC + 1; + static final int STAGE_1_COMPLETE = STAGE_1_SYNC + 1; + static final int FINISHED = STAGE_1_COMPLETE + 1; + + volatile int state = PENDING; + final P parameter; + T object; + final List callbacks = new LinkedList(); + E t = null; + + Task(final P parameter) + { + this.parameter = parameter; + } + + public void run() + { + if(initAsync()) + { + finished.add(this); + } + } + + boolean initAsync() + { + if(set(this, PENDING, STAGE_1_ASYNC)) + { + boolean ret = true; + + try + { + init(); + } + finally + { + if(set(this, STAGE_1_ASYNC, STAGE_1_COMPLETE)) + { + // No one is/will be waiting + } + else + { + // We know that the sync thread will be waiting + synchronized(this) + { + if(state != STAGE_1_SYNC) + { + // They beat us to the synchronized block + this.notifyAll(); + } + else + { + // We beat them to the synchronized block + } + state = STAGE_1_COMPLETE; // They're already + // synchronized, atomic + // locks are not needed + } + // We want to return false, because we know a + // synchronous task already handled the finish() + ret = false; // Don't return inside finally; VERY bad + // practice. + } + } + + return ret; + } + else + { + return false; + } + } + + void initSync() + { + if(set(this, PENDING, STAGE_1_COMPLETE)) + { + // If we succeed that variable switch, good as done + init(); + } + else if(set(this, STAGE_1_ASYNC, STAGE_1_SYNC)) + { + // Async thread is running, but this shouldn't be likely; we + // need to sync to wait on them because of it. + synchronized(this) + { + if(set(this, STAGE_1_SYNC, PENDING)) + { // They might NOT synchronized yet, atomic lock IS needed + // We are the first into the lock + while(state != STAGE_1_COMPLETE) + { + try + { + this.wait(); + } + catch(InterruptedException e) + { + Thread.currentThread().interrupt(); + throw new RuntimeException("Unable to handle interruption on " + parameter, e); + } + } + } + else + { + // They beat us to the synchronized block + } + } + } + else + { + // Async thread is not pending, the more likely situation for a + // task not pending + } + } + + @SuppressWarnings("unchecked") + void init() + { + try + { + object = provider.callStage1(parameter); + } + catch(final Throwable t) + { + this.t = (E) t; + } + } + + @SuppressWarnings("unchecked") + T get() throws E + { + initSync(); + if(callbacks.isEmpty()) + { + // 'this' is a placeholder to prevent callbacks from being empty + // during finish call + // See get method below + callbacks.add((C) this); + } + finish(); + return object; + } + + void finish() throws E + { + switch(state) + { + default: + case PENDING: + case STAGE_1_ASYNC: + case STAGE_1_SYNC: + throw new IllegalStateException("Attempting to finish unprepared(" + state + ") task(" + parameter + ")"); + case STAGE_1_COMPLETE: + try + { + if(t != null) + { + throw t; + } + if(callbacks.isEmpty()) + { + return; + } + + final CallBackProvider provider = AsynchronousExecutor.this.provider; + final P parameter = this.parameter; + final T object = this.object; + + provider.callStage2(parameter, object); + for(C callback : callbacks) + { + if(callback == this) + { + // 'this' is a placeholder to prevent callbacks from + // being empty on a get() call + // See get method above + continue; + } + provider.callStage3(parameter, object, callback); + } + } + finally + { + tasks.remove(parameter); + state = FINISHED; + } + case FINISHED: + } + } + + boolean drop() + { + if(set(this, PENDING, FINISHED)) + { + // If we succeed that variable switch, good as forgotten + tasks.remove(parameter); + return true; + } + else + { + // We need the async thread to finish normally to properly + // dispose of the task + return false; + } + } + } + + final CallBackProvider provider; + final Queue finished = new ConcurrentLinkedQueue(); + final Map tasks = new HashMap(); + final ThreadPoolExecutor pool; + + /** + * Uses a thread pool to pass executions to the provider. + * + * @see AsynchronousExecutor + */ + public AsynchronousExecutor(final CallBackProvider provider, final int coreSize) + { + Validate.notNull(provider, "Provider cannot be null"); + this.provider = provider; + + // We have an unbound queue size so do not need a max thread size + pool = new ThreadPoolExecutor(coreSize, Integer.MAX_VALUE, 60l, TimeUnit.SECONDS, new LinkedBlockingQueue(), provider); + } + + /** + * Adds a callback to the parameter provided, adding parameter to the queue + * if needed. + *

    + * This should always be synchronous. + */ + public void add(P parameter, C callback) + { + Task task = tasks.get(parameter); + if(task == null) + { + tasks.put(parameter, task = new Task(parameter)); + pool.execute(task); + } + task.callbacks.add(callback); + } + + /** + * This removes a particular callback from the specified parameter. + *

    + * If no callbacks remain for a given parameter, then the + * {@link CallBackProvider CallBackProvider's} stages may be omitted from + * execution. Stage 3 will have no callbacks, stage 2 will be skipped unless + * a {@link #get(Object)} is used, and stage 1 will be avoided on a + * best-effort basis. + *

    + * Subsequent calls to {@link #getSkipQueue(Object)} will always work. + *

    + * Subsequent calls to {@link #get(Object)} might work. + *

    + * This should always be synchronous + * + * @return true if no further execution for the parameter is possible, such + * that, no exceptions will be thrown in {@link #finishActive()} for + * the parameter, and {@link #get(Object)} will throw an + * {@link IllegalStateException}, false otherwise + * @throws IllegalStateException + * if parameter is not in the queue anymore + * @throws IllegalStateException + * if the callback was not specified for given parameter + */ + public boolean drop(P parameter, C callback) throws IllegalStateException + { + final Task task = tasks.get(parameter); + if(task == null) + { + throw new IllegalStateException("Unknown " + parameter); + } + if(!task.callbacks.remove(callback)) + { + throw new IllegalStateException("Unknown " + callback + " for " + parameter); + } + if(task.callbacks.isEmpty()) + { + return task.drop(); + } + return false; + } + + /** + * This method attempts to skip the waiting period for said parameter. + *

    + * This should always be synchronous. + * + * @throws IllegalStateException + * if the parameter is not in the queue anymore, or sometimes if + * called from asynchronous thread + */ + public T get(P parameter) throws E, IllegalStateException + { + final Task task = tasks.get(parameter); + if(task == null) + { + throw new IllegalStateException("Unknown " + parameter); + } + return task.get(); + } + + /** + * Processes a parameter as if it was in the queue, without ever passing to + * another thread. + */ + public T getSkipQueue(P parameter) throws E + { + return skipQueue(parameter); + } + + /** + * Processes a parameter as if it was in the queue, without ever passing to + * another thread. + */ + public T getSkipQueue(P parameter, C callback) throws E + { + final T object = skipQueue(parameter); + provider.callStage3(parameter, object, callback); + return object; + } + + /** + * Processes a parameter as if it was in the queue, without ever passing to + * another thread. + */ + public T getSkipQueue(P parameter, C... callbacks) throws E + { + final CallBackProvider provider = this.provider; + final T object = skipQueue(parameter); + for(C callback : callbacks) + { + provider.callStage3(parameter, object, callback); + } + return object; + } + + /** + * Processes a parameter as if it was in the queue, without ever passing to + * another thread. + */ + public T getSkipQueue(P parameter, Iterable callbacks) throws E + { + final CallBackProvider provider = this.provider; + final T object = skipQueue(parameter); + for(C callback : callbacks) + { + provider.callStage3(parameter, object, callback); + } + return object; + } + + private T skipQueue(P parameter) throws E + { + Task task = tasks.get(parameter); + if(task != null) + { + return task.get(); + } + T object = provider.callStage1(parameter); + provider.callStage2(parameter, object); + return object; + } + + /** + * This is the 'heartbeat' that should be called synchronously to finish any + * pending tasks + */ + public void finishActive() throws E + { + final Queue finished = this.finished; + while(!finished.isEmpty()) + { + finished.poll().finish(); + } + } + + public void setActiveThreads(final int coreSize) + { + pool.setCorePoolSize(coreSize); + } +} diff --git a/src/main/java/org/ultramine/server/chunk/ChunkIOExecutor.java b/src/main/java/org/ultramine/server/chunk/ChunkIOExecutor.java new file mode 100644 index 0000000..4fc71af --- /dev/null +++ b/src/main/java/org/ultramine/server/chunk/ChunkIOExecutor.java @@ -0,0 +1,42 @@ +package org.ultramine.server.chunk; + +import net.minecraft.world.World; +import net.minecraft.world.chunk.Chunk; +import net.minecraft.world.chunk.storage.AnvilChunkLoader; +import net.minecraft.world.gen.ChunkProviderServer; + +public class ChunkIOExecutor +{ + static final int BASE_THREADS = 1; + static final int PLAYERS_PER_THREAD = 50; + + private static final AsynchronousExecutor instance = new AsynchronousExecutor( + new ChunkIOProvider(), BASE_THREADS); + + // public static void waitForChunkLoad(World world, int x, int z) { + // instance.get(new QueuedChunk(ChunkHash.chunkToKey(x, z), null, world, + // null)); + // } + + public static void queueChunkLoad(World world, AnvilChunkLoader loader, ChunkProviderServer provider, int x, int z, Runnable runnable) + { + instance.add(new QueuedChunk(ChunkHash.chunkToKey(x, z), loader, world, provider), runnable); + } + + public static void dropQueuedChunkLoad(net.minecraft.world.World world, int x, int z, Runnable runnable) + { + instance.drop(new QueuedChunk(ChunkHash.chunkToKey(x, z), null, world, null), runnable); + } + + public static void adjustPoolSize(int players) + { + // int size = Math.max(BASE_THREADS, (int) Math.ceil(players / + // PLAYERS_PER_THREAD)); + // instance.setActiveThreads(size); + } + + public static void tick() + { + instance.finishActive(); + } +} diff --git a/src/main/java/org/ultramine/server/chunk/ChunkIOProvider.java b/src/main/java/org/ultramine/server/chunk/ChunkIOProvider.java new file mode 100644 index 0000000..aa27f3f --- /dev/null +++ b/src/main/java/org/ultramine/server/chunk/ChunkIOProvider.java @@ -0,0 +1,82 @@ +package org.ultramine.server.chunk; + +import java.util.concurrent.atomic.AtomicInteger; + +import net.minecraft.world.chunk.Chunk; +import net.minecraft.world.chunk.storage.AnvilChunkLoader; +import net.minecraftforge.common.MinecraftForge; +import net.minecraftforge.event.world.ChunkDataEvent; + +class ChunkIOProvider implements AsynchronousExecutor.CallBackProvider +{ + private final AtomicInteger threadNumber = new AtomicInteger(1); + + // async stuff + public Chunk callStage1(QueuedChunk queuedChunk) throws RuntimeException + { + AnvilChunkLoader loader = queuedChunk.loader; + Object[] data = loader.loadChunk__Async_CB(queuedChunk.world, ChunkHash.keyToX(queuedChunk.coords), ChunkHash.keyToZ(queuedChunk.coords)); + + if(data != null) + { + queuedChunk.compound = (net.minecraft.nbt.NBTTagCompound) data[1]; + return (Chunk) data[0]; + } + + return null; + } + + // sync stuff + public void callStage2(QueuedChunk queuedChunk, Chunk chunk) throws RuntimeException + { + if(chunk == null) + { + // If the chunk loading failed just do it synchronously (may + // generate) + queuedChunk.provider.loadChunk(ChunkHash.keyToX(queuedChunk.coords), ChunkHash.keyToZ(queuedChunk.coords)); + return; + } + + int x = ChunkHash.keyToX(queuedChunk.coords); + int z = ChunkHash.keyToZ(queuedChunk.coords); + + // See if someone already loaded this chunk while we were working on it + // (API, etc) + if(queuedChunk.provider.loadedChunkHashMap.containsKey(queuedChunk.coords)) + { + // Make sure it isn't queued for unload, we need it + queuedChunk.provider.chunksToUnload.remove(queuedChunk.coords); // Spigot + return; + } + + queuedChunk.loader.loadEntities(chunk, queuedChunk.compound.getCompoundTag("Level"), queuedChunk.world); + MinecraftForge.EVENT_BUS.post(new ChunkDataEvent.Load(chunk, queuedChunk.compound)); // MCPC+ + // - + // Don't + // call + // ChunkDataEvent.Load + // async + chunk.lastSaveTime = queuedChunk.provider.worldObj.getTotalWorldTime(); + queuedChunk.provider.loadedChunkHashMap.put(queuedChunk.coords, chunk); + chunk.onChunkLoad(); + + if(queuedChunk.provider.currentChunkProvider != null) + { + queuedChunk.provider.currentChunkProvider.recreateStructures(x, z); + } + + chunk.populateChunk(queuedChunk.provider, queuedChunk.provider, x, z); + } + + public void callStage3(QueuedChunk queuedChunk, Chunk chunk, Runnable runnable) throws RuntimeException + { + runnable.run(); + } + + public Thread newThread(Runnable runnable) + { + Thread thread = new Thread(runnable, "Chunk I/O Executor Thread-" + threadNumber.getAndIncrement()); + thread.setDaemon(true); + return thread; + } +} diff --git a/src/main/java/org/ultramine/server/chunk/QueuedChunk.java b/src/main/java/org/ultramine/server/chunk/QueuedChunk.java new file mode 100644 index 0000000..b690208 --- /dev/null +++ b/src/main/java/org/ultramine/server/chunk/QueuedChunk.java @@ -0,0 +1,41 @@ +package org.ultramine.server.chunk; + +import net.minecraft.nbt.NBTTagCompound; +import net.minecraft.world.World; +import net.minecraft.world.chunk.storage.AnvilChunkLoader; +import net.minecraft.world.gen.ChunkProviderServer; + +class QueuedChunk +{ + final int coords; + final AnvilChunkLoader loader; + final World world; + final ChunkProviderServer provider; + NBTTagCompound compound; + + public QueuedChunk(int coords, AnvilChunkLoader loader, World world, ChunkProviderServer provider) + { + this.coords = coords; + this.loader = loader; + this.world = world; + this.provider = provider; + } + + @Override + public int hashCode() + { + return coords ^ (world.hashCode() << 24); + } + + @Override + public boolean equals(Object object) + { + if(object instanceof QueuedChunk) + { + QueuedChunk other = (QueuedChunk) object; + return coords == other.coords && world == other.world; + } + + return false; + } +}