
Merge r1569890 through r1572250 from trunk.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-5535@1572251 13f79535-47bb-0310-9956-ffa450edef68
Tsz-wo Sze committed 11 years ago · commit 9cc0d5d497
27 changed files with 795 additions and 195 deletions
  1. +44 -38   hadoop-common-project/hadoop-common/CHANGES.txt
  2. +111 -16  hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/s3native/Jets3tNativeFileSystemStore.java
  3. +7 -3     hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/VersionInfo.java
  4. +25 -0    hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
  5. +126 -0   hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/s3native/TestJets3tNativeFileSystemStore.java
  6. +16 -0    hadoop-common-project/hadoop-common/src/test/resources/jets3t.properties
  7. +96 -84   hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
  8. +0 -1     hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java
  9. +3 -1     hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java
  10. +7 -7    hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/AclFeature.java
  11. +1 -1    hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/AclStorage.java
  12. +9 -0    hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
  13. +16 -0   hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/common/TestJspHelper.java
  14. +7 -0    hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSAclBaseTest.java
  15. +6 -0    hadoop-yarn-project/CHANGES.txt
  16. +37 -8   hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterResponse.java
  17. +33 -0   hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/NMToken.java
  18. +2 -1    hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
  19. +2 -1    hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
  20. +15 -5   hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
  21. +101 -20 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RegisterApplicationMasterResponsePBImpl.java
  22. +1 -1    hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/NMTokenPBImpl.java
  23. +26 -2   hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
  24. +2 -3    hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
  25. +1 -1    hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
  26. +1 -0    hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
  27. +100 -2  hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java

+ 44 - 38
hadoop-common-project/hadoop-common/CHANGES.txt

@@ -8,9 +8,6 @@ Trunk (Unreleased)
     FSDataOutputStream.sync() and Syncable.sync().  (szetszwo)
 
   NEW FEATURES
-
-    HADOOP-10184. Hadoop Common changes required to support HDFS ACLs. (See
-    breakdown of tasks below for features and contributors)
     
   IMPROVEMENTS
 
@@ -300,41 +297,6 @@ Trunk (Unreleased)
 
     HADOOP-10044 Improve the javadoc of rpc code (sanjay Radia)
 
-  BREAKDOWN OF HADOOP-10184 SUBTASKS AND RELATED JIRAS
-
-    HADOOP-10185. FileSystem API for ACLs. (cnauroth)
-
-    HADOOP-10186. Remove AclReadFlag and AclWriteFlag in FileSystem API.
-    (Haohui Mai via cnauroth)
-
-    HADOOP-10187. FsShell CLI: add getfacl and setfacl with minimal support for
-    getting and setting ACLs. (Vinay via cnauroth)
-
-    HADOOP-10192. FileSystem#getAclStatus has incorrect JavaDocs. (cnauroth)
-
-    HADOOP-10220. Add ACL indicator bit to FsPermission. (cnauroth)
-
-    HADOOP-10241. Clean up output of FsShell getfacl. (Chris Nauroth via wheat9)
-
-    HADOOP-10213. Fix bugs parsing ACL spec in FsShell setfacl.
-    (Vinay via cnauroth)
-
-    HADOOP-10277. setfacl -x fails to parse ACL spec if trying to remove the
-    mask entry. (Vinay via cnauroth)
-
-    HADOOP-10270. getfacl does not display effective permissions of masked
-    entries. (cnauroth)
-
-    HADOOP-10344. Fix TestAclCommands after merging HADOOP-10338 patch.
-    (cnauroth)
-
-    HADOOP-10352. Recursive setfacl erroneously attempts to apply default ACL to
-    files. (cnauroth)
-
-    HADOOP-10354. TestWebHDFS fails after merge of HDFS-4685 to trunk. (cnauroth)
-
-    HADOOP-10361. Correct alignment in CLI output for ACLs. (cnauroth)
-
   OPTIMIZATIONS
 
     HADOOP-7761. Improve the performance of raw comparisons. (todd)
@@ -362,6 +324,9 @@ Release 2.4.0 - UNRELEASED
 
   NEW FEATURES
 
+    HADOOP-10184. Hadoop Common changes required to support HDFS ACLs. (See
+    breakdown of tasks below for features and contributors)
+
   IMPROVEMENTS
 
     HADOOP-10139. Update and improve the Single Cluster Setup document.
@@ -378,6 +343,9 @@ Release 2.4.0 - UNRELEASED
     HADOOP-10348. Deprecate hadoop.ssl.configuration in branch-2, and remove
     it in trunk. (Haohui Mai via jing9)
 
+    HADOOP-9454. Support multipart uploads for s3native. (Jordan Mendelson and
+    Akira AJISAKA via atm)
+
   OPTIMIZATIONS
 
   BUG FIXES
@@ -413,6 +381,44 @@ Release 2.4.0 - UNRELEASED
     HADOOP-10070. RPC client doesn't use per-connection conf to determine
     server's expected Kerberos principal name. (atm)
 
+    HADOOP-10368. InputStream is not closed in VersionInfo ctor.
+    (Tsuyoshi OZAWA via szetszwo)
+
+  BREAKDOWN OF HADOOP-10184 SUBTASKS AND RELATED JIRAS
+
+    HADOOP-10185. FileSystem API for ACLs. (cnauroth)
+
+    HADOOP-10186. Remove AclReadFlag and AclWriteFlag in FileSystem API.
+    (Haohui Mai via cnauroth)
+
+    HADOOP-10187. FsShell CLI: add getfacl and setfacl with minimal support for
+    getting and setting ACLs. (Vinay via cnauroth)
+
+    HADOOP-10192. FileSystem#getAclStatus has incorrect JavaDocs. (cnauroth)
+
+    HADOOP-10220. Add ACL indicator bit to FsPermission. (cnauroth)
+
+    HADOOP-10241. Clean up output of FsShell getfacl. (Chris Nauroth via wheat9)
+
+    HADOOP-10213. Fix bugs parsing ACL spec in FsShell setfacl.
+    (Vinay via cnauroth)
+
+    HADOOP-10277. setfacl -x fails to parse ACL spec if trying to remove the
+    mask entry. (Vinay via cnauroth)
+
+    HADOOP-10270. getfacl does not display effective permissions of masked
+    entries. (cnauroth)
+
+    HADOOP-10344. Fix TestAclCommands after merging HADOOP-10338 patch.
+    (cnauroth)
+
+    HADOOP-10352. Recursive setfacl erroneously attempts to apply default ACL to
+    files. (cnauroth)
+
+    HADOOP-10354. TestWebHDFS fails after merge of HDFS-4685 to trunk. (cnauroth)
+
+    HADOOP-10361. Correct alignment in CLI output for ACLs. (cnauroth)
+
 Release 2.3.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES

+ 111 - 16
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/s3native/Jets3tNativeFileSystemStore.java

@@ -28,6 +28,9 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -41,10 +44,13 @@ import org.jets3t.service.S3ServiceException;
 import org.jets3t.service.ServiceException;
 import org.jets3t.service.StorageObjectsChunk;
 import org.jets3t.service.impl.rest.httpclient.RestS3Service;
+import org.jets3t.service.model.MultipartPart;
+import org.jets3t.service.model.MultipartUpload;
 import org.jets3t.service.model.S3Bucket;
 import org.jets3t.service.model.S3Object;
 import org.jets3t.service.model.StorageObject;
 import org.jets3t.service.security.AWSCredentials;
+import org.jets3t.service.utils.MultipartUtils;
 
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
@@ -52,6 +58,12 @@ class Jets3tNativeFileSystemStore implements NativeFileSystemStore {
   
   private S3Service s3Service;
   private S3Bucket bucket;
+
+  private long multipartBlockSize;
+  private boolean multipartEnabled;
+  private long multipartCopyBlockSize;
+  static final long MAX_PART_SIZE = (long)5 * 1024 * 1024 * 1024;
+  
   public static final Log LOG =
       LogFactory.getLog(Jets3tNativeFileSystemStore.class);
 
@@ -67,13 +79,27 @@ class Jets3tNativeFileSystemStore implements NativeFileSystemStore {
     } catch (S3ServiceException e) {
       handleS3ServiceException(e);
     }
+    multipartEnabled =
+        conf.getBoolean("fs.s3n.multipart.uploads.enabled", false);
+    multipartBlockSize = Math.min(
+        conf.getLong("fs.s3n.multipart.uploads.block.size", 64 * 1024 * 1024),
+        MAX_PART_SIZE);
+    multipartCopyBlockSize = Math.min(
+        conf.getLong("fs.s3n.multipart.copy.block.size", MAX_PART_SIZE),
+        MAX_PART_SIZE);
+
     bucket = new S3Bucket(uri.getHost());
   }
   
   @Override
   public void storeFile(String key, File file, byte[] md5Hash)
     throws IOException {
-    
+
+    if (multipartEnabled && file.length() >= multipartBlockSize) {
+      storeLargeFile(key, file, md5Hash);
+      return;
+    }
+
     BufferedInputStream in = null;
     try {
       in = new BufferedInputStream(new FileInputStream(file));
@@ -98,6 +124,31 @@ class Jets3tNativeFileSystemStore implements NativeFileSystemStore {
     }
   }
 
+  public void storeLargeFile(String key, File file, byte[] md5Hash)
+      throws IOException {
+    S3Object object = new S3Object(key);
+    object.setDataInputFile(file);
+    object.setContentType("binary/octet-stream");
+    object.setContentLength(file.length());
+    if (md5Hash != null) {
+      object.setMd5Hash(md5Hash);
+    }
+
+    List<StorageObject> objectsToUploadAsMultipart =
+        new ArrayList<StorageObject>();
+    objectsToUploadAsMultipart.add(object);
+    MultipartUtils mpUtils = new MultipartUtils(multipartBlockSize);
+
+    try {
+      mpUtils.uploadObjects(bucket.getName(), s3Service,
+                            objectsToUploadAsMultipart, null);
+    } catch (ServiceException e) {
+      handleServiceException(e);
+    } catch (Exception e) {
+      throw new S3Exception(e);
+    }
+  }
+  
   @Override
   public void storeEmptyFile(String key) throws IOException {
     try {
@@ -152,11 +203,8 @@ class Jets3tNativeFileSystemStore implements NativeFileSystemStore {
       }
       S3Object object = s3Service.getObject(bucket.getName(), key);
       return object.getDataInputStream();
-    } catch (S3ServiceException e) {
-      handleS3ServiceException(key, e);
-      return null; //never returned - keep compiler happy
     } catch (ServiceException e) {
-      handleServiceException(e);
+      handleServiceException(key, e);
       return null; //return null if key not found
     }
   }
@@ -180,11 +228,8 @@ class Jets3tNativeFileSystemStore implements NativeFileSystemStore {
       S3Object object = s3Service.getObject(bucket, key, null, null, null,
                                             null, byteRangeStart, null);
       return object.getDataInputStream();
-    } catch (S3ServiceException e) {
-      handleS3ServiceException(key, e);
-      return null; //never returned - keep compiler happy
     } catch (ServiceException e) {
-      handleServiceException(e);
+      handleServiceException(key, e);
       return null; //return null if key not found
     }
   }
@@ -244,8 +289,16 @@ class Jets3tNativeFileSystemStore implements NativeFileSystemStore {
         LOG.debug("Deleting key:" + key + "from bucket" + bucket.getName());
       }
       s3Service.deleteObject(bucket, key);
-    } catch (S3ServiceException e) {
-      handleS3ServiceException(key, e);
+    } catch (ServiceException e) {
+      handleServiceException(key, e);
+    }
+  }
+
+  public void rename(String srcKey, String dstKey) throws IOException {
+    try {
+      s3Service.renameObject(bucket.getName(), srcKey, new S3Object(dstKey));
+    } catch (ServiceException e) {
+      handleServiceException(e);
     }
   }
   
@@ -255,10 +308,52 @@ class Jets3tNativeFileSystemStore implements NativeFileSystemStore {
       if(LOG.isDebugEnabled()) {
         LOG.debug("Copying srcKey: " + srcKey + "to dstKey: " + dstKey + "in bucket: " + bucket.getName());
       }
+      if (multipartEnabled) {
+        S3Object object = s3Service.getObjectDetails(bucket, srcKey, null,
+                                                     null, null, null);
+        if (multipartCopyBlockSize > 0 &&
+            object.getContentLength() > multipartCopyBlockSize) {
+          copyLargeFile(object, dstKey);
+          return;
+        }
+      }
       s3Service.copyObject(bucket.getName(), srcKey, bucket.getName(),
           new S3Object(dstKey), false);
-    } catch (S3ServiceException e) {
-      handleS3ServiceException(srcKey, e);
+    } catch (ServiceException e) {
+      handleServiceException(srcKey, e);
+    }
+  }
+
+  public void copyLargeFile(S3Object srcObject, String dstKey) throws IOException {
+    try {
+      long partCount = srcObject.getContentLength() / multipartCopyBlockSize +
+          (srcObject.getContentLength() % multipartCopyBlockSize > 0 ? 1 : 0);
+
+      MultipartUpload multipartUpload = s3Service.multipartStartUpload
+          (bucket.getName(), dstKey, srcObject.getMetadataMap());
+
+      List<MultipartPart> listedParts = new ArrayList<MultipartPart>();
+      for (int i = 0; i < partCount; i++) {
+        long byteRangeStart = i * multipartCopyBlockSize;
+        long byteLength;
+        if (i < partCount - 1) {
+          byteLength = multipartCopyBlockSize;
+        } else {
+          byteLength = srcObject.getContentLength() % multipartCopyBlockSize;
+          if (byteLength == 0) {
+            byteLength = multipartCopyBlockSize;
+          }
+        }
+
+        MultipartPart copiedPart = s3Service.multipartUploadPartCopy
+            (multipartUpload, i + 1, bucket.getName(), srcObject.getKey(),
+             null, null, null, null, byteRangeStart,
+             byteRangeStart + byteLength - 1, null);
+        listedParts.add(copiedPart);
+      }
+      
+      Collections.reverse(listedParts);
+      s3Service.multipartCompleteUpload(multipartUpload, listedParts);
     } catch (ServiceException e) {
       handleServiceException(e);
     }
@@ -291,11 +386,11 @@ class Jets3tNativeFileSystemStore implements NativeFileSystemStore {
     System.out.println(sb);
   }
 
-  private void handleS3ServiceException(String key, S3ServiceException e) throws IOException {
-    if ("NoSuchKey".equals(e.getS3ErrorCode())) {
+  private void handleServiceException(String key, ServiceException e) throws IOException {
+    if ("NoSuchKey".equals(e.getErrorCode())) {
       throw new FileNotFoundException("Key '" + key + "' does not exist in S3");
     } else {
-      handleS3ServiceException(e);
+      handleServiceException(e);
     }
   }
 

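The part-splitting arithmetic in copyLargeFile above is worth spelling out, since the boundary cases are where it can go wrong. A minimal standalone sketch of the same calculation (class and variable names are illustrative, not part of the patch): every part spans an inclusive byte range of multipartCopyBlockSize bytes except the last, which carries the remainder unless the length divides evenly.

public class PartRanges {
  public static void main(String[] args) {
    long contentLength = 10L * 1024 * 1024 + 1; // source object length
    long partSize = 4L * 1024 * 1024;           // stands in for multipartCopyBlockSize

    // Same formula as copyLargeFile: integer division, rounded up.
    long partCount = contentLength / partSize
        + (contentLength % partSize > 0 ? 1 : 0);

    for (int i = 0; i < partCount; i++) {
      long start = i * partSize;
      long length;
      if (i < partCount - 1) {
        length = partSize;
      } else {
        length = contentLength % partSize;
        if (length == 0) {
          length = partSize; // length divided evenly; last part is full-size
        }
      }
      // Inclusive range, matching the byteRangeStart/byteRangeEnd arguments
      // of multipartUploadPartCopy (part numbers are 1-based).
      System.out.println("part " + (i + 1) + ": bytes " + start + "-"
          + (start + length - 1));
    }
  }
}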
+ 7 - 3
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/VersionInfo.java

@@ -31,6 +31,7 @@ import org.apache.hadoop.classification.InterfaceStability;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.Properties;
+import org.apache.hadoop.io.IOUtils;
 
 /**
  * This class returns build information about Hadoop components.
@@ -45,16 +46,19 @@ public class VersionInfo {
   protected VersionInfo(String component) {
     info = new Properties();
     String versionInfoFile = component + "-version-info.properties";
+    InputStream is = null;
     try {
-      InputStream is = Thread.currentThread().getContextClassLoader()
+      is = Thread.currentThread().getContextClassLoader()
         .getResourceAsStream(versionInfoFile);
       if (is == null) {
         throw new IOException("Resource not found");
       }
       info.load(is);
     } catch (IOException ex) {
-      LogFactory.getLog(getClass()).warn("Could not read '" + 
-        versionInfoFile + "', " + ex.toString(), ex);
+      LogFactory.getLog(getClass()).warn("Could not read '" +
+          versionInfoFile + "', " + ex.toString(), ex);
+    } finally {
+      IOUtils.closeStream(is);
     }
   }
 

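HADOOP-10368 above fixes an InputStream leak by declaring the stream outside the try and closing it in a finally block via IOUtils.closeStream (Hadoop still targeted Java 6 here). For reference, a hypothetical sketch of the same guarantee on Java 7+, where try-with-resources closes the stream automatically even when load() throws:

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

class PropertiesLoader {
  static Properties load(String resource) throws IOException {
    Properties props = new Properties();
    // The stream is closed automatically on exit from the try block.
    try (InputStream is = Thread.currentThread().getContextClassLoader()
        .getResourceAsStream(resource)) {
      if (is == null) {
        throw new IOException("Resource not found: " + resource);
      }
      props.load(is);
    }
    return props;
  }
}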
+ 25 - 0
hadoop-common-project/hadoop-common/src/main/resources/core-default.xml

@@ -532,6 +532,31 @@
   filesystem (s3n: URIs).</description>
 </property>
 
+<property>
+  <name>fs.s3n.multipart.uploads.enabled</name>
+  <value>false</value>
+  <description>Setting this property to true enables multipart uploads to the
+  native S3 filesystem. When uploading a file, it is split into blocks
+  if its size is larger than fs.s3n.multipart.uploads.block.size.
+  </description>
+</property>
+
+<property>
+  <name>fs.s3n.multipart.uploads.block.size</name>
+  <value>67108864</value>
+  <description>The block size for multipart uploads to the native S3
+  filesystem. Default size is 64 MB.
+  </description>
+</property>
+
+<property>
+  <name>fs.s3n.multipart.copy.block.size</name>
+  <value>5368709120</value>
+  <description>The block size for multipart copy in the native S3
+  filesystem. Default size is 5 GB.
+  </description>
+</property>
+
 <property>
   <name>io.seqfile.compress.blocksize</name>
   <value>1000000</value>

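A hedged sketch of driving the new settings from client code (the bucket URI and paths are illustrative, and working fs.s3n.awsAccessKeyId / fs.s3n.awsSecretAccessKey credentials are assumed to be configured elsewhere). Uploads of files at or above fs.s3n.multipart.uploads.block.size take the storeLargeFile path; server-side copies larger than fs.s3n.multipart.copy.block.size take the copyLargeFile path.

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class S3nMultipartExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.setBoolean("fs.s3n.multipart.uploads.enabled", true);
    conf.setLong("fs.s3n.multipart.uploads.block.size", 64L * 1024 * 1024);
    conf.setLong("fs.s3n.multipart.copy.block.size", 5L * 1024 * 1024 * 1024);

    FileSystem fs = FileSystem.get(URI.create("s3n://example-bucket/"), conf);
    // Anything >= 64 MB here is uploaded in parts rather than a single PUT.
    fs.copyFromLocalFile(new Path("/tmp/big.dat"),
        new Path("s3n://example-bucket/big.dat"));
  }
}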
+ 126 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/s3native/TestJets3tNativeFileSystemStore.java

@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3native;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+import static org.junit.Assert.*;
+import static org.junit.Assume.*;
+
+import org.junit.Before;
+import org.junit.After;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URI;
+import java.security.DigestInputStream;
+import java.security.DigestOutputStream;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+
+
+public class TestJets3tNativeFileSystemStore {
+  private Configuration conf;
+  private Jets3tNativeFileSystemStore store;
+  private NativeS3FileSystem fs;
+
+  @Before
+  public void setUp() throws Exception {
+    conf = new Configuration();
+    store = new Jets3tNativeFileSystemStore();
+    fs = new NativeS3FileSystem(store);
+    conf.setBoolean("fs.s3n.multipart.uploads.enabled", true);
+    conf.setLong("fs.s3n.multipart.uploads.block.size", 64 * 1024 * 1024);
+    fs.initialize(URI.create(conf.get("test.fs.s3n.name")), conf);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    try {
+      store.purge("test");
+    } catch (Exception e) {}
+  }
+
+  @BeforeClass
+  public static void checkSettings() throws Exception {
+    Configuration conf = new Configuration();
+    assumeNotNull(conf.get("fs.s3n.awsAccessKeyId"));
+    assumeNotNull(conf.get("fs.s3n.awsSecretAccessKey"));
+    assumeNotNull(conf.get("test.fs.s3n.name"));
+  }
+
+  protected void writeRenameReadCompare(Path path, long len)
+      throws IOException, NoSuchAlgorithmException {
+    // If len > fs.s3n.multipart.uploads.block.size,
+    // we'll use a multipart upload
+    MessageDigest digest = MessageDigest.getInstance("MD5");
+    OutputStream out = new BufferedOutputStream(
+        new DigestOutputStream(fs.create(path, false), digest));
+    for (long i = 0; i < len; i++) {
+      out.write('Q');
+    }
+    out.flush();
+    out.close();
+
+    assertTrue("Exists", fs.exists(path));
+
+    // Depending on whether this file is over 5 GB,
+    // rename will cause a multipart upload copy
+    Path copyPath = path.suffix(".copy");
+    fs.rename(path, copyPath);
+
+    assertTrue("Copy exists", fs.exists(copyPath));
+
+    // Download file from S3 and compare the digest against the original
+    MessageDigest digest2 = MessageDigest.getInstance("MD5");
+    InputStream in = new BufferedInputStream(
+        new DigestInputStream(fs.open(copyPath), digest2));
+    long copyLen = 0;
+    while (in.read() != -1) {copyLen++;}
+    in.close();
+
+    assertEquals("Copy length matches original", len, copyLen);
+    assertArrayEquals("Digests match", digest.digest(), digest2.digest());
+  }
+
+  @Test
+  public void testSmallUpload() throws IOException, NoSuchAlgorithmException {
+    // Regular upload, regular copy
+    writeRenameReadCompare(new Path("/test/small"), 16384);
+  }
+
+  @Test
+  public void testMediumUpload() throws IOException, NoSuchAlgorithmException {
+    // Multipart upload, regular copy
+    writeRenameReadCompare(new Path("/test/medium"), 33554432);    // 100 MB
+  }
+
+  @Test
+  public void testExtraLargeUpload()
+      throws IOException, NoSuchAlgorithmException {
+    // Multipart upload, multipart copy
+    writeRenameReadCompare(new Path("/test/xlarge"), 5368709121L); // 5GB+1byte
+  }
+}

+ 16 - 0
hadoop-common-project/hadoop-common/src/test/resources/jets3t.properties

@@ -0,0 +1,16 @@
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+
+# Speed up the s3native jets3t test
+
+s3service.max-thread-count=10
+threaded-service.max-thread-count=10

+ 96 - 84
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -13,9 +13,6 @@ Trunk (Unreleased)
 
     HDFS-3125. Add JournalService to enable Journal Daemon. (suresh)
 
-    HDFS-4685. Implementation of ACLs in HDFS. (See breakdown of tasks below for
-    features and contributors)
-
   IMPROVEMENTS
 
     HDFS-4665. Move TestNetworkTopologyWithNodeGroup to common.
@@ -256,86 +253,6 @@ Trunk (Unreleased)
     HDFS-5794. Fix the inconsistency of layout version number of 
     ADD_DATANODE_AND_STORAGE_UUIDS between trunk and branch-2. (jing9)
 
-  BREAKDOWN OF HDFS-4685 SUBTASKS AND RELATED JIRAS
-
-    HDFS-5596. Implement RPC stubs. (Haohui Mai via cnauroth)
-
-    HDFS-5685. Implement ACL as a INode feature. (Haohui Mai via cnauroth)
-
-    HDFS-5618. NameNode: persist ACLs in fsimage. (Haohui Mai via cnauroth)
-
-    HDFS-5619. NameNode: record ACL modifications to edit log.
-    (Haohui Mai via cnauroth)
-
-    HDFS-5673. Implement logic for modification of ACLs. (cnauroth)
-
-    HDFS-5758. NameNode: complete implementation of inode modifications for
-    ACLs. (Chris Nauroth via wheat9)
-
-    HDFS-5612. NameNode: change all permission checks to enforce ACLs in
-    addition to permissions. (Chris Nauroth via wheat9)
-
-    HDFS-5613. NameNode: implement handling of ACLs in combination with
-    symlinks. (Chris Nauroth via wheat9)
-
-    HDFS-5615. NameNode: implement handling of ACLs in combination with sticky
-    bit. (Chris Nauroth via wheat9)
-
-    HDFS-5702. FsShell Cli: Add XML based End-to-End test for getfacl and
-    setfacl commands. (Vinay via cnauroth)
-
-    HDFS-5608. WebHDFS: implement ACL APIs.
-    (Sachin Jose and Renil Joseph via cnauroth)
-
-    HDFS-5614. NameNode: implement handling of ACLs in combination with
-    snapshots. (cnauroth)
-
-    HDFS-5858. Refactor common ACL test cases to be run through multiple
-    FileSystem implementations. (cnauroth)
-
-    HDFS-5860. Refactor INodeDirectory getDirectoryXFeature methods to use
-    common getFeature helper method. (Jing Zhao via cnauroth)
-
-    HDFS-5861. Add CLI test for Ls output for extended ACL marker.
-    (Vinay via cnauroth)
-
-    HDFS-5616. NameNode: implement default ACL handling. (cnauroth)
-
-    HDFS-5899. Add configuration flag to disable/enable support for ACLs.
-    (cnauroth)
-
-    HDFS-5914. Incorporate ACLs with the changes from HDFS-5698.
-    (Haohui Mai via cnauroth)
-
-    HDFS-5625. Write end user documentation for HDFS ACLs. (cnauroth)
-
-    HDFS-5925. ACL configuration flag must only reject ACL API calls, not ACLs
-    present in fsimage or edits. (cnauroth)
-
-    HDFS-5923. Do not persist the ACL bit in the FsPermission.
-    (Haohui Mai via cnauroth)
-
-    HDFS-5933. Optimize the FSImage layout for ACLs (Haohui Mai via cnauroth)
-
-    HDFS-5932. Ls should display the ACL bit (Chris Nauroth via wheat9)
-
-    HDFS-5937. Fix TestOfflineEditsViewer on HDFS-4685 branch. (cnauroth)
-
-    HDFS-5737. Replacing only the default ACL can fail to copy unspecified base
-    entries from the access ACL. (cnauroth)
-
-    HDFS-5739. ACL RPC must allow null name or unspecified permissions in ACL
-    entries. (cnauroth)
-
-    HDFS-5799. Make audit logging consistent across ACL APIs. (cnauroth)
-
-    HDFS-5849. Removing ACL from an inode fails if it has only a default ACL.
-    (cnauroth)
-
-    HDFS-5623. NameNode: add tests for skipping ACL enforcement when permission
-    checks are disabled, user is superuser or user is member of supergroup.
-    (cnauroth)
-
 Release 2.5.0 - UNRELEASED
 
   INCOMPATIBLE CHANGES
@@ -359,6 +276,9 @@ Release 2.4.0 - UNRELEASED
 
     HDFS-5776 Support 'hedged' reads in DFSClient (Liang Xie via stack)
 
+    HDFS-4685. Implementation of ACLs in HDFS. (See breakdown of tasks below for
+    features and contributors)
+
   IMPROVEMENTS
 
     HDFS-5781. Use an array to record the mapping between FSEditLogOpCode and 
@@ -439,6 +359,9 @@ Release 2.4.0 - UNRELEASED
     HDFS-6006. Remove duplicate code in FSNameSystem#getFileInfo.
     (Akira Ajisaka via cnauroth)
 
+    HDFS-6018. Exception recorded in LOG when IPCLoggerChannel#close is called.
+    (jing9)
+
   OPTIMIZATIONS
 
     HDFS-5790. LeaseManager.findPath is very slow when many leases need recovery
@@ -563,6 +486,14 @@ Release 2.4.0 - UNRELEASED
 
     HDFS-5988. Bad fsimage always generated after upgrade. (wang)
 
+    HDFS-5922. DN heartbeat thread can get stuck in tight loop. (Arpit Agarwal)
+
+    HDFS-6008. Namenode dead node link is giving HTTP error 500.
+    (Benoy Antony via cnauroth)
+
+    HDFS-5936. MiniDFSCluster does not clean data left behind by
+    SecondaryNameNode. (Binglin Chang via cnauroth)
+
   BREAKDOWN OF HDFS-5698 SUBTASKS AND RELATED JIRAS
 
     HDFS-5717. Save FSImage header in protobuf. (Haohui Mai via jing9)
@@ -623,7 +554,88 @@ Release 2.4.0 - UNRELEASED
     HDFS-5981. PBImageXmlWriter generates malformed XML.
     (Haohui Mai via cnauroth)
 
-    HDFS-5922. DN heartbeat thread can get stuck in tight loop. (Arpit Agarwal)
+  BREAKDOWN OF HDFS-4685 SUBTASKS AND RELATED JIRAS
+
+    HDFS-5596. Implement RPC stubs. (Haohui Mai via cnauroth)
+
+    HDFS-5685. Implement ACL as a INode feature. (Haohui Mai via cnauroth)
+
+    HDFS-5618. NameNode: persist ACLs in fsimage. (Haohui Mai via cnauroth)
+
+    HDFS-5619. NameNode: record ACL modifications to edit log.
+    (Haohui Mai via cnauroth)
+
+    HDFS-5673. Implement logic for modification of ACLs. (cnauroth)
+
+    HDFS-5758. NameNode: complete implementation of inode modifications for
+    ACLs. (Chris Nauroth via wheat9)
+
+    HDFS-5612. NameNode: change all permission checks to enforce ACLs in
+    addition to permissions. (Chris Nauroth via wheat9)
+
+    HDFS-5613. NameNode: implement handling of ACLs in combination with
+    symlinks. (Chris Nauroth via wheat9)
+
+    HDFS-5615. NameNode: implement handling of ACLs in combination with sticky
+    bit. (Chris Nauroth via wheat9)
+
+    HDFS-5702. FsShell Cli: Add XML based End-to-End test for getfacl and
+    setfacl commands. (Vinay via cnauroth)
+
+    HDFS-5608. WebHDFS: implement ACL APIs.
+    (Sachin Jose and Renil Joseph via cnauroth)
+
+    HDFS-5614. NameNode: implement handling of ACLs in combination with
+    snapshots. (cnauroth)
+
+    HDFS-5858. Refactor common ACL test cases to be run through multiple
+    FileSystem implementations. (cnauroth)
+
+    HDFS-5860. Refactor INodeDirectory getDirectoryXFeature methods to use
+    common getFeature helper method. (Jing Zhao via cnauroth)
+
+    HDFS-5861. Add CLI test for Ls output for extended ACL marker.
+    (Vinay via cnauroth)
+
+    HDFS-5616. NameNode: implement default ACL handling. (cnauroth)
+
+    HDFS-5899. Add configuration flag to disable/enable support for ACLs.
+    (cnauroth)
+
+    HDFS-5914. Incorporate ACLs with the changes from HDFS-5698.
+    (Haohui Mai via cnauroth)
+
+    HDFS-5625. Write end user documentation for HDFS ACLs. (cnauroth)
+
+    HDFS-5925. ACL configuration flag must only reject ACL API calls, not ACLs
+    present in fsimage or edits. (cnauroth)
+
+    HDFS-5923. Do not persist the ACL bit in the FsPermission.
+    (Haohui Mai via cnauroth)
+
+    HDFS-5933. Optimize the FSImage layout for ACLs (Haohui Mai via cnauroth)
+
+    HDFS-5932. Ls should display the ACL bit (Chris Nauroth via wheat9)
+
+    HDFS-5937. Fix TestOfflineEditsViewer on HDFS-4685 branch. (cnauroth)
+
+    HDFS-5737. Replacing only the default ACL can fail to copy unspecified base
+    entries from the access ACL. (cnauroth)
+
+    HDFS-5739. ACL RPC must allow null name or unspecified permissions in ACL
+    entries. (cnauroth)
+
+    HDFS-5799. Make audit logging consistent across ACL APIs. (cnauroth)
+
+    HDFS-5849. Removing ACL from an inode fails if it has only a default ACL.
+    (cnauroth)
+
+    HDFS-5623. NameNode: add tests for skipping ACL enforcement when permission
+    checks are disabled, user is superuser or user is member of supergroup.
+    (cnauroth)
+
+    HDFS-5908. Change AclFeature to capture list of ACL entries in an
+    ImmutableList. (cnauroth)
 
 Release 2.3.1 - UNRELEASED
 

+ 0 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java

@@ -182,7 +182,6 @@ public class IPCLoggerChannel implements AsyncLogger {
   
   @Override
   public void close() {
-    QuorumJournalManager.LOG.info("Closing", new Exception());
     // No more tasks may be submitted after this point.
     executor.shutdown();
     if (proxy != null) {

+ 3 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java

@@ -147,7 +147,9 @@ public class JspHelper {
    */
   public static final class Url {
     public static String authority(String scheme, DatanodeID d) {
-      String fqdn = canonicalize(d.getIpAddr());
+      String fqdn = (d.getIpAddr() != null && !d.getIpAddr().isEmpty()) ?
+          canonicalize(d.getIpAddr()) :
+          d.getHostName();
       if (scheme.equals("http")) {
         return fqdn + ":" + d.getInfoPort();
       } else if (scheme.equals("https")) {

+ 7 - 7
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/AclFeature.java

@@ -18,26 +18,26 @@
 
 package org.apache.hadoop.hdfs.server.namenode;
 
-import java.util.Collections;
-import java.util.List;
-
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.permission.AclEntry;
 
+import com.google.common.collect.ImmutableList;
+
 /**
  * Feature that represents the ACLs of the inode.
  */
 @InterfaceAudience.Private
 public class AclFeature implements INode.Feature {
-  public static final List<AclEntry> EMPTY_ENTRY_LIST = Collections.emptyList();
+  public static final ImmutableList<AclEntry> EMPTY_ENTRY_LIST =
+    ImmutableList.of();
 
-  private final List<AclEntry> entries;
+  private final ImmutableList<AclEntry> entries;
 
-  public AclFeature(List<AclEntry> entries) {
+  public AclFeature(ImmutableList<AclEntry> entries) {
     this.entries = entries;
   }
 
-  public List<AclEntry> getEntries() {
+  public ImmutableList<AclEntry> getEntries() {
     return entries;
   }
 }

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/AclStorage.java

@@ -328,7 +328,7 @@ final class AclStorage {
 
     // Add all default entries to the feature.
     featureEntries.addAll(defaultEntries);
-    return new AclFeature(Collections.unmodifiableList(featureEntries));
+    return new AclFeature(ImmutableList.copyOf(featureEntries));
   }
 
   /**

+ 9 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java

@@ -79,6 +79,7 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.common.Storage;
+import org.apache.hadoop.hdfs.server.common.Util;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.datanode.DataStorage;
@@ -817,6 +818,14 @@ public class MiniDFSCluster {
               throw new IOException("Could not fully delete " + nameDir);
             }
           }
+          Collection<URI> checkpointDirs = Util.stringCollectionAsURIs(conf
+              .getTrimmedStringCollection(DFS_NAMENODE_CHECKPOINT_DIR_KEY));
+          for (URI checkpointDirUri : checkpointDirs) {
+            File checkpointDir = new File(checkpointDirUri);
+            if (checkpointDir.exists() && !FileUtil.fullyDelete(checkpointDir)) {
+              throw new IOException("Could not fully delete " + checkpointDir);
+            }
+          }
         }
         
         boolean formatThisOne = format;

+ 16 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/common/TestJspHelper.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.common;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
@@ -641,5 +642,20 @@ public class TestJspHelper {
     assertTrue(upgradeStatusReport.getStatusText(true).equals(
         MessageFormat.format(EXPECTED__NOTF_PATTERN, version)));
   }  
+  
+  @Test 
+  public void testAuthority(){
+    DatanodeID dnWithIp = new DatanodeID("127.0.0.1", "hostName", null,
+        50020, 50075, 50076, 50010);
+    assertNotNull(JspHelper.Url.authority("http", dnWithIp));
+
+    DatanodeID dnWithNullIp = new DatanodeID(null, "hostName", null,
+        50020, 50075, 50076, 50010);
+    assertNotNull(JspHelper.Url.authority("http", dnWithNullIp));
+
+    DatanodeID dnWithEmptyIp = new DatanodeID("", "hostName", null,
+        50020, 50075, 50076, 50010);
+    assertNotNull(JspHelper.Url.authority("http", dnWithEmptyIp));
+  }
 }
 

+ 7 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSAclBaseTest.java

@@ -48,6 +48,7 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
 
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 
 /**
@@ -1272,6 +1273,12 @@ public abstract class FSAclBaseTest {
     AclFeature aclFeature = inode.getAclFeature();
     if (expectAclFeature) {
       assertNotNull(aclFeature);
+      // Intentionally capturing a reference to the entries, not using nested
+      // calls.  This way, we get compile-time enforcement that the entries are
+      // stored in an ImmutableList.
+      ImmutableList<AclEntry> entries = aclFeature.getEntries();
+      assertNotNull(entries);
+      assertFalse(entries.isEmpty());
     } else {
       assertNull(aclFeature);
     }

+ 6 - 0
hadoop-yarn-project/CHANGES.txt

@@ -30,6 +30,8 @@ Release 2.5.0 - UNRELEASED
 
     YARN-1678. Fair scheduler gabs incessantly about reservations (Sandy Ryza)
 
+    YARN-1561. Fix a generic type warning in FairScheduler. (Chen He via junping_du)
+
   OPTIMIZATIONS
 
   BUG FIXES 
@@ -144,6 +146,10 @@ Release 2.4.0 - UNRELEASED
     YARN-1497. Command line additions for moving apps between queues (Sandy
     Ryza)
 
+    YARN-1588. Enhanced RM and the scheduling protocol to also send NMTokens of
+    transferred containers from previous app-attempts to new AMs after YARN-1490.
+    (Jian He via vinodkv)
+
   IMPROVEMENTS
 
     YARN-1007. Enhance History Reader interface for Containers. (Mayank Bansal via

+ 37 - 8
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterResponse.java

@@ -29,6 +29,7 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.NMToken;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.util.Records;
 
@@ -55,13 +56,15 @@ public abstract class RegisterApplicationMasterResponse {
   public static RegisterApplicationMasterResponse newInstance(
       Resource minCapability, Resource maxCapability,
       Map<ApplicationAccessType, String> acls, ByteBuffer key,
-      List<Container> containersFromPreviousAttempt, String queue) {
+      List<Container> containersFromPreviousAttempt, String queue,
+      List<NMToken> nmTokensFromPreviousAttempts) {
     RegisterApplicationMasterResponse response =
         Records.newRecord(RegisterApplicationMasterResponse.class);
     response.setMaximumResourceCapability(maxCapability);
     response.setApplicationACLs(acls);
     response.setClientToAMTokenMasterKey(key);
-    response.setContainersFromPreviousAttempt(containersFromPreviousAttempt);
+    response.setContainersFromPreviousAttempts(containersFromPreviousAttempt);
+    response.setNMTokensFromPreviousAttempts(nmTokensFromPreviousAttempts);
     response.setQueue(queue);
     return response;
   }
@@ -129,26 +132,52 @@ public abstract class RegisterApplicationMasterResponse {
   /**
    * <p>
    * Get the list of running containers as viewed by
-   * <code>ResourceManager</code> from previous application attempt.
+   * <code>ResourceManager</code> from previous application attempts.
    * </p>
    * 
    * @return the list of running containers as viewed by
-   *         <code>ResourceManager</code> from previous application attempt
+   *         <code>ResourceManager</code> from previous application attempts
+   * @see RegisterApplicationMasterResponse#getNMTokensFromPreviousAttempts()
    */
   @Public
   @Unstable
-  public abstract List<Container> getContainersFromPreviousAttempt();
+  public abstract List<Container> getContainersFromPreviousAttempts();
 
   /**
    * Set the list of running containers as viewed by
-   * <code>ResourceManager</code> from previous application attempt.
+   * <code>ResourceManager</code> from previous application attempts.
    * 
    * @param containersFromPreviousAttempt
    *          the list of running containers as viewed by
-   *          <code>ResourceManager</code> from previous application attempt.
+   *          <code>ResourceManager</code> from previous application attempts.
    */
   @Private
   @Unstable
-  public abstract void setContainersFromPreviousAttempt(
+  public abstract void setContainersFromPreviousAttempts(
       List<Container> containersFromPreviousAttempt);
+
+  /**
+   * Get the list of NMTokens for communicating with the NMs where the
+   * containers of previous application attempts are running.
+   * 
+   * @return the list of NMTokens for communicating with the NMs where the
+   *         containers of previous application attempts are running.
+   * 
+   * @see RegisterApplicationMasterResponse#getContainersFromPreviousAttempts()
+   */
+  @Public
+  @Stable
+  public abstract List<NMToken> getNMTokensFromPreviousAttempts();
+
+  /**
+   * Set the list of NMTokens for communicating with the NMs where the
+   * containers of previous application attempts are running.
+   * 
+   * @param nmTokens
+   *          the list of NMTokens for communicating with the NMs where the
+   *          containers of previous application attempts are running.
+   */
+  @Private
+  @Unstable
+  public abstract void setNMTokensFromPreviousAttempts(List<NMToken> nmTokens);
 }

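On the AM side, registration now returns both the containers the scheduler carried over and the NM tokens needed to talk to the nodes hosting them. A hedged sketch of consuming the response (the helper class is hypothetical; the NMTokenCache call mirrors what the AMRMClientImpl change below does internally):

import java.util.List;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.client.api.NMTokenCache;

class AmRegistrationHandler {
  // Illustrative handling of a registration response after an AM restart.
  static List<Container> absorb(RegisterApplicationMasterResponse response,
      NMTokenCache cache) {
    // Cache tokens for the NMs still running previous-attempt containers.
    for (NMToken token : response.getNMTokensFromPreviousAttempts()) {
      cache.setToken(token.getNodeId().toString(), token.getToken());
    }
    // Containers transferred from the previous attempt, per YARN-1490.
    return response.getContainersFromPreviousAttempts();
  }
}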
+ 33 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/NMToken.java

@@ -72,4 +72,37 @@ public abstract class NMToken {
   @Stable
   public abstract void setToken(Token token);
 
+
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result =
+        prime * result + ((getNodeId() == null) ? 0 : getNodeId().hashCode());
+    result =
+        prime * result + ((getToken() == null) ? 0 : getToken().hashCode());
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (getClass() != obj.getClass())
+      return false;
+    NMToken other = (NMToken) obj;
+    if (getNodeId() == null) {
+      if (other.getNodeId() != null)
+        return false;
+    } else if (!getNodeId().equals(other.getNodeId()))
+      return false;
+    if (getToken() == null) {
+      if (other.getToken() != null)
+        return false;
+    } else if (!getToken().equals(other.getToken()))
+      return false;
+    return true;
+  }
 }

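A hypothetical illustration of what the equals/hashCode overrides above buy: two NMToken records for the same node and token now compare equal, so hash-based collections (and test assertions) can treat them as values and drop duplicates.

import java.util.HashSet;
import java.util.Set;
import org.apache.hadoop.yarn.api.records.NMToken;

class NMTokenDedup {
  static Set<NMToken> dedup(Iterable<NMToken> tokens) {
    Set<NMToken> unique = new HashSet<NMToken>();
    for (NMToken t : tokens) {
      unique.add(t); // relies on the NMToken#equals/hashCode added above
    }
    return unique;
  }
}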
+ 2 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto

@@ -44,8 +44,9 @@ message RegisterApplicationMasterResponseProto {
   optional ResourceProto maximumCapability = 1;
   optional bytes client_to_am_token_master_key = 2;
   repeated ApplicationACLMapProto application_ACLs = 3;
-  repeated ContainerProto containers_from_previous_attempt = 4;
+  repeated ContainerProto containers_from_previous_attempts = 4;
   optional string queue = 5;
+  repeated NMTokenProto nm_tokens_from_previous_attempts = 6;
 }
 
 message FinishApplicationMasterRequestProto {

+ 2 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java

@@ -74,6 +74,7 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.NMToken;
 import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
@@ -542,7 +543,7 @@ public class ApplicationMaster {
     }
 
     List<Container> previousAMRunningContainers =
-        response.getContainersFromPreviousAttempt();
+        response.getContainersFromPreviousAttempts();
     LOG.info("Received " + previousAMRunningContainers.size()
         + " previous AM's running containers on AM registration.");
     numAllocatedContainers.addAndGet(previousAMRunningContainers.size());

+ 15 - 5
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java

@@ -195,6 +195,12 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
           appTrackingUrl);
     RegisterApplicationMasterResponse response =
         rmClient.registerApplicationMaster(request);
+
+    synchronized (this) {
+      if(!response.getNMTokensFromPreviousAttempts().isEmpty()) {
+        populateNMTokens(response.getNMTokensFromPreviousAttempts());
+      }
+    }
     return response;
   }
 
@@ -250,7 +256,7 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
         lastResponseId = allocateResponse.getResponseId();
         clusterAvailableResources = allocateResponse.getAvailableResources();
         if (!allocateResponse.getNMTokens().isEmpty()) {
-          populateNMTokens(allocateResponse);
+          populateNMTokens(allocateResponse.getNMTokens());
         }
       }
     } finally {
@@ -284,13 +290,17 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
 
   @Private
   @VisibleForTesting
-  protected void populateNMTokens(AllocateResponse allocateResponse) {
-    for (NMToken token : allocateResponse.getNMTokens()) {
+  protected void populateNMTokens(List<NMToken> nmTokens) {
+    for (NMToken token : nmTokens) {
       String nodeId = token.getNodeId().toString();
       if (getNMTokenCache().containsToken(nodeId)) {
-        LOG.debug("Replacing token for : " + nodeId);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Replacing token for : " + nodeId);
+        }
       } else {
-        LOG.debug("Received new token for : " + nodeId);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Received new token for : " + nodeId);
+        }
       }
       getNMTokenCache().setToken(nodeId, token.getToken());
     }

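The logging tweak above wraps LOG.debug in isDebugEnabled() so the message string is never concatenated when debug output is off. A generic sketch of the idiom with commons-logging, which this code base uses (class name is illustrative):

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

class GuardedLogging {
  private static final Log LOG = LogFactory.getLog(GuardedLogging.class);

  static void record(String nodeId) {
    if (LOG.isDebugEnabled()) {
      // The concatenation cost is paid only when debug logging is on.
      LOG.debug("Received new token for : " + nodeId);
    }
  }
}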
+ 101 - 20
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RegisterApplicationMasterResponsePBImpl.java

@@ -31,13 +31,16 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.NMToken;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.NMTokenPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
 import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
 import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationACLMapProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.NMTokenProto;
 import org.apache.hadoop.yarn.proto.YarnServiceProtos.RegisterApplicationMasterResponseProto;
 import org.apache.hadoop.yarn.proto.YarnServiceProtos.RegisterApplicationMasterResponseProtoOrBuilder;
 
@@ -56,7 +59,8 @@ public class RegisterApplicationMasterResponsePBImpl extends
 
   private Resource maximumResourceCapability;
   private Map<ApplicationAccessType, String> applicationACLS = null;
-  private List<Container> containersFromPreviousAttempt = null;
+  private List<Container> containersFromPreviousAttempts = null;
+  private List<NMToken> nmTokens = null;
 
   public RegisterApplicationMasterResponsePBImpl() {
     builder = RegisterApplicationMasterResponseProto.newBuilder();
@@ -110,8 +114,13 @@ public class RegisterApplicationMasterResponsePBImpl extends
     if (this.applicationACLS != null) {
       addApplicationACLs();
     }
-    if (this.containersFromPreviousAttempt != null) {
-      addRunningContainersToProto();
+    if (this.containersFromPreviousAttempts != null) {
+      addContainersFromPreviousAttemptToProto();
+    }
+    if (nmTokens != null) {
+      builder.clearNmTokensFromPreviousAttempts();
+      Iterable<NMTokenProto> iterable = getTokenProtoIterable(nmTokens);
+      builder.addAllNmTokensFromPreviousAttempts(iterable);
     }
   }
 
@@ -236,21 +245,22 @@ public class RegisterApplicationMasterResponsePBImpl extends
   }
 
   @Override
-  public List<Container> getContainersFromPreviousAttempt() {
-    if (this.containersFromPreviousAttempt != null) {
-      return this.containersFromPreviousAttempt;
+  public List<Container> getContainersFromPreviousAttempts() {
+    if (this.containersFromPreviousAttempts != null) {
+      return this.containersFromPreviousAttempts;
     }
-    initRunningContainersList();
-    return this.containersFromPreviousAttempt;
+    initContainersPreviousAttemptList();
+    return this.containersFromPreviousAttempts;
   }
 
   @Override
-  public void setContainersFromPreviousAttempt(final List<Container> containers) {
+  public void
+      setContainersFromPreviousAttempts(final List<Container> containers) {
     if (containers == null) {
       return;
     }
-    this.containersFromPreviousAttempt = new ArrayList<Container>();
-    this.containersFromPreviousAttempt.addAll(containers);
+    this.containersFromPreviousAttempts = new ArrayList<Container>();
+    this.containersFromPreviousAttempts.addAll(containers);
   }
   
   @Override
@@ -272,25 +282,88 @@ public class RegisterApplicationMasterResponsePBImpl extends
     }
   }
 
-  private void initRunningContainersList() {
-    RegisterApplicationMasterResponseProtoOrBuilder p = viaProto ? proto : builder;
-    List<ContainerProto> list = p.getContainersFromPreviousAttemptList();
-    containersFromPreviousAttempt = new ArrayList<Container>();
+
+  private void initContainersPreviousAttemptList() {
+    RegisterApplicationMasterResponseProtoOrBuilder p =
+        viaProto ? proto : builder;
+    List<ContainerProto> list = p.getContainersFromPreviousAttemptsList();
+    containersFromPreviousAttempts = new ArrayList<Container>();
     for (ContainerProto c : list) {
-      containersFromPreviousAttempt.add(convertFromProtoFormat(c));
+      containersFromPreviousAttempts.add(convertFromProtoFormat(c));
     }
   }
 
-  private void addRunningContainersToProto() {
+  private void addContainersFromPreviousAttemptToProto() {
     maybeInitBuilder();
-    builder.clearContainersFromPreviousAttempt();
+    builder.clearContainersFromPreviousAttempts();
     List<ContainerProto> list = new ArrayList<ContainerProto>();
-    for (Container c : containersFromPreviousAttempt) {
+    for (Container c : containersFromPreviousAttempts) {
       list.add(convertToProtoFormat(c));
     }
-    builder.addAllContainersFromPreviousAttempt(list);
+    builder.addAllContainersFromPreviousAttempts(list);
+  }
+
+
+  @Override
+  public List<NMToken> getNMTokensFromPreviousAttempts() {
+    if (nmTokens != null) {
+      return nmTokens;
+    }
+    initLocalNewNMTokenList();
+    return nmTokens;
   }
   
+  @Override
+  public void setNMTokensFromPreviousAttempts(final List<NMToken> nmTokens) {
+    if (nmTokens == null || nmTokens.isEmpty()) {
+      if (this.nmTokens != null) {
+        this.nmTokens.clear();
+      }
+      builder.clearNmTokensFromPreviousAttempts();
+      return;
+    }
+    this.nmTokens = new ArrayList<NMToken>();
+    this.nmTokens.addAll(nmTokens);
+  }
+
+  private synchronized void initLocalNewNMTokenList() {
+    RegisterApplicationMasterResponseProtoOrBuilder p = viaProto ? proto : builder;
+    List<NMTokenProto> list = p.getNmTokensFromPreviousAttemptsList();
+    nmTokens = new ArrayList<NMToken>();
+    for (NMTokenProto t : list) {
+      nmTokens.add(convertFromProtoFormat(t));
+    }
+  }
+
+  private synchronized Iterable<NMTokenProto> getTokenProtoIterable(
+      final List<NMToken> nmTokenList) {
+    maybeInitBuilder();
+    return new Iterable<NMTokenProto>() {
+      @Override
+      public synchronized Iterator<NMTokenProto> iterator() {
+        return new Iterator<NMTokenProto>() {
+
+          Iterator<NMToken> iter = nmTokenList.iterator();
+
+          @Override
+          public boolean hasNext() {
+            return iter.hasNext();
+          }
+
+          @Override
+          public NMTokenProto next() {
+            return convertToProtoFormat(iter.next());
+          }
+
+          @Override
+          public void remove() {
+            throw new UnsupportedOperationException();
+          }
+        };
+      }
+    };
+  }
+
   private Resource convertFromProtoFormat(ResourceProto resource) {
     return new ResourcePBImpl(resource);
   }
@@ -306,4 +379,12 @@ public class RegisterApplicationMasterResponsePBImpl extends
   private ContainerProto convertToProtoFormat(Container t) {
     return ((ContainerPBImpl) t).getProto();
   }
+
+  private NMTokenProto convertToProtoFormat(NMToken token) {
+    return ((NMTokenPBImpl) token).getProto();
+  }
+
+  private NMToken convertFromProtoFormat(NMTokenProto proto) {
+    return new NMTokenPBImpl(proto);
+  }
 }

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/NMTokenPBImpl.java

@@ -47,7 +47,7 @@ public class NMTokenPBImpl extends NMToken{
     this.proto = proto;
     viaProto = true;
   }
-  
+
   @Override
   public synchronized NodeId getNodeId() {
     NMTokenProtoOrBuilder p = viaProto ? proto : builder;

+ 26 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
@@ -51,6 +52,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NMToken;
 import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.PreemptionContainer;
 import org.apache.hadoop.yarn.api.records.PreemptionContract;
@@ -280,10 +282,32 @@ public class ApplicationMasterService extends AbstractService implements
             .getMasterKey(applicationAttemptId).getEncoded()));        
       }
 
-      List<Container> containerList =
+      // For work-preserving AM restart, retrieve previous attempts' containers
+      // and corresponding NM tokens.
+      List<Container> transferredContainers =
           ((AbstractYarnScheduler) rScheduler)
             .getTransferredContainers(applicationAttemptId);
-      response.setContainersFromPreviousAttempt(containerList);
+      if (!transferredContainers.isEmpty()) {
+        response.setContainersFromPreviousAttempts(transferredContainers);
+        List<NMToken> nmTokens = new ArrayList<NMToken>();
+        for (Container container : transferredContainers) {
+          try {
+            nmTokens.add(rmContext.getNMTokenSecretManager()
+              .createAndGetNMToken(app.getUser(), applicationAttemptId,
+                container));
+          } catch (IllegalArgumentException e) {
+            // if it's a DNS issue, throw UnknownHostException directly and that
+            // will be automatically retried by RMProxy in RPC layer.
+            if (e.getCause() instanceof UnknownHostException) {
+              throw (UnknownHostException) e.getCause();
+            }
+          }
+        }
+        response.setNMTokensFromPreviousAttempts(nmTokens);
+        LOG.info("Application " + appID + " retrieved "
+            + transferredContainers.size() + " containers from previous"
+            + " attempts and " + nmTokens.size() + " NM tokens.");
+      }
       return response;
     }
   }
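
On the AM side, the containers and NM tokens set here arrive in the RegisterApplicationMasterResponse at registration time. A minimal sketch of what consuming them could look like; only the two response getters come from the patch, while the class name and the token map below are illustrative:

import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.Token;

public class AmRestartRecovery {

  // Illustrative store; a real AM would hand these tokens to whatever
  // client library it uses to talk to NodeManagers.
  private final Map<String, Token> nmTokensByNode =
      new HashMap<String, Token>();

  /** Rebind to containers surviving from earlier attempts. */
  public void recover(RegisterApplicationMasterResponse response) {
    // Record NM tokens first, so every surviving container's node is
    // covered before the AM contacts it.
    for (NMToken nmToken : response.getNMTokensFromPreviousAttempts()) {
      nmTokensByNode.put(nmToken.getNodeId().toString(), nmToken.getToken());
    }
    for (Container container : response.getContainersFromPreviousAttempts()) {
      // The AM keeps managing this container instead of requesting a new one.
      System.out.println("Recovered container " + container.getId()
          + " on node " + container.getNodeId());
    }
  }
}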

+ 2 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java

@@ -385,9 +385,8 @@ public class SchedulerApplicationAttempt {
         }
       } catch (IllegalArgumentException e) {
         // DNS might be down, skip returning this container.
-        LOG.error(
-          "Error trying to assign container token to allocated container "
-              + container.getId(), e);
+        LOG.error("Error trying to assign container token and NM token to" +
+            " an allocated container " + container.getId(), e);
         continue;
       }
       returnContainerList.add(container);
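
Together with the ApplicationMasterService hunk above, this hunk shows two different answers to the same DNS failure during token creation: unwrap and rethrow so the RPC layer retries, or log and skip the container. A hedged sketch of both strategies side by side (the TokenFactory interface is illustrative, not a YARN type):

import java.net.UnknownHostException;

public class TokenDnsHandling {

  interface TokenFactory {
    // Token creation resolves the node's host name, so a dead DNS surfaces
    // as an IllegalArgumentException wrapping UnknownHostException.
    String createToken(String nodeHost);
  }

  // ApplicationMasterService's strategy: unwrap and rethrow so RMProxy's
  // RPC-layer retry logic sees the UnknownHostException.
  static String createOrRethrow(TokenFactory factory, String host)
      throws UnknownHostException {
    try {
      return factory.createToken(host);
    } catch (IllegalArgumentException e) {
      if (e.getCause() instanceof UnknownHostException) {
        throw (UnknownHostException) e.getCause();
      }
      throw e;
    }
  }

  // SchedulerApplicationAttempt's strategy: log and skip, so one
  // unresolvable node does not fail the whole allocate response.
  static String createOrSkip(TokenFactory factory, String host) {
    try {
      return factory.createToken(host);
    } catch (IllegalArgumentException e) {
      System.err.println("DNS might be down, skipping " + host + ": " + e);
      return null;
    }
  }
}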

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java

@@ -175,7 +175,7 @@ public class FairScheduler extends AbstractYarnScheduler {
   protected WeightAdjuster weightAdjuster; // Can be null for no weight adjuster
   protected boolean continuousSchedulingEnabled; // Continuous Scheduling enabled or not
   protected int continuousSchedulingSleepMs; // Sleep time for each pass in continuous scheduling
-  private Comparator nodeAvailableResourceComparator =
+  private Comparator<NodeId> nodeAvailableResourceComparator =
           new NodeAvailableResourceComparator(); // Node available resource comparator
   protected double nodeLocalityThreshold; // Cluster threshold for node locality
   protected double rackLocalityThreshold; // Cluster threshold for rack locality
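
Parameterizing the raw Comparator as Comparator<NodeId> removes an unchecked warning without changing behavior. A standalone sketch of what such a typed node comparator can look like; the availability map and the descending order chosen here are stand-ins, not FairScheduler's actual bookkeeping:

import java.util.Comparator;
import java.util.Map;

import org.apache.hadoop.yarn.api.records.NodeId;

public class NodeAvailableMemoryComparator implements Comparator<NodeId> {

  // Illustrative per-node availability lookup, assumed to cover every
  // NodeId that will be compared.
  private final Map<NodeId, Long> availableMb;

  public NodeAvailableMemoryComparator(Map<NodeId, Long> availableMb) {
    this.availableMb = availableMb;
  }

  @Override
  public int compare(NodeId n1, NodeId n2) {
    // Descending, so the node with the most available memory sorts first.
    return availableMb.get(n2).compareTo(availableMb.get(n1));
  }
}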

+ 1 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java

@@ -486,6 +486,7 @@ public class MockRM extends ResourceManager {
 
   public static MockAM launchAM(RMApp app, MockRM rm, MockNM nm)
       throws Exception {
+    rm.waitForState(app.getApplicationId(), RMAppState.ACCEPTED);
     RMAppAttempt attempt = app.getCurrentAppAttempt();
     nm.nodeHeartbeat(true);
     MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId());
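
Waiting for ACCEPTED before reading getCurrentAppAttempt() closes a race where the attempt may not have been created yet. A minimal sketch of the poll-until-state idiom such a waitForState helper typically implements (the Callable-based signature is illustrative, not MockRM's own API):

import java.util.concurrent.Callable;
import java.util.concurrent.TimeoutException;

public final class WaitFor {

  public static void waitFor(Callable<Boolean> condition, long intervalMs,
      long timeoutMs) throws Exception {
    long deadline = System.currentTimeMillis() + timeoutMs;
    // RMApp and RMAppAttempt transitions are asynchronous, so tests poll
    // rather than assert a state immediately after triggering an event.
    while (!condition.call()) {
      if (System.currentTimeMillis() > deadline) {
        throw new TimeoutException(
            "Condition not met within " + timeoutMs + " ms");
      }
      Thread.sleep(intervalMs);
    }
  }
}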

+ 100 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java

@@ -24,6 +24,7 @@ import java.util.List;
 
 import junit.framework.Assert;
 
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -31,6 +32,7 @@ import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NMToken;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
@@ -160,11 +162,11 @@ public class TestAMRestart {
         am2.registerAppAttempt();
 
     // Assert two containers are running: container2 and container3;
-    Assert.assertEquals(2, registerResponse.getContainersFromPreviousAttempt()
+    Assert.assertEquals(2, registerResponse.getContainersFromPreviousAttempts()
       .size());
     boolean containerId2Exists = false, containerId3Exists = false;
     for (Container container : registerResponse
-      .getContainersFromPreviousAttempt()) {
+      .getContainersFromPreviousAttempts()) {
       if (container.getId().equals(containerId2)) {
         containerId2Exists = true;
       }
@@ -232,4 +234,100 @@ public class TestAMRestart {
 
     rm1.stop();
   }
+
+  @Test
+  public void testNMTokensRebindOnAMRestart() throws Exception {
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 3);
+
+    MockRM rm1 = new MockRM(conf);
+    rm1.start();
+    RMApp app1 =
+        rm1.submitApp(200, "myname", "myuser",
+          new HashMap<ApplicationAccessType, String>(), false, "default", -1,
+          null, "MAPREDUCE", false, true);
+    MockNM nm1 =
+        new MockNM("127.0.0.1:1234", 8000, rm1.getResourceTrackerService());
+    nm1.registerNode();
+    MockNM nm2 =
+        new MockNM("127.1.1.1:4321", 8000, rm1.getResourceTrackerService());
+    nm2.registerNode();
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+    int NUM_CONTAINERS = 1;
+    List<Container> containers = new ArrayList<Container>();
+    // expectedNMTokens tracks all the NM tokens issued across allocate calls.
+    List<NMToken> expectedNMTokens = new ArrayList<NMToken>();
+
+    // am1 allocates 1 container on nm1.
+    while (true) {
+      AllocateResponse response =
+          am1.allocate("127.0.0.1", 2000, NUM_CONTAINERS,
+            new ArrayList<ContainerId>());
+      nm1.nodeHeartbeat(true);
+      containers.addAll(response.getAllocatedContainers());
+      expectedNMTokens.addAll(response.getNMTokens());
+      if (containers.size() == NUM_CONTAINERS) {
+        break;
+      }
+      Thread.sleep(200);
+      System.out.println("Waiting for container to be allocated.");
+    }
+    // launch the container
+    nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 2, ContainerState.RUNNING);
+    ContainerId containerId2 =
+        ContainerId.newInstance(am1.getApplicationAttemptId(), 2);
+    rm1.waitForState(nm1, containerId2, RMContainerState.RUNNING);
+
+    // fail am1
+    nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
+    am1.waitForState(RMAppAttemptState.FAILED);
+    rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
+
+    // restart the am
+    MockAM am2 = MockRM.launchAM(app1, rm1, nm1);
+    RegisterApplicationMasterResponse registerResponse =
+        am2.registerAppAttempt();
+    rm1.waitForState(app1.getApplicationId(), RMAppState.RUNNING);
+
+    // check that am2 gets the NM token issued during am1's attempt.
+    Assert.assertEquals(expectedNMTokens,
+      registerResponse.getNMTokensFromPreviousAttempts());
+
+    // am2 allocates 1 container on nm2.
+    containers = new ArrayList<Container>();
+    while (true) {
+      AllocateResponse allocateResponse =
+          am2.allocate("127.1.1.1", 4000, NUM_CONTAINERS,
+            new ArrayList<ContainerId>());
+      nm2.nodeHeartbeat(true);
+      containers.addAll(allocateResponse.getAllocatedContainers());
+      expectedNMTokens.addAll(allocateResponse.getNMTokens());
+      if (containers.size() == NUM_CONTAINERS) {
+        break;
+      }
+      Thread.sleep(200);
+      System.out.println("Waiting for container to be allocated.");
+    }
+    nm1.nodeHeartbeat(am2.getApplicationAttemptId(), 2, ContainerState.RUNNING);
+    ContainerId am2ContainerId2 =
+        ContainerId.newInstance(am2.getApplicationAttemptId(), 2);
+    rm1.waitForState(nm1, am2ContainerId2, RMContainerState.RUNNING);
+
+    // fail am2.
+    nm1.nodeHeartbeat(am2.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
+    am2.waitForState(RMAppAttemptState.FAILED);
+    rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
+
+    // restart am
+    MockAM am3 = MockRM.launchAM(app1, rm1, nm1);
+    registerResponse = am3.registerAppAttempt();
+    rm1.waitForState(app1.getApplicationId(), RMAppState.RUNNING);
+
+    // check that am3 gets the NM tokens from both am1 and am2.
+    List<NMToken> transferredTokens =
+        registerResponse.getNMTokensFromPreviousAttempts();
+    Assert.assertEquals(2, transferredTokens.size());
+    Assert.assertTrue(transferredTokens.containsAll(expectedNMTokens));
+    rm1.stop();
+  }
 }
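
The two allocate loops in this test are identical apart from the host and memory arguments. A sketch of a helper that could fold them together; allocateAndWait is illustrative and not part of the patch, while the MockAM/MockNM calls it uses all appear in the test above:

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;

public final class AllocateHelper {

  /** Polls allocate() until the requested number of containers arrives,
   *  collecting issued NM tokens into tokenSink along the way. */
  public static List<Container> allocateAndWait(MockAM am, MockNM nm,
      String host, int memory, int numContainers, List<NMToken> tokenSink)
      throws Exception {
    List<Container> allocated = new ArrayList<Container>();
    while (allocated.size() < numContainers) {
      AllocateResponse response =
          am.allocate(host, memory, numContainers,
            new ArrayList<ContainerId>());
      // Heartbeat so the scheduler gets a chance to assign on this node.
      nm.nodeHeartbeat(true);
      allocated.addAll(response.getAllocatedContainers());
      tokenSink.addAll(response.getNMTokens());
      if (allocated.size() < numContainers) {
        Thread.sleep(200);
      }
    }
    return allocated;
  }
}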