1
0
Kaynağa Gözat

MAPREDUCE-3572. Moved AM event dispatcher to a separate thread for performance reasons. Contributed by Vinod K V.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1227426 13f79535-47bb-0310-9956-ffa450edef68
Arun Murthy 13 yıl önce
ebeveyn
işleme
08f8abf563

+ 3 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -406,6 +406,9 @@ Release 0.23.1 - Unreleased
     MAPREDUCE-3566. Fixed MR AM to construct CLC only once across all tasks.
     (vinodkv via acmurthy) 
 
+    MAPREDUCE-3572. Moved AM event dispatcher to a separate thread for
+    performance reasons. (vinodkv via acmurthy) 
+
 Release 0.23.0 - 2011-11-01 
 
   INCOMPATIBLE CHANGES

+ 1 - 2
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java

@@ -104,10 +104,9 @@ public abstract class RMCommunicator extends AbstractService  {
   @Override
   public void start() {
     scheduler= createSchedulerProxy();
-    //LOG.info("Scheduler is " + scheduler);
     register();
     startAllocatorThread();
-    JobID id = TypeConverter.fromYarn(context.getApplicationID());
+    JobID id = TypeConverter.fromYarn(this.applicationId);
     JobId jobId = TypeConverter.toYarn(id);
     job = context.getJob(jobId);
     super.start();

+ 69 - 15
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java

@@ -30,18 +30,17 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.JobCounter;
-import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.MRJobConfig;
-import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
 import org.apache.hadoop.mapreduce.jobhistory.NormalizedResourceEvent;
-import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
@@ -69,7 +68,7 @@ import org.apache.hadoop.yarn.util.RackResolver;
 public class RMContainerAllocator extends RMContainerRequestor
     implements ContainerAllocator {
 
-  private static final Log LOG = LogFactory.getLog(RMContainerAllocator.class);
+  static final Log LOG = LogFactory.getLog(RMContainerAllocator.class);
   
   public static final 
   float DEFAULT_COMPLETED_MAPS_PERCENT_FOR_REDUCE_SLOWSTART = 0.05f;
@@ -77,7 +76,10 @@ public class RMContainerAllocator extends RMContainerRequestor
   private static final Priority PRIORITY_FAST_FAIL_MAP;
   private static final Priority PRIORITY_REDUCE;
   private static final Priority PRIORITY_MAP;
-  
+
+  private Thread eventHandlingThread;
+  private volatile boolean stopEventHandling;
+
   static {
     PRIORITY_FAST_FAIL_MAP = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(Priority.class);
     PRIORITY_FAST_FAIL_MAP.setPriority(5);
@@ -130,7 +132,10 @@ public class RMContainerAllocator extends RMContainerRequestor
   private float reduceSlowStart = 0;
   private long retryInterval;
   private long retrystartTime;
-  
+
+  BlockingQueue<ContainerAllocatorEvent> eventQueue
+    = new LinkedBlockingQueue<ContainerAllocatorEvent>();
+
   public RMContainerAllocator(ClientService clientService, AppContext context) {
     super(clientService, context);
   }
@@ -155,6 +160,40 @@ public class RMContainerAllocator extends RMContainerRequestor
     retrystartTime = System.currentTimeMillis();
   }
 
+  @Override
+  public void start() {
+    this.eventHandlingThread = new Thread() {
+      @SuppressWarnings("unchecked")
+      @Override
+      public void run() {
+
+        ContainerAllocatorEvent event;
+
+        while (!stopEventHandling && !Thread.currentThread().isInterrupted()) {
+          try {
+            event = RMContainerAllocator.this.eventQueue.take();
+          } catch (InterruptedException e) {
+            LOG.error("Returning, interrupted : " + e);
+            return;
+          }
+
+          try {
+            handleEvent(event);
+          } catch (Throwable t) {
+            LOG.error("Error in handling event type " + event.getType()
+                + " to the ContainreAllocator", t);
+            // Kill the AM
+            eventHandler.handle(new JobEvent(getJob().getID(),
+              JobEventType.INTERNAL_ERROR));
+            return;
+          }
+        }
+      }
+    };
+    this.eventHandlingThread.start();
+    super.start();
+  }
+
   @Override
   protected synchronized void heartbeat() throws Exception {
     LOG.info("Before Scheduling: " + getStat());
@@ -181,6 +220,8 @@ public class RMContainerAllocator extends RMContainerRequestor
 
   @Override
   public void stop() {
+    this.stopEventHandling = true;
+    eventHandlingThread.interrupt();
     super.stop();
     LOG.info("Final Stats: " + getStat());
   }
@@ -192,10 +233,27 @@ public class RMContainerAllocator extends RMContainerRequestor
   public void setIsReduceStarted(boolean reduceStarted) {
     this.reduceStarted = reduceStarted; 
   }
-  
-  @SuppressWarnings("unchecked")
+
   @Override
-  public synchronized void handle(ContainerAllocatorEvent event) {
+  public void handle(ContainerAllocatorEvent event) {
+    int qSize = eventQueue.size();
+    if (qSize != 0 && qSize % 1000 == 0) {
+      LOG.info("Size of event-queue in RMContainerAllocator is " + qSize);
+    }
+    int remCapacity = eventQueue.remainingCapacity();
+    if (remCapacity < 1000) {
+      LOG.warn("Very low remaining capacity in the event-queue "
+          + "of RMContainerAllocator: " + remCapacity);
+    }
+    try {
+      eventQueue.put(event);
+    } catch (InterruptedException e) {
+      throw new YarnException(e);
+    }
+  }
+
+  @SuppressWarnings({ "unchecked" })
+  protected synchronized void handleEvent(ContainerAllocatorEvent event) {
     LOG.info("Processing the event " + event.toString());
     recalculateReduceSchedule = true;
     if (event.getType() == ContainerAllocator.EventType.CONTAINER_REQ) {
@@ -206,9 +264,7 @@ public class RMContainerAllocator extends RMContainerRequestor
           int minSlotMemSize = getMinContainerCapability().getMemory();
           mapResourceReqt = (int) Math.ceil((float) mapResourceReqt/minSlotMemSize)
               * minSlotMemSize;
-          JobID id = TypeConverter.fromYarn(applicationId);
-          JobId jobId = TypeConverter.toYarn(id);
-          eventHandler.handle(new JobHistoryEvent(jobId, 
+          eventHandler.handle(new JobHistoryEvent(getJob().getID(), 
               new NormalizedResourceEvent(org.apache.hadoop.mapreduce.TaskType.MAP,
               mapResourceReqt)));
           LOG.info("mapResourceReqt:"+mapResourceReqt);
@@ -232,9 +288,7 @@ public class RMContainerAllocator extends RMContainerRequestor
           //round off on slotsize
           reduceResourceReqt = (int) Math.ceil((float) 
               reduceResourceReqt/minSlotMemSize) * minSlotMemSize;
-          JobID id = TypeConverter.fromYarn(applicationId);
-          JobId jobId = TypeConverter.toYarn(id);
-          eventHandler.handle(new JobHistoryEvent(jobId, 
+          eventHandler.handle(new JobHistoryEvent(getJob().getID(), 
               new NormalizedResourceEvent(
                   org.apache.hadoop.mapreduce.TaskType.REDUCE,
               reduceResourceReqt)));

+ 2 - 2
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java

@@ -1186,12 +1186,12 @@ public class TestRMContainerAllocator {
 
     public void sendRequests(List<ContainerRequestEvent> reqs) {
       for (ContainerRequestEvent req : reqs) {
-        super.handle(req);
+        super.handleEvent(req);
       }
     }
 
     public void sendFailure(ContainerFailedEvent f) {
-      super.handle(f);
+      super.handleEvent(f);
     }
     
     // API to be used by tests