@@ -83,6 +83,19 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
static long TASKTRACKER_EXPIRY_INTERVAL = 10 * 60 * 1000;
static long RETIRE_JOB_INTERVAL;
static long RETIRE_JOB_CHECK_INTERVAL;
+ // The interval after which one fault of a tracker will be discarded,
+ // if there are no faults during this interval.
+ private static long UPDATE_FAULTY_TRACKER_INTERVAL = 24 * 60 * 60 * 1000;
+ // The maximum percentage of trackers in the cluster added
+ // to the 'blacklist' across all the jobs.
+ private static double MAX_BLACKLIST_PERCENT = 0.50;
+ // A tracker is blacklisted across jobs only if its number of
+ // blacklists is X% above the average number of blacklists.
+ // X is the blacklist threshold here.
+ private double AVERAGE_BLACKLIST_THRESHOLD = 0.50;
+ // The maximum number of blacklists for a tracker after which the
+ // tracker could be blacklisted across all jobs
+ private int MAX_BLACKLISTS_PER_TRACKER = 4;
public static enum State { INITIALIZING, RUNNING }
State state = State.INITIALIZING;
private static final int SYSTEM_DIR_CLEANUP_RETRY_PERIOD = 10000;
@@ -313,6 +326,11 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
if (now - newProfile.getLastSeen() > TASKTRACKER_EXPIRY_INTERVAL) {
// Remove completely after marking the tasks as 'KILLED'
lostTaskTracker(leastRecent.getTrackerName());
+ // The tracker is lost; if it is blacklisted, remove
+ // it from the count of blacklisted trackers in the cluster
+ if (isBlacklisted(trackerName)) {
+ faultyTrackers.numBlacklistedTrackers -= 1;
+ }
updateTaskTrackerStatus(trackerName, null);
} else {
// Update time by inserting latest profile
@@ -402,8 +420,236 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
}
}
}
+
+ // The FaultInfo indicates the number of faults of a tracker,
+ // when the last fault occurred,
+ // and whether the tracker is blacklisted across all jobs or not
+ private static class FaultInfo {
+ int numFaults = 0;
+ long lastUpdated;
+ boolean blacklisted;

-
+ FaultInfo() {
+ numFaults = 0;
+ lastUpdated = System.currentTimeMillis();
+ blacklisted = false;
+ }
+
+ void setFaultCount(int num) {
+ numFaults = num;
+ }
+
+ void setLastUpdated(long timeStamp) {
+ lastUpdated = timeStamp;
+ }
+
+ int getFaultCount() {
+ return numFaults;
+ }
+
+ long getLastUpdated() {
+ return lastUpdated;
+ }
+
+ boolean isBlacklisted() {
+ return blacklisted;
+ }
+
+ void setBlacklist(boolean blacklist) {
+ blacklisted = blacklist;
+ }
+ }
+
+ private class FaultyTrackersInfo {
+ // A map from hostName to its faults
+ private Map<String, FaultInfo> potentiallyFaultyTrackers =
+ new HashMap<String, FaultInfo>();
+ // This count gives the number of blacklisted trackers in the cluster
+ // at any time. This is maintained to avoid iteration over
+ // the potentiallyFaultyTrackers to get blacklisted trackers. Also,
+ // this count doesn't include blacklisted trackers which are lost,
+ // although the fault info is maintained for lost trackers.
+ private volatile int numBlacklistedTrackers = 0;
+
+ /**
+ * Increments faults (blacklist by job) for the tracker by one.
+ *
+ * Adds the tracker to the potentially faulty list.
+ *
+ * @param hostName the host name of the tracker
+ */
+ void incrementFaults(String hostName) {
+ synchronized (potentiallyFaultyTrackers) {
+ FaultInfo fi = potentiallyFaultyTrackers.get(hostName);
+ if (fi == null) {
+ fi = new FaultInfo();
+ potentiallyFaultyTrackers.put(hostName, fi);
+ }
+ int numFaults = fi.getFaultCount();
+ ++numFaults;
+ fi.setFaultCount(numFaults);
+ fi.setLastUpdated(System.currentTimeMillis());
+ if (!fi.isBlacklisted()) {
+ if (shouldBlacklist(hostName, numFaults)) {
+ LOG.info("Adding " + hostName + " to the blacklist" +
+ " across all jobs");
+ removeHostCapacity(hostName);
+ fi.setBlacklist(true);
+ }
+ }
+ }
+ }
+
+ /**
+ * Blacklists the tracker across all jobs if
+ * <ol>
+ * <li>#faults is at least
+ * MAX_BLACKLISTS_PER_TRACKER (configurable)</li>
+ * <li>#faults is 50% (configurable) above the average #faults</li>
+ * <li>less than 50% (configurable) of the cluster is blacklisted yet</li>
+ * </ol>
+ */
+ private boolean shouldBlacklist(String hostName, int numFaults) {
+ if (numFaults >= MAX_BLACKLISTS_PER_TRACKER) {
+ // calculate avgBlackLists
+ long clusterSize = getClusterStatus().getTaskTrackers();
+ long sum = 0;
+ for (FaultInfo f : potentiallyFaultyTrackers.values()) {
+ sum += f.getFaultCount();
+ }
+ double avg = (double) sum / clusterSize;
+
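+ // getClusterStatus() reports only the non-blacklisted trackers, so the
+ // blacklisted count is added back to get the total cluster size used
+ // for the MAX_BLACKLIST_PERCENT check below.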
+ long totalCluster = clusterSize + numBlacklistedTrackers;
+ if ((numFaults - avg) > (AVERAGE_BLACKLIST_THRESHOLD * avg) &&
+ numBlacklistedTrackers < (totalCluster * MAX_BLACKLIST_PERCENT)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Removes the tracker from the blacklist and
+ * from the potentially faulty list, when it is restarted.
+ *
+ * @param hostName the tracker's host name
+ */
+ void markTrackerHealthy(String hostName) {
+ synchronized (potentiallyFaultyTrackers) {
+ FaultInfo fi = potentiallyFaultyTrackers.remove(hostName);
+ if (fi != null && fi.isBlacklisted()) {
+ LOG.info("Removing " + hostName + " from blacklist");
+ addHostCapacity(hostName);
+ }
+ }
+ }
+
+ /**
+ * Check whether tasks can be assigned to the tracker.
+ *
+ * One fault of the tracker is discarded if there
+ * are no faults during one day. So, the tracker will again
+ * get a chance to run tasks of a job.
+ *
+ * @param hostName The tracker's host name
+ * @return true if the tracker is blacklisted
+ * false otherwise
+ */
+ boolean shouldAssignTasksToTracker(String hostName) {
+ synchronized (potentiallyFaultyTrackers) {
+ FaultInfo fi = potentiallyFaultyTrackers.get(hostName);
+ long now = System.currentTimeMillis();
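+ // If no new fault was recorded for UPDATE_FAULTY_TRACKER_INTERVAL,
+ // forgive one fault and lift the cross-job blacklist if it was in effect.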
+ if (fi != null &&
+ (now - fi.getLastUpdated()) > UPDATE_FAULTY_TRACKER_INTERVAL) {
+ int numFaults = fi.getFaultCount() - 1;
+ if (fi.isBlacklisted()) {
+ LOG.info("Removing " + hostName + " from blacklist");
+ addHostCapacity(hostName);
+ fi.setBlacklist(false);
+ }
+ if (numFaults > 0) {
+ fi.setFaultCount(numFaults);
+ fi.setLastUpdated(now);
+ } else {
+ potentiallyFaultyTrackers.remove(hostName);
+ }
+ }
+ return (fi != null && fi.isBlacklisted());
+ }
+ }
+
+ private void removeHostCapacity(String hostName) {
+ synchronized (taskTrackers) {
+ // remove the capacity of trackers on this host
+ for (TaskTrackerStatus status : getStatusesOnHost(hostName)) {
+ totalMapTaskCapacity -= status.getMaxMapTasks();
+ totalReduceTaskCapacity -= status.getMaxReduceTasks();
+ }
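+ // Remove the host from uniqueHostsMap and count its trackers
+ // towards the cluster-wide blacklisted total.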
+ numBlacklistedTrackers +=
+ uniqueHostsMap.remove(hostName);
+ }
+ }
+
+ // This is called on a tracker's restart, or after a day in the blacklist.
+ private void addHostCapacity(String hostName) {
+ synchronized (taskTrackers) {
+ int numTrackersOnHost = 0;
+ // add the capacity of trackers on the host
+ for (TaskTrackerStatus status : getStatusesOnHost(hostName)) {
+ totalMapTaskCapacity += status.getMaxMapTasks();
+ totalReduceTaskCapacity += status.getMaxReduceTasks();
+ numTrackersOnHost++;
+ }
+ uniqueHostsMap.put(hostName,
+ numTrackersOnHost);
+ numBlacklistedTrackers -= numTrackersOnHost;
+ }
+ }
+
+ /**
+ * Whether a host is blacklisted across all the jobs.
+ *
+ * @param hostName the host to check
+ * @return true if the host is blacklisted across all jobs, false otherwise
+ */
+ boolean isBlacklisted(String hostName) {
+ synchronized (potentiallyFaultyTrackers) {
+ FaultInfo fi = null;
+ if ((fi = potentiallyFaultyTrackers.get(hostName)) != null) {
+ return fi.isBlacklisted();
+ }
+ }
+ return false;
+ }
+
+ int getFaultCount(String hostName) {
+ synchronized (potentiallyFaultyTrackers) {
+ FaultInfo fi = null;
+ if ((fi = potentiallyFaultyTrackers.get(hostName)) != null) {
+ return fi.getFaultCount();
+ }
+ }
+ return 0;
+ }
+ }
+
+ /**
+ * Get all task tracker statuses on the given host
+ *
+ * @param hostName the host
+ * @return {@link java.util.List} of {@link TaskTrackerStatus}
+ */
+ private List<TaskTrackerStatus> getStatusesOnHost(String hostName) {
+ List<TaskTrackerStatus> statuses = new ArrayList<TaskTrackerStatus>();
+ synchronized (taskTrackers) {
+ for (TaskTrackerStatus status : taskTrackers.values()) {
+ if (hostName.equals(status.getHost())) {
+ statuses.add(status);
+ }
+ }
+ }
+ return statuses;
+ }
+
///////////////////////////////////////////////////////
// Used to recover the jobs upon restart
///////////////////////////////////////////////////////
@@ -675,9 +921,6 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
String trackerName = attempt.get(Keys.TRACKER_NAME);
String trackerHostName =
JobInProgress.convertTrackerNameToHostName(trackerName);
- int index = trackerHostName.indexOf("_");
- trackerHostName =
- trackerHostName.substring(index + 1, trackerHostName.length());
int port = attempt.getInt(Keys.HTTP_PORT);

long attemptStartTime = attempt.getLong(Keys.START_TIME);
@@ -991,6 +1234,8 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
// Number of resolved entries
int numResolved;

+ private FaultyTrackersInfo faultyTrackers = new FaultyTrackersInfo();
+
//
// Watch and expire TaskTracker objects using these structures.
// We can map from Name->TaskTrackerStatus, or we can expire by time.
@@ -1063,6 +1308,14 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
RETIRE_JOB_INTERVAL = conf.getLong("mapred.jobtracker.retirejob.interval", 24 * 60 * 60 * 1000);
RETIRE_JOB_CHECK_INTERVAL = conf.getLong("mapred.jobtracker.retirejob.check", 60 * 1000);
MAX_COMPLETE_USER_JOBS_IN_MEMORY = conf.getInt("mapred.jobtracker.completeuserjobs.maximum", 100);
+ MAX_BLACKLISTS_PER_TRACKER =
+ conf.getInt("mapred.max.tracker.blacklists", 4);
+
+ // This configuration is there solely for tuning purposes and
+ // once this feature has been tested in real clusters and an appropriate
+ // value for the threshold has been found, this config might be taken out.
+ AVERAGE_BLACKLIST_THRESHOLD =
+ conf.getFloat("mapred.cluster.average.blacklist.threshold", 0.5f);

// This is a directory of temporary submission files. We delete it
// on startup, and can delete any files that we're done with
@@ -1538,6 +1791,15 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,

long now = System.currentTimeMillis();

+ // add the blacklisted trackers to potentially faulty list
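+ // (only jobs that completed successfully contribute faults)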
+ if (job.getStatus().getRunState() == JobStatus.SUCCEEDED) {
+ if (job.getNoOfBlackListedTrackers() > 0) {
+ for (String hostName : job.getBlackListedTrackers()) {
+ faultyTrackers.incrementFaults(hostName);
+ }
+ }
+ }
+
// Purge oldest jobs and keep at-most MAX_COMPLETE_USER_JOBS_IN_MEMORY jobs of a given user
// in memory; information about the purged jobs is available via
// JobHistory.
@@ -1681,11 +1943,82 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
}
return v;
}
+
+ /**
+ * Get all the task trackers in the cluster
+ *
+ * @return {@link Collection} of {@link TaskTrackerStatus}
+ */
public Collection<TaskTrackerStatus> taskTrackers() {
synchronized (taskTrackers) {
return taskTrackers.values();
}
}
+
+ /**
+ * Get the active task tracker statuses in the cluster
+ *
+ * @return {@link Collection} of active {@link TaskTrackerStatus}
+ */
+ public Collection<TaskTrackerStatus> activeTaskTrackers() {
+ Collection<TaskTrackerStatus> activeTrackers =
+ new ArrayList<TaskTrackerStatus>();
+ synchronized (taskTrackers) {
+ for (TaskTrackerStatus status : taskTrackers.values()) {
+ if (!faultyTrackers.isBlacklisted(status.getHost())) {
+ activeTrackers.add(status);
+ }
+ }
+ }
+ return activeTrackers;
+ }
+
+ /**
+ * Get the blacklisted task tracker statuses in the cluster
+ *
+ * @return {@link Collection} of blacklisted {@link TaskTrackerStatus}
+ */
+ public Collection<TaskTrackerStatus> blacklistedTaskTrackers() {
+ Collection<TaskTrackerStatus> blacklistedTrackers =
+ new ArrayList<TaskTrackerStatus>();
+ synchronized (taskTrackers) {
+ for (TaskTrackerStatus status : taskTrackers.values()) {
+ if (faultyTrackers.isBlacklisted(status.getHost())) {
+ blacklistedTrackers.add(status);
+ }
+ }
+ }
+ return blacklistedTrackers;
+ }
+
+ int getFaultCount(String hostName) {
+ return faultyTrackers.getFaultCount(hostName);
+ }
+
+ /**
+ * Get the number of blacklisted trackers across all the jobs
+ *
+ * @return the number of blacklisted trackers
+ */
+ int getBlacklistedTrackerCount() {
+ return faultyTrackers.numBlacklistedTrackers;
+ }
+
+ /**
+ * Whether the tracker is blacklisted or not
+ *
+ * @param trackerID
+ *
+ * @return true if blacklisted, false otherwise
+ */
+ public boolean isBlacklisted(String trackerID) {
+ TaskTrackerStatus status = getTaskTracker(trackerID);
+ if (status != null) {
+ return faultyTrackers.isBlacklisted(status.getHost());
+ }
+ return false;
+ }
+
public TaskTrackerStatus getTaskTracker(String trackerID) {
synchronized (taskTrackers) {
return taskTrackers.get(trackerID);
@@ -1810,10 +2143,12 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
* tasks or jobs, and also 'reset' instructions during contingencies.
*/
public synchronized HeartbeatResponse heartbeat(TaskTrackerStatus status,
- boolean initialContact, boolean acceptNewTasks, short responseId)
+ boolean restarted,
+ boolean acceptNewTasks,
+ short responseId)
throws IOException {
LOG.debug("Got heartbeat from: " + status.getTrackerName() +
- " (initialContact: " + initialContact +
+ " (restarted: " + restarted +
" acceptNewTasks: " + acceptNewTasks + ")" +
" with responseId: " + responseId);

@@ -1824,12 +2159,19 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,

// First check if the last heartbeat response got through
String trackerName = status.getTrackerName();
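+ // A restarted tracker starts with a clean fault record; otherwise consult
+ // (and age) its record to decide whether it may be given new tasks.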
+ boolean isBlacklisted = false;
+ if (restarted) {
+ faultyTrackers.markTrackerHealthy(status.getHost());
+ } else {
+ isBlacklisted =
+ faultyTrackers.shouldAssignTasksToTracker(status.getHost());
+ }

HeartbeatResponse prevHeartbeatResponse =
trackerToHeartbeatResponseMap.get(trackerName);
boolean addRestartInfo = false;

- if (initialContact != true) {
+ if (restarted != true) {
// If this isn't the 'initial contact' from the tasktracker,
// there is something seriously wrong if the JobTracker has
// no record of the 'previous heartbeat'; if so, ask the
@@ -1841,6 +2183,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
addRestartInfo = true;
} else {
// Jobtracker might have restarted but no recovery is needed
+ // otherwise this code should not be reached
LOG.warn("Serious problem, cannot find record of 'previous' " +
"heartbeat for '" + trackerName +
"'; reinitializing the tasktracker");
@@ -1865,13 +2208,9 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,

// Process this heartbeat
short newResponseId = (short)(responseId + 1);
- if (!processHeartbeat(status, initialContact)) {
- if (prevHeartbeatResponse != null) {
- trackerToHeartbeatResponseMap.remove(trackerName);
- }
-
+ if (!processHeartbeat(status, restarted)) {
return new HeartbeatResponse(newResponseId,
- new TaskTrackerAction[] {new ReinitTrackerAction()});
+ new TaskTrackerAction[] {new ReinitTrackerAction()});
}

// Initialize the response to be sent for the heartbeat
@@ -1879,7 +2218,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
List<TaskTrackerAction> actions = new ArrayList<TaskTrackerAction>();

// Check for new tasks to be executed on the tasktracker
- if (acceptNewTasks) {
+ if (acceptNewTasks && !isBlacklisted) {
TaskTrackerStatus taskTrackerStatus = getTaskTracker(trackerName);
if (taskTrackerStatus == null) {
LOG.warn("Unknown task tracker polling; ignoring: " + trackerName);
@@ -1983,8 +2322,10 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
if (oldStatus != null) {
totalMaps -= oldStatus.countMapTasks();
totalReduces -= oldStatus.countReduceTasks();
- totalMapTaskCapacity -= oldStatus.getMaxMapTasks();
- totalReduceTaskCapacity -= oldStatus.getMaxReduceTasks();
+ if (!faultyTrackers.isBlacklisted(oldStatus.getHost())) {
+ totalMapTaskCapacity -= oldStatus.getMaxMapTasks();
+ totalReduceTaskCapacity -= oldStatus.getMaxReduceTasks();
+ }
if (status == null) {
taskTrackers.remove(trackerName);
Integer numTaskTrackersInHost =
@@ -2001,8 +2342,10 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
if (status != null) {
totalMaps += status.countMapTasks();
totalReduces += status.countReduceTasks();
- totalMapTaskCapacity += status.getMaxMapTasks();
- totalReduceTaskCapacity += status.getMaxReduceTasks();
+ if (!faultyTrackers.isBlacklisted(status.getHost())) {
+ totalMapTaskCapacity += status.getMaxMapTasks();
+ totalReduceTaskCapacity += status.getMaxReduceTasks();
+ }
boolean alreadyPresent = false;
if (taskTrackers.containsKey(trackerName)) {
alreadyPresent = true;
@@ -2026,7 +2369,8 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
* Process incoming heartbeat messages from the task trackers.
*/
private synchronized boolean processHeartbeat(
- TaskTrackerStatus trackerStatus, boolean initialContact) {
+ TaskTrackerStatus trackerStatus,
+ boolean restarted) {
String trackerName = trackerStatus.getTrackerName();
trackerStatus.setLastSeen(System.currentTimeMillis());

@@ -2034,7 +2378,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
synchronized (trackerExpiryQueue) {
boolean seenBefore = updateTaskTrackerStatus(trackerName,
trackerStatus);
- if (initialContact) {
+ if (restarted) {
// If it's first contact, then clear out
// any state hanging around
if (seenBefore) {
@@ -2044,12 +2388,17 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
// If not first contact, there should be some record of the tracker
if (!seenBefore) {
LOG.warn("Status from unknown Tracker : " + trackerName);
- updateTaskTrackerStatus(trackerName, null);
+ // This is a lost tracker that came back now; if it is blacklisted,
+ // increment the count of blacklisted trackers in the cluster
+ if (isBlacklisted(trackerName)) {
+ faultyTrackers.numBlacklistedTrackers += 1;
+ }
+ addNewTracker(trackerStatus);
return false;
}
}

- if (initialContact) {
+ if (restarted) {
addNewTracker(trackerStatus);
}
}
@@ -2282,7 +2631,9 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,

public synchronized ClusterStatus getClusterStatus() {
synchronized (taskTrackers) {
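+ // The tracker count now excludes blacklisted trackers; the blacklisted
+ // count is reported separately.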
- return new ClusterStatus(taskTrackers.size(),
+ return new ClusterStatus(taskTrackers.size() -
+ getBlacklistedTrackerCount(),
+ getBlacklistedTrackerCount(),
totalMaps,
totalReduces,
totalMapTaskCapacity,