
Merge r1325052 through r1325569 from trunk.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-3092@1325570 13f79535-47bb-0310-9956-ffa450edef68
Tsz-wo Sze, 13 years ago
commit f4da7cb2f5
44 changed files with 803 additions and 137 deletions
  1. + 10 - 0    hadoop-common-project/hadoop-common/CHANGES.txt
  2. + 0 - 2     hadoop-common-project/hadoop-common/src/main/bin/hadoop-daemon.sh
  3. + 6 - 9     hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
  4. + 9 - 4     hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetUtils.java
  5. + 2 - 2     hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java
  6. + 16 - 0    hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
  7. + 14 - 0    hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
  8. + 4 - 0     hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
  9. + 3 - 2     hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
  10. + 6 - 4    hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
  11. + 4 - 3    hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
  12. + 44 - 0   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
  13. + 15 - 10  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/IncorrectVersionException.java
  14. + 16 - 9   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
  15. + 11 - 0   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
  16. + 1 - 0    hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
  17. + 1 - 1    hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java
  18. + 7 - 6    hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
  19. + 1 - 1    hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java
  20. + 63 - 6   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
  21. + 45 - 6   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
  22. + 11 - 1   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeRegistration.java
  23. + 10 - 2   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamespaceInfo.java
  24. + 101 - 0  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/VersionUtil.java
  25. + 1 - 0    hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
  26. + 3 - 2    hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto
  27. + 118 - 4  hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeRegistration.java
  28. + 2 - 1    hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java
  29. + 27 - 0   hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
  30. + 2 - 2    hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlocksWithNotEnoughRacks.java
  31. + 79 - 22  hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeRegister.java
  32. + 2 - 0    hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
  33. + 11 - 4   hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDelegationTokensWithHA.java
  34. + 54 - 17  hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestInitializeSharedEdits.java
  35. + 62 - 0   hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestVersionUtil.java
  36. + 9 - 0    hadoop-mapreduce-project/CHANGES.txt
  37. + 7 - 3    hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/TaskPage.java
  38. + 4 - 3    hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/PartialJob.java
  39. + 1 - 3    hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/GetDelegationTokenRequest.java
  40. + 0 - 2    hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodes.java
  41. + 6 - 3    hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/emulators/resourceusage/CumulativeCpuUsageEmulatorPlugin.java
  42. + 5 - 2    hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/emulators/resourceusage/TotalHeapUsageEmulatorPlugin.java
  43. + 5 - 0    hadoop-mapreduce-project/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixMemoryEmulation.java
  44. + 5 - 1    hadoop-mapreduce-project/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestResourceUsageEmulators.java

+ 10 - 0
hadoop-common-project/hadoop-common/CHANGES.txt

@@ -346,6 +346,13 @@ Release 2.0.0 - UNRELEASED
     HADOOP-8264. Remove irritating double double quotes in front of hostname
     (Bernd Fondermann via bobby)
 
+    HADOOP-8270. hadoop-daemon.sh stop action should return 0 for an
+    already stopped service. (Roman Shaposhnik via eli)
+
+    HADOOP-8144. pseudoSortByDistance in NetworkTopology doesn't work
+    properly if no local node and first node is local rack node.
+    (Junping Du)
+
   BREAKDOWN OF HADOOP-7454 SUBTASKS
 
     HADOOP-7455. HA: Introduce HA Service Protocol Interface. (suresh)
@@ -425,6 +432,9 @@ Release 0.23.3 - UNRELEASED
     HADOOP-8014. ViewFileSystem does not correctly implement getDefaultBlockSize,
     getDefaultReplication, getContentSummary (John George via bobby)
 
+    HADOOP-7510. Tokens should use original hostname provided instead of ip
+    (Daryn Sharp via bobby)
+
 Release 0.23.2 - UNRELEASED 
 
   INCOMPATIBLE CHANGES

+ 0 - 2
hadoop-common-project/hadoop-common/src/main/bin/hadoop-daemon.sh

@@ -167,11 +167,9 @@ case $startStop in
         kill `cat $pid`
       else
         echo no $command to stop
-        exit 1
       fi
     else
       echo no $command to stop
-      exit 1
     fi
     ;;
 

+ 6 - 9
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java

@@ -53,7 +53,6 @@ import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.ipc.RpcPayloadHeader.*;
 import org.apache.hadoop.ipc.protobuf.IpcConnectionContextProtos.IpcConnectionContextProto;
 import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.io.DataOutputBuffer;
@@ -243,8 +242,8 @@ public class Client {
       this.remoteId = remoteId;
       this.server = remoteId.getAddress();
       if (server.isUnresolved()) {
-        throw NetUtils.wrapException(remoteId.getAddress().getHostName(),
-            remoteId.getAddress().getPort(),
+        throw NetUtils.wrapException(server.getHostName(),
+            server.getPort(),
             null,
             0,
             new UnknownHostException());
@@ -274,9 +273,8 @@ public class Client {
           } catch (IllegalAccessException e) {
             throw new IOException(e.toString());
           }
-          InetSocketAddress addr = remoteId.getAddress();
-          token = tokenSelector.selectToken(new Text(addr.getAddress()
-              .getHostAddress() + ":" + addr.getPort()), 
+          token = tokenSelector.selectToken(
+              SecurityUtil.buildTokenService(server),
               ticket.getTokens());
         }
         KerberosInfo krbInfo = SecurityUtil.getKerberosInfo(protocol, conf);
@@ -305,7 +303,7 @@ public class Client {
             + protocol.getSimpleName());
       
       this.setName("IPC Client (" + socketFactory.hashCode() +") connection to " +
-          remoteId.getAddress().toString() +
+          server.toString() +
           " from " + ((ticket==null)?"an unknown user":ticket.getUserName()));
       this.setDaemon(true);
     }
@@ -751,7 +749,6 @@ public class Client {
       }
     }
 
-    @SuppressWarnings("unused")
     public InetSocketAddress getRemoteAddress() {
       return server;
     }
@@ -1159,7 +1156,7 @@ public class Client {
           call.error.fillInStackTrace();
           throw call.error;
         } else { // local exception
-          InetSocketAddress address = remoteId.getAddress();
+          InetSocketAddress address = connection.getRemoteAddress();
           throw NetUtils.wrapException(address.getHostName(),
                   address.getPort(),
                   NetUtils.getHostname(),

+ 9 - 4
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetUtils.java

@@ -344,8 +344,8 @@ public class NetUtils {
   /**
    * Returns InetSocketAddress that a client can use to 
    * connect to the server. Server.getListenerAddress() is not correct when
-   * the server binds to "0.0.0.0". This returns "127.0.0.1:port" when
-   * the getListenerAddress() returns "0.0.0.0:port".
+   * the server binds to "0.0.0.0". This returns "hostname:port" of the server,
+   * or "127.0.0.1:port" when the getListenerAddress() returns "0.0.0.0:port".
    * 
    * @param server
    * @return socket address that a client can use to connect to the server.
@@ -353,7 +353,12 @@ public class NetUtils {
   public static InetSocketAddress getConnectAddress(Server server) {
     InetSocketAddress addr = server.getListenerAddress();
     if (addr.getAddress().isAnyLocalAddress()) {
-      addr = createSocketAddrForHost("127.0.0.1", addr.getPort());
+      try {
+        addr = new InetSocketAddress(InetAddress.getLocalHost(), addr.getPort());
+      } catch (UnknownHostException uhe) {
+        // shouldn't get here unless the host doesn't have a loopback iface
+        addr = createSocketAddrForHost("127.0.0.1", addr.getPort());
+      }
     }
     return addr;
   }
@@ -655,7 +660,7 @@ public class NetUtils {
     }
     InetAddress addr = null;
     try {
-      addr = InetAddress.getByName(host);
+      addr = SecurityUtil.getByName(host);
       if (NetworkInterface.getByInetAddress(addr) == null) {
         addr = null; // Not a local address
       }

+ 2 - 2
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java

@@ -662,8 +662,8 @@ public class NetworkTopology {
    */
   public void pseudoSortByDistance( Node reader, Node[] nodes ) {
     int tempIndex = 0;
+    int localRackNode = -1;
     if (reader != null ) {
-      int localRackNode = -1;
       //scan the array to find the local node & local rack node
       for(int i=0; i<nodes.length; i++) {
         if(tempIndex == 0 && reader == nodes[i]) { //local node
@@ -693,7 +693,7 @@ public class NetworkTopology {
     }
     
     // put a random node at position 0 if it is not a local/local-rack node
-    if(tempIndex == 0 && nodes.length != 0) {
+    if(tempIndex == 0 && localRackNode == -1 && nodes.length != 0) {
       swap(nodes, 0, r.nextInt(nodes.length));
     }
   }

+ 16 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java

@@ -21,6 +21,7 @@ package org.apache.hadoop.ipc;
 import java.io.Closeable;
 import java.io.IOException;
 import java.net.ConnectException;
+import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.lang.management.ManagementFactory;
 import java.lang.management.ThreadInfo;
@@ -41,6 +42,8 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.retry.RetryPolicies;
 import org.apache.hadoop.io.retry.RetryProxy;
 import org.apache.hadoop.ipc.Client.ConnectionId;
+import org.apache.hadoop.ipc.TestSaslRPC.TestSaslImpl;
+import org.apache.hadoop.ipc.TestSaslRPC.TestSaslProtocol;
 import org.apache.hadoop.metrics2.MetricsRecordBuilder;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.authorize.AuthorizationException;
@@ -542,6 +545,19 @@ public class TestRPC {
     }
   }
   
+  @Test
+  public void testServerAddress() throws IOException {
+    Server server = RPC.getServer(TestProtocol.class,
+        new TestImpl(), ADDRESS, 0, 5, true, conf, null);
+    InetSocketAddress bindAddr = null;
+    try {
+      bindAddr = NetUtils.getConnectAddress(server);
+    } finally {
+      server.stop();
+    }
+    assertEquals(bindAddr.getAddress(), InetAddress.getLocalHost());
+  }
+  
   @Test
   public void testAuthorization() throws Exception {
     Configuration conf = new Configuration();

+ 14 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -370,6 +370,12 @@ Release 2.0.0 - UNRELEASED
     HDFS-3179.  Improve the exception message thrown by DataStreamer when 
     it failed to add a datanode.  (szetszwo)
 
+    HDFS-2983. Relax the build version check to permit rolling upgrades within
+    a release. (atm)
+
+    HDFS-3259. NameNode#initializeSharedEdits should populate shared edits dir
+    with edit log segments. (atm)
+
   OPTIMIZATIONS
 
     HDFS-3024. Improve performance of stringification in addStoredBlock (todd)
@@ -495,6 +501,14 @@ Release 2.0.0 - UNRELEASED
 
     HDFS-2696. Fix the fuse-dfs build. (Bruno Mahé via eli)
 
+    HDFS-3260. TestDatanodeRegistration should set minimum DN version in
+    addition to minimum NN version. (atm)
+
+    HDFS-3255. HA DFS returns wrong token service (Daryn Sharp via todd)
+
+    HDFS-3256. HDFS considers blocks under-replicated if topology script is
+    configured with only 1 rack. (atm)
+
   BREAKDOWN OF HDFS-1623 SUBTASKS
 
     HDFS-2179. Add fencing framework and mechanisms for NameNode HA. (todd)

+ 4 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java

@@ -146,6 +146,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final int     DFS_NAMENODE_NUM_CHECKPOINTS_RETAINED_DEFAULT = 2;
   public static final String  DFS_NAMENODE_NUM_EXTRA_EDITS_RETAINED_KEY = "dfs.namenode.num.extra.edits.retained";
   public static final int     DFS_NAMENODE_NUM_EXTRA_EDITS_RETAINED_DEFAULT = 1000000; //1M
+  public static final String  DFS_NAMENODE_MIN_SUPPORTED_DATANODE_VERSION_KEY = "dfs.namenode.min.supported.datanode.version";
+  public static final String  DFS_NAMENODE_MIN_SUPPORTED_DATANODE_VERSION_DEFAULT = "3.0.0";
 
   public static final String  DFS_NAMENODE_EDITS_DIR_MINIMUM_KEY = "dfs.namenode.edits.dir.minimum";
   public static final int     DFS_NAMENODE_EDITS_DIR_MINIMUM_DEFAULT = 1;
@@ -265,6 +267,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String  DFS_DATANODE_IPC_ADDRESS_KEY = "dfs.datanode.ipc.address";
   public static final int     DFS_DATANODE_IPC_DEFAULT_PORT = 50020;
   public static final String  DFS_DATANODE_IPC_ADDRESS_DEFAULT = "0.0.0.0" + DFS_DATANODE_IPC_DEFAULT_PORT;
+  public static final String  DFS_DATANODE_MIN_SUPPORTED_NAMENODE_VERSION_KEY = "dfs.datanode.min.supported.namenode.version";
+  public static final String  DFS_DATANODE_MIN_SUPPORTED_NAMENODE_VERSION_DEFAULT = "3.0.0";
 
   public static final String  DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY = "dfs.block.access.token.enable";
   public static final boolean DFS_BLOCK_ACCESS_TOKEN_ENABLE_DEFAULT = false;

+ 3 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java

@@ -848,8 +848,9 @@ public class DistributedFileSystem extends FileSystem {
    */
   @Override
   public String getCanonicalServiceName() {
-    if (HAUtil.isLogicalUri(getConf(), getUri())) {
-      return getUri().getHost();
+    URI uri = getUri();
+    if (HAUtil.isLogicalUri(getConf(), uri)) {
+      return HAUtil.buildTokenServiceForLogicalUri(uri).toString();
     } else {
       return super.getCanonicalServiceName();
     }

+ 6 - 4
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java

@@ -386,7 +386,7 @@ public class PBHelper {
     StorageInfoProto storage = info.getStorageInfo();
     return new NamespaceInfo(storage.getNamespceID(), storage.getClusterID(),
         info.getBlockPoolID(), storage.getCTime(), info.getDistUpgradeVersion(),
-        info.getBuildVersion());
+        info.getBuildVersion(), info.getSoftwareVersion());
   }
 
   public static NamenodeCommand convert(NamenodeCommandProto cmd) {
@@ -612,13 +612,14 @@ public class PBHelper {
         .newBuilder();
     return builder.setDatanodeID(PBHelper.convert((DatanodeID) registration))
         .setStorageInfo(PBHelper.convert(registration.getStorageInfo()))
-        .setKeys(PBHelper.convert(registration.getExportedKeys())).build();
+        .setKeys(PBHelper.convert(registration.getExportedKeys()))
+        .setSoftwareVersion(registration.getSoftwareVersion()).build();
   }
 
   public static DatanodeRegistration convert(DatanodeRegistrationProto proto) {
     return new DatanodeRegistration(PBHelper.convert(proto.getDatanodeID()),
         PBHelper.convert(proto.getStorageInfo()), PBHelper.convert(proto
-            .getKeys()));
+            .getKeys()), proto.getSoftwareVersion());
   }
 
   public static DatanodeCommand convert(DatanodeCommandProto proto) {
@@ -894,7 +895,8 @@ public class PBHelper {
         .setBlockPoolID(info.getBlockPoolID())
         .setBuildVersion(info.getBuildVersion())
         .setDistUpgradeVersion(info.getDistributedUpgradeVersion())
-        .setStorageInfo(PBHelper.convert((StorageInfo)info)).build();
+        .setStorageInfo(PBHelper.convert((StorageInfo)info))
+        .setSoftwareVersion(info.getSoftwareVersion()).build();
   }
   
   // Located Block Arrays and Lists

+ 4 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java

@@ -247,8 +247,7 @@ public class BlockManager {
 
     this.maxReplicationStreams = conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY,
                                              DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_DEFAULT);
-    this.shouldCheckForEnoughRacks = conf.get(DFSConfigKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY) == null ? false
-                                                                             : true;
+    this.shouldCheckForEnoughRacks = conf.get(DFSConfigKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY) != null;
     
     this.replicationRecheckInterval = 
       conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 
@@ -2829,7 +2828,9 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
       DatanodeDescriptor cur = it.next();
       if (!cur.isDecommissionInProgress() && !cur.isDecommissioned()) {
         if ((corruptNodes == null ) || !corruptNodes.contains(cur)) {
-          if (numExpectedReplicas == 1) {
+          if (numExpectedReplicas == 1 ||
+              (numExpectedReplicas > 1 &&
+                  !datanodeManager.hasClusterEverBeenMultiRack())) {
             enoughRacks = true;
             break;
           }

+ 44 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java

@@ -71,6 +71,7 @@ import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.HostsFileReader;
 import org.apache.hadoop.util.ReflectionUtils;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.net.InetAddresses;
 
 /**
@@ -126,6 +127,12 @@ public class DatanodeManager {
   /** Ask Datanode only up to this many blocks to delete. */
   final int blockInvalidateLimit;
   
+  /**
+   * Whether or not this cluster has ever consisted of more than 1 rack,
+   * according to the NetworkTopology.
+   */
+  private boolean hasClusterEverBeenMultiRack = false;
+  
   DatanodeManager(final BlockManager blockManager,
       final Namesystem namesystem, final Configuration conf
       ) throws IOException {
@@ -331,6 +338,7 @@ public class DatanodeManager {
 
     host2DatanodeMap.add(node);
     networktopology.add(node);
+    checkIfClusterIsNowMultiRack(node);
 
     if (LOG.isDebugEnabled()) {
       LOG.debug(getClass().getSimpleName() + ".addDatanode: "
@@ -768,6 +776,42 @@ public class DatanodeManager {
     }
   }
 
+  /**
+   * @return true if this cluster has ever consisted of multiple racks, even if
+   *         it is not now a multi-rack cluster.
+   */
+  boolean hasClusterEverBeenMultiRack() {
+    return hasClusterEverBeenMultiRack;
+  }
+
+  /**
+   * Check if the cluster now consists of multiple racks. If it does, and this
+   * is the first time it's consisted of multiple racks, then process blocks
+   * that may now be misreplicated.
+   * 
+   * @param node DN which caused cluster to become multi-rack. Used for logging.
+   */
+  @VisibleForTesting
+  void checkIfClusterIsNowMultiRack(DatanodeDescriptor node) {
+    if (!hasClusterEverBeenMultiRack && networktopology.getNumOfRacks() > 1) {
+      String message = "DN " + node + " joining cluster has expanded a formerly " +
+          "single-rack cluster to be multi-rack. ";
+      if (namesystem.isPopulatingReplQueues()) {
+        message += "Re-checking all blocks for replication, since they should " +
+            "now be replicated cross-rack";
+        LOG.info(message);
+      } else {
+        message += "Not checking for mis-replicated blocks because this NN is " +
+            "not yet processing repl queues.";
+        LOG.debug(message);
+      }
+      hasClusterEverBeenMultiRack = true;
+      if (namesystem.isPopulatingReplQueues()) {
+        blockManager.processMisReplicatedBlocks();
+      }
+    }
+  }
+
   /**
    * Parse a DatanodeID from a hosts file entry
    * @param hostLine of form [hostname|ip][:port]?

+ 15 - 10
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/IncorrectVersionException.java

@@ -32,7 +32,19 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 @InterfaceStability.Evolving
 public class IncorrectVersionException extends IOException {
   private static final long serialVersionUID = 1L;
+  
+  public IncorrectVersionException(String message) {
+    super(message);
+  }
 
+  public IncorrectVersionException(String minimumVersion, String reportedVersion,
+      String remoteDaemon, String thisDaemon) {
+    this("The reported " + remoteDaemon + " version is too low to communicate" +
+        " with this " + thisDaemon + ". " + remoteDaemon + " version: '" +
+        reportedVersion + "' Minimum " + remoteDaemon + " version: '" +
+        minimumVersion + "'");
+  }
+  
   public IncorrectVersionException(int versionReported, String ofWhat) {
     this(versionReported, ofWhat, HdfsConstants.LAYOUT_VERSION);
   }
@@ -40,16 +52,9 @@ public class IncorrectVersionException extends IOException {
   public IncorrectVersionException(int versionReported,
                                    String ofWhat,
                                    int versionExpected) {
-    super("Unexpected version " 
-          + (ofWhat==null ? "" : "of " + ofWhat) + ". Reported: "
-          + versionReported + ". Expecting = " + versionExpected + ".");
+    this("Unexpected version " 
+        + (ofWhat==null ? "" : "of " + ofWhat) + ". Reported: "
+        + versionReported + ". Expecting = " + versionExpected + ".");
   }
 
-  public IncorrectVersionException(String versionReported,
-                                   String ofWhat,
-                                   String versionExpected) {
-    super("Unexpected version " 
-          + (ofWhat==null ? "" : "of " + ofWhat) + ". Reported: "
-          + versionReported + ". Expecting = " + versionExpected + ".");
-  }
 }

+ 16 - 9
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java

@@ -37,7 +37,6 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
 import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
-import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -49,9 +48,11 @@ import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
 import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
 import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
+import org.apache.hadoop.hdfs.util.VersionUtil;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.VersionInfo;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Maps;
@@ -178,17 +179,23 @@ class BPServiceActor implements Runnable {
   private void checkNNVersion(NamespaceInfo nsInfo)
       throws IncorrectVersionException {
     // build and layout versions should match
-    String nsBuildVer = nsInfo.getBuildVersion();
-    String stBuildVer = Storage.getBuildVersion();
-    if (!nsBuildVer.equals(stBuildVer)) {
-      LOG.warn("Data-node and name-node Build versions must be the same. " +
-        "Namenode build version: " + nsBuildVer + "Datanode " +
-        "build version: " + stBuildVer);
-      throw new IncorrectVersionException(nsBuildVer, "namenode", stBuildVer);
+    String nnVersion = nsInfo.getSoftwareVersion();
+    String minimumNameNodeVersion = dnConf.getMinimumNameNodeVersion();
+    if (VersionUtil.compareVersions(nnVersion, minimumNameNodeVersion) < 0) {
+      IncorrectVersionException ive = new IncorrectVersionException(
+          minimumNameNodeVersion, nnVersion, "NameNode", "DataNode");
+      LOG.warn(ive.getMessage());
+      throw ive;
+    }
+    String dnVersion = VersionInfo.getVersion();
+    if (!nnVersion.equals(dnVersion)) {
+      LOG.info("Reported NameNode version '" + nnVersion + "' does not match " +
+          "DataNode version '" + dnVersion + "' but is within acceptable " +
+          "limits. Note: This is normal during a rolling upgrade.");
     }
 
     if (HdfsConstants.LAYOUT_VERSION != nsInfo.getLayoutVersion()) {
-      LOG.warn("Data-node and name-node layout versions must be the same." +
+      LOG.warn("DataNode and NameNode layout versions must be the same." +
         " Expected: "+ HdfsConstants.LAYOUT_VERSION +
         " actual "+ nsInfo.getLayoutVersion());
       throw new IncorrectVersionException(

+ 11 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java

@@ -31,6 +31,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_TRANSFERTO_ALLOW
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_TRANSFERTO_ALLOWED_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MIN_SUPPORTED_NAMENODE_VERSION_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MIN_SUPPORTED_NAMENODE_VERSION_DEFAULT;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -58,6 +60,8 @@ class DNConf {
   final long deleteReportInterval;
   final long initialBlockReportDelay;
   final int writePacketSize;
+  
+  final String minimumNameNodeVersion;
 
   public DNConf(Configuration conf) {
     socketTimeout = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY,
@@ -111,5 +115,12 @@ class DNConf {
     this.syncOnClose = conf.getBoolean(DFS_DATANODE_SYNCONCLOSE_KEY, 
         DFS_DATANODE_SYNCONCLOSE_DEFAULT);
 
+    this.minimumNameNodeVersion = conf.get(DFS_DATANODE_MIN_SUPPORTED_NAMENODE_VERSION_KEY,
+        DFS_DATANODE_MIN_SUPPORTED_NAMENODE_VERSION_DEFAULT);
+  }
+  
+  // We get minimumNameNodeVersion via a method so it can be mocked out in tests.
+  String getMinimumNameNodeVersion() {
+    return this.minimumNameNodeVersion;
   }
 }
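
As the comment above notes, the minimum NameNode version is exposed through a getter purely so tests can substitute an arbitrary value. A hypothetical illustration of that pattern (not taken from the TestDatanodeRegister changes in this commit; the class name and the "1.2.3" value are made up):

package org.apache.hadoop.hdfs.server.datanode;

import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;

import org.junit.Test;

// Same-package test so the package-private DNConf and its getter are visible.
public class DNConfStubExample {
  @Test
  public void stubsMinimumNameNodeVersion() {
    DNConf dnConf = mock(DNConf.class);
    // Pretend the DataNode is configured to require NN version 1.2.3 or later.
    doReturn("1.2.3").when(dnConf).getMinimumNameNodeVersion();
    assertEquals("1.2.3", dnConf.getMinimumNameNodeVersion());
  }
}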

+ 1 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java

@@ -673,6 +673,7 @@ public class DataNode extends Configured
     bpRegistration.setIpcPort(getIpcPort());
     bpRegistration.setHostName(hostName);
     bpRegistration.setStorageID(getStorageId());
+    bpRegistration.setSoftwareVersion(VersionInfo.getVersion());
 
     StorageInfo storageInfo = storage.getBPStorage(nsInfo.getBlockPoolID());
     if (storageInfo == null) {

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java

@@ -242,7 +242,7 @@ public class BackupNode extends NameNode {
      */
     private void verifyJournalRequest(JournalInfo journalInfo)
         throws IOException {
-      verifyVersion(journalInfo.getLayoutVersion());
+      verifyLayoutVersion(journalInfo.getLayoutVersion());
       String errorMsg = null;
       int expectedNamespaceID = namesystem.getNamespaceInfo().getNamespaceID();
       if (journalInfo.getNamespaceId() != expectedNamespaceID) {

+ 7 - 6
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java

@@ -311,10 +311,12 @@ public class FSEditLog  {
       endCurrentLogSegment(true);
     }
     
-    try {
-      journalSet.close();
-    } catch (IOException ioe) {
-      LOG.warn("Error closing journalSet", ioe);
+    if (!journalSet.isEmpty()) {
+      try {
+        journalSet.close();
+      } catch (IOException ioe) {
+        LOG.warn("Error closing journalSet", ioe);
+      }
     }
 
     state = State.CLOSED;
@@ -813,9 +815,8 @@ public class FSEditLog  {
   }
   
   /**
-   * Used only by unit tests.
+   * Get all the journals this edit log is currently operating on.
    */
-  @VisibleForTesting
   synchronized List<JournalAndStream> getJournals() {
     return journalSet.getAllJournalStreams();
   }

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java

@@ -344,7 +344,7 @@ class FileJournalManager implements JournalManager {
     }
   }
 
-  private List<EditLogFile> getLogFiles(long fromTxId) throws IOException {
+  List<EditLogFile> getLogFiles(long fromTxId) throws IOException {
     File currentDir = sd.getCurrentDir();
     List<EditLogFile> allLogFiles = matchEditLogs(currentDir);
     List<EditLogFile> logFiles = Lists.newArrayList();

+ 63 - 6
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java

@@ -18,14 +18,17 @@
 package org.apache.hadoop.hdfs.server.namenode;
 
 import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.net.InetSocketAddress;
 import java.net.URI;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.HadoopIllegalArgumentException;
@@ -41,7 +44,6 @@ import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Trash;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
 
-import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HAUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
@@ -49,6 +51,9 @@ import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
+import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
+import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile;
+import org.apache.hadoop.hdfs.server.namenode.JournalSet.JournalAndStream;
 import org.apache.hadoop.hdfs.server.namenode.ha.ActiveState;
 import org.apache.hadoop.hdfs.server.namenode.ha.BootstrapStandby;
 import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
@@ -61,6 +66,8 @@ import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.hdfs.util.AtomicFileOutputStream;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ipc.StandbyException;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.net.NetUtils;
@@ -749,9 +756,10 @@ public class NameNode {
       boolean force) {
     return initializeSharedEdits(conf, force, false);
   }
-  
+
   /**
-   * Format a new shared edits dir.
+   * Format a new shared edits dir and copy in enough edit log segments so that
+   * the standby NN can start up.
    * 
    * @param conf configuration
    * @param force format regardless of whether or not the shared edits dir exists
@@ -785,8 +793,19 @@ public class NameNode {
           existingStorage.getBlockPoolID(),
           existingStorage.getCTime(),
           existingStorage.getDistributedUpgradeVersion()));
-    } catch (Exception e) {
-      LOG.error("Could not format shared edits dir", e);
+      
+      // Need to make sure the edit log segments are in good shape to initialize
+      // the shared edits dir.
+      fsns.getFSImage().getEditLog().close();
+      fsns.getFSImage().getEditLog().initJournalsForWrite();
+      fsns.getFSImage().getEditLog().recoverUnclosedStreams();
+      
+      if (copyEditLogSegmentsToSharedDir(fsns, sharedEditsDirs,
+          newSharedStorage, conf)) {
+        return true; // aborted
+      }
+    } catch (IOException ioe) {
+      LOG.error("Could not initialize shared edits dir", ioe);
       return true; // aborted
     } finally {
       // Have to unlock storage explicitly for the case when we're running in a
@@ -802,6 +821,44 @@ public class NameNode {
     }
     return false; // did not abort
   }
+  
+  private static boolean copyEditLogSegmentsToSharedDir(FSNamesystem fsns,
+      Collection<URI> sharedEditsDirs, NNStorage newSharedStorage,
+      Configuration conf) throws FileNotFoundException, IOException {
+    // Copy edit log segments into the new shared edits dir.
+    for (JournalAndStream jas : fsns.getFSImage().getEditLog().getJournals()) {
+      FileJournalManager fjm = null;
+      if (!(jas.getManager() instanceof FileJournalManager)) {
+        LOG.error("Cannot populate shared edits dir from non-file " +
+            "journal manager: " + jas.getManager());
+        return true; // aborted
+      } else {
+        fjm = (FileJournalManager) jas.getManager();
+      }
+      for (EditLogFile elf : fjm.getLogFiles(fsns.getFSImage()
+          .getMostRecentCheckpointTxId())) {
+        File editLogSegment = elf.getFile();
+        for (URI sharedEditsUri : sharedEditsDirs) {
+          StorageDirectory sharedEditsDir = newSharedStorage
+              .getStorageDirectory(sharedEditsUri);
+          File targetFile = new File(sharedEditsDir.getCurrentDir(),
+              editLogSegment.getName());
+          if (!targetFile.exists()) {
+            InputStream in = null;
+            OutputStream out = null;
+            try {
+              in = new FileInputStream(editLogSegment);
+              out = new AtomicFileOutputStream(targetFile);
+              IOUtils.copyBytes(in, out, conf);
+            } finally {
+              IOUtils.cleanup(LOG, in, out);
+            }
+          }
+        }
+      }
+    }
+    return false; // did not abort
+  }
 
   private static boolean finalize(Configuration conf,
                                boolean isConfirmationNeeded

+ 45 - 6
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java

@@ -47,6 +47,7 @@ import org.apache.hadoop.ha.ServiceFailedException;
 import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.HAServiceProtocolService;
 import org.apache.hadoop.ha.protocolPB.HAServiceProtocolPB;
 import org.apache.hadoop.ha.protocolPB.HAServiceProtocolServerSideTranslatorPB;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HDFSPolicyProvider;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
@@ -107,6 +108,7 @@ import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
 import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
+import org.apache.hadoop.hdfs.util.VersionUtil;
 import org.apache.hadoop.io.EnumSetWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
@@ -121,6 +123,7 @@ import org.apache.hadoop.security.authorize.AuthorizationException;
 import org.apache.hadoop.security.authorize.ProxyUsers;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.VersionInfo;
 
 import com.google.protobuf.BlockingService;
 
@@ -147,6 +150,8 @@ class NameNodeRpcServer implements NamenodeProtocols {
   /** The RPC server that listens to requests from clients */
   protected final RPC.Server clientRpcServer;
   protected final InetSocketAddress clientRpcAddress;
+  
+  private final String minimumDataNodeVersion;
 
   public NameNodeRpcServer(Configuration conf, NameNode nn)
       throws IOException {
@@ -261,6 +266,10 @@ class NameNodeRpcServer implements NamenodeProtocols {
     // The rpc-server port can be ephemeral... ensure we have the correct info
     this.clientRpcAddress = this.clientRpcServer.getListenerAddress(); 
     nn.setRpcServerAddress(conf, clientRpcAddress);
+    
+    this.minimumDataNodeVersion = conf.get(
+        DFSConfigKeys.DFS_NAMENODE_MIN_SUPPORTED_DATANODE_VERSION_KEY,
+        DFSConfigKeys.DFS_NAMENODE_MIN_SUPPORTED_DATANODE_VERSION_DEFAULT);
   }
   
   /**
@@ -326,7 +335,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
   @Override // NamenodeProtocol
   public NamenodeRegistration register(NamenodeRegistration registration)
   throws IOException {
-    verifyVersion(registration.getVersion());
+    verifyLayoutVersion(registration.getVersion());
     NamenodeRegistration myRegistration = nn.setRegistration();
     namesystem.registerBackupNode(registration, myRegistration);
     return myRegistration;
@@ -829,9 +838,10 @@ class NameNodeRpcServer implements NamenodeProtocols {
 
 
   @Override // DatanodeProtocol
-  public DatanodeRegistration registerDatanode(DatanodeRegistration nodeReg
-      ) throws IOException {
-    verifyVersion(nodeReg.getVersion());
+  public DatanodeRegistration registerDatanode(DatanodeRegistration nodeReg)
+      throws IOException {
+    verifyLayoutVersion(nodeReg.getVersion());
+    verifySoftwareVersion(nodeReg);
     namesystem.registerDatanode(nodeReg);
     return nodeReg;
   }
@@ -916,7 +926,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
    * @throws UnregisteredNodeException if the registration is invalid
    */
   void verifyRequest(NodeRegistration nodeReg) throws IOException {
-    verifyVersion(nodeReg.getVersion());
+    verifyLayoutVersion(nodeReg.getVersion());
     if (!namesystem.getRegistrationID().equals(nodeReg.getRegistrationID())) {
       LOG.warn("Invalid registrationID - expected: "
           + namesystem.getRegistrationID() + " received: "
@@ -989,10 +999,39 @@ class NameNodeRpcServer implements NamenodeProtocols {
    * @param version
    * @throws IOException
    */
-  void verifyVersion(int version) throws IOException {
+  void verifyLayoutVersion(int version) throws IOException {
     if (version != HdfsConstants.LAYOUT_VERSION)
       throw new IncorrectVersionException(version, "data node");
   }
+  
+  private void verifySoftwareVersion(DatanodeRegistration dnReg)
+      throws IncorrectVersionException {
+    String dnVersion = dnReg.getSoftwareVersion();
+    if (VersionUtil.compareVersions(dnVersion, minimumDataNodeVersion) < 0) {
+      IncorrectVersionException ive = new IncorrectVersionException(
+          minimumDataNodeVersion, dnVersion, "DataNode", "NameNode");
+      LOG.warn(ive.getMessage() + " DN: " + dnReg);
+      throw ive;
+    }
+    String nnVersion = VersionInfo.getVersion();
+    if (!dnVersion.equals(nnVersion)) {
+      String messagePrefix = "Reported DataNode version '" + dnVersion +
+          "' of DN " + dnReg + " does not match NameNode version '" +
+          nnVersion + "'";
+      long nnCTime = nn.getFSImage().getStorage().getCTime();
+      long dnCTime = dnReg.getStorageInfo().getCTime();
+      if (nnCTime != dnCTime) {
+        IncorrectVersionException ive = new IncorrectVersionException(
+            messagePrefix + " and CTime of DN ('" + dnCTime +
+            "') does not match CTime of NN ('" + nnCTime + "')");
+        LOG.warn(ive);
+        throw ive;
+      } else {
+        LOG.info(messagePrefix +
+            ". Note: This is normal during a rolling upgrade.");
+      }
+    }
+  }
 
   private static String getClientMachine() {
     String clientMachine = NamenodeWebHdfsMethods.getRemoteAddress();

+ 11 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeRegistration.java

@@ -37,12 +37,14 @@ public class DatanodeRegistration extends DatanodeID
 
   private StorageInfo storageInfo;
   private ExportedBlockKeys exportedKeys;
+  private String softwareVersion;
 
   public DatanodeRegistration(DatanodeID dn, StorageInfo info,
-      ExportedBlockKeys keys) {
+      ExportedBlockKeys keys, String softwareVersion) {
     super(dn);
     this.storageInfo = info;
     this.exportedKeys = keys;
+    this.softwareVersion = softwareVersion;
   }
 
   public DatanodeRegistration(String ipAddr, int xferPort) {
@@ -71,6 +73,14 @@ public class DatanodeRegistration extends DatanodeID
   public ExportedBlockKeys getExportedKeys() {
     return exportedKeys;
   }
+  
+  public void setSoftwareVersion(String softwareVersion) {
+    this.softwareVersion = softwareVersion;
+  }
+  
+  public String getSoftwareVersion() {
+    return softwareVersion;
+  }
 
   @Override // NodeRegistration
   public int getVersion() {

+ 10 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamespaceInfo.java

@@ -26,6 +26,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.common.StorageInfo;
 import org.apache.hadoop.hdfs.server.namenode.NNStorage;
+import org.apache.hadoop.util.VersionInfo;
 
 /**
  * NamespaceInfo is returned by the name-node in reply 
@@ -38,6 +39,7 @@ public class NamespaceInfo extends StorageInfo {
   String  buildVersion;
   int distributedUpgradeVersion;
   String blockPoolID = "";    // id of the block pool
+  String softwareVersion;
 
   public NamespaceInfo() {
     super();
@@ -45,16 +47,18 @@ public class NamespaceInfo extends StorageInfo {
   }
 
   public NamespaceInfo(int nsID, String clusterID, String bpID,
-      long cT, int duVersion, String buildVersion) {
+      long cT, int duVersion, String buildVersion, String softwareVersion) {
     super(HdfsConstants.LAYOUT_VERSION, nsID, clusterID, cT);
     blockPoolID = bpID;
     this.buildVersion = buildVersion;
     this.distributedUpgradeVersion = duVersion;
+    this.softwareVersion = softwareVersion;
   }
 
   public NamespaceInfo(int nsID, String clusterID, String bpID, 
       long cT, int duVersion) {
-    this(nsID, clusterID, bpID, cT, duVersion, Storage.getBuildVersion());
+    this(nsID, clusterID, bpID, cT, duVersion, Storage.getBuildVersion(),
+        VersionInfo.getVersion());
   }
   
   public String getBuildVersion() {
@@ -68,6 +72,10 @@ public class NamespaceInfo extends StorageInfo {
   public String getBlockPoolID() {
     return blockPoolID;
   }
+  
+  public String getSoftwareVersion() {
+    return softwareVersion;
+  }
 
   public String toString(){
     return super.toString() + ";bpid=" + blockPoolID;

+ 101 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/VersionUtil.java

@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.util;
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+@InterfaceAudience.Private
+public abstract class VersionUtil {
+  
+  private static final Pattern COMPONENT_GROUPS = Pattern.compile("(\\d+)|(\\D+)");
+
+  /**
+   * This function splits the two versions on &quot;.&quot; and performs a
+   * naturally-ordered comparison of the resulting components. For example, the
+   * version string "0.3" is considered to precede "0.20", despite the fact that
+   * lexical comparison would consider "0.20" to precede "0.3". This method of
+   * comparison is similar to the method used by package versioning systems like
+   * deb and RPM.
+   * 
+   * Version components are compared numerically whenever possible, however a
+   * version component can contain non-numeric characters. When a non-numeric
+   * group of characters is found in a version component, this group is compared
+   * with the similarly-indexed group in the other version component. If the
+   * other group is numeric, then the numeric group is considered to precede the
+   * non-numeric group. If both groups are non-numeric, then a lexical
+   * comparison is performed.
+   * 
+   * If two versions have a different number of components, then only the lower
+   * number of components are compared. If those components are identical
+   * between the two versions, then the version with fewer components is
+   * considered to precede the version with more components.
+   * 
+   * This function returns a negative integer if version1 precedes version2, a
+   * positive integer if version2 precedes version1, and 0 if and only if the
+   * two versions' components are identical in value and cardinality.
+   * 
+   * @param version1
+   *          the first version to compare
+   * @param version2
+   *          the second version to compare
+   * @return a negative integer if version1 precedes version2, a positive
+   *         integer if version2 precedes version1, and 0 if and only if the two
+   *         versions are equal.
+   */
+  public static int compareVersions(String version1, String version2) {
+    String[] version1Parts = version1.split("\\.");
+    String[] version2Parts = version2.split("\\.");
+    
+    for (int i = 0; i < version1Parts.length && i < version2Parts.length; i++) {
+      String component1 = version1Parts[i];
+      String component2 = version2Parts[i];
+      if (!component1.equals(component2)) {
+        Matcher matcher1 = COMPONENT_GROUPS.matcher(component1);
+        Matcher matcher2 = COMPONENT_GROUPS.matcher(component2);
+        
+        while (matcher1.find() && matcher2.find()) {
+          String group1 = matcher1.group();
+          String group2 = matcher2.group();
+          if (!group1.equals(group2)) {
+            if (isNumeric(group1) && isNumeric(group2)) {
+              return Integer.parseInt(group1) - Integer.parseInt(group2);
+            } else if (!isNumeric(group1) && !isNumeric(group2)) {
+              return group1.compareTo(group2);
+            } else {
+              return isNumeric(group1) ? -1 : 1;
+            }
+          }
+        }
+        return component1.length() - component2.length();
+      }
+    }
+    return version1Parts.length - version2Parts.length;
+  }
+  
+  private static boolean isNumeric(String s) {
+    try {
+      Integer.parseInt(s);
+      return true;
+    } catch (NumberFormatException nfe) {
+      return false;
+    }
+  }
+}
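
A small illustrative sketch (not part of this commit; the demo class name is made up) that exercises the natural ordering described in the javadoc above. Run with java -ea so the assertions are enabled:

import org.apache.hadoop.hdfs.util.VersionUtil;

public class CompareVersionsDemo {
  public static void main(String[] args) {
    // "0.3" precedes "0.20": components are compared numerically, not lexically.
    assert VersionUtil.compareVersions("0.3", "0.20") < 0;
    assert VersionUtil.compareVersions("0.20", "0.3") > 0;
    // Identical versions compare as equal.
    assert VersionUtil.compareVersions("2.0.0", "2.0.0") == 0;
    // A numeric group precedes a non-numeric group: "2.0.0" before "2.0.a".
    assert VersionUtil.compareVersions("2.0.0", "2.0.a") < 0;
    // With an identical shared prefix, the version with fewer components precedes.
    assert VersionUtil.compareVersions("2.0", "2.0.0") < 0;
  }
}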

+ 1 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto

@@ -33,6 +33,7 @@ message DatanodeRegistrationProto {
   required DatanodeIDProto datanodeID = 1;    // Datanode information
   required StorageInfoProto storageInfo = 2;  // Node information
   required ExportedBlockKeysProto keys = 3;   // Block keys
+  required string softwareVersion = 4;        // Software version of the DN, e.g. "2.0.0"
 }
 
 /**

+ 3 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto

@@ -303,10 +303,11 @@ message RemoteEditLogManifestProto {
  * Namespace information that describes namespace on a namenode
  */
 message NamespaceInfoProto {
-  required string buildVersion = 1;         // Software build version
+  required string buildVersion = 1;         // Software revision version (e.g. an svn or git revision)
   required uint32 distUpgradeVersion = 2;   // Distributed upgrade version
   required string blockPoolID = 3;          // block pool used by the namespace
-  required StorageInfoProto storageInfo = 4;// Noe information
+  required StorageInfoProto storageInfo = 4;// Node information
+  required string softwareVersion = 5;      // Software version number (e.g. 2.0.0)
 }
 
 /**

+ 118 - 4
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeRegistration.java

@@ -17,24 +17,40 @@
  */
 package org.apache.hadoop.hdfs;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.*;
+
 import java.net.InetSocketAddress;
 
-import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.DFSClient;
-import junit.framework.TestCase;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
+import org.apache.hadoop.hdfs.server.common.StorageInfo;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.VersionInfo;
+import org.junit.Test;
 
 /**
  * This class tests that a file need not be closed before its
  * data can be read by another client.
  */
-public class TestDatanodeRegistration extends TestCase {
+public class TestDatanodeRegistration {
+  
+  public static final Log LOG = LogFactory.getLog(TestDatanodeRegistration.class);
 
   /**
    * Regression test for HDFS-894 ensures that, when datanodes
    * are restarted, the new IPC port is registered with the
    * namenode.
    */
+  @Test
   public void testChangeIpcPort() throws Exception {
     HdfsConfiguration conf = new HdfsConfiguration();
     MiniDFSCluster cluster = null;
@@ -74,4 +90,102 @@ public class TestDatanodeRegistration extends TestCase {
       }
     }
   }
+  
+  @Test
+  public void testRegistrationWithDifferentSoftwareVersions() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    conf.set(DFSConfigKeys.DFS_DATANODE_MIN_SUPPORTED_NAMENODE_VERSION_KEY, "3.0.0");
+    conf.set(DFSConfigKeys.DFS_NAMENODE_MIN_SUPPORTED_DATANODE_VERSION_KEY, "3.0.0");
+    MiniDFSCluster cluster = null;
+    try {
+      cluster = new MiniDFSCluster.Builder(conf)
+          .numDataNodes(0)
+          .build();
+      
+      NamenodeProtocols rpcServer = cluster.getNameNodeRpc();
+      
+      long nnCTime = cluster.getNamesystem().getFSImage().getStorage().getCTime();
+      StorageInfo mockStorageInfo = mock(StorageInfo.class);
+      doReturn(nnCTime).when(mockStorageInfo).getCTime();
+      
+      DatanodeRegistration mockDnReg = mock(DatanodeRegistration.class);
+      doReturn(HdfsConstants.LAYOUT_VERSION).when(mockDnReg).getVersion();
+      doReturn("fake-storage-id").when(mockDnReg).getStorageID();
+      doReturn(mockStorageInfo).when(mockDnReg).getStorageInfo();
+      
+      // Should succeed when software versions are the same.
+      doReturn("3.0.0").when(mockDnReg).getSoftwareVersion();
+      rpcServer.registerDatanode(mockDnReg);
+      
+      // Should succeed when software version of DN is above minimum required by NN.
+      doReturn("4.0.0").when(mockDnReg).getSoftwareVersion();
+      rpcServer.registerDatanode(mockDnReg);
+      
+      // Should fail when software version of DN is below minimum required by NN.
+      doReturn("2.0.0").when(mockDnReg).getSoftwareVersion();
+      try {
+        rpcServer.registerDatanode(mockDnReg);
+        fail("Should not have been able to register DN with too-low version.");
+      } catch (IncorrectVersionException ive) {
+        GenericTestUtils.assertExceptionContains(
+            "The reported DataNode version is too low", ive);
+        LOG.info("Got expected exception", ive);
+      }
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+  
+  @Test
+  public void testRegistrationWithDifferentSoftwareVersionsDuringUpgrade()
+      throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    conf.set(DFSConfigKeys.DFS_DATANODE_MIN_SUPPORTED_NAMENODE_VERSION_KEY, "1.0.0");
+    MiniDFSCluster cluster = null;
+    try {
+      cluster = new MiniDFSCluster.Builder(conf)
+          .numDataNodes(0)
+          .build();
+      
+      NamenodeProtocols rpcServer = cluster.getNameNodeRpc();
+      
+      long nnCTime = cluster.getNamesystem().getFSImage().getStorage().getCTime();
+      StorageInfo mockStorageInfo = mock(StorageInfo.class);
+      doReturn(nnCTime).when(mockStorageInfo).getCTime();
+      
+      DatanodeRegistration mockDnReg = mock(DatanodeRegistration.class);
+      doReturn(HdfsConstants.LAYOUT_VERSION).when(mockDnReg).getVersion();
+      doReturn("fake-storage-id").when(mockDnReg).getStorageID();
+      doReturn(mockStorageInfo).when(mockDnReg).getStorageInfo();
+      
+      // Should succeed when software versions are the same and CTimes are the
+      // same.
+      doReturn(VersionInfo.getVersion()).when(mockDnReg).getSoftwareVersion();
+      rpcServer.registerDatanode(mockDnReg);
+      
+      // Should succeed when software versions are the same and CTimes are
+      // different.
+      doReturn(nnCTime + 1).when(mockStorageInfo).getCTime();
+      rpcServer.registerDatanode(mockDnReg);
+      
+      // Should fail when software version of DN is different from NN and CTimes
+      // are different.
+      doReturn(VersionInfo.getVersion() + ".1").when(mockDnReg).getSoftwareVersion();
+      try {
+        rpcServer.registerDatanode(mockDnReg);
+        fail("Should not have been able to register DN with different software" +
+            " versions and CTimes");
+      } catch (IncorrectVersionException ive) {
+        GenericTestUtils.assertExceptionContains(
+            "does not match CTime of NN", ive);
+        LOG.info("Got expected exception", ive);
+      }
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
 }

+ 2 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java

@@ -429,12 +429,13 @@ public class TestPBHelper {
     ExportedBlockKeys expKeys = new ExportedBlockKeys(true, 9, 10,
         getBlockKey(1), keys);
     DatanodeRegistration reg = new DatanodeRegistration(dnId,
-        new StorageInfo(), expKeys);
+        new StorageInfo(), expKeys, "3.0.0");
     DatanodeRegistrationProto proto = PBHelper.convert(reg);
     DatanodeRegistration reg2 = PBHelper.convert(proto);
     compare(reg.getStorageInfo(), reg2.getStorageInfo());
     compare(reg.getExportedKeys(), reg2.getExportedKeys());
     compare((DatanodeID)reg, (DatanodeID)reg2);
+    assertEquals(reg.getSoftwareVersion(), reg2.getSoftwareVersion());
   }
   
   @Test

+ 27 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java

@@ -92,6 +92,7 @@ public class TestBlockManager {
       dn.updateHeartbeat(
           2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
           2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0, 0);
+      bm.getDatanodeManager().checkIfClusterIsNowMultiRack(dn);
     }
   }
 
@@ -310,6 +311,32 @@ public class TestBlockManager {
         rackB.contains(pipeline[1]));
   }
   
+  @Test
+  public void testBlocksAreNotUnderreplicatedInSingleRack() throws Exception {
+    List<DatanodeDescriptor> nodes = ImmutableList.of( 
+        new DatanodeDescriptor(new DatanodeID("h1", 5020), "/rackA"),
+        new DatanodeDescriptor(new DatanodeID("h2", 5020), "/rackA"),
+        new DatanodeDescriptor(new DatanodeID("h3", 5020), "/rackA"),
+        new DatanodeDescriptor(new DatanodeID("h4", 5020), "/rackA"),
+        new DatanodeDescriptor(new DatanodeID("h5", 5020), "/rackA"),
+        new DatanodeDescriptor(new DatanodeID("h6", 5020), "/rackA")
+      );
+    addNodes(nodes);
+    List<DatanodeDescriptor> origNodes = nodes.subList(0, 3);
+    for (int i = 0; i < NUM_TEST_ITERS; i++) {
+      doTestSingleRackClusterIsSufficientlyReplicated(i, origNodes);
+    }
+  }
+  
+  private void doTestSingleRackClusterIsSufficientlyReplicated(int testIndex,
+      List<DatanodeDescriptor> origNodes)
+      throws Exception {
+    assertEquals(0, bm.numOfUnderReplicatedBlocks());
+    addBlockOnNodes((long)testIndex, origNodes);
+    bm.processMisReplicatedBlocks();
+    assertEquals(0, bm.numOfUnderReplicatedBlocks());
+  }
+  
   
   /**
    * Tell the block manager that replication is completed for the given

+ 2 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlocksWithNotEnoughRacks.java

@@ -97,7 +97,7 @@ public class TestBlocksWithNotEnoughRacks {
       final FileSystem fs = cluster.getFileSystem();
       DFSTestUtil.createFile(fs, filePath, 1L, REPLICATION_FACTOR, 1L);
       ExtendedBlock b = DFSTestUtil.getFirstBlock(fs, filePath);
-      DFSTestUtil.waitForReplication(cluster, b, 1, REPLICATION_FACTOR, 1);
+      DFSTestUtil.waitForReplication(cluster, b, 1, REPLICATION_FACTOR, 0);
 
       // Add a new datanode on a different rack
       String newRacks[] = {"/rack2"};
@@ -165,7 +165,7 @@ public class TestBlocksWithNotEnoughRacks {
       final FileSystem fs = cluster.getFileSystem();
       DFSTestUtil.createFile(fs, filePath, 1L, REPLICATION_FACTOR, 1L);
       ExtendedBlock b = DFSTestUtil.getFirstBlock(fs, filePath);
-      DFSTestUtil.waitForReplication(cluster, b, 1, REPLICATION_FACTOR, 1);
+      DFSTestUtil.waitForReplication(cluster, b, 1, REPLICATION_FACTOR, 0);
       
       // Add new datanodes on a different rack and increase the
       // replication factor so the block is underreplicated and make

+ 79 - 22
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeRegister.java

@@ -18,48 +18,105 @@
 
 package org.apache.hadoop.hdfs.server.datanode;
 
-import java.net.InetSocketAddress;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.VersionInfo;
+import org.junit.Before;
 import org.junit.Test;
-import org.mockito.Mockito;
-
 
 public class TestDatanodeRegister { 
   public static final Log LOG = LogFactory.getLog(TestDatanodeRegister.class);
 
   // Invalid address
-  static final InetSocketAddress INVALID_ADDR =
+  private static final InetSocketAddress INVALID_ADDR =
     new InetSocketAddress("127.0.0.1", 1);
-
-  @Test
-  public void testDataNodeRegister() throws Exception {
+  
+  private BPServiceActor actor;
+  NamespaceInfo fakeNsInfo;
+  DNConf mockDnConf;
+  
+  @Before
+  public void setUp() throws IOException {
+    mockDnConf = mock(DNConf.class);
+    doReturn(VersionInfo.getVersion()).when(mockDnConf).getMinimumNameNodeVersion();
+    
     DataNode mockDN = mock(DataNode.class);
-    Mockito.doReturn(true).when(mockDN).shouldRun();
+    doReturn(true).when(mockDN).shouldRun();
+    doReturn(mockDnConf).when(mockDN).getDnConf();
     
-    BPOfferService mockBPOS = Mockito.mock(BPOfferService.class);
-    Mockito.doReturn(mockDN).when(mockBPOS).getDataNode();
+    BPOfferService mockBPOS = mock(BPOfferService.class);
+    doReturn(mockDN).when(mockBPOS).getDataNode();
     
-    BPServiceActor actor = new BPServiceActor(INVALID_ADDR, mockBPOS);
+    actor = new BPServiceActor(INVALID_ADDR, mockBPOS);
 
-    NamespaceInfo fakeNSInfo = mock(NamespaceInfo.class);
-    when(fakeNSInfo.getBuildVersion()).thenReturn("NSBuildVersion");
-    DatanodeProtocolClientSideTranslatorPB fakeDNProt = 
+    fakeNsInfo = mock(NamespaceInfo.class);
+    // Return a good software version.
+    doReturn(VersionInfo.getVersion()).when(fakeNsInfo).getSoftwareVersion();
+    // Return a good layout version for now.
+    doReturn(HdfsConstants.LAYOUT_VERSION).when(fakeNsInfo).getLayoutVersion();
+    
+    DatanodeProtocolClientSideTranslatorPB fakeDnProt = 
         mock(DatanodeProtocolClientSideTranslatorPB.class);
-    when(fakeDNProt.versionRequest()).thenReturn(fakeNSInfo);
+    when(fakeDnProt.versionRequest()).thenReturn(fakeNsInfo);
+    actor.setNameNode(fakeDnProt);
+  }
 
-    actor.setNameNode( fakeDNProt );
-    try {   
+  @Test
+  public void testSoftwareVersionDifferences() throws Exception {
+    // We expect no exception to be thrown when the software versions match.
+    assertEquals(VersionInfo.getVersion(),
+        actor.retrieveNamespaceInfo().getSoftwareVersion());
+    
+    // We expect no exception to be thrown when the min NN version is below the
+    // reported NN version.
+    doReturn("4.0.0").when(fakeNsInfo).getSoftwareVersion();
+    doReturn("3.0.0").when(mockDnConf).getMinimumNameNodeVersion();
+    assertEquals("4.0.0", actor.retrieveNamespaceInfo().getSoftwareVersion());
+    
+    // When the NN reports a version that's too low, throw an exception.
+    doReturn("3.0.0").when(fakeNsInfo).getSoftwareVersion();
+    doReturn("4.0.0").when(mockDnConf).getMinimumNameNodeVersion();
+    try {
+      actor.retrieveNamespaceInfo();
+      fail("Should have thrown an exception for NN with too-low version");
+    } catch (IncorrectVersionException ive) {
+      GenericTestUtils.assertExceptionContains(
+          "The reported NameNode version is too low", ive);
+      LOG.info("Got expected exception", ive);
+    }
+  }
+  
+  @Test
+  public void testDifferentLayoutVersions() throws Exception {
+    // We expect no exceptions to be thrown when the layout versions match.
+    assertEquals(HdfsConstants.LAYOUT_VERSION,
+        actor.retrieveNamespaceInfo().getLayoutVersion());
+    
+    // We expect an exception to be thrown when the NN reports a layout version
+    // different from that of the DN.
+    doReturn(HdfsConstants.LAYOUT_VERSION * 1000).when(fakeNsInfo)
+        .getLayoutVersion();
+    try {
       actor.retrieveNamespaceInfo();
-      fail("register() did not throw exception! " +
-           "Expected: IncorrectVersionException");
-    } catch (IncorrectVersionException ie) {
-      LOG.info("register() returned correct Exception: IncorrectVersionException");
+      fail("Should have failed to retrieve NS info from NN with bad layout version");
+    } catch (IncorrectVersionException ive) {
+      GenericTestUtils.assertExceptionContains(
+          "Unexpected version of namenode", ive);
+      LOG.info("Got expected exception", ive);
     }
   }
 }
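
Taken together, testSoftwareVersionDifferences and testDifferentLayoutVersions describe the DataNode-side handshake: the actor fetches the NamespaceInfo from the NameNode, fails hard on any layout-version mismatch, and otherwise only requires the NameNode's software version to be at least the configured minimum (DFSConfigKeys.DFS_DATANODE_MIN_SUPPORTED_NAMENODE_VERSION_KEY). The sketch below illustrates that logic under those assumptions; the class and method names are hypothetical, not the committed BPServiceActor code.

import java.io.IOException;

import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.util.VersionUtil;

// Hypothetical sketch only; names are illustrative.
class NameNodeVersionCheckSketch {
  private final String minimumNameNodeVersion;

  NameNodeVersionCheckSketch(String minimumNameNodeVersion) {
    this.minimumNameNodeVersion = minimumNameNodeVersion;
  }

  void checkNNVersion(NamespaceInfo nsInfo) throws IOException {
    // Layout versions must match exactly: a mismatch means incompatible
    // metadata formats, not merely a different release.
    if (nsInfo.getLayoutVersion() != HdfsConstants.LAYOUT_VERSION) {
      throw new IOException("Unexpected version of namenode: layout version "
          + nsInfo.getLayoutVersion());
    }
    // Software versions may differ, as long as the NameNode is not older
    // than the DataNode's configured minimum.
    String nnVersion = nsInfo.getSoftwareVersion();
    if (VersionUtil.compareVersions(nnVersion, minimumNameNodeVersion) < 0) {
      throw new IOException("The reported NameNode version is too low: "
          + nnVersion + " (minimum supported: " + minimumNameNodeVersion + ")");
    }
  }
}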

+ 2 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java

@@ -58,6 +58,7 @@ import org.apache.hadoop.net.DNS;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.security.Groups;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.VersionInfo;
 import org.apache.log4j.Level;
 import org.apache.log4j.LogManager;
 
@@ -783,6 +784,7 @@ public class NNThroughputBenchmark {
       String hostName = DNS.getDefaultHost("default", "default");
       dnRegistration = new DatanodeRegistration(ipAddr, getNodePort(dnIdx));
       dnRegistration.setHostName(hostName);
+      dnRegistration.setSoftwareVersion(VersionInfo.getVersion());
       this.blocks = new ArrayList<Block>(blockCapacity);
       this.nrBlocks = 0;
     }

+ 11 - 4
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDelegationTokensWithHA.java

@@ -42,6 +42,7 @@ import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSelector;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
@@ -196,8 +197,7 @@ public class TestDelegationTokensWithHA {
     // check that the token selected for one of the physical IPC addresses
     // matches the one we received
     InetSocketAddress addr = nn0.getNameNodeAddress();
-    Text ipcDtService = new Text(
-        addr.getAddress().getHostAddress() + ":" + addr.getPort());
+    Text ipcDtService = SecurityUtil.buildTokenService(addr);
     Token<DelegationTokenIdentifier> token2 =
         DelegationTokenSelector.selectHdfsDelegationToken(ipcDtService, ugi);
     assertNotNull(token2);
@@ -212,8 +212,15 @@ public class TestDelegationTokensWithHA {
    */
   @Test
   public void testDFSGetCanonicalServiceName() throws Exception {
-    assertEquals(fs.getCanonicalServiceName(), 
-        HATestUtil.getLogicalUri(cluster).getHost());
+    URI hAUri = HATestUtil.getLogicalUri(cluster);
+    String haService = HAUtil.buildTokenServiceForLogicalUri(hAUri).toString();
+    assertEquals(haService, dfs.getCanonicalServiceName());
+    Token<?> token = dfs.getDelegationToken(
+        UserGroupInformation.getCurrentUser().getShortUserName());
+    assertEquals(haService, token.getService().toString());
+    // make sure the logical uri is handled correctly
+    token.renew(dfs.getConf());
+    token.cancel(dfs.getConf());
   }
   
   enum TokenTestAction {

+ 54 - 17
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestInitializeSharedEdits.java

@@ -19,17 +19,22 @@ package org.apache.hadoop.hdfs.server.namenode.ha;
 
 import java.io.File;
 import java.io.IOException;
+import java.net.URISyntaxException;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.ha.ServiceFailedException;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.HAUtil;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.MiniDFSNNTopology;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.After;
 import org.junit.Before;
@@ -48,7 +53,10 @@ public class TestInitializeSharedEdits {
   @Before
   public void setupCluster() throws IOException {
     conf = new Configuration();
-
+    conf.setInt(DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_KEY, 1);
+    conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
+    HAUtil.setAllowStandbyReads(conf, true);
+    
     MiniDFSNNTopology topology = MiniDFSNNTopology.simpleHATopology();
     
     cluster = new MiniDFSCluster.Builder(conf)
@@ -56,11 +64,8 @@ public class TestInitializeSharedEdits {
       .numDataNodes(0)
       .build();
     cluster.waitActive();
-  
-    cluster.shutdownNameNode(0);
-    cluster.shutdownNameNode(1);
-    File sharedEditsDir = new File(cluster.getSharedEditsDir(0, 1));
-    assertTrue(FileUtil.fullyDelete(sharedEditsDir));
+
+    shutdownClusterAndRemoveSharedEditsDir();
   }
   
   @After
@@ -70,8 +75,14 @@ public class TestInitializeSharedEdits {
     }
   }
   
-  @Test
-  public void testInitializeSharedEdits() throws Exception {
+  private void shutdownClusterAndRemoveSharedEditsDir() throws IOException {
+    cluster.shutdownNameNode(0);
+    cluster.shutdownNameNode(1);
+    File sharedEditsDir = new File(cluster.getSharedEditsDir(0, 1));
+    assertTrue(FileUtil.fullyDelete(sharedEditsDir));
+  }
+  
+  private void assertCannotStartNameNodes() {
     // Make sure we can't currently start either NN.
     try {
       cluster.restartNameNode(0, false);
@@ -89,24 +100,27 @@ public class TestInitializeSharedEdits {
       GenericTestUtils.assertExceptionContains(
           "Cannot start an HA namenode with name dirs that need recovery", ioe);
     }
-    
-    // Initialize the shared edits dir.
-    assertFalse(NameNode.initializeSharedEdits(conf));
-    
+  }
+  
+  private void assertCanStartHaNameNodes(String pathSuffix)
+      throws ServiceFailedException, IOException, URISyntaxException,
+      InterruptedException {
     // Now should be able to start both NNs. Pass "false" here so that we don't
     // try to waitActive on all NNs, since the second NN doesn't exist yet.
     cluster.restartNameNode(0, false);
     cluster.restartNameNode(1, true);
     
     // Make sure HA is working.
-    cluster.transitionToActive(0);
+    cluster.getNameNode(0).getRpcServer().transitionToActive();
     FileSystem fs = null;
     try {
+      Path newPath = new Path(TEST_PATH, pathSuffix);
       fs = HATestUtil.configureFailoverFs(cluster, conf);
-      assertTrue(fs.mkdirs(TEST_PATH));
-      cluster.transitionToStandby(0);
-      cluster.transitionToActive(1);
-      assertTrue(fs.isDirectory(TEST_PATH));
+      assertTrue(fs.mkdirs(newPath));
+      HATestUtil.waitForStandbyToCatchUp(cluster.getNameNode(0),
+          cluster.getNameNode(1));
+      assertTrue(NameNodeAdapter.getFileInfo(cluster.getNameNode(1),
+          newPath.toString(), false).isDir());
     } finally {
       if (fs != null) {
         fs.close();
@@ -114,6 +128,29 @@ public class TestInitializeSharedEdits {
     }
   }
   
+  @Test
+  public void testInitializeSharedEdits() throws Exception {
+    assertCannotStartNameNodes();
+    
+    // Initialize the shared edits dir.
+    assertFalse(NameNode.initializeSharedEdits(cluster.getConfiguration(0)));
+    
+    assertCanStartHaNameNodes("1");
+    
+    // Now that we've done a metadata operation, make sure that deleting and
+    // re-initializing the shared edits dir will let the standby still start.
+    
+    shutdownClusterAndRemoveSharedEditsDir();
+    
+    assertCannotStartNameNodes();
+    
+    // Re-initialize the shared edits dir.
+    assertFalse(NameNode.initializeSharedEdits(cluster.getConfiguration(0)));
+    
+    // Should *still* be able to start both NNs
+    assertCanStartHaNameNodes("2");
+  }
+  
   @Test
   public void testDontOverWriteExistingDir() {
     assertFalse(NameNode.initializeSharedEdits(conf, false));

+ 62 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestVersionUtil.java

@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.util;
+
+import static org.junit.Assert.*;
+
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.Test;
+
+public class TestVersionUtil {
+
+  @Test
+  public void testCompareVersions() {
+    // Equal versions are equal.
+    assertEquals(0, VersionUtil.compareVersions("2.0.0", "2.0.0"));
+    assertEquals(0, VersionUtil.compareVersions("2.0.0a", "2.0.0a"));
+    assertEquals(0, VersionUtil.compareVersions("1", "1"));
+    
+    // Assert that lower versions are lower, and higher versions are higher.
+    assertExpectedValues("1", "2.0.0");
+    assertExpectedValues("1.0.0", "2");
+    assertExpectedValues("1.0.0", "2.0.0");
+    assertExpectedValues("1.0", "2.0.0");
+    assertExpectedValues("1.0.0", "2.0.0");
+    assertExpectedValues("1.0.0", "1.0.0a");
+    assertExpectedValues("1.0.0.0", "2.0.0");
+    assertExpectedValues("1.0.0", "1.0.0-dev");
+    assertExpectedValues("1.0.0", "1.0.1");
+    assertExpectedValues("1.0.0", "1.0.2");
+    assertExpectedValues("1.0.0", "1.1.0");
+    assertExpectedValues("2.0.0", "10.0.0");
+    assertExpectedValues("1.0.0", "1.0.0a");
+    assertExpectedValues("1.0.2a", "1.0.10");
+    assertExpectedValues("1.0.2a", "1.0.2b");
+    assertExpectedValues("1.0.2a", "1.0.2ab");
+    assertExpectedValues("1.0.0a1", "1.0.0a2");
+    assertExpectedValues("1.0.0a2", "1.0.0a10");
+    assertExpectedValues("1.0", "1.a");
+    assertExpectedValues("1.0", "1.a0");
+  }
+  
+  private static void assertExpectedValues(String lower, String higher) {
+    assertTrue(VersionUtil.compareVersions(lower, higher) < 0);
+    assertTrue(VersionUtil.compareVersions(higher, lower) > 0);
+  }
+  
+}
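
The assertions above fully constrain the ordering VersionUtil.compareVersions has to produce: dot-separated components are compared left to right, runs of digits within a component compare numerically (so "2" < "10"), other runs compare lexically, and a component that runs out of runs sorts lower (so "1.0.0" < "1.0.0a"). The standalone sketch below satisfies every assertion in this test; it is a plausible reconstruction under those rules, not necessarily the committed VersionUtil.

import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical reconstruction consistent with TestVersionUtil above.
public class VersionComparatorSketch {

  // Splits one dot-separated component into runs of digits and non-digits,
  // e.g. "0a10" -> ["0", "a", "10"].
  private static final Pattern COMPONENT_GROUPS =
      Pattern.compile("(\\d+)|(\\D+)");

  public static int compareVersions(String v1, String v2) {
    String[] c1 = v1.split("\\.");
    String[] c2 = v2.split("\\.");
    for (int i = 0; i < Math.max(c1.length, c2.length); i++) {
      String p1 = i < c1.length ? c1[i] : "";
      String p2 = i < c2.length ? c2[i] : "";
      int result = compareComponent(p1, p2);
      if (result != 0) {
        return result;
      }
    }
    return 0;
  }

  private static int compareComponent(String p1, String p2) {
    Matcher m1 = COMPONENT_GROUPS.matcher(p1);
    Matcher m2 = COMPONENT_GROUPS.matcher(p2);
    while (true) {
      boolean has1 = m1.find();
      boolean has2 = m2.find();
      if (!has1 && !has2) {
        return 0;
      }
      // The component with fewer runs sorts lower: "1.0.0" < "1.0.0a".
      if (!has1) {
        return -1;
      }
      if (!has2) {
        return 1;
      }
      String g1 = m1.group();
      String g2 = m2.group();
      int result;
      if (g1.matches("\\d+") && g2.matches("\\d+")) {
        // Both runs numeric: compare as integers, so "2" < "10".
        result = Integer.valueOf(g1).compareTo(Integer.valueOf(g2));
      } else {
        // Otherwise compare lexically: "a" < "b", and "0" < "a".
        result = g1.compareTo(g2);
      }
      if (result != 0) {
        return result;
      }
    }
  }
}

For example, compareVersions("1.0.2a", "1.0.10") is negative because the numeric runs 2 and 10 decide the third component before the trailing "a" is ever compared.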

+ 9 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -52,6 +52,8 @@ Trunk (unreleased changes)
 
   BUG FIXES
 
+    MAPREDUCE-4083. [Gridmix] NPE in cpu emulation. (amarrk)
+
     MAPREDUCE-4087. [Gridmix] GenerateDistCacheData job of Gridmix can
                     become slow in some cases (ravigummadi).
 
@@ -320,6 +322,13 @@ Release 0.23.3 - UNRELEASED
     CONTIANER_LAUNCHED and CONTIANER_LAUNCH_FAILED events in additional
     states. (Robert Joseph Evans via sseth)
 
+    MAPREDUCE-4140. mapreduce classes incorrectly importing
+    "clover.org.apache.*" classes. (Patrick Hunt via tomwhite)
+
+    MAPREDUCE-4050. For tasks without assigned containers, changes the node
+    text on the UI to N/A instead of a link to null. (Bhallamudi Venkata Siva
+    Kamesh via sseth)
+
 Release 0.23.2 - UNRELEASED
 
   INCOMPATIBLE CHANGES

+ 7 - 3
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/TaskPage.java

@@ -87,9 +87,13 @@ public class TaskPage extends AppView {
           tr().
             td(".id", taid).
             td(".progress", progress).
-            td(".state", ta.getState()).
-            td().
-              a(".nodelink", url("http://", nodeHttpAddr), nodeHttpAddr);
+            td(".state", ta.getState()).td();
+        if (nodeHttpAddr == null) {
+          nodeTd._("N/A");
+        } else {
+          nodeTd.
+            a(".nodelink", url("http://", nodeHttpAddr), nodeHttpAddr);
+        }
         if (containerId != null) {
           String containerIdStr = ta.getAssignedContainerIdStr();
           nodeTd._(" ").

+ 4 - 3
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/PartialJob.java

@@ -21,6 +21,8 @@ package org.apache.hadoop.mapreduce.v2.hs;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.JobACL;
@@ -37,9 +39,9 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 
-import clover.org.apache.log4j.Logger;
 
 public class PartialJob implements org.apache.hadoop.mapreduce.v2.app.job.Job {
+  private static final Log LOG = LogFactory.getLog(PartialJob.class);
 
   private JobIndexInfo jobIndexInfo = null;
   private JobId jobId = null;
@@ -78,8 +80,7 @@ public class PartialJob implements org.apache.hadoop.mapreduce.v2.app.job.Job {
     } catch (Exception e) {
       // Meant for use by the display UI. Exception would prevent it from being
      // rendered. Defaulting to KILLED
-      Logger.getLogger(this.getClass().getName()).warn(
-          "Exception while parsing job state. Defaulting to KILLED", e);
+      LOG.warn("Exception while parsing job state. Defaulting to KILLED", e);
       js = JobState.KILLED;
     }
     return js;

+ 1 - 3
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/GetDelegationTokenRequest.java

@@ -21,11 +21,9 @@ package org.apache.hadoop.yarn.api.protocolrecords;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Evolving;
 
-import clover.org.apache.velocity.runtime.resource.ResourceManager;
-
 /**
  * The request issued by the client to get a delegation token from
- * the {@link ResourceManager}. 
+ * the {@code ResourceManager}.
  * for more information.
  */
 @Public

+ 0 - 2
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodes.java

@@ -55,8 +55,6 @@ import org.w3c.dom.Element;
 import org.w3c.dom.NodeList;
 import org.xml.sax.InputSource;
 
-import clover.org.jfree.util.Log;
-
 import com.google.inject.Guice;
 import com.google.inject.Injector;
 import com.google.inject.servlet.GuiceServletContextListener;

+ 6 - 3
hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/emulators/resourceusage/CumulativeCpuUsageEmulatorPlugin.java

@@ -235,7 +235,9 @@ implements ResourceUsageEmulatorPlugin {
   
   @Override
   public float getProgress() {
-    return Math.min(1f, ((float)getCurrentCPUUsage())/targetCpuUsage);
+    return enabled 
+           ? Math.min(1f, ((float)getCurrentCPUUsage())/targetCpuUsage)
+           : 1.0f;
   }
   
   @Override
@@ -297,6 +299,9 @@ implements ResourceUsageEmulatorPlugin {
   public void initialize(Configuration conf, ResourceUsageMetrics metrics,
                          ResourceCalculatorPlugin monitor,
                          Progressive progress) {
+    this.monitor = monitor;
+    this.progress = progress;
+    
     // get the target CPU usage
     targetCpuUsage = metrics.getCumulativeCpuUsage();
     if (targetCpuUsage <= 0 ) {
@@ -306,8 +311,6 @@ implements ResourceUsageEmulatorPlugin {
       enabled = true;
     }
     
-    this.monitor = monitor;
-    this.progress = progress;
     emulationInterval =  conf.getFloat(CPU_EMULATION_PROGRESS_INTERVAL, 
                                        DEFAULT_EMULATION_FREQUENCY);
     

+ 5 - 2
hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/emulators/resourceusage/TotalHeapUsageEmulatorPlugin.java

@@ -188,7 +188,9 @@ implements ResourceUsageEmulatorPlugin {
   
   @Override
   public float getProgress() {
-    return Math.min(1f, ((float)getTotalHeapUsageInMB())/targetHeapUsageInMB);
+    return enabled 
+           ? Math.min(1f, ((float)getTotalHeapUsageInMB())/targetHeapUsageInMB)
+           : 1.0f;
   }
   
   @Override
@@ -237,6 +239,8 @@ implements ResourceUsageEmulatorPlugin {
   public void initialize(Configuration conf, ResourceUsageMetrics metrics,
                          ResourceCalculatorPlugin monitor,
                          Progressive progress) {
+    this.progress = progress;
+    
     // get the target heap usage
     targetHeapUsageInMB = metrics.getHeapUsage() / ONE_MB;
     if (targetHeapUsageInMB <= 0 ) {
@@ -248,7 +252,6 @@ implements ResourceUsageEmulatorPlugin {
       enabled = true;
     }
     
-    this.progress = progress;
     emulationInterval = 
       conf.getFloat(HEAP_EMULATION_PROGRESS_INTERVAL, 
                     DEFAULT_EMULATION_PROGRESS_INTERVAL);

+ 5 - 0
hadoop-mapreduce-project/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixMemoryEmulation.java

@@ -171,6 +171,11 @@ public class TestGridmixMemoryEmulation {
     assertEquals("Disabled heap usage emulation plugin works!", 
                  heapUsagePre, heapUsagePost);
     
+    // test with get progress
+    float progress = heapPlugin.getProgress();
+    assertEquals("Invalid progress of disabled cumulative heap usage emulation "
+                 + "plugin!", 1.0f, progress, 0f);
+    
     // test with wrong/invalid configuration
     Boolean failed = null;
     invalidUsage = 

+ 5 - 1
hadoop-mapreduce-project/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestResourceUsageEmulators.java

@@ -32,7 +32,6 @@ import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
 import org.apache.hadoop.mapreduce.task.MapContextImpl;
 import org.apache.hadoop.mapreduce.util.ResourceCalculatorPlugin;
-import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin.ProcResourceValues;
 import org.apache.hadoop.tools.rumen.ResourceUsageMetrics;
 import org.apache.hadoop.mapred.DummyResourceCalculatorPlugin;
 import org.apache.hadoop.mapred.gridmix.LoadJob.ResourceUsageMatcherRunner;
@@ -484,6 +483,11 @@ public class TestResourceUsageEmulators {
     assertEquals("Disabled cumulative CPU usage emulation plugin works!", 
                  cpuUsagePre, cpuUsagePost);
     
+    // test with get progress
+    float progress = cpuPlugin.getProgress();
+    assertEquals("Invalid progress of disabled cumulative CPU usage emulation " 
+                 + "plugin!", 1.0f, progress, 0f);
+    
     // test with valid resource usage value
     ResourceUsageMetrics metrics = createMetrics(targetCpuUsage);