Forráskód Böngészése

MAPREDUCE-4580. Change MapReduce to use the yarn-client module. (Contributed by Vinod Kumar Vavilapalli)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1377922 13f79535-47bb-0310-9956-ffa450edef68
Siddharth Seth 12 éve
szülő
commit
24e47ebc18

+ 3 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -234,6 +234,9 @@ Release 2.1.0-alpha - Unreleased
     MAPREDUCE-3289. Make use of fadvise in the NM's shuffle handler.
     (Todd Lipcon and Siddharth Seth via sseth)
 
+    MAPREDUCE-4580. Change MapReduce to use the yarn-client module.
+    (Vinod Kumar Vavilapalli via sseth)
+
   OPTIMIZATIONS
 
   BUG FIXES

+ 4 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/pom.xml

@@ -37,6 +37,10 @@
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-yarn-common</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-client</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-mapreduce-client-core</artifactId>

+ 23 - 189
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java

@@ -19,9 +19,6 @@
 package org.apache.hadoop.mapred;
 
 import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -41,75 +38,29 @@ import org.apache.hadoop.mapreduce.v2.util.MRApps;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.yarn.api.ClientRMProtocol;
-import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ApplicationReport;
-import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
-import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
-import org.apache.hadoop.yarn.api.records.DelegationToken;
 import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
-import org.apache.hadoop.yarn.factories.RecordFactory;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.util.ProtoUtils;
+import org.hadoop.yarn.client.YarnClientImpl;
 
-
-// TODO: This should be part of something like yarn-client.
-public class ResourceMgrDelegate {
+public class ResourceMgrDelegate extends YarnClientImpl {
   private static final Log LOG = LogFactory.getLog(ResourceMgrDelegate.class);
       
-  private final InetSocketAddress rmAddress;
   private YarnConfiguration conf;
-  ClientRMProtocol applicationsManager;
+  private GetNewApplicationResponse application;
   private ApplicationId applicationId;
-  private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
 
   /**
    * Delegate responsible for communicating with the Resource Manager's {@link ClientRMProtocol}.
    * @param conf the configuration object.
    */
   public ResourceMgrDelegate(YarnConfiguration conf) {
+    super();
     this.conf = conf;
-    YarnRPC rpc = YarnRPC.create(this.conf);
-    this.rmAddress = getRmAddress(conf);
-    LOG.debug("Connecting to ResourceManager at " + rmAddress);
-    applicationsManager =
-        (ClientRMProtocol) rpc.getProxy(ClientRMProtocol.class,
-            rmAddress, this.conf);
-    LOG.debug("Connected to ResourceManager at " + rmAddress);
-  }
-  
-  /**
-   * Used for injecting applicationsManager, mostly for testing.
-   * @param conf the configuration object
-   * @param applicationsManager the handle to talk the resource managers 
-   *                            {@link ClientRMProtocol}.
-   */
-  public ResourceMgrDelegate(YarnConfiguration conf, 
-      ClientRMProtocol applicationsManager) {
-    this.conf = conf;
-    this.applicationsManager = applicationsManager;
-    this.rmAddress = getRmAddress(conf);
-  }
-  
-  private static InetSocketAddress getRmAddress(YarnConfiguration conf) {
-    return conf.getSocketAddr(YarnConfiguration.RM_ADDRESS,
-                              YarnConfiguration.DEFAULT_RM_ADDRESS,
-                              YarnConfiguration.DEFAULT_RM_PORT);
+    init(conf);
+    start();
   }
   
   public void cancelDelegationToken(Token<DelegationTokenIdentifier> arg0)
@@ -117,26 +68,15 @@ public class ResourceMgrDelegate {
     return;
   }
 
-
   public TaskTrackerInfo[] getActiveTrackers() throws IOException,
       InterruptedException {
-    GetClusterNodesRequest request = 
-      recordFactory.newRecordInstance(GetClusterNodesRequest.class);
-    GetClusterNodesResponse response = 
-      applicationsManager.getClusterNodes(request);
-    return TypeConverter.fromYarnNodes(response.getNodeReports());
+    return TypeConverter.fromYarnNodes(super.getNodeReports());
   }
 
-
   public JobStatus[] getAllJobs() throws IOException, InterruptedException {
-    GetAllApplicationsRequest request =
-      recordFactory.newRecordInstance(GetAllApplicationsRequest.class);
-    GetAllApplicationsResponse response = 
-      applicationsManager.getAllApplications(request);
-    return TypeConverter.fromYarnApps(response.getApplicationList(), this.conf);
+    return TypeConverter.fromYarnApps(super.getApplicationList(), this.conf);
   }
 
-
   public TaskTrackerInfo[] getBlacklistedTrackers() throws IOException,
       InterruptedException {
     // TODO: Implement getBlacklistedTrackers
@@ -144,128 +84,56 @@ public class ResourceMgrDelegate {
     return new TaskTrackerInfo[0];
   }
 
-
   public ClusterMetrics getClusterMetrics() throws IOException,
       InterruptedException {
-    GetClusterMetricsRequest request = recordFactory.newRecordInstance(GetClusterMetricsRequest.class);
-    GetClusterMetricsResponse response = applicationsManager.getClusterMetrics(request);
-    YarnClusterMetrics metrics = response.getClusterMetrics();
+    YarnClusterMetrics metrics = super.getYarnClusterMetrics();
     ClusterMetrics oldMetrics = new ClusterMetrics(1, 1, 1, 1, 1, 1, 
         metrics.getNumNodeManagers() * 10, metrics.getNumNodeManagers() * 2, 1,
         metrics.getNumNodeManagers(), 0, 0);
     return oldMetrics;
   }
 
-
   @SuppressWarnings("rawtypes")
-  public Token getDelegationToken(Text renewer)
-      throws IOException, InterruptedException {
-    /* get the token from RM */
-    org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest 
-    rmDTRequest = recordFactory.newRecordInstance(
-        org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest.class);
-    rmDTRequest.setRenewer(renewer.toString());
-    org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse 
-      response = applicationsManager.getDelegationToken(rmDTRequest);
-    DelegationToken yarnToken = response.getRMDelegationToken();
-    return ProtoUtils.convertFromProtoFormat(yarnToken, rmAddress);
+  public Token getDelegationToken(Text renewer) throws IOException,
+      InterruptedException {
+    return ProtoUtils.convertFromProtoFormat(
+      super.getRMDelegationToken(renewer), rmAddress);
   }
 
-
   public String getFilesystemName() throws IOException, InterruptedException {
     return FileSystem.get(conf).getUri().toString();
   }
 
   public JobID getNewJobID() throws IOException, InterruptedException {
-    GetNewApplicationRequest request = recordFactory.newRecordInstance(GetNewApplicationRequest.class);
-    applicationId = applicationsManager.getNewApplication(request).getApplicationId();
+    this.application = super.getNewApplication();
+    this.applicationId = this.application.getApplicationId();
     return TypeConverter.fromYarn(applicationId);
   }
 
-  private static final String ROOT = "root";
-
-  private GetQueueInfoRequest getQueueInfoRequest(String queueName, 
-      boolean includeApplications, boolean includeChildQueues, boolean recursive) {
-    GetQueueInfoRequest request = 
-      recordFactory.newRecordInstance(GetQueueInfoRequest.class);
-    request.setQueueName(queueName);
-    request.setIncludeApplications(includeApplications);
-    request.setIncludeChildQueues(includeChildQueues);
-    request.setRecursive(recursive);
-    return request;
-    
-  }
-  
   public QueueInfo getQueue(String queueName) throws IOException,
   InterruptedException {
-    GetQueueInfoRequest request = 
-      getQueueInfoRequest(queueName, true, false, false); 
-      recordFactory.newRecordInstance(GetQueueInfoRequest.class);
     return TypeConverter.fromYarn(
-        applicationsManager.getQueueInfo(request).getQueueInfo(), this.conf);
-  }
-  
-  private void getChildQueues(org.apache.hadoop.yarn.api.records.QueueInfo parent, 
-      List<org.apache.hadoop.yarn.api.records.QueueInfo> queues,
-      boolean recursive) {
-    List<org.apache.hadoop.yarn.api.records.QueueInfo> childQueues = 
-      parent.getChildQueues();
-
-    for (org.apache.hadoop.yarn.api.records.QueueInfo child : childQueues) {
-      queues.add(child);
-      if(recursive) {
-        getChildQueues(child, queues, recursive);
-      }
-    }
+        super.getQueueInfo(queueName), this.conf);
   }
 
-
   public QueueAclsInfo[] getQueueAclsForCurrentUser() throws IOException,
       InterruptedException {
-    GetQueueUserAclsInfoRequest request = 
-      recordFactory.newRecordInstance(GetQueueUserAclsInfoRequest.class);
-    List<QueueUserACLInfo> userAcls = 
-      applicationsManager.getQueueUserAcls(request).getUserAclsInfoList();
-    return TypeConverter.fromYarnQueueUserAclsInfo(userAcls);
+    return TypeConverter.fromYarnQueueUserAclsInfo(super
+      .getQueueAclsInfo());
   }
 
-
   public QueueInfo[] getQueues() throws IOException, InterruptedException {
-    List<org.apache.hadoop.yarn.api.records.QueueInfo> queues = 
-      new ArrayList<org.apache.hadoop.yarn.api.records.QueueInfo>();
-
-    org.apache.hadoop.yarn.api.records.QueueInfo rootQueue = 
-      applicationsManager.getQueueInfo(
-          getQueueInfoRequest(ROOT, false, true, true)).getQueueInfo();
-    getChildQueues(rootQueue, queues, true);
-
-    return TypeConverter.fromYarnQueueInfo(queues, this.conf);
+    return TypeConverter.fromYarnQueueInfo(super.getAllQueues(), this.conf);
   }
 
-
   public QueueInfo[] getRootQueues() throws IOException, InterruptedException {
-    List<org.apache.hadoop.yarn.api.records.QueueInfo> queues = 
-      new ArrayList<org.apache.hadoop.yarn.api.records.QueueInfo>();
-
-    org.apache.hadoop.yarn.api.records.QueueInfo rootQueue = 
-      applicationsManager.getQueueInfo(
-          getQueueInfoRequest(ROOT, false, true, true)).getQueueInfo();
-    getChildQueues(rootQueue, queues, false);
-
-    return TypeConverter.fromYarnQueueInfo(queues, this.conf);
+    return TypeConverter.fromYarnQueueInfo(super.getRootQueueInfos(), this.conf);
   }
 
   public QueueInfo[] getChildQueues(String parent) throws IOException,
       InterruptedException {
-      List<org.apache.hadoop.yarn.api.records.QueueInfo> queues = 
-          new ArrayList<org.apache.hadoop.yarn.api.records.QueueInfo>();
-        
-        org.apache.hadoop.yarn.api.records.QueueInfo parentQueue = 
-          applicationsManager.getQueueInfo(
-              getQueueInfoRequest(parent, false, true, false)).getQueueInfo();
-        getChildQueues(parentQueue, queues, true);
-        
-        return TypeConverter.fromYarnQueueInfo(queues, this.conf);
+    return TypeConverter.fromYarnQueueInfo(super.getChildQueueInfos(parent),
+      this.conf);
   }
 
   public String getStagingAreaDir() throws IOException, InterruptedException {
@@ -307,40 +175,6 @@ public class ResourceMgrDelegate {
     return 0;
   }
   
-  
-  public ApplicationId submitApplication(
-      ApplicationSubmissionContext appContext) 
-  throws IOException {
-    appContext.setApplicationId(applicationId);
-    SubmitApplicationRequest request = 
-        recordFactory.newRecordInstance(SubmitApplicationRequest.class);
-    request.setApplicationSubmissionContext(appContext);
-    applicationsManager.submitApplication(request);
-    LOG.info("Submitted application " + applicationId + " to ResourceManager" +
-    		" at " + rmAddress);
-    return applicationId;
-  }
-  
-  public void killApplication(ApplicationId applicationId) throws IOException {
-    KillApplicationRequest request = 
-        recordFactory.newRecordInstance(KillApplicationRequest.class);
-    request.setApplicationId(applicationId);
-    applicationsManager.forceKillApplication(request);
-    LOG.info("Killing application " + applicationId);
-  }
-
-
-  public ApplicationReport getApplicationReport(ApplicationId appId)
-      throws YarnRemoteException {
-    GetApplicationReportRequest request = recordFactory
-        .newRecordInstance(GetApplicationReportRequest.class);
-    request.setApplicationId(appId);
-    GetApplicationReportResponse response = applicationsManager
-        .getApplicationReport(request);
-    ApplicationReport applicationReport = response.getApplicationReport();
-    return applicationReport;
-  }
-
   public ApplicationId getApplicationId() {
     return applicationId;
   }

+ 1 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java

@@ -89,7 +89,7 @@ import org.apache.hadoop.yarn.util.ProtoUtils;
 /**
  * This class enables the current JobClient (0.22 hadoop) to run on YARN.
  */
-@SuppressWarnings({ "rawtypes", "unchecked", "deprecation" })
+@SuppressWarnings({ "rawtypes", "unchecked" })
 public class YARNRunner implements ClientProtocol {
 
   private static final Log LOG = LogFactory.getLog(YARNRunner.class);

+ 15 - 5
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestResourceMgrDelegate.java

@@ -50,7 +50,7 @@ public class TestResourceMgrDelegate {
    */
   @Test
   public void testGetRootQueues() throws IOException, InterruptedException {
-    ClientRMProtocol applicationsManager = Mockito.mock(ClientRMProtocol.class);
+    final ClientRMProtocol applicationsManager = Mockito.mock(ClientRMProtocol.class);
     GetQueueInfoResponse response = Mockito.mock(GetQueueInfoResponse.class);
     org.apache.hadoop.yarn.api.records.QueueInfo queueInfo =
       Mockito.mock(org.apache.hadoop.yarn.api.records.QueueInfo.class);
@@ -59,12 +59,17 @@ public class TestResourceMgrDelegate {
       GetQueueInfoRequest.class))).thenReturn(response);
 
     ResourceMgrDelegate delegate = new ResourceMgrDelegate(
-      new YarnConfiguration(), applicationsManager);
+      new YarnConfiguration()) {
+      @Override
+      public synchronized void start() {
+        this.rmClient = applicationsManager;
+      }
+    };
     delegate.getRootQueues();
 
     ArgumentCaptor<GetQueueInfoRequest> argument =
       ArgumentCaptor.forClass(GetQueueInfoRequest.class);
-    Mockito.verify(delegate.applicationsManager).getQueueInfo(
+    Mockito.verify(applicationsManager).getQueueInfo(
       argument.capture());
 
     Assert.assertTrue("Children of root queue not requested",
@@ -75,7 +80,7 @@ public class TestResourceMgrDelegate {
 
   @Test
   public void tesAllJobs() throws Exception {
-    ClientRMProtocol applicationsManager = Mockito.mock(ClientRMProtocol.class);
+    final ClientRMProtocol applicationsManager = Mockito.mock(ClientRMProtocol.class);
     GetAllApplicationsResponse allApplicationsResponse = Records
         .newRecord(GetAllApplicationsResponse.class);
     List<ApplicationReport> applications = new ArrayList<ApplicationReport>();
@@ -93,7 +98,12 @@ public class TestResourceMgrDelegate {
             .any(GetAllApplicationsRequest.class))).thenReturn(
         allApplicationsResponse);
     ResourceMgrDelegate resourceMgrDelegate = new ResourceMgrDelegate(
-        new YarnConfiguration(), applicationsManager);
+      new YarnConfiguration()) {
+      @Override
+      public synchronized void start() {
+        this.rmClient = applicationsManager;
+      }
+    };
     JobStatus[] allJobs = resourceMgrDelegate.getAllJobs();
 
     Assert.assertEquals(State.FAILED, allJobs[0].getState());

+ 11 - 4
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestYarnClientProtocolProvider.java

@@ -18,9 +18,10 @@
 
 package org.apache.hadoop.mapreduce;
 
+import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.when;
+
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
@@ -104,14 +105,20 @@ public class TestYarnClientProtocolProvider extends TestCase {
       rmDTToken.setPassword(ByteBuffer.wrap("testcluster".getBytes()));
       rmDTToken.setService("0.0.0.0:8032");
       getDTResponse.setRMDelegationToken(rmDTToken);
-      ClientRMProtocol cRMProtocol = mock(ClientRMProtocol.class);
+      final ClientRMProtocol cRMProtocol = mock(ClientRMProtocol.class);
       when(cRMProtocol.getDelegationToken(any(
           GetDelegationTokenRequest.class))).thenReturn(getDTResponse);
       ResourceMgrDelegate rmgrDelegate = new ResourceMgrDelegate(
-          new YarnConfiguration(conf), cRMProtocol);
+          new YarnConfiguration(conf)) {
+        @Override
+        public synchronized void start() {
+          this.rmClient = cRMProtocol;
+        }
+      };
       yrunner.setResourceMgrDelegate(rmgrDelegate);
       Token t = cluster.getDelegationToken(new Text(" "));
-      assertTrue("Testclusterkind".equals(t.getKind().toString()));
+      assertTrue("Token kind is instead " + t.getKind().toString(),
+        "Testclusterkind".equals(t.getKind().toString()));
     } finally {
       if (cluster != null) {
         cluster.close();

+ 7 - 2
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestYARNRunner.java

@@ -177,8 +177,13 @@ public class TestYARNRunner extends TestCase {
   @Test
   public void testResourceMgrDelegate() throws Exception {
     /* we not want a mock of resourcemgr deleagte */
-    ClientRMProtocol clientRMProtocol = mock(ClientRMProtocol.class);
-    ResourceMgrDelegate delegate = new ResourceMgrDelegate(conf, clientRMProtocol);
+    final ClientRMProtocol clientRMProtocol = mock(ClientRMProtocol.class);
+    ResourceMgrDelegate delegate = new ResourceMgrDelegate(conf) {
+      @Override
+      public synchronized void start() {
+        this.rmClient = clientRMProtocol;
+      }
+    };
     /* make sure kill calls finish application master */
     when(clientRMProtocol.forceKillApplication(any(KillApplicationRequest.class)))
     .thenReturn(null);