Merge branch 'trunk' into HDFS-6581

arp 10 years ago
parent
commit
22295b4783
29 changed files with 730 additions and 158 deletions
  1. +4 -4    dev-support/test-patch.sh
  2. +5 -0    hadoop-common-project/hadoop-common/CHANGES.txt
  3. +28 -7   hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSystemImpl.java
  4. +1 -1    hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/DFSClientCache.java
  5. +1 -1    hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java
  6. +3 -2    hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java
  7. +121 -0  hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestRpcProgramNfs3.java
  8. +3 -0    hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
  9. +5 -0    hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
  10. +9 -0   hadoop-mapreduce-project/CHANGES.txt
  11. +3 -1   hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java
  12. +8 -0   hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
  13. +3 -1   hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/TokenCache.java
  14. +176 -55 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
  15. +9 -8   hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleSchedulerImpl.java
  16. +21 -0  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
  17. +5 -0   hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/HadoopArchives.md.vm
  18. +16 -3  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/security/TestTokenCache.java
  19. +95 -1  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java
  20. +4 -2   hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/client/HSAdmin.java
  21. +3 -2   hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/server/TestHSAdminServer.java
  22. +61 -25 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/security/TestBinaryTokenFile.java
  23. +8 -0   hadoop-yarn-project/CHANGES.txt
  24. +15 -9  hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java
  25. +48 -28 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java
  26. +19 -0  hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
  27. +41 -6  hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
  28. +11 -0  hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
  29. +4 -2   hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java

+ 4 - 4
dev-support/test-patch.sh

@@ -454,7 +454,7 @@ checkJavadocWarnings () {
       JIRA_COMMENT="$JIRA_COMMENT
 
     {color:red}-1 javadoc{color}.  The javadoc tool appears to have generated `expr $(($numPatchJavadocWarnings-$numTrunkJavadocWarnings))` warning messages.
-        See $BUILD_URL/artifact/trunk/patchprocess/diffJavadocWarnings.txt for details."
+        See $BUILD_URL/artifact/PreCommit-HADOOP-Build-patchprocess/diffJavadocWarnings.txt for details."
         return 1
     fi
   fi
@@ -498,7 +498,7 @@ checkJavacWarnings () {
       {color:red}-1 javac{color}.  The applied patch generated $patchJavacWarnings javac compiler warnings (more than the trunk's current $trunkJavacWarnings warnings)."
 
     $DIFF $PATCH_DIR/filteredTrunkJavacWarnings.txt $PATCH_DIR/filteredPatchJavacWarnings.txt > $PATCH_DIR/diffJavacWarnings.txt 
-        JIRA_COMMENT_FOOTER="Javac warnings: $BUILD_URL/artifact/trunk/patchprocess/diffJavacWarnings.txt
+        JIRA_COMMENT_FOOTER="Javac warnings: $BUILD_URL/artifact/PreCommit-HADOOP-Build-patchprocess/diffJavacWarnings.txt
 $JIRA_COMMENT_FOOTER"
 
         return 1
@@ -540,7 +540,7 @@ checkReleaseAuditWarnings () {
         {color:red}-1 release audit{color}.  The applied patch generated $patchReleaseAuditWarnings release audit warnings."
         $GREP '\!?????' $PATCH_DIR/patchReleaseAuditWarnings.txt > $PATCH_DIR/patchReleaseAuditProblems.txt
         echo "Lines that start with ????? in the release audit report indicate files that do not have an Apache license header." >> $PATCH_DIR/patchReleaseAuditProblems.txt
-        JIRA_COMMENT_FOOTER="Release audit warnings: $BUILD_URL/artifact/trunk/patchprocess/patchReleaseAuditProblems.txt
+        JIRA_COMMENT_FOOTER="Release audit warnings: $BUILD_URL/artifact/PreCommit-HADOOP-Build-patchprocess/patchReleaseAuditProblems.txt
 $JIRA_COMMENT_FOOTER"
         return 1
       fi
@@ -659,7 +659,7 @@ checkFindbugsWarnings () {
       $PATCH_DIR/newPatchFindbugsWarnings${module_suffix}.xml \
       $PATCH_DIR/newPatchFindbugsWarnings${module_suffix}.html
     if [[ $newFindbugsWarnings > 0 ]] ; then
-      JIRA_COMMENT_FOOTER="Findbugs warnings: $BUILD_URL/artifact/trunk/patchprocess/newPatchFindbugsWarnings${module_suffix}.html
+      JIRA_COMMENT_FOOTER="Findbugs warnings: $BUILD_URL/artifact/PreCommit-HADOOP-Build-patchprocess/newPatchFindbugsWarnings${module_suffix}.html
 $JIRA_COMMENT_FOOTER"
     fi
   done

+ 5 - 0
hadoop-common-project/hadoop-common/CHANGES.txt

@@ -533,6 +533,8 @@ Release 2.6.0 - UNRELEASED
     HADOOP-11016. KMS should support signing cookies with zookeeper secret
     manager. (tucu)
 
+    HADOOP-11106. Document considerations of HAR and Encryption. (clamb via wang)
+
   OPTIMIZATIONS
 
     HADOOP-10838. Byte array native checksumming. (James Thomas via todd)
@@ -829,6 +831,9 @@ Release 2.6.0 - UNRELEASED
 
     HADOOP-11099. KMS return HTTP UNAUTHORIZED 401 on ACL failure. (tucu)
 
+    HADOOP-11105. MetricsSystemImpl could leak memory in registered callbacks.
+    (Chuan Liu via cnauroth)
+
 Release 2.5.1 - 2014-09-05
 
   INCOMPATIBLE CHANGES

+ 28 - 7
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSystemImpl.java

@@ -83,7 +83,12 @@ public class MetricsSystemImpl extends MetricsSystem implements MetricsSource {
   private final Map<String, MetricsSource> allSources;
   private final Map<String, MetricsSinkAdapter> sinks;
   private final Map<String, MetricsSink> allSinks;
+
+  // The callback list is used by register(Callback callback), while
+  // the callback map is used by register(String name, String desc, T sink)
   private final List<Callback> callbacks;
+  private final Map<String, Callback> namedCallbacks;
+
   private final MetricsCollectorImpl collector;
   private final MetricsRegistry registry = new MetricsRegistry(MS_NAME);
   @Metric({"Snapshot", "Snapshot stats"}) MutableStat snapshotStat;
@@ -119,6 +124,7 @@ public class MetricsSystemImpl extends MetricsSystem implements MetricsSource {
     sourceConfigs = Maps.newHashMap();
     sinkConfigs = Maps.newHashMap();
     callbacks = Lists.newArrayList();
+    namedCallbacks = Maps.newHashMap();
     injectedTags = Lists.newArrayList();
     collector = new MetricsCollectorImpl();
     if (prefix != null) {
@@ -178,11 +184,13 @@ public class MetricsSystemImpl extends MetricsSystem implements MetricsSource {
       return;
     }
     for (Callback cb : callbacks) cb.preStart();
+    for (Callback cb : namedCallbacks.values()) cb.preStart();
     configure(prefix);
     startTimer();
     monitoring = true;
     LOG.info(prefix +" metrics system started");
     for (Callback cb : callbacks) cb.postStart();
+    for (Callback cb : namedCallbacks.values()) cb.postStart();
   }
 
   @Override
@@ -198,6 +206,7 @@ public class MetricsSystemImpl extends MetricsSystem implements MetricsSource {
       return;
     }
     for (Callback cb : callbacks) cb.preStop();
+    for (Callback cb : namedCallbacks.values()) cb.preStop();
     LOG.info("Stopping "+ prefix +" metrics system...");
     stopTimer();
     stopSources();
@@ -206,6 +215,7 @@ public class MetricsSystemImpl extends MetricsSystem implements MetricsSource {
     monitoring = false;
     LOG.info(prefix +" metrics system stopped.");
     for (Callback cb : callbacks) cb.postStop();
+    for (Callback cb : namedCallbacks.values()) cb.postStop();
   }
 
   @Override public synchronized <T>
@@ -224,7 +234,7 @@ public class MetricsSystemImpl extends MetricsSystem implements MetricsSource {
     }
     // We want to re-register the source to pick up new config when the
     // metrics system restarts.
-    register(new AbstractCallback() {
+    register(name, new AbstractCallback() {
       @Override public void postStart() {
         registerSource(finalName, finalDesc, s);
       }
@@ -241,6 +251,9 @@ public class MetricsSystemImpl extends MetricsSystem implements MetricsSource {
     if (allSources.containsKey(name)) {
       allSources.remove(name);
     }
+    if (namedCallbacks.containsKey(name)) {
+      namedCallbacks.remove(name);
+    }
   }
 
   synchronized
@@ -268,7 +281,7 @@ public class MetricsSystemImpl extends MetricsSystem implements MetricsSource {
     }
     // We want to re-register the sink to pick up new config
     // when the metrics system restarts.
-    register(new AbstractCallback() {
+    register(name, new AbstractCallback() {
       @Override public void postStart() {
         register(name, description, sink);
       }
@@ -289,9 +302,16 @@ public class MetricsSystemImpl extends MetricsSystem implements MetricsSource {
 
   @Override
   public synchronized void register(final Callback callback) {
-    callbacks.add((Callback) Proxy.newProxyInstance(
-        callback.getClass().getClassLoader(), new Class<?>[] { Callback.class },
-        new InvocationHandler() {
+    callbacks.add((Callback) getProxyForCallback(callback));
+  }
+
+  private synchronized void register(String name, final Callback callback) {
+    namedCallbacks.put(name, (Callback) getProxyForCallback(callback));
+  }
+
+  private Object getProxyForCallback(final Callback callback) {
+    return Proxy.newProxyInstance(callback.getClass().getClassLoader(),
+        new Class<?>[] { Callback.class }, new InvocationHandler() {
           @Override
           public Object invoke(Object proxy, Method method, Object[] args)
               throws Throwable {
@@ -299,11 +319,11 @@ public class MetricsSystemImpl extends MetricsSystem implements MetricsSource {
               return method.invoke(callback, args);
             } catch (Exception e) {
               // These are not considered fatal.
-              LOG.warn("Caught exception in callback "+ method.getName(), e);
+              LOG.warn("Caught exception in callback " + method.getName(), e);
             }
             return null;
           }
-        }));
+        });
   }
 
   @Override
@@ -572,6 +592,7 @@ public class MetricsSystemImpl extends MetricsSystem implements MetricsSource {
     allSources.clear();
     allSinks.clear();
     callbacks.clear();
+    namedCallbacks.clear();
     if (mbeanName != null) {
       MBeans.unregister(mbeanName);
       mbeanName = null;
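
The MetricsSystemImpl change above is the HADOOP-11105 fix: registerSource() and registerSink() used to append a fresh restart callback to the callbacks list on every call, and unregisterSource() never removed it, so repeated register/unregister cycles grew the list without bound. Keying those callbacks by source/sink name makes re-registration overwrite instead of append, and lets unregistration reclaim the entry. A minimal sketch of the before/after pattern (class and field names here are illustrative, not the actual Hadoop API):

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    class RestartCallbacks {
      // Old shape: every register call appends, and nothing ever removes.
      private final List<Runnable> callbacks = new ArrayList<>();
      // New shape: callbacks created for a named source/sink are keyed by
      // that name, so re-registering replaces and unregistering removes.
      private final Map<String, Runnable> namedCallbacks = new HashMap<>();

      void register(Runnable cb) { callbacks.add(cb); }
      void register(String name, Runnable cb) { namedCallbacks.put(name, cb); }
      void unregister(String name) { namedCallbacks.remove(name); }
    }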

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/DFSClientCache.java

@@ -241,7 +241,7 @@ class DFSClientCache {
       public FSDataInputStream load(DFSInputStreamCaheKey key) throws Exception {
         DFSClient client = getDfsClient(key.userId);
         DFSInputStream dis = client.open(key.inodePath);
-        return new FSDataInputStream(dis);
+        return client.createWrappedInputStream(dis);
       }
     };
   }

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java

@@ -678,7 +678,7 @@ class OpenFileCtx {
     }
     
     try {
-      fis = new FSDataInputStream(dfsClient.open(path));
+      fis = dfsClient.createWrappedInputStream(dfsClient.open(path));
       readCount = fis.read(offset, readbuffer, 0, count);
       if (readCount < count) {
         LOG.error("Can't read back " + count + " bytes, partial read size:"

+ 3 - 2
hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java

@@ -922,8 +922,9 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
           EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE) : 
           EnumSet.of(CreateFlag.CREATE);
 
-      fos = new HdfsDataOutputStream(dfsClient.create(fileIdPath, permission,
-          flag, false, replication, blockSize, null, bufferSize, null),
+      fos = dfsClient.createWrappedOutputStream(
+          dfsClient.create(fileIdPath, permission, flag, false, replication,
+              blockSize, null, bufferSize, null),
           statistics);
 
       if ((createMode == Nfs3Constant.CREATE_UNCHECKED)
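
The three NFS gateway edits above (DFSClientCache, OpenFileCtx, RpcProgramNfs3) are the substance of HDFS-7003: rather than wrapping the raw DFSInputStream/DFSOutputStream directly, the gateway now asks DFSClient for a wrapped stream, which transparently decrypts or encrypts when the target file lives in an encryption zone and degenerates to a plain stream otherwise. A hedged sketch of the read-side call pattern, assuming an already-constructed DFSClient:

    import java.io.IOException;

    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.hdfs.DFSClient;

    class WrappedStreamSketch {
      // Open a file the way the gateway now does: the wrapped stream
      // handles decryption when the path is inside an encryption zone.
      static FSDataInputStream openForRead(DFSClient dfsClient, String path)
          throws IOException {
        return dfsClient.createWrappedInputStream(dfsClient.open(path));
      }
    }

The write side is symmetric: createWrappedOutputStream() wraps the stream returned by dfsClient.create(), as the RpcProgramNfs3 hunk shows.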

+ 121 - 0
hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestRpcProgramNfs3.java

@@ -17,19 +17,27 @@
  */
 package org.apache.hadoop.hdfs.nfs.nfs3;
 
+import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
+import java.io.EOFException;
+import java.io.File;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.util.EnumSet;
 
+import org.apache.hadoop.crypto.key.JavaKeyStoreProvider;
+import org.apache.hadoop.crypto.key.KeyProviderFactory;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystemTestHelper;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.client.HdfsAdmin;
 import org.apache.hadoop.hdfs.nfs.conf.NfsConfigKeys;
 import org.apache.hadoop.hdfs.nfs.conf.NfsConfiguration;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
@@ -98,12 +106,16 @@ public class TestRpcProgramNfs3 {
   static DistributedFileSystem hdfs;
   static MiniDFSCluster cluster = null;
   static NfsConfiguration config = new NfsConfiguration();
+  static HdfsAdmin dfsAdmin;
   static NameNode nn;
   static Nfs3 nfs;
   static RpcProgramNfs3 nfsd;
   static SecurityHandler securityHandler;
   static SecurityHandler securityHandlerUnpriviledged;
   static String testdir = "/tmp";
+  private static final String TEST_KEY = "testKey";
+  private static FileSystemTestHelper fsHelper;
+  private static File testRootDir;
 
   @BeforeClass
   public static void setup() throws Exception {
@@ -114,12 +126,20 @@ public class TestRpcProgramNfs3 {
         .getProxySuperuserGroupConfKey(currentUser), "*");
     config.set(DefaultImpersonationProvider.getTestProvider()
         .getProxySuperuserIpConfKey(currentUser), "*");
+    fsHelper = new FileSystemTestHelper();
+    // Set up java key store
+    String testRoot = fsHelper.getTestRootDir();
+    testRootDir = new File(testRoot).getAbsoluteFile();
+    final Path jksPath = new Path(testRootDir.toString(), "test.jks");
+    config.set(KeyProviderFactory.KEY_PROVIDER_PATH,
+        JavaKeyStoreProvider.SCHEME_NAME + "://file" + jksPath.toUri());
     ProxyUsers.refreshSuperUserGroupsConfiguration(config);
 
     cluster = new MiniDFSCluster.Builder(config).numDataNodes(1).build();
     cluster.waitActive();
     hdfs = cluster.getFileSystem();
     nn = cluster.getNameNode();
+    dfsAdmin = new HdfsAdmin(cluster.getURI(), config);
 
     // Use ephemeral ports in case tests are running in parallel
     config.setInt("nfs3.mountd.port", 0);
@@ -131,6 +151,8 @@ public class TestRpcProgramNfs3 {
     nfs.startServiceInternal(false);
     nfsd = (RpcProgramNfs3) nfs.getRpcProgram();
 
+    hdfs.getClient().setKeyProvider(nn.getNamesystem().getProvider());
+    DFSTestUtil.createKey(TEST_KEY, cluster, config);
 
     // Mock SecurityHandler which returns system user.name
     securityHandler = Mockito.mock(SecurityHandler.class);
@@ -310,6 +332,105 @@ public class TestRpcProgramNfs3 {
         response2.getStatus());
   }
 
+  @Test(timeout = 120000)
+  public void testEncryptedReadWrite() throws Exception {
+    final int len = 8192;
+
+    final Path zone = new Path("/zone");
+    hdfs.mkdirs(zone);
+    dfsAdmin.createEncryptionZone(zone, TEST_KEY);
+
+    final byte[] buffer = new byte[len];
+    for (int i = 0; i < len; i++) {
+      buffer[i] = (byte) i;
+    }
+
+    final String encFile1 = "/zone/myfile";
+    createFileUsingNfs(encFile1, buffer);
+    commit(encFile1, len);
+    assertArrayEquals("encFile1 not equal",
+        getFileContentsUsingNfs(encFile1, len),
+        getFileContentsUsingDfs(encFile1, len));
+
+    /*
+     * Same thing except this time create the encrypted file using DFS.
+     */
+    final String encFile2 = "/zone/myfile2";
+    final Path encFile2Path = new Path(encFile2);
+    DFSTestUtil.createFile(hdfs, encFile2Path, len, (short) 1, 0xFEED);
+    assertArrayEquals("encFile2 not equal",
+        getFileContentsUsingNfs(encFile2, len),
+        getFileContentsUsingDfs(encFile2, len));
+  }
+
+  private void createFileUsingNfs(String fileName, byte[] buffer)
+      throws Exception {
+    DFSTestUtil.createFile(hdfs, new Path(fileName), 0, (short) 1, 0);
+
+    final HdfsFileStatus status = nn.getRpcServer().getFileInfo(fileName);
+    final long dirId = status.getFileId();
+    final FileHandle handle = new FileHandle(dirId);
+
+    final WRITE3Request writeReq = new WRITE3Request(handle, 0,
+        buffer.length, WriteStableHow.DATA_SYNC, ByteBuffer.wrap(buffer));
+    final XDR xdr_req = new XDR();
+    writeReq.serialize(xdr_req);
+
+    final WRITE3Response response = nfsd.write(xdr_req.asReadOnlyWrap(),
+        null, 1, securityHandler,
+        new InetSocketAddress("localhost", 1234));
+    assertEquals("Incorrect response: ", null, response);
+  }
+
+  private byte[] getFileContentsUsingNfs(String fileName, int len)
+      throws Exception {
+    final HdfsFileStatus status = nn.getRpcServer().getFileInfo(fileName);
+    final long dirId = status.getFileId();
+    final FileHandle handle = new FileHandle(dirId);
+
+    final READ3Request readReq = new READ3Request(handle, 0, len);
+    final XDR xdr_req = new XDR();
+    readReq.serialize(xdr_req);
+
+    final READ3Response response = nfsd.read(xdr_req.asReadOnlyWrap(),
+        securityHandler, new InetSocketAddress("localhost", 1234));
+    assertEquals("Incorrect return code: ", Nfs3Status.NFS3_OK,
+        response.getStatus());
+    assertTrue("expected full read", response.isEof());
+    return response.getData().array();
+  }
+
+  private byte[] getFileContentsUsingDfs(String fileName, int len)
+      throws Exception {
+    final FSDataInputStream in = hdfs.open(new Path(fileName));
+    final byte[] ret = new byte[len];
+    in.readFully(ret);
+    try {
+      in.readByte();
+      Assert.fail("expected end of file");
+    } catch (EOFException e) {
+      // expected. Unfortunately there is no associated message to check
+    }
+    in.close();
+    return ret;
+  }
+
+  private void commit(String fileName, int len) throws Exception {
+    final HdfsFileStatus status = nn.getRpcServer().getFileInfo(fileName);
+    final long dirId = status.getFileId();
+    final FileHandle handle = new FileHandle(dirId);
+    final XDR xdr_req = new XDR();
+    final COMMIT3Request req = new COMMIT3Request(handle, 0, len);
+    req.serialize(xdr_req);
+
+    Channel ch = Mockito.mock(Channel.class);
+
+    COMMIT3Response response2 = nfsd.commit(xdr_req.asReadOnlyWrap(),
+        ch, 1, securityHandler,
+        new InetSocketAddress("localhost", 1234));
+    assertEquals("Incorrect COMMIT3Response:", null, response2);
+  }
+
   @Test(timeout = 60000)
   public void testWrite() throws Exception {
     HdfsFileStatus status = nn.getRpcServer().getFileInfo("/tmp/bar");
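
The new testEncryptedReadWrite case exercises both directions: a file written through the NFS WRITE path is read back over both NFS and DFS, and a file created through DFS is read back the same two ways, asserting byte-for-byte equality. The prerequisite is an encryption zone backed by an existing key, created with the HdfsAdmin call shown above; a minimal sketch, assuming a running cluster whose NameNode has a key provider configured and a key named "testKey":

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.client.HdfsAdmin;

    class EncryptionZoneSketch {
      // Turn /zone into an encryption zone keyed by "testKey", as the
      // test above does. The directory must exist and be empty.
      static void makeZone(Configuration conf) throws Exception {
        FileSystem fs = FileSystem.get(conf);
        fs.mkdirs(new Path("/zone"));
        new HdfsAdmin(fs.getUri(), conf)
            .createEncryptionZone(new Path("/zone"), "testKey");
      }
    }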

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -477,6 +477,9 @@ Release 2.6.0 - UNRELEASED
 
     HDFS-7047. Expose FileStatus#isEncrypted in libhdfs (cmccabe)
 
+    HDFS-7003. Add NFS Gateway support for reading and writing to
+    encryption zones. (clamb via wang)
+
   OPTIMIZATIONS
 
     HDFS-6690. Deduplicate xattr names in memory. (wang)

+ 5 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java

@@ -3089,4 +3089,9 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
   public KeyProviderCryptoExtension getKeyProvider() {
     return provider;
   }
+
+  @VisibleForTesting
+  public void setKeyProvider(KeyProviderCryptoExtension provider) {
+    this.provider = provider;
+  }
 }

+ 9 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -251,6 +251,9 @@ Release 2.6.0 - UNRELEASED
     MAPREDUCE-5130. Add missing job config options to mapred-default.xml
     (Ray Chiang via Sandy Ryza)
 
+    MAPREDUCE-5891. Improved shuffle error handling across NM restarts
+    (Junping Du via jlowe)
+
   OPTIMIZATIONS
 
   BUG FIXES
@@ -347,6 +350,12 @@ Release 2.6.0 - UNRELEASED
     MAPREDUCE-6070. yarn.app.am.resource.mb/cpu-vcores affects uber mode but
     is not documented (Tsuyoshi OZAWA via jlowe)
 
+    MAPREDUCE-6090. mapred hsadmin getGroups fails to connect in some cases
+    (Robert Kanter via jlowe)
+
+    MAPREDUCE-6086. mapreduce.job.credentials.binary should allow all URIs. 
+    (Zhihai Xu via kasha)
+
 Release 2.5.1 - 2014-09-05
 
   INCOMPATIBLE CHANGES

+ 3 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java

@@ -578,7 +578,9 @@ class JobSubmitter {
       conf.get("mapreduce.job.credentials.binary");
     if (binaryTokenFilename != null) {
       Credentials binary = Credentials.readTokenStorageFile(
-          new Path("file:///" + binaryTokenFilename), conf);
+          FileSystem.getLocal(conf).makeQualified(
+              new Path(binaryTokenFilename)),
+          conf);
       credentials.addAll(binary);
     }
     // add secret keys coming from a json file

+ 8 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java

@@ -298,6 +298,14 @@ public interface MRJobConfig {
   
   public static final String MAX_FETCH_FAILURES_NOTIFICATIONS = "mapreduce.reduce.shuffle.max-fetch-failures-notifications";
   public static final int DEFAULT_MAX_FETCH_FAILURES_NOTIFICATIONS = 3;
+  
+  public static final String SHUFFLE_FETCH_RETRY_INTERVAL_MS = "mapreduce.reduce.shuffle.fetch.retry.interval-ms";
+  /** Default interval at which the fetcher retries the fetch during NM restart. */
+  public final static int DEFAULT_SHUFFLE_FETCH_RETRY_INTERVAL_MS = 1000;
+  
+  public static final String SHUFFLE_FETCH_RETRY_TIMEOUT_MS = "mapreduce.reduce.shuffle.fetch.retry.timeout-ms";
+  
+  public static final String SHUFFLE_FETCH_RETRY_ENABLED = "mapreduce.reduce.shuffle.fetch.retry.enabled";
 
   public static final String SHUFFLE_NOTIFY_READERROR = "mapreduce.reduce.shuffle.notify.readerror";
   

+ 3 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/TokenCache.java

@@ -134,7 +134,9 @@ public class TokenCache {
       Credentials binary;
       try {
         binary = Credentials.readTokenStorageFile(
-            new Path("file:///" +  binaryTokenFilename), conf);
+            FileSystem.getLocal(conf).makeQualified(
+                new Path(binaryTokenFilename)),
+            conf);
       } catch (IOException e) {
         throw new RuntimeException(e);
       }
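
JobSubmitter and TokenCache above share one fix (MAPREDUCE-6086): the old code built the credentials path by string concatenation, "file:///" + filename, which breaks as soon as the configured value already carries a scheme ("file:///file:/tmp/tokens.bin" is not a valid path). Qualifying through the local FileSystem fills in a missing scheme and leaves an explicit one alone, so both forms work. A small sketch using a hypothetical token file path:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    class QualifySketch {
      public static void main(String[] args) throws Exception {
        FileSystem local = FileSystem.getLocal(new Configuration());
        // Both resolve to the same qualified path, file:/tmp/tokens.bin;
        // the old "file:///" + name concatenation mangled the second form.
        System.out.println(local.makeQualified(new Path("/tmp/tokens.bin")));
        System.out.println(local.makeQualified(new Path("file:/tmp/tokens.bin")));
      }
    }

This is exactly what TestTokenCache below now checks with its with- and without-scheme variants.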

+ 176 - 55
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java

@@ -27,6 +27,7 @@ import java.net.URL;
 import java.net.URLConnection;
 import java.security.GeneralSecurityException;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -46,6 +47,8 @@ import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
 import org.apache.hadoop.mapreduce.CryptoUtils;
 import org.apache.hadoop.security.ssl.SSLFactory;
+import org.apache.hadoop.util.Time;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -85,10 +88,18 @@ class Fetcher<K,V> extends Thread {
   private final int connectionTimeout;
   private final int readTimeout;
   
+  private final int fetchRetryTimeout;
+  private final int fetchRetryInterval;
+  
+  private final boolean fetchRetryEnabled;
+  
   private final SecretKey shuffleSecretKey;
 
   protected HttpURLConnection connection;
   private volatile boolean stopped = false;
+  
+  // Initial value is 0, which means it hasn't retried yet.
+  private long retryStartTime = 0;
 
   private static boolean sslShuffle;
   private static SSLFactory sslFactory;
@@ -135,6 +146,19 @@ class Fetcher<K,V> extends Thread {
     this.readTimeout = 
       job.getInt(MRJobConfig.SHUFFLE_READ_TIMEOUT, DEFAULT_READ_TIMEOUT);
     
+    this.fetchRetryInterval = job.getInt(MRJobConfig.SHUFFLE_FETCH_RETRY_INTERVAL_MS,
+        MRJobConfig.DEFAULT_SHUFFLE_FETCH_RETRY_INTERVAL_MS);
+    
+    this.fetchRetryTimeout = job.getInt(MRJobConfig.SHUFFLE_FETCH_RETRY_TIMEOUT_MS, 
+        DEFAULT_STALLED_COPY_TIMEOUT);
+    
+    boolean shuffleFetchEnabledDefault = job.getBoolean(
+        YarnConfiguration.NM_RECOVERY_ENABLED, 
+        YarnConfiguration.DEFAULT_NM_RECOVERY_ENABLED);
+    this.fetchRetryEnabled = job.getBoolean(
+        MRJobConfig.SHUFFLE_FETCH_RETRY_ENABLED, 
+        shuffleFetchEnabledDefault);
+    
     setName("fetcher#" + id);
     setDaemon(true);
 
@@ -242,6 +266,8 @@ class Fetcher<K,V> extends Thread {
    */
   @VisibleForTesting
   protected void copyFromHost(MapHost host) throws IOException {
+    // reset retryStartTime for a new host
+    retryStartTime = 0;
     // Get completed maps on 'host'
     List<TaskAttemptID> maps = scheduler.getMapsForHost(host);
     
@@ -261,60 +287,14 @@ class Fetcher<K,V> extends Thread {
     
     // Construct the url and connect
     DataInputStream input = null;
+    URL url = getMapOutputURL(host, maps);
     try {
-      URL url = getMapOutputURL(host, maps);
-      openConnection(url);
-      if (stopped) {
-        abortConnect(host, remaining);
-        return;
-      }
+      setupConnectionsWithRetry(host, remaining, url);
       
-      // generate hash of the url
-      String msgToEncode = SecureShuffleUtils.buildMsgFrom(url);
-      String encHash = SecureShuffleUtils.hashFromString(msgToEncode,
-          shuffleSecretKey);
-      
-      // put url hash into http header
-      connection.addRequestProperty(
-          SecureShuffleUtils.HTTP_HEADER_URL_HASH, encHash);
-      // set the read timeout
-      connection.setReadTimeout(readTimeout);
-      // put shuffle version into http header
-      connection.addRequestProperty(ShuffleHeader.HTTP_HEADER_NAME,
-          ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
-      connection.addRequestProperty(ShuffleHeader.HTTP_HEADER_VERSION,
-          ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
-      connect(connection, connectionTimeout);
-      // verify that the thread wasn't stopped during calls to connect
       if (stopped) {
         abortConnect(host, remaining);
         return;
       }
-      input = new DataInputStream(connection.getInputStream());
-
-      // Validate response code
-      int rc = connection.getResponseCode();
-      if (rc != HttpURLConnection.HTTP_OK) {
-        throw new IOException(
-            "Got invalid response code " + rc + " from " + url +
-            ": " + connection.getResponseMessage());
-      }
-      // get the shuffle version
-      if (!ShuffleHeader.DEFAULT_HTTP_HEADER_NAME.equals(
-          connection.getHeaderField(ShuffleHeader.HTTP_HEADER_NAME))
-          || !ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION.equals(
-              connection.getHeaderField(ShuffleHeader.HTTP_HEADER_VERSION))) {
-        throw new IOException("Incompatible shuffle response version");
-      }
-      // get the replyHash which is HMac of the encHash we sent to the server
-      String replyHash = connection.getHeaderField(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH);
-      if(replyHash==null) {
-        throw new IOException("security validation of TT Map output failed");
-      }
-      LOG.debug("url="+msgToEncode+";encHash="+encHash+";replyHash="+replyHash);
-      // verify that replyHash is HMac of encHash
-      SecureShuffleUtils.verifyReply(replyHash, encHash, shuffleSecretKey);
-      LOG.info("for url="+msgToEncode+" sent hash and received reply");
     } catch (IOException ie) {
       boolean connectExcpt = ie instanceof ConnectException;
       ioErrs.increment(1);
@@ -336,6 +316,8 @@ class Fetcher<K,V> extends Thread {
       return;
     }
     
+    input = new DataInputStream(connection.getInputStream());
+    
     try {
       // Loop through available map-outputs and fetch them
       // On any error, faildTasks is not null and we exit
@@ -343,7 +325,23 @@ class Fetcher<K,V> extends Thread {
       // yet_to_be_fetched list and marking the failed tasks.
       TaskAttemptID[] failedTasks = null;
       while (!remaining.isEmpty() && failedTasks == null) {
-        failedTasks = copyMapOutput(host, input, remaining);
+        try {
+          failedTasks = copyMapOutput(host, input, remaining, fetchRetryEnabled);
+        } catch (IOException e) {
+          // Set up the connection again if it was disconnected by the NM.
+          connection.disconnect();
+          // Get map output from remaining tasks only.
+          url = getMapOutputURL(host, remaining);
+          
+          // Connect with retry, since the host's recovery may take some time.
+          setupConnectionsWithRetry(host, remaining, url);
+          if (stopped) {
+            abortConnect(host, remaining);
+            return;
+          }
+          input = new DataInputStream(connection.getInputStream());
+        }
       }
       
       if(failedTasks != null && failedTasks.length > 0) {
@@ -371,19 +369,111 @@ class Fetcher<K,V> extends Thread {
       }
     }
   }
+
+  private void setupConnectionsWithRetry(MapHost host,
+      Set<TaskAttemptID> remaining, URL url) throws IOException {
+    openConnectionWithRetry(host, remaining, url);
+    if (stopped) {
+      return;
+    }
+      
+    // generate hash of the url
+    String msgToEncode = SecureShuffleUtils.buildMsgFrom(url);
+    String encHash = SecureShuffleUtils.hashFromString(msgToEncode,
+        shuffleSecretKey);
+    
+    setupShuffleConnection(encHash);
+    connect(connection, connectionTimeout);
+    // verify that the thread wasn't stopped during calls to connect
+    if (stopped) {
+      return;
+    }
+    
+    verifyConnection(url, msgToEncode, encHash);
+  }
+
+  private void openConnectionWithRetry(MapHost host,
+      Set<TaskAttemptID> remaining, URL url) throws IOException {
+    long startTime = Time.monotonicNow();
+    boolean shouldWait = true;
+    while (shouldWait) {
+      try {
+        openConnection(url);
+        shouldWait = false;
+      } catch (IOException e) {
+        if (!fetchRetryEnabled) {
+           // throw exception directly if fetch's retry is not enabled
+           throw e;
+        }
+        if ((Time.monotonicNow() - startTime) >= this.fetchRetryTimeout) {
+          LOG.warn("Failed to connect to host: " + url + " after "
+              + fetchRetryTimeout + " milliseconds.");
+          throw e;
+        }
+        try {
+          Thread.sleep(this.fetchRetryInterval);
+        } catch (InterruptedException e1) {
+          if (stopped) {
+            return;
+          }
+        }
+      }
+    }
+  }
+
+  private void verifyConnection(URL url, String msgToEncode, String encHash)
+      throws IOException {
+    // Validate response code
+    int rc = connection.getResponseCode();
+    if (rc != HttpURLConnection.HTTP_OK) {
+      throw new IOException(
+          "Got invalid response code " + rc + " from " + url +
+          ": " + connection.getResponseMessage());
+    }
+    // get the shuffle version
+    if (!ShuffleHeader.DEFAULT_HTTP_HEADER_NAME.equals(
+        connection.getHeaderField(ShuffleHeader.HTTP_HEADER_NAME))
+        || !ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION.equals(
+            connection.getHeaderField(ShuffleHeader.HTTP_HEADER_VERSION))) {
+      throw new IOException("Incompatible shuffle response version");
+    }
+    // get the replyHash which is HMac of the encHash we sent to the server
+    String replyHash = connection.getHeaderField(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH);
+    if(replyHash==null) {
+      throw new IOException("security validation of TT Map output failed");
+    }
+    LOG.debug("url="+msgToEncode+";encHash="+encHash+";replyHash="+replyHash);
+    // verify that replyHash is HMac of encHash
+    SecureShuffleUtils.verifyReply(replyHash, encHash, shuffleSecretKey);
+    LOG.info("for url="+msgToEncode+" sent hash and received reply");
+  }
+
+  private void setupShuffleConnection(String encHash) {
+    // put url hash into http header
+    connection.addRequestProperty(
+        SecureShuffleUtils.HTTP_HEADER_URL_HASH, encHash);
+    // set the read timeout
+    connection.setReadTimeout(readTimeout);
+    // put shuffle version into http header
+    connection.addRequestProperty(ShuffleHeader.HTTP_HEADER_NAME,
+        ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
+    connection.addRequestProperty(ShuffleHeader.HTTP_HEADER_VERSION,
+        ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
+  }
   
   private static TaskAttemptID[] EMPTY_ATTEMPT_ID_ARRAY = new TaskAttemptID[0];
   
   private TaskAttemptID[] copyMapOutput(MapHost host,
                                 DataInputStream input,
-                                Set<TaskAttemptID> remaining) {
+                                Set<TaskAttemptID> remaining,
+                                boolean canRetry) throws IOException {
     MapOutput<K,V> mapOutput = null;
     TaskAttemptID mapId = null;
     long decompressedLength = -1;
     long compressedLength = -1;
     
     try {
-      long startTime = System.currentTimeMillis();
+      long startTime = Time.monotonicNow();
       int forReduce = -1;
       //Read the shuffle header
       try {
@@ -449,7 +539,10 @@ class Fetcher<K,V> extends Thread {
       }
       
       // Inform the shuffle scheduler
-      long endTime = System.currentTimeMillis();
+      long endTime = Time.monotonicNow();
+      // Reset retryStartTime, as the map task is making progress after any earlier retries.
+      retryStartTime = 0;
+      
       scheduler.copySucceeded(mapId, host, compressedLength, 
                               endTime - startTime, mapOutput);
       // Note successful shuffle
@@ -457,9 +550,14 @@ class Fetcher<K,V> extends Thread {
       metrics.successFetch();
       return null;
     } catch (IOException ioe) {
+      
+      if (canRetry) {
+        checkTimeoutOrRetry(host, ioe);
+      } 
+      
       ioErrs.increment(1);
       if (mapId == null || mapOutput == null) {
-        LOG.info("fetcher#" + id + " failed to read map header" + 
+        LOG.warn("fetcher#" + id + " failed to read map header " + 
                  mapId + " decomp: " + 
                  decompressedLength + ", " + compressedLength, ioe);
         if(mapId == null) {
@@ -468,7 +566,7 @@ class Fetcher<K,V> extends Thread {
           return new TaskAttemptID[] {mapId};
         }
       }
-      
+        
       LOG.warn("Failed to shuffle output of " + mapId + 
                " from " + host.getHostName(), ioe); 
 
@@ -479,6 +577,29 @@ class Fetcher<K,V> extends Thread {
     }
 
   }
+
+  /** Check whether the retry has timed out; if not, throw an exception to
+   *  start a new round of retry. */
+  private void checkTimeoutOrRetry(MapHost host, IOException ioe)
+      throws IOException {
+    // First time to retry.
+    long currentTime = Time.monotonicNow();
+    if (retryStartTime == 0) {
+       retryStartTime = currentTime;
+    }
+  
+    // The retry has not timed out yet; trigger another retry by rethrowing.
+    if (currentTime - retryStartTime < this.fetchRetryTimeout) {
+      LOG.warn("Shuffle output from " + host.getHostName() +
+          " failed, retry it.");
+      throw ioe;
+    } else {
+      // timeout, prepare to be failed.
+      LOG.warn("Timeout for copying MapOutput with retry on host " + host 
+          + "after " + fetchRetryTimeout + "milliseconds.");
+      
+    }
+  }
   
   /**
    * Do some basic verification on the input received -- Being defensive
@@ -525,7 +646,7 @@ class Fetcher<K,V> extends Thread {
    * @return
    * @throws MalformedURLException
    */
-  private URL getMapOutputURL(MapHost host, List<TaskAttemptID> maps
+  private URL getMapOutputURL(MapHost host, Collection<TaskAttemptID> maps
                               )  throws MalformedURLException {
     // Get the base url
     StringBuffer url = new StringBuffer(host.getBaseUrl());
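
The Fetcher restructuring above (MAPREDUCE-5891) pulls connection setup out of copyFromHost() into setupConnectionsWithRetry()/openConnectionWithRetry(), so that a connection refused during a NodeManager restart is retried at fetchRetryInterval until fetchRetryTimeout elapses on a monotonic clock; copyMapOutput() likewise reopens the connection for the remaining map outputs instead of immediately failing them. The shape of that bounded retry loop, as a standalone hedged sketch (Connector stands in for the real openConnection(url) call):

    import java.io.IOException;
    import java.util.concurrent.TimeUnit;

    class RetrySketch {
      interface Connector { void connect() throws IOException; }

      // Retry at a fixed interval until a timeout measured on a monotonic
      // clock expires, then rethrow the last failure.
      static void connectWithRetry(Connector c, long timeoutMs, long intervalMs)
          throws IOException, InterruptedException {
        final long start = System.nanoTime();
        while (true) {
          try {
            c.connect();
            return;                      // connected; stop retrying
          } catch (IOException e) {
            long elapsed =
                TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
            if (elapsed >= timeoutMs) {
              throw e;                   // retry window exhausted; give up
            }
            Thread.sleep(intervalMs);    // wait out the NM restart, then retry
          }
        }
      }
    }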

+ 9 - 8
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleSchedulerImpl.java

@@ -48,6 +48,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.mapreduce.task.reduce.MapHost.State;
 import org.apache.hadoop.util.Progress;
+import org.apache.hadoop.util.Time;
 
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
@@ -121,7 +122,7 @@ public class ShuffleSchedulerImpl<K,V> implements ShuffleScheduler<K,V> {
     this.shuffledMapsCounter = shuffledMapsCounter;
     this.reduceShuffleBytes = reduceShuffleBytes;
     this.failedShuffleCounter = failedShuffleCounter;
-    this.startTime = System.currentTimeMillis();
+    this.startTime = Time.monotonicNow();
     lastProgressTime = startTime;
     referee.start();
     this.maxFailedUniqueFetches = Math.min(totalMaps, 5);
@@ -198,7 +199,7 @@ public class ShuffleSchedulerImpl<K,V> implements ShuffleScheduler<K,V> {
       totalBytesShuffledTillNow += bytes;
       updateStatus();
       reduceShuffleBytes.increment(bytes);
-      lastProgressTime = System.currentTimeMillis();
+      lastProgressTime = Time.monotonicNow();
       LOG.debug("map " + mapId + " done " + status.getStateString());
     }
   }
@@ -206,7 +207,7 @@ public class ShuffleSchedulerImpl<K,V> implements ShuffleScheduler<K,V> {
   private void updateStatus() {
     float mbs = (float) totalBytesShuffledTillNow / (1024 * 1024);
     int mapsDone = totalMaps - remainingMaps;
-    long secsSinceStart = (System.currentTimeMillis() - startTime) / 1000 + 1;
+    long secsSinceStart = (Time.monotonicNow() - startTime) / 1000 + 1;
 
     float transferRate = mbs / secsSinceStart;
     progress.set((float) mapsDone / totalMaps);
@@ -307,7 +308,7 @@ public class ShuffleSchedulerImpl<K,V> implements ShuffleScheduler<K,V> {
     // check if the reducer is stalled for a long time
     // duration for which the reducer is stalled
     int stallDuration =
-      (int)(System.currentTimeMillis() - lastProgressTime);
+      (int)(Time.monotonicNow() - lastProgressTime);
 
     // duration for which the reducer ran with progress
     int shuffleProgressDuration =
@@ -389,7 +390,7 @@ public class ShuffleSchedulerImpl<K,V> implements ShuffleScheduler<K,V> {
 
       LOG.info("Assigning " + host + " with " + host.getNumKnownMapOutputs() +
                " to " + Thread.currentThread().getName());
-      shuffleStart.set(System.currentTimeMillis());
+      shuffleStart.set(Time.monotonicNow());
 
       return host;
   }
@@ -430,7 +431,7 @@ public class ShuffleSchedulerImpl<K,V> implements ShuffleScheduler<K,V> {
       }
     }
     LOG.info(host + " freed by " + Thread.currentThread().getName() + " in " +
-             (System.currentTimeMillis()-shuffleStart.get()) + "ms");
+             (Time.monotonicNow()-shuffleStart.get()) + "ms");
   }
 
   public synchronized void resetKnownMaps() {
@@ -464,12 +465,12 @@ public class ShuffleSchedulerImpl<K,V> implements ShuffleScheduler<K,V> {
 
     Penalty(MapHost host, long delay) {
       this.host = host;
-      this.endTime = System.currentTimeMillis() + delay;
+      this.endTime = Time.monotonicNow() + delay;
     }
 
     @Override
     public long getDelay(TimeUnit unit) {
-      long remainingTime = endTime - System.currentTimeMillis();
+      long remainingTime = endTime - Time.monotonicNow();
       return unit.convert(remainingTime, TimeUnit.MILLISECONDS);
     }
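
Every timing call in ShuffleSchedulerImpl switches from System.currentTimeMillis() to Time.monotonicNow(). The distinction matters for the arithmetic above: wall-clock time can jump backwards or forwards under NTP correction, which can make a computed shuffle duration negative or distort a Penalty's remaining delay, whereas a monotonic clock only moves forward. The same idea in plain Java (Hadoop's Time.monotonicNow() is a nanoTime-based clock):

    import java.util.concurrent.TimeUnit;

    class ElapsedSketch {
      public static void main(String[] args) throws InterruptedException {
        // Measure durations with the monotonic clock; never subtract two
        // currentTimeMillis() readings, which can go backwards.
        long start = System.nanoTime();
        Thread.sleep(50);
        long elapsedMs =
            TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        System.out.println("elapsed ~" + elapsedMs + " ms"); // never negative
      }
    }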
 

+ 21 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml

@@ -128,6 +128,27 @@
   </description>
 </property>
 
+<property>
+  <name>mapreduce.reduce.shuffle.fetch.retry.enabled</name>
+  <value>${yarn.nodemanager.recovery.enabled}</value>
+  <description>Set to enable fetch retry during host restart.</description>
+</property>
+
+<property>
+  <name>mapreduce.reduce.shuffle.fetch.retry.interval-ms</name>
+  <value>1000</value>
+  <description>Interval at which the fetcher retries the fetch when a
+  non-fatal failure, such as one caused by an NM restart, occurs.
+  </description>
+</property>
+
+<property>
+  <name>mapreduce.reduce.shuffle.fetch.retry.timeout-ms</name>
+  <value>30000</value>
+  <description>Total time for which the fetcher keeps retrying the fetch when
+  a non-fatal failure, such as one caused by an NM restart, occurs.</description>
+</property>
+
 <property>
   <name>mapreduce.reduce.shuffle.retry-delay.max.ms</name>
   <value>60000</value>
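
Note how the retry-enabled default is expressed as ${yarn.nodemanager.recovery.enabled}: fetch retry only makes sense when the NodeManager can restart with its shuffle state intact, so the two settings track each other unless overridden. A job can still force the behavior per submission, as TestFetcher below does; a short sketch with illustrative values:

    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapreduce.MRJobConfig;

    class FetchRetrySketch {
      static JobConf withFetchRetry() {
        JobConf job = new JobConf();
        // Force-enable fetch retry regardless of NM recovery and shorten
        // the window; the numbers here are examples, not recommendations.
        job.setBoolean(MRJobConfig.SHUFFLE_FETCH_RETRY_ENABLED, true);
        job.setInt(MRJobConfig.SHUFFLE_FETCH_RETRY_INTERVAL_MS, 500);
        job.setInt(MRJobConfig.SHUFFLE_FETCH_RETRY_TIMEOUT_MS, 10000);
        return job;
      }
    }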

+ 5 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/HadoopArchives.md.vm

@@ -59,6 +59,11 @@ How to Create an Archive
 
   `hadoop archive -archiveName zoo.har -p /foo/bar -r 3 /outputdir`
 
+  If you specify source files that are in an encryption zone, they will be
+  decrypted and written into the archive. If the har file is not located in an
+  encryption zone, then they will be stored in clear (decrypted) form. If the
+  har file is located in an encryption zone, they will be stored in encrypted form.
+
 How to Look Up Files in Archives
 --------------------------------
 

+ 16 - 3
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/security/TestTokenCache.java

@@ -63,12 +63,25 @@ public class TestTokenCache {
 
   @Test
   @SuppressWarnings("deprecation")
-  public void testBinaryCredentials() throws Exception {
+  public void testBinaryCredentialsWithoutScheme() throws Exception {
+    testBinaryCredentials(false);
+  }
+
+  @Test
+  @SuppressWarnings("deprecation")
+  public void testBinaryCredentialsWithScheme() throws Exception {
+    testBinaryCredentials(true);
+  }
+
+  private void testBinaryCredentials(boolean hasScheme) throws Exception {
     Path TEST_ROOT_DIR =
         new Path(System.getProperty("test.build.data","test/build/data"));
     // ick, but need fq path minus file:/
-    String binaryTokenFile = FileSystem.getLocal(conf).makeQualified(
-        new Path(TEST_ROOT_DIR, "tokenFile")).toUri().getPath();
+    String binaryTokenFile = hasScheme
+        ? FileSystem.getLocal(conf).makeQualified(
+            new Path(TEST_ROOT_DIR, "tokenFile")).toString()
+        : FileSystem.getLocal(conf).makeQualified(
+            new Path(TEST_ROOT_DIR, "tokenFile")).toUri().getPath();
 
     MockFileSystem fs1 = createFileSystemForServiceName("service1");
     MockFileSystem fs2 = createFileSystemForServiceName("service2");

+ 95 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java

@@ -27,6 +27,7 @@ import java.net.HttpURLConnection;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.MapOutputFile;
+import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TaskID;
 
 import org.junit.After;
@@ -60,6 +61,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
 import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
+import org.apache.hadoop.util.Time;
 import org.junit.Test;
 
 import org.mockito.invocation.InvocationOnMock;
@@ -71,6 +73,7 @@ import org.mockito.stubbing.Answer;
 public class TestFetcher {
   private static final Log LOG = LogFactory.getLog(TestFetcher.class);
   JobConf job = null;
+  JobConf jobWithRetry = null;
   TaskAttemptID id = null;
   ShuffleSchedulerImpl<Text, Text> ss = null;
   MergeManagerImpl<Text, Text> mm = null;
@@ -93,6 +96,9 @@ public class TestFetcher {
   public void setup() {
     LOG.info(">>>> " + name.getMethodName());
     job = new JobConf();
+    job.setBoolean(MRJobConfig.SHUFFLE_FETCH_RETRY_ENABLED, false);
+    jobWithRetry = new JobConf();
+    jobWithRetry.setBoolean(MRJobConfig.SHUFFLE_FETCH_RETRY_ENABLED, true);
     id = TaskAttemptID.forName("attempt_0_1_r_1_1");
     ss = mock(ShuffleSchedulerImpl.class);
     mm = mock(MergeManagerImpl.class);
@@ -228,6 +234,38 @@ public class TestFetcher {
     verify(ss, times(3)).putBackKnownMapOutput(any(MapHost.class), eq(map1ID));
     verify(ss, times(3)).putBackKnownMapOutput(any(MapHost.class), eq(map2ID));
   }
+  
+  @Test
+  public void testCopyFromHostIncompatibleShuffleVersionWithRetry()
+      throws Exception {
+    String replyHash = SecureShuffleUtils.generateHash(encHash.getBytes(), key);
+    
+    when(connection.getResponseCode()).thenReturn(200);
+    when(connection.getHeaderField(ShuffleHeader.HTTP_HEADER_NAME))
+        .thenReturn("mapreduce").thenReturn("other").thenReturn("other");
+    when(connection.getHeaderField(ShuffleHeader.HTTP_HEADER_VERSION))
+        .thenReturn("1.0.1").thenReturn("1.0.0").thenReturn("1.0.1");
+    when(connection.getHeaderField(
+        SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH)).thenReturn(replyHash);
+    ByteArrayInputStream in = new ByteArrayInputStream(new byte[0]);
+    when(connection.getInputStream()).thenReturn(in);
+
+    for (int i = 0; i < 3; ++i) {
+      Fetcher<Text,Text> underTest = new FakeFetcher<Text,Text>(jobWithRetry, 
+          id, ss, mm, r, metrics, except, key, connection);
+      underTest.copyFromHost(host);
+    }
+    
+    verify(connection, times(3)).addRequestProperty(
+        SecureShuffleUtils.HTTP_HEADER_URL_HASH, encHash);
+    
+    verify(allErrs, times(3)).increment(1);
+    verify(ss, times(3)).copyFailed(map1ID, host, false, false);
+    verify(ss, times(3)).copyFailed(map2ID, host, false, false);
+    
+    verify(ss, times(3)).putBackKnownMapOutput(any(MapHost.class), eq(map1ID));
+    verify(ss, times(3)).putBackKnownMapOutput(any(MapHost.class), eq(map2ID));
+  }
 
   @Test
   public void testCopyFromHostWait() throws Exception {
@@ -301,6 +339,48 @@ public class TestFetcher {
           encHash);
     verify(ss, times(1)).copyFailed(map1ID, host, true, false);
   }
+  
+  @SuppressWarnings("unchecked")
+  @Test(timeout=10000) 
+  public void testCopyFromHostWithRetry() throws Exception {
+    InMemoryMapOutput<Text, Text> immo = mock(InMemoryMapOutput.class);
+    ss = mock(ShuffleSchedulerImpl.class);
+    Fetcher<Text,Text> underTest = new FakeFetcher<Text,Text>(jobWithRetry, 
+        id, ss, mm, r, metrics, except, key, connection, true);
+
+    String replyHash = SecureShuffleUtils.generateHash(encHash.getBytes(), key);
+    
+    when(connection.getResponseCode()).thenReturn(200);
+    when(connection.getHeaderField(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH))
+        .thenReturn(replyHash);
+    ShuffleHeader header = new ShuffleHeader(map1ID.toString(), 10, 10, 1);
+    ByteArrayOutputStream bout = new ByteArrayOutputStream();
+    header.write(new DataOutputStream(bout));
+    ByteArrayInputStream in = new ByteArrayInputStream(bout.toByteArray());
+    when(connection.getInputStream()).thenReturn(in);
+    when(connection.getHeaderField(ShuffleHeader.HTTP_HEADER_NAME))
+        .thenReturn(ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
+    when(connection.getHeaderField(ShuffleHeader.HTTP_HEADER_VERSION))
+        .thenReturn(ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
+    when(mm.reserve(any(TaskAttemptID.class), anyLong(), anyInt()))
+        .thenReturn(immo);
+    
+    final long retryTime = Time.monotonicNow();
+    doAnswer(new Answer<Void>() {
+      public Void answer(InvocationOnMock ignore) throws IOException {
+        // Emulate host down for 3 seconds.
+        if ((Time.monotonicNow() - retryTime) <= 3000) {
+          throw new java.lang.InternalError();
+        }
+        return null;
+      }
+    }).when(immo).shuffle(any(MapHost.class), any(InputStream.class), anyLong(), 
+        anyLong(), any(ShuffleClientMetrics.class), any(Reporter.class));
+
+    underTest.copyFromHost(host);
+    verify(ss, never()).copyFailed(any(TaskAttemptID.class),any(MapHost.class),
+                                   anyBoolean(), anyBoolean());
+  }
 
   @Test
   public void testCopyFromHostExtraBytes() throws Exception {
@@ -447,6 +527,9 @@ public class TestFetcher {
 
   public static class FakeFetcher<K,V> extends Fetcher<K,V> {
 
+    // Whether the connection needs to be reopened.
+    private boolean renewConnection = false;
+    
     public FakeFetcher(JobConf job, TaskAttemptID reduceId,
         ShuffleSchedulerImpl<K,V> scheduler, MergeManagerImpl<K,V> merger,
         Reporter reporter, ShuffleClientMetrics metrics,
@@ -456,6 +539,17 @@ public class TestFetcher {
           exceptionReporter, jobTokenSecret);
       this.connection = connection;
     }
+    
+    public FakeFetcher(JobConf job, TaskAttemptID reduceId,
+        ShuffleSchedulerImpl<K,V> scheduler, MergeManagerImpl<K,V> merger,
+        Reporter reporter, ShuffleClientMetrics metrics,
+        ExceptionReporter exceptionReporter, SecretKey jobTokenSecret,
+        HttpURLConnection connection, boolean renewConnection) {
+      super(job, reduceId, scheduler, merger, reporter, metrics,
+          exceptionReporter, jobTokenSecret);
+      this.connection = connection;
+      this.renewConnection = renewConnection;
+    }
 
     public FakeFetcher(JobConf job, TaskAttemptID reduceId,
         ShuffleSchedulerImpl<K,V> scheduler, MergeManagerImpl<K,V> merger,
@@ -469,7 +563,7 @@ public class TestFetcher {
 
     @Override
     protected void openConnection(URL url) throws IOException {
-      if (null == connection) {
+      if (null == connection || renewConnection) {
         super.openConnection(url);
       }
       // already 'opened' the mocked connection

+ 4 - 2
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/client/HSAdmin.java

@@ -25,6 +25,7 @@ import java.util.Arrays;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.v2.hs.HSProxies;
 import org.apache.hadoop.mapreduce.v2.hs.protocol.HSAdminRefreshProtocol;
 import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
@@ -41,7 +42,7 @@ public class HSAdmin extends Configured implements Tool {
     super();
   }
 
-  public HSAdmin(Configuration conf) {
+  public HSAdmin(JobConf conf) {
     super(conf);
   }
 
@@ -331,7 +332,8 @@ public class HSAdmin extends Configured implements Tool {
   }
 
   public static void main(String[] args) throws Exception {
-    int result = ToolRunner.run(new HSAdmin(), args);
+    JobConf conf = new JobConf();
+    int result = ToolRunner.run(new HSAdmin(conf), args);
     System.exit(result);
   }
 }
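
The HSAdmin change (MAPREDUCE-6090) looks cosmetic but is the whole fix: a bare Configuration never loads mapred-site.xml, so the client could miss the configured JobHistory admin address and try to connect to the hard-coded default. Constructing a JobConf registers mapred-default.xml and mapred-site.xml as default resources for the process, which is also why TestHSAdminServer below switches its conf field to JobConf. A sketch of the effect:

    import org.apache.hadoop.mapred.JobConf;

    class HSAdminConfSketch {
      public static void main(String[] args) {
        // Loading JobConf pulls mapred-site.xml into the default resource
        // list, so site-configured values such as the JHS admin address
        // (mapreduce.jobhistory.admin.address) become visible.
        JobConf conf = new JobConf();
        System.out.println(conf.get("mapreduce.jobhistory.admin.address"));
      }
    }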

+ 3 - 2
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/server/TestHSAdminServer.java

@@ -28,6 +28,7 @@ import java.util.List;
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.v2.hs.JobHistory;
 import org.apache.hadoop.mapreduce.v2.hs.client.HSAdmin;
 import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
@@ -48,7 +49,7 @@ import org.apache.hadoop.yarn.logaggregation.AggregatedLogDeletionService;
 public class TestHSAdminServer {
   private HSAdminServer hsAdminServer = null;
   private HSAdmin hsAdminClient = null;
-  Configuration conf = null;
+  JobConf conf = null;
   private static long groupRefreshTimeoutSec = 1;
   JobHistory jobHistoryService = null;
   AggregatedLogDeletionService alds = null;
@@ -81,7 +82,7 @@ public class TestHSAdminServer {
 
   @Before
   public void init() throws HadoopIllegalArgumentException, IOException {
-    conf = new Configuration();
+    conf = new JobConf();
     conf.set(JHAdminConfig.JHS_ADMIN_ADDRESS, "0.0.0.0:0");
     conf.setClass("hadoop.security.group.mapping", MockUnixGroupsMapping.class,
         GroupMappingServiceProvider.class);

+ 61 - 25
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/security/TestBinaryTokenFile.java

@@ -150,30 +150,15 @@ public class TestBinaryTokenFile {
     // Credentials in the job will not have delegation tokens
     // because security is disabled. Fetch delegation tokens
     // and store in binary token file.
-      try {
-        Credentials cred1 = new Credentials();
-        Credentials cred2 = new Credentials();
-        TokenCache.obtainTokensForNamenodesInternal(cred1, new Path[] { p1 },
-            job.getConfiguration());
-        for (Token<? extends TokenIdentifier> t : cred1.getAllTokens()) {
-          cred2.addToken(new Text(DELEGATION_TOKEN_KEY), t);
-        }
-        DataOutputStream os = new DataOutputStream(new FileOutputStream(
-            binaryTokenFileName.toString()));
-        try {
-          cred2.writeTokenStorageToStream(os);
-        } finally {
-          os.close();
-        }
-        job.getConfiguration().set(MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY,
-            binaryTokenFileName.toString());
-        // NB: the MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY key now gets deleted from config, 
-        // so it's not accessible in the job's config. So, we use another key to pass the file name into the job configuration:  
-        job.getConfiguration().set(KEY_SECURITY_TOKEN_FILE_NAME, 
-            binaryTokenFileName.toString());
-      } catch (IOException e) {
-        Assert.fail("Exception " + e);
-      }
+      createBinaryTokenFile(job.getConfiguration());
+      job.getConfiguration().set(MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY,
+          binaryTokenFileName.toString());
+      // NB: the MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY
+      // key now gets deleted from config,
+      // so it's not accessible in the job's config. So,
+      // we use another key to pass the file name into the job configuration:
+      job.getConfiguration().set(KEY_SECURITY_TOKEN_FILE_NAME,
+          binaryTokenFileName.toString());
     }
   }
   
@@ -225,7 +210,29 @@ public class TestBinaryTokenFile {
       dfsCluster = null;
     }
   }
-  
+
+  private static void createBinaryTokenFile(Configuration conf) {
+    // Fetch delegation tokens and store in binary token file.
+    try {
+      Credentials cred1 = new Credentials();
+      Credentials cred2 = new Credentials();
+      TokenCache.obtainTokensForNamenodesInternal(cred1, new Path[] { p1 },
+          conf);
+      for (Token<? extends TokenIdentifier> t : cred1.getAllTokens()) {
+        cred2.addToken(new Text(DELEGATION_TOKEN_KEY), t);
+      }
+      DataOutputStream os = new DataOutputStream(new FileOutputStream(
+          binaryTokenFileName.toString()));
+      try {
+        cred2.writeTokenStorageToStream(os);
+      } finally {
+        os.close();
+      }
+    } catch (IOException e) {
+      Assert.fail("Exception " + e);
+    }
+  }
+
   /**
    * run a distributed job and verify that TokenCache is available
    * @throws IOException
@@ -252,4 +259,33 @@ public class TestBinaryTokenFile {
     }
     assertEquals("dist job res is not 0:", 0, res);
   }
+
+  /**
+   * Run a distributed job with the -tokenCacheFile option and verify
+   * that no exception occurs.
+   * @throws IOException
+   */
+  @Test
+  public void testTokenCacheFile() throws IOException {
+    Configuration conf = mrCluster.getConfig();
+    createBinaryTokenFile(conf);
+    // provide the namenode names for the job to get delegation tokens for
+    final String nnUri = dfsCluster.getURI(0).toString();
+    conf.set(MRJobConfig.JOB_NAMENODES, nnUri + "," + nnUri);
+
+    // use a command-line argument to pass the token file name
+    final String[] args = {
+        "-tokenCacheFile", binaryTokenFileName.toString(),
+        "-m", "1", "-r", "1", "-mt", "1", "-rt", "1"
+        };
+    int res = -1;
+    try {
+      res = ToolRunner.run(conf, new SleepJob(), args);
+    } catch (Exception e) {
+      System.out.println("Job failed with " + e.getLocalizedMessage());
+      e.printStackTrace(System.out);
+      fail("Job failed");
+    }
+    assertEquals("dist job res is not 0:", 0, res);
+  }
 }
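
Editor's note: the refactor above funnels token creation through createBinaryTokenFile(), and the new comment explains that MAPREDUCE_JOB_CREDENTIALS_BINARY is dropped from the config during submission. As a hedged sketch, not part of the patch, this is how such a binary token file can be read back with the same Hadoop Credentials API; the class name and the use of a command-line argument for the file name are illustrative assumptions:

    import java.io.File;
    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.security.Credentials;

    // Sketch only: load a binary token file the way a job-side reader would,
    // given the file name (taken here from the command line for simplicity).
    public class ReadTokenFileSketch {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        Credentials creds =
            Credentials.readTokenStorageFile(new File(args[0]), conf);
        System.out.println("loaded " + creds.numberOfTokens() + " token(s)");
      }
    }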

+ 8 - 0
hadoop-yarn-project/CHANGES.txt

@@ -389,6 +389,14 @@ Release 2.6.0 - UNRELEASED
     YARN-1779. Fixed AMRMClient to handle AMRMTokens correctly across
     ResourceManager work-preserving-restart or failover. (Jian He via vinodkv)
 
+    YARN-2363. Submitted applications occasionally lack a tracking URL. (jlowe)
+
+    YARN-2561. MR job client cannot reconnect to AM after NM restart. (Junping
+    Du via jlowe)
+
+    YARN-2563. Fixed YarnClient to call getTimeLineDelegationToken only if the
+    Token is not present. (Zhijie Shen via jianhe)
+
 Release 2.5.1 - 2014-09-05
 
   INCOMPATIBLE CHANGES

+ 15 - 9
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java

@@ -110,7 +110,8 @@ public class YarnClientImpl extends YarnClient {
   private AHSClient historyClient;
   private boolean historyServiceEnabled;
   protected TimelineClient timelineClient;
-  protected Text timelineService;
+  @VisibleForTesting
+  Text timelineService;
   protected boolean timelineServiceEnabled;
 
   private static final String ROOT = "root";
@@ -272,12 +273,6 @@ public class YarnClientImpl extends YarnClient {
 
   private void addTimelineDelegationToken(
       ContainerLaunchContext clc) throws YarnException, IOException {
-    org.apache.hadoop.security.token.Token<TimelineDelegationTokenIdentifier> timelineDelegationToken =
-        timelineClient.getDelegationToken(
-            UserGroupInformation.getCurrentUser().getUserName());
-    if (timelineDelegationToken == null) {
-      return;
-    }
     Credentials credentials = new Credentials();
     DataInputByteBuffer dibb = new DataInputByteBuffer();
     ByteBuffer tokens = clc.getTokens();
@@ -290,11 +285,15 @@ public class YarnClientImpl extends YarnClient {
     // one more
     for (org.apache.hadoop.security.token.Token<? extends TokenIdentifier> token : credentials
         .getAllTokens()) {
-      TokenIdentifier tokenIdentifier = token.decodeIdentifier();
-      if (tokenIdentifier instanceof TimelineDelegationTokenIdentifier) {
+      if (token.getKind().equals(TimelineDelegationTokenIdentifier.KIND_NAME)) {
         return;
       }
     }
+    org.apache.hadoop.security.token.Token<TimelineDelegationTokenIdentifier>
+        timelineDelegationToken = getTimelineDelegationToken();
+    if (timelineDelegationToken == null) {
+      return;
+    }
     credentials.addToken(timelineService, timelineDelegationToken);
     if (LOG.isDebugEnabled()) {
       LOG.debug("Add timline delegation token into credentials: "
@@ -306,6 +305,13 @@ public class YarnClientImpl extends YarnClient {
     clc.setTokens(tokens);
   }
 
+  @VisibleForTesting
+  org.apache.hadoop.security.token.Token<TimelineDelegationTokenIdentifier>
+      getTimelineDelegationToken() throws IOException, YarnException {
+    return timelineClient.getDelegationToken(
+            UserGroupInformation.getCurrentUser().getUserName());
+  }
+
   @Private
   @VisibleForTesting
   protected boolean isSecurityEnabled() {
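
Editor's note: besides reordering, the hunk above swaps token.decodeIdentifier() for a getKind() comparison. decodeIdentifier() deserializes the identifier bytes and requires the identifier class to be resolvable, whereas getKind() only reads the token's kind text, making it the cheaper existence check. A self-contained sketch of the pattern (the helper class and method name are illustrative; the Token and identifier types are the real Hadoop/YARN ones):

    import org.apache.hadoop.security.Credentials;
    import org.apache.hadoop.security.token.Token;
    import org.apache.hadoop.security.token.TokenIdentifier;
    import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;

    // Sketch: check for a timeline delegation token by kind, without
    // deserializing each token's identifier.
    final class TimelineTokenCheck {
      static boolean hasTimelineToken(Credentials credentials) {
        for (Token<? extends TokenIdentifier> t : credentials.getAllTokens()) {
          if (t.getKind().equals(TimelineDelegationTokenIdentifier.KIND_NAME)) {
            return true; // already attached; no need to fetch another
          }
        }
        return false;
      }
    }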

+ 48 - 28
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java

@@ -20,6 +20,8 @@ package org.apache.hadoop.yarn.client.api.impl;
 
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -39,6 +41,7 @@ import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.DataInputByteBuffer;
 import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -745,10 +748,13 @@ public class TestYarnClient {
     Configuration conf = new YarnConfiguration();
     conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
     SecurityUtil.setAuthenticationMethod(AuthenticationMethod.KERBEROS, conf);
+    TimelineDelegationTokenIdentifier timelineDT =
+        new TimelineDelegationTokenIdentifier();
     final Token<TimelineDelegationTokenIdentifier> dToken =
-        new Token<TimelineDelegationTokenIdentifier>();
+        new Token<TimelineDelegationTokenIdentifier>(
+            timelineDT.getBytes(), new byte[0], timelineDT.getKind(), new Text());
     // create a mock client
-    YarnClientImpl client = new YarnClientImpl() {
+    YarnClientImpl client = spy(new YarnClientImpl() {
       @Override
       protected void serviceInit(Configuration conf) throws Exception {
         if (getConfig().getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,
@@ -784,34 +790,48 @@ public class TestYarnClient {
       public boolean isSecurityEnabled() {
         return true;
       }
-    };
+    });
     client.init(conf);
     client.start();
-    ApplicationSubmissionContext context =
-        mock(ApplicationSubmissionContext.class);
-    ApplicationId applicationId = ApplicationId.newInstance(0, 1);
-    when(context.getApplicationId()).thenReturn(applicationId);
-    DataOutputBuffer dob = new DataOutputBuffer();
-    Credentials credentials = new Credentials();
-    credentials.writeTokenStorageToStream(dob);
-    ByteBuffer tokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
-    ContainerLaunchContext clc = ContainerLaunchContext.newInstance(
-        null, null, null, null, tokens, null);
-    when(context.getAMContainerSpec()).thenReturn(clc);
-    client.submitApplication(context);
-    // Check whether token is added or not
-    credentials = new Credentials();
-    DataInputByteBuffer dibb = new DataInputByteBuffer();
-    tokens = clc.getTokens();
-    if (tokens != null) {
-      dibb.reset(tokens);
-      credentials.readTokenStorageStream(dibb);
-      tokens.rewind();
+    try {
+      // when i == 0, timeline DT already exists, no need to get one more
+      // when i == 1, timeline DT doesn't exist, need to get one more
+      for (int i = 0; i < 2; ++i) {
+        ApplicationSubmissionContext context =
+            mock(ApplicationSubmissionContext.class);
+        ApplicationId applicationId = ApplicationId.newInstance(0, i + 1);
+        when(context.getApplicationId()).thenReturn(applicationId);
+        DataOutputBuffer dob = new DataOutputBuffer();
+        Credentials credentials = new Credentials();
+        if (i == 0) {
+          credentials.addToken(client.timelineService, dToken);
+        }
+        credentials.writeTokenStorageToStream(dob);
+        ByteBuffer tokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
+        ContainerLaunchContext clc = ContainerLaunchContext.newInstance(
+            null, null, null, null, tokens, null);
+        when(context.getAMContainerSpec()).thenReturn(clc);
+        client.submitApplication(context);
+        if (i == 0) {
+          // getTimelineDelegationToken() shouldn't be called
+          verify(client, never()).getTimelineDelegationToken();
+        }
+        // Either way, the token should be present
+        credentials = new Credentials();
+        DataInputByteBuffer dibb = new DataInputByteBuffer();
+        tokens = clc.getTokens();
+        if (tokens != null) {
+          dibb.reset(tokens);
+          credentials.readTokenStorageStream(dibb);
+          tokens.rewind();
+        }
+        Collection<Token<? extends TokenIdentifier>> dTokens =
+            credentials.getAllTokens();
+        Assert.assertEquals(1, dTokens.size());
+        Assert.assertEquals(dToken, dTokens.iterator().next());
+      }
+    } finally {
+      client.stop();
     }
-    Collection<Token<? extends TokenIdentifier>> dTokens =
-        credentials.getAllTokens();
-    Assert.assertEquals(1, dTokens.size());
-    Assert.assertEquals(dToken, dTokens.iterator().next());
-    client.stop();
   }
 }
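
Editor's note: the test above serializes Credentials into the ContainerLaunchContext token buffer and reads them back after submission. For reference, a hedged sketch of that round-trip in isolation, using the same Hadoop APIs as the test (the class name is an assumption):

    import java.nio.ByteBuffer;
    import org.apache.hadoop.io.DataInputByteBuffer;
    import org.apache.hadoop.io.DataOutputBuffer;
    import org.apache.hadoop.security.Credentials;

    // Sketch: write Credentials to a ByteBuffer and read them back, rewinding
    // afterwards so the buffer stays re-readable, exactly as the test does.
    public class CredentialsRoundTrip {
      public static void main(String[] args) throws Exception {
        Credentials out = new Credentials();
        DataOutputBuffer dob = new DataOutputBuffer();
        out.writeTokenStorageToStream(dob);
        ByteBuffer tokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());

        Credentials in = new Credentials();
        DataInputByteBuffer dibb = new DataInputByteBuffer();
        dibb.reset(tokens);
        in.readTokenStorageStream(dibb);
        tokens.rewind(); // reading advanced the position; reset for re-reads
        System.out.println("tokens read back: " + in.numberOfTokens());
      }
    }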

+ 19 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java

@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
 
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
@@ -76,6 +78,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.hadoop.yarn.server.webproxy.ProxyUriUtils;
 import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
 import org.apache.hadoop.yarn.state.MultipleArcTransition;
 import org.apache.hadoop.yarn.state.SingleArcTransition;
@@ -84,6 +87,7 @@ import org.apache.hadoop.yarn.state.StateMachineFactory;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.SystemClock;
 import org.apache.hadoop.yarn.util.resource.Resources;
+import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -542,6 +546,7 @@ public class RMAppImpl implements RMApp, Recoverable {
       float progress = 0.0f;
       org.apache.hadoop.yarn.api.records.Token amrmToken = null;
       if (allowAccess) {
+        trackingUrl = getDefaultProxyTrackingUrl();
         if (this.currentAttempt != null) {
           currentApplicationAttemptId = this.currentAttempt.getAppAttemptId();
           trackingUrl = this.currentAttempt.getTrackingUrl();
@@ -602,6 +607,20 @@ public class RMAppImpl implements RMApp, Recoverable {
     }
   }
 
+  private String getDefaultProxyTrackingUrl() {
+    try {
+      final String scheme = WebAppUtils.getHttpSchemePrefix(conf);
+      String proxy = WebAppUtils.getProxyHostAndPort(conf);
+      URI proxyUri = ProxyUriUtils.getUriFromAMUrl(scheme, proxy);
+      URI result = ProxyUriUtils.getProxyUri(null, proxyUri, applicationId);
+      return result.toASCIIString();
+    } catch (URISyntaxException e) {
+      LOG.warn("Could not generate default proxy tracking URL for "
+          + applicationId);
+      return UNAVAILABLE;
+    }
+  }
+
   @Override
   public long getFinishTime() {
     this.readLock.lock();
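
Editor's note: the new helper composes the default tracking link from the configured web proxy via WebAppUtils and ProxyUriUtils. A self-contained sketch of the URL shape it produces, with made-up host and application id, is below; it is the shape TestRMAppTransitions asserts on later in this commit:

    import java.net.URI;
    import java.net.URISyntaxException;

    // Sketch with hypothetical values: the default tracking URL points at the
    // web proxy and ends in /proxy/<application id>/.
    public class DefaultTrackingUrlSketch {
      public static void main(String[] args) throws URISyntaxException {
        String scheme = "http://";                        // WebAppUtils.getHttpSchemePrefix(conf) in the patch
        String proxy = "rmhost:8088";                     // WebAppUtils.getProxyHostAndPort(conf) in the patch
        String appId = "application_1410000000000_0001";  // hypothetical
        URI uri = new URI(scheme + proxy + "/proxy/" + appId + "/");
        System.out.println(uri.toASCIIString());
        // -> http://rmhost:8088/proxy/application_1410000000000_0001/
      }
    }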

+ 41 - 6
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java

@@ -544,12 +544,47 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
       RMNodeReconnectEvent reconnectEvent = (RMNodeReconnectEvent) event;
       RMNode newNode = reconnectEvent.getReconnectedNode();
       rmNode.nodeManagerVersion = newNode.getNodeManagerVersion();
-      rmNode.httpPort = newNode.getHttpPort();
-      rmNode.httpAddress = newNode.getHttpAddress();
-      rmNode.totalCapability = newNode.getTotalCapability();
+      List<ApplicationId> runningApps = reconnectEvent.getRunningApplications();
+      boolean noRunningApps =
+          (runningApps == null) || runningApps.isEmpty();
       
-      // Reset heartbeat ID since node just restarted.
-      rmNode.getLastNodeHeartBeatResponse().setResponseId(0);
+      // No applications are running on the node, so send a node-removal
+      // event and clean up the old container info.
+      if (noRunningApps) {
+        rmNode.nodeUpdateQueue.clear();
+        rmNode.context.getDispatcher().getEventHandler().handle(
+            new NodeRemovedSchedulerEvent(rmNode));
+        
+        if (rmNode.getHttpPort() == newNode.getHttpPort()) {
+          // Reset heartbeat ID since node just restarted.
+          rmNode.getLastNodeHeartBeatResponse().setResponseId(0);
+          if (rmNode.getState() != NodeState.UNHEALTHY) {
+            // Only add new node if old state is not UNHEALTHY
+            rmNode.context.getDispatcher().getEventHandler().handle(
+                new NodeAddedSchedulerEvent(newNode));
+          }
+        } else {
+          // Reconnected node differs, so replace the old node and start the new one
+          switch (rmNode.getState()) {
+            case RUNNING:
+              ClusterMetrics.getMetrics().decrNumActiveNodes();
+              break;
+            case UNHEALTHY:
+              ClusterMetrics.getMetrics().decrNumUnhealthyNMs();
+              break;
+          }
+          rmNode.context.getRMNodes().put(newNode.getNodeID(), newNode);
+          rmNode.context.getDispatcher().getEventHandler().handle(
+              new RMNodeStartedEvent(newNode.getNodeID(), null, null));
+        }
+      } else {
+        rmNode.httpPort = newNode.getHttpPort();
+        rmNode.httpAddress = newNode.getHttpAddress();
+        rmNode.totalCapability = newNode.getTotalCapability();
+      
+        // Reset heartbeat ID since node just restarted.
+        rmNode.getLastNodeHeartBeatResponse().setResponseId(0);
+      }
 
       if (null != reconnectEvent.getRunningApplications()) {
         for (ApplicationId appId : reconnectEvent.getRunningApplications()) {
@@ -564,7 +599,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
         // Update scheduler node's capacity for reconnect node.
         rmNode.context.getDispatcher().getEventHandler().handle(
             new NodeResourceUpdateSchedulerEvent(rmNode, 
-                ResourceOption.newInstance(rmNode.totalCapability, -1)));
+                ResourceOption.newInstance(newNode.getTotalCapability(), -1)));
       }
       
     }
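
Editor's note: the reconnect transition above is dense. Condensed, the decision is: no running applications means the old container state is disposable, and the HTTP port distinguishes a plain restart from a node replacement. A skeleton of that structure, with the actions summarized as comments (identifiers loosely mirror RMNodeImpl; this is not a drop-in replacement):

    import java.util.List;

    // Sketch of the branch structure of the reconnect handling added above.
    final class ReconnectDecisionSketch {
      static void onReconnect(List<?> runningApps, int oldHttpPort,
          int newHttpPort, boolean wasUnhealthy) {
        if (runningApps == null || runningApps.isEmpty()) {
          // Old container info is stale: clear queued node updates and send a
          // node-removal event to the scheduler first.
          if (oldHttpPort == newHttpPort) {
            // Same NM restarted: reset the heartbeat id; re-add the node to
            // the scheduler unless it was UNHEALTHY.
          } else {
            // Different port: treat as a new node -- adjust cluster metrics,
            // replace the RMNodes map entry, and fire RMNodeStartedEvent.
          }
        } else {
          // Work-preserving NM restart: keep the node, refresh its port,
          // address, and capability in place, and reset the heartbeat id.
        }
      }
    }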

+ 11 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java

@@ -26,6 +26,7 @@ import static org.mockito.Mockito.verify;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 
@@ -599,6 +600,16 @@ public class TestResourceTrackerService {
     dispatcher.await();
     Assert.assertTrue(NodeAction.NORMAL.equals(response.getNodeAction()));
     Assert.assertEquals(5120 + 10240, metrics.getAvailableMB());
+    
+    // reconnect of node with changed capability and running applications
+    List<ApplicationId> runningApps = new ArrayList<ApplicationId>();
+    runningApps.add(ApplicationId.newInstance(1, 0));
+    nm1 = rm.registerNode("host2:5678", 15360, 2, runningApps);
+    dispatcher.await();
+    response = nm1.nodeHeartbeat(true);
+    dispatcher.await();
+    Assert.assertTrue(NodeAction.NORMAL.equals(response.getNodeAction()));
+    Assert.assertEquals(5120 + 15360, metrics.getAvailableMB());
   }
 
   private void writeToHostsFile(String... hosts) throws IOException {
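
Editor's note: the new test case exercises the running-applications branch of the reconnect logic above: the node is kept and its capability is refreshed in place. A trivial checkable sketch of the expected-memory arithmetic (values taken from the test):

    // Sketch: available memory after reconnect is the other NM's capability
    // plus the reconnected NM's new capability.
    public class AvailableMbSketch {
      public static void main(String[] args) {
        int otherNodeMB = 5120;     // NM registered earlier in the test
        int reconnectedMB = 15360;  // new capability reported on reconnect
        System.out.println(otherNodeMB + reconnectedMB); // 20480, as asserted
      }
    }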

+ 4 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java

@@ -32,8 +32,6 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Map;
 
-import org.junit.Assert;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -75,6 +73,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretMan
 import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
 import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -961,6 +960,9 @@ public class TestRMAppTransitions {
     Assert.assertEquals(report.getApplicationResourceUsageReport(),RMServerUtils.DUMMY_APPLICATION_RESOURCE_USAGE_REPORT);
     report = app.createAndGetApplicationReport("clientuser", true);
     Assert.assertNotNull(report.getApplicationResourceUsageReport());
+    Assert.assertTrue("bad proxy url for app",
+        report.getTrackingUrl().endsWith("/proxy/" + app.getApplicationId()
+            + "/"));
   }
 
   private void verifyApplicationFinished(RMAppState state) {