JobInProgress.java 129 KB

  1. /**
  2. * Licensed to the Apache Software Foundation (ASF) under one
  3. * or more contributor license agreements. See the NOTICE file
  4. * distributed with this work for additional information
  5. * regarding copyright ownership. The ASF licenses this file
  6. * to you under the Apache License, Version 2.0 (the
  7. * "License"); you may not use this file except in compliance
  8. * with the License. You may obtain a copy of the License at
  9. *
  10. * http://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. * Unless required by applicable law or agreed to in writing, software
  13. * distributed under the License is distributed on an "AS IS" BASIS,
  14. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. * See the License for the specific language governing permissions and
  16. * limitations under the License.
  17. */
  18. package org.apache.hadoop.mapred;
  19. import java.io.File;
  20. import java.io.FileOutputStream;
  21. import java.io.IOException;
  22. import java.net.UnknownHostException;
  23. import java.security.PrivilegedExceptionAction;
  24. import java.util.ArrayList;
  25. import java.util.Collection;
  26. import java.util.Comparator;
  27. import java.util.EnumMap;
  28. import java.util.HashMap;
  29. import java.util.HashSet;
  30. import java.util.IdentityHashMap;
  31. import java.util.Iterator;
  32. import java.util.LinkedHashSet;
  33. import java.util.LinkedList;
  34. import java.util.List;
  35. import java.util.Map;
  36. import java.util.Set;
  37. import java.util.TreeMap;
  38. import java.util.Vector;
  39. import java.util.concurrent.atomic.AtomicBoolean;
  40. import org.apache.commons.logging.Log;
  41. import org.apache.commons.logging.LogFactory;
  42. import org.apache.hadoop.classification.InterfaceAudience;
  43. import org.apache.hadoop.classification.InterfaceStability;
  44. import org.apache.hadoop.fs.FileSystem;
  45. import org.apache.hadoop.fs.LocalFileSystem;
  46. import org.apache.hadoop.fs.Path;
  47. import org.apache.hadoop.io.Text;
  48. import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext;
  49. import org.apache.hadoop.mapreduce.JobContext;
  50. import org.apache.hadoop.mapreduce.JobCounter;
  51. import org.apache.hadoop.mapreduce.JobSubmissionFiles;
  52. import org.apache.hadoop.mapreduce.MRJobConfig;
  53. import org.apache.hadoop.mapreduce.TaskType;
  54. import org.apache.hadoop.mapreduce.counters.LimitExceededException;
  55. import org.apache.hadoop.mapreduce.jobhistory.JobFinishedEvent;
  56. import org.apache.hadoop.mapreduce.jobhistory.JobHistory;
  57. import org.apache.hadoop.mapreduce.jobhistory.JobInfoChangeEvent;
  58. import org.apache.hadoop.mapreduce.jobhistory.JobInitedEvent;
  59. import org.apache.hadoop.mapreduce.jobhistory.JobPriorityChangeEvent;
  60. import org.apache.hadoop.mapreduce.jobhistory.JobStatusChangedEvent;
  61. import org.apache.hadoop.mapreduce.jobhistory.JobSubmittedEvent;
  62. import org.apache.hadoop.mapreduce.jobhistory.JobUnsuccessfulCompletionEvent;
  63. import org.apache.hadoop.mapreduce.jobhistory.MapAttemptFinishedEvent;
  64. import org.apache.hadoop.mapreduce.jobhistory.ReduceAttemptFinishedEvent;
  65. import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptStartedEvent;
  66. import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptUnsuccessfulCompletionEvent;
  67. import org.apache.hadoop.mapreduce.jobhistory.TaskFailedEvent;
  68. import org.apache.hadoop.mapreduce.jobhistory.TaskFinishedEvent;
  69. import org.apache.hadoop.mapreduce.jobhistory.TaskStartedEvent;
  70. import org.apache.hadoop.mapreduce.security.TokenCache;
  71. import org.apache.hadoop.security.Credentials;
  72. import org.apache.hadoop.mapreduce.security.token.DelegationTokenRenewal;
  73. import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
  74. import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
  75. import org.apache.hadoop.mapreduce.split.JobSplit;
  76. import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader;
  77. import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
  78. import org.apache.hadoop.mapreduce.task.JobContextImpl;
  79. import org.apache.hadoop.net.NetUtils;
  80. import org.apache.hadoop.net.NetworkTopology;
  81. import org.apache.hadoop.net.Node;
  82. import org.apache.hadoop.security.UserGroupInformation;
  83. import org.apache.hadoop.security.token.Token;
  84. import org.apache.hadoop.security.token.TokenIdentifier;
  85. import org.apache.hadoop.util.StringUtils;
  86. /**
  87. * JobInProgress maintains all the info for keeping a Job on the straight and
  88. * narrow. It keeps its JobProfile and its latest JobStatus, plus a set of
  89. * tables for doing bookkeeping of its Tasks.
  90. */
  91. @InterfaceAudience.LimitedPrivate({"MapReduce"})
  92. @InterfaceStability.Unstable
  93. public class JobInProgress {
  94. /**
  95. * Used when a kill is issued to a job that is initializing.
  96. */
  97. static class KillInterruptedException extends InterruptedException {
  98. private static final long serialVersionUID = 1L;
  99. public KillInterruptedException(String msg) {
  100. super(msg);
  101. }
  102. }
  103. static final Log LOG = LogFactory.getLog(JobInProgress.class);
  104. JobProfile profile;
  105. JobStatus status;
  106. Path jobFile = null;
  107. Path localJobFile = null;
  108. TaskInProgress maps[] = new TaskInProgress[0];
  109. TaskInProgress reduces[] = new TaskInProgress[0];
  110. TaskInProgress cleanup[] = new TaskInProgress[0];
  111. TaskInProgress setup[] = new TaskInProgress[0];
  112. int numMapTasks = 0;
  113. int numReduceTasks = 0;
  114. final long memoryPerMap;
  115. final long memoryPerReduce;
  116. volatile int numSlotsPerMap = 1;
  117. volatile int numSlotsPerReduce = 1;
  118. final int maxTaskFailuresPerTracker;
  119. // Counters to track currently running/finished/failed Map/Reduce task-attempts
  120. int runningMapTasks = 0;
  121. int runningReduceTasks = 0;
  122. int finishedMapTasks = 0;
  123. int finishedReduceTasks = 0;
  124. int failedMapTasks = 0;
  125. int failedReduceTasks = 0;
  126. static final float DEFAULT_COMPLETED_MAPS_PERCENT_FOR_REDUCE_SLOWSTART = 0.05f;
  127. int completedMapsForReduceSlowstart = 0;
  128. // runningMapTasks includes speculative tasks, so we need to capture
  129. // speculative tasks separately
  130. int speculativeMapTasks = 0;
  131. int speculativeReduceTasks = 0;
  132. int mapFailuresPercent = 0;
  133. int reduceFailuresPercent = 0;
  134. int failedMapTIPs = 0;
  135. int failedReduceTIPs = 0;
  136. private volatile boolean launchedCleanup = false;
  137. private volatile boolean launchedSetup = false;
  138. private volatile boolean jobKilled = false;
  139. private volatile boolean jobFailed = false;
  140. private final boolean jobSetupCleanupNeeded;
  141. private final boolean taskCleanupNeeded;
  142. JobPriority priority = JobPriority.NORMAL;
  143. protected JobTracker jobtracker;
  144. protected Credentials tokenStorage;
  145. JobHistory jobHistory;
  146. // Map of NetworkTopology Node to the list of non-running map TIPs
  147. Map<Node, List<TaskInProgress>> nonRunningMapCache;
  148. // Map of NetworkTopology Node to set of running TIPs
  149. Map<Node, Set<TaskInProgress>> runningMapCache;
  150. // A list of non-local non-running maps
  151. List<TaskInProgress> nonLocalMaps;
  152. // A set of non-local running maps
  153. Set<TaskInProgress> nonLocalRunningMaps;
  154. // A list of non-running reduce TIPs
  155. List<TaskInProgress> nonRunningReduces;
  156. // A set of running reduce TIPs
  157. Set<TaskInProgress> runningReduces;
  158. // A list of cleanup tasks for the map task attempts, to be launched
  159. List<TaskAttemptID> mapCleanupTasks = new LinkedList<TaskAttemptID>();
  160. // A list of cleanup tasks for the reduce task attempts, to be launched
  161. List<TaskAttemptID> reduceCleanupTasks = new LinkedList<TaskAttemptID>();
  162. int maxLevel;
  163. /**
  164. * A special value indicating that
  165. * {@link #findNewMapTask(TaskTrackerStatus, int, int, int, double)} should
  166. * schedule any available map tasks for this job, including speculative tasks.
  167. */
  168. int anyCacheLevel;
  169. /**
  170. * A special value indicating that
  171. * {@link #findNewMapTask(TaskTrackerStatus, int, int, int, double)} should
  172. * schedule only off-switch and speculative map tasks for this job.
  173. */
  174. private static final int NON_LOCAL_CACHE_LEVEL = -1;
  175. private int taskCompletionEventTracker = 0;
  176. List<TaskCompletionEvent> taskCompletionEvents;
  177. // The maximum percentage of trackers in the cluster added to the 'blacklist'.
  178. private static final double CLUSTER_BLACKLIST_PERCENT = 0.25;
  179. // The maximum percentage of fetch failures allowed for a map
  180. private static final double MAX_ALLOWED_FETCH_FAILURES_PERCENT = 0.5;
  181. // No. of tasktrackers in the cluster
  182. private volatile int clusterSize = 0;
  183. // The no. of tasktrackers where >= conf.getMaxTaskFailuresPerTracker()
  184. // tasks have failed
  185. private volatile int flakyTaskTrackers = 0;
  186. // Map of trackerHostName -> no. of task failures
  187. private Map<String, Integer> trackerToFailuresMap =
  188. new TreeMap<String, Integer>();
  189. //Confine estimation algorithms to an "oracle" class that JIP queries.
  190. ResourceEstimator resourceEstimator;
  191. long startTime;
  192. long launchTime;
  193. long finishTime;
  194. // First task launch times
  195. final Map<TaskType, Long> firstTaskLaunchTimes =
  196. new EnumMap<TaskType, Long>(TaskType.class);
  197. // Indicates how many times the job got restarted
  198. private final int restartCount;
  199. JobConf conf;
  200. protected AtomicBoolean tasksInited = new AtomicBoolean(false);
  201. private JobInitKillStatus jobInitKillStatus = new JobInitKillStatus();
  202. LocalFileSystem localFs;
  203. FileSystem fs;
  204. String user;
  205. JobID jobId;
  206. volatile private boolean hasSpeculativeMaps;
  207. volatile private boolean hasSpeculativeReduces;
  208. long inputLength = 0;
  209. Counters jobCounters = new Counters();
  210. // Maximum no. of fetch-failure notifications after which map task is killed
  211. private static final int MAX_FETCH_FAILURES_NOTIFICATIONS = 3;
  212. // Don't lower speculativeCap below one TT's worth (for small clusters)
  213. private static final int MIN_SPEC_CAP = 10;
  214. private static final float MIN_SLOTS_CAP = 0.01f;
  215. // Map of mapTaskId -> no. of fetch failures
  216. private Map<TaskAttemptID, Integer> mapTaskIdToFetchFailuresMap =
  217. new TreeMap<TaskAttemptID, Integer>();
  218. private Object schedulingInfo;
  219. private String submitHostName;
  220. private String submitHostAddress;
  221. //thresholds for speculative execution
  222. float slowTaskThreshold;
  223. float speculativeCap;
  224. float slowNodeThreshold; //standard deviations
  225. //Statistics are maintained for a couple of things
  226. //mapTaskStats is used for maintaining statistics about
  227. //the completion time of map tasks on the trackers. On a per
  228. //tracker basis, the mean time for task completion is maintained
  229. private DataStatistics mapTaskStats = new DataStatistics();
  230. //reduceTaskStats is used for maintaining statistics about
  231. //the completion time of reduce tasks on the trackers. On a per
  232. //tracker basis, the mean time for task completion is maintained
  233. private DataStatistics reduceTaskStats = new DataStatistics();
  234. //trackerMapStats used to maintain a mapping from the tracker to
  235. //the statistics about completion time of map tasks
  236. private Map<String,DataStatistics> trackerMapStats =
  237. new HashMap<String,DataStatistics>();
  238. //trackerReduceStats used to maintain a mapping from the tracker to
  239. //the statistics about completion time of reduce tasks
  240. private Map<String,DataStatistics> trackerReduceStats =
  241. new HashMap<String,DataStatistics>();
  242. //runningMapStats used to maintain the RUNNING map tasks' statistics
  243. private DataStatistics runningMapTaskStats = new DataStatistics();
  244. //runningReduceStats used to maintain the RUNNING reduce tasks' statistics
  245. private DataStatistics runningReduceTaskStats = new DataStatistics();
  246. private static class FallowSlotInfo {
  247. long timestamp;
  248. int numSlots;
  249. public FallowSlotInfo(long timestamp, int numSlots) {
  250. this.timestamp = timestamp;
  251. this.numSlots = numSlots;
  252. }
  253. public long getTimestamp() {
  254. return timestamp;
  255. }
  256. public void setTimestamp(long timestamp) {
  257. this.timestamp = timestamp;
  258. }
  259. public int getNumSlots() {
  260. return numSlots;
  261. }
  262. public void setNumSlots(int numSlots) {
  263. this.numSlots = numSlots;
  264. }
  265. }
  266. private Map<TaskTracker, FallowSlotInfo> trackersReservedForMaps =
  267. new HashMap<TaskTracker, FallowSlotInfo>();
  268. private Map<TaskTracker, FallowSlotInfo> trackersReservedForReduces =
  269. new HashMap<TaskTracker, FallowSlotInfo>();
  270. private Path jobSubmitDir = null;
  271. /**
  272. * Create an almost empty JobInProgress, which can be used only for tests
  273. */
  274. protected JobInProgress(JobID jobid, JobConf conf, JobTracker tracker) {
  275. this.conf = conf;
  276. this.jobId = jobid;
  277. this.numMapTasks = conf.getNumMapTasks();
  278. this.numReduceTasks = conf.getNumReduceTasks();
  279. this.maxLevel = NetworkTopology.DEFAULT_HOST_LEVEL;
  280. this.anyCacheLevel = this.maxLevel+1;
  281. this.jobtracker = tracker;
  282. this.restartCount = 0;
  283. this.profile = new JobProfile(conf.getUser(), jobid, "", "",
  284. conf.getJobName(),conf.getQueueName());
  285. this.memoryPerMap = conf.getMemoryForMapTask();
  286. this.memoryPerReduce = conf.getMemoryForReduceTask();
  287. this.maxTaskFailuresPerTracker = conf.getMaxTaskFailuresPerTracker();
  288. hasSpeculativeMaps = conf.getMapSpeculativeExecution();
  289. hasSpeculativeReduces = conf.getReduceSpeculativeExecution();
  290. this.nonLocalMaps = new LinkedList<TaskInProgress>();
  291. this.nonLocalRunningMaps = new LinkedHashSet<TaskInProgress>();
  292. this.runningMapCache = new IdentityHashMap<Node, Set<TaskInProgress>>();
  293. this.nonRunningReduces = new LinkedList<TaskInProgress>();
  294. this.runningReduces = new LinkedHashSet<TaskInProgress>();
  295. this.resourceEstimator = new ResourceEstimator(this);
  296. this.status = new JobStatus(jobid, 0.0f, 0.0f, JobStatus.PREP,
  297. this.profile.getUser(), this.profile.getJobName(),
  298. this.profile.getJobFile(), "");
  299. this.jobtracker.getInstrumentation().addPrepJob(conf, jobid);
  300. this.taskCompletionEvents = new ArrayList<TaskCompletionEvent>
  301. (numMapTasks + numReduceTasks + 10);
  302. this.slowTaskThreshold = Math.max(0.0f,
  303. conf.getFloat(MRJobConfig.SPECULATIVE_SLOWTASK_THRESHOLD,1.0f));
  304. this.speculativeCap = conf.getFloat(
  305. MRJobConfig.SPECULATIVECAP,0.1f);
  306. this.slowNodeThreshold = conf.getFloat(
  307. MRJobConfig.SPECULATIVE_SLOWNODE_THRESHOLD,1.0f);
  308. this.jobSetupCleanupNeeded = conf.getBoolean(
  309. MRJobConfig.SETUP_CLEANUP_NEEDED, true);
  310. this.taskCleanupNeeded = conf.getBoolean(
  311. MRJobConfig.TASK_CLEANUP_NEEDED, true);
  312. if (tracker != null) { // Some mock tests have null tracker
  313. this.jobHistory = tracker.getJobHistory();
  314. }
  315. this.tokenStorage = null;
  316. }
  317. JobInProgress(JobConf conf) {
  318. restartCount = 0;
  319. jobSetupCleanupNeeded = false;
  320. taskCleanupNeeded = true;
  321. this.memoryPerMap = conf.getMemoryForMapTask();
  322. this.memoryPerReduce = conf.getMemoryForReduceTask();
  323. this.maxTaskFailuresPerTracker = conf.getMaxTaskFailuresPerTracker();
  324. }
  325. /**
  326. * Create a JobInProgress with the given job file, plus a handle
  327. * to the tracker.
  328. */
  329. public JobInProgress(JobTracker jobtracker,
  330. final JobConf default_conf, int rCount,
  331. JobInfo jobInfo,
  332. Credentials ts
  333. ) throws IOException, InterruptedException {
  334. try {
  335. this.restartCount = rCount;
  336. this.jobId = JobID.downgrade(jobInfo.getJobID());
  337. String url = "http://" + jobtracker.getJobTrackerMachine() + ":"
  338. + jobtracker.getInfoPort() + "/jobdetails.jsp?jobid=" + this.jobId;
  339. this.jobtracker = jobtracker;
  340. this.jobHistory = jobtracker.getJobHistory();
  341. this.startTime = System.currentTimeMillis();
  342. this.localFs = jobtracker.getLocalFileSystem();
  343. this.tokenStorage = ts;
  344. // use the user supplied token to add user credentials to the conf
  345. jobSubmitDir = jobInfo.getJobSubmitDir();
  346. user = jobInfo.getUser().toString();
  347. UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user);
  348. if (ts != null) {
  349. for (Token<? extends TokenIdentifier> token : ts.getAllTokens()) {
  350. ugi.addToken(token);
  351. }
  352. }
  353. fs = ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
  354. public FileSystem run() throws IOException {
  355. return jobSubmitDir.getFileSystem(default_conf);
  356. }
  357. });
  358. this.localJobFile = default_conf.getLocalPath(JobTracker.SUBDIR + "/"
  359. + this.jobId + ".xml");
  360. jobFile = JobSubmissionFiles.getJobConfPath(jobSubmitDir);
  361. fs.copyToLocalFile(jobFile, localJobFile);
  362. conf = new JobConf(localJobFile);
  363. if (conf.getUser() == null) {
  364. this.conf.setUser(user);
  365. }
  366. if (!conf.getUser().equals(user)) {
  367. String desc = "The username " + conf.getUser() + " obtained from the "
  368. + "conf doesn't match the username " + user + " the user "
  369. + "authenticated as";
  370. AuditLogger.logFailure(user, Operation.SUBMIT_JOB.name(),
  371. conf.getUser(), jobId.toString(), desc);
  372. throw new IOException(desc);
  373. }
  374. String userGroups[] = ugi.getGroupNames();
  375. String primaryGroup = (userGroups.length > 0) ? userGroups[0] : null;
  376. if (primaryGroup != null) {
  377. conf.set("group.name", primaryGroup);
  378. }
  379. this.priority = conf.getJobPriority();
  380. this.profile = new JobProfile(conf.getUser(), this.jobId, jobFile
  381. .toString(), url, conf.getJobName(), conf.getQueueName());
  382. this.status = new JobStatus(this.jobId, 0.0f, 0.0f, JobStatus.PREP,
  383. profile.getUser(), profile.getJobName(), profile.getJobFile(),
  384. profile.getURL().toString());
  385. this.jobtracker.getInstrumentation().addPrepJob(conf, this.jobId);
  386. status.setStartTime(startTime);
  387. this.status.setJobPriority(this.priority);
  388. this.numMapTasks = conf.getNumMapTasks();
  389. this.numReduceTasks = conf.getNumReduceTasks();
  390. this.memoryPerMap = conf.getMemoryForMapTask();
  391. this.memoryPerReduce = conf.getMemoryForReduceTask();
  392. this.taskCompletionEvents = new ArrayList<TaskCompletionEvent>(
  393. numMapTasks + numReduceTasks + 10);
  394. JobContext jobContext = new JobContextImpl(conf, jobId);
  395. this.jobSetupCleanupNeeded = jobContext.getJobSetupCleanupNeeded();
  396. this.taskCleanupNeeded = jobContext.getTaskCleanupNeeded();
  397. // Construct the jobACLs
  398. status.setJobACLs(jobtracker.getJobACLsManager().constructJobACLs(conf));
  399. this.mapFailuresPercent = conf.getMaxMapTaskFailuresPercent();
  400. this.reduceFailuresPercent = conf.getMaxReduceTaskFailuresPercent();
  401. this.maxTaskFailuresPerTracker = conf.getMaxTaskFailuresPerTracker();
  402. hasSpeculativeMaps = conf.getMapSpeculativeExecution();
  403. hasSpeculativeReduces = conf.getReduceSpeculativeExecution();
  404. this.maxLevel = jobtracker.getNumTaskCacheLevels();
  405. this.anyCacheLevel = this.maxLevel + 1;
  406. this.nonLocalMaps = new LinkedList<TaskInProgress>();
  407. this.nonLocalRunningMaps = new LinkedHashSet<TaskInProgress>();
  408. this.runningMapCache = new IdentityHashMap<Node, Set<TaskInProgress>>();
  409. this.nonRunningReduces = new LinkedList<TaskInProgress>();
  410. this.runningReduces = new LinkedHashSet<TaskInProgress>();
  411. this.resourceEstimator = new ResourceEstimator(this);
  412. this.submitHostName = conf.getJobSubmitHostName();
  413. this.submitHostAddress = conf.getJobSubmitHostAddress();
  414. this.slowTaskThreshold = Math.max(0.0f, conf.getFloat(
  415. MRJobConfig.SPECULATIVE_SLOWTASK_THRESHOLD, 1.0f));
  416. this.speculativeCap = conf.getFloat(MRJobConfig.SPECULATIVECAP, 0.1f);
  417. this.slowNodeThreshold = conf.getFloat(
  418. MRJobConfig.SPECULATIVE_SLOWNODE_THRESHOLD, 1.0f);
  419. // register job's tokens for renewal
  420. DelegationTokenRenewal.registerDelegationTokensForRenewal(jobInfo
  421. .getJobID(), ts, jobtracker.getConf());
  422. } finally {
  423. // close all FileSystems that were created above for the current user.
  424. // At this point, this constructor is called in the context of an RPC, and
  425. // hence the "current user" actually refers to the Kerberos-authenticated
  426. // user (if security is ON).
  427. FileSystem.closeAllForUGI(UserGroupInformation.getCurrentUser());
  428. }
  429. }
  430. private void printCache (Map<Node, List<TaskInProgress>> cache) {
  431. LOG.info("The taskcache info:");
  432. for (Map.Entry<Node, List<TaskInProgress>> n : cache.entrySet()) {
  433. List <TaskInProgress> tips = n.getValue();
  434. LOG.info("Cached TIPs on node: " + n.getKey());
  435. for (TaskInProgress tip : tips) {
  436. LOG.info("tip : " + tip.getTIPId());
  437. }
  438. }
  439. }
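  // Illustrative example of what createCache() below produces, assuming
  // maxLevel == 2 (host and rack levels): a map whose split is stored on
  // /rack1/host1 and /rack1/host2 is cached under both host nodes and, once,
  // under the /rack1 node, so it can later be found at either the node-local
  // or the rack-local level; a split with no locations is added to
  // nonLocalMaps instead.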
  440. Map<Node, List<TaskInProgress>> createCache(
  441. TaskSplitMetaInfo[] splits, int maxLevel) {
  442. Map<Node, List<TaskInProgress>> cache =
  443. new IdentityHashMap<Node, List<TaskInProgress>>(maxLevel);
  444. for (int i = 0; i < splits.length; i++) {
  445. String[] splitLocations = splits[i].getLocations();
  446. if (splitLocations.length == 0) {
  447. nonLocalMaps.add(maps[i]);
  448. continue;
  449. }
  450. for(String host: splitLocations) {
  451. Node node = jobtracker.resolveAndAddToTopology(host);
  452. LOG.info("tip:" + maps[i].getTIPId() + " has split on node:" + node);
  453. for (int j = 0; j < maxLevel; j++) {
  454. List<TaskInProgress> hostMaps = cache.get(node);
  455. if (hostMaps == null) {
  456. hostMaps = new ArrayList<TaskInProgress>();
  457. cache.put(node, hostMaps);
  458. hostMaps.add(maps[i]);
  459. }
  460. //check whether hostMaps already contains an entry for this TIP.
  461. //This happens for rack-level nodes when multiple hosts in the rack
  462. //hold input for the same TIP. Note that if the TIP already exists
  463. //in hostMaps, it must be the last element there, since TIPs are
  464. //processed one at a time, sequentially, in split-size order
  465. if (hostMaps.get(hostMaps.size() - 1) != maps[i]) {
  466. hostMaps.add(maps[i]);
  467. }
  468. node = node.getParent();
  469. }
  470. }
  471. }
  472. return cache;
  473. }
  474. /**
  475. * Check if the job has been initialized.
  476. * @return <code>true</code> if the job has been initialized,
  477. * <code>false</code> otherwise
  478. */
  479. public boolean inited() {
  480. return tasksInited.get();
  481. }
  482. /**
  483. * Get the user for the job
  484. */
  485. public String getUser() {
  486. return user;
  487. }
  488. boolean getMapSpeculativeExecution() {
  489. return hasSpeculativeMaps;
  490. }
  491. boolean getReduceSpeculativeExecution() {
  492. return hasSpeculativeReduces;
  493. }
  494. long getMemoryForMapTask() {
  495. return memoryPerMap;
  496. }
  497. long getMemoryForReduceTask() {
  498. return memoryPerReduce;
  499. }
  500. /**
  501. * Get the number of slots required to run a single map task-attempt.
  502. * @return the number of slots required to run a single map task-attempt
  503. */
  504. int getNumSlotsPerMap() {
  505. return numSlotsPerMap;
  506. }
  507. /**
  508. * Set the number of slots required to run a single map task-attempt.
  509. * This is typically set by schedulers which support high-ram jobs.
  510. * @param numSlotsPerMap the number of slots required to run a single map task-attempt
  511. */
  512. void setNumSlotsPerMap(int numSlotsPerMap) {
  513. this.numSlotsPerMap = numSlotsPerMap;
  514. }
  515. /**
  516. * Get the number of slots required to run a single reduce task-attempt.
  517. * @return the number of slots required to run a single reduce task-attempt
  518. */
  519. int getNumSlotsPerReduce() {
  520. return numSlotsPerReduce;
  521. }
  522. /**
  523. * Set the number of slots required to run a single reduce task-attempt.
  524. * This is typically set by schedulers which support high-ram jobs.
  525. * @param numSlotsPerReduce the number of slots required to run a single reduce
  526. * task-attempt
  527. */
  528. void setNumSlotsPerReduce(int numSlotsPerReduce) {
  529. this.numSlotsPerReduce = numSlotsPerReduce;
  530. }
  531. /**
  532. * Construct the splits, etc. This is invoked from an async
  533. * thread so that split-computation doesn't block anyone. Only the
  534. * {@link JobTracker} should invoke this api. Look
  535. * at {@link JobTracker#initJob(JobInProgress)} for more details.
  536. */
  537. public synchronized void initTasks()
  538. throws IOException, KillInterruptedException, UnknownHostException {
  539. if (tasksInited.get() || isComplete()) {
  540. return;
  541. }
  542. synchronized(jobInitKillStatus){
  543. if(jobInitKillStatus.killed || jobInitKillStatus.initStarted) {
  544. return;
  545. }
  546. jobInitKillStatus.initStarted = true;
  547. }
  548. LOG.info("Initializing " + jobId);
  549. logSubmissionToJobHistory();
  550. // log the job priority
  551. setPriority(this.priority);
  552. //
  553. // generate security keys needed by Tasks
  554. //
  555. generateAndStoreTokens();
  556. //
  557. // read input splits and create a map task per split
  558. //
  559. TaskSplitMetaInfo[] taskSplitMetaInfo = createSplits(jobId);
  560. numMapTasks = taskSplitMetaInfo.length;
  561. checkTaskLimits();
  562. // Sanity check the locations so we don't create/initialize unnecessary tasks
  563. for (TaskSplitMetaInfo split : taskSplitMetaInfo) {
  564. NetUtils.verifyHostnames(split.getLocations());
  565. }
  566. jobtracker.getInstrumentation().addWaitingMaps(getJobID(), numMapTasks);
  567. jobtracker.getInstrumentation().addWaitingReduces(getJobID(), numReduceTasks);
  568. createMapTasks(jobFile.toString(), taskSplitMetaInfo);
  569. if (numMapTasks > 0) {
  570. nonRunningMapCache = createCache(taskSplitMetaInfo,
  571. maxLevel);
  572. }
  573. // set the launch time
  574. this.launchTime = JobTracker.getClock().getTime();
  575. createReduceTasks(jobFile.toString());
  576. // Calculate the minimum number of maps to be complete before
  577. // we should start scheduling reduces
  578. completedMapsForReduceSlowstart =
  579. (int)Math.ceil(
  580. (conf.getFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART,
  581. DEFAULT_COMPLETED_MAPS_PERCENT_FOR_REDUCE_SLOWSTART) *
  582. numMapTasks));
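  // For example, with the default slowstart fraction of 0.05 and a job of
  // 200 maps, completedMapsForReduceSlowstart = ceil(0.05 * 200) = 10, i.e.
  // reduces are held back until 10 maps have completed.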
  583. initSetupCleanupTasks(jobFile.toString());
  584. synchronized(jobInitKillStatus){
  585. jobInitKillStatus.initDone = true;
  586. if(jobInitKillStatus.killed) {
  587. //setup not launched so directly terminate
  588. throw new KillInterruptedException("Job " + jobId + " killed in init");
  589. }
  590. }
  591. tasksInited.set(true);
  592. JobInitedEvent jie = new JobInitedEvent(
  593. profile.getJobID(), this.launchTime,
  594. numMapTasks, numReduceTasks,
  595. JobStatus.getJobRunState(JobStatus.PREP),
  596. false);
  597. jobHistory.logEvent(jie, jobId);
  598. // Log the number of map and reduce tasks
  599. LOG.info("Job " + jobId + " initialized successfully with " + numMapTasks
  600. + " map tasks and " + numReduceTasks + " reduce tasks.");
  601. }
  602. // Returns true if the job is empty (0 maps, 0 reduces and no setup/cleanup),
  603. // and false otherwise.
  604. synchronized boolean isJobEmpty() {
  605. return maps.length == 0 && reduces.length == 0 && !jobSetupCleanupNeeded;
  606. }
  607. synchronized boolean isSetupCleanupRequired() {
  608. return jobSetupCleanupNeeded;
  609. }
  610. // Should be called once the init is done. This will complete the job
  611. // because the job is empty (0 maps, 0 reduces and no setup-cleanup).
  612. synchronized void completeEmptyJob() {
  613. jobComplete();
  614. }
  615. synchronized void completeSetup() {
  616. setupComplete();
  617. }
  618. void logSubmissionToJobHistory() throws IOException {
  619. // log job info
  620. String username = conf.getUser();
  621. if (username == null) { username = ""; }
  622. String jobname = conf.getJobName();
  623. String jobQueueName = conf.getQueueName();
  624. setUpLocalizedJobConf(conf, jobId);
  625. jobHistory.setupEventWriter(jobId, conf);
  626. JobSubmittedEvent jse =
  627. new JobSubmittedEvent(jobId, jobname, username, this.startTime,
  628. jobFile.toString(), status.getJobACLs(), jobQueueName);
  629. jobHistory.logEvent(jse, jobId);
  630. }
  631. TaskSplitMetaInfo[] createSplits(org.apache.hadoop.mapreduce.JobID jobId)
  632. throws IOException {
  633. TaskSplitMetaInfo[] allTaskSplitMetaInfo =
  634. SplitMetaInfoReader.readSplitMetaInfo(jobId, fs, conf, jobSubmitDir);
  635. return allTaskSplitMetaInfo;
  636. }
  637. /**
  638. * If the number of tasks is greater than the configured value,
  639. * throw an exception that will fail job initialization.
  640. */
  641. void checkTaskLimits() throws IOException {
  642. int maxTasks = jobtracker.getMaxTasksPerJob();
  643. if (maxTasks > 0 && numMapTasks + numReduceTasks > maxTasks) {
  644. throw new IOException(
  645. "The number of tasks for this job " +
  646. (numMapTasks + numReduceTasks) +
  647. " exceeds the configured limit " + maxTasks);
  648. }
  649. }
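  // Illustrative example for checkTaskLimits() above: with a configured cap
  // of, say, 10,000 tasks per job, a job submitted with 9,000 maps and 2,000
  // reduces (11,000 total) fails initialization with the IOException; a cap
  // of zero or less disables the check.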
  650. synchronized void createMapTasks(String jobFile,
  651. TaskSplitMetaInfo[] splits) {
  652. maps = new TaskInProgress[numMapTasks];
  653. for(int i=0; i < numMapTasks; ++i) {
  654. inputLength += splits[i].getInputDataLength();
  655. maps[i] = new TaskInProgress(jobId, jobFile,
  656. splits[i],
  657. jobtracker, conf, this,
  658. i, numSlotsPerMap);
  659. }
  660. LOG.info("Input size for job " + jobId + " = " + inputLength
  661. + ". Number of splits = " + splits.length);
  662. }
  663. synchronized void createReduceTasks(String jobFile) {
  664. this.reduces = new TaskInProgress[numReduceTasks];
  665. for (int i = 0; i < numReduceTasks; i++) {
  666. reduces[i] = new TaskInProgress(jobId, jobFile,
  667. numMapTasks, i,
  668. jobtracker, conf,
  669. this, numSlotsPerReduce);
  670. nonRunningReduces.add(reduces[i]);
  671. }
  672. }
  673. synchronized void initSetupCleanupTasks(String jobFile) {
  674. if (!jobSetupCleanupNeeded) {
  675. LOG.info("Setup/Cleanup not needed for job " + jobId);
  676. // nothing to initialize
  677. return;
  678. }
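  // The setup/cleanup TIPs created below reuse partition ids just past the
  // regular ranges: numMapTasks and numMapTasks + 1 on the map side,
  // numReduceTasks and numReduceTasks + 1 on the reduce side, so they never
  // collide with the ids of ordinary map or reduce tasks.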
  679. // create two cleanup tips, one map and one reduce.
  680. cleanup = new TaskInProgress[2];
  681. // cleanup map tip. This map doesn't use any splits. Just assign an empty
  682. // split.
  683. TaskSplitMetaInfo emptySplit = JobSplit.EMPTY_TASK_SPLIT;
  684. cleanup[0] = new TaskInProgress(jobId, jobFile, emptySplit,
  685. jobtracker, conf, this, numMapTasks, 1);
  686. cleanup[0].setJobCleanupTask();
  687. // cleanup reduce tip.
  688. cleanup[1] = new TaskInProgress(jobId, jobFile, numMapTasks,
  689. numReduceTasks, jobtracker, conf, this, 1);
  690. cleanup[1].setJobCleanupTask();
  691. // create two setup tips, one map and one reduce.
  692. setup = new TaskInProgress[2];
  693. // setup map tip. This map doesn't use any split. Just assign an empty
  694. // split.
  695. setup[0] = new TaskInProgress(jobId, jobFile, emptySplit,
  696. jobtracker, conf, this, numMapTasks + 1, 1);
  697. setup[0].setJobSetupTask();
  698. // setup reduce tip.
  699. setup[1] = new TaskInProgress(jobId, jobFile, numMapTasks,
  700. numReduceTasks + 1, jobtracker, conf, this, 1);
  701. setup[1].setJobSetupTask();
  702. }
  703. void setupComplete() {
  704. status.setSetupProgress(1.0f);
  705. if (this.status.getRunState() == JobStatus.PREP) {
  706. changeStateTo(JobStatus.RUNNING);
  707. JobStatusChangedEvent jse =
  708. new JobStatusChangedEvent(profile.getJobID(),
  709. JobStatus.getJobRunState(JobStatus.RUNNING));
  710. jobHistory.logEvent(jse, profile.getJobID());
  711. }
  712. }
  713. /////////////////////////////////////////////////////
  714. // Accessors for the JobInProgress
  715. /////////////////////////////////////////////////////
  716. public JobProfile getProfile() {
  717. return profile;
  718. }
  719. public JobStatus getStatus() {
  720. return status;
  721. }
  722. public synchronized long getLaunchTime() {
  723. return launchTime;
  724. }
  725. Map<TaskType, Long> getFirstTaskLaunchTimes() {
  726. return firstTaskLaunchTimes;
  727. }
  728. public long getStartTime() {
  729. return startTime;
  730. }
  731. public long getFinishTime() {
  732. return finishTime;
  733. }
  734. public int desiredMaps() {
  735. return numMapTasks;
  736. }
  737. public synchronized int finishedMaps() {
  738. return finishedMapTasks;
  739. }
  740. public int desiredReduces() {
  741. return numReduceTasks;
  742. }
  743. public synchronized int runningMaps() {
  744. return runningMapTasks;
  745. }
  746. public synchronized int runningReduces() {
  747. return runningReduceTasks;
  748. }
  749. public synchronized int finishedReduces() {
  750. return finishedReduceTasks;
  751. }
  752. public synchronized int pendingMaps() {
  753. return numMapTasks - runningMapTasks - failedMapTIPs -
  754. finishedMapTasks + speculativeMapTasks;
  755. }
  756. public synchronized int pendingReduces() {
  757. return numReduceTasks - runningReduceTasks - failedReduceTIPs -
  758. finishedReduceTasks + speculativeReduceTasks;
  759. }
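  // Worked example for the arithmetic above: a job with 100 map TIPs, of
  // which 40 have finished and 2 have failed, and with 20 attempts currently
  // running (5 of them speculative), reports pendingMaps() =
  // 100 - 20 - 2 - 40 + 5 = 43. The speculative count is added back because
  // runningMapTasks counts every attempt, so a TIP running a speculative
  // attempt would otherwise be subtracted twice.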
  760. public int getNumSlotsPerTask(TaskType taskType) {
  761. if (taskType == TaskType.MAP) {
  762. return numSlotsPerMap;
  763. } else if (taskType == TaskType.REDUCE) {
  764. return numSlotsPerReduce;
  765. } else {
  766. return 1;
  767. }
  768. }
  769. public JobPriority getPriority() {
  770. return this.priority;
  771. }
  772. public void setPriority(JobPriority priority) {
  773. if(priority == null) {
  774. priority = JobPriority.NORMAL;
  775. }
  776. synchronized (this) {
  777. this.priority = priority;
  778. status.setJobPriority(priority);
  779. // log and change to the job's priority
  780. JobPriorityChangeEvent prEvent =
  781. new JobPriorityChangeEvent(jobId, priority);
  782. jobHistory.logEvent(prEvent, jobId);
  783. }
  784. }
  785. // Update the job start/launch time (upon restart) and log to history
  786. synchronized void updateJobInfo(long startTime, long launchTime) {
  787. // log and change to the job's start/launch time
  788. this.startTime = startTime;
  789. this.launchTime = launchTime;
  790. JobInfoChangeEvent event =
  791. new JobInfoChangeEvent(jobId, startTime, launchTime);
  792. jobHistory.logEvent(event, jobId);
  793. }
  794. /**
  795. * Get the number of times the job has restarted
  796. */
  797. int getNumRestarts() {
  798. return restartCount;
  799. }
  800. long getInputLength() {
  801. return inputLength;
  802. }
  803. boolean isCleanupLaunched() {
  804. return launchedCleanup;
  805. }
  806. boolean isSetupLaunched() {
  807. return launchedSetup;
  808. }
  809. /**
  810. * Get all the tasks of the desired type in this job.
  811. * @param type {@link TaskType} of the tasks required
  812. * @return An array of {@link TaskInProgress} matching the given type.
  813. * Returns an empty array if no tasks are found for the given type.
  814. */
  815. TaskInProgress[] getTasks(TaskType type) {
  816. TaskInProgress[] tasks = null;
  817. switch (type) {
  818. case MAP:
  819. {
  820. tasks = maps;
  821. }
  822. break;
  823. case REDUCE:
  824. {
  825. tasks = reduces;
  826. }
  827. break;
  828. case JOB_SETUP:
  829. {
  830. tasks = setup;
  831. }
  832. break;
  833. case JOB_CLEANUP:
  834. {
  835. tasks = cleanup;
  836. }
  837. break;
  838. default:
  839. {
  840. tasks = new TaskInProgress[0];
  841. }
  842. break;
  843. }
  844. return tasks;
  845. }
  846. /**
  847. * Return the nonLocalRunningMaps
  848. * @return the set of non-local running map TIPs
  849. */
  850. Set<TaskInProgress> getNonLocalRunningMaps()
  851. {
  852. return nonLocalRunningMaps;
  853. }
  854. /**
  855. * Return the runningMapCache
  856. * @return the map from topology Node to the set of running map TIPs on it
  857. */
  858. Map<Node, Set<TaskInProgress>> getRunningMapCache()
  859. {
  860. return runningMapCache;
  861. }
  862. /**
  863. * Return runningReduces
  864. * @return the set of running reduce TIPs
  865. */
  866. Set<TaskInProgress> getRunningReduces()
  867. {
  868. return runningReduces;
  869. }
  870. /**
  871. * Get the job configuration
  872. * @return the job's configuration
  873. */
  874. JobConf getJobConf() {
  875. return conf;
  876. }
  877. /**
  878. * Return a vector of map or reduce TaskInProgress objects, filtered by completion state
  879. */
  880. public synchronized Vector<TaskInProgress> reportTasksInProgress(boolean shouldBeMap,
  881. boolean shouldBeComplete) {
  882. Vector<TaskInProgress> results = new Vector<TaskInProgress>();
  883. TaskInProgress tips[] = null;
  884. if (shouldBeMap) {
  885. tips = maps;
  886. } else {
  887. tips = reduces;
  888. }
  889. for (int i = 0; i < tips.length; i++) {
  890. if (tips[i].isComplete() == shouldBeComplete) {
  891. results.add(tips[i]);
  892. }
  893. }
  894. return results;
  895. }
  896. /**
  897. * Return a vector of cleanup TaskInProgress objects
  898. */
  899. public synchronized Vector<TaskInProgress> reportCleanupTIPs(
  900. boolean shouldBeComplete) {
  901. Vector<TaskInProgress> results = new Vector<TaskInProgress>();
  902. for (int i = 0; i < cleanup.length; i++) {
  903. if (cleanup[i].isComplete() == shouldBeComplete) {
  904. results.add(cleanup[i]);
  905. }
  906. }
  907. return results;
  908. }
  909. /**
  910. * Return a vector of setup TaskInProgress objects
  911. */
  912. public synchronized Vector<TaskInProgress> reportSetupTIPs(
  913. boolean shouldBeComplete) {
  914. Vector<TaskInProgress> results = new Vector<TaskInProgress>();
  915. for (int i = 0; i < setup.length; i++) {
  916. if (setup[i].isComplete() == shouldBeComplete) {
  917. results.add(setup[i]);
  918. }
  919. }
  920. return results;
  921. }
  922. ////////////////////////////////////////////////////
  923. // Status update methods
  924. ////////////////////////////////////////////////////
  925. /**
  926. * Assuming {@link JobTracker} is locked on entry.
  927. */
  928. public synchronized void updateTaskStatus(TaskInProgress tip,
  929. TaskStatus status) {
  930. double oldProgress = tip.getProgress(); // save old progress
  931. boolean wasRunning = tip.isRunning();
  932. boolean wasComplete = tip.isComplete();
  933. boolean wasPending = tip.isOnlyCommitPending();
  934. TaskAttemptID taskid = status.getTaskID();
  935. boolean wasAttemptRunning = tip.isAttemptRunning(taskid);
  936. // If the TIP is already completed and the task reports as SUCCEEDED then
  937. // mark the task as KILLED.
  938. // In the case of a task with no promotion, the task tracker will mark
  939. // the task as SUCCEEDED.
  940. // User has requested to kill the task, but TT reported SUCCEEDED,
  941. // mark the task KILLED.
  942. if ((wasComplete || tip.wasKilled(taskid)) &&
  943. (status.getRunState() == TaskStatus.State.SUCCEEDED)) {
  944. status.setRunState(TaskStatus.State.KILLED);
  945. }
  946. // If the job is complete or task-cleanup is switched off
  947. // and a task has just reported its state as FAILED_UNCLEAN/KILLED_UNCLEAN,
  948. // make the task's state FAILED/KILLED without launching cleanup attempt.
  949. // Note that if task is already a cleanup attempt,
  950. // we don't change the state to make sure the task gets a killTaskAction
  951. if ((this.isComplete() || jobFailed || jobKilled || !taskCleanupNeeded) &&
  952. !tip.isCleanupAttempt(taskid)) {
  953. if (status.getRunState() == TaskStatus.State.FAILED_UNCLEAN) {
  954. status.setRunState(TaskStatus.State.FAILED);
  955. } else if (status.getRunState() == TaskStatus.State.KILLED_UNCLEAN) {
  956. status.setRunState(TaskStatus.State.KILLED);
  957. }
  958. }
  959. boolean change = tip.updateStatus(status);
  960. if (change) {
  961. TaskStatus.State state = status.getRunState();
  962. // get the TaskTrackerStatus where the task ran
  963. TaskTracker taskTracker =
  964. this.jobtracker.getTaskTracker(tip.machineWhereTaskRan(taskid));
  965. TaskTrackerStatus ttStatus =
  966. (taskTracker == null) ? null : taskTracker.getStatus();
  967. String taskTrackerHttpLocation = null;
  968. if (null != ttStatus){
  969. String host;
  970. if (NetUtils.getStaticResolution(ttStatus.getHost()) != null) {
  971. host = NetUtils.getStaticResolution(ttStatus.getHost());
  972. } else {
  973. host = ttStatus.getHost();
  974. }
  975. taskTrackerHttpLocation = "http://" + host + ":"
  976. + ttStatus.getHttpPort();
  977. }
  978. TaskCompletionEvent taskEvent = null;
  979. if (state == TaskStatus.State.SUCCEEDED) {
  980. taskEvent = new TaskCompletionEvent(
  981. taskCompletionEventTracker,
  982. taskid,
  983. tip.idWithinJob(),
  984. status.getIsMap() &&
  985. !tip.isJobCleanupTask() &&
  986. !tip.isJobSetupTask(),
  987. TaskCompletionEvent.Status.SUCCEEDED,
  988. taskTrackerHttpLocation
  989. );
  990. taskEvent.setTaskRunTime((int)(status.getFinishTime()
  991. - status.getStartTime()));
  992. tip.setSuccessEventNumber(taskCompletionEventTracker);
  993. } else if (state == TaskStatus.State.COMMIT_PENDING) {
  994. // If it is the first attempt reporting COMMIT_PENDING
  995. // ask the task to commit.
  996. if (!wasComplete && !wasPending) {
  997. tip.doCommit(taskid);
  998. }
  999. return;
  1000. } else if (state == TaskStatus.State.FAILED_UNCLEAN ||
  1001. state == TaskStatus.State.KILLED_UNCLEAN) {
  1002. tip.incompleteSubTask(taskid, this.status);
  1003. // add this task, to be rescheduled as cleanup attempt
  1004. if (tip.isMapTask()) {
  1005. mapCleanupTasks.add(taskid);
  1006. } else {
  1007. reduceCleanupTasks.add(taskid);
  1008. }
  1009. // Remove the task entry from jobtracker
  1010. jobtracker.removeTaskEntry(taskid);
  1011. }
  1012. // For a failed task, update the JT data structures.
  1013. else if (state == TaskStatus.State.FAILED ||
  1014. state == TaskStatus.State.KILLED) {
  1015. // Get the event number for the (possibly) previously successful
  1016. // task. If there exists one, then set that status to OBSOLETE
  1017. int eventNumber;
  1018. if ((eventNumber = tip.getSuccessEventNumber()) != -1) {
  1019. TaskCompletionEvent t =
  1020. this.taskCompletionEvents.get(eventNumber);
  1021. if (t.getTaskAttemptId().equals(taskid))
  1022. t.setTaskStatus(TaskCompletionEvent.Status.OBSOLETE);
  1023. }
  1024. // Tell the job to fail the relevant task
  1025. failedTask(tip, taskid, status, taskTracker,
  1026. wasRunning, wasComplete, wasAttemptRunning);
  1027. // Did the task failure lead to tip failure?
  1028. TaskCompletionEvent.Status taskCompletionStatus =
  1029. (state == TaskStatus.State.FAILED ) ?
  1030. TaskCompletionEvent.Status.FAILED :
  1031. TaskCompletionEvent.Status.KILLED;
  1032. if (tip.isFailed()) {
  1033. taskCompletionStatus = TaskCompletionEvent.Status.TIPFAILED;
  1034. }
  1035. taskEvent = new TaskCompletionEvent(taskCompletionEventTracker,
  1036. taskid,
  1037. tip.idWithinJob(),
  1038. status.getIsMap() &&
  1039. !tip.isJobCleanupTask() &&
  1040. !tip.isJobSetupTask(),
  1041. taskCompletionStatus,
  1042. taskTrackerHttpLocation
  1043. );
  1044. }
  1045. // Add the 'complete' task i.e. successful/failed
  1046. // It _is_ safe to add the TaskCompletionEvent.Status.SUCCEEDED
  1047. // *before* calling TIP.completedTask since:
1048. // a. One and only one task of a TIP is declared a SUCCESS; the
1049. // others (speculative attempts) are marked KILLED
  1050. // b. TIP.completedTask *does not* throw _any_ exception at all.
  1051. if (taskEvent != null) {
  1052. this.taskCompletionEvents.add(taskEvent);
  1053. taskCompletionEventTracker++;
  1054. JobTrackerStatistics.TaskTrackerStat ttStat = jobtracker.
  1055. getStatistics().getTaskTrackerStat(tip.machineWhereTaskRan(taskid));
  1056. if(ttStat != null) { // ttStat can be null in case of lost tracker
  1057. ttStat.incrTotalTasks();
  1058. }
  1059. if (state == TaskStatus.State.SUCCEEDED) {
  1060. completedTask(tip, status);
  1061. if(ttStat != null) {
  1062. ttStat.incrSucceededTasks();
  1063. }
  1064. }
  1065. }
  1066. }
  1067. //
  1068. // Update JobInProgress status
  1069. //
  1070. if (LOG.isDebugEnabled()) {
  1071. LOG.debug("Taking progress for " + tip.getTIPId() + " from " +
  1072. oldProgress + " to " + tip.getProgress());
  1073. }
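// The job-level map/reduce progress is the mean of the per-TIP progress
// values, so the per-TIP delta below is scaled by 1/maps.length
// (or 1/reduces.length).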
  1074. if (!tip.isJobCleanupTask() && !tip.isJobSetupTask()) {
  1075. double progressDelta = tip.getProgress() - oldProgress;
  1076. if (tip.isMapTask()) {
  1077. this.status.setMapProgress((float) (this.status.mapProgress() +
  1078. progressDelta / maps.length));
  1079. } else {
  1080. this.status.setReduceProgress((float) (this.status.reduceProgress() +
  1081. (progressDelta / reduces.length)));
  1082. }
  1083. }
  1084. }
  1085. /**
  1086. * Returns the job-level counters.
  1087. *
  1088. * @return the job-level counters.
  1089. */
  1090. public synchronized Counters getJobCounters() {
  1091. return jobCounters;
  1092. }
  1093. /**
  1094. * Returns map phase counters by summing over all map tasks in progress.
  1095. */
  1096. public synchronized Counters getMapCounters() {
  1097. return incrementTaskCounters(new Counters(), maps);
  1098. }
  1099. /**
1100. * Returns reduce phase counters by summing over all reduce tasks in progress.
  1101. */
  1102. public synchronized Counters getReduceCounters() {
  1103. return incrementTaskCounters(new Counters(), reduces);
  1104. }
  1105. /**
  1106. * Returns the total job counters, by adding together the job,
  1107. * the map and the reduce counters.
  1108. */
  1109. public Counters getCounters() {
  1110. Counters result = new Counters();
  1111. synchronized (this) {
  1112. result.incrAllCounters(getJobCounters());
  1113. }
  1114. // the counters of TIPs are not updated in place.
  1115. // hence read-only access is ok without any locks
  1116. incrementTaskCounters(result, maps);
  1117. return incrementTaskCounters(result, reduces);
  1118. }
  1119. /**
  1120. * Increments the counters with the counters from each task.
  1121. * @param counters the counters to increment
  1122. * @param tips the tasks to add in to counters
  1123. * @return counters the same object passed in as counters
  1124. */
  1125. private Counters incrementTaskCounters(Counters counters,
  1126. TaskInProgress[] tips) {
  1127. try {
  1128. for (TaskInProgress tip : tips) {
  1129. counters.incrAllCounters(tip.getCounters());
  1130. }
  1131. } catch (LimitExceededException e) {
  1132. // too many user counters/groups, leaving existing counters intact.
  1133. }
  1134. return counters;
  1135. }
  1136. /////////////////////////////////////////////////////
  1137. // Create/manage tasks
  1138. /////////////////////////////////////////////////////
  1139. /**
  1140. * Return a MapTask, if appropriate, to run on the given tasktracker
  1141. */
  1142. public synchronized Task obtainNewMapTask(TaskTrackerStatus tts,
  1143. int clusterSize,
  1144. int numUniqueHosts,
  1145. int maxCacheLevel
  1146. ) throws IOException {
  1147. if (status.getRunState() != JobStatus.RUNNING) {
  1148. LOG.info("Cannot create task split for " + profile.getJobID());
  1149. return null;
  1150. }
  1151. int target = findNewMapTask(tts, clusterSize, numUniqueHosts,
  1152. maxCacheLevel);
  1153. if (target == -1) {
  1154. return null;
  1155. }
  1156. Task result = maps[target].getTaskToRun(tts.getTrackerName());
  1157. if (result != null) {
  1158. addRunningTaskToTIP(maps[target], result.getTaskID(), tts, true);
  1159. }
  1160. return result;
  1161. }
  1162. /**
  1163. * Return a MapTask, if appropriate, to run on the given tasktracker
  1164. */
  1165. public synchronized Task obtainNewMapTask(TaskTrackerStatus tts,
  1166. int clusterSize,
  1167. int numUniqueHosts
  1168. ) throws IOException {
  1169. return obtainNewMapTask(tts, clusterSize, numUniqueHosts, anyCacheLevel);
  1170. }
  1171. /*
  1172. * Return task cleanup attempt if any, to run on a given tracker
  1173. */
  1174. public Task obtainTaskCleanupTask(TaskTrackerStatus tts,
  1175. boolean isMapSlot)
  1176. throws IOException {
  1177. if (!tasksInited.get()) {
  1178. return null;
  1179. }
  1180. synchronized (this) {
  1181. if (this.status.getRunState() != JobStatus.RUNNING ||
  1182. jobFailed || jobKilled) {
  1183. return null;
  1184. }
  1185. String taskTracker = tts.getTrackerName();
  1186. if (!shouldRunOnTaskTracker(taskTracker)) {
  1187. return null;
  1188. }
  1189. TaskAttemptID taskid = null;
  1190. TaskInProgress tip = null;
  1191. if (isMapSlot) {
  1192. if (!mapCleanupTasks.isEmpty()) {
  1193. taskid = mapCleanupTasks.remove(0);
  1194. tip = maps[taskid.getTaskID().getId()];
  1195. }
  1196. } else {
  1197. if (!reduceCleanupTasks.isEmpty()) {
  1198. taskid = reduceCleanupTasks.remove(0);
  1199. tip = reduces[taskid.getTaskID().getId()];
  1200. }
  1201. }
  1202. if (tip != null) {
  1203. return tip.addRunningTask(taskid, taskTracker, true);
  1204. }
  1205. return null;
  1206. }
  1207. }
  1208. public synchronized Task obtainNewLocalMapTask(TaskTrackerStatus tts,
  1209. int clusterSize,
  1210. int numUniqueHosts)
  1211. throws IOException {
  1212. if (!tasksInited.get()) {
  1213. LOG.info("Cannot create task split for " + profile.getJobID());
  1214. return null;
  1215. }
  1216. return obtainNewMapTask(tts, clusterSize, numUniqueHosts, maxLevel);
  1217. }
  1218. public synchronized Task obtainNewNonLocalMapTask(TaskTrackerStatus tts,
  1219. int clusterSize,
  1220. int numUniqueHosts)
  1221. throws IOException {
  1222. if (!tasksInited.get()) {
  1223. LOG.info("Cannot create task split for " + profile.getJobID());
  1224. return null;
  1225. }
  1226. return obtainNewMapTask(tts, clusterSize, numUniqueHosts,
  1227. NON_LOCAL_CACHE_LEVEL);
  1228. }
  1229. /**
  1230. * Return a CleanupTask, if appropriate, to run on the given tasktracker
  1231. *
  1232. */
  1233. public Task obtainJobCleanupTask(TaskTrackerStatus tts,
  1234. int clusterSize,
  1235. int numUniqueHosts,
  1236. boolean isMapSlot
  1237. ) throws IOException {
  1238. if(!tasksInited.get() || !jobSetupCleanupNeeded) {
  1239. return null;
  1240. }
  1241. synchronized(this) {
  1242. if (!canLaunchJobCleanupTask()) {
  1243. return null;
  1244. }
  1245. String taskTracker = tts.getTrackerName();
  1246. // Update the last-known clusterSize
  1247. this.clusterSize = clusterSize;
  1248. if (!shouldRunOnTaskTracker(taskTracker)) {
  1249. return null;
  1250. }
  1251. List<TaskInProgress> cleanupTaskList = new ArrayList<TaskInProgress>();
  1252. if (isMapSlot) {
  1253. cleanupTaskList.add(cleanup[0]);
  1254. } else {
  1255. cleanupTaskList.add(cleanup[1]);
  1256. }
  1257. TaskInProgress tip = findTaskFromList(cleanupTaskList,
  1258. tts, numUniqueHosts, false);
  1259. if (tip == null) {
  1260. return null;
  1261. }
  1262. // Now launch the cleanupTask
  1263. Task result = tip.getTaskToRun(tts.getTrackerName());
  1264. if (result != null) {
  1265. addRunningTaskToTIP(tip, result.getTaskID(), tts, true);
  1266. if (jobFailed) {
  1267. result.setJobCleanupTaskState(org.apache.hadoop.mapreduce.JobStatus
  1268. .State.FAILED);
  1269. } else if (jobKilled) {
  1270. result.setJobCleanupTaskState(org.apache.hadoop.mapreduce.JobStatus
  1271. .State.KILLED);
  1272. } else {
  1273. result.setJobCleanupTaskState(org.apache.hadoop.mapreduce
  1274. .JobStatus.State.SUCCEEDED);
  1275. }
  1276. }
  1277. return result;
  1278. }
  1279. }
  1280. /**
  1281. * Check whether cleanup task can be launched for the job.
  1282. *
1283. * The cleanup task can be launched once setup has finished and the cleanup
1284. * task has not already been launched, and either the job has been
1285. * killed/failed or all maps and reduces are complete.
1286. * @return true if the job cleanup task can be launched, false otherwise
  1287. */
  1288. private synchronized boolean canLaunchJobCleanupTask() {
  1289. // check if the job is running
  1290. if (status.getRunState() != JobStatus.RUNNING &&
  1291. status.getRunState() != JobStatus.PREP) {
  1292. return false;
  1293. }
1294. // check if the cleanup task has already been launched or if setup hasn't
1295. // finished yet. The latter check is useful when the number of maps is
1296. // zero.
  1297. if (launchedCleanup || !isSetupFinished()) {
  1298. return false;
  1299. }
  1300. // check if job has failed or killed
  1301. if (jobKilled || jobFailed) {
  1302. return true;
  1303. }
  1304. // Check if all maps and reducers have finished.
  1305. boolean launchCleanupTask =
  1306. ((finishedMapTasks + failedMapTIPs) == (numMapTasks));
  1307. if (launchCleanupTask) {
  1308. launchCleanupTask =
  1309. ((finishedReduceTasks + failedReduceTIPs) == numReduceTasks);
  1310. }
  1311. return launchCleanupTask;
  1312. }
  1313. /**
  1314. * Return a SetupTask, if appropriate, to run on the given tasktracker
  1315. *
  1316. */
  1317. public Task obtainJobSetupTask(TaskTrackerStatus tts,
  1318. int clusterSize,
  1319. int numUniqueHosts,
  1320. boolean isMapSlot
  1321. ) throws IOException {
  1322. if(!tasksInited.get() || !jobSetupCleanupNeeded) {
  1323. return null;
  1324. }
  1325. synchronized(this) {
  1326. if (!canLaunchSetupTask()) {
  1327. return null;
  1328. }
  1329. String taskTracker = tts.getTrackerName();
  1330. // Update the last-known clusterSize
  1331. this.clusterSize = clusterSize;
  1332. if (!shouldRunOnTaskTracker(taskTracker)) {
  1333. return null;
  1334. }
  1335. List<TaskInProgress> setupTaskList = new ArrayList<TaskInProgress>();
  1336. if (isMapSlot) {
  1337. setupTaskList.add(setup[0]);
  1338. } else {
  1339. setupTaskList.add(setup[1]);
  1340. }
  1341. TaskInProgress tip = findTaskFromList(setupTaskList,
  1342. tts, numUniqueHosts, false);
  1343. if (tip == null) {
  1344. return null;
  1345. }
  1346. // Now launch the setupTask
  1347. Task result = tip.getTaskToRun(tts.getTrackerName());
  1348. if (result != null) {
  1349. addRunningTaskToTIP(tip, result.getTaskID(), tts, true);
  1350. }
  1351. return result;
  1352. }
  1353. }
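// Reduce scheduling is deliberately delayed ("slow-start"): reduces are only
// handed out once completedMapsForReduceSlowstart maps have finished. That
// threshold is derived from the job's configured slow-start fraction
// (typically mapred.reduce.slowstart.completed.maps in this generation of
// MapReduce).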
  1354. public synchronized boolean scheduleReduces() {
  1355. return finishedMapTasks >= completedMapsForReduceSlowstart;
  1356. }
  1357. /**
  1358. * Check whether setup task can be launched for the job.
  1359. *
1360. * The setup task can be launched after the tasks are inited,
1361. * the job is in the PREP state,
1362. * it has not already been launched,
1363. * and the job has not been killed or failed.
1364. * @return true if the setup task can be launched, false otherwise
  1365. */
  1366. private synchronized boolean canLaunchSetupTask() {
  1367. return (tasksInited.get() && status.getRunState() == JobStatus.PREP &&
  1368. !launchedSetup && !jobKilled && !jobFailed);
  1369. }
  1370. /**
  1371. * Return a ReduceTask, if appropriate, to run on the given tasktracker.
  1372. * We don't have cache-sensitivity for reduce tasks, as they
  1373. * work on temporary MapRed files.
  1374. */
  1375. public synchronized Task obtainNewReduceTask(TaskTrackerStatus tts,
  1376. int clusterSize,
  1377. int numUniqueHosts
  1378. ) throws IOException {
  1379. if (status.getRunState() != JobStatus.RUNNING) {
  1380. LOG.info("Cannot create task split for " + profile.getJobID());
  1381. return null;
  1382. }
  1383. // Ensure we have sufficient map outputs ready to shuffle before
  1384. // scheduling reduces
  1385. if (!scheduleReduces()) {
  1386. return null;
  1387. }
  1388. int target = findNewReduceTask(tts, clusterSize, numUniqueHosts);
  1389. if (target == -1) {
  1390. return null;
  1391. }
  1392. Task result = reduces[target].getTaskToRun(tts.getTrackerName());
  1393. if (result != null) {
  1394. addRunningTaskToTIP(reduces[target], result.getTaskID(), tts, true);
  1395. }
  1396. return result;
  1397. }
1398. // Returns the (cache) level at which the two nodes match.
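// For example, two hosts on the same rack (/rack1/hostA, /rack1/hostB) meet at
// the rack node after one step, so they match at level 1; a node compared with
// itself matches at level 0; unrelated nodes fall through to this.maxLevel.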
  1399. private int getMatchingLevelForNodes(Node n1, Node n2) {
  1400. int count = 0;
  1401. do {
  1402. if (n1.equals(n2)) {
  1403. return count;
  1404. }
  1405. ++count;
  1406. n1 = n1.getParent();
  1407. n2 = n2.getParent();
  1408. } while (n1 != null);
  1409. return this.maxLevel;
  1410. }
  1411. /**
  1412. * Populate the data structures as a task is scheduled.
  1413. *
  1414. * Assuming {@link JobTracker} is locked on entry.
  1415. *
  1416. * @param tip The tip for which the task is added
  1417. * @param id The attempt-id for the task
  1418. * @param tts task-tracker status
  1419. * @param isScheduled Whether this task is scheduled from the JT or has
  1420. * joined back upon restart
  1421. */
  1422. synchronized void addRunningTaskToTIP(TaskInProgress tip, TaskAttemptID id,
  1423. TaskTrackerStatus tts,
  1424. boolean isScheduled) {
1425. // Make an entry in the tip if the attempt is not scheduled, i.e. externally
  1426. // added
  1427. if (!isScheduled) {
  1428. tip.addRunningTask(id, tts.getTrackerName());
  1429. }
  1430. final JobTrackerInstrumentation metrics = jobtracker.getInstrumentation();
  1431. // keeping the earlier ordering intact
  1432. TaskType name;
  1433. String splits = "";
  1434. Enum counter = null;
  1435. if (tip.isJobSetupTask()) {
  1436. launchedSetup = true;
  1437. name = TaskType.JOB_SETUP;
  1438. } else if (tip.isJobCleanupTask()) {
  1439. launchedCleanup = true;
  1440. name = TaskType.JOB_CLEANUP;
  1441. } else if (tip.isMapTask()) {
  1442. ++runningMapTasks;
  1443. name = TaskType.MAP;
  1444. counter = JobCounter.TOTAL_LAUNCHED_MAPS;
  1445. splits = tip.getSplitNodes();
  1446. if (tip.isSpeculating()) {
  1447. speculativeMapTasks++;
  1448. metrics.speculateMap(id);
  1449. if (LOG.isDebugEnabled()) {
  1450. LOG.debug("Chosen speculative task, current speculativeMap task count: "
  1451. + speculativeMapTasks);
  1452. }
  1453. }
  1454. metrics.launchMap(id);
  1455. } else {
  1456. ++runningReduceTasks;
  1457. name = TaskType.REDUCE;
  1458. counter = JobCounter.TOTAL_LAUNCHED_REDUCES;
  1459. if (tip.isSpeculating()) {
  1460. speculativeReduceTasks++;
  1461. metrics.speculateReduce(id);
  1462. if (LOG.isDebugEnabled()) {
  1463. LOG.debug("Chosen speculative task, current speculativeReduce task count: "
  1464. + speculativeReduceTasks);
  1465. }
  1466. }
  1467. metrics.launchReduce(id);
  1468. }
1469. // Note that the logs are for the scheduled tasks only. Tasks that join on
1470. // restart already have their logs in place.
  1471. if (tip.isFirstAttempt(id)) {
  1472. TaskStartedEvent tse = new TaskStartedEvent(tip.getTIPId(),
  1473. tip.getExecStartTime(),
  1474. name, splits);
  1475. jobHistory.logEvent(tse, tip.getJob().jobId);
  1476. setFirstTaskLaunchTime(tip);
  1477. }
  1478. if (!tip.isJobSetupTask() && !tip.isJobCleanupTask()) {
  1479. jobCounters.incrCounter(counter, 1);
  1480. }
1481. // TODO The only problem with these counters would be on restart.
1482. // The jobtracker updates the counter only when the task that is scheduled
1483. // is from a non-running tip and is local (data, rack ...). But upon restart,
  1484. // as the reports come from the task tracker, there is no good way to infer
  1485. // when exactly to increment the locality counters. The only solution is to
  1486. // increment the counters for all the tasks irrespective of
  1487. // - whether the tip is running or not
1488. // - whether it's a speculative task or not
  1489. //
  1490. // So to simplify, increment the data locality counter whenever there is
  1491. // data locality.
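// Locality levels as used below: 0 = data-local (split on this node),
// 1 = rack-local, any other level below maxLevel = some other cached locality
// (OTHER_LOCAL_MAPS), and maxLevel = no locality at all.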
  1492. if (tip.isMapTask() && !tip.isJobSetupTask() && !tip.isJobCleanupTask()) {
  1493. // increment the data locality counter for maps
  1494. int level = getLocalityLevel(tip, tts);
  1495. switch (level) {
  1496. case 0 :
  1497. LOG.info("Choosing data-local task " + tip.getTIPId());
  1498. jobCounters.incrCounter(JobCounter.DATA_LOCAL_MAPS, 1);
  1499. metrics.launchDataLocalMap(id);
  1500. break;
  1501. case 1:
  1502. LOG.info("Choosing rack-local task " + tip.getTIPId());
  1503. jobCounters.incrCounter(JobCounter.RACK_LOCAL_MAPS, 1);
  1504. metrics.launchRackLocalMap(id);
  1505. break;
  1506. default :
  1507. // check if there is any locality
  1508. if (level != this.maxLevel) {
  1509. LOG.info("Choosing cached task at level " + level + tip.getTIPId());
  1510. jobCounters.incrCounter(JobCounter.OTHER_LOCAL_MAPS, 1);
  1511. }
  1512. break;
  1513. }
  1514. }
  1515. }
  1516. void setFirstTaskLaunchTime(TaskInProgress tip) {
  1517. TaskType key = getTaskType(tip);
  1518. synchronized(firstTaskLaunchTimes) {
  1519. // Could be optimized to do only one lookup with a little more code
  1520. if (!firstTaskLaunchTimes.containsKey(key)) {
  1521. firstTaskLaunchTimes.put(key, tip.getExecStartTime());
  1522. }
  1523. }
  1524. }
  1525. public static String convertTrackerNameToHostName(String trackerName) {
  1526. // Ugly!
1527. // Convert the trackerName to its host name
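// e.g. a tracker name of the (assumed) form "tracker_host1.example.com:50060"
// becomes "host1.example.com".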
  1528. int indexOfColon = trackerName.indexOf(":");
  1529. String trackerHostName = (indexOfColon == -1) ?
  1530. trackerName :
  1531. trackerName.substring(0, indexOfColon);
  1532. return trackerHostName.substring("tracker_".length());
  1533. }
  1534. /**
1535. * Note that a task has failed on a given tracker and add the tracker
1536. * to the blacklist iff fewer than (clusterSize * CLUSTER_BLACKLIST_PERCENT)
1537. * trackers in the cluster have already turned 'flaky'.
1538. *
1539. * @param trackerName name of the task-tracker on which a task failed
1540. * @param taskTracker the task-tracker on which a task failed
  1540. */
  1541. synchronized void addTrackerTaskFailure(String trackerName,
  1542. TaskTracker taskTracker) {
  1543. if (flakyTaskTrackers < (clusterSize * CLUSTER_BLACKLIST_PERCENT)) {
  1544. String trackerHostName = convertTrackerNameToHostName(trackerName);
  1545. Integer trackerFailures = trackerToFailuresMap.get(trackerHostName);
  1546. if (trackerFailures == null) {
  1547. trackerFailures = 0;
  1548. }
  1549. trackerToFailuresMap.put(trackerHostName, ++trackerFailures);
  1550. // Check if this tasktracker has turned 'flaky'
  1551. if (trackerFailures.intValue() == maxTaskFailuresPerTracker) {
  1552. ++flakyTaskTrackers;
  1553. // Cancel reservations if appropriate
  1554. if (taskTracker != null) {
  1555. if (trackersReservedForMaps.containsKey(taskTracker)) {
  1556. taskTracker.unreserveSlots(TaskType.MAP, this);
  1557. }
  1558. if (trackersReservedForReduces.containsKey(taskTracker)) {
  1559. taskTracker.unreserveSlots(TaskType.REDUCE, this);
  1560. }
  1561. }
  1562. LOG.info("TaskTracker at '" + trackerHostName + "' turned 'flaky'");
  1563. }
  1564. }
  1565. }
  1566. public synchronized void reserveTaskTracker(TaskTracker taskTracker,
  1567. TaskType type, int numSlots) {
  1568. Map<TaskTracker, FallowSlotInfo> map =
  1569. (type == TaskType.MAP) ? trackersReservedForMaps : trackersReservedForReduces;
  1570. long now = System.currentTimeMillis();
  1571. FallowSlotInfo info = map.get(taskTracker);
  1572. int reservedSlots = 0;
  1573. if (info == null) {
  1574. info = new FallowSlotInfo(now, numSlots);
  1575. reservedSlots = numSlots;
  1576. } else {
  1577. // Increment metering info if the reservation is changing
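// 'Fallow' slots are slots reserved for this job but not yet running a task;
// the counter below accumulates slot-milliseconds, i.e. (time elapsed since
// the reservation was made or last updated) * (number of reserved slots).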
  1578. if (info.getNumSlots() != numSlots) {
  1579. Enum<JobCounter> counter =
  1580. (type == TaskType.MAP) ?
  1581. JobCounter.FALLOW_SLOTS_MILLIS_MAPS :
  1582. JobCounter.FALLOW_SLOTS_MILLIS_REDUCES;
  1583. long fallowSlotMillis = (now - info.getTimestamp()) * info.getNumSlots();
  1584. jobCounters.incrCounter(counter, fallowSlotMillis);
  1585. // Update
  1586. reservedSlots = numSlots - info.getNumSlots();
  1587. info.setTimestamp(now);
  1588. info.setNumSlots(numSlots);
  1589. }
  1590. }
  1591. map.put(taskTracker, info);
  1592. if (type == TaskType.MAP) {
  1593. jobtracker.getInstrumentation().addReservedMapSlots(reservedSlots);
  1594. }
  1595. else {
  1596. jobtracker.getInstrumentation().addReservedReduceSlots(reservedSlots);
  1597. }
  1598. jobtracker.incrementReservations(type, reservedSlots);
  1599. }
  1600. public synchronized void unreserveTaskTracker(TaskTracker taskTracker,
  1601. TaskType type) {
  1602. Map<TaskTracker, FallowSlotInfo> map =
  1603. (type == TaskType.MAP) ? trackersReservedForMaps :
  1604. trackersReservedForReduces;
  1605. FallowSlotInfo info = map.get(taskTracker);
  1606. if (info == null) {
  1607. LOG.warn("Cannot find information about fallow slots for " +
  1608. taskTracker.getTrackerName());
  1609. return;
  1610. }
  1611. long now = System.currentTimeMillis();
  1612. Enum<JobCounter> counter =
  1613. (type == TaskType.MAP) ?
  1614. JobCounter.FALLOW_SLOTS_MILLIS_MAPS :
  1615. JobCounter.FALLOW_SLOTS_MILLIS_REDUCES;
  1616. long fallowSlotMillis = (now - info.getTimestamp()) * info.getNumSlots();
  1617. jobCounters.incrCounter(counter, fallowSlotMillis);
  1618. map.remove(taskTracker);
  1619. if (type == TaskType.MAP) {
  1620. jobtracker.getInstrumentation().decReservedMapSlots(info.getNumSlots());
  1621. }
  1622. else {
  1623. jobtracker.getInstrumentation().decReservedReduceSlots(
  1624. info.getNumSlots());
  1625. }
  1626. jobtracker.decrementReservations(type, info.getNumSlots());
  1627. }
  1628. public int getNumReservedTaskTrackersForMaps() {
  1629. return trackersReservedForMaps.size();
  1630. }
  1631. public int getNumReservedTaskTrackersForReduces() {
  1632. return trackersReservedForReduces.size();
  1633. }
  1634. private int getTrackerTaskFailures(String trackerName) {
  1635. String trackerHostName = convertTrackerNameToHostName(trackerName);
  1636. Integer failedTasks = trackerToFailuresMap.get(trackerHostName);
  1637. return (failedTasks != null) ? failedTasks.intValue() : 0;
  1638. }
  1639. /**
  1640. * Get the black listed trackers for the job
  1641. *
  1642. * @return List of blacklisted tracker names
  1643. */
  1644. List<String> getBlackListedTrackers() {
  1645. List<String> blackListedTrackers = new ArrayList<String>();
  1646. for (Map.Entry<String,Integer> e : trackerToFailuresMap.entrySet()) {
  1647. if (e.getValue().intValue() >= maxTaskFailuresPerTracker) {
  1648. blackListedTrackers.add(e.getKey());
  1649. }
  1650. }
  1651. return blackListedTrackers;
  1652. }
  1653. /**
  1654. * Get the no. of 'flaky' tasktrackers for a given job.
  1655. *
  1656. * @return the no. of 'flaky' tasktrackers for a given job.
  1657. */
  1658. int getNoOfBlackListedTrackers() {
  1659. return flakyTaskTrackers;
  1660. }
  1661. /**
  1662. * Get the information on tasktrackers and no. of errors which occurred
  1663. * on them for a given job.
  1664. *
  1665. * @return the map of tasktrackers and no. of errors which occurred
  1666. * on them for a given job.
  1667. */
  1668. synchronized Map<String, Integer> getTaskTrackerErrors() {
  1669. // Clone the 'trackerToFailuresMap' and return the copy
  1670. Map<String, Integer> trackerErrors =
  1671. new TreeMap<String, Integer>(trackerToFailuresMap);
  1672. return trackerErrors;
  1673. }
  1674. /**
  1675. * Remove a map TIP from the lists for running maps.
  1676. * Called when a map fails/completes (note if a map is killed,
  1677. * it won't be present in the list since it was completed earlier)
  1678. * @param tip the tip that needs to be retired
  1679. */
  1680. private synchronized void retireMap(TaskInProgress tip) {
  1681. if (runningMapCache == null) {
  1682. LOG.warn("Running cache for maps missing!! "
  1683. + "Job details are missing.");
  1684. return;
  1685. }
  1686. String[] splitLocations = tip.getSplitLocations();
  1687. // Remove the TIP from the list for running non-local maps
  1688. if (splitLocations == null || splitLocations.length == 0) {
  1689. nonLocalRunningMaps.remove(tip);
  1690. return;
  1691. }
  1692. // Remove from the running map caches
  1693. for(String host: splitLocations) {
  1694. Node node = jobtracker.getNode(host);
  1695. for (int j = 0; j < maxLevel; ++j) {
  1696. Set<TaskInProgress> hostMaps = runningMapCache.get(node);
  1697. if (hostMaps != null) {
  1698. hostMaps.remove(tip);
  1699. if (hostMaps.size() == 0) {
  1700. runningMapCache.remove(node);
  1701. }
  1702. }
  1703. node = node.getParent();
  1704. }
  1705. }
  1706. }
  1707. /**
  1708. * Remove a reduce TIP from the list for running-reduces
  1709. * Called when a reduce fails/completes
  1710. * @param tip the tip that needs to be retired
  1711. */
  1712. private synchronized void retireReduce(TaskInProgress tip) {
  1713. if (runningReduces == null) {
  1714. LOG.warn("Running list for reducers missing!! "
  1715. + "Job details are missing.");
  1716. return;
  1717. }
  1718. runningReduces.remove(tip);
  1719. }
  1720. /**
  1721. * Adds a map tip to the list of running maps.
  1722. * @param tip the tip that needs to be scheduled as running
  1723. */
  1724. protected synchronized void scheduleMap(TaskInProgress tip) {
  1725. runningMapTaskStats.add(0.0f);
  1726. if (runningMapCache == null) {
  1727. LOG.warn("Running cache for maps is missing!! "
  1728. + "Job details are missing.");
  1729. return;
  1730. }
  1731. String[] splitLocations = tip.getSplitLocations();
  1732. // Add the TIP to the list of non-local running TIPs
  1733. if (splitLocations == null || splitLocations.length == 0) {
  1734. nonLocalRunningMaps.add(tip);
  1735. return;
  1736. }
  1737. for(String host: splitLocations) {
  1738. Node node = jobtracker.getNode(host);
  1739. for (int j = 0; j < maxLevel; ++j) {
  1740. Set<TaskInProgress> hostMaps = runningMapCache.get(node);
  1741. if (hostMaps == null) {
  1742. // create a cache if needed
  1743. hostMaps = new LinkedHashSet<TaskInProgress>();
  1744. runningMapCache.put(node, hostMaps);
  1745. }
  1746. hostMaps.add(tip);
  1747. node = node.getParent();
  1748. }
  1749. }
  1750. }
  1751. /**
  1752. * Adds a reduce tip to the list of running reduces
  1753. * @param tip the tip that needs to be scheduled as running
  1754. */
  1755. protected synchronized void scheduleReduce(TaskInProgress tip) {
  1756. runningReduceTaskStats.add(0.0f);
  1757. if (runningReduces == null) {
  1758. LOG.warn("Running cache for reducers missing!! "
  1759. + "Job details are missing.");
  1760. return;
  1761. }
  1762. runningReduces.add(tip);
  1763. }
  1764. /**
  1765. * Adds the failed TIP in the front of the list for non-running maps
  1766. * @param tip the tip that needs to be failed
  1767. */
  1768. private synchronized void failMap(TaskInProgress tip) {
  1769. if (nonRunningMapCache == null) {
  1770. LOG.warn("Non-running cache for maps missing!! "
  1771. + "Job details are missing.");
  1772. return;
  1773. }
1774. // 1. It's added everywhere since other nodes (having this split local)
  1775. // might have removed this tip from their local cache
  1776. // 2. Give high priority to failed tip - fail early
  1777. String[] splitLocations = tip.getSplitLocations();
  1778. // Add the TIP in the front of the list for non-local non-running maps
  1779. if (splitLocations.length == 0) {
  1780. nonLocalMaps.add(0, tip);
  1781. return;
  1782. }
  1783. for(String host: splitLocations) {
  1784. Node node = jobtracker.getNode(host);
  1785. for (int j = 0; j < maxLevel; ++j) {
  1786. List<TaskInProgress> hostMaps = nonRunningMapCache.get(node);
  1787. if (hostMaps == null) {
  1788. hostMaps = new LinkedList<TaskInProgress>();
  1789. nonRunningMapCache.put(node, hostMaps);
  1790. }
  1791. hostMaps.add(0, tip);
  1792. node = node.getParent();
  1793. }
  1794. }
  1795. }
  1796. /**
  1797. * Adds a failed TIP in the front of the list for non-running reduces
  1798. * @param tip the tip that needs to be failed
  1799. */
  1800. private synchronized void failReduce(TaskInProgress tip) {
  1801. if (nonRunningReduces == null) {
  1802. LOG.warn("Failed cache for reducers missing!! "
  1803. + "Job details are missing.");
  1804. return;
  1805. }
  1806. nonRunningReduces.add(0, tip);
  1807. }
  1808. /**
  1809. * Find a non-running task in the passed list of TIPs
  1810. * @param tips a collection of TIPs
  1811. * @param ttStatus the status of tracker that has requested a task to run
1812. * @param numUniqueHosts number of unique hosts that run task trackers
  1813. * @param removeFailedTip whether to remove the failed tips
  1814. */
  1815. private synchronized TaskInProgress findTaskFromList(
  1816. Collection<TaskInProgress> tips, TaskTrackerStatus ttStatus,
  1817. int numUniqueHosts,
  1818. boolean removeFailedTip) {
  1819. Iterator<TaskInProgress> iter = tips.iterator();
  1820. while (iter.hasNext()) {
  1821. TaskInProgress tip = iter.next();
  1822. // Select a tip if
  1823. // 1. runnable : still needs to be run and is not completed
  1824. // 2. ~running : no other node is running it
  1825. // 3. earlier attempt failed : has not failed on this host
  1826. // and has failed on all the other hosts
  1827. // A TIP is removed from the list if
  1828. // (1) this tip is scheduled
  1829. // (2) if the passed list is a level 0 (host) cache
  1830. // (3) when the TIP is non-schedulable (running, killed, complete)
  1831. if (tip.isRunnable() && !tip.isRunning()) {
  1832. // check if the tip has failed on this host
  1833. if (!tip.hasFailedOnMachine(ttStatus.getHost()) ||
  1834. tip.getNumberOfFailedMachines() >= numUniqueHosts) {
  1835. // check if the tip has failed on all the nodes
  1836. iter.remove();
  1837. return tip;
  1838. } else if (removeFailedTip) {
  1839. // the case where we want to remove a failed tip from the host cache
  1840. // point#3 in the TIP removal logic above
  1841. iter.remove();
  1842. }
  1843. } else {
  1844. // see point#3 in the comment above for TIP removal logic
  1845. iter.remove();
  1846. }
  1847. }
  1848. return null;
  1849. }
  1850. public boolean hasSpeculativeMaps() {
  1851. return hasSpeculativeMaps;
  1852. }
  1853. public boolean hasSpeculativeReduces() {
  1854. return hasSpeculativeReduces;
  1855. }
  1856. /**
  1857. * Retrieve a task for speculation.
1858. * If a task slot becomes available and there are fewer than SpeculativeCap
1859. * speculative tasks running:
1860. * 1) Ignore the request if the TT's progressRate is < SlowNodeThreshold
1861. * 2) Choose candidate tasks - those tasks whose progress rate is below
1862. * slowTaskThreshold * mean(progress-rates)
1863. * 3) Speculate the task that's expected to complete last
  1864. * @param list pool of tasks to choose from
  1865. * @param taskTrackerName the name of the TaskTracker asking for a task
  1866. * @param taskTrackerHost the hostname of the TaskTracker asking for a task
  1867. * @param taskType the type of task (MAP/REDUCE) that we are considering
  1868. * @return the TIP to speculatively re-execute
  1869. */
  1870. protected synchronized TaskInProgress findSpeculativeTask(
  1871. Collection<TaskInProgress> list, String taskTrackerName,
  1872. String taskTrackerHost, TaskType taskType) {
  1873. if (list.isEmpty()) {
  1874. return null;
  1875. }
  1876. long now = JobTracker.getClock().getTime();
  1877. // Don't return anything if either the TaskTracker is slow or we have
  1878. // already launched enough speculative tasks in the cluster.
  1879. if (isSlowTracker(taskTrackerName) || atSpeculativeCap(list, taskType)) {
  1880. return null;
  1881. }
  1882. TaskInProgress slowestTIP = null;
  1883. Comparator<TaskInProgress> LateComparator =
  1884. new EstimatedTimeLeftComparator(now);
  1885. Iterator<TaskInProgress> iter = list.iterator();
  1886. while (iter.hasNext()) {
  1887. TaskInProgress tip = iter.next();
  1888. // If this tip has already run on this machine once or it doesn't need any
  1889. // more speculative attempts, skip it.
  1890. if (tip.hasRunOnMachine(taskTrackerHost, taskTrackerName) ||
  1891. !tip.canBeSpeculated(now)) {
  1892. continue;
  1893. }
  1894. if (slowestTIP == null) {
  1895. slowestTIP = tip;
  1896. } else {
  1897. slowestTIP =
  1898. LateComparator.compare(tip, slowestTIP) < 0 ? tip : slowestTIP;
  1899. }
  1900. }
  1901. if (slowestTIP != null) {
  1902. if (LOG.isDebugEnabled()) {
  1903. LOG.debug("Chose task " + slowestTIP.getTIPId() + ". Statistics: Task's : " +
  1904. slowestTIP.getCurrentProgressRate(now) + " Job's : " +
  1905. (slowestTIP.isMapTask() ? runningMapTaskStats : runningReduceTaskStats));
  1906. }
  1907. }
  1908. return slowestTIP;
  1909. }
  1910. /**
  1911. * Find new map task
  1912. * @param tts The task tracker that is asking for a task
  1913. * @param clusterSize The number of task trackers in the cluster
  1914. * @param numUniqueHosts The number of hosts that run task trackers
  1915. * @param maxCacheLevel The maximum topology level until which to schedule
  1916. * maps.
  1917. * A value of {@link #anyCacheLevel} implies any
  1918. * available task (node-local, rack-local, off-switch and
  1919. * speculative tasks).
  1920. * A value of {@link #NON_LOCAL_CACHE_LEVEL} implies only
  1921. * off-switch/speculative tasks should be scheduled.
  1922. * @return the index in tasks of the selected task (or -1 for no task)
  1923. */
  1924. private synchronized int findNewMapTask(final TaskTrackerStatus tts,
  1925. final int clusterSize,
  1926. final int numUniqueHosts,
  1927. final int maxCacheLevel) {
  1928. String taskTrackerName = tts.getTrackerName();
  1929. String taskTrackerHost = tts.getHost();
  1930. if (numMapTasks == 0) {
  1931. if(LOG.isDebugEnabled()) {
  1932. LOG.debug("No maps to schedule for " + profile.getJobID());
  1933. }
  1934. return -1;
  1935. }
  1936. TaskInProgress tip = null;
  1937. //
  1938. // Update the last-known clusterSize
  1939. //
  1940. this.clusterSize = clusterSize;
  1941. if (!shouldRunOnTaskTracker(taskTrackerName)) {
  1942. return -1;
  1943. }
  1944. // Check to ensure this TaskTracker has enough resources to
  1945. // run tasks from this job
  1946. long outSize = resourceEstimator.getEstimatedMapOutputSize();
  1947. long availSpace = tts.getResourceStatus().getAvailableSpace();
  1948. if(availSpace < outSize) {
  1949. LOG.warn("No room for map task. Node " + tts.getHost() +
  1950. " has " + availSpace +
  1951. " bytes free; but we expect map to take " + outSize);
  1952. return -1; //see if a different TIP might work better.
  1953. }
  1954. // For scheduling a map task, we have two caches and a list (optional)
1955. // I) one for non-running tasks
1956. // II) one for running tasks (this is for handling speculation)
  1957. // III) a list of TIPs that have empty locations (e.g., dummy splits),
  1958. // the list is empty if all TIPs have associated locations
  1959. // First a look up is done on the non-running cache and on a miss, a look
  1960. // up is done on the running cache. The order for lookup within the cache:
  1961. // 1. from local node to root [bottom up]
  1962. // 2. breadth wise for all the parent nodes at max level
1963. // We fall back to a linear scan of the list (III above) if we have misses in the
  1964. // above caches
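// For example, for a tracker on /rack1/host1 the non-running cache is probed
// with the host node first (level 0), then the rack node (level 1), and so on,
// before falling back to the breadth-wise scan across max-level nodes and
// finally the non-local list.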
  1965. Node node = jobtracker.getNode(tts.getHost());
  1966. //
  1967. // I) Non-running TIP :
  1968. //
  1969. // 1. check from local node to the root [bottom up cache lookup]
  1970. // i.e if the cache is available and the host has been resolved
  1971. // (node!=null)
  1972. if (node != null) {
  1973. Node key = node;
  1974. int level = 0;
1975. // maxCacheLevel might be greater than this.maxLevel if findNewMapTask is
1976. // called to schedule any task (node-local, rack-local, off-switch or
1977. // speculative), or it might be NON_LOCAL_CACHE_LEVEL (i.e. -1) if
1978. // findNewMapTask should only schedule off-switch/speculative tasks.
  1980. int maxLevelToSchedule = Math.min(maxCacheLevel, maxLevel);
  1981. for (level = 0;level < maxLevelToSchedule; ++level) {
  1982. List <TaskInProgress> cacheForLevel = nonRunningMapCache.get(key);
  1983. if (cacheForLevel != null) {
  1984. tip = findTaskFromList(cacheForLevel, tts,
  1985. numUniqueHosts,level == 0);
  1986. if (tip != null) {
  1987. // Add to running cache
  1988. scheduleMap(tip);
  1989. // remove the cache if its empty
  1990. if (cacheForLevel.size() == 0) {
  1991. nonRunningMapCache.remove(key);
  1992. }
  1993. return tip.getIdWithinJob();
  1994. }
  1995. }
  1996. key = key.getParent();
  1997. }
  1998. // Check if we need to only schedule a local task (node-local/rack-local)
  1999. if (level == maxCacheLevel) {
  2000. return -1;
  2001. }
  2002. }
  2003. //2. Search breadth-wise across parents at max level for non-running
  2004. // TIP if
  2005. // - cache exists and there is a cache miss
  2006. // - node information for the tracker is missing (tracker's topology
  2007. // info not obtained yet)
  2008. // collection of node at max level in the cache structure
  2009. Collection<Node> nodesAtMaxLevel = jobtracker.getNodesAtMaxLevel();
  2010. // get the node parent at max level
  2011. Node nodeParentAtMaxLevel =
  2012. (node == null) ? null : JobTracker.getParentNode(node, maxLevel - 1);
  2013. for (Node parent : nodesAtMaxLevel) {
  2014. // skip the parent that has already been scanned
  2015. if (parent == nodeParentAtMaxLevel) {
  2016. continue;
  2017. }
  2018. List<TaskInProgress> cache = nonRunningMapCache.get(parent);
  2019. if (cache != null) {
  2020. tip = findTaskFromList(cache, tts, numUniqueHosts, false);
  2021. if (tip != null) {
  2022. // Add to the running cache
  2023. scheduleMap(tip);
  2024. // remove the cache if empty
  2025. if (cache.size() == 0) {
  2026. nonRunningMapCache.remove(parent);
  2027. }
  2028. LOG.info("Choosing a non-local task " + tip.getTIPId());
  2029. return tip.getIdWithinJob();
  2030. }
  2031. }
  2032. }
  2033. // 3. Search non-local tips for a new task
  2034. tip = findTaskFromList(nonLocalMaps, tts, numUniqueHosts, false);
  2035. if (tip != null) {
  2036. // Add to the running list
  2037. scheduleMap(tip);
  2038. LOG.info("Choosing a non-local task " + tip.getTIPId());
  2039. return tip.getIdWithinJob();
  2040. }
  2041. //
  2042. // II) Running TIP :
  2043. //
  2044. if (hasSpeculativeMaps) {
  2045. tip = getSpeculativeMap(taskTrackerName, taskTrackerHost);
  2046. if (tip != null) {
  2047. return tip.getIdWithinJob();
  2048. }
  2049. }
  2050. return -1;
  2051. }
  2052. private synchronized TaskInProgress getSpeculativeMap(String taskTrackerName,
  2053. String taskTrackerHost) {
  2054. //////// Populate allTips with all TaskInProgress
  2055. Set<TaskInProgress> allTips = new HashSet<TaskInProgress>();
  2056. // collection of node at max level in the cache structure
  2057. Collection<Node> nodesAtMaxLevel = jobtracker.getNodesAtMaxLevel();
  2058. // Add all tasks from max-level nodes breadth-wise
  2059. for (Node parent : nodesAtMaxLevel) {
  2060. Set<TaskInProgress> cache = runningMapCache.get(parent);
  2061. if (cache != null) {
  2062. allTips.addAll(cache);
  2063. }
  2064. }
  2065. // Add all non-local TIPs
  2066. allTips.addAll(nonLocalRunningMaps);
  2067. ///////// Select a TIP to run on
  2068. TaskInProgress tip = findSpeculativeTask(allTips, taskTrackerName,
  2069. taskTrackerHost, TaskType.MAP);
  2070. if (tip != null) {
  2071. LOG.info("Choosing map task " + tip.getTIPId() +
  2072. " for speculative execution");
  2073. } else {
  2074. if (LOG.isDebugEnabled()) {
  2075. LOG.debug("No speculative map task found for tracker " + taskTrackerName);
  2076. }
  2077. }
  2078. return tip;
  2079. }
  2080. /**
  2081. * Find new reduce task
  2082. * @param tts The task tracker that is asking for a task
  2083. * @param clusterSize The number of task trackers in the cluster
  2084. * @param numUniqueHosts The number of hosts that run task trackers
  2085. * @return the index in tasks of the selected task (or -1 for no task)
  2086. */
  2087. private synchronized int findNewReduceTask(TaskTrackerStatus tts,
  2088. int clusterSize,
  2089. int numUniqueHosts) {
  2090. String taskTrackerName = tts.getTrackerName();
  2091. String taskTrackerHost = tts.getHost();
  2092. if (numReduceTasks == 0) {
  2093. if(LOG.isDebugEnabled()) {
  2094. LOG.debug("No reduces to schedule for " + profile.getJobID());
  2095. }
  2096. return -1;
  2097. }
  2098. TaskInProgress tip = null;
  2099. // Update the last-known clusterSize
  2100. this.clusterSize = clusterSize;
  2101. if (!shouldRunOnTaskTracker(taskTrackerName)) {
  2102. return -1;
  2103. }
  2104. long outSize = resourceEstimator.getEstimatedReduceInputSize();
  2105. long availSpace = tts.getResourceStatus().getAvailableSpace();
  2106. if(availSpace < outSize) {
  2107. LOG.warn("No room for reduce task. Node " + taskTrackerName + " has " +
  2108. availSpace +
  2109. " bytes free; but we expect reduce input to take " + outSize);
  2110. return -1; //see if a different TIP might work better.
  2111. }
  2112. // 1. check for a never-executed reduce tip
2113. // reducers don't have a locality cache, so just scan the non-running list
  2114. tip = findTaskFromList(nonRunningReduces, tts, numUniqueHosts, false);
  2115. if (tip != null) {
  2116. scheduleReduce(tip);
  2117. return tip.getIdWithinJob();
  2118. }
  2119. // 2. check for a reduce tip to be speculated
  2120. if (hasSpeculativeReduces) {
  2121. tip = getSpeculativeReduce(taskTrackerName, taskTrackerHost);
  2122. if (tip != null) {
  2123. return tip.getIdWithinJob();
  2124. }
  2125. }
  2126. return -1;
  2127. }
  2128. private synchronized TaskInProgress getSpeculativeReduce(
  2129. String taskTrackerName, String taskTrackerHost) {
  2130. TaskInProgress tip = findSpeculativeTask(
  2131. runningReduces, taskTrackerName, taskTrackerHost, TaskType.REDUCE);
  2132. if (tip != null) {
  2133. LOG.info("Choosing reduce task " + tip.getTIPId() +
  2134. " for speculative execution");
  2135. } else {
  2136. if (LOG.isDebugEnabled()) {
  2137. LOG.debug("No speculative map task found for tracker "
  2138. + taskTrackerHost);
  2139. }
  2140. }
  2141. return tip;
  2142. }
  2143. /**
  2144. * Check to see if the maximum number of speculative tasks are
  2145. * already being executed currently.
  2146. * @param tasks the set of tasks to test
  2147. * @param type the type of task (MAP/REDUCE) that we are considering
  2148. * @return has the cap been reached?
  2149. */
  2150. private boolean atSpeculativeCap(Collection<TaskInProgress> tasks,
  2151. TaskType type) {
  2152. float numTasks = tasks.size();
  2153. if (numTasks == 0){
  2154. return true; // avoid divide by zero
  2155. }
  2156. int speculativeTaskCount = type == TaskType.MAP ? speculativeMapTasks
  2157. : speculativeReduceTasks;
2158. // The cap is reached (return true) only once speculativeTaskCount >=
2159. // max(10, 0.01 * total-slots, 0.1 * total-running-tasks)
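// Illustrative example using the thresholds in the comment above: with 200 map
// slots and 1,000 candidate running maps, the cap kicks in once there are
// max(10, 0.01 * 200, 0.1 * 1000) = 100 speculative map attempts in flight.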
  2160. if (speculativeTaskCount < MIN_SPEC_CAP) {
  2161. return false; // at least one slow tracker's worth of slots(default=10)
  2162. }
  2163. ClusterStatus c = jobtracker.getClusterStatus(false);
  2164. int numSlots = (type == TaskType.MAP ? c.getMaxMapTasks() : c.getMaxReduceTasks());
  2165. if ((float)speculativeTaskCount < numSlots * MIN_SLOTS_CAP) {
  2166. return false;
  2167. }
  2168. boolean atCap = (((float)(speculativeTaskCount)/numTasks) >= speculativeCap);
  2169. if (LOG.isDebugEnabled()) {
  2170. LOG.debug("SpeculativeCap is "+speculativeCap+", specTasks/numTasks is " +
  2171. ((float)(speculativeTaskCount)/numTasks)+
  2172. ", so atSpecCap() is returning "+atCap);
  2173. }
  2174. return atCap;
  2175. }
  2176. /**
  2177. * A class for comparing the estimated time to completion of two tasks
  2178. */
  2179. private static class EstimatedTimeLeftComparator
  2180. implements Comparator<TaskInProgress> {
  2181. private long time;
  2182. public EstimatedTimeLeftComparator(long now) {
  2183. this.time = now;
  2184. }
  2185. /**
  2186. * Estimated time to completion is measured as:
  2187. * % of task left to complete (1 - progress) / progress rate of the task.
  2188. *
  2189. * This assumes that tasks are linear in their progress, which is
  2190. * often wrong, especially since progress for reducers is currently
2191. * calculated by evenly weighting their three stages (shuffle, sort, reduce)
  2192. * which rarely account for 1/3 each. This should be fixed in the future
  2193. * by calculating progressRate more intelligently or splitting these
  2194. * multi-phase tasks into individual tasks.
  2195. *
  2196. * The ordering this comparator defines is: task1 < task2 if task1 is
  2197. * estimated to finish farther in the future => compare(t1,t2) returns -1
  2198. */
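// Rough numeric sketch: a task at 50% progress with rate 0.001/s gives
// t = 0.001 / 0.5 = 0.002, while a task at 90% progress with rate 0.004/s
// gives t = 0.004 / 0.1 = 0.04; the first task (smaller t, more time
// remaining) compares as 'less' and is the better speculation candidate.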
  2199. public int compare(TaskInProgress tip1, TaskInProgress tip2) {
  2200. //we have to use the Math.max in the denominator to avoid divide by zero
  2201. //error because prog and progRate can both be zero (if one is zero,
  2202. //the other one will be 0 too).
2203. //We use the inverse of time_remaining = [(1 - prog) / progRate]
  2204. //so that (1-prog) is in denom. because tasks can have arbitrarily
  2205. //low progRates in practice (e.g. a task that is half done after 1000
  2206. //seconds will have progRate of 0.0000005) so we would rather
2207. //use Math.max on (1 - prog) by putting it in the denominator
  2208. //which will cause tasks with prog=1 look 99.99% done instead of 100%
  2209. //which is okay
  2210. double t1 = tip1.getCurrentProgressRate(time) / Math.max(0.0001,
  2211. 1.0 - tip1.getProgress());
  2212. double t2 = tip2.getCurrentProgressRate(time) / Math.max(0.0001,
  2213. 1.0 - tip2.getProgress());
  2214. if (t1 < t2) return -1;
  2215. else if (t2 < t1) return 1;
  2216. else return 0;
  2217. }
  2218. }
  2219. /**
2220. * Compares the average progress rate of tasks that have finished on this
2221. * taskTracker to the average of all successful tasks thus far to see if this
2222. * TT is too slow for speculating.
  2223. * slowNodeThreshold is used to determine the number of standard deviations
  2224. * @param taskTracker the name of the TaskTracker we are checking
  2225. * @return is this TaskTracker slow
  2226. */
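// In other words: a tracker is considered slow when its per-tracker mean
// exceeds the overall mean by more than slowNodeThreshold standard deviations,
// checked separately for the map and reduce statistics below.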
  2227. protected boolean isSlowTracker(String taskTracker) {
  2228. if (trackerMapStats.get(taskTracker) != null &&
  2229. trackerMapStats.get(taskTracker).mean() -
  2230. mapTaskStats.mean() > mapTaskStats.std()*slowNodeThreshold) {
  2231. if (LOG.isDebugEnabled()) {
  2232. LOG.debug("Tracker " + taskTracker +
  2233. " declared slow. trackerMapStats.get(taskTracker).mean() :" + trackerMapStats.get(taskTracker).mean() +
  2234. " mapTaskStats :" + mapTaskStats);
  2235. }
  2236. return true;
  2237. }
  2238. if (trackerReduceStats.get(taskTracker) != null &&
  2239. trackerReduceStats.get(taskTracker).mean() -
  2240. reduceTaskStats.mean() > reduceTaskStats.std()*slowNodeThreshold) {
  2241. if (LOG.isDebugEnabled()) {
  2242. LOG.debug("Tracker " + taskTracker +
  2243. " declared slow. trackerReduceStats.get(taskTracker).mean() :" + trackerReduceStats.get(taskTracker).mean() +
  2244. " reduceTaskStats :" + reduceTaskStats);
  2245. }
  2246. return true;
  2247. }
  2248. return false;
  2249. }
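/**
 * A small running-statistics accumulator: it keeps the count, sum and sum of
 * squares so that mean, variance (E(X^2) - E(X)^2) and standard deviation can
 * be computed in O(1), and a previously added sample can be replaced via
 * updateStatistics(old, update). For example, after add(2) and add(4):
 * mean() = 3, var() = (20 / 2) - 9 = 1, std() = 1.
 */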
  2250. static class DataStatistics{
  2251. private int count = 0;
  2252. private double sum = 0;
  2253. private double sumSquares = 0;
  2254. public DataStatistics() {
  2255. }
  2256. public DataStatistics(double initNum) {
  2257. this.count = 1;
  2258. this.sum = initNum;
  2259. this.sumSquares = initNum * initNum;
  2260. }
  2261. public void add(double newNum) {
  2262. this.count++;
  2263. this.sum += newNum;
  2264. this.sumSquares += newNum * newNum;
  2265. }
  2266. public void updateStatistics(double old, double update) {
  2267. sub(old);
  2268. add(update);
  2269. }
  2270. private void sub(double oldNum) {
  2271. this.count--;
  2272. this.sum = Math.max(this.sum -= oldNum, 0.0d);
  2273. this.sumSquares = Math.max(this.sumSquares -= oldNum * oldNum, 0.0d);
  2274. }
  2275. public double mean() {
  2276. return sum/count;
  2277. }
  2278. public double var() {
  2279. // E(X^2) - E(X)^2
  2280. return Math.max((sumSquares/count) - mean() * mean(), 0.0d);
  2281. }
  2282. public double std() {
  2283. return Math.sqrt(this.var());
  2284. }
  2285. public String toString() {
  2286. return "DataStatistics: count is " + count + ", sum is " + sum +
  2287. ", sumSquares is " + sumSquares + " mean is " + mean() + " std() is " + std();
  2288. }
  2289. }
  2290. private boolean shouldRunOnTaskTracker(String taskTracker) {
  2291. //
  2292. // Check if too many tasks of this job have failed on this
  2293. // tasktracker prior to assigning it a new one.
  2294. //
  2295. int taskTrackerFailedTasks = getTrackerTaskFailures(taskTracker);
  2296. if ((flakyTaskTrackers < (clusterSize * CLUSTER_BLACKLIST_PERCENT)) &&
  2297. taskTrackerFailedTasks >= maxTaskFailuresPerTracker) {
2298. if (LOG.isDebugEnabled()) {
2299. String flakyTracker = convertTrackerNameToHostName(taskTracker);
2300. LOG.debug("Ignoring the black-listed tasktracker: '" + flakyTracker
2301. + "' for assigning a new task");
2302. }
  2305. return false;
  2306. }
  2307. return true;
  2308. }
  2309. /**
  2310. * Metering: Occupied Slots * (Finish - Start)
  2311. * @param tip {@link TaskInProgress} to be metered which just completed,
  2312. * cannot be <code>null</code>
  2313. * @param status {@link TaskStatus} of the completed task, cannot be
  2314. * <code>null</code>
  2315. */
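// e.g. a task attempt that required 2 slots and ran for 30 seconds adds
// 2 * 30000 = 60000 slot-milliseconds to SLOTS_MILLIS_MAPS (or _REDUCES).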
  2316. private void meterTaskAttempt(TaskInProgress tip, TaskStatus status) {
  2317. JobCounter slotCounter =
  2318. (tip.isMapTask()) ? JobCounter.SLOTS_MILLIS_MAPS :
  2319. JobCounter.SLOTS_MILLIS_REDUCES;
  2320. jobCounters.incrCounter(slotCounter,
  2321. tip.getNumSlotsRequired() *
  2322. (status.getFinishTime() - status.getStartTime()));
  2323. }
  2324. /**
  2325. * A taskid assigned to this JobInProgress has reported in successfully.
  2326. */
  2327. public synchronized boolean completedTask(TaskInProgress tip,
  2328. TaskStatus status)
  2329. {
  2330. TaskAttemptID taskid = status.getTaskID();
  2331. final JobTrackerInstrumentation metrics = jobtracker.getInstrumentation();
  2332. // Metering
  2333. meterTaskAttempt(tip, status);
  2334. // Sanity check: is the TIP already complete?
2335. // This should not normally happen, because only one attempt of a TIP is
2336. // marked SUCCEEDED; if it does, the extra attempt is marked KILLED below.
  2337. if (tip.isComplete()) {
  2338. // Mark this task as KILLED
  2339. tip.alreadyCompletedTask(taskid);
  2340. // Let the JobTracker cleanup this taskid if the job isn't running
  2341. if (this.status.getRunState() != JobStatus.RUNNING) {
  2342. jobtracker.markCompletedTaskAttempt(status.getTaskTracker(), taskid);
  2343. }
  2344. return false;
  2345. }
  2346. boolean wasSpeculating = tip.isSpeculating(); //store this fact
  2347. LOG.info("Task '" + taskid + "' has completed " + tip.getTIPId() +
  2348. " successfully.");
  2349. // Mark the TIP as complete
  2350. tip.completed(taskid);
  2351. resourceEstimator.updateWithCompletedTask(status, tip);
  2352. // Update jobhistory
  2353. TaskTrackerStatus ttStatus =
  2354. this.jobtracker.getTaskTrackerStatus(status.getTaskTracker());
  2355. String trackerHostname = jobtracker.getNode(ttStatus.getHost()).toString();
  2356. TaskType taskType = getTaskType(tip);
  2357. TaskAttemptStartedEvent tse = new TaskAttemptStartedEvent(
  2358. status.getTaskID(), taskType, status.getStartTime(),
  2359. status.getTaskTracker(), ttStatus.getHttpPort(), -1);
  2360. jobHistory.logEvent(tse, status.getTaskID().getJobID());
  2361. TaskAttemptID statusAttemptID = status.getTaskID();
  2362. if (status.getIsMap()){
  2363. MapAttemptFinishedEvent mfe = new MapAttemptFinishedEvent(
  2364. statusAttemptID, taskType, TaskStatus.State.SUCCEEDED.toString(),
  2365. status.getMapFinishTime(),
  2366. status.getFinishTime(), trackerHostname, null,
  2367. status.getStateString(),
  2368. new org.apache.hadoop.mapreduce.Counters(status.getCounters()),
  2369. tip.getSplits(statusAttemptID).burst()
  2370. );
  2371. jobHistory.logEvent(mfe, status.getTaskID().getJobID());
  2372. }else{
  2373. ReduceAttemptFinishedEvent rfe = new ReduceAttemptFinishedEvent(
  2374. statusAttemptID, taskType, TaskStatus.State.SUCCEEDED.toString(),
  2375. status.getShuffleFinishTime(),
  2376. status.getSortFinishTime(), status.getFinishTime(),
  2377. trackerHostname, null, status.getStateString(),
  2378. new org.apache.hadoop.mapreduce.Counters(status.getCounters()),
  2379. tip.getSplits(statusAttemptID).burst()
  2380. );
  2381. jobHistory.logEvent(rfe, status.getTaskID().getJobID());
  2382. }
  2383. TaskFinishedEvent tfe = new TaskFinishedEvent(tip.getTIPId(),
  2384. tip.getExecFinishTime(), taskType,
  2385. TaskStatus.State.SUCCEEDED.toString(),
  2386. new org.apache.hadoop.mapreduce.Counters(status.getCounters()));
  2387. jobHistory.logEvent(tfe, tip.getJob().getJobID());
  2388. if (tip.isJobSetupTask()) {
  2389. // setup task has finished. kill the extra setup tip
  2390. killSetupTip(!tip.isMapTask());
  2391. setupComplete();
  2392. } else if (tip.isJobCleanupTask()) {
  2393. // cleanup task has finished. Kill the extra cleanup tip
  2394. if (tip.isMapTask()) {
  2395. // kill the reduce tip
  2396. cleanup[1].kill();
  2397. } else {
  2398. cleanup[0].kill();
  2399. }
  2400. //
  2401. // The Job is done
  2402. // if the job is failed, then mark the job failed.
  2403. if (jobFailed) {
  2404. terminateJob(JobStatus.FAILED);
  2405. }
  2406. // if the job is killed, then mark the job killed.
  2407. if (jobKilled) {
  2408. terminateJob(JobStatus.KILLED);
  2409. }
  2410. else {
  2411. jobComplete();
  2412. }
      // The job has been killed/failed/successful
      // JobTracker should cleanup this task
      jobtracker.markCompletedTaskAttempt(status.getTaskTracker(), taskid);
    } else if (tip.isMapTask()) {
      runningMapTasks -= 1;
      finishedMapTasks += 1;
      metrics.completeMap(taskid);
      if (!tip.isJobSetupTask() && hasSpeculativeMaps) {
        updateTaskTrackerStats(tip, ttStatus, trackerMapStats, mapTaskStats);
      }
      // remove the completed map from the resp running caches
      retireMap(tip);
      if ((finishedMapTasks + failedMapTIPs) == (numMapTasks)) {
        this.status.setMapProgress(1.0f);
        if (canLaunchJobCleanupTask()) {
          checkCountersLimitsOrFail();
        }
      }
    } else {
      runningReduceTasks -= 1;
      finishedReduceTasks += 1;
      metrics.completeReduce(taskid);
      if (!tip.isJobSetupTask() && hasSpeculativeReduces) {
        updateTaskTrackerStats(tip, ttStatus, trackerReduceStats, reduceTaskStats);
      }
      // remove the completed reduces from the running reducers set
      retireReduce(tip);
      if ((finishedReduceTasks + failedReduceTIPs) == (numReduceTasks)) {
        this.status.setReduceProgress(1.0f);
        if (canLaunchJobCleanupTask()) {
          checkCountersLimitsOrFail();
        }
      }
    }
    decrementSpeculativeCount(wasSpeculating, tip);

    // is job complete?
    if (!jobSetupCleanupNeeded && canLaunchJobCleanupTask()) {
      jobComplete();
    }

    return true;
  }
  /*
   * Add up the counters and fail the job if it exceeds the limits.
   * Make sure we do not recalculate the counters after we fail the job.
   * Currently this is taken care of by terminateJob() since it does not
   * calculate the counters.
   */
  private void checkCountersLimitsOrFail() {
    Counters counters = getCounters();
    if (counters.limits().violation() != null) {
      jobtracker.failJob(this);
    }
  }
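  /**
   * Record the duration of a successful task attempt against the tracker that
   * ran it. The per-tracker {@link DataStatistics} gets the raw task duration,
   * while the overall statistics track the mean of the per-tracker means,
   * which feeds the slow-tracker evaluation used by speculative execution.
   */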
  private void updateTaskTrackerStats(TaskInProgress tip, TaskTrackerStatus ttStatus,
      Map<String, DataStatistics> trackerStats, DataStatistics overallStats) {
    float tipDuration = tip.getExecFinishTime() -
        tip.getDispatchTime(tip.getSuccessfulTaskid());
    DataStatistics ttStats =
        trackerStats.get(ttStatus.getTrackerName());
    double oldMean = 0.0d;
    // We maintain the mean of TaskTrackers' means. That way, we get a single
    // data-point for every tracker (used in the evaluation in isSlowTracker)
    if (ttStats != null) {
      oldMean = ttStats.mean();
      ttStats.add(tipDuration);
      overallStats.updateStatistics(oldMean, ttStats.mean());
    } else {
      trackerStats.put(ttStatus.getTrackerName(),
          (ttStats = new DataStatistics(tipDuration)));
      overallStats.add(tipDuration);
    }
    if (LOG.isDebugEnabled()) {
      LOG.debug("Added mean of " + ttStats.mean() + " to trackerStats of type " +
          (tip.isMapTask() ? "Map" : "Reduce") +
          " on " + ttStatus.getTrackerName() + ". DataStatistics is now: " +
          trackerStats.get(ttStatus.getTrackerName()));
    }
  }
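  /**
   * Replace an old progress sample with a new one in the running-task
   * statistics for either the map or the reduce side of this job.
   */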
  public void updateStatistics(double oldProg, double newProg, boolean isMap) {
    if (isMap) {
      runningMapTaskStats.updateStatistics(oldProg, newProg);
    } else {
      runningReduceTaskStats.updateStatistics(oldProg, newProg);
    }
  }

  public DataStatistics getRunningTaskStatistics(boolean isMap) {
    if (isMap) {
      return runningMapTaskStats;
    } else {
      return runningReduceTaskStats;
    }
  }

  public float getSlowTaskThreshold() {
    return slowTaskThreshold;
  }
  /**
   * Job state changes must happen through this call.
   */
  private void changeStateTo(int newState) {
    int oldState = this.status.getRunState();
    if (oldState == newState) {
      return; // old and new states are the same
    }
    this.status.setRunState(newState);
    // update the metrics
    if (oldState == JobStatus.PREP) {
      this.jobtracker.getInstrumentation().decPrepJob(conf, jobId);
    } else if (oldState == JobStatus.RUNNING) {
      this.jobtracker.getInstrumentation().decRunningJob(conf, jobId);
    }
    if (newState == JobStatus.PREP) {
      this.jobtracker.getInstrumentation().addPrepJob(conf, jobId);
    } else if (newState == JobStatus.RUNNING) {
      this.jobtracker.getInstrumentation().addRunningJob(conf, jobId);
    }
  }
  /**
   * The job is done since all of its component tasks are either
   * successful or have failed.
   */
  private void jobComplete() {
    final JobTrackerInstrumentation metrics = jobtracker.getInstrumentation();
    //
    // All tasks are complete, so the job is done!
    //
    if (this.status.getRunState() == JobStatus.RUNNING ||
        this.status.getRunState() == JobStatus.PREP) {
      changeStateTo(JobStatus.SUCCEEDED);
      this.status.setCleanupProgress(1.0f);
      if (maps.length == 0) {
        this.status.setMapProgress(1.0f);
      }
      if (reduces.length == 0) {
        this.status.setReduceProgress(1.0f);
      }
      this.finishTime = JobTracker.getClock().getTime();
      this.status.setFinishTime(this.finishTime);
      LOG.info("Job " + this.status.getJobID() +
          " has completed successfully.");

      // Log the job summary (this should be done prior to logging to
      // job-history to ensure job-counters are in-sync)
      JobSummary.logJobSummary(this, jobtracker.getClusterStatus(false));

      // Log job-history
      JobFinishedEvent jfe =
          new JobFinishedEvent(this.status.getJobID(),
              this.finishTime,
              this.finishedMapTasks, this.finishedReduceTasks, failedMapTasks,
              failedReduceTasks,
              new org.apache.hadoop.mapreduce.Counters(getMapCounters()),
              new org.apache.hadoop.mapreduce.Counters(getReduceCounters()),
              new org.apache.hadoop.mapreduce.Counters(getCounters()));
      jobHistory.logEvent(jfe, this.status.getJobID());
      jobHistory.closeWriter(this.status.getJobID());

      // Note that finalize will close the job-history handles, which garbage
      // collection might try to finalize.
      garbageCollect();

      metrics.completeJob(this.conf, this.status.getJobID());
    }
  }
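  /**
   * Move the job into a terminal FAILED or KILLED state: set progress and
   * finish time, log the job summary and the unsuccessful-completion event to
   * job history, garbage collect, and update the JobTracker instrumentation.
   * Does nothing if the job is no longer RUNNING or PREP.
   */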
  private synchronized void terminateJob(int jobTerminationState) {
    if ((status.getRunState() == JobStatus.RUNNING) ||
        (status.getRunState() == JobStatus.PREP)) {
      this.finishTime = JobTracker.getClock().getTime();
      this.status.setMapProgress(1.0f);
      this.status.setReduceProgress(1.0f);
      this.status.setCleanupProgress(1.0f);
      this.status.setFinishTime(this.finishTime);

      if (jobTerminationState == JobStatus.FAILED) {
        changeStateTo(JobStatus.FAILED);
      } else {
        changeStateTo(JobStatus.KILLED);
      }

      // Log the job summary
      JobSummary.logJobSummary(this, jobtracker.getClusterStatus(false));

      JobUnsuccessfulCompletionEvent failedEvent =
          new JobUnsuccessfulCompletionEvent(this.status.getJobID(),
              finishTime,
              this.finishedMapTasks,
              this.finishedReduceTasks,
              JobStatus.getJobRunState(jobTerminationState));
      jobHistory.logEvent(failedEvent, this.status.getJobID());
      jobHistory.closeWriter(this.status.getJobID());

      garbageCollect();

      jobtracker.getInstrumentation().terminateJob(
          this.conf, this.status.getJobID());
      if (jobTerminationState == JobStatus.FAILED) {
        jobtracker.getInstrumentation().failedJob(
            this.conf, this.status.getJobID());
      } else {
        jobtracker.getInstrumentation().killedJob(
            this.conf, this.status.getJobID());
      }
    }
  }
  /**
   * Terminate the job and all its component tasks.
   * Calling this will lead to marking the job as failed/killed. A cleanup
   * tip will be launched. If the job has not been initialized, terminateJob
   * is called directly since there is no need to launch a cleanup tip.
   * This method is reentrant.
   * @param jobTerminationState job termination state
   */
  private synchronized void terminate(int jobTerminationState) {
    if (!tasksInited.get()) {
      // init could not be done, we just terminate directly.
      terminateJob(jobTerminationState);
      return;
    }

    if ((status.getRunState() == JobStatus.RUNNING) ||
        (status.getRunState() == JobStatus.PREP)) {
      LOG.info("Killing job '" + this.status.getJobID() + "'");
      if (jobTerminationState == JobStatus.FAILED) {
        if (jobFailed) { // reentrant
          return;
        }
        jobFailed = true;
      } else if (jobTerminationState == JobStatus.KILLED) {
        if (jobKilled) { // reentrant
          return;
        }
        jobKilled = true;
      }

      // clear all unclean tasks
      clearUncleanTasks();

      //
      // kill all TIPs.
      //
      for (int i = 0; i < setup.length; i++) {
        setup[i].kill();
      }
      for (int i = 0; i < maps.length; i++) {
        maps[i].kill();
      }
      for (int i = 0; i < reduces.length; i++) {
        reduces[i].kill();
      }

      if (!jobSetupCleanupNeeded) {
        terminateJob(jobTerminationState);
      }
    }
  }
  /**
   * Cancel all reservations since the job is done
   */
  private void cancelReservedSlots() {
    // Make a copy of the set of TaskTrackers to prevent a
    // ConcurrentModificationException ...
    Set<TaskTracker> tm =
        new HashSet<TaskTracker>(trackersReservedForMaps.keySet());
    for (TaskTracker tt : tm) {
      tt.unreserveSlots(TaskType.MAP, this);
    }
    Set<TaskTracker> tr =
        new HashSet<TaskTracker>(trackersReservedForReduces.keySet());
    for (TaskTracker tt : tr) {
      tt.unreserveSlots(TaskType.REDUCE, this);
    }
  }
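  /**
   * Drain the lists of map and reduce attempts that are still waiting for
   * task-cleanup, pushing each attempt's latest status through
   * updateTaskStatus so they are accounted for before the job is torn down.
   */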
  private void clearUncleanTasks() {
    TaskAttemptID taskid = null;
    TaskInProgress tip = null;
    while (!mapCleanupTasks.isEmpty()) {
      taskid = mapCleanupTasks.remove(0);
      tip = maps[taskid.getTaskID().getId()];
      updateTaskStatus(tip, tip.getTaskStatus(taskid));
    }
    while (!reduceCleanupTasks.isEmpty()) {
      taskid = reduceCleanupTasks.remove(0);
      tip = reduces[taskid.getTaskID().getId()];
      updateTaskStatus(tip, tip.getTaskStatus(taskid));
    }
  }
  /**
   * Kill the job and all its component tasks. This method should be called from
   * jobtracker and should return fast as it locks the jobtracker.
   */
  public void kill() {
    boolean killNow = false;
    synchronized (jobInitKillStatus) {
      jobInitKillStatus.killed = true;
      // if not in middle of init, terminate it now
      if (!jobInitKillStatus.initStarted || jobInitKillStatus.initDone) {
        // avoiding nested locking by setting flag
        killNow = true;
      }
    }
    if (killNow) {
      terminate(JobStatus.KILLED);
    }
  }
  /**
   * Fails the job and all its component tasks. This should be called only from
   * {@link JobInProgress} or {@link JobTracker}. Look at
   * {@link JobTracker#failJob(JobInProgress)} for more details.
   * Note that the job doesn't expect itself to be failed before it is inited.
   * Only once init is done (successfully or otherwise) can the job be failed.
   */
  synchronized void fail() {
    terminate(JobStatus.FAILED);
  }
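  /**
   * If the given TIP was running speculatively, decrement the count of
   * outstanding speculative map or reduce tasks now that one of its attempts
   * has finished or failed.
   */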
  private void decrementSpeculativeCount(boolean wasSpeculating,
      TaskInProgress tip) {
    if (wasSpeculating) {
      if (tip.isMapTask()) {
        speculativeMapTasks--;
        if (LOG.isDebugEnabled()) {
          LOG.debug("Decremented count for " +
              tip.getTIPId() + "/" + tip.getJob().getJobID() +
              ". Current speculativeMap task count: " +
              speculativeMapTasks);
        }
      } else {
        speculativeReduceTasks--;
        if (LOG.isDebugEnabled()) {
          LOG.debug("Decremented count for " +
              tip.getTIPId() + "/" + tip.getJob().getJobID() +
              ". Current speculativeReduce task count: " +
              speculativeReduceTasks);
        }
      }
    }
  }
  /**
   * A task assigned to this JobInProgress has reported in as failed.
   * Most of the time, we'll just reschedule execution. However, after
   * many repeated failures we may instead decide to allow the entire
   * job to fail or succeed if the user doesn't care about a few tasks failing.
   *
   * Even if a task has reported as completed in the past, it might later
   * be reported as failed. That's because the TaskTracker that hosts a map
   * task might die before the entire job can complete. If that happens,
   * we need to schedule reexecution so that downstream reduce tasks can
   * obtain the map task's output.
   */
  private void failedTask(TaskInProgress tip, TaskAttemptID taskid,
      TaskStatus status,
      TaskTracker taskTracker, boolean wasRunning,
      boolean wasComplete, boolean wasAttemptRunning) {
    // check if the TIP is already failed
    boolean wasFailed = tip.isFailed();
    boolean wasSpeculating = tip.isSpeculating();

    // Mark the taskid as FAILED or KILLED
    tip.incompleteSubTask(taskid, this.status);

    decrementSpeculativeCount(wasSpeculating, tip);

    boolean isRunning = tip.isRunning();
    boolean isComplete = tip.isComplete();
    if (wasAttemptRunning) {
      // We decrement these counters without checking isRunning, because we
      // increment them whenever a new map or reduce task attempt is obtained,
      // not when the TIP itself starts running. Since runningMapTasks (or
      // runningReduceTasks) was incremented when this attempt was handed out,
      // we decrement it here.
      if (!tip.isJobCleanupTask() && !tip.isJobSetupTask()) {
        if (tip.isMapTask()) {
          runningMapTasks -= 1;
        } else {
          runningReduceTasks -= 1;
        }
      }

      // Metering
      meterTaskAttempt(tip, status);
    }
    // update running count on task failure.
    if (wasRunning && !isRunning) {
      if (tip.isJobCleanupTask()) {
        launchedCleanup = false;
      } else if (tip.isJobSetupTask()) {
        launchedSetup = false;
      } else if (tip.isMapTask()) {
        // remove from the running queue and put it in the non-running cache
        // if the tip is not complete i.e. if the tip still needs to be run
        if (!isComplete) {
          retireMap(tip);
          failMap(tip);
        }
      } else {
        // remove from the running queue and put in the failed queue if the tip
        // is not complete
        if (!isComplete) {
          retireReduce(tip);
          failReduce(tip);
        }
      }
    }

    // The case when the map was complete but the task tracker went down.
    // However, we don't need to do any metering here...
    if (wasComplete && !isComplete) {
      if (tip.isMapTask()) {
        // Put the task back in the cache. This will help locality for cases
        // where we have a different TaskTracker from the same rack/switch
        // asking for a task.
        // We bother about only those TIPs that were successful
        // earlier (wasComplete and !isComplete)
        // (since they might have been removed from the cache of other
        // racks/switches, if the input split blocks were present there too)
        failMap(tip);
        finishedMapTasks -= 1;
      }
    }
    // update job history
    // get taskStatus from tip
    TaskStatus taskStatus = tip.getTaskStatus(taskid);
    String taskTrackerName = taskStatus.getTaskTracker();
    String taskTrackerHostName = convertTrackerNameToHostName(taskTrackerName);
    int taskTrackerPort = -1;
    TaskTrackerStatus taskTrackerStatus =
        (taskTracker == null) ? null : taskTracker.getStatus();
    if (taskTrackerStatus != null) {
      taskTrackerPort = taskTrackerStatus.getHttpPort();
    }
    long startTime = taskStatus.getStartTime();
    long finishTime = taskStatus.getFinishTime();
    List<String> taskDiagnosticInfo = tip.getDiagnosticInfo(taskid);
    String diagInfo = taskDiagnosticInfo == null ? "" :
        StringUtils.arrayToString(taskDiagnosticInfo.toArray(new String[0]));
    TaskType taskType = getTaskType(tip);

    TaskAttemptStartedEvent tse = new TaskAttemptStartedEvent(
        taskid, taskType, startTime, taskTrackerName, taskTrackerPort, -1);
    jobHistory.logEvent(tse, taskid.getJobID());

    ProgressSplitsBlock splits = tip.getSplits(taskStatus.getTaskID());
    TaskAttemptUnsuccessfulCompletionEvent tue =
        new TaskAttemptUnsuccessfulCompletionEvent
            (taskid,
             taskType, taskStatus.getRunState().toString(),
             finishTime,
             taskTrackerHostName, diagInfo,
             splits.burst());
    jobHistory.logEvent(tue, taskid.getJobID());

    // After this, try to assign tasks with the one after this, so that
    // the failed task goes to the end of the list.
    if (!tip.isJobCleanupTask() && !tip.isJobSetupTask()) {
      if (tip.isMapTask()) {
        failedMapTasks++;
      } else {
        failedReduceTasks++;
      }
    }
    //
    // Note down that a task has failed on this tasktracker
    //
    if (status.getRunState() == TaskStatus.State.FAILED) {
      addTrackerTaskFailure(taskTrackerName, taskTracker);
    }

    //
    // Let the JobTracker know that this task has failed
    //
    jobtracker.markCompletedTaskAttempt(status.getTaskTracker(), taskid);

    //
    // Check if we need to kill the job because of too many failures or
    // if the job is complete since all component tasks have completed.
    // We do it once per TIP, and only for the task that fails the TIP.
    if (!wasFailed && tip.isFailed()) {
      //
      // Allow up to 'mapFailuresPercent' of map tasks to fail or
      // 'reduceFailuresPercent' of reduce tasks to fail
      //
      boolean killJob = tip.isJobCleanupTask() || tip.isJobSetupTask() ? true :
          tip.isMapTask() ?
              ((++failedMapTIPs * 100) > (mapFailuresPercent * numMapTasks)) :
              ((++failedReduceTIPs * 100) > (reduceFailuresPercent * numReduceTasks));

      if (killJob) {
        LOG.info("Aborting job " + profile.getJobID());
        TaskFailedEvent tfe =
            new TaskFailedEvent(tip.getTIPId(), finishTime, taskType, diagInfo,
                TaskStatus.State.FAILED.toString(),
                null);
        jobHistory.logEvent(tfe, tip.getJob().getJobID());

        if (tip.isJobCleanupTask()) {
          // kill the other tip
          if (tip.isMapTask()) {
            cleanup[1].kill();
          } else {
            cleanup[0].kill();
          }
          terminateJob(JobStatus.FAILED);
        } else {
          if (tip.isJobSetupTask()) {
            // kill the other tip
            killSetupTip(!tip.isMapTask());
          }
          fail();
        }
      }

      //
      // Update the counters
      //
      if (!tip.isJobCleanupTask() && !tip.isJobSetupTask()) {
        if (tip.isMapTask()) {
          jobCounters.incrCounter(JobCounter.NUM_FAILED_MAPS, 1);
        } else {
          jobCounters.incrCounter(JobCounter.NUM_FAILED_REDUCES, 1);
        }
      }
    }
  }
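  /**
   * Kill the job-setup TIP of the given type; the map setup TIP lives at
   * setup[0] and the reduce setup TIP at setup[1].
   */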
  void killSetupTip(boolean isMap) {
    if (isMap) {
      setup[0].kill();
    } else {
      setup[1].kill();
    }
  }
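  /**
   * @return true if job setup is finished, i.e. there is no setup TIP to be
   *         launched, or one of the setup TIPs has already completed or failed.
   */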
  boolean isSetupFinished() {
    // if there is no setup to be launched, consider setup as finished.
    if ((tasksInited.get() && setup.length == 0) ||
        setup[0].isComplete() || setup[0].isFailed() || setup[1].isComplete() ||
        setup[1].isFailed()) {
      return true;
    }
    return false;
  }
  /**
   * Fail a task with a given reason, but without a status object.
   *
   * Assuming {@link JobTracker} is locked on entry.
   *
   * @param tip The task's tip
   * @param taskid The task id
   * @param reason The reason that the task failed
   * @param trackerName The task tracker the task failed on
   */
  public synchronized void failedTask(TaskInProgress tip, TaskAttemptID taskid,
      String reason, TaskStatus.Phase phase, TaskStatus.State state,
      String trackerName) {
    TaskStatus status = TaskStatus.createTaskStatus(tip.isMapTask(),
        taskid,
        0.0f,
        tip.isMapTask() ?
            numSlotsPerMap :
            numSlotsPerReduce,
        state,
        reason,
        reason,
        trackerName, phase,
        new Counters());
    // update the actual start-time of the attempt
    TaskStatus oldStatus = tip.getTaskStatus(taskid);
    long startTime = oldStatus == null
        ? JobTracker.getClock().getTime()
        : oldStatus.getStartTime();
    status.setStartTime(startTime);
    status.setFinishTime(JobTracker.getClock().getTime());
    boolean wasComplete = tip.isComplete();
    updateTaskStatus(tip, status);
    boolean isComplete = tip.isComplete();
    if (wasComplete && !isComplete) { // mark a successful tip as failed
      TaskType taskType = getTaskType(tip);
      TaskFailedEvent tfe =
          new TaskFailedEvent(tip.getTIPId(), tip.getExecFinishTime(), taskType,
              reason, TaskStatus.State.FAILED.toString(),
              taskid);
      jobHistory.logEvent(tfe, tip.getJob().getJobID());
    }
  }
  /**
   * The job is dead. We're now GC'ing it, getting rid of the job
   * from all tables. Be sure to remove all of this job's tasks
   * from the various tables.
   */
  void garbageCollect() {
    synchronized (this) {
      // Cancel task tracker reservation
      cancelReservedSlots();

      // Let the JobTracker know that a job is complete
      jobtracker.getInstrumentation().decWaitingMaps(getJobID(), pendingMaps());
      jobtracker.getInstrumentation().decWaitingReduces(getJobID(), pendingReduces());
      jobtracker.storeCompletedJob(this);
      jobtracker.finalizeJob(this);

      try {
        // Definitely remove the local-disk copy of the job file
        if (localJobFile != null) {
          localFs.delete(localJobFile, true);
          localJobFile = null;
        }
        Path tempDir = jobtracker.getSystemDirectoryForJob(getJobID());
        new CleanupQueue().addToQueue(new PathDeletionContext(
            jobtracker.getFileSystem(), tempDir.toUri().getPath()));
      } catch (IOException e) {
        LOG.warn("Error cleaning up " + profile.getJobID() + ": " + e);
      }

      // free up the memory used by the data structures
      this.nonRunningMapCache = null;
      this.runningMapCache = null;
      this.nonRunningReduces = null;
      this.runningReduces = null;
    }

    // remove the job's delegation tokens
    if (conf.getBoolean(MRJobConfig.JOB_CANCEL_DELEGATION_TOKEN, true)) {
      DelegationTokenRenewal.removeDelegationTokenRenewalForJob(jobId);
    } // else don't remove it. It may still be used by spawned tasks.
  }
  /**
   * Return the TaskInProgress that matches the tipid.
   */
  public synchronized TaskInProgress getTaskInProgress(TaskID tipid) {
    if (tipid.getTaskType() == TaskType.MAP) {
      // cleanup map tip
      if (cleanup.length > 0 && tipid.equals(cleanup[0].getTIPId())) {
        return cleanup[0];
      }
      // setup map tip
      if (setup.length > 0 && tipid.equals(setup[0].getTIPId())) {
        return setup[0];
      }
      for (int i = 0; i < maps.length; i++) {
        if (tipid.equals(maps[i].getTIPId())) {
          return maps[i];
        }
      }
    } else {
      // cleanup reduce tip
      if (cleanup.length > 0 && tipid.equals(cleanup[1].getTIPId())) {
        return cleanup[1];
      }
      // setup reduce tip
      if (setup.length > 0 && tipid.equals(setup[1].getTIPId())) {
        return setup[1];
      }
      for (int i = 0; i < reduces.length; i++) {
        if (tipid.equals(reduces[i].getTIPId())) {
          return reduces[i];
        }
      }
    }
    return null;
  }
  /**
   * Find the details of someplace where a map has finished
   * @param mapId the id of the map
   * @return the task status of the completed task
   */
  public synchronized TaskStatus findFinishedMap(int mapId) {
    TaskInProgress tip = maps[mapId];
    if (tip.isComplete()) {
      TaskStatus[] statuses = tip.getTaskStatuses();
      for (int i = 0; i < statuses.length; i++) {
        if (statuses[i].getRunState() == TaskStatus.State.SUCCEEDED) {
          return statuses[i];
        }
      }
    }
    return null;
  }
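  /**
   * @return the number of task-completion events recorded for this job so far.
   */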
  synchronized int getNumTaskCompletionEvents() {
    return taskCompletionEvents.size();
  }

  synchronized public TaskCompletionEvent[] getTaskCompletionEvents(
      int fromEventId, int maxEvents) {
    TaskCompletionEvent[] events = TaskCompletionEvent.EMPTY_ARRAY;
    if (taskCompletionEvents.size() > fromEventId) {
      int actualMax = Math.min(maxEvents,
          (taskCompletionEvents.size() - fromEventId));
      events = taskCompletionEvents.subList(fromEventId,
          actualMax + fromEventId).toArray(events);
    }
    return events;
  }
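  /**
   * A reducer has reported that it could not fetch the output of a map task.
   * Each notification increments a per-map counter; once the number of
   * notifications reaches MAX_FETCH_FAILURES_NOTIFICATIONS and the fraction of
   * running reducers reporting failures reaches
   * MAX_ALLOWED_FETCH_FAILURES_PERCENT, the map attempt is failed with
   * "Too many fetch-failures".
   */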
  synchronized void fetchFailureNotification(TaskInProgress tip,
      TaskAttemptID mapTaskId,
      String mapTrackerName,
      TaskAttemptID reduceTaskId,
      String reduceTrackerName) {
    Integer fetchFailures = mapTaskIdToFetchFailuresMap.get(mapTaskId);
    fetchFailures = (fetchFailures == null) ? 1 : (fetchFailures + 1);
    mapTaskIdToFetchFailuresMap.put(mapTaskId, fetchFailures);
    LOG.info("Failed fetch notification #" + fetchFailures + " for map task: "
        + mapTaskId + " running on tracker: " + mapTrackerName
        + " and reduce task: " + reduceTaskId + " running on tracker: "
        + reduceTrackerName);

    float failureRate = (float) fetchFailures / runningReduceTasks;
    // declare faulty if fetch-failures >= max-allowed-failures
    boolean isMapFaulty = (failureRate >= MAX_ALLOWED_FETCH_FAILURES_PERCENT);
    if (fetchFailures >= MAX_FETCH_FAILURES_NOTIFICATIONS && isMapFaulty) {
      LOG.info("Too many fetch-failures for output of task: " + mapTaskId
          + " ... killing it");
      failedTask(tip, mapTaskId, "Too many fetch-failures",
          (tip.isMapTask() ? TaskStatus.Phase.MAP :
              TaskStatus.Phase.REDUCE),
          TaskStatus.State.FAILED, mapTrackerName);
      mapTaskIdToFetchFailuresMap.remove(mapTaskId);
    }
  }
  /**
   * @return The JobID of this JobInProgress.
   */
  public JobID getJobID() {
    return jobId;
  }

  public synchronized Object getSchedulingInfo() {
    return this.schedulingInfo;
  }

  public synchronized void setSchedulingInfo(Object schedulingInfo) {
    this.schedulingInfo = schedulingInfo;
    this.status.setSchedulingInfo(schedulingInfo.toString());
  }

  /**
   * Keeps track of the kill and initTasks status of this job. initTasks()
   * takes a lock on the JobInProgress object. kill() should avoid waiting on
   * the JobInProgress lock since initTasks() may take a while to complete.
   */
  private static class JobInitKillStatus {
    // flag to be set if kill is called
    boolean killed;
    boolean initStarted;
    boolean initDone;
  }
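  /**
   * @return true if the job has run to completion, as reported by
   *         {@link JobStatus#isJobComplete()}.
   */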
  boolean isComplete() {
    return status.isJobComplete();
  }

  /**
   * Get the task type for logging it to {@link JobHistory}.
   */
  private TaskType getTaskType(TaskInProgress tip) {
    if (tip.isJobCleanupTask()) {
      return TaskType.JOB_CLEANUP;
    } else if (tip.isJobSetupTask()) {
      return TaskType.JOB_SETUP;
    } else if (tip.isMapTask()) {
      return TaskType.MAP;
    } else {
      return TaskType.REDUCE;
    }
  }
  /**
   * Get the level of locality that a given task would have if launched on
   * a particular TaskTracker. Returns 0 if the task has data on that machine,
   * 1 if it has data on the same rack, etc (depending on number of levels in
   * the network hierarchy).
   */
  int getLocalityLevel(TaskInProgress tip, TaskTrackerStatus tts) {
    Node tracker = jobtracker.getNode(tts.getHost());
    int level = this.maxLevel;
    // find the right level across split locations
    for (String local : maps[tip.getIdWithinJob()].getSplitLocations()) {
      Node datanode = jobtracker.getNode(local);
      int newLevel = this.maxLevel;
      if (tracker != null && datanode != null) {
        newLevel = getMatchingLevelForNodes(tracker, datanode);
      }
      if (newLevel < level) {
        level = newLevel;
        // an optimization
        if (level == 0) {
          break;
        }
      }
    }
    return level;
  }

  /**
   * Test method to set the cluster sizes
   */
  void setClusterSize(int clusterSize) {
    this.clusterSize = clusterSize;
  }
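  /**
   * Utility for logging a one-line, machine-parseable summary of a job's
   * runtime. The summary is a comma-separated list of key=value pairs in which
   * commas, '=' and the escape character inside values are escaped; a
   * hypothetical, abbreviated line might look like
   * {@code jobId=job_201001010000_0001,submitTime=...,numMaps=10,...}.
   */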
  static class JobSummary {
    static final Log LOG = LogFactory.getLog(JobSummary.class);

    // Escape sequences
    static final char EQUALS = '=';
    static final char[] charsToEscape =
        {StringUtils.COMMA, EQUALS, StringUtils.ESCAPE_CHAR};

    static class SummaryBuilder {
      final StringBuilder buffer = new StringBuilder();

      // A little optimization for a very common case
      SummaryBuilder add(String key, long value) {
        return _add(key, Long.toString(value));
      }

      <T> SummaryBuilder add(String key, T value) {
        return _add(key, StringUtils.escapeString(String.valueOf(value),
            StringUtils.ESCAPE_CHAR, charsToEscape));
      }

      SummaryBuilder add(SummaryBuilder summary) {
        if (buffer.length() > 0) buffer.append(StringUtils.COMMA);
        buffer.append(summary.buffer);
        return this;
      }

      SummaryBuilder _add(String key, String value) {
        if (buffer.length() > 0) buffer.append(StringUtils.COMMA);
        buffer.append(key).append(EQUALS).append(value);
        return this;
      }

      @Override public String toString() {
        return buffer.toString();
      }
    }

    static SummaryBuilder getTaskLaunchTimesSummary(JobInProgress job) {
      SummaryBuilder summary = new SummaryBuilder();
      Map<TaskType, Long> timeMap = job.getFirstTaskLaunchTimes();
      synchronized (timeMap) {
        for (Map.Entry<TaskType, Long> e : timeMap.entrySet()) {
          summary.add("first" + StringUtils.camelize(e.getKey().name()) +
              "TaskLaunchTime", e.getValue().longValue());
        }
      }
      return summary;
    }

    /**
     * Log a summary of the job's runtime.
     *
     * @param job {@link JobInProgress} whose summary is to be logged, cannot
     *            be <code>null</code>.
     * @param cluster {@link ClusterStatus} of the cluster on which the job was
     *                run, cannot be <code>null</code>
     */
    public static void logJobSummary(JobInProgress job, ClusterStatus cluster) {
      JobStatus status = job.getStatus();
      JobProfile profile = job.getProfile();
      Counters jobCounters = job.getJobCounters();
      long mapSlotSeconds =
          (jobCounters.getCounter(JobCounter.SLOTS_MILLIS_MAPS) +
           jobCounters.getCounter(JobCounter.FALLOW_SLOTS_MILLIS_MAPS)) / 1000;
      long reduceSlotSeconds =
          (jobCounters.getCounter(JobCounter.SLOTS_MILLIS_REDUCES) +
           jobCounters.getCounter(JobCounter.FALLOW_SLOTS_MILLIS_REDUCES)) / 1000;

      SummaryBuilder summary = new SummaryBuilder()
          .add("jobId", job.getJobID())
          .add("submitTime", job.getStartTime())
          .add("launchTime", job.getLaunchTime())
          .add(getTaskLaunchTimesSummary(job))
          .add("finishTime", job.getFinishTime())
          .add("numMaps", job.getTasks(TaskType.MAP).length)
          .add("numSlotsPerMap", job.getNumSlotsPerMap())
          .add("numReduces", job.getTasks(TaskType.REDUCE).length)
          .add("numSlotsPerReduce", job.getNumSlotsPerReduce())
          .add("user", profile.getUser())
          .add("queue", profile.getQueueName())
          .add("status", JobStatus.getJobRunState(status.getRunState()))
          .add("mapSlotSeconds", mapSlotSeconds)
          .add("reduceSlotsSeconds", reduceSlotSeconds)
          .add("clusterMapCapacity", cluster.getMaxMapTasks())
          .add("clusterReduceCapacity", cluster.getMaxReduceTasks());

      LOG.info(summary);
    }
  }
  /**
   * Creates the localized copy of the job conf
   * @param jobConf
   * @param id
   */
  void setUpLocalizedJobConf(JobConf jobConf,
      org.apache.hadoop.mapreduce.JobID id) {
    String localJobFilePath = jobtracker.getLocalJobFilePath(id);
    File localJobFile = new File(localJobFilePath);
    FileOutputStream jobOut = null;
    try {
      jobOut = new FileOutputStream(localJobFile);
      jobConf.writeXml(jobOut);
      if (LOG.isDebugEnabled()) {
        LOG.debug("Job conf for " + id + " stored at "
            + localJobFile.getAbsolutePath());
      }
    } catch (IOException ioe) {
      LOG.error("Failed to store job conf on the local filesystem ", ioe);
    } finally {
      if (jobOut != null) {
        try {
          jobOut.close();
        } catch (IOException ie) {
          LOG.info("Failed to close the job configuration file "
              + StringUtils.stringifyException(ie));
        }
      }
    }
  }
  /**
   * Deletes the localized copy of the job conf
   */
  void cleanupLocalizedJobConf(org.apache.hadoop.mapreduce.JobID id) {
    String localJobFilePath = jobtracker.getLocalJobFilePath(id);
    File f = new File(localJobFilePath);
    LOG.info("Deleting localized job conf at " + f);
    if (!f.delete()) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Failed to delete file " + f);
      }
    }
  }
  /**
   * Generate the job token and save it to a file in the job's system directory.
   * @throws IOException
   */
  private void generateAndStoreTokens() throws IOException {
    Path jobDir = jobtracker.getSystemDirectoryForJob(jobId);
    Path keysFile = new Path(jobDir, TokenCache.JOB_TOKEN_HDFS_FILE);
    if (tokenStorage == null) {
      tokenStorage = new Credentials();
    }
    // create the JobToken and add it to the credentials
    JobTokenIdentifier identifier = new JobTokenIdentifier(new Text(jobId
        .toString()));
    Token<JobTokenIdentifier> token = new Token<JobTokenIdentifier>(identifier,
        jobtracker.getJobTokenSecretManager());
    token.setService(identifier.getJobId());
    TokenCache.setJobToken(token, tokenStorage);

    // write TokenStorage out
    tokenStorage.writeTokenStorageFile(keysFile, jobtracker.getConf());
    LOG.info("jobToken generated and stored with users keys in "
        + keysFile.toUri().getPath());
  }
  public String getJobSubmitHostAddress() {
    return submitHostAddress;
  }

  public String getJobSubmitHostName() {
    return submitHostName;
  }
}