浏览代码

MAPREDUCE-6410. Fixed MapReduce JobHistory server to use the right (login) UGI to refresh log and cleaner settings. Contributed by Varun Saxena.

(cherry picked from commit d481684c7c9293a94f54ef622a92753531c6acc7)
Vinod Kumar Vavilapalli 10 年之前
父节点
当前提交
8c9e2cf10a

+ 3 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -284,6 +284,9 @@ Release 2.7.1 - UNRELEASED
     MAPREDUCE-6387. Serialize the recently added Task#encryptedSpillKey field at 
     the end. (Arun Suresh via kasha)
 
+    MAPREDUCE-6410. Fixed MapReduce JobHistory server to use the right (login)
+    UGI to refresh log and cleaner settings. (Varun Saxena via vinodkv)
+
 Release 2.7.0 - 2015-04-20
 
   INCOMPATIBLE CHANGES

+ 41 - 2
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/server/HSAdminServer.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.mapreduce.v2.hs.server;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.security.PrivilegedExceptionAction;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -52,6 +53,7 @@ import org.apache.hadoop.mapreduce.v2.hs.JobHistory;
 import org.apache.hadoop.mapreduce.v2.hs.proto.HSAdminRefreshProtocolProtos.HSAdminRefreshProtocolService;
 import org.apache.hadoop.mapreduce.v2.hs.protocolPB.HSAdminRefreshProtocolServerSideTranslatorPB;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.protobuf.BlockingService;
 
 @Private
@@ -67,6 +69,8 @@ public class HSAdminServer extends AbstractService implements HSAdminProtocol {
   private static final String HISTORY_ADMIN_SERVER = "HSAdminServer";
   private JobHistory jobHistoryService = null;
 
+  private UserGroupInformation loginUGI;
+
   public HSAdminServer(AggregatedLogDeletionService aggLogDelService,
       JobHistory jobHistoryService) {
     super(HSAdminServer.class.getName());
@@ -125,9 +129,24 @@ public class HSAdminServer extends AbstractService implements HSAdminProtocol {
 
   @Override
   protected void serviceStart() throws Exception {
+    if (UserGroupInformation.isSecurityEnabled()) {
+      loginUGI = UserGroupInformation.getLoginUser();
+    } else {
+      loginUGI = UserGroupInformation.getCurrentUser();
+    }
     clientRpcServer.start();
   }
 
+  @VisibleForTesting
+  UserGroupInformation getLoginUGI() {
+    return loginUGI;
+  }
+
+  @VisibleForTesting
+  void setLoginUGI(UserGroupInformation ugi) {
+    loginUGI = ugi;
+  }
+
   @Override
   protected void serviceStop() throws Exception {
     if (clientRpcServer != null) {
@@ -233,7 +252,17 @@ public class HSAdminServer extends AbstractService implements HSAdminProtocol {
   public void refreshLogRetentionSettings() throws IOException {
     UserGroupInformation user = checkAcls("refreshLogRetentionSettings");
 
-    aggLogDelService.refreshLogRetentionSettings();
+    try {
+      loginUGI.doAs(new PrivilegedExceptionAction<Void>() {
+        @Override
+        public Void run() throws IOException {
+          aggLogDelService.refreshLogRetentionSettings();
+          return null;
+        }
+      });
+    } catch (InterruptedException e) {
+      throw new IOException(e);
+    }
 
     HSAuditLogger.logSuccess(user.getShortUserName(),
         "refreshLogRetentionSettings", "HSAdminServer");
@@ -243,7 +272,17 @@ public class HSAdminServer extends AbstractService implements HSAdminProtocol {
   public void refreshJobRetentionSettings() throws IOException {
     UserGroupInformation user = checkAcls("refreshJobRetentionSettings");
 
-    jobHistoryService.refreshJobRetentionSettings();
+    try {
+      loginUGI.doAs(new PrivilegedExceptionAction<Void>() {
+        @Override
+        public Void run() throws IOException {
+          jobHistoryService.refreshJobRetentionSettings();
+          return null;
+        }
+      });
+    } catch (InterruptedException e) {
+      throw new IOException(e);
+    }
 
     HSAuditLogger.logSuccess(user.getShortUserName(),
         "refreshJobRetentionSettings", HISTORY_ADMIN_SERVER);

+ 55 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/server/TestHSAdminServer.java

@@ -21,6 +21,8 @@ package org.apache.hadoop.mapreduce.v2.hs.server;
 import static org.junit.Assert.*;
 
 import java.io.IOException;
+import java.security.PrivilegedAction;
+import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -45,6 +47,9 @@ import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameters;
 
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 import static org.mockito.Mockito.verify;
@@ -286,6 +291,56 @@ public class TestHSAdminServer {
     verify(jobHistoryService).refreshJobRetentionSettings();
   }
 
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testUGIForLogAndJobRefresh() throws Exception {
+    UserGroupInformation ugi =
+        UserGroupInformation.createUserForTesting("test", new String[] {"grp"});
+    UserGroupInformation loginUGI = spy(hsAdminServer.getLoginUGI());
+    hsAdminServer.setLoginUGI(loginUGI);
+
+    // Run refresh log retention settings with test user
+    ugi.doAs(new PrivilegedAction<Void>() {
+      @Override
+      public Void run() {
+        String[] args = new String[1];
+        args[0] = "-refreshLogRetentionSettings";
+        try {
+          hsAdminClient.run(args);
+        } catch (Exception e) {
+          fail("refreshLogRetentionSettings should have been successful");
+        }
+        return null;
+      }
+    });
+    // Verify if AggregatedLogDeletionService#refreshLogRetentionSettings was
+    // called with login UGI, instead of the UGI command was run with.
+    verify(loginUGI).doAs(any(PrivilegedExceptionAction.class));
+    verify(alds).refreshLogRetentionSettings();
+
+    // Reset for refresh job retention settings
+    reset(loginUGI);
+
+    // Run refresh job retention settings with test user
+    ugi.doAs(new PrivilegedAction<Void>() {
+      @Override
+      public Void run() {
+        String[] args = new String[1];
+        args[0] = "-refreshJobRetentionSettings";
+        try {
+          hsAdminClient.run(args);
+        } catch (Exception e) {
+          fail("refreshJobRetentionSettings should have been successful");
+        }
+        return null;
+      }
+    });
+    // Verify if JobHistory#refreshJobRetentionSettings was called with
+    // login UGI, instead of the UGI command was run with.
+    verify(loginUGI).doAs(any(PrivilegedExceptionAction.class));
+    verify(jobHistoryService).refreshJobRetentionSettings();
+  }
+
   @After
   public void cleanUp() {
     if (hsAdminServer != null)