Browse Source

MAPREDUCE-7304. Enhance the map-reduce Job end notifier to be able to notify the given URL via a custom class. Contributed by Zoltan Erdmann

Peter Bacsko 4 years ago
parent
commit
ced08fd87f

+ 50 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/JobEndNotifier.java

@@ -25,9 +25,11 @@ import java.net.MalformedURLException;
 import java.net.Proxy;
 import java.net.Proxy;
 import java.net.URL;
 import java.net.URL;
 
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.JobContext;
 import org.apache.hadoop.mapred.JobContext;
+import org.apache.hadoop.mapreduce.CustomJobEndNotifier;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
 import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
 import org.eclipse.jetty.util.log.Log;
 import org.eclipse.jetty.util.log.Log;
@@ -57,6 +59,9 @@ public class JobEndNotifier implements Configurable {
   protected int timeout; // Timeout (ms) on the connection and notification
   protected int timeout; // Timeout (ms) on the connection and notification
   protected URL urlToNotify; //URL to notify read from the config
   protected URL urlToNotify; //URL to notify read from the config
   protected Proxy proxyToUse = Proxy.NO_PROXY; //Proxy to use for notification
   protected Proxy proxyToUse = Proxy.NO_PROXY; //Proxy to use for notification
+  // A custom notifier implementation
+  // (see org.apache.hadoop.mapreduce.CustomJobEndNotifier)
+  private String customJobEndNotifierClassName;
 
 
   /**
   /**
    * Parse the URL that needs to be notified of the end of the job, along
    * Parse the URL that needs to be notified of the end of the job, along
@@ -84,6 +89,9 @@ public class JobEndNotifier implements Configurable {
 
 
     proxyConf = conf.get(MRJobConfig.MR_JOB_END_NOTIFICATION_PROXY);
     proxyConf = conf.get(MRJobConfig.MR_JOB_END_NOTIFICATION_PROXY);
 
 
+    customJobEndNotifierClassName = StringUtils.stripToNull(
+        conf.get(MRJobConfig.MR_JOB_END_NOTIFICATION_CUSTOM_NOTIFIER_CLASS));
+
     //Configure the proxy to use if its set. It should be set like
     //Configure the proxy to use if its set. It should be set like
     //proxyType@proxyHostname:port
     //proxyType@proxyHostname:port
     if(proxyConf != null && !proxyConf.equals("") &&
     if(proxyConf != null && !proxyConf.equals("") &&
@@ -115,11 +123,22 @@ public class JobEndNotifier implements Configurable {
   public Configuration getConf() {
   public Configuration getConf() {
     return conf;
     return conf;
   }
   }
-  
+
   /**
   /**
    * Notify the URL just once. Use best effort.
    * Notify the URL just once. Use best effort.
    */
    */
   protected boolean notifyURLOnce() {
   protected boolean notifyURLOnce() {
+    if (customJobEndNotifierClassName == null) {
+      return notifyViaBuiltInNotifier();
+    } else {
+      return notifyViaCustomNotifier();
+    }
+  }
+
+  /**
+   * Uses a simple HttpURLConnection to do the Job end notification.
+   */
+  private boolean notifyViaBuiltInNotifier() {
     boolean success = false;
     boolean success = false;
     try {
     try {
       Log.getLog().info("Job end notification trying " + urlToNotify);
       Log.getLog().info("Job end notification trying " + urlToNotify);
@@ -145,6 +164,36 @@ public class JobEndNotifier implements Configurable {
     return success;
     return success;
   }
   }
 
 
+  /**
+   * Uses the custom Job end notifier class to do the Job end notification.
+   */
+  private boolean notifyViaCustomNotifier() {
+    try {
+      Log.getLog().info("Will be using " + customJobEndNotifierClassName
+                        + " for Job end notification");
+
+      final Class<? extends CustomJobEndNotifier> customJobEndNotifierClass =
+              Class.forName(customJobEndNotifierClassName)
+                      .asSubclass(CustomJobEndNotifier.class);
+      final CustomJobEndNotifier customJobEndNotifier =
+              customJobEndNotifierClass.getDeclaredConstructor().newInstance();
+
+      boolean success = customJobEndNotifier.notifyOnce(urlToNotify, conf);
+      if (success) {
+        Log.getLog().info("Job end notification to " + urlToNotify
+                          + " succeeded");
+      } else {
+        Log.getLog().warn("Job end notification to " + urlToNotify
+                          + " failed");
+      }
+      return success;
+    } catch (Exception e) {
+      Log.getLog().warn("Job end notification to " + urlToNotify
+                        + " failed", e);
+      return false;
+    }
+  }
+
   /**
   /**
    * Notify a server of the completion of a submitted job. The user must have
    * Notify a server of the completion of a submitted job. The user must have
    * configured MRJobConfig.MR_JOB_END_NOTIFICATION_URL
    * configured MRJobConfig.MR_JOB_END_NOTIFICATION_URL

+ 42 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestJobEndNotifier.java

@@ -31,6 +31,7 @@ import java.io.PrintStream;
 import java.net.Proxy;
 import java.net.Proxy;
 import java.net.URI;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.net.URISyntaxException;
+import java.net.URL;
 import java.nio.channels.ClosedChannelException;
 import java.nio.channels.ClosedChannelException;
 
 
 import javax.servlet.ServletException;
 import javax.servlet.ServletException;
@@ -42,7 +43,9 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.http.HttpServer2;
 import org.apache.hadoop.http.HttpServer2;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.JobContext;
 import org.apache.hadoop.mapred.JobContext;
+import org.apache.hadoop.mapreduce.CustomJobEndNotifier;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
 import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
 import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
 import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
@@ -299,6 +302,45 @@ public class TestJobEndNotifier extends JobEndNotifier {
     server.stop();
     server.stop();
   }
   }
 
 
+  @Test
+  public void testCustomNotifierClass() throws InterruptedException {
+    JobConf conf = new JobConf();
+    conf.set(MRJobConfig.MR_JOB_END_NOTIFICATION_URL,
+             "http://example.com?jobId=$jobId&jobStatus=$jobStatus");
+    conf.set(MRJobConfig.MR_JOB_END_NOTIFICATION_CUSTOM_NOTIFIER_CLASS,
+             CustomNotifier.class.getName());
+    this.setConf(conf);
+
+    JobReport jobReport = mock(JobReport.class);
+    JobId jobId = mock(JobId.class);
+    when(jobId.toString()).thenReturn("mock-Id");
+    when(jobReport.getJobId()).thenReturn(jobId);
+    when(jobReport.getJobState()).thenReturn(JobState.SUCCEEDED);
+
+    CustomNotifier.urlToNotify = null;
+    this.notify(jobReport);
+    final URL urlToNotify = CustomNotifier.urlToNotify;
+
+    Assert.assertEquals("http://example.com?jobId=mock-Id&jobStatus=SUCCEEDED",
+                        urlToNotify.toString());
+  }
+
+  public static final class CustomNotifier implements CustomJobEndNotifier {
+
+    /**
+     * Once notifyOnce was invoked we'll store the URL in this variable
+     * so we can assert on it.
+     */
+    private static URL urlToNotify = null;
+
+    @Override
+    public boolean notifyOnce(final URL url, final Configuration jobConf) {
+      urlToNotify = url;
+      return true;
+    }
+
+  }
+
   private static HttpServer2 startHttpServer() throws Exception {
   private static HttpServer2 startHttpServer() throws Exception {
     new File(System.getProperty(
     new File(System.getProperty(
         "build.webapps", "build/webapps") + "/test").mkdirs();
         "build.webapps", "build/webapps") + "/test").mkdirs();

+ 46 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java

@@ -1886,6 +1886,52 @@ public class JobConf extends Configuration {
     set(JobContext.MR_JOB_END_NOTIFICATION_URL, uri);
     set(JobContext.MR_JOB_END_NOTIFICATION_URL, uri);
   }
   }
 
 
+  /**
+   * Returns the class to be invoked in order to send a notification
+   * after the job has completed (success/failure).
+   *
+   * @return the fully-qualified name of the class which implements
+   * {@link org.apache.hadoop.mapreduce.CustomJobEndNotifier} set through the
+   * {@link org.apache.hadoop.mapreduce.MRJobConfig#MR_JOB_END_NOTIFICATION_CUSTOM_NOTIFIER_CLASS}
+   * property
+   *
+   * @see JobConf#setJobEndNotificationCustomNotifierClass(java.lang.String)
+   * @see org.apache.hadoop.mapreduce.MRJobConfig#MR_JOB_END_NOTIFICATION_CUSTOM_NOTIFIER_CLASS
+   */
+  public String getJobEndNotificationCustomNotifierClass() {
+    return get(JobContext.MR_JOB_END_NOTIFICATION_CUSTOM_NOTIFIER_CLASS);
+  }
+
+  /**
+   * Sets the class to be invoked in order to send a notification after the job
+   * has completed (success/failure).
+   *
+   * A notification url still has to be set which will be passed to
+   * {@link org.apache.hadoop.mapreduce.CustomJobEndNotifier#notifyOnce(
+   * java.net.URL, org.apache.hadoop.conf.Configuration)}
+   * along with the Job's conf.
+   *
+   * If this is set instead of using a simple HttpURLConnection
+   * we'll create a new instance of this class
+   * which should be an implementation of
+   * {@link org.apache.hadoop.mapreduce.CustomJobEndNotifier},
+   * and we'll invoke that.
+   *
+   * @param customNotifierClassName the fully-qualified name of the class
+   *     which implements
+   *     {@link org.apache.hadoop.mapreduce.CustomJobEndNotifier}
+   *
+   * @see JobConf#setJobEndNotificationURI(java.lang.String)
+   * @see
+   * org.apache.hadoop.mapreduce.MRJobConfig#MR_JOB_END_NOTIFICATION_CUSTOM_NOTIFIER_CLASS
+   */
+  public void setJobEndNotificationCustomNotifierClass(
+          String customNotifierClassName) {
+
+    set(JobContext.MR_JOB_END_NOTIFICATION_CUSTOM_NOTIFIER_CLASS,
+            customNotifierClassName);
+  }
+
   /**
   /**
    * Get job-specific shared directory for use as scratch space
    * Get job-specific shared directory for use as scratch space
    * 
    * 

+ 57 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/CustomJobEndNotifier.java

@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce;
+
+import org.apache.hadoop.conf.Configuration;
+
+import java.net.URL;
+
+/**
+ * An interface for implementing a custom Job end notifier. The built-in
+ * Job end notifier uses a simple HTTP connection to notify the Job end status.
+ * By implementing this interface and setting the
+ * {@link MRJobConfig#MR_JOB_END_NOTIFICATION_CUSTOM_NOTIFIER_CLASS} property
+ * in the map-reduce Job configuration you can have your own
+ * notification mechanism. For now this still only works with HTTP/HTTPS URLs,
+ * but by implementing this class you can choose how you want to make the
+ * notification itself. For example you can choose to use a custom
+ * HTTP library, or do a delegation token authentication, maybe set a
+ * custom SSL context on the connection, etc. This means you still have to set
+ * the {@link MRJobConfig#MR_JOB_END_NOTIFICATION_URL} property
+ * in the Job's conf.
+ */
+public interface CustomJobEndNotifier {
+
+  /**
+   * The implementation should try to do a Job end notification only once.
+   *
+   * See {@link MRJobConfig#MR_JOB_END_RETRY_ATTEMPTS},
+   * {@link MRJobConfig#MR_JOB_END_NOTIFICATION_MAX_ATTEMPTS}
+   * and org.apache.hadoop.mapreduce.v2.app.JobEndNotifier on how exactly
+   * this method will be invoked.
+   *
+   * @param url the URL which needs to be notified
+   *           (see {@link MRJobConfig#MR_JOB_END_NOTIFICATION_URL})
+   * @param jobConf the map-reduce Job's configuration
+   *
+   * @return true if the notification was successful
+   */
+  boolean notifyOnce(URL url, Configuration jobConf) throws Exception;
+
+}

+ 3 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java

@@ -1105,6 +1105,9 @@ public interface MRJobConfig {
   public static final String MR_JOB_END_NOTIFICATION_MAX_RETRY_INTERVAL =
   public static final String MR_JOB_END_NOTIFICATION_MAX_RETRY_INTERVAL =
     "mapreduce.job.end-notification.max.retry.interval";
     "mapreduce.job.end-notification.max.retry.interval";
 
 
+  String MR_JOB_END_NOTIFICATION_CUSTOM_NOTIFIER_CLASS =
+      "mapreduce.job.end-notification.custom-notifier-class";
+
   public static final int DEFAULT_MR_JOB_END_NOTIFICATION_TIMEOUT =
   public static final int DEFAULT_MR_JOB_END_NOTIFICATION_TIMEOUT =
       5000;
       5000;
 
 

+ 17 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml

@@ -1415,6 +1415,23 @@
   </description>
   </description>
 </property>
 </property>
 
 
+<property>
+  <name>mapreduce.job.end-notification.custom-notifier-class</name>
+  <description>A class to be invoked in order to send a notification after the
+               job has completed (success/failure). The class must implement
+               org.apache.hadoop.mapreduce.CustomJobEndNotifier. A notification
+               url still has to be set which will be passed to the notifyOnce
+               method of your implementation along with the Job's configuration.
+               If this is set instead of using a simple HttpURLConnection we'll
+               create a new instance of this class. For now this still only works
+               with HTTP/HTTPS URLs, but by implementing this class you can choose
+               how you want to make the notification itself. For example you can
+               choose to use a custom HTTP library, or do a delegation token
+               authentication, maybe set a custom SSL context on the connection, etc.
+               The class needs to have a no-arg constructor.
+  </description>
+</property>
+
   <property>
   <property>
     <name>mapreduce.job.log4j-properties-file</name>
     <name>mapreduce.job.log4j-properties-file</name>
     <value></value>
     <value></value>