|
@@ -727,6 +727,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
* Increments faults(blacklist by job) for the tracker by one.
|
|
|
*
|
|
|
* Adds the tracker to the potentially faulty list.
|
|
|
+ * Assumes JobTracker is locked on entry.
|
|
|
*
|
|
|
* @param hostName
|
|
|
*/
|
|
@@ -816,12 +817,16 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ // Assumes JobTracker is locked on entry.
|
|
|
private FaultInfo getFaultInfo(String hostName,
|
|
|
boolean createIfNeccessary) {
|
|
|
- FaultInfo fi = potentiallyFaultyTrackers.get(hostName);
|
|
|
- if (fi == null && createIfNeccessary) {
|
|
|
- fi = new FaultInfo(clock.getTime());
|
|
|
- potentiallyFaultyTrackers.put(hostName, fi);
|
|
|
+ FaultInfo fi = null;
|
|
|
+ synchronized (potentiallyFaultyTrackers) {
|
|
|
+ fi = potentiallyFaultyTrackers.get(hostName);
|
|
|
+ if (fi == null && createIfNeccessary) {
|
|
|
+ fi = new FaultInfo();
|
|
|
+ potentiallyFaultyTrackers.put(hostName, fi);
|
|
|
+ }
|
|
|
}
|
|
|
return fi;
|
|
|
}
|
|
@@ -859,6 +864,8 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
* Removes the tracker from blacklist and
|
|
|
* from potentially faulty list, when it is restarted.
|
|
|
*
|
|
|
+ * Assumes JobTracker is locked on entry.
|
|
|
+ *
|
|
|
* @param hostName
|
|
|
*/
|
|
|
void markTrackerHealthy(String hostName) {
|
|
@@ -877,6 +884,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
* One fault of the tracker is discarded if there
|
|
|
* are no faults during one day. So, the tracker will get a
|
|
|
* chance again to run tasks of a job.
|
|
|
+ * Assumes JobTracker is locked on entry.
|
|
|
*
|
|
|
* @param hostName The tracker name
|
|
|
* @param now The current time
|
|
@@ -945,6 +953,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
/**
|
|
|
* Whether a host is blacklisted across all the jobs.
|
|
|
*
|
|
|
+ * Assumes JobTracker is locked on entry.
|
|
|
* @param hostName
|
|
|
* @return
|
|
|
*/
|
|
@@ -958,6 +967,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
return false;
|
|
|
}
|
|
|
|
|
|
+ // Assumes JobTracker is locked on entry.
|
|
|
int getFaultCount(String hostName) {
|
|
|
synchronized (potentiallyFaultyTrackers) {
|
|
|
FaultInfo fi = null;
|
|
@@ -968,6 +978,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
return 0;
|
|
|
}
|
|
|
|
|
|
+ // Assumes JobTracker is locked on entry.
|
|
|
Set<ReasonForBlackListing> getReasonForBlackListing(String hostName) {
|
|
|
synchronized (potentiallyFaultyTrackers) {
|
|
|
FaultInfo fi = null;
|
|
@@ -979,6 +990,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
}
|
|
|
|
|
|
|
|
|
+ // Assumes JobTracker is locked on entry.
|
|
|
void setNodeHealthStatus(String hostName, boolean isHealthy, String reason) {
|
|
|
FaultInfo fi = null;
|
|
|
// If tracker is not healthy, create a fault info object
|
|
@@ -1007,6 +1019,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
/**
|
|
|
* Get all task tracker statuses on given host
|
|
|
*
|
|
|
+ * Assumes JobTracker is locked on entry.
|
|
|
* @param hostName
|
|
|
* @return {@link java.util.List} of {@link TaskTrackerStatus}
|
|
|
*/
|
|
@@ -2713,7 +2726,8 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
*
|
|
|
* @return {@link Collection} of {@link TaskTrackerStatus}
|
|
|
*/
|
|
|
- public Collection<TaskTrackerStatus> taskTrackers() {
|
|
|
+ // The JobTracker lock must be held before the taskTrackers lock is taken.
|
|
|
+ public synchronized Collection<TaskTrackerStatus> taskTrackers() {
|
|
|
Collection<TaskTrackerStatus> ttStatuses;
|
|
|
synchronized (taskTrackers) {
|
|
|
ttStatuses =
|
|
@@ -2730,7 +2744,10 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
*
|
|
|
* @return {@link Collection} of active {@link TaskTrackerStatus}
|
|
|
*/
|
|
|
- public Collection<TaskTrackerStatus> activeTaskTrackers() {
|
|
|
+ // This method is synchronized to make sure that the locking order
|
|
|
+ // "taskTrackers lock followed by faultyTrackers.potentiallyFaultyTrackers
|
|
|
+ // lock" is under JobTracker lock to avoid deadlocks.
|
|
|
+ synchronized public Collection<TaskTrackerStatus> activeTaskTrackers() {
|
|
|
Collection<TaskTrackerStatus> activeTrackers =
|
|
|
new ArrayList<TaskTrackerStatus>();
|
|
|
synchronized (taskTrackers) {
|
|
@@ -2750,7 +2767,10 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
* The second element in the returned list contains the list of blacklisted
|
|
|
* tracker names.
|
|
|
*/
|
|
|
- public List<List<String>> taskTrackerNames() {
|
|
|
+ // This method is synchronized to make sure that the locking order
|
|
|
+ // "taskTrackers lock followed by faultyTrackers.potentiallyFaultyTrackers
|
|
|
+ // lock" is under JobTracker lock to avoid deadlocks.
|
|
|
+ synchronized public List<List<String>> taskTrackerNames() {
|
|
|
List<String> activeTrackers =
|
|
|
new ArrayList<String>();
|
|
|
List<String> blacklistedTrackers =
|
|
@@ -2776,7 +2796,10 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
*
|
|
|
* @return {@link Collection} of blacklisted {@link TaskTrackerStatus}
|
|
|
*/
|
|
|
- public Collection<TaskTrackerStatus> blacklistedTaskTrackers() {
|
|
|
+ // This method is synchronized to make sure that the locking order
|
|
|
+ // "taskTrackers lock followed by faultyTrackers.potentiallyFaultyTrackers
|
|
|
+ // lock" is under JobTracker lock to avoid deadlocks.
|
|
|
+ synchronized public Collection<TaskTrackerStatus> blacklistedTaskTrackers() {
|
|
|
Collection<TaskTrackerStatus> blacklistedTrackers =
|
|
|
new ArrayList<TaskTrackerStatus>();
|
|
|
synchronized (taskTrackers) {
|
|
@@ -2790,7 +2813,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
return blacklistedTrackers;
|
|
|
}
|
|
|
|
|
|
- int getFaultCount(String hostName) {
|
|
|
+ synchronized int getFaultCount(String hostName) {
|
|
|
return faultyTrackers.getFaultCount(hostName);
|
|
|
}
|
|
|
|
|
@@ -2810,7 +2833,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
*
|
|
|
* @return true if blacklisted, false otherwise
|
|
|
*/
|
|
|
- public boolean isBlacklisted(String trackerID) {
|
|
|
+ synchronized public boolean isBlacklisted(String trackerID) {
|
|
|
TaskTrackerStatus status = getTaskTrackerStatus(trackerID);
|
|
|
if (status != null) {
|
|
|
return faultyTrackers.isBlacklisted(status.getHost());
|
|
@@ -2818,7 +2841,8 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
return false;
|
|
|
}
|
|
|
|
|
|
- public TaskTrackerStatus getTaskTrackerStatus(String trackerID) {
|
|
|
+ // The JobTracker lock must be held before the taskTrackers lock is taken.
|
|
|
+ synchronized public TaskTrackerStatus getTaskTrackerStatus(String trackerID) {
|
|
|
TaskTracker taskTracker;
|
|
|
synchronized (taskTrackers) {
|
|
|
taskTracker = taskTrackers.get(trackerID);
|
|
@@ -2826,7 +2850,8 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
return (taskTracker == null) ? null : taskTracker.getStatus();
|
|
|
}
|
|
|
|
|
|
- public TaskTracker getTaskTracker(String trackerID) {
|
|
|
+ // The JobTracker lock must be held before the taskTrackers lock is taken.
|
|
|
+ synchronized public TaskTracker getTaskTracker(String trackerID) {
|
|
|
synchronized (taskTrackers) {
|
|
|
return taskTrackers.get(trackerID);
|
|
|
}
|
|
@@ -2839,7 +2864,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
* Adds a new node to the jobtracker. It involves adding it to the expiry
|
|
|
* thread and adding it for resolution
|
|
|
*
|
|
|
- * Assuming trackerExpiryQueue is locked on entry
|
|
|
+ * Assumes JobTracker, taskTrackers and trackerExpiryQueue are locked on entry
|
|
|
*
|
|
|
* @param status Task Tracker's status
|
|
|
*/
|
|
@@ -4430,6 +4455,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
decommissionNodes(excludeSet);
|
|
|
}
|
|
|
|
|
|
+ // Assumes JobTracker, taskTrackers and trackerExpiryQueue are locked on entry
|
|
|
// Remove a tracker from the system
|
|
|
private void removeTracker(TaskTracker tracker) {
|
|
|
String trackerName = tracker.getTrackerName();
|
|
@@ -4709,7 +4735,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
this.queueManager.refreshAcls(new Configuration(this.conf));
|
|
|
}
|
|
|
|
|
|
- String getReasonsForBlacklisting(String host) {
|
|
|
+ synchronized String getReasonsForBlacklisting(String host) {
|
|
|
FaultInfo fi = faultyTrackers.getFaultInfo(host, false);
|
|
|
if (fi == null) {
|
|
|
return "";
|
|
@@ -4718,15 +4744,20 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
}
|
|
|
|
|
|
/** Test Methods */
|
|
|
- Set<ReasonForBlackListing> getReasonForBlackList(String host) {
|
|
|
+ synchronized Set<ReasonForBlackListing> getReasonForBlackList(String host) {
|
|
|
FaultInfo fi = faultyTrackers.getFaultInfo(host, false);
|
|
|
if (fi == null) {
|
|
|
return new HashSet<ReasonForBlackListing>();
|
|
|
}
|
|
|
return fi.getReasonforblacklisting();
|
|
|
}
|
|
|
-
|
|
|
- void incrementFaults(String hostName) {
|
|
|
+
|
|
|
+ /*
|
|
|
+ * This method is synchronized to make sure that the locking order
|
|
|
+ * "faultyTrackers.potentiallyFaultyTrackers lock followed by taskTrackers
|
|
|
+ * lock" is under JobTracker lock to avoid deadlocks.
|
|
|
+ */
|
|
|
+ synchronized void incrementFaults(String hostName) {
|
|
|
faultyTrackers.incrementFaults(hostName);
|
|
|
}
|
|
|
}
|