|
@@ -22,7 +22,6 @@ import java.io.BufferedReader;
|
|
|
import java.io.IOException;
|
|
|
import java.io.InputStream;
|
|
|
import java.io.InputStreamReader;
|
|
|
-import java.net.InetSocketAddress;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.HashMap;
|
|
|
import java.util.List;
|
|
@@ -44,20 +43,8 @@ import org.apache.hadoop.fs.FileSystem;
|
|
|
import org.apache.hadoop.fs.Path;
|
|
|
import org.apache.hadoop.yarn.api.ApplicationConstants;
|
|
|
import org.apache.hadoop.yarn.api.ClientRMProtocol;
|
|
|
-import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
|
|
|
-import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
|
|
|
-import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
|
|
|
-import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
|
|
|
-import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest;
|
|
|
-import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse;
|
|
|
-import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
|
|
|
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
|
|
|
-import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
|
|
|
-import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse;
|
|
|
-import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest;
|
|
|
-import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse;
|
|
|
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
|
|
|
-import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
|
|
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|
|
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
|
|
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
|
@@ -73,12 +60,12 @@ import org.apache.hadoop.yarn.api.records.QueueInfo;
|
|
|
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
|
|
|
import org.apache.hadoop.yarn.api.records.Resource;
|
|
|
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
|
|
+import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
|
|
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
|
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
|
|
|
-import org.apache.hadoop.yarn.ipc.YarnRPC;
|
|
|
import org.apache.hadoop.yarn.util.ConverterUtils;
|
|
|
import org.apache.hadoop.yarn.util.Records;
|
|
|
-
|
|
|
+import org.hadoop.yarn.client.YarnClientImpl;
|
|
|
|
|
|
/**
|
|
|
* Client for Distributed Shell application submission to YARN.
|
|
@@ -113,19 +100,13 @@ import org.apache.hadoop.yarn.util.Records;
|
|
|
*/
|
|
|
@InterfaceAudience.Public
|
|
|
@InterfaceStability.Unstable
|
|
|
-public class Client {
|
|
|
+public class Client extends YarnClientImpl {
|
|
|
|
|
|
private static final Log LOG = LogFactory.getLog(Client.class);
|
|
|
|
|
|
// Configuration
|
|
|
private Configuration conf;
|
|
|
|
|
|
- // RPC to communicate to RM
|
|
|
- private YarnRPC rpc;
|
|
|
-
|
|
|
- // Handle to talk to the Resource Manager/Applications Manager
|
|
|
- private ClientRMProtocol applicationsManager;
|
|
|
-
|
|
|
// Application master specific info to register a new Application with RM/ASM
|
|
|
private String appName = "";
|
|
|
// App master priority
|
|
@@ -196,9 +177,9 @@ public class Client {
|
|
|
/**
|
|
|
*/
|
|
|
public Client(Configuration conf) throws Exception {
|
|
|
- // Set up the configuration and RPC
|
|
|
+ super();
|
|
|
this.conf = conf;
|
|
|
- rpc = YarnRPC.create(conf);
|
|
|
+ init(conf);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -328,22 +309,17 @@ public class Client {
|
|
|
* @throws IOException
|
|
|
*/
|
|
|
public boolean run() throws IOException {
|
|
|
- LOG.info("Starting Client");
|
|
|
|
|
|
- // Connect to ResourceManager
|
|
|
- connectToASM();
|
|
|
- assert(applicationsManager != null);
|
|
|
+ LOG.info("Running Client");
|
|
|
+ start();
|
|
|
|
|
|
- // Use ClientRMProtocol handle to general cluster information
|
|
|
- GetClusterMetricsRequest clusterMetricsReq = Records.newRecord(GetClusterMetricsRequest.class);
|
|
|
- GetClusterMetricsResponse clusterMetricsResp = applicationsManager.getClusterMetrics(clusterMetricsReq);
|
|
|
+ YarnClusterMetrics clusterMetrics = super.getYarnClusterMetrics();
|
|
|
LOG.info("Got Cluster metric info from ASM"
|
|
|
- + ", numNodeManagers=" + clusterMetricsResp.getClusterMetrics().getNumNodeManagers());
|
|
|
+ + ", numNodeManagers=" + clusterMetrics.getNumNodeManagers());
|
|
|
|
|
|
- GetClusterNodesRequest clusterNodesReq = Records.newRecord(GetClusterNodesRequest.class);
|
|
|
- GetClusterNodesResponse clusterNodesResp = applicationsManager.getClusterNodes(clusterNodesReq);
|
|
|
+ List<NodeReport> clusterNodeReports = super.getNodeReports();
|
|
|
LOG.info("Got Cluster node info from ASM");
|
|
|
- for (NodeReport node : clusterNodesResp.getNodeReports()) {
|
|
|
+ for (NodeReport node : clusterNodeReports) {
|
|
|
LOG.info("Got node report from ASM for"
|
|
|
+ ", nodeId=" + node.getNodeId()
|
|
|
+ ", nodeAddress" + node.getHttpAddress()
|
|
@@ -352,10 +328,7 @@ public class Client {
|
|
|
+ ", nodeHealthStatus" + node.getNodeHealthStatus());
|
|
|
}
|
|
|
|
|
|
- GetQueueInfoRequest queueInfoReq = Records.newRecord(GetQueueInfoRequest.class);
|
|
|
- queueInfoReq.setQueueName(this.amQueue);
|
|
|
- GetQueueInfoResponse queueInfoResp = applicationsManager.getQueueInfo(queueInfoReq);
|
|
|
- QueueInfo queueInfo = queueInfoResp.getQueueInfo();
|
|
|
+ QueueInfo queueInfo = super.getQueueInfo(this.amQueue);
|
|
|
LOG.info("Queue info"
|
|
|
+ ", queueName=" + queueInfo.getQueueName()
|
|
|
+ ", queueCurrentCapacity=" + queueInfo.getCurrentCapacity()
|
|
@@ -363,9 +336,7 @@ public class Client {
|
|
|
+ ", queueApplicationCount=" + queueInfo.getApplications().size()
|
|
|
+ ", queueChildQueueCount=" + queueInfo.getChildQueues().size());
|
|
|
|
|
|
- GetQueueUserAclsInfoRequest queueUserAclsReq = Records.newRecord(GetQueueUserAclsInfoRequest.class);
|
|
|
- GetQueueUserAclsInfoResponse queueUserAclsResp = applicationsManager.getQueueUserAcls(queueUserAclsReq);
|
|
|
- List<QueueUserACLInfo> listAclInfo = queueUserAclsResp.getUserAclsInfoList();
|
|
|
+ List<QueueUserACLInfo> listAclInfo = super.getQueueAclsInfo();
|
|
|
for (QueueUserACLInfo aclInfo : listAclInfo) {
|
|
|
for (QueueACL userAcl : aclInfo.getUserAcls()) {
|
|
|
LOG.info("User ACL Info for Queue"
|
|
@@ -375,7 +346,7 @@ public class Client {
|
|
|
}
|
|
|
|
|
|
// Get a new application id
|
|
|
- GetNewApplicationResponse newApp = getApplication();
|
|
|
+ GetNewApplicationResponse newApp = super.getNewApplication();
|
|
|
ApplicationId appId = newApp.getApplicationId();
|
|
|
|
|
|
// TODO get min/max resource capabilities from RM and change memory ask if needed
|
|
@@ -590,16 +561,12 @@ public class Client {
|
|
|
// Set the queue to which this application is to be submitted in the RM
|
|
|
appContext.setQueue(amQueue);
|
|
|
|
|
|
- // Create the request to send to the applications manager
|
|
|
- SubmitApplicationRequest appRequest = Records.newRecord(SubmitApplicationRequest.class);
|
|
|
- appRequest.setApplicationSubmissionContext(appContext);
|
|
|
-
|
|
|
// Submit the application to the applications manager
|
|
|
// SubmitApplicationResponse submitResp = applicationsManager.submitApplication(appRequest);
|
|
|
// Ignore the response as either a valid response object is returned on success
|
|
|
// or an exception thrown to denote some form of a failure
|
|
|
LOG.info("Submitting application to ASM");
|
|
|
- applicationsManager.submitApplication(appRequest);
|
|
|
+ super.submitApplication(appContext);
|
|
|
|
|
|
// TODO
|
|
|
// Try submitting the same request again
|
|
@@ -629,10 +596,7 @@ public class Client {
|
|
|
}
|
|
|
|
|
|
// Get application report for the appId we are interested in
|
|
|
- GetApplicationReportRequest reportRequest = Records.newRecord(GetApplicationReportRequest.class);
|
|
|
- reportRequest.setApplicationId(appId);
|
|
|
- GetApplicationReportResponse reportResponse = applicationsManager.getApplicationReport(reportRequest);
|
|
|
- ApplicationReport report = reportResponse.getApplicationReport();
|
|
|
+ ApplicationReport report = super.getApplicationReport(appId);
|
|
|
|
|
|
LOG.info("Got application report from ASM for"
|
|
|
+ ", appId=" + appId.getId()
|
|
@@ -671,7 +635,7 @@ public class Client {
|
|
|
|
|
|
if (System.currentTimeMillis() > (clientStartTime + clientTimeout)) {
|
|
|
LOG.info("Reached client specified timeout for application. Killing application");
|
|
|
- killApplication(appId);
|
|
|
+ forceKillApplication(appId);
|
|
|
return false;
|
|
|
}
|
|
|
}
|
|
@@ -683,61 +647,14 @@ public class Client {
|
|
|
* @param appId Application Id to be killed.
|
|
|
* @throws YarnRemoteException
|
|
|
*/
|
|
|
- private void killApplication(ApplicationId appId) throws YarnRemoteException {
|
|
|
- KillApplicationRequest request = Records.newRecord(KillApplicationRequest.class);
|
|
|
+ private void forceKillApplication(ApplicationId appId) throws YarnRemoteException {
|
|
|
// TODO clarify whether multiple jobs with the same app id can be submitted and be running at
|
|
|
// the same time.
|
|
|
// If yes, can we kill a particular attempt only?
|
|
|
- request.setApplicationId(appId);
|
|
|
- // KillApplicationResponse response = applicationsManager.forceKillApplication(request);
|
|
|
+
|
|
|
// Response can be ignored as it is non-null on success or
|
|
|
// throws an exception in case of failures
|
|
|
- applicationsManager.forceKillApplication(request);
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Connect to the Resource Manager/Applications Manager
|
|
|
- * @return Handle to communicate with the ASM
|
|
|
- * @throws IOException
|
|
|
- */
|
|
|
- private void connectToASM() throws IOException {
|
|
|
-
|
|
|
- /*
|
|
|
- UserGroupInformation user = UserGroupInformation.getCurrentUser();
|
|
|
- applicationsManager = user.doAs(new PrivilegedAction<ClientRMProtocol>() {
|
|
|
- public ClientRMProtocol run() {
|
|
|
- InetSocketAddress rmAddress = NetUtils.createSocketAddr(conf.get(
|
|
|
- YarnConfiguration.RM_SCHEDULER_ADDRESS,
|
|
|
- YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS));
|
|
|
- LOG.info("Connecting to ResourceManager at " + rmAddress);
|
|
|
- Configuration appsManagerServerConf = new Configuration(conf);
|
|
|
- appsManagerServerConf.setClass(YarnConfiguration.YARN_SECURITY_INFO,
|
|
|
- ClientRMSecurityInfo.class, SecurityInfo.class);
|
|
|
- ClientRMProtocol asm = ((ClientRMProtocol) rpc.getProxy(ClientRMProtocol.class, rmAddress, appsManagerServerConf));
|
|
|
- return asm;
|
|
|
- }
|
|
|
- });
|
|
|
- */
|
|
|
- YarnConfiguration yarnConf = new YarnConfiguration(conf);
|
|
|
- InetSocketAddress rmAddress = yarnConf.getSocketAddr(
|
|
|
- YarnConfiguration.RM_ADDRESS,
|
|
|
- YarnConfiguration.DEFAULT_RM_ADDRESS,
|
|
|
- YarnConfiguration.DEFAULT_RM_PORT);
|
|
|
- LOG.info("Connecting to ResourceManager at " + rmAddress);
|
|
|
- applicationsManager = ((ClientRMProtocol) rpc.getProxy(
|
|
|
- ClientRMProtocol.class, rmAddress, conf));
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Get a new application from the ASM
|
|
|
- * @return New Application
|
|
|
- * @throws YarnRemoteException
|
|
|
- */
|
|
|
- private GetNewApplicationResponse getApplication() throws YarnRemoteException {
|
|
|
- GetNewApplicationRequest request = Records.newRecord(GetNewApplicationRequest.class);
|
|
|
- GetNewApplicationResponse response = applicationsManager.getNewApplication(request);
|
|
|
- LOG.info("Got new application id=" + response.getApplicationId());
|
|
|
- return response;
|
|
|
+ super.killApplication(appId);
|
|
|
}
|
|
|
|
|
|
private static String getTestRuntimeClasspath() {
|