
Merging changes -r1032469:1033639 from trunk to federation.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hdfs/branches/HDFS-1052@1074768 13f79535-47bb-0310-9956-ffa450edef68
Suresh Srinivas, 14 years ago
Parent
Commit
a42bb3e502
26 files changed, 490 insertions(+), 156 deletions(-)
  1. + 0 - 53    .eclipse.templates/.classpath
  2. + 0 - 17    .eclipse.templates/.project
  3. + 10 - 0    CHANGES.txt
  4. + 197 - 33  build.xml
  5. + 1 - 0     ivy/libraries.properties
  6. + 10 - 0    src/java/hdfs-default.xml
  7. + 4 - 0     src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
  8. + 2 - 2     src/java/org/apache/hadoop/hdfs/protocol/FSConstants.java
  9. + 3 - 2     src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
  10. + 3 - 2    src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
  11. + 3 - 2    src/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java
  12. + 2 - 1    src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java
  13. + 2 - 1    src/java/org/apache/hadoop/hdfs/server/namenode/BackupStorage.java
  14. + 27 - 7   src/java/org/apache/hadoop/hdfs/server/namenode/CheckpointSignature.java
  15. + 2 - 1    src/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java
  16. + 63 - 10  src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
  17. + 8 - 2    src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
  18. + 22 - 2   src/java/org/apache/hadoop/hdfs/server/namenode/GetImageServlet.java
  19. + 3 - 3    src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
  20. + 3 - 2    src/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java
  21. + 18 - 1   src/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java
  22. + 7 - 4    src/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocol.java
  23. + 8 - 8    src/java/org/apache/hadoop/hdfs/util/DataTransferThrottler.java
  24. + 2 - 1    src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java
  25. + 10 - 2   src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestSaveNamespace.java
  26. + 80 - 0   src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestStartup.java

+ 0 - 53
.eclipse.templates/.classpath

@@ -1,53 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<classpath>
-	<classpathentry kind="src" path="src/java"/>
-	<classpathentry kind="src" path="src/test/unit"/>
-	<classpathentry kind="src" path="src/test/hdfs"/>
-	<classpathentry kind="src" path="src/ant"/>
-	<classpathentry kind="src" path="src/contrib/hdfsproxy/src/java"/>
-	<classpathentry kind="src" path="src/contrib/hdfsproxy/src/test"/>
-	<classpathentry kind="src" path="src/contrib/thriftfs/src/java"/>
-	<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER"/>
-	<classpathentry kind="var" path="ANT_HOME/lib/ant.jar"/>
-	<classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/common/hadoop-common-0.22.0-SNAPSHOT.jar"/>
-  <classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/test/hadoop-common-test-0.22.0-SNAPSHOT.jar"/>
-  <classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/common/avro-1.3.2.jar" />
-	<classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/common/commons-cli-1.2.jar"/>
-	<classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/common/commons-codec-1.4.jar"/>
-	<classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/common/commons-el-1.0.jar"/>
-  <classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/common/commons-daemon-1.0.1.jar" />
-	<classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/common/commons-httpclient-3.1.jar"/>
-	<classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/common/commons-logging-1.1.1.jar"/>
-	<classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/common/commons-net-1.4.1.jar"/>
-	<classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/common/core-3.1.1.jar"/>
-	<classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/common/hsqldb-1.8.0.10.jar"/>
-	<classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/common/jasper-compiler-5.5.12.jar"/>
-	<classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/common/jasper-runtime-5.5.12.jar"/>
-	<classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/common/jets3t-0.7.1.jar"/>
-	<classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/common/jetty-6.1.14.jar"/>
-	<classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/common/jetty-util-6.1.14.jar"/>
-	<classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/common/jsp-2.1-6.1.14.jar"/>
-	<classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/common/jsp-api-2.1-6.1.14.jar"/>
-	<classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/common/junit-4.8.1.jar"/>
-	<classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/common/kfs-0.3.jar"/>
-	<classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/common/log4j-1.2.15.jar"/>
-	<classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/common/oro-2.0.8.jar"/>
-	<classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/common/servlet-api-2.5-6.1.14.jar"/>
-	<classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/common/slf4j-api-1.5.11.jar"/>
-	<classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/test/slf4j-log4j12-1.5.11.jar"/>
-	<classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/common/xmlenc-0.52.jar"/>
-  <classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/common/mockito-all-1.8.2.jar"/>
-	<classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/common/aspectjrt-1.6.5.jar"/>
-	<classpathentry kind="lib" path="build/ivy/lib/hdfsproxy/common/cactus.core.framework.uberjar.javaEE.14-1.8.0.jar"/>
-	<classpathentry kind="lib" path="build/ivy/lib/hdfsproxy/common/cactus.integration.ant-1.8.0.jar"/>
-	<classpathentry kind="lib" path="build/ivy/lib/hdfsproxy/common/cactus.integration.shared.api-1.8.0.jar"/>
-	<classpathentry kind="lib" path="build/ivy/lib/hdfsproxy/common/cargo-ant-0.9.jar"/>
-	<classpathentry kind="lib" path="build/ivy/lib/hdfsproxy/common/cargo-core-uberjar-0.9.jar"/>
-	<classpathentry kind="lib" path="build/ivy/lib/hdfsproxy/common/standard-1.1.2.jar"/>	
-	<classpathentry kind="lib" path="src/contrib/thriftfs/lib/hadoopthriftapi.jar"/>
-	<classpathentry kind="lib" path="src/contrib/thriftfs/lib/libthrift.jar"/>
-	<classpathentry kind="lib" path="build/test/classes"/>
-	<classpathentry kind="lib" path="build/classes"/>
-	<classpathentry kind="lib" path="conf"/>
-	<classpathentry kind="output" path="build/eclipse-classes"/>
-</classpath>

+ 0 - 17
.eclipse.templates/.project

@@ -1,17 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<projectDescription>
-	<name>@PROJECT@</name>
-	<comment></comment>
-	<projects>
-	</projects>
-	<buildSpec>
-		<buildCommand>
-			<name>org.eclipse.jdt.core.javabuilder</name>
-			<arguments>
-			</arguments>
-		</buildCommand>
-	</buildSpec>
-	<natures>
-		<nature>org.eclipse.jdt.core.javanature</nature>
-	</natures>
-</projectDescription>

+ 10 - 0
CHANGES.txt

@@ -104,6 +104,11 @@ Release 0.22.0 - Unreleased
 
     HDFS-1435. Provide an option to store fsimage compressed. (hairong)
 
+    HDFS-903.  Support fsimage validation through MD5 checksum. (hairong)
+
+    HDFS-1457. Provide an option to throttle image transmission between
+    pimary and secondary NameNodes. (Yifei Lu and hairong via hairong)
+
   IMPROVEMENTS
 
     HDFS-1304. Add a new unit test for HftpFileSystem.open(..).  (szetszwo)
@@ -226,6 +231,8 @@ Release 0.22.0 - Unreleased
 
     HDFS-1485. Fix typo in BlockPlacementPolicy. (Jingguo Yao via shv)
 
+    HDFS-1035. Generate Eclipse's .classpath file from Ivy config. (nigel)
+
   OPTIMIZATIONS
 
     HDFS-1140. Speedup INode.getPathComponents. (Dmytro Molkov via shv)
@@ -400,6 +407,9 @@ Release 0.21.0 - Unreleased
 
     HDFS-1474. ant binary-system is broken (cos)
 
+    HDFS-1292. Allow artifacts to be published to the staging Apache Nexus
+    Maven Repository.  (Giridharan Kesavan via tomwhite)
+
 Release 0.21.0 - 2010-08-13
 
   INCOMPATIBLE CHANGES

+ 197 - 33
build.xml

@@ -150,6 +150,10 @@
   <loadproperties srcfile="${ivy.dir}/libraries.properties"/>
   <property name="ivy.jar" location="${ivy.dir}/ivy-${ivy.version}.jar"/>
   <property name="mvn.repo" value="http://repo2.maven.org/maven2"/>
+  <property name="asfrepo" value="https://repository.apache.org"/>
+  <property name="asfsnapshotrepo" value="${asfrepo}/content/repositories/snapshots"/>
+  <property name="asfstagingrepo"
+  value="${asfrepo}/service/local/staging/deploy/maven2"/>
   <property name="ivy_repo_url" value="${mvn.repo}/org/apache/ivy/ivy/${ivy.version}/ivy-${ivy.version}.jar"/>
   <property name="ant_task.jar" location="${ivy.dir}/maven-ant-tasks-${ant-task.version}.jar"/>
   <property name="ant_task_repo_url" value="${mvn.repo}/org/apache/maven/maven-ant-tasks/${ant-task.version}/maven-ant-tasks-${ant-task.version}.jar"/>
@@ -184,6 +188,13 @@
   <property name="jdiff.jar" value="${jdiff.home}/jdiff-${jdiff.version}.jar"/>
   <property name="xerces.jar" value="${jdiff.home}/xerces-${xerces.version}.jar"/>
 
+  <!-- Eclipse properties -->
+  <property name="build.dir.eclipse" value="${build.dir}/eclipse"/>
+  <property name="build.dir.eclipse-main-classes" value="${build.dir.eclipse}/classes-main"/>
+  <property name="build.dir.eclipse-main-generated-classes" value="${build.dir.eclipse}/classes-main-generated"/>
+  <property name="build.dir.eclipse-test-classes" value="${build.dir.eclipse}/classes-test"/>
+  <property name="build.dir.eclipse-contrib-classes" value="${build.dir.eclipse}/classes-contrib"/>
+
   <property name="clover.jar" location="${clover.home}/lib/clover.jar"/>
   <available property="clover.present" file="${clover.jar}" />
 
@@ -195,6 +206,10 @@
     </and>
   </condition>
 
+  <condition property="staging">
+     <equals arg1="${repo}" arg2="staging"/>
+  </condition>
+
   <!-- the normal classpath -->
   <path id="classpath">
     <pathelement location="${build.classes}"/>
@@ -1173,13 +1188,20 @@
   <!-- ================================================================== -->
   <!-- Clean.  Delete the build files, and their directories              -->
   <!-- ================================================================== -->
-  <target name="clean" depends="clean-contrib, clean-fi" description="Clean.  Delete the build files, and their directories">
+  <target name="clean" depends="clean-contrib, clean-fi, clean-sign" description="Clean.  Delete the build files, and their directories">
     <delete dir="${build.dir}"/>
     <delete dir="${build-fi.dir}"/>
     <delete dir="${docs.src}/build"/>
     <delete file="${hadoop-hdfs.pom}"/>
     <delete file="${hadoop-hdfs-test.pom}"/>
     <delete file="${hadoop-hdfs-instrumented.pom}"/>
+    <delete file="${hadoop-hdfs-instrumented-test.pom}"/>
+  </target>
+
+  <target name="clean-sign" description="Clean.  Delete .asc files">
+    <delete>
+      <fileset dir="." includes="**/**/*.asc"/>
+    </delete>
   </target>
 
   <target name="veryclean" depends="clean-cache,clean" 
@@ -1389,13 +1411,52 @@
   </exec>
 </target>
 	
-  <target name="eclipse-files" depends="init"
-          description="Generate files for Eclipse">
-    <pathconvert property="eclipse.project">
-      <path path="${basedir}"/>
-      <regexpmapper from="^.*/([^/]+)$$" to="\1" handledirsep="yes"/>
-    </pathconvert>
-    <copy todir="." overwrite="true">
+  <condition property="ant-eclipse.jar.exists">
+    <available file="${build.dir}/lib/ant-eclipse-1.0-jvm1.2.jar"/>
+  </condition>
+
+  <target name="ant-eclipse-download" unless="ant-eclipse.jar.exists"
+          description="Downloads the ant-eclipse binary.">
+    <get src="http://downloads.sourceforge.net/project/ant-eclipse/ant-eclipse/1.0/ant-eclipse-1.0.bin.tar.bz2"
+         dest="${build.dir}/ant-eclipse-1.0.bin.tar.bz2" usetimestamp="false" />
+
+    <untar src="${build.dir}/ant-eclipse-1.0.bin.tar.bz2"
+           dest="${build.dir}" compression="bzip2">
+      <patternset>
+        <include name="lib/ant-eclipse-1.0-jvm1.2.jar"/>
+      </patternset>
+    </untar>
+    <delete file="${build.dir}/java/ant-eclipse-1.0.bin.tar.bz2" />
+  </target>
+  
+  <target name="eclipse" 
+          depends="init,ant-eclipse-download,ivy-retrieve-common,ivy-retrieve-test"
+          description="Create eclipse project files">
+       <pathconvert property="eclipse.project">
+         <path path="${basedir}"/>
+         <regexpmapper from="^.*/([^/]+)$$" to="\1" handledirsep="yes"/>
+       </pathconvert>
+    <taskdef name="eclipse"
+             classname="prantl.ant.eclipse.EclipseTask"
+             classpath="${build.dir}/lib/ant-eclipse-1.0-jvm1.2.jar" />
+    <eclipse updatealways="true">
+      <project name="${eclipse.project}" />
+      <classpath>
+        <source path="${java.src.dir}"
+                output="${build.dir.eclipse-main-classes}" />
+        <source path="${build.src}"
+                output="${build.dir.eclipse-main-generated-classes}" />
+        <source path="${test.src.dir}/hdfs"
+                output="${build.dir.eclipse-test-classes}" />
+        <source path="${test.src.dir}/unit"
+                output="${build.dir.eclipse-test-classes}" />
+        <output path="${build.dir.eclipse-main-classes}" />
+        <library pathref="ivy-common.classpath" exported="true" />
+        <library pathref="ivy-test.classpath" exported="false" />
+        <library path="${conf.dir}" exported="false" />
+      </classpath>
+    </eclipse>
+	<copy todir="." overwrite="true">
       <fileset dir=".eclipse.templates">
       	<exclude name="**/README.txt"/>
       </fileset>
@@ -1432,16 +1493,8 @@
          uri="urn:maven-artifact-ant" classpathref="mvn-ant-task.classpath"/>
   </target>   
 
-  <target name="mvn-install-hdfs" depends="mvn-taskdef,jar,set-version">
-     <artifact:pom file="${hadoop-hdfs.pom}" id="hadoop.hdfs"/>
-     <artifact:install file="${hadoop-hdfs.jar}">
-        <pom refid="hadoop.hdfs"/>
-        <attach file="${hadoop-hdfs-sources.jar}" classifier="sources" />
-     </artifact:install>
-  </target>
-
-  <target name="mvn-install" depends="mvn-taskdef,jar,jar-hdfs-test,set-version,
-    -mvn-system-install">
+  <target name="mvn-install" depends="mvn-taskdef,jar,jar-test,set-version,-mvn-system-install"
+     description="To install hadoop hdfs and test jars to local filesystem's m2 cache">
      <artifact:pom file="${hadoop-hdfs.pom}" id="hadoop.hdfs"/>
      <artifact:pom file="${hadoop-hdfs-test.pom}" id="hadoop.hdfs.test"/>
      <artifact:install file="${hadoop-hdfs.jar}">
@@ -1452,38 +1505,150 @@
         <pom refid="hadoop.hdfs.test"/>
         <attach file="${hadoop-hdfs-test-sources.jar}" classifier="sources" />
      </artifact:install>
-  </target>
+   </target>
+  
+   <target name="mvn-deploy" depends="mvn-taskdef, jar, jar-test,
+     jar-system, jar-test-system, set-version, signanddeploy, simpledeploy"
+     description="To deploy hadoop hdfs and test jar's to apache
+     snapshot's repository"/>
 
-  <target name="mvn-deploy" depends="mvn-taskdef, jar, jar-hdfs-test, set-version,
-    -mvn-system-deploy">
-     <property name="repourl" value="https://repository.apache.org/content/repositories/snapshots" />
+   <target name="signanddeploy" if="staging" depends="sign">
      <artifact:pom file="${hadoop-hdfs.pom}" id="hadoop.hdfs"/>
      <artifact:pom file="${hadoop-hdfs-test.pom}" id="hadoop.hdfs.test"/>
+     <artifact:pom file="${hadoop-hdfs-instrumented.pom}" 
+       id="hadoop.hdfs.${herriot.suffix}"/>
+     <artifact:pom file="${hadoop-hdfs-instrumented-test.pom}" 
+       id="hadoop.hdfs.${herriot.suffix}.test"/>
+     <artifact:install-provider artifactId="wagon-http"
+     version="${wagon-http.version}"/>
 
-     <artifact:install-provider artifactId="wagon-http" version="1.0-beta-2"/>
      <artifact:deploy file="${hadoop-hdfs.jar}">
-         <remoteRepository id="apache.snapshots.https" url="${repourl}"/>
+       <remoteRepository id="apache.staging.https" url="${asfstagingrepo}"/>
+       <pom refid="hadoop.hdfs"/>
+       <attach file="${hadoop-hdfs.jar}.asc" type="jar.asc"/>
+       <attach file="${hadoop-hdfs.pom}.asc" type="pom.asc"/>
+       <attach file="${hadoop-hdfs-sources.jar}.asc" type="jar.asc"
+         classifier="sources" />
+       <attach file="${hadoop-hdfs-sources.jar}" classifier="sources"/>
+     </artifact:deploy>
+
+     <artifact:deploy file="${hadoop-hdfs-test.jar}">
+       <remoteRepository id="apache.staging.https" url="${asfstagingrepo}"/>
+       <pom refid="hadoop.hdfs.test"/>
+       <attach file="${hadoop-hdfs-test.jar}.asc" type="jar.asc"/>
+       <attach file="${hadoop-hdfs-test.pom}.asc" type="pom.asc"/>
+       <attach file="${hadoop-hdfs-test-sources.jar}.asc" type="jar.asc"
+         classifier="sources"/>
+       <attach file="${hadoop-hdfs-test-sources.jar}" classifier="sources"/>
+     </artifact:deploy>
+
+     <artifact:deploy file="${hadoop-hdfs-instrumented.jar}">
+       <remoteRepository id="apache.staging.https" url="${asfstagingrepo}"/>
+       <pom refid="hadoop.hdfs.${herriot.suffix}"/>
+       <attach file="${hadoop-hdfs-instrumented.jar}.asc" type="jar.asc"/>
+       <attach file="${hadoop-hdfs-instrumented.pom}.asc" type="pom.asc"/>
+       <attach file="${hadoop-hdfs-instrumented-sources.jar}.asc" 
+         type="jar.asc" classifier="sources"/>
+       <attach file="${hadoop-hdfs-instrumented-sources.jar}"
+         classifier="sources"/>
+     </artifact:deploy>
+
+      <artifact:deploy file="${hadoop-hdfs-instrumented-test.jar}">
+       <remoteRepository id="apache.staging.https" url="${asfstagingrepo}"/>
+       <pom refid="hadoop.hdfs.${herriot.suffix}.test"/>
+       <attach file="${hadoop-hdfs-instrumented-test.jar}.asc" type="jar.asc"/>
+       <attach file="${hadoop-hdfs-instrumented-test.pom}.asc" type="pom.asc"/>
+       <attach file="${hadoop-hdfs-instrumented-test-sources.jar}.asc" 
+         type="jar.asc" classifier="sources"/>
+       <attach file="${hadoop-hdfs-instrumented-test-sources.jar}"
+         classifier="sources"/>
+     </artifact:deploy>
+   </target>
+
+   <target name="sign" depends="clean-sign" if="staging">
+    <input message="password:>" addproperty="gpg.passphrase">
+     <handler classname="org.apache.tools.ant.input.SecureInputHandler" />
+    </input>
+    <macrodef name="sign-artifact" description="Signs the artifact">
+      <attribute name="input.file"/>
+      <attribute name="output.file" default="@{input.file}.asc"/>
+      <attribute name="gpg.passphrase"/>
+      <sequential>
+        <echo>Signing @{input.file} Sig File: @{output.file}</echo>
+        <exec executable="gpg" >
+          <arg value="--armor"/>
+          <arg value="--output"/>
+          <arg value="@{output.file}"/>
+          <arg value="--passphrase"/>
+          <arg value="@{gpg.passphrase}"/>
+          <arg value="--detach-sig"/>
+          <arg value="@{input.file}"/>
+        </exec>
+      </sequential>
+    </macrodef>
+    <sign-artifact input.file="${hadoop-hdfs.jar}" 
+     output.file="${hadoop-hdfs.jar}.asc" gpg.passphrase="${gpg.passphrase}"/>
+    <sign-artifact input.file="${hadoop-hdfs-test.jar}" 
+     output.file="${hadoop-hdfs-test.jar}.asc" gpg.passphrase="${gpg.passphrase}"/>
+    <sign-artifact input.file="${hadoop-hdfs-sources.jar}" 
+     output.file="${hadoop-hdfs-sources.jar}.asc" gpg.passphrase="${gpg.passphrase}"/>
+    <sign-artifact input.file="${hadoop-hdfs-test-sources.jar}" 
+     output.file="${hadoop-hdfs-test-sources.jar}.asc" gpg.passphrase="${gpg.passphrase}"/>
+    <sign-artifact input.file="${hadoop-hdfs.pom}" 
+     output.file="${hadoop-hdfs.pom}.asc" gpg.passphrase="${gpg.passphrase}"/>
+    <sign-artifact input.file="${hadoop-hdfs-test.pom}" 
+     output.file="${hadoop-hdfs-test.pom}.asc" gpg.passphrase="${gpg.passphrase}"/>
+    <sign-artifact input.file="${hadoop-hdfs-instrumented.jar}" 
+     output.file="${hadoop-hdfs-instrumented.jar}.asc" gpg.passphrase="${gpg.passphrase}"/>
+    <sign-artifact input.file="${hadoop-hdfs-instrumented.pom}" 
+     output.file="${hadoop-hdfs-instrumented.pom}.asc" gpg.passphrase="${gpg.passphrase}"/>
+    <sign-artifact input.file="${hadoop-hdfs-instrumented-sources.jar}" 
+     output.file="${hadoop-hdfs-instrumented-sources.jar}.asc" gpg.passphrase="${gpg.passphrase}"/>
+    <sign-artifact input.file="${hadoop-hdfs-instrumented-test.jar}" 
+     output.file="${hadoop-hdfs-instrumented-test.jar}.asc" gpg.passphrase="${gpg.passphrase}"/>
+    <sign-artifact input.file="${hadoop-hdfs-instrumented-test.pom}" 
+     output.file="${hadoop-hdfs-instrumented-test.pom}.asc" gpg.passphrase="${gpg.passphrase}"/>
+    <sign-artifact input.file="${hadoop-hdfs-instrumented-test-sources.jar}" 
+     output.file="${hadoop-hdfs-instrumented-test-sources.jar}.asc" gpg.passphrase="${gpg.passphrase}"/>
+  </target>
+
+  <target name="simpledeploy" unless="staging">
+     <artifact:pom file="${hadoop-hdfs.pom}" id="hadoop.hdfs"/>
+     <artifact:pom file="${hadoop-hdfs-test.pom}" id="hadoop.hdfs.test"/>
+     <artifact:pom file="${hadoop-hdfs-instrumented.pom}" 
+       id="hadoop.hdfs.${herriot.suffix}"/>
+
+     <artifact:install-provider artifactId="wagon-http" version="${wagon-http.version}"/>
+     <artifact:deploy file="${hadoop-hdfs.jar}">
+         <remoteRepository id="apache.snapshots.https" url="${asfsnapshotrepo}"/>
          <pom refid="hadoop.hdfs"/>
-        <attach file="${hadoop-hdfs-sources.jar}" classifier="sources" />
+         <attach file="${hadoop-hdfs-sources.jar}" classifier="sources" />
      </artifact:deploy>
+
      <artifact:deploy file="${hadoop-hdfs-test.jar}">
-         <remoteRepository id="apache.snapshots.https" url="${repourl}"/>
+         <remoteRepository id="apache.snapshots.https" url="${asfsnapshotrepo}"/>
          <pom refid="hadoop.hdfs.test"/>
-        <attach file="${hadoop-hdfs-test-sources.jar}" classifier="sources" />
+         <attach file="${hadoop-hdfs-test-sources.jar}" classifier="sources" />
+     </artifact:deploy> 
+
+     <artifact:deploy file="${hadoop-hdfs-instrumented.jar}">
+         <remoteRepository id="apache.snapshots.https" url="${asfsnapshotrepo}"/>
+         <pom refid="hadoop.hdfs.${herriot.suffix}"/>
+         <attach file="${hadoop-hdfs-instrumented-sources.jar}" classifier="sources" />
      </artifact:deploy>
   </target>
-  
+
   <target name="set-version">
     <delete file="${basedir}/ivy/hadoop-hdfs.xml"/>
     <delete file="${basedir}/ivy/hadoop-hdfs-test.xml"/>
-    <delete file="${hadoop-hdfs-instrumented.pom}"/>
-    <delete file="${hadoop-hdfs-instrumented-test.pom}"/>
+    <delete file="${basedir}/ivy/hadoop-hdfs-${herriot.suffix}.xml"/>
+    <delete file="${basedir}/ivy/hadoop-hdfs-${herriot.suffix}-test.xml"/>
     <copy file="${basedir}/ivy/hadoop-hdfs-template.xml" tofile="${basedir}/ivy/hadoop-hdfs.xml"/>
     <copy file="${basedir}/ivy/hadoop-hdfs-test-template.xml" tofile="${basedir}/ivy/hadoop-hdfs-test.xml"/>
     <copy file="${basedir}/ivy/hadoop-hdfs-${herriot.suffix}-template.xml"
-      tofile="${hadoop-hdfs-instrumented.pom}"/>
+      tofile="${basedir}/ivy/hadoop-hdfs-${herriot.suffix}.xml"/>
     <copy file="${basedir}/ivy/hadoop-hdfs-${herriot.suffix}-test-template.xml"
-      tofile="${hadoop-hdfs-instrumented-test.pom}"/>
+      tofile="${basedir}/ivy/hadoop-hdfs-${herriot.suffix}-test.xml"/>
     <replaceregexp byline="true">
       <regexp pattern="@version"/>
       <substitution expression="${version}"/>
@@ -1495,7 +1660,6 @@
       </fileset>
     </replaceregexp>
   </target>
- 
 
   <!--
   To avoid Ivy leaking things across big projects, always load Ivy in the same classloader.

+ 1 - 0
ivy/libraries.properties

@@ -65,6 +65,7 @@ servlet-api.version=2.5
 slf4j-api.version=1.5.11
 slf4j-log4j12.version=1.5.11
 
+wagon-http.version=1.0-beta-2
 xmlenc.version=0.52
 xerces.version=1.4.4
 

+ 10 - 0
src/java/hdfs-default.xml

@@ -544,4 +544,14 @@ creations/deletions), or "all".</description>
   </description>
 </property>
 
+<property>
+  <name>dfs.image.transfer.bandwidthPerSec</name>
+  <value>0</value>
+  <description>
+        Specifies the maximum amount of bandwidth that can be utilized for image
+        transfer in term of the number of bytes per second.
+        A default value of 0 indicates that throttling is disabled. 
+  </description>
+</property>
+
 </configuration>

+ 4 - 0
src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java

@@ -206,6 +206,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String DFS_IMAGE_COMPRESSION_CODEC_DEFAULT =
                                    "org.apache.hadoop.io.compress.DefaultCodec";
 
+  public static final String DFS_IMAGE_TRANSFER_RATE_KEY =
+                                           "dfs.image.transfer.bandwidthPerSec";
+  public static final long DFS_IMAGE_TRANSFER_RATE_DEFAULT = 0;  //no throttling
+
   //Keys with no defaults
   public static final String  DFS_DATANODE_PLUGINS_KEY = "dfs.datanode.plugins";
   public static final String  DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY = "dfs.datanode.socket.write.timeout";
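
The dfs.image.transfer.bandwidthPerSec property described above and the two constants added to DFSConfigKeys are the whole configuration surface for image-transfer throttling. A minimal sketch of setting the key programmatically (equivalent to adding it to hdfs-site.xml) and reading it back; the class name and the 1 MB/s figure are illustrative only, not part of this patch:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;

    public class ImageTransferThrottleConfigSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Cap image transfers at roughly 1 MB/s; the default of 0 leaves them unthrottled.
        conf.setLong(DFSConfigKeys.DFS_IMAGE_TRANSFER_RATE_KEY, 1024L * 1024L);
        long bandwidth = conf.getLong(DFSConfigKeys.DFS_IMAGE_TRANSFER_RATE_KEY,
            DFSConfigKeys.DFS_IMAGE_TRANSFER_RATE_DEFAULT);
        System.out.println("dfs.image.transfer.bandwidthPerSec = " + bandwidth);
      }
    }

GetImageServlet, further down in this diff, turns the same value into a DataTransferThrottler only when it is greater than zero.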

+ 2 - 2
src/java/org/apache/hadoop/hdfs/protocol/FSConstants.java

@@ -91,7 +91,7 @@ public interface FSConstants {
   // Version is reflected in the data storage file.
   // Versions are negative.
   // Decrement LAYOUT_VERSION to define a new version.
-  public static final int LAYOUT_VERSION = -25;
+  public static final int LAYOUT_VERSION = -26;
   // Current version: 
-  // -25: support iamge compression.
+  // -26: support image checksum.
 }

+ 3 - 2
src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java

@@ -45,6 +45,7 @@ import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PacketHeader;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.hdfs.util.DataTransferThrottler;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.DataChecksum;
@@ -74,7 +75,7 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
   private String mirrorAddr;
   private DataOutputStream mirrorOut;
   private Daemon responder = null;
-  private BlockTransferThrottler throttler;
+  private DataTransferThrottler throttler;
   private FSDataset.BlockWriteStreams streams;
   private String clientName;
   DatanodeInfo srcDataNode = null;
@@ -608,7 +609,7 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
       DataOutputStream mirrOut, // output to next datanode
       DataInputStream mirrIn,   // input from next datanode
       DataOutputStream replyOut,  // output to previous datanode
-      String mirrAddr, BlockTransferThrottler throttlerArg,
+      String mirrAddr, DataTransferThrottler throttlerArg,
       int numTargets) throws IOException {
 
       boolean responderClosed = false;

+ 3 - 2
src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java

@@ -34,6 +34,7 @@ import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PacketHeader;
+import org.apache.hadoop.hdfs.util.DataTransferThrottler;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.net.SocketOutputStream;
 import org.apache.hadoop.util.DataChecksum;
@@ -68,7 +69,7 @@ class BlockSender implements java.io.Closeable, FSConstants {
   private boolean transferToAllowed = true;
   private boolean blockReadFully; //set when the whole block is read
   private boolean verifyChecksum; //if true, check is verified while reading
-  private BlockTransferThrottler throttler;
+  private DataTransferThrottler throttler;
   private final String clientTraceFmt; // format of client trace log message
 
   /**
@@ -420,7 +421,7 @@ class BlockSender implements java.io.Closeable, FSConstants {
    * @return total bytes reads, including crc.
    */
   long sendBlock(DataOutputStream out, OutputStream baseStream, 
-                 BlockTransferThrottler throttler) throws IOException {
+                 DataTransferThrottler throttler) throws IOException {
     if( out == null ) {
       throw new IOException( "out stream is null" );
     }

+ 3 - 2
src/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java

@@ -50,6 +50,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.common.GenerationStamp;
+import org.apache.hadoop.hdfs.util.DataTransferThrottler;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.StringUtils;
 
@@ -101,7 +102,7 @@ class DataBlockScanner implements Runnable {
   
   Random random = new Random();
   
-  BlockTransferThrottler throttler = null;
+  DataTransferThrottler throttler = null;
   
   // Reconciles blocks on disk to blocks in memory
   DirectoryScanner dirScanner;
@@ -253,7 +254,7 @@ class DataBlockScanner implements Runnable {
     }
     
     synchronized (this) {
-      throttler = new BlockTransferThrottler(200, MAX_SCAN_RATE);
+      throttler = new DataTransferThrottler(200, MAX_SCAN_RATE);
     }
   }
 

+ 2 - 1
src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java

@@ -30,6 +30,7 @@ import org.apache.commons.logging.Log;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.server.balancer.Balancer;
+import org.apache.hadoop.hdfs.util.DataTransferThrottler;
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -64,7 +65,7 @@ class DataXceiverServer implements Runnable, FSConstants {
    * It limits the number of block moves for balancing and
    * the total amount of bandwidth they can use.
    */
-  static class BlockBalanceThrottler extends BlockTransferThrottler {
+  static class BlockBalanceThrottler extends DataTransferThrottler {
    private int numThreads;
    
    /**Constructor

+ 2 - 1
src/java/org/apache/hadoop/hdfs/server/namenode/BackupStorage.java

@@ -165,6 +165,7 @@ public class BackupStorage extends FSImage {
 
     // set storage fields
     setStorageInfo(sig);
+    imageDigest = sig.imageDigest;
     checkpointTime = sig.checkpointTime;
   }
 
@@ -355,7 +356,7 @@ public class BackupStorage extends FSImage {
     editLog.revertFileStreams(STORAGE_JSPOOL_DIR + "/" + STORAGE_JSPOOL_FILE);
 
     // write version file
-    resetVersion(false);
+    resetVersion(false, imageDigest);
 
     // wake up journal writer
     synchronized(this) {

+ 27 - 7
src/java/org/apache/hadoop/hdfs/server/namenode/CheckpointSignature.java

@@ -24,6 +24,7 @@ import java.io.IOException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.server.common.StorageInfo;
 import org.apache.hadoop.hdfs.server.namenode.FSImage;
+import org.apache.hadoop.io.MD5Hash;
 import org.apache.hadoop.io.WritableComparable;
 
 /**
@@ -35,6 +36,7 @@ public class CheckpointSignature extends StorageInfo
   private static final String FIELD_SEPARATOR = ":";
   long editsTime = -1L;
   long checkpointTime = -1L;
+  MD5Hash imageDigest = null;
 
   public CheckpointSignature() {}
 
@@ -42,16 +44,26 @@ public class CheckpointSignature extends StorageInfo
     super(fsImage);
     editsTime = fsImage.getEditLog().getFsEditTime();
     checkpointTime = fsImage.getCheckpointTime();
+    imageDigest = fsImage.imageDigest;
   }
 
   CheckpointSignature(String str) {
     String[] fields = str.split(FIELD_SEPARATOR);
-    assert fields.length == 5 : "Must be 5 fields in CheckpointSignature";
+    assert fields.length == 6 : "Must be 6 fields in CheckpointSignature";
     layoutVersion = Integer.valueOf(fields[0]);
     namespaceID = Integer.valueOf(fields[1]);
     cTime = Long.valueOf(fields[2]);
     editsTime = Long.valueOf(fields[3]);
     checkpointTime = Long.valueOf(fields[4]);
+    imageDigest = new MD5Hash(fields[5]);
+  }
+
+  /**
+   * Get the MD5 image digest
+   * @return the MD5 image digest
+   */
+  MD5Hash getImageDigest() {
+    return imageDigest;
   }
 
   public String toString() {
@@ -59,20 +71,23 @@ public class CheckpointSignature extends StorageInfo
          + String.valueOf(namespaceID) + FIELD_SEPARATOR
          + String.valueOf(cTime) + FIELD_SEPARATOR
          + String.valueOf(editsTime) + FIELD_SEPARATOR
-         + String.valueOf(checkpointTime);
+         + String.valueOf(checkpointTime) + FIELD_SEPARATOR
+         +  imageDigest.toString();
   }
 
   void validateStorageInfo(FSImage si) throws IOException {
     if(layoutVersion != si.layoutVersion
         || namespaceID != si.namespaceID || cTime != si.cTime
-        || checkpointTime != si.checkpointTime) {
+        || checkpointTime != si.checkpointTime ||
+        !imageDigest.equals(si.imageDigest)) {
       // checkpointTime can change when the image is saved - do not compare
       throw new IOException("Inconsistent checkpoint fields.\n"
           + "LV = " + layoutVersion + " namespaceID = " + namespaceID
-          + " cTime = " + cTime + "; checkpointTime = " + checkpointTime 
+          + " cTime = " + cTime + "; checkpointTime = " + checkpointTime
+          + " ; imageDigest = " + imageDigest
           + ".\nExpecting respectively: "
           + si.layoutVersion + "; " + si.namespaceID + "; " + si.cTime
-          + "; " + si.checkpointTime);
+          + "; " + si.checkpointTime + "; " + si.imageDigest);
     }
   }
 
@@ -87,7 +102,8 @@ public class CheckpointSignature extends StorageInfo
       (cTime < o.cTime) ? -1 : (cTime > o.cTime) ? 1 :
       (editsTime < o.editsTime) ? -1 : (editsTime > o.editsTime) ? 1 :
       (checkpointTime < o.checkpointTime) ? -1 : 
-                  (checkpointTime > o.checkpointTime) ? 1 : 0;
+                  (checkpointTime > o.checkpointTime) ? 1 :
+                    imageDigest.compareTo(o.imageDigest);
   }
 
   public boolean equals(Object o) {
@@ -99,7 +115,8 @@ public class CheckpointSignature extends StorageInfo
 
   public int hashCode() {
     return layoutVersion ^ namespaceID ^
-            (int)(cTime ^ editsTime ^ checkpointTime);
+            (int)(cTime ^ editsTime ^ checkpointTime) ^
+            imageDigest.hashCode();
   }
 
   /////////////////////////////////////////////////
@@ -109,11 +126,14 @@ public class CheckpointSignature extends StorageInfo
     super.write(out);
     out.writeLong(editsTime);
     out.writeLong(checkpointTime);
+    imageDigest.write(out);
   }
 
   public void readFields(DataInput in) throws IOException {
     super.readFields(in);
     editsTime = in.readLong();
     checkpointTime = in.readLong();
+    imageDigest = new MD5Hash();
+    imageDigest.readFields(in);
   }
 }
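
With the added imageDigest field, the serialized signature that Checkpointer and SecondaryNameNode pass in the putimage "token" URL parameter now carries six colon-separated values instead of five. An illustrative sketch of the resulting layout; every concrete value below is made up:

    public class CheckpointTokenSketch {
      public static void main(String[] args) {
        // Field order follows toString() above: layoutVersion, namespaceID, cTime,
        // editsTime, checkpointTime, imageDigest. All values here are placeholders.
        String token = "-26:1868733493:0:1298000000000:1298000001000:"
            + "d41d8cd98f00b204e9800998ecf8427e";
        String[] fields = token.split(":");
        System.out.println("fields = " + fields.length);   // 6
        System.out.println("imageDigest = " + fields[5]);  // the new MD5 field
      }
    }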

+ 2 - 1
src/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java

@@ -211,7 +211,8 @@ class Checkpointer extends Daemon {
     int httpPort = httpSocAddr.getPort();
     String fileid = "putimage=1&port=" + httpPort +
       "&machine=" + infoBindAddress +
-      "&token=" + sig.toString();
+      "&token=" + sig.toString() +
+      "&newChecksum=" + getFSImage().imageDigest.toString();
     LOG.info("Posted URL " + backupNode.nnHttpAddress + fileid);
     TransferFsImage.getFileClient(backupNode.nnHttpAddress, fileid, (File[])null);
   }

+ 63 - 10
src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java

@@ -33,6 +33,9 @@ import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
 import java.security.NoSuchAlgorithmException;
 import java.security.SecureRandom;
+import java.security.DigestInputStream;
+import java.security.DigestOutputStream;
+import java.security.MessageDigest;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -75,6 +78,7 @@ import org.apache.hadoop.hdfs.server.protocol.CheckpointCommand;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
+import org.apache.hadoop.io.MD5Hash;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.net.DNS;
@@ -95,6 +99,7 @@ public class FSImage extends Storage {
 
   private String blockpoolID = "";   // id of the block pool
   
+  static final String MESSAGE_DIGEST_PROPERTY = "imageMD5Digest";
   //
   // The filenames used for storing the images
   //
@@ -139,6 +144,8 @@ public class FSImage extends Storage {
   protected long checkpointTime = -1L;  // The age of the image
   protected FSEditLog editLog = null;
   private boolean isUpgradeFinalized = false;
+  protected MD5Hash imageDigest = null;
+  protected MD5Hash newImageDigest = null;
 
   /**
    * flag that controls if we try to restore failed storages
@@ -741,6 +748,20 @@ public class FSImage extends Storage {
     setDistributedUpgradeState(
         sDUS == null? false : Boolean.parseBoolean(sDUS),
         sDUV == null? getLayoutVersion() : Integer.parseInt(sDUV));
+    
+    String sMd5 = props.getProperty(MESSAGE_DIGEST_PROPERTY);
+    if (layoutVersion <= -26) {
+      if (sMd5 == null) {
+        throw new InconsistentFSStateException(sd.getRoot(),
+            "file " + STORAGE_FILE_VERSION + " does not have MD5 image digest.");
+      }
+      this.imageDigest = new MD5Hash(sMd5);
+    } else if (sMd5 != null) {
+      throw new InconsistentFSStateException(sd.getRoot(),
+          "file " + STORAGE_FILE_VERSION +
+          " has image MD5 digest when version is " + layoutVersion);
+    }
+
     this.checkpointTime = readCheckpointTime(sd);
   }
 
@@ -787,6 +808,12 @@ public class FSImage extends Storage {
       props.setProperty("distributedUpgradeState", Boolean.toString(uState));
       props.setProperty("distributedUpgradeVersion", Integer.toString(uVersion)); 
     }
+    if (imageDigest == null) {
+      imageDigest = MD5Hash.digest(
+          new FileInputStream(getImageFile(sd, NameNodeFile.IMAGE)));
+    }
+    props.setProperty(MESSAGE_DIGEST_PROPERTY, imageDigest.toString());
+
     writeCheckpointTime(sd);
   }
 
@@ -1108,7 +1135,10 @@ public class FSImage extends Storage {
     // Load in bits
     //
     boolean needToSave = true;
-    FileInputStream fin = new FileInputStream(curFile);
+    MessageDigest digester = MD5Hash.getDigester();
+    DigestInputStream fin = new DigestInputStream(
+         new FileInputStream(curFile), digester);
+
     DataInputStream in = new DataInputStream(fin);
     try {
       /*
@@ -1270,7 +1300,17 @@ public class FSImage extends Storage {
     } finally {
       in.close();
     }
-    
+
+    // verify checksum
+    MD5Hash readImageMd5 = new MD5Hash(digester.digest());
+    if (imageDigest == null) {
+      imageDigest = readImageMd5; // set this fsimage's checksum
+    } else if (!imageDigest.equals(readImageMd5)) {
+      throw new IOException("Image file " + curFile + 
+          "is corrupt with MD5 checksum of " + readImageMd5 +
+          " but expecting " + imageDigest);
+    }
+
     LOG.info("Image file of size " + curFile.length() + " loaded in " 
         + (now() - startTime)/1000 + " seconds.");
 
@@ -1351,7 +1391,10 @@ public class FSImage extends Storage {
     //
     // Write out data
     //
-    FileOutputStream fos = new FileOutputStream(newFile);
+    FileOutputStream fout = new FileOutputStream(newFile);
+    MessageDigest digester = MD5Hash.getDigester();
+    DigestOutputStream fos = new DigestOutputStream(fout, digester);
+
     DataOutputStream out = new DataOutputStream(fos);
     try {
       out.writeInt(FSConstants.LAYOUT_VERSION);
@@ -1383,15 +1426,21 @@ public class FSImage extends Storage {
       strbuf = null;
 
       out.flush();
-      fos.getChannel().force(true);
+      fout.getChannel().force(true);
     } finally {
       out.close();
     }
 
+    // set md5 of the saved image
+    setImageDigest( new MD5Hash(digester.digest()));
+
     LOG.info("Image file of size " + newFile.length() + " saved in " 
         + (now() - startTime)/1000 + " seconds.");
   }
 
+  public void setImageDigest(MD5Hash digest) {
+    this.imageDigest = digest;
+  }
   /**
    * Save the contents of the FS image and create empty edits.
    * 
@@ -1828,11 +1877,14 @@ public class FSImage extends Storage {
    * Moves fsimage.ckpt to fsImage and edits.new to edits
    * Reopens the new edits file.
    */
-  void rollFSImage() throws IOException {
+  void rollFSImage(CheckpointSignature sig, 
+      boolean renewCheckpointTime) throws IOException {
+    sig.validateStorageInfo(this);
     rollFSImage(true);
   }
 
-  void rollFSImage(boolean renewCheckpointTime) throws IOException {
+  private void rollFSImage(boolean renewCheckpointTime)
+  throws IOException {
     if (ckptState != CheckpointStates.UPLOAD_DONE
       && !(ckptState == CheckpointStates.ROLLED_EDITS
       && getNumStorageDirs(NameNodeDirType.IMAGE) == 0)) {
@@ -1856,7 +1908,7 @@ public class FSImage extends Storage {
     // Renames new image
     //
     renameCheckpoint();
-    resetVersion(renewCheckpointTime);
+    resetVersion(renewCheckpointTime, newImageDigest);
   }
 
   /**
@@ -1890,10 +1942,11 @@ public class FSImage extends Storage {
   /**
    * Updates version and fstime files in all directories (fsimage and edits).
    */
-  void resetVersion(boolean renewCheckpointTime) throws IOException {
+  void resetVersion(boolean renewCheckpointTime, MD5Hash newImageDigest) throws IOException {
     this.layoutVersion = FSConstants.LAYOUT_VERSION;
     if(renewCheckpointTime)
       this.checkpointTime = now();
+    this.imageDigest = newImageDigest;
     
     ArrayList<StorageDirectory> al = null;
     for (Iterator<StorageDirectory> it = 
@@ -2030,7 +2083,7 @@ public class FSImage extends Storage {
    * @param remoteNNRole
    * @throws IOException
    */
-  void endCheckpoint(CheckpointSignature sig, 
+  void endCheckpoint(CheckpointSignature sig,
                      NamenodeRole remoteNNRole) throws IOException {
     sig.validateStorageInfo(this);
     // Renew checkpoint time for the active if the other is a checkpoint-node.
@@ -2039,7 +2092,7 @@ public class FSImage extends Storage {
     // The backup-node always has up-to-date image and will have the same
     // checkpoint time as the active node.
     boolean renewCheckpointTime = remoteNNRole.equals(NamenodeRole.CHECKPOINT);
-    rollFSImage(renewCheckpointTime);
+    rollFSImage(sig, renewCheckpointTime);
   }
 
   CheckpointStates getCheckpointState() {
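
The pattern behind these FSImage changes is to compute the MD5 as a side effect of the normal image read or write by wrapping the stream in a DigestInputStream or DigestOutputStream. A self-contained sketch of the read-side half, assuming only an image path on the command line; the buffer size is arbitrary:

    import java.io.DataInputStream;
    import java.io.FileInputStream;
    import java.security.DigestInputStream;
    import java.security.MessageDigest;
    import org.apache.hadoop.io.MD5Hash;

    public class ImageMd5Sketch {
      public static void main(String[] args) throws Exception {
        MessageDigest digester = MD5Hash.getDigester();
        DataInputStream in = new DataInputStream(
            new DigestInputStream(new FileInputStream(args[0]), digester));
        try {
          byte[] buf = new byte[8192];
          // Every byte pulled through the DigestInputStream also updates the digest,
          // so no second pass over the file is needed.
          while (in.read(buf) != -1) { }
        } finally {
          in.close();
        }
        MD5Hash computed = new MD5Hash(digester.digest());
        // loadFSImage() above compares this value against the digest recorded in the
        // VERSION file and throws an "is corrupt" IOException on a mismatch.
        System.out.println("image MD5: " + computed);
      }
    }

The write side mirrors this with a DigestOutputStream in saveFSImage() and records the result via setImageDigest().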

+ 8 - 2
src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -4241,7 +4241,13 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
     }
   }
 
-  void rollFSImage() throws IOException {
+  /**
+   * Moves fsimage.ckpt to fsImage and edits.new to edits
+   * Reopens the new edits file.
+   *
+   * @param sig the signature of this checkpoint (old image)
+   */
+  void rollFSImage(CheckpointSignature sig) throws IOException {
     writeLock();
     try {
     if (isInSafeMode()) {
@@ -4249,7 +4255,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
                                   safeMode);
     }
     LOG.info("Roll FSImage from " + Server.getRemoteAddress());
-    getFSImage().rollFSImage();
+    getFSImage().rollFSImage(sig, true);
     } finally {
       writeUnlock();
     }

+ 22 - 2
src/java/org/apache/hadoop/hdfs/server/namenode/GetImageServlet.java

@@ -34,6 +34,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.server.common.JspHelper;
+import org.apache.hadoop.hdfs.util.DataTransferThrottler;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.StringUtils;
 
@@ -77,16 +78,19 @@ public class GetImageServlet extends HttpServlet {
                 String.valueOf(nnImage.getFsImageName().length()));
             // send fsImage
             TransferFsImage.getFileServer(response.getOutputStream(),
-                nnImage.getFsImageName()); 
+                nnImage.getFsImageName(),
+                getThrottler(conf)); 
           } else if (ff.getEdit()) {
             response.setHeader(TransferFsImage.CONTENT_LENGTH,
                 String.valueOf(nnImage.getFsEditName().length()));
             // send edits
             TransferFsImage.getFileServer(response.getOutputStream(),
-                nnImage.getFsEditName());
+                nnImage.getFsEditName(),
+                getThrottler(conf));
           } else if (ff.putImage()) {
             // issue a HTTP get request to download the new fsimage 
             nnImage.validateCheckpointUpload(ff.getToken());
+            nnImage.newImageDigest = ff.getNewChecksum();
             reloginIfNecessary().doAs(new PrivilegedExceptionAction<Void>() {
                 @Override
                 public Void run() throws Exception {
@@ -122,6 +126,22 @@ public class GetImageServlet extends HttpServlet {
     }
   }
   
+  /**
+   * Construct a throttler from conf
+   * @param conf configuration
+   * @return a data transfer throttler
+   */
+  private final DataTransferThrottler getThrottler(Configuration conf) {
+    long transferBandwidth = 
+      conf.getLong(DFSConfigKeys.DFS_IMAGE_TRANSFER_RATE_KEY,
+                   DFSConfigKeys.DFS_IMAGE_TRANSFER_RATE_DEFAULT);
+    DataTransferThrottler throttler = null;
+    if (transferBandwidth > 0) {
+      throttler = new DataTransferThrottler(transferBandwidth);
+    }
+    return throttler;
+  }
+  
   @SuppressWarnings("deprecation")
   protected boolean isValidRequestor(String remoteUser, Configuration conf)
       throws IOException {

+ 3 - 3
src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java

@@ -1111,9 +1111,9 @@ public class NameNode implements NamenodeProtocols, FSConstants {
   /**
    * Roll the image 
    */
-  @Deprecated
-  public void rollFsImage() throws IOException {
-    namesystem.rollFSImage();
+  @Deprecated @Override
+  public void rollFsImage(CheckpointSignature sig) throws IOException {
+    namesystem.rollFSImage(sig);
   }
     
   public void finalizeUpgrade() throws IOException {

+ 3 - 2
src/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java

@@ -370,7 +370,8 @@ public class SecondaryNameNode implements Runnable {
   private void putFSImage(CheckpointSignature sig) throws IOException {
     String fileid = "putimage=1&port=" + imagePort +
       "&machine=" + infoBindAddress + 
-      "&token=" + sig.toString();
+      "&token=" + sig.toString() +
+      "&newChecksum=" + checkpointImage.imageDigest;
     LOG.info("Posted URL " + fsName + fileid);
     TransferFsImage.getFileClient(fsName, fileid, (File[])null);
   }
@@ -433,7 +434,7 @@ public class SecondaryNameNode implements Runnable {
                             "after uploading new image to NameNode");
     }
 
-    namenode.rollFsImage();
+    namenode.rollFsImage(sig);
     checkpointImage.endCheckpoint();
 
     LOG.warn("Checkpoint done. New Image Size: " 

+ 18 - 1
src/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java

@@ -27,7 +27,9 @@ import javax.servlet.http.HttpServletRequest;
 
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.hdfs.util.DataTransferThrottler;
 import org.apache.hadoop.hdfs.DFSUtil.ErrorSimulator;
+import org.apache.hadoop.io.MD5Hash;
 import org.apache.hadoop.security.UserGroupInformation;
 
 
@@ -44,6 +46,7 @@ class TransferFsImage implements FSConstants {
   private int remoteport;
   private String machineName;
   private CheckpointSignature token;
+  private MD5Hash newChecksum = null;
   
   /**
    * File downloader.
@@ -76,6 +79,8 @@ class TransferFsImage implements FSConstants {
         machineName = pmap.get("machine")[0];
       } else if (key.equals("token")) { 
         token = new CheckpointSignature(pmap.get("token")[0]);
+      } else if (key.equals("newChecksum")) {
+        newChecksum = new MD5Hash(pmap.get("newChecksum")[0]);
       }
     }
 
@@ -101,6 +106,14 @@ class TransferFsImage implements FSConstants {
     return token;
   }
 
+  /**
+   * Get the MD5 digest of the new image
+   * @return the MD5 digest of the new image
+   */
+  MD5Hash getNewChecksum() {
+    return newChecksum;
+  }
+  
   String getInfoServer() throws IOException{
     if (machineName == null || remoteport == 0) {
       throw new IOException ("MachineName and port undefined");
@@ -112,7 +125,8 @@ class TransferFsImage implements FSConstants {
    * A server-side method to respond to a getfile http request
    * Copies the contents of the local file into the output stream.
    */
-  static void getFileServer(OutputStream outstream, File localfile) 
+  static void getFileServer(OutputStream outstream, File localfile,
+      DataTransferThrottler throttler) 
     throws IOException {
     byte buf[] = new byte[BUFFER_SIZE];
     FileInputStream infile = null;
@@ -141,6 +155,9 @@ class TransferFsImage implements FSConstants {
           break;
         }
         outstream.write(buf, 0, num);
+        if (throttler != null) {
+          throttler.throttle(num);
+        }
       }
     } finally {
       if (infile != null) {

+ 7 - 4
src/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocol.java

@@ -42,10 +42,10 @@ public interface NamenodeProtocol extends VersionedProtocol {
    * (Only the latest change is reflected.
    * The log of historical changes can be retrieved from the svn).
    * 
-   * 4: new method added: getAccessKeys()
-   *      
+   * 5: Added one parameter to rollFSImage() and
+   *    changed the definition of CheckpointSignature
    */
-  public static final long versionID = 4L;
+  public static final long versionID = 5L;
 
   // Error codes passed by errorReport().
   final static int NOTIFY = 0;
@@ -108,12 +108,15 @@ public interface NamenodeProtocol extends VersionedProtocol {
    * Rolls the fsImage log. It removes the old fsImage, copies the
    * new image to fsImage, removes the old edits and renames edits.new 
    * to edits. The call fails if any of the four files are missing.
+   * 
+   * @param sig the signature of this checkpoint (old fsimage)
    * @throws IOException
    * @deprecated 
    *    See {@link org.apache.hadoop.hdfs.server.namenode.SecondaryNameNode}
    */
   @Deprecated
-  public void rollFsImage() throws IOException;
+  public void rollFsImage(CheckpointSignature sig)
+  throws IOException;
 
   /**
    * Request name-node version and storage information.

+ 8 - 8
src/java/org/apache/hadoop/hdfs/server/datanode/BlockTransferThrottler.java → src/java/org/apache/hadoop/hdfs/util/DataTransferThrottler.java

@@ -15,15 +15,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hdfs.server.datanode;
+package org.apache.hadoop.hdfs.util;
 
 /** 
- * a class to throttle the block transfers.
+ * a class to throttle the data transfers.
  * This class is thread safe. It can be shared by multiple threads.
  * The parameter bandwidthPerSec specifies the total bandwidth shared by
  * threads.
  */
-class BlockTransferThrottler {
+public class DataTransferThrottler {
   private long period;          // period over which bw is imposed
   private long periodExtension; // Max period over which bw accumulates.
   private long bytesPerPeriod; // total number of bytes can be sent in each period
@@ -34,7 +34,7 @@ class BlockTransferThrottler {
   /** Constructor 
    * @param bandwidthPerSec bandwidth allowed in bytes per second. 
    */
-  BlockTransferThrottler(long bandwidthPerSec) {
+  public DataTransferThrottler(long bandwidthPerSec) {
     this(500, bandwidthPerSec);  // by default throttling period is 500ms 
   }
 
@@ -44,7 +44,7 @@ class BlockTransferThrottler {
    *        period.
    * @param bandwidthPerSec bandwidth allowed in bytes per second. 
    */
-  BlockTransferThrottler(long period, long bandwidthPerSec) {
+  public DataTransferThrottler(long period, long bandwidthPerSec) {
     this.curPeriodStart = System.currentTimeMillis();
     this.period = period;
     this.curReserve = this.bytesPerPeriod = bandwidthPerSec*period/1000;
@@ -54,7 +54,7 @@ class BlockTransferThrottler {
   /**
    * @return current throttle bandwidth in bytes per second.
    */
-  synchronized long getBandwidth() {
+  public synchronized long getBandwidth() {
     return bytesPerPeriod*1000/period;
   }
   
@@ -64,7 +64,7 @@ class BlockTransferThrottler {
    * 
    * @param bytesPerSecond 
    */
-  synchronized void setBandwidth(long bytesPerSecond) {
+  public synchronized void setBandwidth(long bytesPerSecond) {
     if ( bytesPerSecond <= 0 ) {
       throw new IllegalArgumentException("" + bytesPerSecond);
     }
@@ -78,7 +78,7 @@ class BlockTransferThrottler {
    * @param numOfBytes
    *     number of bytes sent/received since last time throttle was called
    */
-  synchronized void throttle(long numOfBytes) {
+  public synchronized void throttle(long numOfBytes) {
     if ( numOfBytes <= 0 ) {
       return;
     }
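
Now that the class is public and lives in hdfs.util, any byte-copy loop can reuse it; the sketch below mirrors the getFileServer() loop shown earlier. The helper name and buffer size are illustrative, and a bandwidth of 0 is treated as "no throttling", matching GetImageServlet.getThrottler() above:

    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;
    import org.apache.hadoop.hdfs.util.DataTransferThrottler;

    public class ThrottledCopySketch {
      static void copy(InputStream in, OutputStream out, long bytesPerSec)
          throws IOException {
        // A bandwidth of 0 (the dfs.image.transfer.bandwidthPerSec default) means
        // no throttler at all.
        DataTransferThrottler throttler =
            bytesPerSec > 0 ? new DataTransferThrottler(bytesPerSec) : null;
        byte[] buf = new byte[4096];
        int num;
        while ((num = in.read(buf)) > 0) {
          out.write(buf, 0, num);
          if (throttler != null) {
            throttler.throttle(num);  // blocks once the per-period byte budget is spent
          }
        }
      }
    }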

+ 2 - 1
src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java

@@ -48,6 +48,7 @@ import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants;
 import org.apache.hadoop.hdfs.server.common.Util;
+import org.apache.hadoop.hdfs.util.DataTransferThrottler;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.net.NetUtils;
 /**
@@ -65,7 +66,7 @@ public class TestBlockReplacement extends TestCase {
     final long TOTAL_BYTES =6*bandwidthPerSec; 
     long bytesToSend = TOTAL_BYTES; 
     long start = Util.now();
-    BlockTransferThrottler throttler = new BlockTransferThrottler(bandwidthPerSec);
+    DataTransferThrottler throttler = new DataTransferThrottler(bandwidthPerSec);
     long totalBytes = 0L;
     long bytesSent = 1024*512L; // 0.5MB
     throttler.throttle(bytesSent);

+ 10 - 2
src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestSaveNamespace.java

@@ -95,13 +95,16 @@ public class TestSaveNamespace {
     // Replace the FSImage with a spy
     FSImage originalImage = fsn.dir.fsImage;
     FSImage spyImage = spy(originalImage);
+    spyImage.setStorageDirectories(
+        FSNamesystem.getNamespaceDirs(conf), 
+        FSNamesystem.getNamespaceEditsDirs(conf));
     fsn.dir.fsImage = spyImage;
 
     // inject fault
     switch(fault) {
     case SAVE_FSIMAGE:
       // The spy throws a RuntimeException when writing to the second directory
-      doAnswer(new FaultySaveImage(originalImage)).
+      doAnswer(new FaultySaveImage(spyImage)).
         when(spyImage).saveFSImage((File)anyObject());
       break;
     case MOVE_CURRENT:
@@ -128,7 +131,8 @@ public class TestSaveNamespace {
       }
 
       // Now shut down and restart the namesystem
-      fsn.close();
+      originalImage.close();
+      fsn.close();      
       fsn = null;
 
       // Start a new namesystem, which should be able to recover
@@ -169,6 +173,9 @@ public class TestSaveNamespace {
     // Replace the FSImage with a spy
     final FSImage originalImage = fsn.dir.fsImage;
     FSImage spyImage = spy(originalImage);
+    spyImage.setStorageDirectories(
+        FSNamesystem.getNamespaceDirs(conf), 
+        FSNamesystem.getNamespaceEditsDirs(conf));
     fsn.dir.fsImage = spyImage;
 
     try {
@@ -183,6 +190,7 @@ public class TestSaveNamespace {
       fsn.saveNamespace();
 
       // Now shut down and restart the NN
+      originalImage.close();
       fsn.close();
       fsn = null;
 

+ 80 - 0
src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestStartup.java

@@ -21,10 +21,14 @@ import static org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption.I
 import static org.apache.hadoop.hdfs.server.common.Util.fileAsURI;
 
 import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.RandomAccessFile;
 import java.net.URI;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Properties;
 import java.util.Random;
 
 import junit.framework.TestCase;
@@ -46,6 +50,8 @@ import org.apache.hadoop.hdfs.protocol.FSConstants.SafeModeAction;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
 import org.apache.hadoop.hdfs.server.namenode.FSImage.NameNodeDirType;
 import org.apache.hadoop.hdfs.server.namenode.FSImage.NameNodeFile;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.MD5Hash;
 import org.apache.hadoop.util.StringUtils;
 
 /**
@@ -405,4 +411,78 @@ public class TestStartup extends TestCase {
     namenode.stop();
     namenode.join();
   }
+  
+  public void testImageChecksum() throws Exception {
+    LOG.info("Test uncompressed image checksum");
+    testImageChecksum(false);
+    LOG.info("Test compressed image checksum");
+    testImageChecksum(true);
+  }
+
+  private void testImageChecksum(boolean compress) throws Exception {
+    Configuration conf = new Configuration();
+    FileSystem.setDefaultUri(conf, "hdfs://localhost:0");
+    conf.set("dfs.http.address", "127.0.0.1:0");
+    File base_dir = new File(
+        System.getProperty("test.build.data", "build/test/data"), "dfs/");
+    conf.set("dfs.name.dir", new File(base_dir, "name").getPath());
+    conf.setBoolean("dfs.permissions", false);
+    if (compress) {
+      conf.setBoolean(DFSConfigKeys.DFS_IMAGE_COMPRESSION_CODEC_KEY, true);
+    }
+
+    NameNode.format(conf);
+
+    // create an image
+    LOG.info("Create an fsimage");
+    NameNode namenode = new NameNode(conf);
+    namenode.getNamesystem().mkdirs("/test",
+        new PermissionStatus("hairong", null, FsPermission.getDefault()), true);
+    assertTrue(namenode.getFileInfo("/test").isDir());
+    namenode.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
+    namenode.saveNamespace();
+
+    FSImage image = namenode.getFSImage();
+    image.loadFSImage();
+
+    File versionFile = image.getStorageDir(0).getVersionFile();
+
+    RandomAccessFile file = new RandomAccessFile(versionFile, "rws");
+    FileInputStream in = null;
+    FileOutputStream out = null;
+    try {
+      // read the property from version file
+      in = new FileInputStream(file.getFD());
+      file.seek(0);
+      Properties props = new Properties();
+      props.load(in);
+
+      // get the MD5 property and change it
+      String sMd5 = props.getProperty(FSImage.MESSAGE_DIGEST_PROPERTY);
+      MD5Hash md5 = new MD5Hash(sMd5);
+      byte[] bytes = md5.getDigest();
+      bytes[0] += 1;
+      md5 = new MD5Hash(bytes);
+      props.setProperty(FSImage.MESSAGE_DIGEST_PROPERTY, md5.toString());
+
+      // write the properties back to version file
+      file.seek(0);
+      out = new FileOutputStream(file.getFD());
+      props.store(out, null);
+      out.flush();
+      file.setLength(out.getChannel().position());
+
+      // now load the image again
+      image.loadFSImage();
+
+      fail("Expect to get a checksumerror");
+    } catch(IOException e) {
+        assertTrue(e.getMessage().contains("is corrupt"));
+    } finally {
+      IOUtils.closeStream(in);
+      IOUtils.closeStream(out);
+      namenode.stop();
+      namenode.join();
+    }
+  }
 }