
commit eda8352b94af85cd7795660d930fe7fad98dd43b
Author: Jitendra Nath Pandey <jitendra@sufferhome-lm.(none)>
Date: Thu Mar 18 03:58:08 2010 -0700

HADOOP-6632 from https://issues.apache.org/jira/secure/attachment/12439144/HADOOP-6632-Y20S-22.patch

+++ b/YAHOO-CHANGES.txt
+ HADOOP-6632. Support for using different Kerberos keys for different
+ instances of Hadoop services. (jitendra)
+


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.20-security-patches@1077334 13f79535-47bb-0310-9956-ffa450edef68
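
In short, this patch lets one configured principal pattern serve many hosts: a value such as hdfs/_HOST@EXAMPLE.COM is expanded per machine by the new SecurityUtil.getServerPrincipal() introduced below. A minimal sketch of the idea (the conf key here is a stand-in, not one defined by the patch):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.security.SecurityUtil;

    public class PrincipalPatternSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Stand-in key; each service uses its own (e.g. dfs.namenode.kerberos.principal).
        conf.set("my.service.kerberos.principal", "hdfs/_HOST@EXAMPLE.COM");
        // getServerPrincipal() substitutes the given FQDN for _HOST; passing
        // null, "" or "0.0.0.0" falls back to the local host's FQDN.
        String principal = SecurityUtil.getServerPrincipal(
            conf.get("my.service.kerberos.principal"), "nn1.example.com");
        System.out.println(principal); // hdfs/nn1.example.com@EXAMPLE.COM
      }
    }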

Committed by Owen O'Malley 14 years ago
Parent revision: 0acef86ae1
21 changed files with 327 additions and 141 deletions
  1. src/core/org/apache/hadoop/ipc/Client.java (+10 -7)
  2. src/core/org/apache/hadoop/ipc/Server.java (+11 -3)
  3. src/core/org/apache/hadoop/security/KerberosName.java (+0 -1)
  4. src/core/org/apache/hadoop/security/SecurityUtil.java (+100 -6)
  5. src/core/org/apache/hadoop/security/UserGroupInformation.java (+2 -0)
  6. src/core/org/apache/hadoop/security/authorize/ServiceAuthorizationManager.java (+19 -2)
  7. src/core/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenIdentifier.java (+7 -1)
  8. src/core/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java (+4 -1)
  9. src/hdfs/org/apache/hadoop/hdfs/DFSUtil.java (+0 -19)
  10. src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java (+2 -1)
  11. src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (+1 -1)
  12. src/hdfs/org/apache/hadoop/hdfs/server/namenode/GetImageServlet.java (+18 -6)
  13. src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java (+37 -21)
  14. src/hdfs/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java (+61 -26)
  15. src/hdfs/org/apache/hadoop/hdfs/tools/DFSck.java (+2 -9)
  16. src/mapred/org/apache/hadoop/mapred/JobTracker.java (+7 -11)
  17. src/mapred/org/apache/hadoop/mapred/TaskTracker.java (+5 -8)
  18. src/mapred/org/apache/hadoop/mapreduce/security/TokenCache.java (+4 -2)
  19. src/test/org/apache/hadoop/hdfs/security/TestDelegationToken.java (+2 -15)
  20. src/test/org/apache/hadoop/ipc/TestSaslRPC.java (+6 -1)
  21. src/test/org/apache/hadoop/security/TestSecurityUtil.java (+29 -0)

+ 10 - 7
src/core/org/apache/hadoop/ipc/Client.java

@@ -54,6 +54,7 @@ import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.KerberosInfo;
 import org.apache.hadoop.security.SaslRpcClient;
 import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
+import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
@@ -233,13 +234,15 @@ public class Client {
         KerberosInfo krbInfo = protocol.getAnnotation(KerberosInfo.class);
         if (krbInfo != null) {
           String serverKey = krbInfo.serverPrincipal();
-          if (serverKey != null) {
-            if(LOG.isDebugEnabled()) {
-            LOG.debug("server principal key for protocol="
-                + protocol.getCanonicalName() + " is " + serverKey + 
-                " and val =" + conf.get(serverKey));
-            }
-            serverPrincipal = conf.get(serverKey);
+          if (serverKey == null) {
+            throw new IOException(
+                "Can't obtain server Kerberos config key from KerberosInfo");
+          }
+          serverPrincipal = SecurityUtil.getServerPrincipal(
+              conf.get(serverKey), server.getAddress().getCanonicalHostName());
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("RPC Server Kerberos principal name for protocol="
+                + protocol.getCanonicalName() + " is " + serverPrincipal);
           }
         }
       }

+ 11 - 3
src/core/org/apache/hadoop/ipc/Server.java

@@ -752,6 +752,7 @@ public abstract class Server {
     // Cache the remote host & port info so that even if the socket is 
     // disconnected, we can say where it used to connect to.
     private String hostAddress;
+    private String hostName;
     private int remotePort;
     
     ConnectionHeader header = new ConnectionHeader();
@@ -792,6 +793,7 @@ public abstract class Server {
         this.hostAddress = "*Unknown*";
       } else {
         this.hostAddress = addr.getHostAddress();
+        this.hostName = addr.getCanonicalHostName();
       }
       this.remotePort = socket.getPort();
       this.responseQueue = new LinkedList<Call>();
@@ -814,6 +816,10 @@ public abstract class Server {
       return hostAddress;
     }
 
+    public String getHostName() {
+      return hostName;
+    }
+    
     public void setLastContact(long lastContact) {
       this.lastContact = lastContact;
     }
@@ -1208,7 +1214,7 @@ public abstract class Server {
             && (authMethod != AuthMethod.DIGEST)) {
           ProxyUsers.authorize(user, this.getHostAddress(), conf);
         }
-        authorize(user, header);
+        authorize(user, header, getHostName());
         if (LOG.isDebugEnabled()) {
           LOG.debug("Successfully authorized " + header);
         }
@@ -1521,10 +1527,12 @@ public abstract class Server {
    * 
    * @param user client user
    * @param connection incoming connection
+   * @param hostname fully-qualified domain name of incoming connection
    * @throws AuthorizationException when the client isn't authorized to talk the protocol
    */
   public void authorize(UserGroupInformation user, 
-                        ConnectionHeader connection
+                        ConnectionHeader connection,
+                        String hostname
                         ) throws AuthorizationException {
     if (authorize) {
       Class<?> protocol = null;
@@ -1534,7 +1542,7 @@ public abstract class Server {
         throw new AuthorizationException("Unknown protocol: " + 
                                          connection.getProtocol());
       }
-      ServiceAuthorizationManager.authorize(user, protocol, getConf());
+      ServiceAuthorizationManager.authorize(user, protocol, getConf(), hostname);
     }
   }
   

+ 0 - 1
src/core/org/apache/hadoop/security/KerberosName.java

@@ -338,7 +338,6 @@ public class KerberosName {
   public static void setConfiguration(Configuration conf) throws IOException {
     String ruleString = conf.get("hadoop.security.auth_to_local", "DEFAULT");
     rules = parseRules(ruleString);
-    System.out.println("Default realm: " + defaultRealm);
   }
 
   @SuppressWarnings("serial")

+ 100 - 6
src/core/org/apache/hadoop/security/SecurityUtil.java

@@ -17,7 +17,9 @@
 package org.apache.hadoop.security;
 
 import java.io.IOException;
+import java.net.InetAddress;
 import java.net.URL;
+import java.net.UnknownHostException;
 import java.security.AccessController;
 import java.util.Set;
 
@@ -26,6 +28,7 @@ import javax.security.auth.kerberos.KerberosTicket;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
 
 import sun.security.jgss.krb5.Krb5Util;
@@ -33,7 +36,8 @@ import sun.security.krb5.Credentials;
 import sun.security.krb5.PrincipalName;
 
 public class SecurityUtil {
-  private static final Log LOG = LogFactory.getLog(SecurityUtil.class);
+  public static final Log LOG = LogFactory.getLog(SecurityUtil.class);
+  public static final String HOSTNAME_PATTERN = "_HOST";
 
   /**
    * Find the original TGT within the current subject's credentials. Cross-realm
@@ -44,9 +48,13 @@ public class SecurityUtil {
    *           if TGT can't be found
    */
   private static KerberosTicket getTgtFromSubject() throws IOException {
-    Set<KerberosTicket> tickets = Subject.getSubject(
-        AccessController.getContext()).getPrivateCredentials(
-        KerberosTicket.class);
+    Subject current = Subject.getSubject(AccessController.getContext());
+    if (current == null) {
+      throw new IOException(
+          "Can't get TGT from current Subject, because it is null");
+    }
+    Set<KerberosTicket> tickets = current
+        .getPrivateCredentials(KerberosTicket.class);
     for (KerberosTicket t : tickets) {
       if (isOriginalTGT(t.getServer().getName()))
         return t;
@@ -84,7 +92,8 @@ public class SecurityUtil {
       return;
     
     String serviceName = "host/" + remoteHost.getHost();
-    LOG.debug("Fetching service ticket for host at: " + serviceName);
+    if (LOG.isDebugEnabled())
+      LOG.debug("Fetching service ticket for host at: " + serviceName);
     Credentials serviceCred = null;
     try {
       PrincipalName principal = new PrincipalName(serviceName,
@@ -92,7 +101,7 @@ public class SecurityUtil {
       serviceCred = Credentials.acquireServiceCreds(principal
           .toString(), Krb5Util.ticketToCreds(getTgtFromSubject()));
     } catch (Exception e) {
-      throw new IOException("Invalid service principal name: "
+      throw new IOException("Can't get service ticket for: "
           + serviceName, e);
     }
     if (serviceCred == null) {
@@ -101,4 +110,89 @@ public class SecurityUtil {
     Subject.getSubject(AccessController.getContext()).getPrivateCredentials()
         .add(Krb5Util.credsToTicket(serviceCred));
   }
+  
+  /**
+   * Convert Kerberos principal name conf values to valid Kerberos principal
+   * names. It replaces the _HOST pattern in the conf value with hostname,
+   * which should be a fully-qualified domain name. If hostname is null,
+   * empty, or "0.0.0.0", the dynamically looked-up FQDN of the current host is used.
+   * 
+   * @param principalConfig
+   *          the Kerberos principal name conf value to convert
+   * @param hostname
+   *          the fully-qualified domain name used for substitution
+   * @return converted Kerberos principal name
+   * @throws IOException
+   */
+  public static String getServerPrincipal(String principalConfig,
+      String hostname) throws IOException {
+    if (principalConfig == null)
+      return null;
+    String[] components = principalConfig.split("[/@]");
+    if (components.length != 3) {
+      throw new IOException(
+          "Kerberos service principal name isn't configured properly "
+              + "(should have 3 parts): " + principalConfig);
+    }
+
+    if (components[1].equals(HOSTNAME_PATTERN)) {
+      String fqdn = hostname;
+      if (fqdn == null || fqdn.equals("") || fqdn.equals("0.0.0.0")) {
+        fqdn = getLocalHostName();
+      }
+      return components[0] + "/" + fqdn + "@" + components[2];
+    } else {
+      return principalConfig;
+    }
+  }
+  
+  static String getLocalHostName() throws UnknownHostException {
+    return InetAddress.getLocalHost().getCanonicalHostName();
+  }
+
+  /**
+   * If a keytab has been provided, login as that user. Substitute _HOST in
+   * the user's Kerberos principal name with a dynamically looked-up
+   * fully-qualified domain name of the current host.
+   * 
+   * @param conf
+   *          conf to use
+   * @param keytabFileKey
+   *          the key to look for keytab file in conf
+   * @param userNameKey
+   *          the key to look for user's Kerberos principal name in conf
+   * @throws IOException
+   */
+  public static void login(final Configuration conf,
+      final String keytabFileKey, final String userNameKey) throws IOException {
+    login(conf, keytabFileKey, userNameKey, getLocalHostName());
+  }
+
+  /**
+   * If a keytab has been provided, login as that user. Substitute _HOST in
+   * the user's Kerberos principal name with hostname.
+   * 
+   * @param conf
+   *          conf to use
+   * @param keytabFileKey
+   *          the key to look for keytab file in conf
+   * @param userNameKey
+   *          the key to look for user's Kerberos principal name in conf
+   * @param hostname
+   *          hostname to use for substitution
+   * @throws IOException
+   */
+  public static void login(final Configuration conf,
+      final String keytabFileKey, final String userNameKey, String hostname)
+      throws IOException {
+    String keytabFilename = conf.get(keytabFileKey);
+    if (keytabFilename == null)
+      return;
+
+    String principalConfig = conf.get(userNameKey, System
+        .getProperty("user.name"));
+    String principalName = SecurityUtil.getServerPrincipal(principalConfig,
+        hostname);
+    UserGroupInformation.loginUserFromKeytab(principalName, keytabFilename);
+  }
 }
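
A hedged usage sketch of the new login() helper, mirroring how the daemons below call it (both conf keys are stand-ins, not keys defined by the patch):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.security.SecurityUtil;
    import org.apache.hadoop.security.UserGroupInformation;

    public class KeytabLoginSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        UserGroupInformation.setConfiguration(conf);
        // login() is a no-op when no keytab is configured; otherwise it
        // resolves _HOST in the principal against the supplied hostname and
        // then calls UserGroupInformation.loginUserFromKeytab().
        SecurityUtil.login(conf, "my.service.keytab.file",
            "my.service.kerberos.principal", "nn1.example.com");
      }
    }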

+ 2 - 0
src/core/org/apache/hadoop/security/UserGroupInformation.java

@@ -417,6 +417,8 @@ public class UserGroupInformation {
       throw new IOException("Login failure for " + user + " from keytab " + 
                             path, le);
     }
+    LOG.info("Login successful for user " + keytabPrincipal
+        + " using keytab file " + keytabFile);
   }
   /**
    * Log a user in from a keytab file. Loads a user identity from a keytab

+ 19 - 2
src/core/org/apache/hadoop/security/authorize/ServiceAuthorizationManager.java

@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.security.authorize;
 
+import java.io.IOException;
 import java.util.IdentityHashMap;
 import java.util.Map;
 
@@ -25,6 +26,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.security.KerberosInfo;
+import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 
 /**
@@ -59,11 +61,14 @@ public class ServiceAuthorizationManager {
    * 
    * @param user user accessing the service 
    * @param protocol service being accessed
+   * @param conf configuration to use
+   * @param hostname fully qualified domain name of the client
    * @throws AuthorizationException on authorization failure
    */
   public static void authorize(UserGroupInformation user, 
                                Class<?> protocol,
-                               Configuration conf
+                               Configuration conf,
+                               String hostname
                                ) throws AuthorizationException {
     AccessControlList acl = protocolToAcl.get(protocol);
     if (acl == null) {
@@ -77,7 +82,19 @@ public class ServiceAuthorizationManager {
     if (krbInfo != null) {
       String clientKey = krbInfo.clientPrincipal();
       if (clientKey != null && !clientKey.equals("")) {
-        clientPrincipal = conf.get(clientKey);
+        if (hostname == null) {
+          throw new AuthorizationException(
+              "Can't authorize client when client hostname is null");
+        }
+        try {
+          clientPrincipal = SecurityUtil.getServerPrincipal(
+              conf.get(clientKey), hostname);
+        } catch (IOException e) {
+          throw (AuthorizationException) new AuthorizationException(
+              "Can't figure out Kerberos principal name for connection from "
+                  + hostname + " for user=" + user + " protocol=" + protocol)
+              .initCause(e);
+        }
       }
     }
     if((clientPrincipal != null && !clientPrincipal.equals(user.getUserName())) || 
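
The practical effect of threading hostname into authorize(): the ACL's expected client principal may itself use the _HOST pattern, and it is resolved against the connecting host's FQDN before being compared with the authenticated user. A rough illustration (key and hostnames hypothetical):

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.security.SecurityUtil;

    public class ExpectedClientPrincipalSketch {
      // For a conf value like "dn/_HOST@EXAMPLE.COM" and a connection from
      // dn3.example.com, this returns "dn/dn3.example.com@EXAMPLE.COM",
      // which authorize() then compares against user.getUserName().
      static String expectedClient(Configuration conf, String clientHost)
          throws IOException {
        return SecurityUtil.getServerPrincipal(
            conf.get("my.client.kerberos.principal"), clientHost); // stand-in key
      }
    }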

+ 7 - 1
src/core/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenIdentifier.java

@@ -28,6 +28,7 @@ import java.io.IOException;
 
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.security.KerberosName;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.TokenIdentifier;
 
@@ -57,7 +58,12 @@ extends TokenIdentifier {
     if (renewer == null) {
       this.renewer = new Text();
     } else {
-      this.renewer = renewer;
+      KerberosName renewerKrbName = new KerberosName(renewer.toString());
+      try {
+        this.renewer = new Text(renewerKrbName.getShortName());
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
     }
     if (realUser == null) {
       this.realUser = new Text();
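
For context, KerberosName.getShortName() maps a principal to its local short name: a bare name passes through unchanged, while names carrying a host or realm are mapped by the hadoop.security.auth_to_local rules (an unmatched name raises an exception). A small sketch, assuming a usable krb5 configuration:

    import org.apache.hadoop.security.KerberosName;

    public class RenewerShortNameSketch {
      public static void main(String[] args) throws Exception {
        // Mirrors the constructor change above: the renewer is stored as a
        // short name, and a bare name maps to itself...
        System.out.println(new KerberosName("JobTracker").getShortName());
        // ...which is why TestDelegationToken below now passes "JobTracker"
        // rather than "JobTracker/foo.com@FOO.COM".
      }
    }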

+ 4 - 1
src/core/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java

@@ -36,6 +36,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.KerberosName;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.SecretManager;
 import org.apache.hadoop.util.Daemon;
@@ -280,8 +281,10 @@ extends AbstractDelegationTokenIdentifier>
     }
     String owner = id.getUser().getUserName();
     Text renewer = id.getRenewer();
+    KerberosName cancelerKrbName = new KerberosName(canceller);
+    String cancelerShortName = cancelerKrbName.getShortName();
     if (!canceller.equals(owner)
-        && (renewer == null || "".equals(renewer.toString()) || !canceller
+        && (renewer == null || "".equals(renewer.toString()) || !cancelerShortName
             .equals(renewer.toString()))) {
       throw new AccessControlException(canceller
           + " is not authorized to cancel the token");

+ 0 - 19
src/hdfs/org/apache/hadoop/hdfs/DFSUtil.java

@@ -18,13 +18,10 @@
 
 package org.apache.hadoop.hdfs;
 
-import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 
-import org.apache.hadoop.conf.Configuration;
 import java.util.StringTokenizer;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.security.UserGroupInformation;
 
 public class DFSUtil {
   /**
@@ -51,22 +48,6 @@ public class DFSUtil {
     }
     return true;
   }
-
-  /**
-   * If a keytab has been provided, login as that user.
-   */
-  public static void login(final Configuration conf,
-                           final String keytabFileKey,
-                           final String userNameKey)
-                           throws IOException {
-    String keytabFilename = conf.get(keytabFileKey);
-    
-    if(keytabFilename == null)
-      return;
-    
-    String user = conf.get(userNameKey, System.getProperty("user.name"));
-    UserGroupInformation.loginUserFromKeytab(user, keytabFilename);
-  }
   
   /**
    * Converts a byte array to a string using UTF8 encoding.

+ 2 - 1
src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java

@@ -98,6 +98,7 @@ import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.net.DNS;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.DiskChecker;
@@ -238,7 +239,7 @@ public class DataNode extends Configured
            final AbstractList<File> dataDirs) throws IOException {
     super(conf);
     UserGroupInformation.setConfiguration(conf);
-    DFSUtil.login(conf, DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY, 
+    SecurityUtil.login(conf, DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY, 
         DFSConfigKeys.DFS_DATANODE_USER_NAME_KEY);
 
     datanodeObject = this;

+ 1 - 1
src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -4979,7 +4979,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean {
       throw new IOException(
           "Delegation Token can be renewed only with kerberos or web authentication");
     }
-    String renewer = UserGroupInformation.getCurrentUser().getUserName();
+    String renewer = UserGroupInformation.getCurrentUser().getShortUserName();
     long expiryTime = dtSecretManager.renewToken(token, renewer);
     DelegationTokenIdentifier id = new DelegationTokenIdentifier();
     ByteArrayInputStream buf = new ByteArrayInputStream(token.getIdentifier());

+ 18 - 6
src/hdfs/org/apache/hadoop/hdfs/server/namenode/GetImageServlet.java

@@ -31,6 +31,7 @@ import javax.servlet.ServletException;
 import javax.servlet.http.HttpServlet;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
+import org.apache.hadoop.security.SecurityUtil;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -102,7 +103,9 @@ public class GetImageServlet extends HttpServlet {
           // use these key values.
           return UserGroupInformation
           .loginUserFromKeytabAndReturnUGI(
-              conf.get(DFS_NAMENODE_KRB_HTTPS_USER_NAME_KEY), 
+                  SecurityUtil.getServerPrincipal(conf
+                      .get(DFS_NAMENODE_KRB_HTTPS_USER_NAME_KEY), NameNode
+                      .getAddress(conf).getHostName()),
               conf.get(DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY));
         }
       });
@@ -116,16 +119,25 @@ public class GetImageServlet extends HttpServlet {
     }
   }
   
-  private boolean isValidRequestor(String remoteUser, Configuration conf) {
+  private boolean isValidRequestor(String remoteUser, Configuration conf)
+      throws IOException {
     if(remoteUser == null) { // This really shouldn't happen...
       LOG.warn("Received null remoteUser while authorizing access to getImage servlet");
       return false;
     }
     
-    String [] validRequestors = {conf.get(DFS_NAMENODE_KRB_HTTPS_USER_NAME_KEY),
-                                 conf.get(DFS_NAMENODE_USER_NAME_KEY),
-                                 conf.get(DFS_SECONDARY_NAMENODE_KRB_HTTPS_USER_NAME_KEY),
-                                 conf.get(DFS_SECONDARY_NAMENODE_USER_NAME_KEY) };
+    String[] validRequestors = {
+        SecurityUtil.getServerPrincipal(conf
+            .get(DFS_NAMENODE_KRB_HTTPS_USER_NAME_KEY), NameNode.getAddress(
+            conf).getHostName()),
+        SecurityUtil.getServerPrincipal(conf.get(DFS_NAMENODE_USER_NAME_KEY),
+            NameNode.getAddress(conf).getHostName()),
+        SecurityUtil.getServerPrincipal(conf
+            .get(DFS_SECONDARY_NAMENODE_KRB_HTTPS_USER_NAME_KEY),
+            SecondaryNameNode.getHttpAddress(conf).getHostName()),
+        SecurityUtil.getServerPrincipal(conf
+            .get(DFS_SECONDARY_NAMENODE_USER_NAME_KEY), SecondaryNameNode
+            .getHttpAddress(conf).getHostName()) };
     
     for(String v : validRequestors) {
       if(v != null && v.equals(remoteUser)) {

+ 37 - 21
src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java

@@ -59,6 +59,7 @@ import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.security.Groups;
 import org.apache.hadoop.security.RefreshUserToGroupMappingsProtocol;
+import org.apache.hadoop.security.SecurityUtil;
 
 import java.io.*;
 import java.net.*;
@@ -182,6 +183,9 @@ public class NameNode implements ClientProtocol, DatanodeProtocol,
    */
   private void initialize(Configuration conf) throws IOException {
     InetSocketAddress socAddr = NameNode.getAddress(conf);
+    UserGroupInformation.setConfiguration(conf);
+    SecurityUtil.login(conf, DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY, 
+        DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY, socAddr.getHostName());
     int handlerCount = conf.getInt("dfs.namenode.handler.count", 10);
     
     // set service-level authorization security policy
@@ -221,30 +225,43 @@ public class NameNode implements ClientProtocol, DatanodeProtocol,
     this.emptier.start();
   }
 
+  @SuppressWarnings("deprecation")
+  public static String getInfoServer(Configuration conf) {
+    String http = UserGroupInformation.isSecurityEnabled() ? "dfs.https.address"
+        : "dfs.http.address";
+    return NetUtils.getServerAddress(conf, "dfs.info.bindAddress",
+        "dfs.info.port", http);
+  }
+  
+  @SuppressWarnings("deprecation")
   private void startHttpServer(final Configuration conf) throws IOException {
+    final String infoAddr = NetUtils.getServerAddress(conf,
+        "dfs.info.bindAddress", "dfs.info.port", "dfs.http.address");
+    final InetSocketAddress infoSocAddr = NetUtils.createSocketAddr(infoAddr);
     if(UserGroupInformation.isSecurityEnabled()) {
-      String httpsUser = conf.get(DFSConfigKeys.DFS_NAMENODE_KRB_HTTPS_USER_NAME_KEY);
-      if(httpsUser == null) {
-        LOG.warn(DFSConfigKeys.DFS_NAMENODE_KRB_HTTPS_USER_NAME_KEY + 
-            " not defined in config. Starting http server as " 
-            + conf.get(DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY)
-        	  +	": Kerberized SSL may be not function correctly.");
+      String httpsUser = SecurityUtil.getServerPrincipal(conf
+          .get(DFSConfigKeys.DFS_NAMENODE_KRB_HTTPS_USER_NAME_KEY), infoSocAddr
+          .getHostName());
+      if (httpsUser == null) {
+        LOG.warn(DFSConfigKeys.DFS_NAMENODE_KRB_HTTPS_USER_NAME_KEY
+            + " not defined in config. Starting http server as "
+            + SecurityUtil.getServerPrincipal(conf
+                .get(DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY), serverAddress
+                .getHostName())
+            + ": Kerberized SSL may be not function correctly.");
       } else {
         // Kerberized SSL servers must be run from the host principal...
         LOG.info("Logging in as " + httpsUser + " to start http server.");
-        DFSUtil.login(conf, DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY, 
-            DFSConfigKeys.DFS_NAMENODE_KRB_HTTPS_USER_NAME_KEY);
-        }
+        SecurityUtil.login(conf, DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY,
+            DFSConfigKeys.DFS_NAMENODE_KRB_HTTPS_USER_NAME_KEY, infoSocAddr
+                .getHostName());
+      }
     }
     UserGroupInformation ugi = UserGroupInformation.getLoginUser();
     try {
       this.httpServer = ugi.doAs(new PrivilegedExceptionAction<HttpServer>() {
         @Override
         public HttpServer run() throws IOException, InterruptedException {
-          String infoAddr = 
-            NetUtils.getServerAddress(conf, "dfs.info.bindAddress", 
-                                      "dfs.info.port", "dfs.http.address");
-          InetSocketAddress infoSocAddr = NetUtils.createSocketAddr(infoAddr);
           String infoHost = infoSocAddr.getHostName();
           int infoPort = infoSocAddr.getPort();
           httpServer = new HttpServer("hdfs", infoHost, infoPort, 
@@ -301,10 +318,13 @@ public class NameNode implements ClientProtocol, DatanodeProtocol,
       if(UserGroupInformation.isSecurityEnabled() && 
           conf.get(DFSConfigKeys.DFS_NAMENODE_KRB_HTTPS_USER_NAME_KEY) != null) {
         // Go back to being the correct Namenode principal
-        LOG.info("Logging back in as " + conf.get(DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY)
-            + " following http server start.");
-        DFSUtil.login(conf, DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY,
-            DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY);
+        LOG.info("Logging back in as "
+            + SecurityUtil.getServerPrincipal(conf
+                .get(DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY), serverAddress
+                .getHostName()) + " following http server start.");
+        SecurityUtil.login(conf, DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY,
+            DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY, serverAddress
+                .getHostName());
       }
     }
  }
@@ -332,10 +352,6 @@ public class NameNode implements ClientProtocol, DatanodeProtocol,
    * @throws IOException
    */
   public NameNode(Configuration conf) throws IOException {
-    UserGroupInformation.setConfiguration(conf);
-    DFSUtil.login(conf, DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY, 
-                        DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY);
-    
     try {
       initialize(conf);
     } catch (IOException e) {

+ 61 - 26
src/hdfs/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java

@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.URI;
+import java.security.PrivilegedAction;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -42,6 +43,7 @@ import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.metrics.jvm.JvmMetrics;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.Krb5AndCertsSslSocketConnector;
+import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.StringUtils;
@@ -61,6 +63,10 @@ import org.apache.hadoop.util.StringUtils;
  **********************************************************/
 public class SecondaryNameNode implements Runnable {
     
+  static{
+    Configuration.addDefaultResource("hdfs-default.xml");
+    Configuration.addDefaultResource("hdfs-site.xml");
+  }
   public static final Log LOG = 
     LogFactory.getLog(SecondaryNameNode.class.getName());
 
@@ -119,9 +125,6 @@ public class SecondaryNameNode implements Runnable {
    * Create a connection to the primary namenode.
    */
   public SecondaryNameNode(Configuration conf)  throws IOException {
-    DFSUtil.login(conf, 
-        DFSConfigKeys.DFS_SECONDARY_NAMENODE_KEYTAB_FILE_KEY,
-        DFSConfigKeys.DFS_SECONDARY_NAMENODE_USER_NAME_KEY);
     try {
       initialize(conf);
     } catch(IOException e) {
@@ -129,11 +132,27 @@ public class SecondaryNameNode implements Runnable {
       throw e;
     }
   }
-
+  
+  @SuppressWarnings("deprecation")
+  public static InetSocketAddress getHttpAddress(Configuration conf) {
+    String infoAddr = NetUtils.getServerAddress(conf,
+        "dfs.secondary.info.bindAddress", "dfs.secondary.info.port",
+        "dfs.secondary.http.address");
+    return NetUtils.createSocketAddr(infoAddr);
+  }
+  
   /**
    * Initialize SecondaryNameNode.
    */
   private void initialize(final Configuration conf) throws IOException {
+    final InetSocketAddress infoSocAddr = getHttpAddress(conf);
+    infoBindAddress = infoSocAddr.getHostName();
+    if (UserGroupInformation.isSecurityEnabled()) {
+      SecurityUtil.login(conf, 
+          DFSConfigKeys.DFS_SECONDARY_NAMENODE_KEYTAB_FILE_KEY,
+          DFSConfigKeys.DFS_SECONDARY_NAMENODE_USER_NAME_KEY,
+          infoBindAddress);
+    }
     // initiate Java VM metrics
     JvmMetrics.init("SecondaryNameNode", conf.get("session.id"));
     
@@ -161,8 +180,12 @@ public class SecondaryNameNode implements Runnable {
 
     // initialize the webserver for uploading files.
     // Kerberized SSL servers must be run from the host principal...
-    DFSUtil.login(conf, DFSConfigKeys.DFS_SECONDARY_NAMENODE_KEYTAB_FILE_KEY, 
-        DFSConfigKeys.DFS_SECONDARY_NAMENODE_KRB_HTTPS_USER_NAME_KEY);
+    if (UserGroupInformation.isSecurityEnabled()) {
+      SecurityUtil.login(conf,
+          DFSConfigKeys.DFS_SECONDARY_NAMENODE_KEYTAB_FILE_KEY,
+          DFSConfigKeys.DFS_SECONDARY_NAMENODE_KRB_HTTPS_USER_NAME_KEY,
+          infoBindAddress);
+    }
     UserGroupInformation ugi = UserGroupInformation.getLoginUser();
     try {
       infoServer = ugi.doAs(new PrivilegedExceptionAction<HttpServer>() {
@@ -172,14 +195,6 @@ public class SecondaryNameNode implements Runnable {
           LOG.info("Starting web server as: " +
               UserGroupInformation.getLoginUser().getUserName());
 
-          String infoAddr = 
-            NetUtils.getServerAddress(conf, 
-                "dfs.secondary.info.bindAddress",
-                "dfs.secondary.info.port",
-            "dfs.secondary.http.address");
-
-          InetSocketAddress infoSocAddr = NetUtils.createSocketAddr(infoAddr);
-          infoBindAddress = infoSocAddr.getHostName();
           int tmpInfoPort = infoSocAddr.getPort();
           infoServer = new HttpServer("secondary", infoBindAddress, tmpInfoPort,
               tmpInfoPort == 0, conf);
@@ -205,12 +220,15 @@ public class SecondaryNameNode implements Runnable {
     } catch (InterruptedException e) {
       throw new RuntimeException(e);
     } finally {
-      // Go back to being the correct Namenode principal
-      LOG.info("Web server init done, returning to: " + 
-          UserGroupInformation.getLoginUser().getUserName());
-      DFSUtil.login(conf, DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY,
-          DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY);
-      
+      if (UserGroupInformation.isSecurityEnabled()) {
+        // Go back to being the correct Namenode principal
+        SecurityUtil.login(conf, 
+            DFSConfigKeys.DFS_SECONDARY_NAMENODE_KEYTAB_FILE_KEY,
+            DFSConfigKeys.DFS_SECONDARY_NAMENODE_USER_NAME_KEY,
+            infoBindAddress);
+        LOG.info("Web server init done, returning to: " + 
+            UserGroupInformation.getLoginUser().getUserName());
+      }
     }
     // The web-server port can be ephemeral... ensure we have the correct info
     
@@ -245,10 +263,31 @@ public class SecondaryNameNode implements Runnable {
     }
   }
 
+  public void run() {
+    if (UserGroupInformation.isSecurityEnabled()) {
+      UserGroupInformation ugi = null;
+      try { 
+        ugi = UserGroupInformation.getLoginUser();
+      } catch (IOException e) {
+        LOG.error(StringUtils.stringifyException(e));
+        e.printStackTrace();
+        Runtime.getRuntime().exit(-1);
+      }
+      ugi.doAs(new PrivilegedAction<Object>() {
+        @Override
+        public Object run() {
+          doWork();
+          return null;
+        }
+      });
+    } else {
+      doWork();
+    }
+  }
   //
   // The main work loop
   //
-  public void run() {
+  public void doWork() {
 
     //
     // Poll the Namenode (once every 5 minutes) to find the size of the
@@ -352,11 +391,7 @@ public class SecondaryNameNode implements Runnable {
     if (!"hdfs".equals(fsName.getScheme())) {
       throw new IOException("This is not a DFS");
     }
-    String http = UserGroupInformation.isSecurityEnabled() ? "dfs.https.address" 
-                                                           : "dfs.http.address";
-    String infoAddr = NetUtils.getServerAddress(conf, "dfs.info.bindAddress", 
-                                     "dfs.info.port", http);
-    
+    String infoAddr = NameNode.getInfoServer(conf);
     LOG.debug("infoAddr = " + infoAddr);
     return infoAddr;
   }
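
The run()/doWork() split above follows the usual pattern for pinning a daemon's work loop to its Kerberos login identity; a condensed sketch of that pattern:

    import java.security.PrivilegedAction;
    import org.apache.hadoop.security.UserGroupInformation;

    public class DoAsLoopSketch {
      static void doWork() { /* the periodic checkpoint loop */ }

      public static void main(String[] args) throws Exception {
        // Everything run inside doAs() uses the keytab login's credentials,
        // so RPC and HTTP calls made from doWork() authenticate as that principal.
        UserGroupInformation.getLoginUser().doAs(new PrivilegedAction<Void>() {
          @Override
          public Void run() {
            doWork();
            return null;
          }
        });
      }
    }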

+ 2 - 9
src/hdfs/org/apache/hadoop/hdfs/tools/DFSck.java

@@ -28,6 +28,7 @@ import java.security.PrivilegedExceptionAction;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.NamenodeFsck;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.Krb5AndCertsSslSocketConnector;
@@ -77,14 +78,6 @@ public class DFSck extends Configured implements Tool {
     this.ugi = UserGroupInformation.getCurrentUser();
   }
   
-  private String getInfoServer() throws IOException {
-    // select the right config
-    String http = UserGroupInformation.isSecurityEnabled() ? 
-        "dfs.https.address" : "dfs.http.address";
-    return NetUtils.getServerAddress(getConf(), "dfs.info.bindAddress",
-        "dfs.info.port", http);
-  }
-  
   /**
    * Print fsck usage information
    */
@@ -125,7 +118,7 @@ public class DFSck extends Configured implements Tool {
           }
           
           final StringBuffer url = new StringBuffer(proto);
-          url.append(getInfoServer()).append("/fsck?ugi=").append(ugi.getShortUserName()).append("&path=");
+          url.append(NameNode.getInfoServer(getConf())).append("/fsck?ugi=").append(ugi.getShortUserName()).append("&path=");
 
           String dir = "/";
           // find top-level dir first

+ 7 - 11
src/mapred/org/apache/hadoop/mapred/JobTracker.java

@@ -93,6 +93,7 @@ import org.apache.hadoop.net.ScriptBasedMapping;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.Groups;
 import org.apache.hadoop.security.RefreshUserToGroupMappingsProtocol;
+import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
 import org.apache.hadoop.security.authorize.AuthorizationException;
@@ -2005,15 +2006,15 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
   JobTracker(final JobConf conf, String identifier, Clock clock) 
   throws IOException, InterruptedException { 
     this.clock = clock;
+    // Set ports, start RPC servers, setup security policy etc.
+    InetSocketAddress addr = getAddress(conf);
+    this.localMachine = addr.getHostName();
+    this.port = addr.getPort();
     // find the owner of the process
     // get the desired principal to load
-    String keytabFilename = conf.get(JT_KEYTAB_FILE);
     UserGroupInformation.setConfiguration(conf);
-    if (keytabFilename != null) {
-      String desiredUser = conf.get(JT_USER_NAME,
-                                   System.getProperty("user.name"));
-      UserGroupInformation.loginUserFromKeytab(desiredUser,
-                                               keytabFilename);
+    SecurityUtil.login(conf, JT_KEYTAB_FILE, JT_USER_NAME, localMachine);
+    if (UserGroupInformation.isLoginKeytabBased()) {
       mrOwner = UserGroupInformation.getLoginUser();
     } else {
       mrOwner = UserGroupInformation.getCurrentUser();
@@ -2091,11 +2092,6 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
       = conf.getClass("mapred.jobtracker.taskScheduler",
           JobQueueTaskScheduler.class, TaskScheduler.class);
     taskScheduler = (TaskScheduler) ReflectionUtils.newInstance(schedulerClass, conf);
-                                           
-    // Set ports, start RPC servers, setup security policy etc.
-    InetSocketAddress addr = getAddress(conf);
-    this.localMachine = addr.getHostName();
-    this.port = addr.getPort();
     
     // Set service-level authorization security policy
     if (conf.getBoolean(

+ 5 - 8
src/mapred/org/apache/hadoop/mapred/TaskTracker.java

@@ -25,6 +25,7 @@ import java.io.RandomAccessFile;
 import java.net.InetSocketAddress;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -90,7 +91,7 @@ import org.apache.hadoop.metrics.MetricsUtil;
 import org.apache.hadoop.metrics.Updater;
 import org.apache.hadoop.net.DNS;
 import org.apache.hadoop.net.NetUtils;
-import java.security.PrivilegedExceptionAction;
+import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.PolicyProvider;
 import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
@@ -582,18 +583,14 @@ public class TaskTracker
    */
   synchronized void initialize() throws IOException, InterruptedException {
     this.fConf = new JobConf(originalConf);
-    String keytabFilename = fConf.get(TT_KEYTAB_FILE);
     UserGroupInformation.setConfiguration(fConf);
-    if (keytabFilename != null) {
-      String desiredUser = fConf.get(TT_USER_NAME,
-                                    System.getProperty("user.name"));
-      UserGroupInformation.loginUserFromKeytab(desiredUser,
-                                               keytabFilename);
+    SecurityUtil.login(fConf, TT_KEYTAB_FILE, TT_USER_NAME);
+    if (UserGroupInformation.isLoginKeytabBased()) {
       mrOwner = UserGroupInformation.getLoginUser();
-
     } else {
       mrOwner = UserGroupInformation.getCurrentUser();
     }
+
     supergroup = fConf.get(JobConf.MR_SUPERGROUP,
                            "supergroup");
     LOG.info("Starting tasktracker with owner as " + mrOwner.getShortUserName()

+ 4 - 2
src/mapred/org/apache/hadoop/mapreduce/security/TokenCache.java

@@ -40,6 +40,7 @@ import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.KerberosName;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -86,7 +87,8 @@ public class TokenCache {
                                                Path [] ps, Configuration conf)
   throws IOException {
     // get jobtracker principal id (for the renewer)
-    Text jtCreds = new Text(conf.get(JobTracker.JT_USER_NAME, ""));
+    KerberosName jtKrbName = new KerberosName(conf.get(JobTracker.JT_USER_NAME, ""));
+    Text delegTokenRenewer = new Text(jtKrbName.getShortName());
     boolean notReadFile = true;
     for(Path p: ps) {
       //TODO: Connecting to the namenode is not required in the case,
@@ -120,7 +122,7 @@ public class TokenCache {
           }
         }
         // get the token
-        token = dfs.getDelegationToken(jtCreds);
+        token = dfs.getDelegationToken(delegTokenRenewer);
         if(token==null) 
           throw new IOException("Token from " + fs_addr + " is null");
 

+ 2 - 15
src/test/org/apache/hadoop/hdfs/security/TestDelegationToken.java

@@ -161,7 +161,7 @@ public class TestDelegationToken {
   public void testDelegationTokenWithDoAs() throws Exception {
     final DistributedFileSystem dfs = (DistributedFileSystem) cluster.getFileSystem();
     final Token<DelegationTokenIdentifier> token = dfs.getDelegationToken(new Text(
-        "JobTracker/foo.com@FOO.COM"));
+        "JobTracker"));
     final UserGroupInformation longUgi = UserGroupInformation
         .createRemoteUser("JobTracker/foo.com@FOO.COM");
     final UserGroupInformation shortUgi = UserGroupInformation
@@ -183,20 +183,7 @@ public class TestDelegationToken {
       public Object run() throws IOException {
         final DistributedFileSystem dfs = (DistributedFileSystem) cluster
             .getFileSystem();
-        try {
-          //try renew with long name
-          dfs.renewDelegationToken(token);
-          Assert.fail("Should not renew delegation token for short user name");
-        } catch (IOException e) {
-          //PASS
-        }
-        try {
-          //try cancel with long name
-          dfs.cancelDelegationToken(token);
-          Assert.fail("Should not cancel delegation token for short user name");
-        } catch (IOException e) {
-          //PASS
-        }
+        dfs.renewDelegationToken(token);
         return null;
       }
     });

+ 6 - 1
src/test/org/apache/hadoop/ipc/TestSaslRPC.java

@@ -46,6 +46,7 @@ import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.security.SaslInputStream;
 import org.apache.hadoop.security.SaslRpcClient;
 import org.apache.hadoop.security.SaslRpcServer;
+import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
 
@@ -61,6 +62,7 @@ public class TestSaslRPC {
   
   static final String ERROR_MESSAGE = "Token is invalid";
   static final String SERVER_PRINCIPAL_KEY = "test.ipc.server.principal";
+  static final String SERVER_KEYTAB_KEY = "test.ipc.server.keytab";
   private static Configuration conf;
   static {
     conf = new Configuration();
@@ -74,6 +76,7 @@ public class TestSaslRPC {
     ((Log4JLogger) SaslRpcClient.LOG).getLogger().setLevel(Level.ALL);
     ((Log4JLogger) SaslRpcServer.LOG).getLogger().setLevel(Level.ALL);
     ((Log4JLogger) SaslInputStream.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger) SecurityUtil.LOG).getLogger().setLevel(Level.ALL);
   }
 
   public static class TestTokenIdentifier extends TokenIdentifier {
@@ -244,7 +247,8 @@ public class TestSaslRPC {
   static void testKerberosRpc(String principal, String keytab) throws Exception {
     final Configuration newConf = new Configuration(conf);
     newConf.set(SERVER_PRINCIPAL_KEY, principal);
-    UserGroupInformation.loginUserFromKeytab(principal, keytab);
+    newConf.set(SERVER_KEYTAB_KEY, keytab);
+    SecurityUtil.login(newConf, SERVER_KEYTAB_KEY, SERVER_PRINCIPAL_KEY);
     UserGroupInformation current = UserGroupInformation.getCurrentUser();
     System.out.println("UGI: " + current);
 
@@ -265,6 +269,7 @@ public class TestSaslRPC {
         RPC.stopProxy(proxy);
       }
     }
+    System.out.println("Test is successful.");
   }
   
   @Test

+ 29 - 0
src/test/org/apache/hadoop/security/TestSecurityUtil.java

@@ -17,6 +17,9 @@
 package org.apache.hadoop.security;
 
 import static org.junit.Assert.*;
+
+import java.io.IOException;
+
 import org.junit.Test;
 
 public class TestSecurityUtil {
@@ -32,4 +35,30 @@ public class TestSecurityUtil {
     assertFalse(SecurityUtil.isOriginalTGT("this@is/notright"));
     assertFalse(SecurityUtil.isOriginalTGT("krbtgt/foo@FOO"));
   }
+  
+  private void verify(String original, String hostname, String expected)
+      throws IOException {
+    assertTrue(SecurityUtil.getServerPrincipal(original, hostname).equals(
+        expected));
+    assertTrue(SecurityUtil.getServerPrincipal(original, null).equals(
+        expected));
+    assertTrue(SecurityUtil.getServerPrincipal(original, "").equals(
+        expected));
+    assertTrue(SecurityUtil.getServerPrincipal(original, "0.0.0.0").equals(
+        expected));
+  }
+
+  @Test
+  public void testGetServerPrincipal() throws IOException {
+    String service = "hdfs/";
+    String realm = "@REALM";
+    String hostname = SecurityUtil.getLocalHostName();
+    String shouldReplace = service + SecurityUtil.HOSTNAME_PATTERN + realm;
+    String replaced = service + hostname + realm;
+    verify(shouldReplace, hostname, replaced);
+    String shouldNotReplace = service + SecurityUtil.HOSTNAME_PATTERN + "NAME"
+        + realm;
+    verify(shouldNotReplace, hostname, shouldNotReplace);
+    verify(shouldNotReplace, shouldNotReplace, shouldNotReplace);
+  }
 }