Prechádzať zdrojové kódy

YARN-3464. Race condition in LocalizerRunner kills localizer before localizing all resources. (Zhihai Xu via kasha)

(cherry picked from commit 47279c3228185548ed09c36579b420225e4894f5)
(cherry picked from commit 4045c41afe440b773d006e962bf8a5eae3fdc284)
(cherry picked from commit 6f2cc0dfa8f21984ecdab59dc087ccf525934930)
Karthik Kambatla 10 rokov pred
rodič
commit
6ade6b5051

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -132,6 +132,9 @@ Release 2.6.1 - UNRELEASED
     YARN-3024. LocalizerRunner should give DIE action when all resources are
     localized. (Chengbing Liu via xgong)
 
+    YARN-3464. Race condition in LocalizerRunner kills localizer before 
+    localizing all resources. (Zhihai Xu via kasha)
+
 Release 2.6.0 - 2014-11-18
 
   INCOMPATIBLE CHANGES

+ 6 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java

@@ -58,7 +58,9 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.Conta
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEventType;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourceRequest;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerFinishedEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainerStartMonitoringEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainerStopMonitoringEvent;
@@ -709,6 +711,10 @@ public class ContainerImpl implements Container {
         return ContainerState.LOCALIZING;
       }
 
+      container.dispatcher.getEventHandler().handle(
+          new ContainerLocalizationEvent(LocalizationEventType.
+              CONTAINER_RESOURCES_LOCALIZED, container));
+
       container.sendLaunchEvent();
       container.metrics.endInitingContainer();
       return ContainerState.LOCALIZED;

+ 41 - 12
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java

@@ -49,6 +49,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -107,6 +108,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Cont
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerResourceFailedEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizationEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
@@ -388,6 +390,9 @@ public class ResourceLocalizationService extends CompositeService
     case INIT_CONTAINER_RESOURCES:
       handleInitContainerResources((ContainerLocalizationRequestEvent) event);
       break;
+    case CONTAINER_RESOURCES_LOCALIZED:
+      handleContainerResourcesLocalized((ContainerLocalizationEvent) event);
+      break;
     case CACHE_CLEANUP:
       handleCacheCleanup(event);
       break;
@@ -450,7 +455,18 @@ public class ResourceLocalizationService extends CompositeService
       }
     }
   }
-  
+
+  /**
+   * Once a container's resources are localized, kill the corresponding
+   * {@link ContainerLocalizer}
+   */
+  private void handleContainerResourcesLocalized(
+      ContainerLocalizationEvent event) {
+    Container c = event.getContainer();
+    String locId = ConverterUtils.toString(c.getContainerId());
+    localizerTracker.endContainerLocalization(locId);
+  }
+
   private void handleCacheCleanup(LocalizationEvent event) {
     ResourceRetentionSet retain =
       new ResourceRetentionSet(delService, cacheTargetSize);
@@ -661,7 +677,7 @@ public class ResourceLocalizationService extends CompositeService
           response.setLocalizerAction(LocalizerAction.DIE);
           return response;
         }
-        return localizer.update(status.getResources());
+        return localizer.processHeartbeat(status.getResources());
       }
     }
     
@@ -715,6 +731,17 @@ public class ResourceLocalizationService extends CompositeService
         localizer.interrupt();
       }
     }
+
+    public void endContainerLocalization(String locId) {
+      LocalizerRunner localizer;
+      synchronized (privLocalizers) {
+        localizer = privLocalizers.get(locId);
+        if (null == localizer) {
+          return; // ignore
+        }
+      }
+      localizer.endContainerLocalization();
+    }
   }
   
 
@@ -863,6 +890,7 @@ public class ResourceLocalizationService extends CompositeService
     final Map<LocalResourceRequest,LocalizerResourceRequestEvent> scheduled;
     // Its a shared list between Private Localizer and dispatcher thread.
     final List<LocalizerResourceRequestEvent> pending;
+    private AtomicBoolean killContainerLocalizer = new AtomicBoolean(false);
 
     // TODO: threadsafe, use outer?
     private final RecordFactory recordFactory =
@@ -883,6 +911,10 @@ public class ResourceLocalizationService extends CompositeService
       pending.add(request);
     }
 
+    public void endContainerLocalization() {
+      killContainerLocalizer.set(true);
+    }
+
     /**
      * Find next resource to be given to a spawned localizer.
      * 
@@ -929,7 +961,7 @@ public class ResourceLocalizationService extends CompositeService
       }
     }
 
-    LocalizerHeartbeatResponse update(
+    LocalizerHeartbeatResponse processHeartbeat(
         List<LocalResourceStatus> remoteResourceStatuses) {
       LocalizerHeartbeatResponse response =
         recordFactory.newRecordInstance(LocalizerHeartbeatResponse.class);
@@ -938,7 +970,7 @@ public class ResourceLocalizationService extends CompositeService
       ApplicationId applicationId =
           context.getContainerId().getApplicationAttemptId().getApplicationId();
 
-      LocalizerAction action = LocalizerAction.LIVE;
+      boolean fetchFailed = false;
       // Update resource statuses.
       for (LocalResourceStatus stat : remoteResourceStatuses) {
         LocalResource rsrc = stat.getResource();
@@ -974,7 +1006,7 @@ public class ResourceLocalizationService extends CompositeService
           case FETCH_FAILURE:
             final String diagnostics = stat.getException().toString();
             LOG.warn(req + " failed: " + diagnostics);
-            response.setLocalizerAction(LocalizerAction.DIE);
+            fetchFailed = true;
             getLocalResourcesTracker(req.getVisibility(), user, applicationId)
               .handle(new ResourceFailedLocalizationEvent(
                   req, diagnostics));
@@ -986,15 +1018,15 @@ public class ResourceLocalizationService extends CompositeService
             break;
           default:
             LOG.info("Unknown status: " + stat.getStatus());
-            action = LocalizerAction.DIE;
+            fetchFailed = true;
             getLocalResourcesTracker(req.getVisibility(), user, applicationId)
               .handle(new ResourceFailedLocalizationEvent(
                   req, stat.getException().getMessage()));
             break;
         }
       }
-      if (action == LocalizerAction.DIE) {
-        response.setLocalizerAction(action);
+      if (fetchFailed || killContainerLocalizer.get()) {
+        response.setLocalizerAction(LocalizerAction.DIE);
         return response;
       }
 
@@ -1022,12 +1054,9 @@ public class ResourceLocalizationService extends CompositeService
         } catch (URISyntaxException e) {
             //TODO fail? Already translated several times...
         }
-      } else if (pending.isEmpty()) {
-        // TODO: Synchronization
-        action = LocalizerAction.DIE;
       }
 
-      response.setLocalizerAction(action);
+      response.setLocalizerAction(LocalizerAction.LIVE);
       response.setResourceSpecs(rsrcs);
       return response;
     }

+ 1 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizationEventType.java

@@ -23,4 +23,5 @@ public enum LocalizationEventType {
   CACHE_CLEANUP,
   CLEANUP_CONTAINER_RESOURCES,
   DESTROY_APPLICATION_RESOURCES,
+  CONTAINER_RESOURCES_LOCALIZED,
 }

+ 11 - 11
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java

@@ -22,28 +22,25 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyBoolean;
 import static org.mockito.Matchers.anyInt;
 import static org.mockito.Matchers.anyLong;
 import static org.mockito.Matchers.anyShort;
-import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.argThat;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Matchers.isA;
 import static org.mockito.Matchers.isNull;
 import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.timeout;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.when;
 
 import java.io.File;
-import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.URI;
@@ -63,12 +60,6 @@ import java.util.concurrent.BrokenBarrierException;
 import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.Future;
 
-import org.apache.hadoop.fs.FileAlreadyExistsException;
-import org.apache.hadoop.fs.Options;
-import org.apache.hadoop.fs.UnresolvedLinkException;
-import org.apache.hadoop.security.AccessControlException;
-import org.junit.Assert;
-
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.AbstractFileSystem;
@@ -76,6 +67,7 @@ import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Options;
 import org.apache.hadoop.fs.Options.ChecksumOpt;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.UnsupportedFileSystemException;
@@ -128,6 +120,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.Reso
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService.PublicLocalizer;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizationEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
@@ -145,6 +138,7 @@ import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -942,7 +936,13 @@ public class TestResourceLocalizationService {
       assertTrue(localizedPath.getFile().endsWith(
           localPath + Path.SEPARATOR + "1" + Path.SEPARATOR + "12"));
 
-      // get shutdown
+      response = spyService.heartbeat(stat);
+      assertEquals(LocalizerAction.LIVE, response.getLocalizerAction());
+
+      spyService.handle(new ContainerLocalizationEvent(
+          LocalizationEventType.CONTAINER_RESOURCES_LOCALIZED, c));
+
+      // get shutdown after receive CONTAINER_RESOURCES_LOCALIZED event
       response = spyService.heartbeat(stat);
       assertEquals(LocalizerAction.DIE, response.getLocalizerAction());