TaskRunner.java 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499
  1. /**
  2. * Licensed to the Apache Software Foundation (ASF) under one
  3. * or more contributor license agreements. See the NOTICE file
  4. * distributed with this work for additional information
  5. * regarding copyright ownership. The ASF licenses this file
  6. * to you under the Apache License, Version 2.0 (the
  7. * "License"); you may not use this file except in compliance
  8. * with the License. You may obtain a copy of the License at
  9. *
  10. * http://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. * Unless required by applicable law or agreed to in writing, software
  13. * distributed under the License is distributed on an "AS IS" BASIS,
  14. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. * See the License for the specific language governing permissions and
  16. * limitations under the License.
  17. */
  18. package org.apache.hadoop.mapred;
  19. import org.apache.commons.logging.*;
  20. import org.apache.hadoop.fs.*;
  21. import org.apache.hadoop.util.Shell.ShellCommandExecutor;
  22. import org.apache.hadoop.fs.FileSystem;
  23. import org.apache.hadoop.filecache.*;
  24. import org.apache.hadoop.util.*;
  25. import java.io.*;
  26. import java.net.InetSocketAddress;
  27. import java.util.ArrayList;
  28. import java.util.List;
  29. import java.util.Vector;
  30. import java.net.URI;
  31. /** Base class that runs a task in a separate process. Tasks are run in a
  32. * separate process in order to isolate the map/reduce system code from bugs in
  33. * user supplied map and reduce functions.
  34. */
  35. abstract class TaskRunner extends Thread {
  36. public static final Log LOG =
  37. LogFactory.getLog("org.apache.hadoop.mapred.TaskRunner");
  38. volatile boolean killed = false;
  39. private ShellCommandExecutor shexec; // shell terminal for running the task
  40. private Task t;
  41. private TaskTracker tracker;
  42. protected JobConf conf;
  43. /**
  44. * for cleaning up old map outputs
  45. */
  46. protected MapOutputFile mapOutputFile;
  /**
   * Creates a runner for the given task.
   *
   * @param t the task to run
   * @param tracker the task tracker on whose behalf the task is run
   * @param conf the job configuration; it is also handed to the
   *             {@link MapOutputFile} created for the task's job
   */
  public TaskRunner(Task t, TaskTracker tracker, JobConf conf) {
    // Build and configure the map-output helper first, then publish
    // everything into the instance fields.
    MapOutputFile outputFile = new MapOutputFile(t.getJobID());
    outputFile.setConf(conf);

    this.conf = conf;
    this.tracker = tracker;
    this.t = t;
    this.mapOutputFile = outputFile;
  }
  54. public Task getTask() { return t; }
  55. public TaskTracker getTracker() { return tracker; }
  56. /** Called to assemble this task's input. This method is run in the parent
  57. * process before the child is spawned. It should not execute user code,
  58. * only system code. */
  59. public boolean prepare() throws IOException {
  60. return true;
  61. }
  62. /** Called when this task's output is no longer needed.
  63. * This method is run in the parent process after the child exits. It should
  64. * not execute user code, only system code.
  65. */
  66. public void close() throws IOException {}
  67. private String stringifyPathArray(Path[] p){
  68. if (p == null){
  69. return null;
  70. }
  71. StringBuffer str = new StringBuffer(p[0].toString());
  72. for (int i = 1; i < p.length; i++){
  73. str.append(",");
  74. str.append(p[i].toString());
  75. }
  76. return str.toString();
  77. }
  78. @Override
  79. public final void run() {
  80. try {
  81. //before preparing the job localize
  82. //all the archives
  83. TaskAttemptID taskid = t.getTaskID();
  84. LocalDirAllocator lDirAlloc = new LocalDirAllocator("mapred.local.dir");
  85. File jobCacheDir = null;
  86. if (conf.getJar() != null) {
  87. jobCacheDir = new File(
  88. new Path(conf.getJar()).getParent().toString());
  89. }
  90. File workDir = new File(lDirAlloc.getLocalPathToRead(
  91. TaskTracker.getJobCacheSubdir()
  92. + Path.SEPARATOR + t.getJobID()
  93. + Path.SEPARATOR + t.getTaskID()
  94. + Path.SEPARATOR + "work",
  95. conf). toString());
  96. URI[] archives = DistributedCache.getCacheArchives(conf);
  97. URI[] files = DistributedCache.getCacheFiles(conf);
  98. FileStatus fileStatus;
  99. FileSystem fileSystem;
  100. Path localPath;
  101. String baseDir;
  102. if ((archives != null) || (files != null)) {
  103. if (archives != null) {
  104. String[] archivesTimestamps =
  105. DistributedCache.getArchiveTimestamps(conf);
  106. Path[] p = new Path[archives.length];
  107. for (int i = 0; i < archives.length;i++){
  108. fileSystem = FileSystem.get(archives[i], conf);
  109. fileStatus = fileSystem.getFileStatus(
  110. new Path(archives[i].getPath()));
  111. String cacheId = DistributedCache.makeRelative(archives[i],conf);
  112. String cachePath = TaskTracker.getCacheSubdir() +
  113. Path.SEPARATOR + cacheId;
  114. if (lDirAlloc.ifExists(cachePath, conf)) {
  115. localPath = lDirAlloc.getLocalPathToRead(cachePath, conf);
  116. }
  117. else {
  118. localPath = lDirAlloc.getLocalPathForWrite(cachePath,
  119. fileStatus.getLen(), conf);
  120. }
  121. baseDir = localPath.toString().replace(cacheId, "");
  122. p[i] = DistributedCache.getLocalCache(archives[i], conf,
  123. new Path(baseDir),
  124. fileStatus,
  125. true, Long.parseLong(
  126. archivesTimestamps[i]),
  127. new Path(workDir.
  128. getAbsolutePath()));
  129. }
  130. DistributedCache.setLocalArchives(conf, stringifyPathArray(p));
  131. }
  132. if ((files != null)) {
  133. String[] fileTimestamps = DistributedCache.getFileTimestamps(conf);
  134. Path[] p = new Path[files.length];
  135. for (int i = 0; i < files.length;i++){
  136. fileSystem = FileSystem.get(files[i], conf);
  137. fileStatus = fileSystem.getFileStatus(
  138. new Path(files[i].getPath()));
  139. String cacheId = DistributedCache.makeRelative(files[i], conf);
  140. String cachePath = TaskTracker.getCacheSubdir() +
  141. Path.SEPARATOR + cacheId;
  142. if (lDirAlloc.ifExists(cachePath,conf)) {
  143. localPath = lDirAlloc.getLocalPathToRead(cachePath, conf);
  144. } else {
  145. localPath = lDirAlloc.getLocalPathForWrite(cachePath,
  146. fileStatus.getLen(), conf);
  147. }
  148. baseDir = localPath.toString().replace(cacheId, "");
  149. p[i] = DistributedCache.getLocalCache(files[i], conf,
  150. new Path(baseDir),
  151. fileStatus,
  152. false, Long.parseLong(
  153. fileTimestamps[i]),
  154. new Path(workDir.
  155. getAbsolutePath()));
  156. }
  157. DistributedCache.setLocalFiles(conf, stringifyPathArray(p));
  158. }
  159. Path localTaskFile = new Path(t.getJobFile());
  160. FileSystem localFs = FileSystem.getLocal(conf);
  161. localFs.delete(localTaskFile, true);
  162. OutputStream out = localFs.create(localTaskFile);
  163. try {
  164. conf.write(out);
  165. } finally {
  166. out.close();
  167. }
  168. }
  169. // create symlinks for all the files in job cache dir in current
  170. // workingdir for streaming
  171. try{
  172. DistributedCache.createAllSymlink(conf, jobCacheDir,
  173. workDir);
  174. } catch(IOException ie){
  175. // Do not exit even if symlinks have not been created.
  176. LOG.warn(StringUtils.stringifyException(ie));
  177. }
  178. if (!prepare()) {
  179. return;
  180. }
  181. String sep = System.getProperty("path.separator");
  182. StringBuffer classPath = new StringBuffer();
  183. // start with same classpath as parent process
  184. classPath.append(System.getProperty("java.class.path"));
  185. classPath.append(sep);
  186. if (!workDir.mkdirs()) {
  187. if (!workDir.isDirectory()) {
  188. LOG.fatal("Mkdirs failed to create " + workDir.toString());
  189. }
  190. }
  191. String jar = conf.getJar();
  192. if (jar != null) {
  193. // if jar exists, it into workDir
  194. File[] libs = new File(jobCacheDir, "lib").listFiles();
  195. if (libs != null) {
  196. for (int i = 0; i < libs.length; i++) {
  197. classPath.append(sep); // add libs from jar to classpath
  198. classPath.append(libs[i]);
  199. }
  200. }
  201. classPath.append(sep);
  202. classPath.append(new File(jobCacheDir, "classes"));
  203. classPath.append(sep);
  204. classPath.append(jobCacheDir);
  205. }
  206. // include the user specified classpath
  207. //archive paths
  208. Path[] archiveClasspaths = DistributedCache.getArchiveClassPaths(conf);
  209. if (archiveClasspaths != null && archives != null) {
  210. Path[] localArchives = DistributedCache
  211. .getLocalCacheArchives(conf);
  212. if (localArchives != null){
  213. for (int i=0;i<archives.length;i++){
  214. for(int j=0;j<archiveClasspaths.length;j++){
  215. if (archives[i].getPath().equals(
  216. archiveClasspaths[j].toString())){
  217. classPath.append(sep);
  218. classPath.append(localArchives[i]
  219. .toString());
  220. }
  221. }
  222. }
  223. }
  224. }
  225. //file paths
  226. Path[] fileClasspaths = DistributedCache.getFileClassPaths(conf);
  227. if (fileClasspaths!=null && files != null) {
  228. Path[] localFiles = DistributedCache
  229. .getLocalCacheFiles(conf);
  230. if (localFiles != null) {
  231. for (int i = 0; i < files.length; i++) {
  232. for (int j = 0; j < fileClasspaths.length; j++) {
  233. if (files[i].getPath().equals(
  234. fileClasspaths[j].toString())) {
  235. classPath.append(sep);
  236. classPath.append(localFiles[i].toString());
  237. }
  238. }
  239. }
  240. }
  241. }
  242. classPath.append(sep);
  243. classPath.append(workDir);
  244. // Build exec child jmv args.
  245. Vector<String> vargs = new Vector<String>(8);
  246. File jvm = // use same jvm as parent
  247. new File(new File(System.getProperty("java.home"), "bin"), "java");
  248. vargs.add(jvm.toString());
  249. // Add child (task) java-vm options.
  250. //
  251. // The following symbols if present in mapred.child.java.opts value are
  252. // replaced:
  253. // + @taskid@ is interpolated with value of TaskID.
  254. // Other occurrences of @ will not be altered.
  255. //
  256. // Example with multiple arguments and substitutions, showing
  257. // jvm GC logging, and start of a passwordless JVM JMX agent so can
  258. // connect with jconsole and the likes to watch child memory, threads
  259. // and get thread dumps.
  260. //
  261. // <property>
  262. // <name>mapred.child.java.opts</name>
  263. // <value>-verbose:gc -Xloggc:/tmp/@taskid@.gc \
  264. // -Dcom.sun.management.jmxremote.authenticate=false \
  265. // -Dcom.sun.management.jmxremote.ssl=false \
  266. // </value>
  267. // </property>
  268. //
  269. String javaOpts = conf.get("mapred.child.java.opts", "-Xmx200m");
  270. javaOpts = replaceAll(javaOpts, "@taskid@", taskid.toString());
  271. String [] javaOptsSplit = javaOpts.split(" ");
  272. // Add java.library.path; necessary for loading native libraries.
  273. //
  274. // 1. To support native-hadoop library i.e. libhadoop.so, we add the
  275. // parent processes' java.library.path to the child.
  276. // 2. We also add the 'cwd' of the task to it's java.library.path to help
  277. // users distribute native libraries via the DistributedCache.
  278. // 3. The user can also specify extra paths to be added to the
  279. // java.library.path via mapred.child.java.opts.
  280. //
  281. String libraryPath = System.getProperty("java.library.path");
  282. if (libraryPath == null) {
  283. libraryPath = workDir.getAbsolutePath();
  284. } else {
  285. libraryPath += sep + workDir;
  286. }
  287. boolean hasUserLDPath = false;
  288. for(int i=0; i<javaOptsSplit.length ;i++) {
  289. if(javaOptsSplit[i].startsWith("-Djava.library.path=")) {
  290. javaOptsSplit[i] += sep + libraryPath;
  291. hasUserLDPath = true;
  292. break;
  293. }
  294. }
  295. if(!hasUserLDPath) {
  296. vargs.add("-Djava.library.path=" + libraryPath);
  297. }
  298. for (int i = 0; i < javaOptsSplit.length; i++) {
  299. vargs.add(javaOptsSplit[i]);
  300. }
  301. // add java.io.tmpdir given by mapred.child.tmp
  302. String tmp = conf.get("mapred.child.tmp", "./tmp");
  303. Path tmpDir = new Path(tmp);
  304. // if temp directory path is not absolute
  305. // prepend it with workDir.
  306. if (!tmpDir.isAbsolute()) {
  307. tmpDir = new Path(workDir.toString(), tmp);
  308. }
  309. FileSystem localFs = FileSystem.getLocal(conf);
  310. if (!localFs.mkdirs(tmpDir) && !localFs.getFileStatus(tmpDir).isDir()) {
  311. throw new IOException("Mkdirs failed to create " + tmpDir.toString());
  312. }
  313. vargs.add("-Djava.io.tmpdir=" + tmpDir.toString());
  314. // Add classpath.
  315. vargs.add("-classpath");
  316. vargs.add(classPath.toString());
  317. // Setup the log4j prop
  318. long logSize = TaskLog.getTaskLogLength(conf);
  319. vargs.add("-Dhadoop.log.dir=" +
  320. new File(System.getProperty("hadoop.log.dir")
  321. ).getAbsolutePath());
  322. vargs.add("-Dhadoop.root.logger=INFO,TLA");
  323. vargs.add("-Dhadoop.tasklog.taskid=" + taskid);
  324. vargs.add("-Dhadoop.tasklog.totalLogFileSize=" + logSize);
  325. if (conf.getProfileEnabled()) {
  326. if (conf.getProfileTaskRange(t.isMapTask()
  327. ).isIncluded(t.getPartition())) {
  328. File prof = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.PROFILE);
  329. vargs.add(String.format(conf.getProfileParams(), prof.toString()));
  330. }
  331. }
  332. // Add main class and its arguments
  333. vargs.add(TaskTracker.Child.class.getName()); // main of Child
  334. // pass umbilical address
  335. InetSocketAddress address = tracker.getTaskTrackerReportAddress();
  336. vargs.add(address.getAddress().getHostAddress());
  337. vargs.add(Integer.toString(address.getPort()));
  338. vargs.add(taskid.toString()); // pass task identifier
  339. // set memory limit using ulimit if feasible and necessary ...
  340. String[] ulimitCmd = Shell.getUlimitMemoryCommand(conf);
  341. List<String> setup = null;
  342. if (ulimitCmd != null) {
  343. setup = new ArrayList<String>();
  344. for (String arg : ulimitCmd) {
  345. setup.add(arg);
  346. }
  347. }
  348. // Set up the redirection of the task's stdout and stderr streams
  349. File stdout = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.STDOUT);
  350. File stderr = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.STDERR);
  351. stdout.getParentFile().mkdirs();
  352. List<String> wrappedCommand =
  353. TaskLog.captureOutAndError(setup, vargs, stdout, stderr, logSize);
  354. // Run the task as child of the parent TaskTracker process
  355. runChild(wrappedCommand, workDir, taskid);
  356. } catch (FSError e) {
  357. LOG.fatal("FSError", e);
  358. try {
  359. tracker.fsError(t.getTaskID(), e.getMessage());
  360. } catch (IOException ie) {
  361. LOG.fatal(t.getTaskID()+" reporting FSError", ie);
  362. }
  363. } catch (Throwable throwable) {
  364. LOG.warn(t.getTaskID()+" Child Error", throwable);
  365. ByteArrayOutputStream baos = new ByteArrayOutputStream();
  366. throwable.printStackTrace(new PrintStream(baos));
  367. try {
  368. tracker.reportDiagnosticInfo(t.getTaskID(), baos.toString());
  369. } catch (IOException e) {
  370. LOG.warn(t.getTaskID()+" Reporting Diagnostics", e);
  371. }
  372. } finally {
  373. try{
  374. URI[] archives = DistributedCache.getCacheArchives(conf);
  375. URI[] files = DistributedCache.getCacheFiles(conf);
  376. if (archives != null){
  377. for (int i = 0; i < archives.length; i++){
  378. DistributedCache.releaseCache(archives[i], conf);
  379. }
  380. }
  381. if (files != null){
  382. for(int i = 0; i < files.length; i++){
  383. DistributedCache.releaseCache(files[i], conf);
  384. }
  385. }
  386. }catch(IOException ie){
  387. LOG.warn("Error releasing caches : Cache files might not have been cleaned up");
  388. }
  389. tracker.reportTaskFinished(t.getTaskID());
  390. }
  391. }
  392. /**
  393. * Replace <code>toFind</code> with <code>replacement</code>.
  394. * When hadoop moves to JDK1.5, replace this method with
  395. * String#replace (Of is commons-lang available, replace with
  396. * StringUtils#replace).
  397. * @param text String to do replacements in.
  398. * @param toFind String to find.
  399. * @param replacement String to replace <code>toFind</code> with.
  400. * @return A String with all instances of <code>toFind</code>
  401. * replaced by <code>replacement</code> (The original
  402. * <code>text</code> is returned if <code>toFind</code> is not
  403. * found in <code>text<code>).
  404. */
  405. private static String replaceAll(String text, final String toFind,
  406. final String replacement) {
  407. if (text == null || toFind == null || replacement == null) {
  408. throw new IllegalArgumentException("Text " + text + " or toFind " +
  409. toFind + " or replacement " + replacement + " are null.");
  410. }
  411. int offset = 0;
  412. for (int index = text.indexOf(toFind); index >= 0;
  413. index = text.indexOf(toFind, offset)) {
  414. offset = index + toFind.length();
  415. text = text.substring(0, index) + replacement +
  416. text.substring(offset);
  417. }
  418. return text;
  419. }
  420. /**
  421. * Run the child process
  422. */
  423. private void runChild(List<String> args, File dir,
  424. TaskAttemptID taskid) throws IOException {
  425. try {
  426. shexec = new ShellCommandExecutor(args.toArray(new String[0]), dir);
  427. shexec.execute();
  428. } catch (IOException ioe) {
  429. // do nothing
  430. // error and output are appropriately redirected
  431. } finally { // handle the exit code
  432. int exit_code = shexec.getExitCode();
  433. if (!killed && exit_code != 0) {
  434. if (exit_code == 65) {
  435. tracker.getTaskTrackerMetrics().taskFailedPing();
  436. }
  437. throw new IOException("Task process exit with nonzero status of " +
  438. exit_code + ".");
  439. }
  440. }
  441. }
  442. /**
  443. * Kill the child process
  444. */
  445. public void kill() {
  446. if (shexec != null) {
  447. Process process = shexec.getProcess();
  448. if (process != null) {
  449. process.destroy();
  450. }
  451. }
  452. killed = true;
  453. }
  454. }