TaskRunner.java 27 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745
  1. /**
  2. * Licensed to the Apache Software Foundation (ASF) under one
  3. * or more contributor license agreements. See the NOTICE file
  4. * distributed with this work for additional information
  5. * regarding copyright ownership. The ASF licenses this file
  6. * to you under the Apache License, Version 2.0 (the
  7. * "License"); you may not use this file except in compliance
  8. * with the License. You may obtain a copy of the License at
  9. *
  10. * http://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. * Unless required by applicable law or agreed to in writing, software
  13. * distributed under the License is distributed on an "AS IS" BASIS,
  14. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. * See the License for the specific language governing permissions and
  16. * limitations under the License.
  17. */
  18. package org.apache.hadoop.mapred;
  19. import java.io.ByteArrayOutputStream;
  20. import java.io.File;
  21. import java.io.IOException;
  22. import java.io.OutputStream;
  23. import java.io.PrintStream;
  24. import java.net.InetSocketAddress;
  25. import java.net.URI;
  26. import java.security.PrivilegedExceptionAction;
  27. import java.util.ArrayList;
  28. import java.util.HashMap;
  29. import java.util.List;
  30. import java.util.Map;
  31. import java.util.Vector;
  32. import org.apache.commons.logging.Log;
  33. import org.apache.commons.logging.LogFactory;
  34. import org.apache.hadoop.mapreduce.MRConfig;
  35. import org.apache.hadoop.mapreduce.MRJobConfig;
  36. import org.apache.hadoop.mapreduce.filecache.DistributedCache;
  37. import org.apache.hadoop.mapreduce.filecache.TaskDistributedCacheManager;
  38. import org.apache.hadoop.mapreduce.filecache.TrackerDistributedCacheManager;
  39. import org.apache.hadoop.mapreduce.security.TokenCache;
  40. import org.apache.hadoop.fs.FSError;
  41. import org.apache.hadoop.fs.FileSystem;
  42. import org.apache.hadoop.fs.FileUtil;
  43. import org.apache.hadoop.fs.LocalDirAllocator;
  44. import org.apache.hadoop.fs.Path;
  45. import org.apache.hadoop.fs.permission.FsPermission;
  46. import org.apache.hadoop.security.UserGroupInformation;
  47. import org.apache.hadoop.util.Shell;
  48. import org.apache.hadoop.util.StringUtils;
  49. import org.apache.log4j.Level;
  50. /** Base class that runs a task in a separate process. Tasks are run in a
  51. * separate process in order to isolate the map/reduce system code from bugs in
  52. * user supplied map and reduce functions.
  53. */
  54. abstract class TaskRunner extends Thread {
  55. public static final Log LOG =
  56. LogFactory.getLog(TaskRunner.class);
  57. volatile boolean killed = false;
  58. private TaskTracker.TaskInProgress tip;
  59. private Task t;
  60. private Object lock = new Object();
  61. private volatile boolean done = false;
  62. private int exitCode = -1;
  63. private boolean exitCodeSet = false;
  64. private static String SYSTEM_PATH_SEPARATOR = System.getProperty("path.separator");
  65. private TaskTracker tracker;
  66. private TaskDistributedCacheManager taskDistributedCacheManager;
  67. protected JobConf conf;
  68. JvmManager jvmManager;
  /**
   * Creates a runner for the task held by the given task-in-progress.
   *
   * @param tip the task-in-progress whose task is to be run
   * @param tracker the task tracker; also the source of the JVM manager
   * @param conf the job configuration
   */
  public TaskRunner(TaskTracker.TaskInProgress tip, TaskTracker tracker,
      JobConf conf) {
    // Plain references first ...
    this.tip = tip;
    this.tracker = tracker;
    this.conf = conf;
    // ... then the values derived from them.
    this.t = tip.getTask();
    this.jvmManager = tracker.getJvmManagerInstance();
  }
  77. public Task getTask() { return t; }
  78. public TaskTracker.TaskInProgress getTaskInProgress() { return tip; }
  79. public TaskTracker getTracker() { return tracker; }
  80. public JvmManager getJvmManager() { return jvmManager; }
  81. /** Called when this task's output is no longer needed.
  82. * This method is run in the parent process after the child exits. It should
  83. * not execute user code, only system code.
  84. */
  85. public void close() throws IOException {}
  86. /**
  87. * Get the java command line options for the child map/reduce tasks.
  88. * @param jobConf job configuration
  89. * @param defaultValue default value
  90. * @return the java command line options for child map/reduce tasks
  91. * @deprecated Use command line options specific to map or reduce tasks set
  92. * via {@link JobConf#MAPRED_MAP_TASK_JAVA_OPTS} or
  93. * {@link JobConf#MAPRED_REDUCE_TASK_JAVA_OPTS}
  94. */
  95. @Deprecated
  96. public String getChildJavaOpts(JobConf jobConf, String defaultValue) {
  97. return jobConf.get(JobConf.MAPRED_TASK_JAVA_OPTS, defaultValue);
  98. }
  99. /**
  100. * Get the environment variables for the child map/reduce tasks.
  101. * @param jobConf job configuration
  102. * @return the environment variables for the child map/reduce tasks or
  103. * <code>null</code> if unspecified
  104. * @deprecated Use environment variables specific to the map or reduce tasks
  105. * set via {@link JobConf#MAPRED_MAP_TASK_ENV} or
  106. * {@link JobConf#MAPRED_REDUCE_TASK_ENV}
  107. */
  108. @Deprecated
  109. public String getChildEnv(JobConf jobConf) {
  110. return jobConf.get(JobConf.MAPRED_TASK_ENV);
  111. }
  112. /**
  113. * Get the log {@link Level} for the child map/reduce tasks.
  114. * @param jobConf
  115. * @return the log-level for the child map/reduce tasks
  116. */
  117. public abstract Level getLogLevel(JobConf jobConf);
  118. @Override
  119. public final void run() {
  120. String errorInfo = "Child Error";
  121. try {
  122. //before preparing the job localize
  123. //all the archives
  124. TaskAttemptID taskid = t.getTaskID();
  125. final LocalDirAllocator lDirAlloc = new LocalDirAllocator(MRConfig.LOCAL_DIR);
  126. final File workDir = formWorkDir(lDirAlloc, taskid, t.isTaskCleanupTask(), conf);
  127. // We don't create any symlinks yet, so presence/absence of workDir
  128. // actually on the file system doesn't matter.
  129. tip.getUGI().doAs(new PrivilegedExceptionAction<Void>() {
  130. public Void run() throws IOException {
  131. taskDistributedCacheManager =
  132. tracker.getTrackerDistributedCacheManager()
  133. .newTaskDistributedCacheManager(conf);
  134. taskDistributedCacheManager.setup(lDirAlloc, workDir, TaskTracker
  135. .getPrivateDistributedCacheDir(conf.getUser()),
  136. TaskTracker.getPublicDistributedCacheDir());
  137. return null;
  138. }
  139. });
  140. // Set up the child task's configuration. After this call, no localization
  141. // of files should happen in the TaskTracker's process space. Any changes to
  142. // the conf object after this will NOT be reflected to the child.
  143. setupChildTaskConfiguration(lDirAlloc);
  144. // Build classpath
  145. List<String> classPaths =
  146. getClassPaths(conf, workDir, taskDistributedCacheManager);
  147. long logSize = TaskLog.getTaskLogLength(conf);
  148. // Build exec child JVM args.
  149. Vector<String> vargs =
  150. getVMArgs(taskid, workDir, classPaths, logSize);
  151. tracker.addToMemoryManager(t.getTaskID(), t.isMapTask(), conf);
  152. List<String> setup = new ArrayList<String>();
  153. // Set up the redirection of the task's stdout and stderr streams
  154. File[] logFiles = prepareLogFiles(taskid, t.isTaskCleanupTask());
  155. File stdout = logFiles[0];
  156. File stderr = logFiles[1];
  157. tracker.getTaskTrackerInstrumentation().reportTaskLaunch(taskid, stdout,
  158. stderr);
  159. Map<String, String> env = new HashMap<String, String>();
  160. errorInfo = getVMEnvironment(errorInfo, workDir, conf, env,
  161. taskid, logSize);
  162. launchJvmAndWait(setup, vargs, stdout, stderr, logSize, workDir, env);
  163. tracker.getTaskTrackerInstrumentation().reportTaskEnd(t.getTaskID());
  164. if (exitCodeSet) {
  165. if (!killed && exitCode != 0) {
  166. if (exitCode == 65) {
  167. tracker.getTaskTrackerInstrumentation().taskFailedPing(t.getTaskID());
  168. }
  169. throw new IOException("Task process exit with nonzero status of " +
  170. exitCode + ".");
  171. }
  172. }
  173. } catch (FSError e) {
  174. LOG.fatal("FSError", e);
  175. try {
  176. tracker.internalFsError(t.getTaskID(), e.getMessage());
  177. } catch (IOException ie) {
  178. LOG.fatal(t.getTaskID()+" reporting FSError", ie);
  179. }
  180. } catch (Throwable throwable) {
  181. LOG.warn(t.getTaskID() + " : " + errorInfo, throwable);
  182. Throwable causeThrowable = new Throwable(errorInfo, throwable);
  183. ByteArrayOutputStream baos = new ByteArrayOutputStream();
  184. causeThrowable.printStackTrace(new PrintStream(baos));
  185. try {
  186. tracker.internalReportDiagnosticInfo(t.getTaskID(), baos.toString());
  187. } catch (IOException e) {
  188. LOG.warn(t.getTaskID()+" Reporting Diagnostics", e);
  189. }
  190. } finally {
  191. try{
  192. if (taskDistributedCacheManager != null) {
  193. taskDistributedCacheManager.release();
  194. }
  195. }catch(IOException ie){
  196. LOG.warn("Error releasing caches : Cache files might not have been cleaned up");
  197. }
  198. // It is safe to call TaskTracker.TaskInProgress.reportTaskFinished with
  199. // *false* since the task has either
  200. // a) SUCCEEDED - which means commit has been done
  201. // b) FAILED - which means we do not need to commit
  202. tip.reportTaskFinished(false);
  203. }
  204. }
  205. void launchJvmAndWait(List<String> setup, Vector<String> vargs, File stdout,
  206. File stderr, long logSize, File workDir, Map<String, String> env)
  207. throws InterruptedException {
  208. jvmManager.launchJvm(this, jvmManager.constructJvmEnv(setup, vargs, stdout,
  209. stderr, logSize, workDir, env, conf));
  210. synchronized (lock) {
  211. while (!done) {
  212. lock.wait();
  213. }
  214. }
  215. }
  216. /**
  217. * Prepare the log files for the task
  218. *
  219. * @param taskid
  220. * @param isCleanup
  221. * @return an array of files. The first file is stdout, the second is stderr.
  222. * @throws IOException
  223. */
  224. File[] prepareLogFiles(TaskAttemptID taskid, boolean isCleanup)
  225. throws IOException {
  226. File[] logFiles = new File[2];
  227. logFiles[0] = TaskLog.getTaskLogFile(taskid, isCleanup,
  228. TaskLog.LogName.STDOUT);
  229. logFiles[1] = TaskLog.getTaskLogFile(taskid, isCleanup,
  230. TaskLog.LogName.STDERR);
  231. File logDir = logFiles[0].getParentFile();
  232. boolean b = logDir.mkdirs();
  233. if (!b) {
  234. LOG.warn("mkdirs failed. Ignoring");
  235. } else {
  236. FileSystem localFs = FileSystem.getLocal(conf);
  237. localFs.setPermission(new Path(logDir.getCanonicalPath()),
  238. new FsPermission((short)0700));
  239. }
  240. return logFiles;
  241. }
  242. /**
  243. * Write the child's configuration to the disk and set it in configuration so
  244. * that the child can pick it up from there.
  245. *
  246. * @param lDirAlloc
  247. * @throws IOException
  248. */
  249. void setupChildTaskConfiguration(LocalDirAllocator lDirAlloc)
  250. throws IOException {
  251. Path localTaskFile =
  252. lDirAlloc.getLocalPathForWrite(TaskTracker.getTaskConfFile(
  253. t.getUser(), t.getJobID().toString(), t.getTaskID().toString(), t
  254. .isTaskCleanupTask()), conf);
  255. // write the child's task configuration file to the local disk
  256. writeLocalTaskFile(localTaskFile.toString(), conf);
  257. // Set the final job file in the task. The child needs to know the correct
  258. // path to job.xml. So set this path accordingly.
  259. t.setJobFile(localTaskFile.toString());
  260. }
  261. /**
  262. * Parse the given string and return an array of individual java opts. Split
  263. * on whitespace and replace the special string "@taskid@" with the task ID
  264. * given.
  265. *
  266. * @param javaOpts The string to parse
  267. * @param taskid The task ID to replace the special string with
  268. * @return An array of individual java opts.
  269. */
  270. static String[] parseChildJavaOpts(String javaOpts, TaskAttemptID taskid) {
  271. javaOpts = javaOpts.replace("@taskid@", taskid.toString());
  272. return javaOpts.trim().split("\\s+");
  273. }
  274. /**
  275. * @param taskid
  276. * @param workDir
  277. * @param classPaths
  278. * @param logSize
  279. * @return
  280. * @throws IOException
  281. */
  282. private Vector<String> getVMArgs(TaskAttemptID taskid, File workDir,
  283. List<String> classPaths, long logSize)
  284. throws IOException {
  285. Vector<String> vargs = new Vector<String>(8);
  286. File jvm = // use same jvm as parent
  287. new File(new File(System.getProperty("java.home"), "bin"), "java");
  288. vargs.add(jvm.toString());
  289. // Add child (task) java-vm options.
  290. //
  291. // The following symbols if present in mapred.{map|reduce}.child.java.opts
  292. // value are replaced:
  293. // + @taskid@ is interpolated with value of TaskID.
  294. // Other occurrences of @ will not be altered.
  295. //
  296. // Example with multiple arguments and substitutions, showing
  297. // jvm GC logging, and start of a passwordless JVM JMX agent so can
  298. // connect with jconsole and the likes to watch child memory, threads
  299. // and get thread dumps.
  300. //
  301. // <property>
  302. // <name>mapred.child.java.opts</name>
  303. // <value>-Xmx512M -verbose:gc -Xloggc:/tmp/@taskid@.gc \
  304. // -Dcom.sun.management.jmxremote.authenticate=false \
  305. // -Dcom.sun.management.jmxremote.ssl=false \
  306. // </value>
  307. // </property>
  308. //
  309. // <property>
  310. // <name>mapred.child.java.opts</name>
  311. // <value>-Xmx1024M -verbose:gc -Xloggc:/tmp/@taskid@.gc \
  312. // -Dcom.sun.management.jmxremote.authenticate=false \
  313. // -Dcom.sun.management.jmxremote.ssl=false \
  314. // </value>
  315. // </property>
  316. //
  317. String[] javaOptsSplit = parseChildJavaOpts(getChildJavaOpts(conf,
  318. JobConf.DEFAULT_MAPRED_TASK_JAVA_OPTS),
  319. taskid);
  320. // Add java.library.path; necessary for loading native libraries.
  321. //
  322. // 1. To support native-hadoop library i.e. libhadoop.so, we add the
  323. // parent processes' java.library.path to the child.
  324. // 2. We also add the 'cwd' of the task to it's java.library.path to help
  325. // users distribute native libraries via the DistributedCache.
  326. // 3. The user can also specify extra paths to be added to the
  327. // java.library.path via mapred.{map|reduce}.child.java.opts.
  328. //
  329. String libraryPath = System.getProperty("java.library.path");
  330. if (libraryPath == null) {
  331. libraryPath = workDir.getAbsolutePath();
  332. } else {
  333. libraryPath += SYSTEM_PATH_SEPARATOR + workDir;
  334. }
  335. boolean hasUserLDPath = false;
  336. for(int i=0; i<javaOptsSplit.length ;i++) {
  337. if(javaOptsSplit[i].startsWith("-Djava.library.path=")) {
  338. javaOptsSplit[i] += SYSTEM_PATH_SEPARATOR + libraryPath;
  339. hasUserLDPath = true;
  340. break;
  341. }
  342. }
  343. if(!hasUserLDPath) {
  344. vargs.add("-Djava.library.path=" + libraryPath);
  345. }
  346. for (int i = 0; i < javaOptsSplit.length; i++) {
  347. vargs.add(javaOptsSplit[i]);
  348. }
  349. Path childTmpDir = createChildTmpDir(workDir, conf);
  350. vargs.add("-Djava.io.tmpdir=" + childTmpDir);
  351. // Add classpath.
  352. vargs.add("-classpath");
  353. String classPath = StringUtils.join(SYSTEM_PATH_SEPARATOR, classPaths);
  354. vargs.add(classPath);
  355. // Setup the log4j prop
  356. setupLog4jProperties(vargs, taskid, logSize);
  357. if (conf.getProfileEnabled()) {
  358. if (conf.getProfileTaskRange(t.isMapTask()
  359. ).isIncluded(t.getPartition())) {
  360. File prof = TaskLog.getTaskLogFile(taskid, t.isTaskCleanupTask(),
  361. TaskLog.LogName.PROFILE);
  362. vargs.add(String.format(conf.getProfileParams(), prof.toString()));
  363. }
  364. }
  365. // Add main class and its arguments
  366. vargs.add(Child.class.getName()); // main of Child
  367. // pass umbilical address
  368. InetSocketAddress address = tracker.getTaskTrackerReportAddress();
  369. vargs.add(address.getAddress().getHostAddress());
  370. vargs.add(Integer.toString(address.getPort()));
  371. vargs.add(taskid.toString()); // pass task identifier
  372. // pass task log location
  373. vargs.add(TaskLog.getAttemptDir(taskid, t.isTaskCleanupTask()).toString());
  374. return vargs;
  375. }
  376. private void setupLog4jProperties(Vector<String> vargs, TaskAttemptID taskid,
  377. long logSize) {
  378. vargs.add("-Dhadoop.log.dir=" +
  379. new File(System.getProperty("hadoop.log.dir")).getAbsolutePath());
  380. vargs.add("-Dhadoop.root.logger=" + getLogLevel(conf).toString() + ",TLA");
  381. vargs.add("-D" + TaskLogAppender.TASKID_PROPERTY + "=" + taskid);
  382. vargs.add("-D" + TaskLogAppender.ISCLEANUP_PROPERTY +
  383. "=" + t.isTaskCleanupTask());
  384. vargs.add("-D" + TaskLogAppender.LOGSIZE_PROPERTY + "=" + logSize);
  385. }
  386. /**
  387. * @param taskid
  388. * @param workDir
  389. * @return
  390. * @throws IOException
  391. */
  392. static Path createChildTmpDir(File workDir,
  393. JobConf conf)
  394. throws IOException {
  395. // add java.io.tmpdir given by mapreduce.task.tmp.dir
  396. String tmp = conf.get(MRJobConfig.TASK_TEMP_DIR, "./tmp");
  397. Path tmpDir = new Path(tmp);
  398. // if temp directory path is not absolute, prepend it with workDir.
  399. if (!tmpDir.isAbsolute()) {
  400. tmpDir = new Path(workDir.toString(), tmp);
  401. FileSystem localFs = FileSystem.getLocal(conf);
  402. if (!localFs.mkdirs(tmpDir) && localFs.getFileStatus(tmpDir).isFile()) {
  403. throw new IOException("Mkdirs failed to create " + tmpDir.toString());
  404. }
  405. }
  406. return tmpDir;
  407. }
  408. /**
  409. */
  410. private static List<String> getClassPaths(JobConf conf, File workDir,
  411. TaskDistributedCacheManager taskDistributedCacheManager)
  412. throws IOException {
  413. // Accumulates class paths for child.
  414. List<String> classPaths = new ArrayList<String>();
  415. // start with same classpath as parent process
  416. appendSystemClasspaths(classPaths);
  417. // include the user specified classpath
  418. appendJobJarClasspaths(conf.getJar(), classPaths);
  419. // Distributed cache paths
  420. classPaths.addAll(taskDistributedCacheManager.getClassPaths());
  421. // Include the working dir too
  422. classPaths.add(workDir.toString());
  423. return classPaths;
  424. }
  425. /**
  426. * sets the environment variables needed for task jvm and its children.
  427. * @param errorInfo
  428. * @param workDir
  429. * @param env
  430. * @return
  431. * @throws Throwable
  432. */
  433. private String getVMEnvironment(String errorInfo, File workDir, JobConf conf,
  434. Map<String, String> env, TaskAttemptID taskid, long logSize)
  435. throws Throwable {
  436. StringBuffer ldLibraryPath = new StringBuffer();
  437. ldLibraryPath.append(workDir.toString());
  438. String oldLdLibraryPath = null;
  439. oldLdLibraryPath = System.getenv("LD_LIBRARY_PATH");
  440. if (oldLdLibraryPath != null) {
  441. ldLibraryPath.append(SYSTEM_PATH_SEPARATOR);
  442. ldLibraryPath.append(oldLdLibraryPath);
  443. }
  444. env.put("LD_LIBRARY_PATH", ldLibraryPath.toString());
  445. // put jobTokenFile name into env
  446. String jobTokenFile = conf.get(TokenCache.JOB_TOKENS_FILENAME);
  447. LOG.debug("putting jobToken file name into environment fn=" + jobTokenFile);
  448. env.put(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION, jobTokenFile);
  449. // for the child of task jvm, set hadoop.root.logger
  450. env.put("HADOOP_ROOT_LOGGER", "INFO,TLA");
  451. String hadoopClientOpts = System.getenv("HADOOP_CLIENT_OPTS");
  452. if (hadoopClientOpts == null) {
  453. hadoopClientOpts = "";
  454. } else {
  455. hadoopClientOpts = hadoopClientOpts + " ";
  456. }
  457. hadoopClientOpts = hadoopClientOpts + "-Dhadoop.tasklog.taskid=" + taskid
  458. + " -Dhadoop.tasklog.iscleanup=" + t.isTaskCleanupTask()
  459. + " -Dhadoop.tasklog.totalLogFileSize=" + logSize;
  460. env.put("HADOOP_CLIENT_OPTS", hadoopClientOpts);
  461. // add the env variables passed by the user
  462. String mapredChildEnv = getChildEnv(conf);
  463. if (mapredChildEnv != null && mapredChildEnv.length() > 0) {
  464. String childEnvs[] = mapredChildEnv.split(",");
  465. for (String cEnv : childEnvs) {
  466. try {
  467. String[] parts = cEnv.split("="); // split on '='
  468. String value = env.get(parts[0]);
  469. if (value != null) {
  470. // replace $env with the child's env constructed by tt's
  471. // example LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/tmp
  472. value = parts[1].replace("$" + parts[0], value);
  473. } else {
  474. // this key is not configured by the tt for the child .. get it
  475. // from the tt's env
  476. // example PATH=$PATH:/tmp
  477. value = System.getenv(parts[0]);
  478. if (value != null) {
  479. // the env key is present in the tt's env
  480. value = parts[1].replace("$" + parts[0], value);
  481. } else {
  482. // the env key is note present anywhere .. simply set it
  483. // example X=$X:/tmp or X=/tmp
  484. value = parts[1].replace("$" + parts[0], "");
  485. }
  486. }
  487. env.put(parts[0], value);
  488. } catch (Throwable t) {
  489. // set the error msg
  490. errorInfo = "Invalid User environment settings : " + mapredChildEnv
  491. + ". Failed to parse user-passed environment param."
  492. + " Expecting : env1=value1,env2=value2...";
  493. LOG.warn(errorInfo);
  494. throw t;
  495. }
  496. }
  497. }
  498. return errorInfo;
  499. }
  500. /**
  501. * Write the task specific job-configuration file.
  502. *
  503. * @param localFs
  504. * @throws IOException
  505. */
  506. private static void writeLocalTaskFile(String jobFile, JobConf conf)
  507. throws IOException {
  508. Path localTaskFile = new Path(jobFile);
  509. FileSystem localFs = FileSystem.getLocal(conf);
  510. localFs.delete(localTaskFile, true);
  511. OutputStream out = localFs.create(localTaskFile);
  512. try {
  513. conf.writeXml(out);
  514. } finally {
  515. out.close();
  516. }
  517. }
  518. /**
  519. * Prepare the Configs.LOCAL_DIR for the child. The child is sand-boxed now.
  520. * Whenever it uses LocalDirAllocator from now on inside the child, it will
  521. * only see files inside the attempt-directory. This is done in the Child's
  522. * process space.
  523. */
  524. static void setupChildMapredLocalDirs(Task t, JobConf conf) {
  525. String[] localDirs = conf.getTrimmedStrings(MRConfig.LOCAL_DIR);
  526. String jobId = t.getJobID().toString();
  527. String taskId = t.getTaskID().toString();
  528. boolean isCleanup = t.isTaskCleanupTask();
  529. String user = t.getUser();
  530. StringBuffer childMapredLocalDir =
  531. new StringBuffer(localDirs[0] + Path.SEPARATOR
  532. + TaskTracker.getLocalTaskDir(user, jobId, taskId, isCleanup));
  533. for (int i = 1; i < localDirs.length; i++) {
  534. childMapredLocalDir.append("," + localDirs[i] + Path.SEPARATOR
  535. + TaskTracker.getLocalTaskDir(user, jobId, taskId, isCleanup));
  536. }
  537. LOG.debug(MRConfig.LOCAL_DIR + " for child : " + childMapredLocalDir);
  538. conf.set(MRConfig.LOCAL_DIR, childMapredLocalDir.toString());
  539. }
  540. /** Creates the working directory pathname for a task attempt. */
  541. static File formWorkDir(LocalDirAllocator lDirAlloc,
  542. TaskAttemptID task, boolean isCleanup, JobConf conf)
  543. throws IOException {
  544. Path workDir =
  545. lDirAlloc.getLocalPathToRead(TaskTracker.getTaskWorkDir(
  546. conf.getUser(), task.getJobID().toString(), task.toString(),
  547. isCleanup), conf);
  548. return new File(workDir.toString());
  549. }
  550. private static void appendSystemClasspaths(List<String> classPaths) {
  551. for (String c : System.getProperty("java.class.path").split(
  552. SYSTEM_PATH_SEPARATOR)) {
  553. classPaths.add(c);
  554. }
  555. }
  556. /**
  557. * Given a "jobJar" (typically retrieved via {@link JobConf#getJar()}),
  558. * appends classpath entries for it, as well as its lib/ and classes/
  559. * subdirectories.
  560. *
  561. * @param jobJar Job jar from configuration
  562. * @param classPaths Accumulator for class paths
  563. */
  564. static void appendJobJarClasspaths(String jobJar, List<String> classPaths) {
  565. if (jobJar == null) {
  566. return;
  567. }
  568. File jobCacheDir = new File(new Path(jobJar).getParent().toString());
  569. // if jar exists, it into workDir
  570. File[] libs = new File(jobCacheDir, "lib").listFiles();
  571. if (libs != null) {
  572. for (File l : libs) {
  573. classPaths.add(l.toString());
  574. }
  575. }
  576. classPaths.add(new File(jobCacheDir, "classes").toString());
  577. classPaths.add(new File(jobCacheDir, "job.jar").toString());
  578. }
  579. /**
  580. * Creates distributed cache symlinks and tmp directory, as appropriate.
  581. * Note that when we setup the distributed
  582. * cache, we didn't create the symlinks. This is done on a per task basis
  583. * by the currently executing task.
  584. *
  585. * @param conf The job configuration.
  586. * @param workDir Working directory, which is completely deleted.
  587. */
  588. public static void setupWorkDir(JobConf conf, File workDir) throws IOException {
  589. if (LOG.isDebugEnabled()) {
  590. LOG.debug("Fully deleting contents of " + workDir);
  591. }
  592. /** deletes only the contents of workDir leaving the directory empty. We
  593. * can't delete the workDir as it is the current working directory.
  594. */
  595. FileUtil.fullyDeleteContents(workDir);
  596. if (DistributedCache.getSymlink(conf)) {
  597. URI[] archives = DistributedCache.getCacheArchives(conf);
  598. URI[] files = DistributedCache.getCacheFiles(conf);
  599. Path[] localArchives = DistributedCache.getLocalCacheArchives(conf);
  600. Path[] localFiles = DistributedCache.getLocalCacheFiles(conf);
  601. if (archives != null) {
  602. for (int i = 0; i < archives.length; i++) {
  603. String link = archives[i].getFragment();
  604. String target = localArchives[i].toString();
  605. symlink(workDir, target, link);
  606. }
  607. }
  608. if (files != null) {
  609. for (int i = 0; i < files.length; i++) {
  610. String link = files[i].getFragment();
  611. String target = localFiles[i].toString();
  612. symlink(workDir, target, link);
  613. }
  614. }
  615. }
  616. // For streaming, create extra symlinks (for all the files
  617. // in the job cache dir) in the current working directory.
  618. // Note that this is only executed if the configuration
  619. // points to a jar file.
  620. if (conf.getJar() != null) {
  621. File jobCacheDir = new File(
  622. new Path(conf.getJar()).getParent().toString());
  623. try{
  624. TrackerDistributedCacheManager.createAllSymlink(conf, jobCacheDir,
  625. workDir);
  626. } catch(IOException ie){
  627. // Do not exit even if symlinks have not been created.
  628. LOG.warn(StringUtils.stringifyException(ie));
  629. }
  630. }
  631. createChildTmpDir(workDir, conf);
  632. }
  633. /**
  634. * Utility method for creating a symlink and warning on errors.
  635. *
  636. * If link is null, does nothing.
  637. */
  638. private static void symlink(File workDir, String target, String link)
  639. throws IOException {
  640. if (link != null) {
  641. link = workDir.toString() + Path.SEPARATOR + link;
  642. File flink = new File(link);
  643. if (!flink.exists()) {
  644. LOG.info(String.format("Creating symlink: %s <- %s", target, link));
  645. if (0 != FileUtil.symLink(target, link)) {
  646. throw new IOException(String.format(
  647. "Failed to create symlink: %s <- %s", target, link));
  648. }
  649. }
  650. }
  651. }
  652. /**
  653. * Kill the child process
  654. */
  655. public void kill() {
  656. killed = true;
  657. jvmManager.taskKilled(this);
  658. signalDone();
  659. }
  660. public void signalDone() {
  661. synchronized (lock) {
  662. done = true;
  663. lock.notify();
  664. }
  665. }
  666. public void setExitCode(int exitCode) {
  667. this.exitCodeSet = true;
  668. this.exitCode = exitCode;
  669. }
  670. }