浏览代码

MAPREDUCE-7028. Concurrent task progress updates causing NPE in Application Master. Contributed by Gergo Repas

(cherry picked from commit fe35103591ece0209f8345aba5544313e45a073c)
Jason Lowe 7 年之前
父节点
当前提交
2a870d91fe

+ 23 - 18
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java

@@ -585,33 +585,38 @@ public class TaskAttemptListenerImpl extends CompositeService
   private void coalesceStatusUpdate(TaskAttemptId yarnAttemptID,
       TaskAttemptStatus taskAttemptStatus,
       AtomicReference<TaskAttemptStatus> lastStatusRef) {
-    boolean asyncUpdatedNeeded = false;
-    TaskAttemptStatus lastStatus = lastStatusRef.get();
-
-    if (lastStatus == null) {
-      lastStatusRef.set(taskAttemptStatus);
-      asyncUpdatedNeeded = true;
-    } else {
-      List<TaskAttemptId> oldFetchFailedMaps =
-          taskAttemptStatus.fetchFailedMaps;
-
-      // merge fetchFailedMaps from the previous update
-      if (lastStatus.fetchFailedMaps != null) {
+    List<TaskAttemptId> fetchFailedMaps = taskAttemptStatus.fetchFailedMaps;
+    TaskAttemptStatus lastStatus = null;
+    boolean done = false;
+    while (!done) {
+      lastStatus = lastStatusRef.get();
+      if (lastStatus != null && lastStatus.fetchFailedMaps != null) {
+        // merge fetchFailedMaps from the previous update
         if (taskAttemptStatus.fetchFailedMaps == null) {
           taskAttemptStatus.fetchFailedMaps = lastStatus.fetchFailedMaps;
         } else {
-          taskAttemptStatus.fetchFailedMaps.addAll(lastStatus.fetchFailedMaps);
+          taskAttemptStatus.fetchFailedMaps =
+              new ArrayList<>(lastStatus.fetchFailedMaps.size() +
+                  fetchFailedMaps.size());
+          taskAttemptStatus.fetchFailedMaps.addAll(
+              lastStatus.fetchFailedMaps);
+          taskAttemptStatus.fetchFailedMaps.addAll(
+              fetchFailedMaps);
         }
       }
 
-      if (!lastStatusRef.compareAndSet(lastStatus, taskAttemptStatus)) {
-        // update failed - async dispatcher has processed it in the meantime
-        taskAttemptStatus.fetchFailedMaps = oldFetchFailedMaps;
-        lastStatusRef.set(taskAttemptStatus);
-        asyncUpdatedNeeded = true;
+      // lastStatusRef may be changed by either the AsyncDispatcher when
+      // it processes the update, or by another IPC server handler
+      done = lastStatusRef.compareAndSet(lastStatus, taskAttemptStatus);
+      if (!done) {
+        LOG.info("TaskAttempt " + yarnAttemptID +
+            ": lastStatusRef changed by another thread, retrying...");
+        // let's revert taskAttemptStatus.fetchFailedMaps
+        taskAttemptStatus.fetchFailedMaps = fetchFailedMaps;
       }
     }
 
+    boolean asyncUpdatedNeeded = (lastStatus == null);
     if (asyncUpdatedNeeded) {
       context.getEventHandler().handle(
           new TaskAttemptStatusUpdateEvent(taskAttemptStatus.id,