TaskTracker.java

/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintStream;
import java.io.RandomAccessFile;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.regex.Pattern;
import java.util.Vector;
import javax.servlet.ServletContext;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.DF;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSError;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.util.Shell.ShellCommandExecutor;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.mapred.pipes.Submitter;
import org.apache.hadoop.metrics.MetricsContext;
import org.apache.hadoop.metrics.MetricsException;
import org.apache.hadoop.metrics.MetricsRecord;
import org.apache.hadoop.metrics.MetricsUtil;
import org.apache.hadoop.metrics.Updater;
import org.apache.hadoop.metrics.jvm.JvmMetrics;
import org.apache.hadoop.net.DNS;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.DiskChecker;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.RunJar;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.apache.log4j.LogManager;
/*******************************************************
* TaskTracker is a process that starts and tracks MR Tasks
* in a networked environment. It contacts the JobTracker
* for Task assignments and reporting results.
*
*******************************************************/
public class TaskTracker
implements MRConstants, TaskUmbilicalProtocol, Runnable {
static final long WAIT_FOR_DONE = 3 * 1000;
private int httpPort;
static enum State {NORMAL, STALE, INTERRUPTED, DENIED}
public static final Log LOG =
LogFactory.getLog("org.apache.hadoop.mapred.TaskTracker");
private boolean running = true;
private LocalDirAllocator localDirAllocator;
String taskTrackerName;
String localHostname;
InetSocketAddress jobTrackAddr;
InetSocketAddress taskReportAddress;
Server taskReportServer = null;
InterTrackerProtocol jobClient;
// last heartbeat response received
short heartbeatResponseId = -1;
/*
* This is the last 'status' report sent by this tracker to the JobTracker.
*
* If the rpc call succeeds, this 'status' is cleared out by this tracker,
* indicating that a 'fresh' status report should be generated; in the event
* the rpc call fails for whatever reason, the previous status report is sent
* again.
*/
TaskTrackerStatus status = null;
StatusHttpServer server = null;
boolean shuttingDown = false;
Map<String, TaskInProgress> tasks = new HashMap<String, TaskInProgress>();
/**
* Map from taskId -> TaskInProgress.
*/
Map<String, TaskInProgress> runningTasks = null;
Map<String, RunningJob> runningJobs = null;
volatile int mapTotal = 0;
volatile int reduceTotal = 0;
boolean justStarted = true;
//dir -> DF
Map<String, DF> localDirsDf = new HashMap<String, DF>();
long minSpaceStart = 0;
//must have this much space free to start new tasks
boolean acceptNewTasks = true;
long minSpaceKill = 0;
//if we run under this limit, kill one task
//and make sure we never receive any new jobs
//until all the old tasks have been cleaned up.
//this is if a machine is so full it's only good
//for serving map output to the other nodes
static Random r = new Random();
FileSystem fs = null;
private static final String SUBDIR = "taskTracker";
private static final String CACHEDIR = "archive";
private static final String JOBCACHE = "jobcache";
private JobConf originalConf;
private JobConf fConf;
private MapOutputFile mapOutputFile;
private int maxCurrentMapTasks;
private int maxCurrentReduceTasks;
private int failures;
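// Single-element array used as a monitor and counter: the heartbeat loop in
// offerService() waits on it between heartbeats and is woken early when a
// task finishes.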
private int[] finishedCount = new int[1];
private MapEventsFetcherThread mapEventsFetcher;
int workerThreads;
/**
* the minimum interval between jobtracker polls
*/
private static final int MIN_POLL_INTERVAL = 5000;
private volatile int heartbeatInterval = HEARTBEAT_INTERVAL_MIN;
/**
* Number of maptask completion events locations to poll for at one time
*/
private int probe_sample_size = 50;
private ShuffleServerMetrics shuffleServerMetrics;
/** This class contains the methods that should be used for metrics-reporting
* the specific metrics for shuffle. The TaskTracker is actually a server for
* the shuffle and hence the name ShuffleServerMetrics.
*/
private class ShuffleServerMetrics implements Updater {
private MetricsRecord shuffleMetricsRecord = null;
private int serverHandlerBusy = 0;
private long outputBytes = 0;
private int failedOutputs = 0;
private int successOutputs = 0;
ShuffleServerMetrics(JobConf conf) {
MetricsContext context = MetricsUtil.getContext("mapred");
shuffleMetricsRecord =
MetricsUtil.createRecord(context, "shuffleOutput");
this.shuffleMetricsRecord.setTag("sessionId", conf.getSessionId());
context.registerUpdater(this);
}
synchronized void serverHandlerBusy() {
++serverHandlerBusy;
}
synchronized void serverHandlerFree() {
--serverHandlerBusy;
}
synchronized void outputBytes(long bytes) {
outputBytes += bytes;
}
synchronized void failedOutput() {
++failedOutputs;
}
synchronized void successOutput() {
++successOutputs;
}
public void doUpdates(MetricsContext unused) {
synchronized (this) {
if (workerThreads != 0) {
shuffleMetricsRecord.setMetric("shuffle_handler_busy_percent",
100*((float)serverHandlerBusy/workerThreads));
} else {
shuffleMetricsRecord.setMetric("shuffle_handler_busy_percent", 0);
}
shuffleMetricsRecord.incrMetric("shuffle_output_bytes",
outputBytes);
shuffleMetricsRecord.incrMetric("shuffle_failed_outputs",
failedOutputs);
shuffleMetricsRecord.incrMetric("shuffle_success_outputs",
successOutputs);
outputBytes = 0;
failedOutputs = 0;
successOutputs = 0;
}
shuffleMetricsRecord.update();
}
}
public class TaskTrackerMetrics implements Updater {
private MetricsRecord metricsRecord = null;
private int numCompletedTasks = 0;
private int timedoutTasks = 0;
private int tasksFailedPing = 0;
TaskTrackerMetrics() {
JobConf conf = getJobConf();
String sessionId = conf.getSessionId();
// Initiate Java VM Metrics
JvmMetrics.init("TaskTracker", sessionId);
// Create a record for Task Tracker metrics
MetricsContext context = MetricsUtil.getContext("mapred");
metricsRecord = MetricsUtil.createRecord(context, "tasktracker");
metricsRecord.setTag("sessionId", sessionId);
context.registerUpdater(this);
}
synchronized void completeTask() {
++numCompletedTasks;
}
synchronized void timedoutTask() {
++timedoutTasks;
}
synchronized void taskFailedPing() {
++tasksFailedPing;
}
/**
* Since this object is a registered updater, this method will be called
* periodically, e.g. every 5 seconds.
*/
public void doUpdates(MetricsContext unused) {
synchronized (this) {
if (metricsRecord != null) {
metricsRecord.setMetric("maps_running", mapTotal);
metricsRecord.setMetric("reduces_running", reduceTotal);
metricsRecord.setMetric("mapTaskSlots", (short)maxCurrentMapTasks);
metricsRecord.setMetric("reduceTaskSlots",
(short)maxCurrentReduceTasks);
metricsRecord.incrMetric("tasks_completed", numCompletedTasks);
metricsRecord.incrMetric("tasks_failed_timeout", timedoutTasks);
metricsRecord.incrMetric("tasks_failed_ping", tasksFailedPing);
}
numCompletedTasks = 0;
timedoutTasks = 0;
tasksFailedPing = 0;
}
metricsRecord.update();
}
}
private TaskTrackerMetrics myMetrics = null;
public TaskTrackerMetrics getTaskTrackerMetrics() {
return myMetrics;
}
/**
* A list of tips that should be cleaned up.
*/
private BlockingQueue<TaskTrackerAction> tasksToCleanup =
new LinkedBlockingQueue<TaskTrackerAction>();
/**
* A daemon-thread that pulls tips off the list of things to cleanup.
*/
private Thread taskCleanupThread =
new Thread(new Runnable() {
public void run() {
while (true) {
try {
TaskTrackerAction action = tasksToCleanup.take();
if (action instanceof KillJobAction) {
purgeJob((KillJobAction) action);
} else if (action instanceof KillTaskAction) {
TaskInProgress tip;
KillTaskAction killAction = (KillTaskAction) action;
synchronized (TaskTracker.this) {
tip = tasks.get(killAction.getTaskId());
}
LOG.info("Received KillTaskAction for task: " +
killAction.getTaskId());
purgeTask(tip, false);
} else {
LOG.error("Non-delete action given to cleanup thread: "
+ action);
}
} catch (Throwable except) {
LOG.warn(StringUtils.stringifyException(except));
}
}
}
}, "taskCleanup");
{
taskCleanupThread.setDaemon(true);
taskCleanupThread.start();
}
private RunningJob addTaskToJob(String jobId,
Path localJobFile,
TaskInProgress tip) {
synchronized (runningJobs) {
RunningJob rJob = null;
if (!runningJobs.containsKey(jobId)) {
rJob = new RunningJob(jobId, localJobFile);
rJob.localized = false;
rJob.tasks = new HashSet<TaskInProgress>();
rJob.jobFile = localJobFile;
runningJobs.put(jobId, rJob);
} else {
rJob = runningJobs.get(jobId);
}
synchronized (rJob) {
rJob.tasks.add(tip);
}
runningJobs.notify(); //notify the fetcher thread
return rJob;
}
}
private void removeTaskFromJob(String jobId, TaskInProgress tip) {
synchronized (runningJobs) {
RunningJob rjob = runningJobs.get(jobId);
if (rjob == null) {
LOG.warn("Unknown job " + jobId + " being deleted.");
} else {
synchronized (rjob) {
rjob.tasks.remove(tip);
if (rjob.tasks.isEmpty()) {
runningJobs.remove(jobId);
}
}
}
}
}
static String getCacheSubdir() {
return TaskTracker.SUBDIR + Path.SEPARATOR + TaskTracker.CACHEDIR;
}
static String getJobCacheSubdir() {
return TaskTracker.SUBDIR + Path.SEPARATOR + TaskTracker.JOBCACHE;
}
public long getProtocolVersion(String protocol,
long clientVersion) throws IOException {
if (protocol.equals(TaskUmbilicalProtocol.class.getName())) {
return TaskUmbilicalProtocol.versionID;
} else {
throw new IOException("Unknown protocol for task tracker: " +
protocol);
}
}
/**
* Do the real constructor work here. It's in a separate method
* so we can call it again and "recycle" the object after calling
* close().
*/
synchronized void initialize() throws IOException {
// use configured nameserver & interface to get local hostname
this.fConf = new JobConf(originalConf);
if (fConf.get("slave.host.name") != null) {
this.localHostname = fConf.get("slave.host.name");
}
if (localHostname == null) {
this.localHostname =
DNS.getDefaultHost
(fConf.get("mapred.tasktracker.dns.interface","default"),
fConf.get("mapred.tasktracker.dns.nameserver","default"));
}
//check local disk
checkLocalDirs(this.fConf.getLocalDirs());
fConf.deleteLocalFiles(SUBDIR);
// Clear out state tables
this.tasks.clear();
this.runningTasks = new TreeMap<String, TaskInProgress>();
this.runningJobs = new TreeMap<String, RunningJob>();
this.mapTotal = 0;
this.reduceTotal = 0;
this.acceptNewTasks = true;
this.status = null;
this.minSpaceStart = this.fConf.getLong("mapred.local.dir.minspacestart", 0L);
this.minSpaceKill = this.fConf.getLong("mapred.local.dir.minspacekill", 0L);
int numCopiers = this.fConf.getInt("mapred.reduce.parallel.copies", 5);
//tweak the probe sample size (make it a function of numCopiers)
probe_sample_size = Math.max(numCopiers*5, 50);
this.myMetrics = new TaskTrackerMetrics();
// bind address
String address =
NetUtils.getServerAddress(fConf,
"mapred.task.tracker.report.bindAddress",
"mapred.task.tracker.report.port",
"mapred.task.tracker.report.address");
InetSocketAddress socAddr = NetUtils.createSocketAddr(address);
String bindAddress = socAddr.getHostName();
int tmpPort = socAddr.getPort();
// RPC initialization
int max = maxCurrentMapTasks > maxCurrentReduceTasks ?
maxCurrentMapTasks : maxCurrentReduceTasks;
this.taskReportServer =
RPC.getServer(this, bindAddress, tmpPort, max, false, this.fConf);
this.taskReportServer.start();
// get the assigned address
this.taskReportAddress = taskReportServer.getListenerAddress();
this.fConf.set("mapred.task.tracker.report.address",
taskReportAddress.getHostName() + ":" + taskReportAddress.getPort());
LOG.info("TaskTracker up at: " + this.taskReportAddress);
this.taskTrackerName = "tracker_" + localHostname + ":" + taskReportAddress;
LOG.info("Starting tracker " + taskTrackerName);
// Clear out temporary files that might be lying around
DistributedCache.purgeCache(this.fConf);
this.mapOutputFile.cleanupStorage();
this.justStarted = true;
this.jobClient = (InterTrackerProtocol)
RPC.waitForProxy(InterTrackerProtocol.class,
InterTrackerProtocol.versionID,
jobTrackAddr, this.fConf);
this.running = true;
// start the thread that will fetch map task completion events
this.mapEventsFetcher = new MapEventsFetcherThread();
mapEventsFetcher.setDaemon(true);
mapEventsFetcher.setName(
"Map-events fetcher for all reduce tasks " + "on " +
taskTrackerName);
mapEventsFetcher.start();
}
// Object on which the MapEventsFetcherThread waits.
private Object waitingOn = new Object();
private class MapEventsFetcherThread extends Thread {
private List <FetchStatus> reducesInShuffle() {
List <FetchStatus> fList = new ArrayList<FetchStatus>();
for (Map.Entry <String, RunningJob> item : runningJobs.entrySet()) {
RunningJob rjob = item.getValue();
String jobId = item.getKey();
FetchStatus f;
synchronized (rjob) {
f = rjob.getFetchStatus();
for (TaskInProgress tip : rjob.tasks) {
Task task = tip.getTask();
if (!task.isMapTask()) {
if (((ReduceTask)task).getPhase() ==
TaskStatus.Phase.SHUFFLE) {
if (rjob.getFetchStatus() == null) {
//this is a new job; we start fetching its map events
f = new FetchStatus(jobId,
((ReduceTask)task).getNumMaps());
rjob.setFetchStatus(f);
}
f = rjob.getFetchStatus();
fList.add(f);
break; //no need to check any more tasks belonging to this
}
}
}
}
}
//at this point, we know which of the running jobs we need to
//query the jobtracker for, to get their map outputs
//(actually map completion events).
return fList;
}
public void run() {
LOG.info("Starting thread: " + getName());
while (true) {
try {
List <FetchStatus> fList = null;
synchronized (runningJobs) {
while (((fList = reducesInShuffle()).size()) == 0) {
try {
runningJobs.wait();
} catch (InterruptedException e) {
LOG.info("Shutting down: " + getName());
return;
}
}
}
// now fetch all the map task events for all the reduce tasks
// possibly belonging to different jobs
for (FetchStatus f : fList) {
try {
f.fetchMapCompletionEvents();
long startWait;
long endWait;
// polling interval is heartbeat interval
int waitTime = heartbeatInterval;
// Thread will wait for a minimum of MIN_POLL_INTERVAL;
// if it is notified before that, the notification will be ignored.
int minWait = MIN_POLL_INTERVAL;
synchronized (waitingOn) {
try {
while (true) {
startWait = System.currentTimeMillis();
waitingOn.wait(waitTime);
endWait = System.currentTimeMillis();
int diff = (int)(endWait - startWait);
if (diff >= minWait) {
break;
}
minWait = minWait - diff;
waitTime = minWait;
}
} catch (InterruptedException ie) {
LOG.info("Shutting down: " + getName());
return;
}
}
} catch (Exception e) {
LOG.warn(
"Ignoring exception that the fetch of map completion" +
" events for " + f.jobId + " threw: " +
StringUtils.stringifyException(e));
}
}
} catch (Exception e) {
LOG.info("Ignoring exception " + e.getMessage());
}
}
}
}
private class FetchStatus {
/** The next event ID that we will start querying the JobTracker from*/
private IntWritable fromEventId;
/** This is the cache of map events for a given job */
private List<TaskCompletionEvent> allMapEvents;
/** What jobid this fetchstatus object is for*/
private String jobId;
public FetchStatus(String jobId, int numMaps) {
this.fromEventId = new IntWritable(0);
this.jobId = jobId;
this.allMapEvents = new ArrayList<TaskCompletionEvent>(numMaps);
}
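/**
* Return up to 'max' cached map completion events starting at 'fromId'; if
* none are available yet, wake the fetcher thread so it polls the JobTracker.
*/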
public TaskCompletionEvent[] getMapEvents(int fromId, int max) {
TaskCompletionEvent[] mapEvents =
TaskCompletionEvent.EMPTY_ARRAY;
boolean notifyFetcher = false;
synchronized (allMapEvents) {
if (allMapEvents.size() > fromId) {
int actualMax = Math.min(max, (allMapEvents.size() - fromId));
List <TaskCompletionEvent> eventSublist =
allMapEvents.subList(fromId, actualMax + fromId);
mapEvents = eventSublist.toArray(mapEvents);
} else {
// Notify Fetcher thread.
notifyFetcher = true;
}
}
if (notifyFetcher) {
synchronized (waitingOn) {
waitingOn.notify();
}
}
return mapEvents;
}
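/**
* Query the JobTracker for new map completion events for this job and add
* them to the local cache of events.
*/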
public void fetchMapCompletionEvents() throws IOException {
List <TaskCompletionEvent> recentMapEvents =
queryJobTracker(fromEventId, jobId, jobClient);
synchronized (allMapEvents) {
allMapEvents.addAll(recentMapEvents);
}
}
}
private LocalDirAllocator lDirAlloc =
new LocalDirAllocator("mapred.local.dir");
// initialize the job directory
private void localizeJob(TaskInProgress tip) throws IOException {
Path localJarFile = null;
Task t = tip.getTask();
String jobId = t.getJobId();
String jobFile = t.getJobFile();
// Get sizes of JobFile and JarFile
// sizes are -1 if they are not present.
FileSystem fileSystem = FileSystem.get(fConf);
FileStatus status[] = fileSystem.listStatus(new Path(jobFile).getParent());
long jarFileSize = -1;
long jobFileSize = -1;
for(FileStatus stat : status) {
if (stat.getPath().toString().contains("job.xml")) {
jobFileSize = stat.getLen();
}
if (stat.getPath().toString().contains("job.jar")) {
jarFileSize = stat.getLen();
}
}
// Here we reserve double the size of the job file to accommodate the
// localized task file, and five times the size of the jar file to
// accommodate copying it and unjarring it in the work directory
Path localJobFile = lDirAlloc.getLocalPathForWrite((getJobCacheSubdir()
+ Path.SEPARATOR + jobId
+ Path.SEPARATOR + "job.xml"),
2 * jobFileSize + 5 * jarFileSize, fConf);
RunningJob rjob = addTaskToJob(jobId, localJobFile, tip);
synchronized (rjob) {
if (!rjob.localized) {
FileSystem localFs = FileSystem.getLocal(fConf);
// this will happen on a partial execution of localizeJob.
// Sometimes the job.xml gets copied but copying job.jar
// might throw an exception;
// we should clean up and then try again
Path jobDir = localJobFile.getParent();
if (localFs.exists(jobDir)){
localFs.delete(jobDir);
boolean b = localFs.mkdirs(jobDir);
if (!b)
throw new IOException("Not able to create job directory "
+ jobDir.toString());
}
fs.copyToLocalFile(new Path(jobFile), localJobFile);
JobConf localJobConf = new JobConf(localJobFile);
// create the 'work' directory
File workDir = new File(new File(localJobFile.toString()).getParent(),
"work");
if (!workDir.mkdirs()) {
if (!workDir.isDirectory()) {
throw new IOException("Mkdirs failed to create " + workDir.toString());
}
}
// unjar the job.jar files in workdir
String jarFile = localJobConf.getJar();
if (jarFile != null) {
localJarFile = new Path(jobDir,"job.jar");
fs.copyToLocalFile(new Path(jarFile), localJarFile);
localJobConf.setJar(localJarFile.toString());
OutputStream out = localFs.create(localJobFile);
try {
localJobConf.write(out);
} finally {
out.close();
}
RunJar.unJar(new File(localJarFile.toString()), workDir);
}
rjob.keepJobFiles = ((localJobConf.getKeepTaskFilesPattern() != null) ||
localJobConf.getKeepFailedTaskFiles());
rjob.localized = true;
}
}
launchTaskForJob(tip, new JobConf(rjob.jobFile));
}
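/**
* Hand the localized job configuration to the TIP and launch the task; any
* failure marks the task FAILED and reports the diagnostics.
*/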
private void launchTaskForJob(TaskInProgress tip, JobConf jobConf) throws IOException{
synchronized (tip) {
try {
tip.setJobConf(jobConf);
tip.launchTask();
} catch (Throwable ie) {
tip.taskStatus.setRunState(TaskStatus.State.FAILED);
try {
tip.cleanup();
} catch (Throwable ie2) {
// Ignore it, we are just trying to cleanup.
}
String error = StringUtils.stringifyException(ie);
tip.reportDiagnosticInfo(error);
LOG.info(error);
}
}
}
public synchronized void shutdown() throws IOException {
shuttingDown = true;
close();
if (this.server != null) {
try {
LOG.info("Shutting down StatusHttpServer");
this.server.stop();
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
}
/**
* Close down the TaskTracker and all its components. We must also shutdown
* any running tasks or threads, and cleanup disk space. A new TaskTracker
* within the same process space might be restarted, so everything must be
* clean.
*/
public synchronized void close() throws IOException {
//
// Kill running tasks. Do this in a 2nd vector, called 'tasksToClose',
// because calling jobHasFinished() may result in an edit to 'tasks'.
//
TreeMap<String, TaskInProgress> tasksToClose =
new TreeMap<String, TaskInProgress>();
tasksToClose.putAll(tasks);
for (TaskInProgress tip : tasksToClose.values()) {
tip.jobHasFinished(false);
}
// Shutdown local RPC servers. Do them
// in parallel, as RPC servers can take a long
// time to shutdown. (They need to wait a full
// RPC timeout, which might be 10-30 seconds.)
new Thread("RPC shutdown") {
public void run() {
if (taskReportServer != null) {
taskReportServer.stop();
taskReportServer = null;
}
}
}.start();
this.running = false;
// Clear local storage
this.mapOutputFile.cleanupStorage();
// Shutdown the fetcher thread
this.mapEventsFetcher.interrupt();
}
/**
* Start with the local machine name, and the default JobTracker
*/
public TaskTracker(JobConf conf) throws IOException {
originalConf = conf;
maxCurrentMapTasks = conf.getInt(
"mapred.tasktracker.map.tasks.maximum", 2);
maxCurrentReduceTasks = conf.getInt(
"mapred.tasktracker.reduce.tasks.maximum", 2);
this.jobTrackAddr = JobTracker.getAddress(conf);
this.mapOutputFile = new MapOutputFile();
this.mapOutputFile.setConf(conf);
String infoAddr =
NetUtils.getServerAddress(conf,
"tasktracker.http.bindAddress",
"tasktracker.http.port",
"mapred.task.tracker.http.address");
InetSocketAddress infoSocAddr = NetUtils.createSocketAddr(infoAddr);
String httpBindAddress = infoSocAddr.getHostName();
int httpPort = infoSocAddr.getPort();
this.server = new StatusHttpServer(
"task", httpBindAddress, httpPort, httpPort == 0);
workerThreads = conf.getInt("tasktracker.http.threads", 40);
this.shuffleServerMetrics = new ShuffleServerMetrics(conf);
server.setThreads(1, workerThreads);
// let the jsp pages get to the task tracker, config, and other relevant
// objects
FileSystem local = FileSystem.getLocal(conf);
this.localDirAllocator = new LocalDirAllocator("mapred.local.dir");
server.setAttribute("task.tracker", this);
server.setAttribute("local.file.system", local);
server.setAttribute("conf", conf);
server.setAttribute("log", LOG);
server.setAttribute("localDirAllocator", localDirAllocator);
server.setAttribute("shuffleServerMetrics", shuffleServerMetrics);
server.addServlet("mapOutput", "/mapOutput", MapOutputServlet.class);
server.addServlet("taskLog", "/tasklog", TaskLogServlet.class);
server.start();
this.httpPort = server.getPort();
initialize();
}
/**
* The connection to the JobTracker, used by the TaskRunner
* for locating remote files.
*/
public InterTrackerProtocol getJobClient() {
return jobClient;
}
/** Return the DFS filesystem */
public FileSystem getFileSystem(){
return fs;
}
/** Return the address to which the task tracker report server is bound */
public synchronized InetSocketAddress getTaskTrackerReportAddress() {
return taskReportAddress;
}
/** Queries the job tracker for a set of outputs ready to be copied
* @param fromEventId the first event ID we want to start from, this is
* modified by the call to this method
* @param jobClient the job tracker
* @return a set of locations to copy outputs from
* @throws IOException
*/
private List<TaskCompletionEvent> queryJobTracker(IntWritable fromEventId,
String jobId,
InterTrackerProtocol jobClient)
throws IOException {
TaskCompletionEvent t[] = jobClient.getTaskCompletionEvents(
jobId,
fromEventId.get(),
probe_sample_size);
//we are interested in map task completion events only. So store
//only those
List <TaskCompletionEvent> recentMapEvents =
new ArrayList<TaskCompletionEvent>();
for (int i = 0; i < t.length; i++) {
if (t[i].isMap) {
recentMapEvents.add(t[i]);
}
}
fromEventId.set(fromEventId.get() + t.length);
return recentMapEvents;
}
/**
* Main service loop. Will stay in this loop forever.
*/
State offerService() throws Exception {
long lastHeartbeat = 0;
this.fs = FileSystem.getNamed(jobClient.getFilesystemName(), this.fConf);
while (running && !shuttingDown) {
try {
long now = System.currentTimeMillis();
long waitTime = heartbeatInterval - (now - lastHeartbeat);
if (waitTime > 0) {
// sleeps for the wait time, wakes up if a task is finished.
synchronized(finishedCount) {
if (finishedCount[0] == 0) {
finishedCount.wait(waitTime);
}
finishedCount[0] = 0;
}
}
// Send the heartbeat and process the jobtracker's directives
HeartbeatResponse heartbeatResponse = transmitHeartBeat();
TaskTrackerAction[] actions = heartbeatResponse.getActions();
if(LOG.isDebugEnabled()) {
LOG.debug("Got heartbeatResponse from JobTracker with responseId: " +
heartbeatResponse.getResponseId() + " and " +
((actions != null) ? actions.length : 0) + " actions");
}
if (reinitTaskTracker(actions)) {
return State.STALE;
}
lastHeartbeat = now;
// resetting heartbeat interval from the response.
heartbeatInterval = heartbeatResponse.getHeartbeatInterval();
justStarted = false;
if (actions != null){
for(TaskTrackerAction action: actions) {
if (action instanceof LaunchTaskAction) {
startNewTask((LaunchTaskAction) action);
} else {
tasksToCleanup.put(action);
}
}
}
markUnresponsiveTasks();
killOverflowingTasks();
//we've cleaned up, resume normal operation
if (!acceptNewTasks && isIdle()) {
acceptNewTasks=true;
}
} catch (InterruptedException ie) {
LOG.info("Interrupted. Closing down.");
return State.INTERRUPTED;
} catch (DiskErrorException de) {
String msg = "Exiting task tracker for disk error:\n" +
StringUtils.stringifyException(de);
LOG.error(msg);
synchronized (this) {
jobClient.reportTaskTrackerError(taskTrackerName,
"DiskErrorException", msg);
}
return State.STALE;
} catch (RemoteException re) {
String reClass = re.getClassName();
if (DisallowedTaskTrackerException.class.getName().equals(reClass)) {
LOG.info("Tasktracker disallowed by JobTracker.");
return State.DENIED;
}
} catch (Exception except) {
String msg = "Caught exception: " +
StringUtils.stringifyException(except);
LOG.error(msg);
}
}
return State.NORMAL;
}
private long previousUpdate = 0;
/**
* Build and transmit the heart beat to the JobTracker
* @return false if the tracker was unknown
* @throws IOException
*/
private HeartbeatResponse transmitHeartBeat() throws IOException {
// Send Counters in the status once every COUNTER_UPDATE_INTERVAL
long now = System.currentTimeMillis();
boolean sendCounters;
if (now > (previousUpdate + COUNTER_UPDATE_INTERVAL)) {
sendCounters = true;
previousUpdate = now;
}
else {
sendCounters = false;
}
//
// Check if the last heartbeat got through...
// if so then build the heartbeat information for the JobTracker;
// else resend the previous status information.
//
if (status == null) {
synchronized (this) {
status = new TaskTrackerStatus(taskTrackerName, localHostname,
httpPort,
cloneAndResetRunningTaskStatuses(
sendCounters),
failures,
maxCurrentMapTasks,
maxCurrentReduceTasks);
}
} else {
LOG.info("Resending 'status' to '" + jobTrackAddr.getHostName() +
  897. "' with reponseId '" + heartbeatResponseId);
}
//
// Check if we should ask for a new Task
//
boolean askForNewTask;
long localMinSpaceStart;
synchronized (this) {
askForNewTask = (mapTotal < maxCurrentMapTasks ||
reduceTotal < maxCurrentReduceTasks) &&
acceptNewTasks;
localMinSpaceStart = minSpaceStart;
}
if (askForNewTask) {
checkLocalDirs(fConf.getLocalDirs());
askForNewTask = enoughFreeSpace(localMinSpaceStart);
}
//
// Xmit the heartbeat
//
HeartbeatResponse heartbeatResponse = jobClient.heartbeat(status,
justStarted, askForNewTask,
heartbeatResponseId);
//
// The heartbeat got through successfully!
//
heartbeatResponseId = heartbeatResponse.getResponseId();
synchronized (this) {
for (TaskStatus taskStatus : status.getTaskReports()) {
if (taskStatus.getRunState() != TaskStatus.State.RUNNING) {
if (taskStatus.getIsMap()) {
mapTotal--;
} else {
reduceTotal--;
}
try {
myMetrics.completeTask();
} catch (MetricsException me) {
LOG.warn("Caught: " + StringUtils.stringifyException(me));
}
runningTasks.remove(taskStatus.getTaskId());
}
}
// Clear transient status information which should only
// be sent once to the JobTracker
for (TaskInProgress tip: runningTasks.values()) {
tip.getStatus().clearStatus();
}
}
// Force a rebuild of 'status' on the next iteration
status = null;
return heartbeatResponse;
}
/**
* Check if the jobtracker directed a 'reset' of the tasktracker.
*
* @param actions the directives of the jobtracker for the tasktracker.
* @return <code>true</code> if tasktracker is to be reset,
* <code>false</code> otherwise.
*/
private boolean reinitTaskTracker(TaskTrackerAction[] actions) {
if (actions != null) {
for (TaskTrackerAction action : actions) {
if (action.getActionId() ==
TaskTrackerAction.ActionType.REINIT_TRACKER) {
  962. LOG.info("Recieved RenitTrackerAction from JobTracker");
return true;
}
}
}
return false;
}
/**
* Kill any tasks that have not reported progress in the last X seconds.
*/
private synchronized void markUnresponsiveTasks() throws IOException {
long now = System.currentTimeMillis();
for (TaskInProgress tip: runningTasks.values()) {
if (tip.getRunState() == TaskStatus.State.RUNNING) {
// Check the per-job timeout interval for tasks;
// an interval of '0' implies it is never timed-out
long jobTaskTimeout = tip.getTaskTimeout();
if (jobTaskTimeout == 0) {
continue;
}
// Check if the task has not reported progress for a
// time-period greater than the configured time-out
long timeSinceLastReport = now - tip.getLastProgressReport();
if (timeSinceLastReport > jobTaskTimeout && !tip.wasKilled) {
String msg =
"Task " + tip.getTask().getTaskId() + " failed to report status for "
+ (timeSinceLastReport / 1000) + " seconds. Killing!";
LOG.info(tip.getTask().getTaskId() + ": " + msg);
ReflectionUtils.logThreadInfo(LOG, "lost task", 30);
tip.reportDiagnosticInfo(msg);
myMetrics.timedoutTask();
purgeTask(tip, true);
}
}
}
}
/**
* The task tracker is done with this job, so we need to clean up.
* @param action The action with the job
* @throws IOException
*/
private synchronized void purgeJob(KillJobAction action) throws IOException {
String jobId = action.getJobId();
LOG.info("Received 'KillJobAction' for job: " + jobId);
RunningJob rjob = null;
synchronized (runningJobs) {
rjob = runningJobs.get(jobId);
}
if (rjob == null) {
LOG.warn("Unknown job " + jobId + " being deleted.");
} else {
synchronized (rjob) {
// Add the tips of this job to the queue of tasks to be purged
for (TaskInProgress tip : rjob.tasks) {
tip.jobHasFinished(false);
}
// Delete the job directory for this
// task if the job is done/failed
if (!rjob.keepJobFiles){
fConf.deleteLocalFiles(SUBDIR + Path.SEPARATOR + JOBCACHE +
Path.SEPARATOR + rjob.getJobId());
}
// Remove this job
rjob.tasks.clear();
}
}
synchronized(runningJobs) {
runningJobs.remove(jobId);
}
}
/**
* Remove the tip and update all relevant state.
*
* @param tip {@link TaskInProgress} to be removed.
* @param wasFailure did the task fail or was it killed?
*/
private void purgeTask(TaskInProgress tip, boolean wasFailure)
throws IOException {
if (tip != null) {
LOG.info("About to purge task: " + tip.getTask().getTaskId());
// Remove the task from running jobs,
// removing the job if it's the last task
removeTaskFromJob(tip.getTask().getJobId(), tip);
tip.jobHasFinished(wasFailure);
}
}
/** Check if we're dangerously low on disk space
* If so, kill jobs to free up space and make sure
* we don't accept any new tasks
* Try killing the reduce jobs first, since I believe they
* use up most space
* Then pick the one with least progress
*/
private void killOverflowingTasks() throws IOException {
long localMinSpaceKill;
synchronized(this){
localMinSpaceKill = minSpaceKill;
}
if (!enoughFreeSpace(localMinSpaceKill)) {
acceptNewTasks=false;
//we give up! do not accept new tasks until
//all the ones running have finished and they're all cleared up
synchronized (this) {
TaskInProgress killMe = findTaskToKill();
if (killMe!=null) {
String msg = "Tasktracker running out of space." +
" Killing task.";
LOG.info(killMe.getTask().getTaskId() + ": " + msg);
killMe.reportDiagnosticInfo(msg);
purgeTask(killMe, false);
}
}
}
}
/**
* Pick a task to kill to free up space
* @return the task to kill or null, if one wasn't found
*/
private TaskInProgress findTaskToKill() {
TaskInProgress killMe = null;
for (Iterator it = runningTasks.values().iterator(); it.hasNext();) {
TaskInProgress tip = (TaskInProgress) it.next();
if ((tip.getRunState() == TaskStatus.State.RUNNING) &&
!tip.wasKilled) {
if (killMe == null) {
killMe = tip;
} else if (!tip.getTask().isMapTask()) {
//reduce task, give priority
if (killMe.getTask().isMapTask() ||
(tip.getTask().getProgress().get() <
killMe.getTask().getProgress().get())) {
killMe = tip;
}
} else if (killMe.getTask().isMapTask() &&
tip.getTask().getProgress().get() <
killMe.getTask().getProgress().get()) {
//map task, only add if the progress is lower
killMe = tip;
}
}
}
return killMe;
}
/**
* Check if all of the local directories have enough
* free space
*
* If not, do not try to get a new task assigned
* @return true if at least one local directory has more than minSpace available
* @throws IOException
*/
private boolean enoughFreeSpace(long minSpace) throws IOException {
if (minSpace == 0) {
return true;
}
String[] localDirs = fConf.getLocalDirs();
for (int i = 0; i < localDirs.length; i++) {
DF df = null;
if (localDirsDf.containsKey(localDirs[i])) {
df = localDirsDf.get(localDirs[i]);
} else {
df = new DF(new File(localDirs[i]), fConf);
localDirsDf.put(localDirs[i], df);
}
if (df.getAvailable() > minSpace)
return true;
}
return false;
}
/**
* Start a new task.
* All exceptions are handled locally, so that we don't mess up the
* task tracker.
*/
private void startNewTask(LaunchTaskAction action) {
Task t = action.getTask();
LOG.info("LaunchTaskAction: " + t.getTaskId());
TaskInProgress tip = new TaskInProgress(t, this.fConf);
synchronized (this) {
tasks.put(t.getTaskId(), tip);
runningTasks.put(t.getTaskId(), tip);
boolean isMap = t.isMapTask();
if (isMap) {
mapTotal++;
} else {
reduceTotal++;
}
}
try {
localizeJob(tip);
} catch (Throwable e) {
String msg = ("Error initializing " + tip.getTask().getTaskId() +
":\n" + StringUtils.stringifyException(e));
LOG.warn(msg);
tip.reportDiagnosticInfo(msg);
try {
tip.kill(true);
} catch (IOException ie2) {
LOG.info("Error cleaning up " + tip.getTask().getTaskId() + ":\n" +
StringUtils.stringifyException(ie2));
}
// Careful!
// This might not be an 'Exception' - don't handle 'Error' here!
if (e instanceof Error) {
throw ((Error) e);
}
}
}
/**
* The server retry loop.
* This while-loop attempts to connect to the JobTracker. It only
* loops when the old TaskTracker has gone bad (its state is
* stale somehow) and we need to reinitialize everything.
*/
public void run() {
try {
boolean denied = false;
while (running && !shuttingDown && !denied) {
boolean staleState = false;
try {
// This while-loop attempts reconnects if we get network errors
while (running && !staleState && !shuttingDown && !denied) {
try {
State osState = offerService();
if (osState == State.STALE) {
staleState = true;
} else if (osState == State.DENIED) {
denied = true;
}
} catch (Exception ex) {
if (!shuttingDown) {
LOG.info("Lost connection to JobTracker [" +
jobTrackAddr + "]. Retrying...", ex);
try {
Thread.sleep(5000);
} catch (InterruptedException ie) {
}
}
}
}
} finally {
close();
}
if (shuttingDown) { return; }
LOG.warn("Reinitializing local state");
initialize();
}
if (denied) {
shutdown();
}
} catch (IOException iex) {
LOG.error("Got fatal exception while reinitializing TaskTracker: " +
StringUtils.stringifyException(iex));
return;
}
}
///////////////////////////////////////////////////////
// TaskInProgress maintains all the info for a Task that
// lives at this TaskTracker. It maintains the Task object,
// its TaskStatus, and the TaskRunner.
///////////////////////////////////////////////////////
class TaskInProgress {
Task task;
long lastProgressReport;
StringBuffer diagnosticInfo = new StringBuffer();
private TaskRunner runner;
volatile boolean done = false;
boolean wasKilled = false;
private JobConf defaultJobConf;
private JobConf localJobConf;
private boolean keepFailedTaskFiles;
private boolean alwaysKeepTaskFiles;
private TaskStatus taskStatus;
private long taskTimeout;
private String debugCommand;
/**
* Create a TaskInProgress for the given task, initially in the UNASSIGNED state.
*/
public TaskInProgress(Task task, JobConf conf) {
this.task = task;
this.lastProgressReport = System.currentTimeMillis();
this.defaultJobConf = conf;
localJobConf = null;
taskStatus = TaskStatus.createTaskStatus(task.isMapTask(), task.getTaskId(),
0.0f,
TaskStatus.State.UNASSIGNED,
diagnosticInfo.toString(),
"initializing",
getName(),
task.isMapTask()? TaskStatus.Phase.MAP:
TaskStatus.Phase.SHUFFLE,
task.getCounters());
taskTimeout = (10 * 60 * 1000);
}
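/**
* Localize the task: create the task-local directory and the ../work symlink,
* write a task-specific job.xml, and pick up debug/keep-files settings.
*/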
private void localizeTask(Task task) throws IOException{
Path localTaskDir =
lDirAlloc.getLocalPathForWrite((TaskTracker.getJobCacheSubdir() +
Path.SEPARATOR + task.getJobId() + Path.SEPARATOR +
task.getTaskId()), defaultJobConf );
FileSystem localFs = FileSystem.getLocal(fConf);
// create symlink for ../work if it doesn't already exist
String workDir = lDirAlloc.getLocalPathToRead(
TaskTracker.getJobCacheSubdir()
+ Path.SEPARATOR + task.getJobId()
+ Path.SEPARATOR
+ "work", defaultJobConf).toString();
String link = localTaskDir.getParent().toString()
+ Path.SEPARATOR + "work";
File flink = new File(link);
if (!flink.exists())
FileUtil.symLink(workDir, link);
// create the working-directory of the task
if (!localFs.mkdirs(localTaskDir)) {
throw new IOException("Mkdirs failed to create " + localTaskDir.toString());
}
Path localTaskFile = new Path(localTaskDir, "job.xml");
task.setJobFile(localTaskFile.toString());
localJobConf.set("mapred.local.dir",
fConf.get("mapred.local.dir"));
localJobConf.set("mapred.task.id", task.getTaskId());
keepFailedTaskFiles = localJobConf.getKeepFailedTaskFiles();
// create _taskid directory in output path temporary directory.
Path outputPath = localJobConf.getOutputPath();
if (outputPath != null) {
Path jobTmpDir = new Path(outputPath, "_temporary");
FileSystem fs = jobTmpDir.getFileSystem(localJobConf);
if (fs.exists(jobTmpDir)) {
Path taskTmpDir = new Path(jobTmpDir, "_" + task.getTaskId());
if (!fs.mkdirs(taskTmpDir)) {
throw new IOException("Mkdirs failed to create "
+ taskTmpDir.toString());
}
} else {
  1294. throw new IOException("The directory " + jobTmpDir.toString()
  1295. + " doesnt exist ");
  1296. }
  1297. }
  1298. task.localizeConfiguration(localJobConf);
  1299. List<String[]> staticResolutions = NetUtils.getAllStaticResolutions();
  1300. if (staticResolutions != null && staticResolutions.size() > 0) {
  1301. StringBuffer str = new StringBuffer();
  1302. for (int i = 0; i < staticResolutions.size(); i++) {
  1303. String[] hostToResolved = staticResolutions.get(i);
  1304. str.append(hostToResolved[0]+"="+hostToResolved[1]);
  1305. if (i != staticResolutions.size() - 1) {
  1306. str.append(',');
  1307. }
  1308. }
  1309. localJobConf.set("hadoop.net.static.resolutions", str.toString());
  1310. }
  1311. OutputStream out = localFs.create(localTaskFile);
  1312. try {
  1313. localJobConf.write(out);
  1314. } finally {
  1315. out.close();
  1316. }
  1317. task.setConf(localJobConf);
  1318. String keepPattern = localJobConf.getKeepTaskFilesPattern();
  1319. if (keepPattern != null) {
  1320. alwaysKeepTaskFiles =
  1321. Pattern.matches(keepPattern, task.getTaskId());
  1322. } else {
  1323. alwaysKeepTaskFiles = false;
  1324. }
  1325. if (task.isMapTask()) {
  1326. debugCommand = localJobConf.getMapDebugScript();
  1327. } else {
  1328. debugCommand = localJobConf.getReduceDebugScript();
  1329. }
  1330. }
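
    /*
     * Illustration (path names hypothetical) of the local layout set up by
     * localizeTask() for a task <taskid> of job <jobid>:
     *
     *   .../<jobcache-subdir>/<jobid>/<taskid>/         per-task directory
     *   .../<jobcache-subdir>/<jobid>/<taskid>/job.xml  localized job conf
     *   .../<jobcache-subdir>/<jobid>/work              symlink to the job's
     *                                                   shared "work" dir, so
     *                                                   a task can reach it
     *                                                   as ../work
     *
     * The per-task directory and the shared work directory may be placed on
     * different mapred.local.dir roots by LocalDirAllocator, which is why the
     * symlink is created. This is only a sketch of what the code above
     * constructs, not an authoritative description of the layout.
     */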
    /**
     * Get the task that this TaskInProgress is running.
     */
    public Task getTask() {
      return task;
    }

    public synchronized void setJobConf(JobConf lconf) {
      this.localJobConf = lconf;
      keepFailedTaskFiles = localJobConf.getKeepFailedTaskFiles();
      taskTimeout = localJobConf.getLong("mapred.task.timeout",
                                         10 * 60 * 1000);
    }

    public synchronized JobConf getJobConf() {
      return localJobConf;
    }

    /**
     * Get the current status; accumulated diagnostic info is folded into the
     * returned status and the local buffer is cleared.
     */
    public synchronized TaskStatus getStatus() {
      taskStatus.setDiagnosticInfo(diagnosticInfo.toString());
      if (diagnosticInfo.length() > 0) {
        diagnosticInfo = new StringBuffer();
      }
      return taskStatus;
    }
    /**
     * Kick off the task execution
     */
    public synchronized void launchTask() throws IOException {
      localizeTask(task);
      this.taskStatus.setRunState(TaskStatus.State.RUNNING);
      this.runner = task.createRunner(TaskTracker.this);
      this.runner.start();
      this.taskStatus.setStartTime(System.currentTimeMillis());
    }

    /**
     * The task is reporting its progress
     */
    public synchronized void reportProgress(TaskStatus taskStatus) {
      LOG.info(task.getTaskId() + " " + taskStatus.getProgress() +
               "% " + taskStatus.getStateString());
      if (this.done ||
          this.taskStatus.getRunState() != TaskStatus.State.RUNNING) {
        //make sure we ignore progress messages after a task has
        //invoked TaskUmbilicalProtocol.done() or if the task has been
        //KILLED/FAILED
        LOG.info(task.getTaskId() + " Ignoring status-update since " +
                 ((this.done) ? "task is 'done'" :
                  ("runState: " + this.taskStatus.getRunState()))
                 );
        return;
      }
      this.taskStatus.statusUpdate(taskStatus);
      this.lastProgressReport = System.currentTimeMillis();
    }
    /**
     * Get the time of the last progress report from the task.
     */
    public long getLastProgressReport() {
      return lastProgressReport;
    }

    /**
     * Get the current run-state of the task.
     */
    public TaskStatus.State getRunState() {
      return taskStatus.getRunState();
    }

    /**
     * The task's configured timeout.
     *
     * @return the task's configured timeout.
     */
    public long getTaskTimeout() {
      return taskTimeout;
    }
    /**
     * The task has reported some diagnostic info about its status
     */
    public synchronized void reportDiagnosticInfo(String info) {
      this.diagnosticInfo.append(info);
    }

    /**
     * The task is reporting that it's done running
     */
    public synchronized void reportDone() {
      this.taskStatus.setRunState(TaskStatus.State.COMMIT_PENDING);
      this.taskStatus.setProgress(1.0f);
      this.taskStatus.setFinishTime(System.currentTimeMillis());
      this.done = true;
      LOG.info("Task " + task.getTaskId() + " is done.");
    }
    /**
     * The task has actually finished running.
     */
    public void taskFinished() {
      long start = System.currentTimeMillis();

      //
      // Wait until task reports as done. If it hasn't reported in,
      // wait for a second and try again.
      //
      while (!done && (System.currentTimeMillis() - start < WAIT_FOR_DONE)) {
        try {
          Thread.sleep(1000);
        } catch (InterruptedException ie) {
        }
      }

      //
      // Change state to success or failure, depending on whether
      // task was 'done' before terminating
      //
      boolean needCleanup = false;
      synchronized (this) {
        if (done) {
          taskStatus.setRunState(TaskStatus.State.COMMIT_PENDING);
        } else {
          if (!wasKilled) {
            failures += 1;
            taskStatus.setRunState(TaskStatus.State.FAILED);
            // call the script here for the failed tasks.
            if (debugCommand != null) {
              String taskStdout = "";
              String taskStderr = "";
              String taskSyslog = "";
              String jobConf = task.getJobFile();
              try {
                // get task's stdout file
                taskStdout = FileUtil.makeShellPath(TaskLog.getTaskLogFile
                                 (task.getTaskId(), TaskLog.LogName.STDOUT));
                // get task's stderr file
                taskStderr = FileUtil.makeShellPath(TaskLog.getTaskLogFile
                                 (task.getTaskId(), TaskLog.LogName.STDERR));
                // get task's syslog file
                taskSyslog = FileUtil.makeShellPath(TaskLog.getTaskLogFile
                                 (task.getTaskId(), TaskLog.LogName.SYSLOG));
              } catch (IOException e) {
                LOG.warn("Exception finding task's stdout/err/syslog files");
              }
              File workDir = new File(task.getJobFile()).getParentFile();
              // Build the command
              File stdout = TaskLog.getTaskLogFile(task.getTaskId(),
                                                   TaskLog.LogName.DEBUGOUT);
              // add pipes program as argument if it exists.
              String program = "";
              String executable = Submitter.getExecutable(localJobConf);
              if (executable != null) {
                try {
                  program = new URI(executable).getFragment();
                } catch (URISyntaxException ur) {
                  LOG.warn("Problem in the URI fragment for pipes executable");
                }
              }
              String[] debug = debugCommand.split(" ");
              Vector<String> vargs = new Vector<String>();
              for (String component : debug) {
                vargs.add(component);
              }
              vargs.add(taskStdout);
              vargs.add(taskStderr);
              vargs.add(taskSyslog);
              vargs.add(jobConf);
              vargs.add(program);
              try {
                List<String> wrappedCommand = TaskLog.captureDebugOut
                                                  (vargs, stdout);
                // run the script.
                try {
                  runScript(wrappedCommand, workDir);
                } catch (IOException ioe) {
                  LOG.warn("runScript failed with: " + StringUtils.
                           stringifyException(ioe));
                }
              } catch (IOException e) {
                LOG.warn("Error in preparing wrapped debug command");
              }
              // add all lines of debug out to diagnostics
              try {
                int num = localJobConf.getInt("mapred.debug.out.lines", -1);
                addDiagnostics(FileUtil.makeShellPath(stdout), num, "DEBUG OUT");
              } catch (IOException ioe) {
                LOG.warn("Exception in add diagnostics!");
              }
            }
          } else {
            taskStatus.setRunState(TaskStatus.State.KILLED);
          }
          taskStatus.setProgress(0.0f);
        }
        this.taskStatus.setFinishTime(System.currentTimeMillis());
        needCleanup = (taskStatus.getRunState() == TaskStatus.State.FAILED ||
                       taskStatus.getRunState() == TaskStatus.State.KILLED);
      }

      //
      // If the task has failed, or if the task was killAndCleanup()'ed,
      // we should clean up right away. We only wait to cleanup
      // if the task succeeded, and its results might be useful
      // later on to downstream job processing.
      //
      if (needCleanup) {
        try {
          cleanup();
        } catch (IOException ie) {
        }
      }
    }
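
    /*
     * Note on the failed-task debug hook above (a description of what the
     * code does, not of any additional API): when a task fails and a debug
     * command has been configured for the job (via
     * localJobConf.getMapDebugScript()/getReduceDebugScript()), the command
     * is run with these arguments appended, in order:
     *
     *   <task stdout> <task stderr> <task syslog> <job.xml> <pipes program>
     *
     * so a configured command "myDebugScript -v" (hypothetical) would be
     * executed roughly as
     *
     *   myDebugScript -v /path/stdout /path/stderr /path/syslog job.xml ""
     *
     * Its output is captured to the DEBUGOUT task log, and the last
     * mapred.debug.out.lines lines of that file (all of it by default) are
     * appended to the task's diagnostics via addDiagnostics().
     */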
    /**
     * Runs the script given in args
     * @param args script name followed by its arguments
     * @param dir current working directory.
     * @throws IOException
     */
    public void runScript(List<String> args, File dir) throws IOException {
      ShellCommandExecutor shexec =
        new ShellCommandExecutor(args.toArray(new String[0]), dir);
      shexec.execute();
      int exitCode = shexec.getExitCode();
      if (exitCode != 0) {
        throw new IOException("Task debug script exited with nonzero status of "
                              + exitCode + ".");
      }
    }
    /**
     * Add the last 'num' lines of the given file to the diagnostics.
     * If num = -1, all the lines of the file are added to the diagnostics.
     * @param file The file from which to collect diagnostics.
     * @param num The number of lines to be sent to diagnostics.
     * @param tag The tag is printed before the diagnostics are printed.
     */
    public void addDiagnostics(String file, int num, String tag) {
      RandomAccessFile rafile = null;
      try {
        rafile = new RandomAccessFile(file, "r");
        int no_lines = 0;
        String line = null;
        StringBuffer tail = new StringBuffer();
        tail.append("\n-------------------- " + tag + "---------------------\n");
        String[] lines = null;
        if (num > 0) {
          lines = new String[num];
        }
        while ((line = rafile.readLine()) != null) {
          no_lines++;
          if (num > 0) {
            if (no_lines <= num) {
              lines[no_lines - 1] = line;
            } else { // shift them up
              for (int i = 0; i < num - 1; ++i) {
                lines[i] = lines[i + 1];
              }
              lines[num - 1] = line;
            }
          } else if (num == -1) {
            tail.append(line);
            tail.append("\n");
          }
        }
        int n = no_lines > num ? num : no_lines;
        if (num > 0) {
          for (int i = 0; i < n; i++) {
            tail.append(lines[i]);
            tail.append("\n");
          }
        }
        if (n != 0)
          reportDiagnosticInfo(tail.toString());
      } catch (FileNotFoundException fnfe) {
        LOG.warn("File " + file + " not found");
      } catch (IOException ioe) {
        LOG.warn("Error reading file " + file);
      } finally {
        try {
          if (rafile != null) {
            rafile.close();
          }
        } catch (IOException ioe) {
          LOG.warn("Error closing file " + file);
        }
      }
    }
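
    /*
     * Worked example (illustrative): addDiagnostics(debugOutPath, 3, "DEBUG OUT")
     * on a six-line file keeps a sliding window of the last three lines read,
     * so lines 4-6 end up in the diagnostics under the "DEBUG OUT" banner.
     * With num = -1 every line is appended as it is read, i.e. the whole file
     * is reported.
     */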
    /**
     * We no longer need anything from this task, as the job has
     * finished. If the task is still running, kill it and clean up.
     *
     * @param wasFailure did the task fail, as opposed to was it killed by
     *                   the framework
     */
    public void jobHasFinished(boolean wasFailure) throws IOException {
      // Kill the task if it is still running
      synchronized (this) {
        if (getRunState() == TaskStatus.State.RUNNING) {
          kill(wasFailure);
        }
      }
      // Cleanup on the finished task
      cleanup();
    }

    /**
     * Something went wrong and the task must be killed.
     * @param wasFailure was it a failure (versus a kill request)?
     */
    public synchronized void kill(boolean wasFailure) throws IOException {
      if (taskStatus.getRunState() == TaskStatus.State.RUNNING) {
        wasKilled = true;
        if (wasFailure) {
          failures += 1;
        }
        runner.kill();
        taskStatus.setRunState((wasFailure) ?
                               TaskStatus.State.FAILED :
                               TaskStatus.State.KILLED);
      } else if (taskStatus.getRunState() == TaskStatus.State.UNASSIGNED) {
        if (wasFailure) {
          failures += 1;
          taskStatus.setRunState(TaskStatus.State.FAILED);
        } else {
          taskStatus.setRunState(TaskStatus.State.KILLED);
        }
      }
    }
    /**
     * The map output has been lost.
     */
    private synchronized void mapOutputLost(String failure) throws IOException {
      //The check for COMMIT_PENDING should actually be a check for SUCCESS;
      //however for that, we have to introduce another Action type from the
      //JT to the TT (a SuccessTaskAction along the lines of KillTaskAction).
      if (taskStatus.getRunState() == TaskStatus.State.COMMIT_PENDING) {
        // change status to failure
        LOG.info("Reporting output lost:" + task.getTaskId());
        taskStatus.setRunState(TaskStatus.State.FAILED);
        taskStatus.setProgress(0.0f);
        reportDiagnosticInfo("Map output lost, rescheduling: " +
                             failure);
        runningTasks.put(task.getTaskId(), this);
        mapTotal++;
      } else {
        LOG.warn("Output already reported lost:" + task.getTaskId());
      }
    }
    /**
     * We no longer need anything from this task. Either the
     * controlling job is all done and the files have been copied
     * away, or the task failed and we don't need the remains.
     * Any calls to cleanup should not lock the tip first.
     * cleanup does the right thing: it updates tasks in the TaskTracker
     * by locking the TaskTracker first and then locking the tip.
     */
    void cleanup() throws IOException {
      String taskId = task.getTaskId();
      LOG.debug("Cleaning up " + taskId);
      synchronized (TaskTracker.this) {
        tasks.remove(taskId);
        synchronized (this) {
          if (alwaysKeepTaskFiles ||
              (taskStatus.getRunState() == TaskStatus.State.FAILED &&
               keepFailedTaskFiles)) {
            return;
          }
        }
      }
      synchronized (this) {
        try {
          if (runner != null) {
            runner.close();
          }
          defaultJobConf.deleteLocalFiles(SUBDIR + Path.SEPARATOR +
                                          JOBCACHE + Path.SEPARATOR +
                                          task.getJobId() +
                                          Path.SEPARATOR + taskId);
        } catch (Throwable ie) {
          LOG.info("Error cleaning up task runner: " +
                   StringUtils.stringifyException(ie));
        }
      }
    }
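
    /*
     * Identity of a TaskInProgress is its task id: equals()/hashCode() below
     * delegate to task.getTaskId(), which is what allows TIPs to be kept in
     * hash-based collections such as RunningJob.tasks (a HashSet) without any
     * extra bookkeeping.
     */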
    public boolean equals(Object obj) {
      return (obj instanceof TaskInProgress) &&
        task.getTaskId().equals
          (((TaskInProgress) obj).getTask().getTaskId());
    }

    public int hashCode() {
      return task.getTaskId().hashCode();
    }
  }
  /////////////////////////////////////////////////////////////////
  // TaskUmbilicalProtocol
  /////////////////////////////////////////////////////////////////

  /**
   * Called upon startup by the child process, to fetch Task data.
   */
  public synchronized Task getTask(String taskid) throws IOException {
    TaskInProgress tip = tasks.get(taskid);
    if (tip != null) {
      return tip.getTask();
    } else {
      return null;
    }
  }

  /**
   * Called periodically to report Task progress, from 0.0 to 1.0.
   */
  public synchronized boolean statusUpdate(String taskid,
                                           TaskStatus taskStatus)
    throws IOException {
    TaskInProgress tip = tasks.get(taskid);
    if (tip != null) {
      tip.reportProgress(taskStatus);
      return true;
    } else {
      LOG.warn("Progress from unknown child task: " + taskid);
      return false;
    }
  }
  /**
   * Called when the task dies before completion, and we want to report back
   * diagnostic info
   */
  public synchronized void reportDiagnosticInfo(String taskid, String info) throws IOException {
    TaskInProgress tip = tasks.get(taskid);
    if (tip != null) {
      tip.reportDiagnosticInfo(info);
    } else {
      LOG.warn("Error from unknown child task: " + taskid + ". Ignored.");
    }
  }

  /** Child checking to see if we're alive. Normally does nothing. */
  public synchronized boolean ping(String taskid) throws IOException {
    return tasks.get(taskid) != null;
  }

  /**
   * The task is done.
   */
  public synchronized void done(String taskid) throws IOException {
    TaskInProgress tip = tasks.get(taskid);
    if (tip != null) {
      tip.reportDone();
    } else {
      LOG.warn("Unknown child task done: " + taskid + ". Ignored.");
    }
  }
  /**
   * A reduce-task failed to shuffle the map-outputs. Kill the task.
   */
  public synchronized void shuffleError(String taskId, String message)
    throws IOException {
    LOG.fatal("Task: " + taskId + " - Killed due to Shuffle Failure: " + message);
    TaskInProgress tip = runningTasks.get(taskId);
    tip.reportDiagnosticInfo("Shuffle Error: " + message);
    purgeTask(tip, true);
  }

  /**
   * A child task had a local filesystem error. Kill the task.
   */
  public synchronized void fsError(String taskId, String message)
    throws IOException {
    LOG.fatal("Task: " + taskId + " - Killed due to FSError: " + message);
    TaskInProgress tip = runningTasks.get(taskId);
    tip.reportDiagnosticInfo("FSError: " + message);
    purgeTask(tip, true);
  }

  public TaskCompletionEvent[] getMapCompletionEvents(
      String jobId, int fromEventId, int maxLocs) throws IOException {
    TaskCompletionEvent[] mapEvents = TaskCompletionEvent.EMPTY_ARRAY;
    RunningJob rjob;
    synchronized (runningJobs) {
      rjob = runningJobs.get(jobId);
      if (rjob != null) {
        synchronized (rjob) {
          FetchStatus f = rjob.getFetchStatus();
          if (f != null) {
            mapEvents = f.getMapEvents(fromEventId, maxLocs);
          }
        }
      }
    }
    return mapEvents;
  }
  /////////////////////////////////////////////////////
  // Called by TaskTracker thread after task process ends
  /////////////////////////////////////////////////////

  /**
   * The task is no longer running. It may not have completed successfully
   */
  void reportTaskFinished(String taskid) {
    TaskInProgress tip;
    synchronized (this) {
      tip = tasks.get(taskid);
    }
    if (tip != null) {
      tip.taskFinished();
      synchronized (finishedCount) {
        finishedCount[0]++;
        finishedCount.notify();
      }
    } else {
      LOG.warn("Unknown child task finished: " + taskid + ". Ignored.");
    }
  }
  /**
   * A completed map task's output has been lost.
   */
  public synchronized void mapOutputLost(String taskid,
                                         String errorMsg) throws IOException {
    TaskInProgress tip = tasks.get(taskid);
    if (tip != null) {
      tip.mapOutputLost(errorMsg);
    } else {
      LOG.warn("Unknown child with bad map output: " + taskid + ". Ignored.");
    }
  }
  /**
   * The data structure for initializing a job
   */
  static class RunningJob {
    private String jobid;
    private Path jobFile;
    // keep this for later use
    Set<TaskInProgress> tasks;
    boolean localized;
    boolean keepJobFiles;
    FetchStatus f;

    RunningJob(String jobid, Path jobFile) {
      this.jobid = jobid;
      localized = false;
      tasks = new HashSet<TaskInProgress>();
      this.jobFile = jobFile;
      keepJobFiles = false;
    }

    Path getJobFile() {
      return jobFile;
    }

    String getJobId() {
      return jobid;
    }

    void setFetchStatus(FetchStatus f) {
      this.f = f;
    }

    FetchStatus getFetchStatus() {
      return f;
    }
  }
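
  /*
   * The Child class below is the entry point of the per-task JVM. The launch
   * sequence, as far as this file shows it (details of the forking side are
   * assumed): the TaskRunner started from TaskInProgress.launchTask() forks a
   * JVM whose arguments are the TaskTracker's host, the umbilical RPC port
   * and the task id; Child.main() then connects back over a
   * TaskUmbilicalProtocol proxy, fetches its Task via getTask(taskid), loads
   * the localized job.xml and calls task.run(job, umbilical), reporting
   * progress, diagnostics and completion through the same proxy.
   */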
  /**
   * The main() for child processes.
   */
  public static class Child {

    public static void main(String[] args) throws Throwable {
      //LogFactory.showTime(false);
      LOG.debug("Child starting");

      JobConf defaultConf = new JobConf();
      String host = args[0];
      int port = Integer.parseInt(args[1]);
      InetSocketAddress address = new InetSocketAddress(host, port);
      String taskid = args[2];
      //set a very high idle timeout so that the connection is never closed
      defaultConf.setInt("ipc.client.connection.maxidletime", 60*60*1000);
      TaskUmbilicalProtocol umbilical =
        (TaskUmbilicalProtocol)RPC.getProxy(TaskUmbilicalProtocol.class,
                                            TaskUmbilicalProtocol.versionID,
                                            address,
                                            defaultConf);
      Task task = umbilical.getTask(taskid);
      JobConf job = new JobConf(task.getJobFile());
      TaskLog.cleanup(job.getInt("mapred.userlog.retain.hours", 24));

      task.setConf(job);

      defaultConf.addResource(new Path(task.getJobFile()));

      // Initiate Java VM metrics
      JvmMetrics.init(task.getPhase().toString(), job.getSessionId());

      try {
        // use job-specified working directory
        FileSystem.get(job).setWorkingDirectory(job.getWorkingDirectory());
        task.run(job, umbilical);             // run the task
      } catch (FSError e) {
        LOG.fatal("FSError from child", e);
        umbilical.fsError(taskid, e.getMessage());
      } catch (Throwable throwable) {
        LOG.warn("Error running child", throwable);
        // Report back any failures, for diagnostic purposes
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        throwable.printStackTrace(new PrintStream(baos));
        umbilical.reportDiagnosticInfo(taskid, baos.toString());
      } finally {
        MetricsContext metricsContext = MetricsUtil.getContext("mapred");
        metricsContext.close();
        // Shutting down log4j of the child-vm...
        // This assumes that on return from Task.run()
        // there is no more logging done.
        LogManager.shutdown();
      }
    }
  }
  /**
   * Get the name for this task tracker.
   * @return the string like "tracker_mymachine:50010"
   */
  String getName() {
    return taskTrackerName;
  }
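
  /*
   * Heartbeat helper: the method below snapshots the status of every running
   * task for the next report to the JobTracker. Counters are included only
   * when asked for, or when a task is no longer RUNNING; each status is
   * cloned so later updates do not mutate the report, and clearStatus() is
   * then called so state that has already been reported is not resent
   * (assumed from the clone-and-reset pattern here, not from TaskStatus
   * itself).
   */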
  private synchronized List<TaskStatus> cloneAndResetRunningTaskStatuses(
                                          boolean sendCounters) {
    List<TaskStatus> result = new ArrayList<TaskStatus>(runningTasks.size());
    for (TaskInProgress tip: runningTasks.values()) {
      TaskStatus status = tip.getStatus();
      status.setIncludeCounters(sendCounters);
      // send counters for finished or failed tasks.
      if (status.getRunState() != TaskStatus.State.RUNNING) {
        status.setIncludeCounters(true);
      }
      result.add((TaskStatus)status.clone());
      status.clearStatus();
    }
    return result;
  }
  /**
   * Get the list of tasks that will be reported back to the
   * job tracker in the next heartbeat cycle.
   * @return a copy of the list of TaskStatus objects
   */
  synchronized List<TaskStatus> getRunningTaskStatuses() {
    List<TaskStatus> result = new ArrayList<TaskStatus>(runningTasks.size());
    for (TaskInProgress tip: runningTasks.values()) {
      result.add(tip.getStatus());
    }
    return result;
  }

  /**
   * Get the list of stored tasks on this task tracker.
   * @return the TaskStatus objects of tasks that are known to this tracker
   *         but not currently running
   */
  synchronized List<TaskStatus> getNonRunningTasks() {
    List<TaskStatus> result = new ArrayList<TaskStatus>(tasks.size());
    for (Map.Entry<String, TaskInProgress> task: tasks.entrySet()) {
      if (!runningTasks.containsKey(task.getKey())) {
        result.add(task.getValue().getStatus());
      }
    }
    return result;
  }
  /**
   * Get the default job conf for this tracker.
   */
  JobConf getJobConf() {
    return fConf;
  }

  /**
   * Check if the given local directories
   * (and parent directories, if necessary) can be created.
   * @param localDirs where the new TaskTracker should keep its local files.
   * @throws DiskErrorException if none of the local directories is writable
   */
  private static void checkLocalDirs(String[] localDirs)
    throws DiskErrorException {
    boolean writable = false;

    if (localDirs != null) {
      for (int i = 0; i < localDirs.length; i++) {
        try {
          DiskChecker.checkDir(new File(localDirs[i]));
          writable = true;
        } catch (DiskErrorException e) {
          LOG.warn("Task Tracker local " + e.getMessage());
        }
      }
    }

    if (!writable)
      throw new DiskErrorException(
                                   "all local directories are not writable");
  }
  /**
   * Is this task tracker idle?
   * @return has this task tracker finished and cleaned up all of its tasks?
   */
  public synchronized boolean isIdle() {
    return tasks.isEmpty() && tasksToCleanup.isEmpty();
  }

  /**
   * Start the TaskTracker, point toward the indicated JobTracker
   */
  public static void main(String argv[]) throws Exception {
    StringUtils.startupShutdownMessage(TaskTracker.class, argv, LOG);
    if (argv.length != 0) {
      System.out.println("usage: TaskTracker");
      System.exit(-1);
    }

    try {
      JobConf conf = new JobConf();
      // enable the server to track time spent waiting on locks
      ReflectionUtils.setContentionTracing
        (conf.getBoolean("tasktracker.contention.tracking", false));
      new TaskTracker(conf).run();
    } catch (Throwable e) {
      LOG.error("Can not start task tracker because " +
                StringUtils.stringifyException(e));
      System.exit(-1);
    }
  }
  /**
   * This class is used in TaskTracker's Jetty to serve the map outputs
   * to other nodes.
   */
  public static class MapOutputServlet extends HttpServlet {
    private static final int MAX_BYTES_TO_READ = 64 * 1024;

    public void doGet(HttpServletRequest request,
                      HttpServletResponse response
                      ) throws ServletException, IOException {
      String mapId = request.getParameter("map");
      String reduceId = request.getParameter("reduce");
      if (mapId == null || reduceId == null) {
        throw new IOException("map and reduce parameters are required");
      }
      ServletContext context = getServletContext();
      int reduce = Integer.parseInt(reduceId);
      byte[] buffer = new byte[MAX_BYTES_TO_READ];
      // true iff IOException was caused by attempt to access input
      boolean isInputException = true;
      OutputStream outStream = null;
      FSDataInputStream indexIn = null;
      FSDataInputStream mapOutputIn = null;

      ShuffleServerMetrics shuffleMetrics = (ShuffleServerMetrics)
        context.getAttribute("shuffleServerMetrics");
      try {
        shuffleMetrics.serverHandlerBusy();
        outStream = response.getOutputStream();
        JobConf conf = (JobConf) context.getAttribute("conf");
        LocalDirAllocator lDirAlloc =
          (LocalDirAllocator)context.getAttribute("localDirAllocator");
        FileSystem fileSys =
          (FileSystem) context.getAttribute("local.file.system");

        // Index file
        Path indexFileName = lDirAlloc.getLocalPathToRead(
                               mapId + "/file.out.index", conf);
        // Map-output file
        Path mapOutputFileName = lDirAlloc.getLocalPathToRead(
                                   mapId + "/file.out", conf);

        /**
         * Read the index file to get the information about where
         * the map-output for the given reducer is available.
         */
        //open index file
        indexIn = fileSys.open(indexFileName);
        //seek to the correct offset for the given reduce
        indexIn.seek(reduce * 16);
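        // Each index record is 16 bytes: an 8-byte start offset followed by
        // an 8-byte partition length (read as two longs just below), so the
        // record for partition 'reduce' starts at byte reduce * 16.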
        //read the offset and length of the partition data
        long startOffset = indexIn.readLong();
        long partLength = indexIn.readLong();

        indexIn.close();
        indexIn = null;

        //set the custom "Map-Output-Length" http header to
        //the actual number of bytes being transferred
        response.setHeader(MAP_OUTPUT_LENGTH, Long.toString(partLength));

        //use the same buffer size as used for reading the data from disk
        response.setBufferSize(MAX_BYTES_TO_READ);

        /**
         * Read the data from the single map-output file and
         * send it to the reducer.
         */
        //open the map-output file
        mapOutputIn = fileSys.open(mapOutputFileName);
        //seek to the correct offset for the reduce
        mapOutputIn.seek(startOffset);

        long totalRead = 0;
        int len = mapOutputIn.read(buffer, 0,
                                   partLength < MAX_BYTES_TO_READ
                                   ? (int)partLength : MAX_BYTES_TO_READ);
        while (len > 0) {
          try {
            shuffleMetrics.outputBytes(len);
            outStream.write(buffer, 0, len);
            outStream.flush();
          } catch (IOException ie) {
            isInputException = false;
            throw ie;
          }
          totalRead += len;
          if (totalRead == partLength) break;
          len = mapOutputIn.read(buffer, 0,
                                 (partLength - totalRead) < MAX_BYTES_TO_READ
                                 ? (int)(partLength - totalRead) : MAX_BYTES_TO_READ);
        }
      } catch (IOException ie) {
        TaskTracker tracker =
          (TaskTracker) context.getAttribute("task.tracker");
        Log log = (Log) context.getAttribute("log");
        String errorMsg = ("getMapOutput(" + mapId + "," + reduceId +
                           ") failed :\n" +
                           StringUtils.stringifyException(ie));
        log.warn(errorMsg);
        if (isInputException) {
          tracker.mapOutputLost(mapId, errorMsg);
        }
        response.sendError(HttpServletResponse.SC_GONE, errorMsg);
        shuffleMetrics.failedOutput();
        throw ie;
      } finally {
        if (indexIn != null) {
          indexIn.close();
        }
        if (mapOutputIn != null) {
          mapOutputIn.close();
        }
        shuffleMetrics.serverHandlerFree();
      }
      outStream.close();
      shuffleMetrics.successOutput();
    }
  }
}