
HDFS-3548. NamenodeFsck.copyBlock fails to create a Block Reader. Contributed by Colin Patrick McCabe

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1358824 13f79535-47bb-0310-9956-ffa450edef68
Eli Collins, 13 years ago · commit b4c8c50456
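
The most direct change to NamenodeFsck.copyBlock in the hunks below is the switch from a bare new Socket() to a socket obtained from the configured socket factory; around it, the patch also records non-fatal internal errors (a failed delete, a failed copy to /lost+found) in a new internalError flag so such a run ends with a failure status instead of passing silently. A minimal sketch of the new connection setup, using only names that appear in the diff below (conf, targetAddr, HdfsServerConstants.READ_TIMEOUT):

    // After this patch the socket comes from the configured SocketFactory
    // rather than from "new Socket()" directly.
    Socket s = NetUtils.getDefaultSocketFactory(conf).createSocket();
    s.connect(targetAddr, HdfsServerConstants.READ_TIMEOUT);
    s.setSoTimeout(HdfsServerConstants.READ_TIMEOUT);

The block reader is then built over this socket as before, which is the step the issue title refers to.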

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -277,6 +277,9 @@ Release 2.0.1-alpha - UNRELEASED
     HDFS-711. hdfsUtime does not handle atime = 0 or mtime = 0 correctly.
     (Colin Patrick McCabe via eli)
 
+    HDFS-3548. NamenodeFsck.copyBlock fails to create a Block Reader.
+    (Colin Patrick McCabe via eli)
+
   BREAKDOWN OF HDFS-3042 SUBTASKS
 
     HDFS-2185. HDFS portion of ZK-based FailoverController (todd)

+ 80 - 43
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java

@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.io.PrintWriter;
@@ -35,6 +36,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.UnresolvedLinkException;
 import org.apache.hadoop.hdfs.BlockReader;
 import org.apache.hadoop.hdfs.BlockReaderFactory;
 import org.apache.hadoop.hdfs.DFSClient;
@@ -51,6 +53,7 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.net.NodeBase;
+import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -103,6 +106,12 @@ public class NamenodeFsck {
   private boolean showRacks = false;
   private boolean showCorruptFileBlocks = false;
 
+  /**
+   * True if we encountered an internal error during FSCK, such as not being
+   * able to delete a corrupt file.
+   */
+  private boolean internalError = false;
+
   /** 
    * True if the user specified the -move option.
    *
@@ -200,6 +209,13 @@ public class NamenodeFsck {
         out.println("FSCK ended at " + new Date() + " in "
             + (System.currentTimeMillis() - startTime + " milliseconds"));
 
+        // If there were internal errors during the fsck operation, we want to
+        // return FAILURE_STATUS, even if those errors were not immediately
+        // fatal.  Otherwise many unit tests will pass even when there are bugs.
+        if (internalError) {
+          throw new IOException("fsck encountered internal errors!");
+        }
+
         // DFSck client scans for the string HEALTHY/CORRUPT to check the status
         // of file system and return appropriate code. Changing the output
         // string might break testcases. Also note this must be the last line 
@@ -388,20 +404,11 @@ public class NamenodeFsck {
             + " blocks of total size " + missize + " B.");
       }
       res.corruptFiles++;
-      try {
-        if (doMove) {
-          if (!isOpen) {
-            copyBlocksToLostFound(parent, file, blocks);
-          }
-        }
-        if (doDelete) {
-          if (!isOpen) {
-            LOG.warn("\n - deleting corrupted file " + path);
-            namenode.getRpcServer().delete(path, true);
-          }
-        }
-      } catch (IOException e) {
-        LOG.error("error processing " + path + ": " + e.toString());
+      if (isOpen) {
+        LOG.info("Fsck: ignoring open file " + path);
+      } else {
+        if (doMove) copyBlocksToLostFound(parent, file, blocks);
+        if (doDelete) deleteCorruptedFile(path);
       }
     }
     if (showFiles) {
@@ -415,29 +422,52 @@ public class NamenodeFsck {
       }
     }
   }
+
+  private void deleteCorruptedFile(String path) {
+    try {
+      namenode.getRpcServer().delete(path, true);
+      LOG.info("Fsck: deleted corrupt file " + path);
+    } catch (Exception e) {
+      LOG.error("Fsck: error deleting corrupted file " + path, e);
+      internalError = true;
+    }
+  }
+
+  boolean hdfsPathExists(String path)
+      throws AccessControlException, UnresolvedLinkException, IOException {
+    try {
+      HdfsFileStatus hfs = namenode.getRpcServer().getFileInfo(path);
+      return (hfs != null);
+    } catch (FileNotFoundException e) {
+      return false;
+    }
+  }
   
   private void copyBlocksToLostFound(String parent, HdfsFileStatus file,
         LocatedBlocks blocks) throws IOException {
     final DFSClient dfs = new DFSClient(NameNode.getAddress(conf), conf);
+    final String fullName = file.getFullName(parent);
+    OutputStream fos = null;
     try {
-    if (!lfInited) {
-      lostFoundInit(dfs);
-    }
-    if (!lfInitedOk) {
-      return;
-    }
-    String fullName = file.getFullName(parent);
-    String target = lostFound + fullName;
-    String errmsg = "Failed to move " + fullName + " to /lost+found";
-    try {
+      if (!lfInited) {
+        lostFoundInit(dfs);
+      }
+      if (!lfInitedOk) {
+        throw new IOException("failed to initialize lost+found");
+      }
+      String target = lostFound + fullName;
+      if (hdfsPathExists(target)) {
+        LOG.warn("Fsck: can't copy the remains of " + fullName + " to " +
+          "lost+found, because " + target + " already exists.");
+        return;
+      }
       if (!namenode.getRpcServer().mkdirs(
           target, file.getPermission(), true)) {
-        LOG.warn(errmsg);
-        return;
+        throw new IOException("failed to create directory " + target);
       }
       // create chains
       int chain = 0;
-      OutputStream fos = null;
+      boolean copyError = false;
       for (LocatedBlock lBlk : blocks.getLocatedBlocks()) {
         LocatedBlock lblock = lBlk;
         DatanodeInfo[] locs = lblock.getLocations();
@@ -451,32 +481,38 @@ public class NamenodeFsck {
         }
         if (fos == null) {
           fos = dfs.create(target + "/" + chain, true);
-          if (fos != null)
-            chain++;
-          else {
-            throw new IOException(errmsg + ": could not store chain " + chain);
+          if (fos == null) {
+            throw new IOException("Failed to copy " + fullName +
+                " to /lost+found: could not store chain " + chain);
           }
+          chain++;
         }
         
         // copy the block. It's a pity it's not abstracted from DFSInputStream ...
         try {
           copyBlock(dfs, lblock, fos);
         } catch (Exception e) {
-          e.printStackTrace();
-          // something went wrong copying this block...
-          LOG.warn(" - could not copy block " + lblock.getBlock() + " to " + target);
+          LOG.error("Fsck: could not copy block " + lblock.getBlock() +
+              " to " + target, e);
           fos.flush();
           fos.close();
           fos = null;
+          internalError = true;
+          copyError = true;
         }
       }
-      if (fos != null) fos.close();
-      LOG.warn("\n - copied corrupted file " + fullName + " to /lost+found");
-    }  catch (Exception e) {
-      e.printStackTrace();
-      LOG.warn(errmsg + ": " + e.getMessage());
-    }
+      if (copyError) {
+        LOG.warn("Fsck: there were errors copying the remains of the " +
+          "corrupted file " + fullName + " to /lost+found");
+      } else {
+        LOG.info("Fsck: copied the remains of the corrupted file " + 
+          fullName + " to /lost+found");
+      }
+    } catch (Exception e) {
+      LOG.error("copyBlocksToLostFound: error processing " + fullName, e);
+      internalError = true;
     } finally {
+      if (fos != null) fos.close();
       dfs.close();
     }
   }
@@ -503,7 +539,7 @@ public class NamenodeFsck {
         targetAddr = NetUtils.createSocketAddr(chosenNode.getXferAddr());
       }  catch (IOException ie) {
         if (failures >= DFSConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_DEFAULT) {
-          throw new IOException("Could not obtain block " + lblock);
+          throw new IOException("Could not obtain block " + lblock, ie);
         }
         LOG.info("Could not obtain block from any node:  " + ie);
         try {
@@ -515,7 +551,7 @@ public class NamenodeFsck {
         continue;
       }
       try {
-        s = new Socket();
+        s = NetUtils.getDefaultSocketFactory(conf).createSocket();
         s.connect(targetAddr, HdfsServerConstants.READ_TIMEOUT);
         s.setSoTimeout(HdfsServerConstants.READ_TIMEOUT);
         
@@ -555,7 +591,7 @@ public class NamenodeFsck {
                               ", but datanode returned " +bytesRead+" bytes");
       }
     } catch (Exception e) {
-      e.printStackTrace();
+      LOG.error("Error reading block", e);
       success = false;
     } finally {
       try {s.close(); } catch (Exception e1) {}
@@ -606,6 +642,7 @@ public class NamenodeFsck {
     if (lostFound == null) {
       LOG.warn("Cannot initialize /lost+found .");
       lfInitedOk = false;
+      internalError = true;
     }
   }
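
Taken together, the NamenodeFsck changes above make -move and -delete behave more predictably: open files are now skipped with a log message instead of being moved or deleted, a failed block copy no longer aborts the whole salvage pass, and any internal error surfaces as a failed fsck run. A hedged sketch of how a caller observes that last point, modelled on the runFsck helper in the new test below (the path and options are illustrative):

    import java.io.ByteArrayOutputStream;
    import java.io.PrintStream;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.HdfsConfiguration;
    import org.apache.hadoop.hdfs.tools.DFSck;
    import org.apache.hadoop.util.ToolRunner;

    public class FsckMoveExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new HdfsConfiguration();
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        PrintStream out = new PrintStream(bytes, true);
        // DFSck is a Tool; a CORRUPT filesystem, or an internal error that
        // prevents the final status line from being printed, yields a
        // non-zero exit code.
        int errCode = ToolRunner.run(new DFSck(conf, out),
            new String[] {"/", "-move"});
        System.out.println("fsck exit code: " + errCode);
        System.out.println(bytes.toString());
      }
    }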
 

+ 16 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java

@@ -115,6 +115,22 @@ public class DFSTestUtil {
     this.maxSize = maxSize;
     this.minSize = minSize;
   }
+
+  /** Creates a new instance of DFSTestUtil
+   *
+   * @param testName Name of the test from where this utility is used
+   * @param nFiles Number of files to be created
+   * @param maxLevels Maximum number of directory levels
+   * @param maxSize Maximum size for file
+   * @param minSize Minimum size for file
+   */
+  public DFSTestUtil(String testName, int nFiles, int maxLevels, int maxSize,
+      int minSize) {
+    this.nFiles = nFiles;
+    this.maxLevels = maxLevels;
+    this.maxSize = maxSize;
+    this.minSize = minSize;
+  }
   
   /**
    * when formating a namenode - we must provide clusterid.
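
The new DFSTestUtil constructor lets a test pin both the maximum and the minimum generated file size, which testFsckMove below relies on to get multi-block files. A short usage sketch with the same arguments the test passes (blockSize is the 1024-byte test block size, an illustrative local variable):

    // 5 files, up to 3 directory levels deep, each between 5 blocks and
    // just under 6 blocks long.
    final int blockSize = 1024;
    DFSTestUtil util = new DFSTestUtil("TestFsck", 5, 3,
        (5 * blockSize) + (blockSize - 1), 5 * blockSize);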

+ 199 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsck.java

@@ -23,6 +23,7 @@ import static org.junit.Assert.*;
 import java.io.BufferedReader;
 import java.io.ByteArrayOutputStream;
 import java.io.File;
+import java.io.FileNotFoundException;
 import java.io.FileReader;
 import java.io.IOException;
 import java.io.PrintStream;
@@ -37,6 +38,8 @@ import java.security.PrivilegedExceptionAction;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Random;
+import java.util.Set;
+import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 import org.apache.commons.logging.impl.Log4JLogger;
@@ -44,14 +47,17 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.UnresolvedLinkException;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSInputStream;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
+import org.apache.hadoop.hdfs.protocol.DirectoryListing;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
@@ -60,6 +66,7 @@ import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.hdfs.tools.DFSck;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.log4j.Level;
@@ -68,6 +75,8 @@ import org.apache.log4j.PatternLayout;
 import org.apache.log4j.RollingFileAppender;
 import org.junit.Test;
 
+import com.google.common.collect.Sets;
+
 /**
  * A JUnit test for doing fsck
  */
@@ -84,6 +93,9 @@ public class TestFsck {
       "cmd=fsck\\ssrc=\\/\\sdst=null\\s" + 
       "perm=null");
   
+  static final Pattern numCorruptBlocksPattern = Pattern.compile(
+      ".*Corrupt blocks:\t\t([0123456789]*).*");
+  
   static String runFsck(Configuration conf, int expectedErrCode, 
                         boolean checkErrorCode,String... path) 
                         throws Exception {
@@ -95,6 +107,7 @@ public class TestFsck {
       assertEquals(expectedErrCode, errCode);
     }
     ((Log4JLogger)FSPermissionChecker.LOG).getLogger().setLevel(Level.INFO);
+    FSImage.LOG.error("OUTPUT = " + bStream.toString());
     return bStream.toString();
   }
 
@@ -246,6 +259,192 @@ public class TestFsck {
     }
   }
 
+  @Test
+  public void testFsckMove() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    final int DFS_BLOCK_SIZE = 1024;
+    final int NUM_DATANODES = 4;
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DFS_BLOCK_SIZE);
+    conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 10000L);
+    conf.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY, 1);
+    DFSTestUtil util = new DFSTestUtil("TestFsck", 5, 3,
+        (5 * DFS_BLOCK_SIZE) + (DFS_BLOCK_SIZE - 1), 5 * DFS_BLOCK_SIZE);
+    MiniDFSCluster cluster = null;
+    FileSystem fs = null;
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).
+          numDataNodes(NUM_DATANODES).build();
+      String topDir = "/srcdat";
+      fs = cluster.getFileSystem();
+      cluster.waitActive();
+      util.createFiles(fs, topDir);
+      util.waitReplication(fs, topDir, (short)3);
+      String outStr = runFsck(conf, 0, true, "/");
+      assertTrue(outStr.contains(NamenodeFsck.HEALTHY_STATUS));
+      DFSClient dfsClient = new DFSClient(new InetSocketAddress("localhost",
+                                          cluster.getNameNodePort()), conf);
+      String fileNames[] = util.getFileNames(topDir);
+      CorruptedTestFile ctFiles[] = new CorruptedTestFile[] {
+        new CorruptedTestFile(fileNames[0], Sets.newHashSet(0),
+          dfsClient, NUM_DATANODES, DFS_BLOCK_SIZE),
+        new CorruptedTestFile(fileNames[1], Sets.newHashSet(2, 3),
+          dfsClient, NUM_DATANODES, DFS_BLOCK_SIZE),
+        new CorruptedTestFile(fileNames[2], Sets.newHashSet(4),
+          dfsClient, NUM_DATANODES, DFS_BLOCK_SIZE),
+        new CorruptedTestFile(fileNames[3], Sets.newHashSet(0, 1, 2, 3),
+          dfsClient, NUM_DATANODES, DFS_BLOCK_SIZE),
+        new CorruptedTestFile(fileNames[4], Sets.newHashSet(1, 2, 3, 4),
+          dfsClient, NUM_DATANODES, DFS_BLOCK_SIZE)
+      };
+      int totalMissingBlocks = 0;
+      for (CorruptedTestFile ctFile : ctFiles) {
+        totalMissingBlocks += ctFile.getTotalMissingBlocks();
+      }
+      for (CorruptedTestFile ctFile : ctFiles) {
+        ctFile.removeBlocks();
+      }
+      // Wait for fsck to discover all the missing blocks
+      while (true) {
+        outStr = runFsck(conf, 1, false, "/");
+        String numCorrupt = null;
+        for (String line : outStr.split("\n")) {
+          Matcher m = numCorruptBlocksPattern.matcher(line);
+          if (m.matches()) {
+            numCorrupt = m.group(1);
+            break;
+          }
+        }
+        if (numCorrupt == null) {
+          throw new IOException("failed to find number of corrupt " +
+              "blocks in fsck output.");
+        }
+        if (numCorrupt.equals(Integer.toString(totalMissingBlocks))) {
+          assertTrue(outStr.contains(NamenodeFsck.CORRUPT_STATUS));
+          break;
+        }
+        try {
+          Thread.sleep(100);
+        } catch (InterruptedException ignore) {
+        }
+      }
+
+      // Copy the non-corrupt blocks of corruptFileName to lost+found.
+      outStr = runFsck(conf, 1, false, "/", "-move");
+      assertTrue(outStr.contains(NamenodeFsck.CORRUPT_STATUS));
+
+      // Make sure that we properly copied the block files from the DataNodes
+      // to lost+found
+      for (CorruptedTestFile ctFile : ctFiles) {
+        ctFile.checkSalvagedRemains();
+      }
+
+      // Fix the filesystem by removing corruptFileName
+      outStr = runFsck(conf, 1, true, "/", "-delete");
+      assertTrue(outStr.contains(NamenodeFsck.CORRUPT_STATUS));
+      
+      // Check to make sure we have a healthy filesystem
+      outStr = runFsck(conf, 0, true, "/");
+      assertTrue(outStr.contains(NamenodeFsck.HEALTHY_STATUS)); 
+      util.cleanup(fs, topDir);
+    } finally {
+      if (fs != null) {try{fs.close();} catch(Exception e){}}
+      if (cluster != null) { cluster.shutdown(); }
+    }
+  }
+
+  static private class CorruptedTestFile {
+    final private String name;
+    final private Set<Integer> blocksToCorrupt;
+    final private DFSClient dfsClient;
+    final private int numDataNodes;
+    final private int blockSize;
+    final private byte[] initialContents;
+    
+    public CorruptedTestFile(String name, Set<Integer> blocksToCorrupt,
+        DFSClient dfsClient, int numDataNodes, int blockSize)
+            throws IOException {
+      this.name = name;
+      this.blocksToCorrupt = blocksToCorrupt;
+      this.dfsClient = dfsClient;
+      this.numDataNodes = numDataNodes;
+      this.blockSize = blockSize;
+      this.initialContents = cacheInitialContents();
+    }
+
+    public int getTotalMissingBlocks() {
+      return blocksToCorrupt.size();
+    }
+
+    private byte[] cacheInitialContents() throws IOException {
+      HdfsFileStatus status = dfsClient.getFileInfo(name);
+      byte[] content = new byte[(int)status.getLen()];
+      DFSInputStream in = null;
+      try {
+        in = dfsClient.open(name);
+        IOUtils.readFully(in, content, 0, content.length);
+      } finally {
+        in.close();
+      }
+      return content;
+    }
+    
+    public void removeBlocks() throws AccessControlException,
+        FileNotFoundException, UnresolvedLinkException, IOException {
+      for (int corruptIdx : blocksToCorrupt) {
+        // Corrupt a block by deleting it
+        ExtendedBlock block = dfsClient.getNamenode().getBlockLocations(
+            name, blockSize * corruptIdx, Long.MAX_VALUE).get(0).getBlock();
+        for (int i = 0; i < numDataNodes; i++) {
+          File blockFile = MiniDFSCluster.getBlockFile(i, block);
+          if(blockFile != null && blockFile.exists()) {
+            assertTrue(blockFile.delete());
+          }
+        }
+      }
+    }
+    
+    public void checkSalvagedRemains() throws IOException {
+      int chainIdx = 0;
+      HdfsFileStatus status = dfsClient.getFileInfo(name);
+      long length = status.getLen();
+      int numBlocks = (int)((length + blockSize - 1) / blockSize);
+      DFSInputStream in = null;
+      byte[] blockBuffer = new byte[blockSize];
+
+      try {
+        for (int blockIdx = 0; blockIdx < numBlocks; blockIdx++) {
+          if (blocksToCorrupt.contains(blockIdx)) {
+            if (in != null) {
+              in.close();
+              in = null;
+            }
+            continue;
+          }
+          if (in == null) {
+            in = dfsClient.open("/lost+found" + name + "/" + chainIdx);
+            chainIdx++;
+          }
+          int len = blockBuffer.length;
+          if (blockIdx == (numBlocks - 1)) {
+            // The last block might not be full-length
+            len = (int)(in.getFileLength() % blockSize);
+            if (len == 0) len = blockBuffer.length;
+          }
+          IOUtils.readFully(in, blockBuffer, 0, (int)len);
+          int startIdx = blockIdx * blockSize;
+          for (int i = 0; i < len; i++) {
+            if (initialContents[startIdx + i] != blockBuffer[i]) {
+              throw new IOException("salvaged file " + name + " differed " +
+              "from what we expected on block " + blockIdx);
+            }
+          }
+        }
+      } finally {
+        IOUtils.cleanup(null, in);
+      }
+    }
+  }
+  
   @Test
   public void testFsckMoveAndDelete() throws Exception {
     final int MAX_MOVE_TRIES = 5;