瀏覽代碼

HDFS-826. The DFSOutputStream has a API that returns the number of
active datanode(s) in the current pipeline. (dhruba)



git-svn-id: https://svn.apache.org/repos/asf/hadoop/hdfs/trunk@923098 13f79535-47bb-0310-9956-ffa450edef68

Dhruba Borthakur 15 年之前
父節點
當前提交
8869bec2d3

+ 3 - 0
CHANGES.txt

@@ -95,6 +95,9 @@ Trunk (unreleased changes)
     HDFS-850. The WebUI display more details about namenode memory usage.
     HDFS-850. The WebUI display more details about namenode memory usage.
     (Dmytro Molkov via dhruba)
     (Dmytro Molkov via dhruba)
 
 
+    HDFS-826. The DFSOutputStream has a API that returns the number of
+    active datanode(s) in the current pipeline. (dhruba)
+
   OPTIMIZATIONS
   OPTIMIZATIONS
 
 
     HDFS-946. NameNode should not return full path name when lisitng a
     HDFS-946. NameNode should not return full path name when lisitng a

+ 27 - 4
src/java/org/apache/hadoop/hdfs/DFSOutputStream.java

@@ -122,6 +122,7 @@ class DFSOutputStream extends FSOutputSummer implements Syncable {
   private volatile boolean appendChunk = false;   // appending to existing partial block
   private volatile boolean appendChunk = false;   // appending to existing partial block
   private long initialFileSize = 0; // at time of file open
   private long initialFileSize = 0; // at time of file open
   private Progressable progress;
   private Progressable progress;
+  private short blockReplication; // replication factor of file
   
   
   private class Packet {
   private class Packet {
     ByteBuffer buffer;           // only one of buf and buffer is non-null
     ByteBuffer buffer;           // only one of buf and buffer is non-null
@@ -1025,12 +1026,13 @@ class DFSOutputStream extends FSOutputSummer implements Syncable {
   }
   }
 
 
   private DFSOutputStream(DFSClient dfsClient, String src, long blockSize, Progressable progress,
   private DFSOutputStream(DFSClient dfsClient, String src, long blockSize, Progressable progress,
-      int bytesPerChecksum) throws IOException {
+      int bytesPerChecksum, short replication) throws IOException {
     super(new PureJavaCrc32(), bytesPerChecksum, 4);
     super(new PureJavaCrc32(), bytesPerChecksum, 4);
     this.dfsClient = dfsClient;
     this.dfsClient = dfsClient;
     this.conf = dfsClient.conf;
     this.conf = dfsClient.conf;
     this.src = src;
     this.src = src;
     this.blockSize = blockSize;
     this.blockSize = blockSize;
+    this.blockReplication = replication;
     this.progress = progress;
     this.progress = progress;
     if (progress != null) {
     if (progress != null) {
       DFSClient.LOG.debug("Set non-null progress callback on DFSOutputStream "+src);
       DFSClient.LOG.debug("Set non-null progress callback on DFSOutputStream "+src);
@@ -1055,7 +1057,7 @@ class DFSOutputStream extends FSOutputSummer implements Syncable {
       boolean createParent, short replication, long blockSize, Progressable progress,
       boolean createParent, short replication, long blockSize, Progressable progress,
       int buffersize, int bytesPerChecksum) 
       int buffersize, int bytesPerChecksum) 
       throws IOException, UnresolvedLinkException {
       throws IOException, UnresolvedLinkException {
-    this(dfsClient, src, blockSize, progress, bytesPerChecksum);
+    this(dfsClient, src, blockSize, progress, bytesPerChecksum, replication);
 
 
     computePacketChunkSize(dfsClient.writePacketSize, bytesPerChecksum);
     computePacketChunkSize(dfsClient.writePacketSize, bytesPerChecksum);
 
 
@@ -1081,7 +1083,7 @@ class DFSOutputStream extends FSOutputSummer implements Syncable {
   DFSOutputStream(DFSClient dfsClient, String src, int buffersize, Progressable progress,
   DFSOutputStream(DFSClient dfsClient, String src, int buffersize, Progressable progress,
       LocatedBlock lastBlock, HdfsFileStatus stat,
       LocatedBlock lastBlock, HdfsFileStatus stat,
       int bytesPerChecksum) throws IOException {
       int bytesPerChecksum) throws IOException {
-    this(dfsClient, src, stat.getBlockSize(), progress, bytesPerChecksum);
+    this(dfsClient, src, stat.getBlockSize(), progress, bytesPerChecksum, stat.getReplication());
     initialFileSize = stat.getLen(); // length of file when opened
     initialFileSize = stat.getLen(); // length of file when opened
 
 
     //
     //
@@ -1291,6 +1293,27 @@ class DFSOutputStream extends FSOutputSummer implements Syncable {
   public synchronized void hsync() throws IOException {
   public synchronized void hsync() throws IOException {
     hflush();
     hflush();
   }
   }
+
+  /**
+   * Returns the number of replicas of current block. This can be different
+   * from the designated replication factor of the file because the NameNode
+   * does not replicate the block to which a client is currently writing to.
+   * The client continues to write to a block even if a few datanodes in the
+   * write pipeline have failed. 
+   * @return the number of valid replicas of the current block
+   */
+  public synchronized int getNumCurrentReplicas() throws IOException {
+    dfsClient.checkOpen();
+    isClosed();
+    if (streamer == null) {
+      return blockReplication; // no pipeline, return repl factor of file
+    }
+    DatanodeInfo[] currentNodes = streamer.getNodes();
+    if (currentNodes == null) {
+      return blockReplication; // no pipeline, return repl factor of file
+    }
+    return currentNodes.length;
+  }
   
   
   /**
   /**
    * Waits till all existing data is flushed and confirmations 
    * Waits till all existing data is flushed and confirmations 
@@ -1446,4 +1469,4 @@ class DFSOutputStream extends FSOutputSummer implements Syncable {
     return streamer.getAccessToken();
     return streamer.getAccessToken();
   }
   }
 
 
-}
+}

+ 12 - 0
src/test/hdfs/org/apache/hadoop/hdfs/TestFileCreation.java

@@ -460,10 +460,18 @@ public class TestFileCreation extends junit.framework.TestCase {
       FSDataOutputStream stm = createFile(fs, file1, 1);
       FSDataOutputStream stm = createFile(fs, file1, 1);
       System.out.println("testFileCreationNamenodeRestart: "
       System.out.println("testFileCreationNamenodeRestart: "
                          + "Created file " + file1);
                          + "Created file " + file1);
+      int actualRepl = ((DFSOutputStream)(stm.getWrappedStream())).
+                        getNumCurrentReplicas();
+      assertTrue(file1 + " should be replicated to 1 datanodes.",
+                 actualRepl == 1);
 
 
       // write two full blocks.
       // write two full blocks.
       writeFile(stm, numBlocks * blockSize);
       writeFile(stm, numBlocks * blockSize);
       stm.hflush();
       stm.hflush();
+      actualRepl = ((DFSOutputStream)(stm.getWrappedStream())).
+                        getNumCurrentReplicas();
+      assertTrue(file1 + " should still be replicated to 1 datanodes.",
+                 actualRepl == 1);
 
 
       // rename file wile keeping it open.
       // rename file wile keeping it open.
       Path fileRenamed = new Path("/filestatusRenamed.dat");
       Path fileRenamed = new Path("/filestatusRenamed.dat");
@@ -857,6 +865,10 @@ public class TestFileCreation extends junit.framework.TestCase {
       FSDataOutputStream out = TestFileCreation.createFile(dfs, fpath, DATANODE_NUM);
       FSDataOutputStream out = TestFileCreation.createFile(dfs, fpath, DATANODE_NUM);
       out.write("something".getBytes());
       out.write("something".getBytes());
       out.hflush();
       out.hflush();
+      int actualRepl = ((DFSOutputStream)(out.getWrappedStream())).
+                        getNumCurrentReplicas();
+      assertTrue(f + " should be replicated to " + DATANODE_NUM + " datanodes.",
+                 actualRepl == DATANODE_NUM);
 
 
       // set the soft and hard limit to be 1 second so that the
       // set the soft and hard limit to be 1 second so that the
       // namenode triggers lease recovery
       // namenode triggers lease recovery