TaskTracker.java
  1. /**
  2. * Licensed to the Apache Software Foundation (ASF) under one
  3. * or more contributor license agreements. See the NOTICE file
  4. * distributed with this work for additional information
  5. * regarding copyright ownership. The ASF licenses this file
  6. * to you under the Apache License, Version 2.0 (the
  7. * "License"); you may not use this file except in compliance
  8. * with the License. You may obtain a copy of the License at
  9. *
  10. * http://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. * Unless required by applicable law or agreed to in writing, software
  13. * distributed under the License is distributed on an "AS IS" BASIS,
  14. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. * See the License for the specific language governing permissions and
  16. * limitations under the License.
  17. */
  18. package org.apache.hadoop.mapred;
  19. import java.io.ByteArrayOutputStream;
  20. import java.io.File;
  21. import java.io.IOException;
  22. import java.io.OutputStream;
  23. import java.io.PrintStream;
  24. import java.net.BindException;
  25. import java.net.InetSocketAddress;
  26. import java.util.ArrayList;
  27. import java.util.HashMap;
  28. import java.util.HashSet;
  29. import java.util.Iterator;
  30. import java.util.List;
  31. import java.util.Map;
  32. import java.util.Random;
  33. import java.util.Set;
  34. import java.util.TreeMap;
  35. import java.util.concurrent.BlockingQueue;
  36. import java.util.concurrent.LinkedBlockingQueue;
  37. import java.util.regex.Pattern;
  38. import javax.servlet.ServletContext;
  39. import javax.servlet.ServletException;
  40. import javax.servlet.http.HttpServlet;
  41. import javax.servlet.http.HttpServletRequest;
  42. import javax.servlet.http.HttpServletResponse;
  43. import org.apache.commons.logging.Log;
  44. import org.apache.commons.logging.LogFactory;
  45. import org.apache.hadoop.fs.DF;
  46. import org.apache.hadoop.fs.FSDataInputStream;
  47. import org.apache.hadoop.fs.FSError;
  48. import org.apache.hadoop.fs.FileSystem;
  49. import org.apache.hadoop.fs.Path;
  50. import org.apache.hadoop.fs.LocalDirAllocator;
  51. import org.apache.hadoop.io.IntWritable;
  52. import org.apache.hadoop.ipc.RPC;
  53. import org.apache.hadoop.ipc.RemoteException;
  54. import org.apache.hadoop.ipc.Server;
  55. import org.apache.hadoop.metrics.MetricsContext;
  56. import org.apache.hadoop.metrics.MetricsException;
  57. import org.apache.hadoop.metrics.MetricsRecord;
  58. import org.apache.hadoop.metrics.MetricsUtil;
  59. import org.apache.hadoop.metrics.Updater;
  60. import org.apache.hadoop.net.DNS;
  61. import org.apache.hadoop.util.DiskChecker;
  62. import org.apache.hadoop.util.ReflectionUtils;
  63. import org.apache.hadoop.util.RunJar;
  64. import org.apache.hadoop.util.StringUtils;
  65. import org.apache.hadoop.util.DiskChecker.DiskErrorException;
  66. import org.apache.log4j.LogManager;
  67. /*******************************************************
  68. * TaskTracker is a process that starts and tracks MR Tasks
  69. * in a networked environment. It contacts the JobTracker
  70. * for Task assignments and reporting results.
  71. *
  72. * @author Mike Cafarella
  73. *******************************************************/
  74. public class TaskTracker
  75. implements MRConstants, TaskUmbilicalProtocol, Runnable {
  76. static final long WAIT_FOR_DONE = 3 * 1000;
  77. private int httpPort;
  78. static enum State {NORMAL, STALE, INTERRUPTED, DENIED}
  79. public static final Log LOG =
  80. LogFactory.getLog("org.apache.hadoop.mapred.TaskTracker");
  81. private boolean running = true;
  82. private LocalDirAllocator localDirAllocator;
  83. String taskTrackerName;
  84. String localHostname;
  85. InetSocketAddress jobTrackAddr;
  86. String taskReportBindAddress;
  87. private int taskReportPort;
  88. Server taskReportServer = null;
  89. InterTrackerProtocol jobClient;
  90. // last heartbeat response received
  91. short heartbeatResponseId = -1;
  92. /*
  93. * This is the last 'status' report sent by this tracker to the JobTracker.
  94. *
  95. * If the RPC call succeeds, this 'status' is cleared out by this tracker,
  96. * indicating that a 'fresh' status report should be generated; if the
  97. * RPC call fails for whatever reason, the previous status report is sent
  98. * again.
  99. */
  100. TaskTrackerStatus status = null;
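// A sketch of the resend protocol above, derived from offerService()/transmitHeartBeat() below:
//   1. status == null                  -> build a fresh TaskTrackerStatus from runningTasks
//   2. jobClient.heartbeat(...) succeeds -> status is reset to null afterwards
//   3. jobClient.heartbeat(...) throws   -> status stays non-null and the same
//      report is resent on the next heartbeat iteration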
  101. StatusHttpServer server = null;
  102. boolean shuttingDown = false;
  103. Map<String, TaskInProgress> tasks = new HashMap<String, TaskInProgress>();
  104. /**
  105. * Map from taskId -> TaskInProgress.
  106. */
  107. Map<String, TaskInProgress> runningTasks = null;
  108. Map<String, RunningJob> runningJobs = null;
  109. volatile int mapTotal = 0;
  110. volatile int reduceTotal = 0;
  111. boolean justStarted = true;
  112. //dir -> DF
  113. Map<String, DF> localDirsDf = new HashMap<String, DF>();
  114. long minSpaceStart = 0;
  115. //must have this much space free to start new tasks
  116. boolean acceptNewTasks = true;
  117. long minSpaceKill = 0;
  118. //if we run under this limit, kill one task
  119. //and make sure we never receive any new jobs
  120. //until all the old tasks have been cleaned up.
  121. //this is if a machine is so full it's only good
  122. //for serving map output to the other nodes
  123. static Random r = new Random();
  124. FileSystem fs = null;
  125. private static final String SUBDIR = "taskTracker";
  126. private static final String CACHEDIR = "archive";
  127. private static final String JOBCACHE = "jobcache";
  128. private JobConf fConf;
  129. private MapOutputFile mapOutputFile;
  130. private int maxCurrentTasks;
  131. private int failures;
  132. private int finishedCount[] = new int[1];
  133. private MapEventsFetcherThread mapEventsFetcher;
  134. /**
  135. * the minimum interval between jobtracker polls
  136. */
  137. private static final long MIN_POLL_INTERVAL = 5000;
  138. /**
  139. * Number of maptask completion events locations to poll for at one time
  140. */
  141. private int probe_sample_size = 50;
  142. private class TaskTrackerMetrics implements Updater {
  143. private MetricsRecord metricsRecord = null;
  144. private int numCompletedTasks = 0;
  145. TaskTrackerMetrics() {
  146. MetricsContext context = MetricsUtil.getContext("mapred");
  147. metricsRecord = MetricsUtil.createRecord(context, "tasktracker");
  148. context.registerUpdater(this);
  149. }
  150. synchronized void completeTask() {
  151. ++numCompletedTasks;
  152. }
  153. /**
  154. * Since this object is a registered updater, this method will be called
  155. * periodically, e.g. every 5 seconds.
  156. */
  157. public void doUpdates(MetricsContext unused) {
  158. synchronized (this) {
  159. if (metricsRecord != null) {
  160. metricsRecord.setMetric("maps_running", mapTotal);
  161. metricsRecord.setMetric("reduces_running", reduceTotal);
  162. metricsRecord.setMetric("taskSlots", (short)maxCurrentTasks);
  163. metricsRecord.incrMetric("tasks_completed", numCompletedTasks);
  164. metricsRecord.update();
  165. }
  166. numCompletedTasks = 0;
  167. }
  168. }
  169. }
  170. private TaskTrackerMetrics myMetrics = null;
  171. /**
  172. * A list of tips that should be cleaned up.
  173. */
  174. private BlockingQueue<TaskTrackerAction> tasksToCleanup =
  175. new LinkedBlockingQueue<TaskTrackerAction>();
  176. /**
  177. * A daemon thread that pulls tips off the list of things to clean up.
  178. */
  179. private Thread taskCleanupThread =
  180. new Thread(new Runnable() {
  181. public void run() {
  182. while (true) {
  183. try {
  184. TaskTrackerAction action = tasksToCleanup.take();
  185. if (action instanceof KillJobAction) {
  186. purgeJob((KillJobAction) action);
  187. } else if (action instanceof KillTaskAction) {
  188. TaskInProgress tip;
  189. KillTaskAction killAction = (KillTaskAction) action;
  190. synchronized (TaskTracker.this) {
  191. tip = tasks.get(killAction.getTaskId());
  192. }
  193. LOG.info("Received KillTaskAction for task: " +
  194. killAction.getTaskId());
  195. purgeTask(tip, false);
  196. } else {
  197. LOG.error("Non-delete action given to cleanup thread: "
  198. + action);
  199. }
  200. } catch (Throwable except) {
  201. LOG.warn(StringUtils.stringifyException(except));
  202. }
  203. }
  204. }
  205. }, "taskCleanup");
  206. {
  207. taskCleanupThread.setDaemon(true);
  208. taskCleanupThread.start();
  209. }
  210. private RunningJob addTaskToJob(String jobId,
  211. Path localJobFile,
  212. TaskInProgress tip) {
  213. synchronized (runningJobs) {
  214. RunningJob rJob = null;
  215. if (!runningJobs.containsKey(jobId)) {
  216. rJob = new RunningJob(jobId, localJobFile);
  217. rJob.localized = false;
  218. rJob.tasks = new HashSet<TaskInProgress>();
  219. rJob.jobFile = localJobFile;
  220. runningJobs.put(jobId, rJob);
  221. } else {
  222. rJob = runningJobs.get(jobId);
  223. }
  224. synchronized (rJob) {
  225. rJob.tasks.add(tip);
  226. }
  227. runningJobs.notify(); //notify the fetcher thread
  228. return rJob;
  229. }
  230. }
  231. private void removeTaskFromJob(String jobId, TaskInProgress tip) {
  232. synchronized (runningJobs) {
  233. RunningJob rjob = runningJobs.get(jobId);
  234. if (rjob == null) {
  235. LOG.warn("Unknown job " + jobId + " being deleted.");
  236. } else {
  237. synchronized (rjob) {
  238. rjob.tasks.remove(tip);
  239. if (rjob.tasks.isEmpty()) {
  240. runningJobs.remove(jobId);
  241. }
  242. }
  243. }
  244. }
  245. }
  246. static String getCacheSubdir() {
  247. return TaskTracker.SUBDIR + Path.SEPARATOR + TaskTracker.CACHEDIR;
  248. }
  249. static String getJobCacheSubdir() {
  250. return TaskTracker.SUBDIR + Path.SEPARATOR + TaskTracker.JOBCACHE;
  251. }
  252. public long getProtocolVersion(String protocol,
  253. long clientVersion) throws IOException {
  254. if (protocol.equals(TaskUmbilicalProtocol.class.getName())) {
  255. return TaskUmbilicalProtocol.versionID;
  256. } else {
  257. throw new IOException("Unknown protocol for task tracker: " +
  258. protocol);
  259. }
  260. }
  261. /**
  262. * Do the real constructor work here. It's in a separate method
  263. * so we can call it again and "recycle" the object after calling
  264. * close().
  265. */
  266. synchronized void initialize() throws IOException {
  267. // use configured nameserver & interface to get local hostname
  268. this.localHostname =
  269. DNS.getDefaultHost
  270. (fConf.get("mapred.tasktracker.dns.interface","default"),
  271. fConf.get("mapred.tasktracker.dns.nameserver","default"));
  272. //check local disk
  273. checkLocalDirs(this.fConf.getLocalDirs());
  274. fConf.deleteLocalFiles(SUBDIR);
  275. // Clear out state tables
  276. this.tasks.clear();
  277. this.runningTasks = new TreeMap<String, TaskInProgress>();
  278. this.runningJobs = new TreeMap<String, RunningJob>();
  279. this.mapTotal = 0;
  280. this.reduceTotal = 0;
  281. this.acceptNewTasks = true;
  282. this.status = null;
  283. this.minSpaceStart = this.fConf.getLong("mapred.local.dir.minspacestart", 0L);
  284. this.minSpaceKill = this.fConf.getLong("mapred.local.dir.minspacekill", 0L);
  285. int numCopiers = this.fConf.getInt("mapred.reduce.parallel.copies", 5);
  286. //tweak the probe sample size (make it a function of numCopiers)
  287. probe_sample_size = Math.max(numCopiers*5, 50);
  288. this.myMetrics = new TaskTrackerMetrics();
  289. // port numbers
  290. this.taskReportPort = this.fConf.getInt("mapred.task.tracker.report.port", 50050);
  291. // bind address
  292. this.taskReportBindAddress = this.fConf.get("mapred.task.tracker.report.bindAddress", "0.0.0.0");
  293. // RPC initialization
  294. while (true) {
  295. try {
  296. this.taskReportServer = RPC.getServer(this, this.taskReportBindAddress, this.taskReportPort, maxCurrentTasks, false, this.fConf);
  297. this.taskReportServer.start();
  298. break;
  299. } catch (BindException e) {
  300. LOG.info("Could not open report server at " + this.taskReportPort + ", trying new port");
  301. this.taskReportPort++;
  302. }
  303. }
  304. // The rpc-server port can be ephemeral...
  305. // ... ensure we have the correct info
  306. this.taskReportPort = taskReportServer.getListenerAddress().getPort();
  307. this.fConf.setInt("mapred.task.tracker.report.port", this.taskReportPort);
  308. LOG.info("TaskTracker up at: " + this.taskReportPort);
  309. this.taskTrackerName = "tracker_" +
  310. localHostname + ":" + taskReportPort;
  311. LOG.info("Starting tracker " + taskTrackerName);
  312. // Clear out temporary files that might be lying around
  313. this.mapOutputFile.cleanupStorage();
  314. this.justStarted = true;
  315. this.jobClient = (InterTrackerProtocol)
  316. RPC.waitForProxy(InterTrackerProtocol.class,
  317. InterTrackerProtocol.versionID,
  318. jobTrackAddr, this.fConf);
  319. this.running = true;
  320. // start the thread that will fetch map task completion events
  321. this.mapEventsFetcher = new MapEventsFetcherThread();
  322. mapEventsFetcher.setDaemon(true);
  323. mapEventsFetcher.setName(
  324. "Map-events fetcher for all reduce tasks " + "on " +
  325. taskTrackerName);
  326. mapEventsFetcher.start();
  327. }
  328. private class MapEventsFetcherThread extends Thread {
  329. private List <FetchStatus> reducesInShuffle() {
  330. List <FetchStatus> fList = new ArrayList<FetchStatus>();
  331. for (Map.Entry <String, RunningJob> item : runningJobs.entrySet()) {
  332. RunningJob rjob = item.getValue();
  333. String jobId = item.getKey();
  334. FetchStatus f;
  335. synchronized (rjob) {
  336. f = rjob.getFetchStatus();
  337. for (TaskInProgress tip : rjob.tasks) {
  338. Task task = tip.getTask();
  339. if (!task.isMapTask()) {
  340. if (((ReduceTask)task).getPhase() ==
  341. TaskStatus.Phase.SHUFFLE) {
  342. if (rjob.getFetchStatus() == null) {
  343. //this is a new job; we start fetching its map events
  344. f = new FetchStatus(jobId,
  345. ((ReduceTask)task).getNumMaps());
  346. rjob.setFetchStatus(f);
  347. }
  348. f = rjob.getFetchStatus();
  349. fList.add(f);
  350. break; //no need to check any more tasks belonging to this job
  351. }
  352. }
  353. }
  354. }
  355. }
  356. //at this point, we know which of the running jobs we need to
  357. //query the jobtracker for map outputs (actually map
  358. //events).
  359. return fList;
  360. }
  361. public void run() {
  362. LOG.info("Starting thread: " + getName());
  363. while (true) {
  364. try {
  365. List <FetchStatus> fList = null;
  366. synchronized (runningJobs) {
  367. while (((fList = reducesInShuffle()).size()) == 0) {
  368. try {
  369. runningJobs.wait();
  370. } catch (InterruptedException e) {
  371. LOG.info("Shutting down: " + getName());
  372. return;
  373. }
  374. }
  375. }
  376. // now fetch all the map task events for all the reduce tasks
  377. // possibly belonging to different jobs
  378. for (FetchStatus f : fList) {
  379. try {
  380. f.fetchMapCompletionEvents();
  381. try {
  382. Thread.sleep(MIN_POLL_INTERVAL);
  383. } catch (InterruptedException ie) {
  384. LOG.info("Shutting down: " + getName());
  385. return;
  386. }
  387. } catch (Exception e) {
  388. LOG.warn(
  389. "Ignoring exception that fetch for map completion" +
  390. " events threw for " + f.jobId + " threw: " +
  391. StringUtils.stringifyException(e));
  392. }
  393. }
  394. } catch (Exception e) {
  395. LOG.info("Ignoring exception " + e.getMessage());
  396. }
  397. }
  398. }
  399. }
  400. private class FetchStatus {
  401. /** The next event ID that we will start querying the JobTracker from*/
  402. private IntWritable fromEventId;
  403. /** This is the cache of map events for a given job */
  404. private List<TaskCompletionEvent> allMapEvents;
  405. /** This array will store indexes to "SUCCEEDED" map events from
  406. * allMapEvents. The array is indexed by the mapId.
  407. * The reason why we store the indexes is to quickly reset SUCCEEDED
  408. * events to OBSOLETE. Thus ReduceTasks might also get to know about
  409. * OBSOLETE events and avoid fetching map outputs from the corresponding
  410. * locations.
  411. */
  412. private int indexToEventsCache[];
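// Worked example of the '+1' offset used in fetchMapCompletionEvents() below:
//   if the SUCCEEDED event for map #3 lands at allMapEvents index 7, then
//   indexToEventsCache[3] is set to 8 (index + 1). A later FAILED/OBSOLETE event
//   for map #3 sees idx == 8 > 0, marks allMapEvents.get(8 - 1) as OBSOLETE, and
//   resets indexToEventsCache[3] to 0. A value of 0 therefore means "no successful
//   event recorded", so the array needs no '-1' initialization.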
  413. /** What jobid this fetchstatus object is for*/
  414. private String jobId;
  415. public FetchStatus(String jobId, int numMaps) {
  416. this.fromEventId = new IntWritable(0);
  417. this.jobId = jobId;
  418. this.allMapEvents = new ArrayList<TaskCompletionEvent>(numMaps);
  419. this.indexToEventsCache = new int[numMaps];
  420. }
  421. public TaskCompletionEvent[] getMapEvents(int fromId, int max) {
  422. TaskCompletionEvent[] mapEvents =
  423. TaskCompletionEvent.EMPTY_ARRAY;
  424. synchronized (allMapEvents) {
  425. if (allMapEvents.size() > fromId) {
  426. int actualMax = Math.min(max, (allMapEvents.size() - fromId));
  427. List <TaskCompletionEvent> eventSublist =
  428. allMapEvents.subList(fromId, actualMax + fromId);
  429. mapEvents =
  430. (TaskCompletionEvent[])eventSublist.toArray(mapEvents);
  431. }
  432. }
  433. return mapEvents;
  434. }
  435. public void fetchMapCompletionEvents() throws IOException {
  436. List <TaskCompletionEvent> recentMapEvents =
  437. queryJobTracker(fromEventId, jobId, jobClient);
  438. synchronized (allMapEvents) {
  439. for (TaskCompletionEvent t : recentMapEvents) {
  440. TaskCompletionEvent.Status status = t.getTaskStatus();
  441. allMapEvents.add(t);
  442. if (status == TaskCompletionEvent.Status.SUCCEEDED) {
  443. //store the index of the events cache for this success event.
  444. indexToEventsCache[t.idWithinJob()] = allMapEvents.size();
  445. }
  446. else if (status == TaskCompletionEvent.Status.FAILED ||
  447. status == TaskCompletionEvent.Status.OBSOLETE) {
  448. int idx = indexToEventsCache[t.idWithinJob()];
  449. //if this map task was declared a success earlier, we will have
  450. //idx > 0
  451. if (idx > 0) {
  452. //Mark the event as OBSOLETE and reset the index to 0. Note that
  453. //we access the 'idx - 1' entry: when storing the idx in
  454. //indexToEventsCache, we store 'actual idx + 1', which lets us
  455. //avoid having to initialize the index array elements to
  456. //something like '-1'.
  457. TaskCompletionEvent obsoleteEvent = allMapEvents.get(idx - 1);
  458. obsoleteEvent.setTaskStatus(
  459. TaskCompletionEvent.Status.OBSOLETE);
  460. indexToEventsCache[t.idWithinJob()] = 0;
  461. }
  462. }
  463. }
  464. }
  465. }
  466. }
  467. // initialize the job directory
  468. private void localizeJob(TaskInProgress tip) throws IOException {
  469. Path localJarFile = null;
  470. Task t = tip.getTask();
  471. String jobId = t.getJobId();
  472. Path localJobFile = new Path(fConf.getLocalPath(getJobCacheSubdir()),
  473. jobId + Path.SEPARATOR + "job.xml");
  474. RunningJob rjob = addTaskToJob(jobId, localJobFile, tip);
  475. synchronized (rjob) {
  476. if (!rjob.localized) {
  477. localJarFile = new Path(fConf.getLocalPath(getJobCacheSubdir()),
  478. jobId + Path.SEPARATOR + "job.jar");
  479. String jobFile = t.getJobFile();
  480. FileSystem localFs = FileSystem.getLocal(fConf);
  481. // this will happen on a partial execution of localizeJob.
  482. // Sometimes the job.xml gets copied but copying job.jar
  483. // might throw an exception
  484. // we should clean up and then try again
  485. Path jobDir = localJobFile.getParent();
  486. if (localFs.exists(jobDir)){
  487. localFs.delete(jobDir);
  488. boolean b = localFs.mkdirs(jobDir);
  489. if (!b)
  490. throw new IOException("Not able to create job directory "
  491. + jobDir.toString());
  492. }
  493. fs.copyToLocalFile(new Path(jobFile), localJobFile);
  494. JobConf localJobConf = new JobConf(localJobFile);
  495. String jarFile = localJobConf.getJar();
  496. if (jarFile != null) {
  497. fs.copyToLocalFile(new Path(jarFile), localJarFile);
  498. localJobConf.setJar(localJarFile.toString());
  499. OutputStream out = localFs.create(localJobFile);
  500. try {
  501. localJobConf.write(out);
  502. } finally {
  503. out.close();
  504. }
  505. // also unjar the job.jar file into the work dir
  506. File workDir = new File(
  507. new File(localJobFile.toString()).getParent(),
  508. "work");
  509. if (!workDir.mkdirs()) {
  510. if (!workDir.isDirectory()) {
  511. throw new IOException("Mkdirs failed to create " + workDir.toString());
  512. }
  513. }
  514. RunJar.unJar(new File(localJarFile.toString()), workDir);
  515. }
  516. rjob.keepJobFiles = ((localJobConf.getKeepTaskFilesPattern() != null) ||
  517. localJobConf.getKeepFailedTaskFiles());
  518. rjob.localized = true;
  519. }
  520. }
  521. launchTaskForJob(tip, new JobConf(rjob.jobFile));
  522. }
  523. private void launchTaskForJob(TaskInProgress tip, JobConf jobConf) throws IOException{
  524. synchronized (tip) {
  525. try {
  526. tip.setJobConf(jobConf);
  527. tip.launchTask();
  528. } catch (Throwable ie) {
  529. tip.runstate = TaskStatus.State.FAILED;
  530. try {
  531. tip.cleanup();
  532. } catch (Throwable ie2) {
  533. // Ignore it, we are just trying to cleanup.
  534. }
  535. String error = StringUtils.stringifyException(ie);
  536. tip.reportDiagnosticInfo(error);
  537. LOG.info(error);
  538. }
  539. }
  540. }
  541. public synchronized void shutdown() throws IOException {
  542. shuttingDown = true;
  543. close();
  544. if (this.server != null) {
  545. try {
  546. LOG.info("Shutting down StatusHttpServer");
  547. this.server.stop();
  548. } catch (InterruptedException ex) {
  549. ex.printStackTrace();
  550. }
  551. }
  552. }
  553. /**
  554. * Close down the TaskTracker and all its components. We must also shutdown
  555. * any running tasks or threads, and cleanup disk space. A new TaskTracker
  556. * within the same process space might be restarted, so everything must be
  557. * clean.
  558. */
  559. public synchronized void close() throws IOException {
  560. //
  561. // Kill running tasks. Do this via a separate map, 'tasksToClose',
  562. // because calling jobHasFinished() may result in an edit to 'tasks'.
  563. //
  564. TreeMap<String, TaskInProgress> tasksToClose =
  565. new TreeMap<String, TaskInProgress>();
  566. tasksToClose.putAll(tasks);
  567. for (TaskInProgress tip : tasksToClose.values()) {
  568. tip.jobHasFinished(false);
  569. }
  570. // Shutdown local RPC servers. Do them
  571. // in parallel, as RPC servers can take a long
  572. // time to shutdown. (They need to wait a full
  573. // RPC timeout, which might be 10-30 seconds.)
  574. new Thread("RPC shutdown") {
  575. public void run() {
  576. if (taskReportServer != null) {
  577. taskReportServer.stop();
  578. taskReportServer = null;
  579. }
  580. }
  581. }.start();
  582. this.running = false;
  583. // Clear local storage
  584. this.mapOutputFile.cleanupStorage();
  585. // Shutdown the fetcher thread
  586. this.mapEventsFetcher.interrupt();
  587. }
  588. /**
  589. * Start with the local machine name, and the default JobTracker
  590. */
  591. public TaskTracker(JobConf conf) throws IOException {
  592. maxCurrentTasks = conf.getInt("mapred.tasktracker.tasks.maximum", 2);
  593. this.fConf = conf;
  594. this.jobTrackAddr = JobTracker.getAddress(conf);
  595. this.mapOutputFile = new MapOutputFile();
  596. this.mapOutputFile.setConf(conf);
  597. int httpPort = conf.getInt("tasktracker.http.port", 50060);
  598. String httpBindAddress = conf.get("tasktracker.http.bindAddress", "0.0.0.0");
  599. this.server = new StatusHttpServer("task", httpBindAddress, httpPort, true);
  600. int workerThreads = conf.getInt("tasktracker.http.threads", 40);
  601. server.setThreads(1, workerThreads);
  602. // let the jsp pages get to the task tracker, config, and other relevant
  603. // objects
  604. FileSystem local = FileSystem.getLocal(conf);
  605. this.localDirAllocator = new LocalDirAllocator("mapred.local.dir");
  606. server.setAttribute("task.tracker", this);
  607. server.setAttribute("local.file.system", local);
  608. server.setAttribute("conf", conf);
  609. server.setAttribute("log", LOG);
  610. server.setAttribute("localDirAllocator", localDirAllocator);
  611. server.addServlet("mapOutput", "/mapOutput", MapOutputServlet.class);
  612. server.start();
  613. this.httpPort = server.getPort();
  614. initialize();
  615. }
  616. /**
  617. * The connection to the JobTracker, used by the TaskRunner
  618. * for locating remote files.
  619. */
  620. public InterTrackerProtocol getJobClient() {
  621. return jobClient;
  622. }
  623. /** Return the DFS filesystem
  624. */
  625. public FileSystem getFileSystem(){
  626. return fs;
  627. }
  628. /** Return the port to which the tasktracker is bound */
  629. public synchronized int getTaskTrackerReportPort() {
  630. return taskReportPort;
  631. }
  632. /** Queries the job tracker for a set of outputs ready to be copied
  633. * @param fromEventId the first event ID we want to start from, this is
  634. * modified by the call to this method
  635. * @param jobClient the job tracker
  636. * @return a set of locations to copy outputs from
  637. * @throws IOException
  638. */
  639. private List<TaskCompletionEvent> queryJobTracker(IntWritable fromEventId,
  640. String jobId,
  641. InterTrackerProtocol jobClient)
  642. throws IOException {
  643. TaskCompletionEvent t[] = jobClient.getTaskCompletionEvents(
  644. jobId,
  645. fromEventId.get(),
  646. probe_sample_size);
  647. //we are interested in map task completion events only. So store
  648. //only those
  649. List <TaskCompletionEvent> recentMapEvents =
  650. new ArrayList<TaskCompletionEvent>();
  651. for (int i = 0; i < t.length; i++) {
  652. if (t[i].isMap) {
  653. recentMapEvents.add(t[i]);
  654. }
  655. }
  656. fromEventId.set(fromEventId.get() + t.length);
  657. return recentMapEvents;
  658. }
  659. /**
  660. * Main service loop. Runs until the tracker shuts down, is interrupted or denied, or its state goes stale.
  661. */
  662. State offerService() throws Exception {
  663. long lastHeartbeat = 0;
  664. this.fs = FileSystem.getNamed(jobClient.getFilesystemName(), this.fConf);
  665. while (running && !shuttingDown) {
  666. try {
  667. long now = System.currentTimeMillis();
  668. long waitTime = HEARTBEAT_INTERVAL - (now - lastHeartbeat);
  669. if (waitTime > 0) {
  670. // sleeps for the wait time, wakes up if a task is finished.
  671. synchronized(finishedCount) {
  672. if (finishedCount[0] == 0) {
  673. finishedCount.wait(waitTime);
  674. }
  675. finishedCount[0] = 0;
  676. }
  677. }
  678. // Send the heartbeat and process the jobtracker's directives
  679. HeartbeatResponse heartbeatResponse = transmitHeartBeat();
  680. TaskTrackerAction[] actions = heartbeatResponse.getActions();
  681. LOG.debug("Got heartbeatResponse from JobTracker with responseId: " +
  682. heartbeatResponse.getResponseId() + " and " +
  683. ((actions != null) ? actions.length : 0) + " actions");
  684. if (reinitTaskTracker(actions)) {
  685. return State.STALE;
  686. }
  687. lastHeartbeat = now;
  688. justStarted = false;
  689. if (actions != null){
  690. for(TaskTrackerAction action: actions) {
  691. if (action instanceof LaunchTaskAction) {
  692. startNewTask((LaunchTaskAction) action);
  693. } else {
  694. tasksToCleanup.put(action);
  695. }
  696. }
  697. }
  698. markUnresponsiveTasks();
  699. killOverflowingTasks();
  700. //we've cleaned up, resume normal operation
  701. if (!acceptNewTasks && isIdle()) {
  702. acceptNewTasks=true;
  703. }
  704. } catch (InterruptedException ie) {
  705. LOG.info("Interrupted. Closing down.");
  706. return State.INTERRUPTED;
  707. } catch (DiskErrorException de) {
  708. String msg = "Exiting task tracker for disk error:\n" +
  709. StringUtils.stringifyException(de);
  710. LOG.error(msg);
  711. synchronized (this) {
  712. jobClient.reportTaskTrackerError(taskTrackerName,
  713. "DiskErrorException", msg);
  714. }
  715. return State.STALE;
  716. } catch (RemoteException re) {
  717. String reClass = re.getClassName();
  718. if (DisallowedTaskTrackerException.class.getName().equals(reClass)) {
  719. LOG.info("Tasktracker disallowed by JobTracker.");
  720. return State.DENIED;
  721. }
  722. } catch (Exception except) {
  723. String msg = "Caught exception: " +
  724. StringUtils.stringifyException(except);
  725. LOG.error(msg);
  726. }
  727. }
  728. return State.NORMAL;
  729. }
  730. /**
  731. * Build and transmit the heartbeat to the JobTracker
  732. * @return the heartbeat response from the JobTracker
  733. * @throws IOException
  734. */
  735. private HeartbeatResponse transmitHeartBeat() throws IOException {
  736. //
  737. // Check if the last heartbeat got through...
  738. // if so then build the heartbeat information for the JobTracker;
  739. // else resend the previous status information.
  740. //
  741. if (status == null) {
  742. synchronized (this) {
  743. List<TaskStatus> taskReports =
  744. new ArrayList<TaskStatus>(runningTasks.size());
  745. for (TaskInProgress tip: runningTasks.values()) {
  746. taskReports.add(tip.createStatus());
  747. }
  748. status =
  749. new TaskTrackerStatus(taskTrackerName, localHostname,
  750. httpPort, taskReports,
  751. failures);
  752. }
  753. } else {
  754. LOG.info("Resending 'status' to '" + jobTrackAddr.getHostName() +
  755. "' with reponseId '" + heartbeatResponseId);
  756. }
  757. //
  758. // Check if we should ask for a new Task
  759. //
  760. boolean askForNewTask;
  761. synchronized (this) {
  762. askForNewTask = (mapTotal < maxCurrentTasks ||
  763. reduceTotal < maxCurrentTasks) &&
  764. acceptNewTasks;
  765. }
  766. if (askForNewTask) {
  767. checkLocalDirs(fConf.getLocalDirs());
  768. askForNewTask = enoughFreeSpace(minSpaceStart);
  769. }
  770. //
  771. // Xmit the heartbeat
  772. //
  773. HeartbeatResponse heartbeatResponse = jobClient.heartbeat(status,
  774. justStarted, askForNewTask,
  775. heartbeatResponseId);
  776. //
  777. // The heartbeat got through successfully!
  778. //
  779. heartbeatResponseId = heartbeatResponse.getResponseId();
  780. synchronized (this) {
  781. for (TaskStatus taskStatus : status.getTaskReports()) {
  782. if (taskStatus.getRunState() != TaskStatus.State.RUNNING) {
  783. if (taskStatus.getIsMap()) {
  784. mapTotal--;
  785. } else {
  786. reduceTotal--;
  787. }
  788. try {
  789. myMetrics.completeTask();
  790. } catch (MetricsException me) {
  791. LOG.warn("Caught: " + StringUtils.stringifyException(me));
  792. }
  793. runningTasks.remove(taskStatus.getTaskId());
  794. }
  795. }
  796. }
  797. // Force a rebuild of 'status' on the next iteration
  798. status = null;
  799. return heartbeatResponse;
  800. }
  801. /**
  802. * Check if the jobtracker directed a 'reset' of the tasktracker.
  803. *
  804. * @param actions the directives of the jobtracker for the tasktracker.
  805. * @return <code>true</code> if tasktracker is to be reset,
  806. * <code>false</code> otherwise.
  807. */
  808. private boolean reinitTaskTracker(TaskTrackerAction[] actions) {
  809. if (actions != null) {
  810. for (TaskTrackerAction action : actions) {
  811. if (action.getActionId() ==
  812. TaskTrackerAction.ActionType.REINIT_TRACKER) {
  813. LOG.info("Recieved RenitTrackerAction from JobTracker");
  814. return true;
  815. }
  816. }
  817. }
  818. return false;
  819. }
  820. /**
  821. * Kill any tasks that have not reported progress within their configured timeout.
  822. */
  823. private synchronized void markUnresponsiveTasks() throws IOException {
  824. long now = System.currentTimeMillis();
  825. for (TaskInProgress tip: runningTasks.values()) {
  826. if (tip.getRunState() == TaskStatus.State.RUNNING) {
  827. // Check the per-job timeout interval for tasks;
  828. // an interval of '0' implies it is never timed-out
  829. long jobTaskTimeout = tip.getTaskTimeout();
  830. if (jobTaskTimeout == 0) {
  831. continue;
  832. }
  833. // Check if the task has not reported progress for a
  834. // time-period greater than the configured time-out
  835. long timeSinceLastReport = now - tip.getLastProgressReport();
  836. if (timeSinceLastReport > jobTaskTimeout && !tip.wasKilled) {
  837. String msg =
  838. "Task " + tip.getTask().getTaskId() + " failed to report status for "
  839. + (timeSinceLastReport / 1000) + " seconds. Killing!";
  840. LOG.info(tip.getTask().getTaskId() + ": " + msg);
  841. ReflectionUtils.logThreadInfo(LOG, "lost task", 30);
  842. tip.reportDiagnosticInfo(msg);
  843. purgeTask(tip, true);
  844. }
  845. }
  846. }
  847. }
  848. /**
  849. * The task tracker is done with this job, so we need to clean up.
  850. * @param action The action with the job
  851. * @throws IOException
  852. */
  853. private synchronized void purgeJob(KillJobAction action) throws IOException {
  854. String jobId = action.getJobId();
  855. LOG.info("Received 'KillJobAction' for job: " + jobId);
  856. RunningJob rjob = null;
  857. synchronized (runningJobs) {
  858. rjob = runningJobs.get(jobId);
  859. }
  860. if (rjob == null) {
  861. LOG.warn("Unknown job " + jobId + " being deleted.");
  862. } else {
  863. synchronized (rjob) {
  864. // Tell all the tips of this job that the job has finished
  865. for (TaskInProgress tip : rjob.tasks) {
  866. tip.jobHasFinished(false);
  867. }
  868. // Delete the localized job directory,
  869. // unless we were asked to keep the job files
  870. if (!rjob.keepJobFiles){
  871. fConf.deleteLocalFiles(SUBDIR + Path.SEPARATOR + JOBCACHE +
  872. Path.SEPARATOR + rjob.getJobId());
  873. }
  874. // Remove this job
  875. rjob.tasks.clear();
  876. }
  877. }
  878. synchronized(runningJobs) {
  879. runningJobs.remove(jobId);
  880. }
  881. }
  882. /**
  883. * Remove the tip and update all relevant state.
  884. *
  885. * @param tip {@link TaskInProgress} to be removed.
  886. * @param wasFailure did the task fail or was it killed?
  887. */
  888. private void purgeTask(TaskInProgress tip, boolean wasFailure)
  889. throws IOException {
  890. if (tip != null) {
  891. LOG.info("About to purge task: " + tip.getTask().getTaskId());
  892. // Remove the task from running jobs,
  893. // removing the job if it's the last task
  894. removeTaskFromJob(tip.getTask().getJobId(), tip);
  895. tip.jobHasFinished(wasFailure);
  896. }
  897. }
  898. /** Check if we're dangerously low on disk space.
  899. * If so, kill a task to free up space and make sure
  900. * we don't accept any new tasks.
  901. * Try killing a reduce task first, since I believe they
  902. * use up the most space.
  903. * Then pick the one with the least progress.
  904. */
  905. private void killOverflowingTasks() throws IOException {
  906. if (!enoughFreeSpace(minSpaceKill)) {
  907. acceptNewTasks=false;
  908. //we give up! do not accept new tasks until
  909. //all the ones running have finished and they're all cleared up
  910. synchronized (this) {
  911. TaskInProgress killMe = findTaskToKill();
  912. if (killMe!=null) {
  913. String msg = "Tasktracker running out of space." +
  914. " Killing task.";
  915. LOG.info(killMe.getTask().getTaskId() + ": " + msg);
  916. killMe.reportDiagnosticInfo(msg);
  917. purgeTask(killMe, false);
  918. }
  919. }
  920. }
  921. }
  922. /**
  923. * Pick a task to kill to free up space
  924. * @return the task to kill or null, if one wasn't found
  925. */
  926. private TaskInProgress findTaskToKill() {
  927. TaskInProgress killMe = null;
  928. for (Iterator it = runningTasks.values().iterator(); it.hasNext();) {
  929. TaskInProgress tip = (TaskInProgress) it.next();
  930. if ((tip.getRunState() == TaskStatus.State.RUNNING) &&
  931. !tip.wasKilled) {
  932. if (killMe == null) {
  933. killMe = tip;
  934. } else if (!tip.getTask().isMapTask()) {
  935. //reduce task, give priority
  936. if (killMe.getTask().isMapTask() ||
  937. (tip.getTask().getProgress().get() <
  938. killMe.getTask().getProgress().get())) {
  939. killMe = tip;
  940. }
  941. } else if (killMe.getTask().isMapTask() &&
  942. tip.getTask().getProgress().get() <
  943. killMe.getTask().getProgress().get()) {
  944. //map task, only add if the progress is lower
  945. killMe = tip;
  946. }
  947. }
  948. }
  949. return killMe;
  950. }
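// Example of the selection order implemented above: with running tasks
//   {map @ 80%, reduce @ 60%, reduce @ 30%}, the reduce at 30% is chosen.
// Reduce tasks are preferred over maps; within each group, the task with the
// least progress wins, and a map is only picked when no reduce is running.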
  951. /**
  952. * Check if all of the local directories have enough
  953. * free space
  954. *
  955. * If not, do not try to get a new task assigned
  956. * @return true if every local directory has at least minSpace available
  957. * @throws IOException
  958. */
  959. private boolean enoughFreeSpace(long minSpace) throws IOException {
  960. if (minSpace == 0) {
  961. return true;
  962. }
  963. String[] localDirs = fConf.getLocalDirs();
  964. for (int i = 0; i < localDirs.length; i++) {
  965. DF df = null;
  966. if (localDirsDf.containsKey(localDirs[i])) {
  967. df = localDirsDf.get(localDirs[i]);
  968. } else {
  969. df = new DF(new File(localDirs[i]), fConf);
  970. localDirsDf.put(localDirs[i], df);
  971. }
  972. if (df.getAvailable() < minSpace)
  973. return false;
  974. }
  975. return true;
  976. }
  977. /**
  978. * Start a new task.
  979. * All exceptions are handled locally, so that we don't mess up the
  980. * task tracker.
  981. */
  982. private void startNewTask(LaunchTaskAction action) {
  983. Task t = action.getTask();
  984. LOG.info("LaunchTaskAction: " + t.getTaskId());
  985. TaskInProgress tip = new TaskInProgress(t, this.fConf);
  986. synchronized (this) {
  987. tasks.put(t.getTaskId(), tip);
  988. runningTasks.put(t.getTaskId(), tip);
  989. boolean isMap = t.isMapTask();
  990. if (isMap) {
  991. mapTotal++;
  992. } else {
  993. reduceTotal++;
  994. }
  995. }
  996. try {
  997. localizeJob(tip);
  998. } catch (Throwable e) {
  999. String msg = ("Error initializing " + tip.getTask().getTaskId() +
  1000. ":\n" + StringUtils.stringifyException(e));
  1001. LOG.warn(msg);
  1002. tip.reportDiagnosticInfo(msg);
  1003. try {
  1004. tip.kill(true);
  1005. } catch (IOException ie2) {
  1006. LOG.info("Error cleaning up " + tip.getTask().getTaskId() + ":\n" +
  1007. StringUtils.stringifyException(ie2));
  1008. }
  1009. // Careful!
  1010. // This might not be an 'Exception' - don't handle 'Error' here!
  1011. if (e instanceof Error) {
  1012. throw ((Error) e);
  1013. }
  1014. }
  1015. }
  1016. /**
  1017. * The server retry loop.
  1018. * This while-loop attempts to connect to the JobTracker. It only
  1019. * loops when the old TaskTracker has gone bad (its state is
  1020. * stale somehow) and we need to reinitialize everything.
  1021. */
  1022. public void run() {
  1023. try {
  1024. boolean denied = false;
  1025. while (running && !shuttingDown && !denied) {
  1026. boolean staleState = false;
  1027. try {
  1028. // This while-loop attempts reconnects if we get network errors
  1029. while (running && !staleState && !shuttingDown && !denied) {
  1030. try {
  1031. State osState = offerService();
  1032. if (osState == State.STALE) {
  1033. staleState = true;
  1034. } else if (osState == State.DENIED) {
  1035. denied = true;
  1036. }
  1037. } catch (Exception ex) {
  1038. if (!shuttingDown) {
  1039. LOG.info("Lost connection to JobTracker [" +
  1040. jobTrackAddr + "]. Retrying...", ex);
  1041. try {
  1042. Thread.sleep(5000);
  1043. } catch (InterruptedException ie) {
  1044. }
  1045. }
  1046. }
  1047. }
  1048. } finally {
  1049. close();
  1050. }
  1051. if (shuttingDown) { return; }
  1052. LOG.warn("Reinitializing local state");
  1053. initialize();
  1054. }
  1055. if (denied) {
  1056. shutdown();
  1057. }
  1058. } catch (IOException iex) {
  1059. LOG.error("Got fatal exception while reinitializing TaskTracker: " +
  1060. StringUtils.stringifyException(iex));
  1061. return;
  1062. }
  1063. }
  1064. ///////////////////////////////////////////////////////
  1065. // TaskInProgress maintains all the info for a Task that
  1066. // lives at this TaskTracker. It maintains the Task object,
  1067. // its TaskStatus, and the TaskRunner.
  1068. ///////////////////////////////////////////////////////
  1069. class TaskInProgress {
  1070. Task task;
  1071. float progress;
  1072. volatile TaskStatus.State runstate;
  1073. long lastProgressReport;
  1074. StringBuffer diagnosticInfo = new StringBuffer();
  1075. private TaskRunner runner;
  1076. volatile boolean done = false;
  1077. boolean wasKilled = false;
  1078. private JobConf defaultJobConf;
  1079. private JobConf localJobConf;
  1080. private boolean keepFailedTaskFiles;
  1081. private boolean alwaysKeepTaskFiles;
  1082. private TaskStatus taskStatus;
  1083. private long taskTimeout;
  1084. /** Wrap the given task, using the tracker's default configuration.
  1085. */
  1086. public TaskInProgress(Task task, JobConf conf) {
  1087. this.task = task;
  1088. this.progress = 0.0f;
  1089. this.runstate = TaskStatus.State.UNASSIGNED;
  1090. this.lastProgressReport = System.currentTimeMillis();
  1091. this.defaultJobConf = conf;
  1092. localJobConf = null;
  1093. taskStatus = new TaskStatus(task.getTaskId(),
  1094. task.isMapTask(),
  1095. progress, runstate,
  1096. diagnosticInfo.toString(),
  1097. "initializing",
  1098. getName(), task.isMapTask()? TaskStatus.Phase.MAP:
  1099. TaskStatus.Phase.SHUFFLE,
  1100. task.getCounters());
  1101. taskTimeout = (10 * 60 * 1000);
  1102. }
  1103. private void localizeTask(Task task) throws IOException{
  1104. Path localTaskDir =
  1105. new Path(this.defaultJobConf.getLocalPath(TaskTracker.getJobCacheSubdir()),
  1106. (task.getJobId() + Path.SEPARATOR + task.getTaskId()));
  1107. FileSystem localFs = FileSystem.getLocal(fConf);
  1108. if (!localFs.mkdirs(localTaskDir)) {
  1109. throw new IOException("Mkdirs failed to create " + localTaskDir.toString());
  1110. }
  1111. Path localTaskFile = new Path(localTaskDir, "job.xml");
  1112. task.setJobFile(localTaskFile.toString());
  1113. localJobConf.set("mapred.local.dir",
  1114. fConf.get("mapred.local.dir"));
  1115. localJobConf.set("mapred.task.id", task.getTaskId());
  1116. keepFailedTaskFiles = localJobConf.getKeepFailedTaskFiles();
  1117. task.localizeConfiguration(localJobConf);
  1118. OutputStream out = localFs.create(localTaskFile);
  1119. try {
  1120. localJobConf.write(out);
  1121. } finally {
  1122. out.close();
  1123. }
  1124. task.setConf(localJobConf);
  1125. String keepPattern = localJobConf.getKeepTaskFilesPattern();
  1126. if (keepPattern != null) {
  1127. alwaysKeepTaskFiles =
  1128. Pattern.matches(keepPattern, task.getTaskId());
  1129. } else {
  1130. alwaysKeepTaskFiles = false;
  1131. }
  1132. }
  1133. /** Get the task this tip is tracking.
  1134. */
  1135. public Task getTask() {
  1136. return task;
  1137. }
  1138. public void setJobConf(JobConf lconf){
  1139. this.localJobConf = lconf;
  1140. keepFailedTaskFiles = localJobConf.getKeepFailedTaskFiles();
  1141. taskTimeout = localJobConf.getLong("mapred.task.timeout",
  1142. 10 * 60 * 1000);
  1143. }
  1144. public JobConf getJobConf() {
  1145. return localJobConf;
  1146. }
  1147. /** Build the current TaskStatus and clear the diagnostic buffer.
  1148. */
  1149. public synchronized TaskStatus createStatus() {
  1150. taskStatus.setProgress(progress);
  1151. taskStatus.setRunState(runstate);
  1152. taskStatus.setDiagnosticInfo(diagnosticInfo.toString());
  1153. if (diagnosticInfo.length() > 0) {
  1154. diagnosticInfo = new StringBuffer();
  1155. }
  1156. return taskStatus;
  1157. }
  1158. /**
  1159. * Kick off the task execution
  1160. */
  1161. public synchronized void launchTask() throws IOException {
  1162. localizeTask(task);
  1163. this.runstate = TaskStatus.State.RUNNING;
  1164. this.runner = task.createRunner(TaskTracker.this);
  1165. this.runner.start();
  1166. this.taskStatus.setStartTime(System.currentTimeMillis());
  1167. }
  1168. /**
  1169. * The task is reporting its progress
  1170. */
  1171. public synchronized void reportProgress(float p, String state,
  1172. TaskStatus.Phase newPhase,
  1173. Counters counters)
  1174. {
  1175. if (this.done) {
  1176. //make sure we ignore progress messages after a task has
  1177. //invoked TaskUmbilicalProtocol.done()
  1178. return;
  1179. }
  1180. LOG.info(task.getTaskId()+" "+p+"% "+state);
  1181. this.progress = p;
  1182. this.runstate = TaskStatus.State.RUNNING;
  1183. this.lastProgressReport = System.currentTimeMillis();
  1184. TaskStatus.Phase oldPhase = taskStatus.getPhase();
  1185. if (oldPhase != newPhase){
  1186. // sort phase started
  1187. if (newPhase == TaskStatus.Phase.SORT){
  1188. this.taskStatus.setShuffleFinishTime(System.currentTimeMillis());
  1189. }else if (newPhase == TaskStatus.Phase.REDUCE){
  1190. this.taskStatus.setSortFinishTime(System.currentTimeMillis());
  1191. }
  1192. this.taskStatus.setPhase(newPhase);
  1193. }
  1194. this.taskStatus.setStateString(state);
  1195. this.taskStatus.setCounters(counters);
  1196. }
  1197. /** Get the time of the last progress report.
  1198. */
  1199. public long getLastProgressReport() {
  1200. return lastProgressReport;
  1201. }
  1202. /** Get the current run state of the task.
  1203. */
  1204. public TaskStatus.State getRunState() {
  1205. return runstate;
  1206. }
  1207. /**
  1208. * The task's configured timeout.
  1209. *
  1210. * @return the task's configured timeout.
  1211. */
  1212. public long getTaskTimeout() {
  1213. return taskTimeout;
  1214. }
  1215. /**
  1216. * The task has reported some diagnostic info about its status
  1217. */
  1218. public synchronized void reportDiagnosticInfo(String info) {
  1219. this.diagnosticInfo.append(info);
  1220. }
  1221. /**
  1222. * The task is reporting that it's done running
  1223. */
  1224. public synchronized void reportDone() {
  1225. LOG.info("Task " + task.getTaskId() + " is done.");
  1226. this.progress = 1.0f;
  1227. this.taskStatus.setFinishTime(System.currentTimeMillis());
  1228. this.done = true;
  1229. }
  1230. /**
  1231. * The task has actually finished running.
  1232. */
  1233. public void taskFinished() {
  1234. long start = System.currentTimeMillis();
  1235. //
  1236. // Wait until task reports as done. If it hasn't reported in,
  1237. // wait for a second and try again.
  1238. //
  1239. while (!done && (System.currentTimeMillis() - start < WAIT_FOR_DONE)) {
  1240. try {
  1241. Thread.sleep(1000);
  1242. } catch (InterruptedException ie) {
  1243. }
  1244. }
  1245. //
  1246. // Change state to success or failure, depending on whether
  1247. // task was 'done' before terminating
  1248. //
  1249. boolean needCleanup = false;
  1250. synchronized (this) {
  1251. if (done) {
  1252. runstate = TaskStatus.State.SUCCEEDED;
  1253. } else {
  1254. if (!wasKilled) {
  1255. failures += 1;
  1256. runstate = TaskStatus.State.FAILED;
  1257. } else {
  1258. runstate = TaskStatus.State.KILLED;
  1259. }
  1260. progress = 0.0f;
  1261. }
  1262. this.taskStatus.setFinishTime(System.currentTimeMillis());
  1263. needCleanup = (runstate == TaskStatus.State.FAILED) ||
  1264. (runstate == TaskStatus.State.KILLED);
  1265. }
  1266. //
  1267. // If the task has failed, or if the task was killAndCleanup()'ed,
  1268. // we should clean up right away. We only wait to cleanup
  1269. // if the task succeeded, and its results might be useful
  1270. // later on to downstream job processing.
  1271. //
  1272. if (needCleanup) {
  1273. try {
  1274. cleanup();
  1275. } catch (IOException ie) {
  1276. }
  1277. }
  1278. }
  1279. /**
  1280. * We no longer need anything from this task, as the job has
  1281. * finished. If the task is still running, kill it and clean up.
  1282. *
  1283. * @param wasFailure did the task fail, as opposed to was it killed by
  1284. * the framework
  1285. */
  1286. public void jobHasFinished(boolean wasFailure) throws IOException {
  1287. // Kill the task if it is still running
  1288. synchronized(this){
  1289. if (getRunState() == TaskStatus.State.RUNNING) {
  1290. kill(wasFailure);
  1291. }
  1292. }
  1293. // Cleanup on the finished task
  1294. cleanup();
  1295. }
  1296. /**
  1297. * Something went wrong and the task must be killed.
  1298. * @param wasFailure was it a failure (versus a kill request)?
  1299. */
  1300. public synchronized void kill(boolean wasFailure) throws IOException {
  1301. if (runstate == TaskStatus.State.RUNNING) {
  1302. wasKilled = true;
  1303. if (wasFailure) {
  1304. failures += 1;
  1305. }
  1306. runner.kill();
  1307. runstate =
  1308. (wasFailure) ? TaskStatus.State.FAILED : TaskStatus.State.KILLED;
  1309. } else if (runstate == TaskStatus.State.UNASSIGNED) {
  1310. if (wasFailure) {
  1311. failures += 1;
  1312. runstate = TaskStatus.State.FAILED;
  1313. } else {
  1314. runstate = TaskStatus.State.KILLED;
  1315. }
  1316. }
  1317. }
  1318. /**
  1319. * The map output has been lost.
  1320. */
  1321. private synchronized void mapOutputLost(String failure
  1322. ) throws IOException {
  1323. if (runstate == TaskStatus.State.SUCCEEDED) {
  1324. LOG.info("Reporting output lost:"+task.getTaskId());
  1325. runstate = TaskStatus.State.FAILED; // change status to failure
  1326. progress = 0.0f;
  1327. reportDiagnosticInfo("Map output lost, rescheduling: " +
  1328. failure);
  1329. runningTasks.put(task.getTaskId(), this);
  1330. mapTotal++;
  1331. } else {
  1332. LOG.warn("Output already reported lost:"+task.getTaskId());
  1333. }
  1334. }
  1335. /**
  1336. * We no longer need anything from this task. Either the
  1337. * controlling job is all done and the files have been copied
  1338. * away, or the task failed and we don't need the remains.
  1339. * Callers of cleanup should not lock the tip first;
  1340. * cleanup does the right thing: it updates tasks in the TaskTracker
  1341. * by locking the TaskTracker first and then locking the tip.
  1342. */
  1343. void cleanup() throws IOException {
  1344. String taskId = task.getTaskId();
  1345. LOG.debug("Cleaning up " + taskId);
  1346. synchronized (TaskTracker.this) {
  1347. tasks.remove(taskId);
  1348. if (alwaysKeepTaskFiles ||
  1349. (runstate == TaskStatus.State.FAILED &&
  1350. keepFailedTaskFiles)) {
  1351. return;
  1352. }
  1353. }
  1354. synchronized (this) {
  1355. try {
  1356. if (runner != null) {
  1357. runner.close();
  1358. }
  1359. defaultJobConf.deleteLocalFiles(SUBDIR + Path.SEPARATOR +
  1360. JOBCACHE + Path.SEPARATOR +
  1361. task.getJobId() +
  1362. Path.SEPARATOR + taskId);
  1363. } catch (Throwable ie) {
  1364. LOG.info("Error cleaning up task runner: " +
  1365. StringUtils.stringifyException(ie));
  1366. }
  1367. }
  1368. }
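
    // Lock ordering used by cleanup(): TaskTracker.this first, then the TIP.
    // Holding the TIP lock while calling cleanup() could therefore deadlock
    // against code that takes the locks in this order, which is what the
    // javadoc above warns about.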
    public boolean equals(Object obj) {
      return (obj instanceof TaskInProgress) &&
             task.getTaskId().equals(((TaskInProgress) obj).getTask().getTaskId());
    }

    public int hashCode() {
      return task.getTaskId().hashCode();
    }
  }
  /////////////////////////////////////////////////////////////////
  // TaskUmbilicalProtocol
  /////////////////////////////////////////////////////////////////

  /**
   * Called upon startup by the child process, to fetch Task data.
   */
  public synchronized Task getTask(String taskid) throws IOException {
    TaskInProgress tip = (TaskInProgress) tasks.get(taskid);
    if (tip != null) {
      return (Task) tip.getTask();
    } else {
      return null;
    }
  }
  /**
   * Called periodically to report Task progress, from 0.0 to 1.0.
   */
  public synchronized void progress(String taskid, float progress,
                                    String state,
                                    TaskStatus.Phase phase,
                                    Counters counters
                                    ) throws IOException {
    TaskInProgress tip = (TaskInProgress) tasks.get(taskid);
    if (tip != null) {
      tip.reportProgress(progress, state, phase, counters);
    } else {
      LOG.warn("Progress from unknown child task: " + taskid + ". Ignored.");
    }
  }
  /**
   * Called when the task dies before completion, and we want to report back
   * diagnostic info.
   */
  public synchronized void reportDiagnosticInfo(String taskid,
                                                String info) throws IOException {
    TaskInProgress tip = (TaskInProgress) tasks.get(taskid);
    if (tip != null) {
      tip.reportDiagnosticInfo(info);
    } else {
      LOG.warn("Error from unknown child task: " + taskid + ". Ignored.");
    }
  }
  /** Child checking to see if we're alive.  Normally does nothing. */
  public synchronized boolean ping(String taskid) throws IOException {
    return tasks.get(taskid) != null;
  }
  /**
   * The task is done.
   */
  public synchronized void done(String taskid) throws IOException {
    TaskInProgress tip = (TaskInProgress) tasks.get(taskid);
    if (tip != null) {
      tip.reportDone();
    } else {
      LOG.warn("Unknown child task done: " + taskid + ". Ignored.");
    }
  }
  /**
   * A child task had a local filesystem error.  Kill the task.
   */
  public synchronized void fsError(String taskId, String message)
    throws IOException {
    LOG.fatal("Task: " + taskId + " - Killed due to FSError: " + message);
    TaskInProgress tip = runningTasks.get(taskId);
    tip.reportDiagnosticInfo("FSError: " + message);
    purgeTask(tip, true);
  }
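
  // Unlike the other umbilical callbacks above, fsError() does not null-check
  // the lookup in runningTasks; a report for an unknown task id would fail
  // here with a NullPointerException rather than being logged and ignored.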
  public TaskCompletionEvent[] getMapCompletionEvents(
      String jobId, int fromEventId, int maxLocs) throws IOException {
    TaskCompletionEvent[] mapEvents = TaskCompletionEvent.EMPTY_ARRAY;
    RunningJob rjob;
    synchronized (runningJobs) {
      rjob = runningJobs.get(jobId);
      if (rjob != null) {
        synchronized (rjob) {
          FetchStatus f = rjob.getFetchStatus();
          if (f != null) {
            mapEvents = f.getMapEvents(fromEventId, maxLocs);
          }
        }
      }
    }
    return mapEvents;
  }
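
  // A minimal sketch of how a reduce-side caller might poll this method over
  // the umbilical (hypothetical caller code, not part of this class; the
  // batch size of 100 is arbitrary):
  //
  //   int fromEventId = 0;
  //   TaskCompletionEvent[] events =
  //     umbilical.getMapCompletionEvents(jobId, fromEventId, 100);
  //   fromEventId += events.length;
  //
  // i.e. the caller advances fromEventId by the number of events returned and
  // asks again on its next poll.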
  /////////////////////////////////////////////////////
  // Called by TaskTracker thread after task process ends
  /////////////////////////////////////////////////////

  /**
   * The task is no longer running.  It may not have completed successfully.
   */
  void reportTaskFinished(String taskid) {
    TaskInProgress tip;
    synchronized (this) {
      tip = (TaskInProgress) tasks.get(taskid);
    }
    if (tip != null) {
      tip.taskFinished();
      synchronized (finishedCount) {
        finishedCount[0]++;
        finishedCount.notify();
      }
    } else {
      LOG.warn("Unknown child task finished: " + taskid + ". Ignored.");
    }
  }
  /**
   * A completed map task's output has been lost.
   */
  public synchronized void mapOutputLost(String taskid,
                                         String errorMsg) throws IOException {
    TaskInProgress tip = (TaskInProgress) tasks.get(taskid);
    if (tip != null) {
      tip.mapOutputLost(errorMsg);
    } else {
      LOG.warn("Unknown child with bad map output: " + taskid + ". Ignored.");
    }
  }
  /**
   * The data structure for initializing a job
   */
  static class RunningJob {
    private String jobid;
    private Path jobFile;
    // keep this for later use
    Set<TaskInProgress> tasks;
    boolean localized;
    boolean keepJobFiles;
    FetchStatus f;

    RunningJob(String jobid, Path jobFile) {
      this.jobid = jobid;
      localized = false;
      tasks = new HashSet<TaskInProgress>();
      this.jobFile = jobFile;
      keepJobFiles = false;
    }

    Path getJobFile() {
      return jobFile;
    }

    String getJobId() {
      return jobid;
    }

    void setFetchStatus(FetchStatus f) {
      this.f = f;
    }

    FetchStatus getFetchStatus() {
      return f;
    }
  }
  /**
   * The main() for child processes.
   */
  public static class Child {

    public static void main(String[] args) throws Throwable {
      //LogFactory.showTime(false);
      LOG.debug("Child starting");

      JobConf defaultConf = new JobConf();
      int port = Integer.parseInt(args[0]);
      String taskid = args[1];
      TaskUmbilicalProtocol umbilical =
        (TaskUmbilicalProtocol)RPC.getProxy(TaskUmbilicalProtocol.class,
                                            TaskUmbilicalProtocol.versionID,
                                            new InetSocketAddress(port),
                                            defaultConf);
      Task task = umbilical.getTask(taskid);
      JobConf job = new JobConf(task.getJobFile());
      task.setConf(job);

      defaultConf.addFinalResource(new Path(task.getJobFile()));

      startPinging(umbilical, taskid);        // start pinging parent

      try {
        // use job-specified working directory
        FileSystem.get(job).setWorkingDirectory(job.getWorkingDirectory());
        task.run(job, umbilical);             // run the task
      } catch (FSError e) {
        LOG.fatal("FSError from child", e);
        umbilical.fsError(taskid, e.getMessage());
      } catch (Throwable throwable) {
        LOG.warn("Error running child", throwable);
        // Report back any failures, for diagnostic purposes
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        throwable.printStackTrace(new PrintStream(baos));
        umbilical.reportDiagnosticInfo(taskid, baos.toString());
      } finally {
        MetricsContext metricsContext = MetricsUtil.getContext("mapred");
        metricsContext.close();
        // Shut down log4j in the child VM.  This assumes that on return
        // from Task.run() no more logging is done.
        LogManager.shutdown();
      }
    }
    /** Periodically ping the parent and exit when this fails. */
    private static void startPinging(final TaskUmbilicalProtocol umbilical,
                                     final String taskid) {
      Thread thread = new Thread(new Runnable() {
        public void run() {
          final int MAX_RETRIES = 3;
          int remainingRetries = MAX_RETRIES;
          while (true) {
            try {
              if (!umbilical.ping(taskid)) {
                LOG.warn("Parent died. Exiting " + taskid);
                System.exit(66);
              }
              remainingRetries = MAX_RETRIES;
            } catch (Throwable t) {
              String msg = StringUtils.stringifyException(t);
              LOG.info("Ping exception: " + msg);
              remainingRetries -= 1;
              if (remainingRetries == 0) {
                ReflectionUtils.logThreadInfo(LOG, "ping exception", 0);
                LOG.warn("Last retry, killing " + taskid);
                System.exit(65);
              }
            }
            try {
              Thread.sleep(1000);
            } catch (InterruptedException e) {
              // Interrupted while sleeping between pings; just loop and ping again.
            }
          }
        }
      }, "Pinger for " + taskid);
      thread.setDaemon(true);
      thread.start();
    }
  }
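
  // Exit codes used by the child JVM above: 66 when the parent answers the
  // ping but no longer knows this task, 65 when the ping itself keeps
  // failing.  Either way the child process dies, and the tracker side
  // presumably notices the process ending (see reportTaskFinished()).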
  /**
   * Get the name for this task tracker.
   * @return a string like "tracker_mymachine:50010"
   */
  String getName() {
    return taskTrackerName;
  }
  /**
   * Get the list of tasks that will be reported back to the
   * job tracker in the next heartbeat cycle.
   * @return a copy of the list of TaskStatus objects
   */
  synchronized List<TaskStatus> getRunningTaskStatuses() {
    List<TaskStatus> result = new ArrayList<TaskStatus>(runningTasks.size());
    for (TaskInProgress tip : runningTasks.values()) {
      result.add(tip.createStatus());
    }
    return result;
  }
  /**
   * Get the list of stored tasks on this task tracker.
   * @return the TaskStatus objects for tasks that are not currently running
   */
  synchronized List<TaskStatus> getNonRunningTasks() {
    List<TaskStatus> result = new ArrayList<TaskStatus>(tasks.size());
    for (Map.Entry<String, TaskInProgress> task : tasks.entrySet()) {
      if (!runningTasks.containsKey(task.getKey())) {
        result.add(task.getValue().createStatus());
      }
    }
    return result;
  }
  /**
   * Get the default job conf for this tracker.
   */
  JobConf getJobConf() {
    return fConf;
  }
  /**
   * Check if the given local directories
   * (and parent directories, if necessary) can be created.
   * @param localDirs where the new TaskTracker should keep its local files.
   * @throws DiskErrorException if none of the local directories is writable
   * @author hairong
   */
  private static void checkLocalDirs(String[] localDirs)
    throws DiskErrorException {
    boolean writable = false;

    if (localDirs != null) {
      for (int i = 0; i < localDirs.length; i++) {
        try {
          DiskChecker.checkDir(new File(localDirs[i]));
          writable = true;
        } catch (DiskErrorException e) {
          LOG.warn("Task Tracker local " + e.getMessage());
        }
      }
    }

    if (!writable)
      throw new DiskErrorException("all local directories are not writable");
  }
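
  // For illustration only: the directories passed in are typically the
  // tracker's configured local dirs, e.g. a comma-separated conf value along
  // the lines of "mapred.local.dir=/disk1/mapred/local,/disk2/mapred/local".
  // (The key name and paths are assumptions, not taken from this file.)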
  /**
   * Is this task tracker idle?
   * @return has this task tracker finished and cleaned up all of its tasks?
   */
  public synchronized boolean isIdle() {
    return tasks.isEmpty() && tasksToCleanup.isEmpty();
  }
  /**
   * Start the TaskTracker, point toward the indicated JobTracker
   */
  public static void main(String argv[]) throws Exception {
    if (argv.length != 0) {
      System.out.println("usage: TaskTracker");
      System.exit(-1);
    }

    try {
      JobConf conf = new JobConf();
      // enable the server to track time spent waiting on locks
      ReflectionUtils.setContentionTracing
        (conf.getBoolean("tasktracker.contention.tracking", false));
      new TaskTracker(conf).run();
    } catch (Throwable e) {
      LOG.error("Can not start task tracker because " +
                StringUtils.stringifyException(e));
      System.exit(-1);
    }
  }
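
  // The only supported invocation takes no arguments, as checked above; in
  // practice the tracker is typically started via the Hadoop launcher scripts
  // rather than by calling this main() directly (an assumption about
  // deployment, not something this file shows).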
  /**
   * This class is used in TaskTracker's Jetty to serve the map outputs
   * to other nodes.
   * @author Owen O'Malley
   */
  public static class MapOutputServlet extends HttpServlet {
    private static final int MAX_BYTES_TO_READ = 64 * 1024;

    public void doGet(HttpServletRequest request,
                      HttpServletResponse response
                      ) throws ServletException, IOException {
      String mapId = request.getParameter("map");
      String reduceId = request.getParameter("reduce");
      if (mapId == null || reduceId == null) {
        throw new IOException("map and reduce parameters are required");
      }
      ServletContext context = getServletContext();
      int reduce = Integer.parseInt(reduceId);
      byte[] buffer = new byte[MAX_BYTES_TO_READ];
      OutputStream outStream = response.getOutputStream();
      JobConf conf = (JobConf) context.getAttribute("conf");
      LocalDirAllocator lDirAlloc =
        (LocalDirAllocator) context.getAttribute("localDirAllocator");
      FileSystem fileSys =
        (FileSystem) context.getAttribute("local.file.system");

      // Index file
      Path indexFileName = lDirAlloc.getLocalPathToRead(
          mapId + "/file.out.index", conf);
      FSDataInputStream indexIn = null;

      // Map-output file
      Path mapOutputFileName = lDirAlloc.getLocalPathToRead(
          mapId + "/file.out", conf);
      FSDataInputStream mapOutputIn = null;

      // true iff IOException was caused by attempt to access input
      boolean isInputException = true;
      try {
        /*
         * Read the index file to get the information about where
         * the map-output for the given reducer is available.
         */
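        // Each index record is two 8-byte longs -- the partition's start
        // offset within file.out followed by its length -- hence the
        // reduce * 16 seek below.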
        // open index file
        indexIn = fileSys.open(indexFileName);

        // seek to the correct offset for the given reduce
        indexIn.seek(reduce * 16);

        // read the offset and length of the partition data
        long startOffset = indexIn.readLong();
        long partLength = indexIn.readLong();

        indexIn.close();
        indexIn = null;

        // set the custom "Map-Output-Length" http header to
        // the actual number of bytes being transferred
        response.setHeader(MAP_OUTPUT_LENGTH, Long.toString(partLength));

        // use the same buffersize as used for reading the data from disk
        response.setBufferSize(MAX_BYTES_TO_READ);
        /*
         * Read the data from the single map-output file and
         * send it to the reducer.
         */
        // open the map-output file
        mapOutputIn = fileSys.open(mapOutputFileName);

        // seek to the correct offset for the reduce
        mapOutputIn.seek(startOffset);

        long totalRead = 0;
        int len = mapOutputIn.read(buffer, 0,
                                   partLength < MAX_BYTES_TO_READ
                                   ? (int) partLength : MAX_BYTES_TO_READ);
        while (len > 0) {
          try {
            outStream.write(buffer, 0, len);
            outStream.flush();
          } catch (IOException ie) {
            isInputException = false;
            throw ie;
          }
          totalRead += len;
          if (totalRead == partLength) break;
          len = mapOutputIn.read(buffer, 0,
                                 (partLength - totalRead) < MAX_BYTES_TO_READ
                                 ? (int) (partLength - totalRead)
                                 : MAX_BYTES_TO_READ);
        }
      } catch (IOException ie) {
        TaskTracker tracker =
          (TaskTracker) context.getAttribute("task.tracker");
        Log log = (Log) context.getAttribute("log");
        String errorMsg = ("getMapOutput(" + mapId + "," + reduceId +
                           ") failed :\n" +
                           StringUtils.stringifyException(ie));
        log.warn(errorMsg);
        if (isInputException) {
          tracker.mapOutputLost(mapId, errorMsg);
        }
        response.sendError(HttpServletResponse.SC_GONE, errorMsg);
        throw ie;
      } finally {
        if (indexIn != null) {
          indexIn.close();
        }
        if (mapOutputIn != null) {
          mapOutputIn.close();
        }
      }
      outStream.close();
    }
  }
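
  // A reducer fetches a map output partition from this servlet over HTTP,
  // roughly (the servlet mount point and host are assumptions, not visible
  // in this class):
  //
  //   GET http://<tracker-host>:<http-port>/mapOutput?map=<mapTaskId>&reduce=<partition>
  //
  // The body is the raw partition bytes, and the MAP_OUTPUT_LENGTH header set
  // in doGet() carries the number of bytes being sent.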
}