
Merge r1231828 through r1232270 from 0.23.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23-PB@1232275 13f79535-47bb-0310-9956-ffa450edef68
Tsz-wo Sze, commit 03d05902e4
19 changed files with 616 additions and 265 deletions
  1. hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (+4 -0)
  2. hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LayoutVersion.java (+3 -1)
  3. hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java (+181 -132)
  4. hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java (+32 -0)
  5. hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/EditsLoaderCurrent.java (+121 -57)
  6. hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageLoaderCurrent.java (+1 -1)
  7. hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java (+4 -1)
  8. hadoop-mapreduce-project/CHANGES.txt (+13 -0)
  9. hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/JobEndNotifier.java (+18 -14)
  10. hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/pom.xml (+1 -0)
  11. hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml (+1 -0)
  12. hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/Resources.java (+19 -2)
  13. hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java (+48 -0)
  14. hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java (+3 -2)
  15. hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java (+29 -23)
  16. hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java (+36 -18)
  17. hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java (+15 -7)
  18. hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java (+85 -5)
  19. hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/Federation.apt.vm (+2 -2)

+ 4 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -189,6 +189,10 @@ Release 0.23.1 - UNRELEASED
 
     HDFS-2788. HdfsServerConstants#DN_KEEPALIVE_TIMEOUT is dead code (eli)
 
+    HDFS-362.  FSEditLog should not writes long and short as UTF8, and should
+    not use ArrayWritable for writing non-array items.  (Uma Maheswara Rao G
+    via szetszwo)
+
   OPTIMIZATIONS
 
     HDFS-2130. Switch default checksum to CRC32C. (todd)
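
The HDFS-362 entry above is about on-disk encoding: the old edit log wrote numeric fields as decimal strings wrapped in DeprecatedUTF8, which costs a 2-byte length prefix plus one byte per digit. A self-contained sketch (not part of the patch) of the size difference:

    // Sketch (not from the patch): a long written as UTF8 text vs. as a
    // fixed-width binary value.
    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    public class EncodingSizeDemo {
      public static void main(String[] args) throws IOException {
        long mtime = 1326300000000L;  // a millisecond timestamp

        ByteArrayOutputStream asText = new ByteArrayOutputStream();
        new DataOutputStream(asText).writeUTF(Long.toString(mtime));

        ByteArrayOutputStream asBinary = new ByteArrayOutputStream();
        new DataOutputStream(asBinary).writeLong(mtime);

        // 2-byte length prefix + 13 digits = 15 bytes, vs. a flat 8 bytes
        System.out.println("UTF8 text: " + asText.size() + " bytes");
        System.out.println("binary:    " + asBinary.size() + " bytes");
      }
    }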

+ 3 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LayoutVersion.java

@@ -80,7 +80,9 @@ public class LayoutVersion {
     FEDERATION(-35, "Support for namenode federation"),
     LEASE_REASSIGNMENT(-36, "Support for persisting lease holder reassignment"),
     STORED_TXIDS(-37, "Transaction IDs are stored in edits log and image files"),
-    TXID_BASED_LAYOUT(-38, "File names in NN Storage are based on transaction IDs");
+    TXID_BASED_LAYOUT(-38, "File names in NN Storage are based on transaction IDs"), 
+    EDITLOG_OP_OPTIMIZATION(-39,
+        "Use LongWritable and ShortWritable directly instead of ArrayWritable of UTF8");
     
     final int lv;
     final int ancestorLV;
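
Layout versions are negative and decrease as features are added, so an edit log written at version -39 supports EDITLOG_OP_OPTIMIZATION while one written at -38 does not. Every reader in this patch branches on that test; a condensed sketch of the pattern (the helper itself is illustrative, the calls are from the diffs below):

    // Version-gated read: new logs carry a binary long, old logs a UTF8 string.
    static long readMtime(DataInputStream in, int logVersion) throws IOException {
      if (LayoutVersion.supports(Feature.EDITLOG_OP_OPTIMIZATION, logVersion)) {
        return FSImageSerialization.readLong(in);                  // new format
      }
      return Long.parseLong(FSImageSerialization.readString(in));  // old format
    }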

+ 181 - 132
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java

@@ -40,7 +40,6 @@ import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.*;
 import org.apache.hadoop.security.token.delegation.DelegationKey;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.ArrayWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableFactories;
@@ -190,19 +189,17 @@ public abstract class FSEditLogOp {
 
     @Override 
     void writeFields(DataOutputStream out) throws IOException {
-      DeprecatedUTF8 nameReplicationPair[] = new DeprecatedUTF8[] { 
-        new DeprecatedUTF8(path), 
-        toLogReplication(replication),
-        toLogLong(mtime),
-        toLogLong(atime),
-        toLogLong(blockSize)};
-      new ArrayWritable(DeprecatedUTF8.class, nameReplicationPair).write(out);
+      FSImageSerialization.writeString(path, out);
+      FSImageSerialization.writeShort(replication, out);
+      FSImageSerialization.writeLong(mtime, out);
+      FSImageSerialization.writeLong(atime, out);
+      FSImageSerialization.writeLong(blockSize, out);
       new ArrayWritable(Block.class, blocks).write(out);
       permissions.write(out);
 
       if (this.opCode == OP_ADD) {
-        new DeprecatedUTF8(clientName).write(out);
-        new DeprecatedUTF8(clientMachine).write(out);
+        FSImageSerialization.writeString(clientName,out);
+        FSImageSerialization.writeString(clientMachine,out);
       }
     }
 
@@ -211,25 +208,43 @@ public abstract class FSEditLogOp {
         throws IOException {
       // versions > 0 support per file replication
       // get name and replication
-      this.length = in.readInt();
+      if (!LayoutVersion.supports(Feature.EDITLOG_OP_OPTIMIZATION, logVersion)) {
+        this.length = in.readInt();
+      }
       if (-7 == logVersion && length != 3||
           -17 < logVersion && logVersion < -7 && length != 4 ||
-          logVersion <= -17 && length != 5) {
+          (logVersion <= -17 && length != 5 && !LayoutVersion.supports(
+              Feature.EDITLOG_OP_OPTIMIZATION, logVersion))) {
         throw new IOException("Incorrect data format."  +
                               " logVersion is " + logVersion +
                               " but writables.length is " +
                               length + ". ");
       }
       this.path = FSImageSerialization.readString(in);
-      this.replication = readShort(in);
-      this.mtime = readLong(in);
+
+      if (LayoutVersion.supports(Feature.EDITLOG_OP_OPTIMIZATION, logVersion)) {
+        this.replication = FSImageSerialization.readShort(in);
+        this.mtime = FSImageSerialization.readLong(in);
+      } else {
+        this.replication = readShort(in);
+        this.mtime = readLong(in);
+      }
+
       if (LayoutVersion.supports(Feature.FILE_ACCESS_TIME, logVersion)) {
-        this.atime = readLong(in);
+        if (LayoutVersion.supports(Feature.EDITLOG_OP_OPTIMIZATION, logVersion)) {
+          this.atime = FSImageSerialization.readLong(in);
+        } else {
+          this.atime = readLong(in);
+        }
       } else {
         this.atime = 0;
       }
       if (logVersion < -7) {
-        this.blockSize = readLong(in);
+        if (LayoutVersion.supports(Feature.EDITLOG_OP_OPTIMIZATION, logVersion)) {
+          this.blockSize = FSImageSerialization.readLong(in);
+        } else {
+          this.blockSize = readLong(in);
+        }
       } else {
         this.blockSize = 0;
       }
@@ -333,15 +348,19 @@ public abstract class FSEditLogOp {
 
     @Override 
     void writeFields(DataOutputStream out) throws IOException {
-      new DeprecatedUTF8(path).write(out);
-      new DeprecatedUTF8(Short.toString(replication)).write(out);
+      FSImageSerialization.writeString(path, out);
+      FSImageSerialization.writeShort(replication, out);
     }
     
     @Override
     void readFields(DataInputStream in, int logVersion)
         throws IOException {
       this.path = FSImageSerialization.readString(in);
-      this.replication = readShort(in);
+      if (LayoutVersion.supports(Feature.EDITLOG_OP_OPTIMIZATION, logVersion)) {
+        this.replication = FSImageSerialization.readShort(in);
+      } else {
+        this.replication = readShort(in);
+      }
     }
   }
 
@@ -377,32 +396,45 @@ public abstract class FSEditLogOp {
 
     @Override 
     void writeFields(DataOutputStream out) throws IOException {
-      int size = 1 + srcs.length + 1; // trg, srcs, timestamp
-      DeprecatedUTF8 info[] = new DeprecatedUTF8[size];
+      FSImageSerialization.writeString(trg, out);
+            
+      DeprecatedUTF8 info[] = new DeprecatedUTF8[srcs.length];
       int idx = 0;
-      info[idx++] = new DeprecatedUTF8(trg);
       for(int i=0; i<srcs.length; i++) {
         info[idx++] = new DeprecatedUTF8(srcs[i]);
       }
-      info[idx] = toLogLong(timestamp);
       new ArrayWritable(DeprecatedUTF8.class, info).write(out);
+
+      FSImageSerialization.writeLong(timestamp, out);
     }
 
     @Override
     void readFields(DataInputStream in, int logVersion)
         throws IOException {
-      this.length = in.readInt();
-      if (length < 3) { // trg, srcs.., timestam
-        throw new IOException("Incorrect data format. "
-                              + "Concat delete operation.");
+      if (!LayoutVersion.supports(Feature.EDITLOG_OP_OPTIMIZATION, logVersion)) {
+        this.length = in.readInt();
+        if (length < 3) { // trg, srcs.., timestamp
+          throw new IOException("Incorrect data format. "
+              + "Concat delete operation.");
+        }
       }
       this.trg = FSImageSerialization.readString(in);
-      int srcSize = this.length - 1 - 1; //trg and timestamp
+      int srcSize = 0;
+      if (LayoutVersion.supports(Feature.EDITLOG_OP_OPTIMIZATION, logVersion)) {
+        srcSize = in.readInt();
+      } else {
+        srcSize = this.length - 1 - 1; // trg and timestamp
+      }
       this.srcs = new String [srcSize];
       for(int i=0; i<srcSize;i++) {
         srcs[i]= FSImageSerialization.readString(in);
       }
-      this.timestamp = readLong(in);
+      
+      if (LayoutVersion.supports(Feature.EDITLOG_OP_OPTIMIZATION, logVersion)) {
+        this.timestamp = FSImageSerialization.readLong(in);
+      } else {
+        this.timestamp = readLong(in);
+      }
     }
   }
 
@@ -438,24 +470,28 @@ public abstract class FSEditLogOp {
 
     @Override 
     void writeFields(DataOutputStream out) throws IOException {
-      DeprecatedUTF8 info[] = new DeprecatedUTF8[] { 
-        new DeprecatedUTF8(src),
-        new DeprecatedUTF8(dst),
-        toLogLong(timestamp)};
-      new ArrayWritable(DeprecatedUTF8.class, info).write(out);
+      FSImageSerialization.writeString(src, out);
+      FSImageSerialization.writeString(dst, out);
+      FSImageSerialization.writeLong(timestamp, out);
     }
 
     @Override
     void readFields(DataInputStream in, int logVersion)
         throws IOException {
-      this.length = in.readInt();
-      if (this.length != 3) {
-        throw new IOException("Incorrect data format. "
-                              + "Old rename operation.");
+      if (!LayoutVersion.supports(Feature.EDITLOG_OP_OPTIMIZATION, logVersion)) {
+        this.length = in.readInt();
+        if (this.length != 3) {
+          throw new IOException("Incorrect data format. "
+              + "Old rename operation.");
+        }
       }
       this.src = FSImageSerialization.readString(in);
       this.dst = FSImageSerialization.readString(in);
-      this.timestamp = readLong(in);
+      if (LayoutVersion.supports(Feature.EDITLOG_OP_OPTIMIZATION, logVersion)) {
+        this.timestamp = FSImageSerialization.readLong(in);
+      } else {
+        this.timestamp = readLong(in);
+      }
     }
   }
 
@@ -485,22 +521,25 @@ public abstract class FSEditLogOp {
 
     @Override 
     void writeFields(DataOutputStream out) throws IOException {
-      DeprecatedUTF8 info[] = new DeprecatedUTF8[] { 
-        new DeprecatedUTF8(path),
-        toLogLong(timestamp)};
-      new ArrayWritable(DeprecatedUTF8.class, info).write(out);
+      FSImageSerialization.writeString(path, out);
+      FSImageSerialization.writeLong(timestamp, out);
     }
 
     @Override
     void readFields(DataInputStream in, int logVersion)
         throws IOException {
-      this.length = in.readInt();
-      if (this.length != 2) {
-        throw new IOException("Incorrect data format. "
-                              + "delete operation.");
+      if (!LayoutVersion.supports(Feature.EDITLOG_OP_OPTIMIZATION, logVersion)) {
+        this.length = in.readInt();
+        if (this.length != 2) {
+          throw new IOException("Incorrect data format. " + "delete operation.");
+        }
       }
       this.path = FSImageSerialization.readString(in);
-      this.timestamp = readLong(in);
+      if (LayoutVersion.supports(Feature.EDITLOG_OP_OPTIMIZATION, logVersion)) {
+        this.timestamp = FSImageSerialization.readLong(in);
+      } else {
+        this.timestamp = readLong(in);
+      }
     }
   }
 
@@ -536,12 +575,9 @@ public abstract class FSEditLogOp {
 
     @Override 
     void writeFields(DataOutputStream out) throws IOException {
-      DeprecatedUTF8 info[] = new DeprecatedUTF8[] {
-        new DeprecatedUTF8(path),
-        toLogLong(timestamp), // mtime
-        toLogLong(timestamp) // atime, unused at this time
-      };
-      new ArrayWritable(DeprecatedUTF8.class, info).write(out);
+      FSImageSerialization.writeString(path, out);
+      FSImageSerialization.writeLong(timestamp, out); // mtime
+      FSImageSerialization.writeLong(timestamp, out); // atime, unused at this time
       permissions.write(out);
     }
     
@@ -549,20 +585,32 @@ public abstract class FSEditLogOp {
     void readFields(DataInputStream in, int logVersion)
         throws IOException {
 
-      this.length = in.readInt();
+      if (!LayoutVersion.supports(Feature.EDITLOG_OP_OPTIMIZATION, logVersion)) {
+        this.length = in.readInt();
+      }
       if (-17 < logVersion && length != 2 ||
-          logVersion <= -17 && length != 3) {
+          logVersion <= -17 && length != 3
+          && !LayoutVersion.supports(Feature.EDITLOG_OP_OPTIMIZATION, logVersion)) {
         throw new IOException("Incorrect data format. "
                               + "Mkdir operation.");
       }
       this.path = FSImageSerialization.readString(in);
-      this.timestamp = readLong(in);
+      if (LayoutVersion.supports(Feature.EDITLOG_OP_OPTIMIZATION, logVersion)) {
+        this.timestamp = FSImageSerialization.readLong(in);
+      } else {
+        this.timestamp = readLong(in);
+      }
 
       // The disk format stores atimes for directories as well.
       // However, currently this is not being updated/used because of
       // performance reasons.
       if (LayoutVersion.supports(Feature.FILE_ACCESS_TIME, logVersion)) {
-        /*unused this.atime = */readLong(in);
+        /* unused this.atime = */
+        if (LayoutVersion.supports(Feature.EDITLOG_OP_OPTIMIZATION, logVersion)) {
+          FSImageSerialization.readLong(in);
+        } else {
+          readLong(in);
+        }
       }
 
       if (logVersion <= -11) {
@@ -592,13 +640,13 @@ public abstract class FSEditLogOp {
     
     @Override 
     void writeFields(DataOutputStream out) throws IOException {
-      new LongWritable(genStamp).write(out);
+      FSImageSerialization.writeLong(genStamp, out);
     }
     
     @Override
     void readFields(DataInputStream in, int logVersion)
         throws IOException {
-      this.genStamp = in.readLong();
+      this.genStamp = FSImageSerialization.readLong(in);
     }
   }
 
@@ -676,7 +724,7 @@ public abstract class FSEditLogOp {
 
     @Override 
     void writeFields(DataOutputStream out) throws IOException {
-      new DeprecatedUTF8(src).write(out);
+      FSImageSerialization.writeString(src, out);
       permissions.write(out);
      }
  
@@ -719,11 +767,9 @@ public abstract class FSEditLogOp {
 
     @Override 
     void writeFields(DataOutputStream out) throws IOException {
-      DeprecatedUTF8 u = new DeprecatedUTF8(username == null? "": username);
-      DeprecatedUTF8 g = new DeprecatedUTF8(groupname == null? "": groupname);
-      new DeprecatedUTF8(src).write(out);
-      u.write(out);
-      g.write(out);
+      FSImageSerialization.writeString(src, out);
+      FSImageSerialization.writeString(username == null ? "" : username, out);
+      FSImageSerialization.writeString(groupname == null ? "" : groupname, out);
     }
 
     @Override
@@ -757,7 +803,7 @@ public abstract class FSEditLogOp {
     void readFields(DataInputStream in, int logVersion)
         throws IOException {
       this.src = FSImageSerialization.readString(in);
-      this.nsQuota = readLongWritable(in);
+      this.nsQuota = FSImageSerialization.readLong(in);
     }
   }
 
@@ -816,17 +862,17 @@ public abstract class FSEditLogOp {
 
     @Override 
     void writeFields(DataOutputStream out) throws IOException {
-      new DeprecatedUTF8(src).write(out);
-      new LongWritable(nsQuota).write(out);
-      new LongWritable(dsQuota).write(out);
+      FSImageSerialization.writeString(src, out);
+      FSImageSerialization.writeLong(nsQuota, out);
+      FSImageSerialization.writeLong(dsQuota, out);
     }
 
     @Override
     void readFields(DataInputStream in, int logVersion)
         throws IOException {
       this.src = FSImageSerialization.readString(in);
-      this.nsQuota = readLongWritable(in);
-      this.dsQuota = readLongWritable(in);
+      this.nsQuota = FSImageSerialization.readLong(in);
+      this.dsQuota = FSImageSerialization.readLong(in);
     }
   }
 
@@ -862,24 +908,29 @@ public abstract class FSEditLogOp {
 
     @Override 
     void writeFields(DataOutputStream out) throws IOException {
-      DeprecatedUTF8 info[] = new DeprecatedUTF8[] { 
-        new DeprecatedUTF8(path),
-        toLogLong(mtime),
-        toLogLong(atime)};
-      new ArrayWritable(DeprecatedUTF8.class, info).write(out);
+      FSImageSerialization.writeString(path, out);
+      FSImageSerialization.writeLong(mtime, out);
+      FSImageSerialization.writeLong(atime, out);
     }
 
     @Override
     void readFields(DataInputStream in, int logVersion)
         throws IOException {
-      this.length = in.readInt();
-      if (length != 3) {
-        throw new IOException("Incorrect data format. "
-                              + "times operation.");
+      if (!LayoutVersion.supports(Feature.EDITLOG_OP_OPTIMIZATION, logVersion)) {
+        this.length = in.readInt();
+        if (length != 3) {
+          throw new IOException("Incorrect data format. " + "times operation.");
+        }
       }
       this.path = FSImageSerialization.readString(in);
-      this.mtime = readLong(in);
-      this.atime = readLong(in);
+
+      if (LayoutVersion.supports(Feature.EDITLOG_OP_OPTIMIZATION, logVersion)) {
+        this.mtime = FSImageSerialization.readLong(in);
+        this.atime = FSImageSerialization.readLong(in);
+      } else {
+        this.mtime = readLong(in);
+        this.atime = readLong(in);
+      }
     }
   }
 
@@ -927,28 +978,33 @@ public abstract class FSEditLogOp {
 
     @Override 
     void writeFields(DataOutputStream out) throws IOException {
-      DeprecatedUTF8 info[] = new DeprecatedUTF8[] { 
-        new DeprecatedUTF8(path),
-        new DeprecatedUTF8(value),
-        toLogLong(mtime),
-        toLogLong(atime)};
-      new ArrayWritable(DeprecatedUTF8.class, info).write(out);
+      FSImageSerialization.writeString(path, out);
+      FSImageSerialization.writeString(value, out);
+      FSImageSerialization.writeLong(mtime, out);
+      FSImageSerialization.writeLong(atime, out);
       permissionStatus.write(out);
     }
 
     @Override
     void readFields(DataInputStream in, int logVersion)
         throws IOException {
-
-      this.length = in.readInt();
-      if (this.length != 4) {
-        throw new IOException("Incorrect data format. "
-                              + "symlink operation.");
+      if (!LayoutVersion.supports(Feature.EDITLOG_OP_OPTIMIZATION, logVersion)) {
+        this.length = in.readInt();
+        if (this.length != 4) {
+          throw new IOException("Incorrect data format. "
+              + "symlink operation.");
+        }
       }
       this.path = FSImageSerialization.readString(in);
       this.value = FSImageSerialization.readString(in);
-      this.mtime = readLong(in);
-      this.atime = readLong(in);
+
+      if (LayoutVersion.supports(Feature.EDITLOG_OP_OPTIMIZATION, logVersion)) {
+        this.mtime = FSImageSerialization.readLong(in);
+        this.atime = FSImageSerialization.readLong(in);
+      } else {
+        this.mtime = readLong(in);
+        this.atime = readLong(in);
+      }
       this.permissionStatus = PermissionStatus.read(in);
     }
   }
@@ -991,25 +1047,29 @@ public abstract class FSEditLogOp {
 
     @Override 
     void writeFields(DataOutputStream out) throws IOException {
-      DeprecatedUTF8 info[] = new DeprecatedUTF8[] { 
-        new DeprecatedUTF8(src),
-        new DeprecatedUTF8(dst),
-        toLogLong(timestamp)};
-      new ArrayWritable(DeprecatedUTF8.class, info).write(out);
+      FSImageSerialization.writeString(src, out);
+      FSImageSerialization.writeString(dst, out);
+      FSImageSerialization.writeLong(timestamp, out);
       toBytesWritable(options).write(out);
     }
 
     @Override
     void readFields(DataInputStream in, int logVersion)
         throws IOException {
-      this.length = in.readInt();
-      if (this.length != 3) {
-        throw new IOException("Incorrect data format. "
-                              + "Rename operation.");
+      if (!LayoutVersion.supports(Feature.EDITLOG_OP_OPTIMIZATION, logVersion)) {
+        this.length = in.readInt();
+        if (this.length != 3) {
+          throw new IOException("Incorrect data format. " + "Rename operation.");
+        }
       }
       this.src = FSImageSerialization.readString(in);
       this.dst = FSImageSerialization.readString(in);
-      this.timestamp = readLong(in);
+
+      if (LayoutVersion.supports(Feature.EDITLOG_OP_OPTIMIZATION, logVersion)) {
+        this.timestamp = FSImageSerialization.readLong(in);
+      } else {
+        this.timestamp = readLong(in);
+      }
       this.options = readRenameOptions(in);
     }
 
@@ -1066,9 +1126,9 @@ public abstract class FSEditLogOp {
 
     @Override 
     void writeFields(DataOutputStream out) throws IOException {
-      new DeprecatedUTF8(leaseHolder).write(out);
-      new DeprecatedUTF8(path).write(out);
-      new DeprecatedUTF8(newHolder).write(out);
+      FSImageSerialization.writeString(leaseHolder, out);
+      FSImageSerialization.writeString(path, out);
+      FSImageSerialization.writeString(newHolder, out);
     }
 
     @Override
@@ -1107,7 +1167,7 @@ public abstract class FSEditLogOp {
     @Override 
     void writeFields(DataOutputStream out) throws IOException {
       token.write(out);
-      toLogLong(expiryTime).write(out);
+      FSImageSerialization.writeLong(expiryTime, out);
     }
 
     @Override
@@ -1115,7 +1175,11 @@ public abstract class FSEditLogOp {
         throws IOException {
       this.token = new DelegationTokenIdentifier();
       this.token.readFields(in);
-      this.expiryTime = readLong(in);
+      if (LayoutVersion.supports(Feature.EDITLOG_OP_OPTIMIZATION, logVersion)) {
+        this.expiryTime = FSImageSerialization.readLong(in);
+      } else {
+        this.expiryTime = readLong(in);
+      }
     }
   }
 
@@ -1146,7 +1210,7 @@ public abstract class FSEditLogOp {
     @Override 
     void writeFields(DataOutputStream out) throws IOException {
       token.write(out);
-      toLogLong(expiryTime).write(out);
+      FSImageSerialization.writeLong(expiryTime, out);
     }
 
     @Override
@@ -1154,7 +1218,11 @@ public abstract class FSEditLogOp {
         throws IOException {
       this.token = new DelegationTokenIdentifier();
       this.token.readFields(in);
-      this.expiryTime = readLong(in);
+      if (LayoutVersion.supports(Feature.EDITLOG_OP_OPTIMIZATION, logVersion)) {
+        this.expiryTime = FSImageSerialization.readLong(in);
+      } else {
+        this.expiryTime = readLong(in);
+      }
     }
   }
 
@@ -1269,14 +1337,6 @@ public abstract class FSEditLogOp {
     return Long.parseLong(FSImageSerialization.readString(in));
   }
 
-  static private DeprecatedUTF8 toLogReplication(short replication) {
-    return new DeprecatedUTF8(Short.toString(replication));
-  }
-  
-  static private DeprecatedUTF8 toLogLong(long timestamp) {
-    return new DeprecatedUTF8(Long.toString(timestamp));
-  }
-
   /**
    * A class to read in blocks stored in the old format. The only two
    * fields in the block were blockid and length.
@@ -1312,17 +1372,6 @@ public abstract class FSEditLogOp {
     }
   }
 
-    // a place holder for reading a long
-  private static final LongWritable longWritable = new LongWritable();
-
-  /** Read an integer from an input stream */
-  private static long readLongWritable(DataInputStream in) throws IOException {
-    synchronized (longWritable) {
-      longWritable.readFields(in);
-      return longWritable.get();
-    }
-  }
-
   /**
    * Class for writing editlog ops
    */
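
One subtlety in the OP_CONCAT_DELETE hunk above: the new writeFields still serializes the sources with ArrayWritable, whose write() emits the element count as a leading int. That count is exactly what the new read path consumes with in.readInt(), so writer and reader stay in step without the legacy whole-record length field. Condensed read side, mirroring the diff:

    // Read side of the new OP_CONCAT_DELETE framing (condensed from the diff):
    String trg = FSImageSerialization.readString(in);
    int srcSize = in.readInt();          // element count written by ArrayWritable
    String[] srcs = new String[srcSize];
    for (int i = 0; i < srcSize; i++) {
      srcs[i] = FSImageSerialization.readString(in);
    }
    long timestamp = FSImageSerialization.readLong(in);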

+ 32 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java

@@ -36,6 +36,8 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.ShortWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 
@@ -72,6 +74,8 @@ public class FSImageSerialization {
    */
   static private final class TLData {
     final DeprecatedUTF8 U_STR = new DeprecatedUTF8();
+    final ShortWritable U_SHORT = new ShortWritable();
+    final LongWritable U_LONG = new LongWritable();
     final FsPermission FILE_PERM = new FsPermission((short) 0);
   }
 
@@ -215,7 +219,35 @@ public class FSImageSerialization {
     ustr.write(out);
   }
 
+  
+  /** read the long value */
+  static long readLong(DataInputStream in) throws IOException {
+    LongWritable ustr = TL_DATA.get().U_LONG;
+    ustr.readFields(in);
+    return ustr.get();
+  }
+
+  /** write the long value */
+  static void writeLong(long value, DataOutputStream out) throws IOException {
+    LongWritable uLong = TL_DATA.get().U_LONG;
+    uLong.set(value);
+    uLong.write(out);
+  }
 
+  /** read short value */
+  static short readShort(DataInputStream in) throws IOException {
+    ShortWritable uShort = TL_DATA.get().U_SHORT;
+    uShort.readFields(in);
+    return uShort.get();
+  }
+
+  /** write short value */
+  static void writeShort(short value, DataOutputStream out) throws IOException {
+    ShortWritable uShort = TL_DATA.get().U_SHORT;
+    uShort.set(value);
+    uShort.write(out);
+  }
+  
   // Same comments apply for this method as for readString()
   @SuppressWarnings("deprecation")
   public static byte[] readBytes(DataInputStream in) throws IOException {
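
The new readLong/readShort helpers replace FSEditLogOp's old readLongWritable, which funneled every reader through one static LongWritable guarded by synchronized (removed in the FSEditLogOp diff above). Here each thread reuses its own scratch instances through TLData instead. A minimal sketch of the same pattern, assuming TL_DATA is a ThreadLocal<TLData> as the existing U_STR usage suggests:

    // Per-thread scratch Writable: reuse without locking (illustrative).
    private static final ThreadLocal<LongWritable> SCRATCH_LONG =
        new ThreadLocal<LongWritable>() {
          @Override protected LongWritable initialValue() {
            return new LongWritable();
          }
        };

    static long readLongNoLock(DataInputStream in) throws IOException {
      LongWritable w = SCRATCH_LONG.get();  // this thread's private instance
      w.readFields(in);
      return w.get();
    }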

+ 121 - 57
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/EditsLoaderCurrent.java

@@ -41,7 +41,7 @@ import static org.apache.hadoop.hdfs.tools.offlineEditsViewer.Tokenizer.VIntToke
 class EditsLoaderCurrent implements EditsLoader {
 
   private static int[] supportedVersions = { -18, -19, -20, -21, -22, -23, -24,
-      -25, -26, -27, -28, -30, -31, -32, -33, -34, -35, -36, -37, -38};
+      -25, -26, -27, -28, -30, -31, -32, -33, -34, -35, -36, -37, -38, -39};
 
   private EditsVisitor v;
   private int editsVersion = 0;
@@ -102,20 +102,29 @@ class EditsLoaderCurrent implements EditsLoader {
   private void visit_OP_ADD_or_OP_CLOSE(FSEditLogOpCodes editsOpCode)
     throws IOException {
     visitTxId();
-
-    IntToken opAddLength = v.visitInt(EditsElement.LENGTH);
-    // this happens if the edits is not properly ended (-1 op code),
-    // it is padded at the end with all zeros, OP_ADD is zero so
-    // without this check we would treat all zeros as empty OP_ADD)
-    if(opAddLength.value == 0) {
-      throw new IOException("OpCode " + editsOpCode +
-        " has zero length (corrupted edits)");
+    if (!LayoutVersion.supports(Feature.EDITLOG_OP_OPTIMIZATION, editsVersion)) {
+      IntToken opAddLength = v.visitInt(EditsElement.LENGTH);
+      // this happens if the edits is not properly ended (-1 op code),
+      // it is padded at the end with all zeros, OP_ADD is zero so
+      // without this check we would treat all zeros as empty OP_ADD)
+      if (opAddLength.value == 0) {
+        throw new IOException("OpCode " + editsOpCode
+            + " has zero length (corrupted edits)");
+      }
     }
+    
     v.visitStringUTF8(EditsElement.PATH);
-    v.visitStringUTF8(EditsElement.REPLICATION);
-    v.visitStringUTF8(EditsElement.MTIME);
-    v.visitStringUTF8(EditsElement.ATIME);
-    v.visitStringUTF8(EditsElement.BLOCKSIZE);
+    if (LayoutVersion.supports(Feature.EDITLOG_OP_OPTIMIZATION, editsVersion)) {
+      v.visitShort(EditsElement.REPLICATION);
+      v.visitLong(EditsElement.MTIME);
+      v.visitLong(EditsElement.ATIME);
+      v.visitLong(EditsElement.BLOCKSIZE);
+    } else {
+      v.visitStringUTF8(EditsElement.REPLICATION);
+      v.visitStringUTF8(EditsElement.MTIME);
+      v.visitStringUTF8(EditsElement.ATIME);
+      v.visitStringUTF8(EditsElement.BLOCKSIZE);
+    }
     // now read blocks
     IntToken numBlocksToken = v.visitInt(EditsElement.NUMBLOCKS);
     for (int i = 0; i < numBlocksToken.value; i++) {
@@ -146,11 +155,16 @@ class EditsLoaderCurrent implements EditsLoader {
    */
   private void visit_OP_RENAME_OLD() throws IOException {
     visitTxId();
-
-    v.visitInt(        EditsElement.LENGTH);
+    if (!LayoutVersion.supports(Feature.EDITLOG_OP_OPTIMIZATION, editsVersion)) {
+      v.visitInt(EditsElement.LENGTH);
+    }
     v.visitStringUTF8( EditsElement.SOURCE);
     v.visitStringUTF8( EditsElement.DESTINATION);
-    v.visitStringUTF8( EditsElement.TIMESTAMP);
+    if (LayoutVersion.supports(Feature.EDITLOG_OP_OPTIMIZATION, editsVersion)) {
+      v.visitLong(EditsElement.TIMESTAMP);
+    } else {
+      v.visitStringUTF8(EditsElement.TIMESTAMP);
+    }
   }
 
   /**
@@ -158,10 +172,15 @@ class EditsLoaderCurrent implements EditsLoader {
    */
   private void visit_OP_DELETE() throws IOException {
     visitTxId();
-
-    v.visitInt(        EditsElement.LENGTH);
+    if (!LayoutVersion.supports(Feature.EDITLOG_OP_OPTIMIZATION, editsVersion)) {
+      v.visitInt(EditsElement.LENGTH);
+    }
     v.visitStringUTF8( EditsElement.PATH);
-    v.visitStringUTF8( EditsElement.TIMESTAMP);
+    if (LayoutVersion.supports(Feature.EDITLOG_OP_OPTIMIZATION, editsVersion)) {
+      v.visitLong(EditsElement.TIMESTAMP);
+    } else {
+      v.visitStringUTF8(EditsElement.TIMESTAMP);
+    }
   }
 
   /**
@@ -169,11 +188,17 @@ class EditsLoaderCurrent implements EditsLoader {
    */
   private void visit_OP_MKDIR() throws IOException {
     visitTxId();
-
-    v.visitInt(        EditsElement.LENGTH);
+    if (!LayoutVersion.supports(Feature.EDITLOG_OP_OPTIMIZATION, editsVersion)) {
+      v.visitInt(EditsElement.LENGTH);
+    }
     v.visitStringUTF8( EditsElement.PATH);
-    v.visitStringUTF8( EditsElement.TIMESTAMP);
-    v.visitStringUTF8( EditsElement.ATIME);
+    if (LayoutVersion.supports(Feature.EDITLOG_OP_OPTIMIZATION, editsVersion)) {
+      v.visitLong(EditsElement.TIMESTAMP);
+      v.visitLong(EditsElement.ATIME);
+    } else {
+      v.visitStringUTF8(EditsElement.TIMESTAMP);
+      v.visitStringUTF8(EditsElement.ATIME);
+    }
     // PERMISSION_STATUS
     v.visitEnclosingElement( EditsElement.PERMISSION_STATUS);
 
@@ -191,7 +216,11 @@ class EditsLoaderCurrent implements EditsLoader {
     visitTxId();
 
     v.visitStringUTF8(EditsElement.PATH);
-    v.visitStringUTF8(EditsElement.REPLICATION);
+    if (LayoutVersion.supports(Feature.EDITLOG_OP_OPTIMIZATION, editsVersion)) {
+      v.visitShort(EditsElement.REPLICATION);
+    } else {
+      v.visitStringUTF8(EditsElement.REPLICATION);
+    }
   }
 
   /**
@@ -229,11 +258,17 @@ class EditsLoaderCurrent implements EditsLoader {
    */
   private void visit_OP_TIMES() throws IOException {
     visitTxId();
-
-    v.visitInt(        EditsElement.LENGTH);
+    if (!LayoutVersion.supports(Feature.EDITLOG_OP_OPTIMIZATION, editsVersion)) {
+      v.visitInt(EditsElement.LENGTH);
+    }
     v.visitStringUTF8( EditsElement.PATH);
-    v.visitStringUTF8( EditsElement.MTIME);
-    v.visitStringUTF8( EditsElement.ATIME);
+    if (LayoutVersion.supports(Feature.EDITLOG_OP_OPTIMIZATION, editsVersion)) {
+      v.visitLong(EditsElement.MTIME);
+      v.visitLong(EditsElement.ATIME);
+    } else {
+      v.visitStringUTF8(EditsElement.MTIME);
+      v.visitStringUTF8(EditsElement.ATIME);
+    }
   }
 
   /**
@@ -252,11 +287,16 @@ class EditsLoaderCurrent implements EditsLoader {
    */
   private void visit_OP_RENAME() throws IOException {
     visitTxId();
-
-    v.visitInt(           EditsElement.LENGTH);
+    if (!LayoutVersion.supports(Feature.EDITLOG_OP_OPTIMIZATION, editsVersion)) {
+      v.visitInt(EditsElement.LENGTH);
+    }
     v.visitStringUTF8(    EditsElement.SOURCE);
     v.visitStringUTF8(    EditsElement.DESTINATION);
-    v.visitStringUTF8(    EditsElement.TIMESTAMP);
+    if (LayoutVersion.supports(Feature.EDITLOG_OP_OPTIMIZATION, editsVersion)) {
+      v.visitLong(EditsElement.TIMESTAMP);
+    } else {
+      v.visitStringUTF8(EditsElement.TIMESTAMP);
+    }
     v.visitBytesWritable( EditsElement.RENAME_OPTIONS);
   }
 
@@ -265,15 +305,25 @@ class EditsLoaderCurrent implements EditsLoader {
    */
   private void visit_OP_CONCAT_DELETE() throws IOException {
     visitTxId();
-
-    IntToken lengthToken = v.visitInt(EditsElement.LENGTH);
+    int sourceCount = 0;
+    if (!LayoutVersion.supports(Feature.EDITLOG_OP_OPTIMIZATION, editsVersion)) {
+      IntToken lengthToken = v.visitInt(EditsElement.LENGTH);
+      sourceCount = lengthToken.value - 2;
+    }
     v.visitStringUTF8(EditsElement.CONCAT_TARGET);
     // all except of CONCAT_TARGET and TIMESTAMP
-    int sourceCount = lengthToken.value - 2;
+    if (LayoutVersion.supports(Feature.EDITLOG_OP_OPTIMIZATION, editsVersion)) {
+      IntToken lengthToken = v.visitInt(EditsElement.LENGTH);
+      sourceCount = lengthToken.value;
+    }
     for(int i = 0; i < sourceCount; i++) {
       v.visitStringUTF8(EditsElement.CONCAT_SOURCE);
     }
-    v.visitStringUTF8(EditsElement.TIMESTAMP);
+    if (LayoutVersion.supports(Feature.EDITLOG_OP_OPTIMIZATION, editsVersion)) {
+      v.visitLong(EditsElement.TIMESTAMP);
+    } else {
+      v.visitStringUTF8(EditsElement.TIMESTAMP);
+    }
   }
 
   /**
@@ -281,12 +331,18 @@ class EditsLoaderCurrent implements EditsLoader {
    */
   private void visit_OP_SYMLINK() throws IOException {
     visitTxId();
-
-    v.visitInt(        EditsElement.LENGTH);
+    if (!LayoutVersion.supports(Feature.EDITLOG_OP_OPTIMIZATION, editsVersion)) {
+      v.visitInt(EditsElement.LENGTH);
+    }
     v.visitStringUTF8( EditsElement.SOURCE);
     v.visitStringUTF8( EditsElement.DESTINATION);
-    v.visitStringUTF8( EditsElement.MTIME);
-    v.visitStringUTF8( EditsElement.ATIME);
+    if (LayoutVersion.supports(Feature.EDITLOG_OP_OPTIMIZATION, editsVersion)) {
+      v.visitLong(EditsElement.MTIME);
+      v.visitLong(EditsElement.ATIME);
+    } else {
+      v.visitStringUTF8(EditsElement.MTIME);
+      v.visitStringUTF8(EditsElement.ATIME);
+    }
     // PERMISSION_STATUS
     v.visitEnclosingElement(EditsElement.PERMISSION_STATUS);
 
@@ -303,15 +359,19 @@ class EditsLoaderCurrent implements EditsLoader {
   private void visit_OP_GET_DELEGATION_TOKEN() throws IOException {
     visitTxId();
     
-      v.visitByte(       EditsElement.T_VERSION);
-      v.visitStringText( EditsElement.T_OWNER);
-      v.visitStringText( EditsElement.T_RENEWER);
-      v.visitStringText( EditsElement.T_REAL_USER);
-      v.visitVLong(      EditsElement.T_ISSUE_DATE);
-      v.visitVLong(      EditsElement.T_MAX_DATE);
-      v.visitVInt(       EditsElement.T_SEQUENCE_NUMBER);
-      v.visitVInt(       EditsElement.T_MASTER_KEY_ID);
-      v.visitStringUTF8( EditsElement.T_EXPIRY_TIME);
+    v.visitByte(EditsElement.T_VERSION);
+    v.visitStringText(EditsElement.T_OWNER);
+    v.visitStringText(EditsElement.T_RENEWER);
+    v.visitStringText(EditsElement.T_REAL_USER);
+    v.visitVLong(EditsElement.T_ISSUE_DATE);
+    v.visitVLong(EditsElement.T_MAX_DATE);
+    v.visitVInt(EditsElement.T_SEQUENCE_NUMBER);
+    v.visitVInt(EditsElement.T_MASTER_KEY_ID);
+    if (LayoutVersion.supports(Feature.EDITLOG_OP_OPTIMIZATION, editsVersion)) {
+      v.visitLong(EditsElement.T_EXPIRY_TIME);
+    } else {
+      v.visitStringUTF8(EditsElement.T_EXPIRY_TIME);
+    }
   }
 
   /**
@@ -321,15 +381,19 @@ class EditsLoaderCurrent implements EditsLoader {
     throws IOException {
     visitTxId();
 
-      v.visitByte(       EditsElement.T_VERSION);
-      v.visitStringText( EditsElement.T_OWNER);
-      v.visitStringText( EditsElement.T_RENEWER);
-      v.visitStringText( EditsElement.T_REAL_USER);
-      v.visitVLong(      EditsElement.T_ISSUE_DATE);
-      v.visitVLong(      EditsElement.T_MAX_DATE);
-      v.visitVInt(       EditsElement.T_SEQUENCE_NUMBER);
-      v.visitVInt(       EditsElement.T_MASTER_KEY_ID);
-      v.visitStringUTF8( EditsElement.T_EXPIRY_TIME);
+    v.visitByte(EditsElement.T_VERSION);
+    v.visitStringText(EditsElement.T_OWNER);
+    v.visitStringText(EditsElement.T_RENEWER);
+    v.visitStringText(EditsElement.T_REAL_USER);
+    v.visitVLong(EditsElement.T_ISSUE_DATE);
+    v.visitVLong(EditsElement.T_MAX_DATE);
+    v.visitVInt(EditsElement.T_SEQUENCE_NUMBER);
+    v.visitVInt(EditsElement.T_MASTER_KEY_ID);
+    if (LayoutVersion.supports(Feature.EDITLOG_OP_OPTIMIZATION, editsVersion)) {
+      v.visitLong(EditsElement.T_EXPIRY_TIME);
+    } else {
+      v.visitStringUTF8(EditsElement.T_EXPIRY_TIME);
+    }
   }
 
   /**

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageLoaderCurrent.java

@@ -122,7 +122,7 @@ class ImageLoaderCurrent implements ImageLoader {
   protected final DateFormat dateFormat = 
                                       new SimpleDateFormat("yyyy-MM-dd HH:mm");
   private static int[] versions = { -16, -17, -18, -19, -20, -21, -22, -23,
-      -24, -25, -26, -27, -28, -30, -31, -32, -33, -34, -35, -36, -37, -38};
+      -24, -25, -26, -27, -28, -30, -31, -32, -33, -34, -35, -36, -37, -38, -39};
   private int imageVersion = 0;
 
   /* (non-Javadoc)

+ 4 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java

@@ -17,7 +17,10 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
-import static org.junit.Assert.*;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.io.File;
 import java.io.FileInputStream;

+ 13 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -131,6 +131,10 @@ Release 0.23.1 - Unreleased
     MAPREDUCE-3553. Add support for data returned when exceptions thrown from web 
     service apis to be in either xml or in JSON. (Thomas Graves via mahadev)
 
+    MAPREDUCE-3641. Making CapacityScheduler more conservative so as to 
+    assign only one off-switch container in a single scheduling
+    iteration. (Arun C Murthy via vinodkv)
+
   OPTIMIZATIONS
 
     MAPREDUCE-3567. Extraneous JobConf objects in AM heap. (Vinod Kumar
@@ -452,6 +456,15 @@ Release 0.23.1 - Unreleased
     MAPREDUCE-3404. Corrected MR AM to honor speculative configuration and enable
     speculating either maps or reduces. (Eric Payne via vinodkv)
 
+    MAPREDUCE-3664. Federation Documentation has incorrect configuration example.
+    (Brandon Li via jitendra)
+
+    MAPREDUCE-3649. Job End notification gives an error on calling back.
+    (Ravi Prakash via mahadev)
+
+    MAPREDUCE-3657. State machine visualize build fails. (Jason Lowe 
+    via mahadev)
+
 Release 0.23.0 - 2011-11-01 
 
   INCOMPATIBLE CHANGES

+ 18 - 14
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/JobEndNotifier.java

@@ -19,12 +19,11 @@
 package org.apache.hadoop.mapreduce.v2.app;
 
 import java.io.IOException;
-import java.io.InputStream;
+import java.net.HttpURLConnection;
 import java.net.InetSocketAddress;
 import java.net.MalformedURLException;
-import java.net.URL;
-import java.net.URLConnection;
 import java.net.Proxy;
+import java.net.URL;
 
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
@@ -40,7 +39,8 @@ import org.mortbay.log.Log;
  * User can specify number of retry attempts and a time interval at which to
  * attempt retries</li><li>
  * Cluster administrators can set final parameters to set maximum number of
- * tries (0 would disable job end notification) and max time interval</li><li>
+ * tries (0 would disable job end notification) and max time interval and a
+ * proxy if needed</li><li>
  * The URL may contain sentinels which will be replaced by jobId and jobStatus 
  * (eg. SUCCEEDED/KILLED/FAILED) </li> </ul>
  * </p>
@@ -59,8 +59,8 @@ public class JobEndNotifier implements Configurable {
 
   /**
    * Parse the URL that needs to be notified of the end of the job, along
-   * with the number of retries in case of failure and the amount of time to
-   * wait between retries
+   * with the number of retries in case of failure, the amount of time to
+   * wait between retries and proxy settings
    * @param conf the configuration 
    */
   public void setConf(Configuration conf) {
@@ -119,15 +119,19 @@ public class JobEndNotifier implements Configurable {
     boolean success = false;
     try {
       Log.info("Job end notification trying " + urlToNotify);
-      URLConnection conn = urlToNotify.openConnection(proxyToUse);
+      HttpURLConnection conn = (HttpURLConnection) urlToNotify.openConnection();
       conn.setConnectTimeout(5*1000);
       conn.setReadTimeout(5*1000);
       conn.setAllowUserInteraction(false);
-      InputStream is = conn.getInputStream();
-      conn.getContent();
-      is.close();
-      success = true;
-      Log.info("Job end notification to " + urlToNotify + " succeeded");
+      if(conn.getResponseCode() != HttpURLConnection.HTTP_OK) {
+        Log.warn("Job end notification to " + urlToNotify +" failed with code: "
+        + conn.getResponseCode() + " and message \"" + conn.getResponseMessage()
+        +"\"");
+      }
+      else {
+        success = true;
+        Log.info("Job end notification to " + urlToNotify + " succeeded");
+      }
     } catch(IOException ioe) {
       Log.warn("Job end notification to " + urlToNotify + " failed", ioe);
     }
@@ -135,8 +139,8 @@ public class JobEndNotifier implements Configurable {
   }
 
   /**
-   * Notify a server of the completion of a submitted job. The server must have
-   * configured MRConfig.JOB_END_NOTIFICATION_URLS
+   * Notify a server of the completion of a submitted job. The user must have
+   * configured MRJobConfig.MR_JOB_END_NOTIFICATION_URL
    * @param jobReport JobReport used to read JobId and JobStatus
    * @throws InterruptedException
    */
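
The switch from URLConnection to HttpURLConnection is what makes the success check meaningful: getResponseCode() lets the notifier log non-200 replies as failures instead of treating any readable response as success. A hedged usage sketch; the key string and the $jobId/$jobStatus sentinels are assumptions based on MRJobConfig.MR_JOB_END_NOTIFICATION_URL and the class javadoc:

    // Hedged sketch: configure a job-end notification URL (key name and
    // sentinels are assumptions, per the lead-in above).
    Configuration conf = new Configuration();
    conf.set("mapreduce.job.end-notification.url",
        "http://monitor.example.com/notify?id=$jobId&status=$jobStatus");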

+ 1 - 0
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/pom.xml

@@ -109,6 +109,7 @@
                 </goals>
                 <configuration>
                   <mainClass>org.apache.hadoop.yarn.util.VisualizeStateMachine</mainClass>
+		  <classpathScope>compile</classpathScope>
                   <arguments>
                     <argument>NodeManager</argument>
                     <argument>org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationImpl,

+ 1 - 0
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml

@@ -147,6 +147,7 @@
                 </goals>
                 <configuration>
                   <mainClass>org.apache.hadoop.yarn.util.VisualizeStateMachine</mainClass>
+		  <classpathScope>compile</classpathScope>
                   <arguments>
                     <argument>ResourceManager</argument>
                     <argument>org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl,

+ 19 - 2
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/Resources.java

@@ -26,8 +26,26 @@ import org.apache.hadoop.yarn.util.Records;
 @Private
 @Evolving
 public class Resources {
+  
   // Java doesn't have const :(
-  private static final Resource NONE = createResource(0);
+  private static final Resource NONE = new Resource() {
+
+    @Override
+    public int getMemory() {
+      return 0;
+    }
+
+    @Override
+    public void setMemory(int memory) {
+      throw new RuntimeException("NONE cannot be modified!");
+    }
+
+    @Override
+    public int compareTo(Resource o) {
+      return (0 - o.getMemory());
+    }
+    
+  };
 
   public static Resource createResource(int memory) {
     Resource resource = Records.newRecord(Resource.class);
@@ -36,7 +54,6 @@ public class Resources {
   }
 
   public static Resource none() {
-    assert NONE.getMemory() == 0 : "NONE should be empty";
     return NONE;
   }
 

+ 48 - 0
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java

@@ -0,0 +1,48 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
+
+@Private
+@Unstable
+public class CSAssignment {
+  final private Resource resource;
+  final private NodeType type;
+  
+  public CSAssignment(Resource resource, NodeType type) {
+    this.resource = resource;
+    this.type = type;
+  }
+
+  public Resource getResource() {
+    return resource;
+  }
+
+  public NodeType getType() {
+    return type;
+  }
+  
+  @Override
+  public String toString() {
+    return resource.getMemory() + ":" + type;
+  }
+}
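
CSAssignment pairs the assigned resource with the locality at which it was assigned, which is what lets ParentQueue (diff below) stop after a single off-switch container per scheduling iteration, per MAPREDUCE-3641. Illustrative use:

    // Illustrative use of the new value class:
    CSAssignment a =
        new CSAssignment(Resources.createResource(1024), NodeType.OFF_SWITCH);
    if (a.getType() == NodeType.OFF_SWITCH) {
      // be conservative: no further off-switch assignments this iteration
    }
    System.out.println(a);  // "1024:OFF_SWITCH" via the toString() above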

+ 3 - 2
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java

@@ -155,9 +155,10 @@ extends org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue {
    * Assign containers to applications in the queue or it's children (if any).
    * @param clusterResource the resource of the cluster.
    * @param node node on which resources are available
-   * @return the resource that is being assigned.
+   * @return the assignment
    */
-  public Resource assignContainers(Resource clusterResource, SchedulerNode node);
+  public CSAssignment assignContainers(
+      Resource clusterResource, SchedulerNode node);
   
   /**
    * A container assigned to the queue has completed.

+ 29 - 23
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java

@@ -19,7 +19,6 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 
 import java.io.IOException;
-import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -35,7 +34,6 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.AccessControlList;
@@ -703,8 +701,11 @@ public class LeafQueue implements CSQueue {
     return applicationsMap.get(applicationAttemptId);
   }
 
+  private static final CSAssignment NULL_ASSIGNMENT =
+      new CSAssignment(Resources.createResource(0), NodeType.NODE_LOCAL);
+  
   @Override
-  public synchronized Resource 
+  public synchronized CSAssignment 
   assignContainers(Resource clusterResource, SchedulerNode node) {
 
     if(LOG.isDebugEnabled()) {
@@ -717,8 +718,11 @@ public class LeafQueue implements CSQueue {
     if (reservedContainer != null) {
       SchedulerApp application = 
           getApplication(reservedContainer.getApplicationAttemptId());
-      return assignReservedContainer(application, node, reservedContainer, 
-          clusterResource);
+      return new CSAssignment(
+          assignReservedContainer(application, node, reservedContainer, 
+              clusterResource),
+          NodeType.NODE_LOCAL); // Don't care about locality constraints 
+                                // for reserved containers
     }
     
     // Try to assign containers to applications in order
@@ -746,7 +750,7 @@ public class LeafQueue implements CSQueue {
           // Are we going over limits by allocating to this application?
           // Maximum Capacity of the queue
           if (!assignToQueue(clusterResource, required)) {
-            return Resources.none();
+            return NULL_ASSIGNMENT;
           }
 
           // User limits
@@ -760,24 +764,23 @@ public class LeafQueue implements CSQueue {
           application.addSchedulingOpportunity(priority);
           
           // Try to schedule
-          Resource assigned = 
+          CSAssignment assignment =  
             assignContainersOnNode(clusterResource, node, application, priority, 
                 null);
-  
+          
+          Resource assigned = assignment.getResource();
+            
           // Did we schedule or reserve a container?
           if (Resources.greaterThan(assigned, Resources.none())) {
-            Resource assignedResource = 
-              application.getResourceRequest(priority, RMNode.ANY).getCapability();
 
             // Book-keeping
-            allocateResource(clusterResource, 
-                application, assignedResource);
+            allocateResource(clusterResource, application, assigned);
             
             // Reset scheduling opportunities
             application.resetSchedulingOpportunities(priority);
             
             // Done
-            return assignedResource; 
+            return assignment;
           } else {
             // Do not assign out of order w.r.t priorities
             break;
@@ -792,7 +795,7 @@ public class LeafQueue implements CSQueue {
       application.showRequests();
     }
   
-    return Resources.none();
+    return NULL_ASSIGNMENT;
 
   }
 
@@ -809,11 +812,12 @@ public class LeafQueue implements CSQueue {
               container.getId(), 
               SchedulerUtils.UNRESERVED_CONTAINER), 
           RMContainerEventType.RELEASED);
-      return container.getResource();
+      return container.getResource(); // Ugh, return resource to force re-sort
     }
 
     // Try to assign if we have sufficient resources
-    assignContainersOnNode(clusterResource, node, application, priority, rmContainer);
+    assignContainersOnNode(clusterResource, node, application, priority, 
+        rmContainer);
     
     // Doesn't matter... since it's already charged for at time of reservation
     // "re-reservation" is *free*
@@ -966,7 +970,7 @@ public class LeafQueue implements CSQueue {
     return (((starvation + requiredContainers) - reservedContainers) > 0);
   }
 
-  private Resource assignContainersOnNode(Resource clusterResource, 
+  private CSAssignment assignContainersOnNode(Resource clusterResource, 
       SchedulerNode node, SchedulerApp application, 
       Priority priority, RMContainer reservedContainer) {
 
@@ -977,7 +981,7 @@ public class LeafQueue implements CSQueue {
         assignNodeLocalContainers(clusterResource, node, application, priority,
             reservedContainer); 
     if (Resources.greaterThan(assigned, Resources.none())) {
-      return assigned;
+      return new CSAssignment(assigned, NodeType.NODE_LOCAL);
     }
 
     // Rack-local
@@ -985,12 +989,14 @@ public class LeafQueue implements CSQueue {
         assignRackLocalContainers(clusterResource, node, application, priority, 
             reservedContainer);
     if (Resources.greaterThan(assigned, Resources.none())) {
-      return assigned;
+      return new CSAssignment(assigned, NodeType.RACK_LOCAL);
     }
     
     // Off-switch
-    return assignOffSwitchContainers(clusterResource, node, application, 
-        priority, reservedContainer);
+    return new CSAssignment(
+        assignOffSwitchContainers(clusterResource, node, application, 
+            priority, reservedContainer), 
+        NodeType.OFF_SWITCH);
   }
 
   private Resource assignNodeLocalContainers(Resource clusterResource, 
@@ -1272,7 +1278,7 @@ public class LeafQueue implements CSQueue {
     metrics.setAvailableResourcesToUser(userName, application.getHeadroom());
     LOG.info(getQueueName() + 
         " used=" + usedResources + " numContainers=" + numContainers + 
-        " user=" + userName + " resources=" + user.getConsumedResources());
+        " user=" + userName + " user-resources=" + user.getConsumedResources());
   }
 
   synchronized void releaseResource(Resource clusterResource, 
@@ -1290,7 +1296,7 @@ public class LeafQueue implements CSQueue {
       
     LOG.info(getQueueName() + 
         " used=" + usedResources + " numContainers=" + numContainers + 
-        " user=" + userName + " resources=" + user.getConsumedResources());
+        " user=" + userName + " user-resources=" + user.getConsumedResources());
   }
 
   @Override

+ 36 - 18
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java

@@ -48,6 +48,7 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
@@ -500,10 +501,12 @@ public class ParentQueue implements CSQueue {
   }
 
   @Override
-  public synchronized Resource assignContainers(
+  public synchronized CSAssignment assignContainers(
       Resource clusterResource, SchedulerNode node) {
-    Resource assigned = Resources.createResource(0);
-
+    CSAssignment assignment = 
+        new CSAssignment(Resources.createResource(0), NodeType.NODE_LOCAL);
+    boolean assignedOffSwitch = false;
+    
     while (canAssign(node)) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Trying to assign containers to child-queue of "
@@ -516,16 +519,18 @@ public class ParentQueue implements CSQueue {
       }
       
       // Schedule
-      Resource assignedToChild = 
+      CSAssignment assignedToChild = 
           assignContainersToChildQueues(clusterResource, node);
+      assignedOffSwitch = (assignedToChild.getType() == NodeType.OFF_SWITCH);
       
       // Done if no child-queue assigned anything
-      if (Resources.greaterThan(assignedToChild, Resources.none())) {
+      if (Resources.greaterThan(assignedToChild.getResource(), 
+              Resources.none())) {
         // Track resource utilization for the parent-queue
-        allocateResource(clusterResource, assignedToChild);
+        allocateResource(clusterResource, assignedToChild.getResource());
         
         // Track resource utilization in this pass of the scheduler
-        Resources.addTo(assigned, assignedToChild);
+        Resources.addTo(assignment.getResource(), assignedToChild.getResource());
         
         LOG.info("assignedContainer" +
             " queue=" + getQueueName() + 
@@ -539,17 +544,26 @@ public class ParentQueue implements CSQueue {
 
       if (LOG.isDebugEnabled()) {
         LOG.debug("ParentQ=" + getQueueName()
-          + " assignedSoFarInThisIteration=" + assigned
+          + " assignedSoFarInThisIteration=" + assignment.getResource()
           + " utilization=" + getUtilization());
       }
 
       // Do not assign more than one container if this isn't the root queue
-      if (!rootQueue) {
+      // or if we've already assigned an off-switch container
+      if (rootQueue) {
+        if (assignedOffSwitch) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Not assigning more than one off-switch container," +
+            		" assignments so far: " + assignment);
+          }
+          break;
+        }
+      } else {
         break;
       }
     } 
     
-    return assigned;
+    return assignment;
   }
 
   private synchronized boolean assignToQueue(Resource clusterResource) {
@@ -573,9 +587,10 @@ public class ParentQueue implements CSQueue {
                                      minimumAllocation);
   }
   
-  synchronized Resource assignContainersToChildQueues(Resource cluster, 
+  synchronized CSAssignment assignContainersToChildQueues(Resource cluster, 
       SchedulerNode node) {
-    Resource assigned = Resources.createResource(0);
+    CSAssignment assignment = 
+        new CSAssignment(Resources.createResource(0), NodeType.NODE_LOCAL);
     
     printChildQueues();
 
@@ -586,25 +601,28 @@ public class ParentQueue implements CSQueue {
         LOG.debug("Trying to assign to queue: " + childQueue.getQueuePath()
           + " stats: " + childQueue);
       }
-      assigned = childQueue.assignContainers(cluster, node);
+      assignment = childQueue.assignContainers(cluster, node);
       if(LOG.isDebugEnabled()) {
-        LOG.debug("Assignedto queue: " + childQueue.getQueuePath()
-          + " stats: " + childQueue + " --> " + assigned.getMemory());
+        LOG.debug("Assigned to queue: " + childQueue.getQueuePath() +
+          " stats: " + childQueue + " --> " + 
+          assignment.getResource().getMemory() + ", " + assignment.getType());
       }
 
       // If we do assign, remove the queue and re-insert in-order to re-sort
-      if (Resources.greaterThan(assigned, Resources.none())) {
+      if (Resources.greaterThan(assignment.getResource(), Resources.none())) {
         // Remove and re-insert to sort
         iter.remove();
         LOG.info("Re-sorting queues since queue: " + childQueue.getQueuePath() + 
             " stats: " + childQueue);
         childQueues.add(childQueue);
-        printChildQueues();
+        if (LOG.isDebugEnabled()) {
+          printChildQueues();
+        }
         break;
       }
     }
     
-    return assigned;
+    return assignment;
   }
 
   String getChildQueuesToPrint() {

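Two points in the ParentQueue changes above are easy to miss. First,
Resources.addTo mutates its first argument, so accumulating into
assignment.getResource() updates the CSAssignment in place across loop
iterations. Second, the root queue's loop now stops after the first
off-switch allocation in a pass instead of filling the node, which keeps a
single node heartbeat from being consumed entirely by non-local containers.
A condensed restatement of the loop's exit policy, with identifiers as in
the diff and logging elided (the no-assignment break is inferred from the
"Done if no child-queue assigned anything" comment):

    // Condensed restatement of the exit policy in assignContainers above.
    while (canAssign(node)) {
      CSAssignment assignedToChild =
          assignContainersToChildQueues(clusterResource, node);
      boolean assignedOffSwitch =
          (assignedToChild.getType() == NodeType.OFF_SWITCH);

      if (Resources.greaterThan(assignedToChild.getResource(),
              Resources.none())) {
        allocateResource(clusterResource, assignedToChild.getResource());
        Resources.addTo(assignment.getResource(),
            assignedToChild.getResource());
      } else {
        break;  // no child queue assigned anything
      }

      if (rootQueue) {
        if (assignedOffSwitch) {
          break;  // at most one off-switch container per pass
        }
      } else {
        break;  // non-root queues hand out at most one container per pass
      }
    }
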
+ 15 - 7
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java

@@ -811,49 +811,56 @@ public class TestLeafQueue {
     app_0.updateResourceRequests(app_0_requests_0);
 
     // Start testing...
+    CSAssignment assignment = null;
     
     // Start with off switch, shouldn't allocate due to delay scheduling
-    a.assignContainers(clusterResource, node_2);
+    assignment = a.assignContainers(clusterResource, node_2);
     verify(app_0, never()).allocate(any(NodeType.class), eq(node_2), 
         any(Priority.class), any(ResourceRequest.class), any(Container.class));
     assertEquals(1, app_0.getSchedulingOpportunities(priority));
     assertEquals(3, app_0.getTotalRequiredResources(priority));
+    assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // None->NODE_LOCAL
 
     // Another off switch, shouldn't allocate due to delay scheduling
-    a.assignContainers(clusterResource, node_2);
+    assignment = a.assignContainers(clusterResource, node_2);
     verify(app_0, never()).allocate(any(NodeType.class), eq(node_2), 
         any(Priority.class), any(ResourceRequest.class), any(Container.class));
     assertEquals(2, app_0.getSchedulingOpportunities(priority));
     assertEquals(3, app_0.getTotalRequiredResources(priority));
+    assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // None->NODE_LOCAL
     
     // Another off switch, shouldn't allocate due to delay scheduling
-    a.assignContainers(clusterResource, node_2);
+    assignment = a.assignContainers(clusterResource, node_2);
     verify(app_0, never()).allocate(any(NodeType.class), eq(node_2), 
         any(Priority.class), any(ResourceRequest.class), any(Container.class));
     assertEquals(3, app_0.getSchedulingOpportunities(priority));
     assertEquals(3, app_0.getTotalRequiredResources(priority));
+    assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // None->NODE_LOCAL
     
     // Another off switch, now we should allocate 
     // since missedOpportunities=3 and reqdContainers=3
-    a.assignContainers(clusterResource, node_2);
+    assignment = a.assignContainers(clusterResource, node_2);
     verify(app_0).allocate(eq(NodeType.OFF_SWITCH), eq(node_2), 
         any(Priority.class), any(ResourceRequest.class), any(Container.class));
     assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset
     assertEquals(2, app_0.getTotalRequiredResources(priority));
+    assertEquals(NodeType.OFF_SWITCH, assignment.getType());
     
     // NODE_LOCAL - node_0
-    a.assignContainers(clusterResource, node_0);
+    assignment = a.assignContainers(clusterResource, node_0);
     verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_0), 
         any(Priority.class), any(ResourceRequest.class), any(Container.class));
     assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset
     assertEquals(1, app_0.getTotalRequiredResources(priority));
+    assertEquals(NodeType.NODE_LOCAL, assignment.getType());
     
     // NODE_LOCAL - node_1
-    a.assignContainers(clusterResource, node_1);
+    assignment = a.assignContainers(clusterResource, node_1);
     verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_1), 
         any(Priority.class), any(ResourceRequest.class), any(Container.class));
     assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset
     assertEquals(0, app_0.getTotalRequiredResources(priority));
+    assertEquals(NodeType.NODE_LOCAL, assignment.getType());
     
     // Add 1 more request to check for RACK_LOCAL
     app_0_requests_0.clear();
@@ -872,11 +879,12 @@ public class TestLeafQueue {
     String host_3 = "host_3"; // on rack_1
     SchedulerNode node_3 = TestUtils.getMockNode(host_3, rack_1, 0, 8*GB);
     
-    a.assignContainers(clusterResource, node_3);
+    assignment = a.assignContainers(clusterResource, node_3);
     verify(app_0).allocate(eq(NodeType.RACK_LOCAL), eq(node_3), 
         any(Priority.class), any(ResourceRequest.class), any(Container.class));
     assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset
     assertEquals(0, app_0.getTotalRequiredResources(priority));
+    assertEquals(NodeType.RACK_LOCAL, assignment.getType());
   }
   
   @Test

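The repeated "None->NODE_LOCAL" comments above encode a convention of the
new return type: when delay scheduling declines the request and nothing is
allocated, the CSAssignment carries a zero resource and a default type of
NODE_LOCAL, so the type is only meaningful when the resource is non-empty.
A small assertion helper for that convention (hypothetical name, built only
from calls already visible in the test) could read:

    // Hypothetical helper: asserts a CSAssignment represents "nothing
    // allocated" under the None->NODE_LOCAL convention noted above.
    private static void assertEmptyAssignment(CSAssignment assignment) {
      assertEquals(0, assignment.getResource().getMemory());
      assertEquals(NodeType.NODE_LOCAL, assignment.getType());
    }
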
+ 85 - 5
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java

@@ -29,6 +29,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 
@@ -92,11 +93,18 @@ public class TestParentQueue {
   private void stubQueueAllocation(final CSQueue queue, 
       final Resource clusterResource, final SchedulerNode node, 
       final int allocation) {
+    stubQueueAllocation(queue, clusterResource, node, allocation, 
+        NodeType.NODE_LOCAL);
+  }
+  
+  private void stubQueueAllocation(final CSQueue queue, 
+      final Resource clusterResource, final SchedulerNode node, 
+      final int allocation, final NodeType type) {
     
     // Simulate the queue allocation
-    doAnswer(new Answer<Resource>() {
+    doAnswer(new Answer<CSAssignment>() {
       @Override
-      public Resource answer(InvocationOnMock invocation) throws Throwable {
+      public CSAssignment answer(InvocationOnMock invocation) throws Throwable {
         try {
           throw new Exception();
         } catch (Exception e) {
@@ -115,8 +123,8 @@ public class TestParentQueue {
         
         // Next call - nothing
         if (allocation > 0) {
-          doReturn(Resources.none()).when(queue).assignContainers(
-              eq(clusterResource), eq(node));
+          doReturn(new CSAssignment(Resources.none(), type)).
+            when(queue).assignContainers(eq(clusterResource), eq(node));
 
           // Mock the node's resource availability
           Resource available = node.getAvailableResource();
@@ -124,7 +132,7 @@ public class TestParentQueue {
           when(node).getAvailableResource();
         }
 
-        return allocatedResource;
+        return new CSAssignment(allocatedResource, type);
       }
     }).
     when(queue).assignContainers(eq(clusterResource), eq(node));
@@ -401,6 +409,78 @@ public class TestParentQueue {
     
   }
   
+  @Test
+  public void testOffSwitchScheduling() throws Exception {
+    // Setup queue configs
+    setupSingleLevelQueues(csConf);
+    
+    Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
+    CSQueue root = 
+        CapacityScheduler.parseQueue(csContext, csConf, null, 
+            CapacitySchedulerConfiguration.ROOT, queues, queues, 
+            CapacityScheduler.queueComparator, 
+            CapacityScheduler.applicationComparator,
+            TestUtils.spyHook);
+
+    // Setup some nodes
+    final int memoryPerNode = 10;
+    final int numNodes = 2;
+    
+    SchedulerNode node_0 = 
+        TestUtils.getMockNode("host_0", DEFAULT_RACK, 0, memoryPerNode*GB);
+    SchedulerNode node_1 = 
+        TestUtils.getMockNode("host_1", DEFAULT_RACK, 0, memoryPerNode*GB);
+    
+    final Resource clusterResource = 
+        Resources.createResource(numNodes * (memoryPerNode*GB));
+    when(csContext.getNumClusterNodes()).thenReturn(numNodes);
+
+    // Start testing
+    LeafQueue a = (LeafQueue)queues.get(A);
+    LeafQueue b = (LeafQueue)queues.get(B);
+    final float delta = 0.0001f;
+    
+    // Simulate B returning a container on node_0
+    stubQueueAllocation(a, clusterResource, node_0, 0*GB, NodeType.OFF_SWITCH);
+    stubQueueAllocation(b, clusterResource, node_0, 1*GB, NodeType.OFF_SWITCH);
+    root.assignContainers(clusterResource, node_0);
+    assertEquals(0.0f, a.getUtilization(), delta);
+    assertEquals(computeQueueUtilization(b, 1*GB, clusterResource), 
+        b.getUtilization(), delta);
+    
+    // Now, A should get the scheduling opportunity since A=0G/6G, B=1G/14G
+    // also, B gets a scheduling opportunity since A allocates RACK_LOCAL
+    stubQueueAllocation(a, clusterResource, node_1, 2*GB, NodeType.RACK_LOCAL);
+    stubQueueAllocation(b, clusterResource, node_1, 1*GB, NodeType.OFF_SWITCH);
+    root.assignContainers(clusterResource, node_1);
+    InOrder allocationOrder = inOrder(a, b);
+    allocationOrder.verify(a).assignContainers(eq(clusterResource), 
+        any(SchedulerNode.class));
+    allocationOrder.verify(b).assignContainers(eq(clusterResource), 
+        any(SchedulerNode.class));
+    assertEquals(computeQueueUtilization(a, 2*GB, clusterResource), 
+        a.getUtilization(), delta);
+    assertEquals(computeQueueUtilization(b, 2*GB, clusterResource), 
+        b.getUtilization(), delta);
+    
+    // Now, B should get the scheduling opportunity 
+    // since A has 2/6G while B has 2/14G, 
+    // However, since B returns off-switch, A won't get an opportunity
+    stubQueueAllocation(a, clusterResource, node_0, 1*GB, NodeType.NODE_LOCAL);
+    stubQueueAllocation(b, clusterResource, node_0, 2*GB, NodeType.OFF_SWITCH);
+    root.assignContainers(clusterResource, node_0);
+    allocationOrder = inOrder(b, a);
+    allocationOrder.verify(b).assignContainers(eq(clusterResource), 
+        any(SchedulerNode.class));
+    allocationOrder.verify(a).assignContainers(eq(clusterResource), 
+        any(SchedulerNode.class));
+    assertEquals(computeQueueUtilization(a, 2*GB, clusterResource), 
+        a.getUtilization(), delta);
+    assertEquals(computeQueueUtilization(b, 4*GB, clusterResource), 
+        b.getUtilization(), delta);
+
+  }
+  
   @After
   public void tearDown() throws Exception {
   }

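stubQueueAllocation above is a self-disarming Mockito stub: the Answer
returns the requested allocation once and, when the allocation is positive,
immediately re-stubs the mock so later calls return an empty CSAssignment.
That is what lets each stubbed queue "give out" at most one container per
scheduling pass. Trimmed to its essentials (a fragment of the method body;
queue, allocation, type, clusterResource and node come from the enclosing
parameters, and the node-resource bookkeeping is elided):

    // First matching call yields the allocation; later calls yield none.
    doAnswer(new Answer<CSAssignment>() {
      @Override
      public CSAssignment answer(InvocationOnMock invocation) throws Throwable {
        if (allocation > 0) {
          // Re-arm the mock so the next call returns an empty assignment.
          doReturn(new CSAssignment(Resources.none(), type))
              .when(queue).assignContainers(eq(clusterResource), eq(node));
        }
        return new CSAssignment(
            Resources.createResource(allocation * GB), type);
      }
    }).when(queue).assignContainers(eq(clusterResource), eq(node));
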
+ 2 - 2
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/Federation.apt.vm

@@ -169,7 +169,7 @@ HDFS Federation
   </property>
   <property>
     <name>dfs.namenode.rpc-address.ns1</name>
-    <value>hdfs://nn-host1:rpc-port</value>
+    <value>nn-host1:rpc-port</value>
   </property>
   <property>
     <name>dfs.namenode.http-address.ns1</name>
@@ -181,7 +181,7 @@ HDFS Federation
   </property>
   <property>
     <name>dfs.namenode.rpc-address.ns2</name>
-    <value>hdfs://nn-host2:rpc-port</value>
+    <value>nn-host2:rpc-port</value>
   </property>
   <property>
     <name>dfs.namenode.http-address.ns2</name>
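
The Federation doc fix above reflects how HDFS parses these keys:
dfs.namenode.rpc-address.* takes a bare host:port pair, while URI schemes
such as hdfs:// belong only in client-facing settings like fs.defaultFS.
With a hypothetical port filled in purely for illustration (the document's
rpc-port placeholder stands for whatever port is configured), a corrected
entry looks like:

    <!-- Illustrative values; host name and port are placeholders. -->
    <property>
      <name>dfs.namenode.rpc-address.ns1</name>
      <value>nn-host1:8020</value>  <!-- plain host:port, no hdfs:// scheme -->
    </property>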