LocalJobRunner.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapred;

import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.ProtocolSignature;
import org.apache.hadoop.mapreduce.ClusterMetrics;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.QueueInfo;
import org.apache.hadoop.mapreduce.TaskCompletionEvent;
import org.apache.hadoop.mapreduce.TaskTrackerInfo;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.Cluster.JobTrackerStatus;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.mapreduce.filecache.TaskDistributedCacheManager;
import org.apache.hadoop.mapreduce.filecache.TrackerDistributedCacheManager;
import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.mapreduce.v2.LogParams;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
import org.apache.hadoop.mapreduce.server.jobtracker.State;
import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader;
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.security.token.Token;

/** Implements MapReduce locally, in-process, for debugging. */
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class LocalJobRunner implements ClientProtocol {

  public static final Log LOG =
      LogFactory.getLog(LocalJobRunner.class);

  /** The maximum number of map tasks to run in parallel in LocalJobRunner */
  public static final String LOCAL_MAX_MAPS =
      "mapreduce.local.map.tasks.maximum";
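
  // Illustrative sketch (not part of the original class): a client typically
  // reaches LocalJobRunner by selecting the local framework, and can raise
  // map parallelism via the LOCAL_MAX_MAPS property declared above. The
  // MRConfig constant names below are assumptions about the surrounding
  // Hadoop version; the values are examples.
  //
  //   Configuration conf = new Configuration();
  //   conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.LOCAL_FRAMEWORK_NAME); // "local"
  //   conf.setInt(LOCAL_MAX_MAPS, 4); // allow four concurrent map tasks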
  private FileSystem fs;
  private HashMap<JobID, Job> jobs = new HashMap<JobID, Job>();
  private JobConf conf;
  private AtomicInteger map_tasks = new AtomicInteger(0);
  private int reduce_tasks = 0;
  final Random rand = new Random();

  private JobTrackerInstrumentation myMetrics = null;

  private static final String jobDir = "localRunner/";

  private static final Counters EMPTY_COUNTERS = new Counters();

  public long getProtocolVersion(String protocol, long clientVersion) {
    return ClientProtocol.versionID;
  }

  @Override
  public ProtocolSignature getProtocolSignature(String protocol,
      long clientVersion, int clientMethodsHash) throws IOException {
    return ProtocolSignature.getProtocolSignature(
        this, protocol, clientVersion, clientMethodsHash);
  }

  private class Job extends Thread implements TaskUmbilicalProtocol {
    // The job directory on the system: JobClient places job configurations here.
    // This is analogous to JobTracker's system directory.
    private Path systemJobDir;
    private Path systemJobFile;

    // The job directory for the task. Analogous to a task's job directory.
    private Path localJobDir;
    private Path localJobFile;

    private JobID id;
    private JobConf job;

    private int numMapTasks;
    private float [] partialMapProgress;
    private Counters [] mapCounters;
    private Counters reduceCounters;

    private JobStatus status;
    private List<TaskAttemptID> mapIds = Collections.synchronizedList(
        new ArrayList<TaskAttemptID>());

    private JobProfile profile;
    private FileSystem localFs;
    boolean killed = false;

    private TrackerDistributedCacheManager trackerDistributedCacheManager;
    private TaskDistributedCacheManager taskDistributedCacheManager;

    public long getProtocolVersion(String protocol, long clientVersion) {
      return TaskUmbilicalProtocol.versionID;
    }

    @Override
    public ProtocolSignature getProtocolSignature(String protocol,
        long clientVersion, int clientMethodsHash) throws IOException {
      return ProtocolSignature.getProtocolSignature(
          this, protocol, clientVersion, clientMethodsHash);
    }

    public Job(JobID jobid, String jobSubmitDir) throws IOException {
      this.systemJobDir = new Path(jobSubmitDir);
      this.systemJobFile = new Path(systemJobDir, "job.xml");
      this.id = jobid;
      JobConf conf = new JobConf(systemJobFile);
      this.localFs = FileSystem.getLocal(conf);
      this.localJobDir = localFs.makeQualified(conf.getLocalPath(jobDir));
      this.localJobFile = new Path(this.localJobDir, id + ".xml");

      // Manage the distributed cache. If there are files to be copied,
      // setup() may update the configuration, which is why the local job
      // file is re-written below.
      this.trackerDistributedCacheManager =
          new TrackerDistributedCacheManager(conf, new DefaultTaskController());
      this.taskDistributedCacheManager =
          trackerDistributedCacheManager.newTaskDistributedCacheManager(conf);
      taskDistributedCacheManager.setup(
          new LocalDirAllocator(MRConfig.LOCAL_DIR),
          new File(systemJobDir.toString()),
          "archive", "archive");

      if (DistributedCache.getSymlink(conf)) {
        // This is not supported largely because,
        // for a Child subprocess, the cwd in LocalJobRunner
        // is not a fresh slate, but rather the user's working directory.
        // This is further complicated because the logic in
        // setupWorkDir only creates symlinks if there's a jarfile
        // in the configuration.
        LOG.warn("LocalJobRunner does not support " +
            "symlinking into current working dir.");
      }
      // Set up the symlinks for the distributed cache.
      TaskRunner.setupWorkDir(conf, new File(localJobDir.toUri()).getAbsoluteFile());

      // Write out the configuration file. Instead of copying it from
      // systemJobFile, we re-write it, since setup(), above, may have
      // updated it.
      OutputStream out = localFs.create(localJobFile);
      try {
        conf.writeXml(out);
      } finally {
        out.close();
      }
      this.job = new JobConf(localJobFile);

      // Job (the current object) is a Thread, so we wrap its class loader.
      if (!taskDistributedCacheManager.getClassPaths().isEmpty()) {
        setContextClassLoader(taskDistributedCacheManager.makeClassLoader(
            getContextClassLoader()));
      }

      profile = new JobProfile(job.getUser(), id, systemJobFile.toString(),
          "http://localhost:8080/", job.getJobName());
      status = new JobStatus(id, 0.0f, 0.0f, JobStatus.RUNNING,
          profile.getUser(), profile.getJobName(), profile.getJobFile(),
          profile.getURL().toString());

      jobs.put(id, this);

      // Starting the thread here means the job begins running as soon as it
      // has been constructed.
      this.start();
    }
    JobProfile getProfile() {
      return profile;
    }

    /**
     * A Runnable instance that handles a map task to be run by an executor.
     */
    protected class MapTaskRunnable implements Runnable {
      private final int taskId;
      private final TaskSplitMetaInfo info;
      private final JobID jobId;
      private final JobConf localConf;

      // This is a reference to a shared object passed in by the
      // external context; this delivers state to the reducers regarding
      // where to fetch mapper outputs.
      private final Map<TaskAttemptID, MapOutputFile> mapOutputFiles;

      public volatile Throwable storedException;

      public MapTaskRunnable(TaskSplitMetaInfo info, int taskId, JobID jobId,
          Map<TaskAttemptID, MapOutputFile> mapOutputFiles) {
        this.info = info;
        this.taskId = taskId;
        this.mapOutputFiles = mapOutputFiles;
        this.jobId = jobId;
        this.localConf = new JobConf(job);
      }

      public void run() {
        try {
          TaskAttemptID mapId = new TaskAttemptID(new TaskID(
              jobId, TaskType.MAP, taskId), 0);
          LOG.info("Starting task: " + mapId);
          mapIds.add(mapId);
          MapTask map = new MapTask(systemJobFile.toString(), mapId, taskId,
              info.getSplitIndex(), 1);
          map.setUser(UserGroupInformation.getCurrentUser().
              getShortUserName());
          TaskRunner.setupChildMapredLocalDirs(map, localConf);

          MapOutputFile mapOutput = new MROutputFiles();
          mapOutput.setConf(localConf);
          mapOutputFiles.put(mapId, mapOutput);

          map.setJobFile(localJobFile.toString());
          localConf.setUser(map.getUser());
          map.localizeConfiguration(localConf);
          map.setConf(localConf);
          try {
            map_tasks.getAndIncrement();
            myMetrics.launchMap(mapId);
            map.run(localConf, Job.this);
            myMetrics.completeMap(mapId);
          } finally {
            map_tasks.getAndDecrement();
          }

          LOG.info("Finishing task: " + mapId);
        } catch (Throwable e) {
          this.storedException = e;
        }
      }
    }
    /**
     * Create Runnables to encapsulate map tasks for use by the executor
     * service.
     * @param taskInfo Info about the map task splits
     * @param jobId the job id
     * @param mapOutputFiles a mapping from task attempts to output files
     * @return a List of Runnables, one per map task.
     */
    protected List<MapTaskRunnable> getMapTaskRunnables(
        TaskSplitMetaInfo [] taskInfo, JobID jobId,
        Map<TaskAttemptID, MapOutputFile> mapOutputFiles) {

      int numTasks = 0;
      ArrayList<MapTaskRunnable> list = new ArrayList<MapTaskRunnable>();
      for (TaskSplitMetaInfo task : taskInfo) {
        list.add(new MapTaskRunnable(task, numTasks++, jobId,
            mapOutputFiles));
      }

      return list;
    }

    /**
     * Initialize the counters that will hold partial-progress from
     * the various task attempts.
     * @param numMaps the number of map tasks in this job.
     */
    private synchronized void initCounters(int numMaps) {
      // Initialize state trackers for all map tasks.
      this.partialMapProgress = new float[numMaps];
      this.mapCounters = new Counters[numMaps];
      for (int i = 0; i < numMaps; i++) {
        this.mapCounters[i] = EMPTY_COUNTERS;
      }

      this.reduceCounters = EMPTY_COUNTERS;
    }

    /**
     * Creates the executor service used to run map tasks.
     *
     * @param numMapTasks the total number of map tasks to be run
     * @return an ExecutorService instance that handles map tasks
     */
    protected ExecutorService createMapExecutor(int numMapTasks) {

      // Determine the size of the thread pool to use
      int maxMapThreads = job.getInt(LOCAL_MAX_MAPS, 1);
      if (maxMapThreads < 1) {
        throw new IllegalArgumentException(
            "Configured " + LOCAL_MAX_MAPS + " must be >= 1");
      }
      this.numMapTasks = numMapTasks;
      maxMapThreads = Math.min(maxMapThreads, this.numMapTasks);
      maxMapThreads = Math.max(maxMapThreads, 1); // In case of no tasks.

      initCounters(this.numMapTasks);

      LOG.debug("Starting thread pool executor.");
      LOG.debug("Max local threads: " + maxMapThreads);
      LOG.debug("Map tasks to process: " + this.numMapTasks);

      // Create a new executor service to drain the work queue.
      ExecutorService executor = Executors.newFixedThreadPool(maxMapThreads);

      return executor;
    }
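
    // Worked sizing example (illustrative numbers, not from the source): with
    // LOCAL_MAX_MAPS=4 and a 10-split job, the pool above runs 4 map tasks at
    // a time until all 10 complete; with a 2-split job the min() clamp
    // shrinks it to 2 threads; with 0 splits the max() clamp still yields a
    // 1-thread pool, so the shutdown/await logic in run() behaves uniformly.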
    @SuppressWarnings("unchecked")
    @Override
    public void run() {
      JobID jobId = profile.getJobID();
      JobContext jContext = new JobContextImpl(job, jobId);
      OutputCommitter outputCommitter = job.getOutputCommitter();
      try {
        TaskSplitMetaInfo[] taskSplitMetaInfos =
            SplitMetaInfoReader.readSplitMetaInfo(jobId, localFs, conf, systemJobDir);
        int numReduceTasks = job.getNumReduceTasks();
        if (numReduceTasks > 1 || numReduceTasks < 0) {
          // we only allow 0 or 1 reducer in local mode
          numReduceTasks = 1;
          job.setNumReduceTasks(1);
        }
        outputCommitter.setupJob(jContext);
        status.setSetupProgress(1.0f);

        Map<TaskAttemptID, MapOutputFile> mapOutputFiles =
            Collections.synchronizedMap(new HashMap<TaskAttemptID, MapOutputFile>());

        List<MapTaskRunnable> taskRunnables = getMapTaskRunnables(taskSplitMetaInfos,
            jobId, mapOutputFiles);
        ExecutorService mapService = createMapExecutor(taskRunnables.size());

        // Start populating the executor with work units.
        // They may begin running immediately (in other threads).
        for (Runnable r : taskRunnables) {
          mapService.submit(r);
        }

        try {
          mapService.shutdown(); // Instructs queue to drain.

          // Wait for tasks to finish; do not use a time-based timeout.
          // (See http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6179024)
          LOG.info("Waiting for map tasks");
          mapService.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
        } catch (InterruptedException ie) {
          // Cancel all threads.
          mapService.shutdownNow();
          throw ie;
        }

        LOG.info("Map task executor complete.");

        // After waiting for the map tasks to complete, if any of these
        // have thrown an exception, rethrow it now in the main thread context.
        for (MapTaskRunnable r : taskRunnables) {
          if (r.storedException != null) {
            throw new Exception(r.storedException);
          }
        }

        TaskAttemptID reduceId =
            new TaskAttemptID(new TaskID(jobId, TaskType.REDUCE, 0), 0);
        try {
          if (numReduceTasks > 0) {
            ReduceTask reduce = new ReduceTask(systemJobFile.toString(),
                reduceId, 0, mapIds.size(), 1);
            reduce.setUser(UserGroupInformation.getCurrentUser().
                getShortUserName());
            JobConf localConf = new JobConf(job);
            localConf.set("mapreduce.jobtracker.address", "local");
            TaskRunner.setupChildMapredLocalDirs(reduce, localConf);
            // move map output to reduce input
            for (int i = 0; i < mapIds.size(); i++) {
              if (!this.isInterrupted()) {
                TaskAttemptID mapId = mapIds.get(i);
                Path mapOut = mapOutputFiles.get(mapId).getOutputFile();
                MapOutputFile localOutputFile = new MROutputFiles();
                localOutputFile.setConf(localConf);
                Path reduceIn =
                    localOutputFile.getInputFileForWrite(mapId.getTaskID(),
                        localFs.getFileStatus(mapOut).getLen());
                if (!localFs.mkdirs(reduceIn.getParent())) {
                  throw new IOException("Mkdirs failed to create "
                      + reduceIn.getParent().toString());
                }
                if (!localFs.rename(mapOut, reduceIn))
                  throw new IOException("Couldn't rename " + mapOut);
              } else {
                throw new InterruptedException();
              }
            }
            if (!this.isInterrupted()) {
              reduce.setJobFile(localJobFile.toString());
              localConf.setUser(reduce.getUser());
              reduce.localizeConfiguration(localConf);
              reduce.setConf(localConf);
              reduce_tasks += 1;
              myMetrics.launchReduce(reduce.getTaskID());
              reduce.run(localConf, this);
              myMetrics.completeReduce(reduce.getTaskID());
              reduce_tasks -= 1;
            } else {
              throw new InterruptedException();
            }
          }
        } finally {
          for (MapOutputFile output : mapOutputFiles.values()) {
            output.removeAll();
          }
        }
        // delete the temporary directory in output directory
        outputCommitter.commitJob(jContext);
        status.setCleanupProgress(1.0f);

        if (killed) {
          this.status.setRunState(JobStatus.KILLED);
        } else {
          this.status.setRunState(JobStatus.SUCCEEDED);
        }

        JobEndNotifier.localRunnerNotification(job, status);

      } catch (Throwable t) {
        try {
          outputCommitter.abortJob(jContext,
              org.apache.hadoop.mapreduce.JobStatus.State.FAILED);
        } catch (IOException ioe) {
          LOG.info("Error cleaning up job: " + id);
        }
        status.setCleanupProgress(1.0f);
        if (killed) {
          this.status.setRunState(JobStatus.KILLED);
        } else {
          this.status.setRunState(JobStatus.FAILED);
        }
        LOG.warn(id, t);

        JobEndNotifier.localRunnerNotification(job, status);

      } finally {
        try {
          fs.delete(systemJobFile.getParent(), true); // delete submit dir
          localFs.delete(localJobFile, true);         // delete local copy
          // Cleanup distributed cache
          taskDistributedCacheManager.release();
          trackerDistributedCacheManager.purgeCache();
        } catch (IOException e) {
          LOG.warn("Error cleaning up " + id + ": " + e);
        }
      }
    }
    // TaskUmbilicalProtocol methods

    public JvmTask getTask(JvmContext context) { return null; }

    public synchronized boolean statusUpdate(TaskAttemptID taskId,
        TaskStatus taskStatus) throws IOException, InterruptedException {
      LOG.info(taskStatus.getStateString());
      int taskIndex = mapIds.indexOf(taskId);
      if (taskIndex >= 0) {                       // mapping
        float numTasks = (float) this.numMapTasks;

        partialMapProgress[taskIndex] = taskStatus.getProgress();
        mapCounters[taskIndex] = taskStatus.getCounters();

        float partialProgress = 0.0f;
        for (float f : partialMapProgress) {
          partialProgress += f;
        }
        status.setMapProgress(partialProgress / numTasks);
      } else {
        reduceCounters = taskStatus.getCounters();
        status.setReduceProgress(taskStatus.getProgress());
      }

      // ignore phase
      return true;
    }

    /** Return the current values of the counters for this job,
     * including tasks that are in progress.
     */
    public synchronized Counters getCurrentCounters() {
      if (null == mapCounters) {
        // Counters not yet initialized for job.
        return EMPTY_COUNTERS;
      }

      Counters current = EMPTY_COUNTERS;
      for (Counters c : mapCounters) {
        current = Counters.sum(current, c);
      }
      current = Counters.sum(current, reduceCounters);
      return current;
    }
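
    // Counters.sum(a, b) builds and returns a new aggregate Counters object
    // rather than mutating its arguments, which is why seeding the fold with
    // the shared EMPTY_COUNTERS constant is safe here.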
    /**
     * Task is reporting that it is in commit_pending
     * and is waiting for the commit response.
     */
    public void commitPending(TaskAttemptID taskid,
                              TaskStatus taskStatus)
                              throws IOException, InterruptedException {
      statusUpdate(taskid, taskStatus);
    }

    public void reportDiagnosticInfo(TaskAttemptID taskid, String trace) {
      // Ignore for now
    }

    public void reportNextRecordRange(TaskAttemptID taskid,
        SortedRanges.Range range) throws IOException {
      LOG.info("Task " + taskid + " reported nextRecordRange " + range);
    }

    public boolean ping(TaskAttemptID taskid) throws IOException {
      return true;
    }

    public boolean canCommit(TaskAttemptID taskid)
        throws IOException {
      return true;
    }

    public void done(TaskAttemptID taskId) throws IOException {
      int taskIndex = mapIds.indexOf(taskId);
      if (taskIndex >= 0) {                       // mapping
        status.setMapProgress(1.0f);
      } else {
        status.setReduceProgress(1.0f);
      }
    }

    public synchronized void fsError(TaskAttemptID taskId, String message)
        throws IOException {
      LOG.fatal("FSError: " + message + " from task: " + taskId);
    }

    public void shuffleError(TaskAttemptID taskId, String message) throws IOException {
      LOG.fatal("shuffleError: " + message + " from task: " + taskId);
    }

    public synchronized void fatalError(TaskAttemptID taskId, String msg)
        throws IOException {
      LOG.fatal("Fatal: " + msg + " from task: " + taskId);
    }

    public MapTaskCompletionEventsUpdate getMapCompletionEvents(JobID jobId,
        int fromEventId, int maxLocs, TaskAttemptID id) throws IOException {
      return new MapTaskCompletionEventsUpdate(
          org.apache.hadoop.mapred.TaskCompletionEvent.EMPTY_ARRAY, false);
    }
  }
  public LocalJobRunner(Configuration conf) throws IOException {
    this(new JobConf(conf));
  }

  @Deprecated
  public LocalJobRunner(JobConf conf) throws IOException {
    this.fs = FileSystem.getLocal(conf);
    this.conf = conf;
    myMetrics = new JobTrackerMetricsInst(null, new JobConf(conf));
  }

  // JobSubmissionProtocol methods

  private static int jobid = 0;

  public synchronized org.apache.hadoop.mapreduce.JobID getNewJobID() {
    return new org.apache.hadoop.mapreduce.JobID("local", ++jobid);
  }

  public org.apache.hadoop.mapreduce.JobStatus submitJob(
      org.apache.hadoop.mapreduce.JobID jobid, String jobSubmitDir,
      Credentials credentials) throws IOException {
    Job job = new Job(JobID.downgrade(jobid), jobSubmitDir);
    job.job.setCredentials(credentials);
    return job.status;
  }

  public void killJob(org.apache.hadoop.mapreduce.JobID id) {
    jobs.get(JobID.downgrade(id)).killed = true;
    jobs.get(JobID.downgrade(id)).interrupt();
  }

  public void setJobPriority(org.apache.hadoop.mapreduce.JobID id,
      String jp) throws IOException {
    throw new UnsupportedOperationException("Changing job priority " +
        "in LocalJobRunner is not supported.");
  }

  /** Throws {@link UnsupportedOperationException} */
  public boolean killTask(org.apache.hadoop.mapreduce.TaskAttemptID taskId,
      boolean shouldFail) throws IOException {
    throw new UnsupportedOperationException("Killing tasks in " +
        "LocalJobRunner is not supported");
  }

  public org.apache.hadoop.mapreduce.TaskReport[] getTaskReports(
      org.apache.hadoop.mapreduce.JobID id, TaskType type) {
    return new org.apache.hadoop.mapreduce.TaskReport[0];
  }

  public org.apache.hadoop.mapreduce.JobStatus getJobStatus(
      org.apache.hadoop.mapreduce.JobID id) {
    Job job = jobs.get(JobID.downgrade(id));
    if (job != null)
      return job.status;
    else
      return null;
  }

  public org.apache.hadoop.mapreduce.Counters getJobCounters(
      org.apache.hadoop.mapreduce.JobID id) {
    Job job = jobs.get(JobID.downgrade(id));
    return new org.apache.hadoop.mapreduce.Counters(job.getCurrentCounters());
  }

  public String getFilesystemName() throws IOException {
    return fs.getUri().toString();
  }

  public ClusterMetrics getClusterMetrics() {
    int numMapTasks = map_tasks.get();
    return new ClusterMetrics(numMapTasks, reduce_tasks, numMapTasks,
        reduce_tasks, 0, 0, 1, 1, jobs.size(), 1, 0, 0);
  }

  /**
   * @deprecated Use {@link #getJobTrackerStatus()} instead.
   */
  @Deprecated
  public State getJobTrackerState() throws IOException, InterruptedException {
    return State.RUNNING;
  }

  public JobTrackerStatus getJobTrackerStatus() {
    return JobTrackerStatus.RUNNING;
  }

  public long getTaskTrackerExpiryInterval() throws IOException, InterruptedException {
    return 0;
  }

  /**
   * Get all active trackers in cluster.
   * @return array of TaskTrackerInfo
   */
  public TaskTrackerInfo[] getActiveTrackers()
      throws IOException, InterruptedException {
    return null;
  }

  /**
   * Get all blacklisted trackers in cluster.
   * @return array of TaskTrackerInfo
   */
  public TaskTrackerInfo[] getBlacklistedTrackers()
      throws IOException, InterruptedException {
    return null;
  }

  public TaskCompletionEvent[] getTaskCompletionEvents(
      org.apache.hadoop.mapreduce.JobID jobid,
      int fromEventId, int maxEvents) throws IOException {
    return TaskCompletionEvent.EMPTY_ARRAY;
  }
  public org.apache.hadoop.mapreduce.JobStatus[] getAllJobs() { return null; }

  /**
   * Returns the diagnostic information for a particular task in the given job.
   * To be implemented.
   */
  public String[] getTaskDiagnostics(
      org.apache.hadoop.mapreduce.TaskAttemptID taskid) throws IOException {
    return new String[0];
  }

  /**
   * @see org.apache.hadoop.mapreduce.protocol.ClientProtocol#getSystemDir()
   */
  public String getSystemDir() {
    Path sysDir = new Path(
        conf.get(JTConfig.JT_SYSTEM_DIR, "/tmp/hadoop/mapred/system"));
    return fs.makeQualified(sysDir).toString();
  }

  /**
   * @see org.apache.hadoop.mapred.JobSubmissionProtocol#getQueueAdmins()
   */
  public AccessControlList getQueueAdmins(String queueName) throws IOException {
    return new AccessControlList(" "); // no queue admins for local job runner
  }

  /**
   * @see org.apache.hadoop.mapreduce.protocol.ClientProtocol#getStagingAreaDir()
   */
  public String getStagingAreaDir() throws IOException {
    Path stagingRootDir = new Path(conf.get(JTConfig.JT_STAGING_AREA_ROOT,
        "/tmp/hadoop/mapred/staging"));
    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
    String user;
    if (ugi != null) {
      user = ugi.getShortUserName() + rand.nextInt();
    } else {
      user = "dummy" + rand.nextInt();
    }
    return fs.makeQualified(new Path(stagingRootDir, user + "/.staging")).toString();
  }
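
  // Shape of the resulting staging path (illustrative user name; the numeric
  // suffix comes from rand.nextInt() above and may be negative):
  //   /tmp/hadoop/mapred/staging/alice-1553456178/.staging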
  public String getJobHistoryDir() {
    return null;
  }

  @Override
  public QueueInfo[] getChildQueues(String queueName) throws IOException {
    return null;
  }

  @Override
  public QueueInfo[] getRootQueues() throws IOException {
    return null;
  }

  @Override
  public QueueInfo[] getQueues() throws IOException {
    return null;
  }

  @Override
  public QueueInfo getQueue(String queue) throws IOException {
    return null;
  }

  @Override
  public org.apache.hadoop.mapreduce.QueueAclsInfo[]
      getQueueAclsForCurrentUser() throws IOException {
    return null;
  }

  /**
   * Set the max number of map tasks to run concurrently in the LocalJobRunner.
   * @param job the job to configure
   * @param maxMaps the maximum number of map tasks to allow.
   */
  public static void setLocalMaxRunningMaps(
      org.apache.hadoop.mapreduce.JobContext job,
      int maxMaps) {
    job.getConfiguration().setInt(LOCAL_MAX_MAPS, maxMaps);
  }

  /**
   * @return the max number of map tasks to run concurrently in the
   * LocalJobRunner.
   */
  public static int getLocalMaxRunningMaps(
      org.apache.hadoop.mapreduce.JobContext job) {
    return job.getConfiguration().getInt(LOCAL_MAX_MAPS, 1);
  }
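
  // Usage sketch for the two helpers above (illustrative client code; assumes
  // an org.apache.hadoop.mapreduce.Job, which is a JobContext):
  //   Job myJob = new Job(new Configuration());
  //   LocalJobRunner.setLocalMaxRunningMaps(myJob, 4);
  //   int maxMaps = LocalJobRunner.getLocalMaxRunningMaps(myJob); // == 4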
  @Override
  public void cancelDelegationToken(Token<DelegationTokenIdentifier> token
      ) throws IOException, InterruptedException {
  }

  @Override
  public Token<DelegationTokenIdentifier>
      getDelegationToken(Text renewer) throws IOException, InterruptedException {
    return null;
  }

  @Override
  public long renewDelegationToken(Token<DelegationTokenIdentifier> token
      ) throws IOException, InterruptedException {
    return 0;
  }

  @Override
  public LogParams getLogFileParams(org.apache.hadoop.mapreduce.JobID jobID,
      org.apache.hadoop.mapreduce.TaskAttemptID taskAttemptID)
      throws IOException, InterruptedException {
    throw new UnsupportedOperationException("Not supported");
  }
}