Browse Source

HADOOP-3035. During block transfers between datanodes, the receiving
datanode, now can report corrupt replicas received from src node to
the namenode. (Lohit Vijayarenu via rangadi)


git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@659235 13f79535-47bb-0310-9956-ffa450edef68

Raghu Angadi 17 years ago
parent
commit
704c3ddbe6

+ 4 - 0
CHANGES.txt

@@ -166,6 +166,10 @@ Trunk (unreleased changes)
     that it does not block user. DataNode misses heartbeats in large
     that it does not block user. DataNode misses heartbeats in large
     nodes otherwise. (Johan Oskarsson via rangadi)
     nodes otherwise. (Johan Oskarsson via rangadi)
 
 
+    HADOOP-3035. During block transfers between datanodes, the receiving
+    datanode, now can report corrupt replicas received from src node to
+    the namenode. (Lohit Vijayarenu via rangadi)
+
   OPTIMIZATIONS
   OPTIMIZATIONS
 
 
     HADOOP-3274. The default constructor of BytesWritable creates empty 
     HADOOP-3274. The default constructor of BytesWritable creates empty 

+ 1 - 0
src/java/org/apache/hadoop/dfs/DFSClient.java

@@ -2299,6 +2299,7 @@ class DFSClient implements FSConstants {
         out.writeInt( nodes.length );
         out.writeInt( nodes.length );
         out.writeBoolean( recoveryFlag );       // recovery flag
         out.writeBoolean( recoveryFlag );       // recovery flag
         Text.writeString( out, client );
         Text.writeString( out, client );
+        out.writeBoolean(false); // Not sending src node information
         out.writeInt( nodes.length - 1 );
         out.writeInt( nodes.length - 1 );
         for (int i = 1; i < nodes.length; i++) {
         for (int i = 1; i < nodes.length; i++) {
           nodes[i].write(out);
           nodes[i].write(out);

+ 31 - 4
src/java/org/apache/hadoop/dfs/DataNode.java

@@ -1126,6 +1126,7 @@ public class DataNode implements InterDatanodeProtocol, FSConstants, Runnable {
      */
      */
     private void writeBlock(DataInputStream in) throws IOException {
     private void writeBlock(DataInputStream in) throws IOException {
       xceiverCount.incr();
       xceiverCount.incr();
+      DatanodeInfo srcDataNode = null;
       LOG.debug("writeBlock receive buf size " + s.getReceiveBufferSize() +
       LOG.debug("writeBlock receive buf size " + s.getReceiveBufferSize() +
                 " tcp no delay " + s.getTcpNoDelay());
                 " tcp no delay " + s.getTcpNoDelay());
       //
       //
@@ -1138,6 +1139,11 @@ public class DataNode implements InterDatanodeProtocol, FSConstants, Runnable {
       int pipelineSize = in.readInt(); // num of datanodes in entire pipeline
       int pipelineSize = in.readInt(); // num of datanodes in entire pipeline
       boolean isRecovery = in.readBoolean(); // is this part of recovery?
       boolean isRecovery = in.readBoolean(); // is this part of recovery?
       String client = Text.readString(in); // working on behalf of this client
       String client = Text.readString(in); // working on behalf of this client
+      boolean hasSrcDataNode = in.readBoolean(); // is src node info present
+      if (hasSrcDataNode) {
+        srcDataNode = new DatanodeInfo();
+        srcDataNode.readFields(in);
+      }
       int numTargets = in.readInt();
       int numTargets = in.readInt();
       if (numTargets < 0) {
       if (numTargets < 0) {
         throw new IOException("Mislabelled incoming datastream.");
         throw new IOException("Mislabelled incoming datastream.");
@@ -1159,7 +1165,7 @@ public class DataNode implements InterDatanodeProtocol, FSConstants, Runnable {
       try {
       try {
         // open a block receiver and check if the block does not exist
         // open a block receiver and check if the block does not exist
         blockReceiver = new BlockReceiver(block, in, 
         blockReceiver = new BlockReceiver(block, in, 
-            s.getInetAddress().toString(), isRecovery, client);
+            s.getInetAddress().toString(), isRecovery, client, srcDataNode);
 
 
         // get a connection back to the previous target
         // get a connection back to the previous target
         replyOut = new DataOutputStream(
         replyOut = new DataOutputStream(
@@ -1196,6 +1202,10 @@ public class DataNode implements InterDatanodeProtocol, FSConstants, Runnable {
             mirrorOut.writeInt( pipelineSize );
             mirrorOut.writeInt( pipelineSize );
             mirrorOut.writeBoolean( isRecovery );
             mirrorOut.writeBoolean( isRecovery );
             Text.writeString( mirrorOut, client );
             Text.writeString( mirrorOut, client );
+            mirrorOut.writeBoolean(hasSrcDataNode);
+            if (hasSrcDataNode) { // pass src node information
+              srcDataNode.write(mirrorOut);
+            }
             mirrorOut.writeInt( targets.length - 1 );
             mirrorOut.writeInt( targets.length - 1 );
             for ( int i = 1; i < targets.length; i++ ) {
             for ( int i = 1; i < targets.length; i++ ) {
               targets[i].write( mirrorOut );
               targets[i].write( mirrorOut );
@@ -1419,7 +1429,7 @@ public class DataNode implements InterDatanodeProtocol, FSConstants, Runnable {
       try {
       try {
         // open a block receiver and check if the block does not exist
         // open a block receiver and check if the block does not exist
          blockReceiver = new BlockReceiver(
          blockReceiver = new BlockReceiver(
-            block, in, s.getRemoteSocketAddress().toString(), false, "");
+            block, in, s.getRemoteSocketAddress().toString(), false, "", null);
 
 
         // receive a block
         // receive a block
         blockReceiver.receiveBlock(null, null, null, null, balancingThrottler, -1);
         blockReceiver.receiveBlock(null, null, null, null, balancingThrottler, -1);
@@ -2250,10 +2260,11 @@ public class DataNode implements InterDatanodeProtocol, FSConstants, Runnable {
     private FSDataset.BlockWriteStreams streams;
     private FSDataset.BlockWriteStreams streams;
     private boolean isRecovery = false;
     private boolean isRecovery = false;
     private String clientName;
     private String clientName;
+    DatanodeInfo srcDataNode = null;
 
 
     BlockReceiver(Block block, DataInputStream in, String inAddr,
     BlockReceiver(Block block, DataInputStream in, String inAddr,
-                  boolean isRecovery, String clientName)
-        throws IOException {
+                  boolean isRecovery, String clientName, 
+                  DatanodeInfo srcDataNode) throws IOException {
       try{
       try{
         this.block = block;
         this.block = block;
         this.in = in;
         this.in = in;
@@ -2264,6 +2275,7 @@ public class DataNode implements InterDatanodeProtocol, FSConstants, Runnable {
         this.checksum = DataChecksum.newDataChecksum(in);
         this.checksum = DataChecksum.newDataChecksum(in);
         this.bytesPerChecksum = checksum.getBytesPerChecksum();
         this.bytesPerChecksum = checksum.getBytesPerChecksum();
         this.checksumSize = checksum.getChecksumSize();
         this.checksumSize = checksum.getChecksumSize();
+        this.srcDataNode = srcDataNode;
         //
         //
         // Open local disk out
         // Open local disk out
         //
         //
@@ -2352,6 +2364,18 @@ public class DataNode implements InterDatanodeProtocol, FSConstants, Runnable {
         checksum.update(dataBuf, dataOff, chunkLen);
         checksum.update(dataBuf, dataOff, chunkLen);
 
 
         if (!checksum.compare(checksumBuf, checksumOff)) {
         if (!checksum.compare(checksumBuf, checksumOff)) {
+          if (srcDataNode != null) {
+            try {
+              LOG.info("report corrupt block " + block + " from datanode " +
+                        srcDataNode + " to namenode");
+              LocatedBlock lb = new LocatedBlock(block, 
+                                              new DatanodeInfo[] {srcDataNode});
+              namenode.reportBadBlocks(new LocatedBlock[] {lb});
+            } catch (IOException e) {
+              LOG.warn("Failed to report bad block " + block + 
+                        " from datanode " + srcDataNode + " to namenode");
+            }
+          }
           throw new IOException("Unexpected checksum mismatch " + 
           throw new IOException("Unexpected checksum mismatch " + 
                                 "while writing " + block + " from " + inAddr);
                                 "while writing " + block + " from " + inAddr);
         }
         }
@@ -2770,6 +2794,7 @@ public class DataNode implements InterDatanodeProtocol, FSConstants, Runnable {
                                                             SMALL_BUFFER_SIZE));
                                                             SMALL_BUFFER_SIZE));
 
 
         blockSender = new BlockSender(b, 0, -1, false, false, false);
         blockSender = new BlockSender(b, 0, -1, false, false, false);
+        DatanodeInfo srcNode = new DatanodeInfo(dnRegistration);
 
 
         //
         //
         // Header info
         // Header info
@@ -2781,6 +2806,8 @@ public class DataNode implements InterDatanodeProtocol, FSConstants, Runnable {
         out.writeInt(0);           // no pipelining
         out.writeInt(0);           // no pipelining
         out.writeBoolean(false);   // not part of recovery
         out.writeBoolean(false);   // not part of recovery
         Text.writeString(out, ""); // client
         Text.writeString(out, ""); // client
+        out.writeBoolean(true); // sending src node information
+        srcNode.write(out); // Write src node DatanodeInfo
         // write targets
         // write targets
         out.writeInt(targets.length - 1);
         out.writeInt(targets.length - 1);
         for (int i = 1; i < targets.length; i++) {
         for (int i = 1; i < targets.length; i++) {

+ 4 - 4
src/java/org/apache/hadoop/dfs/FSConstants.java

@@ -101,11 +101,11 @@ public interface FSConstants {
    * when protocol changes. It is not very obvious. 
    * when protocol changes. It is not very obvious. 
    */
    */
   /*
   /*
-   * Version 10:
-   *    DFSClient also sends non-interleaved checksum and data while writing
-   *    to DFS.
+   * Version 11:
+   *    OP_WRITE_BLOCK sends a boolean. If its value is true, an additonal 
+   *    DatanodeInfo of client requesting transfer is also sent. 
    */
    */
-  public static final int DATA_TRANSFER_VERSION = 10;
+  public static final int DATA_TRANSFER_VERSION = 11;
 
 
   // Return codes for file create
   // Return codes for file create
   public static final int OPERATION_FAILED = 0;
   public static final int OPERATION_FAILED = 0;

+ 39 - 0
src/test/org/apache/hadoop/dfs/MiniDFSCluster.java

@@ -22,6 +22,10 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collection;
+import java.nio.channels.FileChannel;
+import java.nio.ByteBuffer;
+import java.util.Random;
+import java.io.RandomAccessFile;
 
 
 import javax.security.auth.login.LoginException;
 import javax.security.auth.login.LoginException;
 
 
@@ -546,6 +550,41 @@ public class MiniDFSCluster {
     }
     }
   }
   }
 
 
+  /*
+   * Corrupt a block on all datanode
+   */
+  void corruptBlockOnDataNodes(String blockName) throws Exception{
+    for (int i=0; i < dataNodes.size(); i++)
+      corruptBlockOnDataNode(i,blockName);
+  }
+
+  /*
+   * Corrupt a block on a particular datanode
+   */
+  boolean corruptBlockOnDataNode(int i, String blockName) throws Exception {
+    Random random = new Random();
+    boolean corrupted = false;
+    File baseDir = new File(System.getProperty("test.build.data"), "dfs/data");
+    if (i < 0 || i >= dataNodes.size())
+      return false;
+    for (int dn = i*2; dn < i*2+2; dn++) {
+      File blockFile = new File(baseDir, "data" + (dn+1) + "/current/" +
+                                blockName);
+      if (blockFile.exists()) {
+        // Corrupt replica by writing random bytes into replica
+        RandomAccessFile raFile = new RandomAccessFile(blockFile, "rw");
+        FileChannel channel = raFile.getChannel();
+        String badString = "BADBAD";
+        int rand = random.nextInt((int)channel.size()/2);
+        raFile.seek(rand);
+        raFile.write(badString.getBytes());
+        raFile.close();
+      }
+      corrupted = true;
+    }
+    return corrupted;
+  }
+
   /*
   /*
    * Shutdown a particular datanode
    * Shutdown a particular datanode
    */
    */

+ 4 - 0
src/test/org/apache/hadoop/dfs/TestDataTransferProtocol.java

@@ -171,6 +171,7 @@ public class TestDataTransferProtocol extends TestCase {
     sendOut.writeInt(0);           // targets in pipeline 
     sendOut.writeInt(0);           // targets in pipeline 
     sendOut.writeBoolean(false);   // recoveryFlag
     sendOut.writeBoolean(false);   // recoveryFlag
     Text.writeString(sendOut, "cl");// clientID
     Text.writeString(sendOut, "cl");// clientID
+    sendOut.writeBoolean(false); // no src node info
     sendOut.writeInt(0);           // number of downstream targets
     sendOut.writeInt(0);           // number of downstream targets
     sendOut.writeByte((byte)DataChecksum.CHECKSUM_CRC32);
     sendOut.writeByte((byte)DataChecksum.CHECKSUM_CRC32);
     
     
@@ -189,6 +190,7 @@ public class TestDataTransferProtocol extends TestCase {
     sendOut.writeInt(0);           // targets in pipeline 
     sendOut.writeInt(0);           // targets in pipeline 
     sendOut.writeBoolean(false);   // recoveryFlag
     sendOut.writeBoolean(false);   // recoveryFlag
     Text.writeString(sendOut, "cl");// clientID
     Text.writeString(sendOut, "cl");// clientID
+    sendOut.writeBoolean(false); // no src node info
 
 
     // bad number of targets
     // bad number of targets
     sendOut.writeInt(-1-random.nextInt(oneMil));
     sendOut.writeInt(-1-random.nextInt(oneMil));
@@ -204,6 +206,7 @@ public class TestDataTransferProtocol extends TestCase {
     sendOut.writeInt(0);           // targets in pipeline 
     sendOut.writeInt(0);           // targets in pipeline 
     sendOut.writeBoolean(false);   // recoveryFlag
     sendOut.writeBoolean(false);   // recoveryFlag
     Text.writeString(sendOut, "cl");// clientID
     Text.writeString(sendOut, "cl");// clientID
+    sendOut.writeBoolean(false); // no src node info
     sendOut.writeInt(0);
     sendOut.writeInt(0);
     sendOut.writeByte((byte)DataChecksum.CHECKSUM_CRC32);
     sendOut.writeByte((byte)DataChecksum.CHECKSUM_CRC32);
     sendOut.writeInt((int)512);
     sendOut.writeInt((int)512);
@@ -230,6 +233,7 @@ public class TestDataTransferProtocol extends TestCase {
     sendOut.writeInt(0);           // targets in pipeline 
     sendOut.writeInt(0);           // targets in pipeline 
     sendOut.writeBoolean(false);   // recoveryFlag
     sendOut.writeBoolean(false);   // recoveryFlag
     Text.writeString(sendOut, "cl");// clientID
     Text.writeString(sendOut, "cl");// clientID
+    sendOut.writeBoolean(false); // no src node info
     sendOut.writeInt(0);
     sendOut.writeInt(0);
     sendOut.writeByte((byte)DataChecksum.CHECKSUM_CRC32);
     sendOut.writeByte((byte)DataChecksum.CHECKSUM_CRC32);
     sendOut.writeInt((int)512);    // checksum size
     sendOut.writeInt((int)512);    // checksum size

+ 46 - 0
src/test/org/apache/hadoop/dfs/TestReplication.java

@@ -126,6 +126,51 @@ public class TestReplication extends TestCase {
     fileSys.delete(name, true);
     fileSys.delete(name, true);
     assertTrue(!fileSys.exists(name));
     assertTrue(!fileSys.exists(name));
   }
   }
+
+  /* 
+   * Test if Datanode reports bad blocks during replication request
+   */
+  public void testBadBlockReportOnTransfer() throws Exception {
+    Configuration conf = new Configuration();
+    FileSystem fs = null;
+    DFSClient dfsClient = null;
+    LocatedBlocks blocks = null;
+    int replicaCount = 0;
+    MiniDFSCluster cluster = new MiniDFSCluster(conf, 2, true, null);
+    cluster.waitActive();
+    fs = cluster.getFileSystem();
+    dfsClient = new DFSClient(new InetSocketAddress("localhost",
+                              cluster.getNameNodePort()), conf);
+  
+    // Create file with replication factor of 1
+    Path file1 = new Path("/tmp/testBadBlockReportOnTransfer/file1");
+    DFSTestUtil.createFile(fs, file1, 1024, (short)1, 0);
+    DFSTestUtil.waitReplication(fs, file1, (short)1);
+  
+    // Corrupt the block belonging to the created file
+    String block = DFSTestUtil.getFirstBlock(fs, file1).getBlockName();
+    cluster.corruptBlockOnDataNodes(block);
+  
+    // Increase replication factor, this should invoke transfer request
+    // Receiving datanode fails on checksum and reports it to namenode
+    fs.setReplication(file1, (short)2);
+  
+    // Now get block details and check if the block is corrupt
+    blocks = dfsClient.namenode.
+              getBlockLocations(file1.toString(), 0, Long.MAX_VALUE);
+    while (blocks.get(0).isCorrupt() != true) {
+      try {
+        LOG.info("Waiting until block is marked as corrupt...");
+        Thread.sleep(1000);
+      } catch (InterruptedException ie) {
+      }
+      blocks = dfsClient.namenode.
+                getBlockLocations(file1.toString(), 0, Long.MAX_VALUE);
+    }
+    replicaCount = blocks.get(0).getLocations().length;
+    assertTrue(replicaCount == 1);
+    cluster.shutdown();
+  }
   
   
   /**
   /**
    * Tests replication in DFS.
    * Tests replication in DFS.
@@ -330,4 +375,5 @@ public class TestReplication extends TestCase {
       }
       }
     }
     }
   }  
   }  
+  
 }
 }