Browse Source

MAPREDUCE-6384. Add the last reporting reducer info for too many fetch failure diagnostics. Contributed by Chang Li
(cherry picked from commit b6ba56457c6b01dae795c11d587c3fe3855ee707)

Jason Lowe 10 năm trước cách đây
mục cha
commit
fdfd5be2b3

+ 3 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -97,6 +97,9 @@ Release 2.8.0 - UNRELEASED
     MAPREDUCE-6408. Queue name and user name should be printed on the job page.
     (Siqi Li via gera)
 
+    MAPREDUCE-6384. Add the last reporting reducer info for too many fetch
+    failure diagnostics (Chang Li via jlowe)
+
   OPTIMIZATIONS
 
   BUG FIXES

+ 7 - 2
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobTaskAttemptFetchFailureEvent.java

@@ -28,13 +28,15 @@ public class JobTaskAttemptFetchFailureEvent extends JobEvent {
 
   private final TaskAttemptId reduce;
   private final List<TaskAttemptId> maps;
+  private final String hostname;
 
   public JobTaskAttemptFetchFailureEvent(TaskAttemptId reduce, 
-      List<TaskAttemptId> maps) {
-    super(reduce.getTaskId().getJobId(), 
+      List<TaskAttemptId> maps, String host) {
+    super(reduce.getTaskId().getJobId(),
         JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE);
     this.reduce = reduce;
     this.maps = maps;
+    this.hostname = host;
   }
 
   public List<TaskAttemptId> getMaps() {
@@ -45,4 +47,7 @@ public class JobTaskAttemptFetchFailureEvent extends JobEvent {
     return reduce;
   }
 
+  public String getHost() {
+    return hostname;
+  }
 }

+ 50 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptTooManyFetchFailureEvent.java

@@ -0,0 +1,50 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app.job.event;
+
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+
+/**
+ * TaskAttemptTooManyFetchFailureEvent is used for TA_TOO_MANY_FETCH_FAILURE.
+ */
+public class TaskAttemptTooManyFetchFailureEvent extends TaskAttemptEvent {
+  private TaskAttemptId reduceID;
+  private String  reduceHostname;
+
+  /**
+   * Create a new TaskAttemptTooManyFetchFailureEvent.
+   * @param attemptId the id of the mapper task attempt
+   * @param reduceId the id of the reporting reduce task attempt.
+   * @param reduceHost the hostname of the reporting reduce task attempt.
+   */
+  public TaskAttemptTooManyFetchFailureEvent(TaskAttemptId attemptId,
+      TaskAttemptId reduceId, String reduceHost) {
+      super(attemptId, TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURE);
+    this.reduceID = reduceId;
+    this.reduceHostname = reduceHost;
+  }
+
+  public TaskAttemptId getReduceId() {
+    return reduceID;
+  }
+
+  public String getReduceHost() {
+    return reduceHostname;
+  }  
+}

+ 3 - 4
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java

@@ -103,9 +103,8 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskAttemptCompletedEvent
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskAttemptFetchFailureEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobUpdatedNodesEvent;
-import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
-import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptKillEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptTooManyFetchFailureEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskRecoverEvent;
@@ -1914,8 +1913,8 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
             && failureRate >= job.getMaxAllowedFetchFailuresFraction()) {
           LOG.info("Too many fetch-failures for output of task attempt: " + 
               mapId + " ... raising fetch failure to map");
-          job.eventHandler.handle(new TaskAttemptEvent(mapId, 
-              TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURE));
+          job.eventHandler.handle(new TaskAttemptTooManyFetchFailureEvent(mapId,
+              fetchfailureEvent.getReduce(), fetchfailureEvent.getHost()));
           job.fetchFailuresMapping.remove(mapId);
         }
       }

+ 12 - 3
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java

@@ -95,6 +95,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptKillEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptRecoverEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptTooManyFetchFailureEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
@@ -1923,12 +1924,17 @@ public abstract class TaskAttemptImpl implements
     @SuppressWarnings("unchecked")
     @Override
     public void transition(TaskAttemptImpl taskAttempt, TaskAttemptEvent event) {
+      TaskAttemptTooManyFetchFailureEvent fetchFailureEvent =
+          (TaskAttemptTooManyFetchFailureEvent) event;
       // too many fetch failure can only happen for map tasks
       Preconditions
           .checkArgument(taskAttempt.getID().getTaskId().getTaskType() == TaskType.MAP);
       //add to diagnostic
-      taskAttempt.addDiagnosticInfo("Too Many fetch failures.Failing the attempt");
-      
+      taskAttempt.addDiagnosticInfo("Too many fetch failures."
+          + " Failing the attempt. Last failure reported by " +
+          fetchFailureEvent.getReduceId() +
+          " from host " + fetchFailureEvent.getReduceHost());
+
       if (taskAttempt.getLaunchTime() != 0) {
         taskAttempt.eventHandler
             .handle(createJobCounterUpdateEventTAFailed(taskAttempt, true));
@@ -2211,8 +2217,11 @@ public abstract class TaskAttemptImpl implements
       //this only will happen in reduce attempt type
       if (taskAttempt.reportedStatus.fetchFailedMaps != null && 
           taskAttempt.reportedStatus.fetchFailedMaps.size() > 0) {
+        String hostname = taskAttempt.container == null ? "UNKNOWN"
+            : taskAttempt.container.getNodeId().getHost();
         taskAttempt.eventHandler.handle(new JobTaskAttemptFetchFailureEvent(
-            taskAttempt.attemptId, taskAttempt.reportedStatus.fetchFailedMaps));
+            taskAttempt.attemptId, taskAttempt.reportedStatus.fetchFailedMaps,
+                hostname));
       }
     }
   }

+ 18 - 13
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java

@@ -94,10 +94,10 @@ public class TestFetchFailure {
     app.waitForState(reduceAttempt, TaskAttemptState.RUNNING);
     
     //send 3 fetch failures from reduce to trigger map re execution
-    sendFetchFailure(app, reduceAttempt, mapAttempt1);
-    sendFetchFailure(app, reduceAttempt, mapAttempt1);
-    sendFetchFailure(app, reduceAttempt, mapAttempt1);
-    
+    sendFetchFailure(app, reduceAttempt, mapAttempt1, "host");
+    sendFetchFailure(app, reduceAttempt, mapAttempt1, "host");
+    sendFetchFailure(app, reduceAttempt, mapAttempt1, "host");
+
     //wait for map Task state move back to RUNNING
     app.waitForState(mapTask, TaskState.RUNNING);
     
@@ -215,9 +215,9 @@ public class TestFetchFailure {
     app.waitForState(reduceAttempt, TaskAttemptState.RUNNING);
 
     //send 3 fetch failures from reduce to trigger map re execution
-    sendFetchFailure(app, reduceAttempt, mapAttempt1);
-    sendFetchFailure(app, reduceAttempt, mapAttempt1);
-    sendFetchFailure(app, reduceAttempt, mapAttempt1);
+    sendFetchFailure(app, reduceAttempt, mapAttempt1, "host");
+    sendFetchFailure(app, reduceAttempt, mapAttempt1, "host");
+    sendFetchFailure(app, reduceAttempt, mapAttempt1, "host");
 
     //wait for map Task state move back to RUNNING
     app.waitForState(mapTask, TaskState.RUNNING);
@@ -324,8 +324,8 @@ public class TestFetchFailure {
     updateStatus(app, reduceAttempt3, Phase.SHUFFLE);
 
     //send 2 fetch failures from reduce to prepare for map re execution
-    sendFetchFailure(app, reduceAttempt, mapAttempt1);
-    sendFetchFailure(app, reduceAttempt, mapAttempt1);
+    sendFetchFailure(app, reduceAttempt, mapAttempt1, "host1");
+    sendFetchFailure(app, reduceAttempt2, mapAttempt1, "host2");
 
     //We should not re-launch the map task yet
     assertEquals(TaskState.SUCCEEDED, mapTask.getState());
@@ -333,7 +333,7 @@ public class TestFetchFailure {
     updateStatus(app, reduceAttempt3, Phase.REDUCE);
 
     //send 3rd fetch failures from reduce to trigger map re execution
-    sendFetchFailure(app, reduceAttempt, mapAttempt1);
+    sendFetchFailure(app, reduceAttempt3, mapAttempt1, "host3");
 
     //wait for map Task state move back to RUNNING
     app.waitForState(mapTask, TaskState.RUNNING);
@@ -342,6 +342,11 @@ public class TestFetchFailure {
     Assert.assertEquals("Map TaskAttempt state not correct",
         TaskAttemptState.FAILED, mapAttempt1.getState());
 
+    Assert.assertEquals(mapAttempt1.getDiagnostics().get(0),
+            "Too many fetch failures. Failing the attempt. "
+            + "Last failure reported by "
+            + reduceAttempt3.getID().toString() + " from host host3");
+
     Assert.assertEquals("Num attempts in Map Task not correct",
         2, mapTask.getAttempts().size());
     
@@ -410,7 +415,6 @@ public class TestFetchFailure {
     Assert.assertEquals("Unexpected map event", convertedEvents[2],
         mapEvents[0]);
   }
-  
 
   private void updateStatus(MRApp app, TaskAttempt attempt, Phase phase) {
     TaskAttemptStatusUpdateEvent.TaskAttemptStatus status = new TaskAttemptStatusUpdateEvent.TaskAttemptStatus();
@@ -430,11 +434,12 @@ public class TestFetchFailure {
   }
 
   private void sendFetchFailure(MRApp app, TaskAttempt reduceAttempt, 
-      TaskAttempt mapAttempt) {
+      TaskAttempt mapAttempt, String hostname) {
     app.getContext().getEventHandler().handle(
         new JobTaskAttemptFetchFailureEvent(
             reduceAttempt.getID(), 
-            Arrays.asList(new TaskAttemptId[] {mapAttempt.getID()})));
+            Arrays.asList(new TaskAttemptId[] {mapAttempt.getID()}),
+                hostname));
   }
   
   static class MRAppWithHistory extends MRApp {

+ 75 - 68
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java

@@ -70,6 +70,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerLaunchedEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdateEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptTooManyFetchFailureEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerRequestEvent;
@@ -507,6 +508,9 @@ public class TestTaskAttempt{
     JobId jobId = MRBuilderUtils.newJobId(appId, 1);
     TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
     TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
+    TaskId reduceTaskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.REDUCE);
+    TaskAttemptId reduceTAId =
+        MRBuilderUtils.newTaskAttemptId(reduceTaskId, 0);
     Path jobFile = mock(Path.class);
 
     MockEventHandler eventHandler = new MockEventHandler();
@@ -554,8 +558,8 @@ public class TestTaskAttempt{
 
     assertEquals("Task attempt is not in succeeded state", taImpl.getState(),
         TaskAttemptState.SUCCEEDED);
-    taImpl.handle(new TaskAttemptEvent(attemptId,
-        TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURE));
+    taImpl.handle(new TaskAttemptTooManyFetchFailureEvent(attemptId,
+        reduceTAId, "Host"));
     assertEquals("Task attempt is not in FAILED state", taImpl.getState(),
         TaskAttemptState.FAILED);
     taImpl.handle(new TaskAttemptEvent(attemptId,
@@ -735,72 +739,75 @@ public class TestTaskAttempt{
     
   @Test
   public void testFetchFailureAttemptFinishTime() throws Exception{
-	ApplicationId appId = ApplicationId.newInstance(1, 2);
-	ApplicationAttemptId appAttemptId =
-	ApplicationAttemptId.newInstance(appId, 0);
-	JobId jobId = MRBuilderUtils.newJobId(appId, 1);
-	TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
-	TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
-	Path jobFile = mock(Path.class);
-
-	MockEventHandler eventHandler = new MockEventHandler();
-	TaskAttemptListener taListener = mock(TaskAttemptListener.class);
-	when(taListener.getAddress()).thenReturn(
-		new InetSocketAddress("localhost", 0));
-
-	JobConf jobConf = new JobConf();
-	jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
-	jobConf.setBoolean("fs.file.impl.disable.cache", true);
-	jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
-	jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");
-
-	TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
-	when(splits.getLocations()).thenReturn(new String[] {"127.0.0.1"});
-
-	AppContext appCtx = mock(AppContext.class);
-	ClusterInfo clusterInfo = mock(ClusterInfo.class);
-	when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
-  setupTaskAttemptFinishingMonitor(eventHandler, jobConf, appCtx);
-
-	TaskAttemptImpl taImpl =
-	  new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
-	  splits, jobConf, taListener,mock(Token.class), new Credentials(),
-	  new SystemClock(), appCtx);
-
-	NodeId nid = NodeId.newInstance("127.0.0.1", 0);
-	ContainerId contId = ContainerId.newContainerId(appAttemptId, 3);
-	Container container = mock(Container.class);
-	when(container.getId()).thenReturn(contId);
-	when(container.getNodeId()).thenReturn(nid);
-	when(container.getNodeHttpAddress()).thenReturn("localhost:0"); 
-	    
-	taImpl.handle(new TaskAttemptEvent(attemptId,
-	 	TaskAttemptEventType.TA_SCHEDULE));
-	taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId,
-	    container, mock(Map.class)));
-	taImpl.handle(new TaskAttemptContainerLaunchedEvent(attemptId, 0));
-	taImpl.handle(new TaskAttemptEvent(attemptId,
-	    TaskAttemptEventType.TA_DONE));
-	taImpl.handle(new TaskAttemptEvent(attemptId,
-	    TaskAttemptEventType.TA_CONTAINER_COMPLETED));
-	    
-	assertEquals("Task attempt is not in succeeded state", taImpl.getState(),
-		      TaskAttemptState.SUCCEEDED);
-	
-	assertTrue("Task Attempt finish time is not greater than 0", 
-			taImpl.getFinishTime() > 0);
-	
-	Long finishTime = taImpl.getFinishTime();
-	Thread.sleep(5);   
-	taImpl.handle(new TaskAttemptEvent(attemptId,
-	   TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURE));
-	
-	assertEquals("Task attempt is not in Too Many Fetch Failure state", 
-			taImpl.getState(), TaskAttemptState.FAILED);
-	
-	assertEquals("After TA_TOO_MANY_FETCH_FAILURE,"
-		+ " Task attempt finish time is not the same ",
-		finishTime, Long.valueOf(taImpl.getFinishTime()));  
+    ApplicationId appId = ApplicationId.newInstance(1, 2);
+    ApplicationAttemptId appAttemptId =
+    ApplicationAttemptId.newInstance(appId, 0);
+    JobId jobId = MRBuilderUtils.newJobId(appId, 1);
+    TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
+    TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
+    TaskId reducetaskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.REDUCE);
+    TaskAttemptId reduceTAId =
+        MRBuilderUtils.newTaskAttemptId(reducetaskId, 0);
+    Path jobFile = mock(Path.class);
+
+    MockEventHandler eventHandler = new MockEventHandler();
+    TaskAttemptListener taListener = mock(TaskAttemptListener.class);
+    when(taListener.getAddress()).thenReturn(
+        new InetSocketAddress("localhost", 0));
+
+    JobConf jobConf = new JobConf();
+    jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
+    jobConf.setBoolean("fs.file.impl.disable.cache", true);
+    jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
+    jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");
+
+    TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
+    when(splits.getLocations()).thenReturn(new String[] {"127.0.0.1"});
+
+    AppContext appCtx = mock(AppContext.class);
+    ClusterInfo clusterInfo = mock(ClusterInfo.class);
+    when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
+    setupTaskAttemptFinishingMonitor(eventHandler, jobConf, appCtx);
+
+    TaskAttemptImpl taImpl =
+      new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
+      splits, jobConf, taListener,mock(Token.class), new Credentials(),
+      new SystemClock(), appCtx);
+
+    NodeId nid = NodeId.newInstance("127.0.0.1", 0);
+    ContainerId contId = ContainerId.newContainerId(appAttemptId, 3);
+    Container container = mock(Container.class);
+    when(container.getId()).thenReturn(contId);
+    when(container.getNodeId()).thenReturn(nid);
+    when(container.getNodeHttpAddress()).thenReturn("localhost:0");
+
+    taImpl.handle(new TaskAttemptEvent(attemptId,
+        TaskAttemptEventType.TA_SCHEDULE));
+    taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId,
+        container, mock(Map.class)));
+    taImpl.handle(new TaskAttemptContainerLaunchedEvent(attemptId, 0));
+    taImpl.handle(new TaskAttemptEvent(attemptId,
+        TaskAttemptEventType.TA_DONE));
+    taImpl.handle(new TaskAttemptEvent(attemptId,
+        TaskAttemptEventType.TA_CONTAINER_COMPLETED));
+
+    assertEquals("Task attempt is not in succeeded state", taImpl.getState(),
+        TaskAttemptState.SUCCEEDED);
+
+    assertTrue("Task Attempt finish time is not greater than 0",
+        taImpl.getFinishTime() > 0);
+
+    Long finishTime = taImpl.getFinishTime();
+    Thread.sleep(5);
+    taImpl.handle(new TaskAttemptTooManyFetchFailureEvent(attemptId,
+        reduceTAId, "Host"));
+
+    assertEquals("Task attempt is not in Too Many Fetch Failure state",
+        taImpl.getState(), TaskAttemptState.FAILED);
+
+    assertEquals("After TA_TOO_MANY_FETCH_FAILURE,"
+        + " Task attempt finish time is not the same ",
+        finishTime, Long.valueOf(taImpl.getFinishTime()));
   }
   
   @Test