瀏覽代碼

Merge r1595744 from trunk. HDFS-6325. Append should fail if the last block has insufficient number of replicas (Keith Pak via cos)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2.4@1595746 13f79535-47bb-0310-9956-ffa450edef68
Konstantin Boudnik 11 年之前
父節點
當前提交
8bb12ed22c

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -80,6 +80,9 @@ Release 2.4.1 - UNRELEASED
     HDFS-6402. Suppress findbugs warning for failure to override equals and
     hashCode in FsAclPermission. (cnauroth)
 
+    HDFS-6325. Append should fail if the last block has insufficient number of
+    replicas (Keith Pak via cos)
+
 Release 2.4.0 - 2014-04-07 
 
   INCOMPATIBLE CHANGES

+ 10 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java

@@ -943,6 +943,16 @@ public class BlockManager {
                             minReplication);
   }
 
+  /**
+   * Check if a block is replicated to at least the minimum replication.
+   */
+  public boolean isSufficientlyReplicated(BlockInfo b) {
+    // Compare against the lesser of the minReplication and number of live DNs.
+    final int replication =
+        Math.min(minReplication, getDatanodeManager().getNumLiveDataNodes());
+    return countNodes(b).liveReplicas() >= replication;
+  }
+
   /**
    * return a list of blocks & their locations on <code>datanode</code> whose
    * total size is <code>size</code>

+ 7 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -2381,7 +2381,13 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       // finalizeINodeFileUnderConstruction so we need to refresh 
       // the referenced file.  
       myFile = INodeFile.valueOf(dir.getINode(src), src, true);
-      
+      final BlockInfo lastBlock = myFile.getLastBlock();
+      // Check that the block has at least minimum replication.
+      if(lastBlock != null && lastBlock.isComplete() &&
+          !getBlockManager().isSufficientlyReplicated(lastBlock)) {
+        throw new IOException("append: lastBlock=" + lastBlock +
+            " of src=" + src + " is not sufficiently replicated yet.");
+      }
       final DatanodeDescriptor clientNode = 
           blockManager.getDatanodeManager().getDatanodeByHost(clientMachine);
       return prepareFileForWrite(src, myFile, holder, clientMachine, clientNode,

+ 71 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend4.java

@@ -28,6 +28,7 @@ import static org.mockito.Mockito.spy;
 
 import java.io.IOException;
 import java.io.OutputStream;
+import java.util.List;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.commons.logging.Log;
@@ -37,11 +38,15 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
+import org.apache.hadoop.hdfs.server.namenode.FSDirectory;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.INodeFile;
 import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
@@ -327,4 +332,70 @@ public class TestFileAppend4 {
       cluster.shutdown();
     }
   }
+
+  /**
+   * Test that an append with no locations fails with an exception
+   * showing insufficient locations.
+   */
+  @Test(timeout = 60000)
+  public void testAppendInsufficientLocations() throws Exception {
+    Configuration conf = new Configuration();
+
+    // lower heartbeat interval for fast recognition of DN
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
+        1000);
+    conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
+    conf.setInt(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 3000);
+
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(4)
+        .build();
+    DistributedFileSystem fileSystem = null;
+    try {
+      // create a file with replication 3
+      fileSystem = cluster.getFileSystem();
+      Path f = new Path("/testAppend");
+      FSDataOutputStream create = fileSystem.create(f, (short) 2);
+      create.write("/testAppend".getBytes());
+      create.close();
+
+      // Check for replications
+      DFSTestUtil.waitReplication(fileSystem, f, (short) 2);
+
+      // Shut down all DNs that have the last block location for the file
+      LocatedBlocks lbs = fileSystem.dfs.getNamenode().
+          getBlockLocations("/testAppend", 0, Long.MAX_VALUE);
+      List<DataNode> dnsOfCluster = cluster.getDataNodes();
+      DatanodeInfo[] dnsWithLocations = lbs.getLastLocatedBlock().
+          getLocations();
+      for( DataNode dn : dnsOfCluster) {
+        for(DatanodeInfo loc: dnsWithLocations) {
+          if(dn.getDatanodeId().equals(loc)){
+            dn.shutdown();
+            DFSTestUtil.waitForDatanodeDeath(dn);
+          }
+        }
+      }
+
+      // Wait till 0 replication is recognized
+      DFSTestUtil.waitReplication(fileSystem, f, (short) 0);
+
+      // Append to the file, at this state there are 3 live DNs but none of them
+      // have the block.
+      try{
+        fileSystem.append(f);
+        fail("Append should fail because insufficient locations");
+      } catch (IOException e){
+        LOG.info("Expected exception: ", e);
+      }
+      FSDirectory dir = cluster.getNamesystem().getFSDirectory();
+      final INodeFile inode = INodeFile.
+          valueOf(dir.getINode("/testAppend"), "/testAppend");
+      assertTrue("File should remain closed", !inode.isUnderConstruction());
+    } finally {
+      if (null != fileSystem) {
+        fileSystem.close();
+      }
+      cluster.shutdown();
+    }
+  }
 }