|
@@ -22,6 +22,7 @@ import java.io.IOException;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.Collection;
|
|
|
import java.util.HashMap;
|
|
|
+import java.util.HashSet;
|
|
|
import java.util.IdentityHashMap;
|
|
|
import java.util.Iterator;
|
|
|
import java.util.LinkedHashSet;
|
|
@@ -2273,7 +2274,6 @@ class JobInProgress {
|
|
|
this.status.setReduceProgress(1.0f);
|
|
|
}
|
|
|
this.finishTime = System.currentTimeMillis();
|
|
|
- cancelReservedSlots();
|
|
|
LOG.info("Job " + this.status.getJobID() +
|
|
|
" has completed successfully.");
|
|
|
JobHistory.JobInfo.logFinished(this.status.getJobID(), finishTime,
|
|
@@ -2357,17 +2357,21 @@ class JobInProgress {
|
|
|
for (int i = 0; i < reduces.length; i++) {
|
|
|
reduces[i].kill();
|
|
|
}
|
|
|
-
|
|
|
- // Clear out reserved tasktrackers
|
|
|
- cancelReservedSlots();
|
|
|
}
|
|
|
}
|
|
|
|
|
|
private void cancelReservedSlots() {
|
|
|
- for (TaskTracker tt : trackersReservedForMaps.keySet()) {
|
|
|
+ // Make a copy of the set of TaskTrackers to prevent a
|
|
|
+ // ConcurrentModificationException ...
|
|
|
+ Set<TaskTracker> tm =
|
|
|
+ new HashSet<TaskTracker>(trackersReservedForMaps.keySet());
|
|
|
+ for (TaskTracker tt : tm) {
|
|
|
tt.unreserveSlots(TaskType.MAP, this);
|
|
|
}
|
|
|
- for (TaskTracker tt : trackersReservedForReduces.keySet()) {
|
|
|
+
|
|
|
+ Set<TaskTracker> tr =
|
|
|
+ new HashSet<TaskTracker>(trackersReservedForReduces.keySet());
|
|
|
+ for (TaskTracker tt : tr) {
|
|
|
tt.unreserveSlots(TaskType.REDUCE, this);
|
|
|
}
|
|
|
}
|
|
@@ -2658,6 +2662,8 @@ class JobInProgress {
|
|
|
* from the various tables.
|
|
|
*/
|
|
|
synchronized void garbageCollect() {
|
|
|
+ //Cancel task tracker reservation
|
|
|
+ cancelReservedSlots();
|
|
|
// Let the JobTracker know that a job is complete
|
|
|
jobtracker.getInstrumentation().decWaitingMaps(getJobID(), pendingMaps());
|
|
|
jobtracker.getInstrumentation().decWaitingReduces(getJobID(), pendingReduces());
|