Ver Fonte

HADOOP-2065. Delay invalidating corrupt replicas of a block until it
is removed from the under-replicated state. If all replicas are found to
be corrupt, retain all copies and mark the block as corrupt.
(Lohit Vjayarenu via rangadi)



git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@655660 13f79535-47bb-0310-9956-ffa450edef68

Raghu Angadi há 17 anos atrás
pai
commit
21db8aff06

+ 5 - 0
CHANGES.txt

@@ -74,6 +74,11 @@ Trunk (unreleased changes)
     HADOOP-1915. Allow users to specify counters via strings instead
     of enumerations. (tomwhite via omalley)
 
+    HADOOP-2065. Delay invalidating corrupt replicas of a block until it
+    is removed from the under-replicated state. If all replicas are found to
+    be corrupt, retain all copies and mark the block as corrupt.
+    (Lohit Vjayarenu via rangadi)
+
   IMPROVEMENTS
    
     HADOOP-2928. Remove deprecated FileSystem.getContentLength().

+ 6 - 0
src/java/org/apache/hadoop/dfs/BlocksMap.java

@@ -383,4 +383,10 @@ class BlocksMap {
   Collection<BlockInfo> getBlocks() {
     return map.values();
   }
+  /**
+   * Check if the block exists in map
+   */
+  boolean contains(Block block) {
+    return map.containsKey(block);
+  }
 }

+ 2 - 2
src/java/org/apache/hadoop/dfs/ClientProtocol.java

@@ -37,9 +37,9 @@ interface ClientProtocol extends VersionedProtocol {
    * Compared to the previous version the following changes have been introduced:
    * (Only the latest change is reflected.
    * The log of historical changes can be retrieved from the svn).
-   * 31: changed the serialization in DatanodeRegistration and DatanodeInfo
+   * 32: add corrupt field to LocatedBlock
    */
-  public static final long versionID = 31L;
+  public static final long versionID = 32L;
   
   ///////////////////////////////////////
   // File contents

+ 2 - 2
src/java/org/apache/hadoop/dfs/DatanodeProtocol.java

@@ -31,9 +31,9 @@ import org.apache.hadoop.ipc.VersionedProtocol;
  **********************************************************************/
 interface DatanodeProtocol extends VersionedProtocol {
   /**
-   * 13: changed the serialization in DatanodeRegistration and DatanodeInfo
+   * 14: add corrupt field to LocatedBlock
    */
-  public static final long versionID = 13L;
+  public static final long versionID = 14L;
   
   // error code
   final static int NOTIFY = 0;

+ 4 - 0
src/java/org/apache/hadoop/dfs/FSDirectory.java

@@ -308,6 +308,8 @@ class FSDirectory implements FSConstants {
       // modify file-> block and blocksMap
       fileNode.removeBlock(block);
       namesystem.blocksMap.removeINode(block);
+      // If block is removed from blocksMap remove it from corruptReplicasMap
+      namesystem.corruptReplicas.removeFromCorruptReplicasMap(block);
 
       // write modified block locations to log
       fsImage.getEditLog().logOpenFile(path, fileNode);
@@ -546,6 +548,8 @@ class FSDirectory implements FSConstants {
           totalInodes -= filesRemoved;
           for (Block b : v) {
             namesystem.blocksMap.removeINode(b);
+            // If block is removed from blocksMap remove it from corruptReplicasMap
+            namesystem.corruptReplicas.removeFromCorruptReplicasMap(b);
             if (deletedBlocks != null) {
               deletedBlocks.add(b);
             }

+ 72 - 14
src/java/org/apache/hadoop/dfs/FSNamesystem.java

@@ -91,6 +91,11 @@ class FSNamesystem implements FSConstants, FSNamesystemMBean {
   // Mapping: Block -> { INode, datanodes, self ref } 
   //
   BlocksMap blocksMap = new BlocksMap();
+
+  //
+  // Store blocks-->datanodedescriptor(s) map of corrupt replicas
+  //
+  CorruptReplicasMap corruptReplicas = new CorruptReplicasMap();
     
   /**
    * Stores the datanode -> block map.  
@@ -718,15 +723,26 @@ class FSNamesystem implements FSConstants, FSNamesystemMBean {
     do {
       // get block locations
       int numNodes = blocksMap.numNodes(blocks[curBlk]);
-      DatanodeDescriptor[] machineSet = new DatanodeDescriptor[numNodes];
-      if (numNodes > 0) {
+      Collection<DatanodeDescriptor> nodesCorrupt = corruptReplicas.getNodes(
+                                                      blocks[curBlk]);
+      int numCorruptNodes = (nodesCorrupt == null) ? 0 : nodesCorrupt.size();
+      boolean blockCorrupt = (numCorruptNodes == numNodes);
+      int numMachineSet = blockCorrupt ? numNodes : 
+                            (numNodes - numCorruptNodes);
+      DatanodeDescriptor[] machineSet = new DatanodeDescriptor[numMachineSet];
+      if (numMachineSet > 0) {
         numNodes = 0;
         for(Iterator<DatanodeDescriptor> it = 
             blocksMap.nodeIterator(blocks[curBlk]); it.hasNext();) {
-          machineSet[numNodes++] = it.next();
+          DatanodeDescriptor dn = it.next();
+          boolean replicaCorrupt = ((nodesCorrupt != null) &&
+                                    nodesCorrupt.contains(dn));
+          if (blockCorrupt || (!blockCorrupt && !replicaCorrupt))
+            machineSet[numNodes++] = dn;
         }
       }
-      results.add(new LocatedBlock(blocks[curBlk], machineSet, curPos));
+      results.add(new LocatedBlock(blocks[curBlk], machineSet, curPos,
+                  blockCorrupt));
       curPos += blocks[curBlk].getNumBytes();
       curBlk++;
     } while (curPos < endOff 
@@ -1262,6 +1278,22 @@ class FSNamesystem implements FSConstants, FSNamesystemMBean {
     }
   }
 
+  /**
+   * Mark the replica of a block held by the given datanode as corrupt.
+   * The datanode is resolved with getDatanode() and the (block, node) pair
+   * is then recorded in corruptReplicas.
+   *
+   * @param blk Block to be marked as corrupt
+   * @param dn Datanode which holds the corrupt replica
+   * @throws IOException if getDatanode() returns null for dn
+   */
+  public synchronized void markBlockAsCorrupt(Block blk, DatanodeInfo dn)
+    throws IOException {
+    DatanodeDescriptor node = getDatanode(dn);
+    if (node == null) {
+      // Keep the space after "block": the block name is appended directly.
+      throw new IOException("Cannot mark block " + blk.getBlockName() +
+                            " as corrupt because datanode " + dn.getName() +
+                            " does not exist.");
+    }
+    corruptReplicas.addToCorruptReplicasMap(blk, node);
+  }
+
   /**
    * Invalidates the given block on the given datanode.
    */
@@ -2138,14 +2170,22 @@ class FSNamesystem implements FSConstants, FSNamesystemMBean {
     DatanodeDescriptor srcNode = null;
     int live = 0;
     int decommissioned = 0;
+    int corrupt = 0;
     Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(block);
     while(it.hasNext()) {
       DatanodeDescriptor node = it.next();
-      if(!node.isDecommissionInProgress() && !node.isDecommissioned())
+      Collection<DatanodeDescriptor> nodes = corruptReplicas.getNodes(block);
+      if ((nodes != null) && (nodes.contains(node)))
+        corrupt++;
+      else if(!node.isDecommissionInProgress() && !node.isDecommissioned())
         live++;
       else
         decommissioned++;
       containingNodes.add(node);
+      // Check if this replica is corrupt
+      // If so, do not select the node as src node
+      if ((nodes != null) && nodes.contains(node))
+        continue;
       if(node.getNumberOfBlocksToBeReplicated() >= maxReplicationStreams)
         continue; // already reached replication limit
       // the block must not be scheduled for removal on srcNode
@@ -2170,7 +2210,7 @@ class FSNamesystem implements FSConstants, FSNamesystemMBean {
         srcNode = node;
     }
     if(numReplicas != null)
-      numReplicas.initialize(live, decommissioned);
+      numReplicas.initialize(live, decommissioned, corrupt);
     return srcNode;
   }
 
@@ -2598,6 +2638,11 @@ class FSNamesystem implements FSConstants, FSNamesystemMBean {
     if (numCurrentReplica > fileReplication) {
       proccessOverReplicatedBlock(block, fileReplication, node, delNodeHint);
     }
+    // If the file replication has reached desired value
+    // we can remove any corrupt replicas the block may have
+    int corruptReplicasCount = num.corruptReplicas();
+    if ((corruptReplicasCount > 0) && (numCurrentReplica == fileReplication))
+      corruptReplicas.invalidateCorruptReplicas(block);
     return block;
   }
 
@@ -2827,6 +2872,8 @@ class FSNamesystem implements FSConstants, FSNamesystemMBean {
         excessReplicateMap.remove(node.getStorageID());
       }
     }
+    // If block is removed from blocksMap, remove it from corruptReplicas
+    corruptReplicas.removeFromCorruptReplicasMap(block);
   }
 
   /**
@@ -3079,18 +3126,20 @@ class FSNamesystem implements FSConstants, FSNamesystemMBean {
   private static class NumberReplicas {
     private int liveReplicas;
     private int decommissionedReplicas;
+    private int corruptReplicas;
 
     NumberReplicas() {
-      initialize(0, 0);
+      initialize(0, 0, 0);
     }
 
-    NumberReplicas(int live, int decommissioned) {
-      initialize(live, decommissioned);
+    NumberReplicas(int live, int decommissioned, int corrupt) {
+      initialize(live, decommissioned, corrupt);
     }
 
-    void initialize(int live, int decommissioned) {
+    void initialize(int live, int decommissioned, int corrupt) {
       liveReplicas = live;
       decommissionedReplicas = decommissioned;
+      corruptReplicas = corrupt;
     }
 
     int liveReplicas() {
@@ -3099,32 +3148,41 @@ class FSNamesystem implements FSConstants, FSNamesystemMBean {
     int decommissionedReplicas() {
       return decommissionedReplicas;
     }
+    int corruptReplicas() {
+      return corruptReplicas;
+    }
   } 
 
   /**
    * Counts the nodes supplied by the iterator into live, decommissioned
    * and corrupt counters.
    * A node that corruptReplicas lists for block b is counted as corrupt
    * regardless of its decommission state. Any other node is counted as
    * decommissioned if it is decommissioned or has decommission in
    * progress, and as live otherwise.
    *
    * @param b block whose corrupt-replica nodes are looked up
    * @param nodeIter iterator over the datanodes to classify
    * @return a NumberReplicas holding the live, decommissioned and corrupt
    *         counts
    */
-  private NumberReplicas countNodes(Iterator<DatanodeDescriptor> nodeIter) {
+  private NumberReplicas countNodes(Block b,
+                                    Iterator<DatanodeDescriptor> nodeIter) {
     int count = 0;
     int live = 0;
+    int corrupt = 0;
+    Collection<DatanodeDescriptor> nodesCorrupt = corruptReplicas.getNodes(b);
     while ( nodeIter.hasNext() ) {
       DatanodeDescriptor node = nodeIter.next();
-      if (node.isDecommissionInProgress() || node.isDecommissioned()) {
+      if ((nodesCorrupt != null) && (nodesCorrupt.contains(node))) {
+        corrupt++;
+      }
+      else if (node.isDecommissionInProgress() || node.isDecommissioned()) {
         count++;
       }
       else {
         live++;
       }
     }
-    return new NumberReplicas(live, count);
+    return new NumberReplicas(live, count, corrupt);
   }
 
   /**
    * Return the number of nodes that are live, decommissioned and corrupt
    * for the given block, counted over the nodes returned by
    * blocksMap.nodeIterator(b).
    *
    * @param b block whose replicas are counted
    * @return the replica counts for the block
    */
   private NumberReplicas countNodes(Block b) {
-    return countNodes(blocksMap.nodeIterator(b));
+    return countNodes(b, blocksMap.nodeIterator(b));
   }
 
   /**

+ 24 - 2
src/java/org/apache/hadoop/dfs/LocatedBlock.java

@@ -39,24 +39,36 @@ class LocatedBlock implements Writable {
   private Block b;
   private long offset;  // offset of the first byte of the block in the file
   private DatanodeInfo[] locs;
+  // corrupt flag is true if all of the replicas of a block are corrupt,
+  // else false. If only some replicas are corrupt, they are filtered out and
+  // their locations are not part of this object.
+  private boolean corrupt;
 
   /**
    * Creates a located block wrapping a new empty Block, with no locations,
    * a start offset of 0 and the corrupt flag set to false.
    */
   public LocatedBlock() {
-    this(new Block(), new DatanodeInfo[0], 0L);
+    this(new Block(), new DatanodeInfo[0], 0L, false);
   }
 
   /**
    * Creates a located block whose start offset is unknown (-1) and whose
    * corrupt flag is false.
    *
    * @param b the block
    * @param locs datanodes holding the block
    */
   public LocatedBlock(Block b, DatanodeInfo[] locs) {
-    this(b, locs, -1); // startOffset is unknown
+    this(b, locs, -1, false); // startOffset is unknown
   }
 
   /**
    * Creates a located block with the given start offset and the corrupt
    * flag set to false.
    *
    * @param b the block
    * @param locs datanodes holding the block
    * @param startOffset offset of the first byte of the block in the file
    */
   public LocatedBlock(Block b, DatanodeInfo[] locs, long startOffset) {
+    this(b, locs, startOffset, false);
+  }
+
+  /**
+   */
+  public LocatedBlock(Block b, DatanodeInfo[] locs, long startOffset, 
+                      boolean corrupt) {
     this.b = b;
     this.offset = startOffset;
+    this.corrupt = corrupt;
     if (locs==null) {
       this.locs = new DatanodeInfo[0];
     } else {
@@ -88,10 +100,19 @@ class LocatedBlock implements Writable {
     this.offset = value;
   }
 
+  /**
+   * Sets the corrupt flag of this located block.
+   *
+   * @param corrupt new value of the corrupt flag
+   */
+  void setCorrupt(boolean corrupt) {
+    this.corrupt = corrupt;
+  }
+  
+  /**
+   * Returns the corrupt flag of this located block.
+   *
+   * @return current value of the corrupt flag
+   */
+  boolean isCorrupt() {
+    return corrupt;
+  }
+
   ///////////////////////////////////////////
   // Writable
   ///////////////////////////////////////////
   public void write(DataOutput out) throws IOException {
+    out.writeBoolean(corrupt);
     out.writeLong(offset);
     b.write(out);
     out.writeInt(locs.length);
@@ -101,6 +122,7 @@ class LocatedBlock implements Writable {
   }
 
   public void readFields(DataInput in) throws IOException {
+    this.corrupt = in.readBoolean();
     offset = in.readLong();
     this.b = new Block();
     b.readFields(in);

+ 2 - 2
src/java/org/apache/hadoop/dfs/NameNode.java

@@ -351,7 +351,7 @@ public class NameNode implements ClientProtocol, DatanodeProtocol,
   /**
    * The client has detected an error on the specified located blocks 
    * and is reporting them to the server.  For now, the namenode will 
-   * delete the blocks from the datanodes.  In the future we might 
+   * mark the block as corrupt.  In the future we might 
    * check the blocks are actually corrupt. 
    */
   public void reportBadBlocks(LocatedBlock[] blocks) throws IOException {
@@ -361,7 +361,7 @@ public class NameNode implements ClientProtocol, DatanodeProtocol,
       DatanodeInfo[] nodes = blocks[i].getLocations();
       for (int j = 0; j < nodes.length; j++) {
         DatanodeInfo dn = nodes[j];
-        namesystem.invalidateBlock(blk, dn);
+        namesystem.markBlockAsCorrupt(blk, dn);
       }
     }
   }

+ 88 - 0
src/test/org/apache/hadoop/dfs/TestDatanodeBlockScanner.java

@@ -25,6 +25,10 @@ import java.net.URL;
 import java.net.URLConnection;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+import java.io.*;
+import java.nio.channels.FileChannel;
+import java.nio.ByteBuffer;
+import java.util.Random;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -137,4 +141,88 @@ public class TestDatanodeBlockScanner extends TestCase {
     
     cluster.shutdown();
   }
+
+  /**
+   * Corrupt the on-disk copies of a block replica by overwriting a few
+   * bytes at a random offset within the first half of the block file.
+   * The block file is looked up under the data directories
+   * data(2*replica+1) and data(2*replica+2) below
+   * ${test.build.data}/dfs/data; every copy that exists is corrupted.
+   *
+   * @param blockName name of the block file inside the "current" directory
+   * @param replica index selecting the pair of data directories to scan
+   * @throws IOException if a block file cannot be opened or written
+   */
+  void corruptReplica(String blockName, int replica) throws IOException {
+    Random random = new Random();
+    File baseDir = new File(System.getProperty("test.build.data"), "dfs/data");
+    for (int i=replica*2; i<replica*2+2; i++) {
+      File blockFile = new File(baseDir, "data" + (i+1)+ "/current/" + 
+                               blockName);
+      if (blockFile.exists()) {
+        // Corrupt replica by overwriting part of it with a bad marker
+        RandomAccessFile raFile = new RandomAccessFile(blockFile, "rw");
+        try {
+          String badString = "BADBAD";
+          // Random.nextInt() rejects a non-positive bound, so fall back to
+          // offset 0 for files shorter than two bytes.
+          int bound = (int)(raFile.length()/2);
+          int rand = (bound > 0) ? random.nextInt(bound) : 0;
+          raFile.seek(rand);
+          raFile.write(badString.getBytes());
+        } finally {
+          // Always release the file handle, even if seek/write fails
+          raFile.close();
+        }
+      }
+    }
+  }
+
+  public void testBlockCorruptionPolicy() throws IOException {
+    Configuration conf = new Configuration();
+    Random random = new Random();
+    FileSystem fs = null;
+    DFSClient dfsClient = null;
+    LocatedBlocks blocks = null;
+    int blockCount = 0;
+
+    MiniDFSCluster cluster = new MiniDFSCluster(conf, 3, true, null);
+    cluster.waitActive();
+    fs = cluster.getFileSystem();
+    Path file1 = new Path("/tmp/testBlockVerification/file1");
+    DFSTestUtil.createFile(fs, file1, 1024, (short)3, 0);
+    String block = DFSTestUtil.getFirstBlock(fs, file1).toString();
+    
+    dfsClient = new DFSClient(new InetSocketAddress("localhost", 
+                                        cluster.getNameNodePort()), conf);
+    blocks = dfsClient.namenode.
+                   getBlockLocations(file1.toString(), 0, Long.MAX_VALUE);
+    blockCount = blocks.get(0).getLocations().length;
+    assertTrue(blockCount == 3);
+    assertTrue(blocks.get(0).isCorrupt() == false);
+
+    // Corrupt random replica of block 
+    corruptReplica(block, random.nextInt(3));
+    cluster.shutdown();
+
+    // Restart the cluster hoping the corrupt block to be reported
+    // We have 2 good replicas and block is not corrupt
+    cluster = new MiniDFSCluster(conf, 3, false, null);
+    cluster.waitActive();
+    fs = cluster.getFileSystem();
+    dfsClient = new DFSClient(new InetSocketAddress("localhost", 
+                                        cluster.getNameNodePort()), conf);
+    blocks = dfsClient.namenode.
+                   getBlockLocations(file1.toString(), 0, Long.MAX_VALUE);
+    blockCount = blocks.get(0).getLocations().length;
+    assertTrue (blockCount == 2);
+    assertTrue(blocks.get(0).isCorrupt() == false);
+  
+    // Corrupt all replicas. Now, block should be marked as corrupt
+    // and we should get all the replicas 
+    corruptReplica(block, 0);
+    corruptReplica(block, 1);
+    corruptReplica(block, 2);
+
+    // Read the file to trigger reportBadBlocks by client
+    try {
+      IOUtils.copyBytes(fs.open(file1), new IOUtils.NullOutputStream(), 
+                        conf, true);
+    } catch (IOException e) {
+      // Ignore exception
+    }
+
+    // We now have he blocks to be marked as corrup and we get back all
+    // its replicas
+    blocks = dfsClient.namenode.
+                   getBlockLocations(file1.toString(), 0, Long.MAX_VALUE);
+    blockCount = blocks.get(0).getLocations().length;
+    assertTrue (blockCount == 3);
+    assertTrue(blocks.get(0).isCorrupt() == true);
+
+    cluster.shutdown();
+  }
 }