Przeglądaj źródła

HDFS-826. Allow a mechanism for an application to detect that
datanode(s) have died in the write pipeline. (dhruba)



git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.20-append@953417 13f79535-47bb-0310-9956-ffa450edef68

Dhruba Borthakur 15 lat temu
rodzic
commit
f094bdd577

+ 3 - 0
CHANGES.txt

@@ -12,6 +12,9 @@ Release 0.20-append - Unreleased
     HDFS-988. Fix bug where savenameSpace can corrupt edits log.
     (Nicolas Spiegelberg via dhruba)
 
+    HDFS-826. Allow a mechanism for an application to detect that 
+    datanode(s) have died in the write pipeline. (dhruba)
+
   IMPROVEMENTS
 
   BUG FIXES

+ 24 - 5
src/hdfs/org/apache/hadoop/hdfs/DFSClient.java

@@ -2169,6 +2169,8 @@ public class DFSClient implements FSConstants, java.io.Closeable {
     private int maxRecoveryErrorCount = 5; // try block recovery 5 times
     private volatile boolean appendChunk = false;   // appending to existing partial block
     private long initialFileSize = 0; // at time of file open
+    private Progressable progress;
+    private short blockReplication; // replication factor of file
 
     private void setLastException(IOException e) {
       if (lastException == null) {
@@ -2707,13 +2709,12 @@ public class DFSClient implements FSConstants, java.io.Closeable {
       }
     }
 
-    private Progressable progress;
-
     private DFSOutputStream(String src, long blockSize, Progressable progress,
-        int bytesPerChecksum) throws IOException {
+        int bytesPerChecksum, short replication) throws IOException {
       super(new CRC32(), bytesPerChecksum, 4);
       this.src = src;
       this.blockSize = blockSize;
+      this.blockReplication = replication;
       this.progress = progress;
       if (progress != null) {
         LOG.debug("Set non-null progress callback on DFSOutputStream "+src);
@@ -2737,7 +2738,7 @@ public class DFSClient implements FSConstants, java.io.Closeable {
     DFSOutputStream(String src, FsPermission masked, boolean overwrite,
         short replication, long blockSize, Progressable progress,
         int buffersize, int bytesPerChecksum) throws IOException {
-      this(src, blockSize, progress, bytesPerChecksum);
+      this(src, blockSize, progress, bytesPerChecksum, replication);
 
       computePacketChunkSize(writePacketSize, bytesPerChecksum);
 
@@ -2759,7 +2760,7 @@ public class DFSClient implements FSConstants, java.io.Closeable {
     DFSOutputStream(String src, int buffersize, Progressable progress,
         LocatedBlock lastBlock, FileStatus stat,
         int bytesPerChecksum) throws IOException {
-      this(src, stat.getBlockSize(), progress, bytesPerChecksum);
+      this(src, stat.getBlockSize(), progress, bytesPerChecksum, stat.getReplication());
       initialFileSize = stat.getLen(); // length of file when opened
 
       //
@@ -3155,6 +3156,24 @@ public class DFSClient implements FSConstants, java.io.Closeable {
       }
     }
 
+    /**
+     * Returns the number of replicas of current block. This can be different
+     * from the designated replication factor of the file because the NameNode
+     * does not replicate the block to which a client is currently writing to.
+     * The client continues to write to a block even if a few datanodes in the
+     * write pipeline have failed. If the current block is full and the next
+     * block is not yet allocated, then this API will return 0 because there are
+     * no replicas in the pipeline.
+     */
+    public int getNumCurrentReplicas() throws IOException {
+      synchronized(dataQueue) {
+        if (nodes == null) {
+          return blockReplication;
+        }
+        return nodes.length;
+      }
+    }
+
     /**
      * Waits till all existing data is flushed and confirmations 
      * received from datanodes. 

+ 15 - 2
src/test/org/apache/hadoop/hdfs/TestFileCreation.java

@@ -93,7 +93,7 @@ public class TestFileCreation extends junit.framework.TestCase {
     byte[] buffer = AppendTestUtil.randomBytes(seed, size);
     stm.write(buffer, 0, size);
   }
-
+  
   //
   // verify that the data written to the full blocks are sane
   // 
@@ -478,7 +478,16 @@ public class TestFileCreation extends junit.framework.TestCase {
                          + "Created file " + file1);
 
       // write two full blocks.
-      writeFile(stm, numBlocks * blockSize);
+      int remainingPiece = blockSize/2;
+      int blocksMinusPiece = numBlocks * blockSize - remainingPiece;
+      writeFile(stm, blocksMinusPiece);
+      stm.sync();
+      int actualRepl = ((DFSClient.DFSOutputStream)(stm.getWrappedStream())).
+                        getNumCurrentReplicas();
+      // if we sync on a block boundary, actualRepl will be 0
+      assertTrue(file1 + " should be replicated to 1 datanodes, not " + actualRepl,
+                 actualRepl == 1);
+      writeFile(stm, remainingPiece);
       stm.sync();
 
       // rename file wile keeping it open.
@@ -686,6 +695,10 @@ public class TestFileCreation extends junit.framework.TestCase {
       FSDataOutputStream out = TestFileCreation.createFile(dfs, fpath, DATANODE_NUM);
       out.write("something".getBytes());
       out.sync();
+      int actualRepl = ((DFSClient.DFSOutputStream)(out.getWrappedStream())).
+                        getNumCurrentReplicas();
+      assertTrue(f + " should be replicated to " + DATANODE_NUM + " datanodes.",
+                 actualRepl == DATANODE_NUM);
 
       // set the soft and hard limit to be 1 second so that the
       // namenode triggers lease recovery