
Merging revisions r1052169:r1062010 from trunk to federation

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hdfs/branches/HDFS-1052@1079020 13f79535-47bb-0310-9956-ffa450edef68
Suresh Srinivas 14 years ago
parent
commit
fb34a5e4c2

+ 29 - 1
CHANGES.txt

@@ -7,6 +7,8 @@ Trunk (unreleased changes)
     HDFS-1526. Dfs client name for a map/reduce task should be unique
     among threads. (hairong)
 
+    HDFS-1536. Improve HDFS WebUI. (hairong)
+
   NEW FEATURES
 
     HDFS-1365. Federation: propose ClusterID and BlockPoolID format 
@@ -244,6 +246,9 @@ Trunk (unreleased changes)
     name node is in safe mode. (Patrick Kling via hairong)
 
     HDFS-1534. Fix some incorrect logs in FSDirectory. (eli)
+    
+    HDFS-1539. A config option for the datanode to fsync a block file
+    when block is completely written. (dhruba)
 
   OPTIMIZATIONS
 
@@ -274,6 +279,14 @@ Trunk (unreleased changes)
 
     HDFS-1551. Fix pom templates dependency list (gkesavan)
 
+    HDFS-1509. A savenamespace command writes the fsimage and edits into
+    all configured directories. (dhruba)
+
+    HDFS-1540. Make Datanode handle errors from RPC calls to namenode
+    more elegantly. (dhruba)
+
+    HDFS-1463. Accesstime of a file is not updated in safeMode. (dhruba)
+
 Release 0.22.0 - Unreleased
 
   NEW FEATURES
@@ -692,6 +705,19 @@ Release 0.22.0 - Unreleased
     HDFS-1511. 98 Release Audit warnings on trunk and branch-0.22.
     (jghoman)
 
+    HDFS-1560. dfs.data.dir permissions should default to 700. 
+    (Todd Lipcon via eli)
+
+    HDFS-1550. NPE when listing a file with no location. (hairong)
+
+    HDFS-1542. Add test for HADOOP-7082, a deadlock writing Configuration to
+    HDFS. (todd)
+
+    HDFS-1504. FSImageSaver should catch all exceptions, not just IOE. (todd)
+
+    HDFS-884. DataNode throws IOException if all data directories are 
+    unavailable. (Steve Loughran and shv)
+
 Release 0.21.1 - Unreleased
 
     HDFS-1411. Correct backup node startup command in hdfs user guide.
@@ -733,7 +759,9 @@ Release 0.21.1 - Unreleased
     HDFS-1548. Fault-injection tests are executed multiple times if invoked
     with run-test-hdfs-fault-inject target (cos)
 
-Release 0.21.0 - 2010-08-13
+    HDFS-1552. Remove java5 dependencies from build. (cos) 
+
+    HDFS-996. JUnit tests should never depend on anything in conf (cos)
 
   INCOMPATIBLE CHANGES
 

+ 10 - 1
build.xml

@@ -94,6 +94,7 @@
   <property name="test.junit.printsummary" value="yes" />
   <property name="test.junit.haltonfailure" value="no" />
   <property name="test.junit.maxmemory" value="512m" />
+  <property name="test.conf.dir" value="${build.dir}/test/conf" />
 
   <property name="test.hdfs.build.classes" value="${test.build.dir}/classes"/>
 
@@ -229,7 +230,9 @@
       <include name="hadoop-common-test-${hadoop-common.version}.jar" />
       <exclude name="**/excluded/" />
     </fileset>
-    <path refid="classpath"/>
+    <pathelement location="${build.classes}"/>
+    <pathelement location="${test.conf.dir}"/>
+    <path refid="ivy-common.classpath"/>
   </path>
 
   <!-- the cluster test classpath: uses conf.dir for configuration -->
@@ -289,6 +292,12 @@
       <mapper type="glob" from="*.template" to="*"/>
     </copy>
 
+    <mkdir dir="${test.conf.dir}"/>
+    <copy todir="${test.conf.dir}" verbose="true">
+      <fileset dir="${conf.dir}" includes="**/*.template"/>
+      <mapper type="glob" from="*.template" to="*"/>
+    </copy>
+
     <copy todir="${contrib.dir}" verbose="true">
       <fileset dir="${contrib.dir}" includes="**/*.template"/>
       <mapper type="glob" from="*.template" to="*"/>

+ 1 - 1
src/java/hdfs-default.xml

@@ -274,7 +274,7 @@ creations/deletions), or "all".</description>
 
 <property>
   <name>dfs.datanode.data.dir.perm</name>
-  <value>755</value>
+  <value>700</value>
   <description>Permissions for the directories on on the local filesystem where
   the DFS data node store its blocks. The permissions can either be octal or
   symbolic.</description>
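
The default for dfs.datanode.data.dir.perm tightens from 755 to 700 here (HDFS-1560). A cluster that still relies on group/world-readable data directories would have to override the key explicitly. A minimal illustrative sketch: only the property name and values come from this patch, the wrapper class is hypothetical.

import org.apache.hadoop.conf.Configuration;

public class DataDirPermOverride {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Revert to the pre-HDFS-1560 default where required; octal or symbolic
    // permission strings are accepted, per the description above.
    conf.set("dfs.datanode.data.dir.perm", "755");
    System.out.println(conf.get("dfs.datanode.data.dir.perm"));
  }
}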

+ 3 - 1
src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java

@@ -106,6 +106,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final int     DFS_LIST_LIMIT_DEFAULT = 1000;
   public static final String  DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY = "dfs.datanode.failed.volumes.tolerated";
   public static final int     DFS_DATANODE_FAILED_VOLUMES_TOLERATED_DEFAULT = 0;
+  public static final String  DFS_DATANODE_SYNCONCLOSE_KEY = "dfs.datanode.synconclose";
+  public static final boolean DFS_DATANODE_SYNCONCLOSE_DEFAULT = false;
 
   //Delegation token related keys
   public static final String  DFS_NAMENODE_DELEGATION_KEY_UPDATE_INTERVAL_KEY = "dfs.namenode.delegation.key.update-interval";
@@ -146,7 +148,7 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String  DFS_DATANODE_ADDRESS_KEY = "dfs.datanode.address";
   public static final String  DFS_DATANODE_ADDRESS_DEFAULT = "0.0.0.0:50010";
   public static final String  DFS_DATANODE_DATA_DIR_PERMISSION_KEY = "dfs.datanode.data.dir.perm";
-  public static final String  DFS_DATANODE_DATA_DIR_PERMISSION_DEFAULT = "755";
+  public static final String  DFS_DATANODE_DATA_DIR_PERMISSION_DEFAULT = "700";
   public static final String  DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY = "dfs.datanode.directoryscan.interval";
   public static final int     DFS_DATANODE_DIRECTORYSCAN_INTERVAL_DEFAULT = 21600;
   public static final String  DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY = "dfs.datanode.directoryscan.threads";
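
The two new constants above back the dfs.datanode.synconclose option from HDFS-1539, which defaults to false. A hedged sketch of flipping it on programmatically, using only the constants introduced in this patch (the surrounding class is illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;

public class SyncOnCloseConfig {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Ask datanodes to fsync block and checksum files when a block is closed.
    conf.setBoolean(DFSConfigKeys.DFS_DATANODE_SYNCONCLOSE_KEY, true);
    System.out.println(conf.getBoolean(
        DFSConfigKeys.DFS_DATANODE_SYNCONCLOSE_KEY,
        DFSConfigKeys.DFS_DATANODE_SYNCONCLOSE_DEFAULT));
  }
}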

+ 3 - 0
src/java/org/apache/hadoop/hdfs/DFSUtil.java

@@ -229,6 +229,9 @@ public class DFSUtil {
     }
     int nrBlocks = blocks.locatedBlockCount();
     BlockLocation[] blkLocations = new BlockLocation[nrBlocks];
+    if (nrBlocks == 0) {
+      return blkLocations;
+    }
     int idx = 0;
     for (LocatedBlock blk : blocks.getLocatedBlocks()) {
       assert idx < nrBlocks : "Incorrect index";
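
The guard above returns an empty BlockLocation[] when a file has no located blocks (HDFS-1550). Presumably the client-visible effect is that asking for the block locations of a zero-length file yields an empty array instead of an NPE; a sketch under that assumption (the path and setup are illustrative, the FileSystem calls are the standard public API):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class EmptyFileLocations {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    Path p = new Path("/tmp/empty-file");   // illustrative path
    fs.create(p).close();                   // zero-length file: no blocks at all
    FileStatus stat = fs.getFileStatus(p);
    BlockLocation[] locs = fs.getFileBlockLocations(stat, 0, 1);
    System.out.println("locations: " + locs.length);   // expected: 0
  }
}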

+ 10 - 2
src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java

@@ -25,6 +25,7 @@ import java.io.BufferedOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.EOFException;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
@@ -63,6 +64,7 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
   private DataInputStream in = null; // from where data are read
   private DataChecksum checksum; // from where chunks of a block can be read
   private OutputStream out = null; // to block file at local disk
+  private OutputStream cout = null; // output stream for checksum file
   private DataOutputStream checksumOut = null; // to crc file at local disk
   private int bytesPerChecksum;
   private int checksumSize;
@@ -142,10 +144,10 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
           this.bytesPerChecksum, this.checksumSize);
       if (streams != null) {
         this.out = streams.dataOut;
+        this.cout = streams.checksumOut;
         this.checksumOut = new DataOutputStream(new BufferedOutputStream(
-                                                  streams.checksumOut, 
+                                                  streams.checksumOut,
                                                   SMALL_BUFFER_SIZE));
-        
         // write data chunk header if creating a new replica
         if (isCreate) {
           BlockMetadataHeader.writeHeader(checksumOut, checksum);
@@ -186,6 +188,9 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
     try {
       if (checksumOut != null) {
         checksumOut.flush();
+        if (datanode.syncOnClose && (cout instanceof FileOutputStream)) {
+          ((FileOutputStream)cout).getChannel().force(true);
+        }
         checksumOut.close();
         checksumOut = null;
       }
@@ -196,6 +201,9 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
     try {
       if (out != null) {
         out.flush();
+        if (datanode.syncOnClose && (out instanceof FileOutputStream)) {
+          ((FileOutputStream)out).getChannel().force(true);
+        }
         out.close();
         out = null;
       }
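
When dfs.datanode.synconclose is set, the close path above forces the block and checksum files to disk before closing them. The underlying mechanism is the standard java.nio FileChannel.force(boolean) call on the FileOutputStream; a self-contained sketch of that pattern (plain JDK code, independent of the BlockReceiver internals):

import java.io.FileOutputStream;
import java.io.IOException;

public class FsyncOnClose {
  public static void main(String[] args) throws IOException {
    FileOutputStream out = new FileOutputStream("block.tmp");
    try {
      out.write(new byte[]{1, 2, 3});
      out.flush();                    // drain user-space buffers to the OS
      out.getChannel().force(true);   // fsync: push data and metadata to stable storage
    } finally {
      out.close();
    }
  }
}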

+ 16 - 9
src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java

@@ -350,6 +350,7 @@ public class DataNode extends Configured
   int writePacketSize = 0;
   boolean isBlockTokenEnabled;
   BlockPoolTokenSecretManager blockPoolTokenSecretManager;
+  boolean syncOnClose;
   
   public DataBlockScanner blockScanner = null;
   private DirectoryScanner directoryScanner = null;
@@ -440,6 +441,10 @@ public class DataNode extends Configured
         "dfs.blockreport.intervalMsec." + " Setting initial delay to 0 msec:");
     }
     this.heartBeatInterval = conf.getLong("dfs.heartbeat.interval", HEARTBEAT_INTERVAL) * 1000L;
+
+    // do we need to sync block file contents to disk when blockfile is closed?
+    this.syncOnClose = conf.getBoolean(DFSConfigKeys.DFS_DATANODE_SYNCONCLOSE_KEY, 
+                                       DFSConfigKeys.DFS_DATANODE_SYNCONCLOSE_DEFAULT);
   }
   
   private void startInfoServer(Configuration conf) throws IOException {
@@ -2086,21 +2091,19 @@ public class DataNode extends Configured
                  DFSConfigKeys.DFS_DATANODE_DATA_DIR_PERMISSION_DEFAULT));
     ArrayList<File> dirs = getDataDirsFromURIs(dataDirs, localFS, permission);
 
-    if (dirs.size() > 0) {
-      return new DataNode(conf, dirs, resources);
-    }
-    LOG.error("All directories in "
-        + DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY + " are invalid.");
-    return null;
+    assert dirs.size() > 0 : "number of data directories should be > 0";
+    return new DataNode(conf, dirs, resources);
   }
 
   // DataNode ctor expects AbstractList instead of List or Collection...
   static ArrayList<File> getDataDirsFromURIs(Collection<URI> dataDirs,
-      LocalFileSystem localFS, FsPermission permission) {
+      LocalFileSystem localFS, FsPermission permission) throws IOException {
     ArrayList<File> dirs = new ArrayList<File>();
+    StringBuilder invalidDirs = new StringBuilder();
     for (URI dirURI : dataDirs) {
       if (!"file".equalsIgnoreCase(dirURI.getScheme())) {
         LOG.warn("Unsupported URI schema in " + dirURI + ". Ignoring ...");
+        invalidDirs.append("\"").append(dirURI).append("\" ");
         continue;
       }
       // drop any (illegal) authority in the URI for backwards compatibility
@@ -2110,10 +2113,14 @@ public class DataNode extends Configured
         dirs.add(data);
       } catch (IOException e) {
         LOG.warn("Invalid directory in: "
-                 + DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY + ": "
-                 + e.getMessage());
+                 + DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY + ": ", e);
+        invalidDirs.append("\"").append(data.getCanonicalPath()).append("\" ");
       }
     }
+    if (dirs.size() == 0)
+      throw new IOException("All directories in "
+          + DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY + " are invalid: "
+          + invalidDirs);
     return dirs;
   }
 
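
With this change (HDFS-884 and HDFS-1540), createDataNode no longer returns null when every directory in dfs.datanode.data.dir is unusable; it surfaces an IOException listing the invalid directories instead. A hedged sketch of the resulting caller pattern, mirroring the TestDatanodeConfig change further down (the wrapper class is illustrative):

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.datanode.DataNode;

public class StartDataNodeSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    try {
      DataNode dn = DataNode.createDataNode(new String[]{}, conf);
      // ... serve until shutdown is requested ...
      dn.shutdown();
    } catch (IOException e) {
      // Thrown when all configured data directories are invalid.
      System.err.println("Datanode refused to start: " + e.getMessage());
    }
  }
}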

+ 6 - 11
src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java

@@ -118,8 +118,6 @@ public class BlockManager {
    * Last block index used for replication work.
    */
   private int replIndex = 0;
-  private long missingBlocksInCurIter = 0;
-  private long missingBlocksInPrevIter = 0;
   Random r = new Random();
 
   // for block replicas placement
@@ -668,6 +666,11 @@ public class BlockManager {
     corruptReplicaBlocksCount = corruptReplicas.size();
   }
 
+  /** Return number of under-replicated but not missing blocks */
+  int getUnderReplicatedNotMissingBlocks() {
+    return neededReplications.getUnderReplicatedBlockCount();
+  }
+  
   /**
    * Schedule blocks for deletion at datanodes
    * @param nodesToProcess number of datanodes to schedule deletion work
@@ -749,8 +752,6 @@ public class BlockManager {
     try {
       synchronized (neededReplications) {
         if (neededReplications.size() == 0) {
-          missingBlocksInCurIter = 0;
-          missingBlocksInPrevIter = 0;
           return blocksToReplicate;
         }
 
@@ -769,8 +770,6 @@ public class BlockManager {
           if (!neededReplicationsIterator.hasNext()) {
             // start from the beginning
             replIndex = 0;
-            missingBlocksInPrevIter = missingBlocksInCurIter;
-            missingBlocksInCurIter = 0;
             blocksToProcess = Math.min(blocksToProcess, neededReplications
                 .size());
             if (blkCnt >= blocksToProcess)
@@ -827,10 +826,6 @@ public class BlockManager {
         containingNodes = new ArrayList<DatanodeDescriptor>();
         NumberReplicas numReplicas = new NumberReplicas();
         srcNode = chooseSourceDatanode(block, containingNodes, numReplicas);
-        if ((numReplicas.liveReplicas() + numReplicas.decommissionedReplicas())
-            <= 0) {
-          missingBlocksInCurIter++;
-        }
         if(srcNode == null) // block can not be replicated from any node
           return false;
 
@@ -1724,7 +1719,7 @@ public class BlockManager {
   
   long getMissingBlocksCount() {
     // not locking
-    return Math.max(missingBlocksInPrevIter, missingBlocksInCurIter);
+    return this.neededReplications.getCorruptBlockSize();
   }
 
   BlockInfo addINode(BlockInfo block, INodeFile iNode) {

+ 1 - 1
src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java

@@ -874,7 +874,7 @@ public class FSEditLog {
       return; // nothing to do, edits.new exists!
 
     // check if any of failed storage is now available and put it back
-    fsimage.attemptRestoreRemovedStorage();
+    fsimage.attemptRestoreRemovedStorage(false);
 
     divertFileStreams(
         Storage.STORAGE_DIR_CURRENT + "/" + NameNodeFile.EDITS_NEW.getName());

+ 26 - 8
src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java

@@ -886,7 +886,7 @@ public class FSImage extends Storage {
               sd1.unlock(); // unlock before removing (in case it will be
                             // restored)
             } catch (Exception e) {
-              // nothing
+              LOG.info("Unable to unlock bad storage directory : " +  sd.getRoot().getPath());
             }
             removedStorageDirs.add(sd1);
             it.remove();
@@ -1197,9 +1197,9 @@ public class FSImage extends Storage {
     public void run() {
       try {
         saveCurrent(sd);
-      } catch (IOException ie) {
-        LOG.error("Unable to save image for " + sd.getRoot(), ie);
-        errorSDs.add(sd);              
+      } catch (Throwable t) {
+        LOG.error("Unable to save image for " + sd.getRoot(), t);
+        errorSDs.add(sd);
       }
     }
     
@@ -1237,7 +1237,11 @@ public class FSImage extends Storage {
    * in which case the journal will be lost.
    */
   void saveNamespace(boolean renewCheckpointTime) throws IOException {
+ 
+    // try to restore all failed edit logs here
     assert editLog != null : "editLog must be initialized";
+    attemptRestoreRemovedStorage(true);
+
     editLog.close();
     if(renewCheckpointTime)
       this.checkpointTime = now();
@@ -1281,6 +1285,11 @@ public class FSImage extends Storage {
     for (Iterator<StorageDirectory> it = dirIterator(NameNodeDirType.EDITS);
                                                               it.hasNext();) {
       final StorageDirectory sd = it.next();
+      // if this directory already stores the image and edits, then it was
+      // already processed in the earlier loop.
+      if (sd.getStorageDirType() == NameNodeDirType.IMAGE_AND_EDITS) {
+        continue;
+      }
       FSImageSaver saver = new FSImageSaver(sd, errorSDs);
       Thread saveThread = new Thread(saver, saver.toString());
       saveThreads.add(saveThread);
@@ -1715,10 +1724,11 @@ public class FSImage extends Storage {
   }
 
   /**
-   * See if any of removed storages iw "writable" again, and can be returned 
-   * into service
+   * See if any of removed storages is "writable" again, and can be returned 
+   * into service. If saveNamespace is set, then this method is being 
+   * called from saveNamespace.
    */
-  synchronized void attemptRestoreRemovedStorage() {   
+  synchronized void attemptRestoreRemovedStorage(boolean saveNamespace) {   
     // if directory is "alive" - copy the images there...
     if(!restoreFailedStorage || removedStorageDirs.size() == 0) 
       return; //nothing to restore
@@ -1733,7 +1743,15 @@ public class FSImage extends Storage {
       try {
         
         if(root.exists() && root.canWrite()) { 
-          format(sd);
+          /** If this call is being made from savenamespace command, then no
+           * need to format, the savenamespace command will format and write
+           * the new image to this directory anyways.
+           */
+          if (saveNamespace) {
+            sd.clearDirectory();
+          } else {
+            format(sd);
+          }
           LOG.info("restoring dir " + sd.getRoot().getAbsolutePath());
           if(sd.getStorageDirType().isOfType(NameNodeDirType.EDITS)) {
             File eFile = getEditFile(sd);
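
Taken together, the FSImage changes make a saveNamespace attempt to bring previously failed storage directories back online (clearing rather than re-formatting them) and then write the image and edits into every good directory (HDFS-1509). A rough sketch of driving this from a client, assuming the DistributedFileSystem admin calls of this branch behave as exercised in the TestSaveNamespace changes below (enter safe mode, save, leave):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.FSConstants.SafeModeAction;

public class SaveNamespaceSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    DistributedFileSystem dfs = (DistributedFileSystem) FileSystem.get(conf);
    dfs.setSafeMode(SafeModeAction.SAFEMODE_ENTER);  // saveNamespace requires safe mode
    dfs.saveNamespace();                             // image + edits go to all usable dirs
    dfs.setSafeMode(SafeModeAction.SAFEMODE_LEAVE);
  }
}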

+ 11 - 0
src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -849,6 +849,12 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
       }  else { // second attempt is with  write lock
         writeLock(); // writelock is needed to set accesstime
       }
+
+      // if the namenode is in safemode, then do not update access time
+      if (isInSafeMode()) {
+        doAccessTime = false;
+      }
+
       try {
         long now = now();
         INodeFile inode = dir.getFileINode(src);
@@ -4534,6 +4540,11 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
     return blockManager.underReplicatedBlocksCount;
   }
 
+  /** Return number of under-replicated but not missing blocks */
+  public long getUnderReplicatedNotMissingBlocks() {
+    return blockManager.getUnderReplicatedNotMissingBlocks();
+  }
+
   /** Returns number of blocks with corrupt replicas */
   public long getCorruptReplicaBlocks() {
     return blockManager.corruptReplicaBlocksCount;

+ 7 - 3
src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java

@@ -145,7 +145,7 @@ class NamenodeJspHelper {
     // Ideally this should be displayed in RED
     long missingBlocks = fsn.getMissingBlocksCount();
     if (missingBlocks > 0) {
-      return "<br> WARNING :" + " There are about " + missingBlocks
+      return "<br> WARNING :" + " There are " + missingBlocks
           + " missing blocks. Please check the log or run fsck. <br><br>";
     }
     return "";
@@ -167,6 +167,10 @@ class NamenodeJspHelper {
       return "<td id=\"col" + ++colNum + "\"> ";
     }
 
+    private String colTxt(String title) {
+      return "<td id=\"col" + ++colNum + "\" title=\"" + title + "\"> ";
+    }
+
     private void counterReset() {
       colNum = 0;
       rowNum = 0;
@@ -321,9 +325,9 @@ class NamenodeJspHelper {
           + "<a href=\"dfsnodelist.jsp?whatNodes=DECOMMISSIONING\">"
           + "Decommissioning Nodes</a> "
           + colTxt() + ":" + colTxt() + decommissioning.size() 
-          + rowTxt() + colTxt()
+          + rowTxt() + colTxt("Excludes missing blocks.")
           + "Number of Under-Replicated Blocks" + colTxt() + ":" + colTxt()
-          + fsn.getUnderReplicatedBlocks()
+          + fsn.getUnderReplicatedNotMissingBlocks()
           + "</table></div><br>\n");
 
       if (live.isEmpty() && dead.isEmpty()) {

+ 16 - 2
src/java/org/apache/hadoop/hdfs/server/namenode/UnderReplicatedBlocks.java

@@ -26,8 +26,8 @@ import org.apache.hadoop.hdfs.protocol.Block;
  * Blocks have only one replicas has the highest
  */
 class UnderReplicatedBlocks implements Iterable<Block> {
-  static final int LEVEL = 4;
-  static public final int QUEUE_WITH_CORRUPT_BLOCKS = 2;
+  static final int LEVEL = 5;
+  static public final int QUEUE_WITH_CORRUPT_BLOCKS = 4;
   private List<TreeSet<Block>> priorityQueues = new ArrayList<TreeSet<Block>>();
       
   /* constructor */
@@ -55,6 +55,20 @@ class UnderReplicatedBlocks implements Iterable<Block> {
     return size;
   }
 
+  /* Return the number of under replication blocks excluding corrupt blocks */
+  synchronized int getUnderReplicatedBlockCount() {
+    int size = 0;
+    for (int i=0; i<QUEUE_WITH_CORRUPT_BLOCKS; i++) {
+      size += priorityQueues.get(i).size();
+    }
+    return size;
+  }
+  
+  /** Return the number of corrupt blocks */
+  synchronized int getCorruptBlockSize() {
+    return priorityQueues.get(QUEUE_WITH_CORRUPT_BLOCKS).size();
+  }
+  
   /* Check if a block is in the neededReplication queue */
   synchronized boolean contains(Block block) {
     for(TreeSet<Block> set:priorityQueues) {
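
The queue layout grows from 4 to 5 priority levels, with the last level (QUEUE_WITH_CORRUPT_BLOCKS = 4) reserved for corrupt blocks; the figure shown as under-replicated on the web UI now sums only the first four levels, while getMissingBlocksCount reports the corrupt queue. A small standalone sketch of that accounting (plain Java, not the HDFS classes themselves):

import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

public class QueueAccountingSketch {
  static final int LEVEL = 5;                      // as in the patch
  static final int QUEUE_WITH_CORRUPT_BLOCKS = 4;  // last queue holds corrupt blocks

  public static void main(String[] args) {
    List<TreeSet<Long>> queues = new ArrayList<TreeSet<Long>>();
    for (int i = 0; i < LEVEL; i++) {
      queues.add(new TreeSet<Long>());
    }
    queues.get(0).add(101L);   // badly under-replicated block
    queues.get(2).add(102L);   // mildly under-replicated block
    queues.get(4).add(103L);   // corrupt block, reported as missing

    int underReplicatedNotMissing = 0;
    for (int i = 0; i < QUEUE_WITH_CORRUPT_BLOCKS; i++) {
      underReplicatedNotMissing += queues.get(i).size();
    }
    int corrupt = queues.get(QUEUE_WITH_CORRUPT_BLOCKS).size();
    System.out.println(underReplicatedNotMissing + " under-replicated (not missing), "
        + corrupt + " corrupt/missing");
  }
}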

+ 8 - 1
src/test/hdfs/org/apache/hadoop/hdfs/TestDatanodeConfig.java

@@ -76,7 +76,14 @@ public class TestDatanodeConfig {
     // 1. Test unsupported schema. Only "file:" is supported.
     String dnDir = makeURI("shv", null, fileAsURI(dataDir).getPath());
     conf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, dnDir);
-    DataNode dn = DataNode.createDataNode(new String[]{}, conf);
+    DataNode dn = null;
+    try {
+      dn = DataNode.createDataNode(new String[]{}, conf);
+    } catch(IOException e) {
+      // expecting exception here
+    }
+    if(dn != null)
+      dn.shutdown();
     assertNull("Data-node startup should have failed.", dn);
 
     // 2. Test "file:" schema and no schema (path-only). Both should work.

+ 33 - 0
src/test/hdfs/org/apache/hadoop/hdfs/TestFileCreation.java

@@ -717,6 +717,39 @@ public class TestFileCreation extends junit.framework.TestCase {
     }
   }
 
+  /**
+   * Test creating a file whose data gets sync when closed
+   */
+  public void testFileCreationSyncOnClose() throws IOException {
+    Configuration conf = new HdfsConfiguration();
+    conf.setBoolean(DFSConfigKeys.DFS_DATANODE_SYNCONCLOSE_KEY, true);
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
+
+    try {
+      FileSystem fs = cluster.getFileSystem();
+      
+      Path[] p = {new Path("/foo"), new Path("/bar")};
+      
+      //write 2 files at the same time
+      FSDataOutputStream[] out = {fs.create(p[0]), fs.create(p[1])};
+      int i = 0;
+      for(; i < 100; i++) {
+        out[0].write(i);
+        out[1].write(i);
+      }
+      out[0].close();
+      for(; i < 200; i++) {out[1].write(i);}
+      out[1].close();
+
+      //verify
+      FSDataInputStream[] in = {fs.open(p[0]), fs.open(p[1])};  
+      for(i = 0; i < 100; i++) {assertEquals(i, in[0].read());}
+      for(i = 0; i < 200; i++) {assertEquals(i, in[1].read());}
+    } finally {
+      if (cluster != null) {cluster.shutdown();}
+    }
+  }
+
   /**
    * Create a file, write something, hflush but not close.
    * Then change lease period and wait for lease recovery.

+ 9 - 1
src/test/hdfs/org/apache/hadoop/hdfs/TestMissingBlocksAlert.java

@@ -50,6 +50,7 @@ public class TestMissingBlocksAlert extends TestCase {
       //minimize test delay
       conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 0);
       int fileLen = 10*1024;
+      conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, fileLen/2);
 
       //start a cluster with single datanode
       cluster = new MiniDFSCluster.Builder(conf).build();
@@ -84,13 +85,16 @@ public class TestMissingBlocksAlert extends TestCase {
         Thread.sleep(100);
       }
       assertTrue(dfs.getMissingBlocksCount() == 1);
+      assertEquals(4, dfs.getUnderReplicatedBlocksCount());
+      assertEquals(3, 
+          cluster.getNamesystem().getUnderReplicatedNotMissingBlocks());
 
 
       // Now verify that it shows up on webui
       URL url = new URL("http://" + conf.get(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY) + 
                         "/dfshealth.jsp");
       String dfsFrontPage = DFSTestUtil.urlGet(url);
-      String warnStr = "WARNING : There are about ";
+      String warnStr = "WARNING : There are ";
       assertTrue("HDFS Front page does not contain expected warning", 
                  dfsFrontPage.contains(warnStr + "1 missing blocks"));
 
@@ -104,6 +108,10 @@ public class TestMissingBlocksAlert extends TestCase {
         Thread.sleep(100);
       }
 
+      assertEquals(2, dfs.getUnderReplicatedBlocksCount());
+      assertEquals(2, 
+          cluster.getNamesystem().getUnderReplicatedNotMissingBlocks());
+
       // and make sure WARNING disappears
       // Now verify that it shows up on webui
       dfsFrontPage = DFSTestUtil.urlGet(url);

+ 49 - 0
src/test/hdfs/org/apache/hadoop/hdfs/TestWriteConfigurationToDFS.java

@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import java.io.OutputStream;
+import org.junit.Test;
+
+/**
+ * Regression test for HDFS-1542, a deadlock between the main thread
+ * and the DFSOutputStream.DataStreamer thread caused because
+ * Configuration.writeXML holds a lock on itself while writing to DFS.
+ */
+public class TestWriteConfigurationToDFS {
+  @Test(timeout=60000)
+  public void testWriteConf() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 4096);
+    System.out.println("Setting conf in: " + System.identityHashCode(conf));
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+    FileSystem fs = cluster.getFileSystem();
+    Path filePath = new Path("/testWriteConf.xml");
+    OutputStream os = fs.create(filePath);
+    StringBuilder longString = new StringBuilder();
+    for (int i = 0; i < 100000; i++) {
+      longString.append("hello");
+    } // 500KB
+    conf.set("foobar", longString.toString());
+    conf.writeXml(os);
+    os.close();
+  }
+}

+ 2 - 2
src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java

@@ -179,9 +179,9 @@ public class TestDiskError {
   @Test
   public void testLocalDirs() throws Exception {
     Configuration conf = new Configuration();
-    final String permStr = "755";
+    final String permStr = conf.get(
+      DFSConfigKeys.DFS_DATANODE_DATA_DIR_PERMISSION_KEY);
     FsPermission expected = new FsPermission(permStr);
-    conf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_PERMISSION_KEY, permStr);
 
     // Check permissions on directories in 'dfs.data.dir'
     FileSystem localFS = FileSystem.getLocal(conf);

+ 93 - 1
src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestSaveNamespace.java

@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.namenode;
 
 import static org.apache.hadoop.hdfs.server.common.Util.fileAsURI;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.anyObject;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doThrow;
@@ -60,6 +61,17 @@ public class TestSaveNamespace {
 
   private static class FaultySaveImage implements Answer<Void> {
     int count = 0;
+    boolean exceptionType = true;
+
+    // generate a RuntimeException
+    public FaultySaveImage() {
+      this.exceptionType = true;
+    }
+
+    // generate either a RuntimeException or IOException
+    public FaultySaveImage(boolean etype) {
+      this.exceptionType = etype;
+    }
 
     public Void answer(InvocationOnMock invocation) throws Throwable {
       Object[] args = invocation.getArguments();
@@ -67,7 +79,11 @@ public class TestSaveNamespace {
 
 
       if (count++ == 1) {
         LOG.info("Injecting fault for file: " + f);
-        throw new RuntimeException("Injected fault: saveFSImage second time");
+        if (exceptionType) {
+          throw new RuntimeException("Injected fault: saveFSImage second time");
+        } else {
+          throw new IOException("Injected fault: saveFSImage second time");
+        }
       }
       LOG.info("Not injecting fault for file: " + f);
       return (Void)invocation.callRealMethod();
@@ -142,6 +158,82 @@ public class TestSaveNamespace {
     }
   }
 
+  /**
+   * Verify that a saveNamespace command brings faulty directories
+   * in fs.name.dir and fs.edit.dir back online.
+   */
+  @Test
+  public void testReinsertnamedirsInSavenamespace() throws Exception {
+    // create a configuration with the key to restore error
+    // directories in fs.name.dir
+    Configuration conf = getConf();
+    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_RESTORE_KEY, true);
+
+    NameNode.initMetrics(conf, NamenodeRole.ACTIVE);
+    NameNode.format(conf);
+    FSNamesystem fsn = new FSNamesystem(conf);
+
+    // Replace the FSImage with a spy
+    FSImage originalImage = fsn.dir.fsImage;
+    FSImage spyImage = spy(originalImage);
+    spyImage.setStorageDirectories(
+        FSNamesystem.getNamespaceDirs(conf), 
+        FSNamesystem.getNamespaceEditsDirs(conf));
+    fsn.dir.fsImage = spyImage;
+
+    // inject fault
+    // The spy throws an IOException when writing to the second directory
+    doAnswer(new FaultySaveImage(false)).
+      when(spyImage).saveFSImage((File)anyObject());
+
+    try {
+      doAnEdit(fsn, 1);
+      fsn.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
+
+      // Save namespace - this  injects a fault and marks one
+      // directory as faulty.
+      LOG.info("Doing the first savenamespace.");
+      fsn.saveNamespace();
+      LOG.warn("First savenamespace successful.");
+      assertTrue("Savenamespace should have marked one directory as bad." +
+                 " But found " + spyImage.getRemovedStorageDirs().size() +
+                 " bad directories.", 
+                   spyImage.getRemovedStorageDirs().size() == 1);
+
+      // The next call to savenamespace should try inserting the
+      // erroneous directory back to fs.name.dir. This command should
+      // be successful.
+      LOG.info("Doing the second savenamespace.");
+      fsn.saveNamespace();
+      LOG.warn("Second savenamespace successful.");
+      assertTrue("Savenamespace should have been successful in removing " +
+                 " bad directories from Image."  +
+                 " But found " + originalImage.getRemovedStorageDirs().size() +
+                 " bad directories.", 
+                 originalImage.getRemovedStorageDirs().size() == 0);
+
+      // Now shut down and restart the namesystem
+      LOG.info("Shutting down fsimage.");
+      originalImage.close();
+      fsn.close();      
+      fsn = null;
+
+      // Start a new namesystem, which should be able to recover
+      // the namespace from the previous incarnation.
+      LOG.info("Loading new FSImage from disk.");
+      fsn = new FSNamesystem(conf);
+
+      // Make sure the image loaded including our edit.
+      LOG.info("Checking reloaded image.");
+      checkEditExists(fsn, 1);
+      LOG.info("Reloaded image is good.");
+    } finally {
+      if (fsn != null) {
+        fsn.close();
+      }
+    }
+  }
+
   @Test
   public void testCrashWhileSavingSecondImage() throws Exception {
     saveNamespaceWithInjectedFault(Fault.SAVE_FSIMAGE);

+ 1 - 1
src/test/unit/org/apache/hadoop/hdfs/server/datanode/TestDataDirs.java

@@ -39,7 +39,7 @@ public class TestDataDirs {
   @Test public void testGetDataDirsFromURIs() throws Throwable {
     File localDir = make(stub(File.class).returning(true).from.exists());
     when(localDir.mkdir()).thenReturn(true);
-    FsPermission normalPerm = new FsPermission("755");
+    FsPermission normalPerm = new FsPermission("700");
     FsPermission badPerm = new FsPermission("000");
     FileStatus stat = make(stub(FileStatus.class)
         .returning(normalPerm, normalPerm, badPerm).from.getPermission());