View source code

HADOOP-382. Extend unit tests to run multiple datanodes. Contributed by Milind.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@472652 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting 18 years ago
parent
commit
176f0d9637

+ 3 - 0
CHANGES.txt

@@ -22,6 +22,9 @@ Trunk (unreleased changes)
  6. HADOOP-683.  Remove a script dependency on bash, so it works with
     dash, the new default for /bin/sh on Ubuntu.  (James Todd via cutting)
 
+ 7. HADOOP-382.  Extend unit tests to run multiple datanodes.
+    (Milind Bhandarkar via cutting)
+
 
 Release 0.8.0 - 2006-11-03
 

+ 5 - 5
src/java/org/apache/hadoop/dfs/FSNamesystem.java

@@ -1996,17 +1996,17 @@ class FSNamesystem implements FSConstants {
         //
-        // Build a map of forbidden hostnames from the two forbidden sets.
+        // Build a set of forbidden datanodes from the two forbidden sets.
         //
-        Collection<String> forbiddenMachines = new TreeSet<String>();
+        Collection<DatanodeDescriptor> forbiddenMachines = new TreeSet<DatanodeDescriptor>();
         if (forbidden1 != null) {
             for (Iterator<DatanodeDescriptor> it = forbidden1.iterator(); it.hasNext(); ) {
                 DatanodeDescriptor cur = it.next();
-                forbiddenMachines.add(cur.getHost());
+                forbiddenMachines.add(cur);
             }
         }
         if (forbidden2 != null) {
             for (Iterator<DatanodeDescriptor> it = forbidden2.iterator(); it.hasNext(); ) {
                 DatanodeDescriptor cur = it.next();
-                forbiddenMachines.add(cur.getHost());
+                forbiddenMachines.add(cur);
             }
         }
 
@@ -2017,7 +2017,7 @@ class FSNamesystem implements FSConstants {
         List<DatanodeDescriptor> targetList = new ArrayList<DatanodeDescriptor>();
         for (Iterator<DatanodeDescriptor> it = heartbeats.iterator(); it.hasNext(); ) {
             DatanodeDescriptor node = it.next();
-            if (! forbiddenMachines.contains(node.getHost())) {
+            if (! forbiddenMachines.contains(node)) {
                 targetList.add(node);
                 avgLoad += node.getXceiverCount();
             }
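
The hunk above keys the exclusion set on DatanodeDescriptor rather than on
hostname. With several datanodes sharing one host, as MiniDFSCluster now
arranges, host-based filtering would strike every co-located datanode from
the replication target list at once. A minimal standalone sketch of the
difference (Node is a simplified, hypothetical stand-in for
DatanodeDescriptor):

    import java.util.TreeSet;

    public class ForbiddenSetSketch {
      // Simplified stand-in for DatanodeDescriptor: identity is host:port,
      // and it is Comparable so a TreeSet can hold it, as in FSNamesystem.
      static class Node implements Comparable<Node> {
        final String host;
        final int port;
        Node(String host, int port) { this.host = host; this.port = port; }
        String getHost() { return host; }
        public int compareTo(Node o) {
          int c = host.compareTo(o.host);
          return c != 0 ? c : (port - o.port);
        }
      }

      public static void main(String[] args) {
        Node a = new Node("127.0.0.1", 50010);
        Node b = new Node("127.0.0.1", 50011); // second datanode, same host

        TreeSet<String> byHost = new TreeSet<String>();
        byHost.add(a.getHost());
        // Old behavior: excluding a by hostname wrongly forbids b too.
        System.out.println(byHost.contains(b.getHost())); // true

        TreeSet<Node> byNode = new TreeSet<Node>();
        byNode.add(a);
        // New behavior: b stays available as a replication target.
        System.out.println(byNode.contains(b)); // false
      }
    }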

+ 49 - 12
src/test/org/apache/hadoop/dfs/MiniDFSCluster.java

@@ -30,10 +30,11 @@ import org.apache.hadoop.fs.*;
 public class MiniDFSCluster {
 
   private Configuration conf;
+  int nDatanodes;
   private Thread nameNodeThread;
-  private Thread dataNodeThread;
+  private Thread dataNodeThreads[];
   private NameNodeRunner nameNode;
-  private DataNodeRunner dataNode;
+  private DataNodeRunner dataNodes[];
   private int maxRetries = 10;
   private int MAX_RETRIES  = 10;
   private int MAX_RETRIES_PER_PORT = 10;
@@ -88,6 +89,15 @@ public class MiniDFSCluster {
    */
   class DataNodeRunner implements Runnable {
     private DataNode node;
+    Configuration conf = null;
+    
+    public DataNodeRunner(Configuration conf, File dataDir, int index) {
+      this.conf = new Configuration(conf);
+      this.conf.set("dfs.data.dir",
+          new File(dataDir, "data"+(2*index+1)).getPath()+","+
+          new File(dataDir, "data"+(2*index+2)).getPath());
+    
+    }
     
     /**
      * Create and run the data node.
@@ -127,24 +137,39 @@ public class MiniDFSCluster {
    * Create the config and start up the servers.  If either the rpc or info port is already 
    * in use, we will try new ports.
    * @param namenodePort suggestion for which rpc port to use.  caller should use 
-   *                     getNameNodePort() to get the actual port used.   
+   *                     getNameNodePort() to get the actual port used.
    * @param dataNodeFirst should the datanode be brought up before the namenode?
    */
   public MiniDFSCluster(int namenodePort, 
                         Configuration conf,
                         boolean dataNodeFirst) throws IOException {
+    this(namenodePort, conf, 1, dataNodeFirst);
+  }
+  
+  /**
+   * Create the config and start up the servers.  If either the rpc or info port is already 
+   * in use, we will try new ports.
+   * @param namenodePort suggestion for which rpc port to use.  caller should use 
+   *                     getNameNodePort() to get the actual port used.
+   * @param nDatanodes Number of datanodes   
+   * @param dataNodeFirst should the datanode be brought up before the namenode?
+   */
+  public MiniDFSCluster(int namenodePort, 
+                        Configuration conf,
+                        int nDatanodes,
+                        boolean dataNodeFirst) throws IOException {
 
     this.conf = conf;
 
+    this.nDatanodes = nDatanodes;
     this.nameNodePort = namenodePort;
     this.nameNodeInfoPort = 50080;   // We just want this port to be different from the default. 
     File base_dir = new File(System.getProperty("test.build.data"),
                              "dfs/");
+    File data_dir = new File(base_dir, "data");
     conf.set("dfs.name.dir", new File(base_dir, "name1").getPath()+","+
         new File(base_dir, "name2").getPath());
-    conf.set("dfs.data.dir", new File(base_dir, "data1").getPath()+","+
-        new File(base_dir, "data2").getPath());
-    conf.setInt("dfs.replication", 1);
+    conf.setInt("dfs.replication", Math.min(3, nDatanodes));
     // this timeout seems to control the minimum time for the test, so
     // decrease it considerably.
     conf.setInt("ipc.client.timeout", 1000);
@@ -161,14 +186,22 @@ public class MiniDFSCluster {
       NameNode.format(conf);
       nameNode = new NameNodeRunner();
       nameNodeThread = new Thread(nameNode);
-      dataNode = new DataNodeRunner();
-      dataNodeThread = new Thread(dataNode);
+      dataNodes = new DataNodeRunner[nDatanodes];
+      dataNodeThreads = new Thread[nDatanodes];
+      for (int idx = 0; idx < nDatanodes; idx++) {
+        dataNodes[idx] = new DataNodeRunner(conf, data_dir, idx);
+        dataNodeThreads[idx] = new Thread(dataNodes[idx]);
+      }
       if (dataNodeFirst) {
-        dataNodeThread.start();      
+        for (int idx = 0; idx < nDatanodes; idx++) {
+          dataNodeThreads[idx].start();
+        }
         nameNodeThread.start();      
       } else {
         nameNodeThread.start();
-        dataNodeThread.start();      
+        for (int idx = 0; idx < nDatanodes; idx++) {
+          dataNodeThreads[idx].start();
+        }
       }
 
       int retry = 0;
@@ -188,7 +221,9 @@ public class MiniDFSCluster {
         System.out.println("\tNameNode info port: " + nameNodeInfoPort);
 
         nameNode.shutdown();
-        dataNode.shutdown();
+        for (int idx = 0; idx < nDatanodes; idx++) {
+          dataNodes[idx].shutdown();
+        }
         
       } else {
         foundPorts = true;
@@ -212,7 +247,9 @@ public class MiniDFSCluster {
    * Shut down the servers.
    */
   public void shutdown() {
-    dataNode.shutdown();
+    for (int idx = 0; idx < nDatanodes; idx++) {
+      dataNodes[idx].shutdown();
+    }
     nameNode.shutdown();
   }
   
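
With the added constructor, a test can bring up any number of datanodes, and
each DataNodeRunner derives its own pair of storage directories (data1 and
data2 for index 0, data3 and data4 for index 1, and so on) under the shared
data directory. A usage sketch mirroring the updated tests below
(MultiDatanodeSketch is illustrative, not part of the commit):

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.dfs.MiniDFSCluster;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class MultiDatanodeSketch {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        // 65312 is only a suggestion for the namenode rpc port; 3 datanodes;
        // false means the namenode is started before the datanodes.
        MiniDFSCluster cluster = new MiniDFSCluster(65312, conf, 3, false);
        try {
          FileSystem fileSys = cluster.getFileSystem();
          // The constructor set dfs.replication to min(3, nDatanodes), so
          // three datanodes can hold every replica.
          fileSys.mkdirs(new Path("/sketch"));
        } finally {
          cluster.shutdown();
        }
      }
    }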

+ 1 - 1
src/test/org/apache/hadoop/dfs/TestDFSMkdirs.java

@@ -43,7 +43,7 @@ public class TestDFSMkdirs extends TestCase {
    */
   public void testDFSMkdirs() throws IOException {
     Configuration conf = new Configuration();
-    MiniDFSCluster cluster = new MiniDFSCluster(65312, conf, false);
+    MiniDFSCluster cluster = new MiniDFSCluster(65312, conf, 2, false);
     FileSystem fileSys = cluster.getFileSystem();
     try {
     	// First create a new directory with mkdirs

+ 1 - 1
src/test/org/apache/hadoop/dfs/TestFsck.java

@@ -139,7 +139,7 @@ public class TestFsck extends TestCase {
     MiniDFSCluster cluster = null;
     try {
       Configuration conf = new Configuration();
-      cluster = new MiniDFSCluster(65314, conf, false);
+      cluster = new MiniDFSCluster(65314, conf, 4, false);
       namenode = conf.get("fs.default.name", "local");
       if (!"local".equals(namenode)) {
         MyFile[] files = createFiles(namenode, "/srcdat");

+ 1 - 1
src/test/org/apache/hadoop/dfs/TestPread.java

@@ -111,7 +111,7 @@ public class TestPread extends TestCase {
    */
   public void testPreadDFS() throws IOException {
     Configuration conf = new Configuration();
-    MiniDFSCluster cluster = new MiniDFSCluster(65312, conf, false);
+    MiniDFSCluster cluster = new MiniDFSCluster(65312, conf, 3, false);
     FileSystem fileSys = cluster.getFileSystem();
     try {
       Path file1 = new Path("preadtest.dat");

+ 1 - 1
src/test/org/apache/hadoop/fs/TestCopyFiles.java

@@ -179,7 +179,7 @@ public class TestCopyFiles extends TestCase {
     MiniDFSCluster cluster = null;
     try {
       Configuration conf = new Configuration();
-      cluster = new MiniDFSCluster(65314, conf, false);
+      cluster = new MiniDFSCluster(65314, conf, 2, false);
       namenode = conf.get("fs.default.name", "local");
       if (!"local".equals(namenode)) {
         MyFile[] files = createFiles(namenode, "/srcdat");

+ 1 - 1
src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java

@@ -150,7 +150,7 @@ public class TestMiniMRWithDFS extends TestCase {
           final int jobTrackerPort = 60050;
 
           Configuration conf = new Configuration();
-          dfs = new MiniDFSCluster(65314, conf, true);
+          dfs = new MiniDFSCluster(65314, conf, 4, true);
           fileSys = dfs.getFileSystem();
           namenode = fileSys.getName();
           mr = new MiniMRCluster(jobTrackerPort, 50060, taskTrackers,