
HADOOP-698. Fix HDFS client to not retry the same datanode on read failures. Contributed by Milind.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@480291 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting
parent commit 70d274173a

+ 3 - 0
CHANGES.txt

@@ -134,6 +134,9 @@ Trunk (unreleased changes)
 39. HADOOP-747.  Fix record serialization to work correctly when
     records are embedded in Maps.  (Milind Bhandarkar via cutting)
 
+40. HADOOP-698.  Fix HDFS client not to retry the same datanode on
+    read failures.  (Milind Bhandarkar via cutting)
+
 
 Release 0.8.0 - 2006-11-03
 

+ 14 - 5
src/java/org/apache/hadoop/dfs/DFSClient.java

@@ -536,7 +536,7 @@ class DFSClient implements FSConstants {
          * Open a DataInputStream to a DataNode so that it can be read from.
          * We get block ID and the IDs of the destinations at startup, from the namenode.
          */
-        private synchronized void blockSeekTo(long target) throws IOException {
+        private synchronized DatanodeInfo blockSeekTo(long target, TreeSet deadNodes) throws IOException {
             if (target >= filelen) {
                 throw new IOException("Attempted to read past end of file");
             }
@@ -572,10 +572,10 @@ class DFSClient implements FSConstants {
             // Connect to best DataNode for desired Block, with potential offset
             //
             int failures = 0;
-            TreeSet deadNodes = new TreeSet();
+            DatanodeInfo chosenNode = null;
             while (s == null) {
                 DNAddrPair retval = chooseDataNode(targetBlock, deadNodes);
-                DatanodeInfo chosenNode = retval.info;
+                chosenNode = retval.info;
                 InetSocketAddress targetAddr = retval.addr;
             
                 try {
@@ -608,6 +608,7 @@ class DFSClient implements FSConstants {
                     this.pos = target;
                     this.blockEnd = targetBlockEnd;
                     this.blockStream = in;
+                    return chosenNode;
                 } catch (IOException ex) {
                     // Put chosen node into dead list, continue
                     LOG.debug("Failed to connect to " + targetAddr + ":" 
@@ -622,6 +623,7 @@ class DFSClient implements FSConstants {
                     s = null;
                 }
             }
+            return chosenNode;
         }
 
         /**
@@ -653,7 +655,7 @@ class DFSClient implements FSConstants {
             int result = -1;
             if (pos < filelen) {
                 if (pos > blockEnd) {
-                    blockSeekTo(pos);
+                    blockSeekTo(pos, new TreeSet());
                 }
                 result = blockStream.read();
                 if (result >= 0) {
@@ -673,10 +675,15 @@ class DFSClient implements FSConstants {
             }
             if (pos < filelen) {
               int retries = 2;
+              DatanodeInfo chosenNode = null;
+              TreeSet deadNodes = null;
               while (retries > 0) {
                 try {
                   if (pos > blockEnd) {
-                      blockSeekTo(pos);
+                      if (deadNodes == null) {
+                        deadNodes = new TreeSet();
+                      }
+                      chosenNode = blockSeekTo(pos, deadNodes);
                   }
                   int realLen = Math.min(len, (int) (blockEnd - pos + 1));
                   int result = blockStream.read(buf, off, realLen);
@@ -687,6 +694,8 @@ class DFSClient implements FSConstants {
                 } catch (IOException e) {
                   LOG.warn("DFS Read: " + StringUtils.stringifyException(e));
                   blockEnd = -1;
+                  if (deadNodes == null) { deadNodes = new TreeSet(); }
+                  if (chosenNode != null) { deadNodes.add(chosenNode); }
                   if (--retries == 0) {
                     throw e;
                   }
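For reference, the pattern the hunks above implement: read() now keeps one set of datanodes that have already failed for the current block and hands it to blockSeekTo(), which passes it on to chooseDataNode(), so a retry can never land on the same replica again. A minimal, self-contained sketch of that pattern follows (hypothetical names such as Replica, chooseReplica and readFrom are illustration only, not the actual DFSClient API):

import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Sketch only: hypothetical types, not the real DFSClient classes.
class DeadNodeRetrySketch {

    // Stand-in for a datanode holding a replica of the block.
    static class Replica {
        final String host;
        Replica(String host) { this.host = host; }
    }

    // Pick the first replica that is not already known to be dead.
    static Replica chooseReplica(List<Replica> replicas, Set<Replica> deadNodes)
            throws IOException {
        for (Replica r : replicas) {
            if (!deadNodes.contains(r)) {
                return r;
            }
        }
        throw new IOException("No live replicas left to try");
    }

    // Retry loop: a replica that throws goes into deadNodes, so the next
    // attempt is forced onto a different datanode instead of the same one.
    static byte[] readWithRetries(List<Replica> replicas, int retries)
            throws IOException {
        Set<Replica> deadNodes = new HashSet<Replica>();
        while (true) {
            Replica chosen = chooseReplica(replicas, deadNodes);
            try {
                return readFrom(chosen);   // may throw IOException
            } catch (IOException e) {
                deadNodes.add(chosen);     // never pick this node again
                if (--retries <= 0) {
                    throw e;
                }
            }
        }
    }

    // Placeholder for the actual block read over the wire.
    static byte[] readFrom(Replica r) throws IOException {
        throw new IOException("simulated read failure from " + r.host);
    }
}

Before this change the dead-node set was a local variable recreated inside blockSeekTo(), so a datanode that had just failed could be chosen again on the next retry; hoisting the set into the caller is what makes the exclusion stick across retries.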

+ 28 - 9
src/test/org/apache/hadoop/dfs/MiniDFSCluster.java

@@ -35,13 +35,12 @@ public class MiniDFSCluster {
   private Thread dataNodeThreads[];
   private NameNodeRunner nameNode;
   private DataNodeRunner dataNodes[];
-  private int maxRetries = 10;
   private int MAX_RETRIES  = 10;
   private int MAX_RETRIES_PER_PORT = 10;
 
   private int nameNodePort = 0;
   private int nameNodeInfoPort = 0;
-
+  
   /**
    * An inner class that runs a name node.
    */
@@ -107,10 +106,12 @@ public class MiniDFSCluster {
         String[] dirs = conf.getStrings("dfs.data.dir");
         for (int idx = 0; idx < dirs.length; idx++) {
           File dataDir = new File(dirs[idx]);
-          if (!dataDir.mkdirs()) {      
-            if (!dataDir.isDirectory()) {
-              throw new RuntimeException("Mkdirs failed to create directory " +
-                                         dataDir.toString());
+          synchronized (DataNodeRunner.class) {
+            if (!dataDir.mkdirs()) {
+              if (!dataDir.isDirectory()) {
+                throw new RuntimeException("Mkdirs failed to create directory " +
+                    dataDir.toString());
+              }
             }
           }
         }
@@ -143,7 +144,7 @@ public class MiniDFSCluster {
   public MiniDFSCluster(int namenodePort, 
                         Configuration conf,
                         boolean dataNodeFirst) throws IOException {
-    this(namenodePort, conf, 1, dataNodeFirst);
+    this(namenodePort, conf, 1, dataNodeFirst, true);
   }
   
   /**
@@ -158,18 +159,36 @@ public class MiniDFSCluster {
                         Configuration conf,
                         int nDatanodes,
                         boolean dataNodeFirst) throws IOException {
+    this(namenodePort, conf, nDatanodes, dataNodeFirst, true);
+  }
+  
+  /**
+   * Create the config and start up the servers.  If either the rpc or info port is already 
+   * in use, we will try new ports.
+   * @param namenodePort suggestion for which rpc port to use.  caller should use 
+   *                     getNameNodePort() to get the actual port used.
+   * @param nDatanodes Number of datanodes   
+   * @param dataNodeFirst should the datanode be brought up before the namenode?
+   * @param formatNamenode should the namenode be formatted before starting up?
+   */
+  public MiniDFSCluster(int namenodePort, 
+                        Configuration conf,
+                        int nDatanodes,
+                        boolean dataNodeFirst,
+                        boolean formatNamenode) throws IOException {
 
     this.conf = conf;
 
     this.nDatanodes = nDatanodes;
     this.nameNodePort = namenodePort;
-    this.nameNodeInfoPort = 50080;   // We just want this port to be different from the default. 
+    this.nameNodeInfoPort = 50080;   // We just want this port to be different from the default.
     File base_dir = new File(System.getProperty("test.build.data"),
                              "dfs/");
     File data_dir = new File(base_dir, "data");
     conf.set("dfs.name.dir", new File(base_dir, "name1").getPath()+","+
         new File(base_dir, "name2").getPath());
     conf.setInt("dfs.replication", Math.min(3, nDatanodes));
+    conf.setInt("dfs.safemode.extension", 0);
     // this timeout seems to control the minimum time for the test, so
     // decrease it considerably.
     conf.setInt("ipc.client.timeout", 1000);
@@ -183,7 +202,7 @@ public class MiniDFSCluster {
                "localhost:"+ Integer.toString(nameNodePort));
       conf.set("dfs.info.port", nameNodeInfoPort);
       
-      NameNode.format(conf);
+      if (formatNamenode) { NameNode.format(conf); }
       nameNode = new NameNodeRunner();
       nameNodeThread = new Thread(nameNode);
       dataNodes = new DataNodeRunner[nDatanodes];
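The new five-argument constructor exists so a test can restart a cluster against the namespace a previous run left on disk: formatNamenode=false skips the NameNode.format() call. A short usage sketch, modelled on TestRestartDFS below (the class name RestartClusterExample is illustrative only):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.dfs.MiniDFSCluster;

public class RestartClusterExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // First run: default behaviour, the namenode is formatted on startup.
    MiniDFSCluster cluster = new MiniDFSCluster(65314, conf, 4, false);
    // ... create test data through the cluster's filesystem ...
    cluster.shutdown();

    // Second run: formatNamenode=false, so the namespace written under
    // dfs.name.dir by the first run is reloaded instead of reformatted.
    conf = new Configuration();
    cluster = new MiniDFSCluster(65320, conf, 4, false, false);
    // ... verify the files created before the restart are still readable ...
    cluster.shutdown();
  }
}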

+ 197 - 0
src/test/org/apache/hadoop/dfs/TestFileCorruption.java

@@ -0,0 +1,197 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.dfs;
+
+import java.io.*;
+import java.util.Random;
+import junit.framework.*;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * A JUnit test for corrupted file handling.
+ *
+ * @author Milind Bhandarkar
+ */
+public class TestFileCorruption extends TestCase {
+  
+  private static final int NFILES = 20;
+  private static String TEST_ROOT_DIR =
+    new Path(System.getProperty("test.build.data","/tmp"))
+    .toString().replace(' ', '+');
+
+  /** class MyFile contains enough information to recreate the contents of
+   * a single file.
+   */
+  private static class MyFile {
+    private static Random gen = new Random();
+    private static final int MAX_LEVELS = 3;
+    private static final int MAX_SIZE = 8*1024;
+    private static String[] dirNames = {
+      "zero", "one", "two", "three", "four", "five", "six", "seven", "eight", "nine"
+    };
+    private String name = "";
+    private int size;
+    private long seed;
+    
+    MyFile() {
+      int nLevels = gen.nextInt(MAX_LEVELS);
+      if(nLevels != 0) {
+        int[] levels = new int[nLevels];
+        for (int idx = 0; idx < nLevels; idx++) {
+          levels[idx] = gen.nextInt(10);
+        }
+        StringBuffer sb = new StringBuffer();
+        for (int idx = 0; idx < nLevels; idx++) {
+          sb.append(dirNames[levels[idx]]);
+          sb.append("/");
+        }
+        name = sb.toString();
+      }
+      long fidx = -1;
+      while (fidx < 0) { fidx = gen.nextLong(); }
+      name = name + Long.toString(fidx);
+      size = gen.nextInt(MAX_SIZE);
+      seed = gen.nextLong();
+    }
+    
+    String getName() { return name; }
+    int getSize() { return size; }
+    long getSeed() { return seed; }
+  }
+  
+  public TestFileCorruption(String testName) {
+    super(testName);
+  }
+
+  
+  
+  protected void setUp() throws Exception {
+  }
+
+  protected void tearDown() throws Exception {
+  }
+  
+  /** create NFILES with random names and directory hierarchies
+   * with random (but reproducible) data in them.
+   */
+  private static MyFile[] createFiles(String fsname, String topdir)
+  throws IOException {
+    MyFile[] files = new MyFile[NFILES];
+    
+    for (int idx = 0; idx < NFILES; idx++) {
+      files[idx] = new MyFile();
+    }
+    
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.getNamed(fsname, conf);
+    Path root = new Path(topdir);
+    
+    for (int idx = 0; idx < NFILES; idx++) {
+      Path fPath = new Path(root, files[idx].getName());
+      if (!fs.mkdirs(fPath.getParent())) {
+        throw new IOException("Mkdirs failed to create " + 
+                              fPath.getParent().toString());
+      }
+      FSDataOutputStream out = fs.create(fPath);
+      byte[] toWrite = new byte[files[idx].getSize()];
+      Random rb = new Random(files[idx].getSeed());
+      rb.nextBytes(toWrite);
+      out.write(toWrite);
+      out.close();
+      toWrite = null;
+    }
+    
+    return files;
+  }
+  
+  /** check if the files have been copied correctly. */
+  private static boolean checkFiles(String fsname, String topdir, MyFile[] files) 
+  throws IOException {
+    
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.getNamed(fsname, conf);
+    Path root = new Path(topdir);
+    
+    for (int idx = 0; idx < NFILES; idx++) {
+      Path fPath = new Path(root, files[idx].getName());
+      FSDataInputStream in = fs.open(fPath);
+      byte[] toRead = new byte[files[idx].getSize()];
+      byte[] toCompare = new byte[files[idx].getSize()];
+      Random rb = new Random(files[idx].getSeed());
+      rb.nextBytes(toCompare);
+      assertEquals("Cannnot read file.", toRead.length, in.read(toRead));
+      in.close();
+      for (int i = 0; i < toRead.length; i++) {
+        if (toRead[i] != toCompare[i]) {
+          return false;
+        }
+      }
+      toRead = null;
+      toCompare = null;
+    }
+    
+    return true;
+  }
+  
+  /** delete directory and everything underneath it.*/
+  private static void deldir(String fsname, String topdir)
+  throws IOException {
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.getNamed(fsname, conf);
+    Path root = new Path(topdir);
+    fs.delete(root);
+  }
+  
+  /** check if DFS can handle corrupted blocks properly */
+  public void testFileCorruption() throws Exception {
+    String namenode = null;
+    MiniDFSCluster cluster = null;
+    MyFile[] files = null;
+    try {
+      Configuration conf = new Configuration();
+      cluster = new MiniDFSCluster(65314, conf, 3, false);
+      namenode = conf.get("fs.default.name", "local");
+      if (!"local".equals(namenode)) {
+        files = createFiles(namenode, "/srcdat");
+        // Now deliberately remove the blocks
+        File data_dir = new File(System.getProperty("test.build.data"),
+            "dfs/data/data5/data");
+        assertTrue("data directory does not exist", data_dir.exists());
+        File[] blocks = data_dir.listFiles();
+        assertTrue("Blocks do not exist in data-dir", (blocks != null) && (blocks.length > 0));
+        for (int idx = 0; idx < blocks.length; idx++) {
+          if (!blocks[idx].getName().startsWith("blk_")) {
+            continue;
+          }
+          System.out.println("Deliberately removing file "+blocks[idx].getName());
+          assertTrue("Cannot remove file.", blocks[idx].delete());
+        }
+        assertTrue("Corrupted replicas not handled properly.",
+            checkFiles(namenode, "/srcdat", files));
+        deldir(namenode, "/srcdat");
+      }
+    } finally {
+      if (cluster != null) { cluster.shutdown(); }
+    }
+  }
+}

+ 195 - 0
src/test/org/apache/hadoop/dfs/TestRestartDFS.java

@@ -0,0 +1,195 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.dfs;
+
+import java.io.IOException;
+import java.util.Random;
+import junit.framework.*;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * A JUnit test for checking if restarting DFS preserves integrity.
+ *
+ * @author Milind Bhandarkar
+ */
+public class TestRestartDFS extends TestCase {
+  
+  private static final int NFILES = 20;
+  private static String TEST_ROOT_DIR =
+    new Path(System.getProperty("test.build.data","/tmp"))
+    .toString().replace(' ', '+');
+
+  /** class MyFile contains enough information to recreate the contents of
+   * a single file.
+   */
+  private static class MyFile {
+    private static Random gen = new Random();
+    private static final int MAX_LEVELS = 3;
+    private static final int MAX_SIZE = 8*1024;
+    private static String[] dirNames = {
+      "zero", "one", "two", "three", "four", "five", "six", "seven", "eight", "nine"
+    };
+    private String name = "";
+    private int size;
+    private long seed;
+    
+    MyFile() {
+      int nLevels = gen.nextInt(MAX_LEVELS);
+      if(nLevels != 0) {
+        int[] levels = new int[nLevels];
+        for (int idx = 0; idx < nLevels; idx++) {
+          levels[idx] = gen.nextInt(10);
+        }
+        StringBuffer sb = new StringBuffer();
+        for (int idx = 0; idx < nLevels; idx++) {
+          sb.append(dirNames[levels[idx]]);
+          sb.append("/");
+        }
+        name = sb.toString();
+      }
+      long fidx = -1;
+      while (fidx < 0) { fidx = gen.nextLong(); }
+      name = name + Long.toString(fidx);
+      size = gen.nextInt(MAX_SIZE);
+      seed = gen.nextLong();
+    }
+    
+    String getName() { return name; }
+    int getSize() { return size; }
+    long getSeed() { return seed; }
+  }
+  
+  public TestRestartDFS(String testName) {
+    super(testName);
+  }
+
+  
+  
+  protected void setUp() throws Exception {
+  }
+
+  protected void tearDown() throws Exception {
+  }
+  
+  /** create NFILES with random names and directory hierarchies
+   * with random (but reproducible) data in them.
+   */
+  private static MyFile[] createFiles(String fsname, String topdir)
+  throws IOException {
+    MyFile[] files = new MyFile[NFILES];
+    
+    for (int idx = 0; idx < NFILES; idx++) {
+      files[idx] = new MyFile();
+    }
+    
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.getNamed(fsname, conf);
+    Path root = new Path(topdir);
+    
+    for (int idx = 0; idx < NFILES; idx++) {
+      Path fPath = new Path(root, files[idx].getName());
+      if (!fs.mkdirs(fPath.getParent())) {
+        throw new IOException("Mkdirs failed to create " + 
+                              fPath.getParent().toString());
+      }
+      FSDataOutputStream out = fs.create(fPath);
+      byte[] toWrite = new byte[files[idx].getSize()];
+      Random rb = new Random(files[idx].getSeed());
+      rb.nextBytes(toWrite);
+      out.write(toWrite);
+      out.close();
+      toWrite = null;
+    }
+    
+    return files;
+  }
+  
+  /** check if the files have been copied correctly. */
+  private static boolean checkFiles(String fsname, String topdir, MyFile[] files) 
+  throws IOException {
+    
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.getNamed(fsname, conf);
+    Path root = new Path(topdir);
+    
+    for (int idx = 0; idx < NFILES; idx++) {
+      Path fPath = new Path(root, files[idx].getName());
+      FSDataInputStream in = fs.open(fPath);
+      byte[] toRead = new byte[files[idx].getSize()];
+      byte[] toCompare = new byte[files[idx].getSize()];
+      Random rb = new Random(files[idx].getSeed());
+      rb.nextBytes(toCompare);
+      assertEquals("Cannnot read file.", toRead.length, in.read(toRead));
+      in.close();
+      for (int i = 0; i < toRead.length; i++) {
+        if (toRead[i] != toCompare[i]) {
+          return false;
+        }
+      }
+      toRead = null;
+      toCompare = null;
+    }
+    
+    return true;
+  }
+  
+  /** delete directory and everything underneath it.*/
+  private static void deldir(String fsname, String topdir)
+  throws IOException {
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.getNamed(fsname, conf);
+    Path root = new Path(topdir);
+    fs.delete(root);
+  }
+  
+  /** check if DFS remains in proper condition after a restart */
+  public void testRestartDFS() throws Exception {
+    String namenode = null;
+    MiniDFSCluster cluster = null;
+    MyFile[] files = null;
+    try {
+      Configuration conf = new Configuration();
+      cluster = new MiniDFSCluster(65314, conf, 4, false);
+      namenode = conf.get("fs.default.name", "local");
+      if (!"local".equals(namenode)) {
+        files = createFiles(namenode, "/srcdat");
+      }
+    } finally {
+      if (cluster != null) { cluster.shutdown(); }
+    }
+    assertTrue("Error creating files", files != null);
+    try {
+      Configuration conf = new Configuration();
+      // Here we restart the MiniDFScluster without formatting namenode
+      cluster = new MiniDFSCluster(65320, conf, 4, false, false);
+      namenode = conf.get("fs.default.name", "local");
+      if (!"local".equals(namenode)) {
+        assertTrue("Filesystem corrupted after restart.",
+            checkFiles(namenode, "/srcdat", files));
+        deldir(namenode, "/srcdat");
+      }
+    } finally {
+      if (cluster != null) { cluster.shutdown(); }
+    }
+  }
+}