
YARN-2155. FairScheduler: Incorrect threshold check for preemption. (Wei Yan via kasha)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1602297 13f79535-47bb-0310-9956-ffa450edef68
Karthik Kambatla 11 years ago
parent
commit
c43ab7f08e
14 files changed with 159 additions and 25 deletions
  1. +3 -0   hadoop-common-project/hadoop-common/CHANGES.txt
  2. +7 -6   hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/MapFile.java
  3. +1 -1   hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SequenceFile.java
  4. +1 -1   hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SetFile.java
  5. +34 -5  hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/WritableComparator.java
  6. +65 -2  hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestWritable.java
  7. +2 -2   hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java
  8. +1 -1   hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/join/CompositeRecordReader.java
  9. +21 -2  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/join/WrappedRecordReader.java
  10. +1 -1  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/join/CompositeRecordReader.java
  11. +1 -1  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/join/WrappedRecordReader.java
  12. +3 -0  hadoop-yarn-project/CHANGES.txt
  13. +2 -2  hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
  14. +17 -1 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java

+ 3 - 0
hadoop-common-project/hadoop-common/CHANGES.txt

@@ -200,6 +200,9 @@ Release 2.5.0 - UNRELEASED
 
     HADOOP-10622. Shell.runCommand can deadlock (Gera Shegalov via jlowe)
 
+    HADOOP-10686. Writables are not always configured. 
+    (Abraham Elmahrek via kasha)
+
   BREAKDOWN OF HADOOP-10514 SUBTASKS AND RELATED JIRAS
 
     HADOOP-10520. Extended attributes definition and FileSystem APIs for

+ 7 - 6
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/MapFile.java

@@ -256,7 +256,7 @@ public class MapFile {
       } else {
         keyClass= 
           (Class<? extends WritableComparable>) keyClassOption.getValue();
-        this.comparator = WritableComparator.get(keyClass);
+        this.comparator = WritableComparator.get(keyClass, conf);
       }
       this.lastKey = comparator.newKey();
       FileSystem fs = dirName.getFileSystem(conf);
@@ -428,12 +428,13 @@ public class MapFile {
       this.data = createDataFileReader(dataFile, conf, options);
       this.firstPosition = data.getPosition();
 
-      if (comparator == null)
-        this.comparator = 
-          WritableComparator.get(data.getKeyClass().
-                                   asSubclass(WritableComparable.class));
-      else
+      if (comparator == null) {
+        Class<? extends WritableComparable> cls;
+        cls = data.getKeyClass().asSubclass(WritableComparable.class);
+        this.comparator = WritableComparator.get(cls, conf);
+      } else {
         this.comparator = comparator;
+      }
 
       // open the index
       SequenceFile.Reader.Option[] indexOptions =

+ 1 - 1
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SequenceFile.java

@@ -2676,7 +2676,7 @@ public class SequenceFile {
     /** Sort and merge files containing the named classes. */
     public Sorter(FileSystem fs, Class<? extends WritableComparable> keyClass,
                   Class valClass, Configuration conf)  {
-      this(fs, WritableComparator.get(keyClass), keyClass, valClass, conf);
+      this(fs, WritableComparator.get(keyClass, conf), keyClass, valClass, conf);
     }
 
     /** Sort and merge using an arbitrary {@link RawComparator}. */

+ 1 - 1
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SetFile.java

@@ -52,7 +52,7 @@ public class SetFile extends MapFile {
                   Class<? extends WritableComparable> keyClass,
                   SequenceFile.CompressionType compress)
       throws IOException {
-      this(conf, fs, dirName, WritableComparator.get(keyClass), compress);
+      this(conf, fs, dirName, WritableComparator.get(keyClass, conf), compress);
     }
 
     /** Create a set naming the element comparator and compression type. */

+ 34 - 5
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/WritableComparator.java

@@ -24,6 +24,8 @@ import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.util.ReflectionUtils;
 
 /** A Comparator for {@link WritableComparable}s.
@@ -37,13 +39,21 @@ import org.apache.hadoop.util.ReflectionUtils;
  */
 @InterfaceAudience.Public
 @InterfaceStability.Stable
-public class WritableComparator implements RawComparator {
+public class WritableComparator implements RawComparator, Configurable {
 
   private static final ConcurrentHashMap<Class, WritableComparator> comparators 
           = new ConcurrentHashMap<Class, WritableComparator>(); // registry
 
-  /** Get a comparator for a {@link WritableComparable} implementation. */
+  private Configuration conf;
+
+  /** For backwards compatibility. **/
   public static WritableComparator get(Class<? extends WritableComparable> c) {
+    return get(c, null);
+  }
+
+  /** Get a comparator for a {@link WritableComparable} implementation. */
+  public static WritableComparator get(
+      Class<? extends WritableComparable> c, Configuration conf) {
     WritableComparator comparator = comparators.get(c);
     if (comparator == null) {
       // force the static initializers to run
@@ -52,12 +62,24 @@ public class WritableComparator implements RawComparator {
       comparator = comparators.get(c);
       // if not, use the generic one
       if (comparator == null) {
-        comparator = new WritableComparator(c, true);
+        comparator = new WritableComparator(c, conf, true);
       }
     }
+    // Newly passed Configuration objects should be used.
+    ReflectionUtils.setConf(comparator, conf);
     return comparator;
   }
 
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
   /**
    * Force initialization of the static members.
    * As of Java 5, referencing a class doesn't force it to initialize. Since
@@ -91,12 +113,19 @@ public class WritableComparator implements RawComparator {
 
   /** Construct for a {@link WritableComparable} implementation. */
   protected WritableComparator(Class<? extends WritableComparable> keyClass) {
-    this(keyClass, false);
+    this(keyClass, null, false);
   }
 
   protected WritableComparator(Class<? extends WritableComparable> keyClass,
       boolean createInstances) {
+    this(keyClass, null, createInstances);
+  }
+
+  protected WritableComparator(Class<? extends WritableComparable> keyClass,
+                               Configuration conf,
+                               boolean createInstances) {
     this.keyClass = keyClass;
+    this.conf = (conf != null) ? conf : new Configuration();
     if (createInstances) {
       key1 = newKey();
       key2 = newKey();
@@ -112,7 +141,7 @@ public class WritableComparator implements RawComparator {
 
   /** Construct a new {@link WritableComparable} instance. */
   public WritableComparable newKey() {
-    return ReflectionUtils.newInstance(keyClass, null);
+    return ReflectionUtils.newInstance(keyClass, conf);
   }
 
   /** Optimization hook.  Override this to make SequenceFile.Sorter's scream.
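
As a minimal usage sketch (not part of this patch), the new two-argument
WritableComparator.get(Class, Configuration) overload lets a caller propagate its
Configuration so that keys created through the comparator come back configured. The
snippet below reuses the SimpleWritableComparable key class added in TestWritable
further down; the property name is the one used by that test.

    Configuration conf = new Configuration();
    conf.set("test.writable", "test");
    WritableComparator cmp =
        WritableComparator.get(TestWritable.SimpleWritableComparable.class, conf);
    WritableComparable key = cmp.newKey();
    // newKey() now calls ReflectionUtils.newInstance(keyClass, conf), so a key that
    // implements Configurable receives the Configuration set above instead of null.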

+ 65 - 2
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestWritable.java

@@ -23,6 +23,7 @@ import java.io.DataOutput;
 import java.io.IOException;
 import java.util.Random;
 
+import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.util.ReflectionUtils;
 
@@ -30,6 +31,11 @@ import junit.framework.TestCase;
 
 /** Unit tests for Writable. */
 public class TestWritable extends TestCase {
+  private static final String TEST_CONFIG_PARAM = "frob.test";
+  private static final String TEST_CONFIG_VALUE = "test";
+  private static final String TEST_WRITABLE_CONFIG_PARAM = "test.writable";
+  private static final String TEST_WRITABLE_CONFIG_VALUE = TEST_CONFIG_VALUE;
+
   public TestWritable(String name) { super(name); }
 
   /** Example class used in test cases below. */
@@ -64,6 +70,25 @@ public class TestWritable extends TestCase {
     }
   }
 
+  public static class SimpleWritableComparable extends SimpleWritable
+      implements WritableComparable<SimpleWritableComparable>, Configurable {
+    private Configuration conf;
+
+    public SimpleWritableComparable() {}
+
+    public void setConf(Configuration conf) {
+      this.conf = conf;
+    }
+
+    public Configuration getConf() {
+      return this.conf;
+    }
+
+    public int compareTo(SimpleWritableComparable o) {
+      return this.state - o.state;
+    }
+  }
+
   /** Test 1: Check that SimpleWritable. */
   public void testSimpleWritable() throws Exception {
     testWritable(new SimpleWritable());
@@ -121,9 +146,34 @@ public class TestWritable extends TestCase {
     @Override public int compareTo(Frob o) { return 0; }
   }
 
-  /** Test that comparator is defined. */
+  /** Test that comparator is defined and configured. */
   public static void testGetComparator() throws Exception {
-    assert(WritableComparator.get(Frob.class) instanceof FrobComparator);
+    Configuration conf = new Configuration();
+
+    // Without conf.
+    WritableComparator frobComparator = WritableComparator.get(Frob.class);
+    assert(frobComparator instanceof FrobComparator);
+    assertNotNull(frobComparator.getConf());
+    assertNull(frobComparator.getConf().get(TEST_CONFIG_PARAM));
+
+    // With conf.
+    conf.set(TEST_CONFIG_PARAM, TEST_CONFIG_VALUE);
+    frobComparator = WritableComparator.get(Frob.class, conf);
+    assert(frobComparator instanceof FrobComparator);
+    assertNotNull(frobComparator.getConf());
+    assertEquals(conf.get(TEST_CONFIG_PARAM), TEST_CONFIG_VALUE);
+
+    // Without conf. should reuse configuration.
+    frobComparator = WritableComparator.get(Frob.class);
+    assert(frobComparator instanceof FrobComparator);
+    assertNotNull(frobComparator.getConf());
+    assertEquals(conf.get(TEST_CONFIG_PARAM), TEST_CONFIG_VALUE);
+
+    // New conf. should use new configuration.
+    frobComparator = WritableComparator.get(Frob.class, new Configuration());
+    assert(frobComparator instanceof FrobComparator);
+    assertNotNull(frobComparator.getConf());
+    assertNull(frobComparator.getConf().get(TEST_CONFIG_PARAM));
   }
 
   /**
@@ -153,4 +203,17 @@ public class TestWritable extends TestCase {
         .compare(writable1, writable3) == 0);
   }
 
+  /**
+   * Test that Writable's are configured by Comparator.
+   */
+  public void testConfigurableWritableComparator() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(TEST_WRITABLE_CONFIG_PARAM, TEST_WRITABLE_CONFIG_VALUE);
+
+    WritableComparator wc = WritableComparator.get(SimpleWritableComparable.class, conf);
+    SimpleWritableComparable key = ((SimpleWritableComparable)wc.newKey());
+    assertNotNull(wc.getConf());
+    assertNotNull(key.getConf());
+    assertEquals(key.getConf().get(TEST_WRITABLE_CONFIG_PARAM), TEST_WRITABLE_CONFIG_VALUE);
+  }
 }

+ 2 - 2
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java

@@ -112,7 +112,7 @@ import org.apache.log4j.Level;
 @InterfaceAudience.Public
 @InterfaceStability.Stable
 public class JobConf extends Configuration {
-  
+
   private static final Log LOG = LogFactory.getLog(JobConf.class);
 
   static{
@@ -882,7 +882,7 @@ public class JobConf extends Configuration {
       JobContext.KEY_COMPARATOR, null, RawComparator.class);
     if (theClass != null)
       return ReflectionUtils.newInstance(theClass, this);
-    return WritableComparator.get(getMapOutputKeyClass().asSubclass(WritableComparable.class));
+    return WritableComparator.get(getMapOutputKeyClass().asSubclass(WritableComparable.class), this);
   }
 
   /**

+ 1 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/join/CompositeRecordReader.java

@@ -131,7 +131,7 @@ public abstract class CompositeRecordReader<
   public void add(ComposableRecordReader<K,? extends V> rr) throws IOException {
     kids[rr.id()] = rr;
     if (null == q) {
-      cmp = WritableComparator.get(rr.createKey().getClass());
+      cmp = WritableComparator.get(rr.createKey().getClass(), conf);
       q = new PriorityQueue<ComposableRecordReader<K,?>>(3,
           new Comparator<ComposableRecordReader<K,?>>() {
             public int compare(ComposableRecordReader<K,?> o1,

+ 21 - 2
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/join/WrappedRecordReader.java

@@ -22,6 +22,8 @@ import java.io.IOException;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.WritableComparator;
@@ -38,7 +40,7 @@ import org.apache.hadoop.mapred.RecordReader;
 @InterfaceStability.Stable
 public class WrappedRecordReader<K extends WritableComparable,
                           U extends Writable>
-    implements ComposableRecordReader<K,U> {
+    implements ComposableRecordReader<K,U>, Configurable {
 
   private boolean empty = false;
   private RecordReader<K,U> rr;
@@ -47,6 +49,7 @@ public class WrappedRecordReader<K extends WritableComparable,
   private K khead; // key at the top of this RR
   private U vhead; // value assoc with khead
   private WritableComparator cmp;
+  private Configuration conf;
 
   private ResetableIterator<U> vjoin;
 
@@ -55,13 +58,20 @@ public class WrappedRecordReader<K extends WritableComparable,
    */
   WrappedRecordReader(int id, RecordReader<K,U> rr,
       Class<? extends WritableComparator> cmpcl) throws IOException {
+    this(id, rr, cmpcl, null);
+  }
+
+  WrappedRecordReader(int id, RecordReader<K,U> rr,
+                      Class<? extends WritableComparator> cmpcl,
+                      Configuration conf) throws IOException {
     this.id = id;
     this.rr = rr;
+    this.conf = (conf == null) ? new Configuration() : conf;
     khead = rr.createKey();
     vhead = rr.createValue();
     try {
       cmp = (null == cmpcl)
-        ? WritableComparator.get(khead.getClass())
+        ? WritableComparator.get(khead.getClass(), this.conf)
         : cmpcl.newInstance();
     } catch (InstantiationException e) {
       throw (IOException)new IOException().initCause(e);
@@ -207,4 +217,13 @@ public class WrappedRecordReader<K extends WritableComparable,
     return 42;
   }
 
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
 }

+ 1 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/join/CompositeRecordReader.java

@@ -109,7 +109,7 @@ public abstract class CompositeRecordReader<
         }
         // create priority queue
         if (null == q) {
-          cmp = WritableComparator.get(keyclass);
+          cmp = WritableComparator.get(keyclass, conf);
           q = new PriorityQueue<ComposableRecordReader<K,?>>(3,
                 new Comparator<ComposableRecordReader<K,?>>() {
                   public int compare(ComposableRecordReader<K,?> o1,

+ 1 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/join/WrappedRecordReader.java

@@ -92,7 +92,7 @@ public class WrappedRecordReader<K extends WritableComparable<?>,
       keyclass = key.getClass().asSubclass(WritableComparable.class);
       valueclass = value.getClass();
       if (cmp == null) {
-        cmp = WritableComparator.get(keyclass);
+        cmp = WritableComparator.get(keyclass, conf);
       }
     }
   }

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -224,6 +224,9 @@ Release 2.5.0 - UNRELEASED
     YARN-2075. Fixed the test failure of TestRMAdminCLI. (Kenji Kikushima via
     zjshen)
 
+    YARN-2155. FairScheduler: Incorrect threshold check for preemption.
+    (Wei Yan via kasha)
+
 Release 2.4.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES

+ 2 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java

@@ -1072,8 +1072,8 @@ public class FairScheduler extends
   private boolean shouldAttemptPreemption() {
     if (preemptionEnabled) {
       return (preemptionUtilizationThreshold < Math.max(
-          (float) rootMetrics.getAvailableMB() / clusterResource.getMemory(),
-          (float) rootMetrics.getAvailableVirtualCores() /
+          (float) rootMetrics.getAllocatedMB() / clusterResource.getMemory(),
+          (float) rootMetrics.getAllocatedVirtualCores() /
               clusterResource.getVirtualCores()));
     }
     return false;
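
Put differently, the fix compares the preemption threshold against cluster
utilization (allocated resources) rather than free capacity. A paraphrased sketch of
the corrected check, using only names that appear in the hunk above:

    float memUtilization  = (float) rootMetrics.getAllocatedMB() / clusterResource.getMemory();
    float coreUtilization = (float) rootMetrics.getAllocatedVirtualCores() / clusterResource.getVirtualCores();
    // Preemption is attempted only when utilization, not available capacity,
    // exceeds the configured threshold.
    boolean attemptPreemption = preemptionEnabled
        && preemptionUtilizationThreshold < Math.max(memUtilization, coreUtilization);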

+ 17 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java

@@ -146,7 +146,7 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
     // Create node with 4GB memory and 4 vcores
     registerNodeAndSubmitApp(4 * 1024, 4, 2, 1024);
 
-    // Verify submitting another request doesn't trigger preemption
+    // Verify submitting another request triggers preemption
     createSchedulingRequest(1024, "queueB", "user1", 1, 1);
     scheduler.update();
     clock.tick(6);
@@ -171,5 +171,21 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
     scheduler.preemptTasksIfNecessary();
     assertEquals("preemptResources() should not have been called", -1,
         ((StubbedFairScheduler) scheduler).lastPreemptMemory);
+
+    resourceManager.stop();
+
+    startResourceManager(0.7f);
+    // Create node with 4GB memory and 4 vcores
+    registerNodeAndSubmitApp(4 * 1024, 4, 3, 1024);
+
+    // Verify submitting another request triggers preemption
+    createSchedulingRequest(1024, "queueB", "user1", 1, 1);
+    scheduler.update();
+    clock.tick(6);
+
+    ((StubbedFairScheduler) scheduler).resetLastPreemptResources();
+    scheduler.preemptTasksIfNecessary();
+    assertEquals("preemptResources() should have been called", 1024,
+        ((StubbedFairScheduler) scheduler).lastPreemptMemory);
   }
 }
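
For reference, the arithmetic behind the newly added assertion: startResourceManager(0.7f)
sets the threshold to 0.7, and registering a 4096 MB / 4-vcore node that already carries
3 × 1024 MB of allocations gives a memory utilization of 3072 / 4096 = 0.75. Since
0.75 > 0.7, shouldAttemptPreemption() now returns true and preemptResources() is expected
to reclaim the 1024 MB requested for queueB, which is what the final assertEquals checks.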