
Merge -r 724882:724883 from main to move the change log of HADOOP-4702 into release 0.18.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/branches/branch-0.18@724907 13f79535-47bb-0310-9956-ffa450edef68
Hairong Kuang authored 16 years ago
parent commit 5e04159fc3

+ 4 - 1
CHANGES.txt

@@ -40,7 +40,7 @@ Release 0.18.3 - Unreleased
     HADOOP-4061. Throttle Datanode decommission monitoring in Namenode.
     (szetszwo)

-    HADOOP-4659. Root cause of connection failure is being ost to code that
+    HADOOP-4659. Root cause of connection failure is being lost to code that
     uses it for delaying startup. (Steve Loughran and Hairong via hairong)

     HADOOP-4614. Lazily open segments when merging map spills to avoid using
@@ -83,6 +83,9 @@ Release 0.18.3 - Unreleased

     HADOOP-4742. Replica gets deleted by mistake. (Wang Xu via hairong)

+    HADOOP-4702. Failed block replication leaves an incomplete block in
+    receiver's tmp data directory. (hairong)
+
 Release 0.18.2 - 2008-11-03

   BUG FIXES

+ 14 - 1
src/hdfs/org/apache/hadoop/dfs/DataNode.java

@@ -2364,10 +2364,10 @@ public class DataNode extends Configured
         this.isRecovery = isRecovery;
         this.clientName = clientName;
         this.offsetInBlock = 0;
+        this.srcDataNode = srcDataNode;
         this.checksum = DataChecksum.newDataChecksum(in);
         this.bytesPerChecksum = checksum.getBytesPerChecksum();
         this.checksumSize = checksum.getChecksumSize();
-        this.srcDataNode = srcDataNode;
         //
         // Open local disk out
         //
@@ -2381,6 +2381,9 @@ public class DataNode extends Configured
         }
       } catch(IOException ioe) {
         IOUtils.closeStream(this);
+        removeBlock();
+
+        // check if there is a disk error
         IOException cause = FSDataset.getCauseIfDiskError(ioe);
         if (cause != null) { // possible disk error
           ioe = cause;
@@ -2792,6 +2795,7 @@ public class DataNode extends Configured
         if (responder != null) {
           responder.interrupt();
         }
+        removeBlock();
         throw ioe;
       } finally {
         if (responder != null) {
@@ -2805,6 +2809,15 @@ public class DataNode extends Configured
       }
     }

+    /** Remove a partial block
+     * if this write is for a replication request (and not from a client)
+     */
+    private void removeBlock() throws IOException {
+      if (clientName.length() == 0) { // not client write
+        data.unfinalizeBlock(block);
+      }
+    }
+
     /**
      * Sets the file pointer in the local block file to the specified value.
      */

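For context on the DataNode.java change above: the new removeBlock() hook gives the block receiver a single cleanup path for both failure sites, the constructor (when opening the local disk file fails) and the packet-copy loop (when the sender disconnects mid-stream). The standalone sketch below mirrors that pattern outside of Hadoop; SketchReceiver, its tmp-file handling and the simulated failure are illustrative stand-ins, not the actual 0.18 classes.

import java.io.File;
import java.io.IOException;

/** Illustrative sketch of the cleanup-on-failure pattern added to BlockReceiver. */
class SketchReceiver {
  private final File tmpBlockFile;  // stand-in for the partial block file under the datanode tmp dir
  private final String clientName;  // empty string marks a replication request

  SketchReceiver(File tmpBlockFile, String clientName) {
    this.tmpBlockFile = tmpBlockFile;
    this.clientName = clientName;
  }

  void receiveBlock() throws IOException {
    try {
      copyPackets();                // may fail part-way through
    } catch (IOException ioe) {
      removePartialBlock();         // clean up before propagating, like removeBlock()
      throw ioe;
    }
  }

  /** Delete the partial block only when the write did not come from a client. */
  private void removePartialBlock() {
    if (clientName.length() == 0 && tmpBlockFile.exists()) {
      tmpBlockFile.delete();
    }
  }

  private void copyPackets() throws IOException {
    throw new IOException("simulated mid-stream failure");
  }
}

The clientName.length() == 0 guard matches the patch: partial files from client writes are left alone, presumably so the existing client-side recovery path can still pick them up.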
+ 36 - 2
src/hdfs/org/apache/hadoop/dfs/FSDataset.java

@@ -1089,11 +1089,45 @@ class FSDataset implements FSConstants, FSDatasetInterface {
    * Remove the temporary block file (if any)
    */
   public synchronized void unfinalizeBlock(Block b) throws IOException {
-    ongoingCreates.remove(b);
+    // remove the block from in-memory data structure
+    ActiveFile activefile = ongoingCreates.remove(b);
+    if (activefile == null) {
+      return;
+    }
     volumeMap.remove(b);
-    DataNode.LOG.warn("Block " + b + " unfinalized and removed. " );
+    
+    // delete the on-disk temp file
+    if (delBlockFromDisk(activefile.file, getMetaFile(activefile.file, b), b)) {
+      DataNode.LOG.warn("Block " + b + " unfinalized and removed. " );
+    }
   }

+  /**
+   * Remove a block from disk
+   * @param blockFile block file
+   * @param metaFile block meta file
+   * @param b a block
+   * @return true if on-disk files are deleted; false otherwise
+   */
+  private boolean delBlockFromDisk(File blockFile, File metaFile, Block b) {
+    if (blockFile == null) {
+      DataNode.LOG.warn("No file exists for block: " + b);
+      return true;
+    }
+    
+    if (!blockFile.delete()) {
+      DataNode.LOG.warn("Not able to delete the block file: " + blockFile);
+      return false;
+    } else { // remove the meta file
+      if (metaFile != null && !metaFile.delete()) {
+        DataNode.LOG.warn(
+            "Not able to delete the meta block file: " + metaFile);
+        return false;
+      }
+    }
+    return true;
+  }
+  
   /**
    * Return a table of block data
    */

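A quick, self-contained illustration of the delete logic in delBlockFromDisk() above. The sketch keeps the same return contract (true only when nothing is left on disk) but drops the logging; the file names in main() are invented for the example and are not real block IDs.

import java.io.File;
import java.io.IOException;

public class DelBlockSketch {
  /** Mirrors the return contract of FSDataset.delBlockFromDisk(). */
  static boolean delBlockFromDisk(File blockFile, File metaFile) {
    if (blockFile == null) {
      return true;                           // nothing on disk to delete
    }
    if (!blockFile.delete()) {
      return false;                          // block file is still there
    }
    // block file is gone; the meta file must also go for a clean result
    return metaFile == null || metaFile.delete();
  }

  public static void main(String[] args) throws IOException {
    File blk = File.createTempFile("blk_1", null);       // invented temp files
    File meta = File.createTempFile("blk_1_1001", ".meta");
    System.out.println(delBlockFromDisk(blk, meta));     // true: both files deleted
    System.out.println(delBlockFromDisk(blk, meta));     // false: block file already gone
  }
}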
+ 81 - 1
src/test/org/apache/hadoop/dfs/TestDiskError.java

@@ -17,17 +17,22 @@
  */
 package org.apache.hadoop.dfs;

+import java.io.DataOutputStream;
 import java.io.File;
+import java.net.InetSocketAddress;
+import java.net.Socket;

 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.dfs.DFSTestUtil;
+import org.apache.hadoop.dfs.FSConstants;
 import org.apache.hadoop.dfs.MiniDFSCluster;
+import org.apache.hadoop.io.Text;

 import junit.framework.TestCase;

-/** Test if a datanode can handle disk error correctly*/
+/** Test if a datanode can correctly handle errors during block read/write*/
 public class TestDiskError extends TestCase {
   public void testShutdown() throws Exception {
     // bring up a cluster of 3
@@ -54,6 +59,7 @@ public class TestDiskError extends TestCase {
         Path fileName = new Path("/test.txt"+i);
         DFSTestUtil.createFile(fs, fileName, 1024, (short)2, 1L);
         DFSTestUtil.waitReplication(fs, fileName, (short)2);
+        fs.delete(fileName, true);
       }
     } finally {
       // restore its old permission
@@ -62,4 +68,78 @@ public class TestDiskError extends TestCase {
       cluster.shutdown();
     }
   }
+  
+  public void testReplicationError() throws Exception {
+    // bring up a cluster of 1
+    Configuration conf = new Configuration();
+    MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null);
+    cluster.waitActive();
+    FileSystem fs = cluster.getFileSystem();
+    
+    try {
+      // create a file of replication factor of 1
+      final Path fileName = new Path("/test.txt");
+      final int fileLen = 1;
+      DFSTestUtil.createFile(fs, fileName, 1, (short)1, 1L);
+      DFSTestUtil.waitReplication(fs, fileName, (short)1);
+
+      // get the block belonged to the created file
+      LocatedBlocks blocks = cluster.getNameNode().namesystem.getBlockLocations(
+          fileName.toString(), 0, (long)fileLen);
+      assertEquals(blocks.locatedBlockCount(), 1);
+      LocatedBlock block = blocks.get(0);
+      
+      // bring up a second datanode
+      cluster.startDataNodes(conf, 1, true, null, null);
+      cluster.waitActive();
+      final int sndNode = 1;
+      DataNode datanode = cluster.getDataNodes().get(sndNode);
+      
+      // replicate the block to the second datanode
+      InetSocketAddress target = datanode.getSelfAddr();
+      Socket s = new Socket(target.getAddress(), target.getPort());
+        //write the header.
+      DataOutputStream out = new DataOutputStream(
+          s.getOutputStream());
+
+      out.writeShort( FSConstants.DATA_TRANSFER_VERSION );
+      out.write( FSConstants.OP_WRITE_BLOCK );
+      out.writeLong( block.getBlock().getBlockId());
+      out.writeLong( block.getBlock().getGenerationStamp() );
+      out.writeInt(1);
+      out.writeBoolean( false );       // recovery flag
+      Text.writeString( out, "" );
+      out.writeBoolean(false); // Not sending src node information
+      out.writeInt(0);
+      
+      // write check header
+      out.writeByte( 1 );
+      out.writeInt( 512 );
+
+      out.flush();
+
+      // close the connection before sending the content of the block
+      out.close();
+      
+      // the temporary block & meta files should be deleted
+      String dataDir = new File(
+         System.getProperty("test.build.data", "build/test/data"), 
+         "dfs").toString() + "/data";
+      File dir1 = new File(new File(dataDir, "data"+(2*sndNode+1)), "tmp");
+      File dir2 = new File(new File(dataDir, "data"+(2*sndNode+2)), "tmp");
+      while (dir1.listFiles().length != 0 || dir2.listFiles().length != 0) {
+        Thread.sleep(100);
+      }
+      
+      // then increase the file's replication factor
+      fs.setReplication(fileName, (short)2);
+      // replication should succeed
+      DFSTestUtil.waitReplication(fs, fileName, (short)1);
+      
+      // clean up the file
+      fs.delete(fileName, false);
+    } finally {
+      cluster.shutdown();
+    }
+  }
 }
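One note on the polling loop in testReplicationError() above: it spins until both tmp directories are empty and would hang the test run if the cleanup never happened. A bounded variant of the same wait is sketched below; waitForEmpty() and the timeout value are illustrative additions, not part of the patch.

import java.io.File;

class TmpDirWaitSketch {
  /** Poll a directory until it is empty, giving up after a timeout. */
  static void waitForEmpty(File dir, long timeoutMillis) throws InterruptedException {
    long deadline = System.currentTimeMillis() + timeoutMillis;
    while (dir.listFiles().length != 0) {
      if (System.currentTimeMillis() > deadline) {
        throw new AssertionError("tmp dir was not cleaned up: " + dir);
      }
      Thread.sleep(100);
    }
  }
}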