|
@@ -39,7 +39,7 @@ import org.apache.hadoop.fs.UnsupportedFileSystemException;
|
|
import org.apache.hadoop.io.DataOutputBuffer;
|
|
import org.apache.hadoop.io.DataOutputBuffer;
|
|
import org.apache.hadoop.io.Text;
|
|
import org.apache.hadoop.io.Text;
|
|
import org.apache.hadoop.ipc.ProtocolSignature;
|
|
import org.apache.hadoop.ipc.ProtocolSignature;
|
|
-import org.apache.hadoop.ipc.VersionedProtocol;
|
|
|
|
|
|
+import org.apache.hadoop.mapreduce.Cluster.JobTrackerStatus;
|
|
import org.apache.hadoop.mapreduce.ClusterMetrics;
|
|
import org.apache.hadoop.mapreduce.ClusterMetrics;
|
|
import org.apache.hadoop.mapreduce.Counters;
|
|
import org.apache.hadoop.mapreduce.Counters;
|
|
import org.apache.hadoop.mapreduce.JobContext;
|
|
import org.apache.hadoop.mapreduce.JobContext;
|
|
@@ -55,7 +55,6 @@ import org.apache.hadoop.mapreduce.TaskReport;
|
|
import org.apache.hadoop.mapreduce.TaskTrackerInfo;
|
|
import org.apache.hadoop.mapreduce.TaskTrackerInfo;
|
|
import org.apache.hadoop.mapreduce.TaskType;
|
|
import org.apache.hadoop.mapreduce.TaskType;
|
|
import org.apache.hadoop.mapreduce.TypeConverter;
|
|
import org.apache.hadoop.mapreduce.TypeConverter;
|
|
-import org.apache.hadoop.mapreduce.Cluster.JobTrackerStatus;
|
|
|
|
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
|
|
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
|
|
import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
|
|
import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
|
|
import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
|
|
import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
|
|
@@ -81,7 +80,6 @@ import org.apache.hadoop.yarn.api.records.URL;
|
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
import org.apache.hadoop.yarn.factories.RecordFactory;
|
|
import org.apache.hadoop.yarn.factories.RecordFactory;
|
|
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
|
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
|
-import org.apache.hadoop.yarn.ipc.RPCUtil;
|
|
|
|
import org.apache.hadoop.yarn.util.ConverterUtils;
|
|
import org.apache.hadoop.yarn.util.ConverterUtils;
|
|
|
|
|
|
|
|
|
|
@@ -104,9 +102,19 @@ public class YARNRunner implements ClientProtocol {
|
|
* @param conf the configuration object for the client
|
|
* @param conf the configuration object for the client
|
|
*/
|
|
*/
|
|
public YARNRunner(Configuration conf) {
|
|
public YARNRunner(Configuration conf) {
|
|
|
|
+ this(conf, new ResourceMgrDelegate(conf));
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Similar to {@link #YARNRunner(Configuration)} but allowing injecting
|
|
|
|
+ * {@link ResourceMgrDelegate}. Enables mocking and testing.
|
|
|
|
+ * @param conf the configuration object for the client
|
|
|
|
+ * @param resMgrDelegate the resourcemanager client handle.
|
|
|
|
+ */
|
|
|
|
+ public YARNRunner(Configuration conf, ResourceMgrDelegate resMgrDelegate) {
|
|
this.conf = new YarnConfiguration(conf);
|
|
this.conf = new YarnConfiguration(conf);
|
|
try {
|
|
try {
|
|
- this.resMgrDelegate = new ResourceMgrDelegate(this.conf);
|
|
|
|
|
|
+ this.resMgrDelegate = resMgrDelegate;
|
|
this.clientCache = new ClientCache(this.conf,
|
|
this.clientCache = new ClientCache(this.conf,
|
|
resMgrDelegate);
|
|
resMgrDelegate);
|
|
this.defaultFileContext = FileContext.getFileContext(this.conf);
|
|
this.defaultFileContext = FileContext.getFileContext(this.conf);
|
|
@@ -114,7 +122,7 @@ public class YARNRunner implements ClientProtocol {
|
|
throw new RuntimeException("Error in instantiating YarnClient", ufe);
|
|
throw new RuntimeException("Error in instantiating YarnClient", ufe);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
-
|
|
|
|
|
|
+
|
|
@Override
|
|
@Override
|
|
public void cancelDelegationToken(Token<DelegationTokenIdentifier> arg0)
|
|
public void cancelDelegationToken(Token<DelegationTokenIdentifier> arg0)
|
|
throws IOException, InterruptedException {
|
|
throws IOException, InterruptedException {
|
|
@@ -242,7 +250,8 @@ public class YARNRunner implements ClientProtocol {
|
|
.getApplicationReport(applicationId);
|
|
.getApplicationReport(applicationId);
|
|
if (appMaster == null || appMaster.getState() == ApplicationState.FAILED
|
|
if (appMaster == null || appMaster.getState() == ApplicationState.FAILED
|
|
|| appMaster.getState() == ApplicationState.KILLED) {
|
|
|| appMaster.getState() == ApplicationState.KILLED) {
|
|
- throw RPCUtil.getRemoteException("failed to run job");
|
|
|
|
|
|
+ throw new IOException("Failed to run job : " +
|
|
|
|
+ appMaster.getDiagnostics());
|
|
}
|
|
}
|
|
return clientCache.getClient(jobId).getJobStatus(jobId);
|
|
return clientCache.getClient(jobId).getJobStatus(jobId);
|
|
}
|
|
}
|
|
@@ -260,7 +269,7 @@ public class YARNRunner implements ClientProtocol {
|
|
return rsrc;
|
|
return rsrc;
|
|
}
|
|
}
|
|
|
|
|
|
- private ApplicationSubmissionContext createApplicationSubmissionContext(
|
|
|
|
|
|
+ public ApplicationSubmissionContext createApplicationSubmissionContext(
|
|
Configuration jobConf,
|
|
Configuration jobConf,
|
|
String jobSubmitDir, Credentials ts) throws IOException {
|
|
String jobSubmitDir, Credentials ts) throws IOException {
|
|
ApplicationSubmissionContext appContext =
|
|
ApplicationSubmissionContext appContext =
|