Quellcode durchsuchen

Merge r1480440 through r1480820 from trunk.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-2802@1480824 13f79535-47bb-0310-9956-ffa450edef68
Tsz-wo Sze vor 12 Jahren
Ursprung
Commit
3ecf94e2b7
56 geänderte Dateien mit 1191 neuen und 292 gelöschten Zeilen
  1. 0 4
      hadoop-client/pom.xml
  2. 3 1
      hadoop-common-project/hadoop-common/CHANGES.txt
  3. 1 1
      hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
  4. 16 1
      hadoop-mapreduce-project/CHANGES.txt
  5. 3 1
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
  6. 4 3
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java
  7. 1 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/local/TestLocalContainerAllocator.java
  8. 15 47
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobClient.java
  9. 225 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/InputSampler.java
  10. 61 36
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java
  11. 7 7
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/partition/InputSampler.java
  12. 1 1
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleScheduler.java
  13. 1 1
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java
  14. 3 2
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java
  15. 149 4
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/partition/TestInputSampler.java
  16. 0 10
      hadoop-project/pom.xml
  17. 1 1
      hadoop-tools/hadoop-gridmix/src/test/java/org/apache/hadoop/mapred/gridmix/TestGridmixSummary.java
  18. 23 4
      hadoop-yarn-project/CHANGES.txt
  19. 8 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/AMRMProtocol.java
  20. 28 12
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ClientRMProtocol.java
  21. 4 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
  22. 6 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
  23. 3 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/main/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/UnmanagedAMLauncher.java
  24. 8 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClient.java
  25. 8 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientAsync.java
  26. 4 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientImpl.java
  27. 35 13
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/YarnClient.java
  28. 20 13
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/YarnClientImpl.java
  29. 5 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java
  30. 2 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/NodeCLI.java
  31. 4 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClient.java
  32. 7 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestYarnClient.java
  33. 5 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/AMRMProtocolPBClientImpl.java
  34. 21 12
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ClientRMProtocolPBClientImpl.java
  35. 8 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/AMRMProtocolPBServiceImpl.java
  36. 26 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ClientRMProtocolPBServiceImpl.java
  37. 6 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestRPCFactories.java
  38. 68 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
  39. 40 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Allocation.java
  40. 32 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java
  41. 4 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
  42. 4 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
  43. 4 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
  44. 26 4
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java
  45. 4 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java
  46. 105 29
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
  47. 17 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java
  48. 2 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java
  49. 4 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java
  50. 4 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java
  51. 3 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
  52. 3 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMTokens.java
  53. 48 43
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java
  54. 96 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
  55. 2 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerManagerSecurity.java
  56. 3 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/AppReportFetcher.java

+ 0 - 4
hadoop-client/pom.xml

@@ -99,10 +99,6 @@
           <groupId>org.eclipse.jdt</groupId>
           <artifactId>core</artifactId>
         </exclusion>
-        <exclusion>
-          <groupId>org.aspectj</groupId>
-          <artifactId>aspectjrt</artifactId>
-        </exclusion>
         <exclusion>
           <groupId>org.apache.avro</groupId>
           <artifactId>avro-ipc</artifactId>

+ 3 - 1
hadoop-common-project/hadoop-common/CHANGES.txt

@@ -694,7 +694,9 @@ Release 2.0.5-beta - UNRELEASED
     HADOOP-9455. HADOOP_CLIENT_OPTS appended twice causes JVM failures.
     (Chris Nauroth via suresh)
 
-Release 2.0.4-alpha - UNRELEASED
+    HADOOP-9550. Remove aspectj dependency. (kkambatl via tucu)
+
+Release 2.0.4-alpha - 2013-04-25 
 
   INCOMPATIBLE CHANGES
 

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -630,7 +630,7 @@ Release 2.0.5-beta - UNRELEASED
 
     HDFS-4784. NPE in FSDirectory.resolvePath(). (Brandon Li via suresh)
 
-Release 2.0.4-alpha - UNRELEASED
+Release 2.0.4-alpha - 2013-04-25
 
   INCOMPATIBLE CHANGES
 

+ 16 - 1
hadoop-mapreduce-project/CHANGES.txt

@@ -224,6 +224,9 @@ Release 2.0.5-beta - UNRELEASED
     MAPREDUCE-5159. Change ValueAggregatorJob to add APIs which can support
     binary compatibility with hadoop-1 examples. (Zhijie Shen via vinodkv)
 
+    MAPREDUCE-5157. Bring back old sampler related code so that we can support
+    binary compatibility with hadoop-1 sorter example. (Zhijie Shen via vinodkv)
+
   OPTIMIZATIONS
 
     MAPREDUCE-4974. Optimising the LineRecordReader initialize() method 
@@ -383,7 +386,19 @@ Release 2.0.5-beta - UNRELEASED
     MAPREDUCE-5204. Handling YarnRemoteException separately from IOException in
     MR app after YARN-629. (Xuan Gong via vinodkv)
 
-Release 2.0.4-alpha - UNRELEASED
+    MAPREDUCE-5209. Fix units in a ShuffleScheduler log message.
+    (Tsuyoshi OZAWA via cdouglas)
+
+    MAPREDUCE-5212. Handling YarnRemoteException separately from IOException in
+    MR App's use of ClientRMProtocol after YARN-631. (Xuan Gong via vinodkv)
+
+    MAPREDUCE-5226. Handling YarnRemoteException separately from IOException in
+    MR App's use of AMRMProtocol after YARN-630. (Xuan Gong via vinodkv)
+
+    MAPREDUCE-4942. mapreduce.Job has a bunch of methods that throw 
+    InterruptedException so its incompatible with MR1. (rkanter via tucu)
+
+Release 2.0.4-alpha - 2013-04-25
 
   INCOMPATIBLE CHANGES
 

+ 3 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java

@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.mapreduce.v2.app.rm;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -144,7 +145,8 @@ public abstract class RMContainerRequestor extends RMCommunicator {
     LOG.info("blacklistDisablePercent is " + blacklistDisablePercent);
   }
 
-  protected AllocateResponse makeRemoteRequest() throws YarnRemoteException {
+  protected AllocateResponse makeRemoteRequest() throws YarnRemoteException,
+      IOException {
     AllocateRequest allocateRequest = BuilderUtils.newAllocateRequest(
         applicationAttemptId, lastResponseID, super.getApplicationProgress(),
         new ArrayList<ResourceRequest>(ask), new ArrayList<ContainerId>(

+ 4 - 3
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java

@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.mapreduce.v2.app;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
@@ -202,7 +203,7 @@ public class MRAppBenchmark {
               public RegisterApplicationMasterResponse
                   registerApplicationMaster(
                       RegisterApplicationMasterRequest request)
-                      throws YarnRemoteException {
+                      throws YarnRemoteException, IOException {
                 RegisterApplicationMasterResponse response =
                     Records.newRecord(RegisterApplicationMasterResponse.class);
                 response.setMinimumResourceCapability(BuilderUtils
@@ -215,7 +216,7 @@ public class MRAppBenchmark {
               @Override
               public FinishApplicationMasterResponse finishApplicationMaster(
                   FinishApplicationMasterRequest request)
-                  throws YarnRemoteException {
+                  throws YarnRemoteException, IOException {
                 FinishApplicationMasterResponse response =
                     Records.newRecord(FinishApplicationMasterResponse.class);
                 return response;
@@ -223,7 +224,7 @@ public class MRAppBenchmark {
 
               @Override
               public AllocateResponse allocate(AllocateRequest request)
-                  throws YarnRemoteException {
+                  throws YarnRemoteException, IOException {
 
                 AllocateResponse response =
                     Records.newRecord(AllocateResponse.class);

+ 1 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/local/TestLocalContainerAllocator.java

@@ -100,6 +100,7 @@ public class TestLocalContainerAllocator {
         when(scheduler.allocate(isA(AllocateRequest.class)))
           .thenThrow(RPCUtil.getRemoteException(new IOException("forcefail")));
       } catch (YarnRemoteException e) {
+      } catch (IOException e) {
       }
       return scheduler;
     }

+ 15 - 47
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobClient.java

@@ -209,11 +209,7 @@ public class JobClient extends CLI {
      * completed.
      */
     public float mapProgress() throws IOException {
-      try {
-        return job.mapProgress();
-      } catch (InterruptedException ie) {
-        throw new IOException(ie);
-      }
+      return job.mapProgress();
     }
 
     /**
@@ -221,11 +217,7 @@ public class JobClient extends CLI {
      * completed.
      */
     public float reduceProgress() throws IOException {
-      try {
-        return job.reduceProgress();
-      } catch (InterruptedException ie) {
-        throw new IOException(ie);
-      }
+      return job.reduceProgress();
     }
 
     /**
@@ -245,33 +237,21 @@ public class JobClient extends CLI {
      * completed.
      */
     public float setupProgress() throws IOException {
-      try {
-        return job.setupProgress();
-      } catch (InterruptedException ie) {
-        throw new IOException(ie);
-      }
+      return job.setupProgress();
     }
 
     /**
      * Returns immediately whether the whole job is done yet or not.
      */
     public synchronized boolean isComplete() throws IOException {
-      try {
-        return job.isComplete();
-      } catch (InterruptedException ie) {
-        throw new IOException(ie);
-      }
+      return job.isComplete();
     }
 
     /**
      * True iff job completed successfully.
      */
     public synchronized boolean isSuccessful() throws IOException {
-      try {
-        return job.isSuccessful();
-      } catch (InterruptedException ie) {
-        throw new IOException(ie);
-      }
+      return job.isSuccessful();
     }
 
     /**
@@ -302,11 +282,7 @@ public class JobClient extends CLI {
      * Tells the service to terminate the current job.
      */
     public synchronized void killJob() throws IOException {
-      try {
-        job.killJob();
-      } catch (InterruptedException ie) {
-        throw new IOException(ie);
-      }
+      job.killJob();
     }
    
     
@@ -331,14 +307,10 @@ public class JobClient extends CLI {
      */
     public synchronized void killTask(TaskAttemptID taskId,
         boolean shouldFail) throws IOException {
-      try {
-        if (shouldFail) {
-          job.failTask(taskId);
-        } else {
-          job.killTask(taskId);
-        }
-      } catch (InterruptedException ie) {
-        throw new IOException(ie);
+      if (shouldFail) {
+        job.failTask(taskId);
+      } else {
+        job.killTask(taskId);
       }
     }
 
@@ -378,16 +350,12 @@ public class JobClient extends CLI {
      * Returns the counters for this job
      */
     public Counters getCounters() throws IOException {
-      try { 
-        Counters result = null;
-        org.apache.hadoop.mapreduce.Counters temp = job.getCounters();
-        if(temp != null) {
-          result = Counters.downgrade(temp);
-        }
-        return result;
-      } catch (InterruptedException ie) {
-        throw new IOException(ie);
+      Counters result = null;
+      org.apache.hadoop.mapreduce.Counters temp = job.getCounters();
+      if(temp != null) {
+        result = Counters.downgrade(temp);
       }
+      return result;
     }
     
     @Override

+ 225 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/InputSampler.java

@@ -19,10 +19,18 @@
 package org.apache.hadoop.mapred.lib;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Random;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapreduce.Job;
 
 @InterfaceAudience.Public
@@ -30,6 +38,8 @@ import org.apache.hadoop.mapreduce.Job;
 public class InputSampler<K,V> extends 
   org.apache.hadoop.mapreduce.lib.partition.InputSampler<K, V> {
 
+  private static final Log LOG = LogFactory.getLog(InputSampler.class);
+
   public InputSampler(JobConf conf) {
     super(conf);
   }
@@ -38,4 +48,219 @@ public class InputSampler<K,V> extends
       throws IOException, ClassNotFoundException, InterruptedException {
     writePartitionFile(new Job(job), sampler);
   }
+  /**
+   * Interface to sample using an {@link org.apache.hadoop.mapred.InputFormat}.
+   */
+  public interface Sampler<K,V> extends
+    org.apache.hadoop.mapreduce.lib.partition.InputSampler.Sampler<K, V> {
+    /**
+     * For a given job, collect and return a subset of the keys from the
+     * input data.
+     */
+    K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException;
+  }
+
+  /**
+   * Samples the first n records from s splits.
+   * Inexpensive way to sample random data.
+   */
+  public static class SplitSampler<K,V> extends
+      org.apache.hadoop.mapreduce.lib.partition.InputSampler.SplitSampler<K, V>
+          implements Sampler<K,V> {
+
+    /**
+     * Create a SplitSampler sampling <em>all</em> splits.
+     * Takes the first numSamples / numSplits records from each split.
+     * @param numSamples Total number of samples to obtain from all selected
+     *                   splits.
+     */
+    public SplitSampler(int numSamples) {
+      this(numSamples, Integer.MAX_VALUE);
+    }
+
+    /**
+     * Create a new SplitSampler.
+     * @param numSamples Total number of samples to obtain from all selected
+     *                   splits.
+     * @param maxSplitsSampled The maximum number of splits to examine.
+     */
+    public SplitSampler(int numSamples, int maxSplitsSampled) {
+      super(numSamples, maxSplitsSampled);
+    }
+
+    /**
+     * From each split sampled, take the first numSamples / numSplits records.
+     */
+    @SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
+    public K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException {
+      InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());
+      ArrayList<K> samples = new ArrayList<K>(numSamples);
+      int splitsToSample = Math.min(maxSplitsSampled, splits.length);
+      int splitStep = splits.length / splitsToSample;
+      int samplesPerSplit = numSamples / splitsToSample;
+      long records = 0;
+      for (int i = 0; i < splitsToSample; ++i) {
+        RecordReader<K,V> reader = inf.getRecordReader(splits[i * splitStep],
+            job, Reporter.NULL);
+        K key = reader.createKey();
+        V value = reader.createValue();
+        while (reader.next(key, value)) {
+          samples.add(key);
+          key = reader.createKey();
+          ++records;
+          if ((i+1) * samplesPerSplit <= records) {
+            break;
+          }
+        }
+        reader.close();
+      }
+      return (K[])samples.toArray();
+    }
+  }
+
+  /**
+   * Sample from random points in the input.
+   * General-purpose sampler. Takes numSamples / maxSplitsSampled inputs from
+   * each split.
+   */
+  public static class RandomSampler<K,V> extends
+      org.apache.hadoop.mapreduce.lib.partition.InputSampler.RandomSampler<K, V>
+          implements Sampler<K,V> {
+
+    /**
+     * Create a new RandomSampler sampling <em>all</em> splits.
+     * This will read every split at the client, which is very expensive.
+     * @param freq Probability with which a key will be chosen.
+     * @param numSamples Total number of samples to obtain from all selected
+     *                   splits.
+     */
+    public RandomSampler(double freq, int numSamples) {
+      this(freq, numSamples, Integer.MAX_VALUE);
+    }
+
+    /**
+     * Create a new RandomSampler.
+     * @param freq Probability with which a key will be chosen.
+     * @param numSamples Total number of samples to obtain from all selected
+     *                   splits.
+     * @param maxSplitsSampled The maximum number of splits to examine.
+     */
+    public RandomSampler(double freq, int numSamples, int maxSplitsSampled) {
+      super(freq, numSamples, maxSplitsSampled);
+    }
+
+    /**
+     * Randomize the split order, then take the specified number of keys from
+     * each split sampled, where each key is selected with the specified
+     * probability and possibly replaced by a subsequently selected key when
+     * the quota of keys from that split is satisfied.
+     */
+    @SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
+    public K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException {
+      InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());
+      ArrayList<K> samples = new ArrayList<K>(numSamples);
+      int splitsToSample = Math.min(maxSplitsSampled, splits.length);
+
+      Random r = new Random();
+      long seed = r.nextLong();
+      r.setSeed(seed);
+      LOG.debug("seed: " + seed);
+      // shuffle splits
+      for (int i = 0; i < splits.length; ++i) {
+        InputSplit tmp = splits[i];
+        int j = r.nextInt(splits.length);
+        splits[i] = splits[j];
+        splits[j] = tmp;
+      }
+      // our target rate is in terms of the maximum number of sample splits,
+      // but we accept the possibility of sampling additional splits to hit
+      // the target sample keyset
+      for (int i = 0; i < splitsToSample ||
+                     (i < splits.length && samples.size() < numSamples); ++i) {
+        RecordReader<K,V> reader = inf.getRecordReader(splits[i], job,
+            Reporter.NULL);
+        K key = reader.createKey();
+        V value = reader.createValue();
+        while (reader.next(key, value)) {
+          if (r.nextDouble() <= freq) {
+            if (samples.size() < numSamples) {
+              samples.add(key);
+            } else {
+              // When exceeding the maximum number of samples, replace a
+              // random element with this one, then adjust the frequency
+              // to reflect the possibility of existing elements being
+              // pushed out
+              int ind = r.nextInt(numSamples);
+              if (ind != numSamples) {
+                samples.set(ind, key);
+              }
+              freq *= (numSamples - 1) / (double) numSamples;
+            }
+            key = reader.createKey();
+          }
+        }
+        reader.close();
+      }
+      return (K[])samples.toArray();
+    }
+  }
+
+  /**
+   * Sample from s splits at regular intervals.
+   * Useful for sorted data.
+   */
+  public static class IntervalSampler<K,V> extends
+      org.apache.hadoop.mapreduce.lib.partition.InputSampler.IntervalSampler<K, V>
+          implements Sampler<K,V> {
+
+    /**
+     * Create a new IntervalSampler sampling <em>all</em> splits.
+     * @param freq The frequency with which records will be emitted.
+     */
+    public IntervalSampler(double freq) {
+      this(freq, Integer.MAX_VALUE);
+    }
+
+    /**
+     * Create a new IntervalSampler.
+     * @param freq The frequency with which records will be emitted.
+     * @param maxSplitsSampled The maximum number of splits to examine.
+     * @see #getSample
+     */
+    public IntervalSampler(double freq, int maxSplitsSampled) {
+      super(freq, maxSplitsSampled);
+    }
+
+    /**
+     * For each split sampled, emit when the ratio of the number of records
+     * retained to the total record count is less than the specified
+     * frequency.
+     */
+    @SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
+    public K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException {
+      InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());
+      ArrayList<K> samples = new ArrayList<K>();
+      int splitsToSample = Math.min(maxSplitsSampled, splits.length);
+      int splitStep = splits.length / splitsToSample;
+      long records = 0;
+      long kept = 0;
+      for (int i = 0; i < splitsToSample; ++i) {
+        RecordReader<K,V> reader = inf.getRecordReader(splits[i * splitStep],
+            job, Reporter.NULL);
+        K key = reader.createKey();
+        V value = reader.createValue();
+        while (reader.next(key, value)) {
+          ++records;
+          if ((double) kept / records < freq) {
+            ++kept;
+            samples.add(key);
+            key = reader.createKey();
+          }
+        }
+        reader.close();
+      }
+      return (K[])samples.toArray();
+    }
+  }
+
 }

+ 61 - 36
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java

@@ -296,7 +296,7 @@ public class Job extends JobContextImpl implements JobContext {
    * it, if necessary
    */
   synchronized void ensureFreshStatus() 
-      throws IOException, InterruptedException {
+      throws IOException {
     if (System.currentTimeMillis() - statustime > MAX_JOBSTATUS_AGE) {
       updateStatus();
     }
@@ -306,13 +306,18 @@ public class Job extends JobContextImpl implements JobContext {
    * immediately
    * @throws IOException
    */
-  synchronized void updateStatus() throws IOException, InterruptedException {
-    this.status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
-      @Override
-      public JobStatus run() throws IOException, InterruptedException {
-        return cluster.getClient().getJobStatus(status.getJobID());
-      }
-    });
+  synchronized void updateStatus() throws IOException {
+    try {
+      this.status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
+        @Override
+        public JobStatus run() throws IOException, InterruptedException {
+          return cluster.getClient().getJobStatus(status.getJobID());
+        }
+      });
+    }
+    catch (InterruptedException ie) {
+      throw new IOException(ie);
+    }
     if (this.status == null) {
       throw new IOException("Job status not available ");
     }
@@ -537,7 +542,7 @@ public class Job extends JobContextImpl implements JobContext {
    * @return the progress of the job's map-tasks.
    * @throws IOException
    */
-  public float mapProgress() throws IOException, InterruptedException {
+  public float mapProgress() throws IOException {
     ensureState(JobState.RUNNING);
     ensureFreshStatus();
     return status.getMapProgress();
@@ -550,7 +555,7 @@ public class Job extends JobContextImpl implements JobContext {
    * @return the progress of the job's reduce-tasks.
    * @throws IOException
    */
-  public float reduceProgress() throws IOException, InterruptedException {
+  public float reduceProgress() throws IOException {
     ensureState(JobState.RUNNING);
     ensureFreshStatus();
     return status.getReduceProgress();
@@ -576,7 +581,7 @@ public class Job extends JobContextImpl implements JobContext {
    * @return the progress of the job's setup-tasks.
    * @throws IOException
    */
-  public float setupProgress() throws IOException, InterruptedException {
+  public float setupProgress() throws IOException {
     ensureState(JobState.RUNNING);
     ensureFreshStatus();
     return status.getSetupProgress();
@@ -589,7 +594,7 @@ public class Job extends JobContextImpl implements JobContext {
    * @return <code>true</code> if the job is complete, else <code>false</code>.
    * @throws IOException
    */
-  public boolean isComplete() throws IOException, InterruptedException {
+  public boolean isComplete() throws IOException {
     ensureState(JobState.RUNNING);
     updateStatus();
     return status.isJobComplete();
@@ -601,7 +606,7 @@ public class Job extends JobContextImpl implements JobContext {
    * @return <code>true</code> if the job succeeded, else <code>false</code>.
    * @throws IOException
    */
-  public boolean isSuccessful() throws IOException, InterruptedException {
+  public boolean isSuccessful() throws IOException {
     ensureState(JobState.RUNNING);
     updateStatus();
     return status.getState() == JobStatus.State.SUCCEEDED;
@@ -613,9 +618,14 @@ public class Job extends JobContextImpl implements JobContext {
    * 
    * @throws IOException
    */
-  public void killJob() throws IOException, InterruptedException {
+  public void killJob() throws IOException {
     ensureState(JobState.RUNNING);
-    cluster.getClient().killJob(getJobID());
+    try {
+      cluster.getClient().killJob(getJobID());
+    }
+    catch (InterruptedException ie) {
+      throw new IOException(ie);
+    }
   }
 
   /**
@@ -673,7 +683,7 @@ public class Job extends JobContextImpl implements JobContext {
     try {
       return getTaskCompletionEvents(startFrom, 10);
     } catch (InterruptedException ie) {
-      throw new RuntimeException(ie);
+      throw new IOException(ie);
     }
   }
 
@@ -684,13 +694,18 @@ public class Job extends JobContextImpl implements JobContext {
    * @throws IOException
    */
   public boolean killTask(final TaskAttemptID taskId) 
-      throws IOException, InterruptedException {
+      throws IOException {
     ensureState(JobState.RUNNING);
-    return ugi.doAs(new PrivilegedExceptionAction<Boolean>() {
-      public Boolean run() throws IOException, InterruptedException {
-        return cluster.getClient().killTask(taskId, false);
-      }
-    });
+    try {
+      return ugi.doAs(new PrivilegedExceptionAction<Boolean>() {
+        public Boolean run() throws IOException, InterruptedException {
+          return cluster.getClient().killTask(taskId, false);
+        }
+      });
+    }
+    catch (InterruptedException ie) {
+      throw new IOException(ie);
+    }
   }
 
   /**
@@ -700,14 +715,19 @@ public class Job extends JobContextImpl implements JobContext {
    * @throws IOException
    */
   public boolean failTask(final TaskAttemptID taskId) 
-      throws IOException, InterruptedException {
+      throws IOException {
     ensureState(JobState.RUNNING);
-    return ugi.doAs(new PrivilegedExceptionAction<Boolean>() {
-      @Override
-      public Boolean run() throws IOException, InterruptedException {
-        return cluster.getClient().killTask(taskId, true);
-      }
-    });
+    try {
+      return ugi.doAs(new PrivilegedExceptionAction<Boolean>() {
+        @Override
+        public Boolean run() throws IOException, InterruptedException {
+          return cluster.getClient().killTask(taskId, true);
+        }
+      });
+    }
+    catch (InterruptedException ie) {
+      throw new IOException(ie);
+    }
   }
 
   /**
@@ -718,14 +738,19 @@ public class Job extends JobContextImpl implements JobContext {
    * @throws IOException
    */
   public Counters getCounters() 
-      throws IOException, InterruptedException {
+      throws IOException {
     ensureState(JobState.RUNNING);
-    return ugi.doAs(new PrivilegedExceptionAction<Counters>() {
-      @Override
-      public Counters run() throws IOException, InterruptedException {
-        return cluster.getClient().getJobCounters(getJobID());
-      }
-    });
+    try {
+      return ugi.doAs(new PrivilegedExceptionAction<Counters>() {
+        @Override
+        public Counters run() throws IOException, InterruptedException {
+          return cluster.getClient().getJobCounters(getJobID());
+        }
+      });
+    }
+    catch (InterruptedException ie) {
+      throw new IOException(ie);
+    }
   }
 
   /**

+ 7 - 7
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/partition/InputSampler.java

@@ -96,8 +96,8 @@ public class InputSampler<K,V> extends Configured implements Tool  {
    */
   public static class SplitSampler<K,V> implements Sampler<K,V> {
 
-    private final int numSamples;
-    private final int maxSplitsSampled;
+    protected final int numSamples;
+    protected final int maxSplitsSampled;
 
     /**
      * Create a SplitSampler sampling <em>all</em> splits.
@@ -157,9 +157,9 @@ public class InputSampler<K,V> extends Configured implements Tool  {
    * each split.
    */
   public static class RandomSampler<K,V> implements Sampler<K,V> {
-    private double freq;
-    private final int numSamples;
-    private final int maxSplitsSampled;
+    protected double freq;
+    protected final int numSamples;
+    protected final int maxSplitsSampled;
 
     /**
      * Create a new RandomSampler sampling <em>all</em> splits.
@@ -249,8 +249,8 @@ public class InputSampler<K,V> extends Configured implements Tool  {
    * Useful for sorted data.
    */
   public static class IntervalSampler<K,V> implements Sampler<K,V> {
-    private final double freq;
-    private final int maxSplitsSampled;
+    protected final double freq;
+    protected final int maxSplitsSampled;
 
     /**
      * Create a new IntervalSampler sampling <em>all</em> splits.

+ 1 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleScheduler.java

@@ -359,7 +359,7 @@ class ShuffleScheduler<K,V> {
       }
     }
     LOG.info(host + " freed by " + Thread.currentThread().getName() + " in " + 
-             (System.currentTimeMillis()-shuffleStart.get()) + "s");
+             (System.currentTimeMillis()-shuffleStart.get()) + "ms");
   }
     
   public synchronized void resetKnownMaps() {

+ 1 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java

@@ -137,7 +137,7 @@ public class ClientServiceDelegate {
     }
   }
 
-  private MRClientProtocol getProxy() throws YarnRemoteException {
+  private MRClientProtocol getProxy() throws YarnRemoteException, IOException {
     if (realProxy != null) {
       return realProxy;
     }

+ 3 - 2
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java

@@ -362,7 +362,7 @@ public class TestClientServiceDelegate {
   }
 
   private void testRMDownForJobStatusBeforeGetAMReport(Configuration conf,
-      int noOfRetries) throws YarnRemoteException {
+      int noOfRetries) throws YarnRemoteException, IOException {
     conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
     conf.setBoolean(MRJobConfig.JOB_AM_ACCESS_DISABLED,
         !isAMReachableFromClient);
@@ -429,7 +429,8 @@ public class TestClientServiceDelegate {
         "N/A", 0.0f);
   }
 
-  private ResourceMgrDelegate getRMDelegate() throws YarnRemoteException {
+  private ResourceMgrDelegate getRMDelegate() throws YarnRemoteException,
+      IOException {
     ResourceMgrDelegate rm = mock(ResourceMgrDelegate.class);
     when(rm.getApplicationReport(jobId.getAppId())).thenReturn(null);
     return rm;

+ 149 - 4
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/partition/TestInputSampler.java

@@ -17,23 +17,26 @@
  */
 package org.apache.hadoop.mapreduce.lib.partition;
 
+import static org.junit.Assert.assertEquals;
+
+import java.io.DataInput;
+import java.io.DataOutput;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
-import org.junit.Test;
-import static org.junit.Assert.*;
-
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.junit.Test;
 
 public class TestInputSampler {
 
@@ -47,6 +50,24 @@ public class TestInputSampler {
     public int getInit() { return i; }
   }
 
+  static class MapredSequentialSplit implements org.apache.hadoop.mapred.InputSplit {
+    private int i;
+    MapredSequentialSplit(int i) {
+      this.i = i;
+    }
+    @Override
+    public long getLength() { return 0; }
+    @Override
+    public String[] getLocations() { return new String[0]; }
+    public int getInit() { return i; }
+    @Override
+    public void write(DataOutput out) throws IOException {
+    }
+    @Override
+    public void readFields(DataInput in) throws IOException {
+    }
+  }
+
   static class TestInputSamplerIF
       extends InputFormat<IntWritable,NullWritable> {
 
@@ -90,6 +111,71 @@ public class TestInputSampler {
 
   }
 
+  static class TestMapredInputSamplerIF extends TestInputSamplerIF implements
+      org.apache.hadoop.mapred.InputFormat<IntWritable,NullWritable> {
+
+    TestMapredInputSamplerIF(int maxDepth, int numSplits, int... splitInit) {
+      super(maxDepth, numSplits, splitInit);
+    }
+
+    @Override
+    public org.apache.hadoop.mapred.InputSplit[] getSplits(JobConf job,
+        int numSplits) throws IOException {
+      List<InputSplit> splits = null;
+      try {
+        splits = getSplits(Job.getInstance(job));
+      } catch (InterruptedException e) {
+        throw new IOException(e);
+      }
+      org.apache.hadoop.mapred.InputSplit[] retVals =
+          new org.apache.hadoop.mapred.InputSplit[splits.size()];
+      for (int i = 0; i < splits.size(); ++i) {
+        MapredSequentialSplit split = new MapredSequentialSplit(
+            ((SequentialSplit) splits.get(i)).getInit());
+        retVals[i] = split;
+      }
+      return retVals;
+    }
+
+    @Override
+    public org.apache.hadoop.mapred.RecordReader<IntWritable, NullWritable>
+        getRecordReader(final org.apache.hadoop.mapred.InputSplit split,
+            JobConf job, Reporter reporter) throws IOException {
+      return new org.apache.hadoop.mapred.RecordReader
+          <IntWritable, NullWritable>() {
+        private final IntWritable i =
+            new IntWritable(((MapredSequentialSplit)split).getInit());
+        private int maxVal = i.get() + maxDepth + 1;
+
+        @Override
+        public boolean next(IntWritable key, NullWritable value)
+            throws IOException {
+          i.set(i.get() + 1);
+          return i.get() < maxVal;
+        }
+        @Override
+        public IntWritable createKey() {
+          return new IntWritable(i.get());
+        }
+        @Override
+        public NullWritable createValue() {
+          return NullWritable.get();
+        }
+        @Override
+        public long getPos() throws IOException {
+          return 0;
+        }
+        @Override
+        public void close() throws IOException {
+        }
+        @Override
+        public float getProgress() throws IOException {
+          return 0;
+        }
+      };
+    }
+  }
+
   /**
    * Verify SplitSampler contract, that an equal number of records are taken
    * from the first splits.
@@ -118,6 +204,36 @@ public class TestInputSampler {
     }
   }
 
+  /**
+   * Verify SplitSampler contract in mapred.lib.InputSampler, which is added
+   * back for binary compatibility of M/R 1.x
+   */
+  @Test (timeout = 30000)
+  @SuppressWarnings("unchecked") // IntWritable comparator not typesafe
+  public void testMapredSplitSampler() throws Exception {
+    final int TOT_SPLITS = 15;
+    final int NUM_SPLITS = 5;
+    final int STEP_SAMPLE = 5;
+    final int NUM_SAMPLES = NUM_SPLITS * STEP_SAMPLE;
+    org.apache.hadoop.mapred.lib.InputSampler.Sampler<IntWritable,NullWritable>
+        sampler = new org.apache.hadoop.mapred.lib.InputSampler.SplitSampler
+            <IntWritable,NullWritable>(NUM_SAMPLES, NUM_SPLITS);
+    int inits[] = new int[TOT_SPLITS];
+    for (int i = 0; i < TOT_SPLITS; ++i) {
+      inits[i] = i * STEP_SAMPLE;
+    }
+    Object[] samples = sampler.getSample(
+        new TestMapredInputSamplerIF(100000, TOT_SPLITS, inits),
+        new JobConf());
+    assertEquals(NUM_SAMPLES, samples.length);
+    Arrays.sort(samples, new IntWritable.Comparator());
+    for (int i = 0; i < NUM_SAMPLES; ++i) {
+      // mapred.lib.InputSampler.SplitSampler has a sampling step
+      assertEquals(i % STEP_SAMPLE + TOT_SPLITS * (i / STEP_SAMPLE),
+          ((IntWritable)samples[i]).get());
+    }
+  }
+
   /**
    * Verify IntervalSampler contract, that samples are taken at regular
    * intervals from the given splits.
@@ -146,4 +262,33 @@ public class TestInputSampler {
     }
   }
 
+  /**
+   * Verify IntervalSampler in mapred.lib.InputSampler, which is added back
+   * for binary compatibility of M/R 1.x
+   */
+  @Test (timeout = 30000)
+  @SuppressWarnings("unchecked") // IntWritable comparator not typesafe
+  public void testMapredIntervalSampler() throws Exception {
+    final int TOT_SPLITS = 16;
+    final int PER_SPLIT_SAMPLE = 4;
+    final int NUM_SAMPLES = TOT_SPLITS * PER_SPLIT_SAMPLE;
+    final double FREQ = 1.0 / TOT_SPLITS;
+    org.apache.hadoop.mapred.lib.InputSampler.Sampler<IntWritable,NullWritable>
+        sampler = new org.apache.hadoop.mapred.lib.InputSampler.IntervalSampler
+            <IntWritable,NullWritable>(FREQ, NUM_SAMPLES);
+    int inits[] = new int[TOT_SPLITS];
+    for (int i = 0; i < TOT_SPLITS; ++i) {
+      inits[i] = i;
+    }
+    Job ignored = Job.getInstance();
+    Object[] samples = sampler.getSample(new TestInputSamplerIF(
+          NUM_SAMPLES, TOT_SPLITS, inits), ignored);
+    assertEquals(NUM_SAMPLES, samples.length);
+    Arrays.sort(samples, new IntWritable.Comparator());
+    for (int i = 0; i < NUM_SAMPLES; ++i) {
+      assertEquals(i,
+          ((IntWritable)samples[i]).get());
+    }
+  }
+
 }

+ 0 - 10
hadoop-project/pom.xml

@@ -577,16 +577,6 @@
         <artifactId>jackson-xc</artifactId>
         <version>1.8.8</version>
       </dependency>
-      <dependency>
-        <groupId>org.aspectj</groupId>
-        <artifactId>aspectjtools</artifactId>
-        <version>1.6.5</version>
-      </dependency>
-      <dependency>
-        <groupId>org.aspectj</groupId>
-        <artifactId>aspectjrt</artifactId>
-        <version>1.6.5</version>
-      </dependency>
       <dependency>
         <groupId>org.mockito</groupId>
         <artifactId>mockito-all</artifactId>

+ 1 - 1
hadoop-tools/hadoop-gridmix/src/test/java/org/apache/hadoop/mapred/gridmix/TestGridmixSummary.java

@@ -346,7 +346,7 @@ public class TestGridmixSummary {
       };
       
       @Override
-      public boolean isSuccessful() throws IOException, InterruptedException {
+      public boolean isSuccessful() throws IOException {
         if (lost) {
           throw new IOException("Test failure!");
         }

+ 23 - 4
hadoop-yarn-project/CHANGES.txt

@@ -10,9 +10,6 @@ Trunk - Unreleased
     Azure environments. (See breakdown of tasks below for subtasks and
     contributors)
 
-    YARN-45. Add protocol for schedulers to request containers back from
-    ApplicationMasters. (Carlo Curino, cdouglas)
-
   IMPROVEMENTS
 
     YARN-84. Use Builder to build RPC server. (Brandon Li via suresh)
@@ -118,11 +115,20 @@ Release 2.0.5-beta - UNRELEASED
     YARN-632. Changed ContainerManager api to throw IOException and
     YarnRemoteException. (Xuan Gong via vinodkv)
 
+    YARN-631. Changed ClientRMProtocol api to throw IOException and
+    YarnRemoteException. (Xuan Gong via vinodkv)
+
+    YARN-630. Changed AMRMProtocol api to throw IOException and
+    YarnRemoteException. (Xuan Gong via vinodkv)
+
   NEW FEATURES
 
     YARN-482. FS: Extend SchedulingMode to intermediate queues. 
     (kkambatl via tucu)
 
+    YARN-45. Add protocol for schedulers to request containers back from
+    ApplicationMasters. (Carlo Curino, cdouglas)
+
   IMPROVEMENTS
 
     YARN-365. Change NM heartbeat handling to not generate a scheduler event
@@ -229,6 +235,11 @@ Release 2.0.5-beta - UNRELEASED
     tokens for app attempt so that RM can be restarted while preserving current
     applications. (Jian He via vinodkv)
 
+    YARN-568. Add support for work preserving preemption to the FairScheduler.
+    (Carlo Curino and Sandy Ryza via cdouglas)
+
+    YARN-598. Add virtual cores to queue metrics. (sandyr via tucu)
+
   OPTIMIZATIONS
 
   BUG FIXES
@@ -353,7 +364,15 @@ Release 2.0.5-beta - UNRELEASED
 
     YARN-646. Fix two typos in Fair Scheduler user guide. (Dapeng Sun via atm)
 
-Release 2.0.4-alpha - UNRELEASED
+    YARN-507. Add interface visibility and stability annotations to FS 
+    interfaces/classes. (kkambatl via tucu)
+
+    YARN-637. FS: maxAssign is not honored. (kkambatl via tucu)
+
+    YARN-655. Fair scheduler metrics should subtract allocated memory from 
+    available memory. (sandyr via tucu)
+
+Release 2.0.4-alpha - 2013-04-25 
 
   INCOMPATIBLE CHANGES
 

+ 8 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/AMRMProtocol.java

@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.yarn.api;
 
+import java.io.IOException;
+
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Stable;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
@@ -57,10 +59,11 @@ public interface AMRMProtocol {
    * @param request registration request
    * @return registration respose
    * @throws YarnRemoteException
+   * @throws IOException
    */
   public RegisterApplicationMasterResponse registerApplicationMaster(
       RegisterApplicationMasterRequest request) 
-  throws YarnRemoteException;
+  throws YarnRemoteException, IOException;
   
   /**
    * <p>The interface used by an <code>ApplicationMaster</code> to notify the 
@@ -76,10 +79,11 @@ public interface AMRMProtocol {
    * @param request completion request
    * @return completion response
    * @throws YarnRemoteException
+   * @throws IOException
    */
   public FinishApplicationMasterResponse finishApplicationMaster(
       FinishApplicationMasterRequest request) 
-  throws YarnRemoteException;
+  throws YarnRemoteException, IOException;
   
   /**
    * <p>The main interface between an <code>ApplicationMaster</code> 
@@ -105,7 +109,8 @@ public interface AMRMProtocol {
    * @param request allocation request
    * @return allocation response
    * @throws YarnRemoteException
+   * @throws IOException
    */
   public AllocateResponse allocate(AllocateRequest request) 
-  throws YarnRemoteException;
+  throws YarnRemoteException, IOException;
 }

+ 28 - 12
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ClientRMProtocol.java

@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.yarn.api;
 
+import java.io.IOException;
+
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Stable;
@@ -78,11 +80,12 @@ public interface ClientRMProtocol {
    * @return response containing the new <code>ApplicationId</code> to be used
    * to submit an application
    * @throws YarnRemoteException
+   * @throws IOException
    * @see #submitApplication(SubmitApplicationRequest)
    */
   public GetNewApplicationResponse getNewApplication(
       GetNewApplicationRequest request)
-  throws YarnRemoteException;
+  throws YarnRemoteException, IOException;
   
   /**
    * <p>The interface used by clients to submit a new application to the
@@ -106,11 +109,12 @@ public interface ClientRMProtocol {
    * @param request request to submit a new application
    * @return (empty) response on accepting the submission
    * @throws YarnRemoteException
+   * @throws IOException
    * @see #getNewApplication(GetNewApplicationRequest)
    */
   public SubmitApplicationResponse submitApplication(
       SubmitApplicationRequest request) 
-  throws YarnRemoteException;
+  throws YarnRemoteException, IOException;
   
   /**
    * <p>The interface used by clients to request the 
@@ -129,11 +133,12 @@ public interface ClientRMProtocol {
    * @return <code>ResourceManager</code> returns an empty response
    *         on success and throws an exception on rejecting the request
    * @throws YarnRemoteException
+   * @throws IOException
    * @see #getQueueUserAcls(GetQueueUserAclsInfoRequest) 
    */
   public KillApplicationResponse forceKillApplication(
       KillApplicationRequest request) 
-  throws YarnRemoteException;
+  throws YarnRemoteException, IOException;
 
   /**
    * <p>The interface used by clients to get a report of an Application from
@@ -164,10 +169,11 @@ public interface ClientRMProtocol {
    * @param request request for an application report
    * @return application report 
    * @throws YarnRemoteException
+   * @throws IOException
    */
   public GetApplicationReportResponse getApplicationReport(
       GetApplicationReportRequest request) 
-  throws YarnRemoteException;
+  throws YarnRemoteException, IOException;
   
   /**
    * <p>The interface used by clients to get metrics about the cluster from
@@ -181,10 +187,11 @@ public interface ClientRMProtocol {
    * @param request request for cluster metrics
    * @return cluster metrics
    * @throws YarnRemoteException
+   * @throws IOException
    */
   public GetClusterMetricsResponse getClusterMetrics(
       GetClusterMetricsRequest request) 
-  throws YarnRemoteException;
+  throws YarnRemoteException, IOException;
   
   /**
    * <p>The interface used by clients to get a report of all Applications
@@ -202,10 +209,11 @@ public interface ClientRMProtocol {
    * @param request request for report on all running applications
    * @return report on all running applications
    * @throws YarnRemoteException
+   * @throws IOException
    */
   public GetAllApplicationsResponse getAllApplications(
       GetAllApplicationsRequest request) 
-  throws YarnRemoteException;
+  throws YarnRemoteException, IOException;
   
   /**
    * <p>The interface used by clients to get a report of all nodes
@@ -218,10 +226,11 @@ public interface ClientRMProtocol {
    * @param request request for report on all nodes
    * @return report on all nodes
    * @throws YarnRemoteException
+   * @throws IOException
    */
   public GetClusterNodesResponse getClusterNodes(
       GetClusterNodesRequest request) 
-  throws YarnRemoteException;
+  throws YarnRemoteException, IOException;
   
   /**
    * <p>The interface used by clients to get information about <em>queues</em>
@@ -236,10 +245,11 @@ public interface ClientRMProtocol {
    * @param request request to get queue information
    * @return queue information
    * @throws YarnRemoteException
+   * @throws IOException
    */
   public GetQueueInfoResponse getQueueInfo(
       GetQueueInfoRequest request) 
-  throws YarnRemoteException;
+  throws YarnRemoteException, IOException;
   
   /**
    * <p>The interface used by clients to get information about <em>queue 
@@ -252,10 +262,11 @@ public interface ClientRMProtocol {
    * @param request request to get queue acls for <em>current user</em>
    * @return queue acls for <em>current user</em>
    * @throws YarnRemoteException
+   * @throws IOException
    */
   public GetQueueUserAclsInfoResponse getQueueUserAcls(
       GetQueueUserAclsInfoRequest request) 
-  throws YarnRemoteException;
+  throws YarnRemoteException, IOException;
   
   /**
    * <p>The interface used by clients to get delegation token, enabling the 
@@ -267,10 +278,11 @@ public interface ClientRMProtocol {
    * @param request request to get a delegation token for the client.
    * @return delegation token that can be used to talk to this service
    * @throws YarnRemoteException
+   * @throws IOException
    */
   public GetDelegationTokenResponse getDelegationToken(
       GetDelegationTokenRequest request) 
-  throws YarnRemoteException;
+  throws YarnRemoteException, IOException;
   
   /**
    * Renew an existing delegation token.
@@ -278,10 +290,12 @@ public interface ClientRMProtocol {
    * @param request the delegation token to be renewed.
    * @return the new expiry time for the delegation token.
    * @throws YarnRemoteException
+   * @throws IOException
    */
   @Private
   public RenewDelegationTokenResponse renewDelegationToken(
-      RenewDelegationTokenRequest request) throws YarnRemoteException;
+      RenewDelegationTokenRequest request) throws YarnRemoteException,
+      IOException;
 
   /**
    * Cancel an existing delegation token.
@@ -289,8 +303,10 @@ public interface ClientRMProtocol {
    * @param request the delegation token to be cancelled.
    * @return an empty response.
    * @throws YarnRemoteException
+   * @throws IOException
    */
   @Private
   public CancelDelegationTokenResponse cancelDelegationToken(
-      CancelDelegationTokenRequest request) throws YarnRemoteException;
+      CancelDelegationTokenRequest request) throws YarnRemoteException,
+      IOException;
 }

+ 4 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java

@@ -433,8 +433,9 @@ public class ApplicationMaster {
    * Main run function for the application master
    *
    * @throws YarnRemoteException
+   * @throws IOException
    */
-  public boolean run() throws YarnRemoteException {
+  public boolean run() throws YarnRemoteException, IOException {
     LOG.info("Starting ApplicationMaster");
 
     AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler();
@@ -533,6 +534,8 @@ public class ApplicationMaster {
       resourceManager.unregisterApplicationMaster(appStatus, appMessage, null);
     } catch (YarnRemoteException ex) {
       LOG.error("Failed to unregister application", ex);
+    } catch (IOException e) {
+      LOG.error("Failed to unregister application", e);
     }
     
     done = true;

+ 6 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java

@@ -592,8 +592,10 @@ public class Client extends YarnClientImpl {
    * @param appId Application Id of application to be monitored
    * @return true if application completed successfully
    * @throws YarnRemoteException
+   * @throws IOException
    */
-  private boolean monitorApplication(ApplicationId appId) throws YarnRemoteException {
+  private boolean monitorApplication(ApplicationId appId)
+      throws YarnRemoteException, IOException {
 
     while (true) {
 
@@ -655,8 +657,10 @@ public class Client extends YarnClientImpl {
    * Kill a submitted application by sending a call to the ASM
    * @param appId Application Id to be killed. 
    * @throws YarnRemoteException
+   * @throws IOException
    */
-  private void forceKillApplication(ApplicationId appId) throws YarnRemoteException {
+  private void forceKillApplication(ApplicationId appId)
+      throws YarnRemoteException, IOException {
     // TODO clarify whether multiple jobs with the same app id can be submitted and be running at 
     // the same time. 
     // If yes, can we kill a particular attempt only?

+ 3 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/main/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/UnmanagedAMLauncher.java

@@ -357,9 +357,11 @@ public class UnmanagedAMLauncher {
    *          Application Id of application to be monitored
    * @return true if application completed successfully
    * @throws YarnRemoteException
+   * @throws IOException
    */
   private ApplicationReport monitorApplication(ApplicationId appId,
-      Set<YarnApplicationState> finalState) throws YarnRemoteException {
+      Set<YarnApplicationState> finalState) throws YarnRemoteException,
+      IOException {
 
     long foundAMCompletedTime = 0;
     final int timeToWaitMS = 10000;

+ 8 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClient.java

@@ -19,6 +19,8 @@
 package org.apache.hadoop.yarn.client;
 
 
+import java.io.IOException;
+
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
@@ -72,12 +74,13 @@ public interface AMRMClient extends Service {
    * @param appTrackingUrl URL at which the master info can be seen
    * @return <code>RegisterApplicationMasterResponse</code>
    * @throws YarnRemoteException
+   * @throws IOException
    */
   public RegisterApplicationMasterResponse 
                registerApplicationMaster(String appHostName,
                                          int appHostPort,
                                          String appTrackingUrl) 
-               throws YarnRemoteException;
+               throws YarnRemoteException, IOException;
   
   /**
    * Request additional containers and receive new container allocations.
@@ -92,9 +95,10 @@ public interface AMRMClient extends Service {
    * @param progressIndicator Indicates progress made by the master
    * @return the response of the allocate request
    * @throws YarnRemoteException
+   * @throws IOException
    */
   public AllocateResponse allocate(float progressIndicator) 
-                           throws YarnRemoteException;
+                           throws YarnRemoteException, IOException;
   
   /**
    * Unregister the application master. This must be called in the end.
@@ -102,11 +106,12 @@ public interface AMRMClient extends Service {
    * @param appMessage Diagnostics message on failure
    * @param appTrackingUrl New URL to get master info
    * @throws YarnRemoteException
+   * @throws IOException
    */
   public void unregisterApplicationMaster(FinalApplicationStatus appStatus,
                                            String appMessage,
                                            String appTrackingUrl) 
-               throws YarnRemoteException;
+               throws YarnRemoteException, IOException;
   
   /**
    * Request containers for resources before calling <code>allocate</code>

+ 8 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientAsync.java

@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.yarn.client;
 
+import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -173,10 +174,12 @@ public class AMRMClientAsync extends AbstractService {
   /**
    * Registers this application master with the resource manager. On successful
    * registration, starts the heartbeating thread.
+   * @throws YarnRemoteException
+   * @throws IOException
    */
   public RegisterApplicationMasterResponse registerApplicationMaster(
       String appHostName, int appHostPort, String appTrackingUrl)
-      throws YarnRemoteException {
+      throws YarnRemoteException, IOException {
     RegisterApplicationMasterResponse response =
         client.registerApplicationMaster(appHostName, appHostPort, appTrackingUrl);
     heartbeatThread.start();
@@ -189,9 +192,10 @@ public class AMRMClientAsync extends AbstractService {
    * @param appMessage Diagnostics message on failure
    * @param appTrackingUrl New URL to get master info
    * @throws YarnRemoteException
+   * @throws IOException
    */
   public void unregisterApplicationMaster(FinalApplicationStatus appStatus,
-      String appMessage, String appTrackingUrl) throws YarnRemoteException {
+      String appMessage, String appTrackingUrl) throws YarnRemoteException, IOException {
     synchronized (client) {
       keepRunning = false;
       client.unregisterApplicationMaster(appStatus, appMessage, appTrackingUrl);
@@ -264,6 +268,8 @@ public class AMRMClientAsync extends AbstractService {
             response = client.allocate(progress);
           } catch (YarnRemoteException ex) {
             LOG.error("Failed to heartbeat", ex);
+          } catch (IOException e) {
+            LOG.error("Failed to heartbeat", e);
           }
         }
         if (response != null) {

+ 4 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientImpl.java

@@ -134,7 +134,7 @@ public class AMRMClientImpl extends AbstractService implements AMRMClient {
   @Override
   public RegisterApplicationMasterResponse registerApplicationMaster(
       String appHostName, int appHostPort, String appTrackingUrl)
-      throws YarnRemoteException {
+      throws YarnRemoteException, IOException {
     // do this only once ???
     RegisterApplicationMasterRequest request = recordFactory
         .newRecordInstance(RegisterApplicationMasterRequest.class);
@@ -153,7 +153,7 @@ public class AMRMClientImpl extends AbstractService implements AMRMClient {
 
   @Override
   public AllocateResponse allocate(float progressIndicator) 
-      throws YarnRemoteException {
+      throws YarnRemoteException, IOException {
     AllocateResponse allocateResponse = null;
     ArrayList<ResourceRequest> askList = null;
     ArrayList<ContainerId> releaseList = null;
@@ -207,7 +207,8 @@ public class AMRMClientImpl extends AbstractService implements AMRMClient {
 
   @Override
   public void unregisterApplicationMaster(FinalApplicationStatus appStatus,
-      String appMessage, String appTrackingUrl) throws YarnRemoteException {
+      String appMessage, String appTrackingUrl) throws YarnRemoteException,
+      IOException {
     FinishApplicationMasterRequest request = recordFactory
                   .newRecordInstance(FinishApplicationMasterRequest.class);
     request.setAppAttemptId(appAttemptId);

+ 35 - 13
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/YarnClient.java

@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.yarn.client;
 
+import java.io.IOException;
 import java.util.List;
 
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -58,8 +59,10 @@ public interface YarnClient extends Service {
    * @return response containing the new <code>ApplicationId</code> to be used
    *         to submit an application
    * @throws YarnRemoteException
+   * @throws IOException
    */
-  GetNewApplicationResponse getNewApplication() throws YarnRemoteException;
+  GetNewApplicationResponse getNewApplication() throws YarnRemoteException,
+      IOException;
 
   /**
    * <p>
@@ -73,10 +76,11 @@ public interface YarnClient extends Service {
    *          needed to submit a new application
    * @return {@link ApplicationId} of the accepted application
    * @throws YarnRemoteException
+   * @throws IOException
    * @see #getNewApplication()
    */
   ApplicationId submitApplication(ApplicationSubmissionContext appContext)
-      throws YarnRemoteException;
+      throws YarnRemoteException, IOException;
 
   /**
    * <p>
@@ -88,9 +92,11 @@ public interface YarnClient extends Service {
    * @throws YarnRemoteException
    *           in case of errors or if YARN rejects the request due to
    *           access-control restrictions.
+   * @throws IOException
    * @see #getQueueAclsInfo()
    */
-  void killApplication(ApplicationId applicationId) throws YarnRemoteException;
+  void killApplication(ApplicationId applicationId) throws YarnRemoteException,
+      IOException;
 
   /**
    * <p>
@@ -120,9 +126,10 @@ public interface YarnClient extends Service {
    *          {@link ApplicationId} of the application that needs a report
    * @return application report
    * @throws YarnRemoteException
+   * @throws IOException
    */
   ApplicationReport getApplicationReport(ApplicationId appId)
-      throws YarnRemoteException;
+      throws YarnRemoteException, IOException;
 
   /**
    * <p>
@@ -137,8 +144,10 @@ public interface YarnClient extends Service {
    * 
    * @return a list of reports of all running applications
    * @throws YarnRemoteException
+   * @throws IOException
    */
-  List<ApplicationReport> getApplicationList() throws YarnRemoteException;
+  List<ApplicationReport> getApplicationList() throws YarnRemoteException,
+      IOException;
 
   /**
    * <p>
@@ -147,8 +156,10 @@ public interface YarnClient extends Service {
    * 
    * @return cluster metrics
    * @throws YarnRemoteException
+   * @throws IOException
    */
-  YarnClusterMetrics getYarnClusterMetrics() throws YarnRemoteException;
+  YarnClusterMetrics getYarnClusterMetrics() throws YarnRemoteException,
+      IOException;
 
   /**
    * <p>
@@ -157,8 +168,9 @@ public interface YarnClient extends Service {
    * 
    * @return A list of report of all nodes
    * @throws YarnRemoteException
+   * @throws IOException
    */
-  List<NodeReport> getNodeReports() throws YarnRemoteException;
+  List<NodeReport> getNodeReports() throws YarnRemoteException, IOException;
 
   /**
    * <p>
@@ -170,8 +182,10 @@ public interface YarnClient extends Service {
    * @return a delegation token ({@link DelegationToken}) that can be used to
    *         talk to YARN
    * @throws YarnRemoteException
+   * @throws IOException
    */
-  DelegationToken getRMDelegationToken(Text renewer) throws YarnRemoteException;
+  DelegationToken getRMDelegationToken(Text renewer)
+      throws YarnRemoteException, IOException;
 
   /**
    * <p>
@@ -184,8 +198,10 @@ public interface YarnClient extends Service {
    * @throws YarnRemoteException
    *           in case of errors or if YARN rejects the request due to
    *           access-control restrictions.
+   * @throws IOException
    */
-  QueueInfo getQueueInfo(String queueName) throws YarnRemoteException;
+  QueueInfo getQueueInfo(String queueName) throws YarnRemoteException,
+      IOException;
 
   /**
    * <p>
@@ -195,8 +211,9 @@ public interface YarnClient extends Service {
    * 
    * @return a list of queue-information for all queues
    * @throws YarnRemoteException
+   * @throws IOException
    */
-  List<QueueInfo> getAllQueues() throws YarnRemoteException;
+  List<QueueInfo> getAllQueues() throws YarnRemoteException, IOException;
 
   /**
    * <p>
@@ -205,8 +222,9 @@ public interface YarnClient extends Service {
    * 
    * @return a list of queue-information for all the top-level queues
    * @throws YarnRemoteException
+   * @throws IOException
    */
-  List<QueueInfo> getRootQueueInfos() throws YarnRemoteException;
+  List<QueueInfo> getRootQueueInfos() throws YarnRemoteException, IOException;
 
   /**
    * <p>
@@ -219,8 +237,10 @@ public interface YarnClient extends Service {
    * @return a list of queue-information for all queues who are direct children
    *         of the given parent queue.
    * @throws YarnRemoteException
+   * @throws IOException
    */
-  List<QueueInfo> getChildQueueInfos(String parent) throws YarnRemoteException;
+  List<QueueInfo> getChildQueueInfos(String parent) throws YarnRemoteException,
+      IOException;
 
   /**
    * <p>
@@ -231,6 +251,8 @@ public interface YarnClient extends Service {
    * @return a list of queue acls ({@link QueueUserACLInfo}) for
    *         <em>current user</em>
    * @throws YarnRemoteException
+   * @throws IOException
    */
-  List<QueueUserACLInfo> getQueueAclsInfo() throws YarnRemoteException;
+  List<QueueUserACLInfo> getQueueAclsInfo() throws YarnRemoteException,
+      IOException;
 }

+ 20 - 13
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/YarnClientImpl.java

@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.yarn.client;
 
+import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.List;
@@ -121,7 +122,7 @@ public class YarnClientImpl extends AbstractService implements YarnClient {
 
   @Override
   public GetNewApplicationResponse getNewApplication()
-      throws YarnRemoteException {
+      throws YarnRemoteException, IOException {
     GetNewApplicationRequest request =
         Records.newRecord(GetNewApplicationRequest.class);
     return rmClient.getNewApplication(request);
@@ -130,7 +131,7 @@ public class YarnClientImpl extends AbstractService implements YarnClient {
   @Override
   public ApplicationId
       submitApplication(ApplicationSubmissionContext appContext)
-          throws YarnRemoteException {
+          throws YarnRemoteException, IOException {
     ApplicationId applicationId = appContext.getApplicationId();
     appContext.setApplicationId(applicationId);
     SubmitApplicationRequest request =
@@ -167,7 +168,7 @@ public class YarnClientImpl extends AbstractService implements YarnClient {
 
   @Override
   public void killApplication(ApplicationId applicationId)
-      throws YarnRemoteException {
+      throws YarnRemoteException, IOException {
     LOG.info("Killing application " + applicationId);
     KillApplicationRequest request =
         Records.newRecord(KillApplicationRequest.class);
@@ -177,7 +178,7 @@ public class YarnClientImpl extends AbstractService implements YarnClient {
 
   @Override
   public ApplicationReport getApplicationReport(ApplicationId appId)
-      throws YarnRemoteException {
+      throws YarnRemoteException, IOException {
     GetApplicationReportRequest request =
         Records.newRecord(GetApplicationReportRequest.class);
     request.setApplicationId(appId);
@@ -188,7 +189,7 @@ public class YarnClientImpl extends AbstractService implements YarnClient {
 
   @Override
   public List<ApplicationReport> getApplicationList()
-      throws YarnRemoteException {
+      throws YarnRemoteException, IOException {
     GetAllApplicationsRequest request =
         Records.newRecord(GetAllApplicationsRequest.class);
     GetAllApplicationsResponse response = rmClient.getAllApplications(request);
@@ -196,7 +197,8 @@ public class YarnClientImpl extends AbstractService implements YarnClient {
   }
 
   @Override
-  public YarnClusterMetrics getYarnClusterMetrics() throws YarnRemoteException {
+  public YarnClusterMetrics getYarnClusterMetrics() throws YarnRemoteException,
+      IOException {
     GetClusterMetricsRequest request =
         Records.newRecord(GetClusterMetricsRequest.class);
     GetClusterMetricsResponse response = rmClient.getClusterMetrics(request);
@@ -204,7 +206,8 @@ public class YarnClientImpl extends AbstractService implements YarnClient {
   }
 
   @Override
-  public List<NodeReport> getNodeReports() throws YarnRemoteException {
+  public List<NodeReport> getNodeReports() throws YarnRemoteException,
+      IOException {
     GetClusterNodesRequest request =
         Records.newRecord(GetClusterNodesRequest.class);
     GetClusterNodesResponse response = rmClient.getClusterNodes(request);
@@ -213,7 +216,7 @@ public class YarnClientImpl extends AbstractService implements YarnClient {
 
   @Override
   public DelegationToken getRMDelegationToken(Text renewer)
-      throws YarnRemoteException {
+      throws YarnRemoteException, IOException {
     /* get the token from RM */
     GetDelegationTokenRequest rmDTRequest =
         Records.newRecord(GetDelegationTokenRequest.class);
@@ -236,7 +239,8 @@ public class YarnClientImpl extends AbstractService implements YarnClient {
   }
 
   @Override
-  public QueueInfo getQueueInfo(String queueName) throws YarnRemoteException {
+  public QueueInfo getQueueInfo(String queueName) throws YarnRemoteException,
+      IOException {
     GetQueueInfoRequest request =
         getQueueInfoRequest(queueName, true, false, false);
     Records.newRecord(GetQueueInfoRequest.class);
@@ -244,14 +248,16 @@ public class YarnClientImpl extends AbstractService implements YarnClient {
   }
 
   @Override
-  public List<QueueUserACLInfo> getQueueAclsInfo() throws YarnRemoteException {
+  public List<QueueUserACLInfo> getQueueAclsInfo() throws YarnRemoteException,
+      IOException {
     GetQueueUserAclsInfoRequest request =
         Records.newRecord(GetQueueUserAclsInfoRequest.class);
     return rmClient.getQueueUserAcls(request).getUserAclsInfoList();
   }
 
   @Override
-  public List<QueueInfo> getAllQueues() throws YarnRemoteException {
+  public List<QueueInfo> getAllQueues() throws YarnRemoteException,
+      IOException {
     List<QueueInfo> queues = new ArrayList<QueueInfo>();
 
     QueueInfo rootQueue =
@@ -262,7 +268,8 @@ public class YarnClientImpl extends AbstractService implements YarnClient {
   }
 
   @Override
-  public List<QueueInfo> getRootQueueInfos() throws YarnRemoteException {
+  public List<QueueInfo> getRootQueueInfos() throws YarnRemoteException,
+      IOException {
     List<QueueInfo> queues = new ArrayList<QueueInfo>();
 
     QueueInfo rootQueue =
@@ -274,7 +281,7 @@ public class YarnClientImpl extends AbstractService implements YarnClient {
 
   @Override
   public List<QueueInfo> getChildQueueInfos(String parent)
-      throws YarnRemoteException {
+      throws YarnRemoteException, IOException {
     List<QueueInfo> queues = new ArrayList<QueueInfo>();
 
     QueueInfo parentQueue =

+ 5 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java

@@ -91,8 +91,9 @@ public class ApplicationCLI extends YarnCLI {
    * Lists all the applications present in the Resource Manager
    * 
    * @throws YarnRemoteException
+   * @throws IOException
    */
-  private void listAllApplications() throws YarnRemoteException {
+  private void listAllApplications() throws YarnRemoteException, IOException {
     PrintWriter writer = new PrintWriter(sysout);
     List<ApplicationReport> appsReport = client.getApplicationList();
 
@@ -117,8 +118,10 @@ public class ApplicationCLI extends YarnCLI {
    * 
    * @param applicationId
    * @throws YarnRemoteException
+   * @throws IOException
    */
-  private void killApplication(String applicationId) throws YarnRemoteException {
+  private void killApplication(String applicationId)
+      throws YarnRemoteException, IOException {
     ApplicationId appId = ConverterUtils.toApplicationId(applicationId);
     sysout.println("Killing application " + applicationId);
     client.killApplication(appId);

+ 2 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/NodeCLI.java

@@ -84,8 +84,9 @@ public class NodeCLI extends YarnCLI {
    * Lists all the nodes present in the cluster
    * 
    * @throws YarnRemoteException
+   * @throws IOException
    */
-  private void listClusterNodes() throws YarnRemoteException {
+  private void listClusterNodes() throws YarnRemoteException, IOException {
     PrintWriter writer = new PrintWriter(sysout);
     List<NodeReport> nodesReport = client.getNodeReports();
     writer.println("Total Nodes:" + nodesReport.size());

+ 4 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClient.java

@@ -23,6 +23,7 @@ import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import java.io.IOException;
 import java.util.List;
 import java.util.Set;
 import java.util.TreeSet;
@@ -69,7 +70,7 @@ public class TestAMRMClient {
   int nodeCount = 3;
   
   @Before
-  public void setup() throws YarnRemoteException {
+  public void setup() throws YarnRemoteException, IOException {
     // start minicluster
     conf = new YarnConfiguration();
     yarnCluster = new MiniYARNCluster(TestAMRMClient.class.getName(), nodeCount, 1, 1);
@@ -134,7 +135,7 @@ public class TestAMRMClient {
   }
 
   @Test (timeout=60000)
-  public void testAMRMClient() throws YarnRemoteException {
+  public void testAMRMClient() throws YarnRemoteException, IOException {
     AMRMClientImpl amClient = null;
     try {
       // start am rm client
@@ -158,7 +159,7 @@ public class TestAMRMClient {
   
   
   private void testAllocation(final AMRMClientImpl amClient)  
-      throws YarnRemoteException {
+      throws YarnRemoteException, IOException {
     // setup container request
     final Resource capability = Records.newRecord(Resource.class);
     final Priority priority = Records.newRecord(Priority.class);

+ 7 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestYarnClient.java

@@ -23,6 +23,9 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+
 import junit.framework.Assert;
 
 import org.apache.hadoop.conf.Configuration;
@@ -89,6 +92,8 @@ public class TestYarnClient {
         client.submitApplication(context);
       } catch (YarnRemoteException e) {
         Assert.fail("Exception is not expected.");
+      } catch (IOException e) {
+        Assert.fail("Exception is not expected.");
       }
       verify(((MockYarnClient) client).mockReport,times(4 * i + 4))
           .getYarnApplicationState();
@@ -115,6 +120,8 @@ public class TestYarnClient {
             GetApplicationReportRequest.class))).thenReturn(mockResponse);
       } catch (YarnRemoteException e) {
         Assert.fail("Exception is not expected.");
+      } catch (IOException e) {
+        Assert.fail("Exception is not expected.");
       }
       when(mockResponse.getApplicationReport()).thenReturn(mockReport);
     }

+ 5 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/AMRMProtocolPBClientImpl.java

@@ -68,7 +68,7 @@ public class AMRMProtocolPBClientImpl implements AMRMProtocol, Closeable {
 
   @Override
   public AllocateResponse allocate(AllocateRequest request)
-      throws YarnRemoteException {
+      throws YarnRemoteException, IOException {
     AllocateRequestProto requestProto =
         ((AllocateRequestPBImpl) request).getProto();
     try {
@@ -80,7 +80,8 @@ public class AMRMProtocolPBClientImpl implements AMRMProtocol, Closeable {
 
   @Override
   public FinishApplicationMasterResponse finishApplicationMaster(
-      FinishApplicationMasterRequest request) throws YarnRemoteException {
+      FinishApplicationMasterRequest request) throws YarnRemoteException,
+      IOException {
     FinishApplicationMasterRequestProto requestProto =
         ((FinishApplicationMasterRequestPBImpl) request).getProto();
     try {
@@ -93,7 +94,8 @@ public class AMRMProtocolPBClientImpl implements AMRMProtocol, Closeable {
 
   @Override
   public RegisterApplicationMasterResponse registerApplicationMaster(
-      RegisterApplicationMasterRequest request) throws YarnRemoteException {
+      RegisterApplicationMasterRequest request) throws YarnRemoteException,
+      IOException {
     RegisterApplicationMasterRequestProto requestProto =
         ((RegisterApplicationMasterRequestPBImpl) request).getProto();
     try {

+ 21 - 12
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ClientRMProtocolPBClientImpl.java

@@ -113,7 +113,7 @@ public class ClientRMProtocolPBClientImpl implements ClientRMProtocol,
 
   @Override
   public KillApplicationResponse forceKillApplication(
-      KillApplicationRequest request) throws YarnRemoteException {
+      KillApplicationRequest request) throws YarnRemoteException, IOException {
     KillApplicationRequestProto requestProto =
         ((KillApplicationRequestPBImpl) request).getProto();
     try {
@@ -126,7 +126,8 @@ public class ClientRMProtocolPBClientImpl implements ClientRMProtocol,
 
   @Override
   public GetApplicationReportResponse getApplicationReport(
-      GetApplicationReportRequest request) throws YarnRemoteException {
+      GetApplicationReportRequest request) throws YarnRemoteException,
+      IOException {
     GetApplicationReportRequestProto requestProto =
         ((GetApplicationReportRequestPBImpl) request).getProto();
     try {
@@ -139,7 +140,8 @@ public class ClientRMProtocolPBClientImpl implements ClientRMProtocol,
 
   @Override
   public GetClusterMetricsResponse getClusterMetrics(
-      GetClusterMetricsRequest request) throws YarnRemoteException {
+      GetClusterMetricsRequest request) throws YarnRemoteException,
+      IOException {
     GetClusterMetricsRequestProto requestProto =
         ((GetClusterMetricsRequestPBImpl) request).getProto();
     try {
@@ -152,7 +154,8 @@ public class ClientRMProtocolPBClientImpl implements ClientRMProtocol,
 
   @Override
   public GetNewApplicationResponse getNewApplication(
-      GetNewApplicationRequest request) throws YarnRemoteException {
+      GetNewApplicationRequest request) throws YarnRemoteException,
+      IOException {
     GetNewApplicationRequestProto requestProto =
         ((GetNewApplicationRequestPBImpl) request).getProto();
     try {
@@ -165,7 +168,8 @@ public class ClientRMProtocolPBClientImpl implements ClientRMProtocol,
 
   @Override
   public SubmitApplicationResponse submitApplication(
-      SubmitApplicationRequest request) throws YarnRemoteException {
+      SubmitApplicationRequest request) throws YarnRemoteException,
+      IOException {
     SubmitApplicationRequestProto requestProto =
         ((SubmitApplicationRequestPBImpl) request).getProto();
     try {
@@ -178,7 +182,8 @@ public class ClientRMProtocolPBClientImpl implements ClientRMProtocol,
 
   @Override
   public GetAllApplicationsResponse getAllApplications(
-      GetAllApplicationsRequest request) throws YarnRemoteException {
+      GetAllApplicationsRequest request) throws YarnRemoteException,
+      IOException {
     GetAllApplicationsRequestProto requestProto =
         ((GetAllApplicationsRequestPBImpl) request).getProto();
     try {
@@ -192,7 +197,7 @@ public class ClientRMProtocolPBClientImpl implements ClientRMProtocol,
   @Override
   public GetClusterNodesResponse
       getClusterNodes(GetClusterNodesRequest request)
-          throws YarnRemoteException {
+          throws YarnRemoteException, IOException {
     GetClusterNodesRequestProto requestProto =
         ((GetClusterNodesRequestPBImpl) request).getProto();
     try {
@@ -205,7 +210,7 @@ public class ClientRMProtocolPBClientImpl implements ClientRMProtocol,
 
   @Override
   public GetQueueInfoResponse getQueueInfo(GetQueueInfoRequest request)
-      throws YarnRemoteException {
+      throws YarnRemoteException, IOException {
     GetQueueInfoRequestProto requestProto =
         ((GetQueueInfoRequestPBImpl) request).getProto();
     try {
@@ -218,7 +223,8 @@ public class ClientRMProtocolPBClientImpl implements ClientRMProtocol,
 
   @Override
   public GetQueueUserAclsInfoResponse getQueueUserAcls(
-      GetQueueUserAclsInfoRequest request) throws YarnRemoteException {
+      GetQueueUserAclsInfoRequest request) throws YarnRemoteException,
+      IOException {
     GetQueueUserAclsInfoRequestProto requestProto =
         ((GetQueueUserAclsInfoRequestPBImpl) request).getProto();
     try {
@@ -231,7 +237,8 @@ public class ClientRMProtocolPBClientImpl implements ClientRMProtocol,
 
   @Override
   public GetDelegationTokenResponse getDelegationToken(
-      GetDelegationTokenRequest request) throws YarnRemoteException {
+      GetDelegationTokenRequest request) throws YarnRemoteException,
+      IOException {
     GetDelegationTokenRequestProto requestProto =
         ((GetDelegationTokenRequestPBImpl) request).getProto();
     try {
@@ -244,7 +251,8 @@ public class ClientRMProtocolPBClientImpl implements ClientRMProtocol,
 
   @Override
   public RenewDelegationTokenResponse renewDelegationToken(
-      RenewDelegationTokenRequest request) throws YarnRemoteException {
+      RenewDelegationTokenRequest request) throws YarnRemoteException,
+      IOException {
     RenewDelegationTokenRequestProto requestProto = 
         ((RenewDelegationTokenRequestPBImpl) request).getProto();
     try {
@@ -257,7 +265,8 @@ public class ClientRMProtocolPBClientImpl implements ClientRMProtocol,
 
   @Override
   public CancelDelegationTokenResponse cancelDelegationToken(
-      CancelDelegationTokenRequest request) throws YarnRemoteException {
+      CancelDelegationTokenRequest request) throws YarnRemoteException,
+      IOException {
     CancelDelegationTokenRequestProto requestProto =
         ((CancelDelegationTokenRequestPBImpl) request).getProto();
     try {

+ 8 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/AMRMProtocolPBServiceImpl.java

@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.yarn.api.impl.pb.service;
 
+import java.io.IOException;
+
 import org.apache.hadoop.yarn.api.AMRMProtocol;
 import org.apache.hadoop.yarn.api.AMRMProtocolPB;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
@@ -57,6 +59,8 @@ public class AMRMProtocolPBServiceImpl implements AMRMProtocolPB {
       return ((AllocateResponsePBImpl)response).getProto();
     } catch (YarnRemoteException e) {
       throw new ServiceException(e);
+    } catch (IOException e) {
+      throw new ServiceException(e);
     }
   }
 
@@ -70,6 +74,8 @@ public class AMRMProtocolPBServiceImpl implements AMRMProtocolPB {
       return ((FinishApplicationMasterResponsePBImpl)response).getProto();
     } catch (YarnRemoteException e) {
       throw new ServiceException(e);
+    } catch (IOException e) {
+      throw new ServiceException(e);
     }
   }
 
@@ -83,6 +89,8 @@ public class AMRMProtocolPBServiceImpl implements AMRMProtocolPB {
       return ((RegisterApplicationMasterResponsePBImpl)response).getProto();
     } catch (YarnRemoteException e) {
       throw new ServiceException(e);
+    } catch (IOException e) {
+      throw new ServiceException(e);
     }
   }
 }

+ 26 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ClientRMProtocolPBServiceImpl.java

@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.yarn.api.impl.pb.service;
 
+import java.io.IOException;
+
 import org.apache.hadoop.security.proto.SecurityProtos.CancelDelegationTokenRequestProto;
 import org.apache.hadoop.security.proto.SecurityProtos.CancelDelegationTokenResponseProto;
 import org.apache.hadoop.security.proto.SecurityProtos.GetDelegationTokenRequestProto;
@@ -102,6 +104,8 @@ public class ClientRMProtocolPBServiceImpl implements ClientRMProtocolPB {
       return ((KillApplicationResponsePBImpl)response).getProto();
     } catch (YarnRemoteException e) {
       throw new ServiceException(e);
+    } catch (IOException e) {
+      throw new ServiceException(e);
     }
   }
 
@@ -115,6 +119,8 @@ public class ClientRMProtocolPBServiceImpl implements ClientRMProtocolPB {
       return ((GetApplicationReportResponsePBImpl)response).getProto();
     } catch (YarnRemoteException e) {
       throw new ServiceException(e);
+    } catch (IOException e) {
+      throw new ServiceException(e);
     }
   }
 
@@ -127,6 +133,8 @@ public class ClientRMProtocolPBServiceImpl implements ClientRMProtocolPB {
       return ((GetClusterMetricsResponsePBImpl)response).getProto();
     } catch (YarnRemoteException e) {
       throw new ServiceException(e);
+    } catch (IOException e) {
+      throw new ServiceException(e);
     }
   }
 
@@ -140,6 +148,8 @@ public class ClientRMProtocolPBServiceImpl implements ClientRMProtocolPB {
       return ((GetNewApplicationResponsePBImpl)response).getProto();
     } catch (YarnRemoteException e) {
       throw new ServiceException(e);
+    } catch (IOException e) {
+      throw new ServiceException(e);
     }
   }
 
@@ -152,6 +162,8 @@ public class ClientRMProtocolPBServiceImpl implements ClientRMProtocolPB {
       return ((SubmitApplicationResponsePBImpl)response).getProto();
     } catch (YarnRemoteException e) {
       throw new ServiceException(e);
+    } catch (IOException e) {
+      throw new ServiceException(e);
     }
   }
 
@@ -166,6 +178,8 @@ public class ClientRMProtocolPBServiceImpl implements ClientRMProtocolPB {
       return ((GetAllApplicationsResponsePBImpl)response).getProto();
     } catch (YarnRemoteException e) {
       throw new ServiceException(e);
+    } catch (IOException e) {
+      throw new ServiceException(e);
     }
   }
 
@@ -179,6 +193,8 @@ public class ClientRMProtocolPBServiceImpl implements ClientRMProtocolPB {
       return ((GetClusterNodesResponsePBImpl)response).getProto();
     } catch (YarnRemoteException e) {
       throw new ServiceException(e);
+    } catch (IOException e) {
+      throw new ServiceException(e);
     }
   }
 
@@ -192,6 +208,8 @@ public class ClientRMProtocolPBServiceImpl implements ClientRMProtocolPB {
       return ((GetQueueInfoResponsePBImpl)response).getProto();
     } catch (YarnRemoteException e) {
       throw new ServiceException(e);
+    } catch (IOException e) {
+      throw new ServiceException(e);
     }
   }
 
@@ -206,6 +224,8 @@ public class ClientRMProtocolPBServiceImpl implements ClientRMProtocolPB {
       return ((GetQueueUserAclsInfoResponsePBImpl)response).getProto();
     } catch (YarnRemoteException e) {
       throw new ServiceException(e);
+    } catch (IOException e) {
+      throw new ServiceException(e);
     }
   }
 
@@ -220,6 +240,8 @@ public class ClientRMProtocolPBServiceImpl implements ClientRMProtocolPB {
         return ((GetDelegationTokenResponsePBImpl)response).getProto();
       } catch (YarnRemoteException e) {
         throw new ServiceException(e);
+      } catch (IOException e) {
+        throw new ServiceException(e);
       }
   }
 
@@ -234,6 +256,8 @@ public class ClientRMProtocolPBServiceImpl implements ClientRMProtocolPB {
         return ((RenewDelegationTokenResponsePBImpl)response).getProto();
       } catch (YarnRemoteException e) {
         throw new ServiceException(e);
+      } catch (IOException e) {
+        throw new ServiceException(e);
       }
   }
 
@@ -248,6 +272,8 @@ public class ClientRMProtocolPBServiceImpl implements ClientRMProtocolPB {
         return ((CancelDelegationTokenResponsePBImpl)response).getProto();
       } catch (YarnRemoteException e) {
         throw new ServiceException(e);
+      } catch (IOException e) {
+        throw new ServiceException(e);
       }
   }
 }

+ 6 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestRPCFactories.java

@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.yarn;
 
+import java.io.IOException;
 import java.net.InetSocketAddress;
 
 import junit.framework.Assert;
@@ -107,21 +108,23 @@ public class TestRPCFactories {
 
     @Override
     public RegisterApplicationMasterResponse registerApplicationMaster(
-        RegisterApplicationMasterRequest request) throws YarnRemoteException {
+        RegisterApplicationMasterRequest request) throws YarnRemoteException,
+        IOException {
       // TODO Auto-generated method stub
       return null;
     }
 
     @Override
     public FinishApplicationMasterResponse finishApplicationMaster(
-        FinishApplicationMasterRequest request) throws YarnRemoteException {
+        FinishApplicationMasterRequest request) throws YarnRemoteException,
+        IOException {
       // TODO Auto-generated method stub
       return null;
     }
 
     @Override
     public AllocateResponse allocate(AllocateRequest request)
-        throws YarnRemoteException {
+        throws YarnRemoteException, IOException {
       // TODO Auto-generated method stub
       return null;
     }

+ 68 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java

@@ -21,7 +21,9 @@ package org.apache.hadoop.yarn.server.resourcemanager;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
@@ -39,6 +41,11 @@ import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.PreemptionContainer;
+import org.apache.hadoop.yarn.api.protocolrecords.PreemptionContract;
+import org.apache.hadoop.yarn.api.protocolrecords.PreemptionResourceRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StrictPreemptionContract;
+import org.apache.hadoop.yarn.api.protocolrecords.PreemptionMessage;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -162,7 +169,8 @@ public class ApplicationMasterService extends AbstractService implements
 
   @Override
   public RegisterApplicationMasterResponse registerApplicationMaster(
-      RegisterApplicationMasterRequest request) throws YarnRemoteException {
+      RegisterApplicationMasterRequest request) throws YarnRemoteException,
+      IOException {
 
     ApplicationAttemptId applicationAttemptId = request
         .getApplicationAttemptId();
@@ -211,7 +219,8 @@ public class ApplicationMasterService extends AbstractService implements
 
   @Override
   public FinishApplicationMasterResponse finishApplicationMaster(
-      FinishApplicationMasterRequest request) throws YarnRemoteException {
+      FinishApplicationMasterRequest request) throws YarnRemoteException,
+      IOException {
 
     ApplicationAttemptId applicationAttemptId = request
         .getApplicationAttemptId();
@@ -243,7 +252,7 @@ public class ApplicationMasterService extends AbstractService implements
 
   @Override
   public AllocateResponse allocate(AllocateRequest request)
-      throws YarnRemoteException {
+      throws YarnRemoteException, IOException {
 
     ApplicationAttemptId appAttemptId = request.getApplicationAttemptId();
     authorizeRequest(appAttemptId);
@@ -339,9 +348,65 @@ public class ApplicationMasterService extends AbstractService implements
       }
       
       allocateResponse.setNumClusterNodes(this.rScheduler.getNumClusterNodes());
+   
+      // add preemption to the allocateResponse message (if any)
+      allocateResponse.setPreemptionMessage(generatePreemptionMessage(allocation));
+      
       return allocateResponse;
     }
   }
+  
+  private PreemptionMessage generatePreemptionMessage(Allocation allocation){
+    PreemptionMessage pMsg = null;
+    // assemble strict preemption request
+    if (allocation.getStrictContainerPreemptions() != null) {
+       pMsg =
+        recordFactory.newRecordInstance(PreemptionMessage.class);
+      StrictPreemptionContract pStrict =
+          recordFactory.newRecordInstance(StrictPreemptionContract.class);
+      Set<PreemptionContainer> pCont = new HashSet<PreemptionContainer>();
+      for (ContainerId cId : allocation.getStrictContainerPreemptions()) {
+        PreemptionContainer pc =
+            recordFactory.newRecordInstance(PreemptionContainer.class);
+        pc.setId(cId);
+        pCont.add(pc);
+      }
+      pStrict.setContainers(pCont);
+      pMsg.setStrictContract(pStrict);
+    }
+
+    // assemble negotiable preemption request
+    if (allocation.getResourcePreemptions() != null &&
+        allocation.getResourcePreemptions().size() > 0 &&
+        allocation.getContainerPreemptions() != null &&
+        allocation.getContainerPreemptions().size() > 0) {
+      if (pMsg == null) {
+        pMsg =
+            recordFactory.newRecordInstance(PreemptionMessage.class);
+      }
+      PreemptionContract contract =
+          recordFactory.newRecordInstance(PreemptionContract.class);
+      Set<PreemptionContainer> pCont = new HashSet<PreemptionContainer>();
+      for (ContainerId cId : allocation.getContainerPreemptions()) {
+        PreemptionContainer pc =
+            recordFactory.newRecordInstance(PreemptionContainer.class);
+        pc.setId(cId);
+        pCont.add(pc);
+      }
+      List<PreemptionResourceRequest> pRes = new ArrayList<PreemptionResourceRequest>();
+      for (ResourceRequest crr : allocation.getResourcePreemptions()) {
+        PreemptionResourceRequest prr =
+            recordFactory.newRecordInstance(PreemptionResourceRequest.class);
+        prr.setResourceRequest(crr);
+        pRes.add(prr);
+      }
+      contract.setContainers(pCont);
+      contract.setResourceRequest(pRes);
+      pMsg.setContract(contract);
+    }
+    
+    return pMsg;
+  }
 
   public void registerAppAttempt(ApplicationAttemptId attemptId) {
     AllocateResponse response =

+ 40 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Allocation.java

@@ -19,17 +19,43 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
 
 import java.util.List;
+import java.util.Set;
 
 import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 
 public class Allocation {
+  
+  private final RecordFactory recordFactory =
+      RecordFactoryProvider.getRecordFactory(null);
+
   final List<Container> containers;
   final Resource resourceLimit;
-  
+  final Set<ContainerId> strictContainers;
+  final Set<ContainerId> fungibleContainers;
+  final List<ResourceRequest> fungibleResources;
+
   public Allocation(List<Container> containers, Resource resourceLimit) {
+    this(containers, resourceLimit, null, null, null);
+  }
+
+  public Allocation(List<Container> containers, Resource resourceLimit,
+      Set<ContainerId> strictContainers) {
+    this(containers, resourceLimit, strictContainers, null, null);
+  }
+
+  public Allocation(List<Container> containers, Resource resourceLimit,
+      Set<ContainerId> strictContainers, Set<ContainerId> fungibleContainers,
+      List<ResourceRequest> fungibleResources) {
     this.containers = containers;
     this.resourceLimit = resourceLimit;
+    this.strictContainers = strictContainers;
+    this.fungibleContainers = fungibleContainers;
+    this.fungibleResources = fungibleResources;
   }
 
   public List<Container> getContainers() {
@@ -39,5 +65,17 @@ public class Allocation {
   public Resource getResourceLimit() {
     return resourceLimit;
   }
-  
+
+  public Set<ContainerId> getStrictContainerPreemptions() {
+    return strictContainers;
+  }
+
+  public Set<ContainerId> getContainerPreemptions() {
+    return fungibleContainers;
+  }
+
+  public List<ResourceRequest> getResourcePreemptions() {
+    return fungibleResources;
+  }
+    
 }

+ 32 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java

@@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.util.BuilderUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -59,13 +60,17 @@ public class QueueMetrics implements MetricsSource {
   @Metric("# of apps failed") MutableGaugeInt appsFailed;
 
   @Metric("Allocated memory in MB") MutableGaugeInt allocatedMB;
+  @Metric("Allocated CPU in virtual cores") MutableGaugeInt allocatedVCores;
   @Metric("# of allocated containers") MutableGaugeInt allocatedContainers;
   @Metric("Aggregate # of allocated containers") MutableCounterLong aggregateContainersAllocated;
   @Metric("Aggregate # of released containers") MutableCounterLong aggregateContainersReleased;
   @Metric("Available memory in MB") MutableGaugeInt availableMB;
+  @Metric("Available CPU in virtual cores") MutableGaugeInt availableVCores;
   @Metric("Pending memory allocation in MB") MutableGaugeInt pendingMB;
+  @Metric("Pending CPU allocation in virtual cores") MutableGaugeInt pendingVCores;
   @Metric("# of pending containers") MutableGaugeInt pendingContainers;
   @Metric("# of reserved memory in MB") MutableGaugeInt reservedMB;
+  @Metric("Reserved CPU in virtual cores") MutableGaugeInt reservedVCores;
   @Metric("# of reserved containers") MutableGaugeInt reservedContainers;
   @Metric("# of active users") MutableGaugeInt activeUsers;
   @Metric("# of active users") MutableGaugeInt activeApplications;
@@ -267,6 +272,7 @@ public class QueueMetrics implements MetricsSource {
    */
   public void setAvailableResourcesToQueue(Resource limit) {
     availableMB.set(limit.getMemory());
+    availableVCores.set(limit.getVirtualCores());
   }
 
   /**
@@ -303,6 +309,7 @@ public class QueueMetrics implements MetricsSource {
   private void _incrPendingResources(int containers, Resource res) {
     pendingContainers.incr(containers);
     pendingMB.incr(res.getMemory());
+    pendingVCores.incr(res.getVirtualCores());
   }
 
   public void decrPendingResources(String user, int containers, Resource res) {
@@ -319,12 +326,14 @@ public class QueueMetrics implements MetricsSource {
   private void _decrPendingResources(int containers, Resource res) {
     pendingContainers.decr(containers);
     pendingMB.decr(res.getMemory());
+    pendingVCores.decr(res.getVirtualCores());
   }
 
   public void allocateResources(String user, int containers, Resource res) {
     allocatedContainers.incr(containers);
     aggregateContainersAllocated.incr(containers);
     allocatedMB.incr(res.getMemory() * containers);
+    allocatedVCores.incr(res.getVirtualCores() * containers);
     _decrPendingResources(containers, Resources.multiply(res, containers));
     QueueMetrics userMetrics = getUserMetrics(user);
     if (userMetrics != null) {
@@ -339,6 +348,7 @@ public class QueueMetrics implements MetricsSource {
     allocatedContainers.decr(containers);
     aggregateContainersReleased.incr(containers);
     allocatedMB.decr(res.getMemory() * containers);
+    allocatedVCores.decr(res.getVirtualCores() * containers);
     QueueMetrics userMetrics = getUserMetrics(user);
     if (userMetrics != null) {
       userMetrics.releaseResources(user, containers, res);
@@ -351,6 +361,7 @@ public class QueueMetrics implements MetricsSource {
   public void reserveResource(String user, Resource res) {
     reservedContainers.incr();
     reservedMB.incr(res.getMemory());
+    reservedVCores.incr(res.getVirtualCores());
     QueueMetrics userMetrics = getUserMetrics(user);
     if (userMetrics != null) {
       userMetrics.reserveResource(user, res);
@@ -363,6 +374,7 @@ public class QueueMetrics implements MetricsSource {
   public void unreserveResource(String user, Resource res) {
     reservedContainers.decr();
     reservedMB.decr(res.getMemory());
+    reservedVCores.decr(res.getVirtualCores());
     QueueMetrics userMetrics = getUserMetrics(user);
     if (userMetrics != null) {
       userMetrics.unreserveResource(user, res);
@@ -425,10 +437,18 @@ public class QueueMetrics implements MetricsSource {
   public int getAppsFailed() {
     return appsFailed.value();
   }
+  
+  public Resource getAllocatedResources() {
+    return BuilderUtils.newResource(allocatedMB.value(), 0);
+  }
 
   public int getAllocatedMB() {
     return allocatedMB.value();
   }
+  
+  public int getAllocatedVirtualCores() {
+    return allocatedVCores.value();
+  }
 
   public int getAllocatedContainers() {
     return allocatedContainers.value();
@@ -437,10 +457,18 @@ public class QueueMetrics implements MetricsSource {
   public int getAvailableMB() {
     return availableMB.value();
   }  
+  
+  public int getAvailableVirtualCores() {
+    return availableVCores.value();
+  }
 
   public int getPendingMB() {
     return pendingMB.value();
   }
+  
+  public int getPendingVirtualCores() {
+    return pendingVCores.value();
+  }
 
   public int getPendingContainers() {
     return pendingContainers.value();
@@ -449,6 +477,10 @@ public class QueueMetrics implements MetricsSource {
   public int getReservedMB() {
     return reservedMB.value();
   }
+  
+  public int getReservedVirtualCores() {
+    return reservedVCores.value();
+  }
 
   public int getReservedContainers() {
     return reservedContainers.value();

+ 4 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java

@@ -28,6 +28,8 @@ import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.yarn.api.records.QueueACL;
@@ -35,6 +37,8 @@ import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 
+@Private
+@Unstable
 public class FSLeafQueue extends FSQueue {
   private static final Log LOG = LogFactory.getLog(
       FSLeafQueue.class.getName());

+ 4 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java

@@ -25,12 +25,16 @@ import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 
+@Private
+@Unstable
 public class FSParentQueue extends FSQueue {
   private static final Log LOG = LogFactory.getLog(
       FSParentQueue.class.getName());

+ 4 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java

@@ -23,6 +23,8 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.yarn.api.records.Priority;
@@ -36,6 +38,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 
+@Private
+@Unstable
 public abstract class FSQueue extends Schedulable implements Queue {
   private final String name;
   private final QueueManager queueMgr;

+ 26 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java

@@ -23,11 +23,12 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.classification.InterfaceStability.Stable;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
@@ -57,9 +58,12 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.HashMultiset;
 import com.google.common.collect.Multiset;
 
+@Private
+@Unstable
 public class FSSchedulerApp extends SchedulerApplication {
 
   private static final Log LOG = LogFactory.getLog(FSSchedulerApp.class);
@@ -83,7 +87,9 @@ public class FSSchedulerApp extends SchedulerApplication {
 
   final Map<Priority, Map<NodeId, RMContainer>> reservedContainers = 
       new HashMap<Priority, Map<NodeId, RMContainer>>();
-  
+
+  final Map<RMContainer, Long> preemptionMap = new HashMap<RMContainer, Long>();
+
   /**
    * Count how many times the application has been given an opportunity
    * to schedule a task at each priority. Each time the scheduler
@@ -230,6 +236,9 @@ public class FSSchedulerApp extends SchedulerApplication {
     Resource containerResource = rmContainer.getContainer().getResource();
     queue.getMetrics().releaseResources(getUser(), 1, containerResource);
     Resources.subtractFrom(currentConsumption, containerResource);
+
+    // remove from preemption map if it is completed
+    preemptionMap.remove(rmContainer);
   }
 
   synchronized public List<Container> pullNewlyAllocatedContainers() {
@@ -306,8 +315,7 @@ public class FSSchedulerApp extends SchedulerApplication {
    * Used only by unit tests
    * @return total current reservations
    */
-  @Stable
-  @Private
+  @VisibleForTesting
   public synchronized Resource getCurrentReservation() {
     return currentReservation;
   }
@@ -572,4 +580,18 @@ public class FSSchedulerApp extends SchedulerApplication {
         " priority " + priority);
     allowedLocalityLevel.put(priority, level);
   }
+
+  // related methods
+  public void addPreemption(RMContainer container, long time) {
+    assert preemptionMap.get(container) == null;
+    preemptionMap.put(container, time);
+  }
+
+  public Long getContainerPreemptionTime(RMContainer container) {
+    return preemptionMap.get(container);
+  }
+
+  public Set<RMContainer> getPreemptionContainers() {
+    return preemptionMap.keySet();
+  }
 }

+ 4 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java

@@ -25,6 +25,8 @@ import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
@@ -39,6 +41,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 
+@Private
+@Unstable
 public class FSSchedulerNode extends SchedulerNode {
 
   private static final Log LOG = LogFactory.getLog(FSSchedulerNode.class);

+ 105 - 29
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java

@@ -24,8 +24,11 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.commons.logging.Log;
@@ -155,10 +158,16 @@ public class FairScheduler implements ResourceScheduler {
   private Resource clusterCapacity = 
       RecordFactoryProvider.getRecordFactory(null).newRecordInstance(Resource.class);
 
-  // How often tasks are preempted (must be longer than a couple
+  // How often tasks are preempted 
+  protected long preemptionInterval; 
+  
+  // ms to wait before force killing stuff (must be longer than a couple
   // of heartbeats to give task-kill commands a chance to act).
-  protected long preemptionInterval = 15000;
-
+  protected long waitTimeBeforeKill; 
+  
+  // Containers whose AMs have been warned that they will be preempted soon.
+  private List<RMContainer> warnedContainers = new ArrayList<RMContainer>();
+  
   protected boolean preemptionEnabled;
   protected boolean sizeBasedWeight; // Give larger weights to larger jobs
   protected WeightAdjuster weightAdjuster; // Can be null for no weight adjuster
@@ -225,10 +234,6 @@ public class FairScheduler implements ResourceScheduler {
     // Recursively compute fair shares for all queues
     // and update metrics
     rootQueue.recomputeShares();
-
-    // Update recorded capacity of root queue (child queues are updated
-    // when fair share is calculated).
-    rootMetrics.setAvailableResourcesToQueue(clusterCapacity);
   }
 
   /**
@@ -335,34 +340,78 @@ public class FairScheduler implements ResourceScheduler {
     // Sort containers into reverse order of priority
     Collections.sort(runningContainers, new Comparator<RMContainer>() {
       public int compare(RMContainer c1, RMContainer c2) {
-        return c2.getContainer().getPriority().compareTo(
+        int ret = c2.getContainer().getPriority().compareTo(
             c1.getContainer().getPriority());
+        if (ret == 0) {
+          return c2.getContainerId().compareTo(c1.getContainerId());
+        }
+        return ret;
       }
     });
+    
+    // Scan down the list of containers we've already warned and kill them
+    // if we need to.  Remove any containers from the list that we don't need
+    // or that are no longer running.
+    Iterator<RMContainer> warnedIter = warnedContainers.iterator();
+    Set<RMContainer> preemptedThisRound = new HashSet<RMContainer>();
+    while (warnedIter.hasNext()) {
+      RMContainer container = warnedIter.next();
+      if (container.getState() == RMContainerState.RUNNING &&
+          Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity,
+              toPreempt, Resources.none())) {
+        warnOrKillContainer(container, apps.get(container), queues.get(container));
+        preemptedThisRound.add(container);
+        Resources.subtractFrom(toPreempt, container.getContainer().getResource());
+      } else {
+        warnedIter.remove();
+      }
+    }
 
-    // Scan down the sorted list of task statuses until we've killed enough
-    // tasks, making sure we don't kill too many from any queue
-    for (RMContainer container : runningContainers) {
+    // Scan down the rest of the containers until we've preempted enough, making
+    // sure we don't preempt too many from any queue
+    Iterator<RMContainer> runningIter = runningContainers.iterator();
+    while (runningIter.hasNext() &&
+        Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity,
+            toPreempt, Resources.none())) {
+      RMContainer container = runningIter.next();
       FSLeafQueue sched = queues.get(container);
-      if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity,
-          sched.getResourceUsage(), sched.getFairShare())) {
-        LOG.info("Preempting container (prio=" + container.getContainer().getPriority() +
-            "res=" + container.getContainer().getResource() +
-            ") from queue " + sched.getName());
-        ContainerStatus status = SchedulerUtils.createAbnormalContainerStatus(
+      if (!preemptedThisRound.contains(container) &&
+          Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity,
+              sched.getResourceUsage(), sched.getFairShare())) {
+        warnOrKillContainer(container, apps.get(container), sched);
+        
+        warnedContainers.add(container);
+        Resources.subtractFrom(toPreempt, container.getContainer().getResource());
+      }
+    }
+  }
+  
+  private void warnOrKillContainer(RMContainer container, FSSchedulerApp app,
+      FSLeafQueue queue) {
+    LOG.info("Preempting container (prio=" + container.getContainer().getPriority() +
+        "res=" + container.getContainer().getResource() +
+        ") from queue " + queue.getName());
+    
+    Long time = app.getContainerPreemptionTime(container);
+
+    if (time != null) {
+      // if we asked for preemption more than maxWaitTimeBeforeKill ms ago,
+      // proceed with kill
+      if (time + waitTimeBeforeKill < clock.getTime()) {
+        ContainerStatus status =
+          SchedulerUtils.createAbnormalContainerStatus(
             container.getContainerId(), SchedulerUtils.PREEMPTED_CONTAINER);
 
         // TODO: Not sure if this ever actually adds this to the list of cleanup
         // containers on the RMNode (see SchedulerNode.releaseContainer()).
         completedContainer(container, status, RMContainerEventType.KILL);
-
-        toPreempt = Resources.subtract(toPreempt,
-            container.getContainer().getResource());
-        if (Resources.lessThanOrEqual(RESOURCE_CALCULATOR, clusterCapacity,
-            toPreempt, Resources.none())) {
-          break;
-        }
+        LOG.info("Killing container" + container +
+            " (after waiting for premption for " +
+            (clock.getTime() - time) + "ms)");
       }
+    } else {
+      // track the request in the FSSchedulerApp itself
+      app.addPreemption(container, clock.getTime());
     }
   }
 
@@ -487,11 +536,11 @@ public class FairScheduler implements ResourceScheduler {
     return clusterCapacity;
   }
 
-  public Clock getClock() {
+  public synchronized Clock getClock() {
     return clock;
   }
 
-  protected void setClock(Clock clock) {
+  protected synchronized void setClock(Clock clock) {
     this.clock = clock;
   }
 
@@ -617,6 +666,7 @@ public class FairScheduler implements ResourceScheduler {
     } else {
       application.containerCompleted(rmContainer, containerStatus, event);
       node.releaseContainer(container);
+      updateRootQueueMetrics();
     }
 
     LOG.info("Application " + applicationAttemptId +
@@ -628,6 +678,7 @@ public class FairScheduler implements ResourceScheduler {
   private synchronized void addNode(RMNode node) {
     nodes.put(node.getNodeID(), new FSSchedulerNode(node));
     Resources.addTo(clusterCapacity, node.getTotalCapability());
+    updateRootQueueMetrics();
 
     LOG.info("Added node " + node.getNodeAddress() +
         " cluster capacity: " + clusterCapacity);
@@ -636,6 +687,7 @@ public class FairScheduler implements ResourceScheduler {
   private synchronized void removeNode(RMNode rmNode) {
     FSSchedulerNode node = nodes.get(rmNode.getNodeID());
     Resources.subtractFrom(clusterCapacity, rmNode.getTotalCapability());
+    updateRootQueueMetrics();
 
     // Remove running containers
     List<RMContainer> runningContainers = node.getRunningContainers();
@@ -746,10 +798,18 @@ public class FairScheduler implements ResourceScheduler {
         LOG.debug("allocate:" +
             " applicationAttemptId=" + appAttemptId +
             " #ask=" + ask.size());
-      }
 
+        LOG.debug("Preempting " + application.getPreemptionContainers().size()
+            + " container(s)");
+      }
+      
+      Set<ContainerId> preemptionContainerIds = new HashSet<ContainerId>();
+      for (RMContainer container : application.getPreemptionContainers()) {
+        preemptionContainerIds.add(container.getContainerId());
+      }
+      
       return new Allocation(application.pullNewlyAllocatedContainers(),
-          application.getHeadroom());
+          application.getHeadroom(), preemptionContainerIds);
     }
   }
 
@@ -832,6 +892,7 @@ public class FairScheduler implements ResourceScheduler {
         if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity,
               queueMgr.getRootQueue().assignContainer(node),
               Resources.none())) {
+          assignedContainers++;
           assignedContainer = true;
         }
         if (!assignedContainer) { break; }
@@ -839,6 +900,7 @@ public class FairScheduler implements ResourceScheduler {
         if ((assignedContainers >= maxAssign) && (maxAssign > 0)) { break; }
       }
     }
+    updateRootQueueMetrics();
   }
 
   @Override
@@ -860,6 +922,18 @@ public class FairScheduler implements ResourceScheduler {
     }
     return new SchedulerAppReport(applications.get(appAttemptId));
   }
+  
+  /**
+   * Subqueue metrics might be a little out of date because fair shares are
+   * recalculated at the update interval, but the root queue metrics needs to
+   * be updated synchronously with allocations and completions so that cluster
+   * metrics will be consistent.
+   */
+  private void updateRootQueueMetrics() {
+    rootMetrics.setAvailableResourcesToQueue(
+        Resources.subtract(
+            clusterCapacity, rootMetrics.getAllocatedResources()));
+  }
 
   @Override
   public QueueMetrics getRootQueueMetrics() {
@@ -950,7 +1024,9 @@ public class FairScheduler implements ResourceScheduler {
     assignMultiple = this.conf.getAssignMultiple();
     maxAssign = this.conf.getMaxAssign();
     sizeBasedWeight = this.conf.getSizeBasedWeight();
-    
+    preemptionInterval = this.conf.getPreemptionInterval();
+    waitTimeBeforeKill = this.conf.getWaitTimeBeforeKill();
+
     if (!initialized) {
       rootMetrics = QueueMetrics.forQueue("root", null, true, conf);
       this.rmContext = rmContext;

+ 17 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java

@@ -18,12 +18,15 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
 
 import java.io.File;
-
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 
+@Private
+@Evolving
 public class FairSchedulerConfiguration extends Configuration {
   public static final String FS_CONFIGURATION_FILE = "fair-scheduler.xml";
 
@@ -52,6 +55,11 @@ public class FairSchedulerConfiguration extends Configuration {
   /** Whether preemption is enabled. */
   protected static final String  PREEMPTION = CONF_PREFIX + "preemption";
   protected static final boolean DEFAULT_PREEMPTION = false;
+  
+  protected static final String PREEMPTION_INTERVAL = CONF_PREFIX + "preemptionInterval";
+  protected static final int DEFAULT_PREEMPTION_INTERVAL = 5000;
+  protected static final String WAIT_TIME_BEFORE_KILL = CONF_PREFIX + "waitTimeBeforeKill";
+  protected static final int DEFAULT_WAIT_TIME_BEFORE_KILL = 15000;
 
   /** Whether to assign multiple containers in one check-in. */
   protected static final String  ASSIGN_MULTIPLE = CONF_PREFIX + "assignmultiple";
@@ -120,4 +128,12 @@ public class FairSchedulerConfiguration extends Configuration {
     return get(EVENT_LOG_DIR, new File(System.getProperty("hadoop.log.dir",
     		"/tmp/")).getAbsolutePath() + File.separator + "fairscheduler");
   }
+  
+  public int getPreemptionInterval() {
+    return getInt(PREEMPTION_INTERVAL, DEFAULT_PREEMPTION_INTERVAL);
+  }
+  
+  public int getWaitTimeBeforeKill() {
+    return getInt(WAIT_TIME_BEFORE_KILL, DEFAULT_WAIT_TIME_BEFORE_KILL);
+  }
 }

+ 2 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java

@@ -22,14 +22,14 @@ import java.util.Comparator;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.hadoop.classification.InterfaceAudience.Public;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
 
 @Public
-@Unstable
+@Evolving
 public abstract class SchedulingPolicy {
   private static final ConcurrentHashMap<Class<? extends SchedulingPolicy>, SchedulingPolicy> instances =
       new ConcurrentHashMap<Class<? extends SchedulingPolicy>, SchedulingPolicy>();

+ 4 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java

@@ -21,6 +21,8 @@ import java.io.Serializable;
 import java.util.Collection;
 import java.util.Comparator;
 
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.DefaultResourceCalculator;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
@@ -29,6 +31,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SchedulingPo
 
 import com.google.common.annotations.VisibleForTesting;
 
+@Private
+@Unstable
 public class FairSharePolicy extends SchedulingPolicy {
   @VisibleForTesting
   public static final String NAME = "Fairshare";

+ 4 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java

@@ -21,6 +21,8 @@ import java.io.Serializable;
 import java.util.Collection;
 import java.util.Comparator;
 
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable;
@@ -28,6 +30,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SchedulingPo
 
 import com.google.common.annotations.VisibleForTesting;
 
+@Private
+@Unstable
 public class FifoPolicy extends SchedulingPolicy {
   @VisibleForTesting
   public static final String NAME = "FIFO";

+ 3 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java

@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.security.PrivilegedAction;
 import java.util.Map;
@@ -199,6 +200,8 @@ public class MockRM extends ResourceManager {
           return client.submitApplication(req);
         } catch (YarnRemoteException e) {
           e.printStackTrace();
+        } catch (IOException e) {
+          e.printStackTrace();
         }
         return null;
       }

+ 3 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMTokens.java

@@ -339,7 +339,7 @@ public class TestClientRMTokens {
     DelegationToken token = loggedInUser
         .doAs(new PrivilegedExceptionAction<DelegationToken>() {
           @Override
-          public DelegationToken run() throws YarnRemoteException {
+          public DelegationToken run() throws YarnRemoteException, IOException {
             GetDelegationTokenRequest request = Records
                 .newRecord(GetDelegationTokenRequest.class);
             request.setRenewer(renewerString);
@@ -355,7 +355,7 @@ public class TestClientRMTokens {
       throws IOException, InterruptedException {
     long nextExpTime = loggedInUser.doAs(new PrivilegedExceptionAction<Long>() {
       @Override
-      public Long run() throws YarnRemoteException {
+      public Long run() throws YarnRemoteException, IOException {
         RenewDelegationTokenRequest request = Records
             .newRecord(RenewDelegationTokenRequest.class);
         request.setDelegationToken(dToken);
@@ -371,7 +371,7 @@ public class TestClientRMTokens {
       throws IOException, InterruptedException {
     loggedInUser.doAs(new PrivilegedExceptionAction<Void>() {
       @Override
-      public Void run() throws YarnRemoteException {
+      public Void run() throws YarnRemoteException, IOException {
         CancelDelegationTokenRequest request = Records
             .newRecord(CancelDelegationTokenRequest.class);
         request.setDelegationToken(dToken);

+ 48 - 43
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java

@@ -66,20 +66,20 @@ public class TestQueueMetrics {
     MetricsSource userSource = userSource(ms, queueName, user);
     checkApps(queueSource, 1, 1, 0, 0, 0, 0);
 
-    metrics.setAvailableResourcesToQueue(Resources.createResource(100*GB));
-    metrics.incrPendingResources(user, 5, Resources.createResource(15*GB));
+    metrics.setAvailableResourcesToQueue(Resources.createResource(100*GB, 100));
+    metrics.incrPendingResources(user, 5, Resources.createResource(15*GB, 15));
     // Available resources is set externally, as it depends on dynamic
     // configurable cluster/queue resources
-    checkResources(queueSource, 0, 0, 0, 0, 100*GB, 15*GB, 5, 0, 0);
+    checkResources(queueSource, 0, 0, 0, 0, 0, 100*GB, 100, 15*GB, 15, 5, 0, 0, 0);
 
     metrics.incrAppsRunning(app, user);
     checkApps(queueSource, 1, 0, 1, 0, 0, 0);
 
-    metrics.allocateResources(user, 3, Resources.createResource(2*GB));
-    checkResources(queueSource, 6*GB, 3, 3, 0, 100*GB, 9*GB, 2, 0, 0);
+    metrics.allocateResources(user, 3, Resources.createResource(2*GB, 2));
+    checkResources(queueSource, 6*GB, 6, 3, 3, 0, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0);
 
-    metrics.releaseResources(user, 1, Resources.createResource(2*GB));
-    checkResources(queueSource, 4*GB, 2, 3, 1, 100*GB, 9*GB, 2, 0, 0);
+    metrics.releaseResources(user, 1, Resources.createResource(2*GB, 2));
+    checkResources(queueSource, 4*GB, 4, 2, 3, 1, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0);
 
     metrics.finishApp(app, RMAppAttemptState.FINISHED);
     checkApps(queueSource, 1, 0, 0, 1, 0, 0);
@@ -148,25 +148,25 @@ public class TestQueueMetrics {
     checkApps(queueSource, 1, 1, 0, 0, 0, 0);
     checkApps(userSource, 1, 1, 0, 0, 0, 0);
 
-    metrics.setAvailableResourcesToQueue(Resources.createResource(100*GB));
-    metrics.setAvailableResourcesToUser(user, Resources.createResource(10*GB));
-    metrics.incrPendingResources(user, 5, Resources.createResource(15*GB));
+    metrics.setAvailableResourcesToQueue(Resources.createResource(100*GB, 100));
+    metrics.setAvailableResourcesToUser(user, Resources.createResource(10*GB, 10));
+    metrics.incrPendingResources(user, 5, Resources.createResource(15*GB, 15));
     // Available resources is set externally, as it depends on dynamic
     // configurable cluster/queue resources
-    checkResources(queueSource, 0, 0, 0, 0,  100*GB, 15*GB, 5, 0, 0);
-    checkResources(userSource, 0, 0, 0, 0, 10*GB, 15*GB, 5, 0, 0);
+    checkResources(queueSource, 0, 0, 0, 0, 0,  100*GB, 100, 15*GB, 15, 5, 0, 0, 0);
+    checkResources(userSource, 0, 0, 0, 0, 0, 10*GB, 10, 15*GB, 15, 5, 0, 0, 0);
 
     metrics.incrAppsRunning(app, user);
     checkApps(queueSource, 1, 0, 1, 0, 0, 0);
     checkApps(userSource, 1, 0, 1, 0, 0, 0);
 
-    metrics.allocateResources(user, 3, Resources.createResource(2*GB));
-    checkResources(queueSource, 6*GB, 3, 3, 0, 100*GB, 9*GB, 2, 0, 0);
-    checkResources(userSource, 6*GB, 3, 3, 0, 10*GB, 9*GB, 2, 0, 0);
+    metrics.allocateResources(user, 3, Resources.createResource(2*GB, 2));
+    checkResources(queueSource, 6*GB, 6, 3, 3, 0, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0);
+    checkResources(userSource, 6*GB, 6, 3, 3, 0, 10*GB, 10, 9*GB, 9, 2, 0, 0, 0);
 
-    metrics.releaseResources(user, 1, Resources.createResource(2*GB));
-    checkResources(queueSource, 4*GB, 2, 3, 1, 100*GB, 9*GB, 2, 0, 0);
-    checkResources(userSource, 4*GB, 2, 3, 1, 10*GB, 9*GB, 2, 0, 0);
+    metrics.releaseResources(user, 1, Resources.createResource(2*GB, 2));
+    checkResources(queueSource, 4*GB, 4, 2, 3, 1, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0);
+    checkResources(userSource, 4*GB, 4, 2, 3, 1, 10*GB, 10, 9*GB, 9, 2, 0, 0, 0);
 
     metrics.finishApp(app, RMAppAttemptState.FINISHED);
     checkApps(queueSource, 1, 0, 0, 1, 0, 0);
@@ -197,35 +197,35 @@ public class TestQueueMetrics {
     checkApps(userSource, 1, 1, 0, 0, 0, 0);
     checkApps(parentUserSource, 1, 1, 0, 0, 0, 0);
 
-    parentMetrics.setAvailableResourcesToQueue(Resources.createResource(100*GB));
-    metrics.setAvailableResourcesToQueue(Resources.createResource(100*GB));
-    parentMetrics.setAvailableResourcesToUser(user, Resources.createResource(10*GB));
-    metrics.setAvailableResourcesToUser(user, Resources.createResource(10*GB));
-    metrics.incrPendingResources(user, 5, Resources.createResource(15*GB));
-    checkResources(queueSource, 0, 0, 0, 0, 100*GB, 15*GB, 5, 0, 0);
-    checkResources(parentQueueSource, 0, 0, 0, 0, 100*GB, 15*GB, 5, 0, 0);
-    checkResources(userSource, 0, 0, 0, 0, 10*GB, 15*GB, 5, 0, 0);
-    checkResources(parentUserSource, 0, 0, 0, 0, 10*GB, 15*GB, 5, 0, 0);
+    parentMetrics.setAvailableResourcesToQueue(Resources.createResource(100*GB, 100));
+    metrics.setAvailableResourcesToQueue(Resources.createResource(100*GB, 100));
+    parentMetrics.setAvailableResourcesToUser(user, Resources.createResource(10*GB, 10));
+    metrics.setAvailableResourcesToUser(user, Resources.createResource(10*GB, 10));
+    metrics.incrPendingResources(user, 5, Resources.createResource(15*GB, 15));
+    checkResources(queueSource, 0, 0, 0, 0, 0, 100*GB, 100, 15*GB, 15, 5, 0, 0, 0);
+    checkResources(parentQueueSource, 0, 0, 0, 0, 0, 100*GB, 100, 15*GB, 15, 5, 0, 0, 0);
+    checkResources(userSource, 0, 0, 0, 0, 0, 10*GB, 10, 15*GB, 15, 5, 0, 0, 0);
+    checkResources(parentUserSource, 0, 0, 0, 0, 0, 10*GB, 10, 15*GB, 15, 5, 0, 0, 0);
 
     metrics.incrAppsRunning(app, user);
     checkApps(queueSource, 1, 0, 1, 0, 0, 0);
     checkApps(userSource, 1, 0, 1, 0, 0, 0);
 
-    metrics.allocateResources(user, 3, Resources.createResource(2*GB));
-    metrics.reserveResource(user, Resources.createResource(3*GB));
+    metrics.allocateResources(user, 3, Resources.createResource(2*GB, 2));
+    metrics.reserveResource(user, Resources.createResource(3*GB, 3));
     // Available resources is set externally, as it depends on dynamic
     // configurable cluster/queue resources
-    checkResources(queueSource, 6*GB, 3, 3, 0, 100*GB, 9*GB, 2, 3*GB, 1);
-    checkResources(parentQueueSource, 6*GB, 3, 3, 0,  100*GB, 9*GB, 2, 3*GB, 1);
-    checkResources(userSource, 6*GB, 3, 3, 0, 10*GB, 9*GB, 2, 3*GB, 1);
-    checkResources(parentUserSource, 6*GB, 3, 3, 0, 10*GB, 9*GB, 2, 3*GB, 1);
-
-    metrics.releaseResources(user, 1, Resources.createResource(2*GB));
-    metrics.unreserveResource(user, Resources.createResource(3*GB));
-    checkResources(queueSource, 4*GB, 2, 3, 1, 100*GB, 9*GB, 2, 0, 0);
-    checkResources(parentQueueSource, 4*GB, 2, 3, 1, 100*GB, 9*GB, 2, 0, 0);
-    checkResources(userSource, 4*GB, 2, 3, 1, 10*GB, 9*GB, 2, 0, 0);
-    checkResources(parentUserSource, 4*GB, 2, 3, 1, 10*GB, 9*GB, 2, 0, 0);
+    checkResources(queueSource, 6*GB, 6, 3, 3, 0, 100*GB, 100, 9*GB, 9, 2, 3*GB, 3, 1);
+    checkResources(parentQueueSource, 6*GB, 6, 3, 3, 0,  100*GB, 100, 9*GB, 9, 2, 3*GB, 3, 1);
+    checkResources(userSource, 6*GB, 6, 3, 3, 0, 10*GB, 10, 9*GB, 9, 2, 3*GB, 3, 1);
+    checkResources(parentUserSource, 6*GB, 6, 3, 3, 0, 10*GB, 10, 9*GB, 9, 2, 3*GB, 3, 1);
+
+    metrics.releaseResources(user, 1, Resources.createResource(2*GB, 2));
+    metrics.unreserveResource(user, Resources.createResource(3*GB, 3));
+    checkResources(queueSource, 4*GB, 4, 2, 3, 1, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0);
+    checkResources(parentQueueSource, 4*GB, 4, 2, 3, 1, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0);
+    checkResources(userSource, 4*GB, 4, 2, 3, 1, 10*GB, 10, 9*GB, 9, 2, 0, 0, 0);
+    checkResources(parentUserSource, 4*GB, 4, 2, 3, 1, 10*GB, 10, 9*GB, 9, 2, 0, 0, 0);
 
     metrics.finishApp(app, RMAppAttemptState.FINISHED);
     checkApps(queueSource, 1, 0, 0, 1, 0, 0);
@@ -277,18 +277,23 @@ public class TestQueueMetrics {
   }
 
   public static void checkResources(MetricsSource source, int allocatedMB,
-      int allocCtnrs, long aggreAllocCtnrs, long aggreReleasedCtnrs, 
-      int availableMB, int pendingMB, int pendingCtnrs,
-      int reservedMB, int reservedCtnrs) {
+      int allocatedCores, int allocCtnrs, long aggreAllocCtnrs,
+      long aggreReleasedCtnrs, int availableMB, int availableCores, int pendingMB,
+      int pendingCores, int pendingCtnrs, int reservedMB, int reservedCores,
+      int reservedCtnrs) {
     MetricsRecordBuilder rb = getMetrics(source);
     assertGauge("AllocatedMB", allocatedMB, rb);
+    assertGauge("AllocatedVCores", allocatedCores, rb);
     assertGauge("AllocatedContainers", allocCtnrs, rb);
     assertCounter("AggregateContainersAllocated", aggreAllocCtnrs, rb);
     assertCounter("AggregateContainersReleased", aggreReleasedCtnrs, rb);
     assertGauge("AvailableMB", availableMB, rb);
+    assertGauge("AvailableVCores", availableCores, rb);
     assertGauge("PendingMB", pendingMB, rb);
+    assertGauge("PendingVCores", pendingCores, rb);
     assertGauge("PendingContainers", pendingCtnrs, rb);
     assertGauge("ReservedMB", reservedMB, rb);
+    assertGauge("ReservedVCores", reservedCores, rb);
     assertGauge("ReservedContainers", reservedCtnrs, rb);
   }
 

+ 96 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java

@@ -30,6 +30,7 @@ import java.io.PrintWriter;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
@@ -67,6 +68,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
@@ -127,6 +129,7 @@ public class TestFairScheduler {
   public void tearDown() {
     scheduler = null;
     resourceManager = null;
+    QueueMetrics.clearQueueMetrics();
   }
 
   private Configuration createConfiguration() {
@@ -336,6 +339,13 @@ public class TestFairScheduler {
 
     assertEquals(1024, scheduler.getQueueManager().getQueue("queue1").
       getResourceUsage().getMemory());
+
+    // verify metrics
+    QueueMetrics queue1Metrics = scheduler.getQueueManager().getQueue("queue1")
+        .getMetrics();
+    assertEquals(1024, queue1Metrics.getAllocatedMB());
+    assertEquals(1024, scheduler.getRootQueueMetrics().getAllocatedMB());
+    assertEquals(512, scheduler.getRootQueueMetrics().getAvailableMB());
   }
 
   @Test (timeout = 5000)
@@ -891,9 +901,16 @@ public class TestFairScheduler {
    */
   public void testChoiceOfPreemptedContainers() throws Exception {
     Configuration conf = createConfiguration();
+    
+    conf.setLong(FairSchedulerConfiguration.PREEMPTION_INTERVAL, 5000);
+    conf.setLong(FairSchedulerConfiguration.WAIT_TIME_BEFORE_KILL, 10000); 
+    
     conf.set(FairSchedulerConfiguration.ALLOCATION_FILE + ".allocation.file", ALLOC_FILE);
     scheduler.reinitialize(conf, resourceManager.getRMContext());
-
+    
+    MockClock clock = new MockClock();
+    scheduler.setClock(clock);
+    
     PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
     out.println("<?xml version=\"1.0\"?>");
     out.println("<allocations>");
@@ -988,15 +1005,38 @@ public class TestFairScheduler {
         Resources.createResource(2 * 1024));
     assertEquals(1, scheduler.applications.get(app1).getLiveContainers().size());
     assertEquals(1, scheduler.applications.get(app2).getLiveContainers().size());
-    assertEquals(0, scheduler.applications.get(app3).getLiveContainers().size());
     assertEquals(1, scheduler.applications.get(app4).getLiveContainers().size());
     assertEquals(1, scheduler.applications.get(app5).getLiveContainers().size());
+    
+    // First verify we are adding containers to preemption list for the application
+    assertTrue(!Collections.disjoint(scheduler.applications.get(app3).getLiveContainers(),
+                                     scheduler.applications.get(app3).getPreemptionContainers()));
+    assertTrue(!Collections.disjoint(scheduler.applications.get(app6).getLiveContainers(),
+                                     scheduler.applications.get(app6).getPreemptionContainers()));
+
+    // Pretend 15 seconds have passed
+    clock.tick(15);
+
+    // Trigger a kill by insisting we want containers back
+    scheduler.preemptResources(scheduler.getQueueManager().getLeafQueues(),
+        Resources.createResource(2 * 1024));
+
+    // At this point the containers should have been killed (since we are not simulating AM)
     assertEquals(0, scheduler.applications.get(app6).getLiveContainers().size());
+    assertEquals(0, scheduler.applications.get(app3).getLiveContainers().size());
+
+    // Trigger a kill by insisting we want containers back
+    scheduler.preemptResources(scheduler.getQueueManager().getLeafQueues(),
+        Resources.createResource(2 * 1024));
+
+    // Pretend 15 seconds have passed
+    clock.tick(15);
 
     // We should be able to claw back another container from A and B each.
     // Make sure it is lowest priority container.
     scheduler.preemptResources(scheduler.getQueueManager().getLeafQueues(),
         Resources.createResource(2 * 1024));
+    
     assertEquals(1, scheduler.applications.get(app1).getLiveContainers().size());
     assertEquals(0, scheduler.applications.get(app2).getLiveContainers().size());
     assertEquals(0, scheduler.applications.get(app3).getLiveContainers().size());
@@ -1245,6 +1285,7 @@ public class TestFairScheduler {
     scheduler.handle(updateEvent);
 
     assertEquals(1, app.getLiveContainers().size());
+    assertEquals(0, scheduler.getRootQueueMetrics().getAvailableMB());
     
     // Create request at higher priority
     createSchedulingRequestExistingApplication(1024, 1, attId);
@@ -1260,6 +1301,7 @@ public class TestFairScheduler {
     // Complete container
     scheduler.allocate(attId, new ArrayList<ResourceRequest>(),
         Arrays.asList(containerId));
+    assertEquals(1024, scheduler.getRootQueueMetrics().getAvailableMB());
     
     // Schedule at opening
     scheduler.update();
@@ -1271,6 +1313,7 @@ public class TestFairScheduler {
     for (RMContainer liveContainer : liveContainers) {
       Assert.assertEquals(2, liveContainer.getContainer().getPriority().getPriority());
     }
+    assertEquals(0, scheduler.getRootQueueMetrics().getAvailableMB());
   }
   
   @Test
@@ -1382,6 +1425,37 @@ public class TestFairScheduler {
     assertEquals(1, app2.getLiveContainers().size());
   }
 
+  @Test(timeout = 3000)
+  public void testMaxAssign() throws AllocationConfigurationException {
+    // set required scheduler configs
+    scheduler.assignMultiple = true;
+    scheduler.getQueueManager().getLeafQueue("root.default")
+        .setPolicy(SchedulingPolicy.getDefault());
+
+    RMNode node = MockNodes.newNodeInfo(1, Resources.createResource(16384));
+    NodeAddedSchedulerEvent nodeEvent = new NodeAddedSchedulerEvent(node);
+    NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node);
+    scheduler.handle(nodeEvent);
+
+    ApplicationAttemptId attId =
+        createSchedulingRequest(1024, "root.default", "user", 8);
+    FSSchedulerApp app = scheduler.applications.get(attId);
+
+    // set maxAssign to 2: only 2 containers should be allocated
+    scheduler.maxAssign = 2;
+    scheduler.update();
+    scheduler.handle(updateEvent);
+    assertEquals("Incorrect number of containers allocated", 2, app
+        .getLiveContainers().size());
+
+    // set maxAssign to -1: all remaining containers should be allocated
+    scheduler.maxAssign = -1;
+    scheduler.update();
+    scheduler.handle(updateEvent);
+    assertEquals("Incorrect number of containers allocated", 8, app
+        .getLiveContainers().size());
+  }
+
   /**
    * Test to verify the behavior of
    * {@link FSQueue#assignContainer(FSSchedulerNode)})
@@ -1544,4 +1618,24 @@ public class TestFairScheduler {
     assertEquals(1, app.getLiveContainers().size());
     assertEquals(0, app.getReservedContainers().size());
   }
+  
+  @Test
+  public void testRemoveNodeUpdatesRootQueueMetrics() {
+    assertEquals(0, scheduler.getRootQueueMetrics().getAvailableMB());
+    
+    RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024));
+    NodeAddedSchedulerEvent addEvent = new NodeAddedSchedulerEvent(node1);
+    scheduler.handle(addEvent);
+    
+    assertEquals(1024, scheduler.getRootQueueMetrics().getAvailableMB());
+    scheduler.update(); // update shouldn't change things
+    assertEquals(1024, scheduler.getRootQueueMetrics().getAvailableMB());
+    
+    NodeRemovedSchedulerEvent removeEvent = new NodeRemovedSchedulerEvent(node1);
+    scheduler.handle(removeEvent);
+    
+    assertEquals(0, scheduler.getRootQueueMetrics().getAvailableMB());
+    scheduler.update(); // update shouldn't change things
+    assertEquals(0, scheduler.getRootQueueMetrics().getAvailableMB());
+  }
 }

+ 2 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerManagerSecurity.java

@@ -485,7 +485,8 @@ public class TestContainerManagerSecurity {
   }
 
   private Container requestAndGetContainer(AMRMProtocol scheduler,
-      ApplicationId appID) throws YarnRemoteException, InterruptedException {
+      ApplicationId appID) throws YarnRemoteException, InterruptedException,
+      IOException {
 
     // Request a container allocation.
     List<ResourceRequest> ask = new ArrayList<ResourceRequest>();

+ 3 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/AppReportFetcher.java

@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.yarn.server.webproxy;
 
+import java.io.IOException;
 import java.net.InetSocketAddress;
 
 import org.apache.commons.logging.Log;
@@ -78,9 +79,10 @@ public class AppReportFetcher {
    * @param appId the id of the application to get. 
    * @return the ApplicationReport for that app.
    * @throws YarnRemoteException on any error.
+   * @throws IOException
    */
   public ApplicationReport getApplicationReport(ApplicationId appId)
-  throws YarnRemoteException {
+  throws YarnRemoteException, IOException {
     GetApplicationReportRequest request = recordFactory
         .newRecordInstance(GetApplicationReportRequest.class);
     request.setApplicationId(appId);