Browse Source

MAPREDUCE-5803. Counters page display all task nevertheless of task type( Map or Reduce). Contributed by Kai Sasaki.

Rohith Sharma K S 8 years ago
parent
commit
4fd37eed90

+ 13 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/SingleCounterBlock.java

@@ -33,6 +33,7 @@ import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
 import org.apache.hadoop.mapreduce.v2.app.job.Task;
@@ -51,6 +52,7 @@ public class SingleCounterBlock extends HtmlBlock {
   protected TreeMap<String, Long> values = new TreeMap<String, Long>(); 
   protected Job job;
   protected Task task;
+  private TaskType counterType;
   
   @Inject SingleCounterBlock(AppContext appCtx, ViewContext ctx) {
     super(ctx);
@@ -101,6 +103,13 @@ public class SingleCounterBlock extends HtmlBlock {
     JobId jobID = null;
     TaskId taskID = null;
     String tid = $(TASK_ID);
+    if ($(TITLE).contains("MAPS")) {
+      counterType = TaskType.MAP;
+    } else if ($(TITLE).contains("REDUCES")) {
+      counterType = TaskType.REDUCE;
+    } else {
+      counterType = null;
+    }
     if (!tid.isEmpty()) {
       taskID = MRApps.toTaskID(tid);
       jobID = taskID.getJobId();
@@ -152,7 +161,10 @@ public class SingleCounterBlock extends HtmlBlock {
           value = c.getValue();
         }
       }
-      values.put(MRApps.toString(entry.getKey()), value);
+      if (counterType == null ||
+              counterType == entry.getValue().getType()) {
+        values.put(MRApps.toString(entry.getKey()), value);
+      }
     }
   }
 }

+ 119 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/webapp/TestBlocks.java

@@ -23,6 +23,7 @@ import java.io.PrintWriter;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.hadoop.yarn.webapp.View;
 import org.junit.Test;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -206,6 +207,68 @@ public class TestBlocks {
         +"attempt_0_0001_r_000000_0</a>"));
   }
 
+  @Test
+  public void testSingleCounterBlock() {
+    AppContext appCtx = mock(AppContext.class);
+    View.ViewContext ctx = mock(View.ViewContext.class);
+    JobId jobId = new JobIdPBImpl();
+    jobId.setId(0);
+    jobId.setAppId(ApplicationIdPBImpl.newInstance(0, 1));
+
+    TaskId mapTaskId = new TaskIdPBImpl();
+    mapTaskId.setId(0);
+    mapTaskId.setTaskType(TaskType.MAP);
+    mapTaskId.setJobId(jobId);
+    Task mapTask = mock(Task.class);
+    when(mapTask.getID()).thenReturn(mapTaskId);
+    TaskReport mapReport = mock(TaskReport.class);
+    when(mapTask.getReport()).thenReturn(mapReport);
+    when(mapTask.getType()).thenReturn(TaskType.MAP);
+
+    TaskId reduceTaskId = new TaskIdPBImpl();
+    reduceTaskId.setId(0);
+    reduceTaskId.setTaskType(TaskType.REDUCE);
+    reduceTaskId.setJobId(jobId);
+    Task reduceTask = mock(Task.class);
+    when(reduceTask.getID()).thenReturn(reduceTaskId);
+    TaskReport reduceReport = mock(TaskReport.class);
+    when(reduceTask.getReport()).thenReturn(reduceReport);
+    when(reduceTask.getType()).thenReturn(TaskType.REDUCE);
+
+    Map<TaskId, Task> tasks =
+            new HashMap<TaskId, Task>();
+    tasks.put(mapTaskId, mapTask);
+    tasks.put(reduceTaskId, reduceTask);
+
+    Job job = mock(Job.class);
+    when(job.getTasks()).thenReturn(tasks);
+    when(appCtx.getJob(any(JobId.class))).thenReturn(job);
+
+    // SingleCounter for map task
+    SingleCounterBlockForMapTest blockForMapTest
+            = spy(new SingleCounterBlockForMapTest(appCtx, ctx));
+    PrintWriter pWriterForMapTest = new PrintWriter(data);
+    Block htmlForMapTest = new BlockForTest(new HtmlBlockForTest(),
+            pWriterForMapTest, 0, false);
+    blockForMapTest.render(htmlForMapTest);
+    pWriterForMapTest.flush();
+    assertTrue(data.toString().contains("task_0_0001_m_000000"));
+    assertFalse(data.toString().contains("task_0_0001_r_000000"));
+
+    data.reset();
+    // SingleCounter for reduce task
+    SingleCounterBlockForReduceTest blockForReduceTest
+            = spy(new SingleCounterBlockForReduceTest(appCtx, ctx));
+    PrintWriter pWriterForReduceTest = new PrintWriter(data);
+    Block htmlForReduceTest = new BlockForTest(new HtmlBlockForTest(),
+            pWriterForReduceTest, 0, false);
+    blockForReduceTest.render(htmlForReduceTest);
+    pWriterForReduceTest.flush();
+    System.out.println(data.toString());
+    assertFalse(data.toString().contains("task_0_0001_m_000000"));
+    assertTrue(data.toString().contains("task_0_0001_r_000000"));
+  }
+
   private class ConfBlockForTest extends ConfBlock {
     private final Map<String, String> params = new HashMap<String, String>();
 
@@ -258,4 +321,60 @@ public class TestBlocks {
       return result;
     }
   }
+
+  private class SingleCounterBlockForMapTest extends SingleCounterBlock {
+
+    public SingleCounterBlockForMapTest(AppContext appCtx, ViewContext ctx) {
+      super(appCtx, ctx);
+    }
+
+    public String $(String key, String defaultValue) {
+      if (key.equals(TITLE)) {
+        return "org.apache.hadoop.mapreduce.JobCounter DATA_LOCAL_MAPS for " +
+                "job_12345_0001";
+      } else if (key.equals(AMParams.JOB_ID)) {
+        return "job_12345_0001";
+      } else if (key.equals(AMParams.TASK_ID)) {
+        return "";
+      }
+      return "";
+    }
+
+    @Override
+    public String url(String... parts) {
+      String result = "url://";
+      for (String string : parts) {
+        result += string + ":";
+      }
+      return result;
+    }
+  }
+
+  private class SingleCounterBlockForReduceTest extends SingleCounterBlock {
+
+    public SingleCounterBlockForReduceTest(AppContext appCtx, ViewContext ctx) {
+      super(appCtx, ctx);
+    }
+
+    public String $(String key, String defaultValue) {
+      if (key.equals(TITLE)) {
+        return "org.apache.hadoop.mapreduce.JobCounter DATA_LOCAL_REDUCES " +
+            "for job_12345_0001";
+      } else if (key.equals(AMParams.JOB_ID)) {
+        return "job_12345_0001";
+      } else if (key.equals(AMParams.TASK_ID)) {
+        return "";
+      }
+      return "";
+    }
+
+    @Override
+    public String url(String... parts) {
+      String result = "url://";
+      for (String string : parts) {
+        result += string + ":";
+      }
+      return result;
+    }
+  }
 }