Browse Source

MAPREDUCE-5066. Added a timeout for the job.end.notification.url. Contributed by Ivan Mitic.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1470216 13f79535-47bb-0310-9956-ffa450edef68
Arun Murthy 12 years ago
parent
commit
794f9bb3e4

+ 3 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -327,6 +327,9 @@ Release 2.0.5-beta - UNRELEASED
     MAPREDUCE-5163. Update MR App to not use API utility methods for collections
     after YARN-441. (Xuan Gong via vinodkv)
 
+    MAPREDUCE-5066. Added a timeout for the job.end.notification.url. (Ivan
+    Mitic via acmurthy)
+
 Release 2.0.4-alpha - UNRELEASED
 
   INCOMPATIBLE CHANGES

+ 8 - 4
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/JobEndNotifier.java

@@ -27,6 +27,7 @@ import java.net.URL;
 
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobContext;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
 import org.mortbay.log.Log;
@@ -54,6 +55,7 @@ public class JobEndNotifier implements Configurable {
   protected String proxyConf;
   protected int numTries; //Number of tries to attempt notification
   protected int waitInterval; //Time (ms) to wait between retrying notification
+  protected int timeout; // Timeout (ms) on the connection and notification
   protected URL urlToNotify; //URL to notify read from the config
   protected Proxy proxyToUse = Proxy.NO_PROXY; //Proxy to use for notification
 
@@ -76,6 +78,9 @@ public class JobEndNotifier implements Configurable {
     );
     waitInterval = (waitInterval < 0) ? 5000 : waitInterval;
 
+    timeout = conf.getInt(JobContext.MR_JOB_END_NOTIFICATION_TIMEOUT,
+        JobContext.DEFAULT_MR_JOB_END_NOTIFICATION_TIMEOUT);
+
     userUrl = conf.get(MRJobConfig.MR_JOB_END_NOTIFICATION_URL);
 
     proxyConf = conf.get(MRJobConfig.MR_JOB_END_NOTIFICATION_PROXY);
@@ -112,8 +117,7 @@ public class JobEndNotifier implements Configurable {
   }
   
   /**
-   * Notify the URL just once. Use best effort. Timeout hard coded to 5
-   * seconds.
+   * Notify the URL just once. Use best effort.
    */
   protected boolean notifyURLOnce() {
     boolean success = false;
@@ -121,8 +125,8 @@ public class JobEndNotifier implements Configurable {
       Log.info("Job end notification trying " + urlToNotify);
       HttpURLConnection conn =
         (HttpURLConnection) urlToNotify.openConnection(proxyToUse);
-      conn.setConnectTimeout(5*1000);
-      conn.setReadTimeout(5*1000);
+      conn.setConnectTimeout(timeout);
+      conn.setReadTimeout(timeout);
       conn.setAllowUserInteraction(false);
       if(conn.getResponseCode() != HttpURLConnection.HTTP_OK) {
         Log.warn("Job end notification to " + urlToNotify +" failed with code: "

+ 8 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestJobEndNotifier.java

@@ -73,6 +73,13 @@ public class TestJobEndNotifier extends JobEndNotifier {
       + waitInterval, waitInterval == 5000);
   }
 
+  private void testTimeout(Configuration conf) {
+    conf.set(MRJobConfig.MR_JOB_END_NOTIFICATION_TIMEOUT, "1000");
+    setConf(conf);
+    Assert.assertTrue("Expected timeout to be 1000, but was "
+      + timeout, timeout == 1000);
+  }
+
   private void testProxyConfiguration(Configuration conf) {
     conf.set(MRJobConfig.MR_JOB_END_NOTIFICATION_PROXY, "somehost");
     setConf(conf);
@@ -109,6 +116,7 @@ public class TestJobEndNotifier extends JobEndNotifier {
     Configuration conf = new Configuration();
     testNumRetries(conf);
     testWaitInterval(conf);
+    testTimeout(conf);
     testProxyConfiguration(conf);
   }
 

+ 24 - 10
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobEndNotifier.java

@@ -44,9 +44,10 @@ public class JobEndNotifier {
     JobEndStatusInfo notification = null;
     String uri = conf.getJobEndNotificationURI();
     if (uri != null) {
-      // +1 to make logic for first notification identical to a retry
-      int retryAttempts = conf.getInt(JobContext.MR_JOB_END_RETRY_ATTEMPTS, 0) + 1;
+      int retryAttempts = conf.getInt(JobContext.MR_JOB_END_RETRY_ATTEMPTS, 0);
       long retryInterval = conf.getInt(JobContext.MR_JOB_END_RETRY_INTERVAL, 30000);
+      int timeout = conf.getInt(JobContext.MR_JOB_END_NOTIFICATION_TIMEOUT,
+          JobContext.DEFAULT_MR_JOB_END_NOTIFICATION_TIMEOUT);
       if (uri.contains("$jobId")) {
         uri = uri.replace("$jobId", status.getJobID().toString());
       }
@@ -56,17 +57,22 @@ public class JobEndNotifier {
             (status.getRunState() == JobStatus.FAILED) ? "FAILED" : "KILLED";
         uri = uri.replace("$jobStatus", statusStr);
       }
-      notification = new JobEndStatusInfo(uri, retryAttempts, retryInterval);
+      notification = new JobEndStatusInfo(
+          uri, retryAttempts, retryInterval, timeout);
     }
     return notification;
   }
 
-  private static int httpNotification(String uri) throws IOException {
+  private static int httpNotification(String uri, int timeout)
+      throws IOException {
     URI url = new URI(uri, false);
-    HttpClient m_client = new HttpClient();
+    HttpClient httpClient = new HttpClient();
+    httpClient.getParams().setSoTimeout(timeout);
+    httpClient.getParams().setConnectionManagerTimeout(timeout);
+
     HttpMethod method = new GetMethod(url.getEscapedURI());
     method.setRequestHeader("Accept", "*/*");
-    return m_client.executeMethod(method);
+    return httpClient.executeMethod(method);
   }
 
   // for use by the LocalJobRunner, without using a thread&queue,
@@ -74,9 +80,10 @@ public class JobEndNotifier {
   public static void localRunnerNotification(JobConf conf, JobStatus status) {
     JobEndStatusInfo notification = createNotification(conf, status);
     if (notification != null) {
-      while (notification.configureForRetry()) {
+      do {
         try {
-          int code = httpNotification(notification.getUri());
+          int code = httpNotification(notification.getUri(),
+              notification.getTimeout());
           if (code != 200) {
             throw new IOException("Invalid response status code: " + code);
           }
@@ -96,7 +103,7 @@ public class JobEndNotifier {
         catch (InterruptedException iex) {
           LOG.error("Notification retry error [" + notification + "]", iex);
         }
-      }
+      } while (notification.configureForRetry());
     }
   }
 
@@ -105,12 +112,15 @@ public class JobEndNotifier {
     private int retryAttempts;
     private long retryInterval;
     private long delayTime;
+    private int timeout;
 
-    JobEndStatusInfo(String uri, int retryAttempts, long retryInterval) {
+    JobEndStatusInfo(String uri, int retryAttempts, long retryInterval,
+        int timeout) {
       this.uri = uri;
       this.retryAttempts = retryAttempts;
       this.retryInterval = retryInterval;
       this.delayTime = System.currentTimeMillis();
+      this.timeout = timeout;
     }
 
     public String getUri() {
@@ -125,6 +135,10 @@ public class JobEndNotifier {
       return retryInterval;
     }
 
+    public int getTimeout() {
+      return timeout;
+    }
+
     public boolean configureForRetry() {
       boolean retry = false;
       if (getRetryAttempts() > 0) {

+ 6 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java

@@ -616,6 +616,9 @@ public interface MRJobConfig {
   public static final String MR_JOB_END_NOTIFICATION_PROXY =
     "mapreduce.job.end-notification.proxy";
 
+  public static final String MR_JOB_END_NOTIFICATION_TIMEOUT =
+      "mapreduce.job.end-notification.timeout";
+
   public static final String MR_JOB_END_RETRY_ATTEMPTS =
     "mapreduce.job.end-notification.retry.attempts";
 
@@ -628,6 +631,9 @@ public interface MRJobConfig {
   public static final String MR_JOB_END_NOTIFICATION_MAX_RETRY_INTERVAL =
     "mapreduce.job.end-notification.max.retry.interval";
 
+  public static final int DEFAULT_MR_JOB_END_NOTIFICATION_TIMEOUT =
+      5000;
+
   /*
    * MR AM Service Authorization
    */

+ 197 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestJobEndNotifier.java

@@ -0,0 +1,197 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.PrintStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.http.HttpServer;
+
+public class TestJobEndNotifier extends TestCase {
+  HttpServer server;
+  URL baseUrl;
+
+  @SuppressWarnings("serial")
+  public static class JobEndServlet extends HttpServlet {
+    public static volatile int calledTimes = 0;
+    public static URI requestUri;
+
+    @Override
+    public void doGet(HttpServletRequest request, 
+                      HttpServletResponse response
+                      ) throws ServletException, IOException {
+      InputStreamReader in = new InputStreamReader(request.getInputStream());
+      PrintStream out = new PrintStream(response.getOutputStream());
+
+      calledTimes++;
+      try {
+        requestUri = new URI(null, null,
+            request.getRequestURI(), request.getQueryString(), null);
+      } catch (URISyntaxException e) {
+      }
+
+      in.close();
+      out.close();
+    }
+  }
+
+  // Servlet that delays requests for a long time
+  @SuppressWarnings("serial")
+  public static class DelayServlet extends HttpServlet {
+    public static volatile int calledTimes = 0;
+
+    @Override
+    public void doGet(HttpServletRequest request, 
+                      HttpServletResponse response
+                      ) throws ServletException, IOException {
+      boolean timedOut = false;
+      calledTimes++;
+      try {
+        // Sleep for a long time
+        Thread.sleep(1000000);
+      } catch (InterruptedException e) {
+        timedOut = true;
+      }
+      assertTrue("DelayServlet should be interrupted", timedOut);
+    }
+  }
+
+  // Servlet that fails all requests into it
+  @SuppressWarnings("serial")
+  public static class FailServlet extends HttpServlet {
+    public static volatile int calledTimes = 0;
+
+    @Override
+    public void doGet(HttpServletRequest request, 
+                      HttpServletResponse response
+                      ) throws ServletException, IOException {
+      calledTimes++;
+      throw new IOException("I am failing!");
+    }
+  }
+
+  public void setUp() throws Exception {
+    new File(System.getProperty("build.webapps", "build/webapps") + "/test"
+        ).mkdirs();
+    server = new HttpServer("test", "0.0.0.0", 0, true);
+    server.addServlet("delay", "/delay", DelayServlet.class);
+    server.addServlet("jobend", "/jobend", JobEndServlet.class);
+    server.addServlet("fail", "/fail", FailServlet.class);
+    server.start();
+    int port = server.getPort();
+    baseUrl = new URL("http://localhost:" + port + "/");
+
+    JobEndServlet.calledTimes = 0;
+    JobEndServlet.requestUri = null;
+    DelayServlet.calledTimes = 0;
+    FailServlet.calledTimes = 0;
+  }
+
+  public void tearDown() throws Exception {
+    server.stop();
+  }
+
+  /**
+   * Basic validation for localRunnerNotification.
+   */
+  public void testLocalJobRunnerUriSubstitution() throws InterruptedException {
+    JobStatus jobStatus = createTestJobStatus(
+        "job_20130313155005308_0001", JobStatus.SUCCEEDED);
+    JobConf jobConf = createTestJobConf(
+        new Configuration(), 0,
+        baseUrl + "jobend?jobid=$jobId&status=$jobStatus");
+    JobEndNotifier.localRunnerNotification(jobConf, jobStatus);
+
+    // No need to wait for the notification to go thru since calls are
+    // synchronous
+
+    // Validate params
+    assertEquals(1, JobEndServlet.calledTimes);
+    assertEquals("jobid=job_20130313155005308_0001&status=SUCCEEDED",
+        JobEndServlet.requestUri.getQuery());
+  }
+
+  /**
+   * Validate job.end.retry.attempts for the localJobRunner.
+   */
+  public void testLocalJobRunnerRetryCount() throws InterruptedException {
+    int retryAttempts = 3;
+    JobStatus jobStatus = createTestJobStatus(
+        "job_20130313155005308_0001", JobStatus.SUCCEEDED);
+    JobConf jobConf = createTestJobConf(
+        new Configuration(), retryAttempts, baseUrl + "fail");
+    JobEndNotifier.localRunnerNotification(jobConf, jobStatus);
+
+    // Validate params
+    assertEquals(retryAttempts + 1, FailServlet.calledTimes);
+  }
+
+  /**
+   * Validate that the notification times out after reaching
+   * mapreduce.job.end-notification.timeout.
+   */
+  public void testNotificationTimeout() throws InterruptedException {
+    Configuration conf = new Configuration();
+    // Reduce the timeout to 1 second
+    conf.setInt("mapreduce.job.end-notification.timeout", 1000);
+
+    JobStatus jobStatus = createTestJobStatus(
+        "job_20130313155005308_0001", JobStatus.SUCCEEDED);
+    JobConf jobConf = createTestJobConf(
+        conf, 0,
+        baseUrl + "delay");
+    long startTime = System.currentTimeMillis();
+    JobEndNotifier.localRunnerNotification(jobConf, jobStatus);
+    long elapsedTime = System.currentTimeMillis() - startTime;
+
+    // Validate params
+    assertEquals(1, DelayServlet.calledTimes);
+    // Make sure we timed out with time slightly above 1 second
+    // (default timeout is in terms of minutes, so we'll catch the problem)
+    assertTrue(elapsedTime < 2000);
+  }
+
+  private static JobStatus createTestJobStatus(String jobId, int state) {
+    return new JobStatus(
+        JobID.forName(jobId), 0.5f, 0.0f,
+        state, "root", "TestJobEndNotifier", null, null);
+  }
+
+  private static JobConf createTestJobConf(
+      Configuration conf, int retryAttempts, String notificationUri) {
+    JobConf jobConf = new JobConf(conf);
+    jobConf.setInt("job.end.retry.attempts", retryAttempts);
+    jobConf.set("job.end.retry.interval", "0");
+    jobConf.setJobEndNotificationURI(notificationUri);
+    return jobConf;
+  }
+}