
Branch for Apache Hadoop 2.0.0-alpha release.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2.0.0-alpha@1335805 13f79535-47bb-0310-9956-ffa450edef68
Arun Murthy 13 years ago
parent commit e0e49326f7
51 files changed with 443 additions and 155 deletions
  1. + 2 - 0  hadoop-common-project/hadoop-common/CHANGES.txt
  2. + 3 - 0  hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
  3. + 4 - 2  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
  4. + 0 - 0  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSInodeInfo.java
  5. + 104 - 0  hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestRBWBlockInvalidation.java
  6. + 5 - 0  hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java
  7. + 6 - 0  hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetTestUtil.java
  8. + 11 - 0  hadoop-mapreduce-project/CHANGES.txt
  9. + 5 - 3  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java
  10. + 5 - 0  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/MRClientService.java
  11. + 7 - 7  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java
  12. + 9 - 8  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java
  13. + 7 - 0  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/MRClientProtocol.java
  14. + 5 - 0  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/impl/pb/client/MRClientProtocolPBClientImpl.java
  15. + 5 - 0  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/TestRPCFactories.java
  16. + 2 - 17  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Cluster.java
  17. + 6 - 1  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/TokenCache.java
  18. + 20 - 0  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/security/TestTokenCache.java
  19. + 5 - 2  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryClientService.java
  20. + 9 - 11  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java
  21. + 7 - 0  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/NotRunningJob.java
  22. + 11 - 13  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java
  23. + 4 - 6  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
  24. + 5 - 0  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java
  25. + 7 - 6  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java
  26. + 3 - 6  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/security/TestJHSSecurity.java
  27. + 3 - 4  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/security/TestUmbilicalProtocolWithJobToken.java
  28. + 1 - 1  hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerToken.java
  29. + 25 - 0  hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/ProtoUtils.java
  30. + 4 - 3  hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java
  31. + 1 - 2  hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
  32. + 15 - 18  hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java
  33. + 10 - 2  hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/AppBlock.java
  34. + 0 - 1  hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerManagerSecurity.java
  35. + 6 - 6  hadoop-mapreduce-project/src/contrib/raid/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyRaid.java
  36. + 5 - 6  hadoop-mapreduce-project/src/contrib/raid/src/test/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockPlacementPolicyRaid.java
  37. + 4 - 3  hadoop-mapreduce-project/src/contrib/raid/src/test/org/apache/hadoop/hdfs/server/namenode/NameNodeRaidTestUtil.java
  38. + 1 - 1  hadoop-tools/hadoop-archives/src/main/java/org/apache/hadoop/tools/HadoopArchives.java
  39. + 6 - 3  hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java
  40. + 2 - 2  hadoop-tools/hadoop-extras/src/main/java/org/apache/hadoop/tools/Logalyzer.java
  41. + 31 - 0  hadoop-tools/hadoop-rumen/dev-support/findbugs-exclude.xml
  42. + 10 - 0  hadoop-tools/hadoop-rumen/pom.xml
  43. + 3 - 1  hadoop-tools/hadoop-rumen/src/main/java/org/apache/hadoop/tools/rumen/DeskewedJobTraceReader.java
  44. + 3 - 1  hadoop-tools/hadoop-rumen/src/main/java/org/apache/hadoop/tools/rumen/JobConfPropertyNames.java
  45. + 3 - 1  hadoop-tools/hadoop-rumen/src/main/java/org/apache/hadoop/tools/rumen/LoggedNetworkTopology.java
  46. + 2 - 1  hadoop-tools/hadoop-rumen/src/main/java/org/apache/hadoop/tools/rumen/TraceBuilder.java
  47. + 1 - 1  hadoop-tools/hadoop-rumen/src/main/java/org/apache/hadoop/tools/rumen/anonymization/WordListAnonymizerUtility.java
  48. + 2 - 10  hadoop-tools/hadoop-rumen/src/main/java/org/apache/hadoop/tools/rumen/datatypes/NodeName.java
  49. + 30 - 0  hadoop-tools/hadoop-streaming/dev-support/findbugs-exclude.xml
  50. + 10 - 0  hadoop-tools/hadoop-streaming/pom.xml
  51. + 8 - 6  hadoop-tools/hadoop-streaming/src/main/java/org/apache/hadoop/streaming/StreamJob.java

+ 2 - 0
hadoop-common-project/hadoop-common/CHANGES.txt

@@ -416,6 +416,8 @@ Release 0.23.3 - UNRELEASED
     HADOOP-8327. distcpv2 and distcpv1 jars should not coexist (Dave Thompson
     via bobby)
 
+    HADOOP-8341. Fix or filter findbugs issues in hadoop-tools (bobby)
+
 Release 0.23.2 - UNRELEASED 
 
   NEW FEATURES

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -500,6 +500,9 @@ Release 2.0.0 - UNRELEASED
     HDFS-3376. DFSClient fails to make connection to DN if there are many
     unusable cached sockets (todd)
 
+    HDFS-3157. Error in deleting block is keep on coming from DN even after 
+    the block report and directory scanning has happened. (Ashish Singhi via umamahesh)
+
   BREAKDOWN OF HDFS-1623 SUBTASKS
 
     HDFS-2179. Add fencing framework and mechanisms for NameNode HA. (todd)

+ 4 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java

@@ -1804,7 +1804,8 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
       case COMPLETE:
       case COMMITTED:
         if (storedBlock.getGenerationStamp() != iblk.getGenerationStamp()) {
-          return new BlockToMarkCorrupt(storedBlock,
+          return new BlockToMarkCorrupt(new BlockInfo(iblk, storedBlock
+              .getINode().getReplication()),
               "block is " + ucState + " and reported genstamp " +
               iblk.getGenerationStamp() + " does not match " +
               "genstamp in block map " + storedBlock.getGenerationStamp());
@@ -1824,7 +1825,8 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
       if (!storedBlock.isComplete()) {
         return null; // not corrupt
       } else if (storedBlock.getGenerationStamp() != iblk.getGenerationStamp()) {
-        return new BlockToMarkCorrupt(storedBlock,
+        return new BlockToMarkCorrupt(new BlockInfo(iblk, storedBlock
+            .getINode().getReplication()),
             "reported " + reportedState + " replica with genstamp " +
             iblk.getGenerationStamp() + " does not match COMPLETE block's " +
             "genstamp in block map " + storedBlock.getGenerationStamp());

+ 0 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSInodeInfo.java


+ 104 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestRBWBlockInvalidation.java

@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+
+import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test when RBW block is removed. Invalidation of the corrupted block happens
+ * and then the under replicated block gets replicated to the datanode.
+ */
+public class TestRBWBlockInvalidation {
+  private static NumberReplicas countReplicas(final FSNamesystem namesystem,
+      ExtendedBlock block) {
+    return namesystem.getBlockManager().countNodes(block.getLocalBlock());
+  }
+
+  /**
+   * Test when a block's replica is removed from RBW folder in one of the
+   * datanode, namenode should ask to invalidate that corrupted block and
+   * schedule replication for one more replica for that under replicated block.
+   */
+  @Test
+  public void testBlockInvalidationWhenRBWReplicaMissedInDN()
+      throws IOException, InterruptedException {
+    Configuration conf = new HdfsConfiguration();
+    conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 300);
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1);
+    conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3)
+        .build();
+    FSDataOutputStream out = null;
+    try {
+      final FSNamesystem namesystem = cluster.getNamesystem();
+      FileSystem fs = cluster.getFileSystem();
+      Path testPath = new Path(MiniDFSCluster.getBaseDirectory(), "foo1");
+      out = fs.create(testPath, (short) 3);
+      out.writeBytes("HDFS-3157: " + testPath);
+      out.hsync();
+      String bpid = namesystem.getBlockPoolId();
+      ExtendedBlock blk = DFSTestUtil.getFirstBlock(fs, testPath);
+      Block block = blk.getLocalBlock();
+      // Deleting partial block and its meta information from the RBW folder
+      // of first datanode.
+      DataNode dn = cluster.getDataNodes().get(0);
+      File blockFile = DataNodeTestUtils.getBlockFile(dn, bpid, block);
+      File metaFile = DataNodeTestUtils.getMetaFile(dn, bpid, block);
+      assertTrue("Could not delete the block file from the RBW folder",
+          blockFile.delete());
+      assertTrue("Could not delete the block meta file from the RBW folder",
+          metaFile.delete());
+      out.close();
+      assertEquals("The corrupt replica could not be invalidated", 0,
+          countReplicas(namesystem, blk).corruptReplicas());
+      /*
+       * Sleep for 3 seconds, for under replicated block to get replicated. As
+       * one second will be taken by ReplicationMonitor and one more second for
+       * invalidated block to get deleted from the datanode.
+       */
+      Thread.sleep(3000);
+      blk = DFSTestUtil.getFirstBlock(fs, testPath);
+      assertEquals("There should be three live replicas", 3,
+          countReplicas(namesystem, blk).liveReplicas());
+    } finally {
+      if (out != null) {
+        out.close();
+      }
+      cluster.shutdown();
+    }
+  }
+}

+ 5 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java

@@ -136,6 +136,11 @@ public class DataNodeTestUtils {
       ) throws IOException {
     return FsDatasetTestUtil.getBlockFile(dn.getFSDataset(), bpid, b);
   }
+  
+  public static File getMetaFile(DataNode dn, String bpid, Block b)
+      throws IOException {
+    return FsDatasetTestUtil.getMetaFile(dn.getFSDataset(), bpid, b);
+  }
 
   public static boolean unlinkBlock(DataNode dn, ExtendedBlock bk, int numLinks
       ) throws IOException {

+ 6 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetTestUtil.java

@@ -36,6 +36,12 @@ public class FsDatasetTestUtil {
       ) throws IOException {
     return ((FsDatasetImpl)fsd).getBlockFile(bpid, b);
   }
+  
+  public static File getMetaFile(FsDatasetSpi<?> fsd, String bpid, Block b)
+      throws IOException {
+    return FsDatasetUtil.getMetaFile(getBlockFile(fsd, bpid, b), b
+        .getGenerationStamp());
+  }
 
   public static boolean unlinkBlock(FsDatasetSpi<?> fsd,
       ExtendedBlock block, int numLinks) throws IOException {

+ 11 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -175,6 +175,9 @@ Release 2.0.0 - UNRELEASED
     MAPREDUCE-3958. RM: Remove RMNodeState and replace it with NodeState
     (Bikas Saha via bobby)
 
+    MAPREDUCE-4231. Update RAID to use the new BlockCollection interface.
+    (szetszwo)
+
 Release 0.23.3 - UNRELEASED
 
   INCOMPATIBLE CHANGES
@@ -212,8 +215,13 @@ Release 0.23.3 - UNRELEASED
 
     MAPREDUCE-4210. Expose listener address for WebApp (Daryn Sharp via bobby)
 
+    MAPREDUCE-4162. Correctly set token service (Daryn Sharp via bobby)
+
   OPTIMIZATIONS
 
+    MAPREDUCE-3850. Avoid redundant calls for tokens in TokenCache (Daryn
+    Sharp via bobby)
+
   BUG FIXES
 
     MAPREDUCE-4092.  commitJob Exception does not fail job (Jon Eagles via
@@ -355,6 +363,9 @@ Release 0.23.3 - UNRELEASED
     MAPREDUCE-4226. ConcurrentModificationException in FileSystemCounterGroup.
     (tomwhite)
 
+    MAPREDUCE-4215. RM app page shows 500 error on appid parse error 
+    (Jonathon Eagles via tgraves)
+
 Release 0.23.2 - UNRELEASED
 
   INCOMPATIBLE CHANGES

+ 5 - 3
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java

@@ -50,7 +50,9 @@ import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
 import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.metrics2.source.JvmMetrics;
+import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
@@ -77,7 +79,8 @@ class YarnChild {
 
     String host = args[0];
     int port = Integer.parseInt(args[1]);
-    final InetSocketAddress address = new InetSocketAddress(host, port);
+    final InetSocketAddress address =
+        NetUtils.createSocketAddrForHost(host, port);
     final TaskAttemptID firstTaskid = TaskAttemptID.forName(args[2]);
     int jvmIdInt = Integer.parseInt(args[3]);
     JVMId jvmId = new JVMId(firstTaskid.getJobID(),
@@ -214,8 +217,7 @@ class YarnChild {
     LOG.debug("loading token. # keys =" +credentials.numberOfSecretKeys() +
         "; from file=" + jobTokenFile);
     Token<JobTokenIdentifier> jt = TokenCache.getJobToken(credentials);
-    jt.setService(new Text(address.getAddress().getHostAddress() + ":"
-        + address.getPort()));
+    SecurityUtil.setTokenService(jt, address);
     UserGroupInformation current = UserGroupInformation.getCurrentUser();
     current.addToken(jt);
     for (Token<? extends TokenIdentifier> tok : credentials.getAllTokens()) {

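The YarnChild hunk above shows the recurring pattern of this commit: hand-rolled "ip:port" service strings are replaced by SecurityUtil.setTokenService(token, address). As a rough, JDK-only sketch of the normalization being centralized (buildService below is a hypothetical stand-in, not the Hadoop helper):

import java.net.InetSocketAddress;

public class TokenServiceSketch {
  // Render an address in the "ip:port" form the RPC layer expects as a
  // token's service; fall back to the hostname if resolution failed.
  static String buildService(InetSocketAddress addr) {
    String host = addr.isUnresolved()
        ? addr.getHostName()
        : addr.getAddress().getHostAddress();
    return host + ":" + addr.getPort();
  }

  public static void main(String[] args) {
    // Prints 127.0.0.1:8030
    System.out.println(buildService(new InetSocketAddress("127.0.0.1", 8030)));
  }
}
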
+ 5 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/MRClientService.java

@@ -180,6 +180,11 @@ public class MRClientService extends AbstractService
     private RecordFactory recordFactory = 
       RecordFactoryProvider.getRecordFactory(null);
 
+    @Override
+    public InetSocketAddress getConnectAddress() {
+      return getBindAddress();
+    }
+    
     private Job verifyAndGetJob(JobId jobID, 
         boolean modifyAccess) throws YarnRemoteException {
       Job job = appContext.getJob(jobID);

+ 7 - 7
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java

@@ -19,6 +19,7 @@
 package org.apache.hadoop.mapreduce.v2.app.launcher;
 
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.security.PrivilegedAction;
 import java.util.HashSet;
@@ -34,7 +35,6 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.ShuffleHandler;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
@@ -58,6 +58,7 @@ import org.apache.hadoop.yarn.api.records.ContainerToken;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.hadoop.yarn.util.ProtoUtils;
 import org.apache.hadoop.yarn.util.Records;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -321,13 +322,13 @@ public class ContainerLauncherImpl extends AbstractService implements
       final String containerManagerBindAddr, ContainerToken containerToken)
       throws IOException {
 
+    final InetSocketAddress cmAddr =
+        NetUtils.createSocketAddr(containerManagerBindAddr);
     UserGroupInformation user = UserGroupInformation.getCurrentUser();
 
     if (UserGroupInformation.isSecurityEnabled()) {
-      Token<ContainerTokenIdentifier> token = new Token<ContainerTokenIdentifier>(
-          containerToken.getIdentifier().array(), containerToken
-              .getPassword().array(), new Text(containerToken.getKind()),
-          new Text(containerToken.getService()));
+      Token<ContainerTokenIdentifier> token =
+          ProtoUtils.convertFromProtoFormat(containerToken, cmAddr);
       // the user in createRemoteUser in this context has to be ContainerID
       user = UserGroupInformation.createRemoteUser(containerID.toString());
       user.addToken(token);
@@ -338,8 +339,7 @@ public class ContainerLauncherImpl extends AbstractService implements
           @Override
           public ContainerManager run() {
             return (ContainerManager) rpc.getProxy(ContainerManager.class,
-                NetUtils.createSocketAddr(containerManagerBindAddr),
-                getConfig());
+                cmAddr, getConfig());
           }
         });
     return proxy;

+ 9 - 8
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java

@@ -36,6 +36,7 @@ import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
 import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
+import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
@@ -133,15 +134,14 @@ public abstract class RMCommunicator extends AbstractService  {
 
   protected void register() {
     //Register
-    String host = clientService.getBindAddress().getAddress()
-        .getCanonicalHostName();
+    InetSocketAddress serviceAddr = clientService.getBindAddress();
     try {
       RegisterApplicationMasterRequest request =
         recordFactory.newRecordInstance(RegisterApplicationMasterRequest.class);
       request.setApplicationAttemptId(applicationAttemptId);
-      request.setHost(host);
-      request.setRpcPort(clientService.getBindAddress().getPort());
-      request.setTrackingUrl(host + ":" + clientService.getHttpPort());
+      request.setHost(serviceAddr.getHostName());
+      request.setRpcPort(serviceAddr.getPort());
+      request.setTrackingUrl(serviceAddr.getHostName() + ":" + clientService.getHttpPort());
       RegisterApplicationMasterResponse response =
         scheduler.registerApplicationMaster(request);
       minContainerCapability = response.getMinimumResourceCapability();
@@ -262,9 +262,6 @@ public abstract class RMCommunicator extends AbstractService  {
     if (UserGroupInformation.isSecurityEnabled()) {
       String tokenURLEncodedStr = System.getenv().get(
           ApplicationConstants.APPLICATION_MASTER_TOKEN_ENV_NAME);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("AppMasterToken is " + tokenURLEncodedStr);
-      }
       Token<? extends TokenIdentifier> token = new Token<TokenIdentifier>();
 
       try {
@@ -273,6 +270,10 @@ public abstract class RMCommunicator extends AbstractService  {
         throw new YarnException(e);
       }
 
+      SecurityUtil.setTokenService(token, serviceAddr);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("AppMasterToken is " + token);
+      }
       currentUser.addToken(token);
     }
 

+ 7 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/MRClientProtocol.java

@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.mapreduce.v2.api;
 
+import java.net.InetSocketAddress;
+
 import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptRequest;
 import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptResponse;
 import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetCountersRequest;
@@ -45,6 +47,11 @@ import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskResponse;
 import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
 
 public interface MRClientProtocol {
+  /**
+   * Address to which the client is connected
+   * @return InetSocketAddress
+   */
+  public InetSocketAddress getConnectAddress();
   public GetJobReportResponse getJobReport(GetJobReportRequest request) throws YarnRemoteException;
   public GetTaskReportResponse getTaskReport(GetTaskReportRequest request) throws YarnRemoteException;
   public GetTaskAttemptReportResponse getTaskAttemptReport(GetTaskAttemptReportRequest request) throws YarnRemoteException;

+ 5 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/impl/pb/client/MRClientProtocolPBClientImpl.java

@@ -104,6 +104,11 @@ public class MRClientProtocolPBClientImpl implements MRClientProtocol {
         MRClientProtocolPB.class, clientVersion, addr, conf);
   }
   
+  @Override
+  public InetSocketAddress getConnectAddress() {
+    return RPC.getServerAddress(proxy);
+  }
+
   @Override
   public GetJobReportResponse getJobReport(GetJobReportRequest request)
       throws YarnRemoteException {

+ 5 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/TestRPCFactories.java

@@ -122,6 +122,11 @@ public class TestRPCFactories {
   
   public class MRClientProtocolTestImpl implements MRClientProtocol {
 
+    @Override
+    public InetSocketAddress getConnectAddress() {
+      return null;
+    }
+    
     @Override
     public GetJobReportResponse getJobReport(GetJobReportRequest request)
         throws YarnRemoteException {

+ 2 - 17
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Cluster.java

@@ -35,13 +35,11 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Master;
 import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
 import org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider;
 import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.mapreduce.util.ConfigUtil;
 import org.apache.hadoop.mapreduce.v2.LogParams;
-import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
@@ -388,21 +386,8 @@ public class Cluster {
    */
   public Token<DelegationTokenIdentifier> 
       getDelegationToken(Text renewer) throws IOException, InterruptedException{
-    Token<DelegationTokenIdentifier> result =
-      client.getDelegationToken(renewer);
-
-    if (result == null) {
-      return result;
-    }
-
-    InetSocketAddress addr = Master.getMasterAddress(conf);
-    StringBuilder service = new StringBuilder();
-    service.append(NetUtils.normalizeHostName(addr.getAddress().
-                                              getHostAddress()));
-    service.append(':');
-    service.append(addr.getPort());
-    result.setService(new Text(service.toString()));
-    return result;
+    // client has already set the service
+    return client.getDelegationToken(renewer);
   }
 
   /**

+ 6 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/TokenCache.java

@@ -19,7 +19,9 @@
 package org.apache.hadoop.mapreduce.security;
 
 import java.io.IOException;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -92,8 +94,11 @@ public class TokenCache {
 
   static void obtainTokensForNamenodesInternal(Credentials credentials,
       Path[] ps, Configuration conf) throws IOException {
+    Set<FileSystem> fsSet = new HashSet<FileSystem>();
     for(Path p: ps) {
-      FileSystem fs = FileSystem.get(p.toUri(), conf);
+      fsSet.add(p.getFileSystem(conf));
+    }
+    for (FileSystem fs : fsSet) {
       obtainTokensForNamenodesInternal(fs, credentials, conf);
     }
   }

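The TokenCache hunk above (MAPREDUCE-3850) collects the distinct filesystems into a Set before fetching tokens, so a job with many paths on the same namenode makes one token call instead of one per path. A rough JDK-only sketch of that dedup, with filesystems represented by their URI authority (fileSystemFor is a hypothetical stand-in for Path.getFileSystem):

import java.util.LinkedHashSet;
import java.util.Set;

public class TokenCacheSketch {
  static int rpcCalls = 0;

  // Hypothetical: map a path like hdfs://nn1:8020/data/a to its filesystem
  // URI (assumes a path component follows the authority).
  static String fileSystemFor(String path) {
    return path.substring(0, path.indexOf('/', "hdfs://".length()));
  }

  static void obtainTokensForNamenodes(String[] paths) {
    Set<String> fsSet = new LinkedHashSet<>();
    for (String p : paths) {
      fsSet.add(fileSystemFor(p));           // dedupe filesystems first
    }
    for (String fs : fsSet) {
      rpcCalls++;                            // one token fetch per filesystem
      System.out.println("fetching delegation token for " + fs);
    }
  }

  public static void main(String[] args) {
    obtainTokensForNamenodes(new String[] {
        "hdfs://nn1:8020/data/a", "hdfs://nn1:8020/data/b" });
    System.out.println(rpcCalls);            // 1, not 2
  }
}
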
+ 20 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/security/TestTokenCache.java

@@ -251,6 +251,26 @@ public class TestTokenCache {
     return mockFs;
   }
 
+  @Test
+  public void testSingleTokenFetch() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(YarnConfiguration.RM_PRINCIPAL, "mapred/host@REALM");
+    String renewer = Master.getMasterPrincipal(conf);
+    Credentials credentials = new Credentials();
+    
+    FileSystem mockFs = mock(FileSystem.class);
+    when(mockFs.getCanonicalServiceName()).thenReturn("host:0");
+    when(mockFs.getUri()).thenReturn(new URI("mockfs://host:0"));
+    
+    Path mockPath = mock(Path.class);
+    when(mockPath.getFileSystem(conf)).thenReturn(mockFs);
+    
+    Path[] paths = new Path[]{ mockPath, mockPath };
+    when(mockFs.getDelegationTokens("me", credentials)).thenReturn(null);
+    TokenCache.obtainTokensForNamenodesInternal(credentials, paths, conf);
+    verify(mockFs, times(1)).getDelegationTokens(renewer, credentials);
+  }
+
   @Test
   public void testCleanUpTokenReferral() throws Exception {
     Configuration conf = new Configuration();

+ 5 - 2
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryClientService.java

@@ -178,6 +178,10 @@ public class HistoryClientService extends AbstractService {
 
     private RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
 
+    public InetSocketAddress getConnectAddress() {
+      return getBindAddress();
+    }
+    
     private Job verifyAndGetJob(final JobId jobID) throws YarnRemoteException {
       UserGroupInformation loginUgi = null;
       Job job = null;
@@ -335,8 +339,7 @@ public class HistoryClientService extends AbstractService {
               jhsDTSecretManager);
       DelegationToken mrDToken = BuilderUtils.newDelegationToken(
         realJHSToken.getIdentifier(), realJHSToken.getKind().toString(),
-        realJHSToken.getPassword(), bindAddress.getAddress().getHostAddress()
-            + ":" + bindAddress.getPort());
+        realJHSToken.getPassword(), realJHSToken.getService().toString());
       response.setDelegationToken(mrDToken);
       return response;
       } catch (IOException i) {

+ 9 - 11
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java

@@ -32,7 +32,6 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.JobStatus;
 import org.apache.hadoop.mapreduce.MRJobConfig;
@@ -63,6 +62,7 @@ import org.apache.hadoop.mapreduce.v2.api.records.JobState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptReport;
 import org.apache.hadoop.mapreduce.v2.util.MRApps;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.yarn.YarnException;
@@ -144,7 +144,7 @@ public class ClientServiceDelegate {
     if (application != null) {
       trackingUrl = application.getTrackingUrl();
     }
-    String serviceAddr = null;
+    InetSocketAddress serviceAddr = null;
     while (application == null
         || YarnApplicationState.RUNNING == application
             .getYarnApplicationState()) {
@@ -172,25 +172,23 @@ public class ClientServiceDelegate {
         if(!conf.getBoolean(MRJobConfig.JOB_AM_ACCESS_DISABLED, false)) {
           UserGroupInformation newUgi = UserGroupInformation.createRemoteUser(
               UserGroupInformation.getCurrentUser().getUserName());
-          serviceAddr = application.getHost() + ":" + application.getRpcPort();
+          serviceAddr = NetUtils.createSocketAddrForHost(
+              application.getHost(), application.getRpcPort());
           if (UserGroupInformation.isSecurityEnabled()) {
             String clientTokenEncoded = application.getClientToken();
             Token<ApplicationTokenIdentifier> clientToken =
               new Token<ApplicationTokenIdentifier>();
             clientToken.decodeFromUrlString(clientTokenEncoded);
             // RPC layer client expects ip:port as service for tokens
-            InetSocketAddress addr = NetUtils.createSocketAddr(application
-                .getHost(), application.getRpcPort());
-            clientToken.setService(new Text(addr.getAddress().getHostAddress()
-                + ":" + addr.getPort()));
+            SecurityUtil.setTokenService(clientToken, serviceAddr);
             newUgi.addToken(clientToken);
           }
           LOG.debug("Connecting to " + serviceAddr);
-          final String tempStr = serviceAddr;
+          final InetSocketAddress finalServiceAddr = serviceAddr;
           realProxy = newUgi.doAs(new PrivilegedExceptionAction<MRClientProtocol>() {
             @Override
             public MRClientProtocol run() throws IOException {
-              return instantiateAMProxy(tempStr);
+              return instantiateAMProxy(finalServiceAddr);
             }
           });
         } else {
@@ -270,13 +268,13 @@ public class ClientServiceDelegate {
     return historyServerProxy;
   }
 
-  MRClientProtocol instantiateAMProxy(final String serviceAddr)
+  MRClientProtocol instantiateAMProxy(final InetSocketAddress serviceAddr)
       throws IOException {
     LOG.trace("Connecting to ApplicationMaster at: " + serviceAddr);
     YarnRPC rpc = YarnRPC.create(conf);
     MRClientProtocol proxy = 
          (MRClientProtocol) rpc.getProxy(MRClientProtocol.class,
-            NetUtils.createSocketAddr(serviceAddr), conf);
+            serviceAddr, conf);
     LOG.trace("Connected to ApplicationMaster at: " + serviceAddr);
     return proxy;
   }

+ 7 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/NotRunningJob.java

@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.mapred;
 
+import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.HashMap;
 
@@ -209,4 +210,10 @@ public class NotRunningJob implements MRClientProtocol {
     /* Should not be invoked by anyone. */
     throw new NotImplementedException();
   }
+  
+  @Override
+  public InetSocketAddress getConnectAddress() {
+    /* Should not be invoked by anyone.  Normally used to set token service */
+    throw new NotImplementedException();
+  }
 }

+ 11 - 13
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java

@@ -37,8 +37,6 @@ import org.apache.hadoop.mapreduce.QueueInfo;
 import org.apache.hadoop.mapreduce.TaskTrackerInfo;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
-import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenRequest;
-import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenResponse;
 import org.apache.hadoop.mapreduce.v2.util.MRApps;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
@@ -67,14 +65,14 @@ import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
+import org.apache.hadoop.yarn.util.ProtoUtils;
 
 
 // TODO: This should be part of something like yarn-client.
 public class ResourceMgrDelegate {
   private static final Log LOG = LogFactory.getLog(ResourceMgrDelegate.class);
       
-  private final String rmAddress;
+  private final InetSocketAddress rmAddress;
   private YarnConfiguration conf;
   ClientRMProtocol applicationsManager;
   private ApplicationId applicationId;
@@ -87,11 +85,7 @@ public class ResourceMgrDelegate {
   public ResourceMgrDelegate(YarnConfiguration conf) {
     this.conf = conf;
     YarnRPC rpc = YarnRPC.create(this.conf);
-    InetSocketAddress rmAddress = conf.getSocketAddr(
-            YarnConfiguration.RM_ADDRESS,
-            YarnConfiguration.DEFAULT_RM_ADDRESS,
-            YarnConfiguration.DEFAULT_RM_PORT);
-    this.rmAddress = rmAddress.toString();
+    this.rmAddress = getRmAddress(conf);
     LOG.debug("Connecting to ResourceManager at " + rmAddress);
     applicationsManager =
         (ClientRMProtocol) rpc.getProxy(ClientRMProtocol.class,
@@ -109,7 +103,13 @@ public class ResourceMgrDelegate {
       ClientRMProtocol applicationsManager) {
     this.conf = conf;
     this.applicationsManager = applicationsManager;
-    this.rmAddress = applicationsManager.toString();
+    this.rmAddress = getRmAddress(conf);
+  }
+  
+  private static InetSocketAddress getRmAddress(YarnConfiguration conf) {
+    return conf.getSocketAddr(YarnConfiguration.RM_ADDRESS,
+                              YarnConfiguration.DEFAULT_RM_ADDRESS,
+                              YarnConfiguration.DEFAULT_RM_PORT);
   }
   
   public void cancelDelegationToken(Token<DelegationTokenIdentifier> arg0)
@@ -168,9 +168,7 @@ public class ResourceMgrDelegate {
     org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse 
       response = applicationsManager.getDelegationToken(rmDTRequest);
     DelegationToken yarnToken = response.getRMDelegationToken();
-    return new Token<RMDelegationTokenIdentifier>(yarnToken.getIdentifier().array(),
-        yarnToken.getPassword().array(), 
-        new Text(yarnToken.getKind()), new Text(yarnToken.getService()));
+    return ProtoUtils.convertFromProtoFormat(yarnToken, rmAddress);
   }
 
 

+ 4 - 6
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java

@@ -56,7 +56,6 @@ import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
 import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.mapreduce.v2.LogParams;
 import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
-import org.apache.hadoop.mapreduce.v2.api.MRDelegationTokenIdentifier;
 import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenRequest;
 import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
 import org.apache.hadoop.mapreduce.v2.util.MRApps;
@@ -84,6 +83,7 @@ import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.util.BuilderUtils;
 import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.ProtoUtils;
 
 
 /**
@@ -184,7 +184,7 @@ public class YARNRunner implements ClientProtocol {
     return resMgrDelegate.getClusterMetrics();
   }
 
-  private Token<MRDelegationTokenIdentifier> getDelegationTokenFromHS(
+  private Token<?> getDelegationTokenFromHS(
       MRClientProtocol hsProxy, Text renewer) throws IOException,
       InterruptedException {
     GetDelegationTokenRequest request = recordFactory
@@ -192,10 +192,8 @@ public class YARNRunner implements ClientProtocol {
     request.setRenewer(renewer.toString());
     DelegationToken mrDelegationToken = hsProxy.getDelegationToken(request)
       .getDelegationToken();
-    return new Token<MRDelegationTokenIdentifier>(mrDelegationToken
-      .getIdentifier().array(), mrDelegationToken.getPassword().array(),
-      new Text(mrDelegationToken.getKind()), new Text(
-        mrDelegationToken.getService()));
+    return ProtoUtils.convertFromProtoFormat(mrDelegationToken,
+                                             hsProxy.getConnectAddress());
   }
   
   @Override

+ 5 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java

@@ -368,6 +368,11 @@ public class TestClientRedirect {
       this(AMHOSTADDRESS);
     }
 
+    @Override
+    public InetSocketAddress getConnectAddress() {
+      return bindAddress;
+    }
+    
     public AMService(String hostAddress) {
       super("AMService");
       this.protocol = MRClientProtocol.class;

+ 7 - 6
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java

@@ -27,6 +27,7 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.util.Arrays;
 import java.util.Collection;
 
@@ -242,7 +243,7 @@ public class TestClientServiceDelegate {
     // should use the same proxy to AM2 and so instantiateProxy shouldn't be
     // called.
     doReturn(firstGenAMProxy).doReturn(secondGenAMProxy).when(
-        clientServiceDelegate).instantiateAMProxy(any(String.class));
+        clientServiceDelegate).instantiateAMProxy(any(InetSocketAddress.class));
 
     JobStatus jobStatus = clientServiceDelegate.getJobStatus(oldJobId);
     Assert.assertNotNull(jobStatus);
@@ -257,7 +258,7 @@ public class TestClientServiceDelegate {
     Assert.assertEquals("jobName-secondGen", jobStatus.getJobName());
 
     verify(clientServiceDelegate, times(2)).instantiateAMProxy(
-        any(String.class));
+        any(InetSocketAddress.class));
   }
   
   @Test
@@ -286,19 +287,19 @@ public class TestClientServiceDelegate {
     Assert.assertEquals("N/A", jobStatus.getJobName());
     
     verify(clientServiceDelegate, times(0)).instantiateAMProxy(
-        any(String.class));
+        any(InetSocketAddress.class));
 
     // Should not reach AM even for second and third times too.
     jobStatus = clientServiceDelegate.getJobStatus(oldJobId);
     Assert.assertNotNull(jobStatus);
     Assert.assertEquals("N/A", jobStatus.getJobName());    
     verify(clientServiceDelegate, times(0)).instantiateAMProxy(
-        any(String.class));
+        any(InetSocketAddress.class));
     jobStatus = clientServiceDelegate.getJobStatus(oldJobId);
     Assert.assertNotNull(jobStatus);
     Assert.assertEquals("N/A", jobStatus.getJobName());    
     verify(clientServiceDelegate, times(0)).instantiateAMProxy(
-        any(String.class));
+        any(InetSocketAddress.class));
 
     // The third time around, app is completed, so should go to JHS
     JobStatus jobStatus1 = clientServiceDelegate.getJobStatus(oldJobId);
@@ -309,7 +310,7 @@ public class TestClientServiceDelegate {
     Assert.assertEquals(1.0f, jobStatus1.getReduceProgress());
     
     verify(clientServiceDelegate, times(0)).instantiateAMProxy(
-        any(String.class));
+        any(InetSocketAddress.class));
   }
   
   @Test

+ 3 - 6
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/security/TestJHSSecurity.java

@@ -26,11 +26,9 @@ import junit.framework.Assert;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.v2.api.HSClientProtocol;
 import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
-import org.apache.hadoop.mapreduce.v2.api.MRDelegationTokenIdentifier;
 import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenRequest;
 import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportRequest;
 import org.apache.hadoop.mapreduce.v2.hs.JobHistoryServer;
@@ -38,11 +36,11 @@ import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
 import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
-import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.yarn.api.records.DelegationToken;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.util.ProtoUtils;
 import org.apache.hadoop.yarn.util.Records;
 import org.apache.log4j.Level;
 import org.apache.log4j.LogManager;
@@ -95,9 +93,8 @@ public class TestJHSSecurity {
     // Now try talking to JHS using the delegation token
     UserGroupInformation ugi =
         UserGroupInformation.createRemoteUser("TheDarkLord");
-    ugi.addToken(new Token<MRDelegationTokenIdentifier>(token.getIdentifier()
-      .array(), token.getPassword().array(), new Text(token.getKind()),
-      new Text(token.getService())));
+    ugi.addToken(ProtoUtils.convertFromProtoFormat(
+        token, jobHistoryServer.getClientService().getBindAddress()));
     final YarnRPC rpc = YarnRPC.create(conf);
     MRClientProtocol userUsingDT =
         ugi.doAs(new PrivilegedAction<MRClientProtocol>() {

+ 3 - 4
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/security/TestUmbilicalProtocolWithJobToken.java

@@ -47,6 +47,7 @@ import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.SaslInputStream;
 import org.apache.hadoop.security.SaslRpcClient;
 import org.apache.hadoop.security.SaslRpcServer;
+import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 
 import org.apache.log4j.Level;
@@ -98,10 +99,8 @@ public class TestUmbilicalProtocolWithJobToken {
     JobTokenIdentifier tokenId = new JobTokenIdentifier(new Text(jobId));
     Token<JobTokenIdentifier> token = new Token<JobTokenIdentifier>(tokenId, sm);
     sm.addTokenForJob(jobId, token);
-    Text host = new Text(addr.getAddress().getHostAddress() + ":"
-        + addr.getPort());
-    token.setService(host);
-    LOG.info("Service IP address for token is " + host);
+    SecurityUtil.setTokenService(token, addr);
+    LOG.info("Service address for token is " + token.getService());
     current.addToken(token);
     current.doAs(new PrivilegedExceptionAction<Object>() {
       @Override

+ 1 - 1
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerToken.java

@@ -43,7 +43,7 @@ import org.apache.hadoop.yarn.api.ContainerManager;
  */
 @Public
 @Stable
-public interface ContainerToken {
+public interface ContainerToken extends DelegationToken {
   /**
    * Get the token identifier.
    * @return token identifier

+ 25 - 0
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/ProtoUtils.java

@@ -18,11 +18,17 @@
 
 package org.apache.hadoop.yarn.util;
 
+import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
 import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.DelegationToken;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
@@ -192,4 +198,23 @@ public class ProtoUtils {
     return ApplicationAccessType.valueOf(e.name().replace(
         APP_ACCESS_TYPE_PREFIX, ""));
   }
+
+  /**
+   * Convert a protobuf token into a rpc token and set its service
+   * 
+   * @param protoToken the yarn token
+   * @param serviceAddr the connect address for the service
+   * @return rpc token
+   */
+  public static <T extends TokenIdentifier> Token<T>
+  convertFromProtoFormat(DelegationToken protoToken, InetSocketAddress serviceAddr) {
+    Token<T> token = new Token<T>(protoToken.getIdentifier().array(),
+                                  protoToken.getPassword().array(),
+                                  new Text(protoToken.getKind()),
+                                  new Text(protoToken.getService()));
+    if (serviceAddr != null) {
+      SecurityUtil.setTokenService(token, serviceAddr);
+    }
+    return token;
+  }
 }

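For reference, a sketch of how the call sites elsewhere in this commit (YARNRunner, ResourceMgrDelegate, AMLauncher, TestJHSSecurity) consume the new helper; it assumes the YARN client jars on the classpath, and protoToken/serviceAddr stand for values the caller already holds:

import java.net.InetSocketAddress;

import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.records.DelegationToken;
import org.apache.hadoop.yarn.util.ProtoUtils;

class ConvertFromProtoSketch {
  static Token<?> toRpcToken(DelegationToken protoToken,
                             InetSocketAddress serviceAddr) {
    // Builds the RPC-layer token from the protobuf record; a non-null
    // address also stamps the token's service, per the hunk above.
    return ProtoUtils.convertFromProtoFormat(protoToken, serviceAddr);
  }
}
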
+ 4 - 3
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java

@@ -30,6 +30,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Stable;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -275,10 +276,10 @@ public class BuilderUtils {
     containerToken.setKind(ContainerTokenIdentifier.KIND.toString());
     containerToken.setPassword(password);
     // RPC layer client expects ip:port as service for tokens
-    InetSocketAddress addr = NetUtils.createSocketAddr(nodeId.getHost(),
+    InetSocketAddress addr = NetUtils.createSocketAddrForHost(nodeId.getHost(),
         nodeId.getPort());
-    containerToken.setService(addr.getAddress().getHostAddress() + ":"
-        + addr.getPort());
+    // NOTE: use SecurityUtil.setTokenService if this becomes a "real" token 
+    containerToken.setService(SecurityUtil.buildTokenService(addr).toString());
     return containerToken;
   }
 

+ 1 - 2
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java

@@ -464,8 +464,7 @@ public class ClientRMService extends AbstractService implements
               realRMDTtoken.getIdentifier(),
               realRMDTtoken.getKind().toString(),
               realRMDTtoken.getPassword(),
-              clientBindAddress.getAddress().getHostAddress() + ":"
-              + clientBindAddress.getPort()
+              realRMDTtoken.getService().toString()
               ));
       return response;
     } catch(IOException io) {

+ 15 - 18
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java

@@ -32,9 +32,9 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.DataInputByteBuffer;
 import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.StringUtils;
@@ -46,7 +46,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
-import org.apache.hadoop.yarn.api.records.ContainerToken;
+import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.factories.RecordFactory;
@@ -61,6 +61,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent;
+import org.apache.hadoop.yarn.util.ProtoUtils;
 
 /**
  * The launch of the AM itself.
@@ -131,27 +132,25 @@ public class AMLauncher implements Runnable {
 
     Container container = application.getMasterContainer();
 
-    final String containerManagerBindAddress = container.getNodeId().toString();
+    final NodeId node = container.getNodeId();
+    final InetSocketAddress containerManagerBindAddress =
+        NetUtils.createSocketAddrForHost(node.getHost(), node.getPort());
 
     final YarnRPC rpc = YarnRPC.create(conf); // TODO: Don't create again and again.
 
     UserGroupInformation currentUser = UserGroupInformation
         .createRemoteUser(containerId.toString());
     if (UserGroupInformation.isSecurityEnabled()) {
-      ContainerToken containerToken = container.getContainerToken();
       Token<ContainerTokenIdentifier> token =
-          new Token<ContainerTokenIdentifier>(
-              containerToken.getIdentifier().array(),
-              containerToken.getPassword().array(), new Text(
-                  containerToken.getKind()), new Text(
-                  containerToken.getService()));
+          ProtoUtils.convertFromProtoFormat(container.getContainerToken(),
+                                            containerManagerBindAddress);
       currentUser.addToken(token);
     }
     return currentUser.doAs(new PrivilegedAction<ContainerManager>() {
       @Override
       public ContainerManager run() {
         return (ContainerManager) rpc.getProxy(ContainerManager.class,
-            NetUtils.createSocketAddr(containerManagerBindAddress), conf);
+            containerManagerBindAddress, conf);
       }
     });
   }
@@ -218,22 +217,21 @@ public class AMLauncher implements Runnable {
       Token<ApplicationTokenIdentifier> token =
           new Token<ApplicationTokenIdentifier>(id,
               this.rmContext.getApplicationTokenSecretManager());
-      InetSocketAddress unresolvedAddr = conf.getSocketAddr(
+      InetSocketAddress serviceAddr = conf.getSocketAddr(
           YarnConfiguration.RM_SCHEDULER_ADDRESS,
           YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
           YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
-      String resolvedAddr =
-          unresolvedAddr.getAddress().getHostAddress() + ":"
-              + unresolvedAddr.getPort();
-      token.setService(new Text(resolvedAddr));
+      // normally the client should set the service after acquiring the token,
+      // but this token is directly provided to the tasks
+      SecurityUtil.setTokenService(token, serviceAddr);
       String appMasterTokenEncoded = token.encodeToUrlString();
-      LOG.debug("Putting appMaster token in env : " + appMasterTokenEncoded);
+      LOG.debug("Putting appMaster token in env : " + token);
       environment.put(
           ApplicationConstants.APPLICATION_MASTER_TOKEN_ENV_NAME,
           appMasterTokenEncoded);
 
       // Add the RM token
-      credentials.addToken(new Text(resolvedAddr), token);
+      credentials.addToken(token.getService(), token);
       DataOutputBuffer dob = new DataOutputBuffer();
       credentials.writeTokenStorageToStream(dob);
       container.setContainerTokens(
@@ -245,7 +243,6 @@ public class AMLauncher implements Runnable {
           this.clientToAMSecretManager.getMasterKey(identifier);
       String encoded =
           Base64.encodeBase64URLSafeString(clientSecretKey.getEncoded());
-      LOG.debug("The encoded client secret-key to be put in env : " + encoded);
       environment.put(
           ApplicationConstants.APPLICATION_CLIENT_SECRET_ENV_NAME, 
           encoded);

+ 10 - 2
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/AppBlock.java

@@ -55,7 +55,15 @@ public class AppBlock extends HtmlBlock {
       puts("Bad request: requires application ID");
       return;
     }
-    ApplicationId appID = Apps.toAppID(aid);
+
+    ApplicationId appID = null;
+    try {
+      appID = Apps.toAppID(aid);
+    } catch (Exception e) {
+      puts("Invalid Application ID: " + aid);
+      return;
+    }
+
     RMContext context = getInstance(RMContext.class);
     RMApp rmApp = context.getRMApps().get(appID);
     if (rmApp == null) {
@@ -74,7 +82,7 @@ public class AppBlock extends HtmlBlock {
         && !this.aclsManager.checkAccess(callerUGI,
             ApplicationAccessType.VIEW_APP, app.getUser(), appID)) {
       puts("You (User " + remoteUser
-          + ") are not authorized to view the logs for application " + appID);
+          + ") are not authorized to view application " + appID);
       return;
     }
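
The hardened AppBlock path catches parse failures from untrusted request input and renders a readable error instead of letting the exception escape. The same shape in isolation, assuming Apps.toAppID as used in the hunk (the wrapper class is hypothetical):

    import org.apache.hadoop.yarn.api.records.ApplicationId;
    import org.apache.hadoop.yarn.util.Apps;

    public class AppIdParseSketch {
      // Returns null instead of propagating parse failures from
      // user-supplied input such as a URL path fragment.
      static ApplicationId parseOrNull(String aid) {
        try {
          return Apps.toAppID(aid);
        } catch (Exception e) { // Apps.toAppID throws on malformed IDs
          return null;
        }
      }
    }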
 

+ 0 - 1
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerManagerSecurity.java

@@ -401,7 +401,6 @@ public class TestContainerManagerSecurity {
           appTokenSecretManager);
     SecurityUtil.setTokenService(appToken, schedulerAddr);
     currentUser.addToken(appToken);
-    SecurityUtil.setTokenService(appToken, schedulerAddr);
     
     AMRMProtocol scheduler = currentUser
         .doAs(new PrivilegedAction<AMRMProtocol>() {

+ 6 - 6
hadoop-mapreduce-project/src/contrib/raid/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyRaid.java

@@ -144,7 +144,7 @@ public class BlockPlacementPolicyRaid extends BlockPlacementPolicy {
 
   /** {@inheritDoc} */
   @Override
-  public DatanodeDescriptor chooseReplicaToDelete(FSInodeInfo inode,
+  public DatanodeDescriptor chooseReplicaToDelete(BlockCollection inode,
       Block block, short replicationFactor,
       Collection<DatanodeDescriptor> first,
       Collection<DatanodeDescriptor> second) {
@@ -425,7 +425,7 @@ public class BlockPlacementPolicyRaid extends BlockPlacementPolicy {
   }
 
   /**
-   * Cache results for FSInodeInfo.getFullPathName()
+   * Cache results for getFullPathName()
    */
   static class CachedFullPathNames {
     FSNamesystem namesystem;
@@ -446,8 +446,8 @@ public class BlockPlacementPolicyRaid extends BlockPlacementPolicy {
       };
 
     static private class INodeWithHashCode {
-      FSInodeInfo inode;
-      INodeWithHashCode(FSInodeInfo inode) {
+      BlockCollection inode;
+      INodeWithHashCode(BlockCollection inode) {
         this.inode = inode;
       }
       @Override
@@ -459,11 +459,11 @@ public class BlockPlacementPolicyRaid extends BlockPlacementPolicy {
         return System.identityHashCode(inode);
       }
       String getFullPathName() {
-        return inode.getFullPathName();
+        return inode.getName();
       }
     }
 
-    public String get(FSInodeInfo inode) throws IOException {
+    public String get(BlockCollection inode) throws IOException {
       return cacheInternal.get(new INodeWithHashCode(inode));
     }
   }
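
The INodeWithHashCode wrapper that survives this rename exists to key the path-name cache on object identity, since BlockCollection implementations are free to define their own equals/hashCode. A generic sketch of that identity-key technique (class name illustrative):

    // Wraps any object so it can serve as a hash key by identity,
    // regardless of how the wrapped type defines equals/hashCode.
    final class IdentityKey<T> {
      final T value;
      IdentityKey(T value) { this.value = value; }
      @Override public boolean equals(Object o) {
        return o instanceof IdentityKey && ((IdentityKey<?>) o).value == value;
      }
      @Override public int hashCode() {
        return System.identityHashCode(value);
      }
    }

Where the whole map can be identity-keyed, java.util.IdentityHashMap gives the same behavior without a wrapper.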

+ 5 - 6
hadoop-mapreduce-project/src/contrib/raid/src/test/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockPlacementPolicyRaid.java

@@ -41,7 +41,6 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyRaid.CachedFullPathNames;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyRaid.CachedLocatedBlocks;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyRaid.FileType;
-import org.apache.hadoop.hdfs.server.namenode.FSInodeInfo;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.INodeFile;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeRaidTestUtil;
@@ -241,7 +240,7 @@ public class TestBlockPlacementPolicyRaid {
       // test full path cache
       CachedFullPathNames cachedFullPathNames =
           new CachedFullPathNames(namesystem);
-      final FSInodeInfo[] inodes = NameNodeRaidTestUtil.getFSInodeInfo(
+      final BlockCollection[] inodes = NameNodeRaidTestUtil.getBlockCollections(
           namesystem, file1, file2);
 
       verifyCachedFullPathNameResult(cachedFullPathNames, inodes[0]);
@@ -477,14 +476,14 @@ public class TestBlockPlacementPolicyRaid {
   }
 
   private void verifyCachedFullPathNameResult(
-      CachedFullPathNames cachedFullPathNames, FSInodeInfo inode)
+      CachedFullPathNames cachedFullPathNames, BlockCollection inode)
   throws IOException {
-    String res1 = inode.getFullPathName();
+    String res1 = inode.getName();
     String res2 = cachedFullPathNames.get(inode);
     LOG.info("Actual path name: " + res1);
     LOG.info("Cached path name: " + res2);
     Assert.assertEquals(cachedFullPathNames.get(inode),
-                        inode.getFullPathName());
+                        inode.getName());
   }
 
   private void verifyCachedBlocksResult(CachedLocatedBlocks cachedBlocks,
@@ -503,7 +502,7 @@ public class TestBlockPlacementPolicyRaid {
   private Collection<LocatedBlock> getCompanionBlocks(
       FSNamesystem namesystem, BlockPlacementPolicyRaid policy,
       ExtendedBlock block) throws IOException {
-    INodeFile inode = blockManager.blocksMap.getINode(block
+    INodeFile inode = (INodeFile)blockManager.blocksMap.getINode(block
         .getLocalBlock());
     FileType type = policy.getFileType(inode.getFullPathName());
     return policy.getCompanionBlocks(inode.getFullPathName(), type,

+ 4 - 3
hadoop-mapreduce-project/src/contrib/raid/src/test/org/apache/hadoop/hdfs/server/namenode/NameNodeRaidTestUtil.java

@@ -18,16 +18,17 @@
 package org.apache.hadoop.hdfs.server.namenode;
 
 import org.apache.hadoop.fs.UnresolvedLinkException;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection;
 
 public class NameNodeRaidTestUtil {
-  public static FSInodeInfo[] getFSInodeInfo(final FSNamesystem namesystem,
+  public static BlockCollection[] getBlockCollections(final FSNamesystem namesystem,
       final String... files) throws UnresolvedLinkException {
-    final FSInodeInfo[] inodes = new FSInodeInfo[files.length];
+    final BlockCollection[] inodes = new BlockCollection[files.length];
     final FSDirectory dir = namesystem.dir; 
     dir.readLock();
     try {
       for(int i = 0; i < files.length; i++) {
-        inodes[i] = dir.rootDir.getNode(files[i], true);
+        inodes[i] = (BlockCollection)dir.rootDir.getNode(files[i], true);
       }
       return inodes;
     } finally {

+ 1 - 1
hadoop-tools/hadoop-archives/src/main/java/org/apache/hadoop/tools/HadoopArchives.java

@@ -117,7 +117,7 @@ public class HadoopArchives implements Tool {
     // will when running the mapreduce job.
     String testJar = System.getProperty(TEST_HADOOP_ARCHIVES_JAR_PATH, null);
     if (testJar != null) {
-      ((JobConf)conf).setJar(testJar);
+      this.conf.setJar(testJar);
     }
   }
 

+ 6 - 3
hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java

@@ -136,10 +136,13 @@ public class DistCp extends Configured implements Tool {
 
     Job job = null;
     try {
-      metaFolder = createMetaFolderPath();
-      jobFS = metaFolder.getFileSystem(getConf());
+      synchronized(this) {
+        // Don't clean up while we are setting up.
+        metaFolder = createMetaFolderPath();
+        jobFS = metaFolder.getFileSystem(getConf());
 
-      job = createJob();
+        job = createJob();
+      }
       createInputFileListing(job);
 
       job.submit();
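
The new synchronized block makes DistCp's setup atomic with respect to whatever cleanup path shares the instance lock, so the meta folder cannot be deleted between its creation and job creation. A minimal sketch of the guard, with hypothetical member and method names:

    public class SetupCleanupGuard {
      private Object resource;

      // Both paths lock "this", so cleanup can never observe
      // a half-initialized resource.
      public synchronized void setup() {
        resource = new Object(); // stand-in for metaFolder + job creation
      }

      public synchronized void cleanup() {
        resource = null;         // stand-in for deleting the meta folder
      }
    }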

+ 2 - 2
hadoop-tools/hadoop-extras/src/main/java/org/apache/hadoop/tools/Logalyzer.java

@@ -65,9 +65,9 @@ import org.apache.hadoop.mapreduce.lib.map.RegexMapper;
 public class Logalyzer {
   // Constants
   private static Configuration fsConfig = new Configuration();
-  public static String SORT_COLUMNS = 
+  public static final String SORT_COLUMNS = 
     "logalizer.logcomparator.sort.columns";
-  public static String COLUMN_SEPARATOR = 
+  public static final String COLUMN_SEPARATOR = 
     "logalizer.logcomparator.column.separator";
   
   static {

+ 31 - 0
hadoop-tools/hadoop-rumen/dev-support/findbugs-exclude.xml

@@ -0,0 +1,31 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+<FindBugsFilter>
+  <And>
+    <Class name="org.apache.hadoop.tools.rumen.LoggedJob"/>
+    <Method name="getMapperTriesToSucceed"/>
+    <Bug pattern="EI_EXPOSE_REP"/>
+    <Bug code="EI"/>
+  </And>
+  <And>
+    <Class name="org.apache.hadoop.tools.rumen.ZombieJob"/>
+    <Method name="getInputSplits"/>
+    <Bug pattern="EI_EXPOSE_REP"/>
+    <Bug code="EI"/>
+  </And>
+</FindBugsFilter>

+ 10 - 0
hadoop-tools/hadoop-rumen/pom.xml

@@ -90,6 +90,16 @@
 
   <build>
     <plugins>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>findbugs-maven-plugin</artifactId>
+        <configuration>
+          <findbugsXmlOutput>true</findbugsXmlOutput>
+          <xmlOutput>true</xmlOutput>
+          <excludeFilterFile>${basedir}/dev-support/findbugs-exclude.xml</excludeFilterFile>
+          <effort>Max</effort>
+        </configuration>
+      </plugin>
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-antrun-plugin</artifactId>
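
With the plugin configured as above, findbugs-maven-plugin reads dev-support/findbugs-exclude.xml and suppresses only the two EI_EXPOSE_REP reports named there; assuming the plugin's standard goals, something like mvn findbugs:check would then flag only unexcluded findings.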

+ 3 - 1
hadoop-tools/hadoop-rumen/src/main/java/org/apache/hadoop/tools/rumen/DeskewedJobTraceReader.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.tools.rumen;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.io.Serializable;
 import java.util.Comparator;
 import java.util.Iterator;
 import java.util.PriorityQueue;
@@ -59,7 +60,8 @@ public class DeskewedJobTraceReader implements Closeable {
   static final private Log LOG =
       LogFactory.getLog(DeskewedJobTraceReader.class);
 
-  static private class JobComparator implements Comparator<LoggedJob> {
+  static private class JobComparator implements Comparator<LoggedJob>, 
+  Serializable {
     @Override
     public int compare(LoggedJob j1, LoggedJob j2) {
       return (j1.getSubmitTime() < j2.getSubmitTime()) ? -1 : (j1
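
This hunk, like the LoggedNetworkTopology and TraceBuilder hunks below, answers FindBugs' SE_COMPARATOR_SHOULD_BE_SERIALIZABLE warning: a comparator stored in a serializable collection (here a PriorityQueue, per the imports above) must itself be Serializable, or serializing the collection throws NotSerializableException. A self-contained sketch of the pattern (names illustrative):

    import java.io.Serializable;
    import java.util.Comparator;
    import java.util.PriorityQueue;

    public class SerializableComparatorSketch {
      // The comparator implements Serializable so a collection holding it
      // stays serializable.
      static class SubmitTimeOrder implements Comparator<Long>, Serializable {
        private static final long serialVersionUID = 1L;
        @Override
        public int compare(Long a, Long b) {
          return a.compareTo(b);
        }
      }

      public static void main(String[] args) {
        PriorityQueue<Long> q = new PriorityQueue<Long>(11, new SubmitTimeOrder());
        q.add(3L);
        q.add(1L);
        System.out.println(q.peek()); // 1
      }
    }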

+ 3 - 1
hadoop-tools/hadoop-rumen/src/main/java/org/apache/hadoop/tools/rumen/JobConfPropertyNames.java

@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.tools.rumen;
 
+import java.util.Arrays;
+
 import org.apache.hadoop.mapreduce.MRJobConfig;
 
 public enum JobConfPropertyNames {
@@ -33,6 +35,6 @@ public enum JobConfPropertyNames {
   }
 
   public String[] getCandidates() {
-    return candidates;
+    return Arrays.copyOf(candidates, candidates.length);
   }
 }
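
Returning Arrays.copyOf rather than the field itself is the standard EI_EXPOSE_REP fix: callers get a private copy, so they cannot mutate the enum's candidate list behind its back. The pattern in isolation (class name illustrative):

    import java.util.Arrays;

    public class DefensiveCopySketch {
      private final String[] candidates = {"a", "b"};

      // Callers receive a copy; writes to the returned array
      // cannot corrupt this object's internal state.
      public String[] getCandidates() {
        return Arrays.copyOf(candidates, candidates.length);
      }
    }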

+ 3 - 1
hadoop-tools/hadoop-rumen/src/main/java/org/apache/hadoop/tools/rumen/LoggedNetworkTopology.java

@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.tools.rumen;
 
+import java.io.Serializable;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -68,7 +69,8 @@ public class LoggedNetworkTopology implements DeepCompare {
    * order.
    * 
    */
-  static class TopoSort implements Comparator<LoggedNetworkTopology> {
+  static class TopoSort implements Comparator<LoggedNetworkTopology>, 
+  Serializable {
     public int compare(LoggedNetworkTopology t1, LoggedNetworkTopology t2) {
       return t1.name.getValue().compareTo(t2.name.getValue());
     }

+ 2 - 1
hadoop-tools/hadoop-rumen/src/main/java/org/apache/hadoop/tools/rumen/TraceBuilder.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.tools.rumen;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Comparator;
@@ -98,7 +99,7 @@ public class TraceBuilder extends Configured implements Tool {
      * history file names should result in the order of jobs' submission times.
      */
     private static class HistoryLogsComparator
-        implements Comparator<FileStatus> {
+        implements Comparator<FileStatus>, Serializable {
       @Override
       public int compare(FileStatus file1, FileStatus file2) {
         return file1.getPath().getName().compareTo(

+ 1 - 1
hadoop-tools/hadoop-rumen/src/main/java/org/apache/hadoop/tools/rumen/anonymization/WordListAnonymizerUtility.java

@@ -27,7 +27,7 @@ import org.apache.commons.lang.StringUtils;
  * //TODO There is no caching for saving memory.
  */
 public class WordListAnonymizerUtility {
-  public static final String[] KNOWN_WORDS = 
+  static final String[] KNOWN_WORDS = 
     new String[] {"job", "tmp", "temp", "home", "homes", "usr", "user", "test"};
   
   /**

+ 2 - 10
hadoop-tools/hadoop-rumen/src/main/java/org/apache/hadoop/tools/rumen/datatypes/NodeName.java

@@ -93,16 +93,8 @@ public class NodeName implements AnonymizableDataType<String> {
   }
   
   public NodeName(String rName, String hName) {
-    rName = (rName == null) 
-            ? rName 
-            : rName.length() == 0 
-              ? null 
-              : rName;
-    hName = (hName == null) 
-            ? hName 
-            : hName.length() == 0 
-              ? null 
-              : hName;
+    rName = (rName == null || rName.length() == 0) ? null : rName;
+    hName = (hName == null || hName.length() == 0) ? null : hName;
     if (hName == null) {
       nodeName = rName;
       rackName = rName;

+ 30 - 0
hadoop-tools/hadoop-streaming/dev-support/findbugs-exclude.xml

@@ -0,0 +1,30 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+<FindBugsFilter>
+  <Match>
+    <Or>
+      <Class name="org.apache.hadoop.streaming.PipeMapper" />
+      <Class name="org.apache.hadoop.streaming.PipeReducer"/>
+    </Or>
+    <Or>
+      <Method name="getFieldSeparator"/>
+      <Method name="getInputSeparator"/>
+    </Or>
+    <Bug pattern="EI_EXPOSE_REP"/>
+  </Match>
+</FindBugsFilter>

+ 10 - 0
hadoop-tools/hadoop-streaming/pom.xml

@@ -96,6 +96,16 @@
 
   <build>
     <plugins>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>findbugs-maven-plugin</artifactId>
+        <configuration>
+          <findbugsXmlOutput>true</findbugsXmlOutput>
+          <xmlOutput>true</xmlOutput>
+          <excludeFilterFile>${basedir}/dev-support/findbugs-exclude.xml</excludeFilterFile>
+          <effort>Max</effort>
+        </configuration>
+      </plugin>
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-antrun-plugin</artifactId>

+ 8 - 6
hadoop-tools/hadoop-streaming/src/main/java/org/apache/hadoop/streaming/StreamJob.java

@@ -91,7 +91,7 @@ public class StreamJob implements Tool {
   @Deprecated
   public StreamJob(String[] argv, boolean mayExit) {
     this();
-    argv_ = argv;
+    argv_ = Arrays.copyOf(argv, argv.length);
     this.config_ = new Configuration();
   }
 
@@ -113,7 +113,7 @@ public class StreamJob implements Tool {
   @Override
   public int run(String[] args) throws Exception {
     try {
-      this.argv_ = args;
+      this.argv_ = Arrays.copyOf(args, args.length);
       init();
 
       preProcessArgs();
@@ -290,7 +290,7 @@ public class StreamJob implements Tool {
         LOG.warn("-file option is deprecated, please use generic option" +
         		" -files instead.");
 
-        String fileList = null;
+        StringBuffer fileList = new StringBuffer();
         for (String file : values) {
           packageFiles_.add(file);
           try {
@@ -298,13 +298,15 @@ public class StreamJob implements Tool {
             Path path = new Path(pathURI);
             FileSystem localFs = FileSystem.getLocal(config_);
             String finalPath = path.makeQualified(localFs).toString();
-            fileList = fileList == null ? finalPath : fileList + "," + finalPath;
+            if(fileList.length() > 0) {
+              fileList.append(',');
+            }
+            fileList.append(finalPath);
           } catch (Exception e) {
             throw new IllegalArgumentException(e);
           }
         }
-        config_.set("tmpfiles", config_.get("tmpfiles", "") +
-                                  (fileList == null ? "" : fileList));
+        config_.set("tmpfiles", config_.get("tmpfiles", "") + fileList);
         validate(packageFiles_);
       }
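
The rewritten -file handling accumulates into a buffer and inserts the separator only when something is already present, avoiding the null-seeded concatenation it replaces. The join shape on its own, using StringBuilder since nothing here is shared across threads (the committed code's StringBuffer would behave identically):

    public class JoinSketch {
      public static void main(String[] args) {
        // Comma-join without a leading separator; the input paths
        // are illustrative stand-ins for the qualified file list.
        StringBuilder fileList = new StringBuilder();
        for (String path : new String[] {"/a", "/b", "/c"}) {
          if (fileList.length() > 0) {
            fileList.append(',');
          }
          fileList.append(path);
        }
        System.out.println(fileList); // /a,/b,/c
      }
    }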