浏览代码

MAPREDUCE-3720. Changed bin/mapred job -list to not print job-specific information not available at RM. Contributed by Vinod K V.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1236588 13f79535-47bb-0310-9956-ffa450edef68
Arun Murthy 13 年之前
父节点
当前提交
0cb3dd913b

+ 3 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -591,6 +591,9 @@ Release 0.23.1 - Unreleased
     MAPREDUCE-3735. Add distcp jar to the distribution (tar).
     (mahadev)
 
+    MAPREDUCE-3720. Changed bin/mapred job -list to not print job-specific
+    information not available at RM. (vinodkv via acmurthy) 
+
 Release 0.23.0 - 2011-11-01 
 
   INCOMPATIBLE CHANGES

+ 6 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java

@@ -455,10 +455,14 @@ public class Job extends JobContextImpl implements JobContext {
   public String toString() {
     ensureState(JobState.RUNNING);
     String reasonforFailure = " ";
+    int numMaps = 0;
+    int numReduces = 0;
     try {
       updateStatus();
       if (status.getState().equals(JobStatus.State.FAILED))
         reasonforFailure = getTaskFailureEventString();
+      numMaps = getTaskReports(TaskType.MAP).length;
+      numReduces = getTaskReports(TaskType.REDUCE).length;
     } catch (IOException e) {
     } catch (InterruptedException ie) {
     }
@@ -468,6 +472,8 @@ public class Job extends JobContextImpl implements JobContext {
     sb.append("Job Tracking URL : ").append(status.getTrackingUrl());
     sb.append("\n");
     sb.append("Uber job : ").append(status.isUber()).append("\n");
+    sb.append("Number of maps: ").append(numMaps);
+    sb.append("Number of reduces: ").append(numReduces);
     sb.append("map() completion: ");
     sb.append(status.getMapProgress()).append("\n");
     sb.append("reduce() completion: ");

+ 19 - 13
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/tools/CLI.java

@@ -18,6 +18,7 @@
 package org.apache.hadoop.mapreduce.tools;
 
 import java.io.IOException;
+import java.io.PrintWriter;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
@@ -28,6 +29,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.ipc.RemoteException;
@@ -579,25 +581,28 @@ public class CLI extends Configured implements Tool {
       }
     }
   }
-  
+
   public void displayJobList(JobStatus[] jobs) 
       throws IOException, InterruptedException {
-    System.out.println("Total jobs:" + jobs.length);
-    System.out.println("JobId\tState\tStartTime\t" +
-        "UserName\tQueue\tPriority\tMaps\tReduces\tUsedContainers\t" +
-        "RsvdContainers\tUsedMem\tRsvdMem\tNeededMem\tAM info");
-    for (JobStatus job : jobs) {
-      TaskReport[] mapReports =
-                 cluster.getJob(job.getJobID()).getTaskReports(TaskType.MAP);
-      TaskReport[] reduceReports =
-                 cluster.getJob(job.getJobID()).getTaskReports(TaskType.REDUCE);
+    displayJobList(jobs, new PrintWriter(System.out));
+  }
+
+  @Private
+  public static String headerPattern = "%23s\t%10s\t%14s\t%12s\t%12s\t%10s\t%15s\t%15s\t%8s\t%8s\t%10s\t%10s\n";
+  @Private
+  public static String dataPattern   = "%23s\t%10s\t%14d\t%12s\t%12s\t%10s\t%14d\t%14d\t%7dM\t%7sM\t%9dM\t%10s\n";
 
-      System.out.printf("%s\t%s\t%d\t%s\t%s\t%s\t%d\t%d\t%d\t%d\t%dM\t%dM\t%dM\t%s\n",
+  @Private
+  public void displayJobList(JobStatus[] jobs, PrintWriter writer) {
+    writer.println("Total jobs:" + jobs.length);
+    writer.printf(headerPattern, "JobId", "State", "StartTime", "UserName",
+      "Queue", "Priority", "UsedContainers",
+      "RsvdContainers", "UsedMem", "RsvdMem", "NeededMem", "AM info");
+    for (JobStatus job : jobs) {
+      writer.printf(dataPattern,
           job.getJobID().toString(), job.getState(), job.getStartTime(),
           job.getUsername(), job.getQueue(), 
           job.getPriority().name(),
-          mapReports.length,
-          reduceReports.length,
           job.getNumUsedSlots(),
           job.getNumReservedSlots(),
           job.getUsedMem(),
@@ -605,6 +610,7 @@ public class CLI extends Configured implements Tool {
           job.getNeededMem(),
           job.getSchedulingInfo());
     }
+    writer.flush();
   }
   
   public static void main(String[] argv) throws Exception {

+ 24 - 13
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/JobClientUnitTest.java

@@ -22,19 +22,24 @@ import static org.junit.Assert.assertEquals;
 import static org.mockito.Matchers.isA;
 import static org.mockito.Mockito.atLeastOnce;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
-import org.apache.hadoop.mapred.JobConf;
+import java.io.PrintWriter;
+
 import org.apache.hadoop.mapreduce.Cluster;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobPriority;
 import org.apache.hadoop.mapreduce.JobStatus;
-import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.TaskReport;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.junit.Assert;
 import org.junit.Test;
 
+@SuppressWarnings("deprecation")
 public class JobClientUnitTest {
   
   public class TestJobClient extends JobClient {
@@ -48,7 +53,6 @@ public class JobClientUnitTest {
     }
   }
 
-  @SuppressWarnings("deprecation")
   @Test
   public void testMapTaskReportsWithNullJob() throws Exception {
     TestJobClient client = new TestJobClient(new JobConf());
@@ -64,7 +68,6 @@ public class JobClientUnitTest {
     verify(mockCluster).getJob(id);
   }
   
-  @SuppressWarnings("deprecation")
   @Test
   public void testReduceTaskReportsWithNullJob() throws Exception {
     TestJobClient client = new TestJobClient(new JobConf());
@@ -80,7 +83,6 @@ public class JobClientUnitTest {
     verify(mockCluster).getJob(id);
   }
   
-  @SuppressWarnings("deprecation")
   @Test
   public void testSetupTaskReportsWithNullJob() throws Exception {
     TestJobClient client = new TestJobClient(new JobConf());
@@ -96,7 +98,6 @@ public class JobClientUnitTest {
     verify(mockCluster).getJob(id);
   }
   
-  @SuppressWarnings("deprecation")
   @Test
   public void testCleanupTaskReportsWithNullJob() throws Exception {
     TestJobClient client = new TestJobClient(new JobConf());
@@ -115,12 +116,15 @@ public class JobClientUnitTest {
   @Test
   public void testShowJob() throws Exception {
     TestJobClient client = new TestJobClient(new JobConf());
-    JobID jobID = new JobID("test", 0);
+
+    long startTime = System.currentTimeMillis();
+
+    JobID jobID = new JobID(String.valueOf(startTime), 12345);
 
     JobStatus mockJobStatus = mock(JobStatus.class);
     when(mockJobStatus.getJobID()).thenReturn(jobID);
     when(mockJobStatus.getState()).thenReturn(JobStatus.State.RUNNING);
-    when(mockJobStatus.getStartTime()).thenReturn(0L);
+    when(mockJobStatus.getStartTime()).thenReturn(startTime);
     when(mockJobStatus.getUsername()).thenReturn("mockuser");
     when(mockJobStatus.getQueue()).thenReturn("mockqueue");
     when(mockJobStatus.getPriority()).thenReturn(JobPriority.NORMAL);
@@ -132,18 +136,21 @@ public class JobClientUnitTest {
     when(mockJobStatus.getSchedulingInfo()).thenReturn("NA");
 
     Job mockJob = mock(Job.class);
-    when(mockJob.getTaskReports(isA(TaskType.class))).thenReturn(new TaskReport[0]);
+    when(mockJob.getTaskReports(isA(TaskType.class))).thenReturn(
+      new TaskReport[5]);
 
     Cluster mockCluster = mock(Cluster.class);
     when(mockCluster.getJob(jobID)).thenReturn(mockJob);
 
     client.setCluster(mockCluster);
     
-    
-    client.displayJobList(new JobStatus[] {mockJobStatus});
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    client.displayJobList(new JobStatus[] {mockJobStatus}, new PrintWriter(out));
+    String commandLineOutput = out.toString();
+    System.out.println(commandLineOutput);
+    Assert.assertTrue(commandLineOutput.contains("Total jobs:1"));
+
     verify(mockJobStatus, atLeastOnce()).getJobID();
-    verify(mockJob, atLeastOnce()).getTaskReports(isA(TaskType.class));
-    verify(mockCluster, atLeastOnce()).getJob(jobID);
     verify(mockJobStatus).getState();
     verify(mockJobStatus).getStartTime();
     verify(mockJobStatus).getUsername();
@@ -155,5 +162,9 @@ public class JobClientUnitTest {
     verify(mockJobStatus).getReservedMem();
     verify(mockJobStatus).getNeededMem();
     verify(mockJobStatus).getSchedulingInfo();
+
+    // This call should not go to each AM.
+    verify(mockCluster, never()).getJob(jobID);
+    verify(mockJob, never()).getTaskReports(isA(TaskType.class));
   }
 }