Browse Source

FifoScheduler works on a single node!

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/MR-279@1153441 13f79535-47bb-0310-9956-ffa450edef68
Vinod Kumar Vavilapalli 14 years ago
parent
commit
041c9c55bd

+ 5 - 4
mapreduce/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java

@@ -281,11 +281,12 @@ public class TestMRJobs {
 
 
   protected Job runFailingMapperJob()
   protected Job runFailingMapperJob()
   throws IOException, InterruptedException, ClassNotFoundException {
   throws IOException, InterruptedException, ClassNotFoundException {
-    mrCluster.getConfig().setInt(MRJobConfig.NUM_MAPS, 1);
-    mrCluster.getConfig().setInt("mapreduce.task.timeout", 10*1000);//reduce the timeout
-    mrCluster.getConfig().setInt(MRJobConfig.MAP_MAX_ATTEMPTS, 2); //reduce the number of attempts
+    Configuration myConf = new Configuration(mrCluster.getConfig());
+    myConf.setInt(MRJobConfig.NUM_MAPS, 1);
+    myConf.setInt("mapreduce.task.timeout", 10*1000);//reduce the timeout
+    myConf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, 2); //reduce the number of attempts
 
 
-    Job job = new Job(mrCluster.getConfig());
+    Job job = new Job(myConf);
 
 
     job.setJarByClass(FailingMapper.class);
     job.setJarByClass(FailingMapper.class);
     job.setJobName("failmapper");
     job.setJobName("failmapper");

+ 0 - 1
mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java

@@ -437,7 +437,6 @@ public class ResourceManager extends CompositeService implements Recoverable {
       resourceManager.start();
       resourceManager.start();
     } catch (Throwable e) {
     } catch (Throwable e) {
       LOG.error("Error starting RM", e);
       LOG.error("Error starting RM", e);
-    } finally {
       if (resourceManager != null) {
       if (resourceManager != null) {
         resourceManager.stop();
         resourceManager.stop();
       }
       }

+ 19 - 0
mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerFinishedEvent.java

@@ -0,0 +1,19 @@
+package org.apache.hadoop.yarn.server.resourcemanager.rmcontainer;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+
+public class RMContainerFinishedEvent extends RMContainerEvent {
+
+  private final ContainerStatus remoteContainerStatus;
+
+  public RMContainerFinishedEvent(ContainerId containerId,
+      ContainerStatus containerStatus) {
+    super(containerId, RMContainerEventType.FINISHED);
+    this.remoteContainerStatus = containerStatus;
+  }
+
+  public ContainerStatus getRemoteContainerStatus() {
+    return this.remoteContainerStatus;
+  }
+}

+ 8 - 0
mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java

@@ -198,6 +198,14 @@ public class RMContainerImpl implements RMContainer {
     @Override
     @Override
     public void transition(RMContainerImpl container, RMContainerEvent event) {
     public void transition(RMContainerImpl container, RMContainerEvent event) {
 
 
+      RMContainerFinishedEvent finishedEvent = (RMContainerFinishedEvent) event;
+
+      // Update container-status for diagnostics. Today we completely
+      // replace it on finish. We may just need to update diagnostics.
+      // ^TODO
+      container.container.setContainerStatus(finishedEvent
+          .getRemoteContainerStatus());
+
       // Inform AppAttempt
       // Inform AppAttempt
       container.eventHandler.handle(new RMAppAttemptContainerFinishedEvent(
       container.eventHandler.handle(new RMAppAttemptContainerFinishedEvent(
           container.appAttemptId, container.container));
           container.appAttemptId, container.container));

+ 13 - 11
mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java

@@ -48,6 +48,7 @@ import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerFinishedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
@@ -379,28 +380,29 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
         return RMNodeState.UNHEALTHY;
         return RMNodeState.UNHEALTHY;
       }
       }
 
 
-      Map<ApplicationId, List<Container>> appConts = statusEvent
+      Map<ApplicationId, List<Container>> remoteAppContainersMap = statusEvent
           .getContainersCollection();
           .getContainersCollection();
-      for (List<Container> l : appConts.values()) {
-        for (Container cont : l) {
+      for (List<Container> remoteContainerList : remoteAppContainersMap
+          .values()) {
+        for (Container remoteContainer : remoteContainerList) {
 
 
           // Process running containers
           // Process running containers
-          ContainerId containerId = cont.getId();
-          if (cont.getContainerStatus().getState() == ContainerState.RUNNING
-              || cont.getContainerStatus().getState() == ContainerState.INITIALIZING) {
+          ContainerId containerId = remoteContainer.getId();
+          if (remoteContainer.getContainerStatus().getState() == ContainerState.RUNNING
+              || remoteContainer.getContainerStatus().getState() == ContainerState.INITIALIZING) {
             if (!rmNode.launchedContainers.containsKey(containerId)) {
             if (!rmNode.launchedContainers.containsKey(containerId)) {
-              rmNode.launchedContainers.put(containerId, cont);
+              // Just launched container. RM knows about it the first time.
+              rmNode.launchedContainers.put(containerId, remoteContainer);
               rmNode.context.getDispatcher().getEventHandler().handle(
               rmNode.context.getDispatcher().getEventHandler().handle(
                   new RMContainerEvent(containerId,
                   new RMContainerEvent(containerId,
                       RMContainerEventType.LAUNCHED));
                       RMContainerEventType.LAUNCHED));
             }
             }
           } else {
           } else {
-
-            rmNode.launchedContainers.remove(containerId);
+            // A finished container
             // Send event to the finished container
             // Send event to the finished container
             rmNode.context.getDispatcher().getEventHandler().handle(
             rmNode.context.getDispatcher().getEventHandler().handle(
-                new RMContainerEvent(containerId,
-                    RMContainerEventType.FINISHED));
+                new RMContainerFinishedEvent(containerId, remoteContainer
+                    .getContainerStatus()));
           }
           }
         }
         }
       }
       }

+ 3 - 1
mapreduce/yarn/yarn-server/yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java

@@ -165,7 +165,9 @@ public class MiniYARNCluster extends CompositeService {
         remoteLogDir.mkdir();
         remoteLogDir.mkdir();
         LOG.info("Created logDir in " + logDir.getAbsolutePath());
         LOG.info("Created logDir in " + logDir.getAbsolutePath());
         getConfig().set(NMConfig.NM_LOG_DIR, logDir.getAbsolutePath());
         getConfig().set(NMConfig.NM_LOG_DIR, logDir.getAbsolutePath());
-        getConfig().set(NMConfig.REMOTE_USER_LOG_DIR, remoteLogDir.getAbsolutePath());
+        getConfig().set(NMConfig.REMOTE_USER_LOG_DIR,
+            remoteLogDir.getAbsolutePath());
+        getConfig().setInt(NMConfig.NM_VMEM_GB, 4); // By default AM + 2 containers
         nodeManager = new NodeManager() {
         nodeManager = new NodeManager() {
 
 
           @Override
           @Override