@@ -18,13 +18,9 @@
 package org.apache.hadoop.mapred;


-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.PrintWriter;
-import java.io.InputStreamReader;
 import java.io.Writer;
 import java.lang.management.ManagementFactory;
 import java.net.BindException;
@@ -53,6 +49,7 @@ import java.util.TreeSet;
 import java.util.Vector;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;

 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -61,27 +58,33 @@ import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.mapred.JobSubmissionProtocol;
-import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
-import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenSecretManager;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.http.HttpServer;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.ipc.RPC.VersionMismatch;
+import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.mapred.AuditLogger.Constants;
-import org.apache.hadoop.mapred.Counters.CountersExceededException;
 import org.apache.hadoop.mapred.JobHistory.Keys;
-import org.apache.hadoop.mapred.JobHistory.Listener;
 import org.apache.hadoop.mapred.JobHistory.Values;
 import org.apache.hadoop.mapred.JobInProgress.KillInterruptedException;
 import org.apache.hadoop.mapred.JobStatusChangeEvent.EventType;
 import org.apache.hadoop.mapred.QueueManager.QueueACL;
 import org.apache.hadoop.mapred.TaskTrackerStatus.TaskTrackerHealthStatus;
+import org.apache.hadoop.mapreduce.ClusterMetrics;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.security.token.DelegationTokenRenewal;
+import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
+import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenSecretManager;
+import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.util.MBeans;
 import org.apache.hadoop.net.DNSToSwitchMapping;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NetworkTopology;
@@ -89,6 +92,7 @@ import org.apache.hadoop.net.Node;
 import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.net.ScriptBasedMapping;
 import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.Groups;
 import org.apache.hadoop.security.RefreshUserMappingsProtocol;
 import org.apache.hadoop.security.SecurityUtil;
@@ -96,6 +100,7 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
 import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.security.authorize.AuthorizationException;
+import org.apache.hadoop.security.authorize.PolicyProvider;
 import org.apache.hadoop.security.authorize.ProxyUsers;
 import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol;
 import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
@@ -104,16 +109,6 @@ import org.apache.hadoop.util.HostsFileReader;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.VersionInfo;
-
-import org.apache.hadoop.mapreduce.ClusterMetrics;
-import org.apache.hadoop.mapreduce.JobSubmissionFiles;
-import org.apache.hadoop.mapreduce.TaskType;
-import org.apache.hadoop.mapreduce.security.token.DelegationTokenRenewal;
-import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
-import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
-import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
-import org.apache.hadoop.metrics2.util.MBeans;
-import org.apache.hadoop.security.Credentials;
 import org.mortbay.util.ajax.JSON;

 /*******************************************************
@@ -203,7 +198,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,

   public static enum State { INITIALIZING, RUNNING }
   State state = State.INITIALIZING;
-  private static final int FS_ACCESS_RETRY_PERIOD = 10000;
+  private static final int FS_ACCESS_RETRY_PERIOD = 1000;
   static final String JOB_INFO_FILE = "job-info";
   static final String JOB_TOKEN_FILE = "jobToken";
   private DNSToSwitchMapping dnsToSwitchMapping;
@@ -276,6 +271,16 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
     return clock;
   }

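+  // Configuration for the optional HDFS availability monitor. When enabled,
+  // a background HDFSMonitorThread (not shown in this hunk) watches HDFS and
+  // is expected to move the JobTracker in/out of safemode as availability
+  // changes.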
+  static final String JT_HDFS_MONITOR_ENABLE =
+    "mapreduce.jt.hdfs.monitor.enable";
+  static final boolean DEFAULT_JT_HDFS_MONITOR_THREAD_ENABLE = false;
+
+  static final String JT_HDFS_MONITOR_THREAD_INTERVAL =
+    "mapreduce.jt.hdfs.monitor.interval.ms";
+  static final int DEFAULT_JT_HDFS_MONITOR_THREAD_INTERVAL_MS = 5000;
+
+  private Thread hdfsMonitor;
+
   /**
    * Start the JobTracker with given configuration.
    *
@@ -1872,8 +1877,179 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
     this(conf, identifier, clock, new QueueManager(new Configuration(conf)));
   }

+  private void initJTConf(JobConf conf) {
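+    // The JobTracker copes with NameNode unavailability itself (the retry
+    // loop in initializeFilesystem() plus safemode), so the blocking DFS
+    // client retry policy is force-disabled here.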
+    if (conf.getBoolean(
+        DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_KEY, false)) {
+      LOG.warn(DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_KEY +
+          " is enabled, disabling it");
+      conf.setBoolean(DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_KEY, false);
+    }
+  }
+
+  private void initializeFilesystem() throws IOException, InterruptedException {
+    // Connect to HDFS NameNode
+    while (!Thread.currentThread().isInterrupted() && fs == null) {
+      try {
+        fs = getMROwner().doAs(new PrivilegedExceptionAction<FileSystem>() {
+          public FileSystem run() throws IOException {
+            return FileSystem.get(conf);
+          }});
+      } catch (IOException ie) {
+        fs = null;
+        LOG.info("Problem connecting to HDFS Namenode... re-trying", ie);
+        Thread.sleep(FS_ACCESS_RETRY_PERIOD);
+      }
+    }
+
+    if (Thread.currentThread().isInterrupted()) {
+      throw new InterruptedException();
+    }
+
+    // Ensure HDFS is healthy
+    if ("hdfs".equalsIgnoreCase(fs.getUri().getScheme())) {
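+      // Block until HDFS reports healthy; isHealthy() is expected to cover
+      // both NameNode reachability and its safemode status.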
+      while (!DistributedFileSystem.isHealthy(fs.getUri())) {
+        LOG.info("HDFS initialized but not 'healthy' yet, waiting...");
+        Thread.sleep(FS_ACCESS_RETRY_PERIOD);
+      }
+    }
+  }
+
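+  // Initialization that needs a working FileSystem: job history, the
+  // recovery manager, system-directory setup, the embedded history server,
+  // the job status store and, optionally, the HDFS monitor thread.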
+  private void initialize()
+      throws IOException, InterruptedException {
+    // initialize history parameters.
+    final JobTracker jtFinal = this;
+
+    getMROwner().doAs(new PrivilegedExceptionAction<Boolean>() {
+      @Override
+      public Boolean run() throws Exception {
+        JobHistory.init(jtFinal, conf, jtFinal.localMachine,
+            jtFinal.startTime);
+        return true;
+      }
+    });
+
+    // start the recovery manager
+    recoveryManager = new RecoveryManager();
+
+    while (!Thread.currentThread().isInterrupted()) {
+      try {
+        // Clean up the system dir, which will only work if hdfs is out of
+        // safe mode. (The NameNode connection itself is already established
+        // in initializeFilesystem().)
+        if (systemDir == null) {
+          systemDir = new Path(getSystemDir());
+        }
+        try {
+          FileStatus systemDirStatus = fs.getFileStatus(systemDir);
+          if (!systemDirStatus.getOwner().equals(
+              getMROwner().getShortUserName())) {
+            throw new AccessControlException("The systemdir " + systemDir +
+                " is not owned by " + getMROwner().getShortUserName());
+          }
+          if (!systemDirStatus.getPermission().equals(SYSTEM_DIR_PERMISSION)) {
+            LOG.warn("Incorrect permissions on " + systemDir +
+                ". Setting it to " + SYSTEM_DIR_PERMISSION);
+            fs.setPermission(systemDir, new FsPermission(SYSTEM_DIR_PERMISSION));
+          }
+        } catch (FileNotFoundException fnf) {} //ignore
+        // Make sure that the backup data is preserved
+        FileStatus[] systemDirData = fs.listStatus(this.systemDir);
+        // Check if the history is enabled, as we can't have persistence with
+        // history disabled
+        if (conf.getBoolean("mapred.jobtracker.restart.recover", false)
+            && systemDirData != null) {
+          for (FileStatus status : systemDirData) {
+            try {
+              recoveryManager.checkAndAddJob(status);
+            } catch (Throwable t) {
+              LOG.warn("Failed to add the job " + status.getPath().getName(),
+                  t);
+            }
+          }
+
+          // Check if there are jobs to be recovered
+          hasRestarted = recoveryManager.shouldRecover();
+          if (hasRestarted) {
+            break; // if there is something to recover else clean the sys dir
+          }
+        }
+        LOG.info("Cleaning up the system directory");
+        fs.delete(systemDir, true);
+        if (FileSystem.mkdirs(fs, systemDir,
+            new FsPermission(SYSTEM_DIR_PERMISSION))) {
+          break;
+        }
+        LOG.error("Mkdirs failed to create " + systemDir);
+      } catch (AccessControlException ace) {
+        LOG.warn("Failed to operate on mapred.system.dir (" + systemDir
+            + ") because of permissions.");
+        LOG.warn("Manually delete the mapred.system.dir (" + systemDir
+            + ") and then start the JobTracker.");
+        LOG.warn("Bailing out ... ", ace);
+        throw ace;
+      } catch (IOException ie) {
+        LOG.info("problem cleaning system directory: " + systemDir, ie);
+      }
+      Thread.sleep(FS_ACCESS_RETRY_PERIOD);
+    }
+
+    if (Thread.currentThread().isInterrupted()) {
+      throw new InterruptedException();
+    }
+
+    // Same with 'localDir' except it's always on the local disk.
+    if (!hasRestarted) {
+      conf.deleteLocalFiles(SUBDIR);
+    }
+
+    // Initialize history DONE folder
+    FileSystem historyFS = getMROwner().doAs(
+        new PrivilegedExceptionAction<FileSystem>() {
+          public FileSystem run() throws IOException {
+            JobHistory.initDone(conf, fs);
+            final String historyLogDir =
+                JobHistory.getCompletedJobHistoryLocation().toString();
+            infoServer.setAttribute("historyLogDir", historyLogDir);
+
+            infoServer.setAttribute
+                ("serialNumberDirectoryDigits",
+                 Integer.valueOf(JobHistory.serialNumberDirectoryDigits()));
+
+            infoServer.setAttribute
+                ("serialNumberTotalDigits",
+                 Integer.valueOf(JobHistory.serialNumberTotalDigits()));
+
+            return new Path(historyLogDir).getFileSystem(conf);
+          }
+        });
+    infoServer.setAttribute("fileSys", historyFS);
+    infoServer.setAttribute("jobConf", conf);
+    infoServer.setAttribute("aclManager", aclsManager);
+
+    if (JobHistoryServer.isEmbedded(conf)) {
+      LOG.info("History server being initialized in embedded mode");
+      jobHistoryServer = new JobHistoryServer(conf, aclsManager, infoServer);
+      jobHistoryServer.start();
+      LOG.info("Job History Server web address: " + JobHistoryServer.getAddress(conf));
+    }
+
+    //initializes the job status store
+    completedJobStatusStore = new CompletedJobStatusStore(conf, aclsManager);
+
+    // Setup HDFS monitoring
+    if (this.conf.getBoolean(
+        JT_HDFS_MONITOR_ENABLE, DEFAULT_JT_HDFS_MONITOR_THREAD_ENABLE)) {
+      hdfsMonitor = new HDFSMonitorThread(this.conf, this, this.fs);
+      hdfsMonitor.start();
+    }
+
+  }
+
   JobTracker(final JobConf conf, String identifier, Clock clock, QueueManager qm)
-  throws IOException, InterruptedException {
+      throws IOException, InterruptedException {
+
+    initJTConf(conf);
+
     this.queueManager = qm;
     this.clock = clock;
     // Set ports, start RPC servers, setup security policy etc.
@@ -1979,7 +2155,12 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
     // Set service-level authorization security policy
     if (conf.getBoolean(
           ServiceAuthorizationManager.SERVICE_AUTHORIZATION_CONFIG, false)) {
-      ServiceAuthorizationManager.refresh(conf, new MapReducePolicyProvider());
+      PolicyProvider policyProvider =
+          (PolicyProvider)(ReflectionUtils.newInstance(
+              conf.getClass(PolicyProvider.POLICY_PROVIDER_CONFIG,
+                  MapReducePolicyProvider.class, PolicyProvider.class),
+              conf));
+      ServiceAuthorizationManager.refresh(conf, policyProvider);
     }

     int handlerCount = conf.getInt("mapred.job.tracker.handler.count", 10);
@@ -2006,16 +2187,6 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
     infoServer = new HttpServer("job", infoBindAddress, tmpInfoPort,
         tmpInfoPort == 0, conf, aclsManager.getAdminsAcl());
     infoServer.setAttribute("job.tracker", this);
-    // initialize history parameters.
-    final JobTracker jtFinal = this;
-    getMROwner().doAs(new PrivilegedExceptionAction<Boolean>() {
-      @Override
-      public Boolean run() throws Exception {
-        JobHistory.init(jtFinal, conf,jtFinal.localMachine,
-            jtFinal.startTime);
-        return true;
-      }
-    });

     infoServer.addServlet("reducegraph", "/taskgraph", TaskGraphServlet.class);
     infoServer.start();
@@ -2035,125 +2206,12 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
         infoBindAddress + ":" + this.infoPort);
     LOG.info("JobTracker webserver: " + this.infoServer.getPort());

-    // start the recovery manager
-    recoveryManager = new RecoveryManager();
-
-    while (!Thread.currentThread().isInterrupted()) {
-      try {
-        // if we haven't contacted the namenode go ahead and do it
-        if (fs == null) {
-          fs = getMROwner().doAs(new PrivilegedExceptionAction<FileSystem>() {
-            public FileSystem run() throws IOException {
-              return FileSystem.get(conf);
-            }});
-        }
-        // clean up the system dir, which will only work if hdfs is out of
-        // safe mode
-        if(systemDir == null) {
-          systemDir = new Path(getSystemDir());
-        }
-        try {
-          FileStatus systemDirStatus = fs.getFileStatus(systemDir);
-          if (!systemDirStatus.getOwner().equals(
-              getMROwner().getShortUserName())) {
-            throw new AccessControlException("The systemdir " + systemDir +
-                " is not owned by " + getMROwner().getShortUserName());
-          }
-          if (!systemDirStatus.getPermission().equals(SYSTEM_DIR_PERMISSION)) {
-            LOG.warn("Incorrect permissions on " + systemDir +
-                ". Setting it to " + SYSTEM_DIR_PERMISSION);
-            fs.setPermission(systemDir,new FsPermission(SYSTEM_DIR_PERMISSION));
-          }
-        } catch (FileNotFoundException fnf) {} //ignore
-        // Make sure that the backup data is preserved
-        FileStatus[] systemDirData = fs.listStatus(this.systemDir);
-        // Check if the history is enabled .. as we cant have persistence with
-        // history disabled
-        if (conf.getBoolean("mapred.jobtracker.restart.recover", false)
-            && systemDirData != null) {
-          for (FileStatus status : systemDirData) {
-            try {
-              recoveryManager.checkAndAddJob(status);
-            } catch (Throwable t) {
-              LOG.warn("Failed to add the job " + status.getPath().getName(),
-                  t);
-            }
-          }
-
-          // Check if there are jobs to be recovered
-          hasRestarted = recoveryManager.shouldRecover();
-          if (hasRestarted) {
-            break; // if there is something to recover else clean the sys dir
-          }
-        }
-        LOG.info("Cleaning up the system directory");
-        fs.delete(systemDir, true);
-        if (FileSystem.mkdirs(fs, systemDir,
-            new FsPermission(SYSTEM_DIR_PERMISSION))) {
-          break;
-        }
-        LOG.error("Mkdirs failed to create " + systemDir);
-      } catch (AccessControlException ace) {
-        LOG.warn("Failed to operate on mapred.system.dir (" + systemDir
-            + ") because of permissions.");
-        LOG.warn("Manually delete the mapred.system.dir (" + systemDir
-            + ") and then start the JobTracker.");
-        LOG.warn("Bailing out ... ", ace);
-        throw ace;
-      } catch (IOException ie) {
-        LOG.info("problem cleaning system directory: " + systemDir, ie);
-      }
-      Thread.sleep(FS_ACCESS_RETRY_PERIOD);
-    }
-
-    if (Thread.currentThread().isInterrupted()) {
-      throw new InterruptedException();
-    }
-
-    // Same with 'localDir' except it's always on the local disk.
-    if (!hasRestarted) {
-      jobConf.deleteLocalFiles(SUBDIR);
-    }
-
-    // Initialize history DONE folder
-    FileSystem historyFS = getMROwner().doAs(
-        new PrivilegedExceptionAction<FileSystem>() {
-          public FileSystem run() throws IOException {
-            JobHistory.initDone(conf, fs);
-            final String historyLogDir =
-                JobHistory.getCompletedJobHistoryLocation().toString();
-            infoServer.setAttribute("historyLogDir", historyLogDir);
-
-            infoServer.setAttribute
-                ("serialNumberDirectoryDigits",
-                 Integer.valueOf(JobHistory.serialNumberDirectoryDigits()));
-
-            infoServer.setAttribute
-                ("serialNumberTotalDigits",
-                 Integer.valueOf(JobHistory.serialNumberTotalDigits()));
-
-            return new Path(historyLogDir).getFileSystem(conf);
-          }
-        });
-    infoServer.setAttribute("fileSys", historyFS);
-    infoServer.setAttribute("jobConf", conf);
-    infoServer.setAttribute("aclManager", aclsManager);
-
-    if (JobHistoryServer.isEmbedded(conf)) {
-      LOG.info("History server being initialized in embedded mode");
-      jobHistoryServer = new JobHistoryServer(conf, aclsManager, infoServer);
-      jobHistoryServer.start();
-      LOG.info("Job History Server web address: " + JobHistoryServer.getAddress(conf));
-    }
-
     this.dnsToSwitchMapping = ReflectionUtils.newInstance(
         conf.getClass("topology.node.switch.mapping.impl", ScriptBasedMapping.class,
             DNSToSwitchMapping.class), conf);
     this.numTaskCacheLevels = conf.getInt("mapred.task.cache.levels",
         NetworkTopology.DEFAULT_HOST_LEVEL);

-    //initializes the job status store
-    completedJobStatusStore = new CompletedJobStatusStore(conf, aclsManager);
   }

   private static SimpleDateFormat getDateFormat() {
@@ -2244,6 +2302,17 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
    * Run forever
    */
   public void offerService() throws InterruptedException, IOException {
+    // start the inter-tracker server
+    this.interTrackerServer.start();
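+    // TaskTrackers may connect as soon as the RPC server is up; while the
+    // JobTracker is still in safemode they are given no setup/cleanup tasks
+    // and see a null system dir (see getSetupAndCleanupTasks() and
+    // getSystemDir()).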
+
+    // Initialize the JobTracker FileSystem within safemode
+    setSafeModeInternal(SafeModeAction.SAFEMODE_ENTER);
+    initializeFilesystem();
+    setSafeModeInternal(SafeModeAction.SAFEMODE_LEAVE);
+
+    // Initialize JobTracker
+    initialize();
+
     // Prepare for recovery. This is done irrespective of the status of restart
     // flag.
     while (true) {
@@ -2283,12 +2352,10 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
       completedJobsStoreThread.start();
     }

-    // start the inter-tracker server once the jt is ready
-    this.interTrackerServer.start();
-
     synchronized (this) {
       state = State.RUNNING;
     }
+
     LOG.info("Starting RUNNING");

     this.interTrackerServer.join();
@@ -3488,6 +3555,12 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
   // returns cleanup tasks first, then setup tasks.
   synchronized List<Task> getSetupAndCleanupTasks(
     TaskTrackerStatus taskTracker) throws IOException {
+
+    // Don't assign *any* new task in safemode
+    if (isInSafeMode()) {
+      return null;
+    }
+
     int maxMapTasks = taskTracker.getMaxMapSlots();
     int maxReduceTasks = taskTracker.getMaxReduceSlots();
     int numMaps = taskTracker.countOccupiedMapSlots();
@@ -3622,6 +3695,10 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
   public JobStatus submitJob(JobID jobId, String jobSubmitDir,
       UserGroupInformation ugi, Credentials ts, boolean recovered)
       throws IOException {
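+    // Fail fast: no new job submissions are accepted while the JobTracker
+    // is in safemode.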
+    if (isInSafeMode()) {
+      throw new IOException("JobTracker in safemode");
+    }
+
     JobInfo jobInfo = null;
     if (ugi == null) {
       ugi = UserGroupInformation.getCurrentUser();
@@ -4315,6 +4392,11 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
    * @see org.apache.hadoop.mapred.JobSubmissionProtocol#getSystemDir()
    */
   public String getSystemDir() {
+    // Might not be initialized yet, TT handles this
+    if (isInSafeMode()) {
+      return null;
+    }
+
     Path sysDir = new Path(conf.get("mapred.system.dir", "/tmp/hadoop/mapred/system"));
     return fs.makeQualified(sysDir).toString();
   }
@@ -5083,4 +5165,81 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
     return map;
   }
   // End MXbean implementaiton
+
+  /**
+   * JobTracker SafeMode
+   */
+  // SafeMode actions
+  public enum SafeModeAction { SAFEMODE_LEAVE, SAFEMODE_ENTER, SAFEMODE_GET }
+
+  private AtomicBoolean safeMode = new AtomicBoolean(false);
+  private AtomicBoolean adminSafeMode = new AtomicBoolean(false);
+  private String adminSafeModeUser = "";
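+  // safeMode is the effective state; adminSafeMode records whether it was
+  // imposed explicitly by an admin (as opposed to HDFS being unavailable),
+  // which getSafeModeText() uses for reporting.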
+
+  public boolean setSafeMode(JobTracker.SafeModeAction safeModeAction)
+      throws IOException {
+    String user = UserGroupInformation.getCurrentUser().getShortUserName();
+
+    // Anyone can check JT safe-mode
+    if (safeModeAction == SafeModeAction.SAFEMODE_GET) {
+      boolean safeMode = this.safeMode.get();
+      LOG.info("Getting safemode information: safemode=" + safeMode + ". " +
+          "Requested by : " +
+          UserGroupInformation.getCurrentUser().getShortUserName());
+      AuditLogger.logSuccess(user, Constants.GET_SAFEMODE,
+          Constants.JOBTRACKER);
+      return safeMode;
+    }
+
+    // Check access for modifications to safe-mode
+    if (!aclsManager.isMRAdmin(UserGroupInformation.getCurrentUser())) {
+      AuditLogger.logFailure(user, Constants.SET_SAFEMODE,
+          aclsManager.getAdminsAcl().toString(), Constants.JOBTRACKER,
+          Constants.UNAUTHORIZED_USER);
+      throw new AccessControlException(user +
+          " is not authorized to set JobTracker safemode.");
+    }
+    AuditLogger.logSuccess(user, Constants.SET_SAFEMODE, Constants.JOBTRACKER);
+
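+    // Apply the change; adminSafeMode tracks whether the current safemode,
+    // if any, is admin-imposed (it is cleared again on SAFEMODE_LEAVE).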
+    boolean currSafeMode = setSafeModeInternal(safeModeAction);
+    adminSafeMode.set(currSafeMode);
+    adminSafeModeUser = user;
+    return currSafeMode;
+  }
+
+  boolean isInAdminSafeMode() {
+    return adminSafeMode.get();
+  }
+
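+  // Internal variant without the admin ACL check, used by the JobTracker
+  // itself (e.g. in offerService()) to toggle safemode around filesystem
+  // initialization.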
+  boolean setSafeModeInternal(JobTracker.SafeModeAction safeModeAction)
+      throws IOException {
+    if (safeModeAction != SafeModeAction.SAFEMODE_GET) {
+      boolean safeMode = false;
+      if (safeModeAction == SafeModeAction.SAFEMODE_ENTER) {
+        safeMode = true;
+      } else if (safeModeAction == SafeModeAction.SAFEMODE_LEAVE) {
+        safeMode = false;
+      }
+      LOG.info("Setting safe mode to " + safeMode + ". Requested by : " +
+          UserGroupInformation.getCurrentUser().getShortUserName());
+      this.safeMode.set(safeMode);
+    }
+    return this.safeMode.get();
+  }
+
+  public boolean isInSafeMode() {
+    return safeMode.get();
+  }
+
+  String getSafeModeText() {
+    if (!isInSafeMode()) {
+      return "OFF";
+    }
+    String safeModeInfo =
+        adminSafeMode.get() ?
+            "Set by admin <strong>" + adminSafeModeUser + "</strong>" :
+            "HDFS unavailable";
+    return "<em>ON - " + safeModeInfo + "</em>";
+  }
+
 }