Merge branch 'trunk' into HADOOP-12756

Kai Zheng 8 years ago
parent
commit
a49b3be38e
14 changed files with 1236 additions and 774 deletions
  1. + 0 - 3
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
  2. + 0 - 4
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
  3. + 5 - 4
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java
  4. + 0 - 2
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestHdfsConfigFields.java
  5. + 4 - 0
      hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java
  6. + 104 - 10
      hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java
  7. + 81 - 2
      hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestCopyListing.java
  8. + 53 - 43
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
  9. + 438 - 301
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
  10. + 1 - 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
  11. + 221 - 197
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
  12. + 258 - 207
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
  13. + 65 - 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java
  14. + 6 - 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/TimelineServiceV2.md

+ 0 - 3
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java

@@ -131,9 +131,6 @@ public interface HdfsClientConfigKeys {
           "dfs.client.key.provider.cache.expiry";
   long    DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_DEFAULT =
               TimeUnit.DAYS.toMillis(10); // 10 days
-  String  DFS_HDFS_BLOCKS_METADATA_ENABLED =
-      "dfs.datanode.hdfs-blocks-metadata.enabled";
-  boolean DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT = false;
 
   String  DFS_DATANODE_KERBEROS_PRINCIPAL_KEY =
       "dfs.datanode.kerberos.principal";

+ 0 - 4
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java

@@ -58,10 +58,6 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
       .DFS_CHECKSUM_TYPE_KEY;
   public static final String  DFS_CHECKSUM_TYPE_DEFAULT =
       HdfsClientConfigKeys.DFS_CHECKSUM_TYPE_DEFAULT;
-  public static final String  DFS_HDFS_BLOCKS_METADATA_ENABLED =
-      HdfsClientConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED;
-  public static final boolean DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT =
-      HdfsClientConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT;
   public static final String DFS_WEBHDFS_ACL_PERMISSION_PATTERN_DEFAULT =
       HdfsClientConfigKeys.DFS_WEBHDFS_ACL_PERMISSION_PATTERN_DEFAULT;
   public static final String  DFS_WEBHDFS_NETTY_LOW_WATERMARK =

+ 5 - 4
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java

@@ -630,14 +630,15 @@ public class INodeDirectory extends INodeWithAdditionalFields
       ContentSummaryComputationContext summary) {
     final DirectoryWithSnapshotFeature sf = getDirectoryWithSnapshotFeature();
     if (sf != null && snapshotId == Snapshot.CURRENT_STATE_ID) {
+      final ContentCounts counts = new ContentCounts.Builder().build();
       // if the getContentSummary call is against a non-snapshot path, the
       // computation should include all the deleted files/directories
       sf.computeContentSummary4Snapshot(summary.getBlockStoragePolicySuite(),
-          summary.getCounts());
-      // Also compute ContentSummary for snapshotCounts (So we can extract it
+          counts);
+      summary.getCounts().addContents(counts);
+      // Also add ContentSummary to snapshotCounts (So we can extract it
       // later from the ContentSummary of all).
-      sf.computeContentSummary4Snapshot(summary.getBlockStoragePolicySuite(),
-          summary.getSnapshotCounts());
+      summary.getSnapshotCounts().addContents(counts);
     }
     final DirectoryWithQuotaFeature q = getDirectoryWithQuotaFeature();
     if (q != null && snapshotId == Snapshot.CURRENT_STATE_ID) {
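Note on the hunk above: it folds what used to be two independent snapshot walks into one. The snapshot feature's counts are computed once into a scratch ContentCounts, then added to both the overall totals and the snapshot-only totals. The pattern restated in isolation, using only names visible in the hunk (illustrative, not the full NameNode code path):

    final ContentCounts counts = new ContentCounts.Builder().build();
    // Walk the snapshot diffs once...
    sf.computeContentSummary4Snapshot(summary.getBlockStoragePolicySuite(), counts);
    // ...then fold the single result into both accumulators.
    summary.getCounts().addContents(counts);
    summary.getSnapshotCounts().addContents(counts);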

+ 0 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestHdfsConfigFields.java

@@ -75,8 +75,6 @@ public class TestHdfsConfigFields extends TestConfigurationFieldsBase {
     // Fully deprecated properties?
     configurationPropsToSkipCompare
         .add("dfs.corruptfilesreturned.max");
-    configurationPropsToSkipCompare
-        .add("dfs.datanode.hdfs-blocks-metadata.enabled");
     configurationPropsToSkipCompare
         .add("dfs.metrics.session-id");
     configurationPropsToSkipCompare

+ 4 - 0
hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java

@@ -58,6 +58,10 @@ public class DistCpConstants {
   public static final String CONF_LABEL_APPEND = "distcp.copy.append";
   public static final String CONF_LABEL_DIFF = "distcp.copy.diff";
   public static final String CONF_LABEL_BANDWIDTH_MB = "distcp.map.bandwidth.mb";
+  public static final String CONF_LABEL_SIMPLE_LISTING_FILESTATUS_SIZE =
+      "distcp.simplelisting.file.status.size";
+  public static final String CONF_LABEL_SIMPLE_LISTING_RANDOMIZE_FILES =
+      "distcp.simplelisting.randomize.files";
   public static final String CONF_LABEL_FILTERS_FILE =
       "distcp.filters.file";
   public static final String CONF_LABEL_MAX_CHUNKS_TOLERABLE =
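The two new keys are consumed by SimpleCopyListing below: one caps how many file statuses are buffered before a flush, the other toggles shuffling of the buffered paths. A hedged sketch of how a client might set them; the key names come from this hunk, the rest is ordinary Hadoop Configuration usage:

    import org.apache.hadoop.conf.Configuration;

    public class DistCpListingTuning {
      public static Configuration tuned() {
        Configuration conf = new Configuration();
        // Buffer this many file statuses before shuffling and flushing
        // (SimpleCopyListing clamps the value to at least 1).
        conf.setInt("distcp.simplelisting.file.status.size", 5000);
        // Set false to keep the listing in traversal order, e.g. for debugging.
        conf.setBoolean("distcp.simplelisting.randomize.files", false);
        return conf;
      }
    }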

+ 104 - 10
hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java

@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.tools;
 
+import com.google.common.collect.Lists;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
@@ -42,7 +43,10 @@ import com.google.common.annotations.VisibleForTesting;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashSet;
+import java.util.List;
+import java.util.Random;
 
 import static org.apache.hadoop.tools.DistCpConstants
         .HDFS_RESERVED_RAW_DIRECTORY_NAME;
@@ -56,13 +60,19 @@ import static org.apache.hadoop.tools.DistCpConstants
 public class SimpleCopyListing extends CopyListing {
   private static final Log LOG = LogFactory.getLog(SimpleCopyListing.class);
 
+  public static final int DEFAULT_FILE_STATUS_SIZE = 1000;
+  public static final boolean DEFAULT_RANDOMIZE_FILE_LISTING = true;
+
   private long totalPaths = 0;
   private long totalDirs = 0;
   private long totalBytesToCopy = 0;
   private int numListstatusThreads = 1;
+  private final int fileStatusLimit;
+  private final boolean randomizeFileListing;
   private final int maxRetries = 3;
   private CopyFilter copyFilter;
   private DistCpSync distCpSync;
+  private final Random rnd = new Random();
 
   /**
    * Protected constructor, to initialize configuration.
@@ -76,6 +86,17 @@ public class SimpleCopyListing extends CopyListing {
     numListstatusThreads = getConf().getInt(
         DistCpConstants.CONF_LABEL_LISTSTATUS_THREADS,
         DistCpConstants.DEFAULT_LISTSTATUS_THREADS);
+    fileStatusLimit = Math.max(1, getConf()
+        .getInt(DistCpConstants.CONF_LABEL_SIMPLE_LISTING_FILESTATUS_SIZE,
+        DEFAULT_FILE_STATUS_SIZE));
+    randomizeFileListing = getConf().getBoolean(
+        DistCpConstants.CONF_LABEL_SIMPLE_LISTING_RANDOMIZE_FILES,
+        DEFAULT_RANDOMIZE_FILE_LISTING);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("numListstatusThreads=" + numListstatusThreads
+          + ", fileStatusLimit=" + fileStatusLimit
+          + ", randomizeFileListing=" + randomizeFileListing);
+    }
     copyFilter = CopyFilter.getCopyFilter(getConf());
     copyFilter.initialize();
   }
@@ -83,9 +104,13 @@ public class SimpleCopyListing extends CopyListing {
   @VisibleForTesting
   protected SimpleCopyListing(Configuration configuration,
                               Credentials credentials,
-                              int numListstatusThreads) {
+                              int numListstatusThreads,
+                              int fileStatusLimit,
+                              boolean randomizeFileListing) {
     super(configuration, credentials);
     this.numListstatusThreads = numListstatusThreads;
+    this.fileStatusLimit = Math.max(1, fileStatusLimit);
+    this.randomizeFileListing = randomizeFileListing;
   }
 
   protected SimpleCopyListing(Configuration configuration,
@@ -236,6 +261,7 @@ public class SimpleCopyListing extends CopyListing {
     FileSystem sourceFS = sourceRoot.getFileSystem(getConf());
 
     try {
+      List<FileStatusInfo> fileStatuses = Lists.newArrayList();
       for (DiffInfo diff : diffList) {
         // add snapshot paths prefix
         diff.target = new Path(options.getSourcePaths().get(0), diff.target);
@@ -259,10 +285,13 @@ public class SimpleCopyListing extends CopyListing {
             sourceDirs.add(sourceStatus);
 
             traverseDirectory(fileListWriter, sourceFS, sourceDirs,
-                sourceRoot, options, excludeList);
+                sourceRoot, options, excludeList, fileStatuses);
           }
         }
       }
+      if (randomizeFileListing) {
+        writeToFileListing(fileStatuses, fileListWriter);
+      }
       fileListWriter.close();
       fileListWriter = null;
     } finally {
@@ -296,6 +325,7 @@ public class SimpleCopyListing extends CopyListing {
     }
 
     try {
+      List<FileStatusInfo> statusList = Lists.newArrayList();
       for (Path path: options.getSourcePaths()) {
         FileSystem sourceFS = path.getFileSystem(getConf());
         final boolean preserveAcls = options.shouldPreserve(FileAttribute.ACL);
@@ -326,8 +356,14 @@ public class SimpleCopyListing extends CopyListing {
                   preserveAcls && sourceStatus.isDirectory(),
                   preserveXAttrs && sourceStatus.isDirectory(),
                   preserveRawXAttrs && sourceStatus.isDirectory());
-            writeToFileListing(fileListWriter, sourceCopyListingStatus,
-                sourcePathRoot);
+            if (randomizeFileListing) {
+              addToFileListing(statusList,
+                  new FileStatusInfo(sourceCopyListingStatus, sourcePathRoot),
+                  fileListWriter);
+            } else {
+              writeToFileListing(fileListWriter, sourceCopyListingStatus,
+                  sourcePathRoot);
+            }
 
             if (sourceStatus.isDirectory()) {
               if (LOG.isDebugEnabled()) {
@@ -337,9 +373,12 @@ public class SimpleCopyListing extends CopyListing {
             }
           }
           traverseDirectory(fileListWriter, sourceFS, sourceDirs,
-                            sourcePathRoot, options, null);
+              sourcePathRoot, options, null, statusList);
         }
       }
+      if (randomizeFileListing) {
+        writeToFileListing(statusList, fileListWriter);
+      }
       fileListWriter.close();
       printStats();
       LOG.info("Build file listing completed.");
@@ -349,6 +388,52 @@ public class SimpleCopyListing extends CopyListing {
     }
   }
 
+  private void addToFileListing(List<FileStatusInfo> fileStatusInfoList,
+      FileStatusInfo statusInfo, SequenceFile.Writer fileListWriter)
+      throws IOException {
+    fileStatusInfoList.add(statusInfo);
+    if (fileStatusInfoList.size() > fileStatusLimit) {
+      writeToFileListing(fileStatusInfoList, fileListWriter);
+    }
+  }
+
+  @VisibleForTesting
+  void setSeedForRandomListing(long seed) {
+    this.rnd.setSeed(seed);
+  }
+
+  private void writeToFileListing(List<FileStatusInfo> fileStatusInfoList,
+      SequenceFile.Writer fileListWriter) throws IOException {
+    /**
+     * In cloud storage systems, it is possible to get region hotspot.
+     * Shuffling paths can avoid such cases and also ensure that
+     * some mappers do not get lots of similar paths.
+     */
+    Collections.shuffle(fileStatusInfoList, rnd);
+    for (FileStatusInfo fileStatusInfo : fileStatusInfoList) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Adding " + fileStatusInfo.fileStatus.getPath());
+      }
+      writeToFileListing(fileListWriter, fileStatusInfo.fileStatus,
+          fileStatusInfo.sourceRootPath);
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Number of paths written to fileListing="
+          + fileStatusInfoList.size());
+    }
+    fileStatusInfoList.clear();
+  }
+
+  private static class FileStatusInfo {
+    private CopyListingFileStatus fileStatus;
+    private Path sourceRootPath;
+
+    FileStatusInfo(CopyListingFileStatus fileStatus, Path sourceRootPath) {
+      this.fileStatus = fileStatus;
+      this.sourceRootPath = sourceRootPath;
+    }
+  }
+
   private Path computeSourceRootPath(FileStatus sourceStatus,
                                      DistCpOptions options) throws IOException {
 
@@ -516,15 +601,18 @@ public class SimpleCopyListing extends CopyListing {
                                  ArrayList<FileStatus> sourceDirs,
                                  Path sourcePathRoot,
                                  DistCpOptions options,
-                                 HashSet<String> excludeList)
+                                 HashSet<String> excludeList,
+                                 List<FileStatusInfo> fileStatuses)
                                  throws IOException {
     final boolean preserveAcls = options.shouldPreserve(FileAttribute.ACL);
     final boolean preserveXAttrs = options.shouldPreserve(FileAttribute.XATTR);
     final boolean preserveRawXattrs = options.shouldPreserveRawXattrs();
 
     assert numListstatusThreads > 0;
-    LOG.debug("Starting thread pool of " + numListstatusThreads +
-              " listStatus workers.");
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Starting thread pool of " + numListstatusThreads +
+          " listStatus workers.");
+    }
     ProducerConsumer<FileStatus, FileStatus[]> workers =
         new ProducerConsumer<FileStatus, FileStatus[]>(numListstatusThreads);
     for (int i = 0; i < numListstatusThreads; i++) {
@@ -551,8 +639,14 @@ public class SimpleCopyListing extends CopyListing {
                 preserveAcls && child.isDirectory(),
                 preserveXAttrs && child.isDirectory(),
                 preserveRawXattrs && child.isDirectory());
-            writeToFileListing(fileListWriter, childCopyListingStatus,
-                 sourcePathRoot);
+            if (randomizeFileListing) {
+              addToFileListing(fileStatuses,
+                  new FileStatusInfo(childCopyListingStatus, sourcePathRoot),
+                  fileListWriter);
+            } else {
+              writeToFileListing(fileListWriter, childCopyListingStatus,
+                  sourcePathRoot);
+            }
           }
           if (retry < maxRetries) {
             if (child.isDirectory()) {
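The heart of this file's change is a buffer-then-shuffle pipeline: statuses accumulate in a list until fileStatusLimit is exceeded, the batch is shuffled with a seedable Random (so tests can reproduce the order), and the shuffled batch is written out. Shuffling spreads lexicographically adjacent paths across mappers, which avoids region hotspots on cloud object stores. A standalone sketch of the idiom, assuming a Consumer stands in for the SequenceFile writer:

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import java.util.Random;
    import java.util.function.Consumer;

    class ShufflingBatcher<T> {
      private final List<T> buffer = new ArrayList<>();
      private final int limit;                 // cf. fileStatusLimit (clamped >= 1)
      private final Random rnd = new Random(); // seedable for deterministic tests
      private final Consumer<T> sink;          // stands in for writeToFileListing(writer, ...)

      ShufflingBatcher(int limit, Consumer<T> sink) {
        this.limit = Math.max(1, limit);
        this.sink = sink;
      }

      void setSeed(long seed) {                // cf. setSeedForRandomListing()
        rnd.setSeed(seed);
      }

      void add(T item) {                       // cf. addToFileListing()
        buffer.add(item);
        if (buffer.size() > limit) {
          flush();
        }
      }

      void flush() {                           // cf. writeToFileListing(List, writer)
        Collections.shuffle(buffer, rnd);      // spread neighboring paths apart
        for (T item : buffer) {
          sink.accept(item);
        }
        buffer.clear();
      }
    }

As in the patch, a final flush after the traversal ensures a last partial batch is not lost.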

+ 81 - 2
hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestCopyListing.java

@@ -25,7 +25,6 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
 import org.apache.hadoop.tools.util.TestDistCpUtils;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.security.Credentials;
@@ -46,7 +45,9 @@ import java.io.OutputStream;
 import java.util.Arrays;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
+import java.util.Random;
 
 @RunWith(value = Parameterized.class)
 public class TestCopyListing extends SimpleCopyListing {
@@ -77,7 +78,7 @@ public class TestCopyListing extends SimpleCopyListing {
   }
 
   public TestCopyListing(int numListstatusThreads) {
-    super(config, CREDENTIALS, numListstatusThreads);
+    super(config, CREDENTIALS, numListstatusThreads, 0, false);
   }
 
   protected TestCopyListing(Configuration configuration) {
@@ -221,6 +222,84 @@ public class TestCopyListing extends SimpleCopyListing {
     }
   }
 
+  @Test(timeout=60000)
+  public void testWithRandomFileListing() throws IOException {
+    FileSystem fs = null;
+    try {
+      fs = FileSystem.get(getConf());
+      List<Path> srcPaths = new ArrayList<>();
+      List<Path> srcFiles = new ArrayList<>();
+      Path target = new Path("/tmp/out/1");
+      final int pathCount = 25;
+      for (int i = 0; i < pathCount; i++) {
+        Path p = new Path("/tmp", String.valueOf(i));
+        srcPaths.add(p);
+        fs.mkdirs(p);
+
+        Path fileName = new Path(p, i + ".txt");
+        srcFiles.add(fileName);
+        try (OutputStream out = fs.create(fileName)) {
+          out.write(i);
+        }
+      }
+
+      Path listingFile = new Path("/tmp/file");
+      DistCpOptions options = new DistCpOptions(srcPaths, target);
+      options.setSyncFolder(true);
+
+      // Check without randomizing files
+      getConf().setBoolean(
+          DistCpConstants.CONF_LABEL_SIMPLE_LISTING_RANDOMIZE_FILES, false);
+      SimpleCopyListing listing = new SimpleCopyListing(getConf(), CREDENTIALS);
+      listing.buildListing(listingFile, options);
+
+      Assert.assertEquals(listing.getNumberOfPaths(), pathCount);
+      validateFinalListing(listingFile, srcFiles);
+      fs.delete(listingFile, true);
+
+      // Check with randomized file listing
+      getConf().setBoolean(
+          DistCpConstants.CONF_LABEL_SIMPLE_LISTING_RANDOMIZE_FILES, true);
+      listing = new SimpleCopyListing(getConf(), CREDENTIALS);
+
+      // Set the seed for randomness, so that it can be verified later
+      long seed = System.nanoTime();
+      listing.setSeedForRandomListing(seed);
+      listing.buildListing(listingFile, options);
+      Assert.assertEquals(listing.getNumberOfPaths(), pathCount);
+
+      // validate randomness
+      Collections.shuffle(srcFiles, new Random(seed));
+      validateFinalListing(listingFile, srcFiles);
+    } finally {
+      TestDistCpUtils.delete(fs, "/tmp");
+    }
+  }
+
+  private void validateFinalListing(Path pathToListFile, List<Path> srcFiles)
+      throws IOException {
+    FileSystem fs = pathToListFile.getFileSystem(config);
+
+    try (SequenceFile.Reader reader = new SequenceFile.Reader(
+        config, SequenceFile.Reader.file(pathToListFile))) {
+      CopyListingFileStatus currentVal = new CopyListingFileStatus();
+
+      Text currentKey = new Text();
+      int idx = 0;
+      while (reader.next(currentKey)) {
+        reader.getCurrentValue(currentVal);
+        Assert.assertEquals("srcFiles.size=" + srcFiles.size()
+                + ", idx=" + idx, fs.makeQualified(srcFiles.get(idx)),
+            currentVal.getPath());
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("val=" + fs.makeQualified(srcFiles.get(idx)));
+        }
+        idx++;
+      }
+    }
+  }
+
+
   @Test(timeout=10000)
   public void testBuildListingForSingleFile() {
     FileSystem fs = null;
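The new test pins the shuffle down rather than merely asserting "some different order": it seeds the listing's Random, replays Collections.shuffle over the expected source files with the same seed, and asserts the sequence file matches element for element. This works because java.util.Random is fully determined by its seed. An illustrative check with hypothetical values:

    List<Integer> expected = new ArrayList<>(Arrays.asList(1, 2, 3, 4, 5));
    List<Integer> written = new ArrayList<>(expected);
    long seed = System.nanoTime();
    Collections.shuffle(expected, new Random(seed));
    Collections.shuffle(written, new Random(seed));
    Assert.assertEquals(expected, written); // same seed, same permutation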

+ 53 - 43
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java

@@ -26,8 +26,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
-import java.util.TreeSet;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -59,7 +59,6 @@ import org.apache.hadoop.yarn.util.resource.Resources;
 public class AppSchedulingInfo {
   
   private static final Log LOG = LogFactory.getLog(AppSchedulingInfo.class);
-  private static final int EPOCH_BIT_SHIFT = 40;
 
   private final ApplicationId applicationId;
   private final ApplicationAttemptId applicationAttemptId;
@@ -79,7 +78,8 @@ public class AppSchedulingInfo {
 
   private Set<String> requestedPartitions = new HashSet<>();
 
-  final Set<SchedulerRequestKey> schedulerKeys = new TreeSet<>();
+  private final ConcurrentSkipListMap<SchedulerRequestKey, Integer>
+      schedulerKeys = new ConcurrentSkipListMap<>();
   final Map<SchedulerRequestKey, Map<String, ResourceRequest>>
       resourceRequestMap = new ConcurrentHashMap<>();
   final Map<NodeId, Map<SchedulerRequestKey, Map<ContainerId,
@@ -236,6 +236,7 @@ public class AppSchedulingInfo {
     if (null == requestsOnNodeWithPriority) {
       requestsOnNodeWithPriority = new TreeMap<>();
       requestsOnNode.put(schedulerKey, requestsOnNodeWithPriority);
+      incrementSchedulerKeyReference(schedulerKey);
     }
 
     requestsOnNodeWithPriority.put(containerId, request);
@@ -250,11 +251,30 @@ public class AppSchedulingInfo {
       LOG.debug("Added increase request:" + request.getContainerId()
           + " delta=" + delta);
     }
-    
-    // update Scheduler Keys
-    schedulerKeys.add(schedulerKey);
   }
-  
+
+  private void incrementSchedulerKeyReference(
+      SchedulerRequestKey schedulerKey) {
+    Integer schedulerKeyCount = schedulerKeys.get(schedulerKey);
+    if (schedulerKeyCount == null) {
+      schedulerKeys.put(schedulerKey, 1);
+    } else {
+      schedulerKeys.put(schedulerKey, schedulerKeyCount + 1);
+    }
+  }
+
+  private void decrementSchedulerKeyReference(
+      SchedulerRequestKey schedulerKey) {
+    Integer schedulerKeyCount = schedulerKeys.get(schedulerKey);
+    if (schedulerKeyCount != null) {
+      if (schedulerKeyCount > 1) {
+        schedulerKeys.put(schedulerKey, schedulerKeyCount - 1);
+      } else {
+        schedulerKeys.remove(schedulerKey);
+      }
+    }
+  }
+
   public synchronized boolean removeIncreaseRequest(NodeId nodeId,
       SchedulerRequestKey schedulerKey, ContainerId containerId) {
     Map<SchedulerRequestKey, Map<ContainerId, SchedContainerChangeRequest>>
@@ -275,6 +295,7 @@ public class AppSchedulingInfo {
     // remove hierarchies if it becomes empty
     if (requestsOnNodeWithPriority.isEmpty()) {
       requestsOnNode.remove(schedulerKey);
+      decrementSchedulerKeyReference(schedulerKey);
     }
     if (requestsOnNode.isEmpty()) {
       containerIncreaseRequestMap.remove(nodeId);
@@ -341,7 +362,6 @@ public class AppSchedulingInfo {
       if (asks == null) {
         asks = new ConcurrentHashMap<>();
         this.resourceRequestMap.put(schedulerKey, asks);
-        this.schedulerKeys.add(schedulerKey);
       }
 
       // Increment number of containers if recovering preempted resources
@@ -360,29 +380,34 @@ public class AppSchedulingInfo {
 
         anyResourcesUpdated = true;
 
-        // Activate application. Metrics activation is done here.
-        // TODO: Shouldn't we activate even if numContainers = 0?
-        if (request.getNumContainers() > 0) {
-          activeUsersManager.activateApplication(user, applicationId);
-        }
-
         // Update pendingResources
-        updatePendingResources(lastRequest, request, queue.getMetrics());
+        updatePendingResources(lastRequest, request, schedulerKey,
+            queue.getMetrics());
       }
     }
     return anyResourcesUpdated;
   }
 
   private void updatePendingResources(ResourceRequest lastRequest,
-      ResourceRequest request, QueueMetrics metrics) {
+      ResourceRequest request, SchedulerRequestKey schedulerKey,
+      QueueMetrics metrics) {
+    int lastRequestContainers =
+        (lastRequest != null) ? lastRequest.getNumContainers() : 0;
     if (request.getNumContainers() <= 0) {
+      if (lastRequestContainers >= 0) {
+        decrementSchedulerKeyReference(schedulerKey);
+      }
       LOG.info("checking for deactivate of application :"
           + this.applicationId);
       checkForDeactivation();
+    } else {
+      // Activate application. Metrics activation is done here.
+      if (lastRequestContainers <= 0) {
+        incrementSchedulerKeyReference(schedulerKey);
+        activeUsersManager.activateApplication(user, applicationId);
+      }
     }
 
-    int lastRequestContainers =
-        (lastRequest != null) ? lastRequest.getNumContainers() : 0;
     Resource lastRequestCapability =
         lastRequest != null ? lastRequest.getCapability() : Resources.none();
     metrics.incrPendingResources(user,
@@ -505,7 +530,7 @@ public class AppSchedulingInfo {
   }
 
   public synchronized Collection<SchedulerRequestKey> getSchedulerKeys() {
-    return schedulerKeys;
+    return schedulerKeys.keySet();
   }
 
   public synchronized Map<String, ResourceRequest> getResourceRequests(
@@ -617,7 +642,7 @@ public class AppSchedulingInfo {
     } else if (type == NodeType.RACK_LOCAL) {
       allocateRackLocal(node, schedulerKey, request, resourceRequests);
     } else {
-      allocateOffSwitch(request, resourceRequests);
+      allocateOffSwitch(request, resourceRequests, schedulerKey);
     }
     QueueMetrics metrics = queue.getMetrics();
     if (pending) {
@@ -656,7 +681,7 @@ public class AppSchedulingInfo {
 
     ResourceRequest offRackRequest = resourceRequestMap.get(schedulerKey).get(
         ResourceRequest.ANY);
-    decrementOutstanding(offRackRequest);
+    decrementOutstanding(offRackRequest, schedulerKey);
 
     // Update cloned NodeLocal, RackLocal and OffRack requests for recovery
     resourceRequests.add(cloneResourceRequest(nodeLocalRequest));
@@ -684,7 +709,7 @@ public class AppSchedulingInfo {
     
     ResourceRequest offRackRequest = resourceRequestMap.get(schedulerKey).get(
         ResourceRequest.ANY);
-    decrementOutstanding(offRackRequest);
+    decrementOutstanding(offRackRequest, schedulerKey);
 
     // Update cloned RackLocal and OffRack requests for recovery
     resourceRequests.add(cloneResourceRequest(rackLocalRequest));
@@ -696,15 +721,16 @@ public class AppSchedulingInfo {
    * application.
    */
   private synchronized void allocateOffSwitch(
-      ResourceRequest offSwitchRequest, List<ResourceRequest> resourceRequests) {
+      ResourceRequest offSwitchRequest, List<ResourceRequest> resourceRequests,
+      SchedulerRequestKey schedulerKey) {
     // Update future requirements
-    decrementOutstanding(offSwitchRequest);
+    decrementOutstanding(offSwitchRequest, schedulerKey);
     // Update cloned OffRack requests for recovery
     resourceRequests.add(cloneResourceRequest(offSwitchRequest));
   }
 
   private synchronized void decrementOutstanding(
-      ResourceRequest offSwitchRequest) {
+      ResourceRequest offSwitchRequest, SchedulerRequestKey schedulerKey) {
     int numOffSwitchContainers = offSwitchRequest.getNumContainers() - 1;
 
     // Do not remove ANY
@@ -713,6 +739,7 @@ public class AppSchedulingInfo {
     // Do we have any outstanding requests?
     // If there is nothing, we need to deactivate this application
     if (numOffSwitchContainers == 0) {
+      decrementSchedulerKeyReference(schedulerKey);
       checkForDeactivation();
     }
     
@@ -723,24 +750,7 @@ public class AppSchedulingInfo {
   }
   
   private synchronized void checkForDeactivation() {
-    boolean deactivate = true;
-    for (SchedulerRequestKey schedulerKey : getSchedulerKeys()) {
-      ResourceRequest request =
-          getResourceRequest(schedulerKey, ResourceRequest.ANY);
-      if (request != null) {
-        if (request.getNumContainers() > 0) {
-          deactivate = false;
-          break;
-        }
-      }
-    }
-    
-    // also we need to check increase request
-    if (!deactivate) {
-      deactivate = containerIncreaseRequestMap.isEmpty();
-    }
-
-    if (deactivate) {
+    if (schedulerKeys.isEmpty()) {
       activeUsersManager.deactivateApplication(user, applicationId);
     }
   }
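The TreeSet of scheduler keys is replaced here by a reference-counted ConcurrentSkipListMap, so checkForDeactivation() collapses from an O(keys) scan over outstanding requests to a single isEmpty() check: every source of demand (a resource request with containers, an increase request) increments its key's count, and the last decrement removes the key. The counting idiom in isolation (generic key type; callers in the patch hold the AppSchedulingInfo monitor, so the get/put pairs are not racy):

    import java.util.concurrent.ConcurrentSkipListMap;

    class KeyRefCounter<K extends Comparable<K>> {
      private final ConcurrentSkipListMap<K, Integer> counts =
          new ConcurrentSkipListMap<>();

      void incr(K key) {                // cf. incrementSchedulerKeyReference()
        Integer c = counts.get(key);
        counts.put(key, c == null ? 1 : c + 1);
      }

      void decr(K key) {                // cf. decrementSchedulerKeyReference()
        Integer c = counts.get(key);
        if (c != null) {
          if (c > 1) {
            counts.put(key, c - 1);
          } else {
            counts.remove(key);         // last reference: drop the key
          }
        }
      }

      boolean idle() {                  // cf. checkForDeactivation()
        return counts.isEmpty();
      }
    }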

File diff suppressed because it is too large
+ 438 - 301
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java


+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java

@@ -251,7 +251,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
     return result;
   }
   
-  public synchronized float getLocalityWaitFactor(
+  public float getLocalityWaitFactor(
       SchedulerRequestKey schedulerKey, int clusterNodes) {
     // Estimate: Required unique resources (i.e. hosts + racks)
     int requiredResources = 

+ 221 - 197
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java

@@ -63,7 +63,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAMCont
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityHeadroomProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerContext;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
@@ -99,7 +98,6 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
    * to hold the message if its app doesn't not get container from a node
    */
   private String appSkipNodeDiagnostics;
-  private CapacitySchedulerContext capacitySchedulerContext;
 
   public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId, 
       String user, Queue queue, ActiveUsersManager activeUsersManager,
@@ -153,118 +151,128 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
 
     containerAllocator = new ContainerAllocator(this, rc, rmContext,
         activitiesManager);
-
-    if (scheduler instanceof CapacityScheduler) {
-      capacitySchedulerContext = (CapacitySchedulerContext) scheduler;
-    }
   }
 
-  public synchronized boolean containerCompleted(RMContainer rmContainer,
+  public boolean containerCompleted(RMContainer rmContainer,
       ContainerStatus containerStatus, RMContainerEventType event,
       String partition) {
-    ContainerId containerId = rmContainer.getContainerId();
+    try {
+      writeLock.lock();
+      ContainerId containerId = rmContainer.getContainerId();
 
-    // Remove from the list of containers
-    if (null == liveContainers.remove(containerId)) {
-      return false;
-    }
+      // Remove from the list of containers
+      if (null == liveContainers.remove(containerId)) {
+        return false;
+      }
 
-    // Remove from the list of newly allocated containers if found
-    newlyAllocatedContainers.remove(rmContainer);
+      // Remove from the list of newly allocated containers if found
+      newlyAllocatedContainers.remove(rmContainer);
 
-    // Inform the container
-    rmContainer.handle(
-        new RMContainerFinishedEvent(containerId, containerStatus, event));
+      // Inform the container
+      rmContainer.handle(
+          new RMContainerFinishedEvent(containerId, containerStatus, event));
 
-    containersToPreempt.remove(containerId);
+      containersToPreempt.remove(containerId);
 
-    Resource containerResource = rmContainer.getContainer().getResource();
-    RMAuditLogger.logSuccess(getUser(),
-        AuditConstants.RELEASE_CONTAINER, "SchedulerApp",
-        getApplicationId(), containerId, containerResource);
-    
-    // Update usage metrics 
-    queue.getMetrics().releaseResources(getUser(), 1, containerResource);
-    attemptResourceUsage.decUsed(partition, containerResource);
+      Resource containerResource = rmContainer.getContainer().getResource();
+      RMAuditLogger.logSuccess(getUser(), AuditConstants.RELEASE_CONTAINER,
+          "SchedulerApp", getApplicationId(), containerId, containerResource);
+
+      // Update usage metrics
+      queue.getMetrics().releaseResources(getUser(), 1, containerResource);
+      attemptResourceUsage.decUsed(partition, containerResource);
 
-    // Clear resource utilization metrics cache.
-    lastMemoryAggregateAllocationUpdateTime = -1;
+      // Clear resource utilization metrics cache.
+      lastMemoryAggregateAllocationUpdateTime = -1;
 
-    return true;
+      return true;
+    } finally {
+      writeLock.unlock();
+    }
   }
 
-  public synchronized RMContainer allocate(NodeType type, FiCaSchedulerNode node,
+  public RMContainer allocate(NodeType type, FiCaSchedulerNode node,
       SchedulerRequestKey schedulerKey, ResourceRequest request,
       Container container) {
+    try {
+      writeLock.lock();
 
-    if (isStopped) {
-      return null;
-    }
-    
-    // Required sanity check - AM can call 'allocate' to update resource
-    // request without locking the scheduler, hence we need to check
-    if (getTotalRequiredResources(schedulerKey) <= 0) {
-      return null;
-    }
+      if (isStopped) {
+        return null;
+      }
+
+      // Required sanity check - AM can call 'allocate' to update resource
+      // request without locking the scheduler, hence we need to check
+      if (getTotalRequiredResources(schedulerKey) <= 0) {
+        return null;
+      }
 
-    // Create RMContainer
-    RMContainer rmContainer =
-        new RMContainerImpl(container, this.getApplicationAttemptId(),
-            node.getNodeID(), appSchedulingInfo.getUser(), this.rmContext,
-            request.getNodeLabelExpression());
-    ((RMContainerImpl)rmContainer).setQueueName(this.getQueueName());
+      // Create RMContainer
+      RMContainer rmContainer = new RMContainerImpl(container,
+          this.getApplicationAttemptId(), node.getNodeID(),
+          appSchedulingInfo.getUser(), this.rmContext,
+          request.getNodeLabelExpression());
+      ((RMContainerImpl) rmContainer).setQueueName(this.getQueueName());
 
-    updateAMContainerDiagnostics(AMState.ASSIGNED, null);
+      updateAMContainerDiagnostics(AMState.ASSIGNED, null);
 
-    // Add it to allContainers list.
-    newlyAllocatedContainers.add(rmContainer);
+      // Add it to allContainers list.
+      newlyAllocatedContainers.add(rmContainer);
 
-    ContainerId containerId = container.getId();
-    liveContainers.put(containerId, rmContainer);
+      ContainerId containerId = container.getId();
+      liveContainers.put(containerId, rmContainer);
 
-    // Update consumption and track allocations
-    List<ResourceRequest> resourceRequestList = appSchedulingInfo.allocate(
-        type, node, schedulerKey, request, container);
+      // Update consumption and track allocations
+      List<ResourceRequest> resourceRequestList = appSchedulingInfo.allocate(
+          type, node, schedulerKey, request, container);
 
-    attemptResourceUsage.incUsed(node.getPartition(), container.getResource());
+      attemptResourceUsage.incUsed(node.getPartition(),
+          container.getResource());
 
-    // Update resource requests related to "request" and store in RMContainer
-    ((RMContainerImpl)rmContainer).setResourceRequests(resourceRequestList);
+      // Update resource requests related to "request" and store in RMContainer
+      ((RMContainerImpl) rmContainer).setResourceRequests(resourceRequestList);
 
-    // Inform the container
-    rmContainer.handle(
-        new RMContainerEvent(containerId, RMContainerEventType.START));
+      // Inform the container
+      rmContainer.handle(
+          new RMContainerEvent(containerId, RMContainerEventType.START));
 
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("allocate: applicationAttemptId=" 
-          + containerId.getApplicationAttemptId()
-          + " container=" + containerId + " host="
-          + container.getNodeId().getHost() + " type=" + type);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("allocate: applicationAttemptId=" + containerId
+            .getApplicationAttemptId() + " container=" + containerId + " host="
+            + container.getNodeId().getHost() + " type=" + type);
+      }
+      RMAuditLogger.logSuccess(getUser(), AuditConstants.ALLOC_CONTAINER,
+          "SchedulerApp", getApplicationId(), containerId,
+          container.getResource());
+
+      return rmContainer;
+    } finally {
+      writeLock.unlock();
     }
-    RMAuditLogger.logSuccess(getUser(),
-        AuditConstants.ALLOC_CONTAINER, "SchedulerApp",
-        getApplicationId(), containerId, container.getResource());
-    
-    return rmContainer;
   }
 
-  public synchronized boolean unreserve(SchedulerRequestKey schedulerKey,
+  public boolean unreserve(SchedulerRequestKey schedulerKey,
       FiCaSchedulerNode node, RMContainer rmContainer) {
-    // Cancel increase request (if it has reserved increase request 
-    rmContainer.cancelIncreaseReservation();
-    
-    // Done with the reservation?
-    if (internalUnreserve(node, schedulerKey)) {
-      node.unreserveResource(this);
-
-      // Update reserved metrics
-      queue.getMetrics().unreserveResource(getUser(),
-          rmContainer.getReservedResource());
-      queue.decReservedResource(node.getPartition(),
-          rmContainer.getReservedResource());
-      return true;
+    try {
+      writeLock.lock();
+      // Cancel increase request (if it has reserved increase request
+      rmContainer.cancelIncreaseReservation();
+
+      // Done with the reservation?
+      if (internalUnreserve(node, schedulerKey)) {
+        node.unreserveResource(this);
+
+        // Update reserved metrics
+        queue.getMetrics().unreserveResource(getUser(),
+            rmContainer.getReservedResource());
+        queue.decReservedResource(node.getPartition(),
+            rmContainer.getReservedResource());
+        return true;
+      }
+      return false;
+    } finally {
+      writeLock.unlock();
     }
-    return false;
   }
 
   private boolean internalUnreserve(FiCaSchedulerNode node,
@@ -303,33 +311,15 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
     return false;
   }
 
-  public synchronized float getLocalityWaitFactor(
-      SchedulerRequestKey schedulerKey, int clusterNodes) {
-    // Estimate: Required unique resources (i.e. hosts + racks)
-    int requiredResources = 
-        Math.max(this.getResourceRequests(schedulerKey).size() - 1, 0);
-    
-    // waitFactor can't be more than '1' 
-    // i.e. no point skipping more than clustersize opportunities
-    return Math.min(((float)requiredResources / clusterNodes), 1.0f);
-  }
-
-  public synchronized Resource getTotalPendingRequests() {
-    Resource ret = Resource.newInstance(0, 0);
-    for (ResourceRequest rr : appSchedulingInfo.getAllResourceRequests()) {
-      // to avoid double counting we count only "ANY" resource requests
-      if (ResourceRequest.isAnyLocation(rr.getResourceName())){
-        Resources.addTo(ret,
-            Resources.multiply(rr.getCapability(), rr.getNumContainers()));
+  public void markContainerForPreemption(ContainerId cont) {
+    try {
+      writeLock.lock();
+      // ignore already completed containers
+      if (liveContainers.containsKey(cont)) {
+        containersToPreempt.add(cont);
       }
-    }
-    return ret;
-  }
-
-  public synchronized void markContainerForPreemption(ContainerId cont) {
-    // ignore already completed containers
-    if (liveContainers.containsKey(cont)) {
-      containersToPreempt.add(cont);
+    } finally {
+      writeLock.unlock();
     }
   }
 
@@ -343,94 +333,115 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
    * @param minimumAllocation
    * @return an allocation
    */
-  public synchronized Allocation getAllocation(ResourceCalculator rc,
+  public Allocation getAllocation(ResourceCalculator resourceCalculator,
       Resource clusterResource, Resource minimumAllocation) {
-
-    Set<ContainerId> currentContPreemption = Collections.unmodifiableSet(
-        new HashSet<ContainerId>(containersToPreempt));
-    containersToPreempt.clear();
-    Resource tot = Resource.newInstance(0, 0);
-    for(ContainerId c : currentContPreemption){
-      Resources.addTo(tot,
-          liveContainers.get(c).getContainer().getResource());
+    try {
+      writeLock.lock();
+      Set<ContainerId> currentContPreemption = Collections.unmodifiableSet(
+          new HashSet<ContainerId>(containersToPreempt));
+      containersToPreempt.clear();
+      Resource tot = Resource.newInstance(0, 0);
+      for (ContainerId c : currentContPreemption) {
+        Resources.addTo(tot, liveContainers.get(c).getContainer()
+            .getResource());
+      }
+      int numCont = (int) Math.ceil(
+          Resources.divide(rc, clusterResource, tot, minimumAllocation));
+      ResourceRequest rr = ResourceRequest.newInstance(Priority.UNDEFINED,
+          ResourceRequest.ANY, minimumAllocation, numCont);
+      List<Container> newlyAllocatedContainers = pullNewlyAllocatedContainers();
+      List<Container> newlyIncreasedContainers = pullNewlyIncreasedContainers();
+      List<Container> newlyDecreasedContainers = pullNewlyDecreasedContainers();
+      List<NMToken> updatedNMTokens = pullUpdatedNMTokens();
+      Resource headroom = getHeadroom();
+      setApplicationHeadroomForMetrics(headroom);
+      return new Allocation(newlyAllocatedContainers, headroom, null,
+          currentContPreemption, Collections.singletonList(rr), updatedNMTokens,
+          newlyIncreasedContainers, newlyDecreasedContainers);
+    } finally {
+      writeLock.unlock();
     }
-    int numCont = (int) Math.ceil(
-        Resources.divide(rc, clusterResource, tot, minimumAllocation));
-    ResourceRequest rr = ResourceRequest.newInstance(
-        Priority.UNDEFINED, ResourceRequest.ANY,
-        minimumAllocation, numCont);
-    List<Container> newlyAllocatedContainers = pullNewlyAllocatedContainers();
-    List<Container> newlyIncreasedContainers = pullNewlyIncreasedContainers();
-    List<Container> newlyDecreasedContainers = pullNewlyDecreasedContainers();
-    List<NMToken> updatedNMTokens = pullUpdatedNMTokens();
-    Resource headroom = getHeadroom();
-    setApplicationHeadroomForMetrics(headroom);
-    return new Allocation(newlyAllocatedContainers, headroom, null,
-        currentContPreemption, Collections.singletonList(rr), updatedNMTokens,
-        newlyIncreasedContainers, newlyDecreasedContainers);
   }
-  
-  synchronized public NodeId getNodeIdToUnreserve(
+
+  @VisibleForTesting
+  public NodeId getNodeIdToUnreserve(
       SchedulerRequestKey schedulerKey, Resource resourceNeedUnreserve,
       ResourceCalculator rc, Resource clusterResource) {
+    try {
+      writeLock.lock();
+      // first go around make this algorithm simple and just grab first
+      // reservation that has enough resources
+      Map<NodeId, RMContainer> reservedContainers = this.reservedContainers.get(
+          schedulerKey);
+
+      if ((reservedContainers != null) && (!reservedContainers.isEmpty())) {
+        for (Map.Entry<NodeId, RMContainer> entry : reservedContainers
+            .entrySet()) {
+          NodeId nodeId = entry.getKey();
+          RMContainer reservedContainer = entry.getValue();
+          if (reservedContainer.hasIncreaseReservation()) {
+            // Currently, only regular container allocation supports continuous
+            // reservation looking, we don't support canceling increase request
+            // reservation when allocating regular container.
+            continue;
+          }
 
-    // first go around make this algorithm simple and just grab first
-    // reservation that has enough resources
-    Map<NodeId, RMContainer> reservedContainers = this.reservedContainers
-        .get(schedulerKey);
-
-    if ((reservedContainers != null) && (!reservedContainers.isEmpty())) {
-      for (Map.Entry<NodeId, RMContainer> entry : reservedContainers.entrySet()) {
-        NodeId nodeId = entry.getKey();
-        RMContainer reservedContainer = entry.getValue();
-        if (reservedContainer.hasIncreaseReservation()) {
-          // Currently, only regular container allocation supports continuous
-          // reservation looking, we don't support canceling increase request
-          // reservation when allocating regular container.
-          continue;
-        }
-        
-        Resource reservedResource = reservedContainer.getReservedResource();
-        
-        // make sure we unreserve one with at least the same amount of
-        // resources, otherwise could affect capacity limits
-        if (Resources.fitsIn(rc, clusterResource, resourceNeedUnreserve,
-            reservedResource)) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("unreserving node with reservation size: "
-                + reservedResource
-                + " in order to allocate container with size: " + resourceNeedUnreserve);
+          Resource reservedResource = reservedContainer.getReservedResource();
+
+          // make sure we unreserve one with at least the same amount of
+          // resources, otherwise could affect capacity limits
+          if (Resources.fitsIn(rc, clusterResource, resourceNeedUnreserve,
+              reservedResource)) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug(
+                  "unreserving node with reservation size: " + reservedResource
+                      + " in order to allocate container with size: "
+                      + resourceNeedUnreserve);
+            }
+            return nodeId;
           }
-          return nodeId;
         }
       }
+      return null;
+    } finally {
+      writeLock.unlock();
     }
-    return null;
   }
   
-  public synchronized void setHeadroomProvider(
+  public void setHeadroomProvider(
     CapacityHeadroomProvider headroomProvider) {
-    this.headroomProvider = headroomProvider;
-  }
-
-  public synchronized CapacityHeadroomProvider getHeadroomProvider() {
-    return headroomProvider;
+    try {
+      writeLock.lock();
+      this.headroomProvider = headroomProvider;
+    } finally {
+      writeLock.unlock();
+    }
   }
   
   @Override
-  public synchronized Resource getHeadroom() {
-    if (headroomProvider != null) {
-      return headroomProvider.getHeadroom();
+  public Resource getHeadroom() {
+    try {
+      readLock.lock();
+      if (headroomProvider != null) {
+        return headroomProvider.getHeadroom();
+      }
+      return super.getHeadroom();
+    } finally {
+      readLock.unlock();
     }
-    return super.getHeadroom();
+
   }
   
   @Override
-  public synchronized void transferStateFromPreviousAttempt(
+  public void transferStateFromPreviousAttempt(
       SchedulerApplicationAttempt appAttempt) {
-    super.transferStateFromPreviousAttempt(appAttempt);
-    this.headroomProvider = 
-      ((FiCaSchedulerApp) appAttempt).getHeadroomProvider();
+    try {
+      writeLock.lock();
+      super.transferStateFromPreviousAttempt(appAttempt);
+      this.headroomProvider = ((FiCaSchedulerApp) appAttempt).headroomProvider;
+    } finally {
+      writeLock.unlock();
+    }
   }
   
   public boolean reserveIncreasedContainer(SchedulerRequestKey schedulerKey,
@@ -444,11 +455,11 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
 
       // Update the node
       node.reserveResource(this, schedulerKey, rmContainer);
-      
+
       // Succeeded
       return true;
     }
-    
+
     return false;
   }
 
@@ -515,9 +526,12 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
       showRequests();
     }
 
-    synchronized (this) {
+    try {
+      writeLock.lock();
       return containerAllocator.assignContainers(clusterResource, node,
           schedulingMode, currentResourceLimits, reservedContainer);
+    } finally {
+      writeLock.unlock();
     }
   }
 
@@ -625,23 +639,33 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
    * Capacity Scheduler.
    */
   @Override
-  public synchronized ApplicationResourceUsageReport getResourceUsageReport() {
-    ApplicationResourceUsageReport report = super.getResourceUsageReport();
-    Resource cluster = rmContext.getScheduler().getClusterResource();
-    Resource totalPartitionRes =
-        rmContext.getNodeLabelManager()
-          .getResourceByLabel(getAppAMNodePartitionName(), cluster);
-    ResourceCalculator calc = rmContext.getScheduler().getResourceCalculator();
-    if (!calc.isInvalidDivisor(totalPartitionRes)) {
-      float queueAbsMaxCapPerPartition =
-          ((AbstractCSQueue)getQueue()).getQueueCapacities()
-            .getAbsoluteCapacity(getAppAMNodePartitionName());
-      float queueUsagePerc =
-          calc.divide(totalPartitionRes, report.getUsedResources(),
-              Resources.multiply(totalPartitionRes,
-                  queueAbsMaxCapPerPartition)) * 100;
-      report.setQueueUsagePercentage(queueUsagePerc);
+  public ApplicationResourceUsageReport getResourceUsageReport() {
+    try {
+      // Use write lock here because
+      // SchedulerApplicationAttempt#getResourceUsageReport updated fields
+      // TODO: improve this
+      writeLock.lock();
+      ApplicationResourceUsageReport report = super.getResourceUsageReport();
+      Resource cluster = rmContext.getScheduler().getClusterResource();
+      Resource totalPartitionRes =
+          rmContext.getNodeLabelManager().getResourceByLabel(
+              getAppAMNodePartitionName(), cluster);
+      ResourceCalculator calc =
+          rmContext.getScheduler().getResourceCalculator();
+      if (!calc.isInvalidDivisor(totalPartitionRes)) {
+        float queueAbsMaxCapPerPartition =
+            ((AbstractCSQueue) getQueue()).getQueueCapacities()
+                .getAbsoluteCapacity(getAppAMNodePartitionName());
+        float queueUsagePerc = calc.divide(totalPartitionRes,
+            report.getUsedResources(),
+            Resources.multiply(totalPartitionRes, queueAbsMaxCapPerPartition))
+            * 100;
+        report.setQueueUsagePercentage(queueUsagePerc);
+      }
+      return report;
+    } finally {
+      writeLock.unlock();
     }
-    return report;
+
   }
 }
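Most of this file's churn is mechanical: method-level synchronized is swapped for an explicit read/write lock taken in a try/finally, which lets read-mostly calls such as getHeadroom() proceed concurrently instead of serializing on the object monitor. The readLock/writeLock fields are presumably introduced in the suppressed SchedulerApplicationAttempt diff above; the pattern in isolation, with locally declared locks, looks like this:

    import java.util.concurrent.locks.ReentrantReadWriteLock;

    class LockedState {
      private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
      private final ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
      private final ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
      private long headroom;

      // was: public synchronized void setHeadroom(long v)
      public void setHeadroom(long v) {
        try {
          writeLock.lock();             // lock first, as in the patch's style
          this.headroom = v;
        } finally {
          writeLock.unlock();           // always released, even on exceptions
        }
      }

      // Readers no longer queue behind the writers' monitor.
      public long getHeadroom() {
        try {
          readLock.lock();
          return headroom;
        } finally {
          readLock.unlock();
        }
      }
    }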

+ 258 - 207
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java

@@ -123,65 +123,72 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
     return queue.getMetrics();
   }
 
-  synchronized public void containerCompleted(RMContainer rmContainer,
+  public void containerCompleted(RMContainer rmContainer,
       ContainerStatus containerStatus, RMContainerEventType event) {
-    
-    Container container = rmContainer.getContainer();
-    ContainerId containerId = container.getId();
-    
-    // Remove from the list of newly allocated containers if found
-    newlyAllocatedContainers.remove(rmContainer);
-    
-    // Inform the container
-    rmContainer.handle(
-        new RMContainerFinishedEvent(
-            containerId,
-            containerStatus,
-            event)
-    );
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Completed container: " + rmContainer.getContainerId() +
-              " in state: " + rmContainer.getState() + " event:" + event);
-    }
+    try {
+      writeLock.lock();
+      Container container = rmContainer.getContainer();
+      ContainerId containerId = container.getId();
+
+      // Remove from the list of newly allocated containers if found
+      newlyAllocatedContainers.remove(rmContainer);
+
+      // Inform the container
+      rmContainer.handle(
+          new RMContainerFinishedEvent(containerId, containerStatus, event));
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Completed container: " + rmContainer.getContainerId()
+            + " in state: " + rmContainer.getState() + " event:" + event);
+      }
+
+      // Remove from the list of containers
+      liveContainers.remove(rmContainer.getContainerId());
 
-    // Remove from the list of containers
-    liveContainers.remove(rmContainer.getContainerId());
+      Resource containerResource = rmContainer.getContainer().getResource();
+      RMAuditLogger.logSuccess(getUser(), AuditConstants.RELEASE_CONTAINER,
+          "SchedulerApp", getApplicationId(), containerId, containerResource);
 
-    Resource containerResource = rmContainer.getContainer().getResource();
-    RMAuditLogger.logSuccess(getUser(), 
-        AuditConstants.RELEASE_CONTAINER, "SchedulerApp", 
-        getApplicationId(), containerId, containerResource);
-    
-    // Update usage metrics 
-    queue.getMetrics().releaseResources(getUser(), 1, containerResource);
-    this.attemptResourceUsage.decUsed(containerResource);
+      // Update usage metrics
+      queue.getMetrics().releaseResources(getUser(), 1, containerResource);
+      this.attemptResourceUsage.decUsed(containerResource);
 
-    // remove from preemption map if it is completed
-    preemptionMap.remove(rmContainer);
+      // remove from preemption map if it is completed
+      preemptionMap.remove(rmContainer);
 
-    // Clear resource utilization metrics cache.
-    lastMemoryAggregateAllocationUpdateTime = -1;
+      // Clear resource utilization metrics cache.
+      lastMemoryAggregateAllocationUpdateTime = -1;
+    } finally {
+      writeLock.unlock();
+    }
   }
 
-  private synchronized void unreserveInternal(
+  private void unreserveInternal(
       SchedulerRequestKey schedulerKey, FSSchedulerNode node) {
-    Map<NodeId, RMContainer> reservedContainers = 
-        this.reservedContainers.get(schedulerKey);
-    RMContainer reservedContainer = reservedContainers.remove(node.getNodeID());
-    if (reservedContainers.isEmpty()) {
-      this.reservedContainers.remove(schedulerKey);
-    }
-    
-    // Reset the re-reservation count
-    resetReReservations(schedulerKey);
+    try {
+      writeLock.lock();
+      Map<NodeId, RMContainer> reservedContainers = this.reservedContainers.get(
+          schedulerKey);
+      RMContainer reservedContainer = reservedContainers.remove(
+          node.getNodeID());
+      if (reservedContainers.isEmpty()) {
+        this.reservedContainers.remove(schedulerKey);
+      }
+
+      // Reset the re-reservation count
+      resetReReservations(schedulerKey);
 
-    Resource resource = reservedContainer.getContainer().getResource();
-    this.attemptResourceUsage.decReserved(resource);
+      Resource resource = reservedContainer.getContainer().getResource();
+      this.attemptResourceUsage.decReserved(resource);
 
-    LOG.info("Application " + getApplicationId() + " unreserved " + " on node "
-        + node + ", currently has " + reservedContainers.size()
-        + " at priority " + schedulerKey.getPriority() + "; currentReservation "
-        + this.attemptResourceUsage.getReserved());
+      LOG.info(
+          "Application " + getApplicationId() + " unreserved " + " on node "
+              + node + ", currently has " + reservedContainers.size()
+              + " at priority " + schedulerKey.getPriority()
+              + "; currentReservation " + this.attemptResourceUsage
+              .getReserved());
+    } finally {
+      writeLock.unlock();
+    }
   }
 
   private void subtractResourcesOnBlacklistedNodes(
@@ -239,17 +246,6 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
     return headroom;
   }
 
-  public synchronized float getLocalityWaitFactor(
-      SchedulerRequestKey schedulerKey, int clusterNodes) {
-    // Estimate: Required unique resources (i.e. hosts + racks)
-    int requiredResources = 
-        Math.max(this.getResourceRequests(schedulerKey).size() - 1, 0);
-    
-    // waitFactor can't be more than '1' 
-    // i.e. no point skipping more than clustersize opportunities
-    return Math.min(((float)requiredResources / clusterNodes), 1.0f);
-  }
-
   /**
    * Return the level at which we are allowed to schedule containers, given the
    * current size of the cluster and thresholds indicating how many nodes to
@@ -261,44 +257,56 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
    * @param rackLocalityThreshold rackLocalityThreshold
    * @return NodeType
    */
-  public synchronized NodeType getAllowedLocalityLevel(
+  NodeType getAllowedLocalityLevel(
       SchedulerRequestKey schedulerKey, int numNodes,
       double nodeLocalityThreshold, double rackLocalityThreshold) {
     // upper limit on threshold
-    if (nodeLocalityThreshold > 1.0) { nodeLocalityThreshold = 1.0; }
-    if (rackLocalityThreshold > 1.0) { rackLocalityThreshold = 1.0; }
+    if (nodeLocalityThreshold > 1.0) {
+      nodeLocalityThreshold = 1.0;
+    }
+    if (rackLocalityThreshold > 1.0) {
+      rackLocalityThreshold = 1.0;
+    }
 
     // If delay scheduling is not being used, can schedule anywhere
     if (nodeLocalityThreshold < 0.0 || rackLocalityThreshold < 0.0) {
       return NodeType.OFF_SWITCH;
     }
 
-    // Default level is NODE_LOCAL
-    if (!allowedLocalityLevel.containsKey(schedulerKey)) {
-      allowedLocalityLevel.put(schedulerKey, NodeType.NODE_LOCAL);
-      return NodeType.NODE_LOCAL;
-    }
-
-    NodeType allowed = allowedLocalityLevel.get(schedulerKey);
+    try {
+      writeLock.lock();
 
-    // If level is already most liberal, we're done
-    if (allowed.equals(NodeType.OFF_SWITCH)) return NodeType.OFF_SWITCH;
+      // Default level is NODE_LOCAL
+      if (!allowedLocalityLevel.containsKey(schedulerKey)) {
+        allowedLocalityLevel.put(schedulerKey, NodeType.NODE_LOCAL);
+        return NodeType.NODE_LOCAL;
+      }
 
-    double threshold = allowed.equals(NodeType.NODE_LOCAL) ? nodeLocalityThreshold :
-      rackLocalityThreshold;
+      NodeType allowed = allowedLocalityLevel.get(schedulerKey);
 
-    // Relax locality constraints once we've surpassed threshold.
-    if (getSchedulingOpportunities(schedulerKey) > (numNodes * threshold)) {
-      if (allowed.equals(NodeType.NODE_LOCAL)) {
-        allowedLocalityLevel.put(schedulerKey, NodeType.RACK_LOCAL);
-        resetSchedulingOpportunities(schedulerKey);
+      // If level is already most liberal, we're done
+      if (allowed.equals(NodeType.OFF_SWITCH)) {
+        return NodeType.OFF_SWITCH;
       }
-      else if (allowed.equals(NodeType.RACK_LOCAL)) {
-        allowedLocalityLevel.put(schedulerKey, NodeType.OFF_SWITCH);
-        resetSchedulingOpportunities(schedulerKey);
+
+      double threshold = allowed.equals(NodeType.NODE_LOCAL) ?
+          nodeLocalityThreshold :
+          rackLocalityThreshold;
+
+      // Relax locality constraints once we've surpassed threshold.
+      if (getSchedulingOpportunities(schedulerKey) > (numNodes * threshold)) {
+        if (allowed.equals(NodeType.NODE_LOCAL)) {
+          allowedLocalityLevel.put(schedulerKey, NodeType.RACK_LOCAL);
+          resetSchedulingOpportunities(schedulerKey);
+        } else if (allowed.equals(NodeType.RACK_LOCAL)) {
+          allowedLocalityLevel.put(schedulerKey, NodeType.OFF_SWITCH);
+          resetSchedulingOpportunities(schedulerKey);
+        }
       }
+      return allowedLocalityLevel.get(schedulerKey);
+    } finally {
+      writeLock.unlock();
     }
-    return allowedLocalityLevel.get(schedulerKey);
   }
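
`getAllowedLocalityLevel` above is count-based delay scheduling: a request key may only step from node-local to rack-local to off-switch after it has skipped more than `numNodes * threshold` scheduling opportunities at its current level. A hedged distillation of that decision, with illustrative names rather than the real API:

```java
class CountDelaySketch {
  enum Level { NODE_LOCAL, RACK_LOCAL, OFF_SWITCH }

  // Relax one level once more than numNodes * threshold opportunities
  // have been skipped at the currently allowed level.
  static Level relax(Level allowed, long missedOpportunities,
      int numNodes, double threshold) {
    if (allowed != Level.OFF_SWITCH
        && missedOpportunities > numNodes * threshold) {
      return allowed == Level.NODE_LOCAL
          ? Level.RACK_LOCAL : Level.OFF_SWITCH;
    }
    return allowed;
  }
}
```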
 
   /**
@@ -311,119 +319,131 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
    * @param currentTimeMs currentTimeMs
    * @return NodeType
    */
-  public synchronized NodeType getAllowedLocalityLevelByTime(
+  NodeType getAllowedLocalityLevelByTime(
       SchedulerRequestKey schedulerKey, long nodeLocalityDelayMs,
       long rackLocalityDelayMs, long currentTimeMs) {
-
     // if not being used, can schedule anywhere
     if (nodeLocalityDelayMs < 0 || rackLocalityDelayMs < 0) {
       return NodeType.OFF_SWITCH;
     }
 
-    // default level is NODE_LOCAL
-    if (!allowedLocalityLevel.containsKey(schedulerKey)) {
-      // add the initial time of priority to prevent comparing with FsApp
-      // startTime and allowedLocalityLevel degrade
-      lastScheduledContainer.put(schedulerKey, currentTimeMs);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Init the lastScheduledContainer time, priority: "
-            + schedulerKey.getPriority() + ", time: " + currentTimeMs);
+    try {
+      writeLock.lock();
+
+      // default level is NODE_LOCAL
+      if (!allowedLocalityLevel.containsKey(schedulerKey)) {
+        // Record the initial time for this priority to avoid comparing
+        // against the FSAppAttempt start time and degrading
+        // allowedLocalityLevel prematurely
+        lastScheduledContainer.put(schedulerKey, currentTimeMs);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(
+              "Init the lastScheduledContainer time, priority: " + schedulerKey
+                  .getPriority() + ", time: " + currentTimeMs);
+        }
+        allowedLocalityLevel.put(schedulerKey, NodeType.NODE_LOCAL);
+        return NodeType.NODE_LOCAL;
       }
-      allowedLocalityLevel.put(schedulerKey, NodeType.NODE_LOCAL);
-      return NodeType.NODE_LOCAL;
-    }
 
-    NodeType allowed = allowedLocalityLevel.get(schedulerKey);
+      NodeType allowed = allowedLocalityLevel.get(schedulerKey);
 
-    // if level is already most liberal, we're done
-    if (allowed.equals(NodeType.OFF_SWITCH)) {
-      return NodeType.OFF_SWITCH;
-    }
-
-    // check waiting time
-    long waitTime = currentTimeMs;
-    if (lastScheduledContainer.containsKey(schedulerKey)) {
-      waitTime -= lastScheduledContainer.get(schedulerKey);
-    } else {
-      waitTime -= getStartTime();
-    }
+      // if level is already most liberal, we're done
+      if (allowed.equals(NodeType.OFF_SWITCH)) {
+        return NodeType.OFF_SWITCH;
+      }
 
-    long thresholdTime = allowed.equals(NodeType.NODE_LOCAL) ?
-            nodeLocalityDelayMs : rackLocalityDelayMs;
+      // check waiting time
+      long waitTime = currentTimeMs;
+      if (lastScheduledContainer.containsKey(schedulerKey)) {
+        waitTime -= lastScheduledContainer.get(schedulerKey);
+      } else {
+        waitTime -= getStartTime();
+      }
 
-    if (waitTime > thresholdTime) {
-      if (allowed.equals(NodeType.NODE_LOCAL)) {
-        allowedLocalityLevel.put(schedulerKey, NodeType.RACK_LOCAL);
-        resetSchedulingOpportunities(schedulerKey, currentTimeMs);
-      } else if (allowed.equals(NodeType.RACK_LOCAL)) {
-        allowedLocalityLevel.put(schedulerKey, NodeType.OFF_SWITCH);
-        resetSchedulingOpportunities(schedulerKey, currentTimeMs);
+      long thresholdTime = allowed.equals(NodeType.NODE_LOCAL) ?
+          nodeLocalityDelayMs :
+          rackLocalityDelayMs;
+
+      if (waitTime > thresholdTime) {
+        if (allowed.equals(NodeType.NODE_LOCAL)) {
+          allowedLocalityLevel.put(schedulerKey, NodeType.RACK_LOCAL);
+          resetSchedulingOpportunities(schedulerKey, currentTimeMs);
+        } else if (allowed.equals(NodeType.RACK_LOCAL)) {
+          allowedLocalityLevel.put(schedulerKey, NodeType.OFF_SWITCH);
+          resetSchedulingOpportunities(schedulerKey, currentTimeMs);
+        }
       }
+      return allowedLocalityLevel.get(schedulerKey);
+    } finally {
+      writeLock.unlock();
     }
-    return allowedLocalityLevel.get(schedulerKey);
   }
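
`getAllowedLocalityLevelByTime` above applies the same relaxation, keyed on elapsed time instead of missed opportunities: the wait is measured since the last container scheduled for this key, falling back to the attempt's start time. A sketch under the same illustrative naming:

```java
class TimeDelaySketch {
  enum Level { NODE_LOCAL, RACK_LOCAL, OFF_SWITCH }

  static Level relaxByTime(Level allowed, long waitTimeMs,
      long nodeDelayMs, long rackDelayMs) {
    // Pick the delay that applies to the current level, then relax one
    // step once the app has waited longer than that delay.
    long threshold = allowed == Level.NODE_LOCAL ? nodeDelayMs : rackDelayMs;
    if (allowed != Level.OFF_SWITCH && waitTimeMs > threshold) {
      return allowed == Level.NODE_LOCAL
          ? Level.RACK_LOCAL : Level.OFF_SWITCH;
    }
    return allowed;
  }
}
```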
 
-  synchronized public RMContainer allocate(NodeType type, FSSchedulerNode node,
+  public RMContainer allocate(NodeType type, FSSchedulerNode node,
       SchedulerRequestKey schedulerKey, ResourceRequest request,
       Container reservedContainer) {
-    // Update allowed locality level
-    NodeType allowed = allowedLocalityLevel.get(schedulerKey);
-    if (allowed != null) {
-      if (allowed.equals(NodeType.OFF_SWITCH) &&
-          (type.equals(NodeType.NODE_LOCAL) ||
-              type.equals(NodeType.RACK_LOCAL))) {
-        this.resetAllowedLocalityLevel(schedulerKey, type);
+    RMContainer rmContainer;
+    Container container;
+
+    try {
+      writeLock.lock();
+      // Update allowed locality level
+      NodeType allowed = allowedLocalityLevel.get(schedulerKey);
+      if (allowed != null) {
+        if (allowed.equals(NodeType.OFF_SWITCH) && (type.equals(
+            NodeType.NODE_LOCAL) || type.equals(NodeType.RACK_LOCAL))) {
+          this.resetAllowedLocalityLevel(schedulerKey, type);
+        } else if (allowed.equals(NodeType.RACK_LOCAL) && type.equals(
+            NodeType.NODE_LOCAL)) {
+          this.resetAllowedLocalityLevel(schedulerKey, type);
+        }
       }
-      else if (allowed.equals(NodeType.RACK_LOCAL) &&
-          type.equals(NodeType.NODE_LOCAL)) {
-        this.resetAllowedLocalityLevel(schedulerKey, type);
+
+      // Required sanity check - AM can call 'allocate' to update resource
+      // request without locking the scheduler, hence we need to check
+      if (getTotalRequiredResources(schedulerKey) <= 0) {
+        return null;
       }
-    }
 
-    // Required sanity check - AM can call 'allocate' to update resource 
-    // request without locking the scheduler, hence we need to check
-    if (getTotalRequiredResources(schedulerKey) <= 0) {
-      return null;
-    }
+      container = reservedContainer;
+      if (container == null) {
+        container = createContainer(node, request.getCapability(),
+            schedulerKey);
+      }
 
-    Container container = reservedContainer;
-    if (container == null) {
-      container =
-          createContainer(node, request.getCapability(), schedulerKey);
-    }
-    
-    // Create RMContainer
-    RMContainer rmContainer = new RMContainerImpl(container,
-        getApplicationAttemptId(), node.getNodeID(),
-        appSchedulingInfo.getUser(), rmContext);
-    ((RMContainerImpl)rmContainer).setQueueName(this.getQueueName());
+      // Create RMContainer
+      rmContainer = new RMContainerImpl(container,
+          getApplicationAttemptId(), node.getNodeID(),
+          appSchedulingInfo.getUser(), rmContext);
+      ((RMContainerImpl) rmContainer).setQueueName(this.getQueueName());
 
-    // Add it to allContainers list.
-    newlyAllocatedContainers.add(rmContainer);
-    liveContainers.put(container.getId(), rmContainer);    
+      // Add it to allContainers list.
+      newlyAllocatedContainers.add(rmContainer);
+      liveContainers.put(container.getId(), rmContainer);
 
-    // Update consumption and track allocations
-    List<ResourceRequest> resourceRequestList = appSchedulingInfo.allocate(
-        type, node, schedulerKey, request, container);
-    this.attemptResourceUsage.incUsed(container.getResource());
+      // Update consumption and track allocations
+      List<ResourceRequest> resourceRequestList = appSchedulingInfo.allocate(
+          type, node, schedulerKey, request, container);
+      this.attemptResourceUsage.incUsed(container.getResource());
 
-    // Update resource requests related to "request" and store in RMContainer
-    ((RMContainerImpl) rmContainer).setResourceRequests(resourceRequestList);
+      // Update resource requests related to "request" and store in RMContainer
+      ((RMContainerImpl) rmContainer).setResourceRequests(resourceRequestList);
 
-    // Inform the container
-    rmContainer.handle(
-        new RMContainerEvent(container.getId(), RMContainerEventType.START));
+      // Inform the container
+      rmContainer.handle(
+          new RMContainerEvent(container.getId(), RMContainerEventType.START));
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("allocate: applicationAttemptId=" + container.getId()
+            .getApplicationAttemptId() + " container=" + container.getId()
+            + " host=" + container.getNodeId().getHost() + " type=" + type);
+      }
+      RMAuditLogger.logSuccess(getUser(), AuditConstants.ALLOC_CONTAINER,
+          "SchedulerApp", getApplicationId(), container.getId(),
+          container.getResource());
+    } finally {
+      writeLock.unlock();
+    }
 
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("allocate: applicationAttemptId=" 
-          + container.getId().getApplicationAttemptId() 
-          + " container=" + container.getId() + " host="
-          + container.getNodeId().getHost() + " type=" + type);
-    }
-    RMAuditLogger.logSuccess(getUser(), 
-        AuditConstants.ALLOC_CONTAINER, "SchedulerApp", 
-        getApplicationId(), container.getId(), container.getResource());
-    
     return rmContainer;
   }
 
@@ -434,19 +454,30 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
    * @param schedulerKey Scheduler Key
    * @param level NodeType
    */
-  public synchronized void resetAllowedLocalityLevel(
+  public void resetAllowedLocalityLevel(
       SchedulerRequestKey schedulerKey, NodeType level) {
-    NodeType old = allowedLocalityLevel.get(schedulerKey);
-    LOG.info("Raising locality level from " + old + " to " + level + " at " +
-        " priority " + schedulerKey.getPriority());
-    allowedLocalityLevel.put(schedulerKey, level);
+    NodeType old;
+    try {
+      writeLock.lock();
+      old = allowedLocalityLevel.put(schedulerKey, level);
+    } finally {
+      writeLock.unlock();
+    }
+
+    LOG.info("Raising locality level from " + old + " to " + level + " at "
+        + " priority " + schedulerKey.getPriority());
   }
 
   // related methods
   public void addPreemption(RMContainer container, long time) {
     assert preemptionMap.get(container) == null;
-    preemptionMap.put(container, time);
-    Resources.addTo(preemptedResources, container.getAllocatedResource());
+    try {
+      writeLock.lock();
+      preemptionMap.put(container, time);
+      Resources.addTo(preemptedResources, container.getAllocatedResource());
+    } finally {
+      writeLock.unlock();
+    }
   }
 
   public Long getContainerPreemptionTime(RMContainer container) {
@@ -584,21 +615,35 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
         getUser(), rmContainer.getContainer().getResource());
   }
 
-  private synchronized void setReservation(SchedulerNode node) {
-    String rackName = node.getRackName() == null ? "NULL" : node.getRackName();
-    Set<String> rackReservations = reservations.get(rackName);
-    if (rackReservations == null) {
-      rackReservations = new HashSet<>();
-      reservations.put(rackName, rackReservations);
+  private void setReservation(SchedulerNode node) {
+    String rackName =
+        node.getRackName() == null ? "NULL" : node.getRackName();
+
+    try {
+      writeLock.lock();
+      Set<String> rackReservations = reservations.get(rackName);
+      if (rackReservations == null) {
+        rackReservations = new HashSet<>();
+        reservations.put(rackName, rackReservations);
+      }
+      rackReservations.add(node.getNodeName());
+    } finally {
+      writeLock.unlock();
     }
-    rackReservations.add(node.getNodeName());
   }
 
-  private synchronized void clearReservation(SchedulerNode node) {
-    String rackName = node.getRackName() == null ? "NULL" : node.getRackName();
-    Set<String> rackReservations = reservations.get(rackName);
-    if (rackReservations != null) {
-      rackReservations.remove(node.getNodeName());
+  private void clearReservation(SchedulerNode node) {
+    String rackName =
+        node.getRackName() == null ? "NULL" : node.getRackName();
+
+    try {
+      writeLock.lock();
+      Set<String> rackReservations = reservations.get(rackName);
+      if (rackReservations != null) {
+        rackReservations.remove(node.getNodeName());
+      }
+    } finally {
+      writeLock.unlock();
     }
   }
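
`setReservation` and `clearReservation` above maintain the rack-name to reserved-node map by hand. As an aside, and not what the patch does: since Java 8 the insert path can be collapsed with `computeIfAbsent`, which shortens the locked section. A sketch assuming the same `Map<String, Set<String>>` shape:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

class ReservationMapSketch {
  private final Map<String, Set<String>> reservations = new HashMap<>();

  void setReservation(String rackName, String nodeName) {
    // Create the per-rack set on first use, then record the node.
    reservations.computeIfAbsent(rackName, r -> new HashSet<>()).add(nodeName);
  }

  void clearReservation(String rackName, String nodeName) {
    Set<String> nodes = reservations.get(rackName);
    if (nodes != null) {
      nodes.remove(nodeName);
    }
  }
}
```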
 
@@ -737,7 +782,8 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
     // For each priority, see if we can schedule a node local, rack local
     // or off-switch request. Rack of off-switch requests may be delayed
     // (not scheduled) in order to promote better locality.
-    synchronized (this) {
+    try {
+      writeLock.lock();
       for (SchedulerRequestKey schedulerKey : keysToTry) {
         // Skip it for reserved container, since
         // we already check it in isValidReservation.
@@ -772,8 +818,8 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
 
         if (rackLocalRequest != null && rackLocalRequest.getNumContainers() != 0
             && localRequest != null && localRequest.getNumContainers() != 0) {
-          return assignContainer(node, localRequest,
-              NodeType.NODE_LOCAL, reserved, schedulerKey);
+          return assignContainer(node, localRequest, NodeType.NODE_LOCAL,
+              reserved, schedulerKey);
         }
 
         if (rackLocalRequest != null && !rackLocalRequest.getRelaxLocality()) {
@@ -781,29 +827,31 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
         }
 
         if (rackLocalRequest != null && rackLocalRequest.getNumContainers() != 0
-            && (allowedLocality.equals(NodeType.RACK_LOCAL) ||
-            allowedLocality.equals(NodeType.OFF_SWITCH))) {
-          return assignContainer(node, rackLocalRequest,
-              NodeType.RACK_LOCAL, reserved, schedulerKey);
+            && (allowedLocality.equals(NodeType.RACK_LOCAL) || allowedLocality
+            .equals(NodeType.OFF_SWITCH))) {
+          return assignContainer(node, rackLocalRequest, NodeType.RACK_LOCAL,
+              reserved, schedulerKey);
         }
 
-        ResourceRequest offSwitchRequest =
-            getResourceRequest(schedulerKey, ResourceRequest.ANY);
+        ResourceRequest offSwitchRequest = getResourceRequest(schedulerKey,
+            ResourceRequest.ANY);
         if (offSwitchRequest != null && !offSwitchRequest.getRelaxLocality()) {
           continue;
         }
 
-        if (offSwitchRequest != null &&
-            offSwitchRequest.getNumContainers() != 0) {
-          if (!hasNodeOrRackLocalRequests(schedulerKey) ||
-              allowedLocality.equals(NodeType.OFF_SWITCH)) {
-            return assignContainer(
-                node, offSwitchRequest, NodeType.OFF_SWITCH, reserved,
-                schedulerKey);
+        if (offSwitchRequest != null
+            && offSwitchRequest.getNumContainers() != 0) {
+          if (!hasNodeOrRackLocalRequests(schedulerKey) || allowedLocality
+              .equals(NodeType.OFF_SWITCH)) {
+            return assignContainer(node, offSwitchRequest, NodeType.OFF_SWITCH,
+                reserved, schedulerKey);
           }
         }
       }
+    } finally {
+      writeLock.unlock();
     }
+
     return Resources.none();
   }
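
The reformatted hunks above are the per-key cascade in `assignContainer`: node-local is preferred, rack-local is tried only once delay scheduling allows it, off-switch comes last, and a request with `relaxLocality` switched off stops the key from falling through. A hypothetical distillation, where booleans stand in for the non-empty `ResourceRequest`s and their flags:

```java
class AssignCascadeSketch {
  enum Locality { NODE_LOCAL, RACK_LOCAL, OFF_SWITCH }

  // Returns the locality to assign at for one scheduler key, or null to
  // skip the key entirely.
  static Locality pick(boolean nodeReq, boolean rackReq, boolean rackRelax,
      boolean anyReq, boolean anyRelax, boolean hasLocalRequests,
      Locality allowed) {
    if (nodeReq && rackReq) {
      return Locality.NODE_LOCAL;    // best case: host-local assignment
    }
    if (rackReq && !rackRelax) {
      return null;                   // request is pinned to its node
    }
    if (rackReq && allowed != Locality.NODE_LOCAL) {
      return Locality.RACK_LOCAL;    // delay scheduling has relaxed us
    }
    if (anyReq && !anyRelax) {
      return null;                   // request is pinned to its rack
    }
    if (anyReq && (!hasLocalRequests || allowed == Locality.OFF_SWITCH)) {
      return Locality.OFF_SWITCH;
    }
    return null;
  }
}
```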
 
@@ -963,14 +1011,17 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
     Resources.addTo(demand, getCurrentConsumption());
 
     // Add up outstanding resource requests
-    synchronized (this) {
+    try {
+      writeLock.lock();
       for (SchedulerRequestKey k : getSchedulerKeys()) {
         ResourceRequest r = getResourceRequest(k, ResourceRequest.ANY);
         if (r != null) {
-          Resources.multiplyAndAddTo(demand,
-              r.getCapability(), r.getNumContainers());
+          Resources.multiplyAndAddTo(demand, r.getCapability(),
+              r.getNumContainers());
         }
       }
+    } finally {
+      writeLock.unlock();
     }
   }
 

+ 65 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java

@@ -23,11 +23,14 @@ import static org.mockito.Mockito.mock;
 
 import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.List;
 import java.util.TreeSet;
 
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSLeafQueue;
 import org.junit.Assert;
@@ -105,4 +108,66 @@ public class TestAppSchedulingInfo {
     Assert.assertEquals(2, sk.getPriority().getPriority());
     Assert.assertEquals(6, sk.getAllocationRequestId());
   }
+
+  @Test
+  public void testSchedulerKeyAccounting() {
+    ApplicationId appIdImpl = ApplicationId.newInstance(0, 1);
+    ApplicationAttemptId appAttemptId =
+        ApplicationAttemptId.newInstance(appIdImpl, 1);
+
+    Queue queue = mock(Queue.class);
+    doReturn(mock(QueueMetrics.class)).when(queue).getMetrics();
+    AppSchedulingInfo info = new AppSchedulingInfo(
+        appAttemptId, "test", queue, mock(ActiveUsersManager.class), 0,
+        new ResourceUsage());
+    Assert.assertEquals(0, info.getSchedulerKeys().size());
+
+    Priority pri1 = Priority.newInstance(1);
+    ResourceRequest req1 = ResourceRequest.newInstance(pri1,
+        ResourceRequest.ANY, Resource.newInstance(1024, 1), 1);
+    Priority pri2 = Priority.newInstance(2);
+    ResourceRequest req2 = ResourceRequest.newInstance(pri2,
+        ResourceRequest.ANY, Resource.newInstance(1024, 1), 2);
+    List<ResourceRequest> reqs = new ArrayList<>();
+    reqs.add(req1);
+    reqs.add(req2);
+    info.updateResourceRequests(reqs, false);
+    ArrayList<SchedulerRequestKey> keys =
+        new ArrayList<>(info.getSchedulerKeys());
+    Assert.assertEquals(2, keys.size());
+    Assert.assertEquals(SchedulerRequestKey.create(req1), keys.get(0));
+    Assert.assertEquals(SchedulerRequestKey.create(req2), keys.get(1));
+
+    // iterate to verify no ConcurrentModificationException
+    for (SchedulerRequestKey schedulerKey : info.getSchedulerKeys()) {
+      info.allocate(NodeType.OFF_SWITCH, null, schedulerKey, req1, null);
+    }
+    Assert.assertEquals(1, info.getSchedulerKeys().size());
+    Assert.assertEquals(SchedulerRequestKey.create(req2),
+        info.getSchedulerKeys().iterator().next());
+
+    req2 = ResourceRequest.newInstance(pri2,
+        ResourceRequest.ANY, Resource.newInstance(1024, 1), 1);
+    reqs.clear();
+    reqs.add(req2);
+    info.updateResourceRequests(reqs, false);
+    info.allocate(NodeType.OFF_SWITCH, null, SchedulerRequestKey.create(req2),
+        req2, null);
+    Assert.assertEquals(0, info.getSchedulerKeys().size());
+
+    req1 = ResourceRequest.newInstance(pri1,
+        ResourceRequest.ANY, Resource.newInstance(1024, 1), 5);
+    reqs.clear();
+    reqs.add(req1);
+    info.updateResourceRequests(reqs, false);
+    Assert.assertEquals(1, info.getSchedulerKeys().size());
+    Assert.assertEquals(SchedulerRequestKey.create(req1),
+        info.getSchedulerKeys().iterator().next());
+    req1 = ResourceRequest.newInstance(pri1,
+        ResourceRequest.ANY, Resource.newInstance(1024, 1), 0);
+    reqs.clear();
+    reqs.add(req1);
+    info.updateResourceRequests(reqs, false);
+    Assert.assertEquals(0, info.getSchedulerKeys().size());
+  }
 }

+ 6 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/TimelineServiceV2.md

@@ -712,6 +712,8 @@ none of the apps match the predicates, an empty list will be returned.
   "eq" means equals, "ne" means not equals and existence of key is not required for a match and "ene" means not equals but existence of key is
   required. We can combine any number of ANDs' and ORs' to create complex expressions.  Brackets can be used to club expressions together.<br/>
   _For example_ : infofilters can be "(((infokey1 eq value1) AND (infokey2 ne value1)) OR (infokey1 ene value3))".<br/>
+  Note: If the value is an object, it can be given in JSON form, written without any spaces.<br/>
+  _For example_ : infofilters can be (infokey1 eq {"&lt;key&gt;":"&lt;value&gt;","&lt;key&gt;":"&lt;value&gt;"...}).<br/>
   Please note that URL unsafe characters such as spaces will have to be suitably encoded.
 1. `conffilters` - If specified, matched applications must have exact matches to the given config name and must be either equal or not equal
   to the given config value. Both the config name and value must be strings. conffilters are represented in the same form as infofilters.
@@ -837,6 +839,8 @@ match the predicates, an empty list will be returned.
   "eq" means equals, "ne" means not equals and existence of key is not required for a match and "ene" means not equals but existence of key is
   required. We can combine any number of ANDs' and ORs' to create complex expressions.  Brackets can be used to club expressions together.<br/>
   _For example_ : infofilters can be "(((infokey1 eq value1) AND (infokey2 ne value1)) OR (infokey1 ene value3))".<br/>
+  Note: If the value is an object, it can be given in JSON form, written without any spaces.<br/>
+  _For example_ : infofilters can be (infokey1 eq {"&lt;key&gt;":"&lt;value&gt;","&lt;key&gt;":"&lt;value&gt;"...}).<br/>
   Please note that URL unsafe characters such as spaces will have to be suitably encoded.
 1. `conffilters` - If specified, matched applications must have exact matches to the given config name and must be either equal or not equal
   to the given config value. Both the config name and value must be strings. conffilters are represented in the same form as infofilters.
@@ -1035,6 +1039,8 @@ If none of the entities match the predicates, an empty list will be returned.
   "eq" means equals, "ne" means not equals and existence of key is not required for a match and "ene" means not equals but existence of key is
   required. We can combine any number of ANDs' and ORs' to create complex expressions.  Brackets can be used to club expressions together.<br/>
   _For example_ : infofilters can be "(((infokey1 eq value1) AND (infokey2 ne value1)) OR (infokey1 ene value3))".<br/>
+  Note: If the value is an object, it can be given in JSON form, written without any spaces.<br/>
+  _For example_ : infofilters can be (infokey1 eq {"&lt;key&gt;":"&lt;value&gt;","&lt;key&gt;":"&lt;value&gt;"...}).<br/>
   Please note that URL unsafe characters such as spaces will have to be suitably encoded.
 1. `conffilters` - If specified, matched entities must have exact matches to the given config name and must be either equal or not equal
   to the given config value. Both the config name and value must be strings. conffilters are represented in the same form as infofilters.
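
Each of the notes added above ends with the reminder that URL-unsafe characters in filter expressions must be encoded before they go into a query string. A small, illustrative Java example of that encoding step (any HTTP client's encoder does the same job):

```java
import java.net.URLEncoder;

class EncodeInfofilters {
  public static void main(String[] args) throws Exception {
    String filter =
        "(((infokey1 eq value1) AND (infokey2 ne value1))"
            + " OR (infokey1 ene value3))";
    // Spaces become '+' and the parentheses are percent-encoded, both of
    // which are safe inside a query string.
    System.out.println(URLEncoder.encode(filter, "UTF-8"));
  }
}
```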

Some files were not shown because too many files have changed in this diff