|
@@ -62,7 +62,6 @@ import org.apache.hadoop.yarn.api.ApplicationConstants;
|
|
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
|
|
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
|
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
|
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
|
-import org.apache.hadoop.yarn.api.records.ApplicationState;
|
|
|
|
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
|
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
|
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
|
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
|
import org.apache.hadoop.yarn.api.records.LocalResource;
|
|
import org.apache.hadoop.yarn.api.records.LocalResource;
|
|
@@ -70,6 +69,7 @@ import org.apache.hadoop.yarn.api.records.LocalResourceType;
|
|
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
|
|
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
|
|
import org.apache.hadoop.yarn.api.records.Resource;
|
|
import org.apache.hadoop.yarn.api.records.Resource;
|
|
import org.apache.hadoop.yarn.api.records.URL;
|
|
import org.apache.hadoop.yarn.api.records.URL;
|
|
|
|
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
import org.apache.hadoop.yarn.factories.RecordFactory;
|
|
import org.apache.hadoop.yarn.factories.RecordFactory;
|
|
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
|
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
|
@@ -99,7 +99,7 @@ public class YARNRunner implements ClientProtocol {
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
- * Similar to {@link #YARNRunner(Configuration)} but allowing injecting
|
|
|
|
|
|
+ * Similar to {@link #YARNRunner(Configuration)} but allowing injecting
|
|
* {@link ResourceMgrDelegate}. Enables mocking and testing.
|
|
* {@link ResourceMgrDelegate}. Enables mocking and testing.
|
|
* @param conf the configuration object for the client
|
|
* @param conf the configuration object for the client
|
|
* @param resMgrDelegate the resourcemanager client handle.
|
|
* @param resMgrDelegate the resourcemanager client handle.
|
|
@@ -107,12 +107,12 @@ public class YARNRunner implements ClientProtocol {
|
|
public YARNRunner(Configuration conf, ResourceMgrDelegate resMgrDelegate) {
|
|
public YARNRunner(Configuration conf, ResourceMgrDelegate resMgrDelegate) {
|
|
this(conf, resMgrDelegate, new ClientCache(conf, resMgrDelegate));
|
|
this(conf, resMgrDelegate, new ClientCache(conf, resMgrDelegate));
|
|
}
|
|
}
|
|
-
|
|
|
|
|
|
+
|
|
/**
|
|
/**
|
|
- * Similar to {@link YARNRunner#YARNRunner(Configuration, ResourceMgrDelegate)}
|
|
|
|
|
|
+ * Similar to {@link YARNRunner#YARNRunner(Configuration, ResourceMgrDelegate)}
|
|
* but allowing injecting {@link ClientCache}. Enable mocking and testing.
|
|
* but allowing injecting {@link ClientCache}. Enable mocking and testing.
|
|
* @param conf the configuration object
|
|
* @param conf the configuration object
|
|
- * @param resMgrDelegate the resource manager delegate
|
|
|
|
|
|
+ * @param resMgrDelegate the resource manager delegate
|
|
* @param clientCache the client cache object.
|
|
* @param clientCache the client cache object.
|
|
*/
|
|
*/
|
|
public YARNRunner(Configuration conf, ResourceMgrDelegate resMgrDelegate,
|
|
public YARNRunner(Configuration conf, ResourceMgrDelegate resMgrDelegate,
|
|
@@ -126,7 +126,7 @@ public class YARNRunner implements ClientProtocol {
|
|
throw new RuntimeException("Error in instantiating YarnClient", ufe);
|
|
throw new RuntimeException("Error in instantiating YarnClient", ufe);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
-
|
|
|
|
|
|
+
|
|
@Override
|
|
@Override
|
|
public void cancelDelegationToken(Token<DelegationTokenIdentifier> arg0)
|
|
public void cancelDelegationToken(Token<DelegationTokenIdentifier> arg0)
|
|
throws IOException, InterruptedException {
|
|
throws IOException, InterruptedException {
|
|
@@ -152,7 +152,7 @@ public class YARNRunner implements ClientProtocol {
|
|
|
|
|
|
@Override
|
|
@Override
|
|
public ClusterMetrics getClusterMetrics() throws IOException,
|
|
public ClusterMetrics getClusterMetrics() throws IOException,
|
|
- InterruptedException {
|
|
|
|
|
|
+ InterruptedException {
|
|
return resMgrDelegate.getClusterMetrics();
|
|
return resMgrDelegate.getClusterMetrics();
|
|
}
|
|
}
|
|
|
|
|
|
@@ -209,13 +209,13 @@ public class YARNRunner implements ClientProtocol {
|
|
public String getSystemDir() throws IOException, InterruptedException {
|
|
public String getSystemDir() throws IOException, InterruptedException {
|
|
return resMgrDelegate.getSystemDir();
|
|
return resMgrDelegate.getSystemDir();
|
|
}
|
|
}
|
|
-
|
|
|
|
|
|
+
|
|
@Override
|
|
@Override
|
|
public long getTaskTrackerExpiryInterval() throws IOException,
|
|
public long getTaskTrackerExpiryInterval() throws IOException,
|
|
InterruptedException {
|
|
InterruptedException {
|
|
return resMgrDelegate.getTaskTrackerExpiryInterval();
|
|
return resMgrDelegate.getTaskTrackerExpiryInterval();
|
|
}
|
|
}
|
|
-
|
|
|
|
|
|
+
|
|
@Override
|
|
@Override
|
|
public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)
|
|
public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)
|
|
throws IOException, InterruptedException {
|
|
throws IOException, InterruptedException {
|
|
@@ -230,20 +230,20 @@ public class YARNRunner implements ClientProtocol {
|
|
}
|
|
}
|
|
|
|
|
|
// Construct necessary information to start the MR AM
|
|
// Construct necessary information to start the MR AM
|
|
- ApplicationSubmissionContext appContext =
|
|
|
|
|
|
+ ApplicationSubmissionContext appContext =
|
|
createApplicationSubmissionContext(conf, jobSubmitDir, ts);
|
|
createApplicationSubmissionContext(conf, jobSubmitDir, ts);
|
|
-
|
|
|
|
|
|
+
|
|
// Submit to ResourceManager
|
|
// Submit to ResourceManager
|
|
ApplicationId applicationId = resMgrDelegate.submitApplication(appContext);
|
|
ApplicationId applicationId = resMgrDelegate.submitApplication(appContext);
|
|
-
|
|
|
|
|
|
+
|
|
ApplicationReport appMaster = resMgrDelegate
|
|
ApplicationReport appMaster = resMgrDelegate
|
|
.getApplicationReport(applicationId);
|
|
.getApplicationReport(applicationId);
|
|
- String diagnostics =
|
|
|
|
- (appMaster == null ?
|
|
|
|
|
|
+ String diagnostics =
|
|
|
|
+ (appMaster == null ?
|
|
"application report is null" : appMaster.getDiagnostics());
|
|
"application report is null" : appMaster.getDiagnostics());
|
|
- if (appMaster == null || appMaster.getState() == ApplicationState.FAILED
|
|
|
|
- || appMaster.getState() == ApplicationState.KILLED) {
|
|
|
|
- throw new IOException("Failed to run job : " +
|
|
|
|
|
|
+ if (appMaster == null || appMaster.getYarnApplicationState() == YarnApplicationState.FAILED
|
|
|
|
+ || appMaster.getYarnApplicationState() == YarnApplicationState.KILLED) {
|
|
|
|
+ throw new IOException("Failed to run job : " +
|
|
diagnostics);
|
|
diagnostics);
|
|
}
|
|
}
|
|
return clientCache.getClient(jobId).getJobStatus(jobId);
|
|
return clientCache.getClient(jobId).getJobStatus(jobId);
|
|
@@ -266,7 +266,7 @@ public class YARNRunner implements ClientProtocol {
|
|
Configuration jobConf,
|
|
Configuration jobConf,
|
|
String jobSubmitDir, Credentials ts) throws IOException {
|
|
String jobSubmitDir, Credentials ts) throws IOException {
|
|
ApplicationId applicationId = resMgrDelegate.getApplicationId();
|
|
ApplicationId applicationId = resMgrDelegate.getApplicationId();
|
|
-
|
|
|
|
|
|
+
|
|
// Setup resource requirements
|
|
// Setup resource requirements
|
|
Resource capability = recordFactory.newRecordInstance(Resource.class);
|
|
Resource capability = recordFactory.newRecordInstance(Resource.class);
|
|
capability.setMemory(conf.getInt(MRJobConfig.MR_AM_VMEM_MB,
|
|
capability.setMemory(conf.getInt(MRJobConfig.MR_AM_VMEM_MB,
|
|
@@ -276,9 +276,9 @@ public class YARNRunner implements ClientProtocol {
|
|
// Setup LocalResources
|
|
// Setup LocalResources
|
|
Map<String, LocalResource> localResources =
|
|
Map<String, LocalResource> localResources =
|
|
new HashMap<String, LocalResource>();
|
|
new HashMap<String, LocalResource>();
|
|
-
|
|
|
|
|
|
+
|
|
Path jobConfPath = new Path(jobSubmitDir, MRJobConfig.JOB_CONF_FILE);
|
|
Path jobConfPath = new Path(jobSubmitDir, MRJobConfig.JOB_CONF_FILE);
|
|
-
|
|
|
|
|
|
+
|
|
URL yarnUrlForJobSubmitDir = ConverterUtils
|
|
URL yarnUrlForJobSubmitDir = ConverterUtils
|
|
.getYarnUrlFromPath(defaultFileContext.getDefaultFileSystem()
|
|
.getYarnUrlFromPath(defaultFileContext.getDefaultFileSystem()
|
|
.resolvePath(
|
|
.resolvePath(
|
|
@@ -299,18 +299,18 @@ public class YARNRunner implements ClientProtocol {
|
|
LOG.info("Job jar is not present. "
|
|
LOG.info("Job jar is not present. "
|
|
+ "Not adding any jar to the list of resources.");
|
|
+ "Not adding any jar to the list of resources.");
|
|
}
|
|
}
|
|
-
|
|
|
|
|
|
+
|
|
// TODO gross hack
|
|
// TODO gross hack
|
|
- for (String s : new String[] {
|
|
|
|
- MRJobConfig.JOB_SPLIT,
|
|
|
|
|
|
+ for (String s : new String[] {
|
|
|
|
+ MRJobConfig.JOB_SPLIT,
|
|
MRJobConfig.JOB_SPLIT_METAINFO,
|
|
MRJobConfig.JOB_SPLIT_METAINFO,
|
|
MRJobConfig.APPLICATION_TOKENS_FILE }) {
|
|
MRJobConfig.APPLICATION_TOKENS_FILE }) {
|
|
localResources.put(
|
|
localResources.put(
|
|
MRJobConfig.JOB_SUBMIT_DIR + "/" + s,
|
|
MRJobConfig.JOB_SUBMIT_DIR + "/" + s,
|
|
- createApplicationResource(defaultFileContext,
|
|
|
|
|
|
+ createApplicationResource(defaultFileContext,
|
|
new Path(jobSubmitDir, s)));
|
|
new Path(jobSubmitDir, s)));
|
|
}
|
|
}
|
|
-
|
|
|
|
|
|
+
|
|
// Setup security tokens
|
|
// Setup security tokens
|
|
ByteBuffer securityTokens = null;
|
|
ByteBuffer securityTokens = null;
|
|
if (UserGroupInformation.isSecurityEnabled()) {
|
|
if (UserGroupInformation.isSecurityEnabled()) {
|
|
@@ -322,20 +322,20 @@ public class YARNRunner implements ClientProtocol {
|
|
// Setup the command to run the AM
|
|
// Setup the command to run the AM
|
|
Vector<CharSequence> vargs = new Vector<CharSequence>(8);
|
|
Vector<CharSequence> vargs = new Vector<CharSequence>(8);
|
|
vargs.add(Environment.JAVA_HOME.$() + "/bin/java");
|
|
vargs.add(Environment.JAVA_HOME.$() + "/bin/java");
|
|
-
|
|
|
|
|
|
+
|
|
long logSize = TaskLog.getTaskLogLength(new JobConf(conf));
|
|
long logSize = TaskLog.getTaskLogLength(new JobConf(conf));
|
|
vargs.add("-Dlog4j.configuration=container-log4j.properties");
|
|
vargs.add("-Dlog4j.configuration=container-log4j.properties");
|
|
vargs.add("-D" + MRJobConfig.TASK_LOG_DIR + "="
|
|
vargs.add("-D" + MRJobConfig.TASK_LOG_DIR + "="
|
|
+ ApplicationConstants.LOG_DIR_EXPANSION_VAR);
|
|
+ ApplicationConstants.LOG_DIR_EXPANSION_VAR);
|
|
vargs.add("-D" + MRJobConfig.TASK_LOG_SIZE + "=" + logSize);
|
|
vargs.add("-D" + MRJobConfig.TASK_LOG_SIZE + "=" + logSize);
|
|
-
|
|
|
|
|
|
+
|
|
vargs.add(conf.get(MRJobConfig.MR_AM_COMMAND_OPTS,
|
|
vargs.add(conf.get(MRJobConfig.MR_AM_COMMAND_OPTS,
|
|
MRJobConfig.DEFAULT_MR_AM_COMMAND_OPTS));
|
|
MRJobConfig.DEFAULT_MR_AM_COMMAND_OPTS));
|
|
|
|
|
|
vargs.add(MRJobConfig.APPLICATION_MASTER_CLASS);
|
|
vargs.add(MRJobConfig.APPLICATION_MASTER_CLASS);
|
|
- vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
|
|
|
|
|
|
+ vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
|
|
Path.SEPARATOR + ApplicationConstants.STDOUT);
|
|
Path.SEPARATOR + ApplicationConstants.STDOUT);
|
|
- vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
|
|
|
|
|
|
+ vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
|
|
Path.SEPARATOR + ApplicationConstants.STDERR);
|
|
Path.SEPARATOR + ApplicationConstants.STDERR);
|
|
|
|
|
|
|
|
|
|
@@ -349,12 +349,12 @@ public class YARNRunner implements ClientProtocol {
|
|
|
|
|
|
LOG.info("Command to launch container for ApplicationMaster is : "
|
|
LOG.info("Command to launch container for ApplicationMaster is : "
|
|
+ mergedCommand);
|
|
+ mergedCommand);
|
|
-
|
|
|
|
- // Setup the CLASSPATH in environment
|
|
|
|
|
|
+
|
|
|
|
+ // Setup the CLASSPATH in environment
|
|
// i.e. add { job jar, CWD, Hadoop jars} to classpath.
|
|
// i.e. add { job jar, CWD, Hadoop jars} to classpath.
|
|
Map<String, String> environment = new HashMap<String, String>();
|
|
Map<String, String> environment = new HashMap<String, String>();
|
|
MRApps.setClasspath(environment);
|
|
MRApps.setClasspath(environment);
|
|
-
|
|
|
|
|
|
+
|
|
// Parse distributed cache
|
|
// Parse distributed cache
|
|
MRApps.setupDistributedCache(jobConf, localResources);
|
|
MRApps.setupDistributedCache(jobConf, localResources);
|
|
|
|
|
|
@@ -374,12 +374,12 @@ public class YARNRunner implements ClientProtocol {
|
|
appContext.setUser( // User name
|
|
appContext.setUser( // User name
|
|
UserGroupInformation.getCurrentUser().getShortUserName());
|
|
UserGroupInformation.getCurrentUser().getShortUserName());
|
|
appContext.setQueue( // Queue name
|
|
appContext.setQueue( // Queue name
|
|
- jobConf.get(JobContext.QUEUE_NAME,
|
|
|
|
|
|
+ jobConf.get(JobContext.QUEUE_NAME,
|
|
YarnConfiguration.DEFAULT_QUEUE_NAME));
|
|
YarnConfiguration.DEFAULT_QUEUE_NAME));
|
|
appContext.setApplicationName( // Job name
|
|
appContext.setApplicationName( // Job name
|
|
- jobConf.get(JobContext.JOB_NAME,
|
|
|
|
- YarnConfiguration.DEFAULT_APPLICATION_NAME));
|
|
|
|
- appContext.setAMContainerSpec(amContainer); // AM Container
|
|
|
|
|
|
+ jobConf.get(JobContext.JOB_NAME,
|
|
|
|
+ YarnConfiguration.DEFAULT_APPLICATION_NAME));
|
|
|
|
+ appContext.setAMContainerSpec(amContainer); // AM Container
|
|
|
|
|
|
return appContext;
|
|
return appContext;
|
|
}
|
|
}
|
|
@@ -394,14 +394,14 @@ public class YARNRunner implements ClientProtocol {
|
|
public long getProtocolVersion(String arg0, long arg1) throws IOException {
|
|
public long getProtocolVersion(String arg0, long arg1) throws IOException {
|
|
return resMgrDelegate.getProtocolVersion(arg0, arg1);
|
|
return resMgrDelegate.getProtocolVersion(arg0, arg1);
|
|
}
|
|
}
|
|
-
|
|
|
|
|
|
+
|
|
@Override
|
|
@Override
|
|
public long renewDelegationToken(Token<DelegationTokenIdentifier> arg0)
|
|
public long renewDelegationToken(Token<DelegationTokenIdentifier> arg0)
|
|
throws IOException, InterruptedException {
|
|
throws IOException, InterruptedException {
|
|
return resMgrDelegate.renewDelegationToken(arg0);
|
|
return resMgrDelegate.renewDelegationToken(arg0);
|
|
}
|
|
}
|
|
|
|
|
|
-
|
|
|
|
|
|
+
|
|
@Override
|
|
@Override
|
|
public Counters getJobCounters(JobID arg0) throws IOException,
|
|
public Counters getJobCounters(JobID arg0) throws IOException,
|
|
InterruptedException {
|
|
InterruptedException {
|
|
@@ -419,7 +419,7 @@ public class YARNRunner implements ClientProtocol {
|
|
JobStatus status = clientCache.getClient(jobID).getJobStatus(jobID);
|
|
JobStatus status = clientCache.getClient(jobID).getJobStatus(jobID);
|
|
return status;
|
|
return status;
|
|
}
|
|
}
|
|
-
|
|
|
|
|
|
+
|
|
@Override
|
|
@Override
|
|
public TaskCompletionEvent[] getTaskCompletionEvents(JobID arg0, int arg1,
|
|
public TaskCompletionEvent[] getTaskCompletionEvents(JobID arg0, int arg1,
|
|
int arg2) throws IOException, InterruptedException {
|
|
int arg2) throws IOException, InterruptedException {
|
|
@@ -446,8 +446,8 @@ public class YARNRunner implements ClientProtocol {
|
|
if (status.getState() != JobStatus.State.RUNNING) {
|
|
if (status.getState() != JobStatus.State.RUNNING) {
|
|
resMgrDelegate.killApplication(TypeConverter.toYarn(arg0).getAppId());
|
|
resMgrDelegate.killApplication(TypeConverter.toYarn(arg0).getAppId());
|
|
return;
|
|
return;
|
|
- }
|
|
|
|
-
|
|
|
|
|
|
+ }
|
|
|
|
+
|
|
try {
|
|
try {
|
|
/* send a kill to the AM */
|
|
/* send a kill to the AM */
|
|
clientCache.getClient(arg0).killJob(arg0);
|
|
clientCache.getClient(arg0).killJob(arg0);
|