소스 검색

MAPREDUCE-7309. Improve performance of reading resource request for mapper/reducers from config. Contributed by Peter Bacsko & Wangda Tan.

Peter Bacsko 4 년 전
부모
커밋
2f2b690df7

+ 11 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java

@@ -158,6 +158,9 @@ public abstract class TaskAttemptImpl implements
     org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt,
       EventHandler<TaskAttemptEvent> {
 
+  @VisibleForTesting
+  protected final static Map<TaskType, Resource> RESOURCE_REQUEST_CACHE
+      = new HashMap<>();
   static final Counters EMPTY_COUNTERS = new Counters();
   private static final Logger LOG =
       LoggerFactory.getLogger(TaskAttemptImpl.class);
@@ -172,7 +175,7 @@ public abstract class TaskAttemptImpl implements
   private final Clock clock;
   private final org.apache.hadoop.mapred.JobID oldJobId;
   private final TaskAttemptListener taskAttemptListener;
-  private final Resource resourceCapability;
+  private Resource resourceCapability;
   protected Set<String> dataLocalHosts;
   protected Set<String> dataLocalRacks;
   private final List<String> diagnostics = new ArrayList<String>();
@@ -707,6 +710,10 @@ public abstract class TaskAttemptImpl implements
         getResourceTypePrefix(taskType);
     boolean memorySet = false;
     boolean cpuVcoresSet = false;
+    if (RESOURCE_REQUEST_CACHE.get(taskType) != null) {
+      resourceCapability = RESOURCE_REQUEST_CACHE.get(taskType);
+      return;
+    }
     if (resourceTypePrefix != null) {
       List<ResourceInformation> resourceRequests =
           ResourceUtils.getRequestedResourcesFromConfig(conf,
@@ -767,6 +774,9 @@ public abstract class TaskAttemptImpl implements
     if (!cpuVcoresSet) {
       this.resourceCapability.setVirtualCores(getCpuRequired(conf, taskType));
     }
+    RESOURCE_REQUEST_CACHE.put(taskType, resourceCapability);
+    LOG.info("Resource capability of task type {} is set to {}",
+        taskType, resourceCapability);
   }
 
   private String getCpuVcoresKey(TaskType taskType) {

+ 7 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java

@@ -42,6 +42,7 @@ import java.util.concurrent.CopyOnWriteArrayList;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptFailEvent;
 import org.junit.After;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -180,6 +181,11 @@ public class TestTaskAttempt{
     ResourceUtils.resetResourceTypes(new Configuration());
   }
 
+  @Before
+  public void before() {
+    TaskAttemptImpl.RESOURCE_REQUEST_CACHE.clear();
+  }
+
   @After
   public void tearDown() {
     ResourceUtils.resetResourceTypes(new Configuration());
@@ -1634,6 +1640,7 @@ public class TestTaskAttempt{
       TestAppender testAppender = new TestAppender();
       final Logger logger = Logger.getLogger(TaskAttemptImpl.class);
       try {
+        TaskAttemptImpl.RESOURCE_REQUEST_CACHE.clear();
         logger.addAppender(testAppender);
         EventHandler eventHandler = mock(EventHandler.class);
         Clock clock = SystemClock.getInstance();