Merging changes -r1026177:1028906 from trunk to federation branch

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hdfs/branches/HDFS-1052@1073510 13f79535-47bb-0310-9956-ffa450edef68
Suresh Srinivas, 14 years ago
commit 59a76fd43d
23 changed files with 984 additions and 717 deletions
  1. CHANGES.txt (+49 -14)
  2. build.xml (+1 -2)
  3. src/java/hdfs-default.xml (+15 -0)
  4. src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (+8 -0)
  5. src/java/org/apache/hadoop/hdfs/protocol/FSConstants.java (+2 -3)
  6. src/java/org/apache/hadoop/hdfs/server/namenode/BackupStorage.java (+7 -4)
  7. src/java/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicy.java (+1 -1)
  8. src/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java (+2 -2)
  9. src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java (+2 -2)
  10. src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java (+2 -10)
  11. src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java (+79 -649)
  12. src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java (+612 -0)
  13. src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java (+87 -14)
  14. src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (+16 -0)
  15. src/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java (+3 -3)
  16. src/java/org/apache/hadoop/hdfs/tools/DFSck.java (+11 -4)
  17. src/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageLoaderCurrent.java (+21 -1)
  18. src/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageVisitor.java (+2 -0)
  19. src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java (+2 -1)
  20. src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java (+1 -1)
  21. src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestFsck.java (+2 -5)
  22. src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestSecurityTokenEditLog.java (+2 -1)
  23. src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestStartup.java (+57 -0)

+ 49 - 14
CHANGES.txt

@@ -4,6 +4,18 @@ Trunk (unreleased changes)
 
   NEW FEATURES
 
+    HDFS-1365. Federation: propose ClusterID and BlockPoolID format 
+    (tanping via boryas)
+
+    HDFS-1394. Federation: modify -format option for namenode to generated 
+    new blockpool id and accept newcluster (boryas)
+
+    HDFS-1400. Federation: DataTransferProtocol uses ExtendedBlockPool to 
+    include BlockPoolID in the protocol. (suresh)
+
+    HDFS-1428. Federation : add cluster ID and block pool ID into 
+    Name node web UI(tanping via boryas)
+
     HDFS-1450. Federation: Introduce block pool ID into FSDatasetInterface.
     (suresh)
 
@@ -15,6 +27,9 @@ Trunk (unreleased changes)
 
   BUG FIXES
 
+    HDFS-1449. Fix test failures - ExtendedBlock must return 
+    block file name in #getBlockName(). (suresh)
+
 Release 0.22.0 - Unreleased
 
   NEW FEATURES
@@ -51,17 +66,7 @@ Release 0.22.0 - Unreleased
 
     HDFS-1361. Add -fileStatus operation to NNThroughputBenchmark. (shv)
 
-    HDFS-1365. HDFS federation: propose ClusterID and BlockPoolID format 
-    (tanping via boryas)
-
-    HDFS-1394. modify -format option for namenode to generated new blockpool id 
-    and accept newcluster (boryas)
-
-    HDFS-1400. HDFS federation: DataTransferProtocol uses ExtendedBlockPool to 
-    include BlockPoolID in the protocol. (suresh)
-
-    HDFS-1428. HDFS federation : add cluster ID and block pool ID into 
-    Name node web UI(tanping via boryas)
+    HDFS-1435. Provide an option to store fsimage compressed. (hairong)
 
   IMPROVEMENTS
 
@@ -174,6 +179,17 @@ Release 0.22.0 - Unreleased
     HDFS-1456. Provide builder for constructing instances of MiniDFSCluster.
     (jghoman)
 
+    HDFS-1472. Allow programmatic access to fsck output.
+    (Ramkumar Vadali via dhruba)
+
+    HADOOP-7007. Update the hudson-test-patch ant target to work with the
+    latest test-patch.sh script (gkesavan)
+
+    HDFS-1462. Refactor edit log loading to a separate class from edit log writing.
+    (Todd Lipcon via eli)
+
+    HDFS-1485. Fix typo in BlockPlacementPolicy. (Jingguo Yao via shv)
+
   OPTIMIZATIONS
 
     HDFS-1140. Speedup INode.getPathComponents. (Dmytro Molkov via shv)
@@ -326,11 +342,30 @@ Release 0.22.0 - Unreleased
 
     HDFS-1440. Fix TestComputeInvalidateWork failure. (suresh)
 
-    HDFS-1449. Fix test failures - ExtendedBlock must return 
-    block file name in #getBlockName(). (suresh)
-
 Release 0.21.0 - Unreleased
 
+    HDFS-1411. Correct backup node startup command in hdfs user guide.
+    (Ching-Shen Chen via shv)
+
+  BUG FIXES
+
+    HDFS-1363. Eliminate second synchronized sections in appendFile(). (shv)
+
+    HDFS-1413. Fix broken links to HDFS Wiki. (shv)
+
+    HDFS-1420. Clover build doesn't generate per-test coverage (cos)
+
+    HDFS-1444. Test related code of build.xml is error-prone and needs to be
+    re-aligned. (cos)
+
+    HDFS-1343. Instrumented build should be concentrated in one build area (cos)
+
+    HDFS-1452. ant compile-contrib is broken (cos)
+
+    HDFS-1474. ant binary-system is broken (cos)
+
+Release 0.21.0 - 2010-08-13
+
   INCOMPATIBLE CHANGES
 
     HDFS-538. Per the contract elucidated in HADOOP-6201, throw

+ 1 - 2
build.xml

@@ -1110,7 +1110,7 @@
       <fileset dir="${test.src.dir}/system/conf/"/>
     </copy>
     <copy tofile="${system-test-build-dir}/${final.name}/lib/hadoop-common-${version}.jar"
-      file="${build-fi.dir}/ivy/lib/${ant.project.name}/system/hadoop-common-${herriot.suffix}-${version}.jar"
+      file="${system-test-build-dir}/ivy/lib/${ant.project.name}/system/hadoop-common-${herriot.suffix}-${version}.jar"
       overwrite="true"/>
     <copy tofile="${system-test-build-dir}/${final.name}/${final.name}.jar"
       file="${system-test-build-dir}/${instrumented.final.name}.jar" overwrite="true"/>
@@ -1382,7 +1382,6 @@
     <arg value="${eclipse.home}"/>
     <arg value="${python.home}"/>
     <arg value="${basedir}"/>
-    <arg value="${trigger.url}"/>
     <arg value="${jira.passwd}"/>
     <arg value="${java5.home}"/>
     <arg value="${curl.cmd}"/>

+ 15 - 0
src/java/hdfs-default.xml

@@ -529,4 +529,19 @@ creations/deletions), or "all".</description>
   </description>
 </property>
 
+<property>
+  <name>dfs.image.compress</name>
+  <value>false</value>
+  <description>Should the dfs image be compressed?
+  </description>
+</property>
+
+<property>
+  <name>dfs.image.compression.codec</name>
+  <value>org.apache.hadoop.io.compress.DefaultCodec</value>
+  <description>If the dfs image is compressed, how should it be compressed?
+               This has to be a codec defined in io.compression.codecs.
+  </description>
+</property>
+
 </configuration>

+ 8 - 0
src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java

@@ -198,6 +198,14 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String  DFS_BLOCKREPORT_INITIAL_DELAY_KEY = "dfs.blockreport.initialDelay";
   public static final int     DFS_BLOCKREPORT_INITIAL_DELAY_DEFAULT = 0;
 
+  // property for fsimage compression
+  public static final String DFS_IMAGE_COMPRESS_KEY = "dfs.image.compress";
+  public static final boolean DFS_IMAGE_COMPRESS_DEFAULT = false;
+  public static final String DFS_IMAGE_COMPRESSION_CODEC_KEY =
+                                   "dfs.image.compression.codec";
+  public static final String DFS_IMAGE_COMPRESSION_CODEC_DEFAULT =
+                                   "org.apache.hadoop.io.compress.DefaultCodec";
+
   //Keys with no defaults
   public static final String  DFS_DATANODE_PLUGINS_KEY = "dfs.datanode.plugins";
   public static final String  DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY = "dfs.datanode.socket.write.timeout";
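
The name node reads these settings from its configuration when the FSImage is constructed (see the new FSImage(Configuration) constructor further down). As a minimal, hypothetical sketch only (the class name here is illustrative, not part of this commit), image compression could be enabled programmatically with the new keys; GzipCodec is one of the codecs registered by default in io.compression.codecs:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    class ImageCompressionConfigExample {            // hypothetical example class
      static Configuration compressedImageConf() {
        Configuration conf = new HdfsConfiguration();
        // Turn on compressed fsimage writes ("dfs.image.compress").
        conf.setBoolean(DFSConfigKeys.DFS_IMAGE_COMPRESS_KEY, true);
        // Pick any codec registered via io.compression.codecs.
        conf.set(DFSConfigKeys.DFS_IMAGE_COMPRESSION_CODEC_KEY,
                 "org.apache.hadoop.io.compress.GzipCodec");
        return conf;
      }
    }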

+ 2 - 3
src/java/org/apache/hadoop/hdfs/protocol/FSConstants.java

@@ -91,8 +91,7 @@ public interface FSConstants {
   // Version is reflected in the data storage file.
   // Versions are negative.
   // Decrement LAYOUT_VERSION to define a new version.
-  public static final int LAYOUT_VERSION = -24;
+  public static final int LAYOUT_VERSION = -25;
   // Current version: 
-  // -24: added new OP_[GET|RENEW|CANCEL]_DELEGATION_TOKEN and
-  // OP_UPDATE_MASTER_KEY.
+  // -25: support image compression.
 }

+ 7 - 4
src/java/org/apache/hadoop/hdfs/server/namenode/BackupStorage.java

@@ -43,6 +43,7 @@ public class BackupStorage extends FSImage {
 
   /** Backup input stream for loading edits into memory */
   private EditLogBackupInputStream backupInputStream;
+
   /** Is journal spooling in progress */
   volatile JSpoolState jsState;
 
@@ -214,7 +215,8 @@ public class BackupStorage extends FSImage {
           waitSpoolEnd();
           // update NameSpace in memory
           backupInputStream.setBytes(data);
-          editLog.loadEditRecords(getLayoutVersion(),
+          FSEditLogLoader logLoader = new FSEditLogLoader(namesystem);
+          logLoader.loadEditRecords(getLayoutVersion(),
                     backupInputStream.getDataInputStream(), true);
           getFSNamesystem().dir.updateCountForINodeWithQuota(); // inefficient!
           break;
@@ -334,11 +336,12 @@ public class BackupStorage extends FSImage {
       // load edits.new
       EditLogFileInputStream edits = new EditLogFileInputStream(jSpoolFile);
       DataInputStream in = edits.getDataInputStream();
-      numEdits += editLog.loadFSEdits(in, false);
-  
+      FSEditLogLoader logLoader = new FSEditLogLoader(namesystem);
+      numEdits += logLoader.loadFSEdits(in, false);
+
       // first time reached the end of spool
       jsState = JSpoolState.WAIT;
-      numEdits += editLog.loadEditRecords(getLayoutVersion(), in, true);
+      numEdits += logLoader.loadEditRecords(getLayoutVersion(), in, true);
       getFSNamesystem().dir.updateCountForINodeWithQuota();
       edits.close();
     }

+ 1 - 1
src/java/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicy.java

@@ -154,7 +154,7 @@ public abstract class BlockPlacementPolicy {
    * value of the configuration paramater dfs.block.replicator.classname.
    * 
    * @param conf the configuration to be used
-   * @param stats an object thatis used to retrieve the load on the cluster
+   * @param stats an object that is used to retrieve the load on the cluster
    * @param clusterMap the network topology of the cluster
    * @return an instance of BlockPlacementPolicy
    */

+ 2 - 2
src/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java

@@ -147,7 +147,7 @@ class EditLogBackupOutputStream extends EditLogOutputStream {
       JournalRecord jRec = null;
       for(; idx < bufReadySize; idx++) {
         jRec = bufReady.get(idx);
-        if(jRec.op >= FSEditLog.OP_JSPOOL_START)
+        if(jRec.op >= FSEditLog.Ops.OP_JSPOOL_START)
           break;  // special operation should be sent in a separate call to BN
         jRec.write(out);
       }
@@ -177,7 +177,7 @@ class EditLogBackupOutputStream extends EditLogOutputStream {
   private void send(int ja) throws IOException {
     try {
       int length = out.getLength();
-      out.write(FSEditLog.OP_INVALID);
+      out.write(FSEditLog.Ops.OP_INVALID);
       backupNode.journal(nnRegistration, ja, length, out.getData());
     } finally {
       out.reset();

+ 2 - 2
src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java

@@ -128,7 +128,7 @@ class EditLogFileOutputStream extends EditLogOutputStream {
   @Override
   void setReadyToFlush() throws IOException {
     assert bufReady.size() == 0 : "previous data is not flushed yet";
-    write(FSEditLog.OP_INVALID); // insert end-of-file marker
+    write(FSEditLog.Ops.OP_INVALID); // insert end-of-file marker
     DataOutputBuffer tmp = bufReady;
     bufReady = bufCurrent;
     bufCurrent = tmp;
@@ -189,7 +189,7 @@ class EditLogFileOutputStream extends EditLogOutputStream {
    */
   @Override
   boolean isOperationSupported(byte op) {
-    return op < FSEditLog.OP_JSPOOL_START - 1;
+    return op < FSEditLog.Ops.OP_JSPOOL_START - 1;
   }
 
   /**

+ 2 - 10
src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java

@@ -102,16 +102,8 @@ class FSDirectory implements Closeable {
   private final NameCache<ByteArray> nameCache;
 
   /** Access an existing dfs name directory. */
-  FSDirectory(FSNamesystem ns, Configuration conf) {
-    this(new FSImage(), ns, conf);
-    if(conf.getBoolean(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_RESTORE_KEY, 
-                       DFSConfigKeys.DFS_NAMENODE_NAME_DIR_RESTORE_DEFAULT)) {
-      NameNode.LOG.info("set FSImage.restoreFailedStorage");
-      fsImage.setRestoreFailedStorage(true);
-    }
-    
-    fsImage.setCheckpointDirectories(FSImage.getCheckpointDirs(conf, null),
-                                FSImage.getCheckpointEditsDirs(conf, null));
+  FSDirectory(FSNamesystem ns, Configuration conf) throws IOException {
+    this(new FSImage(conf), ns, conf);
   }
 
   FSDirectory(FSImage fsImage, FSNamesystem ns, Configuration conf) {

File diff suppressed because it is too large
+ 79 - 649
src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java


+ 612 - 0
src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java

@@ -0,0 +1,612 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.EOFException;
+import java.io.IOException;
+
+import org.apache.hadoop.fs.Options.Rename;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.permission.PermissionStatus;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
+import static org.apache.hadoop.hdfs.server.common.Util.now;
+import org.apache.hadoop.hdfs.server.common.GenerationStamp;
+import org.apache.hadoop.hdfs.server.common.Storage;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLog.Ops;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableFactories;
+import org.apache.hadoop.io.WritableFactory;
+import org.apache.hadoop.security.token.delegation.DelegationKey;
+
+public class FSEditLogLoader {
+  private final FSNamesystem fsNamesys;
+
+  public FSEditLogLoader(FSNamesystem fsNamesys) {
+    this.fsNamesys = fsNamesys;
+  }
+  
+  /**
+   * Load an edit log, and apply the changes to the in-memory structure
+   * This is where we apply edits that we've been writing to disk all
+   * along.
+   */
+  int loadFSEdits(EditLogInputStream edits) throws IOException {
+    DataInputStream in = edits.getDataInputStream();
+    long startTime = now();
+    int numEdits = loadFSEdits(in, true);
+    FSImage.LOG.info("Edits file " + edits.getName() 
+        + " of size " + edits.length() + " edits # " + numEdits 
+        + " loaded in " + (now()-startTime)/1000 + " seconds.");
+    return numEdits;
+  }
+
+  int loadFSEdits(DataInputStream in, boolean closeOnExit) throws IOException {
+    int numEdits = 0;
+    int logVersion = 0;
+
+    try {
+      // Read log file version. Could be missing. 
+      in.mark(4);
+      // If edits log is greater than 2G, available method will return negative
+      // numbers, so we avoid having to call available
+      boolean available = true;
+      try {
+        logVersion = in.readByte();
+      } catch (EOFException e) {
+        available = false;
+      }
+      if (available) {
+        in.reset();
+        logVersion = in.readInt();
+        if (logVersion < FSConstants.LAYOUT_VERSION) // future version
+          throw new IOException(
+                          "Unexpected version of the file system log file: "
+                          + logVersion + ". Current version = " 
+                          + FSConstants.LAYOUT_VERSION + ".");
+      }
+      assert logVersion <= Storage.LAST_UPGRADABLE_LAYOUT_VERSION :
+                            "Unsupported version " + logVersion;
+      numEdits = loadEditRecords(logVersion, in, false);
+    } finally {
+      if(closeOnExit)
+        in.close();
+    }
+    if (logVersion != FSConstants.LAYOUT_VERSION) // other version
+      numEdits++; // save this image asap
+    return numEdits;
+  }
+
+  @SuppressWarnings("deprecation")
+  int loadEditRecords(int logVersion, DataInputStream in,
+      boolean closeOnExit) throws IOException {
+    FSDirectory fsDir = fsNamesys.dir;
+    int numEdits = 0;
+    String clientName = null;
+    String clientMachine = null;
+    String path = null;
+    int numOpAdd = 0, numOpClose = 0, numOpDelete = 0,
+        numOpRenameOld = 0, numOpSetRepl = 0, numOpMkDir = 0,
+        numOpSetPerm = 0, numOpSetOwner = 0, numOpSetGenStamp = 0,
+        numOpTimes = 0, numOpRename = 0, numOpConcatDelete = 0, 
+        numOpSymlink = 0, numOpGetDelegationToken = 0,
+        numOpRenewDelegationToken = 0, numOpCancelDelegationToken = 0, 
+        numOpUpdateMasterKey = 0, numOpOther = 0;
+
+    try {
+      while (true) {
+        long timestamp = 0;
+        long mtime = 0;
+        long atime = 0;
+        long blockSize = 0;
+        byte opcode = -1;
+        try {
+          in.mark(1);
+          opcode = in.readByte();
+          if (opcode == Ops.OP_INVALID) {
+            in.reset(); // reset back to end of file if somebody reads it again
+            break; // no more transactions
+          }
+        } catch (EOFException e) {
+          break; // no more transactions
+        }
+        numEdits++;
+        switch (opcode) {
+        case Ops.OP_ADD:
+        case Ops.OP_CLOSE: {
+          // versions > 0 support per file replication
+          // get name and replication
+          int length = in.readInt();
+          if (-7 == logVersion && length != 3||
+              -17 < logVersion && logVersion < -7 && length != 4 ||
+              logVersion <= -17 && length != 5) {
+              throw new IOException("Incorrect data format."  +
+                                    " logVersion is " + logVersion +
+                                    " but writables.length is " +
+                                    length + ". ");
+          }
+          path = FSImage.readString(in);
+          short replication = fsNamesys.adjustReplication(readShort(in));
+          mtime = readLong(in);
+          if (logVersion <= -17) {
+            atime = readLong(in);
+          }
+          if (logVersion < -7) {
+            blockSize = readLong(in);
+          }
+          // get blocks
+          boolean isFileUnderConstruction = (opcode == Ops.OP_ADD);
+          BlockInfo blocks[] = 
+            readBlocks(in, logVersion, isFileUnderConstruction, replication);
+
+          // Older versions of HDFS does not store the block size in inode.
+          // If the file has more than one block, use the size of the
+          // first block as the blocksize. Otherwise use the default
+          // block size.
+          if (-8 <= logVersion && blockSize == 0) {
+            if (blocks.length > 1) {
+              blockSize = blocks[0].getNumBytes();
+            } else {
+              long first = ((blocks.length == 1)? blocks[0].getNumBytes(): 0);
+              blockSize = Math.max(fsNamesys.getDefaultBlockSize(), first);
+            }
+          }
+           
+          PermissionStatus permissions = fsNamesys.getUpgradePermission();
+          if (logVersion <= -11) {
+            permissions = PermissionStatus.read(in);
+          }
+
+          // clientname, clientMachine and block locations of last block.
+          if (opcode == Ops.OP_ADD && logVersion <= -12) {
+            clientName = FSImage.readString(in);
+            clientMachine = FSImage.readString(in);
+            if (-13 <= logVersion) {
+              readDatanodeDescriptorArray(in);
+            }
+          } else {
+            clientName = "";
+            clientMachine = "";
+          }
+
+          // The open lease transaction re-creates a file if necessary.
+          // Delete the file if it already exists.
+          if (FSNamesystem.LOG.isDebugEnabled()) {
+            FSNamesystem.LOG.debug(opcode + ": " + path + 
+                                   " numblocks : " + blocks.length +
+                                   " clientHolder " +  clientName +
+                                   " clientMachine " + clientMachine);
+          }
+
+          fsDir.unprotectedDelete(path, mtime);
+
+          // add to the file tree
+          INodeFile node = (INodeFile)fsDir.unprotectedAddFile(
+                                                    path, permissions,
+                                                    blocks, replication, 
+                                                    mtime, atime, blockSize);
+          if (isFileUnderConstruction) {
+            numOpAdd++;
+            //
+            // Replace current node with a INodeUnderConstruction.
+            // Recreate in-memory lease record.
+            //
+            INodeFileUnderConstruction cons = new INodeFileUnderConstruction(
+                                      node.getLocalNameBytes(),
+                                      node.getReplication(), 
+                                      node.getModificationTime(),
+                                      node.getPreferredBlockSize(),
+                                      node.getBlocks(),
+                                      node.getPermissionStatus(),
+                                      clientName, 
+                                      clientMachine, 
+                                      null);
+            fsDir.replaceNode(path, node, cons);
+            fsNamesys.leaseManager.addLease(cons.getClientName(), path);
+          }
+          break;
+        } 
+        case Ops.OP_SET_REPLICATION: {
+          numOpSetRepl++;
+          path = FSImage.readString(in);
+          short replication = fsNamesys.adjustReplication(readShort(in));
+          fsDir.unprotectedSetReplication(path, replication, null);
+          break;
+        } 
+        case Ops.OP_CONCAT_DELETE: {
+          if (logVersion > -22) {
+            throw new IOException("Unexpected opcode " + opcode
+                + " for version " + logVersion);
+          }
+          numOpConcatDelete++;
+          int length = in.readInt();
+          if (length < 3) { // trg, srcs.., timestamp
+            throw new IOException("Incorrect data format. " 
+                                  + "Mkdir operation.");
+          }
+          String trg = FSImage.readString(in);
+          int srcSize = length - 1 - 1; //trg and timestamp
+          String [] srcs = new String [srcSize];
+          for(int i=0; i<srcSize;i++) {
+            srcs[i]= FSImage.readString(in);
+          }
+          timestamp = readLong(in);
+          fsDir.unprotectedConcat(trg, srcs);
+          break;
+        }
+        case Ops.OP_RENAME_OLD: {
+          numOpRenameOld++;
+          int length = in.readInt();
+          if (length != 3) {
+            throw new IOException("Incorrect data format. " 
+                                  + "Mkdir operation.");
+          }
+          String s = FSImage.readString(in);
+          String d = FSImage.readString(in);
+          timestamp = readLong(in);
+          HdfsFileStatus dinfo = fsDir.getFileInfo(d, false);
+          fsDir.unprotectedRenameTo(s, d, timestamp);
+          fsNamesys.changeLease(s, d, dinfo);
+          break;
+        }
+        case Ops.OP_DELETE: {
+          numOpDelete++;
+          int length = in.readInt();
+          if (length != 2) {
+            throw new IOException("Incorrect data format. " 
+                                  + "delete operation.");
+          }
+          path = FSImage.readString(in);
+          timestamp = readLong(in);
+          fsDir.unprotectedDelete(path, timestamp);
+          break;
+        }
+        case Ops.OP_MKDIR: {
+          numOpMkDir++;
+          PermissionStatus permissions = fsNamesys.getUpgradePermission();
+          int length = in.readInt();
+          if (-17 < logVersion && length != 2 ||
+              logVersion <= -17 && length != 3) {
+            throw new IOException("Incorrect data format. " 
+                                  + "Mkdir operation.");
+          }
+          path = FSImage.readString(in);
+          timestamp = readLong(in);
+
+          // The disk format stores atimes for directories as well.
+          // However, currently this is not being updated/used because of
+          // performance reasons.
+          if (logVersion <= -17) {
+            atime = readLong(in);
+          }
+
+          if (logVersion <= -11) {
+            permissions = PermissionStatus.read(in);
+          }
+          fsDir.unprotectedMkdir(path, permissions, timestamp);
+          break;
+        }
+        case Ops.OP_SET_GENSTAMP: {
+          numOpSetGenStamp++;
+          long lw = in.readLong();
+          fsNamesys.setGenerationStamp(lw);
+          break;
+        } 
+        case Ops.OP_DATANODE_ADD: {
+          numOpOther++;
+          FSImage.DatanodeImage nodeimage = new FSImage.DatanodeImage();
+          nodeimage.readFields(in);
+          //Datanodes are not persistent any more.
+          break;
+        }
+        case Ops.OP_DATANODE_REMOVE: {
+          numOpOther++;
+          DatanodeID nodeID = new DatanodeID();
+          nodeID.readFields(in);
+          //Datanodes are not persistent any more.
+          break;
+        }
+        case Ops.OP_SET_PERMISSIONS: {
+          numOpSetPerm++;
+          if (logVersion > -11)
+            throw new IOException("Unexpected opcode " + opcode
+                                  + " for version " + logVersion);
+          fsDir.unprotectedSetPermission(
+              FSImage.readString(in), FsPermission.read(in));
+          break;
+        }
+        case Ops.OP_SET_OWNER: {
+          numOpSetOwner++;
+          if (logVersion > -11)
+            throw new IOException("Unexpected opcode " + opcode
+                                  + " for version " + logVersion);
+          fsDir.unprotectedSetOwner(FSImage.readString(in),
+              FSImage.readString_EmptyAsNull(in),
+              FSImage.readString_EmptyAsNull(in));
+          break;
+        }
+        case Ops.OP_SET_NS_QUOTA: {
+          if (logVersion > -16) {
+            throw new IOException("Unexpected opcode " + opcode
+                + " for version " + logVersion);
+          }
+          fsDir.unprotectedSetQuota(FSImage.readString(in), 
+                                    readLongWritable(in), 
+                                    FSConstants.QUOTA_DONT_SET);
+          break;
+        }
+        case Ops.OP_CLEAR_NS_QUOTA: {
+          if (logVersion > -16) {
+            throw new IOException("Unexpected opcode " + opcode
+                + " for version " + logVersion);
+          }
+          fsDir.unprotectedSetQuota(FSImage.readString(in),
+                                    FSConstants.QUOTA_RESET,
+                                    FSConstants.QUOTA_DONT_SET);
+          break;
+        }
+
+        case Ops.OP_SET_QUOTA:
+          fsDir.unprotectedSetQuota(FSImage.readString(in),
+                                    readLongWritable(in),
+                                    readLongWritable(in));
+                                      
+          break;
+
+        case Ops.OP_TIMES: {
+          numOpTimes++;
+          int length = in.readInt();
+          if (length != 3) {
+            throw new IOException("Incorrect data format. " 
+                                  + "times operation.");
+          }
+          path = FSImage.readString(in);
+          mtime = readLong(in);
+          atime = readLong(in);
+          fsDir.unprotectedSetTimes(path, mtime, atime, true);
+          break;
+        }
+        case Ops.OP_SYMLINK: {
+          numOpSymlink++;
+          int length = in.readInt();
+          if (length != 4) {
+            throw new IOException("Incorrect data format. " 
+                                  + "symlink operation.");
+          }
+          path = FSImage.readString(in);
+          String value = FSImage.readString(in);
+          mtime = readLong(in);
+          atime = readLong(in);
+          PermissionStatus perm = PermissionStatus.read(in);
+          fsDir.unprotectedSymlink(path, value, mtime, atime, perm);
+          break;
+        }
+        case Ops.OP_RENAME: {
+          if (logVersion > -21) {
+            throw new IOException("Unexpected opcode " + opcode
+                + " for version " + logVersion);
+          }
+          numOpRename++;
+          int length = in.readInt();
+          if (length != 3) {
+            throw new IOException("Incorrect data format. " 
+                                  + "Mkdir operation.");
+          }
+          String s = FSImage.readString(in);
+          String d = FSImage.readString(in);
+          timestamp = readLong(in);
+          Rename[] options = readRenameOptions(in);
+          HdfsFileStatus dinfo = fsDir.getFileInfo(d, false);
+          fsDir.unprotectedRenameTo(s, d, timestamp, options);
+          fsNamesys.changeLease(s, d, dinfo);
+          break;
+        }
+        case Ops.OP_GET_DELEGATION_TOKEN: {
+          if (logVersion > -24) {
+            throw new IOException("Unexpected opcode " + opcode
+                + " for version " + logVersion);
+          }
+          numOpGetDelegationToken++;
+          DelegationTokenIdentifier delegationTokenId = 
+              new DelegationTokenIdentifier();
+          delegationTokenId.readFields(in);
+          long expiryTime = readLong(in);
+          fsNamesys.getDelegationTokenSecretManager()
+              .addPersistedDelegationToken(delegationTokenId, expiryTime);
+          break;
+        }
+        case Ops.OP_RENEW_DELEGATION_TOKEN: {
+          if (logVersion > -24) {
+            throw new IOException("Unexpected opcode " + opcode
+                + " for version " + logVersion);
+          }
+          numOpRenewDelegationToken++;
+          DelegationTokenIdentifier delegationTokenId = 
+              new DelegationTokenIdentifier();
+          delegationTokenId.readFields(in);
+          long expiryTime = readLong(in);
+          fsNamesys.getDelegationTokenSecretManager()
+              .updatePersistedTokenRenewal(delegationTokenId, expiryTime);
+          break;
+        }
+        case Ops.OP_CANCEL_DELEGATION_TOKEN: {
+          if (logVersion > -24) {
+            throw new IOException("Unexpected opcode " + opcode
+                + " for version " + logVersion);
+          }
+          numOpCancelDelegationToken++;
+          DelegationTokenIdentifier delegationTokenId = 
+              new DelegationTokenIdentifier();
+          delegationTokenId.readFields(in);
+          fsNamesys.getDelegationTokenSecretManager()
+              .updatePersistedTokenCancellation(delegationTokenId);
+          break;
+        }
+        case Ops.OP_UPDATE_MASTER_KEY: {
+          if (logVersion > -24) {
+            throw new IOException("Unexpected opcode " + opcode
+                + " for version " + logVersion);
+          }
+          numOpUpdateMasterKey++;
+          DelegationKey delegationKey = new DelegationKey();
+          delegationKey.readFields(in);
+          fsNamesys.getDelegationTokenSecretManager().updatePersistedMasterKey(
+              delegationKey);
+          break;
+        }
+        default: {
+          throw new IOException("Never seen opcode " + opcode);
+        }
+        }
+      }
+    } finally {
+      if(closeOnExit)
+        in.close();
+    }
+    if (FSImage.LOG.isDebugEnabled()) {
+      FSImage.LOG.debug("numOpAdd = " + numOpAdd + " numOpClose = " + numOpClose 
+          + " numOpDelete = " + numOpDelete 
+          + " numOpRenameOld = " + numOpRenameOld 
+          + " numOpSetRepl = " + numOpSetRepl + " numOpMkDir = " + numOpMkDir
+          + " numOpSetPerm = " + numOpSetPerm 
+          + " numOpSetOwner = " + numOpSetOwner
+          + " numOpSetGenStamp = " + numOpSetGenStamp 
+          + " numOpTimes = " + numOpTimes
+          + " numOpConcatDelete  = " + numOpConcatDelete
+          + " numOpRename = " + numOpRename
+          + " numOpGetDelegationToken = " + numOpGetDelegationToken
+          + " numOpRenewDelegationToken = " + numOpRenewDelegationToken
+          + " numOpCancelDelegationToken = " + numOpCancelDelegationToken
+          + " numOpUpdateMasterKey = " + numOpUpdateMasterKey
+          + " numOpOther = " + numOpOther);
+    }
+    return numEdits;
+  }
+
+
+  /**
+   * A class to read in blocks stored in the old format. The only two
+   * fields in the block were blockid and length.
+   */
+  static class BlockTwo implements Writable {
+    long blkid;
+    long len;
+
+    static {                                      // register a ctor
+      WritableFactories.setFactory
+        (BlockTwo.class,
+         new WritableFactory() {
+           public Writable newInstance() { return new BlockTwo(); }
+         });
+    }
+
+
+    BlockTwo() {
+      blkid = 0;
+      len = 0;
+    }
+    /////////////////////////////////////
+    // Writable
+    /////////////////////////////////////
+    public void write(DataOutput out) throws IOException {
+      out.writeLong(blkid);
+      out.writeLong(len);
+    }
+
+    public void readFields(DataInput in) throws IOException {
+      this.blkid = in.readLong();
+      this.len = in.readLong();
+    }
+  }
+
+  /** This method is defined for compatibility reason. */
+  static private DatanodeDescriptor[] readDatanodeDescriptorArray(DataInput in
+      ) throws IOException {
+    DatanodeDescriptor[] locations = new DatanodeDescriptor[in.readInt()];
+    for (int i = 0; i < locations.length; i++) {
+      locations[i] = new DatanodeDescriptor();
+      locations[i].readFieldsFromFSEditLog(in);
+    }
+    return locations;
+  }
+
+  static private short readShort(DataInputStream in) throws IOException {
+    return Short.parseShort(FSImage.readString(in));
+  }
+
+  static private long readLong(DataInputStream in) throws IOException {
+    return Long.parseLong(FSImage.readString(in));
+  }
+  
+  // a place holder for reading a long
+  private static final LongWritable longWritable = new LongWritable();
+
+  /** Read a long from an input stream */
+  private static long readLongWritable(DataInputStream in) throws IOException {
+    synchronized (longWritable) {
+      longWritable.readFields(in);
+      return longWritable.get();
+    }
+  }
+  
+  static Rename[] readRenameOptions(DataInputStream in) throws IOException {
+    BytesWritable writable = new BytesWritable();
+    writable.readFields(in);
+    
+    byte[] bytes = writable.getBytes();
+    Rename[] options = new Rename[bytes.length];
+    
+    for (int i = 0; i < bytes.length; i++) {
+      options[i] = Rename.valueOf(bytes[i]);
+    }
+    return options;
+  }
+
+  static private BlockInfo[] readBlocks(
+      DataInputStream in,
+      int logVersion,
+      boolean isFileUnderConstruction,
+      short replication) throws IOException {
+    int numBlocks = in.readInt();
+    BlockInfo[] blocks = new BlockInfo[numBlocks];
+    Block blk = new Block();
+    BlockTwo oldblk = new BlockTwo();
+    for (int i = 0; i < numBlocks; i++) {
+      if (logVersion <= -14) {
+        blk.readFields(in);
+      } else {
+        oldblk.readFields(in);
+        blk.set(oldblk.blkid, oldblk.len,
+                GenerationStamp.GRANDFATHER_GENERATION_STAMP);
+      }
+      if(isFileUnderConstruction && i == numBlocks-1)
+        blocks[i] = new BlockInfoUnderConstruction(blk, replication);
+      else
+        blocks[i] = new BlockInfo(blk, replication);
+    }
+    return blocks;
+  }
+}
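
The test changes later in this commit show how the extracted loader is meant to be driven: construct it around an FSNamesystem and hand it an edit log input stream. A condensed sketch, where namesystem and editFile are assumed to be an FSNamesystem instance and a java.io.File pointing at an existing edits file:

    // Replay an on-disk edits file through the new loader instead of FSEditLog.
    FSEditLogLoader loader = new FSEditLogLoader(namesystem);
    int numEdits = loader.loadFSEdits(new EditLogFileInputStream(editFile));
    System.out.println("Replayed " + numEdits + " edit log transactions");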

+ 87 - 14
src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java

@@ -78,6 +78,8 @@ import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.net.DNS;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 
 /**
@@ -153,6 +155,13 @@ public class FSImage extends Storage {
   private Collection<URI> checkpointDirs;
   private Collection<URI> checkpointEditsDirs;
 
+  /**
+   * Image compression related fields
+   */
+  private boolean compressImage = false;  // if image should be compressed
+  private CompressionCodec saveCodec;     // the compression codec
+  private CompressionCodecFactory codecFac;  // all the supported codecs
+
   /**
    * Can fs-image be rolled?
    */
@@ -172,6 +181,34 @@ public class FSImage extends Storage {
     this((FSNamesystem)null);
   }
 
+  /**
+   * Constructor
+   * @param conf Configuration
+   */
+  FSImage(Configuration conf) throws IOException {
+    this();
+    if(conf.getBoolean(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_RESTORE_KEY, 
+        DFSConfigKeys.DFS_NAMENODE_NAME_DIR_RESTORE_DEFAULT)) {
+      NameNode.LOG.info("set FSImage.restoreFailedStorage");
+      setRestoreFailedStorage(true);
+    }
+    setCheckpointDirectories(FSImage.getCheckpointDirs(conf, null),
+        FSImage.getCheckpointEditsDirs(conf, null));
+    this.compressImage = conf.getBoolean(
+        DFSConfigKeys.DFS_IMAGE_COMPRESS_KEY,
+        DFSConfigKeys.DFS_IMAGE_COMPRESS_DEFAULT);
+     this.codecFac = new CompressionCodecFactory(conf);
+     if (this.compressImage) {
+       String codecClassName = conf.get(
+           DFSConfigKeys.DFS_IMAGE_COMPRESSION_CODEC_KEY,
+           DFSConfigKeys.DFS_IMAGE_COMPRESSION_CODEC_DEFAULT);
+       this.saveCodec = codecFac.getCodecByClassName(codecClassName);
+       if (this.saveCodec == null) {
+         throw new IOException("Not supported codec: " + codecClassName);
+       }
+     }
+   }
+ 
   FSImage(FSNamesystem ns) {
     super(NodeType.NAME_NODE);
     this.editLog = new FSEditLog(this);
@@ -640,6 +677,7 @@ public class FSImage extends Storage {
     // replace real image with the checkpoint image
     FSImage realImage = fsNamesys.getFSImage();
     assert realImage == this;
+    ckptImage.codecFac = realImage.codecFac;
     fsNamesys.dir.fsImage = ckptImage;
     // load from the checkpoint dirs
     try {
@@ -1011,16 +1049,11 @@ public class FSImage extends Storage {
     // Recover from previous interrupted checkpoint, if any
     needToSave |= recoverInterruptedCheckpoint(latestNameSD, latestEditsSD);
 
-    long startTime = now();
-    long imageSize = getImageFile(latestNameSD, NameNodeFile.IMAGE).length();
-
     //
     // Load in bits
     //
     latestNameSD.read();
     needToSave |= loadFSImage(getImageFile(latestNameSD, NameNodeFile.IMAGE));
-    LOG.info("Image file of size " + imageSize + " loaded in " 
-        + (now() - startTime)/1000 + " seconds.");
     
     // Load latest edits
     if (latestNameCheckpointTime > latestEditsCheckpointTime)
@@ -1041,6 +1074,7 @@ public class FSImage extends Storage {
     assert this.getLayoutVersion() < 0 : "Negative layout version is expected.";
     assert curFile != null : "curFile is null";
 
+    long startTime = now();   
     FSNamesystem fsNamesys = getFSNamesystem();
     FSDirectory fsDir = fsNamesys.dir;
     fsNamesys.setBlockPoolId(this.getBlockPoolID());
@@ -1049,8 +1083,8 @@ public class FSImage extends Storage {
     // Load in bits
     //
     boolean needToSave = true;
-    DataInputStream in = new DataInputStream(new BufferedInputStream(
-                              new FileInputStream(curFile)));
+    FileInputStream fin = new FileInputStream(curFile);
+    DataInputStream in = new DataInputStream(fin);
     try {
       /*
        * Note: Remove any checks for version earlier than 
@@ -1064,6 +1098,8 @@ public class FSImage extends Storage {
        */
       // read image version: first appeared in version -1
       int imgVersion = in.readInt();
+      needToSave = (imgVersion != FSConstants.LAYOUT_VERSION);
+
       // read namespaceID: first appeared in version -2
       this.namespaceID = in.readInt();
 
@@ -1082,8 +1118,27 @@ public class FSImage extends Storage {
         fsNamesys.setGenerationStamp(genstamp); 
       }
 
-      needToSave = (imgVersion != FSConstants.LAYOUT_VERSION);
-
+      // read compression related info
+      boolean isCompressed = false;
+      if (imgVersion <= -25) {  // -25: 1st version providing compression option
+        isCompressed = in.readBoolean();
+        if (isCompressed) {
+          String codecClassName = Text.readString(in);
+          CompressionCodec loadCodec = codecFac.getCodecByClassName(codecClassName);
+          if (loadCodec == null) {
+            throw new IOException("Image compression codec not supported: "
+                                 + codecClassName);
+          }
+          in = new DataInputStream(loadCodec.createInputStream(fin));
+          LOG.info("Loading image file " + curFile +
+              " compressed using codec " + codecClassName);
+        }
+      }
+      if (!isCompressed) {
+        // use buffered input stream
+        in = new DataInputStream(new BufferedInputStream(fin));
+      }
+      
       // read file info
       short replication = fsNamesys.getDefaultReplication();
 
@@ -1098,7 +1153,7 @@ public class FSImage extends Storage {
         long blockSize = 0;
         pathComponents = readPathComponents(in);
         replication = in.readShort();
-        replication = editLog.adjustReplication(replication);
+        replication = fsNamesys.adjustReplication(replication);
         modificationTime = in.readLong();
         if (imgVersion <= -17) {
           atime = in.readLong();
@@ -1191,6 +1246,9 @@ public class FSImage extends Storage {
       in.close();
     }
     
+    LOG.info("Image file of size " + curFile.length() + " loaded in " 
+        + (now() - startTime)/1000 + " seconds.");
+
     return needToSave;
   }
 
@@ -1236,17 +1294,19 @@ public class FSImage extends Storage {
    * @throws IOException
    */
   int loadFSEdits(StorageDirectory sd) throws IOException {
+    FSEditLogLoader loader = new FSEditLogLoader(namesystem);
+    
     int numEdits = 0;
     EditLogFileInputStream edits = 
       new EditLogFileInputStream(getImageFile(sd, NameNodeFile.EDITS));
     
-    numEdits = editLog.loadFSEdits(edits);
+    numEdits = loader.loadFSEdits(edits);
     edits.close();
     File editsNew = getImageFile(sd, NameNodeFile.EDITS_NEW);
     
     if (editsNew.exists() && editsNew.length() > 0) {
       edits = new EditLogFileInputStream(editsNew);
-      numEdits += editLog.loadFSEdits(edits);
+      numEdits += loader.loadFSEdits(edits);
       edits.close();
     }
     
@@ -1267,13 +1327,26 @@ public class FSImage extends Storage {
     // Write out data
     //
     FileOutputStream fos = new FileOutputStream(newFile);
-    DataOutputStream out = new DataOutputStream(
-      new BufferedOutputStream(fos));
+    DataOutputStream out = new DataOutputStream(fos);
     try {
       out.writeInt(FSConstants.LAYOUT_VERSION);
       out.writeInt(namespaceID);
       out.writeLong(fsDir.rootDir.numItemsInTree());
       out.writeLong(fsNamesys.getGenerationStamp());
+      
+      // write compression info
+      out.writeBoolean(compressImage);
+      if (compressImage) {
+        String codecClassName = saveCodec.getClass().getCanonicalName();
+        Text.writeString(out, codecClassName);
+        out = new DataOutputStream(saveCodec.createOutputStream(fos));
+        LOG.info("Saving image file " + newFile +
+            " compressed using codec " + codecClassName);
+      } else {
+        // use a buffered output stream
+        out = new DataOutputStream(new BufferedOutputStream(fos));
+      }
+
       byte[] byteStore = new byte[4*FSConstants.MAX_PATH_LENGTH];
       ByteBuffer strbuf = ByteBuffer.wrap(byteStore);
       // save the root
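
Taken together, the save and load hunks above define the new image header for layout version -25: layout version, namespace ID, file count, and generation stamp are followed by a compression flag and, only when the flag is set, the codec class name; everything after that point is routed through the codec's stream. A hedged reader sketch for the -25 layout only (imageFile and conf are hypothetical local variables, not names from this commit):

    FileInputStream fin = new FileInputStream(imageFile);
    DataInputStream in = new DataInputStream(fin);       // header is never compressed
    int imgVersion      = in.readInt();                  // e.g. -25
    int namespaceId     = in.readInt();
    long numFiles       = in.readLong();
    long genStamp       = in.readLong();
    boolean isCompressed = false;
    if (imgVersion <= -25) {                             // compression info first appears in -25
      isCompressed = in.readBoolean();
    }
    if (isCompressed) {
      String codecClassName = Text.readString(in);       // e.g. ...compress.DefaultCodec
      CompressionCodec codec =
          new CompressionCodecFactory(conf).getCodecByClassName(codecClassName);
      in = new DataInputStream(codec.createInputStream(fin));   // rest is compressed
    } else {
      in = new DataInputStream(new BufferedInputStream(fin));   // plain, just buffered
    }
    // ...INode records follow, exactly as in loadFSImage() above...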

+ 16 - 0
src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -3505,6 +3505,22 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
   short getMaxReplication()     { return (short)blockManager.maxReplication; }
   short getMinReplication()     { return (short)blockManager.minReplication; }
   short getDefaultReplication() { return (short)blockManager.defaultReplication; }
+
+  /**
+   * Clamp the specified replication between the minimum and maximum
+   * replication levels for this namesystem.
+   */
+  short adjustReplication(short replication) {
+    short minReplication = getMinReplication();
+    if (replication < minReplication) {
+      replication = minReplication;
+    }
+    short maxReplication = getMaxReplication();
+    if (replication > maxReplication) {
+      replication = maxReplication;
+    }
+    return replication;
+  }
     
   /**
    * A immutable object that stores the number of live replicas and
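
For illustration, the new clamp behaves as follows; the bounds shown are hypothetical, the real ones come from the block manager's configured minimum and maximum replication:

    // Assuming minReplication == 1 and maxReplication == 512:
    short raised = fsNamesys.adjustReplication((short) 0);     // -> 1
    short kept   = fsNamesys.adjustReplication((short) 3);     // -> 3
    short capped = fsNamesys.adjustReplication((short) 9999);  // -> 512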

+ 3 - 3
src/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java

@@ -166,7 +166,7 @@ public class SecondaryNameNode implements Runnable {
                                   "/tmp/hadoop/dfs/namesecondary");
     checkpointEditsDirs = FSImage.getCheckpointEditsDirs(conf, 
                                   "/tmp/hadoop/dfs/namesecondary");    
-    checkpointImage = new CheckpointStorage();
+    checkpointImage = new CheckpointStorage(conf);
     checkpointImage.recoverCreate(checkpointDirs, checkpointEditsDirs);
 
     // Initialize other scheduling parameters from the configuration
@@ -581,8 +581,8 @@ public class SecondaryNameNode implements Runnable {
   static class CheckpointStorage extends FSImage {
     /**
      */
-    CheckpointStorage() throws IOException {
-      super();
+    CheckpointStorage(Configuration conf) throws IOException {
+      super(conf);
     }
 
     @Override

+ 11 - 4
src/java/org/apache/hadoop/hdfs/tools/DFSck.java

@@ -21,6 +21,7 @@ import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
+import java.io.PrintStream;
 import java.net.URL;
 import java.net.URLConnection;
 import java.net.URLEncoder;
@@ -70,14 +71,20 @@ public class DFSck extends Configured implements Tool {
   }
 
   private final UserGroupInformation ugi;
+  private final PrintStream out;
 
   /**
    * Filesystem checker.
    * @param conf current Configuration
    */
   public DFSck(Configuration conf) throws IOException {
+    this(conf, System.out);
+  }
+
+  public DFSck(Configuration conf, PrintStream out) throws IOException {
     super(conf);
     this.ugi = UserGroupInformation.getCurrentUser();
+    this.out = out;
   }
 
   /**
@@ -163,10 +170,10 @@ public class DFSck extends Configured implements Tool {
             continue;
           numCorrupt++;
           if (numCorrupt == 1) {
-            System.out.println("The list of corrupt files under path '" 
+            out.println("The list of corrupt files under path '" 
                 + dir + "' are:");
           }
-          System.out.println(line);
+          out.println(line);
           try {
             // Get the block # that we need to send in next call
             lastBlock = line.split("\t")[0];
@@ -179,7 +186,7 @@ public class DFSck extends Configured implements Tool {
         input.close();
       }
     }
-    System.out.println("The filesystem under path '" + dir + "' has " 
+    out.println("The filesystem under path '" + dir + "' has " 
         + numCorrupt + " CORRUPT files");
     if (numCorrupt == 0)
       errCode = 0;
@@ -231,7 +238,7 @@ public class DFSck extends Configured implements Tool {
     int errCode = -1;
     try {
       while ((line = input.readLine()) != null) {
-        System.out.println(line);
+        out.println(line);
         lastLine = line;
       }
     } finally {
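
This new constructor is what enables HDFS-1472 (programmatic access to fsck output): callers supply their own PrintStream instead of scraping System.out, exactly as the TestFsck change below does. A condensed sketch, with conf assumed to be a Configuration for the target cluster:

    // Capture the fsck report in memory rather than printing it to stdout.
    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    PrintStream out = new PrintStream(buffer, true);
    int errCode = ToolRunner.run(new DFSck(conf, out), new String[] { "/" });
    String report = buffer.toString();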

+ 21 - 1
src/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageLoaderCurrent.java

@@ -23,6 +23,7 @@ import java.text.DateFormat;
 import java.text.SimpleDateFormat;
 import java.util.Date;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
@@ -30,6 +31,8 @@ import org.apache.hadoop.hdfs.server.namenode.FSImage;
 import org.apache.hadoop.hdfs.tools.offlineImageViewer.ImageVisitor.ImageElement;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
 import org.apache.hadoop.security.token.delegation.DelegationKey;
 
 /**
@@ -116,7 +119,8 @@ import org.apache.hadoop.security.token.delegation.DelegationKey;
 class ImageLoaderCurrent implements ImageLoader {
   protected final DateFormat dateFormat = 
                                       new SimpleDateFormat("yyyy-MM-dd HH:mm");
-  private static int [] versions = {-16, -17, -18, -19, -20, -21, -22, -23, -24};
+  private static int [] versions = 
+           {-16, -17, -18, -19, -20, -21, -22, -23, -24, -25};
   private int imageVersion = 0;
 
   /* (non-Javadoc)
@@ -151,6 +155,22 @@ class ImageLoaderCurrent implements ImageLoader {
 
       v.visit(ImageElement.GENERATION_STAMP, in.readLong());
 
+      if (imageVersion <= -25) {
+        boolean isCompressed = in.readBoolean();
+        v.visit(ImageElement.IS_COMPRESSED, imageVersion);
+        if (isCompressed) {
+          String codecClassName = Text.readString(in);
+          v.visit(ImageElement.COMPRESS_CODEC, codecClassName);
+          CompressionCodecFactory codecFac = new CompressionCodecFactory(
+              new Configuration());
+          CompressionCodec codec = codecFac.getCodecByClassName(codecClassName);
+          if (codec == null) {
+            throw new IOException("Image compression codec not supported: "
+                + codecClassName);
+          }
+          in = new DataInputStream(codec.createInputStream(in));
+        }
+      }
       processINodes(in, v, numInodes, skipBlocks);
 
       processINodesUC(in, v, skipBlocks);

+ 2 - 0
src/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageVisitor.java

@@ -33,6 +33,8 @@ abstract class ImageVisitor {
     FS_IMAGE,
     IMAGE_VERSION,
     NAMESPACE_ID,
+    IS_COMPRESSED,
+    COMPRESS_CODEC,
     LAYOUT_VERSION,
     NUM_INODES,
     GENERATION_STAMP,

+ 2 - 1
src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java

@@ -138,11 +138,12 @@ public class TestEditLog extends TestCase {
       // If there were any corruptions, it is likely that the reading in
       // of these transactions will throw an exception.
       //
+      FSEditLogLoader loader = new FSEditLogLoader(namesystem);
       for (Iterator<StorageDirectory> it = 
               fsimage.dirIterator(NameNodeDirType.EDITS); it.hasNext();) {
         File editFile = FSImage.getImageFile(it.next(), NameNodeFile.EDITS);
         System.out.println("Verifying file: " + editFile);
-        int numEdits = namesystem.getEditLog().loadFSEdits(
+        int numEdits = loader.loadFSEdits(
                                   new EditLogFileInputStream(editFile));
         int numLeases = namesystem.leaseManager.countLease();
         System.out.println("Number of outstanding leases " + numLeases);

+ 1 - 1
src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java

@@ -222,7 +222,7 @@ public class TestEditLogRace {
            fsimage.dirIterator(NameNodeDirType.EDITS); it.hasNext();) {
       File editFile = FSImage.getImageFile(it.next(), NameNodeFile.EDITS);
       System.out.println("Verifying file: " + editFile);
-      int numEdits = namesystem.getEditLog().loadFSEdits(
+      int numEdits = new FSEditLogLoader(namesystem).loadFSEdits(
         new EditLogFileInputStream(editFile));
       System.out.println("Number of edits: " + numEdits);
     }

+ 2 - 5
src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestFsck.java

@@ -73,16 +73,13 @@ public class TestFsck extends TestCase {
   static String runFsck(Configuration conf, int expectedErrCode, 
                         boolean checkErrorCode,String... path) 
                         throws Exception {
-    PrintStream oldOut = System.out;
     ByteArrayOutputStream bStream = new ByteArrayOutputStream();
-    PrintStream newOut = new PrintStream(bStream, true);
-    System.setOut(newOut);
+    PrintStream out = new PrintStream(bStream, true);
     ((Log4JLogger)FSPermissionChecker.LOG).getLogger().setLevel(Level.ALL);
-    int errCode = ToolRunner.run(new DFSck(conf), path);
+    int errCode = ToolRunner.run(new DFSck(conf, out), path);
     if (checkErrorCode)
       assertEquals(expectedErrCode, errCode);
     ((Log4JLogger)FSPermissionChecker.LOG).getLogger().setLevel(Level.INFO);
-    System.setOut(oldOut);
     return bStream.toString();
   }
 

+ 2 - 1
src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestSecurityTokenEditLog.java

@@ -134,13 +134,14 @@ public class TestSecurityTokenEditLog extends TestCase {
       // If there were any corruptions, it is likely that the reading in
       // of these transactions will throw an exception.
       //
+      FSEditLogLoader loader = new FSEditLogLoader(namesystem);
       namesystem.getDelegationTokenSecretManager().stopThreads();
       int numKeys = namesystem.getDelegationTokenSecretManager().getNumberOfKeys();
       for (Iterator<StorageDirectory> it = 
               fsimage.dirIterator(NameNodeDirType.EDITS); it.hasNext();) {
         File editFile = FSImage.getImageFile(it.next(), NameNodeFile.EDITS);
         System.out.println("Verifying file: " + editFile);
-        int numEdits = namesystem.getEditLog().loadFSEdits(
+        int numEdits = loader.loadFSEdits(
                                   new EditLogFileInputStream(editFile));
         assertTrue("Verification for " + editFile + " failed. " +
                    "Expected " + (NUM_THREADS * opsPerTrans * NUM_TRANSACTIONS + numKeys) + " transactions. "+

+ 57 - 0
src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestStartup.java

@@ -36,9 +36,13 @@ import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.permission.PermissionStatus;
+
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.FSConstants.SafeModeAction;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
 import org.apache.hadoop.hdfs.server.namenode.FSImage.NameNodeDirType;
 import org.apache.hadoop.hdfs.server.namenode.FSImage.NameNodeFile;
@@ -348,4 +352,57 @@ public class TestStartup extends TestCase {
         cluster.shutdown();
     }
   }
+  
+  public void testCompression() throws IOException {
+    LOG.info("Test compressing image.");
+    Configuration conf = new Configuration();
+    FileSystem.setDefaultUri(conf, "hdfs://localhost:0");
+    conf.set("dfs.http.address", "127.0.0.1:0");
+    File base_dir = new File(System.getProperty(
+        "test.build.data", "build/test/data"), "dfs/");
+    conf.set("dfs.name.dir", new File(base_dir, "name").getPath());
+    conf.setBoolean("dfs.permissions", false);
+
+    NameNode.format(conf);
+
+    // create an uncompressed image
+    LOG.info("Create an uncompressed fsimage");
+    NameNode namenode = new NameNode(conf);
+    namenode.getNamesystem().mkdirs("/test",
+        new PermissionStatus("hairong", null, FsPermission.getDefault()), true);
+    assertTrue(namenode.getFileInfo("/test").isDir());
+    namenode.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
+    namenode.saveNamespace();
+    namenode.stop();
+    namenode.join();
+
+    // compress image using default codec
+    LOG.info("Read an uncomressed image and store it compressed using default codec.");
+    conf.setBoolean(DFSConfigKeys.DFS_IMAGE_COMPRESS_KEY, true);
+    checkNameSpace(conf);
+
+    // read image compressed using the default and compress it using Gzip codec
+    LOG.info("Read a compressed image and store it using a different codec.");
+    conf.set(DFSConfigKeys.DFS_IMAGE_COMPRESSION_CODEC_KEY,
+        "org.apache.hadoop.io.compress.GzipCodec");
+    checkNameSpace(conf);
+
+    // read an image compressed in Gzip and store it uncompressed
+    LOG.info("Read an compressed iamge and store it as uncompressed.");
+    conf.setBoolean(DFSConfigKeys.DFS_IMAGE_COMPRESS_KEY, false);
+    checkNameSpace(conf);
+
+    // read an uncompressed image and store it uncompressed
+    LOG.info("Read an uncompressed image and store it as uncompressed.");
+    checkNameSpace(conf);
+  }
+
+  private void checkNameSpace(Configuration conf) throws IOException {
+    NameNode namenode = new NameNode(conf);
+    assertTrue(namenode.getFileInfo("/test").isDir());
+    namenode.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
+    namenode.saveNamespace();
+    namenode.stop();
+    namenode.join();
+  }
 }
