|
@@ -72,6 +72,7 @@ import org.apache.hadoop.mapred.JobHistory.Listener;
|
|
|
import org.apache.hadoop.mapred.JobHistory.Values;
|
|
|
import org.apache.hadoop.mapred.JobInProgress.KillInterruptedException;
|
|
|
import org.apache.hadoop.mapred.JobStatusChangeEvent.EventType;
|
|
|
+import org.apache.hadoop.mapred.TaskTrackerStatus.TaskTrackerHealthStatus;
|
|
|
import org.apache.hadoop.net.DNSToSwitchMapping;
|
|
|
import org.apache.hadoop.net.NetUtils;
|
|
|
import org.apache.hadoop.net.NetworkTopology;
|
|
@@ -484,18 +485,28 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ enum ReasonForBlackListing {
|
|
|
+ EXCEEDING_FAILURES,
|
|
|
+ NODE_UNHEALTHY
|
|
|
+ }
|
|
|
+
|
|
|
// The FaultInfo which indicates the number of faults of a tracker
|
|
|
// and when the last fault occurred
|
|
|
// and whether the tracker is blacklisted across all jobs or not
|
|
|
private static class FaultInfo {
|
|
|
+ static final String FAULT_FORMAT_STRING = "%d failures on the tracker";
|
|
|
int numFaults = 0;
|
|
|
long lastUpdated;
|
|
|
boolean blacklisted;
|
|
|
-
|
|
|
+
|
|
|
+ private boolean isHealthy;
|
|
|
+ private HashMap<ReasonForBlackListing, String>rfbMap;
|
|
|
+
|
|
|
FaultInfo() {
|
|
|
numFaults = 0;
|
|
|
lastUpdated = System.currentTimeMillis();
|
|
|
blacklisted = false;
|
|
|
+ rfbMap = new HashMap<ReasonForBlackListing, String>();
|
|
|
}
|
|
|
|
|
|
void setFaultCount(int num) {
|
|
@@ -518,9 +529,47 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
return blacklisted;
|
|
|
}
|
|
|
|
|
|
- void setBlacklist(boolean blacklist) {
|
|
|
- blacklisted = blacklist;
|
|
|
+ void setBlacklist(ReasonForBlackListing rfb,
|
|
|
+ String trackerFaultReport) {
|
|
|
+ blacklisted = true;
|
|
|
+ this.rfbMap.put(rfb, trackerFaultReport);
|
|
|
}
|
|
|
+
|
|
|
+ public void setHealthy(boolean isHealthy) {
|
|
|
+ this.isHealthy = isHealthy;
|
|
|
+ }
|
|
|
+
|
|
|
+ public boolean isHealthy() {
|
|
|
+ return isHealthy;
|
|
|
+ }
|
|
|
+
|
|
|
+ public String getTrackerFaultReport() {
|
|
|
+ StringBuffer sb = new StringBuffer();
|
|
|
+ for(String reasons : rfbMap.values()) {
|
|
|
+ sb.append(reasons);
|
|
|
+ sb.append("\n");
|
|
|
+ }
|
|
|
+ return sb.toString();
|
|
|
+ }
|
|
|
+
|
|
|
+ Set<ReasonForBlackListing> getReasonforblacklisting() {
|
|
|
+ return this.rfbMap.keySet();
|
|
|
+ }
|
|
|
+
|
|
|
+ public void unBlacklist() {
|
|
|
+ this.blacklisted = false;
|
|
|
+ this.rfbMap.clear();
|
|
|
+ }
|
|
|
+
|
|
|
+ public boolean removeBlackListedReason(ReasonForBlackListing rfb) {
|
|
|
+ String str = rfbMap.remove(rfb);
|
|
|
+ return str!=null;
|
|
|
+ }
|
|
|
+
|
|
|
+ public void addBlackListedReason(ReasonForBlackListing rfb, String reason) {
|
|
|
+ this.rfbMap.put(rfb, reason);
|
|
|
+ }
|
|
|
+
|
|
|
}
|
|
|
|
|
|
private class FaultyTrackersInfo {
|
|
@@ -543,26 +592,82 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
*/
|
|
|
void incrementFaults(String hostName) {
|
|
|
synchronized (potentiallyFaultyTrackers) {
|
|
|
- FaultInfo fi = potentiallyFaultyTrackers.get(hostName);
|
|
|
- if (fi == null) {
|
|
|
- fi = new FaultInfo();
|
|
|
- potentiallyFaultyTrackers.put(hostName, fi);
|
|
|
- }
|
|
|
+ FaultInfo fi = getFaultInfo(hostName, true);
|
|
|
int numFaults = fi.getFaultCount();
|
|
|
++numFaults;
|
|
|
fi.setFaultCount(numFaults);
|
|
|
fi.setLastUpdated(System.currentTimeMillis());
|
|
|
- if (!fi.isBlacklisted()) {
|
|
|
- if (shouldBlacklist(hostName, numFaults)) {
|
|
|
- LOG.info("Adding " + hostName + " to the blacklist" +
|
|
|
- " across all jobs");
|
|
|
- removeHostCapacity(hostName);
|
|
|
- fi.setBlacklist(true);
|
|
|
- }
|
|
|
+ if (exceedsFaults(fi)) {
|
|
|
+ LOG.info("Adding " + hostName + " to the blacklist"
|
|
|
+ + " across all jobs");
|
|
|
+ String reason = String.format(FaultInfo.FAULT_FORMAT_STRING,
|
|
|
+ numFaults);
|
|
|
+ blackListTracker(hostName, reason,
|
|
|
+ ReasonForBlackListing.EXCEEDING_FAILURES);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ private void blackListTracker(String hostName, String reason, ReasonForBlackListing rfb) {
|
|
|
+ FaultInfo fi = getFaultInfo(hostName, true);
|
|
|
+ boolean blackListed = fi.isBlacklisted();
|
|
|
+ if(blackListed) {
|
|
|
+ if(fi.getReasonforblacklisting().contains(rfb)) {
|
|
|
+ return;
|
|
|
+ } else {
|
|
|
+ LOG.info("Adding blacklisted reason for tracker : " + hostName
|
|
|
+ + " Reason for blacklisting is : " + rfb);
|
|
|
+ fi.addBlackListedReason(rfb, reason);
|
|
|
+ }
|
|
|
+ return;
|
|
|
+ } else {
|
|
|
+ LOG.info("Blacklisting tracker : " + hostName
|
|
|
+ + " Reason for blacklisting is : " + rfb);
|
|
|
+ removeHostCapacity(hostName);
|
|
|
+ fi.setBlacklist(rfb, reason);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private boolean canUnBlackListTracker(String hostName,
|
|
|
+ ReasonForBlackListing rfb) {
|
|
|
+ FaultInfo fi = getFaultInfo(hostName, false);
|
|
|
+ if(fi == null) {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+
|
|
|
+ Set<ReasonForBlackListing> rfbSet = fi.getReasonforblacklisting();
|
|
|
+ return fi.isBlacklisted() && rfbSet.contains(rfb);
|
|
|
+ }
|
|
|
+
|
|
|
+ private void unBlackListTracker(String hostName,
|
|
|
+ ReasonForBlackListing rfb) {
|
|
|
+ // check if you can black list the tracker then call this methods
|
|
|
+ FaultInfo fi = getFaultInfo(hostName, false);
|
|
|
+ if(fi.removeBlackListedReason(rfb)) {
|
|
|
+ if(fi.getReasonforblacklisting().isEmpty()) {
|
|
|
+ addHostCapacity(hostName);
|
|
|
+ LOG.info("Unblacklisting tracker : " + hostName);
|
|
|
+ fi.unBlacklist();
|
|
|
+ //We have unBlackListed tracker, so tracker should
|
|
|
+ //definitely be healthy. Check fault count if fault count
|
|
|
+ //is zero don't keep it memory.
|
|
|
+ if(fi.numFaults == 0) {
|
|
|
+ potentiallyFaultyTrackers.remove(hostName);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private FaultInfo getFaultInfo(String hostName,
|
|
|
+ boolean createIfNeccessary) {
|
|
|
+ FaultInfo fi = potentiallyFaultyTrackers.get(hostName);
|
|
|
+ if (fi == null && createIfNeccessary) {
|
|
|
+ fi = new FaultInfo();
|
|
|
+ potentiallyFaultyTrackers.put(hostName, fi);
|
|
|
+ }
|
|
|
+ return fi;
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Blacklists the tracker across all jobs if
|
|
|
* <ol>
|
|
@@ -570,9 +675,11 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
* MAX_BLACKLISTS_PER_TRACKER (configurable) blacklists</li>
|
|
|
* <li>#faults is 50% (configurable) above the average #faults</li>
|
|
|
* <li>50% the cluster is not blacklisted yet </li>
|
|
|
+ * </ol>
|
|
|
*/
|
|
|
- private boolean shouldBlacklist(String hostName, int numFaults) {
|
|
|
- if (numFaults >= MAX_BLACKLISTS_PER_TRACKER) {
|
|
|
+ private boolean exceedsFaults(FaultInfo fi) {
|
|
|
+ int faultCount = fi.getFaultCount();
|
|
|
+ if (faultCount >= MAX_BLACKLISTS_PER_TRACKER) {
|
|
|
// calculate avgBlackLists
|
|
|
long clusterSize = getClusterStatus().getTaskTrackers();
|
|
|
long sum = 0;
|
|
@@ -582,7 +689,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
double avg = (double) sum / clusterSize;
|
|
|
|
|
|
long totalCluster = clusterSize + numBlacklistedTrackers;
|
|
|
- if ((numFaults - avg) > (AVERAGE_BLACKLIST_THRESHOLD * avg) &&
|
|
|
+ if ((faultCount - avg) > (AVERAGE_BLACKLIST_THRESHOLD * avg) &&
|
|
|
numBlacklistedTrackers < (totalCluster * MAX_BLACKLIST_PERCENT)) {
|
|
|
return true;
|
|
|
}
|
|
@@ -625,16 +732,12 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
if (fi != null &&
|
|
|
(now - fi.getLastUpdated()) > UPDATE_FAULTY_TRACKER_INTERVAL) {
|
|
|
int numFaults = fi.getFaultCount() - 1;
|
|
|
- if (fi.isBlacklisted()) {
|
|
|
- LOG.info("Removing " + hostName + " from blacklist");
|
|
|
- addHostCapacity(hostName);
|
|
|
- fi.setBlacklist(false);
|
|
|
- }
|
|
|
- if (numFaults > 0) {
|
|
|
- fi.setFaultCount(numFaults);
|
|
|
- fi.setLastUpdated(now);
|
|
|
- } else {
|
|
|
- potentiallyFaultyTrackers.remove(hostName);
|
|
|
+ fi.setFaultCount(numFaults);
|
|
|
+ fi.setLastUpdated(now);
|
|
|
+ if (canUnBlackListTracker(hostName,
|
|
|
+ ReasonForBlackListing.EXCEEDING_FAILURES)) {
|
|
|
+ unBlackListTracker(hostName,
|
|
|
+ ReasonForBlackListing.EXCEEDING_FAILURES);
|
|
|
}
|
|
|
}
|
|
|
return (fi != null && fi.isBlacklisted());
|
|
@@ -703,6 +806,41 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
}
|
|
|
return 0;
|
|
|
}
|
|
|
+
|
|
|
+ Set<ReasonForBlackListing> getReasonForBlackListing(String hostName) {
|
|
|
+ synchronized (potentiallyFaultyTrackers) {
|
|
|
+ FaultInfo fi = null;
|
|
|
+ if ((fi = potentiallyFaultyTrackers.get(hostName)) != null) {
|
|
|
+ return fi.getReasonforblacklisting();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ void setNodeHealthStatus(String hostName, boolean isHealthy, String reason) {
|
|
|
+ FaultInfo fi = null;
|
|
|
+ // If tracker is not healthy, create a fault info object
|
|
|
+ // blacklist it.
|
|
|
+ if (!isHealthy) {
|
|
|
+ fi = getFaultInfo(hostName, true);
|
|
|
+ fi.setHealthy(isHealthy);
|
|
|
+ synchronized (potentiallyFaultyTrackers) {
|
|
|
+ blackListTracker(hostName, reason,
|
|
|
+ ReasonForBlackListing.NODE_UNHEALTHY);
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ fi = getFaultInfo(hostName, false);
|
|
|
+ if (fi == null) {
|
|
|
+ return;
|
|
|
+ } else {
|
|
|
+ if (canUnBlackListTracker(hostName,
|
|
|
+ ReasonForBlackListing.NODE_UNHEALTHY)) {
|
|
|
+ unBlackListTracker(hostName, ReasonForBlackListing.NODE_UNHEALTHY);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -2728,7 +2866,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
// Initialize the response to be sent for the heartbeat
|
|
|
HeartbeatResponse response = new HeartbeatResponse(newResponseId, null);
|
|
|
List<TaskTrackerAction> actions = new ArrayList<TaskTrackerAction>();
|
|
|
-
|
|
|
+ isBlacklisted = faultyTrackers.isBlacklisted(status.getHost());
|
|
|
// Check for new tasks to be executed on the tasktracker
|
|
|
if (recoveryManager.shouldSchedule() && acceptNewTasks && !isBlacklisted) {
|
|
|
TaskTrackerStatus taskTrackerStatus = getTaskTrackerStatus(trackerName) ;
|
|
@@ -2930,6 +3068,15 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
getInstrumentation().setReduceSlots(totalReduceTaskCapacity);
|
|
|
return oldStatus != null;
|
|
|
}
|
|
|
+
|
|
|
+
|
|
|
+ private void updateNodeHealthStatus(TaskTrackerStatus trackerStatus) {
|
|
|
+ TaskTrackerHealthStatus status = trackerStatus.getHealthStatus();
|
|
|
+ synchronized (faultyTrackers) {
|
|
|
+ faultyTrackers.setNodeHealthStatus(trackerStatus.getHost(),
|
|
|
+ status.isNodeHealthy(), status.getHealthReport());
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
/**
|
|
|
* Process incoming heartbeat messages from the task trackers.
|
|
@@ -2971,6 +3118,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
}
|
|
|
|
|
|
updateTaskStatuses(trackerStatus);
|
|
|
+ updateNodeHealthStatus(trackerStatus);
|
|
|
|
|
|
return true;
|
|
|
}
|
|
@@ -4222,4 +4370,25 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
UserGroupInformation.getCurrentUGI().getUserName());
|
|
|
this.queueManager.refreshAcls(new Configuration(this.conf));
|
|
|
}
|
|
|
+
|
|
|
+ String getReasonsForBlacklisting(String host) {
|
|
|
+ FaultInfo fi = faultyTrackers.getFaultInfo(host, false);
|
|
|
+ if (fi == null) {
|
|
|
+ return "";
|
|
|
+ }
|
|
|
+ return fi.getTrackerFaultReport();
|
|
|
+ }
|
|
|
+
|
|
|
+ /** Test Methods */
|
|
|
+ Set<ReasonForBlackListing> getReasonForBlackList(String host) {
|
|
|
+ FaultInfo fi = faultyTrackers.getFaultInfo(host, false);
|
|
|
+ if (fi == null) {
|
|
|
+ return new HashSet<ReasonForBlackListing>();
|
|
|
+ }
|
|
|
+ return fi.getReasonforblacklisting();
|
|
|
+ }
|
|
|
+
|
|
|
+ void incrementFaults(String hostName) {
|
|
|
+ faultyTrackers.incrementFaults(hostName);
|
|
|
+ }
|
|
|
}
|