Selaa lähdekoodia

Merging changes r1044166:r1050109 from trunk to federation

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hdfs/branches/HDFS-1052@1078983 13f79535-47bb-0310-9956-ffa450edef68
Suresh Srinivas 14 vuotta sitten
vanhempi
commit
8d41792f77

+ 12 - 0
CHANGES.txt

@@ -4,6 +4,9 @@ Trunk (unreleased changes)
 
   INCOMPATIBLE CHANGES
 
+    HDFS-1526. Dfs client name for a map/reduce task should be unique
+    among threads. (hairong)
+
   NEW FEATURES
 
     HDFS-1365. Federation: propose ClusterID and BlockPoolID format 
@@ -237,6 +240,9 @@ Trunk (unreleased changes)
     HDFS-1533. A more elegant FileSystem#listCorruptFileBlocks API
     (HDFS portion) (Patrick Kling via hairong)
 
+    HDFS-1476. listCorruptFileBlocks should be functional while the
+    name node is in safe mode. (Patrick Kling via hairong)
+
   OPTIMIZATIONS
 
     HDFS-1458. Improve checkpoint performance by avoiding unnecessary image
@@ -260,6 +266,8 @@ Trunk (unreleased changes)
     HDFS-1684. Balancer cannot start with with multiple namenodes.  (szetszwo)
 
     HDFS-1516. mvn-install is broken after 0.22 branch creation. (cos)
+    HDFS-1360. TestBlockRecovery should bind ephemeral ports.
+    (Todd Lipcon via hairong)
 
 Release 0.22.0 - Unreleased
 
@@ -676,6 +684,8 @@ Release 0.22.0 - Unreleased
     HDFS-613. TestBalancer and TestBlockTokenWithDFS fail Balancer assert.
     (Todd Lipcon via cos)
 
+    HDFS-1206. TestFiHFlush fails intermittently. (cos)
+
 Release 0.21.1 - Unreleased
 
     HDFS-1411. Correct backup node startup command in hdfs user guide.
@@ -1711,6 +1721,8 @@ Release 0.20.2 - Unreleased
     (Todd Lipcon via szetszwo)
 
     HDFS-464. Fix memory leaks in libhdfs. (Christian Kunz via suresh)
+    
+    HDFS-1377. Quota bug for partial blocks allows quotas to be violated. (eli)
 
     HDFS-1377. Quota bug for partial blocks allows quotas to be violated. (eli)
 

+ 4 - 7
src/java/org/apache/hadoop/hdfs/DFSClient.java

@@ -127,7 +127,7 @@ public class DFSClient implements FSConstants, java.io.Closeable {
   volatile boolean clientRunning = true;
   private volatile FsServerDefaults serverDefaults;
   private volatile long serverDefaultsLastUpdate;
-  Random r = new Random();
+  static Random r = new Random();
   final String clientName;
   final LeaseChecker leasechecker = new LeaseChecker();
   Configuration conf;
@@ -252,12 +252,9 @@ public class DFSClient implements FSConstants, java.io.Closeable {
 
     this.ugi = UserGroupInformation.getCurrentUser();
     
-    String taskId = conf.get("mapred.task.id");
-    if (taskId != null) {
-      this.clientName = "DFSClient_" + taskId; 
-    } else {
-      this.clientName = "DFSClient_" + r.nextInt();
-    }
+    String taskId = conf.get("mapred.task.id", "NONMAPREDUCE");
+    this.clientName = "DFSClient_" + taskId + "_" +
+                      r.nextInt() + "_" + Thread.currentThread().getId(); 
     defaultBlockSize = conf.getLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE);
     defaultReplication = (short) conf.getInt("dfs.replication", 3);
 

+ 5 - 0
src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java

@@ -58,6 +58,11 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final int     DFS_NAMENODE_SAFEMODE_EXTENSION_DEFAULT = 30000;
   public static final String  DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_KEY = "dfs.namenode.safemode.threshold-pct";
   public static final float   DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_DEFAULT = 0.999f;
+  // set this to a slightly smaller value than
+  // DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_DEFAULT to populate
+  // needed replication queues before exiting safe mode
+  public static final String  DFS_NAMENODE_REPL_QUEUE_THRESHOLD_PCT_KEY =
+    "dfs.namenode.replqueue.threshold-pct";
   public static final String  DFS_NAMENODE_SAFEMODE_MIN_DATANODES_KEY = "dfs.namenode.safemode.min.datanodes";
   public static final int     DFS_NAMENODE_SAFEMODE_MIN_DATANODES_DEFAULT = 0;
   public static final String  DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_KEY = "dfs.namenode.secondary.http-address";

+ 3 - 3
src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java

@@ -624,7 +624,7 @@ public class BlockManager {
     if (countNodes(storedBlock).liveReplicas() > inode.getReplication()) {
       // the block is over-replicated so invalidate the replicas immediately
       invalidateBlock(storedBlock, node);
-    } else {
+    } else if (namesystem.isPopulatingReplQueues()) {
       // add the block to neededReplication
       updateNeededReplications(storedBlock, -1, 0);
     }
@@ -1154,8 +1154,8 @@ public class BlockManager {
       return storedBlock;
     }
 
-    // do not handle mis-replicated blocks during startup
-    if (namesystem.isInSafeMode())
+    // do not handle mis-replicated blocks during start up
+    if (!namesystem.isPopulatingReplQueues())
       return storedBlock;
 
     // handle underReplication/overReplication

+ 64 - 5
src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -3792,6 +3792,8 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
     private int extension;
     /** Min replication required by safe mode. */
     private int safeReplication;
+    /** threshold for populating needed replication queues */
+    private double replQueueThreshold;
       
     // internal fields
     /** Time when threshold was reached.
@@ -3806,9 +3808,13 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
     private int blockSafe;
     /** Number of blocks needed to satisfy safe mode threshold condition */
     private int blockThreshold;
+    /** Number of blocks needed before populating replication queues */
+    private int blockReplQueueThreshold;
     /** time of the last status printout */
     private long lastStatusReport = 0;
-      
+    /** flag indicating whether replication queues have been initialized */
+    private boolean initializedReplQueues = false;
+    
     /**
      * Creates SafeModeInfo when the name node enters
      * automatic safe mode at startup.
@@ -3823,6 +3829,10 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
       this.extension = conf.getInt(DFSConfigKeys.DFS_NAMENODE_SAFEMODE_EXTENSION_KEY, 0);
       this.safeReplication = conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY, 
                                          DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_DEFAULT);
+      // default to safe mode threshold (i.e., don't populate queues before leaving safe mode)
+      this.replQueueThreshold = 
+        conf.getFloat(DFSConfigKeys.DFS_NAMENODE_REPL_QUEUE_THRESHOLD_PCT_KEY,
+                      (float) threshold);
       this.blockTotal = 0; 
       this.blockSafe = 0;
     }
@@ -3840,6 +3850,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
       this.datanodeThreshold = Integer.MAX_VALUE;
       this.extension = Integer.MAX_VALUE;
       this.safeReplication = Short.MAX_VALUE + 1; // more than maxReplication
+      this.replQueueThreshold = 1.5f; // can never be reached
       this.blockTotal = -1;
       this.blockSafe = -1;
       this.reached = -1;
@@ -3862,6 +3873,13 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
       return this.reached >= 0;
     }
       
+    /**
+     * Check if we are populating replication queues.
+     */
+    synchronized boolean isPopulatingReplQueues() {
+      return initializedReplQueues;
+    }
+
     /**
      * Enter safe mode.
      */
@@ -3890,8 +3908,10 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
           return;
         }
       }
-      // verify blocks replications
-      blockManager.processMisReplicatedBlocks();
+      // if not done yet, initialize replication queues
+      if (!isPopulatingReplQueues()) {
+        initializeReplQueues();
+      }
       long timeInSafemode = now() - systemStart;
       NameNode.stateChangeLog.info("STATE* Leaving safe mode after " 
                                     + timeInSafemode/1000 + " secs.");
@@ -3908,6 +3928,26 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
       NameNode.stateChangeLog.info("STATE* UnderReplicatedBlocks has "
                                    +blockManager.neededReplications.size()+" blocks");
     }
+
+    /**
+     * Initialize replication queues.
+     */
+    synchronized void initializeReplQueues() {
+      LOG.info("initializing replication queues");
+      if (isPopulatingReplQueues()) {
+        LOG.warn("Replication queues already initialized.");
+      }
+      blockManager.processMisReplicatedBlocks();
+      initializedReplQueues = true;
+    }
+
+    /**
+     * Check whether we have reached the threshold for 
+     * initializing replication queues.
+     */
+    synchronized boolean canInitializeReplQueues() {
+      return blockSafe >= blockReplQueueThreshold;
+    }
       
     /** 
      * Safe mode can be turned off iff 
@@ -3940,6 +3980,10 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
     private void checkMode() {
       if (needEnter()) {
         enter();
+        // check if we are ready to initialize replication queues
+        if (canInitializeReplQueues() && !isPopulatingReplQueues()) {
+          initializeReplQueues();
+        }
         reportStatus("STATE* Safe mode ON.", false);
         return;
       }
@@ -3958,6 +4002,11 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
       smmthread = new Daemon(new SafeModeMonitor());
       smmthread.start();
       reportStatus("STATE* Safe mode extension entered.", true);
+
+      // check if we are ready to initialize replication queues
+      if (canInitializeReplQueues() && !isPopulatingReplQueues()) {
+        initializeReplQueues();
+      }
     }
       
     /**
@@ -3966,6 +4015,8 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
     synchronized void setBlockTotal(int total) {
       this.blockTotal = total;
       this.blockThreshold = (int) (blockTotal * threshold);
+      this.blockReplQueueThreshold = 
+        (int) (((double) blockTotal) * replQueueThreshold);
       checkMode();
     }
       
@@ -4152,11 +4203,19 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
    * Check whether the name node is in safe mode.
    * @return true if safe mode is ON, false otherwise
    */
-  boolean isInSafeMode() {
+  synchronized boolean isInSafeMode() {
     if (safeMode == null)
       return false;
     return safeMode.isOn();
   }
+
+  /**
+   * Check whether replication queues are populated.
+   */
+  synchronized boolean isPopulatingReplQueues() {
+    return (!isInSafeMode() ||
+            safeMode.isPopulatingReplQueues());
+  }
     
   /**
    * Increment number of blocks that reached minimal replication.
@@ -4892,7 +4951,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
 
     readLock();
     try {
-    if (isInSafeMode()) {
+    if (!isPopulatingReplQueues()) {
       throw new IOException("Cannot run listCorruptFileBlocks because " +
                             "replication queues have not been initialized.");
     }

+ 2 - 4
src/test/aop/org/apache/hadoop/hdfs/TestFiHFlush.java

@@ -52,10 +52,8 @@ public class TestFiHFlush {
     hft.fiCallHFlush.set(a);
     hft.fiErrorOnCallHFlush.set(new DataTransferTestUtil.VerificationAction(methodName, index));
     TestHFlush.doTheJob(conf, methodName, block_size, (short)3);
-    if (!trueVerification)
-      assertTrue(!hft.isSuccess());
-    else
-      assertTrue(hft.isSuccess());      
+    if (trueVerification)
+      assertTrue("Some of expected conditions weren't detected", hft.isSuccess());
   }
   
   /** The tests calls 

+ 13 - 10
src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java

@@ -194,14 +194,6 @@ public class MiniDFSCluster {
       this.simulatedCapacities = val;
       return this;
     }
-    
-    /**
-     * Default: null
-     */
-    public Builder clusterId(String cid) {
-      this.clusterId = cid;
-      return this;
-    }
 
     /**
      * Default: true
@@ -210,6 +202,14 @@ public class MiniDFSCluster {
       this.waitSafeMode = val;
       return this;
     }
+    
+    /**
+     * Default: null
+     */
+    public Builder clusterId(String cid) {
+      this.clusterId = cid;
+      return this;
+    }
 
     /**
      * Default: false
@@ -274,6 +274,7 @@ public class MiniDFSCluster {
   private File base_dir;
   private File data_dir;
   private boolean federation = false; 
+  private boolean waitSafeMode = true;
   
   /**
    * Stores the information related to a namenode in the cluster
@@ -469,6 +470,7 @@ public class MiniDFSCluster {
     base_dir = new File(getBaseDirectory());
     data_dir = new File(base_dir, "data");
     this.federation = federation;
+    this.waitSafeMode = waitSafeMode;
     
     // use alternate RPC engine if spec'd
     String rpcEngineName = System.getProperty("hdfs.rpc.engine");
@@ -1238,7 +1240,8 @@ public class MiniDFSCluster {
   }
   
   /**
-   * Returns true if the given namenode is running and is out of Safe Mode.
+   * Returns true if the NameNode is running and is out of Safe Mode
+   * or if waiting for safe mode is disabled.
    */
   public boolean isNameNodeUp(int nnIndex) {
     NameNode nameNode = nameNodes[nnIndex].nameNode;
@@ -1248,7 +1251,7 @@ public class MiniDFSCluster {
     long[] sizes = nameNode.getStats();
     boolean isUp = false;
     synchronized (this) {
-      isUp = !(nameNode.isInSafeMode() || sizes[0] == 0);
+      isUp = ((!nameNode.isInSafeMode() || !waitSafeMode) && sizes[0] != 0);
     }
     return isUp;
   }

+ 134 - 0
src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestListCorruptFileBlocks.java

@@ -38,7 +38,10 @@ import org.apache.hadoop.hdfs.CorruptFileBlockIterator;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.util.StringUtils;
 
 /**
  * This class tests the listCorruptFileBlocks API.
@@ -122,6 +125,137 @@ public class TestListCorruptFileBlocks {
       if (cluster != null) { cluster.shutdown(); }
     }
   }
+
+  /**
+   * Check that listCorruptFileBlocks works while the namenode is still in safemode.
+   */
+  @Test
+  public void testListCorruptFileBlocksInSafeMode() throws Exception {
+    MiniDFSCluster cluster = null;
+    Random random = new Random();
+
+    try {
+      Configuration conf = new HdfsConfiguration();
+      // datanode scans directories
+      conf.setInt("dfs.datanode.directoryscan.interval", 1);
+      // datanode sends block reports
+      conf.setInt("dfs.blockreport.intervalMsec", 3 * 1000);
+      // never leave safemode automatically
+      conf.setFloat(DFSConfigKeys.DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_KEY,
+                    1.5f);
+      // start populating repl queues immediately 
+      conf.setFloat(DFSConfigKeys.DFS_NAMENODE_REPL_QUEUE_THRESHOLD_PCT_KEY,
+                    0f);
+      cluster = new MiniDFSCluster.Builder(conf).waitSafeMode(false).build();
+      cluster.getNameNode().
+        setSafeMode(FSConstants.SafeModeAction.SAFEMODE_LEAVE);
+      FileSystem fs = cluster.getFileSystem();
+
+      // create two files with one block each
+      DFSTestUtil util = new DFSTestUtil("testListCorruptFileBlocksInSafeMode",
+                                         2, 1, 512);
+      util.createFiles(fs, "/srcdat10");
+
+      // fetch bad file list from namenode. There should be none.
+      Collection<FSNamesystem.CorruptFileBlockInfo> badFiles = 
+        cluster.getNameNode().getNamesystem().listCorruptFileBlocks("/", null);
+      assertTrue("Namenode has " + badFiles.size()
+          + " corrupt files. Expecting None.", badFiles.size() == 0);
+
+      // Now deliberately corrupt one block
+      File data_dir = new File(System.getProperty("test.build.data"),
+      "dfs/data/data1/current/finalized");
+      assertTrue("data directory does not exist", data_dir.exists());
+      File[] blocks = data_dir.listFiles();
+      assertTrue("Blocks do not exist in data-dir", (blocks != null) &&
+                 (blocks.length > 0));
+      for (int idx = 0; idx < blocks.length; idx++) {
+        if (blocks[idx].getName().startsWith("blk_") &&
+            blocks[idx].getName().endsWith(".meta")) {
+          //
+          // shorten .meta file
+          //
+          RandomAccessFile file = new RandomAccessFile(blocks[idx], "rw");
+          FileChannel channel = file.getChannel();
+          long position = channel.size() - 2;
+          int length = 2;
+          byte[] buffer = new byte[length];
+          random.nextBytes(buffer);
+          channel.write(ByteBuffer.wrap(buffer), position);
+          file.close();
+          LOG.info("Deliberately corrupting file " + blocks[idx].getName() +
+              " at offset " + position + " length " + length);
+
+          // read all files to trigger detection of corrupted replica
+          try {
+            util.checkFiles(fs, "/srcdat10");
+          } catch (BlockMissingException e) {
+            System.out.println("Received BlockMissingException as expected.");
+          } catch (IOException e) {
+            assertTrue("Corrupted replicas not handled properly. " +
+                       "Expecting BlockMissingException " +
+                       " but received IOException " + e, false);
+          }
+          break;
+        }
+      }
+
+      // fetch bad file list from namenode. There should be one file.
+      badFiles = cluster.getNameNode().getNamesystem().
+        listCorruptFileBlocks("/", null);
+      LOG.info("Namenode has bad files. " + badFiles.size());
+      assertTrue("Namenode has " + badFiles.size() + " bad files. Expecting 1.",
+          badFiles.size() == 1);
+ 
+      // restart namenode
+      cluster.restartNameNode(0);
+      fs = cluster.getFileSystem();
+
+      // wait until replication queues have been initialized
+      while (!cluster.getNameNode().namesystem.isPopulatingReplQueues()) {
+        try {
+          LOG.info("waiting for replication queues");
+          Thread.sleep(1000);
+        } catch (InterruptedException ignore) {
+        }
+      }
+
+      // read all files to trigger detection of corrupted replica
+      try {
+        util.checkFiles(fs, "/srcdat10");
+      } catch (BlockMissingException e) {
+        System.out.println("Received BlockMissingException as expected.");
+      } catch (IOException e) {
+        assertTrue("Corrupted replicas not handled properly. " +
+                   "Expecting BlockMissingException " +
+                   " but received IOException " + e, false);
+      }
+
+      // fetch bad file list from namenode. There should be one file.
+      badFiles = cluster.getNameNode().getNamesystem().
+        listCorruptFileBlocks("/", null);
+      LOG.info("Namenode has bad files. " + badFiles.size());
+      assertTrue("Namenode has " + badFiles.size() + " bad files. Expecting 1.",
+          badFiles.size() == 1);
+
+      // check that we are still in safe mode
+      assertTrue("Namenode is not in safe mode", 
+                 cluster.getNameNode().isInSafeMode());
+
+      // now leave safe mode so that we can clean up
+      cluster.getNameNode().
+        setSafeMode(FSConstants.SafeModeAction.SAFEMODE_LEAVE);
+
+      util.cleanup(fs, "/srcdat10");
+    } catch (Exception e) {
+      LOG.error(StringUtils.stringifyException(e));
+      throw e;
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown(); 
+      }
+    }
+  }
   
   // deliberately remove blocks from a file and validate the list-corrupt-file-blocks API
   @Test

+ 3 - 0
src/test/unit/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java

@@ -96,6 +96,9 @@ public class TestBlockRecovery {
   public void startUp() throws IOException {
     conf = new HdfsConfiguration();
     conf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, DATA_DIR);
+    conf.set(DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY, "0.0.0.0:0");
+    conf.set(DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_KEY, "0.0.0.0:0");
+    conf.set(DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY, "0.0.0.0:0");
     FileSystem.setDefaultUri(conf, "hdfs://localhost:5020");
     ArrayList<File> dirs = new ArrayList<File>();
     File dataDir = new File(DATA_DIR);