Просмотр исходного кода

MAPREDUCE-4419. ./mapred queue -info <queuename> -showJobs displays all the jobs irrespective of <queuename> (Devaraj K via bobby)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1361389 13f79535-47bb-0310-9956-ffa450edef68
Robert Joseph Evans 13 лет назад
Родитель
Сommit
228736ab51

+ 3 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -682,6 +682,9 @@ Release 0.23.3 - UNRELEASED
     MAPREDUCE-3940. ContainerTokens should have an expiry interval. (Siddharth
     Seth and Vinod Kumar Vavilapalli via vinodkv)
 
+    MAPREDUCE-4419. ./mapred queue -info <queuename> -showJobs displays all
+    the jobs irrespective of <queuename> (Devaraj K via bobby)
+
 Release 0.23.2 - UNRELEASED
 
   INCOMPATIBLE CHANGES

+ 2 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobQueueClient.java

@@ -25,6 +25,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 
 import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.mapreduce.JobStatus;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
@@ -184,7 +185,7 @@ class JobQueueClient extends Configured implements Tool {
     printJobQueueInfo(jobQueueInfo, new PrintWriter(System.out));
     if (showJobs && (jobQueueInfo.getChildren() == null ||
         jobQueueInfo.getChildren().size() == 0)) {
-      JobStatus[] jobs = jc.getJobsFromQueue(queue);
+      JobStatus[] jobs = jobQueueInfo.getJobStatuses();
       if (jobs == null)
         jobs = new JobStatus[0];
       jc.displayJobList(jobs);

+ 1 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobStatus.java

@@ -238,7 +238,7 @@ public class JobStatus extends org.apache.hadoop.mapreduce.JobStatus {
       stat.getSetupProgress(), stat.getMapProgress(), stat.getReduceProgress(),
       stat.getCleanupProgress(), stat.getState().getValue(), 
       JobPriority.valueOf(stat.getPriority().name()),
-      stat.getUsername(), stat.getJobName(), stat.getJobFile(),
+      stat.getUsername(), stat.getJobName(), stat.getQueue(), stat.getJobFile(),
       stat.getTrackingUrl(), stat.isUber());
     old.setStartTime(stat.getStartTime());
     old.setFinishTime(stat.getFinishTime());

+ 3 - 1
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java

@@ -389,7 +389,9 @@ public class ClientRMService extends AbstractService implements
         appReports = new ArrayList<ApplicationReport>(
             apps.size());
         for (RMApp app : apps) {
-          appReports.add(app.createAndGetApplicationReport(true));
+          if (app.getQueue().equals(queueInfo.getQueueName())) {
+            appReports.add(app.createAndGetApplicationReport(true));
+          }
         }
       }
       queueInfo.setApplications(appReports);

+ 76 - 0
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java

@@ -20,7 +20,10 @@ package org.apache.hadoop.yarn.server.resourcemanager;
 
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyString;
 
+import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
@@ -34,13 +37,21 @@ import org.apache.hadoop.yarn.api.ClientRMProtocol;
 import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.QueueInfo;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
 import org.apache.hadoop.yarn.util.Records;
 import org.junit.Test;
 
@@ -49,6 +60,9 @@ public class TestClientRMService {
 
   private static final Log LOG = LogFactory.getLog(TestClientRMService.class);
 
+  private RecordFactory recordFactory = RecordFactoryProvider
+      .getRecordFactory(null);
+
   @Test
   public void testGetClusterNodes() throws Exception {
     MockRM rm = new MockRM() {
@@ -109,4 +123,66 @@ public class TestClientRMService {
     Assert.assertNull("It should return null as application report for absent application.",
         applicationReport.getApplicationReport());
   }
+  
+  @Test
+  public void testGetQueueInfo() throws Exception {
+    YarnScheduler yarnScheduler = mock(YarnScheduler.class);
+    RMContext rmContext = mock(RMContext.class);
+    mockRMContext(yarnScheduler, rmContext);
+    ClientRMService rmService = new ClientRMService(rmContext, yarnScheduler,
+        null, null, null);
+    GetQueueInfoRequest request = recordFactory
+        .newRecordInstance(GetQueueInfoRequest.class);
+    request.setQueueName("testqueue");
+    request.setIncludeApplications(true);
+    GetQueueInfoResponse queueInfo = rmService.getQueueInfo(request);
+    List<ApplicationReport> applications = queueInfo.getQueueInfo()
+        .getApplications();
+    Assert.assertEquals(2, applications.size());
+  }
+
+  private void mockRMContext(YarnScheduler yarnScheduler, RMContext rmContext)
+      throws IOException {
+    Dispatcher dispatcher = mock(Dispatcher.class);
+    when(rmContext.getDispatcher()).thenReturn(dispatcher);
+    QueueInfo queInfo = recordFactory.newRecordInstance(QueueInfo.class);
+    queInfo.setQueueName("testqueue");
+    when(yarnScheduler.getQueueInfo(anyString(), anyBoolean(), anyBoolean()))
+        .thenReturn(queInfo);
+    ConcurrentHashMap<ApplicationId, RMApp> apps = getRMApps(rmContext,
+        yarnScheduler);
+    when(rmContext.getRMApps()).thenReturn(apps);
+  }
+
+  private ConcurrentHashMap<ApplicationId, RMApp> getRMApps(
+      RMContext rmContext, YarnScheduler yarnScheduler) {
+    ConcurrentHashMap<ApplicationId, RMApp> apps = 
+      new ConcurrentHashMap<ApplicationId, RMApp>();
+    ApplicationId applicationId1 = getApplicationId(1);
+    ApplicationId applicationId2 = getApplicationId(2);
+    ApplicationId applicationId3 = getApplicationId(3);
+    YarnConfiguration config = new YarnConfiguration();
+    apps.put(applicationId1, getRMApp(rmContext, yarnScheduler, applicationId1,
+        config, "testqueue"));
+    apps.put(applicationId2, getRMApp(rmContext, yarnScheduler, applicationId2,
+        config, "a"));
+    apps.put(applicationId3, getRMApp(rmContext, yarnScheduler, applicationId3,
+        config, "testqueue"));
+    return apps;
+  }
+
+  private ApplicationId getApplicationId(int id) {
+    ApplicationId applicationId = recordFactory
+        .newRecordInstance(ApplicationId.class);
+    applicationId.setClusterTimestamp(123456);
+    applicationId.setId(id);
+    return applicationId;
+  }
+
+  private RMAppImpl getRMApp(RMContext rmContext, YarnScheduler yarnScheduler,
+      ApplicationId applicationId3, YarnConfiguration config, String queueName) {
+    return new RMAppImpl(applicationId3, rmContext, config, null, null,
+        queueName, null, null, null, yarnScheduler, null, System
+            .currentTimeMillis());
+  }
 }