
Merge -r 724882:724883 from main to move the change log of HADOOP-4702 into release 0.19.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/branches/branch-0.19@724887 13f79535-47bb-0310-9956-ffa450edef68
Hairong Kuang, 16 years ago
Parent
Commit
57304eee05

+ 4 - 1
CHANGES.txt

@@ -1037,7 +1037,7 @@ Release 0.18.3 - Unreleased
     HADOOP-4061. Throttle Datanode decommission monitoring in Namenode.
     (szetszwo)
 
-    HADOOP-4659. Root cause of connection failure is being ost to code that
+    HADOOP-4659. Root cause of connection failure is being lost to code that
     uses it for delaying startup. (Steve Loughran and Hairong via hairong)
 
     HADOOP-4614. Lazily open segments when merging map spills to avoid using
@@ -1070,6 +1070,9 @@ Release 0.18.3 - Unreleased
 
     HADOOP-4742. Replica gets deleted by mistake. (Wang Xu via hairong)
 
+    HADOOP-4702. Failed block replication leaves an incomplete block in
+    receiver's tmp data directory. (hairong)
+
 Release 0.18.2 - 2008-11-03
 
   BUG FIXES

+ 17 - 3
src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java

@@ -86,11 +86,11 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
       this.isRecovery = isRecovery;
       this.clientName = clientName;
       this.offsetInBlock = 0;
+      this.srcDataNode = srcDataNode;
+      this.datanode = datanode;
       this.checksum = DataChecksum.newDataChecksum(in);
       this.bytesPerChecksum = checksum.getBytesPerChecksum();
       this.checksumSize = checksum.getChecksumSize();
-      this.srcDataNode = srcDataNode;
-      this.datanode = datanode;
       //
       // Open local disk out
       //
@@ -109,11 +109,15 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
       }
     } catch(IOException ioe) {
       IOUtils.closeStream(this);
+      removeBlock();
+      
+      // check if there is a disk error
       IOException cause = FSDataset.getCauseIfDiskError(ioe);
       if (cause != null) { // possible disk error
         ioe = cause;
-        datanode.checkDiskError(ioe);
+        datanode.checkDiskError(ioe); // may throw an exception here
       }
+      
       throw ioe;
     }
   }
@@ -553,6 +557,7 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
       if (responder != null) {
         responder.interrupt();
       }
+      removeBlock();
       throw ioe;
     } finally {
       if (responder != null) {
@@ -566,6 +571,15 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
     }
   }
 
+  /** Remove a partial block 
+   * if this write is for a replication request (and not from a client)
+   */
+  private void removeBlock() throws IOException {
+    if (clientName.length() == 0) { // not client write
+      datanode.data.unfinalizeBlock(block);
+    }
+  }
+
   /**
    * Sets the file pointer in the local block file to the specified value.
    */

+ 36 - 2
src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java

@@ -1164,11 +1164,45 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
    * Remove the temporary block file (if any)
    */
   public synchronized void unfinalizeBlock(Block b) throws IOException {
-    ongoingCreates.remove(b);
+    // remove the block from in-memory data structure
+    ActiveFile activefile = ongoingCreates.remove(b);
+    if (activefile == null) {
+      return;
+    }
     volumeMap.remove(b);
-    DataNode.LOG.warn("Block " + b + " unfinalized and removed. " );
+    
+    // delete the on-disk temp file
+    if (delBlockFromDisk(activefile.file, getMetaFile(activefile.file, b), b)) {
+      DataNode.LOG.warn("Block " + b + " unfinalized and removed. " );
+    }
   }
 
+  /**
+   * Remove a block from disk
+   * @param blockFile block file
+   * @param metaFile block meta file
+   * @param b a block
+   * @return true if on-disk files are deleted; false otherwise
+   */
+  private boolean delBlockFromDisk(File blockFile, File metaFile, Block b) {
+    if (blockFile == null) {
+      DataNode.LOG.warn("No file exists for block: " + b);
+      return true;
+    }
+    
+    if (!blockFile.delete()) {
+      DataNode.LOG.warn("Not able to delete the block file: " + blockFile);
+      return false;
+    } else { // remove the meta file
+      if (metaFile != null && !metaFile.delete()) {
+        DataNode.LOG.warn(
+            "Not able to delete the meta block file: " + metaFile);
+        return false;
+      }
+    }
+    return true;
+  }
+  
   /**
    * Return a table of block data
    */
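
Taken together, the BlockReceiver and FSDataset changes above form a single cleanup path: an IOException while receiving a block triggers removeBlock(), which, only for replication requests (empty client name), calls FSDataset.unfinalizeBlock(), which in turn drops the in-memory bookkeeping and deletes the temporary block and meta files via delBlockFromDisk(). The snippet below is a condensed, self-contained sketch of that path for readability; the class name and simplified signatures are illustrative stand-ins, not the real Hadoop types.

import java.io.File;

class PartialBlockCleanupSketch {

  // Stand-in for BlockReceiver.removeBlock(): cleanup runs only when the write
  // came from another datanode (a replication request), not from a client.
  static void removeBlock(String clientName, File blockFile, File metaFile) {
    if (clientName.length() == 0) {          // not a client write
      unfinalizeBlock(blockFile, metaFile);  // FSDataset.unfinalizeBlock(block)
    }
  }

  // Stand-in for FSDataset.unfinalizeBlock() plus delBlockFromDisk(): delete the
  // on-disk tmp block file first, then its meta file, warning on failure.
  static void unfinalizeBlock(File blockFile, File metaFile) {
    if (blockFile == null) {                 // no file exists for the block
      return;
    }
    if (!blockFile.delete()) {
      System.err.println("Not able to delete the block file: " + blockFile);
      return;
    }
    if (metaFile != null && !metaFile.delete()) {
      System.err.println("Not able to delete the meta block file: " + metaFile);
    }
  }
}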

+ 81 - 1
src/test/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java

@@ -17,17 +17,24 @@
  */
 package org.apache.hadoop.hdfs.server.datanode;
 
+import java.io.DataOutputStream;
 import java.io.File;
+import java.net.InetSocketAddress;
+import java.net.Socket;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.io.Text;
 
 import junit.framework.TestCase;
 
-/** Test if a datanode can handle disk error correctly*/
+/** Test if a datanode can correctly handle errors during block read/write*/
 public class TestDiskError extends TestCase {
   public void testShutdown() throws Exception {
     // bring up a cluster of 3
@@ -52,6 +59,7 @@ public class TestDiskError extends TestCase {
         Path fileName = new Path("/test.txt"+i);
         DFSTestUtil.createFile(fs, fileName, 1024, (short)2, 1L);
         DFSTestUtil.waitReplication(fs, fileName, (short)2);
+        fs.delete(fileName, true);
       }
     } finally {
       // restore its old permission
@@ -60,4 +68,76 @@ public class TestDiskError extends TestCase {
       cluster.shutdown();
     }
   }
+  
+  public void testReplicationError() throws Exception {
+    // bring up a cluster of 1
+    Configuration conf = new Configuration();
+    MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null);
+    cluster.waitActive();
+    FileSystem fs = cluster.getFileSystem();
+    
+    try {
+      // create a file of replication factor of 1
+      final Path fileName = new Path("/test.txt");
+      final int fileLen = 1;
+      DFSTestUtil.createFile(fs, fileName, 1, (short)1, 1L);
+      DFSTestUtil.waitReplication(fs, fileName, (short)1);
+
+      // get the block belonged to the created file
+      LocatedBlocks blocks = cluster.getNameNode().namesystem.getBlockLocations(
+          fileName.toString(), 0, (long)fileLen);
+      assertEquals(blocks.locatedBlockCount(), 1);
+      LocatedBlock block = blocks.get(0);
+      
+      // bring up a second datanode
+      cluster.startDataNodes(conf, 1, true, null, null);
+      cluster.waitActive();
+      final int sndNode = 1;
+      DataNode datanode = cluster.getDataNodes().get(sndNode);
+      
+      // replicate the block to the second datanode
+      InetSocketAddress target = datanode.getSelfAddr();
+      Socket s = new Socket(target.getAddress(), target.getPort());
+      // write the header
+      DataOutputStream out = new DataOutputStream(
+          s.getOutputStream());
+
+      out.writeShort( DataTransferProtocol.DATA_TRANSFER_VERSION );
+      out.write( DataTransferProtocol.OP_WRITE_BLOCK );
+      out.writeLong( block.getBlock().getBlockId());
+      out.writeLong( block.getBlock().getGenerationStamp() );
+      out.writeInt(1);
+      out.writeBoolean( false );       // recovery flag
+      Text.writeString( out, "" );
+      out.writeBoolean(false); // Not sending src node information
+      out.writeInt(0);
+      
+      // write check header
+      out.writeByte( 1 );
+      out.writeInt( 512 );
+
+      out.flush();
+
+      // close the connection before sending the content of the block
+      out.close();
+      
+      // the temporary block & meta files should be deleted
+      String dataDir = cluster.getDataDirectory();
+      File dir1 = new File(new File(dataDir, "data"+(2*sndNode+1)), "tmp");
+      File dir2 = new File(new File(dataDir, "data"+(2*sndNode+2)), "tmp");
+      while (dir1.listFiles().length != 0 || dir2.listFiles().length != 0) {
+        Thread.sleep(100);
+      }
+      
+      // then increase the file's replication factor
+      fs.setReplication(fileName, (short)2);
+      // replication should succeed
+      DFSTestUtil.waitReplication(fs, fileName, (short)2);
+      
+      // clean up the file
+      fs.delete(fileName, false);
+    } finally {
+      cluster.shutdown();
+    }
+  }
 }
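
For readability, the raw OP_WRITE_BLOCK header that testReplicationError writes by hand can be grouped into a small helper like the hypothetical sketch below. The field order simply mirrors the writes in the test; the interpretive comments (for example, that checksum type 1 is CRC32 and that an empty client name marks a replication request) are assumptions drawn from the surrounding code, not something the test asserts.

import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
import org.apache.hadoop.io.Text;

class WriteBlockHeaderSketch {

  // Writes the same header bytes as the test above, one field per line.
  static void writeHeader(DataOutputStream out, long blockId, long genStamp)
      throws IOException {
    out.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION); // wire protocol version
    out.write(DataTransferProtocol.OP_WRITE_BLOCK);             // opcode
    out.writeLong(blockId);                                     // block id
    out.writeLong(genStamp);                                    // generation stamp
    out.writeInt(1);             // number of datanodes in the write pipeline
    out.writeBoolean(false);     // recovery flag
    Text.writeString(out, "");   // client name; empty marks a replication request
    out.writeBoolean(false);     // no source-datanode information follows
    out.writeInt(0);             // number of downstream targets
    out.writeByte(1);            // checksum type (assumed CRC32)
    out.writeInt(512);           // bytes per checksum
    out.flush();
  }
}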