Просмотр исходного кода

HADOOP-17346. Fair call queue is defeated by abusive service principals (#2431)

Co-authored-by: ahussein <ahmed.hussein@verizonmedia.com>
Ahmed Hussein 4 лет назад
Родитель
Сommit
5ce18101cb

+ 14 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java

@@ -33,6 +33,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto;
 
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -212,6 +213,19 @@ public class CallQueueManager<E extends Schedulable>
     return scheduler.getPriorityLevel(e);
   }
 
+  int getPriorityLevel(UserGroupInformation user) {
+    if (scheduler instanceof DecayRpcScheduler) {
+      return ((DecayRpcScheduler)scheduler).getPriorityLevel(user);
+    }
+    return 0;
+  }
+
+  void setPriorityLevel(UserGroupInformation user, int priority) {
+    if (scheduler instanceof DecayRpcScheduler) {
+      ((DecayRpcScheduler)scheduler).setPriorityLevel(user, priority);
+    }
+  }
+
   void setClientBackoffEnabled(boolean value) {
     clientBackOffEnabled = value;
   }

+ 48 - 6
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java

@@ -40,6 +40,8 @@ import javax.management.ObjectName;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.ObjectWriter;
+
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
 import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.AtomicDoubleArray;
 import org.apache.commons.lang3.exception.ExceptionUtils;
@@ -193,6 +195,7 @@ public class DecayRpcScheduler implements RpcScheduler,
   private static final double PRECISION = 0.0001;
   private MetricsProxy metricsProxy;
   private final CostProvider costProvider;
+  private final Map<String, Integer> staticPriorities = new HashMap<>();
   private Set<String> serviceUserNames;
 
   /**
@@ -581,7 +584,10 @@ public class DecayRpcScheduler implements RpcScheduler,
     if (isServiceUser((String)identity)) {
       return 0;
     }
-
+    Integer staticPriority = staticPriorities.get(identity);
+    if (staticPriority != null) {
+      return staticPriority.intValue();
+    }
     long totalCallSnapshot = totalDecayedCallCost.get();
 
     double proportion = 0;
@@ -626,6 +632,15 @@ public class DecayRpcScheduler implements RpcScheduler,
     return priority;
   }
 
+  private String getIdentity(Schedulable obj) {
+    String identity = this.identityProvider.makeIdentity(obj);
+    if (identity == null) {
+      // Identity provider did not handle this
+      identity = DECAYSCHEDULER_UNKNOWN_IDENTITY;
+    }
+    return identity;
+  }
+
   /**
    * Compute the appropriate priority for a schedulable based on past requests.
    * @param obj the schedulable obj to query and remember
@@ -634,15 +649,42 @@ public class DecayRpcScheduler implements RpcScheduler,
   @Override
   public int getPriorityLevel(Schedulable obj) {
     // First get the identity
-    String identity = this.identityProvider.makeIdentity(obj);
-    if (identity == null) {
-      // Identity provider did not handle this
-      identity = DECAYSCHEDULER_UNKNOWN_IDENTITY;
-    }
+    String identity = getIdentity(obj);
+    // highest priority users may have a negative priority but their
+    // calls will be priority 0.
+    return Math.max(0, cachedOrComputedPriorityLevel(identity));
+  }
 
+  @VisibleForTesting
+  int getPriorityLevel(UserGroupInformation ugi) {
+    String identity = getIdentity(newSchedulable(ugi));
+    // returns true priority of the user.
     return cachedOrComputedPriorityLevel(identity);
   }
 
+  @VisibleForTesting
+  void setPriorityLevel(UserGroupInformation ugi, int priority) {
+    String identity = getIdentity(newSchedulable(ugi));
+    priority = Math.min(numLevels - 1, priority);
+    LOG.info("Setting priority for user:" + identity + "=" + priority);
+    staticPriorities.put(identity, priority);
+  }
+
+  // dummy instance to conform to identity provider api.
+  private static Schedulable newSchedulable(UserGroupInformation ugi) {
+    return new Schedulable() {
+      @Override
+      public UserGroupInformation getUserGroupInformation() {
+        return ugi;
+      }
+
+      @Override
+      public int getPriorityLevel() {
+        return 0;
+      }
+    };
+  }
+
   private boolean isServiceUser(String userName) {
     return this.serviceUserNames.contains(userName);
   }

+ 13 - 1
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java

@@ -51,6 +51,7 @@ import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.Rpc
 import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.SaslRpcServer;
+import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.SecretManager;
 import org.apache.hadoop.security.token.TokenIdentifier;
@@ -980,7 +981,18 @@ public class RPC {
            " ProtocolImpl=" + protocolImpl.getClass().getName() +
            " protocolClass=" + protocolClass.getName());
      }
-   }
+      String client = SecurityUtil.getClientPrincipal(protocolClass, getConf());
+      if (client != null) {
+        // notify the server's rpc scheduler that the protocol user has
+        // highest priority.  the scheduler should exempt the user from
+        // priority calculations.
+        try {
+          setPriorityLevel(UserGroupInformation.createRemoteUser(client), -1);
+        } catch (Exception ex) {
+          LOG.warn("Failed to set scheduling priority for " + client, ex);
+        }
+      }
+    }
    
    static class VerProtocolImpl {
      final long version;

+ 16 - 1
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java

@@ -643,7 +643,22 @@ public abstract class Server {
           address.getPort(), e);
     }
   }
-  
+
+  @org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting
+  int getPriorityLevel(Schedulable e) {
+    return callQueue.getPriorityLevel(e);
+  }
+
+  @org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting
+  int getPriorityLevel(UserGroupInformation ugi) {
+    return callQueue.getPriorityLevel(ugi);
+  }
+
+  @org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting
+  void setPriorityLevel(UserGroupInformation ugi, int priority) {
+    callQueue.setPriorityLevel(ugi, priority);
+  }
+
   /**
    * Returns a handle to the rpcMetrics (required in tests)
    * @return rpc metrics

+ 1 - 1
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/UserIdentityProvider.java

@@ -31,6 +31,6 @@ public class UserIdentityProvider implements IdentityProvider {
       return null;
     }
 
-    return ugi.getUserName();
+    return ugi.getShortUserName();
   }
 }

+ 19 - 1
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SecurityUtil.java

@@ -380,7 +380,25 @@ public final class SecurityUtil {
     }
     return null;
   }
- 
+
+  /**
+   * Look up the client principal for a given protocol. It searches all known
+   * SecurityInfo providers.
+   * @param protocol the protocol class to get the information for
+   * @param conf configuration object
+   * @return client principal or null if it has no client principal defined.
+   */
+  public static String getClientPrincipal(Class<?> protocol,
+      Configuration conf) {
+    String user = null;
+    KerberosInfo krbInfo = SecurityUtil.getKerberosInfo(protocol, conf);
+    if (krbInfo != null) {
+      String key = krbInfo.clientPrincipal();
+      user = (key != null && !key.isEmpty()) ? conf.get(key) : null;
+    }
+    return user;
+  }
+
   /**
    * Look up the TokenInfo for a given protocol. It searches all known
    * SecurityInfo providers.

+ 11 - 14
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/authorize/ServiceAuthorizationManager.java

@@ -28,7 +28,6 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
-import org.apache.hadoop.security.KerberosInfo;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.MachineList;
@@ -101,21 +100,19 @@ public class ServiceAuthorizationManager {
     String clientPrincipal = null;
     if (UserGroupInformation.isSecurityEnabled()) {
       // get client principal key to verify (if available)
-      KerberosInfo krbInfo = SecurityUtil.getKerberosInfo(protocol, conf);
-      if (krbInfo != null) {
-        String clientKey = krbInfo.clientPrincipal();
-        if (clientKey != null && !clientKey.isEmpty()) {
-          try {
-            clientPrincipal = SecurityUtil.getServerPrincipal(
-                conf.get(clientKey), addr);
-          } catch (IOException e) {
-            throw (AuthorizationException) new AuthorizationException(
-                "Can't figure out Kerberos principal name for connection from "
-                + addr + " for user=" + user + " protocol=" + protocol)
-                .initCause(e);
-          }
+      clientPrincipal = SecurityUtil.getClientPrincipal(protocol, conf);
+      try {
+        if (clientPrincipal != null) {
+          clientPrincipal =
+              SecurityUtil.getServerPrincipal(clientPrincipal, addr);
         }
+      } catch (IOException e) {
+        throw (AuthorizationException) new AuthorizationException(
+            "Can't figure out Kerberos principal name for connection from "
+                + addr + " for user=" + user + " protocol=" + protocol)
+            .initCause(e);
       }
+
     }
     if((clientPrincipal != null && !clientPrincipal.equals(user.getUserName())) || 
        acls.length != 2  || !acls[0].isUserAllowed(user) || acls[1].isUserAllowed(user)) {

+ 1 - 2
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestDecayRpcScheduler.java

@@ -48,9 +48,8 @@ import java.util.concurrent.TimeUnit;
 public class TestDecayRpcScheduler {
   private Schedulable mockCall(String id) {
     Schedulable mockCall = mock(Schedulable.class);
-    UserGroupInformation ugi = mock(UserGroupInformation.class);
+    UserGroupInformation ugi = UserGroupInformation.createRemoteUser(id);
 
-    when(ugi.getUserName()).thenReturn(id);
     when(mockCall.getUserGroupInformation()).thenReturn(ugi);
 
     return mockCall;

+ 37 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java

@@ -1294,6 +1294,43 @@ public class TestRPC extends TestRpcBase {
     }
   }
 
+  @Test (timeout=30000)
+  public void testProtocolUserPriority() throws Exception {
+    final String ns = CommonConfigurationKeys.IPC_NAMESPACE + ".0";
+    conf.set(CLIENT_PRINCIPAL_KEY, "clientForProtocol");
+    Server server = null;
+    try {
+      server = setupDecayRpcSchedulerandTestServer(ns + ".");
+
+      UserGroupInformation ugi = UserGroupInformation.createRemoteUser("user");
+      // normal users start with priority 0.
+      Assert.assertEquals(0, server.getPriorityLevel(ugi));
+      // calls for a protocol defined client will have priority of 0.
+      Assert.assertEquals(0, server.getPriorityLevel(newSchedulable(ugi)));
+
+      // protocol defined client will have top priority of -1.
+      ugi = UserGroupInformation.createRemoteUser("clientForProtocol");
+      Assert.assertEquals(-1, server.getPriorityLevel(ugi));
+      // calls for a protocol defined client will have priority of 0.
+      Assert.assertEquals(0, server.getPriorityLevel(newSchedulable(ugi)));
+    } finally {
+      stop(server, null);
+    }
+  }
+
+  private static Schedulable newSchedulable(UserGroupInformation ugi) {
+    return new Schedulable(){
+      @Override
+      public UserGroupInformation getUserGroupInformation() {
+        return ugi;
+      }
+      @Override
+      public int getPriorityLevel() {
+        return 0; // doesn't matter.
+      }
+    };
+  }
+
   private Server setupDecayRpcSchedulerandTestServer(String ns)
       throws Exception {
     final int queueSizePerHandler = 3;

+ 4 - 1
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRpcBase.java

@@ -62,6 +62,8 @@ public class TestRpcBase {
 
   protected final static String SERVER_PRINCIPAL_KEY =
       "test.ipc.server.principal";
+  protected final static String CLIENT_PRINCIPAL_KEY =
+      "test.ipc.client.principal";
   protected final static String ADDRESS = "0.0.0.0";
   protected final static int PORT = 0;
   protected static InetSocketAddress addr;
@@ -271,7 +273,8 @@ public class TestRpcBase {
     }
   }
 
-  @KerberosInfo(serverPrincipal = SERVER_PRINCIPAL_KEY)
+  @KerberosInfo(serverPrincipal = SERVER_PRINCIPAL_KEY,
+                clientPrincipal = CLIENT_PRINCIPAL_KEY)
   @TokenInfo(TestTokenSelector.class)
   @ProtocolInfo(protocolName = "org.apache.hadoop.ipc.TestRpcBase$TestRpcService",
       protocolVersion = 1)