|
@@ -72,11 +72,12 @@ class IndexCache {
|
|
|
try {
|
|
|
info.wait();
|
|
|
} catch (InterruptedException e) {
|
|
|
+ Thread.currentThread().interrupt();
|
|
|
throw new IOException("Interrupted waiting for construction", e);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
- LOG.debug("IndexCache HIT: MapId " + mapId + " found");
|
|
|
+ LOG.debug("IndexCache HIT: MapId {} found", mapId);
|
|
|
}
|
|
|
|
|
|
if (info.mapSpillRecord.size() == 0 ||
|
|
@@ -106,63 +107,91 @@ class IndexCache {
|
|
|
try {
|
|
|
info.wait();
|
|
|
} catch (InterruptedException e) {
|
|
|
+ Thread.currentThread().interrupt();
|
|
|
throw new IOException("Interrupted waiting for construction", e);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
- LOG.debug("IndexCache HIT: MapId " + mapId + " found");
|
|
|
+ LOG.debug("IndexCache HIT: MapId {} found", mapId);
|
|
|
return info;
|
|
|
}
|
|
|
- LOG.debug("IndexCache MISS: MapId " + mapId + " not found") ;
|
|
|
+ LOG.debug("IndexCache MISS: MapId {} not found", mapId);
|
|
|
SpillRecord tmp = null;
|
|
|
+ boolean success = false;
|
|
|
try {
|
|
|
tmp = new SpillRecord(indexFileName, conf, expectedIndexOwner);
|
|
|
- } catch (Throwable e) {
|
|
|
+ success = true;
|
|
|
+ } catch (Throwable e) {
|
|
|
tmp = new SpillRecord(0);
|
|
|
cache.remove(mapId);
|
|
|
+ if (e instanceof InterruptedException) {
|
|
|
+ Thread.currentThread().interrupt();
|
|
|
+ }
|
|
|
throw new IOException("Error Reading IndexFile", e);
|
|
|
- } finally {
|
|
|
- synchronized (newInd) {
|
|
|
+ } finally {
|
|
|
+ synchronized (newInd) {
|
|
|
newInd.mapSpillRecord = tmp;
|
|
|
+ if (success) {
|
|
|
+ // Only add mapId to the queue for successful read and after added to
|
|
|
+ // the cache. Once in the queue, it is now eligible for removal once
|
|
|
+ // construction is finished.
|
|
|
+ queue.add(mapId);
|
|
|
+ if (totalMemoryUsed.addAndGet(newInd.getSize()) > totalMemoryAllowed) {
|
|
|
+ freeIndexInformation();
|
|
|
+ }
|
|
|
+ }
|
|
|
newInd.notifyAll();
|
|
|
}
|
|
|
}
|
|
|
- queue.add(mapId);
|
|
|
-
|
|
|
- if (totalMemoryUsed.addAndGet(newInd.getSize()) > totalMemoryAllowed) {
|
|
|
- freeIndexInformation();
|
|
|
- }
|
|
|
+
|
|
|
return newInd;
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * This method removes the map from the cache if index information for this
|
|
|
- * map is loaded(size>0), index information entry in cache will not be
|
|
|
- * removed if it is in the loading phrase(size=0), this prevents corruption
|
|
|
- * of totalMemoryUsed. It should be called when a map output on this tracker
|
|
|
- * is discarded.
|
|
|
+ * This method removes the map from the cache if it is present in the queue.
|
|
|
* @param mapId The taskID of this map.
|
|
|
*/
|
|
|
- public void removeMap(String mapId) {
|
|
|
- IndexInformation info = cache.get(mapId);
|
|
|
- if (info == null || isUnderConstruction(info)) {
|
|
|
+ public void removeMap(String mapId) throws IOException {
|
|
|
+ // Successfully removing the mapId from the queue enters into a contract
|
|
|
+ // that this thread will remove the corresponding mapId from the cache.
|
|
|
+ if (!queue.remove(mapId)) {
|
|
|
+ LOG.debug("Map ID {} not found in queue", mapId);
|
|
|
return;
|
|
|
}
|
|
|
- info = cache.remove(mapId);
|
|
|
- if (info != null) {
|
|
|
- totalMemoryUsed.addAndGet(-info.getSize());
|
|
|
- if (!queue.remove(mapId)) {
|
|
|
- LOG.warn("Map ID" + mapId + " not found in queue!!");
|
|
|
+ removeMapInternal(mapId);
|
|
|
+ }
|
|
|
+
|
|
|
+ /** This method should only be called upon successful removal of mapId from
|
|
|
+ * the queue. The mapId will be removed from the cache and totalUsedMemory
|
|
|
+ * will be decremented.
|
|
|
+ * @param mapId the cache item to be removed
|
|
|
+ * @throws IOException
|
|
|
+ */
|
|
|
+ private void removeMapInternal(String mapId) throws IOException {
|
|
|
+ IndexInformation info = cache.remove(mapId);
|
|
|
+ if (info == null) {
|
|
|
+ // Inconsistent state as presence in queue implies presence in cache
|
|
|
+ LOG.warn("Map ID " + mapId + " not found in cache");
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ try {
|
|
|
+ synchronized(info) {
|
|
|
+ while (isUnderConstruction(info)) {
|
|
|
+ info.wait();
|
|
|
+ }
|
|
|
+ totalMemoryUsed.getAndAdd(-info.getSize());
|
|
|
}
|
|
|
- } else {
|
|
|
- LOG.info("Map ID " + mapId + " not found in cache");
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ totalMemoryUsed.getAndAdd(-info.getSize());
|
|
|
+ Thread.currentThread().interrupt();
|
|
|
+ throw new IOException("Interrupted waiting for construction", e);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * This method checks if cache and totolMemoryUsed is consistent.
|
|
|
+ * This method checks if cache and totalMemoryUsed is consistent.
|
|
|
* It is only used for unit test.
|
|
|
- * @return True if cache and totolMemoryUsed is consistent
|
|
|
+ * @return True if cache and totalMemoryUsed is consistent
|
|
|
*/
|
|
|
boolean checkTotalMemoryUsed() {
|
|
|
int totalSize = 0;
|
|
@@ -175,13 +204,13 @@ class IndexCache {
|
|
|
/**
|
|
|
* Bring memory usage below totalMemoryAllowed.
|
|
|
*/
|
|
|
- private synchronized void freeIndexInformation() {
|
|
|
+ private synchronized void freeIndexInformation() throws IOException {
|
|
|
while (totalMemoryUsed.get() > totalMemoryAllowed) {
|
|
|
- String s = queue.remove();
|
|
|
- IndexInformation info = cache.remove(s);
|
|
|
- if (info != null) {
|
|
|
- totalMemoryUsed.addAndGet(-info.getSize());
|
|
|
+ if(queue.isEmpty()) {
|
|
|
+ break;
|
|
|
}
|
|
|
+ String mapId = queue.remove();
|
|
|
+ removeMapInternal(mapId);
|
|
|
}
|
|
|
}
|
|
|
|