|
@@ -62,6 +62,7 @@ import org.apache.hadoop.filecache.TaskDistributedCacheManager;
|
|
import org.apache.hadoop.filecache.TrackerDistributedCacheManager;
|
|
import org.apache.hadoop.filecache.TrackerDistributedCacheManager;
|
|
import org.apache.hadoop.mapreduce.server.tasktracker.*;
|
|
import org.apache.hadoop.mapreduce.server.tasktracker.*;
|
|
import org.apache.hadoop.mapreduce.server.tasktracker.userlogs.*;
|
|
import org.apache.hadoop.mapreduce.server.tasktracker.userlogs.*;
|
|
|
|
+import org.apache.hadoop.fs.CommonConfigurationKeys;
|
|
import org.apache.hadoop.fs.DF;
|
|
import org.apache.hadoop.fs.DF;
|
|
import org.apache.hadoop.fs.FileStatus;
|
|
import org.apache.hadoop.fs.FileStatus;
|
|
import org.apache.hadoop.fs.FileSystem;
|
|
import org.apache.hadoop.fs.FileSystem;
|
|
@@ -406,6 +407,11 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
|
|
*/
|
|
*/
|
|
private long diskHealthCheckInterval;
|
|
private long diskHealthCheckInterval;
|
|
|
|
|
|
|
|
+ /**
|
|
|
|
+ * Whether the TT performs a full or relaxed version check with the JT.
|
|
|
|
+ */
|
|
|
|
+ private boolean relaxedVersionCheck;
|
|
|
|
+
|
|
/*
|
|
/*
|
|
* A list of commitTaskActions for whom commit response has been received
|
|
* A list of commitTaskActions for whom commit response has been received
|
|
*/
|
|
*/
|
|
@@ -1417,6 +1423,9 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
|
|
*/
|
|
*/
|
|
public TaskTracker(JobConf conf) throws IOException, InterruptedException {
|
|
public TaskTracker(JobConf conf) throws IOException, InterruptedException {
|
|
originalConf = conf;
|
|
originalConf = conf;
|
|
|
|
+ relaxedVersionCheck = conf.getBoolean(
|
|
|
|
+ CommonConfigurationKeys.HADOOP_RELAXED_VERSION_CHECK_KEY,
|
|
|
|
+ CommonConfigurationKeys.HADOOP_RELAXED_VERSION_CHECK_DEFAULT);
|
|
FILE_CACHE_SIZE = conf.getInt("mapred.tasktracker.file.cache.size", 2000);
|
|
FILE_CACHE_SIZE = conf.getInt("mapred.tasktracker.file.cache.size", 2000);
|
|
maxMapSlots = conf.getInt(
|
|
maxMapSlots = conf.getInt(
|
|
"mapred.tasktracker.map.tasks.maximum", 2);
|
|
"mapred.tasktracker.map.tasks.maximum", 2);
|
|
@@ -1582,6 +1591,32 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
|
|
return (heartbeatInterval / (numFinishedTasks * oobHeartbeatDamper + 1));
|
|
return (heartbeatInterval / (numFinishedTasks * oobHeartbeatDamper + 1));
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ /**
|
|
|
|
+ * @return true if this tasktracker is permitted to connect to
|
|
|
|
+ * the given jobtracker version
|
|
|
|
+ */
|
|
|
|
+ boolean isPermittedVersion(String jtBuildVersion, String jtVersion) {
|
|
|
|
+ boolean buildVersionMatch =
|
|
|
|
+ jtBuildVersion.equals(VersionInfo.getBuildVersion());
|
|
|
|
+ boolean versionMatch = jtVersion.equals(VersionInfo.getVersion());
|
|
|
|
+ if (buildVersionMatch && !versionMatch) {
|
|
|
|
+ throw new AssertionError("Invalid build. The build versions match" +
|
|
|
|
+ " but the JT version is " + jtVersion +
|
|
|
|
+ " and the TT version is " + VersionInfo.getVersion());
|
|
|
|
+ }
|
|
|
|
+ if (relaxedVersionCheck) {
|
|
|
|
+ if (!buildVersionMatch && versionMatch) {
|
|
|
|
+ LOG.info("Permitting tasktracker revision " + VersionInfo.getRevision() +
|
|
|
|
+ " to connect to jobtracker " + jtBuildVersion + " because " +
|
|
|
|
+ CommonConfigurationKeys.HADOOP_RELAXED_VERSION_CHECK_KEY +
|
|
|
|
+ " is enabled");
|
|
|
|
+ }
|
|
|
|
+ return versionMatch;
|
|
|
|
+ } else {
|
|
|
|
+ return buildVersionMatch;
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* Main service loop. Will stay in this loop forever.
|
|
* Main service loop. Will stay in this loop forever.
|
|
*/
|
|
*/
|
|
@@ -1615,15 +1650,18 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
|
|
}
|
|
}
|
|
|
|
|
|
// If the TaskTracker is just starting up:
|
|
// If the TaskTracker is just starting up:
|
|
- // 1. Verify the buildVersion
|
|
|
|
|
|
+ // 1. Verify the versions matches with the JobTracker
|
|
// 2. Get the system directory & filesystem
|
|
// 2. Get the system directory & filesystem
|
|
if(justInited) {
|
|
if(justInited) {
|
|
- String jobTrackerBV = jobClient.getBuildVersion();
|
|
|
|
- if(!VersionInfo.getBuildVersion().equals(jobTrackerBV)) {
|
|
|
|
|
|
+ String jtBuildVersion = jobClient.getBuildVersion();
|
|
|
|
+ String jtVersion = jobClient.getVIVersion();
|
|
|
|
+ if (!isPermittedVersion(jtBuildVersion, jtVersion)) {
|
|
String msg = "Shutting down. Incompatible buildVersion." +
|
|
String msg = "Shutting down. Incompatible buildVersion." +
|
|
- "\nJobTracker's: " + jobTrackerBV +
|
|
|
|
- "\nTaskTracker's: "+ VersionInfo.getBuildVersion();
|
|
|
|
- LOG.error(msg);
|
|
|
|
|
|
+ "\nJobTracker's: " + jtBuildVersion +
|
|
|
|
+ "\nTaskTracker's: "+ VersionInfo.getBuildVersion() +
|
|
|
|
+ " and " + CommonConfigurationKeys.HADOOP_RELAXED_VERSION_CHECK_KEY +
|
|
|
|
+ " is " + (relaxedVersionCheck ? "enabled" : "not enabled");
|
|
|
|
+ LOG.fatal(msg);
|
|
try {
|
|
try {
|
|
jobClient.reportTaskTrackerError(taskTrackerName, null, msg);
|
|
jobClient.reportTaskTrackerError(taskTrackerName, null, msg);
|
|
} catch(Exception e ) {
|
|
} catch(Exception e ) {
|