
Merge all changes from trunk to branch HDFS-2832

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-2832@1517030 13f79535-47bb-0310-9956-ffa450edef68
Arpit Agarwal 12 years ago
Parent
Current commit
c41df254fe
43 changed files with 1,572 additions and 179 deletions
  1. + 48 - 0  hadoop-assemblies/src/main/resources/assemblies/hadoop-hdfs-nfs-dist.xml
  2. + 48 - 0  hadoop-assemblies/src/main/resources/assemblies/hadoop-nfs-dist.xml
  3. + 3 - 0  hadoop-common-project/hadoop-common/CHANGES.txt
  4. + 16 - 6  hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Globber.java
  5. + 15 - 11  hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/PathData.java
  6. + 26 - 15  hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/cli/util/CommandExecutor.java
  7. + 46 - 0  hadoop-common-project/hadoop-nfs/pom.xml
  8. + 2 - 0  hadoop-dist/pom.xml
  9. + 44 - 0  hadoop-hdfs-project/hadoop-hdfs-nfs/pom.xml
  10. + 14 - 0  hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
  11. + 2 - 1  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
  12. + 2 - 13  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/delegation/DelegationTokenSecretManager.java
  13. + 21 - 2  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
  14. + 651 - 39  hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java
  15. + 167 - 20  hadoop-hdfs-project/hadoop-hdfs/src/test/resources/testHDFSConf.xml
  16. + 14 - 0  hadoop-mapreduce-project/CHANGES.txt
  17. + 29 - 3  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
  18. + 2 - 0  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/AppContext.java
  19. + 29 - 16  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
  20. + 13 - 0  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java
  21. + 114 - 3  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
  22. + 5 - 0  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockAppContext.java
  23. + 110 - 1  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
  24. + 5 - 0  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java
  25. + 42 - 10  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java
  26. + 10 - 7  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java
  27. + 1 - 0  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/avro/Events.avpr
  28. + 2 - 0  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/EventReader.java
  29. + 1 - 0  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java
  30. + 2 - 0  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobUnsuccessfulCompletionEvent.java
  31. + 6 - 0  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java
  32. + 5 - 0  hadoop-yarn-project/CHANGES.txt
  33. + 1 - 1  hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Priority.java
  34. + 1 - 1  hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
  35. + 11 - 6  hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
  36. + 9 - 3  hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
  37. + 5 - 1  hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java
  38. + 7 - 2  hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java
  39. + 23 - 9  hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
  40. + 2 - 2  hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
  41. + 5 - 2  hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestRMStateStore.java
  42. + 7 - 0  hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java
  43. + 6 - 5  hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java

+ 48 - 0
hadoop-assemblies/src/main/resources/assemblies/hadoop-hdfs-nfs-dist.xml

@@ -0,0 +1,48 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<assembly>
+  <id>hadoop-hdfs-nfs-dist</id>
+  <formats>
+    <format>dir</format>
+  </formats>
+  <includeBaseDirectory>false</includeBaseDirectory>
+  <fileSets>
+    <fileSet>
+      <directory>target</directory>
+      <outputDirectory>/share/hadoop/hdfs</outputDirectory>
+      <includes>
+        <include>${project.artifactId}-${project.version}.jar</include>
+      </includes>
+    </fileSet>
+  </fileSets>
+
+  <dependencySets>
+    <dependencySet>
+      <useProjectArtifact>false</useProjectArtifact>
+      <outputDirectory>/share/hadoop/hdfs/lib</outputDirectory>
+      <!-- Exclude hadoop artifacts. They will be found via HADOOP* env -->
+      <excludes>
+        <exclude>org.apache.hadoop:hadoop-common</exclude>
+        <exclude>org.apache.hadoop:hadoop-hdfs</exclude>
+        <!-- use slf4j from common to avoid multiple binding warnings -->
+        <exclude>org.slf4j:slf4j-api</exclude>
+        <exclude>org.slf4j:slf4j-log4j12</exclude>
+        <exclude>org.hsqldb:hsqldb</exclude>
+      </excludes>
+    </dependencySet>
+  </dependencySets>
+
+</assembly>
+

+ 48 - 0
hadoop-assemblies/src/main/resources/assemblies/hadoop-nfs-dist.xml

@@ -0,0 +1,48 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<assembly>
+  <id>hadoop-nfs-dist</id>
+  <formats>
+    <format>dir</format>
+  </formats>
+  <includeBaseDirectory>false</includeBaseDirectory>
+  <fileSets>
+    <fileSet>
+      <directory>target</directory>
+      <outputDirectory>/share/hadoop/common</outputDirectory>
+      <includes>
+        <include>${project.artifactId}-${project.version}.jar</include>
+      </includes>
+    </fileSet>
+  </fileSets>
+
+  <dependencySets>
+    <dependencySet>
+      <useProjectArtifact>false</useProjectArtifact>
+      <outputDirectory>/share/hadoop/common/lib</outputDirectory>
+      <!-- Exclude hadoop artifacts. They will be found via HADOOP* env -->
+      <excludes>
+        <exclude>org.apache.hadoop:hadoop-common</exclude>
+        <exclude>org.apache.hadoop:hadoop-hdfs</exclude>
+        <!-- use slf4j from common to avoid multiple binding warnings -->
+        <exclude>org.slf4j:slf4j-api</exclude>
+        <exclude>org.slf4j:slf4j-log4j12</exclude>
+        <exclude>org.hsqldb:hsqldb</exclude>
+      </excludes>
+    </dependencySet>
+  </dependencySets>
+
+</assembly>
+

+ 3 - 0
hadoop-common-project/hadoop-common/CHANGES.txt

@@ -420,6 +420,9 @@ Release 2.1.1-beta - UNRELEASED
     HADOOP-9880. SASL changes from HADOOP-9421 breaks Secure HA NN. (daryn via
     jing9)
 
+    HADOOP-9887. globStatus does not correctly handle paths starting with a drive
+    spec on Windows. (Chuan Liu via cnauroth)
+
 Release 2.1.0-beta - 2013-08-22
 
   INCOMPATIBLE CHANGES

+ 16 - 6
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Globber.java

@@ -97,7 +97,7 @@ class Globber {
   /**
    * Translate an absolute path into a list of path components.
    * We merge double slashes into a single slash here.
-   * The first path component (i.e. root) does not get an entry in the list.
+   * POSIX root path, i.e. '/', does not get an entry in the list.
    */
   private static List<String> getPathComponents(String path)
       throws IOException {
@@ -167,8 +167,8 @@ class Globber {
       // Get the absolute path for this flattened pattern.  We couldn't do 
       // this prior to flattening because of patterns like {/,a}, where which
       // path you go down influences how the path must be made absolute.
-      Path absPattern =
-          fixRelativePart(new Path(flatPattern .isEmpty() ? "." : flatPattern ));
+      Path absPattern = fixRelativePart(new Path(
+          flatPattern.isEmpty() ? Path.CUR_DIR : flatPattern));
       // Now we break the flattened, absolute pattern into path components.
       // For example, /a/*/c would be broken into the list [a, *, c]
       List<String> components =
@@ -176,9 +176,19 @@ class Globber {
       // Starting out at the root of the filesystem, we try to match
       // filesystem entries against pattern components.
       ArrayList<FileStatus> candidates = new ArrayList<FileStatus>(1);
-      candidates.add(new FileStatus(0, true, 0, 0, 0,
-          new Path(scheme, authority, "/")));
-
+      if (Path.WINDOWS && !components.isEmpty()
+          && Path.isWindowsAbsolutePath(absPattern.toUri().getPath(), true)) {
+        // On Windows the path could begin with a drive letter, e.g. /E:/foo.
+        // We will skip matching the drive letter and start from listing the
+        // root of the filesystem on that drive.
+        String driveLetter = components.remove(0);
+        candidates.add(new FileStatus(0, true, 0, 0, 0, new Path(scheme,
+            authority, Path.SEPARATOR + driveLetter + Path.SEPARATOR)));
+      } else {
+        candidates.add(new FileStatus(0, true, 0, 0, 0,
+            new Path(scheme, authority, Path.SEPARATOR)));
+      }
+      
       for (String component : components) {
         ArrayList<FileStatus> newCandidates =
             new ArrayList<FileStatus>(candidates.size());
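
The Globber hunk above skips a leading drive spec on Windows so that matching starts from the root of that drive rather than treating the drive letter as an ordinary component. A minimal, self-contained sketch of the same idea (hypothetical helper names; the real code uses Path.WINDOWS and Path.isWindowsAbsolutePath from hadoop-common):

import java.util.ArrayList;
import java.util.List;

// Illustration only: split an absolute path into components and, on Windows,
// peel off a leading drive spec such as "E:" to use as the matching root.
public class DriveAwareSplit {
  static List<String> getPathComponents(String path) {
    List<String> components = new ArrayList<String>();
    for (String part : path.split("/")) {
      if (!part.isEmpty()) {       // merges duplicate slashes, drops the root
        components.add(part);
      }
    }
    return components;
  }

  public static void main(String[] args) {
    List<String> components = getPathComponents("/E:/foo/*.txt");
    String root = "/";
    if (!components.isEmpty() && components.get(0).matches("[a-zA-Z]:")) {
      // Skip matching the drive letter; start listing from that drive's root.
      root = "/" + components.remove(0) + "/";
    }
    System.out.println("root = " + root);              // root = /E:/
    System.out.println("components = " + components);  // [foo, *.txt]
  }
}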

+ 15 - 11
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/PathData.java

@@ -106,10 +106,12 @@ public class PathData implements Comparable<PathData> {
 
   /**
    * Validates the given Windows path.
-   * Throws IOException on failure.
    * @param pathString a String of the path suppliued by the user.
+   * @return true if the URI scheme was not present in the pathString but
+   * inferred; false, otherwise.
+   * @throws IOException if anything goes wrong
    */
-  private void ValidateWindowsPath(String pathString)
+  private static boolean checkIfSchemeInferredFromPath(String pathString)
   throws IOException
   {
     if (windowsNonUriAbsolutePath1.matcher(pathString).find()) {
@@ -118,23 +120,21 @@ public class PathData implements Comparable<PathData> {
         throw new IOException("Invalid path string " + pathString);
       }
 
-      inferredSchemeFromPath = true;
-      return;
+      return true;
     }
 
     // Is it a forward slash-separated absolute path?
     if (windowsNonUriAbsolutePath2.matcher(pathString).find()) {
-      inferredSchemeFromPath = true;
-      return;
+      return true;
     }
 
     // Does it look like a URI? If so then just leave it alone.
     if (potentialUri.matcher(pathString).find()) {
-      return;
+      return false;
     }
 
     // Looks like a relative path on Windows.
-    return;
+    return false;
   }
 
   /**
@@ -153,7 +153,7 @@ public class PathData implements Comparable<PathData> {
     setStat(stat);
 
     if (Path.WINDOWS) {
-      ValidateWindowsPath(pathString);
+      inferredSchemeFromPath = checkIfSchemeInferredFromPath(pathString);
     }
   }
 
@@ -302,7 +302,7 @@ public class PathData implements Comparable<PathData> {
     // check getPath() so scheme slashes aren't considered part of the path
     String separator = uri.getPath().endsWith(Path.SEPARATOR)
         ? "" : Path.SEPARATOR;
-    return uri + separator + basename;
+    return uriToString(uri, inferredSchemeFromPath) + separator + basename;
   }
   
   protected enum PathType { HAS_SCHEME, SCHEMELESS_ABSOLUTE, RELATIVE };
@@ -356,7 +356,7 @@ public class PathData implements Comparable<PathData> {
             if (globUri.getAuthority() == null) {
               matchUri = removeAuthority(matchUri);
             }
-            globMatch = matchUri.toString();
+            globMatch = uriToString(matchUri, false);
             break;
           case SCHEMELESS_ABSOLUTE: // take just the uri's path
             globMatch = matchUri.getPath();
@@ -438,6 +438,10 @@ public class PathData implements Comparable<PathData> {
    */
   @Override
   public String toString() {
+    return uriToString(uri, inferredSchemeFromPath);
+  }
+ 
+  private static String uriToString(URI uri, boolean inferredSchemeFromPath) {
     String scheme = uri.getScheme();
     // No interpretation of symbols. Just decode % escaped chars.
     String decodedRemainder = uri.getSchemeSpecificPart();
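
The PathData hunks route toString() and glob results through a static uriToString(URI, boolean) helper so that a scheme inferred from a Windows-style path is not echoed back to the user. The helper's body is truncated above; the sketch below shows one plausible behavior under that assumption (the branch logic is illustrative, not the committed code):

import java.net.URI;

// Hedged illustration: when the "file" scheme was only inferred from a path
// the user typed (e.g. a bare Windows path), print the path without the
// scheme so shell output matches the input form. Assumed logic, for example
// purposes only.
public class SchemeAwareToString {
  static String uriToString(URI uri, boolean inferredSchemeFromPath) {
    String scheme = uri.getScheme();
    String decodedRemainder = uri.getSchemeSpecificPart();
    if (scheme == null || inferredSchemeFromPath) {
      return decodedRemainder;                 // print the bare path
    }
    return scheme + ":" + decodedRemainder;    // keep explicit schemes
  }

  public static void main(String[] args) throws Exception {
    URI inferred = new URI("file", null, "/C:/data", null, null);
    System.out.println(uriToString(inferred, true));   // /C:/data
    System.out.println(uriToString(inferred, false));  // file:/C:/data
  }
}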

+ 26 - 15
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/cli/util/CommandExecutor.java

@@ -24,6 +24,9 @@ import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.PrintStream;
 import java.util.StringTokenizer;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.ArrayList;
 
 /**
  *
@@ -32,23 +35,31 @@ import java.util.StringTokenizer;
 public abstract class CommandExecutor {  
   protected String[] getCommandAsArgs(final String cmd, final String masterKey,
 		                                       final String master) {
-    StringTokenizer tokenizer = new StringTokenizer(cmd, " ");
-    String[] args = new String[tokenizer.countTokens()];
-    
-    int i = 0;
-    while (tokenizer.hasMoreTokens()) {
-      args[i] = tokenizer.nextToken();
+    String regex = "\'([^\']*)\'|\"([^\"]*)\"|(\\S+)";
+    Matcher matcher = Pattern.compile(regex).matcher(cmd);
 
-      args[i] = args[i].replaceAll(masterKey, master);
-      args[i] = args[i].replaceAll("CLITEST_DATA", 
-        new File(CLITestHelper.TEST_CACHE_DATA_DIR).
-        toURI().toString().replace(' ', '+'));
-      args[i] = args[i].replaceAll("USERNAME", System.getProperty("user.name"));
+    ArrayList<String> args = new ArrayList<String>();
+    String arg = null;
 
-      i++;
-    }
-    
-    return args;
+    while (matcher.find()) {
+      if (matcher.group(1) != null) {
+        arg = matcher.group(1);
+      } else if (matcher.group(2) != null) {
+        arg = matcher.group(2);
+      } else {
+        arg = matcher.group(3);
+      }
+
+      arg = arg.replaceAll(masterKey, master);
+      arg = arg.replaceAll("CLITEST_DATA",
+         new File(CLITestHelper.TEST_CACHE_DATA_DIR).
+         toURI().toString().replace(' ', '+'));
+      arg = arg.replaceAll("USERNAME", System.getProperty("user.name"));
+
+      args.add(arg);
+     }
+
+    return args.toArray(new String[0]);
   }
   
   public Result executeCommand(final String cmd) throws Exception {
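
The new getCommandAsArgs tokenizes the test command with a regex so that single- or double-quoted arguments (as used by the whitespace-path tests added to testHDFSConf.xml below) survive as single tokens instead of being split on spaces. A standalone sketch of just the splitting step, with the masterKey/CLITEST_DATA/USERNAME substitutions from the diff omitted:

import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Quoted strings become one argument; everything else splits on whitespace.
public class QuoteAwareTokenizer {
  static String[] tokenize(String cmd) {
    String regex = "\'([^\']*)\'|\"([^\"]*)\"|(\\S+)";
    Matcher matcher = Pattern.compile(regex).matcher(cmd);
    List<String> args = new ArrayList<String>();
    while (matcher.find()) {
      if (matcher.group(1) != null) {
        args.add(matcher.group(1));        // single-quoted token
      } else if (matcher.group(2) != null) {
        args.add(matcher.group(2));        // double-quoted token
      } else {
        args.add(matcher.group(3));        // plain token
      }
    }
    return args.toArray(new String[0]);
  }

  public static void main(String[] args) {
    String cmd = "-fs NAMENODE -ls \"/a path with/whitespaces in directories\"";
    for (String arg : tokenize(cmd)) {
      System.out.println("[" + arg + "]");
    }
    // [-fs] [NAMENODE] [-ls] [/a path with/whitespaces in directories]
  }
}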

+ 46 - 0
hadoop-common-project/hadoop-nfs/pom.xml

@@ -95,4 +95,50 @@
       <version>11.0.2</version>
     </dependency>
   </dependencies>
+
+
+  <profiles>
+    <profile>
+      <id>dist</id>
+      <activation>
+        <activeByDefault>false</activeByDefault>
+      </activation>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-assembly-plugin</artifactId>
+            <dependencies>
+              <dependency>
+                <groupId>org.apache.hadoop</groupId>
+                <artifactId>hadoop-assemblies</artifactId>
+                <version>${project.version}</version>
+              </dependency>
+            </dependencies>
+            <executions>
+              <execution>
+                <id>dist</id>
+                <phase>package</phase>
+                <goals>
+                  <goal>single</goal>
+                </goals>
+                <configuration>
+                  <finalName>${project.artifactId}-${project.version}</finalName>
+                  <appendAssemblyId>false</appendAssemblyId>
+                  <attach>false</attach>
+                  <!--<descriptorRefs>
+                    <descriptorRef>hadoop-nfs-dist</descriptorRef>
+                  </descriptorRefs>-->
+                  <descriptors>
+                    <descriptor>../../hadoop-assemblies/src/main/resources/assemblies/hadoop-nfs-dist.xml</descriptor>
+                  </descriptors>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+  </profiles>
+
 </project>

+ 2 - 0
hadoop-dist/pom.xml

@@ -115,8 +115,10 @@
                       run mkdir hadoop-${project.version}
                       run cd hadoop-${project.version}
                       run cp -r $ROOT/hadoop-common-project/hadoop-common/target/hadoop-common-${project.version}/* .
+                      run cp -r $ROOT/hadoop-common-project/hadoop-nfs/target/hadoop-nfs-${project.version}/* .
                       run cp -r $ROOT/hadoop-hdfs-project/hadoop-hdfs/target/hadoop-hdfs-${project.version}/* .
                       run cp -r $ROOT/hadoop-hdfs-project/hadoop-hdfs-httpfs/target/hadoop-hdfs-httpfs-${project.version}/* .
+                      run cp -r $ROOT/hadoop-hdfs-project/hadoop-hdfs-nfs/target/hadoop-hdfs-nfs-${project.version}/* .
                       run cp -r $ROOT/hadoop-yarn-project/target/hadoop-yarn-project-${project.version}/* .
                       run cp -r $ROOT/hadoop-mapreduce-project/target/hadoop-mapreduce-${project.version}/* .
                       run cp -r $ROOT/hadoop-tools/hadoop-tools-dist/target/hadoop-tools-dist-${project.version}/* .

+ 44 - 0
hadoop-hdfs-project/hadoop-hdfs-nfs/pom.xml

@@ -192,4 +192,48 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
     </dependency>
   </dependencies>
 
+  <profiles>
+    <profile>
+      <id>dist</id>
+      <activation>
+        <activeByDefault>false</activeByDefault>
+      </activation>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-assembly-plugin</artifactId>
+            <dependencies>
+              <dependency>
+                <groupId>org.apache.hadoop</groupId>
+                <artifactId>hadoop-assemblies</artifactId>
+                <version>${project.version}</version>
+              </dependency>
+            </dependencies>
+            <executions>
+              <execution>
+                <id>dist</id>
+                <phase>package</phase>
+                <goals>
+                  <goal>single</goal>
+                </goals>
+                <configuration>
+                  <finalName>${project.artifactId}-${project.version}</finalName>
+                  <appendAssemblyId>false</appendAssemblyId>
+                  <attach>false</attach>
+                  <!--<descriptorRefs>
+                    <descriptorRef>hadoop-nfs-dist</descriptorRef>
+                  </descriptorRefs>-->
+                  <descriptors>
+                    <descriptor>../../hadoop-assemblies/src/main/resources/assemblies/hadoop-hdfs-nfs-dist.xml</descriptor>
+                  </descriptors>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+  </profiles>
+
 </project>

+ 14 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -262,6 +262,8 @@ Release 2.3.0 - UNRELEASED
     HDFS-5068. Convert NNThroughputBenchmark to a Tool to allow generic options.
     (shv)
 
+    HDFS-4994. Audit log getContentSummary() calls. (Robert Parker via kihwal)
+
   OPTIMIZATIONS
 
   BUG FIXES
@@ -299,6 +301,9 @@ Release 2.1.1-beta - UNRELEASED
     HDFS-5110 Change FSDataOutputStream to HdfsDataOutputStream for opened
     streams to fix type cast error. (brandonli)
 
+    HDFS-5069 Include hadoop-nfs and hadoop-hdfs-nfs into hadoop dist for
+    NFS deployment (brandonli)
+
   IMPROVEMENTS
 
     HDFS-4513. Clarify in the WebHDFS REST API that all JSON respsonses may
@@ -322,6 +327,9 @@ Release 2.1.1-beta - UNRELEASED
     HDFS-5111. Remove duplicated error message for snapshot commands when 
     processing invalid arguments. (jing9)
 
+    HDFS-5045. Add more unit tests for retry cache to cover all AtMostOnce 
+    methods. (jing9)
+
   OPTIMIZATIONS
 
   BUG FIXES
@@ -375,6 +383,9 @@ Release 2.1.1-beta - UNRELEASED
     HDFS-4594. WebHDFS open sets Content-Length header to what is specified by
     length parameter rather than how much data is actually returned. (cnauroth)
 
+    HDFS-5124. DelegationTokenSecretManager#retrievePassword can cause deadlock 
+    in NameNode. (Daryn Sharp via jing9)
+
 Release 2.1.0-beta - 2013-08-22
 
   INCOMPATIBLE CHANGES
@@ -3424,6 +3435,9 @@ Release 0.23.10 - UNRELEASED
 
     HDFS-4998. TestUnderReplicatedBlocks fails intermittently (kihwal)
 
+    HDFS-4329. DFSShell issues with directories with spaces in name (Cristina
+    L. Abad via jeagles)
+
 Release 0.23.9 - 2013-07-08
 
   INCOMPATIBLE CHANGES

+ 2 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java

@@ -596,7 +596,8 @@ public class DFSClient implements java.io.Closeable {
     return dfsClientConf.hdfsTimeout;
   }
   
-  String getClientName() {
+  @VisibleForTesting
+  public String getClientName() {
     return clientName;
   }
 

+ 2 - 13
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/delegation/DelegationTokenSecretManager.java

@@ -82,7 +82,7 @@ public class DelegationTokenSecretManager
   }
   
   @Override
-  public synchronized byte[] retrievePassword(
+  public byte[] retrievePassword(
       DelegationTokenIdentifier identifier) throws InvalidToken {
     try {
       // this check introduces inconsistency in the authentication to a
@@ -91,7 +91,7 @@ public class DelegationTokenSecretManager
       // different in that a standby may be behind and thus not yet know
       // of all tokens issued by the active NN.  the following check does
       // not allow ANY token auth, however it should allow known tokens in
-      checkAvailableForRead();
+      namesystem.checkOperation(OperationCategory.READ);
     } catch (StandbyException se) {
       // FIXME: this is a hack to get around changing method signatures by
       // tunneling a non-InvalidToken exception as the cause which the
@@ -103,17 +103,6 @@ public class DelegationTokenSecretManager
     return super.retrievePassword(identifier);
   }
   
-  @Override //SecretManager
-  public void checkAvailableForRead() throws StandbyException {
-    namesystem.checkOperation(OperationCategory.READ);
-    namesystem.readLock();
-    try {
-      namesystem.checkOperation(OperationCategory.READ);
-    } finally {
-      namesystem.readUnlock();
-    }
-  }
-
   /**
    * Returns expiry time of a token given its identifier.
    * 

+ 21 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -3415,12 +3415,26 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     return true;
   }
 
-  ContentSummary getContentSummary(String src) throws AccessControlException,
-      FileNotFoundException, UnresolvedLinkException, StandbyException {
+  /**
+   * Get the content summary for a specific file/dir.
+   *
+   * @param src The string representation of the path to the file
+   *
+   * @throws AccessControlException if access is denied
+   * @throws UnresolvedLinkException if a symlink is encountered.
+   * @throws FileNotFoundException if no file exists
+   * @throws StandbyException
+   * @throws IOException for issues with writing to the audit log
+   *
+   * @return object containing information regarding the file
+   *         or null if file not found
+   */
+  ContentSummary getContentSummary(String src) throws IOException {
     FSPermissionChecker pc = getPermissionChecker();
     checkOperation(OperationCategory.READ);
     byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
     readLock();
+    boolean success = true;
     try {
       checkOperation(OperationCategory.READ);
       src = FSDirectory.resolvePath(src, pathComponents, dir);
@@ -3428,8 +3442,13 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
         checkPermission(pc, src, false, null, null, null, FsAction.READ_EXECUTE);
       }
       return dir.getContentSummary(src);
+
+    } catch (AccessControlException ace) {
+      success = false;
+      throw ace;
     } finally {
       readUnlock();
+      logAuditEvent(success, "contentSummary", src);
     }
   }
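
The getContentSummary change (HDFS-4994) audits both allowed and denied calls by tracking a success flag and logging exactly once in the finally block. A minimal, self-contained sketch of that pattern, with stand-ins for the NameNode lock and permission machinery:

import java.io.IOException;

// Sketch of the audit pattern: mark failure on access denial, re-throw, and
// always emit one audit event in finally. Not the NameNode code itself.
public class AuditPatternSketch {
  static class AccessControlException extends IOException {
    AccessControlException(String msg) { super(msg); }
  }

  static void logAuditEvent(boolean success, String cmd, String src) {
    System.out.println("audit: allowed=" + success + " cmd=" + cmd + " src=" + src);
  }

  static long getContentSummary(String src, boolean permitted) throws IOException {
    boolean success = true;
    try {
      if (!permitted) {
        throw new AccessControlException("Permission denied: " + src);
      }
      return 42L;                       // pretend summary result
    } catch (AccessControlException ace) {
      success = false;                  // audit the denial, then re-throw
      throw ace;
    } finally {
      logAuditEvent(success, "contentSummary", src);
    }
  }

  public static void main(String[] args) throws IOException {
    System.out.println(getContentSummary("/allowed", true));
    try {
      getContentSummary("/denied", false);
    } catch (IOException e) {
      System.out.println("caught: " + e.getMessage());
    }
  }
}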
 

+ 651 - 39
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java

@@ -26,15 +26,22 @@ import java.lang.reflect.Method;
 import java.lang.reflect.Proxy;
 import java.net.URI;
 import java.net.UnknownHostException;
+import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.Random;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Options.Rename;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
@@ -43,9 +50,20 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.MiniDFSNNTopology;
 import org.apache.hadoop.hdfs.NameNodeProxies;
+import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
+import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.INodeFile;
+import org.apache.hadoop.hdfs.server.namenode.INodeFileUnderConstruction;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotTestHelper;
+import org.apache.hadoop.io.EnumSetWritable;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.retry.FailoverProxyProvider;
 import org.apache.hadoop.io.retry.RetryInvocationHandler;
@@ -60,14 +78,13 @@ import org.junit.Test;
 public class TestRetryCacheWithHA {
   private static final Log LOG = LogFactory.getLog(TestRetryCacheWithHA.class);
   
-  private static MiniDFSCluster cluster;
-  private static DistributedFileSystem dfs;
-  private static Configuration conf = new HdfsConfiguration();
-  
   private static final int BlockSize = 1024;
   private static final short DataNodes = 3;
-  private final static Map<String, Object> results = 
-      new HashMap<String, Object>();
+  private static final int CHECKTIMES = 10;
+  
+  private MiniDFSCluster cluster;
+  private DistributedFileSystem dfs;
+  private Configuration conf = new HdfsConfiguration();
   
   /** 
    * A dummy invocation handler extending RetryInvocationHandler. We can use
@@ -120,7 +137,7 @@ public class TestRetryCacheWithHA {
    * 2. Trigger the NN failover
    * 3. Check the retry cache on the original standby NN
    */
-  @Test
+  @Test (timeout=60000)
   public void testRetryCacheOnStandbyNN() throws Exception {
     // 1. run operations
     DFSTestUtil.runOperations(cluster, dfs, conf, BlockSize, 0);
@@ -180,26 +197,624 @@ public class TestRetryCacheWithHA {
     return client;
   }
   
+  abstract class AtMostOnceOp {
+    private final String name;
+    final DFSClient client;
+    
+    AtMostOnceOp(String name, DFSClient client) {
+      this.name = name;
+      this.client = client;
+    }
+    
+    abstract void prepare() throws Exception;
+    abstract void invoke() throws Exception;
+    abstract boolean checkNamenodeBeforeReturn() throws Exception;
+    abstract Object getResult();
+  }
+  
+  /** createSnapshot operaiton */
+  class CreateSnapshotOp extends AtMostOnceOp {
+    private String snapshotPath;
+    private String dir;
+    private String snapshotName;
+    
+    CreateSnapshotOp(DFSClient client, String dir, String snapshotName) {
+      super("createSnapshot", client);
+      this.dir = dir;
+      this.snapshotName = snapshotName;
+    }
+
+    @Override
+    void prepare() throws Exception {
+      final Path dirPath = new Path(dir);
+      if (!dfs.exists(dirPath)) {
+        dfs.mkdirs(dirPath);
+        dfs.allowSnapshot(dirPath);
+      }
+    }
+
+    @Override
+    void invoke() throws Exception {
+      this.snapshotPath = client.createSnapshot(dir, snapshotName);
+    }
+
+    @Override
+    boolean checkNamenodeBeforeReturn() throws Exception {
+      final Path sPath = SnapshotTestHelper.getSnapshotRoot(new Path(dir),
+          snapshotName);
+      boolean snapshotCreated = dfs.exists(sPath);
+      for (int i = 0; i < CHECKTIMES && !snapshotCreated; i++) {
+        Thread.sleep(1000);
+        snapshotCreated = dfs.exists(sPath);
+      }
+      return snapshotCreated;
+    }
+
+    @Override
+    Object getResult() {
+      return snapshotPath;
+    }
+  }
+  
+  /** deleteSnapshot */
+  class DeleteSnapshotOp extends AtMostOnceOp {
+    private String dir;
+    private String snapshotName;
+    
+    DeleteSnapshotOp(DFSClient client, String dir, String snapshotName) {
+      super("deleteSnapshot", client);
+      this.dir = dir;
+      this.snapshotName = snapshotName;
+    }
+
+    @Override
+    void prepare() throws Exception {
+      final Path dirPath = new Path(dir);
+      if (!dfs.exists(dirPath)) {
+        dfs.mkdirs(dirPath);
+      }
+      
+      Path sPath = SnapshotTestHelper.getSnapshotRoot(dirPath, snapshotName);
+      if (!dfs.exists(sPath)) {
+        dfs.allowSnapshot(dirPath);
+        dfs.createSnapshot(dirPath, snapshotName);
+      }
+    }
+
+    @Override
+    void invoke() throws Exception {
+      client.deleteSnapshot(dir, snapshotName);
+    }
+
+    @Override
+    boolean checkNamenodeBeforeReturn() throws Exception {
+      final Path sPath = SnapshotTestHelper.getSnapshotRoot(new Path(dir),
+          snapshotName);
+      boolean snapshotNotDeleted = dfs.exists(sPath);
+      for (int i = 0; i < CHECKTIMES && snapshotNotDeleted; i++) {
+        Thread.sleep(1000);
+        snapshotNotDeleted = dfs.exists(sPath);
+      }
+      return !snapshotNotDeleted;
+    }
+
+    @Override
+    Object getResult() {
+      return null;
+    }
+  }
+  
+  /** renameSnapshot */
+  class RenameSnapshotOp extends AtMostOnceOp {
+    private String dir;
+    private String oldName;
+    private String newName;
+    
+    RenameSnapshotOp(DFSClient client, String dir, String oldName,
+        String newName) {
+      super("renameSnapshot", client);
+      this.dir = dir;
+      this.oldName = oldName;
+      this.newName = newName;
+    }
+
+    @Override
+    void prepare() throws Exception {
+      final Path dirPath = new Path(dir);
+      if (!dfs.exists(dirPath)) {
+        dfs.mkdirs(dirPath);
+      }
+      
+      Path sPath = SnapshotTestHelper.getSnapshotRoot(dirPath, oldName);
+      if (!dfs.exists(sPath)) {
+        dfs.allowSnapshot(dirPath);
+        dfs.createSnapshot(dirPath, oldName);
+      }
+    }
+
+    @Override
+    void invoke() throws Exception {
+      client.renameSnapshot(dir, oldName, newName);
+    }
+
+    @Override
+    boolean checkNamenodeBeforeReturn() throws Exception {
+      final Path sPath = SnapshotTestHelper.getSnapshotRoot(new Path(dir),
+          newName);
+      boolean snapshotRenamed = dfs.exists(sPath);
+      for (int i = 0; i < CHECKTIMES && !snapshotRenamed; i++) {
+        Thread.sleep(1000);
+        snapshotRenamed = dfs.exists(sPath);
+      }
+      return snapshotRenamed;
+    }
+
+    @Override
+    Object getResult() {
+      return null;
+    }
+  }
+  
+  /** create file operation (without OverWrite) */
+  class CreateOp extends AtMostOnceOp {
+    private String fileName;
+    private HdfsFileStatus status;
+    
+    CreateOp(DFSClient client, String fileName) {
+      super("create", client);
+      this.fileName = fileName;
+    }
+
+    @Override
+    void prepare() throws Exception {
+      final Path filePath = new Path(fileName);
+      if (dfs.exists(filePath)) {
+        dfs.delete(filePath, true);
+      }
+      final Path fileParent = filePath.getParent();
+      if (!dfs.exists(fileParent)) {
+        dfs.mkdirs(fileParent);
+      }
+    }
+
+    @Override
+    void invoke() throws Exception {
+      EnumSet<CreateFlag> createFlag = EnumSet.of(CreateFlag.CREATE);
+      this.status = client.getNamenode().create(fileName,
+          FsPermission.getFileDefault(), client.getClientName(),
+          new EnumSetWritable<CreateFlag>(createFlag), false, DataNodes,
+          BlockSize);
+    }
+
+    @Override
+    boolean checkNamenodeBeforeReturn() throws Exception {
+      final Path filePath = new Path(fileName);
+      boolean fileCreated = dfs.exists(filePath);
+      for (int i = 0; i < CHECKTIMES && !fileCreated; i++) {
+        Thread.sleep(1000);
+        fileCreated = dfs.exists(filePath);
+      }
+      return fileCreated;
+    }
+
+    @Override
+    Object getResult() {
+      return status;
+    }
+  }
+  
+  /** append operation */
+  class AppendOp extends AtMostOnceOp {
+    private String fileName;
+    private LocatedBlock lbk;
+    
+    AppendOp(DFSClient client, String fileName) {
+      super("append", client);
+      this.fileName = fileName;
+    }
+
+    @Override
+    void prepare() throws Exception {
+      final Path filePath = new Path(fileName);
+      if (!dfs.exists(filePath)) {
+        DFSTestUtil.createFile(dfs, filePath, BlockSize / 2, DataNodes, 0);
+      }
+    }
+
+    @Override
+    void invoke() throws Exception {
+      lbk = client.getNamenode().append(fileName, client.getClientName());
+    }
+    
+    // check if the inode of the file is under construction
+    @Override
+    boolean checkNamenodeBeforeReturn() throws Exception {
+      INodeFile fileNode = cluster.getNameNode(0).getNamesystem()
+          .getFSDirectory().getINode4Write(fileName).asFile();
+      boolean fileIsUC = fileNode.isUnderConstruction();
+      for (int i = 0; i < CHECKTIMES && !fileIsUC; i++) {
+        Thread.sleep(1000);
+        fileNode = cluster.getNameNode(0).getNamesystem().getFSDirectory()
+            .getINode4Write(fileName).asFile();
+        fileIsUC = fileNode.isUnderConstruction();
+      }
+      return fileIsUC;
+    }
+
+    @Override
+    Object getResult() {
+      return lbk;
+    }
+  }
+  
+  /** rename */
+  class RenameOp extends AtMostOnceOp {
+    private String oldName;
+    private String newName;
+    private boolean renamed;
+    
+    RenameOp(DFSClient client, String oldName, String newName) {
+      super("rename", client);
+      this.oldName = oldName;
+      this.newName = newName;
+    }
+
+    @Override
+    void prepare() throws Exception {
+      final Path filePath = new Path(oldName);
+      if (!dfs.exists(filePath)) {
+        DFSTestUtil.createFile(dfs, filePath, BlockSize, DataNodes, 0);
+      }
+    }
+    
+    @SuppressWarnings("deprecation")
+    @Override
+    void invoke() throws Exception {
+      this.renamed = client.rename(oldName, newName);
+    }
+
+    @Override
+    boolean checkNamenodeBeforeReturn() throws Exception {
+      Path targetPath = new Path(newName);
+      boolean renamed = dfs.exists(targetPath);
+      for (int i = 0; i < CHECKTIMES && !renamed; i++) {
+        Thread.sleep(1000);
+        renamed = dfs.exists(targetPath);
+      }
+      return renamed;
+    }
+
+    @Override
+    Object getResult() {
+      return new Boolean(renamed);
+    }
+  }
+  
+  /** rename2 */
+  class Rename2Op extends AtMostOnceOp {
+    private String oldName;
+    private String newName;
+    
+    Rename2Op(DFSClient client, String oldName, String newName) {
+      super("rename2", client);
+      this.oldName = oldName;
+      this.newName = newName;
+    }
+
+    @Override
+    void prepare() throws Exception {
+      final Path filePath = new Path(oldName);
+      if (!dfs.exists(filePath)) {
+        DFSTestUtil.createFile(dfs, filePath, BlockSize, DataNodes, 0);
+      }
+    }
+
+    @Override
+    void invoke() throws Exception {
+      client.rename(oldName, newName, Rename.OVERWRITE);
+    }
+
+    @Override
+    boolean checkNamenodeBeforeReturn() throws Exception {
+      Path targetPath = new Path(newName);
+      boolean renamed = dfs.exists(targetPath);
+      for (int i = 0; i < CHECKTIMES && !renamed; i++) {
+        Thread.sleep(1000);
+        renamed = dfs.exists(targetPath);
+      }
+      return renamed;
+    }
+
+    @Override
+    Object getResult() {
+      return null;
+    }
+  }
+  
+  /** concat */
+  class ConcatOp extends AtMostOnceOp {
+    private String target;
+    private String[] srcs;
+    private Path[] srcPaths;
+    
+    ConcatOp(DFSClient client, Path target, int numSrc) {
+      super("concat", client);
+      this.target = target.toString();
+      this.srcs = new String[numSrc];
+      this.srcPaths = new Path[numSrc];
+      Path parent = target.getParent();
+      for (int i = 0; i < numSrc; i++) {
+        srcPaths[i] = new Path(parent, "srcfile" + i);
+        srcs[i] = srcPaths[i].toString();
+      }
+    }
+
+    @Override
+    void prepare() throws Exception {
+      DFSTestUtil.createFile(dfs, new Path(target), BlockSize, DataNodes, 0);
+      for (int i = 0; i < srcPaths.length; i++) {
+        DFSTestUtil.createFile(dfs, srcPaths[i], BlockSize, DataNodes, 0);
+      }
+    }
+
+    @Override
+    void invoke() throws Exception {
+      client.concat(target, srcs);
+    }
+
+    @Override
+    boolean checkNamenodeBeforeReturn() throws Exception {
+      Path targetPath = new Path(target);
+      boolean done = dfs.exists(targetPath);
+      for (int i = 0; i < CHECKTIMES && !done; i++) {
+        Thread.sleep(1000);
+        done = dfs.exists(targetPath);
+      }
+      return done;
+    }
+
+    @Override
+    Object getResult() {
+      return null;
+    }
+  }
+  
+  /** delete */
+  class DeleteOp extends AtMostOnceOp {
+    private String target;
+    private boolean deleted;
+    
+    DeleteOp(DFSClient client, String target) {
+      super("delete", client);
+      this.target = target;
+    }
+
+    @Override
+    void prepare() throws Exception {
+      Path p = new Path(target);
+      if (!dfs.exists(p)) {
+        DFSTestUtil.createFile(dfs, p, BlockSize, DataNodes, 0);
+      }
+    }
+
+    @Override
+    void invoke() throws Exception {
+      deleted = client.delete(target, true);
+    }
+
+    @Override
+    boolean checkNamenodeBeforeReturn() throws Exception {
+      Path targetPath = new Path(target);
+      boolean del = !dfs.exists(targetPath);
+      for (int i = 0; i < CHECKTIMES && !del; i++) {
+        Thread.sleep(1000);
+        del = !dfs.exists(targetPath);
+      }
+      return del;
+    }
+
+    @Override
+    Object getResult() {
+      return new Boolean(deleted);
+    }
+  }
+  
+  /** createSymlink */
+  class CreateSymlinkOp extends AtMostOnceOp {
+    private String target;
+    private String link;
+    
+    public CreateSymlinkOp(DFSClient client, String target, String link) {
+      super("createSymlink", client);
+      this.target = target;
+      this.link = link;
+    }
+
+    @Override
+    void prepare() throws Exception {
+      Path p = new Path(target);
+      if (!dfs.exists(p)) {
+        DFSTestUtil.createFile(dfs, p, BlockSize, DataNodes, 0);
+      }
+    }
+
+    @Override
+    void invoke() throws Exception {
+      client.createSymlink(target, link, false);
+    }
+
+    @Override
+    boolean checkNamenodeBeforeReturn() throws Exception {
+      Path linkPath = new Path(link);
+      FileStatus linkStatus = dfs.getFileLinkStatus(linkPath);
+      for (int i = 0; i < CHECKTIMES && linkStatus == null; i++) {
+        Thread.sleep(1000);
+        linkStatus = dfs.getFileLinkStatus(linkPath);
+      }
+      return linkStatus != null;
+    }
+
+    @Override
+    Object getResult() {
+      return null;
+    }
+  }
+  
+  /** updatePipeline */
+  class UpdatePipelineOp extends AtMostOnceOp {
+    private String file;
+    private ExtendedBlock oldBlock;
+    private ExtendedBlock newBlock;
+    private DatanodeInfo[] nodes;
+    private FSDataOutputStream out;
+    
+    public UpdatePipelineOp(DFSClient client, String file) {
+      super("updatePipeline", client);
+      this.file = file;
+    }
+
+    @Override
+    void prepare() throws Exception {
+      final Path filePath = new Path(file);
+      DFSTestUtil.createFile(dfs, filePath, BlockSize, DataNodes, 0);
+      // append to the file and leave the last block under construction
+      out = this.client.append(file, BlockSize, null, null);
+      byte[] appendContent = new byte[100];
+      new Random().nextBytes(appendContent);
+      out.write(appendContent);
+      ((HdfsDataOutputStream) out).hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));
+      
+      LocatedBlocks blks = dfs.getClient()
+          .getLocatedBlocks(file, BlockSize + 1);
+      assertEquals(1, blks.getLocatedBlocks().size());
+      nodes = blks.get(0).getLocations();
+      oldBlock = blks.get(0).getBlock();
+      
+      LocatedBlock newLbk = client.getNamenode().updateBlockForPipeline(
+          oldBlock, client.getClientName());
+      newBlock = new ExtendedBlock(oldBlock.getBlockPoolId(),
+          oldBlock.getBlockId(), oldBlock.getNumBytes(), 
+          newLbk.getBlock().getGenerationStamp());
+    }
+
+    @Override
+    void invoke() throws Exception {
+      DatanodeInfo[] newNodes = new DatanodeInfo[2];
+      newNodes[0] = nodes[0];
+      newNodes[1] = nodes[1];
+      
+      client.getNamenode().updatePipeline(client.getClientName(), oldBlock,
+          newBlock, newNodes);
+      out.close();
+    }
+
+    @Override
+    boolean checkNamenodeBeforeReturn() throws Exception {
+      INodeFileUnderConstruction fileNode = (INodeFileUnderConstruction) cluster
+          .getNamesystem(0).getFSDirectory().getINode4Write(file).asFile();
+      BlockInfoUnderConstruction blkUC = 
+          (BlockInfoUnderConstruction) (fileNode.getBlocks())[1];
+      int datanodeNum = blkUC.getExpectedLocations().length;
+      for (int i = 0; i < CHECKTIMES && datanodeNum != 2; i++) {
+        Thread.sleep(1000);
+        datanodeNum = blkUC.getExpectedLocations().length;
+      }
+      return datanodeNum == 2;
+    }
+    
+    @Override
+    Object getResult() {
+      return null;
+    }
+  }
+  
+  @Test (timeout=60000)
+  public void testCreateSnapshot() throws Exception {
+    final DFSClient client = genClientWithDummyHandler();
+    AtMostOnceOp op = new CreateSnapshotOp(client, "/test", "s1");
+    testClientRetryWithFailover(op);
+  }
+  
+  @Test (timeout=60000)
+  public void testDeleteSnapshot() throws Exception {
+    final DFSClient client = genClientWithDummyHandler();
+    AtMostOnceOp op = new DeleteSnapshotOp(client, "/test", "s1");
+    testClientRetryWithFailover(op);
+  }
+  
+  @Test (timeout=60000)
+  public void testRenameSnapshot() throws Exception {
+    final DFSClient client = genClientWithDummyHandler();
+    AtMostOnceOp op = new RenameSnapshotOp(client, "/test", "s1", "s2");
+    testClientRetryWithFailover(op);
+  }
+  
+  @Test (timeout=60000)
+  public void testCreate() throws Exception {
+    final DFSClient client = genClientWithDummyHandler();
+    AtMostOnceOp op = new CreateOp(client, "/testfile");
+    testClientRetryWithFailover(op);
+  }
+  
+  @Test (timeout=60000)
+  public void testAppend() throws Exception {
+    final DFSClient client = genClientWithDummyHandler();
+    AtMostOnceOp op = new AppendOp(client, "/testfile");
+    testClientRetryWithFailover(op);
+  }
+  
+  @Test (timeout=60000)
+  public void testRename() throws Exception {
+    final DFSClient client = genClientWithDummyHandler();
+    AtMostOnceOp op = new RenameOp(client, "/file1", "/file2");
+    testClientRetryWithFailover(op);
+  }
+  
+  @Test (timeout=60000)
+  public void testRename2() throws Exception {
+    final DFSClient client = genClientWithDummyHandler();
+    AtMostOnceOp op = new Rename2Op(client, "/file1", "/file2");
+    testClientRetryWithFailover(op);
+  }
+  
+  @Test (timeout=60000)
+  public void testConcat() throws Exception {
+    final DFSClient client = genClientWithDummyHandler();
+    AtMostOnceOp op = new ConcatOp(client, new Path("/test/file"), 5);
+    testClientRetryWithFailover(op);
+  }
+  
+  @Test (timeout=60000)
+  public void testDelete() throws Exception {
+    final DFSClient client = genClientWithDummyHandler();
+    AtMostOnceOp op = new DeleteOp(client, "/testfile");
+    testClientRetryWithFailover(op);
+  }
+  
+  @Test (timeout=60000)
+  public void testCreateSymlink() throws Exception {
+    final DFSClient client = genClientWithDummyHandler();
+    AtMostOnceOp op = new CreateSymlinkOp(client, "/testfile", "/testlink");
+    testClientRetryWithFailover(op);
+  }
+  
+  @Test (timeout=60000)
+  public void testUpdatePipeline() throws Exception {
+    final DFSClient client = genClientWithDummyHandler();
+    AtMostOnceOp op = new UpdatePipelineOp(client, "/testfile");
+    testClientRetryWithFailover(op);
+  }
+  
   /**
    * When NN failover happens, if the client did not receive the response and
    * send a retry request to the other NN, the same response should be recieved
    * based on the retry cache.
-   * 
-   * TODO: currently we only test the createSnapshot from the client side. We 
-   * may need to cover all the calls with "@AtMostOnce" annotation.
    */
-  @Test
-  public void testClientRetryWithFailover() throws Exception {
-    final String dir = "/test";
-    final Path dirPath = new Path(dir);
-    final String sName = "s1";
-    final String dirSnapshot = dir + HdfsConstants.SEPARATOR_DOT_SNAPSHOT_DIR
-        + Path.SEPARATOR + sName;
+  public void testClientRetryWithFailover(final AtMostOnceOp op)
+      throws Exception {
+    final Map<String, Object> results = new HashMap<String, Object>();
     
-    dfs.mkdirs(dirPath);
-    dfs.allowSnapshot(dirPath);
-    
-    final DFSClient client = genClientWithDummyHandler();
+    op.prepare();
     // set DummyRetryInvocationHandler#block to true
     DummyRetryInvocationHandler.block.set(true);
     
@@ -207,28 +822,25 @@ public class TestRetryCacheWithHA {
       @Override
       public void run() {
         try {
-          final String snapshotPath = client.createSnapshot(dir, "s1");
-          assertEquals(dirSnapshot, snapshotPath);
-          LOG.info("Created snapshot " + snapshotPath);
+          op.invoke();
+          Object result = op.getResult();
+          LOG.info("Operation " + op.name + " finished");
           synchronized (TestRetryCacheWithHA.this) {
-            results.put("createSnapshot", snapshotPath);
+            results.put(op.name, result == null ? "SUCCESS" : result);
             TestRetryCacheWithHA.this.notifyAll();
           }
-        } catch (IOException e) {
-          LOG.info("Got IOException " + e + " while creating snapshot");
+        } catch (Exception e) {
+          LOG.info("Got Exception while calling " + op.name, e);
         } finally {
-          IOUtils.cleanup(null, client);
+          IOUtils.cleanup(null, op.client);
         }
       }
     }.start();
     
-    // make sure the client's createSnapshot call has actually been handled by
-    // the active NN
-    boolean snapshotCreated = dfs.exists(new Path(dirSnapshot));
-    while (!snapshotCreated) {
-      Thread.sleep(1000);
-      snapshotCreated = dfs.exists(new Path(dirSnapshot));
-    }
+    // make sure the client's call has actually been handled by the active NN
+    assertTrue("After waiting the operation " + op.name
+        + " still has not taken effect on NN yet",
+        op.checkNamenodeBeforeReturn());
     
     // force the failover
     cluster.transitionToStandby(0);
@@ -238,11 +850,11 @@ public class TestRetryCacheWithHA {
     DummyRetryInvocationHandler.block.set(false);
     
     synchronized (this) {
-      while (!results.containsKey("createSnapshot")) {
+      while (!results.containsKey(op.name)) {
         this.wait();
       }
-      LOG.info("Got the result of createSnapshot: "
-          + results.get("createSnapshot"));
+      LOG.info("Got the result of " + op.name + ": "
+          + results.get(op.name));
     }
   }
 }
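
The test refactor above replaces the single hard-coded createSnapshot scenario with an AtMostOnceOp template (prepare / invoke / checkNamenodeBeforeReturn / getResult) so every @AtMostOnce RPC runs through the same failover drill. A toy sketch of that template-method structure, with getResult omitted and a boolean flag standing in for MiniDFSCluster and the retry cache:

// Each operation supplies prepare/invoke/check hooks; a shared driver runs
// the same "invoke, fail over, verify" sequence for all of them.
public class AtMostOnceSketch {
  abstract static class AtMostOnceOp {
    final String name;
    AtMostOnceOp(String name) { this.name = name; }
    abstract void prepare() throws Exception;
    abstract void invoke() throws Exception;
    abstract boolean checkNamenodeBeforeReturn() throws Exception;
  }

  static boolean fileExists = false;   // stand-in for NameNode state

  static void runWithFailover(AtMostOnceOp op) throws Exception {
    op.prepare();
    op.invoke();                       // first attempt lands on the active NN
    // ... failover would happen here; the retried call must hit the cache ...
    if (!op.checkNamenodeBeforeReturn()) {
      throw new AssertionError(op.name + " did not take effect");
    }
    System.out.println(op.name + " verified after failover");
  }

  public static void main(String[] args) throws Exception {
    runWithFailover(new AtMostOnceOp("create") {
      void prepare() { fileExists = false; }
      void invoke() { fileExists = true; }
      boolean checkNamenodeBeforeReturn() { return fileExists; }
    });
  }
}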

+ 167 - 20
hadoop-hdfs-project/hadoop-hdfs/src/test/resources/testHDFSConf.xml

@@ -443,6 +443,153 @@
       </comparators>
     </test>
 
+    <test> <!-- TESTED -->
+      <description>ls: whitespaces in an absolute path to a file</description>
+      <test-commands>
+        <command>-fs NAMENODE -mkdir -p "/a path with/whitespaces in directories"</command>
+        <command>-fs NAMENODE -touchz "/a path with/whitespaces in directories/and file names"</command>
+        <command>-fs NAMENODE -ls "/a path with/whitespaces in directories"</command>
+      </test-commands>
+      <cleanup-commands>
+        <command>-fs NAMENODE -rm -r "/a path with"</command>
+      </cleanup-commands>
+      <comparators>
+        <comparator>
+          <type>TokenComparator</type>
+          <expected-output>Found 1 items</expected-output>
+        </comparator>
+        <comparator>
+          <type>RegexpComparator</type>
+          <expected-output>^-rw-r--r--( )*1( )*[a-z]*( )*supergroup( )*0( )*[0-9]{4,}-[0-9]{2,}-[0-9]{2,} [0-9]{2,}:[0-9]{2,}( )*/a path with/whitespaces in directories/and file names</expected-output>
+        </comparator>
+      </comparators>
+    </test>
+
+    <test> <!-- TESTED -->
+      <description>ls: whitespaces in a relative path to a file</description>
+      <test-commands>
+        <command>-fs NAMENODE -mkdir -p "a path with/whitespaces in directories"</command>
+        <command>-fs NAMENODE -touchz "a path with/whitespaces in directories/and file names"</command>
+        <command>-fs NAMENODE -ls "a path with/whitespaces in directories"</command>
+      </test-commands>
+      <cleanup-commands>
+        <command>-fs NAMENODE -rm -r "a path with"</command>
+      </cleanup-commands>
+      <comparators>
+        <comparator>
+          <type>TokenComparator</type>
+          <expected-output>Found 1 items</expected-output>
+        </comparator>
+        <comparator>
+          <type>RegexpComparator</type>
+          <expected-output>^-rw-r--r--( )*1( )*[a-z]*( )*supergroup( )*0( )*[0-9]{4,}-[0-9]{2,}-[0-9]{2,} [0-9]{2,}:[0-9]{2,}( )*a path with/whitespaces in directories/and file names</expected-output>
+        </comparator>
+      </comparators>
+    </test> 
+
+    <test> <!-- TESTED -->
+      <description>ls: whitespaces in a scheme-qualified path to a file</description>
+      <test-commands>
+        <command>-fs NAMENODE -mkdir -p "NAMENODE/a path with/whitespaces in directories"</command>
+        <command>-fs NAMENODE -touchz "NAMENODE/a path with/whitespaces in directories/and file names"</command>
+        <command>-fs NAMENODE -ls "NAMENODE/a path with/whitespaces in directories"</command>
+      </test-commands>
+      <cleanup-commands>
+        <command>-fs NAMENODE -rm -r "NAMENODE/a path with"</command>
+      </cleanup-commands>
+      <comparators>
+        <comparator>
+          <type>TokenComparator</type>
+          <expected-output>Found 1 items</expected-output>
+        </comparator>
+        <comparator>
+          <type>RegexpComparator</type>
+          <expected-output>^-rw-r--r--( )*1( )*[a-z]*( )*supergroup( )*0( )*[0-9]{4,}-[0-9]{2,}-[0-9]{2,} [0-9]{2,}:[0-9]{2,}( )*NAMENODE/a path with/whitespaces in directories/and file names</expected-output>
+        </comparator>
+      </comparators>
+    </test>
+
+    <test> <!-- TESTED -->
+      <description>ls: whitespaces in an absolute path to a file, using globbing</description>
+      <test-commands>
+        <command>-fs NAMENODE -mkdir -p "/a path with/whitespaces in directories"</command>
+        <command>-fs NAMENODE -touchz "/a path with/whitespaces in directories/and file names"</command>
+        <command>-fs NAMENODE -touchz "/a path with/whitespaces in directories/and file names 2"</command>
+        <command>-fs NAMENODE -ls "/a*/w*"</command>
+      </test-commands>
+      <cleanup-commands>
+        <command>-fs NAMENODE -rm -r "/a path with"</command>
+      </cleanup-commands>
+      <comparators>
+        <comparator>
+          <type>TokenComparator</type>
+          <expected-output>Found 2 items</expected-output>
+        </comparator>
+        <comparator>
+          <type>RegexpComparator</type>
+          <expected-output>^-rw-r--r--( )*1( )*[a-z]*( )*supergroup( )*0( )*[0-9]{4,}-[0-9]{2,}-[0-9]{2,} [0-9]{2,}:[0-9]{2,}( )*/a path with/whitespaces in directories/and file names</expected-output>
+        </comparator>
+        <comparator>
+          <type>RegexpComparator</type>
+          <expected-output>^-rw-r--r--( )*1( )*[a-z]*( )*supergroup( )*0( )*[0-9]{4,}-[0-9]{2,}-[0-9]{2,} [0-9]{2,}:[0-9]{2,}( )*/a path with/whitespaces in directories/and file names 2</expected-output>
+        </comparator>
+      </comparators>
+    </test>
+
+    <test> <!-- TESTED -->
+      <description>ls: whitespaces in a relative path to a file, using globbing</description>
+      <test-commands>
+        <command>-fs NAMENODE -mkdir -p "a path with/whitespaces in directories"</command>
+        <command>-fs NAMENODE -touchz "a path with/whitespaces in directories/and file names"</command>
+        <command>-fs NAMENODE -touchz "a path with/whitespaces in directories/and file names 2"</command>
+        <command>-fs NAMENODE -ls "a*/w*"</command>
+      </test-commands>
+      <cleanup-commands>
+        <command>-fs NAMENODE -rm -r "a path with"</command>
+      </cleanup-commands>
+      <comparators>
+        <comparator>
+          <type>TokenComparator</type>
+          <expected-output>Found 2 items</expected-output>
+        </comparator>
+        <comparator>
+          <type>RegexpComparator</type>
+          <expected-output>^-rw-r--r--( )*1( )*[a-z]*( )*supergroup( )*0( )*[0-9]{4,}-[0-9]{2,}-[0-9]{2,} [0-9]{2,}:[0-9]{2,}( )*a path with/whitespaces in directories/and file names</expected-output>
+        </comparator>
+        <comparator>
+          <type>RegexpComparator</type>
+          <expected-output>^-rw-r--r--( )*1( )*[a-z]*( )*supergroup( )*0( )*[0-9]{4,}-[0-9]{2,}-[0-9]{2,} [0-9]{2,}:[0-9]{2,}( )*a path with/whitespaces in directories/and file names 2</expected-output>
+        </comparator>
+      </comparators>
+    </test>
+
+    <test> <!-- TESTED -->
+      <description>ls: whitespaces in a scheme-qualified path to a file, using globbing</description>
+      <test-commands>
+        <command>-fs NAMENODE -mkdir -p "NAMENODE/a path with/whitespaces in directories"</command>
+        <command>-fs NAMENODE -touchz "NAMENODE/a path with/whitespaces in directories/and file names"</command>
+        <command>-fs NAMENODE -touchz "NAMENODE/a path with/whitespaces in directories/and file names 2"</command>
+        <command>-fs NAMENODE -ls "NAMENODE/a*/w*"</command>
+      </test-commands>
+      <cleanup-commands>
+        <command>-fs NAMENODE -rm -r "NAMENODE/a path with"</command>
+      </cleanup-commands>
+      <comparators>
+        <comparator>
+          <type>TokenComparator</type>
+          <expected-output>Found 2 items</expected-output>
+        </comparator>
+        <comparator>
+          <type>RegexpComparator</type>
+          <expected-output>^-rw-r--r--( )*1( )*[a-z]*( )*supergroup( )*0( )*[0-9]{4,}-[0-9]{2,}-[0-9]{2,} [0-9]{2,}:[0-9]{2,}( )*NAMENODE/a path with/whitespaces in directories/and file names</expected-output>
+        </comparator>
+        <comparator>
+          <type>RegexpComparator</type>
+          <expected-output>^-rw-r--r--( )*1( )*[a-z]*( )*supergroup( )*0( )*[0-9]{4,}-[0-9]{2,}-[0-9]{2,} [0-9]{2,}:[0-9]{2,}( )*NAMENODE/a path with/whitespaces in directories/and file names 2</expected-output>
+        </comparator>
+      </comparators>
+    </test>
+
     <!-- Tests for ls -R -->
     <test> <!-- TESTED -->
       <description>ls: files/directories using absolute path</description>
@@ -6503,23 +6650,23 @@
       <comparators>
         <comparator>
           <type>TokenComparator</type>
-          <expected-output>"data15bytes-15"</expected-output>
+          <expected-output>data15bytes-15</expected-output>
         </comparator>
         <comparator>
           <type>TokenComparator</type>
-          <expected-output>"data30bytes-30"</expected-output>
+          <expected-output>data30bytes-30</expected-output>
         </comparator>
         <comparator>
           <type>TokenComparator</type>
-          <expected-output>"data60bytes-60"</expected-output>
+          <expected-output>data60bytes-60</expected-output>
         </comparator>
         <comparator>
           <type>TokenComparator</type>
-          <expected-output>"data120bytes-120"</expected-output>
+          <expected-output>data120bytes-120</expected-output>
         </comparator>
         <comparator>
           <type>TokenComparator</type>
-          <expected-output>"datadir-0"</expected-output>
+          <expected-output>datadir-0</expected-output>
         </comparator>
       </comparators>
     </test>
@@ -6542,23 +6689,23 @@
       <comparators>
         <comparator>
           <type>TokenComparator</type>
-          <expected-output>"data15bytes-15"</expected-output>
+          <expected-output>data15bytes-15</expected-output>
         </comparator>
         <comparator>
           <type>TokenComparator</type>
-          <expected-output>"data30bytes-30"</expected-output>
+          <expected-output>data30bytes-30</expected-output>
         </comparator>
         <comparator>
           <type>TokenComparator</type>
-          <expected-output>"data60bytes-60"</expected-output>
+          <expected-output>data60bytes-60</expected-output>
         </comparator>
         <comparator>
           <type>TokenComparator</type>
-          <expected-output>"data120bytes-120"</expected-output>
+          <expected-output>data120bytes-120</expected-output>
         </comparator>
         <comparator>
           <type>TokenComparator</type>
-          <expected-output>"datadir-0"</expected-output>
+          <expected-output>datadir-0</expected-output>
         </comparator>
       </comparators>
     </test>
@@ -6644,23 +6791,23 @@
       <comparators>
         <comparator>
           <type>TokenComparator</type>
-          <expected-output>"data15bytes-15"</expected-output>
+          <expected-output>data15bytes-15</expected-output>
         </comparator>
         <comparator>
           <type>TokenComparator</type>
-          <expected-output>"data30bytes-30"</expected-output>
+          <expected-output>data30bytes-30</expected-output>
         </comparator>
         <comparator>
           <type>TokenComparator</type>
-          <expected-output>"data60bytes-60"</expected-output>
+          <expected-output>data60bytes-60</expected-output>
         </comparator>
         <comparator>
           <type>TokenComparator</type>
-          <expected-output>"data120bytes-120"</expected-output>
+          <expected-output>data120bytes-120</expected-output>
         </comparator>
         <comparator>
           <type>TokenComparator</type>
-          <expected-output>"datadir-0"</expected-output>
+          <expected-output>datadir-0</expected-output>
         </comparator>
       </comparators>
     </test>
@@ -6731,23 +6878,23 @@
       <comparators>
         <comparator>
           <type>TokenComparator</type>
-          <expected-output>"data15bytes-15"</expected-output>
+          <expected-output>data15bytes-15</expected-output>
         </comparator>
         <comparator>
           <type>TokenComparator</type>
-          <expected-output>"data30bytes-30"</expected-output>
+          <expected-output>data30bytes-30</expected-output>
         </comparator>
         <comparator>
           <type>TokenComparator</type>
-          <expected-output>"data60bytes-60"</expected-output>
+          <expected-output>data60bytes-60</expected-output>
         </comparator>
         <comparator>
           <type>TokenComparator</type>
-          <expected-output>"data120bytes-120"</expected-output>
+          <expected-output>data120bytes-120</expected-output>
         </comparator>
         <comparator>
           <type>TokenComparator</type>
-          <expected-output>"datadir-0"</expected-output>
+          <expected-output>datadir-0</expected-output>
         </comparator>
       </comparators>
     </test>

+ 14 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -226,6 +226,20 @@ Release 2.1.1-beta - UNRELEASED
     MAPREDUCE-5001. LocalJobRunner has race condition resulting in job
     failures (Sandy Ryza via jlowe)
 
+    MAPREDUCE-5466. Changed MR AM to not promote history files of intermediate
+    AMs in case they are exiting because of errors and thus help history-server
+    pick up the right history file for the last successful AM. (Jian He via
+    vinodkv)
+
+    MAPREDUCE-5468. Fix MR AM recovery for map-only jobs. (vinodkv via
+    acmurthy)
+
+    MAPREDUCE-5470. LocalJobRunner does not work on Windows. (Sandy Ryza via
+    cnauroth)
+
+    MAPREDUCE-5476. Changed MR AM recovery code to cleanup staging-directory
+    only after unregistering from the RM. (Jian He via vinodkv)
+
 Release 2.1.0-beta - 2013-08-22
 
   INCOMPATIBLE CHANGES

+ 29 - 3
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java

@@ -520,7 +520,7 @@ public class JobHistoryEventHandler extends AbstractService
         mi.getJobIndexInfo().setSubmitTime(jobSubmittedEvent.getSubmitTime());
         mi.getJobIndexInfo().setQueueName(jobSubmittedEvent.getJobQueueName());
       }
-     
+
       // If this is JobFinishedEvent, close the writer and setup the job-index
       if (event.getHistoryEvent().getEventType() == EventType.JOB_FINISHED) {
         try {
@@ -532,6 +532,24 @@ public class JobHistoryEventHandler extends AbstractService
               jFinishedEvent.getFinishedReduces());
           mi.getJobIndexInfo().setJobStatus(JobState.SUCCEEDED.toString());
           closeEventWriter(event.getJobID());
+          processDoneFiles(event.getJobID());
+        } catch (IOException e) {
+          throw new YarnRuntimeException(e);
+        }
+      }
+      // In case of JOB_ERROR, only process all the done files (e.g. the job
+      // summary, the job history file, etc.) if this is the last AM retry.
+      if (event.getHistoryEvent().getEventType() == EventType.JOB_ERROR) {
+        try {
+          JobUnsuccessfulCompletionEvent jucEvent =
+              (JobUnsuccessfulCompletionEvent) event.getHistoryEvent();
+          mi.getJobIndexInfo().setFinishTime(jucEvent.getFinishTime());
+          mi.getJobIndexInfo().setNumMaps(jucEvent.getFinishedMaps());
+          mi.getJobIndexInfo().setNumReduces(jucEvent.getFinishedReduces());
+          mi.getJobIndexInfo().setJobStatus(jucEvent.getStatus());
+          closeEventWriter(event.getJobID());
+          if(context.isLastAMRetry())
+            processDoneFiles(event.getJobID());
         } catch (IOException e) {
           throw new YarnRuntimeException(e);
         }
@@ -548,6 +566,7 @@ public class JobHistoryEventHandler extends AbstractService
           mi.getJobIndexInfo().setNumReduces(jucEvent.getFinishedReduces());
           mi.getJobIndexInfo().setJobStatus(jucEvent.getStatus());
           closeEventWriter(event.getJobID());
+          processDoneFiles(event.getJobID());
         } catch (IOException e) {
           throw new YarnRuntimeException(e);
         }
@@ -634,7 +653,6 @@ public class JobHistoryEventHandler extends AbstractService
   }
 
   protected void closeEventWriter(JobId jobId) throws IOException {
-
     final MetaInfo mi = fileMap.get(jobId);
     if (mi == null) {
       throw new IOException("No MetaInfo found for JobId: [" + jobId + "]");
@@ -654,7 +672,15 @@ public class JobHistoryEventHandler extends AbstractService
       LOG.error("Error closing writer for JobID: " + jobId);
       throw e;
     }
-     
+  }
+
+  protected void processDoneFiles(JobId jobId) throws IOException {
+
+    final MetaInfo mi = fileMap.get(jobId);
+    if (mi == null) {
+      throw new IOException("No MetaInfo found for JobId: [" + jobId + "]");
+    }
+
     if (mi.getHistoryFile() == null) {
       LOG.warn("No file for job-history with " + jobId + " found in cache!");
     }

+ 2 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/AppContext.java

@@ -61,4 +61,6 @@ public interface AppContext {
   Set<String> getBlacklistedNodes();
   
   ClientToAMTokenSecretManager getClientToAMTokenSecretManager();
+
+  boolean isLastAMRetry();
 }
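
The JOB_ERROR branch above only promotes the done files (job summary, history file, etc.) when the attempt knows it is the final one, which is what the new AppContext#isLastAMRetry() exposes. A minimal, self-contained sketch of that decision in plain Java; DoneFileGate is a hypothetical stand-in for JobHistoryEventHandler, not the real class:

import java.util.EnumSet;

enum HistoryEventType { JOB_FINISHED, JOB_ERROR, JOB_FAILED, JOB_KILLED }

final class DoneFileGate {
  // what AppContext#isLastAMRetry() reports for this attempt
  private final boolean lastAMRetry;

  DoneFileGate(boolean lastAMRetry) { this.lastAMRetry = lastAMRetry; }

  // JOB_FINISHED/JOB_FAILED/JOB_KILLED always promote the done files;
  // JOB_ERROR promotes them only on the last AM retry.
  boolean shouldProcessDoneFiles(HistoryEventType type) {
    if (type == HistoryEventType.JOB_ERROR) {
      return lastAMRetry;
    }
    return EnumSet.of(HistoryEventType.JOB_FINISHED, HistoryEventType.JOB_FAILED,
        HistoryEventType.JOB_KILLED).contains(type);
  }

  public static void main(String[] args) {
    System.out.println(new DoneFileGate(false).shouldProcessDoneFiles(HistoryEventType.JOB_ERROR)); // false
    System.out.println(new DoneFileGate(true).shouldProcessDoneFiles(HistoryEventType.JOB_ERROR));  // true
  }
}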

+ 29 - 16
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java

@@ -325,18 +325,23 @@ public class MRAppMaster extends CompositeService {
         dispatcher.register(org.apache.hadoop.mapreduce.jobhistory.EventType.class,
             eater);
       }
-      
+
+      if (copyHistory) {
+        // Now that the RM has a FINISHING application state that gives AMs
+        // plenty of time to clean up after unregistering, it is safe to clean
+        // the staging directory after unregistering with the RM. So we start
+        // the staging-dir cleaner BEFORE the ContainerAllocator so that, on
+        // shut-down, the ContainerAllocator unregisters first and the
+        // staging-dir cleaner then deletes the staging directory.
+        addService(createStagingDirCleaningService());
+      }
+
       // service to allocate containers from RM (if non-uber) or to fake it (uber)
       containerAllocator = createContainerAllocator(null, context);
       addIfService(containerAllocator);
       dispatcher.register(ContainerAllocator.EventType.class, containerAllocator);
 
       if (copyHistory) {
-        // Add the staging directory cleaner before the history server but after
-        // the container allocator so the staging directory is cleaned after
-        // the history has been flushed but before unregistering with the RM.
-        addService(createStagingDirCleaningService());
-
         // Add the JobHistoryEventHandler last so that it is properly stopped first.
         // This will guarantee that all history-events are flushed before AM goes
         // ahead with shutdown.
@@ -344,7 +349,6 @@ public class MRAppMaster extends CompositeService {
         // component creates a JobHistoryEvent in the meanwhile, it will be just be
         // queued inside the JobHistoryEventHandler 
         addIfService(historyService);
-        
 
         JobHistoryCopyService cpHist = new JobHistoryCopyService(appAttemptID,
             dispatcher.getEventHandler());
@@ -396,6 +400,14 @@ public class MRAppMaster extends CompositeService {
       dispatcher.register(Speculator.EventType.class,
           speculatorEventDispatcher);
 
+      // Now that the RM has a FINISHING application state that gives AMs
+      // plenty of time to clean up after unregistering, it is safe to clean
+      // the staging directory after unregistering with the RM. So we start
+      // the staging-dir cleaner BEFORE the ContainerAllocator so that, on
+      // shut-down, the ContainerAllocator unregisters first and the
+      // staging-dir cleaner then deletes the staging directory.
+      addService(createStagingDirCleaningService());
+
       // service to allocate containers from RM (if non-uber) or to fake it (uber)
       addIfService(containerAllocator);
       dispatcher.register(ContainerAllocator.EventType.class, containerAllocator);
@@ -405,11 +417,6 @@ public class MRAppMaster extends CompositeService {
       addIfService(containerLauncher);
       dispatcher.register(ContainerLauncher.EventType.class, containerLauncher);
 
-      // Add the staging directory cleaner before the history server but after
-      // the container allocator so the staging directory is cleaned after
-      // the history has been flushed but before unregistering with the RM.
-      addService(createStagingDirCleaningService());
-
       // Add the JobHistoryEventHandler last so that it is properly stopped first.
       // This will guarantee that all history-events are flushed before AM goes
       // ahead with shutdown.
@@ -952,6 +959,11 @@ public class MRAppMaster extends CompositeService {
     public ClientToAMTokenSecretManager getClientToAMTokenSecretManager() {
       return clientToAMTokenSecretManager;
     }
+
+    @Override
+    public boolean isLastAMRetry(){
+      return isLastAMRetry;
+    }
   }
 
   @SuppressWarnings("unchecked")
@@ -1037,11 +1049,11 @@ public class MRAppMaster extends CompositeService {
     // attempt will generate one.  However that disables recovery if there
     // are reducers as the shuffle secret would be app attempt specific.
     int numReduceTasks = getConfig().getInt(MRJobConfig.NUM_REDUCES, 0);
-    boolean shuffleKeyValidForRecovery = (numReduceTasks > 0 &&
-        TokenCache.getShuffleSecretKey(jobCredentials) != null);
+    boolean shuffleKeyValidForRecovery =
+        TokenCache.getShuffleSecretKey(jobCredentials) != null;
 
     if (recoveryEnabled && recoverySupportedByCommitter
-          && shuffleKeyValidForRecovery) {
+        && (numReduceTasks <= 0 || shuffleKeyValidForRecovery)) {
       LOG.info("Recovery is enabled. "
           + "Will try to recover from previous life on best effort basis.");
       try {
@@ -1054,7 +1066,8 @@ public class MRAppMaster extends CompositeService {
     } else {
       LOG.info("Will not try to recover. recoveryEnabled: "
             + recoveryEnabled + " recoverySupportedByCommitter: "
-            + recoverySupportedByCommitter + " shuffleKeyValidForRecovery: "
+            + recoverySupportedByCommitter + " numReduceTasks: "
+            + numReduceTasks + " shuffleKeyValidForRecovery: "
             + shuffleKeyValidForRecovery + " ApplicationAttemptID: "
             + appAttemptID.getAttemptId());
       // Get the amInfos anyways whether recovery is enabled or not
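
Put differently, after this change a map-only job no longer needs a recovered shuffle secret key, because there is no shuffle to protect. A small, plain-Java sketch of the eligibility check as rewritten above; the method name and the sample values are illustrative only:

final class RecoveryEligibility {
  static boolean shouldRecover(boolean recoveryEnabled,
      boolean recoverySupportedByCommitter, int numReduceTasks,
      boolean shuffleKeyValidForRecovery) {
    return recoveryEnabled && recoverySupportedByCommitter
        && (numReduceTasks <= 0 || shuffleKeyValidForRecovery);
  }

  public static void main(String[] args) {
    // map-only job without a shuffle key: now recoverable
    System.out.println(shouldRecover(true, true, 0, false)); // true
    // job with reducers but no shuffle key: still not recoverable
    System.out.println(shouldRecover(true, true, 1, false)); // false
  }
}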

+ 13 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java

@@ -36,9 +36,12 @@ import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
 import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
 import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
 import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
@@ -223,6 +226,7 @@ public abstract class RMCommunicator extends AbstractService
 
   protected void startAllocatorThread() {
     allocatorThread = new Thread(new Runnable() {
+      @SuppressWarnings("unchecked")
       @Override
       public void run() {
         while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
@@ -233,6 +237,15 @@ public abstract class RMCommunicator extends AbstractService
             } catch (YarnRuntimeException e) {
               LOG.error("Error communicating with RM: " + e.getMessage() , e);
               return;
+            } catch (InvalidToken e) {
+              // This can happen if the RM has been restarted: the AMRMToken
+              // is currently not populated back into AMRMTokenSecretManager
+              // after an RM restart. Once that is fixed, there will be no
+              // need to send the JOB_AM_REBOOT event from this method.
+              eventHandler.handle(new JobEvent(job.getID(),
+                JobEventType.JOB_AM_REBOOT));
+              LOG.error("Error in authencating with RM: " ,e);
+              return;
             } catch (Exception e) {
               LOG.error("ERROR IN CONTACTING RM. ", e);
               continue;

+ 114 - 3
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java

@@ -25,10 +25,13 @@ import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.never;
 
 import java.io.File;
 import java.io.IOException;
 
+import junit.framework.Assert;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -43,6 +46,7 @@ import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
 import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -229,6 +233,98 @@ public class TestJobHistoryEventHandler {
     }
   }
 
+  // When this is the last AM retry, the done files are processed for all event types
+  @Test (timeout=50000)
+  public void testProcessDoneFilesOnLastAMRetry() throws Exception {
+    TestParams t = new TestParams(true);
+    Configuration conf = new Configuration();
+
+    JHEvenHandlerForTest realJheh =
+        new JHEvenHandlerForTest(t.mockAppContext, 0);
+    JHEvenHandlerForTest jheh = spy(realJheh);
+    jheh.init(conf);
+
+    EventWriter mockWriter = null;
+    try {
+      jheh.start();
+      handleEvent(jheh, new JobHistoryEvent(t.jobId, new AMStartedEvent(
+        t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000)));
+      verify(jheh, times(0)).processDoneFiles(any(JobId.class));
+
+      handleEvent(jheh, new JobHistoryEvent(t.jobId,
+        new JobUnsuccessfulCompletionEvent(TypeConverter.fromYarn(t.jobId), 0,
+          0, 0, JobStateInternal.ERROR.toString())));
+      verify(jheh, times(1)).processDoneFiles(any(JobId.class));
+
+      handleEvent(jheh, new JobHistoryEvent(t.jobId, new JobFinishedEvent(
+        TypeConverter.fromYarn(t.jobId), 0, 0, 0, 0, 0, new Counters(),
+        new Counters(), new Counters())));
+      verify(jheh, times(2)).processDoneFiles(any(JobId.class));
+
+      handleEvent(jheh, new JobHistoryEvent(t.jobId,
+        new JobUnsuccessfulCompletionEvent(TypeConverter.fromYarn(t.jobId), 0,
+          0, 0, JobStateInternal.FAILED.toString())));
+      verify(jheh, times(3)).processDoneFiles(any(JobId.class));
+
+      handleEvent(jheh, new JobHistoryEvent(t.jobId,
+        new JobUnsuccessfulCompletionEvent(TypeConverter.fromYarn(t.jobId), 0,
+          0, 0, JobStateInternal.KILLED.toString())));
+      verify(jheh, times(4)).processDoneFiles(any(JobId.class));
+
+      mockWriter = jheh.getEventWriter();
+      verify(mockWriter, times(5)).write(any(HistoryEvent.class));
+    } finally {
+      jheh.stop();
+      verify(mockWriter).close();
+    }
+  }
+
+  // When this is not the last AM retry, skip processing the done files on JOB_ERROR
+  @Test (timeout=50000)
+  public void testProcessDoneFilesNotLastAMRetry() throws Exception {
+    TestParams t = new TestParams(false);
+    Configuration conf = new Configuration();
+    JHEvenHandlerForTest realJheh =
+        new JHEvenHandlerForTest(t.mockAppContext, 0);
+    JHEvenHandlerForTest jheh = spy(realJheh);
+    jheh.init(conf);
+
+    EventWriter mockWriter = null;
+    try {
+      jheh.start();
+      handleEvent(jheh, new JobHistoryEvent(t.jobId, new AMStartedEvent(
+        t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000)));
+      verify(jheh, times(0)).processDoneFiles(t.jobId);
+
+      // skip processing done files
+      handleEvent(jheh, new JobHistoryEvent(t.jobId,
+        new JobUnsuccessfulCompletionEvent(TypeConverter.fromYarn(t.jobId), 0,
+          0, 0, JobStateInternal.ERROR.toString())));
+      verify(jheh, times(0)).processDoneFiles(t.jobId);
+
+      handleEvent(jheh, new JobHistoryEvent(t.jobId, new JobFinishedEvent(
+          TypeConverter.fromYarn(t.jobId), 0, 0, 0, 0, 0, new Counters(),
+          new Counters(), new Counters())));
+      verify(jheh, times(1)).processDoneFiles(t.jobId);
+
+      handleEvent(jheh, new JobHistoryEvent(t.jobId,
+        new JobUnsuccessfulCompletionEvent(TypeConverter.fromYarn(t.jobId), 0,
+          0, 0, JobStateInternal.FAILED.toString())));
+      verify(jheh, times(2)).processDoneFiles(t.jobId);
+
+      handleEvent(jheh, new JobHistoryEvent(t.jobId,
+        new JobUnsuccessfulCompletionEvent(TypeConverter.fromYarn(t.jobId), 0,
+          0, 0, JobStateInternal.KILLED.toString())));
+      verify(jheh, times(3)).processDoneFiles(t.jobId);
+
+      mockWriter = jheh.getEventWriter();
+      verify(mockWriter, times(5)).write(any(HistoryEvent.class));
+    } finally {
+      jheh.stop();
+      verify(mockWriter).close();
+    }
+  }
+
   private void queueEvent(JHEvenHandlerForTest jheh, JobHistoryEvent event) {
     jheh.handle(event);
   }
@@ -258,20 +354,23 @@ public class TestJobHistoryEventHandler {
     }
   }
 
-  private AppContext mockAppContext(ApplicationId appId) {
+  private AppContext mockAppContext(ApplicationId appId, boolean isLastAMRetry) {
     JobId jobId = TypeConverter.toYarn(TypeConverter.fromYarn(appId));
     AppContext mockContext = mock(AppContext.class);
     Job mockJob = mock(Job.class);
+    when(mockJob.getAllCounters()).thenReturn(new Counters());
     when(mockJob.getTotalMaps()).thenReturn(10);
     when(mockJob.getTotalReduces()).thenReturn(10);
     when(mockJob.getName()).thenReturn("mockjob");
     when(mockContext.getJob(jobId)).thenReturn(mockJob);
     when(mockContext.getApplicationID()).thenReturn(appId);
+    when(mockContext.isLastAMRetry()).thenReturn(isLastAMRetry);
     return mockContext;
   }
 
 
   private class TestParams {
+    boolean isLastAMRetry;
     String workDir = setupTestWorkDir();
     ApplicationId appId = ApplicationId.newInstance(200, 1);
     ApplicationAttemptId appAttemptId =
@@ -279,7 +378,15 @@ public class TestJobHistoryEventHandler {
     ContainerId containerId = ContainerId.newInstance(appAttemptId, 1);
     TaskID taskID = TaskID.forName("task_200707121733_0003_m_000005");
     JobId jobId = MRBuilderUtils.newJobId(appId, 1);
-    AppContext mockAppContext = mockAppContext(appId);
+    AppContext mockAppContext;
+
+    public TestParams() {
+      this(false);
+    }
+    public TestParams(boolean isLastAMRetry) {
+      this.isLastAMRetry = isLastAMRetry;
+      mockAppContext = mockAppContext(appId, this.isLastAMRetry);
+    }
   }
 
   private JobHistoryEvent getEventToEnqueue(JobId jobId) {
@@ -344,7 +451,6 @@ public class TestJobHistoryEventHandler {
 class JHEvenHandlerForTest extends JobHistoryEventHandler {
 
   private EventWriter eventWriter;
-
   public JHEvenHandlerForTest(AppContext context, int startCount) {
     super(context, startCount);
   }
@@ -367,6 +473,11 @@ class JHEvenHandlerForTest extends JobHistoryEventHandler {
   public EventWriter getEventWriter() {
     return this.eventWriter;
   }
+
+  @Override
+  protected void processDoneFiles(JobId jobId){
+    // do nothing
+  }
 }
 
 /**

+ 5 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockAppContext.java

@@ -130,4 +130,9 @@ public class MockAppContext implements AppContext {
     // Not implemented
     return null;
   }
+
+  @Override
+  public boolean isLastAMRetry() {
+    return false;
+  }
 }

+ 110 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java

@@ -114,7 +114,6 @@ public class TestRecovery {
   private Text val1 = new Text("val1");
   private Text val2 = new Text("val2");
 
-
   /**
    * AM with 2 maps and 1 reduce. For 1st map, one attempt fails, one attempt
    * completely disappears because of failed launch, one attempt gets killed and
@@ -316,6 +315,116 @@ public class TestRecovery {
     // available in the failed attempt should be available here
   }
 
+  /**
+   * AM with 3 maps and 0 reduces. The AM crashes after the first two tasks
+   * finish, then recovers completely and succeeds in the second generation.
+   * 
+   * @throws Exception
+   */
+  @Test
+  public void testCrashOfMapsOnlyJob() throws Exception {
+    int runCount = 0;
+    MRApp app =
+        new MRAppWithHistory(3, 0, false, this.getClass().getName(), true,
+          ++runCount);
+    Configuration conf = new Configuration();
+    conf.setBoolean("mapred.mapper.new-api", true);
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+    conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
+    Job job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+
+    // all maps would be running
+    Assert.assertEquals("No of tasks not correct", 3, job.getTasks().size());
+    Iterator<Task> it = job.getTasks().values().iterator();
+    Task mapTask1 = it.next();
+    Task mapTask2 = it.next();
+    Task mapTask3 = it.next();
+
+    // all maps must be running
+    app.waitForState(mapTask1, TaskState.RUNNING);
+    app.waitForState(mapTask2, TaskState.RUNNING);
+    app.waitForState(mapTask3, TaskState.RUNNING);
+
+    TaskAttempt task1Attempt =
+        mapTask1.getAttempts().values().iterator().next();
+    TaskAttempt task2Attempt =
+        mapTask2.getAttempts().values().iterator().next();
+    TaskAttempt task3Attempt =
+        mapTask3.getAttempts().values().iterator().next();
+
+    // before sending the TA_DONE event, make sure the attempts have come to
+    // the RUNNING state
+    app.waitForState(task1Attempt, TaskAttemptState.RUNNING);
+    app.waitForState(task2Attempt, TaskAttemptState.RUNNING);
+    app.waitForState(task3Attempt, TaskAttemptState.RUNNING);
+
+    // send the done signal to the 1st two maps
+    app
+      .getContext()
+      .getEventHandler()
+      .handle(
+        new TaskAttemptEvent(task1Attempt.getID(), TaskAttemptEventType.TA_DONE));
+    app
+      .getContext()
+      .getEventHandler()
+      .handle(
+        new TaskAttemptEvent(task2Attempt.getID(), TaskAttemptEventType.TA_DONE));
+
+    // wait for the first two map tasks to complete
+    app.waitForState(mapTask1, TaskState.SUCCEEDED);
+    app.waitForState(mapTask2, TaskState.SUCCEEDED);
+
+    // stop the app
+    app.stop();
+
+    // rerun
+    // in the rerun, the first two maps will be recovered from the previous run
+    app =
+        new MRAppWithHistory(2, 1, false, this.getClass().getName(), false,
+          ++runCount);
+    conf = new Configuration();
+    conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
+    conf.setBoolean("mapred.mapper.new-api", true);
+    conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
+    // Set num-reduces explicitly in conf as recovery logic depends on it.
+    conf.setInt(MRJobConfig.NUM_REDUCES, 0);
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+    job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+
+    Assert.assertEquals("No of tasks not correct", 3, job.getTasks().size());
+    it = job.getTasks().values().iterator();
+    mapTask1 = it.next();
+    mapTask2 = it.next();
+    mapTask3 = it.next();
+
+    // first two maps will be recovered, no need to send done
+    app.waitForState(mapTask1, TaskState.SUCCEEDED);
+    app.waitForState(mapTask2, TaskState.SUCCEEDED);
+
+    app.waitForState(mapTask3, TaskState.RUNNING);
+
+    task3Attempt = mapTask3.getAttempts().values().iterator().next();
+    // before sending the TA_DONE event, make sure the attempt has come to
+    // the RUNNING state
+    app.waitForState(task3Attempt, TaskAttemptState.RUNNING);
+
+    // send the done signal to the 3rd map task
+    app
+      .getContext()
+      .getEventHandler()
+      .handle(
+        new TaskAttemptEvent(mapTask3.getAttempts().values().iterator().next()
+          .getID(), TaskAttemptEventType.TA_DONE));
+
+    // wait for the 3rd map task to complete
+    app.waitForState(mapTask3, TaskState.SUCCEEDED);
+
+    app.waitForState(job, JobState.SUCCEEDED);
+    app.verifyCompleted();
+  }
+
   @Test
   public void testMultipleCrashes() throws Exception {
 

+ 5 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java

@@ -862,5 +862,10 @@ public class TestRuntimeEstimators {
     public ClientToAMTokenSecretManager getClientToAMTokenSecretManager() {
       return null;
     }
+
+    @Override
+    public boolean isLastAMRetry() {
+      return false;
+    }
   }
 }

+ 42 - 10
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java

@@ -36,6 +36,8 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
 import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
@@ -54,6 +56,7 @@ import org.apache.hadoop.service.Service;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
@@ -279,14 +282,17 @@ import org.junit.Test;
    }
 
   private final class MRAppTestCleanup extends MRApp {
-    boolean stoppedContainerAllocator;
-    boolean cleanedBeforeContainerAllocatorStopped;
-
+    int stagingDirCleanedup;
+    int ContainerAllocatorStopped;
+    int JobHistoryEventHandlerStopped;
+    int numStops;
     public MRAppTestCleanup(int maps, int reduces, boolean autoComplete,
         String testName, boolean cleanOnStart) {
       super(maps, reduces, autoComplete, testName, cleanOnStart);
-      stoppedContainerAllocator = false;
-      cleanedBeforeContainerAllocatorStopped = false;
+      stagingDirCleanedup = 0;
+      ContainerAllocatorStopped = 0;
+      JobHistoryEventHandlerStopped = 0;
+      numStops = 0;
     }
 
     @Override
@@ -312,6 +318,26 @@ import org.junit.Test;
       return newJob;
     }
 
+    @Override
+    protected EventHandler<JobHistoryEvent> createJobHistoryHandler(
+        AppContext context) {
+      return new TestJobHistoryEventHandler(context, getStartCount());
+    }
+
+    private class TestJobHistoryEventHandler extends JobHistoryEventHandler {
+
+      public TestJobHistoryEventHandler(AppContext context, int startCount) {
+        super(context, startCount);
+      }
+
+      @Override
+      public void serviceStop() throws Exception {
+        numStops++;
+        JobHistoryEventHandlerStopped = numStops;
+        super.serviceStop();
+      }
+    }
+
     @Override
     protected ContainerAllocator createContainerAllocator(
         ClientService clientService, AppContext context) {
@@ -334,7 +360,8 @@ import org.junit.Test;
 
       @Override
       protected void serviceStop() throws Exception {
-        stoppedContainerAllocator = true;
+        numStops++;
+        ContainerAllocatorStopped = numStops;
         super.serviceStop();
       }
     }
@@ -346,7 +373,8 @@ import org.junit.Test;
 
     @Override
     public void cleanupStagingDir() throws IOException {
-      cleanedBeforeContainerAllocatorStopped = !stoppedContainerAllocator;
+      numStops++;
+      stagingDirCleanedup = numStops;
     }
 
     @Override
@@ -377,11 +405,15 @@ import org.junit.Test;
     app.verifyCompleted();
 
     int waitTime = 20 * 1000;
-    while (waitTime > 0 && !app.cleanedBeforeContainerAllocatorStopped) {
+    while (waitTime > 0 && app.numStops < 3 ) {
       Thread.sleep(100);
       waitTime -= 100;
     }
-    Assert.assertTrue("Staging directory not cleaned before notifying RM",
-        app.cleanedBeforeContainerAllocatorStopped);
+
+    // assert JobHistoryEventHandlerStopped first, then
+    // ContainerAllocatorStopped, and then stagingDirCleanedup
+    Assert.assertEquals(1, app.JobHistoryEventHandlerStopped);
+    Assert.assertEquals(2, app.ContainerAllocatorStopped);
+    Assert.assertEquals(3, app.stagingDirCleanedup);
   }
  }
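
The assertions above pin down the shut-down order that the MRAppMaster change relies on: Hadoop's CompositeService stops its children in the reverse order they were added, so adding the staging-dir cleaner before the container allocator (and the JobHistoryEventHandler last) yields history handler, then allocator, then staging cleanup on stop. A simplified plain-Java illustration of that reverse stop order, not the Hadoop service API:

import java.util.ArrayDeque;
import java.util.Deque;

final class StopOrderSketch {
  public static void main(String[] args) {
    Deque<String> started = new ArrayDeque<>();
    // services in the order they are added (and started)
    for (String s : new String[] { "staging-dir cleaner", "container allocator",
        "job history event handler" }) {
      started.push(s); // push so that later additions are stopped first
    }
    while (!started.isEmpty()) {
      System.out.println("stopping " + started.pop());
    }
    // prints: job history event handler, container allocator, staging-dir cleaner
  }
}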

+ 10 - 7
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java

@@ -227,7 +227,7 @@ public class LocalJobRunner implements ClientProtocol {
             info.getSplitIndex(), 1);
           map.setUser(UserGroupInformation.getCurrentUser().
               getShortUserName());
-          setupChildMapredLocalDirs(localJobDir, map, localConf);
+          setupChildMapredLocalDirs(map, localConf);
 
           MapOutputFile mapOutput = new MROutputFiles();
           mapOutput.setConf(localConf);
@@ -305,7 +305,7 @@ public class LocalJobRunner implements ClientProtocol {
               reduceId, taskId, mapIds.size(), 1);
           reduce.setUser(UserGroupInformation.getCurrentUser().
               getShortUserName());
-          setupChildMapredLocalDirs(localJobDir, reduce, localConf);
+          setupChildMapredLocalDirs(reduce, localConf);
           reduce.setLocalMapFiles(mapOutputFiles);
 
           if (!Job.this.isInterrupted()) {
@@ -958,16 +958,18 @@ public class LocalJobRunner implements ClientProtocol {
     throw new UnsupportedOperationException("Not supported");
   }
   
-  static void setupChildMapredLocalDirs(Path localJobDir, Task t, JobConf conf) {
+  static void setupChildMapredLocalDirs(Task t, JobConf conf) {
     String[] localDirs = conf.getTrimmedStrings(MRConfig.LOCAL_DIR);
+    String jobId = t.getJobID().toString();
     String taskId = t.getTaskID().toString();
     boolean isCleanup = t.isTaskCleanupTask();
+    String user = t.getUser();
     StringBuffer childMapredLocalDir =
         new StringBuffer(localDirs[0] + Path.SEPARATOR
-            + getLocalTaskDir(localJobDir, taskId, isCleanup));
+            + getLocalTaskDir(user, jobId, taskId, isCleanup));
     for (int i = 1; i < localDirs.length; i++) {
       childMapredLocalDir.append("," + localDirs[i] + Path.SEPARATOR
-          + getLocalTaskDir(localJobDir, taskId, isCleanup));
+          + getLocalTaskDir(user, jobId, taskId, isCleanup));
     }
     LOG.debug(MRConfig.LOCAL_DIR + " for child : " + childMapredLocalDir);
     conf.set(MRConfig.LOCAL_DIR, childMapredLocalDir.toString());
@@ -976,9 +978,10 @@ public class LocalJobRunner implements ClientProtocol {
   static final String TASK_CLEANUP_SUFFIX = ".cleanup";
   static final String JOBCACHE = "jobcache";
   
-  static String getLocalTaskDir(Path localJobDir, String taskid,
+  static String getLocalTaskDir(String user, String jobid, String taskid,
       boolean isCleanupAttempt) {
-    String taskDir = localJobDir.toString() + Path.SEPARATOR + taskid;
+    String taskDir = jobDir + Path.SEPARATOR + user + Path.SEPARATOR + JOBCACHE
+      + Path.SEPARATOR + jobid + Path.SEPARATOR + taskid;
     if (isCleanupAttempt) {
       taskDir = taskDir + TASK_CLEANUP_SUFFIX;
     }
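
The rewritten getLocalTaskDir builds the child task directory from the user, job id, and task id under the jobcache directory instead of from a per-job Path. A small sketch of the resulting relative layout; the ids and the "localRunner" prefix (standing in for LocalJobRunner's jobDir constant, which is not shown in this hunk) are hypothetical:

final class LocalTaskDirSketch {
  static String getLocalTaskDir(String jobDirPrefix, String user, String jobid,
      String taskid, boolean isCleanupAttempt) {
    String taskDir = jobDirPrefix + "/" + user + "/jobcache/" + jobid + "/" + taskid;
    return isCleanupAttempt ? taskDir + ".cleanup" : taskDir;
  }

  public static void main(String[] args) {
    System.out.println(getLocalTaskDir("localRunner", "alice", "job_local_0001",
        "attempt_local_0001_m_000000_0", false));
    // localRunner/alice/jobcache/job_local_0001/attempt_local_0001_m_000000_0
  }
}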

+ 1 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/avro/Events.avpr

@@ -269,6 +269,7 @@
           "JOB_STATUS_CHANGED",
           "JOB_FAILED",
           "JOB_KILLED",
+          "JOB_ERROR",
           "JOB_INFO_CHANGED",
           "TASK_STARTED",
           "TASK_FINISHED",

+ 2 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/EventReader.java

@@ -104,6 +104,8 @@ public class EventReader implements Closeable {
       result = new JobUnsuccessfulCompletionEvent(); break;
     case JOB_KILLED:
       result = new JobUnsuccessfulCompletionEvent(); break;
+    case JOB_ERROR:
+      result = new JobUnsuccessfulCompletionEvent(); break;
     case JOB_INFO_CHANGED:
       result = new JobInfoChangeEvent(); break;
     case TASK_STARTED:

+ 1 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java

@@ -185,6 +185,7 @@ public class JobHistoryParser implements HistoryEventHandler {
       break;
     case JOB_FAILED:
     case JOB_KILLED:
+    case JOB_ERROR:
       handleJobFailedEvent((JobUnsuccessfulCompletionEvent) event);
       break;
     case JOB_FINISHED:

+ 2 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobUnsuccessfulCompletionEvent.java

@@ -72,6 +72,8 @@ public class JobUnsuccessfulCompletionEvent implements HistoryEvent {
   public EventType getEventType() {
     if ("FAILED".equals(getStatus())) {
       return EventType.JOB_FAILED;
+    } else if ("ERROR".equals(getStatus())) {
+      return EventType.JOB_ERROR;
     } else
       return EventType.JOB_KILLED;
   }

+ 6 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java

@@ -381,4 +381,10 @@ public class JobHistory extends AbstractService implements HistoryContext {
     // Not implemented.
     return null;
   }
+
+  @Override
+  public boolean isLastAMRetry() {
+    // bogus - Not Required
+    return false;
+  }
 }

+ 5 - 0
hadoop-yarn-project/CHANGES.txt

@@ -80,6 +80,11 @@ Release 2.1.1-beta - UNRELEASED
     YARN-1006. Fixed broken rendering in the Nodes list web page on the RM web
     UI. (Xuan Gong via vinodkv)
 
+    YARN-881. Priority#compareTo method seems to be wrong. (Jian He via bikas)
+
+    YARN-1082. Create base directories on HDFS after RM login to ensure RM
+    recovery doesn't fail in secure mode. (vinodkv via acmurthy)
+
 Release 2.1.0-beta - 2013-08-22
 
   INCOMPATIBLE CHANGES

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Priority.java

@@ -81,7 +81,7 @@ public abstract class Priority implements Comparable<Priority> {
 
   @Override
   public int compareTo(Priority other) {
-    return this.getPriority() - other.getPriority();
+    return other.getPriority() - this.getPriority();
   }
 
   @Override
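
With the inverted subtraction, a numerically smaller priority value (i.e. a higher priority) now compares as greater, which is exactly what the new TestSchedulerUtils#testComparePriorities below asserts. A plain-Java model of the fixed comparison; Prio is a stand-in for the YARN Priority class, not the real type:

final class PrioritySketch {
  static final class Prio implements Comparable<Prio> {
    final int value;
    Prio(int value) { this.value = value; }
    @Override public int compareTo(Prio other) {
      return other.value - this.value; // smaller value compares as greater, i.e. higher priority
    }
  }

  public static void main(String[] args) {
    Prio high = new Prio(1);
    Prio low = new Prio(2);
    System.out.println(high.compareTo(low) > 0); // true, matching the new test
  }
}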

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java

@@ -100,7 +100,7 @@ public class RMContextImpl implements RMContext {
           containerTokenSecretManager, nmTokenSecretManager,
           clientToAMTokenSecretManager);
     RMStateStore nullStore = new NullRMStateStore();
-    nullStore.setDispatcher(rmDispatcher);
+    nullStore.setRMDispatcher(rmDispatcher);
     try {
       nullStore.init(new YarnConfiguration());
       setStateStore(nullStore);

+ 11 - 6
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java

@@ -67,18 +67,18 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
 import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
 import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
+import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
-import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebApp;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
 import org.apache.hadoop.yarn.server.webproxy.AppReportFetcher;
@@ -186,16 +186,17 @@ public class ResourceManager extends CompositeService implements Recoverable {
       recoveryEnabled = false;
       rmStore = new NullRMStateStore();
     }
+
     try {
       rmStore.init(conf);
-      rmStore.setDispatcher(rmDispatcher);
+      rmStore.setRMDispatcher(rmDispatcher);
     } catch (Exception e) {
       // the Exception from stateStore.init() needs to be handled for 
       // HA and we need to give up master status if we got fenced
       LOG.error("Failed to init state store", e);
       ExitUtil.terminate(1, e);
     }
-    
+
     this.rmContext =
         new RMContextImpl(this.rmDispatcher, rmStore,
           this.containerAllocationExpirer, amLivelinessMonitor,
@@ -275,7 +276,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
   
   @VisibleForTesting
   protected void setRMStateStore(RMStateStore rmStore) {
-    rmStore.setDispatcher(rmDispatcher);
+    rmStore.setRMDispatcher(rmDispatcher);
     ((RMContextImpl) rmContext).setStateStore(rmStore);
   }
 
@@ -601,9 +602,13 @@ public class ResourceManager extends CompositeService implements Recoverable {
     this.containerTokenSecretManager.start();
     this.nmTokenSecretManager.start();
 
+    RMStateStore rmStore = rmContext.getStateStore();
+    // The state store needs to start irrespective of recoveryEnabled as apps
+    // need events to move to further states.
+    rmStore.start();
+
     if(recoveryEnabled) {
       try {
-        RMStateStore rmStore = rmContext.getStateStore();
         RMState state = rmStore.loadState();
         recover(state);
       } catch (Exception e) {

+ 9 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java

@@ -70,7 +70,7 @@ public class FileSystemRMStateStore extends RMStateStore {
   private static final String DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX =
       "RMDTSequenceNumber_";
 
-  private FileSystem fs;
+  protected FileSystem fs;
 
   private Path rootDirPath;
   private Path rmDTSecretManagerRoot;
@@ -80,6 +80,7 @@ public class FileSystemRMStateStore extends RMStateStore {
   @VisibleForTesting
   Path fsWorkingPath;
 
+  @Override
   public synchronized void initInternal(Configuration conf)
       throws Exception{
 
@@ -87,9 +88,14 @@ public class FileSystemRMStateStore extends RMStateStore {
     rootDirPath = new Path(fsWorkingPath, ROOT_DIR_NAME);
     rmDTSecretManagerRoot = new Path(rootDirPath, RM_DT_SECRET_MANAGER_ROOT);
     rmAppRoot = new Path(rootDirPath, RM_APP_ROOT);
+  }
 
-    // create filesystem
-    fs = fsWorkingPath.getFileSystem(conf);
+  @Override
+  protected synchronized void startInternal() throws Exception {
+    // Create the filesystem only now, as part of service-start. By this time
+    // the RM is authenticated with Kerberos, so it is safe to create a
+    // file-system handle.
+    fs = fsWorkingPath.getFileSystem(getConfig());
     fs.mkdirs(rmDTSecretManagerRoot);
     fs.mkdirs(rmAppRoot);
   }
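
Deferring the FileSystem handle to startInternal() matters because, per the comment above and the YARN-1082 entry in CHANGES.txt, the RM has already completed its Kerberos login by the time its services start, so the handle and the store directories are created with the logged-in credentials. A hedged sketch of that ordering; the principal, keytab path, store URI, and root directory name are all hypothetical, and only the relative order of the login and the FileSystem creation is the point:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;

final class SecureStoreStartSketch {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    UserGroupInformation.setConfiguration(conf);
    // log in first, as the RM does before its services are started
    UserGroupInformation.loginUserFromKeytab(
        "rm/host.example.com@EXAMPLE.COM", "/etc/security/keytabs/rm.keytab");

    // only now create the file-system handle and the store directories
    Path fsWorkingPath = new Path("hdfs://nn.example.com:8020/rmstore");
    FileSystem fs = fsWorkingPath.getFileSystem(conf);
    fs.mkdirs(new Path(fsWorkingPath, "RMStateRoot"));
  }
}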

+ 5 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java

@@ -65,7 +65,11 @@ public class MemoryRMStateStore extends RMStateStore {
   @Override
   public synchronized void initInternal(Configuration conf) {
   }
-  
+
+  @Override
+  protected synchronized void startInternal() throws Exception {
+  }
+
   @Override
   protected synchronized void closeInternal() throws Exception {
   }

+ 7 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java

@@ -21,10 +21,10 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery;
 
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
 import org.apache.hadoop.security.token.delegation.DelegationKey;
 import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
 
 @Unstable
 public class NullRMStateStore extends RMStateStore {
@@ -34,6 +34,11 @@ public class NullRMStateStore extends RMStateStore {
     // Do nothing
   }
 
+  @Override
+  protected void startInternal() throws Exception {
+    // Do nothing
+  }
+
   @Override
   protected void closeInternal() throws Exception {
     // Do nothing

+ 23 - 9
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java

@@ -34,6 +34,7 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.delegation.DelegationKey;
+import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
@@ -60,9 +61,14 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAt
  * Real store implementations need to derive from it and implement blocking
  * store and load methods to actually store and load the state.
  */
-public abstract class RMStateStore {
+public abstract class RMStateStore extends AbstractService {
+
   public static final Log LOG = LogFactory.getLog(RMStateStore.class);
 
+  public RMStateStore() {
+    super(RMStateStore.class.getName());
+  }
+
   /**
    * State of an application attempt
    */
@@ -174,31 +180,39 @@ public abstract class RMStateStore {
    * Dispatcher used to send state operation completion events to 
    * ResourceManager services
    */
-  public void setDispatcher(Dispatcher dispatcher) {
+  public void setRMDispatcher(Dispatcher dispatcher) {
     this.rmDispatcher = dispatcher;
   }
   
   AsyncDispatcher dispatcher;
   
-  public synchronized void init(Configuration conf) throws Exception{    
+  public synchronized void serviceInit(Configuration conf) throws Exception{    
     // create async handler
     dispatcher = new AsyncDispatcher();
     dispatcher.init(conf);
     dispatcher.register(RMStateStoreEventType.class, 
                         new ForwardingEventHandler());
-    dispatcher.start();
-    
     initInternal(conf);
   }
+  
+  protected synchronized void serviceStart() throws Exception {
+    dispatcher.start();
+    startInternal();
+  }
 
   /**
    * Derived classes initialize themselves using this method.
-   * The base class is initialized and the event dispatcher is ready to use at
-   * this point
    */
   protected abstract void initInternal(Configuration conf) throws Exception;
-  
-  public synchronized void close() throws Exception {
+
+  /**
+   * Derived classes start themselves using this method.
+   * The base class is started and the event dispatcher is ready to use at
+   * this point
+   */
+  protected abstract void startInternal() throws Exception;
+
+  public synchronized void serviceStop() throws Exception {
     closeInternal();
     dispatcher.stop();
   }
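
With RMStateStore now an AbstractService, callers drive it through the standard init/start/stop lifecycle, and the RM starts the store even when recovery is disabled (as the ResourceManager hunk above notes, apps still need its events to move to further states). A minimal sketch of that lifecycle using the in-memory store, assuming the resourcemanager classes are on the classpath:

import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;

final class StateStoreLifecycleSketch {
  public static void main(String[] args) {
    RMStateStore store = new MemoryRMStateStore();
    store.init(new YarnConfiguration()); // serviceInit: sets up the async dispatcher, then initInternal()
    store.start();                       // serviceStart: starts the dispatcher, then startInternal()
    // store/load calls would go here; loadState() is only invoked when recovery is enabled
    store.stop();                        // serviceStop: closeInternal(), then dispatcher stop
  }
}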

+ 2 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java

@@ -385,8 +385,8 @@ public class FairScheduler implements ResourceScheduler {
     // Sort containers into reverse order of priority
     Collections.sort(runningContainers, new Comparator<RMContainer>() {
       public int compare(RMContainer c1, RMContainer c2) {
-        int ret = c2.getContainer().getPriority().compareTo(
-            c1.getContainer().getPriority());
+        int ret = c1.getContainer().getPriority().compareTo(
+            c2.getContainer().getPriority());
         if (ret == 0) {
           return c2.getContainerId().compareTo(c1.getContainerId());
         }

+ 5 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestRMStateStore.java

@@ -129,7 +129,10 @@ public class TestRMStateStore {
     class TestFileSystemRMStore extends FileSystemRMStateStore {
       TestFileSystemRMStore(Configuration conf) throws Exception {
         init(conf);
+        Assert.assertNull(fs);
         assertTrue(workingDirPathURI.equals(fsWorkingPath));
+        start();
+        Assert.assertNotNull(fs);
       }
     }
 
@@ -218,7 +221,7 @@ public class TestRMStateStore {
     Configuration conf = new YarnConfiguration();
     RMStateStore store = stateStoreHelper.getRMStateStore();
     TestDispatcher dispatcher = new TestDispatcher();
-    store.setDispatcher(dispatcher);
+    store.setRMDispatcher(dispatcher);
 
     AMRMTokenSecretManager appTokenMgr =
         new AMRMTokenSecretManager(conf);
@@ -327,7 +330,7 @@ public class TestRMStateStore {
       RMStateStoreHelper stateStoreHelper) throws Exception {
     RMStateStore store = stateStoreHelper.getRMStateStore();
     TestDispatcher dispatcher = new TestDispatcher();
-    store.setDispatcher(dispatcher);
+    store.setRMDispatcher(dispatcher);
 
     // store RM delegation token;
     RMDelegationTokenIdentifier dtId1 =

+ 7 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java

@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
 
@@ -351,4 +352,10 @@ public class TestSchedulerUtils {
         RMAppAttemptState.LAUNCHED);
   }
 
+  @Test
+  public void testComparePriorities(){
+    Priority high = Priority.newInstance(1);
+    Priority low = Priority.newInstance(2);
+    assertTrue(high.compareTo(low) > 0);
+  }
 }

+ 6 - 5
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java

@@ -39,12 +39,10 @@ import javax.xml.parsers.ParserConfigurationException;
 
 import junit.framework.Assert;
 
-import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.yarn.MockApps;
-import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
@@ -57,11 +55,11 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
-import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
@@ -81,8 +79,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemoved
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
-import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
-import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.resource.Resources;
@@ -132,7 +128,12 @@ public class TestFairScheduler {
     conf.set(FairSchedulerConfiguration.ASSIGN_MULTIPLE, "false");
     resourceManager = new ResourceManager();
     resourceManager.init(conf);
+
+    // TODO: This test should really be using MockRM. For now starting stuff
+    // that is needed at a bare minimum.
     ((AsyncDispatcher)resourceManager.getRMContext().getDispatcher()).start();
+    resourceManager.getRMContext().getStateStore().start();
+
     scheduler.reinitialize(conf, resourceManager.getRMContext());
     // to initialize the master key
     resourceManager.getRMContainerTokenSecretManager().rollMasterKey();