瀏覽代碼

MAPREDUCE-3251. Network ACLs can prevent some clients to talk to MR AM. Improved the earlier patch to not to JobHistoryServer repeatedly. Contributed by Anupam Seth.
svn merge --ignore-ancestry -c 1229787 ../../trunk/


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1229789 13f79535-47bb-0310-9956-ffa450edef68

Vinod Kumar Vavilapalli 13 年之前
父節點
當前提交
92975b63e5

+ 4 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -108,6 +108,10 @@ Release 0.23.1 - Unreleased
     MAPREDUCE-3299. Added AMInfo table to the MR AM job pages to list all the
     MAPREDUCE-3299. Added AMInfo table to the MR AM job pages to list all the
     job-attempts when AM restarts and recovers. (Jonathan Eagles via vinodkv)
     job-attempts when AM restarts and recovers. (Jonathan Eagles via vinodkv)
 
 
+    MAPREDUCE-3251. Network ACLs can prevent some clients to talk to MR AM.
+    Improved the earlier patch to not to JobHistoryServer repeatedly.
+    (Anupam Seth via vinodkv)
+
   OPTIMIZATIONS
   OPTIMIZATIONS
 
 
     MAPREDUCE-3567. Extraneous JobConf objects in AM heap. (Vinod Kumar
     MAPREDUCE-3567. Extraneous JobConf objects in AM heap. (Vinod Kumar

+ 5 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java

@@ -369,6 +369,11 @@ public interface MRJobConfig {
   public static final String MR_AM_JOB_REDUCE_PREEMPTION_LIMIT = 
   public static final String MR_AM_JOB_REDUCE_PREEMPTION_LIMIT = 
     MR_AM_PREFIX  + "job.reduce.preemption.limit";
     MR_AM_PREFIX  + "job.reduce.preemption.limit";
   public static final float DEFAULT_MR_AM_JOB_REDUCE_PREEMPTION_LIMIT = 0.5f;
   public static final float DEFAULT_MR_AM_JOB_REDUCE_PREEMPTION_LIMIT = 0.5f;
+  
+  /** AM ACL disabled. **/
+  public static final String JOB_AM_ACCESS_DISABLED = 
+    "mapreduce.job.am-access-disabled";
+  public static final boolean DEFAULT_JOB_AM_ACCESS_DISABLED = false;
 
 
   /**
   /**
    * Limit reduces starting until a certain percentage of maps have finished.
    * Limit reduces starting until a certain percentage of maps have finished.

+ 13 - 54
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java

@@ -35,6 +35,7 @@ import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.JobStatus;
 import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.TypeConverter;
@@ -68,7 +69,6 @@ import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
 import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
@@ -94,6 +94,8 @@ public class ClientServiceDelegate {
   private static String UNKNOWN_USER = "Unknown User";
   private static String UNKNOWN_USER = "Unknown User";
   private String trackingUrl;
   private String trackingUrl;
 
 
+  private boolean amAclDisabledStatusLogged = false;
+
   public ClientServiceDelegate(Configuration conf, ResourceMgrDelegate rm,
   public ClientServiceDelegate(Configuration conf, ResourceMgrDelegate rm,
       JobID jobId, MRClientProtocol historyServerProxy) {
       JobID jobId, MRClientProtocol historyServerProxy) {
     this.conf = new Configuration(conf); // Cloning for modifying.
     this.conf = new Configuration(conf); // Cloning for modifying.
@@ -157,7 +159,7 @@ public class ClientServiceDelegate {
           application = rm.getApplicationReport(appId);
           application = rm.getApplicationReport(appId);
           continue;
           continue;
         }
         }
-        if(!conf.getBoolean(YarnConfiguration.RM_AM_NETWORK_ACL_CLOSED, false)) {
+        if(!conf.getBoolean(MRJobConfig.JOB_AM_ACCESS_DISABLED, false)) {
           UserGroupInformation newUgi = UserGroupInformation.createRemoteUser(
           UserGroupInformation newUgi = UserGroupInformation.createRemoteUser(
               UserGroupInformation.getCurrentUser().getUserName());
               UserGroupInformation.getCurrentUser().getUserName());
           serviceAddr = application.getHost() + ":" + application.getRpcPort();
           serviceAddr = application.getHost() + ":" + application.getRpcPort();
@@ -182,12 +184,14 @@ public class ClientServiceDelegate {
               return instantiateAMProxy(tempStr);
               return instantiateAMProxy(tempStr);
             }
             }
           });
           });
-	} else {
-           logApplicationReportInfo(application); 
-           LOG.info("Network ACL closed to AM for job " + jobId
-             + ". Redirecting to job history server.");
-           return checkAndGetHSProxy(null, JobState.RUNNING);
-        }  
+        } else {
+          if (!amAclDisabledStatusLogged) {
+            LOG.info("Network ACL closed to AM for job " + jobId
+                + ". Not going to try to reach the AM.");
+            amAclDisabledStatusLogged = true;
+          }
+          return getNotRunningJob(null, JobState.RUNNING);
+        }
         return realProxy;
         return realProxy;
       } catch (IOException e) {
       } catch (IOException e) {
         //possibly the AM has crashed
         //possibly the AM has crashed
@@ -248,55 +252,10 @@ public class ClientServiceDelegate {
     return realProxy;
     return realProxy;
   }
   }
 
 
-  private void logApplicationReportInfo(ApplicationReport application) {
-    if(application == null) {
-      return;
-    }
-    LOG.info("AppId: " + application.getApplicationId()
-      + " # reserved containers: " 
-      + application.getApplicationResourceUsageReport().getNumReservedContainers()
-      + " # used containers: " 
-      + application.getApplicationResourceUsageReport().getNumUsedContainers()
-      + " Needed resources (memory): "
-      + application.getApplicationResourceUsageReport().getNeededResources().getMemory()
-      + " Reserved resources (memory): "
-      + application.getApplicationResourceUsageReport().getReservedResources().getMemory()
-      + " Used resources (memory): "
-      + application.getApplicationResourceUsageReport().getUsedResources().getMemory()
-      + " Diagnostics: " 
-      + application.getDiagnostics()
-      + " Start time: "
-      + application.getStartTime()
-      + " Finish time: "
-      + application.getFinishTime()
-      + " Host: "
-      + application.getHost()
-      + " Name: "
-      + application.getName()
-      + " Orig. tracking url: "
-      + application.getOriginalTrackingUrl()
-      + " Queue: "
-      + application.getQueue()
-      + " RPC port: "
-      + application.getRpcPort()
-      + " Tracking url: "
-      + application.getTrackingUrl()
-      + " User: "
-      + application.getUser()
-      + " Client token: "
-      + application.getClientToken()
-      + " Final appl. status: "
-      + application.getFinalApplicationStatus()
-      + " Yarn appl. state: "
-      + application.getYarnApplicationState()
-    );
-  }
-
   private MRClientProtocol checkAndGetHSProxy(
   private MRClientProtocol checkAndGetHSProxy(
       ApplicationReport applicationReport, JobState state) {
       ApplicationReport applicationReport, JobState state) {
     if (null == historyServerProxy) {
     if (null == historyServerProxy) {
-      LOG.warn("Job History Server is not configured or " +
-      		"job information not yet available on History Server.");
+      LOG.warn("Job History Server is not configured.");
       return getNotRunningJob(applicationReport, state);
       return getNotRunningJob(applicationReport, state);
     }
     }
     return historyServerProxy;
     return historyServerProxy;

+ 61 - 3
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java

@@ -19,7 +19,12 @@
 package org.apache.hadoop.mapred;
 package org.apache.hadoop.mapred;
 
 
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 
 import java.io.IOException;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Arrays;
@@ -31,6 +36,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.JobStatus;
 import org.apache.hadoop.mapreduce.JobStatus;
 import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
 import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
 import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetCountersRequest;
 import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetCountersRequest;
@@ -165,7 +171,7 @@ public class TestClientServiceDelegate {
     ClientServiceDelegate clientServiceDelegate = getClientServiceDelegate(                       
     ClientServiceDelegate clientServiceDelegate = getClientServiceDelegate(                       
         historyServerProxy, rm);
         historyServerProxy, rm);
 
 
-    JobStatus jobStatus = clientServiceDelegate.getJobStatus(oldJobId);                           
+    JobStatus jobStatus = clientServiceDelegate.getJobStatus(oldJobId);
     Assert.assertNotNull(jobStatus);
     Assert.assertNotNull(jobStatus);
     Assert.assertEquals("TestJobFilePath", jobStatus.getJobFile());                               
     Assert.assertEquals("TestJobFilePath", jobStatus.getJobFile());                               
     Assert.assertEquals("http://TestTrackingUrl", jobStatus.getTrackingUrl());                    
     Assert.assertEquals("http://TestTrackingUrl", jobStatus.getTrackingUrl());                    
@@ -254,6 +260,58 @@ public class TestClientServiceDelegate {
         any(String.class));
         any(String.class));
   }
   }
   
   
+  @Test
+  public void testAMAccessDisabled() throws IOException {
+    //test only applicable when AM not reachable
+    if(isAMReachableFromClient) {
+      return;
+    }
+
+    MRClientProtocol historyServerProxy = mock(MRClientProtocol.class);
+    when(historyServerProxy.getJobReport(getJobReportRequest())).thenReturn(                      
+        getJobReportResponseFromHistoryServer());                                                 
+
+    ResourceMgrDelegate rmDelegate = mock(ResourceMgrDelegate.class);
+    when(rmDelegate.getApplicationReport(jobId.getAppId())).thenReturn(
+        getRunningApplicationReport("am1", 78)).thenReturn(
+          getRunningApplicationReport("am1", 78)).thenReturn(
+            getRunningApplicationReport("am1", 78)).thenReturn(
+        getFinishedApplicationReport());
+
+    ClientServiceDelegate clientServiceDelegate = spy(getClientServiceDelegate(
+        historyServerProxy, rmDelegate));
+
+    JobStatus jobStatus = clientServiceDelegate.getJobStatus(oldJobId);
+    Assert.assertNotNull(jobStatus);
+    Assert.assertEquals("N/A", jobStatus.getJobName());
+    
+    verify(clientServiceDelegate, times(0)).instantiateAMProxy(
+        any(String.class));
+
+    // Should not reach AM even for second and third times too.
+    jobStatus = clientServiceDelegate.getJobStatus(oldJobId);
+    Assert.assertNotNull(jobStatus);
+    Assert.assertEquals("N/A", jobStatus.getJobName());    
+    verify(clientServiceDelegate, times(0)).instantiateAMProxy(
+        any(String.class));
+    jobStatus = clientServiceDelegate.getJobStatus(oldJobId);
+    Assert.assertNotNull(jobStatus);
+    Assert.assertEquals("N/A", jobStatus.getJobName());    
+    verify(clientServiceDelegate, times(0)).instantiateAMProxy(
+        any(String.class));
+
+    // The third time around, app is completed, so should go to JHS
+    JobStatus jobStatus1 = clientServiceDelegate.getJobStatus(oldJobId);
+    Assert.assertNotNull(jobStatus1);
+    Assert.assertEquals("TestJobFilePath", jobStatus1.getJobFile());                               
+    Assert.assertEquals("http://TestTrackingUrl", jobStatus1.getTrackingUrl());                    
+    Assert.assertEquals(1.0f, jobStatus1.getMapProgress());                                        
+    Assert.assertEquals(1.0f, jobStatus1.getReduceProgress());
+    
+    verify(clientServiceDelegate, times(0)).instantiateAMProxy(
+        any(String.class));
+  }
+  
   private GetJobReportRequest getJobReportRequest() {
   private GetJobReportRequest getJobReportRequest() {
     GetJobReportRequest request = Records.newRecord(GetJobReportRequest.class);
     GetJobReportRequest request = Records.newRecord(GetJobReportRequest.class);
     request.setJobId(jobId);
     request.setJobId(jobId);
@@ -300,7 +358,7 @@ public class TestClientServiceDelegate {
       MRClientProtocol historyServerProxy, ResourceMgrDelegate rm) {
       MRClientProtocol historyServerProxy, ResourceMgrDelegate rm) {
     Configuration conf = new YarnConfiguration();
     Configuration conf = new YarnConfiguration();
     conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
     conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
-    conf.setBoolean(YarnConfiguration.RM_AM_NETWORK_ACL_CLOSED, !isAMReachableFromClient);
+    conf.setBoolean(MRJobConfig.JOB_AM_ACCESS_DISABLED, !isAMReachableFromClient);
     ClientServiceDelegate clientServiceDelegate = new ClientServiceDelegate(
     ClientServiceDelegate clientServiceDelegate = new ClientServiceDelegate(
         conf, rm, oldJobId, historyServerProxy);
         conf, rm, oldJobId, historyServerProxy);
     return clientServiceDelegate;
     return clientServiceDelegate;

+ 0 - 5
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java

@@ -145,11 +145,6 @@ public class YarnConfiguration extends Configuration {
   /** ACL used in case none is found. Allows nothing. */
   /** ACL used in case none is found. Allows nothing. */
   public static final String DEFAULT_YARN_APP_ACL = " ";
   public static final String DEFAULT_YARN_APP_ACL = " ";
 
 
-  /** RM-AM ACL disabled. **/
-  public static final String RM_AM_NETWORK_ACL_CLOSED = 
-    RM_PREFIX + "am.acl.disabled";
-  public static final boolean DEFAULT_RM_AM_NETWORK_ACL_CLOSED = false;
-
   /** The address of the RM admin interface.*/
   /** The address of the RM admin interface.*/
   public static final String RM_ADMIN_ADDRESS = 
   public static final String RM_ADMIN_ADDRESS = 
     RM_PREFIX + "admin.address";
     RM_PREFIX + "admin.address";

+ 0 - 6
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/resources/yarn-default.xml

@@ -116,12 +116,6 @@
     <value>*</value>
     <value>*</value>
   </property>
   </property>
 
 
-  <property>
-    <description>Network ACL to AM closed.</description>
-    <name>yarn.resourcemanager.am.acl.disabled</name>
-    <value>false</value>
-  </property> 
-
   <property>
   <property>
     <description>The address of the RM admin interface.</description>
     <description>The address of the RM admin interface.</description>
     <name>yarn.resourcemanager.admin.address</name>
     <name>yarn.resourcemanager.admin.address</name>

+ 6 - 0
hadoop-mapreduce-project/src/java/mapred-default.xml

@@ -959,6 +959,12 @@
   </description>
   </description>
 </property>
 </property>
 
 
+<property>
+  <description>Network ACL to AM closed.</description>
+  <name>mapreduce.job.am-access-disabled</name>
+  <value>false</value>
+</property>
+
 <property>
 <property>
   <name>mapreduce.job.acl-modify-job</name>
   <name>mapreduce.job.acl-modify-job</name>
   <value> </value>
   <value> </value>