Selaa lähdekoodia

YARN-3464. Race condition in LocalizerRunner kills localizer before localizing all resources. (Zhihai Xu via kasha)

(cherry picked from commit 47279c3228185548ed09c36579b420225e4894f5)
(cherry picked from commit 4045c41afe440b773d006e962bf8a5eae3fdc284)
Karthik Kambatla 10 vuotta sitten
vanhempi
commit
4ddcc7e5b5

+ 2 - 0
hadoop-yarn-project/CHANGES.txt

@@ -41,6 +41,8 @@ Release 2.7.1 - UNRELEASED
     YARN-3516. killing ContainerLocalizer action doesn't take effect when
     private localizer receives FETCH_FAILURE status.(zhihai xu via xgong)
 
+    YARN-3464. Race condition in LocalizerRunner kills localizer before 
+    localizing all resources. (Zhihai Xu via kasha)
 
 Release 2.7.0 - 2015-04-20
 

+ 7 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java

@@ -59,7 +59,9 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.Conta
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEventType;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourceRequest;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache.SharedCacheUploadEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache.SharedCacheUploadEventType;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerFinishedEvent;
@@ -715,7 +717,12 @@ public class ContainerImpl implements Container {
         return ContainerState.LOCALIZING;
       }
 
+      container.dispatcher.getEventHandler().handle(
+          new ContainerLocalizationEvent(LocalizationEventType.
+              CONTAINER_RESOURCES_LOCALIZED, container));
+
       container.sendLaunchEvent();
+      container.metrics.endInitingContainer();
 
       // If this is a recovered container that has already launched, skip
       // uploading resources to the shared cache. We do this to avoid uploading
@@ -733,7 +740,6 @@ public class ContainerImpl implements Container {
                 SharedCacheUploadEventType.UPLOAD));
       }
 
-      container.metrics.endInitingContainer();
       return ContainerState.LOCALIZED;
     }
   }

+ 41 - 12
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java

@@ -35,6 +35,7 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletionService;
 import java.util.concurrent.ConcurrentHashMap;
@@ -108,6 +109,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Cont
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerResourceFailedEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizationEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
@@ -389,6 +391,9 @@ public class ResourceLocalizationService extends CompositeService
     case INIT_CONTAINER_RESOURCES:
       handleInitContainerResources((ContainerLocalizationRequestEvent) event);
       break;
+    case CONTAINER_RESOURCES_LOCALIZED:
+      handleContainerResourcesLocalized((ContainerLocalizationEvent) event);
+      break;
     case CACHE_CLEANUP:
       handleCacheCleanup(event);
       break;
@@ -451,7 +456,18 @@ public class ResourceLocalizationService extends CompositeService
       }
     }
   }
-  
+
+  /**
+   * Once a container's resources are localized, kill the corresponding
+   * {@link ContainerLocalizer}
+   */
+  private void handleContainerResourcesLocalized(
+      ContainerLocalizationEvent event) {
+    Container c = event.getContainer();
+    String locId = ConverterUtils.toString(c.getContainerId());
+    localizerTracker.endContainerLocalization(locId);
+  }
+
   private void handleCacheCleanup(LocalizationEvent event) {
     ResourceRetentionSet retain =
       new ResourceRetentionSet(delService, cacheTargetSize);
@@ -662,7 +678,7 @@ public class ResourceLocalizationService extends CompositeService
           response.setLocalizerAction(LocalizerAction.DIE);
           return response;
         }
-        return localizer.update(status.getResources());
+        return localizer.processHeartbeat(status.getResources());
       }
     }
     
@@ -716,6 +732,17 @@ public class ResourceLocalizationService extends CompositeService
         localizer.interrupt();
       }
     }
+
+    public void endContainerLocalization(String locId) {
+      LocalizerRunner localizer;
+      synchronized (privLocalizers) {
+        localizer = privLocalizers.get(locId);
+        if (null == localizer) {
+          return; // ignore
+        }
+      }
+      localizer.endContainerLocalization();
+    }
   }
   
 
@@ -870,6 +897,7 @@ public class ResourceLocalizationService extends CompositeService
     final Map<LocalResourceRequest,LocalizerResourceRequestEvent> scheduled;
     // Its a shared list between Private Localizer and dispatcher thread.
     final List<LocalizerResourceRequestEvent> pending;
+    private AtomicBoolean killContainerLocalizer = new AtomicBoolean(false);
 
     // TODO: threadsafe, use outer?
     private final RecordFactory recordFactory =
@@ -890,6 +918,10 @@ public class ResourceLocalizationService extends CompositeService
       pending.add(request);
     }
 
+    public void endContainerLocalization() {
+      killContainerLocalizer.set(true);
+    }
+
     /**
      * Find next resource to be given to a spawned localizer.
      * 
@@ -936,7 +968,7 @@ public class ResourceLocalizationService extends CompositeService
       }
     }
 
-    LocalizerHeartbeatResponse update(
+    LocalizerHeartbeatResponse processHeartbeat(
         List<LocalResourceStatus> remoteResourceStatuses) {
       LocalizerHeartbeatResponse response =
         recordFactory.newRecordInstance(LocalizerHeartbeatResponse.class);
@@ -945,7 +977,7 @@ public class ResourceLocalizationService extends CompositeService
       ApplicationId applicationId =
           context.getContainerId().getApplicationAttemptId().getApplicationId();
 
-      LocalizerAction action = LocalizerAction.LIVE;
+      boolean fetchFailed = false;
       // Update resource statuses.
       for (LocalResourceStatus stat : remoteResourceStatuses) {
         LocalResource rsrc = stat.getResource();
@@ -981,7 +1013,7 @@ public class ResourceLocalizationService extends CompositeService
           case FETCH_FAILURE:
             final String diagnostics = stat.getException().toString();
             LOG.warn(req + " failed: " + diagnostics);
-            action = LocalizerAction.DIE;
+            fetchFailed = true;
             getLocalResourcesTracker(req.getVisibility(), user, applicationId)
               .handle(new ResourceFailedLocalizationEvent(
                   req, diagnostics));
@@ -993,15 +1025,15 @@ public class ResourceLocalizationService extends CompositeService
             break;
           default:
             LOG.info("Unknown status: " + stat.getStatus());
-            action = LocalizerAction.DIE;
+            fetchFailed = true;
             getLocalResourcesTracker(req.getVisibility(), user, applicationId)
               .handle(new ResourceFailedLocalizationEvent(
                   req, stat.getException().getMessage()));
             break;
         }
       }
-      if (action == LocalizerAction.DIE) {
-        response.setLocalizerAction(action);
+      if (fetchFailed || killContainerLocalizer.get()) {
+        response.setLocalizerAction(LocalizerAction.DIE);
         return response;
       }
 
@@ -1029,12 +1061,9 @@ public class ResourceLocalizationService extends CompositeService
         } catch (URISyntaxException e) {
             //TODO fail? Already translated several times...
         }
-      } else if (pending.isEmpty()) {
-        // TODO: Synchronization
-        action = LocalizerAction.DIE;
       }
 
-      response.setLocalizerAction(action);
+      response.setLocalizerAction(LocalizerAction.LIVE);
       response.setResourceSpecs(rsrcs);
       return response;
     }

+ 1 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizationEventType.java

@@ -23,4 +23,5 @@ public enum LocalizationEventType {
   CACHE_CLEANUP,
   CLEANUP_CONTAINER_RESOURCES,
   DESTROY_APPLICATION_RESOURCES,
+  CONTAINER_RESOURCES_LOCALIZED,
 }

+ 10 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java

@@ -125,6 +125,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.Reso
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService.PublicLocalizer;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizationEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
@@ -975,7 +976,8 @@ public class TestResourceLocalizationService {
         .thenReturn(Collections.<LocalResourceStatus>emptyList())
         .thenReturn(Collections.singletonList(rsrc1success))
         .thenReturn(Collections.singletonList(rsrc2pending))
-        .thenReturn(rsrcs4);
+        .thenReturn(rsrcs4)
+        .thenReturn(Collections.<LocalResourceStatus>emptyList());
 
       String localPath = Path.SEPARATOR + ContainerLocalizer.USERCACHE +
           Path.SEPARATOR + "user0" + Path.SEPARATOR +
@@ -1019,7 +1021,13 @@ public class TestResourceLocalizationService {
       assertTrue(localizedPath.getFile().endsWith(
           localPath + Path.SEPARATOR + "1" + Path.SEPARATOR + "12"));
 
-      // get shutdown
+      response = spyService.heartbeat(stat);
+      assertEquals(LocalizerAction.LIVE, response.getLocalizerAction());
+
+      spyService.handle(new ContainerLocalizationEvent(
+          LocalizationEventType.CONTAINER_RESOURCES_LOCALIZED, c));
+
+      // get shutdown after receive CONTAINER_RESOURCES_LOCALIZED event
       response = spyService.heartbeat(stat);
       assertEquals(LocalizerAction.DIE, response.getLocalizerAction());