|
@@ -22,17 +22,17 @@ import java.util.concurrent.ConcurrentHashMap;
|
|
|
import java.util.concurrent.LinkedBlockingQueue;
|
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
|
|
|
|
+import org.apache.commons.logging.Log;
|
|
|
+import org.apache.commons.logging.LogFactory;
|
|
|
import org.apache.hadoop.fs.Path;
|
|
|
import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
|
|
|
-import org.slf4j.Logger;
|
|
|
-import org.slf4j.LoggerFactory;
|
|
|
|
|
|
class IndexCache {
|
|
|
|
|
|
private final JobConf conf;
|
|
|
private final int totalMemoryAllowed;
|
|
|
private AtomicInteger totalMemoryUsed = new AtomicInteger();
|
|
|
- private static final Logger LOG = LoggerFactory.getLogger(IndexCache.class);
|
|
|
+ private static final Log LOG = LogFactory.getLog(IndexCache.class);
|
|
|
|
|
|
private final ConcurrentHashMap<String,IndexInformation> cache =
|
|
|
new ConcurrentHashMap<String,IndexInformation>();
|
|
@@ -72,12 +72,11 @@ class IndexCache {
|
|
|
try {
|
|
|
info.wait();
|
|
|
} catch (InterruptedException e) {
|
|
|
- Thread.currentThread().interrupt();
|
|
|
throw new IOException("Interrupted waiting for construction", e);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
- LOG.debug("IndexCache HIT: MapId {} found", mapId);
|
|
|
+ LOG.debug("IndexCache HIT: MapId " + mapId + " found");
|
|
|
}
|
|
|
|
|
|
if (info.mapSpillRecord.size() == 0 ||
|
|
@@ -107,91 +106,63 @@ class IndexCache {
|
|
|
try {
|
|
|
info.wait();
|
|
|
} catch (InterruptedException e) {
|
|
|
- Thread.currentThread().interrupt();
|
|
|
throw new IOException("Interrupted waiting for construction", e);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
- LOG.debug("IndexCache HIT: MapId {} found", mapId);
|
|
|
+ LOG.debug("IndexCache HIT: MapId " + mapId + " found");
|
|
|
return info;
|
|
|
}
|
|
|
- LOG.debug("IndexCache MISS: MapId {} not found", mapId);
|
|
|
+ LOG.debug("IndexCache MISS: MapId " + mapId + " not found") ;
|
|
|
SpillRecord tmp = null;
|
|
|
- boolean success = false;
|
|
|
try {
|
|
|
tmp = new SpillRecord(indexFileName, conf, expectedIndexOwner);
|
|
|
- success = true;
|
|
|
- } catch (Throwable e) {
|
|
|
+ } catch (Throwable e) {
|
|
|
tmp = new SpillRecord(0);
|
|
|
cache.remove(mapId);
|
|
|
- if (e instanceof InterruptedException) {
|
|
|
- Thread.currentThread().interrupt();
|
|
|
- }
|
|
|
throw new IOException("Error Reading IndexFile", e);
|
|
|
- } finally {
|
|
|
- synchronized (newInd) {
|
|
|
+ } finally {
|
|
|
+ synchronized (newInd) {
|
|
|
newInd.mapSpillRecord = tmp;
|
|
|
- if (success) {
|
|
|
- // Only add mapId to the queue for successful read and after added to
|
|
|
- // the cache. Once in the queue, it is now eligible for removal once
|
|
|
- // construction is finished.
|
|
|
- queue.add(mapId);
|
|
|
- if (totalMemoryUsed.addAndGet(newInd.getSize()) > totalMemoryAllowed) {
|
|
|
- freeIndexInformation();
|
|
|
- }
|
|
|
- }
|
|
|
newInd.notifyAll();
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
+ queue.add(mapId);
|
|
|
+
|
|
|
+ if (totalMemoryUsed.addAndGet(newInd.getSize()) > totalMemoryAllowed) {
|
|
|
+ freeIndexInformation();
|
|
|
+ }
|
|
|
return newInd;
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * This method removes the map from the cache if it is present in the queue.
|
|
|
+ * This method removes the map from the cache if index information for this
|
|
|
+ * map is loaded(size>0), index information entry in cache will not be
|
|
|
+ * removed if it is in the loading phrase(size=0), this prevents corruption
|
|
|
+ * of totalMemoryUsed. It should be called when a map output on this tracker
|
|
|
+ * is discarded.
|
|
|
* @param mapId The taskID of this map.
|
|
|
*/
|
|
|
- public void removeMap(String mapId) throws IOException {
|
|
|
- // Successfully removing the mapId from the queue enters into a contract
|
|
|
- // that this thread will remove the corresponding mapId from the cache.
|
|
|
- if (!queue.remove(mapId)) {
|
|
|
- LOG.debug("Map ID {} not found in queue", mapId);
|
|
|
- return;
|
|
|
- }
|
|
|
- removeMapInternal(mapId);
|
|
|
- }
|
|
|
-
|
|
|
- /** This method should only be called upon successful removal of mapId from
|
|
|
- * the queue. The mapId will be removed from the cache and totalUsedMemory
|
|
|
- * will be decremented.
|
|
|
- * @param mapId the cache item to be removed
|
|
|
- * @throws IOException
|
|
|
- */
|
|
|
- private void removeMapInternal(String mapId) throws IOException {
|
|
|
- IndexInformation info = cache.remove(mapId);
|
|
|
- if (info == null) {
|
|
|
- // Inconsistent state as presence in queue implies presence in cache
|
|
|
- LOG.warn("Map ID " + mapId + " not found in cache");
|
|
|
+ public void removeMap(String mapId) {
|
|
|
+ IndexInformation info = cache.get(mapId);
|
|
|
+ if (info == null || isUnderConstruction(info)) {
|
|
|
return;
|
|
|
}
|
|
|
- try {
|
|
|
- synchronized(info) {
|
|
|
- while (isUnderConstruction(info)) {
|
|
|
- info.wait();
|
|
|
- }
|
|
|
- totalMemoryUsed.getAndAdd(-info.getSize());
|
|
|
+ info = cache.remove(mapId);
|
|
|
+ if (info != null) {
|
|
|
+ totalMemoryUsed.addAndGet(-info.getSize());
|
|
|
+ if (!queue.remove(mapId)) {
|
|
|
+ LOG.warn("Map ID" + mapId + " not found in queue!!");
|
|
|
}
|
|
|
- } catch (InterruptedException e) {
|
|
|
- totalMemoryUsed.getAndAdd(-info.getSize());
|
|
|
- Thread.currentThread().interrupt();
|
|
|
- throw new IOException("Interrupted waiting for construction", e);
|
|
|
+ } else {
|
|
|
+ LOG.info("Map ID " + mapId + " not found in cache");
|
|
|
}
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * This method checks if cache and totalMemoryUsed is consistent.
|
|
|
+ * This method checks if cache and totolMemoryUsed is consistent.
|
|
|
* It is only used for unit test.
|
|
|
- * @return True if cache and totalMemoryUsed is consistent
|
|
|
+ * @return True if cache and totolMemoryUsed is consistent
|
|
|
*/
|
|
|
boolean checkTotalMemoryUsed() {
|
|
|
int totalSize = 0;
|
|
@@ -204,13 +175,13 @@ class IndexCache {
|
|
|
/**
|
|
|
* Bring memory usage below totalMemoryAllowed.
|
|
|
*/
|
|
|
- private synchronized void freeIndexInformation() throws IOException {
|
|
|
+ private synchronized void freeIndexInformation() {
|
|
|
while (totalMemoryUsed.get() > totalMemoryAllowed) {
|
|
|
- if(queue.isEmpty()) {
|
|
|
- break;
|
|
|
+ String s = queue.remove();
|
|
|
+ IndexInformation info = cache.remove(s);
|
|
|
+ if (info != null) {
|
|
|
+ totalMemoryUsed.addAndGet(-info.getSize());
|
|
|
}
|
|
|
- String mapId = queue.remove();
|
|
|
- removeMapInternal(mapId);
|
|
|
}
|
|
|
}
|
|
|
|