
MAPREDUCE-2780. Use a utility method to set service in the token. Contributed by Daryn Sharp.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.20-security@1156295 13f79535-47bb-0310-9956-ffa450edef68
Jitendra Nath Pandey 14 years ago
parent
commit
84bd8c883c
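
For illustration only (not part of the patch): the change replaces the hand-rolled "ip:port" service string that each caller built with the new SecurityUtil helpers introduced below. A minimal sketch of the before/after pattern, using a hypothetical caller class:

    import java.net.InetSocketAddress;

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.security.SecurityUtil;
    import org.apache.hadoop.security.token.Token;

    public class TokenServiceExample {
      // Before this patch: every caller rebuilt the service string by hand.
      static void setServiceByHand(Token<?> token, InetSocketAddress addr) {
        token.setService(new Text(addr.getAddress().getHostAddress()
            + ":" + addr.getPort()));
      }

      // After this patch: one utility method owns the service format.
      static void setServiceWithUtil(Token<?> token, InetSocketAddress addr) {
        SecurityUtil.setTokenService(token, addr);
      }
    }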

+ 3 - 0
CHANGES.txt

@@ -56,6 +56,9 @@ Release 0.20.205.0 - unreleased
     HADOOP-6889. Make RPC to have an option to timeout - backport to 
     0.20-security. (John George and Ravi Prakash via mattf)
 
+    MAPREDUCE-2780. Use a utility method to set service in token. 
+    (Daryn Sharp via jitendra)
+
 Release 0.20.204.0 - unreleased
 
   NEW FEATURES

+ 2 - 3
src/core/org/apache/hadoop/ipc/Client.java

@@ -240,9 +240,8 @@ public class Client {
             throw new IOException(e.toString());
           }
           InetSocketAddress addr = remoteId.getAddress();
-          token = tokenSelector.selectToken(new Text(addr.getAddress()
-              .getHostAddress() + ":" + addr.getPort()), 
-              ticket.getTokens());
+          token = tokenSelector.selectToken(
+              SecurityUtil.buildTokenService(addr), ticket.getTokens());
         }
         KerberosInfo krbInfo = protocol.getAnnotation(KerberosInfo.class);
         if (krbInfo != null) {

+ 46 - 16
src/core/org/apache/hadoop/security/SecurityUtil.java

@@ -18,6 +18,7 @@ package org.apache.hadoop.security;
 
 import java.io.IOException;
 import java.net.InetAddress;
+import java.net.InetSocketAddress;
 import java.net.URI;
 import java.net.URL;
 import java.net.UnknownHostException;
@@ -30,8 +31,10 @@ import javax.security.auth.kerberos.KerberosTicket;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.authorize.AccessControlList;
+import org.apache.hadoop.security.token.Token;
 
 import sun.security.jgss.krb5.Krb5Util;
 import sun.security.krb5.Credentials;
@@ -231,28 +234,55 @@ public class SecurityUtil {
     UserGroupInformation.loginUserFromKeytab(principalName, keytabFilename);
   }
   
+  /**
+   * Set the given token's service to the format expected by the RPC client 
+   * @param token a delegation token
+   * @param addr the socket for the rpc connection
+   */
+  public static void setTokenService(Token<?> token, InetSocketAddress addr) {
+    token.setService(buildTokenService(addr));
+  }
+  
+  /**
+   * Construct the service key for a token
+   * @param addr the socket for the rpc connection
+   * @return Text formatted for the service field in a token 
+   */
+  public static Text buildTokenService(InetSocketAddress addr) {
+    return new Text(buildDTAuthority(addr));
+  }
+  
   /**
    * create service name for Delegation token ip:port
    * @param uri
    * @return "ip:port"
    */
   public static String buildDTServiceName(URI uri, int defPort) {
-    int port = uri.getPort();
-    if(port == -1) 
-      port = defPort;
-    
-    // build the service name string "/ip:port"
-    // for whatever reason using NetUtils.createSocketAddr(target).toString()
-    // returns "localhost/ip:port"
-    StringBuffer sb = new StringBuffer();
-    String host = uri.getHost();
-    if (host != null) {
-      host = NetUtils.normalizeHostName(host);
-    } else {
-      host = "";
-    }
-    sb.append(host).append(":").append(port);
-    return sb.toString();
+    InetSocketAddress addr = NetUtils.createSocketAddr(uri.getAuthority(),
+                                                       defPort);
+    return buildDTAuthority(addr);
+   }
+  
+  /**
+   * create an authority name for looking up a Delegation token based
+   * on a socket
+   * @param addr InetSocketAddress of remote connection with a token
+   * @return "ip:port"
+   */
+  static String buildDTAuthority(InetSocketAddress addr) {
+    return buildDTAuthority(addr.getAddress().getHostAddress(), addr.getPort());
+  }
+  
+  /**
+   * create an authority name for looking up a Delegation token based
+   * on a host/ip pair
+   * @param host the remote host
+   * @param port the remote port
+   * @return "ip:port"
+   */
+  static String buildDTAuthority(String host, int port) {
+    host = (host != null) ? NetUtils.normalizeHostName(host) : "";
+    return host + ":" + port;
   }
   
   /**
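
A minimal usage sketch of the reworked helpers above (not part of the patch), assuming a resolvable loopback authority; both paths should produce the same "ip:port" value:

    import java.net.InetSocketAddress;
    import java.net.URI;

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.net.NetUtils;
    import org.apache.hadoop.security.SecurityUtil;

    public class BuildServiceSketch {
      public static void main(String[] args) {
        URI uri = URI.create("hdfs://127.0.0.1:8020");

        // buildDTServiceName now resolves the URI authority to a socket
        // address and delegates to the shared ip:port formatting.
        String service = SecurityUtil.buildDTServiceName(uri, 8020);

        // buildTokenService wraps the same formatting in a Text suitable
        // for a token's service field.
        InetSocketAddress addr = NetUtils.createSocketAddr(uri.getAuthority(), 8020);
        Text tokenService = SecurityUtil.buildTokenService(addr);

        System.out.println(service);       // expected: 127.0.0.1:8020
        System.out.println(tokenService);  // expected: 127.0.0.1:8020
      }
    }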

+ 2 - 3
src/hdfs/org/apache/hadoop/hdfs/server/namenode/GetDelegationTokenServlet.java

@@ -31,6 +31,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 
@@ -75,9 +76,7 @@ public class GetDelegationTokenServlet extends DfsServlet {
           
           Token<DelegationTokenIdentifier> token = 
             nn.getDelegationToken(new Text(renewerFinal));
-          String s = NameNode.getAddress(conf).getAddress().getHostAddress()
-                     + ":" + NameNode.getAddress(conf).getPort();
-          token.setService(new Text(s));
+          SecurityUtil.setTokenService(token, NameNode.getAddress(conf));
           Credentials ts = new Credentials();
           ts.addToken(new Text(ugi.getShortUserName()), token);
           ts.write(dosFinal);

+ 6 - 12
src/hdfs/org/apache/hadoop/hdfs/server/namenode/JspHelper.java

@@ -37,27 +37,25 @@ import javax.servlet.jsp.JspWriter;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.FSConstants.UpgradeAction;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants;
 import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
-import org.apache.hadoop.hdfs.server.namenode.DatanodeDescriptor;
-import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.http.HtmlQuoting;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.net.NetUtils;
 
 public class JspHelper {
   public static final String CURRENT_CONF = "current.conf";
@@ -444,12 +442,8 @@ public class JspHelper {
         Token<DelegationTokenIdentifier> token = 
           new Token<DelegationTokenIdentifier>();
         token.decodeFromUrlString(tokenString);
-        InetSocketAddress serviceAddr = NameNode.getAddress(conf);
-        LOG.info("Setting service in token: "
-            + new Text(serviceAddr.getAddress().getHostAddress() + ":"
-                + serviceAddr.getPort()));
-        token.setService(new Text(serviceAddr.getAddress().getHostAddress()
-            + ":" + serviceAddr.getPort()));
+        SecurityUtil.setTokenService(token, NameNode.getAddress(conf));
+        LOG.info("Setting service in token: " + token.getService());
         ByteArrayInputStream buf = 
           new ByteArrayInputStream(token.getIdentifier());
         DataInputStream in = new DataInputStream(buf);

+ 5 - 11
src/mapred/org/apache/hadoop/mapred/JobClient.java

@@ -34,14 +34,13 @@ import java.net.URISyntaxException;
 import java.net.URL;
 import java.net.URLConnection;
 import java.net.UnknownHostException;
+import java.security.PrivilegedExceptionAction;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 
-import java.security.PrivilegedExceptionAction;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -54,7 +53,6 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.Text;
@@ -68,14 +66,16 @@ import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.JobSubmissionFiles;
 import org.apache.hadoop.mapreduce.security.TokenCache;
+import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.mapreduce.split.JobSplitWriter;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.AccessControlList;
-import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Tool;
@@ -1945,13 +1945,7 @@ public class JobClient extends Configured implements MRConstants, Tool  {
     getDelegationToken(Text renewer) throws IOException, InterruptedException {
     Token<DelegationTokenIdentifier> result =
       jobSubmitClient.getDelegationToken(renewer);
-    InetSocketAddress addr = JobTracker.getAddress(getConf());
-    StringBuilder service = new StringBuilder();
-    service.append(NetUtils.normalizeHostName(addr.getAddress().
-                                              getHostAddress()));
-    service.append(':');
-    service.append(addr.getPort());
-    result.setService(new Text(service.toString()));
+    SecurityUtil.setTokenService(result, JobTracker.getAddress(getConf()));
     return result;
   }
 

+ 4 - 6
src/mapred/org/apache/hadoop/mapred/JobLocalizer.java

@@ -23,17 +23,15 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.net.InetSocketAddress;
 import java.net.URI;
+import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import java.security.PrivilegedExceptionAction;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.filecache.DistributedCache;
-import org.apache.hadoop.filecache.TaskDistributedCacheManager;
 import org.apache.hadoop.filecache.TrackerDistributedCacheManager;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -42,13 +40,14 @@ import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.mapred.QueueManager.QueueACL;
 import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.security.TokenCache;
 import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
+import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
@@ -515,8 +514,7 @@ public class JobLocalizer {
     
     UserGroupInformation ugiJob = UserGroupInformation.createRemoteUser(jobid);
     Token<JobTokenIdentifier> jt = TokenCache.getJobToken(creds);
-    jt.setService(new Text(ttAddr.getAddress().getHostAddress() + ":"
-        + ttAddr.getPort()));
+    SecurityUtil.setTokenService(jt, ttAddr);
     ugiJob.addToken(jt);
 
     final TaskUmbilicalProtocol taskTracker = 

+ 11 - 14
src/test/org/apache/hadoop/hdfs/security/TestClientProtocolWithDelegationToken.java

@@ -27,27 +27,26 @@ import static org.mockito.Mockito.when;
 import java.net.InetSocketAddress;
 import java.security.PrivilegedExceptionAction;
 
-import org.apache.commons.logging.*;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.impl.Log4JLogger;
-
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Text;
-
-import org.apache.hadoop.ipc.Client;
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.Client;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.SaslInputStream;
 import org.apache.hadoop.security.SaslRpcClient;
 import org.apache.hadoop.security.SaslRpcServer;
+import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
-
+import org.apache.hadoop.security.token.Token;
 import org.apache.log4j.Level;
 import org.junit.Test;
 
@@ -97,10 +96,8 @@ public class TestClientProtocolWithDelegationToken {
     DelegationTokenIdentifier dtId = new DelegationTokenIdentifier(owner, owner, null);
     Token<DelegationTokenIdentifier> token = new Token<DelegationTokenIdentifier>(
         dtId, sm);
-    Text host = new Text(addr.getAddress().getHostAddress() + ":"
-        + addr.getPort());
-    token.setService(host);
-    LOG.info("Service IP address for token is " + host);
+    SecurityUtil.setTokenService(token, addr);
+    LOG.info("Service IP address for token is " + token.getService());
     current.addToken(token);
     current.doAs(new PrivilegedExceptionAction<Object>() {
       @Override

+ 2 - 3
src/test/org/apache/hadoop/ipc/MiniRPCBenchmark.java

@@ -28,12 +28,12 @@ import java.util.Enumeration;
 
 import junit.framework.Assert;
 
-import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.KerberosInfo;
+import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.ProxyUsers;
 import org.apache.hadoop.security.token.Token;
@@ -203,8 +203,7 @@ public class MiniRPCBenchmark {
             token = p.getDelegationToken(new Text(RENEWER));
             currentUgi = UserGroupInformation.createUserForTesting(MINI_USER, 
                 GROUP_NAMES);
-            token.setService(new Text(addr.getAddress().getHostAddress() 
-                + ":" + addr.getPort()));
+            SecurityUtil.setTokenService(token, addr);
             currentUgi.addToken(token);
             return p;
           }

+ 6 - 12
src/test/org/apache/hadoop/ipc/TestSaslRPC.java

@@ -231,10 +231,8 @@ public class TestSaslRPC {
         .getUserName()));
     Token<TestTokenIdentifier> token = new Token<TestTokenIdentifier>(tokenId,
         sm);
-    Text host = new Text(addr.getAddress().getHostAddress() + ":"
-        + addr.getPort());
-    token.setService(host);
-    LOG.info("Service IP address for token is " + host);
+    SecurityUtil.setTokenService(token, addr);
+    LOG.info("Service IP address for token is " + token.getService());
     current.addToken(token);
 
     TestSaslProtocol proxy = null;
@@ -284,10 +282,8 @@ public class TestSaslRPC {
         .getUserName()));
     Token<TestTokenIdentifier> token = new Token<TestTokenIdentifier>(tokenId,
         sm);
-    Text host = new Text(addr.getAddress().getHostAddress() + ":"
-        + addr.getPort());
-    token.setService(host);
-    LOG.info("Service IP address for token is " + host);
+    SecurityUtil.setTokenService(token, addr);
+    LOG.info("Service IP address for token is " + token.getService());
     current.addToken(token);
 
     Configuration newConf = new Configuration(conf);
@@ -371,10 +367,8 @@ public class TestSaslRPC {
         .getUserName()));
     Token<TestTokenIdentifier> token = new Token<TestTokenIdentifier>(tokenId,
         sm);
-    Text host = new Text(addr.getAddress().getHostAddress() + ":"
-        + addr.getPort());
-    token.setService(host);
-    LOG.info("Service IP address for token is " + host);
+    SecurityUtil.setTokenService(token, addr);
+    LOG.info("Service IP address for token is " + token.getService());
     current.addToken(token);
 
     current.doAs(new PrivilegedExceptionAction<Object>() {

+ 3 - 4
src/test/org/apache/hadoop/mapreduce/security/TestUmbilicalProtocolWithJobToken.java

@@ -44,6 +44,7 @@ import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.SaslInputStream;
 import org.apache.hadoop.security.SaslRpcClient;
 import org.apache.hadoop.security.SaslRpcServer;
+import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 
 import org.apache.log4j.Level;
@@ -89,10 +90,8 @@ public class TestUmbilicalProtocolWithJobToken {
     JobTokenIdentifier tokenId = new JobTokenIdentifier(new Text(jobId));
     Token<JobTokenIdentifier> token = new Token<JobTokenIdentifier>(tokenId, sm);
     sm.addTokenForJob(jobId, token);
-    Text host = new Text(addr.getAddress().getHostAddress() + ":"
-        + addr.getPort());
-    token.setService(host);
-    LOG.info("Service IP address for token is " + host);
+    SecurityUtil.setTokenService(token, addr);
+    LOG.info("Service IP address for token is " + token.getService());
     current.addToken(token);
     current.doAs(new PrivilegedExceptionAction<Object>() {
       @Override

+ 2 - 6
src/test/org/apache/hadoop/security/TestDoAsEffectiveUser.java

@@ -417,9 +417,7 @@ public class TestDoAsEffectiveUser {
         .getUserName()), new Text("SomeSuperUser"));
     Token<TestTokenIdentifier> token = new Token<TestTokenIdentifier>(tokenId,
         sm);
-    Text host = new Text(addr.getAddress().getHostAddress() + ":"
-        + addr.getPort());
-    token.setService(host);
+    SecurityUtil.setTokenService(token, addr);
     UserGroupInformation proxyUserUgi = UserGroupInformation
         .createProxyUserForTesting(PROXY_USER_NAME, current, GROUP_NAMES);
     proxyUserUgi.addToken(token);
@@ -474,9 +472,7 @@ public class TestDoAsEffectiveUser {
         .getUserName()), new Text("SomeSuperUser"));
     Token<TestTokenIdentifier> token = new Token<TestTokenIdentifier>(tokenId,
         sm);
-    Text host = new Text(addr.getAddress().getHostAddress() + ":"
-        + addr.getPort());
-    token.setService(host);
+    SecurityUtil.setTokenService(token, addr);
     current.addToken(token);
     String retVal = current.doAs(new PrivilegedExceptionAction<String>() {
       @Override