
HDFS-1634. Federation: Convert single threaded DataNode into per BlockPool thread model.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hdfs/branches/HDFS-1052@1073927 13f79535-47bb-0310-9956-ffa450edef68
Boris Shkolnik, 14 years ago
parent commit 93ab38a231

+ 3 - 0
CHANGES.txt

@@ -22,6 +22,9 @@ Trunk (unreleased changes)
     HDFS-1632. Federation: data node storage structure changes and
     introduce block pool storage. (tanping via suresh)
 
+    HDFS-1634. Federation: Convert single threaded DataNode into
+    per BlockPool thread model. (boryas)
+
   IMPROVEMENTS
 
     HDFS-1510. Added test-patch.properties required by test-patch.sh (nigel)

+ 2 - 0
src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java

@@ -223,4 +223,6 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String  DFS_SECONDARY_NAMENODE_KRB_HTTPS_USER_NAME_KEY = "dfs.secondary.namenode.kerberos.https.principal";
   public static final String  DFS_NAMENODE_NAME_CACHE_THRESHOLD_KEY = "dfs.namenode.name.cache.threshold";
   public static final int     DFS_NAMENODE_NAME_CACHE_THRESHOLD_DEFAULT = 10;
+  
+  public static final String DFS_FEDERATION_NAMENODES = "dfs.federation.namenodes.uri";
 }
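
The new key takes a comma-separated list of NameNode URIs (the new test at the bottom of this diff joins two hdfs:// URLs with a comma). A minimal sketch of pointing a DataNode at two federated NameNodes; the host names and ports are illustrative, not from this commit:

  // Illustrative sketch: configure a DataNode against two federated NameNodes.
  Configuration conf = new HdfsConfiguration();
  conf.set(DFSConfigKeys.DFS_FEDERATION_NAMENODES,
      "hdfs://nn1.example.com:8020,hdfs://nn2.example.com:8020");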

+ 7 - 0
src/java/org/apache/hadoop/hdfs/server/common/StorageInfo.java

@@ -130,4 +130,11 @@ public class StorageInfo implements Writable {
     }
     clusterID = cid;
   }
+  
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("lv=").append(layoutVersion).append(";cid=").append(clusterID)
+    .append(";nsid=").append(namespaceID).append(";c=").append(cTime);
+    return sb.toString();
+  }
 }
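
For reference, the new toString() yields a compact semicolon-delimited summary; with illustrative values layoutVersion=-35, clusterID=CID-1, namespaceID=42 and cTime=0 it reads:

  lv=-35;cid=CID-1;nsid=42;c=0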

+ 7 - 1
src/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolStorage.java

@@ -66,11 +66,12 @@ public class BlockPoolStorage extends Storage {
     super(NodeType.DATA_NODE);
   }
 
-  BlockPoolStorage(int namespaceID, String bpID, long cTime) {
+  BlockPoolStorage(int namespaceID, String bpID, long cTime, String clusterId) {
     super(NodeType.DATA_NODE);
     this.namespaceID = namespaceID;
     this.blockpoolID = bpID;
     this.cTime = cTime;
+    this.clusterID = clusterId;
   }
 
   /**
@@ -508,4 +509,9 @@ public class BlockPoolStorage extends Storage {
   public boolean isConversionNeeded(StorageDirectory sd) throws IOException {
     return false;
   }
+  
+  @Override
+  public String toString() {
+    return super.toString() + ";bpid=" + blockpoolID;
+  }
 }

File diff suppressed because it is too large
+ 703 - 169
src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java


+ 15 - 4
src/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java

@@ -87,6 +87,10 @@ public class DataStorage extends Storage {
     storageID = "";
   }
   
+  public StorageInfo getBPStorage(String bpid) {
+    return bpStorageMap.get(bpid);
+  }
+  
   public DataStorage(StorageInfo storageInfo, String strgID) {
     super(NodeType.DATA_NODE, storageInfo);
     this.storageID = strgID;
@@ -118,7 +122,7 @@ public class DataStorage extends Storage {
                              Collection<File> dataDirs,
                              StartupOption startOpt
                              ) throws IOException {
-    if (this.initilized) {
+    if (initilized) {
       // DN storage has been initialized, no need to do anything
       return;
     }
@@ -169,7 +173,7 @@ public class DataStorage extends Storage {
 
     // 2. Do transitions
     // Each storage directory is treated individually.
-    // During sturtup some of them can upgrade or rollback 
+    // During startup some of them can upgrade or rollback 
     // while others could be uptodate for the regular startup.
     for(int idx = 0; idx < getNumStorageDirs(); idx++) {
       doTransition(getStorageDir(idx), nsInfo, startOpt);
@@ -179,6 +183,12 @@ public class DataStorage extends Storage {
         "Data-node and name-node CTimes must be the same.";
     }
     
+    // make sure we have storage id set - if not - generate new one
+    if(storageID.isEmpty()) {
+      DataNode.setNewStorageID(DataNode.datanodeObject.dnRegistration);
+      storageID = DataNode.datanodeObject.dnRegistration.storageID;
+    }
+    
     // 3. Update all storages. Some of them might have just been formatted.
     this.writeAll();
     
@@ -210,7 +220,8 @@ public class DataStorage extends Storage {
     // mkdir for the list of BlockPoolStorage
     makeBlockPoolDataDir(bpDataDirs, null);
     BlockPoolStorage bpStorage = new BlockPoolStorage(nsInfo.getNamespaceID(), 
-        bpID, nsInfo.getCTime());
+        bpID, nsInfo.getCTime(), nsInfo.getClusterID());
+    
     bpStorage.recoverTransitionRead(nsInfo, bpDataDirs, startOpt);
     addBlockPoolStorage(bpID, bpStorage);
   }
@@ -447,7 +458,7 @@ public class DataStorage extends Storage {
     // 3. Format BP and hard link blocks from previous directory
     File curBpDir = getBpRoot(nsInfo.getBlockPoolID(), curDir);
     BlockPoolStorage bpStorage = new BlockPoolStorage(nsInfo.getNamespaceID(), 
-        nsInfo.getBlockPoolID(), nsInfo.getCTime());
+        nsInfo.getBlockPoolID(), nsInfo.getCTime(), nsInfo.getClusterID());
     bpStorage.format(new StorageDirectory(curBpDir), nsInfo);
     linkAllBlocks(tmpDir, curBpDir);
     

+ 2 - 0
src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java

@@ -881,6 +881,8 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
     }
     volumes = new FSVolumeSet(volArray);
     volumes.getVolumeMap(volumeMap);
+
+    // TODO:FEDERATION this needs to be moved to addStorage()
     File[] roots = new File[storage.getNumStorageDirs()];
     for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
       roots[idx] = storage.getStorageDir(idx).getCurrentDir();

+ 3 - 1
src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java

@@ -722,6 +722,7 @@ public class FSImage extends Storage {
     blockpoolID = bpid;
   }
   
+  @Override
   protected void getFields(Properties props, 
                            StorageDirectory sd 
                            ) throws IOException {
@@ -774,6 +775,7 @@ public class FSImage extends Storage {
    * @param sd storage directory
    * @throws IOException
    */
+  @Override
   protected void setFields(Properties props, 
                            StorageDirectory sd 
                            ) throws IOException {
@@ -2338,7 +2340,7 @@ public class FSImage extends Storage {
     U_STR.write(out);
   }
 
-  String getBlockPoolID() {
+  public String getBlockPoolID() {
     return blockpoolID;
   }
 }

+ 5 - 4
src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -525,11 +525,12 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
   }
   
   NamespaceInfo getNamespaceInfo() {
-    return new NamespaceInfo(dir.fsImage.getNamespaceID(),
-                             dir.fsImage.getClusterID(),
-                             dir.fsImage.getBlockPoolID(),
+    NamespaceInfo nsinfo = new NamespaceInfo(dir.fsImage.getNamespaceID(),
+                             getClusterId(),
+                             getBlockpoolId(),
                              dir.fsImage.getCTime(),
                              getDistributedUpgradeVersion());
+    return nsinfo;
   }
 
   /**
@@ -2502,7 +2503,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
    * namespaceID and will continue serving the datanodes that has previously
    * registered with the namenode without restarting the whole cluster.
    * 
-   * @see org.apache.hadoop.hdfs.server.datanode.DataNode#register()
+   * @see org.apache.hadoop.hdfs.server.datanode.DataNode
    */
   public void registerDatanode(DatanodeRegistration nodeReg
                                             ) throws IOException {

+ 58 - 2
src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java

@@ -22,6 +22,8 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.URI;
 import java.security.PrivilegedExceptionAction;
+import java.util.AbstractList;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -31,9 +33,9 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.CreateFlag;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FsServerDefaults;
 import org.apache.hadoop.fs.Options;
@@ -46,12 +48,12 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.HDFSPolicyProvider;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.DirectoryListing;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@@ -217,6 +219,51 @@ public class NameNode implements NamenodeProtocols, FSConstants {
   public static InetSocketAddress getAddress(String address) {
     return NetUtils.createSocketAddr(address, DEFAULT_PORT);
   }
+  
+  /**
+   * TODO:FEDERATION
+   * at this moment only supports fs.default-style entries.
+   * @param conf
+   * @return array of namenodes' addresses
+   */
+  public static InetSocketAddress [] getNNAddresses(Configuration conf) 
+  throws IOException {
+    URI[] nns=getNameNodesURIs(conf);
+    if(nns == null) {
+      throw new IOException("Federation namenodes are not configured correctly");
+    }
+
+    InetSocketAddress [] isas = new InetSocketAddress[nns.length];
+    int i=0;
+    for(URI u : nns) {
+      isas[i++] = getAddress(u); 
+    }
+    return isas;
+  }
+
+  /**
+   * TODO:FEDERATION
+   * Get the list of namenodes from the configuration and
+   * create a URI for each of them.
+   * @param conf
+   * @return list of URIs of all configured NameNodes
+   */
+  public static URI [] getNameNodesURIs(Configuration conf) {
+    String [] nnURIs = conf.getStrings(DFSConfigKeys.DFS_FEDERATION_NAMENODES);
+    if(nnURIs == null) {
+      nnURIs = new String[] { conf.get(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY)};
+    }
+
+    AbstractList<URI> nns = new ArrayList<URI>(nnURIs.length);
+    for(String uri : nnURIs) {
+      // name should be prepended with FileSystem.fixName(uri)
+      // TBD
+      nns.add(URI.create(uri));
+    }
+
+    URI[] r = new URI[nns.size()];
+    return nns.toArray(r);
+  }
 
   /**
    * Set the configuration property for the service rpc address
@@ -246,6 +293,15 @@ public class NameNode implements NamenodeProtocols, FSConstants {
 
   public static InetSocketAddress getAddress(Configuration conf) {
     URI filesystemURI = FileSystem.getDefaultUri(conf);
+    return getAddress(filesystemURI);
+  }
+
+
+  /**
+   * TODO:FEDERATION
+   * @param filesystemURI
+   */
+  public static InetSocketAddress getAddress(URI filesystemURI) {
     String authority = filesystemURI.getAuthority();
     if (authority == null) {
       throw new IllegalArgumentException(String.format(
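
Taken together, the two new helpers resolve the configured NameNode list into socket addresses, falling back to fs.default.name when dfs.federation.namenodes.uri is unset. A usage sketch under that reading (the URIs are illustrative, and the caller must handle the IOException that getNNAddresses declares):

  // Assumes imports of Configuration, HdfsConfiguration, DFSConfigKeys,
  // NameNode and java.net.InetSocketAddress.
  Configuration conf = new HdfsConfiguration();
  conf.set(DFSConfigKeys.DFS_FEDERATION_NAMENODES,
      "hdfs://nn1.example.com:8020,hdfs://nn2.example.com:8020");
  InetSocketAddress[] nnAddrs = NameNode.getNNAddresses(conf);
  for (InetSocketAddress isa : nnAddrs) {
    System.out.println(isa); // one address per configured NameNode
  }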

+ 3 - 3
src/java/org/apache/hadoop/hdfs/server/protocol/DatanodeRegistration.java

@@ -28,7 +28,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.common.StorageInfo;
-import org.apache.hadoop.hdfs.server.datanode.DataStorage;
+import org.apache.hadoop.hdfs.server.datanode.BlockPoolStorage;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableFactories;
 import org.apache.hadoop.io.WritableFactory;
@@ -77,9 +77,8 @@ implements Writable, NodeRegistration {
     this.ipcPort = ipcPort;
   }
 
-  public void setStorageInfo(DataStorage storage) {
+  public void setStorageInfo(StorageInfo storage) {
     this.storageInfo = new StorageInfo(storage);
-    this.storageID = storage.getStorageID();
   }
   
   public void setName(String name) {
@@ -108,6 +107,7 @@ implements Writable, NodeRegistration {
       + ", storageID=" + storageID
       + ", infoPort=" + infoPort
       + ", ipcPort=" + ipcPort
+      + ", storageInfo=" + storageInfo
       + ")";
   }
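
Note how this dovetails with the new DataStorage.getBPStorage(bpid) accessor above: since setStorageInfo now accepts any StorageInfo, a per-block-pool registration can carry the block pool's own storage info. A hypothetical sketch of that pairing, assuming reg, storage and bpid are in scope (this exact call site is not shown in the diff):

  // Hypothetical pairing of the two changes in this commit.
  StorageInfo bpInfo = storage.getBPStorage(bpid); // may be null before the BP is set up
  if (bpInfo != null) {
    reg.setStorageInfo(bpInfo); // accepts any StorageInfo after this change
  }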
 

+ 4 - 0
src/java/org/apache/hadoop/hdfs/server/protocol/NamespaceInfo.java

@@ -94,4 +94,8 @@ public class NamespaceInfo extends StorageInfo {
     distributedUpgradeVersion = in.readInt();
     blockPoolID = WritableUtils.readString(in);
   }
+  
+  public String toString(){
+    return super.toString() + ";bpid=" + blockPoolID;
+  }
 }

+ 2 - 2
src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java

@@ -629,7 +629,7 @@ public class MiniDFSCluster {
         StaticMapping.addNodeToRack(ipAddr + ":" + port,
                                   racks[i-curDatanodesNum]);
       }
-      DataNode.runDatanodeDaemon(dn);
+      dn.runDatanodeDaemon();
       dataNodes.add(new DataNodeProperties(dn, newconf, dnArgs));
     }
     curDatanodesNum += numDataNodes;
@@ -1137,7 +1137,7 @@ public class MiniDFSCluster {
     String bpid = getNamesystem().getPoolId();
     SimulatedFSDataset sdataset = (SimulatedFSDataset) dataSet;
     sdataset.injectBlocks(bpid, blocksToInject);
-    dataNodes.get(dataNodeIndex).datanode.scheduleBlockReport(0);
+    dataNodes.get(dataNodeIndex).datanode.scheduleAllBlockReport(0);
   }
   
   /**

+ 286 - 0
src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMultipleRegistrations.java

@@ -0,0 +1,286 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode;
+
+import static org.apache.hadoop.hdfs.server.common.Util.fileAsURI;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.server.datanode.DataNode.BPOfferService;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
+import org.apache.hadoop.net.DNS;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestDataNodeMultipleRegistrations {
+  private static final Log LOG = 
+    LogFactory.getLog(TestDataNodeMultipleRegistrations.class);
+  File common_base_dir;
+  String localHost;
+  Configuration conf;
+
+  @Before
+  public void setUp() throws Exception {
+    common_base_dir = new File(MiniDFSCluster.getBaseDirectory());
+    if (common_base_dir != null) {
+      if (common_base_dir.exists() && !FileUtil.fullyDelete(common_base_dir)) {
+        throw new IOException("cannot get directory ready:"
+            + common_base_dir.getAbsolutePath());
+      }
+    }
+
+    conf = new HdfsConfiguration();
+    localHost = DNS.getDefaultHost(conf.get("dfs.datanode.dns.interface",
+        "default"), conf.get("dfs.datanode.dns.nameserver", "default"));
+
+    localHost = "127.0.0.1";
+    conf.set(DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY, localHost);
+  }
+
+  NameNode startNameNode(Configuration conf, int nnPort) throws IOException {
+    // per nn base_dir
+    File base_dir = new File(common_base_dir, Integer.toString(nnPort));
+
+    boolean manageNameDfsDirs = true; // for now
+    boolean format = true; // for now
+    // disable service authorization
+    conf.setBoolean(CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION,
+        false);
+
+    // Setup the NameNode configuration
+    conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, localHost + ":0");
+    if (manageNameDfsDirs) {
+      String name = fileAsURI(new File(base_dir, "name1")) + ","
+          + fileAsURI(new File(base_dir, "name2"));
+      conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY, name);
+      String sname = fileAsURI(new File(base_dir, "namesecondary1")) + ","
+          + fileAsURI(new File(base_dir, "namesecondary2"));
+      conf.set(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_DIR_KEY, sname);
+    }
+
+    // Format and clean out DataNode directories
+    if (format) {
+      GenericTestUtils.formatNamenode(conf);
+    }
+
+    // Start the NameNode
+    String[] args = new String[] {};
+    return NameNode.createNameNode(args, conf);
+  }
+
+  public DataNode startDataNode(Configuration conf) throws IOException {
+    Configuration dnConf = new HdfsConfiguration(conf);
+    boolean manageDfsDirs = true; // for now
+    File data_dir = new File(common_base_dir, "data");
+    if (data_dir.exists() && !FileUtil.fullyDelete(data_dir)) {
+      throw new IOException("Cannot remove data directory: " + data_dir);
+    }
+
+    if (manageDfsDirs) {
+      File dir1 = new File(data_dir, "data1");
+      File dir2 = new File(data_dir, "data2");
+      dir1.mkdirs();
+      dir2.mkdirs();
+      if (!dir1.isDirectory() || !dir2.isDirectory()) {
+        throw new IOException(
+            "Mkdirs failed to create directory for DataNode: " + dir1 + " or "
+                + dir2);
+      }
+      String dirs = fileAsURI(dir1) + "," + fileAsURI(dir2);
+      dnConf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, dirs);
+      conf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, dirs);
+    }
+    LOG.debug("Starting DataNode " + " with "
+        + DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY + ": "
+        + dnConf.get(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY));
+
+    String[] dnArgs = null; // for now
+    DataNode dn = DataNode.instantiateDataNode(dnArgs, dnConf);
+    if (dn == null)
+      throw new IOException("Cannot start DataNode in "
+          + dnConf.get(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY));
+
+    dn.runDatanodeDaemon();
+    return dn;
+  }
+
+  /**
+   * Starts multiple NNs and a single DN, and verifies per-BP registrations
+   * and handshakes.
+   * 
+   * @throws IOException
+   */
+  @Test
+  public void test2NNRegistration() throws IOException {
+    NameNode nn1, nn2;
+    // figure out host name for DataNode
+    int nnPort = 9928;
+    String nnURL1 = "hdfs://" + localHost + ":" + Integer.toString(nnPort);
+    FileSystem.setDefaultUri(conf, nnURL1);
+    nn1 = startNameNode(conf, nnPort);
+    
+    nnPort = 9929;
+    String nnURL2 = "hdfs://" + localHost + ":" + Integer.toString(nnPort);
+    FileSystem.setDefaultUri(conf, nnURL2);
+    nn2 = startNameNode(conf, nnPort);
+    
+    Assert.assertNotNull("cannot create nn1", nn1);
+    Assert.assertNotNull("cannot create nn2", nn2);
+    
+    String bpid1 = nn1.getFSImage().getBlockPoolID();
+    String bpid2 = nn2.getFSImage().getBlockPoolID();
+    String cid1 = nn1.getFSImage().getClusterID();
+    String cid2 = nn2.getFSImage().getClusterID();
+    int lv1 = nn1.getFSImage().getLayoutVersion();
+    int lv2 = nn2.getFSImage().getLayoutVersion();
+    int ns1 = nn1.getFSImage().namespaceID;
+    int ns2 = nn2.getFSImage().namespaceID;
+    Assert.assertNotSame("namespace ids should be different", ns1, ns2);
+    LOG.info("nn1: lv=" + lv1 + ";cid=" + cid1 + ";bpid=" + bpid1
+        + ";uri=" + nn1.getNameNodeAddress());
+    LOG.info("nn2: lv=" + lv2 + ";cid=" + cid2 + ";bpid=" + bpid2
+        + ";uri=" + nn2.getNameNodeAddress());
+
+    // now start the datanode...
+    String nns = nnURL1 + "," + nnURL2;
+    conf.set(DFSConfigKeys.DFS_FEDERATION_NAMENODES, nns);
+    DataNode dn = startDataNode(conf);
+    Assert.assertNotNull("failed to create DataNode", dn);
+    waitDataNodeUp(dn);
+
+    for (BPOfferService bpos : dn.nameNodeThreads) {
+      LOG.info("reg: bpid=" + "; name=" + bpos.bpRegistration.name
+          + "; sid=" + bpos.bpRegistration.storageID + "; nna=" + bpos.nn_addr);
+    }
+    
+    BPOfferService bpos1 = dn.nameNodeThreads[0];
+    BPOfferService bpos2 = dn.nameNodeThreads[1];
+
+    Assert.assertEquals("wrong nn address", bpos1.nn_addr, nn1
+        .getNameNodeAddress());
+    Assert.assertEquals("wrong nn address", bpos2.nn_addr, nn2
+        .getNameNodeAddress());
+    Assert.assertEquals("wrong bpid", bpos1.getBlockPoolId(), bpid1);
+    Assert.assertEquals("wrong bpid", bpos2.getBlockPoolId(), bpid2);
+    Assert.assertEquals("wrong cid", dn.getClusterId(), cid1);
+    Assert.assertEquals("cid should be same", cid2, cid1);
+    Assert.assertEquals("namespace should be same", bpos1.bpNSInfo.namespaceID,
+        ns1);
+    Assert.assertEquals("namespace should be same", bpos2.bpNSInfo.namespaceID,
+        ns2);
+
+    dn.shutdown();
+    shutdownNN(nn1);
+    nn1 = null;
+    shutdownNN(nn2);
+    nn2 = null;
+  }
+
+  /**
+   * Starts a single NN and a single DN, and verifies registration and handshake.
+   * 
+   * @throws IOException
+   */
+  @Test
+  public void testFedSingleNN() throws IOException {
+    NameNode nn1;
+    int nnPort = 9927;
+    // figure out host name for DataNode
+    String nnURL = "hdfs://" + localHost + ":" + Integer.toString(nnPort);
+
+    FileSystem.setDefaultUri(conf, nnURL);
+    nn1 = startNameNode(conf, nnPort);
+    Assert.assertNotNull("cannot create nn1", nn1);
+
+    String bpid1 = nn1.getFSImage().getBlockPoolID();
+    String cid1 = nn1.getFSImage().getClusterID();
+    int lv1 = nn1.getFSImage().getLayoutVersion();
+    LOG.info("nn1: lv=" + lv1 + ";cid=" + cid1 + ";bpid=" + bpid1
+        + ";uri=" + nn1.getNameNodeAddress());
+
+    // now start the datanode...
+    String nns = nnURL;
+    conf.set(DFSConfigKeys.DFS_FEDERATION_NAMENODES, nns);
+
+    DataNode dn = startDataNode(conf);
+    Assert.assertNotNull("failed to create DataNode", dn);
+
+    waitDataNodeUp(dn);
+    // try block report
+
+    for (BPOfferService bpos : dn.nameNodeThreads) {
+      LOG.info("reg: bpid=" + "; name=" + bpos.bpRegistration.name
+          + "; sid=" + bpos.bpRegistration.storageID + "; nna=" + bpos.nn_addr);
+    }
+    BPOfferService bpos1 = dn.nameNodeThreads[0];
+    bpos1.lastBlockReport = 0;
+    DatanodeCommand cmd = bpos1.blockReport();
+
+    Assert.assertNotNull("cmd is null", cmd);
+
+    Assert.assertEquals("wrong nn address", bpos1.nn_addr, nn1
+        .getNameNodeAddress());
+    Assert.assertEquals("wrong bpid", bpos1.getBlockPoolId(), bpid1);
+    Assert.assertEquals("wrong cid", dn.getClusterId(), cid1);
+
+    dn.shutdown();
+    dn = null;
+    shutdownNN(nn1);
+    nn1 = null;
+  }
+
+  private void shutdownNN(NameNode nn) {
+    if (nn == null) {
+      return;
+    }
+    nn.stop();
+    nn.join();
+  }
+
+  public boolean isDnUp(DataNode dn) {
+    boolean up = dn.nameNodeThreads.length > 0;
+    for (BPOfferService bpos : dn.nameNodeThreads) {
+      up = up && bpos.initialized();
+    }
+    return up;
+  }
+
+  public void waitDataNodeUp(DataNode dn) {
+    // TODO: replace this busy-wait with something smarter
+    while (!isDnUp(dn)) {
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException ie) {
+      }
+    }
+  }
+}

Some files were not shown because too many files changed in this diff