Sfoglia il codice sorgente

Making CS work on 150 nodes + gridmixV3. Fixed logging bugs + yet another protocol buffer wrapper related bug.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/MR-279@1153454 13f79535-47bb-0310-9956-ffa450edef68
Vinod Kumar Vavilapalli 14 anni fa
parent
commit
87278e3665
12 ha cambiato i file con 77 aggiunte e 51 eliminazioni
  1. 14 14
      mapreduce/yarn/yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationAttemptIdPBImpl.java
  2. 5 0
      mapreduce/yarn/yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationIdPBImpl.java
  3. 15 1
      mapreduce/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java
  4. 1 1
      mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
  5. 1 1
      mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMConfig.java
  6. 9 0
      mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
  7. 4 1
      mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
  8. 6 4
      mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
  9. 0 14
      mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApp.java
  10. 1 3
      mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
  11. 20 11
      mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
  12. 1 1
      mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java

+ 14 - 14
mapreduce/yarn/yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationAttemptIdPBImpl.java

@@ -38,20 +38,20 @@ public class ApplicationAttemptIdPBImpl extends ProtoBase<ApplicationAttemptIdPr
     viaProto = true;
   }
   
-  public ApplicationAttemptIdProto getProto() {
+  public synchronized ApplicationAttemptIdProto getProto() {
     mergeLocalToProto();
     proto = viaProto ? proto : builder.build();
     viaProto = true;
     return proto;
   }
 
-  private void mergeLocalToBuilder() {
+  private synchronized void mergeLocalToBuilder() {
     if (this.applicationId != null && !((ApplicationIdPBImpl)applicationId).getProto().equals(builder.getApplicationId())) {
       builder.setApplicationId(convertToProtoFormat(this.applicationId));
     }
   }
 
-  private void mergeLocalToProto() {
+  private synchronized void mergeLocalToProto() {
     if (viaProto) 
       maybeInitBuilder();
     mergeLocalToBuilder();
@@ -59,7 +59,7 @@ public class ApplicationAttemptIdPBImpl extends ProtoBase<ApplicationAttemptIdPr
     viaProto = true;
   }
 
-  private void maybeInitBuilder() {
+  private synchronized void maybeInitBuilder() {
     if (viaProto || builder == null) {
       builder = ApplicationAttemptIdProto.newBuilder(proto);
     }
@@ -68,18 +68,18 @@ public class ApplicationAttemptIdPBImpl extends ProtoBase<ApplicationAttemptIdPr
     
   
   @Override
-  public int getAttemptId() {
+  public synchronized int getAttemptId() {
     ApplicationAttemptIdProtoOrBuilder p = viaProto ? proto : builder;
     return (p.getAttemptId());
   }
 
   @Override
-  public void setAttemptId(int attemptId) {
+  public synchronized void setAttemptId(int attemptId) {
     maybeInitBuilder();
     builder.setAttemptId((attemptId));
   }
   @Override
-  public ApplicationId getApplicationId() {
+  public synchronized ApplicationId getApplicationId() {
     ApplicationAttemptIdProtoOrBuilder p = viaProto ? proto : builder;
     if (this.applicationId != null) {
       return this.applicationId;
@@ -92,28 +92,28 @@ public class ApplicationAttemptIdPBImpl extends ProtoBase<ApplicationAttemptIdPr
   }
 
   @Override
-  public void setApplicationId(ApplicationId appId) {
+  public synchronized void setApplicationId(ApplicationId appId) {
     maybeInitBuilder();
     if (appId == null) 
       builder.clearApplicationId();
     this.applicationId = appId;
   }
 
-  private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) {
+  private synchronized ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) {
     return new ApplicationIdPBImpl(p);
   }
 
-  private ApplicationIdProto convertToProtoFormat(ApplicationId t) {
+  private synchronized ApplicationIdProto convertToProtoFormat(ApplicationId t) {
     return ((ApplicationIdPBImpl)t).getProto();
   }
 
   @Override
-  public int hashCode() {
+  public synchronized int hashCode() {
     return getProto().hashCode();
   }
 
   @Override
-  public boolean equals(Object other) {
+  public synchronized boolean equals(Object other) {
     if (other.getClass().isAssignableFrom(this.getClass())) {
       return this.getProto().equals(this.getClass().cast(other).getProto());
     }
@@ -121,7 +121,7 @@ public class ApplicationAttemptIdPBImpl extends ProtoBase<ApplicationAttemptIdPr
   }
 
   @Override
-  public int compareTo(ApplicationAttemptId other) {
+  public synchronized int compareTo(ApplicationAttemptId other) {
     int compareAppIds = this.getApplicationId().compareTo(
         other.getApplicationId());
     if (compareAppIds == 0) {
@@ -133,7 +133,7 @@ public class ApplicationAttemptIdPBImpl extends ProtoBase<ApplicationAttemptIdPr
   }
   
   @Override
-  public String toString() {
+  public synchronized String toString() {
     String id = (this.getApplicationId() != null) ? this.getApplicationId().getClusterTimestamp() + "_" +
         idFormat.format(this.getApplicationId().getId()): "none";
     return "appattempt_" + id + "_" + counterFormat.format(getAttemptId());

+ 5 - 0
mapreduce/yarn/yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationIdPBImpl.java

@@ -68,4 +68,9 @@ public class ApplicationIdPBImpl extends ProtoBase<ApplicationIdProto> implement
       return this.getId() - other.getId();
     }
   }
+
+  @Override
+  public String toString() {
+    return "application_" + this.getClusterTimestamp() + "_" + this.getId();
+  }
 }  

+ 15 - 1
mapreduce/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java

@@ -27,6 +27,7 @@ import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.service.AbstractService;
 
 /**
@@ -150,7 +151,20 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
   class GenericEventHandler implements EventHandler<Event> {
     public void handle(Event event) {
       /* all this method does is enqueue all the events onto the queue */
-      eventQueue.offer(event);
+      int qSize = eventQueue.size();
+      if (qSize !=0 && qSize %1000 == 0) {
+        LOG.info("Size of event-queue is " + qSize);
+      }
+      int remCapacity = eventQueue.remainingCapacity();
+      if (remCapacity < 1000) {
+        LOG.info("Very low remaining capacity in the event-queue: "
+            + remCapacity);
+      }
+      try {
+        eventQueue.put(event);
+      } catch (InterruptedException e) {
+        throw new YarnException(e);
+      }
     };
   }
 

+ 1 - 1
mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java

@@ -205,7 +205,7 @@ public class ApplicationMasterService extends AbstractService implements
     }
 
     // Allow only one thread in AM to do heartbeat at a time.
-    synchronized (lastResponse) {
+    synchronized (lastResponse) { // BUG TODO: Locking order is screwed.
 
       // Send the status update to the appAttempt.
       this.rmContext.getDispatcher().getEventHandler().handle(

+ 1 - 1
mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMConfig.java

@@ -33,7 +33,7 @@ public class RMConfig {
       + "application.max.retries";
   public static final int DEFAULT_ZK_TIMEOUT = 60000;
   public static final int DEFAULT_AM_MAX_RETRIES = 3;
-  public static final int DEFAULT_AM_EXPIRY_INTERVAL = 60000;
+  public static final int DEFAULT_AM_EXPIRY_INTERVAL = 600000;
   public static final String NM_EXPIRY_INTERVAL = YarnConfiguration.RM_PREFIX
       + "nodemanager.expiry.interval";
   public static final int DEFAULT_NM_EXPIRY_INTERVAL = 600000;

+ 9 - 0
mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java

@@ -275,6 +275,15 @@ public class ResourceManager extends CompositeService implements Recoverable {
     @Override
     public void handle(SchedulerEvent event) {
       try {
+        int qSize = eventQueue.size();
+        if (qSize !=0 && qSize %1000 == 0) {
+          LOG.info("Size of scheduler event-queue is " + qSize);
+        }
+        int remCapacity = eventQueue.remainingCapacity();
+        if (remCapacity < 1000) {
+          LOG.info("Very low remaining capacity on scheduler event queue: "
+              + remCapacity);
+        }
         this.eventQueue.put(event);
       } catch (InterruptedException e) {
         throw new YarnException(e);

+ 4 - 1
mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java

@@ -316,7 +316,10 @@ public class RMAppImpl implements RMApp {
     this.readLock.lock();
 
     try {
-      return this.currentAttempt.getTrackingUrl();
+      if (this.currentAttempt != null) {
+        return this.currentAttempt.getTrackingUrl();
+      }
+      return null;
     } finally {
       this.readLock.unlock();
     }

+ 6 - 4
mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java

@@ -47,9 +47,6 @@ import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerFinishedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
@@ -164,7 +161,12 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
     context.getDispatcher().getEventHandler().handle(
         new NodeAddedSchedulerEvent(this));
   }
-  
+
+  @Override
+  public String toString() {
+    return this.nodeId.toString();
+  }
+
   @Override
   public String getNodeHostName() {
     return hostName;

+ 0 - 14
mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApp.java

@@ -230,20 +230,6 @@ public class SchedulerApp {
         }
       }
     }
-    // TODO - Remove block
-    for (Priority priority : getPriorities()) {
-      Map<String, ResourceRequest> requests = getResourceRequests(priority);
-      if (requests != null) {
-        LOG.info("showRequests:" + " application=" + getApplicationId() + 
-            " headRoom=" + getHeadroom() + 
-            " currentConsumption=" + currentConsumption.getMemory());
-        for (ResourceRequest request : requests.values()) {
-          LOG.info("showRequests:" + " application=" + getApplicationId()
-              + " request=" + request);
-        }
-      }
-    }
-
   }
 
   public synchronized void setAvailableResourceLimit(Resource globalLimit) {

+ 1 - 3
mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java

@@ -81,9 +81,7 @@ public class SchedulerNode {
     ++numContainers;
     
     launchedContainers.put(container.getId(), rmContainer);
-    LOG.info("Allocated container " + container.getId() + 
-        " to node " + rmNode.getNodeAddress());
-    
+
     LOG.info("Assigned container " + container.getId() + 
         " of capacity " + container.getResource() + " on host " + rmNode.getNodeAddress() + 
         ", which currently has " + numContainers + " containers, " + 

+ 20 - 11
mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java

@@ -409,21 +409,24 @@ implements ResourceScheduler, CapacitySchedulerContext {
 
     synchronized (application) {
 
-      LOG.info("DEBUG --- allocate: pre-update" +
-          " applicationId=" + applicationAttemptId + 
-          " application=" + application);
-      application.showRequests();
+      if (!ask.isEmpty()) {
 
-      // Update application requests
-      application.updateResourceRequests(ask);
-
-      LOG.info("DEBUG --- allocate: post-update");
-      application.showRequests();
+        LOG.info("DEBUG --- allocate: pre-update" +
+            " applicationAttemptId=" + applicationAttemptId + 
+            " application=" + application);
+        application.showRequests();
+  
+        // Update application requests
+        application.updateResourceRequests(ask);
+  
+        LOG.info("DEBUG --- allocate: post-update");
+        application.showRequests();
+      }
 
       LOG.info("DEBUG --- allocate:" +
-          " applicationId=" + applicationAttemptId + 
+          " applicationAttemptId=" + applicationAttemptId + 
           " #ask=" + ask.size());
-      
+
       return new Allocation(
           application.pullNewlyAllocatedContainers(), 
           application.getHeadroom());
@@ -481,6 +484,8 @@ implements ResourceScheduler, CapacitySchedulerContext {
     LOG.info("nodeUpdate: " + nm + " clusterResources: " + clusterResource);
     
     SchedulerNode node = getNode(nm.getNodeID());
+
+    // Processing the current containers running/finished on node
     for (List<Container> appContainers : containers.values()) {
       for (Container container : appContainers) {
         if (container.getState() == ContainerState.RUNNING) {
@@ -493,6 +498,10 @@ implements ResourceScheduler, CapacitySchedulerContext {
       }
     }
 
+    // Now node data structures are upto date and ready for scheduling.
+    LOG.info("DEBUG -- Node being looked for scheduling " + nm
+        + " availableResource: " + node.getAvailableResource());
+
     // Assign new containers...
     // 1. Check for reserved applications
     // 2. Schedule if there are no reservations

+ 1 - 1
mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java

@@ -516,7 +516,7 @@ public class ParentQueue implements Queue {
 
       LOG.info("DEBUG ---" +
       		" parentQ=" + getQueueName() + 
-      		" assigned=" + assigned + 
+      		" assignedSoFarInThisIteration=" + assigned + 
       		" utilization=" + getUtilization());
       
       // Do not assign more than one container if this isn't the root queue