TestJobEndNotifier.java 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263
  1. /**
  2. * Licensed to the Apache Software Foundation (ASF) under one
  3. * or more contributor license agreements. See the NOTICE file
  4. * distributed with this work for additional information
  5. * regarding copyright ownership. The ASF licenses this file
  6. * to you under the Apache License, Version 2.0 (the
  7. * "License"); you may not use this file except in compliance
  8. * with the License. You may obtain a copy of the License at
  9. *
  10. * http://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. * Unless required by applicable law or agreed to in writing, software
  13. * distributed under the License is distributed on an "AS IS" BASIS,
  14. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. * See the License for the specific language governing permissions and
  16. * limitations under the License.
  17. */
  18. package org.apache.hadoop.mapred;
  19. import java.io.File;
  20. import java.io.IOException;
  21. import java.io.InputStreamReader;
  22. import java.io.PrintStream;
  23. import java.net.URI;
  24. import java.net.URISyntaxException;
  25. import java.net.URL;
  26. import javax.servlet.ServletException;
  27. import javax.servlet.http.HttpServlet;
  28. import javax.servlet.http.HttpServletRequest;
  29. import javax.servlet.http.HttpServletResponse;
  30. import junit.framework.TestCase;
  31. import org.apache.hadoop.conf.Configuration;
  32. import org.apache.hadoop.http.HttpServer;
  33. public class TestJobEndNotifier extends TestCase {
  34. HttpServer server;
  35. URL baseUrl;
  36. @SuppressWarnings("serial")
  37. public static class JobEndServlet extends HttpServlet {
  38. public static volatile int calledTimes = 0;
  39. public static URI requestUri;
  40. @Override
  41. public void doGet(HttpServletRequest request,
  42. HttpServletResponse response
  43. ) throws ServletException, IOException {
  44. InputStreamReader in = new InputStreamReader(request.getInputStream());
  45. PrintStream out = new PrintStream(response.getOutputStream());
  46. calledTimes++;
  47. try {
  48. requestUri = new URI(null, null,
  49. request.getRequestURI(), request.getQueryString(), null);
  50. } catch (URISyntaxException e) {
  51. }
  52. in.close();
  53. out.close();
  54. }
  55. }
  56. // Servlet that delays requests for a long time
  57. @SuppressWarnings("serial")
  58. public static class DelayServlet extends HttpServlet {
  59. public static volatile int calledTimes = 0;
  60. @Override
  61. public void doGet(HttpServletRequest request,
  62. HttpServletResponse response
  63. ) throws ServletException, IOException {
  64. boolean timedOut = false;
  65. calledTimes++;
  66. try {
  67. // Sleep for a long time
  68. Thread.sleep(1000000);
  69. } catch (InterruptedException e) {
  70. timedOut = true;
  71. }
  72. assertTrue("DelayServlet should be interrupted", timedOut);
  73. }
  74. }
  75. // Servlet that fails all requests into it
  76. @SuppressWarnings("serial")
  77. public static class FailServlet extends HttpServlet {
  78. public static volatile int calledTimes = 0;
  79. @Override
  80. public void doGet(HttpServletRequest request,
  81. HttpServletResponse response
  82. ) throws ServletException, IOException {
  83. calledTimes++;
  84. throw new IOException("I am failing!");
  85. }
  86. }
  87. public void setUp() throws Exception {
  88. new File(System.getProperty("build.webapps", "build/webapps") + "/test"
  89. ).mkdirs();
  90. server = new HttpServer("test", "0.0.0.0", 0, true);
  91. server.addServlet("delay", "/delay", DelayServlet.class);
  92. server.addServlet("jobend", "/jobend", JobEndServlet.class);
  93. server.addServlet("fail", "/fail", FailServlet.class);
  94. server.start();
  95. int port = server.getPort();
  96. baseUrl = new URL("http://localhost:" + port + "/");
  97. JobEndServlet.calledTimes = 0;
  98. JobEndServlet.requestUri = null;
  99. DelayServlet.calledTimes = 0;
  100. FailServlet.calledTimes = 0;
  101. }
  102. public void tearDown() throws Exception {
  103. server.stop();
  104. }
  105. /**
  106. * Validate that $jobId and $jobStatus fields are properly substituted
  107. * in the output URI
  108. */
  109. public void testUriSubstitution() throws InterruptedException {
  110. try {
  111. JobEndNotifier.startNotifier();
  112. JobStatus jobStatus = createTestJobStatus(
  113. "job_20130313155005308_0001", JobStatus.SUCCEEDED);
  114. JobConf jobConf = createTestJobConf(
  115. new Configuration(), 0,
  116. baseUrl + "jobend?jobid=$jobId&status=$jobStatus");
  117. JobEndNotifier.registerNotification(jobConf, jobStatus);
  118. int maxLoop = 100;
  119. while (JobEndServlet.calledTimes != 1 && maxLoop-- > 0) {
  120. Thread.sleep(100);
  121. }
  122. // Validate params
  123. assertEquals(1, JobEndServlet.calledTimes);
  124. assertEquals("jobid=job_20130313155005308_0001&status=SUCCEEDED",
  125. JobEndServlet.requestUri.getQuery());
  126. } finally {
  127. JobEndNotifier.stopNotifier();
  128. }
  129. }
  130. /**
  131. * Validate job.end.retry.attempts logic.
  132. */
  133. public void testRetryCount() throws InterruptedException {
  134. try {
  135. JobEndNotifier.startNotifier();
  136. int retryAttempts = 3;
  137. JobStatus jobStatus = createTestJobStatus(
  138. "job_20130313155005308_0001", JobStatus.SUCCEEDED);
  139. JobConf jobConf = createTestJobConf(
  140. new Configuration(), retryAttempts, baseUrl + "fail");
  141. JobEndNotifier.registerNotification(jobConf, jobStatus);
  142. int maxLoop = 100;
  143. while (FailServlet.calledTimes != (retryAttempts + 1) && maxLoop-- > 0) {
  144. Thread.sleep(100);
  145. }
  146. // Validate params
  147. assertEquals(retryAttempts + 1, FailServlet.calledTimes);
  148. } finally {
  149. JobEndNotifier.stopNotifier();
  150. }
  151. }
  152. /**
  153. * Validate that the notification times out after reaching
  154. * mapreduce.job.end-notification.timeout.
  155. */
  156. public void testNotificationTimeout() throws InterruptedException {
  157. try {
  158. Configuration conf = new Configuration();
  159. // Reduce the timeout to 1 second
  160. conf.setInt("mapreduce.job.end-notification.timeout", 1000);
  161. JobEndNotifier.startNotifier();
  162. // Submit one notification that will delay infinitely
  163. JobStatus jobStatus = createTestJobStatus(
  164. "job_20130313155005308_0001", JobStatus.SUCCEEDED);
  165. JobConf jobConf = createTestJobConf(
  166. conf, 0, baseUrl + "delay");
  167. JobEndNotifier.registerNotification(jobConf, jobStatus);
  168. // Submit another notification that will return promptly
  169. jobConf.setJobEndNotificationURI(baseUrl + "jobend");
  170. JobEndNotifier.registerNotification(jobConf, jobStatus);
  171. // Make sure the notification passed thru
  172. int maxLoop = 100;
  173. while (JobEndServlet.calledTimes != 1 && maxLoop-- > 0) {
  174. Thread.sleep(100);
  175. }
  176. assertEquals("JobEnd notification should have been received by now",
  177. 1, JobEndServlet.calledTimes);
  178. assertEquals(1, DelayServlet.calledTimes);
  179. assertEquals("/jobend", JobEndServlet.requestUri.getPath());
  180. } finally {
  181. JobEndNotifier.stopNotifier();
  182. }
  183. }
  184. /**
  185. * Basic validation for localRunnerNotification.
  186. */
  187. public void testLocalJobRunnerUriSubstitution() throws InterruptedException {
  188. JobStatus jobStatus = createTestJobStatus(
  189. "job_20130313155005308_0001", JobStatus.SUCCEEDED);
  190. JobConf jobConf = createTestJobConf(
  191. new Configuration(), 0,
  192. baseUrl + "jobend?jobid=$jobId&status=$jobStatus");
  193. JobEndNotifier.localRunnerNotification(jobConf, jobStatus);
  194. // No need to wait for the notification to go thru since calls are
  195. // synchronous
  196. // Validate params
  197. assertEquals(1, JobEndServlet.calledTimes);
  198. assertEquals("jobid=job_20130313155005308_0001&status=SUCCEEDED",
  199. JobEndServlet.requestUri.getQuery());
  200. }
  201. /**
  202. * Validate job.end.retry.attempts for the localJobRunner.
  203. */
  204. public void testLocalJobRunnerRetryCount() throws InterruptedException {
  205. int retryAttempts = 3;
  206. JobStatus jobStatus = createTestJobStatus(
  207. "job_20130313155005308_0001", JobStatus.SUCCEEDED);
  208. JobConf jobConf = createTestJobConf(
  209. new Configuration(), retryAttempts, baseUrl + "fail");
  210. JobEndNotifier.localRunnerNotification(jobConf, jobStatus);
  211. // Validate params
  212. assertEquals(retryAttempts + 1, FailServlet.calledTimes);
  213. }
  214. private static JobStatus createTestJobStatus(String jobId, int state) {
  215. return new JobStatus(
  216. JobID.forName(jobId), 0.5f, 0.0f,
  217. state);
  218. }
  219. private static JobConf createTestJobConf(
  220. Configuration conf, int retryAttempts, String notificationUri) {
  221. JobConf jobConf = new JobConf(conf);
  222. jobConf.setInt("job.end.retry.attempts", retryAttempts);
  223. jobConf.set("job.end.retry.interval", "0");
  224. jobConf.setJobEndNotificationURI(notificationUri);
  225. return jobConf;
  226. }
  227. }