浏览代码

HADOOP-1111. Add support for client notification of job completion. Contributed by Alejandro Abdelnur.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@530146 13f79535-47bb-0310-9956-ffa450edef68
Thomas White 18 年之前
父节点
当前提交
5adc1e1857

+ 6 - 0
CHANGES.txt

@@ -225,6 +225,12 @@ Trunk (unreleased changes)
 68. HADOOP-968.  Move shuffle and sort to run in reduce's child JVM,
     rather than in TaskTracker.  (Devaraj Das via cutting)
 
+69. HADOOP-1111.  Add support for client notification of job
+    completion. If the job configuration has a job.end.notification.url
+    property, Hadoop will make an HTTP GET request to the specified URL
+    when the job ends. The number of retries and the interval between
+    retries are also configurable. (Alejandro Abdelnur via tomwhite)
+
 
 Release 0.12.3 - 2007-04-06
 

+ 29 - 0
conf/hadoop-default.xml

@@ -857,4 +857,33 @@ creations/deletions), or "all".</description>
   </description>
 </property>
 
+<!-- Job Notification Configuration -->
+
+<!--
+<property>
+ <name>job.end.notification.url</name>
+ <value>http://localhost:8080/jobstatus.php?jobId=$jobId&amp;jobStatus=$jobStatus</value>
+ <description>Indicates the URL that will be called on completion of a job to
+              report the end status of the job.
+              The URL may contain at most two variables: $jobId and $jobStatus.
+              If they are present in the URL, they are replaced by their
+              respective values.
+</description>
+</property>
+-->
+
+<!-- The name must match the key read by JobEndNotifier
+     ("job.end.retry.attempts"); otherwise this default is never picked up. -->
+<property>
+  <name>job.end.retry.attempts</name>
+  <value>0</value>
+  <description>Indicates how many times hadoop should attempt to contact the
+               notification URL </description>
+</property>
+
+<property>
+  <name>job.end.retry.interval</name>
+   <value>30000</value>
+   <description>Indicates time in milliseconds between notification URL retry
+                calls</description>
+</property>
+
 </configuration>

+ 207 - 0
src/java/org/apache/hadoop/mapred/JobEndNotifier.java

@@ -0,0 +1,207 @@
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.DelayQueue;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.httpclient.HttpClient;
+import org.apache.commons.httpclient.HttpMethod;
+import org.apache.commons.httpclient.URI;
+import org.apache.commons.httpclient.methods.GetMethod;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public class JobEndNotifier {
+  private static final Log LOG =
+    LogFactory.getLog(JobEndNotifier.class.getName());
+
+  private static Thread thread;
+  private static volatile boolean running;
+  private static BlockingQueue<JobEndStatusInfo> queue =
+    new DelayQueue<JobEndStatusInfo>();
+
+  /**
+   * Starts the background thread that delivers queued job-end notifications.
+   *
+   * The thread keeps taking entries from the delay queue and sending them
+   * for as long as the running flag is set.
+   */
+  public static void startNotifier() {
+    running = true;
+    thread = new Thread(
+      new Runnable() {
+        public void run() {
+          try {
+            while (running) {
+              // take() waits until an entry's delay has expired
+              sendNotification(queue.take());
+            }
+          }
+          catch (InterruptedException irex) {
+            // only report the interrupt when the running flag is still set,
+            // i.e. when no stop was requested
+            if (running) {
+              LOG.error("Thread has ended unexpectedly", irex);
+            }
+          }
+        }
+
+        // Sends one notification. An IOException (including the one raised
+        // for a non-200 response) re-queues the entry while
+        // configureForRetry() reports that attempts remain; any other
+        // exception is logged and the entry is dropped.
+        private void sendNotification(JobEndStatusInfo notification) {
+          try {
+            int code = httpNotification(notification.getUri());
+            if (code != 200) {
+              throw new IOException("Invalid response status code: " + code);
+            }
+          }
+          catch (IOException ioex) {
+            LOG.error("Notification failure [" + notification + "]", ioex);
+            if (notification.configureForRetry()) {
+              try {
+                queue.put(notification);
+              }
+              catch (InterruptedException iex) {
+                LOG.error("Notification queuing error [" + notification + "]",
+                  iex);
+              }
+            }
+          }
+          catch (Exception ex) {
+            LOG.error("Notification failure [" + notification + "]", ex);
+          }
+        }
+
+      }
+
+    );
+    thread.start();
+  }
+
+  public static void stopNotifier() {
+    running = false;
+    thread.interrupt();
+  }
+
+  /**
+   * Builds the notification entry for a finished job.
+   *
+   * The $jobId and $jobStatus placeholders in the configured URL are
+   * replaced with the job's id and with SUCCEEDED or FAILED.
+   *
+   * @param conf job configuration holding the notification settings
+   * @param status final status of the job
+   * @return the notification, or null when no job.end.notification.url is set
+   */
+  private static JobEndStatusInfo createNotification(JobConf conf,
+    JobStatus status) {
+    String target = conf.get("job.end.notification.url");
+    if (target == null) {
+      return null;
+    }
+    // one extra attempt so the first notification is handled like a retry
+    int attempts = 1 + conf.getInt("job.end.retry.attempts", 0);
+    long interval = conf.getInt("job.end.retry.interval", 30000);
+    if (target.contains("$jobId")) {
+      target = target.replace("$jobId", status.getJobId());
+    }
+    if (target.contains("$jobStatus")) {
+      String outcome;
+      if (status.getRunState() == JobStatus.SUCCEEDED) {
+        outcome = "SUCCEEDED";
+      }
+      else {
+        outcome = "FAILED";
+      }
+      target = target.replace("$jobStatus", outcome);
+    }
+    return new JobEndStatusInfo(target, attempts, interval);
+  }
+
+  /**
+   * Queues a job-end notification for delivery by the notifier thread.
+   *
+   * Does nothing when the job configuration defines no notification URL.
+   *
+   * @param jobConf configuration of the finished job
+   * @param status final status of the job
+   */
+  public static void registerNotification(JobConf jobConf, JobStatus status) {
+    JobEndStatusInfo pending = createNotification(jobConf, status);
+    if (pending == null) {
+      return;
+    }
+    try {
+      queue.put(pending);
+    }
+    catch (InterruptedException iex) {
+      LOG.error("Notification queuing failure [" + pending + "]", iex);
+    }
+  }
+
+  /**
+   * Issues an HTTP GET against the given URI.
+   *
+   * @param uri unescaped notification URI
+   * @return the HTTP status code of the response
+   * @throws IOException if the URI is invalid or the request fails
+   */
+  private static int httpNotification(String uri) throws IOException {
+    URI url = new URI(uri, false);
+    HttpClient client = new HttpClient();
+    HttpMethod method = new GetMethod(url.getEscapedURI());
+    method.setRequestHeader("Accept", "*/*");
+    try {
+      return client.executeMethod(method);
+    }
+    finally {
+      // always hand the connection back, whether or not the request succeeded
+      method.releaseConnection();
+    }
+  }
+
+  // for use by the LocalJobRunner, without using a thread&queue,
+  // simple synchronous way
+  public static void localRunnerNotification(JobConf conf, JobStatus status) {
+    JobEndStatusInfo notification = createNotification(conf, status);
+    if (notification != null) {
+      while (notification.configureForRetry()) {
+        try {
+          int code = httpNotification(notification.getUri());
+          if (code != 200) {
+            throw new IOException("Invalid response status code: " + code);
+          }
+          else {
+            break;
+          }
+        }
+        catch (IOException ioex) {
+          LOG.error("Notification error [" + notification.getUri() + "]", ioex);
+        }
+        catch (Exception ex) {
+          LOG.error("Notification error [" + notification.getUri() + "]", ex);
+        }
+        try {
+          synchronized (Thread.currentThread()) {
+            Thread.currentThread().sleep(notification.getRetryInterval());
+          }
+        }
+        catch (InterruptedException iex) {
+          LOG.error("Notification retry error [" + notification + "]", iex);
+        }
+      }
+    }
+  }
+
+  /**
+   * A pending notification: the target URI, the attempts still allowed, the
+   * pause between attempts, and the time at which it becomes due.
+   */
+  private static class JobEndStatusInfo implements Delayed {
+    private String uri;
+    private int retryAttempts;
+    private long retryInterval;
+    // wall-clock time (System.currentTimeMillis()) at which the entry is due
+    private long delayTime;
+
+    JobEndStatusInfo(String uri, int retryAttempts, long retryInterval) {
+      this.uri = uri;
+      this.retryAttempts = retryAttempts;
+      this.retryInterval = retryInterval;
+      // due immediately
+      this.delayTime = System.currentTimeMillis();
+    }
+
+    public String getUri() {
+      return uri;
+    }
+
+    public int getRetryAttempts() {
+      return retryAttempts;
+    }
+
+    public long getRetryInterval() {
+      return retryInterval;
+    }
+
+    public long getDelayTime() {
+      return delayTime;
+    }
+
+    /**
+     * Consumes one attempt.
+     *
+     * @return true if an attempt was still available; in that case the due
+     * time is moved one retry interval into the future
+     */
+    public boolean configureForRetry() {
+      boolean retry = false;
+      if (getRetryAttempts() > 0) {
+        retry = true;
+        delayTime = System.currentTimeMillis() + retryInterval;
+      }
+      retryAttempts--;
+      return retry;
+    }
+
+    /** Returns the time left until the entry is due, in the given unit. */
+    public long getDelay(TimeUnit unit) {
+      long n = this.delayTime - System.currentTimeMillis();
+      return unit.convert(n, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Orders entries by due time.
+     *
+     * The values are compared directly instead of casting their difference
+     * to int, which could overflow and report the wrong sign. Other Delayed
+     * implementations are compared by remaining delay rather than cast.
+     */
+    public int compareTo(Delayed d) {
+      long mine;
+      long theirs;
+      if (d instanceof JobEndStatusInfo) {
+        mine = delayTime;
+        theirs = ((JobEndStatusInfo) d).delayTime;
+      }
+      else {
+        mine = getDelay(TimeUnit.MILLISECONDS);
+        theirs = d.getDelay(TimeUnit.MILLISECONDS);
+      }
+      if (mine < theirs) {
+        return -1;
+      }
+      return (mine == theirs) ? 0 : 1;
+    }
+
+
+    public String toString() {
+      return "URL: " + uri + " remaining retries: " + retryAttempts +
+        " interval: " + retryInterval;
+    }
+
+  }
+
+}

+ 5 - 1
src/java/org/apache/hadoop/mapred/JobInProgress.java

@@ -198,9 +198,13 @@ class JobInProgress {
       this.finishTime = System.currentTimeMillis();
       this.status = new JobStatus(status.getJobId(), 1.0f, 1.0f, JobStatus.SUCCEEDED);
       tasksInited = true;
+
+      // Special case because the Job is not queued
+      JobEndNotifier.registerNotification(this.getJobConf(), this.getStatus());
+
       return;
     }
-        
+
     //
     // Create reduce tasks
     //

+ 8 - 2
src/java/org/apache/hadoop/mapred/JobTracker.java

@@ -115,7 +115,10 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
       } catch (InterruptedException e) {
       }
     }
-    if (runTracker) { tracker.offerService(); }
+    if (runTracker) {
+      JobEndNotifier.startNotifier();
+      tracker.offerService();
+    }
   }
 
   public static JobTracker getTracker() {
@@ -125,6 +128,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
   public static void stopTracker() throws IOException {
     runTracker = false;
     if (tracker != null) {
+      JobEndNotifier.stopNotifier();
       tracker.close();
       tracker = null;
     }
@@ -873,7 +877,9 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
   synchronized void finalizeJob(JobInProgress job) {
     // Mark the 'non-running' tasks for pruning
     markCompletedJob(job);
-      
+
+      JobEndNotifier.registerNotification(job.getJobConf(), job.getStatus());
+
     // Purge oldest jobs and keep at-most MAX_COMPLETE_USER_JOBS_IN_MEMORY jobs of a given user
     // in memory; information about the purged jobs is available via
     // JobHistory.

+ 4 - 0
src/java/org/apache/hadoop/mapred/LocalJobRunner.java

@@ -161,10 +161,14 @@ class LocalJobRunner implements JobSubmissionProtocol {
         
         this.status.setRunState(JobStatus.SUCCEEDED);
 
+        JobEndNotifier.localRunnerNotification(job, status);
+
       } catch (Throwable t) {
         this.status.setRunState(JobStatus.FAILED);
         LOG.warn(id, t);
 
+        JobEndNotifier.localRunnerNotification(job, status);
+
       } finally {
         try {
           fs.delete(new Path(file).getParent());  // delete submit dir

+ 190 - 0
src/test/org/apache/hadoop/mapred/HadoopTestCase.java

@@ -0,0 +1,190 @@
+package org.apache.hadoop.mapred;
+
+import junit.framework.TestCase;
+import org.apache.hadoop.dfs.MiniDFSCluster;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.File;
+import java.io.IOException;
+
+/**
+ * Abstract Test case class to run MR in local or cluster mode and in local FS
+ * or DFS.
+ *
+ * The Hadoop instance is started and stopped on each test method.
+ *
+ * If using DFS the filesystem is reformatted at each start (test method).
+ *
+ * Job Configurations should be created using a configuration returned by the
+ * 'createJobConf()' method.
+ *
+ * @author Alejandro Abdelnur
+ */
+public abstract class HadoopTestCase extends TestCase {
+  public static final int LOCAL_MR = 1;
+  public static final int CLUSTER_MR = 2;
+  public static final int LOCAL_FS = 4;
+  public static final int DFS_FS = 8;
+
+  private boolean localMR;
+  private boolean localFS;
+
+  private int taskTrackers;
+  private int dataNodes;
+
+  /**
+   * Creates a testcase for local or cluster MR using the local FS or DFS.
+   *
+   * @param mrMode indicates if the MR should be local (LOCAL_MR) or cluster
+   * (CLUSTER_MR)
+   *
+   * @param fsMode indicates if the FS should be local (LOCAL_FS) or DFS (DFS_FS)
+   *
+   * @param taskTrackers number of task trackers to start when using cluster
+   *
+   * @param dataNodes number of data nodes to start when using DFS
+   *
+   * @throws IllegalArgumentException if a mode is not one of the allowed
+   * constants, or if taskTrackers or dataNodes is less than 1
+   *
+   * @throws IOException declared for callers and subclasses; the validation
+   * performed here does not raise it
+   */
+  public HadoopTestCase(int mrMode, int fsMode, int taskTrackers, int dataNodes)
+    throws IOException {
+    if (mrMode != LOCAL_MR && mrMode != CLUSTER_MR) {
+      throw new IllegalArgumentException(
+        "Invalid MapRed mode, must be LOCAL_MR or CLUSTER_MR");
+    }
+    if (fsMode != LOCAL_FS && fsMode != DFS_FS) {
+      throw new IllegalArgumentException(
+        "Invalid FileSystem mode, must be LOCAL_FS or DFS_FS");
+    }
+    if (taskTrackers < 1) {
+      throw new IllegalArgumentException(
+        "Invalid taskTrackers value, must be greater than 0");
+    }
+    if (dataNodes < 1) {
+      throw new IllegalArgumentException(
+        "Invalid dataNodes value, must be greater than 0");
+    }
+    localMR = (mrMode == LOCAL_MR);
+    localFS = (fsMode == LOCAL_FS);
+    this.taskTrackers = taskTrackers;
+    this.dataNodes = dataNodes;
+  }
+
+  /**
+   * Indicates if the MR is running in local or cluster mode.
+   *
+   * @return returns TRUE if the MR is running locally, FALSE if running in
+   * cluster mode.
+   */
+  public boolean isLocalMR() {
+    return localMR;
+  }
+
+  /**
+   * Indicates if the filesystem is local or DFS.
+   *
+   * @return returns TRUE if the filesystem is local, FALSE if it is DFS.
+   */
+  public boolean isLocalFS() {
+    return localFS;
+  }
+
+
+  private MiniDFSCluster dfsCluster = null;
+  private MiniMRCluster mrCluster = null;
+  private FileSystem fileSystem = null;
+
+  /**
+   * Brings up the Hadoop instance selected in the constructor before each
+   * test: a local filesystem or a mini DFS cluster, plus a mini MR cluster
+   * when not running MR locally.
+   *
+   * @throws Exception
+   */
+  protected void setUp() throws Exception {
+    super.setUp();
+    if (!localFS) {
+      dfsCluster = new MiniDFSCluster(new JobConf(), dataNodes, true, null);
+      fileSystem = dfsCluster.getFileSystem();
+    }
+    else {
+      fileSystem = FileSystem.getLocal(new JobConf());
+    }
+    // local MR needs no cluster
+    if (!localMR) {
+      //noinspection deprecation
+      mrCluster = new MiniMRCluster(taskTrackers, fileSystem.getName(), 1);
+    }
+  }
+
+  /**
+   * Shuts down whatever clusters setUp() started after each test. A failure
+   * while stopping one cluster is printed and does not prevent stopping the
+   * other.
+   *
+   * @throws Exception
+   */
+  protected void tearDown() throws Exception {
+    if (mrCluster != null) {
+      try {
+        mrCluster.shutdown();
+      }
+      catch (Exception ex) {
+        System.out.println(ex);
+      }
+    }
+    if (dfsCluster != null) {
+      try {
+        dfsCluster.shutdown();
+      }
+      catch (Exception ex) {
+        System.out.println(ex);
+      }
+    }
+    super.tearDown();
+  }
+
+  /**
+   * Returns the Filesystem in use.
+   *
+   * TestCases should use this Filesystem as it
+   * is properly configured with the workingDir for relative PATHs.
+   *
+   * @return the filesystem used by Hadoop.
+   */
+  protected FileSystem getFileSystem() {
+    return fileSystem;
+  }
+
+  /**
+   * Creates a job configuration for the Hadoop instance managed by this
+   * testcase: a plain JobConf for local MR, otherwise one supplied by the
+   * mini MR cluster.
+   * @return configuration that works on the testcase Hadoop instance
+   */
+  protected JobConf createJobConf() {
+    if (localMR) {
+      return new JobConf();
+    }
+    return mrCluster.createJobConf();
+  }
+
+}

+ 200 - 0
src/test/org/apache/hadoop/mapred/NotificationTestCase.java

@@ -0,0 +1,200 @@
+package org.apache.hadoop.mapred;
+
+import org.mortbay.http.HttpServer;
+import org.mortbay.http.SocketListener;
+import org.mortbay.http.HttpContext;
+import org.mortbay.jetty.servlet.ServletHandler;
+import org.apache.log4j.net.SocketServer;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.examples.WordCount;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.ServletException;
+import java.io.IOException;
+import java.io.DataOutputStream;
+import java.util.Date;
+import java.net.Socket;
+import java.net.InetSocketAddress;
+
+/**
+ * Base class to test Job end notification in local and cluster mode.
+ *
+ * Starts up hadoop on Local or Cluster mode (by extending of the
+ * HadoopTestCase class) and it starts a servlet engine that hosts
+ * a servlet that will receive the notification of job finalization.
+ *
+ * The notification servlet returns an HTTP 400 the first time it is called
+ * and an HTTP 200 on every later call, thus testing retry.
+ *
+ * In both cases local file system is used (this is irrelevant for
+ * the tested functionality)
+ *
+ * @author Alejandro Abdelnur
+ * 
+ */
+public abstract class NotificationTestCase extends HadoopTestCase {
+
+  private static void stdPrintln(String s) {
+    //System.out.println(s);
+  }
+
+  protected NotificationTestCase(int mode) throws IOException {
+    super(mode, HadoopTestCase.LOCAL_FS, 1, 1);
+  }
+
+  private int port;
+  private String contextPath = "/notification";
+  private Class servletClass = NotificationServlet.class;
+  private String servletPath = "/mapred";
+  private HttpServer server;
+
+  /**
+   * Starts an HTTP server hosting the notification servlet and records the
+   * port it listens on. A server left over from an earlier call is stopped
+   * first.
+   *
+   * @throws Exception if the server cannot be stopped or started
+   */
+  private void startHttpServer() throws Exception {
+
+    // Create the server
+    if (server != null) {
+      server.stop();
+      server = null;
+    }
+    server = new HttpServer();
+
+    // Create a port listener
+    SocketListener listener = new SocketListener();
+    listener.setPort(0); // letting OS to pickup the PORT
+    server.addListener(listener);
+
+    // create context
+    HttpContext context = new HttpContext();
+    context.setContextPath(contextPath + "/*");
+
+    // create servlet handler
+    ServletHandler handler = new ServletHandler();
+    handler.addServlet(servletClass.getName(), servletPath,
+      servletClass.getName());
+
+    // bind servlet handler to context
+    context.addHandler(handler);
+
+    // bind context to servlet engine
+    server.addContext(context);
+
+    // Start server
+    server.start();
+    // the listener was given port 0, so read back the port actually in use
+    port = listener.getPort();
+
+  }
+
+  /**
+   * Stops and destroys the HTTP server, if one is running.
+   *
+   * @throws Exception if the server cannot be stopped
+   */
+  private void stopHttpServer() throws Exception {
+    if (server == null) {
+      return;
+    }
+    server.stop();
+    server.destroy();
+    server = null;
+  }
+
+  /**
+   * Servlet that receives the job-end notification. It answers the first
+   * request with HTTP 400 and every later request with HTTP 200, counting
+   * all requests.
+   */
+  public static class NotificationServlet extends HttpServlet {
+    // Number of requests received so far. volatile because doGet is invoked
+    // by the HTTP server (presumably on its own threads) while the test
+    // reads the value from the test thread.
+    public static volatile int counter = 0;
+
+    protected void doGet(HttpServletRequest req, HttpServletResponse res)
+      throws ServletException, IOException {
+      // the check and the increment must not interleave between requests
+      synchronized (NotificationServlet.class) {
+        if (counter == 0) {
+          stdPrintln((new Date()).toString() +
+            "Receiving First notification for [" + req.getQueryString() +
+            "], returning error");
+          res.sendError(HttpServletResponse.SC_BAD_REQUEST, "forcing error");
+        }
+        else {
+          stdPrintln((new Date()).toString() +
+            "Receiving Second notification for [" + req.getQueryString() +
+            "], returning OK");
+          res.setStatus(HttpServletResponse.SC_OK);
+        }
+        counter++;
+      }
+    }
+  }
+
+  /**
+   * Returns the notification URL template pointing at this test's servlet,
+   * with the $jobId and $jobStatus placeholders as query parameters.
+   */
+  private String getNotificationUrlTemplate() {
+    // plain '&': this is a Java string, not XML, so the separator must not
+    // be written as the entity "&amp;"
+    return "http://localhost:" + port + contextPath + servletPath +
+      "?jobId=$jobId&jobStatus=$jobStatus";
+  }
+
+  /**
+   * Returns the parent's job configuration with job-end notification
+   * enabled: the URL targets this test's servlet, with
+   * job.end.retry.attempts set to 3 and job.end.retry.interval set to 200.
+   */
+  protected JobConf createJobConf() {
+    JobConf conf = super.createJobConf();
+    conf.set("job.end.notification.url", getNotificationUrlTemplate());
+    conf.set("job.end.retry.attempts", 3);
+    conf.set("job.end.retry.interval", 200);
+    return conf;
+  }
+
+
+  /**
+   * Starts Hadoop (via the parent class) and then the HTTP server that
+   * receives the notification.
+   */
+  protected void setUp() throws Exception {
+    super.setUp();
+    // the counter is static; reset it so an earlier test in the same JVM
+    // cannot affect the request count asserted by this one
+    NotificationServlet.counter = 0;
+    startHttpServer();
+  }
+
+  /**
+   * Stops the HTTP server and then Hadoop (via the parent class).
+   */
+  protected void tearDown() throws Exception {
+    try {
+      stopHttpServer();
+    }
+    finally {
+      // the parent's cleanup must run even if the HTTP server fails to stop
+      super.tearDown();
+    }
+  }
+
+  /**
+   * Runs a word count job and checks that the notification servlet was
+   * called exactly twice: the rejected first call plus one retry.
+   */
+  public void testMR() throws Exception {
+    System.out.println(launchWordCount(this.createJobConf(),
+      "a b c d e f g h", 1, 1));
+    stdPrintln("Sleeping for 2 seconds to give time for retry");
+    // sleep is static and needs no lock on the current thread
+    Thread.sleep(2000);
+    assertEquals(2, NotificationServlet.counter);
+  }
+
+  /**
+   * Writes the given text as the single input file, runs the WordCount
+   * example over it and returns the job output.
+   *
+   * @param conf job configuration to run with
+   * @param input text written to the input file
+   * @param numMaps number of map tasks to request
+   * @param numReduces number of reduce tasks to request
+   * @return the contents of the output directory as read by
+   * TestMiniMRWithDFS.readOutput
+   * @throws IOException if the input directory cannot be created or the job
+   * fails
+   */
+  private String launchWordCount(JobConf conf,
+    String input,
+    int numMaps,
+    int numReduces) throws IOException {
+    Path inDir = new Path("testing/wc/input");
+    Path outDir = new Path("testing/wc/output");
+
+    // Hack for local FS that does not have the concept of a 'mounting point'
+    if (isLocalFS()) {
+      String localPathRoot = System.getProperty("test.build.data", "/tmp")
+        .replace(' ', '+');
+      inDir = new Path(localPathRoot, inDir);
+      outDir = new Path(localPathRoot, outDir);
+    }
+
+    FileSystem fs = FileSystem.get(conf);
+    fs.delete(outDir);
+    if (!fs.mkdirs(inDir)) {
+      throw new IOException("Mkdirs failed to create " + inDir.toString());
+    }
+    DataOutputStream file = fs.create(new Path(inDir, "part-0"));
+    try {
+      file.writeBytes(input);
+    }
+    finally {
+      // close the stream even when the write fails
+      file.close();
+    }
+    conf.setJobName("wordcount");
+    conf.setInputFormat(TextInputFormat.class);
+
+    // the keys are words (strings)
+    conf.setOutputKeyClass(Text.class);
+    // the values are counts (ints)
+    conf.setOutputValueClass(IntWritable.class);
+
+    conf.setMapperClass(WordCount.MapClass.class);
+    conf.setCombinerClass(WordCount.Reduce.class);
+    conf.setReducerClass(WordCount.Reduce.class);
+
+    conf.setInputPath(inDir);
+    conf.setOutputPath(outDir);
+    conf.setNumMapTasks(numMaps);
+    conf.setNumReduceTasks(numReduces);
+    JobClient.runJob(conf);
+    return TestMiniMRWithDFS.readOutput(outDir, conf);
+  }
+
+}

+ 16 - 0
src/test/org/apache/hadoop/mapred/TestClusterMRNotification.java

@@ -0,0 +1,16 @@
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+/**
+ * Tests Job end notification in cluster mode.
+ *
+ * @author Alejandro Abdelnur
+ */
+public class TestClusterMRNotification extends NotificationTestCase {
+
+  public TestClusterMRNotification() throws IOException {
+    super(HadoopTestCase.CLUSTER_MR);
+  }
+
+}

+ 17 - 0
src/test/org/apache/hadoop/mapred/TestLocalMRNotification.java

@@ -0,0 +1,17 @@
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+
+/**
+ * Tests Job end notification in local mode.
+ *
+ * @author Alejandro Abdelnur
+ */
+public class TestLocalMRNotification extends NotificationTestCase {
+
+  public TestLocalMRNotification() throws IOException {
+    super(HadoopTestCase.LOCAL_MR);
+  }
+
+}