瀏覽代碼

MAPREDUCE-4603. Add support for JobClient to retry job-submission when JobTracker is in SafeMode. Contributed by Arun C. Murthy.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-1@1390041 13f79535-47bb-0310-9956-ffa450edef68
Arun Murthy 13 年之前
父節點
當前提交
5deaf04e9a

+ 3 - 2
CHANGES.txt

@@ -394,14 +394,15 @@ Release 1.1.0 - unreleased
     MAPREDUCE-782. Use PureJavaCrc32 in mapreduce spills. 
     MAPREDUCE-782. Use PureJavaCrc32 in mapreduce spills. 
     (Todd Lipcon, backport by Brandon Li via sseth)
     (Todd Lipcon, backport by Brandon Li via sseth)
 
 
-    HDFS-3667.  Add retry support to WebHdfsFileSystem.  (szetszwo)
-
     HADOOP-8748. Refactor DFSClient retry utility methods to a new class in
     HADOOP-8748. Refactor DFSClient retry utility methods to a new class in
     org.apache.hadoop.io.retry.  Contributed by Arun C Murthy.
     org.apache.hadoop.io.retry.  Contributed by Arun C Murthy.
 
 
     HDFS-3871. Change DFSClient to use RetryUtils.  (Arun C Murthy
     HDFS-3871. Change DFSClient to use RetryUtils.  (Arun C Murthy
     via szetszwo)
     via szetszwo)
 
 
+    MAPREDUCE-4603. Add support for JobClient to retry job-submission when
+    JobTracker is in SafeMode. (acmurthy)
+
   BUG FIXES
   BUG FIXES
 
 
     HDFS-3696. Set chunked streaming mode in WebHdfsFileSystem write operations
     HDFS-3696. Set chunked streaming mode in WebHdfsFileSystem write operations

+ 72 - 10
src/mapred/org/apache/hadoop/mapred/JobClient.java

@@ -40,6 +40,7 @@ import java.security.PrivilegedExceptionAction;
 import java.util.Arrays;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collection;
 import java.util.Comparator;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.List;
 import java.util.List;
 import java.util.Map;
 import java.util.Map;
 
 
@@ -59,6 +60,10 @@ import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.io.retry.RetryProxy;
+import org.apache.hadoop.io.retry.RetryUtils;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.mapred.Counters.Counter;
 import org.apache.hadoop.mapred.Counters.Counter;
@@ -434,7 +439,9 @@ public class JobClient extends Configured implements MRConstants, Tool  {
     }
     }
   }
   }
 
 
+  private JobSubmissionProtocol rpcJobSubmitClient;
   private JobSubmissionProtocol jobSubmitClient;
   private JobSubmissionProtocol jobSubmitClient;
+  
   private Path sysDir = null;
   private Path sysDir = null;
   private Path stagingAreaDir = null;
   private Path stagingAreaDir = null;
   
   
@@ -445,6 +452,15 @@ public class JobClient extends Configured implements MRConstants, Tool  {
   private static final int DEFAULT_TASKLOG_TIMEOUT = 60000;
   private static final int DEFAULT_TASKLOG_TIMEOUT = 60000;
   static int tasklogtimeout;
   static int tasklogtimeout;
 
 
+  public static final String MAPREDUCE_CLIENT_RETRY_POLICY_ENABLED_KEY =
+      "mapreduce.jobclient.retry.policy.enabled";
+  public static final boolean MAPREDUCE_CLIENT_RETRY_POLICY_ENABLED_DEFAULT = 
+      false;
+  public static final String MAPREDUCE_CLIENT_RETRY_POLICY_SPEC_KEY =
+      "mapreduce.jobclient.retry.policy.spec";
+  public static final String MAPREDUCE_CLIENT_RETRY_POLICY_SPEC_DEFAULT =
+      "10000,6,60000,10"; //t1,n1,t2,n2,...
+  
   /**
   /**
    * Create a job client.
    * Create a job client.
    */
    */
@@ -477,16 +493,61 @@ public class JobClient extends Configured implements MRConstants, Tool  {
       conf.setNumMapTasks(1);
       conf.setNumMapTasks(1);
       this.jobSubmitClient = new LocalJobRunner(conf);
       this.jobSubmitClient = new LocalJobRunner(conf);
     } else {
     } else {
-      this.jobSubmitClient = createRPCProxy(JobTracker.getAddress(conf), conf);
+      this.rpcJobSubmitClient = 
+          createRPCProxy(JobTracker.getAddress(conf), conf);
+      this.jobSubmitClient = createProxy(this.rpcJobSubmitClient, conf);
     }        
     }        
   }
   }
 
 
   private static JobSubmissionProtocol createRPCProxy(InetSocketAddress addr,
   private static JobSubmissionProtocol createRPCProxy(InetSocketAddress addr,
       Configuration conf) throws IOException {
       Configuration conf) throws IOException {
-    return (JobSubmissionProtocol) RPC.getProxy(JobSubmissionProtocol.class,
-        JobSubmissionProtocol.versionID, addr, 
-        UserGroupInformation.getCurrentUser(), conf,
-        NetUtils.getSocketFactory(conf, JobSubmissionProtocol.class));
+    
+    JobSubmissionProtocol rpcJobSubmitClient = 
+        (JobSubmissionProtocol)RPC.getProxy(
+            JobSubmissionProtocol.class,
+            JobSubmissionProtocol.versionID, addr, 
+            UserGroupInformation.getCurrentUser(), conf,
+            NetUtils.getSocketFactory(conf, JobSubmissionProtocol.class), 
+            0,
+            RetryUtils.getMultipleLinearRandomRetry(
+                conf,
+                MAPREDUCE_CLIENT_RETRY_POLICY_ENABLED_KEY,
+                MAPREDUCE_CLIENT_RETRY_POLICY_ENABLED_DEFAULT,
+                MAPREDUCE_CLIENT_RETRY_POLICY_SPEC_KEY,
+                MAPREDUCE_CLIENT_RETRY_POLICY_SPEC_DEFAULT
+                )
+            );
+    
+    return rpcJobSubmitClient;
+  }
+
+  private static JobSubmissionProtocol createProxy(
+      JobSubmissionProtocol rpcJobSubmitClient,
+      Configuration conf) throws IOException {
+
+    RetryPolicy defaultPolicy = 
+        RetryUtils.getDefaultRetryPolicy(
+            conf,
+            MAPREDUCE_CLIENT_RETRY_POLICY_ENABLED_KEY,
+            MAPREDUCE_CLIENT_RETRY_POLICY_ENABLED_DEFAULT,
+            MAPREDUCE_CLIENT_RETRY_POLICY_SPEC_KEY,
+            MAPREDUCE_CLIENT_RETRY_POLICY_SPEC_DEFAULT,
+            SafeModeException.class
+            ); 
+    
+    /* 
+     * Method specific retry policies for killJob and killTask...
+     * 
+     * No retries on any exception including 
+     * ConnectionException and SafeModeException
+     */
+    Map<String,RetryPolicy> methodNameToPolicyMap = 
+        new HashMap<String,RetryPolicy>();
+    methodNameToPolicyMap.put("killJob", RetryPolicies.TRY_ONCE_THEN_FAIL);
+    methodNameToPolicyMap.put("killTask", RetryPolicies.TRY_ONCE_THEN_FAIL);
+    
+    return (JobSubmissionProtocol) RetryProxy.create(JobSubmissionProtocol.class,
+        rpcJobSubmitClient, defaultPolicy, methodNameToPolicyMap);
   }
   }
 
 
   @InterfaceAudience.Private
   @InterfaceAudience.Private
@@ -502,7 +563,7 @@ public class JobClient extends Configured implements MRConstants, Tool  {
     public long renew(Token<?> token, Configuration conf
     public long renew(Token<?> token, Configuration conf
                       ) throws IOException, InterruptedException {
                       ) throws IOException, InterruptedException {
       InetSocketAddress addr = SecurityUtil.getTokenServiceAddr(token);
       InetSocketAddress addr = SecurityUtil.getTokenServiceAddr(token);
-      JobSubmissionProtocol jt = createRPCProxy(addr, conf);
+      JobSubmissionProtocol jt = createProxy(createRPCProxy(addr, conf), conf);
       return jt.renewDelegationToken((Token<DelegationTokenIdentifier>) token);
       return jt.renewDelegationToken((Token<DelegationTokenIdentifier>) token);
     }
     }
 
 
@@ -511,7 +572,7 @@ public class JobClient extends Configured implements MRConstants, Tool  {
     public void cancel(Token<?> token, Configuration conf
     public void cancel(Token<?> token, Configuration conf
                        ) throws IOException, InterruptedException {
                        ) throws IOException, InterruptedException {
       InetSocketAddress addr = SecurityUtil.getTokenServiceAddr(token);
       InetSocketAddress addr = SecurityUtil.getTokenServiceAddr(token);
-      JobSubmissionProtocol jt = createRPCProxy(addr, conf);
+      JobSubmissionProtocol jt = createProxy(createRPCProxy(addr, conf), conf);
       jt.cancelDelegationToken((Token<DelegationTokenIdentifier>) token);
       jt.cancelDelegationToken((Token<DelegationTokenIdentifier>) token);
     }
     }
 
 
@@ -537,15 +598,16 @@ public class JobClient extends Configured implements MRConstants, Tool  {
   public JobClient(InetSocketAddress jobTrackAddr, 
   public JobClient(InetSocketAddress jobTrackAddr, 
                    Configuration conf) throws IOException {
                    Configuration conf) throws IOException {
     this.ugi = UserGroupInformation.getCurrentUser();
     this.ugi = UserGroupInformation.getCurrentUser();
-    jobSubmitClient = createRPCProxy(jobTrackAddr, conf);
+    rpcJobSubmitClient = createRPCProxy(jobTrackAddr, conf); 
+    jobSubmitClient = createProxy(rpcJobSubmitClient, conf);
   }
   }
 
 
   /**
   /**
    * Close the <code>JobClient</code>.
    * Close the <code>JobClient</code>.
    */
    */
   public synchronized void close() throws IOException {
   public synchronized void close() throws IOException {
-    if (!(jobSubmitClient instanceof LocalJobRunner)) {
-      RPC.stopProxy(jobSubmitClient);
+    if (!(rpcJobSubmitClient instanceof LocalJobRunner)) {
+      RPC.stopProxy(rpcJobSubmitClient);
     }
     }
   }
   }
 
 

+ 22 - 3
src/mapred/org/apache/hadoop/mapred/JobTracker.java

@@ -3719,9 +3719,8 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
   public JobStatus submitJob(JobID jobId, String jobSubmitDir,
   public JobStatus submitJob(JobID jobId, String jobSubmitDir,
       UserGroupInformation ugi, Credentials ts, boolean recovered)
       UserGroupInformation ugi, Credentials ts, boolean recovered)
       throws IOException {
       throws IOException {
-    if (isInSafeMode()) {
-      throw new IOException("JobTracker in safemode");
-    }
+    // Check for safe-mode
+    checkSafeMode();
     
     
     JobInfo jobInfo = null;
     JobInfo jobInfo = null;
     if (ugi == null) {
     if (ugi == null) {
@@ -3804,6 +3803,9 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
    * @see org.apache.hadoop.mapred.JobSubmissionProtocol#getStagingAreaDir()
    * @see org.apache.hadoop.mapred.JobSubmissionProtocol#getStagingAreaDir()
    */
    */
   public String getStagingAreaDir() throws IOException {
   public String getStagingAreaDir() throws IOException {
+    // Check for safe-mode
+    checkSafeMode();
+
     try{
     try{
       final String user =
       final String user =
         UserGroupInformation.getCurrentUser().getShortUserName();
         UserGroupInformation.getCurrentUser().getShortUserName();
@@ -3920,6 +3922,9 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
       return;
       return;
     }
     }
     
     
+    // No 'killJob' in safe-mode
+    checkSafeMode();
+    
     JobInProgress job = jobs.get(jobid);
     JobInProgress job = jobs.get(jobid);
     
     
     if (null == job) {
     if (null == job) {
@@ -4375,6 +4380,9 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
    */
    */
   public synchronized boolean killTask(TaskAttemptID taskid, boolean shouldFail)
   public synchronized boolean killTask(TaskAttemptID taskid, boolean shouldFail)
       throws IOException {
       throws IOException {
+    // No 'killTask' in safe-mode
+    checkSafeMode();
+
     TaskInProgress tip = taskidToTIPMap.get(taskid);
     TaskInProgress tip = taskidToTIPMap.get(taskid);
     if(tip != null) {
     if(tip != null) {
       // check both queue-level and job-level access
       // check both queue-level and job-level access
@@ -5266,4 +5274,15 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
     return "<em>ON - " + safeModeInfo + "</em>";
     return "<em>ON - " + safeModeInfo + "</em>";
   }
   }
   
   
+  private void checkSafeMode() throws SafeModeException {
+    if (isInSafeMode()) {
+      try {
+        throw new SafeModeException((
+            isInAdminSafeMode()) ? adminSafeModeUser : null);
+      } catch (SafeModeException sfe) {
+        LOG.info("JobTracker in safe-mode, aborting operation", sfe);
+        throw sfe;
+      }
+    }
+  }
 }
 }

+ 43 - 0
src/mapred/org/apache/hadoop/mapred/SafeModeException.java

@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+/**
+ * This exception is thrown when the JobTracker is in safe mode.
+ *
+ * It is an IOException whose message states whether an admin user was
+ * supplied: with a null user the message is the plain safe-mode notice,
+ * otherwise the admin's name is appended to it.
+ */
+public class SafeModeException extends IOException {
+
+  private static final long serialVersionUID = 1984839257L;
+
+  /**
+   * SafeModeException
+   *
+   * Builds the exception message from the given admin user; no other state
+   * is stored on the exception.
+   *
+   * @param adminUser admin who put JobTracker in safe-mode, 
+   *                  <code>null</code> if it was automatic
+   * 
+   */
+  public SafeModeException(String adminUser) {
+    super(
+        (adminUser == null) ? 
+            "JobTracker is in safe mode" : 
+              "JobTracker is in safe-mode set by admin " + adminUser);
+  }
+
+}

+ 115 - 0
src/test/org/apache/hadoop/mapred/TestJobClientRetries.java

@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.TestMiniMRWithDFS.TestResult;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Checks that a word-count job submitted while the JobTracker is stopped
+ * still completes after the JobTracker is started again, when the job-client
+ * retry policy is enabled.
+ */
+public class TestJobClientRetries {
+
+  private static final Log LOG = LogFactory.getLog(TestJobClientRetries.class);
+
+  /** Cluster created by the test; remains null if its construction fails. */
+  MiniMRCluster mr;
+
+  /**
+   * Stops the JobTracker, starts a word-count submission on a separate thread
+   * with retries enabled, restarts the JobTracker after five seconds and then
+   * verifies the job output.
+   */
+  @Test
+  public void testJobSubmission() throws Exception {
+
+    // Start MR cluster
+    mr = new MiniMRCluster(2, "file:///", 3);
+
+    final List<Exception> exceptions = new ArrayList<Exception>();
+
+    // Get jobConf
+    final JobConf jobConf = mr.createJobConf();
+
+    // Stop JobTracker
+    LOG.info("Stopping JobTracker");
+    mr.stopJobTracker();
+
+    /*
+     * Submit job *after* setting job-client retries to be *on*...
+     * the test *should* fail without this config being set
+     */
+    LOG.info("Submitting job with job-client retries enabled");
+    jobConf.setBoolean(
+        JobClient.MAPREDUCE_CLIENT_RETRY_POLICY_ENABLED_KEY, true);
+    WordCountThread wc = new WordCountThread(jobConf, exceptions);
+    wc.start();
+
+    // Restart JobTracker after a little while
+    Thread.sleep(5000);
+    LOG.info("Re-starting JobTracker for job-submission to go through");
+    mr.startJobTracker();
+
+    // Wait for the job to complete or for an exception to occur
+    LOG.info("Waiting for job success/failure ...");
+    wc.join();
+
+    // Check the captured exceptions first: when the job fails, result is null
+    // and this assertion is the one that reports the underlying cause.
+    Assert.assertTrue("exceptions is not empty: " + exceptions, exceptions.isEmpty());
+    Assert.assertNotNull(wc.result);
+    Assert.assertEquals("The\t1\nbrown\t1\nfox\t2\nhas\t1\nmany\t1\n" +
+        "quick\t1\nred\t1\nsilly\t1\nsox\t1\n", wc.result.output);
+  }
+
+  /**
+   * Shuts the cluster down after each test.
+   */
+  @After
+  public void tearDown() throws Exception {
+    // The cluster is created inside the test method; if that failed, there is
+    // nothing to shut down and an NPE here would hide the original failure.
+    if (mr != null) {
+      mr.shutdown();
+    }
+  }
+
+  /**
+   * Runs word-count on its own thread so the test thread is free to restart
+   * the JobTracker. On failure the exception is logged and added to the
+   * supplied list, and result is set to null.
+   */
+  public static class WordCountThread extends Thread {
+    JobConf jobConf;
+    List<Exception> exceptions;
+    TestResult result;
+
+    /**
+     * @param jobConf configuration used to launch word-count
+     * @param exceptions list that receives any exception thrown by the run
+     */
+    public WordCountThread(JobConf jobConf, List<Exception> exceptions) {
+      super(WordCountThread.class.getName());
+      this.jobConf = jobConf;
+      this.exceptions = exceptions;
+    }
+
+    @Override
+    public void run() {
+      try {
+        FileSystem fs = FileSystem.getLocal(jobConf);
+        Path testdir = new Path(
+            System.getProperty("test.build.data","/tmp")).makeQualified(fs);
+        final Path inDir = new Path(testdir, "input");
+        final Path outDir = new Path(testdir, "output");
+        String input = "The quick brown fox\nhas many silly\nred fox sox\n";
+        LOG.info("Starting word-count");
+        result = 
+            TestMiniMRWithDFS.launchWordCount(
+                jobConf, inDir, outDir, input, 3, 1);
+        LOG.info("Finished word-count");
+      } catch (Exception e) {
+        LOG.error("Caught exception during word-count", e);
+        exceptions.add(e);
+        result = null;
+      }
+    }
+  }
+}