- /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- package org.apache.hadoop.mapred;
- import java.io.File;
- import java.io.FileOutputStream;
- import java.io.IOException;
- import java.net.UnknownHostException;
- import java.security.PrivilegedExceptionAction;
- import java.util.ArrayList;
- import java.util.Collection;
- import java.util.Comparator;
- import java.util.EnumMap;
- import java.util.HashMap;
- import java.util.HashSet;
- import java.util.IdentityHashMap;
- import java.util.Iterator;
- import java.util.LinkedHashSet;
- import java.util.LinkedList;
- import java.util.List;
- import java.util.Map;
- import java.util.Set;
- import java.util.TreeMap;
- import java.util.Vector;
- import java.util.concurrent.atomic.AtomicBoolean;
- import org.apache.commons.logging.Log;
- import org.apache.commons.logging.LogFactory;
- import org.apache.hadoop.classification.InterfaceAudience;
- import org.apache.hadoop.classification.InterfaceStability;
- import org.apache.hadoop.fs.FileSystem;
- import org.apache.hadoop.fs.LocalFileSystem;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext;
- import org.apache.hadoop.mapreduce.JobContext;
- import org.apache.hadoop.mapreduce.JobCounter;
- import org.apache.hadoop.mapreduce.JobSubmissionFiles;
- import org.apache.hadoop.mapreduce.MRJobConfig;
- import org.apache.hadoop.mapreduce.TaskType;
- import org.apache.hadoop.mapreduce.counters.LimitExceededException;
- import org.apache.hadoop.mapreduce.jobhistory.JobFinishedEvent;
- import org.apache.hadoop.mapreduce.jobhistory.JobHistory;
- import org.apache.hadoop.mapreduce.jobhistory.JobInfoChangeEvent;
- import org.apache.hadoop.mapreduce.jobhistory.JobInitedEvent;
- import org.apache.hadoop.mapreduce.jobhistory.JobPriorityChangeEvent;
- import org.apache.hadoop.mapreduce.jobhistory.JobStatusChangedEvent;
- import org.apache.hadoop.mapreduce.jobhistory.JobSubmittedEvent;
- import org.apache.hadoop.mapreduce.jobhistory.JobUnsuccessfulCompletionEvent;
- import org.apache.hadoop.mapreduce.jobhistory.MapAttemptFinishedEvent;
- import org.apache.hadoop.mapreduce.jobhistory.ReduceAttemptFinishedEvent;
- import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptStartedEvent;
- import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptUnsuccessfulCompletionEvent;
- import org.apache.hadoop.mapreduce.jobhistory.TaskFailedEvent;
- import org.apache.hadoop.mapreduce.jobhistory.TaskFinishedEvent;
- import org.apache.hadoop.mapreduce.jobhistory.TaskStartedEvent;
- import org.apache.hadoop.mapreduce.security.TokenCache;
- import org.apache.hadoop.security.Credentials;
- import org.apache.hadoop.mapreduce.security.token.DelegationTokenRenewal;
- import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
- import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
- import org.apache.hadoop.mapreduce.split.JobSplit;
- import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader;
- import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
- import org.apache.hadoop.mapreduce.task.JobContextImpl;
- import org.apache.hadoop.net.NetUtils;
- import org.apache.hadoop.net.NetworkTopology;
- import org.apache.hadoop.net.Node;
- import org.apache.hadoop.security.UserGroupInformation;
- import org.apache.hadoop.security.token.Token;
- import org.apache.hadoop.security.token.TokenIdentifier;
- import org.apache.hadoop.util.StringUtils;
- /**
- * JobInProgress maintains all the info for keeping a Job on the straight and
- * narrow. It keeps its JobProfile and its latest JobStatus, plus a set of
- * tables for doing bookkeeping of its Tasks.
- */
- @InterfaceAudience.LimitedPrivate({"MapReduce"})
- @InterfaceStability.Unstable
- public class JobInProgress {
- /**
- * Used when a kill is issued to a job which is initializing.
- */
- static class KillInterruptedException extends InterruptedException {
- private static final long serialVersionUID = 1L;
- public KillInterruptedException(String msg) {
- super(msg);
- }
- }
-
- static final Log LOG = LogFactory.getLog(JobInProgress.class);
-
- JobProfile profile;
- JobStatus status;
- Path jobFile = null;
- Path localJobFile = null;
- TaskInProgress maps[] = new TaskInProgress[0];
- TaskInProgress reduces[] = new TaskInProgress[0];
- TaskInProgress cleanup[] = new TaskInProgress[0];
- TaskInProgress setup[] = new TaskInProgress[0];
- int numMapTasks = 0;
- int numReduceTasks = 0;
- final long memoryPerMap;
- final long memoryPerReduce;
- volatile int numSlotsPerMap = 1;
- volatile int numSlotsPerReduce = 1;
- final int maxTaskFailuresPerTracker;
-
- // Counters to track currently running/finished/failed Map/Reduce task-attempts
- int runningMapTasks = 0;
- int runningReduceTasks = 0;
- int finishedMapTasks = 0;
- int finishedReduceTasks = 0;
- int failedMapTasks = 0;
- int failedReduceTasks = 0;
-
- static final float DEFAULT_COMPLETED_MAPS_PERCENT_FOR_REDUCE_SLOWSTART = 0.05f;
- int completedMapsForReduceSlowstart = 0;
-
- // runningMapTasks includes speculative tasks, so we need to capture
- // speculative tasks separately
- int speculativeMapTasks = 0;
- int speculativeReduceTasks = 0;
-
- int mapFailuresPercent = 0;
- int reduceFailuresPercent = 0;
- int failedMapTIPs = 0;
- int failedReduceTIPs = 0;
- private volatile boolean launchedCleanup = false;
- private volatile boolean launchedSetup = false;
- private volatile boolean jobKilled = false;
- private volatile boolean jobFailed = false;
- private final boolean jobSetupCleanupNeeded;
- private final boolean taskCleanupNeeded;
- JobPriority priority = JobPriority.NORMAL;
- protected JobTracker jobtracker;
-
- protected Credentials tokenStorage;
-
- JobHistory jobHistory;
- // Map of NetworkTopology Node to the list of non-running map TIPs
- Map<Node, List<TaskInProgress>> nonRunningMapCache;
-
- // Map of NetworkTopology Node to set of running TIPs
- Map<Node, Set<TaskInProgress>> runningMapCache;
- // A list of non-local non-running maps
- List<TaskInProgress> nonLocalMaps;
- // A set of non-local running maps
- Set<TaskInProgress> nonLocalRunningMaps;
- // A list of non-running reduce TIPs
- List<TaskInProgress> nonRunningReduces;
- // A set of running reduce TIPs
- Set<TaskInProgress> runningReduces;
-
- // A list of cleanup tasks for the map task attempts, to be launched
- List<TaskAttemptID> mapCleanupTasks = new LinkedList<TaskAttemptID>();
-
- // A list of cleanup tasks for the reduce task attempts, to be launched
- List<TaskAttemptID> reduceCleanupTasks = new LinkedList<TaskAttemptID>();
- int maxLevel;
- /**
- * A special value indicating that
- * {@link #findNewMapTask(TaskTrackerStatus, int, int, int, double)} should
- * schedule any available map tasks for this job, including speculative tasks.
- */
- int anyCacheLevel;
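- // (Illustration, assuming the default two-level topology of rack and host:
- // maxLevel is 2, so anyCacheLevel is 3, i.e. "any locality level, plus
- // speculative tasks".)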
-
- /**
- * A special value indicating that
- * {@link #findNewMapTask(TaskTrackerStatus, int, int, int, double)} should
- * schedule only off-switch and speculative map tasks for this job.
- */
- private static final int NON_LOCAL_CACHE_LEVEL = -1;
- private int taskCompletionEventTracker = 0;
- List<TaskCompletionEvent> taskCompletionEvents;
-
- // The maximum percentage of trackers in cluster added to the 'blacklist'.
- private static final double CLUSTER_BLACKLIST_PERCENT = 0.25;
-
- // The maximum percentage of fetch failures allowed for a map
- private static final double MAX_ALLOWED_FETCH_FAILURES_PERCENT = 0.5;
-
- // No. of tasktrackers in the cluster
- private volatile int clusterSize = 0;
-
- // The no. of tasktrackers where >= conf.getMaxTaskFailuresPerTracker()
- // tasks have failed
- private volatile int flakyTaskTrackers = 0;
- // Map of trackerHostName -> no. of task failures
- private Map<String, Integer> trackerToFailuresMap =
- new TreeMap<String, Integer>();
-
- //Confine estimation algorithms to an "oracle" class that JIP queries.
- ResourceEstimator resourceEstimator;
-
- long startTime;
- long launchTime;
- long finishTime;
- // First task launch times, per task type
- final Map<TaskType, Long> firstTaskLaunchTimes =
- new EnumMap<TaskType, Long>(TaskType.class);
-
- // Indicates how many times the job got restarted
- private final int restartCount;
- JobConf conf;
- protected AtomicBoolean tasksInited = new AtomicBoolean(false);
- private JobInitKillStatus jobInitKillStatus = new JobInitKillStatus();
- LocalFileSystem localFs;
- FileSystem fs;
- String user;
- JobID jobId;
- volatile private boolean hasSpeculativeMaps;
- volatile private boolean hasSpeculativeReduces;
- long inputLength = 0;
-
- Counters jobCounters = new Counters();
-
- // Maximum no. of fetch-failure notifications after which map task is killed
- private static final int MAX_FETCH_FAILURES_NOTIFICATIONS = 3;
- // Don't lower speculativeCap below one TT's worth (for small clusters)
- private static final int MIN_SPEC_CAP = 10;
-
- private static final float MIN_SLOTS_CAP = 0.01f;
-
- // Map of mapTaskId -> no. of fetch failures
- private Map<TaskAttemptID, Integer> mapTaskIdToFetchFailuresMap =
- new TreeMap<TaskAttemptID, Integer>();
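- // (These work together with the constants above: this map accumulates
- // reducer-reported fetch failures per map attempt, and the handling code
- // elsewhere in this class is expected to fail the map once
- // MAX_FETCH_FAILURES_NOTIFICATIONS is reached, subject to
- // MAX_ALLOWED_FETCH_FAILURES_PERCENT.)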
- private Object schedulingInfo;
- private String submitHostName;
- private String submitHostAddress;
- //thresholds for speculative execution
- float slowTaskThreshold;
- float speculativeCap;
- float slowNodeThreshold; //standard deviations
- //Statistics are maintained for a couple of things
- //mapTaskStats is used for maintaining statistics about
- //the completion time of map tasks on the trackers. On a per
- //tracker basis, the mean time for task completion is maintained
- private DataStatistics mapTaskStats = new DataStatistics();
- //reduceTaskStats is used for maintaining statistics about
- //the completion time of reduce tasks on the trackers. On a per
- //tracker basis, the mean time for task completion is maintained
- private DataStatistics reduceTaskStats = new DataStatistics();
- //trackerMapStats is used to maintain a mapping from the tracker to the
- //statistics about completion time of map tasks
- private Map<String,DataStatistics> trackerMapStats =
- new HashMap<String,DataStatistics>();
- //trackerReduceStats is used to maintain a mapping from the tracker to the
- //statistics about completion time of reduce tasks
- private Map<String,DataStatistics> trackerReduceStats =
- new HashMap<String,DataStatistics>();
- //runningMapStats used to maintain the RUNNING map tasks' statistics
- private DataStatistics runningMapTaskStats = new DataStatistics();
- //runningReduceStats used to maintain the RUNNING reduce tasks' statistics
- private DataStatistics runningReduceTaskStats = new DataStatistics();
-
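- // Records a slot reservation on a tracker: when it was made and how many
- // slots it holds. Presumably this lets schedulers account for reserved
- // but idle ("fallow") slot-time against the job.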
- private static class FallowSlotInfo {
- long timestamp;
- int numSlots;
-
- public FallowSlotInfo(long timestamp, int numSlots) {
- this.timestamp = timestamp;
- this.numSlots = numSlots;
- }
- public long getTimestamp() {
- return timestamp;
- }
- public void setTimestamp(long timestamp) {
- this.timestamp = timestamp;
- }
- public int getNumSlots() {
- return numSlots;
- }
- public void setNumSlots(int numSlots) {
- this.numSlots = numSlots;
- }
- }
-
- private Map<TaskTracker, FallowSlotInfo> trackersReservedForMaps =
- new HashMap<TaskTracker, FallowSlotInfo>();
- private Map<TaskTracker, FallowSlotInfo> trackersReservedForReduces =
- new HashMap<TaskTracker, FallowSlotInfo>();
- private Path jobSubmitDir = null;
-
- /**
- * Create an almost empty JobInProgress, which can be used only for tests
- */
- protected JobInProgress(JobID jobid, JobConf conf, JobTracker tracker) {
- this.conf = conf;
- this.jobId = jobid;
- this.numMapTasks = conf.getNumMapTasks();
- this.numReduceTasks = conf.getNumReduceTasks();
- this.maxLevel = NetworkTopology.DEFAULT_HOST_LEVEL;
- this.anyCacheLevel = this.maxLevel+1;
- this.jobtracker = tracker;
- this.restartCount = 0;
- this.profile = new JobProfile(conf.getUser(), jobid, "", "",
- conf.getJobName(),conf.getQueueName());
- this.memoryPerMap = conf.getMemoryForMapTask();
- this.memoryPerReduce = conf.getMemoryForReduceTask();
- this.maxTaskFailuresPerTracker = conf.getMaxTaskFailuresPerTracker();
-
- hasSpeculativeMaps = conf.getMapSpeculativeExecution();
- hasSpeculativeReduces = conf.getReduceSpeculativeExecution();
- this.nonLocalMaps = new LinkedList<TaskInProgress>();
- this.nonLocalRunningMaps = new LinkedHashSet<TaskInProgress>();
- this.runningMapCache = new IdentityHashMap<Node, Set<TaskInProgress>>();
- this.nonRunningReduces = new LinkedList<TaskInProgress>();
- this.runningReduces = new LinkedHashSet<TaskInProgress>();
- this.resourceEstimator = new ResourceEstimator(this);
- this.status = new JobStatus(jobid, 0.0f, 0.0f, JobStatus.PREP,
- this.profile.getUser(), this.profile.getJobName(),
- this.profile.getJobFile(), "");
- this.jobtracker.getInstrumentation().addPrepJob(conf, jobid);
- this.taskCompletionEvents = new ArrayList<TaskCompletionEvent>
- (numMapTasks + numReduceTasks + 10);
-
- this.slowTaskThreshold = Math.max(0.0f,
- conf.getFloat(MRJobConfig.SPECULATIVE_SLOWTASK_THRESHOLD,1.0f));
- this.speculativeCap = conf.getFloat(
- MRJobConfig.SPECULATIVECAP,0.1f);
- this.slowNodeThreshold = conf.getFloat(
- MRJobConfig.SPECULATIVE_SLOWNODE_THRESHOLD,1.0f);
- this.jobSetupCleanupNeeded = conf.getBoolean(
- MRJobConfig.SETUP_CLEANUP_NEEDED, true);
- this.taskCleanupNeeded = conf.getBoolean(
- MRJobConfig.TASK_CLEANUP_NEEDED, true);
- if (tracker != null) { // Some mock tests have null tracker
- this.jobHistory = tracker.getJobHistory();
- }
- this.tokenStorage = null;
- }
-
- JobInProgress(JobConf conf) {
- restartCount = 0;
- jobSetupCleanupNeeded = false;
- taskCleanupNeeded = true;
- this.memoryPerMap = conf.getMemoryForMapTask();
- this.memoryPerReduce = conf.getMemoryForReduceTask();
- this.maxTaskFailuresPerTracker = conf.getMaxTaskFailuresPerTracker();
- }
-
- /**
- * Create a JobInProgress with the given job file, plus a handle
- * to the tracker.
- */
- public JobInProgress(JobTracker jobtracker,
- final JobConf default_conf, int rCount,
- JobInfo jobInfo,
- Credentials ts
- ) throws IOException, InterruptedException {
- try {
- this.restartCount = rCount;
- this.jobId = JobID.downgrade(jobInfo.getJobID());
- String url = "http://" + jobtracker.getJobTrackerMachine() + ":"
- + jobtracker.getInfoPort() + "/jobdetails.jsp?jobid=" + this.jobId;
- this.jobtracker = jobtracker;
- this.jobHistory = jobtracker.getJobHistory();
- this.startTime = System.currentTimeMillis();
- this.localFs = jobtracker.getLocalFileSystem();
- this.tokenStorage = ts;
- // use the user supplied token to add user credentials to the conf
- jobSubmitDir = jobInfo.getJobSubmitDir();
- user = jobInfo.getUser().toString();
- UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user);
- if (ts != null) {
- for (Token<? extends TokenIdentifier> token : ts.getAllTokens()) {
- ugi.addToken(token);
- }
- }
- fs = ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
- public FileSystem run() throws IOException {
- return jobSubmitDir.getFileSystem(default_conf);
- }
- });
- this.localJobFile = default_conf.getLocalPath(JobTracker.SUBDIR + "/"
- + this.jobId + ".xml");
- jobFile = JobSubmissionFiles.getJobConfPath(jobSubmitDir);
- fs.copyToLocalFile(jobFile, localJobFile);
- conf = new JobConf(localJobFile);
- if (conf.getUser() == null) {
- this.conf.setUser(user);
- }
- if (!conf.getUser().equals(user)) {
- String desc = "The username " + conf.getUser() + " obtained from the "
- + "conf doesn't match the username " + user + " the user "
- + "authenticated as";
- AuditLogger.logFailure(user, Operation.SUBMIT_JOB.name(),
- conf.getUser(), jobId.toString(), desc);
- throw new IOException(desc);
- }
- String userGroups[] = ugi.getGroupNames();
- String primaryGroup = (userGroups.length > 0) ? userGroups[0] : null;
- if (primaryGroup != null) {
- conf.set("group.name", primaryGroup);
- }
- this.priority = conf.getJobPriority();
- this.profile = new JobProfile(conf.getUser(), this.jobId, jobFile
- .toString(), url, conf.getJobName(), conf.getQueueName());
- this.status = new JobStatus(this.jobId, 0.0f, 0.0f, JobStatus.PREP,
- profile.getUser(), profile.getJobName(), profile.getJobFile(),
- profile.getURL().toString());
- this.jobtracker.getInstrumentation().addPrepJob(conf, this.jobId);
- status.setStartTime(startTime);
- this.status.setJobPriority(this.priority);
- this.numMapTasks = conf.getNumMapTasks();
- this.numReduceTasks = conf.getNumReduceTasks();
- this.memoryPerMap = conf.getMemoryForMapTask();
- this.memoryPerReduce = conf.getMemoryForReduceTask();
- this.taskCompletionEvents = new ArrayList<TaskCompletionEvent>(
- numMapTasks + numReduceTasks + 10);
- JobContext jobContext = new JobContextImpl(conf, jobId);
- this.jobSetupCleanupNeeded = jobContext.getJobSetupCleanupNeeded();
- this.taskCleanupNeeded = jobContext.getTaskCleanupNeeded();
- // Construct the jobACLs
- status.setJobACLs(jobtracker.getJobACLsManager().constructJobACLs(conf));
- this.mapFailuresPercent = conf.getMaxMapTaskFailuresPercent();
- this.reduceFailuresPercent = conf.getMaxReduceTaskFailuresPercent();
- this.maxTaskFailuresPerTracker = conf.getMaxTaskFailuresPerTracker();
- hasSpeculativeMaps = conf.getMapSpeculativeExecution();
- hasSpeculativeReduces = conf.getReduceSpeculativeExecution();
- this.maxLevel = jobtracker.getNumTaskCacheLevels();
- this.anyCacheLevel = this.maxLevel + 1;
- this.nonLocalMaps = new LinkedList<TaskInProgress>();
- this.nonLocalRunningMaps = new LinkedHashSet<TaskInProgress>();
- this.runningMapCache = new IdentityHashMap<Node, Set<TaskInProgress>>();
- this.nonRunningReduces = new LinkedList<TaskInProgress>();
- this.runningReduces = new LinkedHashSet<TaskInProgress>();
- this.resourceEstimator = new ResourceEstimator(this);
- this.submitHostName = conf.getJobSubmitHostName();
- this.submitHostAddress = conf.getJobSubmitHostAddress();
- this.slowTaskThreshold = Math.max(0.0f, conf.getFloat(
- MRJobConfig.SPECULATIVE_SLOWTASK_THRESHOLD, 1.0f));
- this.speculativeCap = conf.getFloat(MRJobConfig.SPECULATIVECAP, 0.1f);
- this.slowNodeThreshold = conf.getFloat(
- MRJobConfig.SPECULATIVE_SLOWNODE_THRESHOLD, 1.0f);
- // register job's tokens for renewal
- DelegationTokenRenewal.registerDelegationTokensForRenewal(jobInfo
- .getJobID(), ts, jobtracker.getConf());
- } finally {
- // close all FileSystems that were created above for the current user
- // At this point, this constructor is called in the context of an RPC, and
- // hence the "current user" is actually referring to the kerberos
- // authenticated user (if security is ON).
- FileSystem.closeAllForUGI(UserGroupInformation.getCurrentUser());
- }
- }
-
- private void printCache (Map<Node, List<TaskInProgress>> cache) {
- LOG.info("The taskcache info:");
- for (Map.Entry<Node, List<TaskInProgress>> n : cache.entrySet()) {
- List <TaskInProgress> tips = n.getValue();
- LOG.info("Cached TIPs on node: " + n.getKey());
- for (TaskInProgress tip : tips) {
- LOG.info("tip : " + tip.getTIPId());
- }
- }
- }
-
- Map<Node, List<TaskInProgress>> createCache(
- TaskSplitMetaInfo[] splits, int maxLevel) {
- Map<Node, List<TaskInProgress>> cache =
- new IdentityHashMap<Node, List<TaskInProgress>>(maxLevel);
-
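- // Walk each split's locations up the topology, registering the TIP at
- // every ancestor level (host, then rack, ...) up to maxLevel, so the
- // scheduler can later look up candidate maps for any node or rack.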
- for (int i = 0; i < splits.length; i++) {
- String[] splitLocations = splits[i].getLocations();
- if (splitLocations.length == 0) {
- nonLocalMaps.add(maps[i]);
- continue;
- }
- for(String host: splitLocations) {
- Node node = jobtracker.resolveAndAddToTopology(host);
- LOG.info("tip:" + maps[i].getTIPId() + " has split on node:" + node);
- for (int j = 0; j < maxLevel; j++) {
- List<TaskInProgress> hostMaps = cache.get(node);
- if (hostMaps == null) {
- hostMaps = new ArrayList<TaskInProgress>();
- cache.put(node, hostMaps);
- hostMaps.add(maps[i]);
- }
- //check whether the hostMaps already contains an entry for a TIP
- //This will be true for nodes that are racks and multiple nodes in
- //the rack contain the input for a tip. Note that if it already
- //exists in the hostMaps, it must be the last element there since
- //we process one TIP at a time sequentially in the split-size order
- if (hostMaps.get(hostMaps.size() - 1) != maps[i]) {
- hostMaps.add(maps[i]);
- }
- node = node.getParent();
- }
- }
- }
- return cache;
- }
-
- /**
- * Check if the job has been initialized.
- * @return <code>true</code> if the job has been initialized,
- * <code>false</code> otherwise
- */
- public boolean inited() {
- return tasksInited.get();
- }
-
- /**
- * Get the user for the job
- */
- public String getUser() {
- return user;
- }
- boolean getMapSpeculativeExecution() {
- return hasSpeculativeMaps;
- }
-
- boolean getReduceSpeculativeExecution() {
- return hasSpeculativeReduces;
- }
-
- long getMemoryForMapTask() {
- return memoryPerMap;
- }
-
- long getMemoryForReduceTask() {
- return memoryPerReduce;
- }
-
- /**
- * Get the number of slots required to run a single map task-attempt.
- * @return the number of slots required to run a single map task-attempt
- */
- int getNumSlotsPerMap() {
- return numSlotsPerMap;
- }
- /**
- * Set the number of slots required to run a single map task-attempt.
- * This is typically set by schedulers which support high-ram jobs.
- * @param numSlotsPerMap the number of slots required to run a single map
- * task-attempt
- */
- void setNumSlotsPerMap(int numSlotsPerMap) {
- this.numSlotsPerMap = numSlotsPerMap;
- }
- /**
- * Get the number of slots required to run a single reduce task-attempt.
- * @return the number of slots required to run a single reduce task-attempt
- */
- int getNumSlotsPerReduce() {
- return numSlotsPerReduce;
- }
- /**
- * Set the number of slots required to run a single reduce task-attempt.
- * This is typically set by schedulers which support high-ram jobs.
- * @param numSlotsPerReduce the number of slots required to run a single
- * reduce task-attempt
- */
- void setNumSlotsPerReduce(int numSlotsPerReduce) {
- this.numSlotsPerReduce = numSlotsPerReduce;
- }
- /**
- * Construct the splits, etc. This is invoked from an async
- * thread so that split-computation doesn't block anyone. Only the
- * {@link JobTracker} should invoke this API. Look
- * at {@link JobTracker#initJob(JobInProgress)} for more details.
- */
- public synchronized void initTasks()
- throws IOException, KillInterruptedException, UnknownHostException {
- if (tasksInited.get() || isComplete()) {
- return;
- }
- synchronized(jobInitKillStatus){
- if(jobInitKillStatus.killed || jobInitKillStatus.initStarted) {
- return;
- }
- jobInitKillStatus.initStarted = true;
- }
- LOG.info("Initializing " + jobId);
- logSubmissionToJobHistory();
-
- // log the job priority
- setPriority(this.priority);
-
- //
- // generate security keys needed by Tasks
- //
- generateAndStoreTokens();
-
- //
- // read input splits and create a map task per split
- //
- TaskSplitMetaInfo[] taskSplitMetaInfo = createSplits(jobId);
- numMapTasks = taskSplitMetaInfo.length;
- checkTaskLimits();
- // Sanity check the locations so we don't create/initialize unnecessary tasks
- for (TaskSplitMetaInfo split : taskSplitMetaInfo) {
- NetUtils.verifyHostnames(split.getLocations());
- }
- jobtracker.getInstrumentation().addWaitingMaps(getJobID(), numMapTasks);
- jobtracker.getInstrumentation().addWaitingReduces(getJobID(), numReduceTasks);
- createMapTasks(jobFile.toString(), taskSplitMetaInfo);
-
- if (numMapTasks > 0) {
- nonRunningMapCache = createCache(taskSplitMetaInfo,
- maxLevel);
- }
-
- // set the launch time
- this.launchTime = JobTracker.getClock().getTime();
- createReduceTasks(jobFile.toString());
-
- // Calculate the minimum number of maps to be complete before
- // we should start scheduling reduces
- completedMapsForReduceSlowstart =
- (int)Math.ceil(
- (conf.getFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART,
- DEFAULT_COMPLETED_MAPS_PERCENT_FOR_REDUCE_SLOWSTART) *
- numMapTasks));
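- // (Example: with the default threshold of 0.05 and 200 maps, reduces are
- // held back until ceil(0.05 * 200) = 10 maps have completed.)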
-
- initSetupCleanupTasks(jobFile.toString());
-
- synchronized(jobInitKillStatus){
- jobInitKillStatus.initDone = true;
- if(jobInitKillStatus.killed) {
- //setup not launched so directly terminate
- throw new KillInterruptedException("Job " + jobId + " killed in init");
- }
- }
-
- tasksInited.set(true);
- JobInitedEvent jie = new JobInitedEvent(
- profile.getJobID(), this.launchTime,
- numMapTasks, numReduceTasks,
- JobStatus.getJobRunState(JobStatus.PREP),
- false);
-
- jobHistory.logEvent(jie, jobId);
-
- // Log the number of map and reduce tasks
- LOG.info("Job " + jobId + " initialized successfully with " + numMapTasks
- + " map tasks and " + numReduceTasks + " reduce tasks.");
- }
- // Returns true if the job is empty (0 maps, 0 reduces and no setup/cleanup),
- // false otherwise.
- synchronized boolean isJobEmpty() {
- return maps.length == 0 && reduces.length == 0 && !jobSetupCleanupNeeded;
- }
-
- synchronized boolean isSetupCleanupRequired() {
- return jobSetupCleanupNeeded;
- }
- // Should be called once the init is done. This will complete the job
- // because the job is empty (0 maps, 0 reduces and no setup-cleanup).
- synchronized void completeEmptyJob() {
- jobComplete();
- }
- synchronized void completeSetup() {
- setupComplete();
- }
- void logSubmissionToJobHistory() throws IOException {
- // log job info
- String username = conf.getUser();
- if (username == null) { username = ""; }
- String jobname = conf.getJobName();
- String jobQueueName = conf.getQueueName();
- setUpLocalizedJobConf(conf, jobId);
- jobHistory.setupEventWriter(jobId, conf);
- JobSubmittedEvent jse =
- new JobSubmittedEvent(jobId, jobname, username, this.startTime,
- jobFile.toString(), status.getJobACLs(), jobQueueName);
- jobHistory.logEvent(jse, jobId);
-
- }
- TaskSplitMetaInfo[] createSplits(org.apache.hadoop.mapreduce.JobID jobId)
- throws IOException {
- TaskSplitMetaInfo[] allTaskSplitMetaInfo =
- SplitMetaInfoReader.readSplitMetaInfo(jobId, fs, conf, jobSubmitDir);
- return allTaskSplitMetaInfo;
- }
- /**
- * If the number of tasks is greater than the configured value,
- * throw an exception that will fail job initialization
- */
- void checkTaskLimits() throws IOException {
- int maxTasks = jobtracker.getMaxTasksPerJob();
- if (maxTasks > 0 && numMapTasks + numReduceTasks > maxTasks) {
- throw new IOException(
- "The number of tasks for this job " +
- (numMapTasks + numReduceTasks) +
- " exceeds the configured limit " + maxTasks);
- }
- }
- synchronized void createMapTasks(String jobFile,
- TaskSplitMetaInfo[] splits) {
- maps = new TaskInProgress[numMapTasks];
- for(int i=0; i < numMapTasks; ++i) {
- inputLength += splits[i].getInputDataLength();
- maps[i] = new TaskInProgress(jobId, jobFile,
- splits[i],
- jobtracker, conf, this,
- i, numSlotsPerMap);
- }
- LOG.info("Input size for job " + jobId + " = " + inputLength
- + ". Number of splits = " + splits.length);
- }
- synchronized void createReduceTasks(String jobFile) {
- this.reduces = new TaskInProgress[numReduceTasks];
- for (int i = 0; i < numReduceTasks; i++) {
- reduces[i] = new TaskInProgress(jobId, jobFile,
- numMapTasks, i,
- jobtracker, conf,
- this, numSlotsPerReduce);
- nonRunningReduces.add(reduces[i]);
- }
- }
-
- synchronized void initSetupCleanupTasks(String jobFile) {
- if (!jobSetupCleanupNeeded) {
- LOG.info("Setup/Cleanup not needed for job " + jobId);
- // nothing to initialize
- return;
- }
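- // The setup/cleanup TIPs take ids just past the real tasks: the cleanup
- // map gets partition id numMapTasks and the setup map numMapTasks + 1
- // (similarly numReduceTasks and numReduceTasks + 1 on the reduce side),
- // so they never collide with regular task ids.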
- // create two cleanup tips, one map and one reduce.
- cleanup = new TaskInProgress[2];
- // cleanup map tip. This map doesn't use any splits. Just assign an empty
- // split.
- TaskSplitMetaInfo emptySplit = JobSplit.EMPTY_TASK_SPLIT;
- cleanup[0] = new TaskInProgress(jobId, jobFile, emptySplit,
- jobtracker, conf, this, numMapTasks, 1);
- cleanup[0].setJobCleanupTask();
- // cleanup reduce tip.
- cleanup[1] = new TaskInProgress(jobId, jobFile, numMapTasks,
- numReduceTasks, jobtracker, conf, this, 1);
- cleanup[1].setJobCleanupTask();
- // create two setup tips, one map and one reduce.
- setup = new TaskInProgress[2];
- // setup map tip. This map doesn't use any split. Just assign an empty
- // split.
- setup[0] = new TaskInProgress(jobId, jobFile, emptySplit,
- jobtracker, conf, this, numMapTasks + 1, 1);
- setup[0].setJobSetupTask();
- // setup reduce tip.
- setup[1] = new TaskInProgress(jobId, jobFile, numMapTasks,
- numReduceTasks + 1, jobtracker, conf, this, 1);
- setup[1].setJobSetupTask();
- }
-
- void setupComplete() {
- status.setSetupProgress(1.0f);
- if (this.status.getRunState() == JobStatus.PREP) {
- changeStateTo(JobStatus.RUNNING);
- JobStatusChangedEvent jse =
- new JobStatusChangedEvent(profile.getJobID(),
- JobStatus.getJobRunState(JobStatus.RUNNING));
- jobHistory.logEvent(jse, profile.getJobID());
- }
- }
- /////////////////////////////////////////////////////
- // Accessors for the JobInProgress
- /////////////////////////////////////////////////////
- public JobProfile getProfile() {
- return profile;
- }
- public JobStatus getStatus() {
- return status;
- }
- public synchronized long getLaunchTime() {
- return launchTime;
- }
- Map<TaskType, Long> getFirstTaskLaunchTimes() {
- return firstTaskLaunchTimes;
- }
- public long getStartTime() {
- return startTime;
- }
- public long getFinishTime() {
- return finishTime;
- }
- public int desiredMaps() {
- return numMapTasks;
- }
- public synchronized int finishedMaps() {
- return finishedMapTasks;
- }
- public int desiredReduces() {
- return numReduceTasks;
- }
- public synchronized int runningMaps() {
- return runningMapTasks;
- }
- public synchronized int runningReduces() {
- return runningReduceTasks;
- }
- public synchronized int finishedReduces() {
- return finishedReduceTasks;
- }
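- // Note: runningMapTasks counts speculative attempts as well, so the
- // speculative count is added back below; otherwise a TIP with two live
- // attempts would be subtracted twice from the pending total.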
- public synchronized int pendingMaps() {
- return numMapTasks - runningMapTasks - failedMapTIPs -
- finishedMapTasks + speculativeMapTasks;
- }
- public synchronized int pendingReduces() {
- return numReduceTasks - runningReduceTasks - failedReduceTIPs -
- finishedReduceTasks + speculativeReduceTasks;
- }
-
- public int getNumSlotsPerTask(TaskType taskType) {
- if (taskType == TaskType.MAP) {
- return numSlotsPerMap;
- } else if (taskType == TaskType.REDUCE) {
- return numSlotsPerReduce;
- } else {
- return 1;
- }
- }
- public JobPriority getPriority() {
- return this.priority;
- }
- public void setPriority(JobPriority priority) {
- if(priority == null) {
- priority = JobPriority.NORMAL;
- }
- synchronized (this) {
- this.priority = priority;
- status.setJobPriority(priority);
- // log and change to the job's priority
- JobPriorityChangeEvent prEvent =
- new JobPriorityChangeEvent(jobId, priority);
-
- jobHistory.logEvent(prEvent, jobId);
-
- }
- }
- // Update the job start/launch time (upon restart) and log to history
- synchronized void updateJobInfo(long startTime, long launchTime) {
- // log and change to the job's start/launch time
- this.startTime = startTime;
- this.launchTime = launchTime;
- JobInfoChangeEvent event =
- new JobInfoChangeEvent(jobId, startTime, launchTime);
-
- jobHistory.logEvent(event, jobId);
-
- }
- /**
- * Get the number of times the job has restarted
- */
- int getNumRestarts() {
- return restartCount;
- }
-
- long getInputLength() {
- return inputLength;
- }
-
- boolean isCleanupLaunched() {
- return launchedCleanup;
- }
- boolean isSetupLaunched() {
- return launchedSetup;
- }
- /**
- * Get all the tasks of the desired type in this job.
- * @param type {@link TaskType} of the tasks required
- * @return An array of {@link TaskInProgress} matching the given type.
- * Returns an empty array if no tasks are found for the given type.
- */
- TaskInProgress[] getTasks(TaskType type) {
- TaskInProgress[] tasks = null;
- switch (type) {
- case MAP:
- {
- tasks = maps;
- }
- break;
- case REDUCE:
- {
- tasks = reduces;
- }
- break;
- case JOB_SETUP:
- {
- tasks = setup;
- }
- break;
- case JOB_CLEANUP:
- {
- tasks = cleanup;
- }
- break;
- default:
- {
- tasks = new TaskInProgress[0];
- }
- break;
- }
- return tasks;
- }
- /**
- * Get the set of non-local running map TIPs.
- * @return the nonLocalRunningMaps set
- */
- Set<TaskInProgress> getNonLocalRunningMaps()
- {
- return nonLocalRunningMaps;
- }
-
- /**
- * Get the cache of running map TIPs, keyed by topology node.
- * @return the runningMapCache map
- */
- Map<Node, Set<TaskInProgress>> getRunningMapCache()
- {
- return runningMapCache;
- }
-
- /**
- * Get the set of running reduce TIPs.
- * @return the runningReduces set
- */
- Set<TaskInProgress> getRunningReduces()
- {
- return runningReduces;
- }
-
- /**
- * Get the job configuration
- * @return the job's configuration
- */
- JobConf getJobConf() {
- return conf;
- }
-
- /**
- * Return a vector of map or reduce TaskInProgress objects, filtered by
- * completion state.
- */
- public synchronized Vector<TaskInProgress> reportTasksInProgress(boolean shouldBeMap,
- boolean shouldBeComplete) {
-
- Vector<TaskInProgress> results = new Vector<TaskInProgress>();
- TaskInProgress tips[] = null;
- if (shouldBeMap) {
- tips = maps;
- } else {
- tips = reduces;
- }
- for (int i = 0; i < tips.length; i++) {
- if (tips[i].isComplete() == shouldBeComplete) {
- results.add(tips[i]);
- }
- }
- return results;
- }
-
- /**
- * Return a vector of cleanup TaskInProgress objects
- */
- public synchronized Vector<TaskInProgress> reportCleanupTIPs(
- boolean shouldBeComplete) {
-
- Vector<TaskInProgress> results = new Vector<TaskInProgress>();
- for (int i = 0; i < cleanup.length; i++) {
- if (cleanup[i].isComplete() == shouldBeComplete) {
- results.add(cleanup[i]);
- }
- }
- return results;
- }
- /**
- * Return a vector of setup TaskInProgress objects
- */
- public synchronized Vector<TaskInProgress> reportSetupTIPs(
- boolean shouldBeComplete) {
-
- Vector<TaskInProgress> results = new Vector<TaskInProgress>();
- for (int i = 0; i < setup.length; i++) {
- if (setup[i].isComplete() == shouldBeComplete) {
- results.add(setup[i]);
- }
- }
- return results;
- }
- ////////////////////////////////////////////////////
- // Status update methods
- ////////////////////////////////////////////////////
- /**
- * Assuming {@link JobTracker} is locked on entry.
- */
- public synchronized void updateTaskStatus(TaskInProgress tip,
- TaskStatus status) {
- double oldProgress = tip.getProgress(); // save old progress
- boolean wasRunning = tip.isRunning();
- boolean wasComplete = tip.isComplete();
- boolean wasPending = tip.isOnlyCommitPending();
- TaskAttemptID taskid = status.getTaskID();
- boolean wasAttemptRunning = tip.isAttemptRunning(taskid);
-
- // If the TIP is already complete and this attempt reports SUCCEEDED,
- // mark the attempt KILLED. (A task that needs no commit/promotion is
- // marked SUCCEEDED directly by the tasktracker.) Likewise, if the user
- // asked to kill the task but the TT reported SUCCEEDED, mark it KILLED.
- if ((wasComplete || tip.wasKilled(taskid)) &&
- (status.getRunState() == TaskStatus.State.SUCCEEDED)) {
- status.setRunState(TaskStatus.State.KILLED);
- }
-
- // If the job is complete or task-cleanup is switched off
- // and a task has just reported its state as FAILED_UNCLEAN/KILLED_UNCLEAN,
- // make the task's state FAILED/KILLED without launching cleanup attempt.
- // Note that if task is already a cleanup attempt,
- // we don't change the state to make sure the task gets a killTaskAction
- if ((this.isComplete() || jobFailed || jobKilled || !taskCleanupNeeded) &&
- !tip.isCleanupAttempt(taskid)) {
- if (status.getRunState() == TaskStatus.State.FAILED_UNCLEAN) {
- status.setRunState(TaskStatus.State.FAILED);
- } else if (status.getRunState() == TaskStatus.State.KILLED_UNCLEAN) {
- status.setRunState(TaskStatus.State.KILLED);
- }
- }
-
- boolean change = tip.updateStatus(status);
- if (change) {
- TaskStatus.State state = status.getRunState();
- // get the TaskTrackerStatus where the task ran
- TaskTracker taskTracker =
- this.jobtracker.getTaskTracker(tip.machineWhereTaskRan(taskid));
- TaskTrackerStatus ttStatus =
- (taskTracker == null) ? null : taskTracker.getStatus();
- String taskTrackerHttpLocation = null;
- if (null != ttStatus){
- String host;
- if (NetUtils.getStaticResolution(ttStatus.getHost()) != null) {
- host = NetUtils.getStaticResolution(ttStatus.getHost());
- } else {
- host = ttStatus.getHost();
- }
- taskTrackerHttpLocation = "http://" + host + ":"
- + ttStatus.getHttpPort();
- }
- TaskCompletionEvent taskEvent = null;
- if (state == TaskStatus.State.SUCCEEDED) {
- taskEvent = new TaskCompletionEvent(
- taskCompletionEventTracker,
- taskid,
- tip.idWithinJob(),
- status.getIsMap() &&
- !tip.isJobCleanupTask() &&
- !tip.isJobSetupTask(),
- TaskCompletionEvent.Status.SUCCEEDED,
- taskTrackerHttpLocation
- );
- taskEvent.setTaskRunTime((int)(status.getFinishTime()
- - status.getStartTime()));
- tip.setSuccessEventNumber(taskCompletionEventTracker);
- } else if (state == TaskStatus.State.COMMIT_PENDING) {
- // If it is the first attempt reporting COMMIT_PENDING
- // ask the task to commit.
- if (!wasComplete && !wasPending) {
- tip.doCommit(taskid);
- }
- return;
- } else if (state == TaskStatus.State.FAILED_UNCLEAN ||
- state == TaskStatus.State.KILLED_UNCLEAN) {
- tip.incompleteSubTask(taskid, this.status);
- // add this task, to be rescheduled as cleanup attempt
- if (tip.isMapTask()) {
- mapCleanupTasks.add(taskid);
- } else {
- reduceCleanupTasks.add(taskid);
- }
- // Remove the task entry from jobtracker
- jobtracker.removeTaskEntry(taskid);
- }
- // For a failed task, update the JT data structures.
- else if (state == TaskStatus.State.FAILED ||
- state == TaskStatus.State.KILLED) {
- // Get the event number for the (possibly) previously successful
- // task. If there exists one, then set that status to OBSOLETE
- int eventNumber;
- if ((eventNumber = tip.getSuccessEventNumber()) != -1) {
- TaskCompletionEvent t =
- this.taskCompletionEvents.get(eventNumber);
- if (t.getTaskAttemptId().equals(taskid))
- t.setTaskStatus(TaskCompletionEvent.Status.OBSOLETE);
- }
-
- // Tell the job to fail the relevant task
- failedTask(tip, taskid, status, taskTracker,
- wasRunning, wasComplete, wasAttemptRunning);
- // Did the task failure lead to tip failure?
- TaskCompletionEvent.Status taskCompletionStatus =
- (state == TaskStatus.State.FAILED ) ?
- TaskCompletionEvent.Status.FAILED :
- TaskCompletionEvent.Status.KILLED;
- if (tip.isFailed()) {
- taskCompletionStatus = TaskCompletionEvent.Status.TIPFAILED;
- }
- taskEvent = new TaskCompletionEvent(taskCompletionEventTracker,
- taskid,
- tip.idWithinJob(),
- status.getIsMap() &&
- !tip.isJobCleanupTask() &&
- !tip.isJobSetupTask(),
- taskCompletionStatus,
- taskTrackerHttpLocation
- );
- }
- // Add the 'complete' task i.e. successful/failed
- // It _is_ safe to add the TaskCompletionEvent.Status.SUCCEEDED
- // *before* calling TIP.completedTask since:
- // a. One and only one task of a TIP is declared as a SUCCESS, the
- // other (speculative tasks) are marked KILLED
- // b. TIP.completedTask *does not* throw _any_ exception at all.
- if (taskEvent != null) {
- this.taskCompletionEvents.add(taskEvent);
- taskCompletionEventTracker++;
- JobTrackerStatistics.TaskTrackerStat ttStat = jobtracker.
- getStatistics().getTaskTrackerStat(tip.machineWhereTaskRan(taskid));
- if(ttStat != null) { // ttStat can be null in case of lost tracker
- ttStat.incrTotalTasks();
- }
- if (state == TaskStatus.State.SUCCEEDED) {
- completedTask(tip, status);
- if(ttStat != null) {
- ttStat.incrSucceededTasks();
- }
- }
- }
- }
-
- //
- // Update JobInProgress status
- //
- if (LOG.isDebugEnabled()) {
- LOG.debug("Taking progress for " + tip.getTIPId() + " from " +
- oldProgress + " to " + tip.getProgress());
- }
-
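- // Job-level progress is maintained incrementally: each status update adds
- // this TIP's progress delta, averaged over the number of map (or reduce)
- // tasks, so overall progress is the mean of per-TIP progress.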
- if (!tip.isJobCleanupTask() && !tip.isJobSetupTask()) {
- double progressDelta = tip.getProgress() - oldProgress;
- if (tip.isMapTask()) {
- this.status.setMapProgress((float) (this.status.mapProgress() +
- progressDelta / maps.length));
- } else {
- this.status.setReduceProgress((float) (this.status.reduceProgress() +
- (progressDelta / reduces.length)));
- }
- }
- }
- /**
- * Returns the job-level counters.
- *
- * @return the job-level counters.
- */
- public synchronized Counters getJobCounters() {
- return jobCounters;
- }
-
- /**
- * Returns map phase counters by summing over all map tasks in progress.
- */
- public synchronized Counters getMapCounters() {
- return incrementTaskCounters(new Counters(), maps);
- }
-
- /**
- * Returns reduce phase counters by summing over all reduce tasks in progress.
- */
- public synchronized Counters getReduceCounters() {
- return incrementTaskCounters(new Counters(), reduces);
- }
-
- /**
- * Returns the total job counters, by adding together the job,
- * the map and the reduce counters.
- */
- public Counters getCounters() {
- Counters result = new Counters();
- synchronized (this) {
- result.incrAllCounters(getJobCounters());
- }
- // the counters of TIPs are not updated in place.
- // hence read-only access is ok without any locks
- incrementTaskCounters(result, maps);
- return incrementTaskCounters(result, reduces);
- }
-
- /**
- * Increments the counters with the counters from each task.
- * @param counters the counters to increment
- * @param tips the tasks to add in to counters
- * @return counters the same object passed in as counters
- */
- private Counters incrementTaskCounters(Counters counters,
- TaskInProgress[] tips) {
- try {
- for (TaskInProgress tip : tips) {
- counters.incrAllCounters(tip.getCounters());
- }
- } catch (LimitExceededException e) {
- // too many user counters/groups, leaving existing counters intact.
- }
- return counters;
- }
- /////////////////////////////////////////////////////
- // Create/manage tasks
- /////////////////////////////////////////////////////
- /**
- * Return a MapTask, if appropriate, to run on the given tasktracker
- */
- public synchronized Task obtainNewMapTask(TaskTrackerStatus tts,
- int clusterSize,
- int numUniqueHosts,
- int maxCacheLevel
- ) throws IOException {
- if (status.getRunState() != JobStatus.RUNNING) {
- LOG.info("Cannot create task split for " + profile.getJobID());
- return null;
- }
-
- int target = findNewMapTask(tts, clusterSize, numUniqueHosts,
- maxCacheLevel);
- if (target == -1) {
- return null;
- }
-
- Task result = maps[target].getTaskToRun(tts.getTrackerName());
- if (result != null) {
- addRunningTaskToTIP(maps[target], result.getTaskID(), tts, true);
- }
- return result;
- }
-
- /**
- * Return a MapTask, if appropriate, to run on the given tasktracker
- */
- public synchronized Task obtainNewMapTask(TaskTrackerStatus tts,
- int clusterSize,
- int numUniqueHosts
- ) throws IOException {
- return obtainNewMapTask(tts, clusterSize, numUniqueHosts, anyCacheLevel);
- }
- /**
- * Return a task-cleanup attempt, if any, to run on the given tracker
- */
- public Task obtainTaskCleanupTask(TaskTrackerStatus tts,
- boolean isMapSlot)
- throws IOException {
- if (!tasksInited.get()) {
- return null;
- }
- synchronized (this) {
- if (this.status.getRunState() != JobStatus.RUNNING ||
- jobFailed || jobKilled) {
- return null;
- }
- String taskTracker = tts.getTrackerName();
- if (!shouldRunOnTaskTracker(taskTracker)) {
- return null;
- }
- TaskAttemptID taskid = null;
- TaskInProgress tip = null;
- if (isMapSlot) {
- if (!mapCleanupTasks.isEmpty()) {
- taskid = mapCleanupTasks.remove(0);
- tip = maps[taskid.getTaskID().getId()];
- }
- } else {
- if (!reduceCleanupTasks.isEmpty()) {
- taskid = reduceCleanupTasks.remove(0);
- tip = reduces[taskid.getTaskID().getId()];
- }
- }
- if (tip != null) {
- return tip.addRunningTask(taskid, taskTracker, true);
- }
- return null;
- }
- }
-
- public synchronized Task obtainNewLocalMapTask(TaskTrackerStatus tts,
- int clusterSize,
- int numUniqueHosts)
- throws IOException {
- if (!tasksInited.get()) {
- LOG.info("Cannot create task split for " + profile.getJobID());
- return null;
- }
-
- return obtainNewMapTask(tts, clusterSize, numUniqueHosts, maxLevel);
- }
-
- public synchronized Task obtainNewNonLocalMapTask(TaskTrackerStatus tts,
- int clusterSize,
- int numUniqueHosts)
- throws IOException {
- if (!tasksInited.get()) {
- LOG.info("Cannot create task split for " + profile.getJobID());
- return null;
- }
-
- return obtainNewMapTask(tts, clusterSize, numUniqueHosts,
- NON_LOCAL_CACHE_LEVEL);
- }
-
- /**
- * Return a CleanupTask, if appropriate, to run on the given tasktracker
- *
- */
- public Task obtainJobCleanupTask(TaskTrackerStatus tts,
- int clusterSize,
- int numUniqueHosts,
- boolean isMapSlot
- ) throws IOException {
- if(!tasksInited.get() || !jobSetupCleanupNeeded) {
- return null;
- }
-
- synchronized(this) {
- if (!canLaunchJobCleanupTask()) {
- return null;
- }
-
- String taskTracker = tts.getTrackerName();
- // Update the last-known clusterSize
- this.clusterSize = clusterSize;
- if (!shouldRunOnTaskTracker(taskTracker)) {
- return null;
- }
-
- List<TaskInProgress> cleanupTaskList = new ArrayList<TaskInProgress>();
- if (isMapSlot) {
- cleanupTaskList.add(cleanup[0]);
- } else {
- cleanupTaskList.add(cleanup[1]);
- }
- TaskInProgress tip = findTaskFromList(cleanupTaskList,
- tts, numUniqueHosts, false);
- if (tip == null) {
- return null;
- }
-
- // Now launch the cleanupTask
- Task result = tip.getTaskToRun(tts.getTrackerName());
- if (result != null) {
- addRunningTaskToTIP(tip, result.getTaskID(), tts, true);
- if (jobFailed) {
- result.setJobCleanupTaskState(org.apache.hadoop.mapreduce.JobStatus
- .State.FAILED);
- } else if (jobKilled) {
- result.setJobCleanupTaskState(org.apache.hadoop.mapreduce.JobStatus
- .State.KILLED);
- } else {
- result.setJobCleanupTaskState(org.apache.hadoop.mapreduce
- .JobStatus.State.SUCCEEDED);
- }
- }
- return result;
- }
-
- }
-
- /**
- * Check whether cleanup task can be launched for the job.
- *
- * The cleanup task can be launched if it has not been launched already,
- * and either the job is killed/failed
- * or all maps and reduces are complete.
- * @return true/false
- */
- private synchronized boolean canLaunchJobCleanupTask() {
- // check if the job is running
- if (status.getRunState() != JobStatus.RUNNING &&
- status.getRunState() != JobStatus.PREP) {
- return false;
- }
- // check if the cleanup task has been launched already or if setup isn't
- // finished yet. The latter check is useful when the number of maps is
- // zero.
- if (launchedCleanup || !isSetupFinished()) {
- return false;
- }
- // check if job has failed or killed
- if (jobKilled || jobFailed) {
- return true;
- }
- // Check if all maps and reducers have finished.
- boolean launchCleanupTask =
- ((finishedMapTasks + failedMapTIPs) == (numMapTasks));
- if (launchCleanupTask) {
- launchCleanupTask =
- ((finishedReduceTasks + failedReduceTIPs) == numReduceTasks);
- }
- return launchCleanupTask;
- }
- /**
- * Return a SetupTask, if appropriate, to run on the given tasktracker
- *
- */
- public Task obtainJobSetupTask(TaskTrackerStatus tts,
- int clusterSize,
- int numUniqueHosts,
- boolean isMapSlot
- ) throws IOException {
- if(!tasksInited.get() || !jobSetupCleanupNeeded) {
- return null;
- }
-
- synchronized(this) {
- if (!canLaunchSetupTask()) {
- return null;
- }
- String taskTracker = tts.getTrackerName();
- // Update the last-known clusterSize
- this.clusterSize = clusterSize;
- if (!shouldRunOnTaskTracker(taskTracker)) {
- return null;
- }
-
- List<TaskInProgress> setupTaskList = new ArrayList<TaskInProgress>();
- if (isMapSlot) {
- setupTaskList.add(setup[0]);
- } else {
- setupTaskList.add(setup[1]);
- }
- TaskInProgress tip = findTaskFromList(setupTaskList,
- tts, numUniqueHosts, false);
- if (tip == null) {
- return null;
- }
-
- // Now launch the setupTask
- Task result = tip.getTaskToRun(tts.getTrackerName());
- if (result != null) {
- addRunningTaskToTIP(tip, result.getTaskID(), tts, true);
- }
- return result;
- }
- }
-
- public synchronized boolean scheduleReduces() {
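- // Reduces are held back until enough map output exists to be worth
- // shuffling. Illustrative example, assuming a slowstart fraction of 0.05
- // (a common default): for a job with 200 maps,
- // completedMapsForReduceSlowstart = 10, so reduces are scheduled once
- // 10 maps have finished.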
- return finishedMapTasks >= completedMapsForReduceSlowstart;
- }
-
- /**
- * Check whether setup task can be launched for the job.
- *
- * The setup task can be launched after the tasks are inited,
- * the job is in the PREP state,
- * it has not been launched already,
- * and the job has not been killed/failed.
- * @return true/false
- */
- private synchronized boolean canLaunchSetupTask() {
- return (tasksInited.get() && status.getRunState() == JobStatus.PREP &&
- !launchedSetup && !jobKilled && !jobFailed);
- }
-
- /**
- * Return a ReduceTask, if appropriate, to run on the given tasktracker.
- * We don't have cache-sensitivity for reduce tasks, as they
- * work on temporary MapRed files.
- */
- public synchronized Task obtainNewReduceTask(TaskTrackerStatus tts,
- int clusterSize,
- int numUniqueHosts
- ) throws IOException {
- if (status.getRunState() != JobStatus.RUNNING) {
- LOG.info("Cannot create task split for " + profile.getJobID());
- return null;
- }
-
- // Ensure we have sufficient map outputs ready to shuffle before
- // scheduling reduces
- if (!scheduleReduces()) {
- return null;
- }
- int target = findNewReduceTask(tts, clusterSize, numUniqueHosts);
- if (target == -1) {
- return null;
- }
-
- Task result = reduces[target].getTaskToRun(tts.getTrackerName());
- if (result != null) {
- addRunningTaskToTIP(reduces[target], result.getTaskID(), tts, true);
- }
- return result;
- }
-
- // Returns the (cache) level at which the two nodes match
- private int getMatchingLevelForNodes(Node n1, Node n2) {
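- // Illustrative example, assuming a two-level topology (hosts under
- // racks): the same host matches at level 0, two hosts on the same rack
- // match at level 1, and hosts on different racks fall through to
- // this.maxLevel.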
- int count = 0;
- do {
- if (n1.equals(n2)) {
- return count;
- }
- ++count;
- n1 = n1.getParent();
- n2 = n2.getParent();
- } while (n1 != null);
- return this.maxLevel;
- }
- /**
- * Populate the data structures as a task is scheduled.
- *
- * Assuming {@link JobTracker} is locked on entry.
- *
- * @param tip The tip for which the task is added
- * @param id The attempt-id for the task
- * @param tts task-tracker status
- * @param isScheduled Whether this task is scheduled from the JT or has
- * joined back upon restart
- */
- synchronized void addRunningTaskToTIP(TaskInProgress tip, TaskAttemptID id,
- TaskTrackerStatus tts,
- boolean isScheduled) {
- // Make an entry in the tip if the attempt is not scheduled, i.e. externally
- // added
- if (!isScheduled) {
- tip.addRunningTask(id, tts.getTrackerName());
- }
- final JobTrackerInstrumentation metrics = jobtracker.getInstrumentation();
- // keeping the earlier ordering intact
- TaskType name;
- String splits = "";
- Enum counter = null;
- if (tip.isJobSetupTask()) {
- launchedSetup = true;
- name = TaskType.JOB_SETUP;
- } else if (tip.isJobCleanupTask()) {
- launchedCleanup = true;
- name = TaskType.JOB_CLEANUP;
- } else if (tip.isMapTask()) {
- ++runningMapTasks;
- name = TaskType.MAP;
- counter = JobCounter.TOTAL_LAUNCHED_MAPS;
- splits = tip.getSplitNodes();
- if (tip.isSpeculating()) {
- speculativeMapTasks++;
- metrics.speculateMap(id);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Chosen speculative task, current speculativeMap task count: "
- + speculativeMapTasks);
- }
- }
- metrics.launchMap(id);
- } else {
- ++runningReduceTasks;
- name = TaskType.REDUCE;
- counter = JobCounter.TOTAL_LAUNCHED_REDUCES;
- if (tip.isSpeculating()) {
- speculativeReduceTasks++;
- metrics.speculateReduce(id);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Chosen speculative task, current speculativeReduce task count: "
- + speculativeReduceTasks);
- }
- }
- metrics.launchReduce(id);
- }
- // Note that the logs are for the scheduled tasks only. Tasks that join on
- // restart already have their logs in place.
- if (tip.isFirstAttempt(id)) {
- TaskStartedEvent tse = new TaskStartedEvent(tip.getTIPId(),
- tip.getExecStartTime(),
- name, splits);
-
- jobHistory.logEvent(tse, tip.getJob().jobId);
- setFirstTaskLaunchTime(tip);
- }
- if (!tip.isJobSetupTask() && !tip.isJobCleanupTask()) {
- jobCounters.incrCounter(counter, 1);
- }
-
- //TODO The only problem with these counters would be on restart.
- // The jobtracker updates the counter only when the scheduled task is
- // from a non-running tip and is local (data, rack ...). But upon restart
- // as the reports come from the task tracker, there is no good way to infer
- // when exactly to increment the locality counters. The only solution is to
- // increment the counters for all the tasks irrespective of
- // - whether the tip is running or not
- // - whether its a speculative task or not
- //
- // So to simplify, increment the data locality counter whenever there is
- // data locality.
- if (tip.isMapTask() && !tip.isJobSetupTask() && !tip.isJobCleanupTask()) {
- // increment the data locality counter for maps
- int level = getLocalityLevel(tip, tts);
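- // Locality levels, per the cases below: 0 = node-local, 1 = rack-local,
- // any level below maxLevel = some cached locality, maxLevel = off-switch.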
- switch (level) {
- case 0 :
- LOG.info("Choosing data-local task " + tip.getTIPId());
- jobCounters.incrCounter(JobCounter.DATA_LOCAL_MAPS, 1);
- metrics.launchDataLocalMap(id);
- break;
- case 1:
- LOG.info("Choosing rack-local task " + tip.getTIPId());
- jobCounters.incrCounter(JobCounter.RACK_LOCAL_MAPS, 1);
- metrics.launchRackLocalMap(id);
- break;
- default :
- // check if there is any locality
- if (level != this.maxLevel) {
- LOG.info("Choosing cached task at level " + level + tip.getTIPId());
- jobCounters.incrCounter(JobCounter.OTHER_LOCAL_MAPS, 1);
- }
- break;
- }
- }
- }
- void setFirstTaskLaunchTime(TaskInProgress tip) {
- TaskType key = getTaskType(tip);
- synchronized(firstTaskLaunchTimes) {
- // Could be optimized to do only one lookup with a little more code
- if (!firstTaskLaunchTimes.containsKey(key)) {
- firstTaskLaunchTimes.put(key, tip.getExecStartTime());
- }
- }
- }
-
- public static String convertTrackerNameToHostName(String trackerName) {
- // Ugly!
- // Convert the trackerName to its host name
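- // Illustrative example (assumed name format):
- // "tracker_host1.rack1:50060" -> keep the text before the first ':' ->
- // strip the "tracker_" prefix -> "host1.rack1"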
- int indexOfColon = trackerName.indexOf(":");
- String trackerHostName = (indexOfColon == -1) ?
- trackerName :
- trackerName.substring(0, indexOfColon);
- return trackerHostName.substring("tracker_".length());
- }
-
- /**
- * Note that a task has failed on a given tracker, and add the tracker
- * to the blacklist iff fewer than (clusterSize * CLUSTER_BLACKLIST_PERCENT)
- * trackers in the cluster have turned 'flaky' already.
- *
- * @param trackerName name of the task-tracker on which a task failed
- * @param taskTracker the task-tracker on which a task failed
- */
- synchronized void addTrackerTaskFailure(String trackerName,
- TaskTracker taskTracker) {
- if (flakyTaskTrackers < (clusterSize * CLUSTER_BLACKLIST_PERCENT)) {
- String trackerHostName = convertTrackerNameToHostName(trackerName);
- Integer trackerFailures = trackerToFailuresMap.get(trackerHostName);
- if (trackerFailures == null) {
- trackerFailures = 0;
- }
- trackerToFailuresMap.put(trackerHostName, ++trackerFailures);
- // Check if this tasktracker has turned 'flaky'
- if (trackerFailures.intValue() == maxTaskFailuresPerTracker) {
- ++flakyTaskTrackers;
-
- // Cancel reservations if appropriate
- if (taskTracker != null) {
- if (trackersReservedForMaps.containsKey(taskTracker)) {
- taskTracker.unreserveSlots(TaskType.MAP, this);
- }
- if (trackersReservedForReduces.containsKey(taskTracker)) {
- taskTracker.unreserveSlots(TaskType.REDUCE, this);
- }
- }
- LOG.info("TaskTracker at '" + trackerHostName + "' turned 'flaky'");
- }
- }
- }
-
- public synchronized void reserveTaskTracker(TaskTracker taskTracker,
- TaskType type, int numSlots) {
- Map<TaskTracker, FallowSlotInfo> map =
- (type == TaskType.MAP) ? trackersReservedForMaps : trackersReservedForReduces;
-
- long now = System.currentTimeMillis();
-
- FallowSlotInfo info = map.get(taskTracker);
- int reservedSlots = 0;
- if (info == null) {
- info = new FallowSlotInfo(now, numSlots);
- reservedSlots = numSlots;
- } else {
- // Increment metering info if the reservation is changing
- if (info.getNumSlots() != numSlots) {
- Enum<JobCounter> counter =
- (type == TaskType.MAP) ?
- JobCounter.FALLOW_SLOTS_MILLIS_MAPS :
- JobCounter.FALLOW_SLOTS_MILLIS_REDUCES;
- long fallowSlotMillis = (now - info.getTimestamp()) * info.getNumSlots();
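- // Metering is slots * wall-clock time. Illustrative example: 2 slots
- // held fallow for 10,000 ms are charged as 20,000 fallow slot-millis.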
- jobCounters.incrCounter(counter, fallowSlotMillis);
-
- // Update
- reservedSlots = numSlots - info.getNumSlots();
- info.setTimestamp(now);
- info.setNumSlots(numSlots);
- }
- }
- map.put(taskTracker, info);
- if (type == TaskType.MAP) {
- jobtracker.getInstrumentation().addReservedMapSlots(reservedSlots);
- }
- else {
- jobtracker.getInstrumentation().addReservedReduceSlots(reservedSlots);
- }
- jobtracker.incrementReservations(type, reservedSlots);
- }
-
- public synchronized void unreserveTaskTracker(TaskTracker taskTracker,
- TaskType type) {
- Map<TaskTracker, FallowSlotInfo> map =
- (type == TaskType.MAP) ? trackersReservedForMaps :
- trackersReservedForReduces;
- FallowSlotInfo info = map.get(taskTracker);
- if (info == null) {
- LOG.warn("Cannot find information about fallow slots for " +
- taskTracker.getTrackerName());
- return;
- }
-
- long now = System.currentTimeMillis();
- Enum<JobCounter> counter =
- (type == TaskType.MAP) ?
- JobCounter.FALLOW_SLOTS_MILLIS_MAPS :
- JobCounter.FALLOW_SLOTS_MILLIS_REDUCES;
- long fallowSlotMillis = (now - info.getTimestamp()) * info.getNumSlots();
- jobCounters.incrCounter(counter, fallowSlotMillis);
- map.remove(taskTracker);
- if (type == TaskType.MAP) {
- jobtracker.getInstrumentation().decReservedMapSlots(info.getNumSlots());
- }
- else {
- jobtracker.getInstrumentation().decReservedReduceSlots(
- info.getNumSlots());
- }
- jobtracker.decrementReservations(type, info.getNumSlots());
- }
-
- public int getNumReservedTaskTrackersForMaps() {
- return trackersReservedForMaps.size();
- }
-
- public int getNumReservedTaskTrackersForReduces() {
- return trackersReservedForReduces.size();
- }
-
- private int getTrackerTaskFailures(String trackerName) {
- String trackerHostName = convertTrackerNameToHostName(trackerName);
- Integer failedTasks = trackerToFailuresMap.get(trackerHostName);
- return (failedTasks != null) ? failedTasks.intValue() : 0;
- }
-
- /**
- * Get the black listed trackers for the job
- *
- * @return List of blacklisted tracker names
- */
- List<String> getBlackListedTrackers() {
- List<String> blackListedTrackers = new ArrayList<String>();
- for (Map.Entry<String,Integer> e : trackerToFailuresMap.entrySet()) {
- if (e.getValue().intValue() >= maxTaskFailuresPerTracker) {
- blackListedTrackers.add(e.getKey());
- }
- }
- return blackListedTrackers;
- }
-
- /**
- * Get the no. of 'flaky' tasktrackers for a given job.
- *
- * @return the no. of 'flaky' tasktrackers for a given job.
- */
- int getNoOfBlackListedTrackers() {
- return flakyTaskTrackers;
- }
-
- /**
- * Get the information on tasktrackers and no. of errors which occurred
- * on them for a given job.
- *
- * @return the map of tasktrackers and no. of errors which occurred
- * on them for a given job.
- */
- synchronized Map<String, Integer> getTaskTrackerErrors() {
- // Clone the 'trackerToFailuresMap' and return the copy
- Map<String, Integer> trackerErrors =
- new TreeMap<String, Integer>(trackerToFailuresMap);
- return trackerErrors;
- }
- /**
- * Remove a map TIP from the lists for running maps.
- * Called when a map fails/completes (note if a map is killed,
- * it won't be present in the list since it was completed earlier)
- * @param tip the tip that needs to be retired
- */
- private synchronized void retireMap(TaskInProgress tip) {
- if (runningMapCache == null) {
- LOG.warn("Running cache for maps missing!! "
- + "Job details are missing.");
- return;
- }
-
- String[] splitLocations = tip.getSplitLocations();
- // Remove the TIP from the list for running non-local maps
- if (splitLocations == null || splitLocations.length == 0) {
- nonLocalRunningMaps.remove(tip);
- return;
- }
- // Remove from the running map caches
- for(String host: splitLocations) {
- Node node = jobtracker.getNode(host);
- for (int j = 0; j < maxLevel; ++j) {
- Set<TaskInProgress> hostMaps = runningMapCache.get(node);
- if (hostMaps != null) {
- hostMaps.remove(tip);
- if (hostMaps.size() == 0) {
- runningMapCache.remove(node);
- }
- }
- node = node.getParent();
- }
- }
- }
- /**
- * Remove a reduce TIP from the list for running-reduces
- * Called when a reduce fails/completes
- * @param tip the tip that needs to be retired
- */
- private synchronized void retireReduce(TaskInProgress tip) {
- if (runningReduces == null) {
- LOG.warn("Running list for reducers missing!! "
- + "Job details are missing.");
- return;
- }
- runningReduces.remove(tip);
- }
- /**
- * Adds a map tip to the list of running maps.
- * @param tip the tip that needs to be scheduled as running
- */
- protected synchronized void scheduleMap(TaskInProgress tip) {
-
- runningMapTaskStats.add(0.0f);
- if (runningMapCache == null) {
- LOG.warn("Running cache for maps is missing!! "
- + "Job details are missing.");
- return;
- }
- String[] splitLocations = tip.getSplitLocations();
- // Add the TIP to the list of non-local running TIPs
- if (splitLocations == null || splitLocations.length == 0) {
- nonLocalRunningMaps.add(tip);
- return;
- }
- for(String host: splitLocations) {
- Node node = jobtracker.getNode(host);
- for (int j = 0; j < maxLevel; ++j) {
- Set<TaskInProgress> hostMaps = runningMapCache.get(node);
- if (hostMaps == null) {
- // create a cache if needed
- hostMaps = new LinkedHashSet<TaskInProgress>();
- runningMapCache.put(node, hostMaps);
- }
- hostMaps.add(tip);
- node = node.getParent();
- }
- }
- }
-
- /**
- * Adds a reduce tip to the list of running reduces
- * @param tip the tip that needs to be scheduled as running
- */
- protected synchronized void scheduleReduce(TaskInProgress tip) {
- runningReduceTaskStats.add(0.0f);
- if (runningReduces == null) {
- LOG.warn("Running cache for reducers missing!! "
- + "Job details are missing.");
- return;
- }
- runningReduces.add(tip);
- }
-
- /**
- * Adds the failed TIP in the front of the list for non-running maps
- * @param tip the tip that needs to be failed
- */
- private synchronized void failMap(TaskInProgress tip) {
- if (nonRunningMapCache == null) {
- LOG.warn("Non-running cache for maps missing!! "
- + "Job details are missing.");
- return;
- }
- // 1. It's added everywhere since other nodes (having this split local)
- // might have removed this tip from their local cache
- // 2. Give high priority to failed tip - fail early
- String[] splitLocations = tip.getSplitLocations();
- // Add the TIP in the front of the list for non-local non-running maps
- if (splitLocations.length == 0) {
- nonLocalMaps.add(0, tip);
- return;
- }
- for(String host: splitLocations) {
- Node node = jobtracker.getNode(host);
-
- for (int j = 0; j < maxLevel; ++j) {
- List<TaskInProgress> hostMaps = nonRunningMapCache.get(node);
- if (hostMaps == null) {
- hostMaps = new LinkedList<TaskInProgress>();
- nonRunningMapCache.put(node, hostMaps);
- }
- hostMaps.add(0, tip);
- node = node.getParent();
- }
- }
- }
-
- /**
- * Adds a failed TIP in the front of the list for non-running reduces
- * @param tip the tip that needs to be failed
- */
- private synchronized void failReduce(TaskInProgress tip) {
- if (nonRunningReduces == null) {
- LOG.warn("Failed cache for reducers missing!! "
- + "Job details are missing.");
- return;
- }
- nonRunningReduces.add(0, tip);
- }
-
- /**
- * Find a non-running task in the passed list of TIPs
- * @param tips a collection of TIPs
- * @param ttStatus the status of tracker that has requested a task to run
- * @param numUniqueHosts number of unique hosts that run task trackers
- * @param removeFailedTip whether to remove the failed tips
- */
- private synchronized TaskInProgress findTaskFromList(
- Collection<TaskInProgress> tips, TaskTrackerStatus ttStatus,
- int numUniqueHosts,
- boolean removeFailedTip) {
- Iterator<TaskInProgress> iter = tips.iterator();
- while (iter.hasNext()) {
- TaskInProgress tip = iter.next();
- // Select a tip if
- // 1. runnable : still needs to be run and is not completed
- // 2. ~running : no other node is running it
- // 3. earlier attempt failed : has not failed on this host
- // and has failed on all the other hosts
- // A TIP is removed from the list if
- // (1) this tip is scheduled
- // (2) if the passed list is a level 0 (host) cache
- // (3) when the TIP is non-schedulable (running, killed, complete)
- if (tip.isRunnable() && !tip.isRunning()) {
- // check if the tip has failed on this host
- if (!tip.hasFailedOnMachine(ttStatus.getHost()) ||
- tip.getNumberOfFailedMachines() >= numUniqueHosts) {
- // check if the tip has failed on all the nodes
- iter.remove();
- return tip;
- } else if (removeFailedTip) {
- // the case where we want to remove a failed tip from the host cache
- // point#3 in the TIP removal logic above
- iter.remove();
- }
- } else {
- // see point#3 in the comment above for TIP removal logic
- iter.remove();
- }
- }
- return null;
- }
-
- public boolean hasSpeculativeMaps() {
- return hasSpeculativeMaps;
- }
- public boolean hasSpeculativeReduces() {
- return hasSpeculativeReduces;
- }
- /**
- * Retrieve a task for speculation.
- * If a task slot becomes available and there are less than SpeculativeCap
- * speculative tasks running:
- * 1) Ignore the request if the TT's progressRate is < SlowNodeThreshold
- * 2) Choose candidate tasks - those tasks whose progress rate is below
- * slowTaskThreshold * mean(progress-rates)
- * 3) Speculate the task that's expected to complete last
- * @param list pool of tasks to choose from
- * @param taskTrackerName the name of the TaskTracker asking for a task
- * @param taskTrackerHost the hostname of the TaskTracker asking for a task
- * @param taskType the type of task (MAP/REDUCE) that we are considering
- * @return the TIP to speculatively re-execute
- */
- protected synchronized TaskInProgress findSpeculativeTask(
- Collection<TaskInProgress> list, String taskTrackerName,
- String taskTrackerHost, TaskType taskType) {
- if (list.isEmpty()) {
- return null;
- }
- long now = JobTracker.getClock().getTime();
- // Don't return anything if either the TaskTracker is slow or we have
- // already launched enough speculative tasks in the cluster.
- if (isSlowTracker(taskTrackerName) || atSpeculativeCap(list, taskType)) {
- return null;
- }
- TaskInProgress slowestTIP = null;
- Comparator<TaskInProgress> LateComparator =
- new EstimatedTimeLeftComparator(now);
- Iterator<TaskInProgress> iter = list.iterator();
- while (iter.hasNext()) {
- TaskInProgress tip = iter.next();
- // If this tip has already run on this machine once or it doesn't need any
- // more speculative attempts, skip it.
- if (tip.hasRunOnMachine(taskTrackerHost, taskTrackerName) ||
- !tip.canBeSpeculated(now)) {
- continue;
- }
- if (slowestTIP == null) {
- slowestTIP = tip;
- } else {
- slowestTIP =
- lateComparator.compare(tip, slowestTIP) < 0 ? tip : slowestTIP;
- }
- }
- if (slowestTIP != null) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Chose task " + slowestTIP.getTIPId() + ". Statistics: Task's : " +
- slowestTIP.getCurrentProgressRate(now) + " Job's : " +
- (slowestTIP.isMapTask() ? runningMapTaskStats : runningReduceTaskStats));
- }
- }
- return slowestTIP;
- }
- /**
- * Find new map task
- * @param tts The task tracker that is asking for a task
- * @param clusterSize The number of task trackers in the cluster
- * @param numUniqueHosts The number of hosts that run task trackers
- * @param maxCacheLevel The maximum topology level until which to schedule
- * maps.
- * A value of {@link #anyCacheLevel} implies any
- * available task (node-local, rack-local, off-switch and
- * speculative tasks).
- * A value of {@link #NON_LOCAL_CACHE_LEVEL} implies only
- * off-switch/speculative tasks should be scheduled.
- * @return the index in tasks of the selected task (or -1 for no task)
- */
- private synchronized int findNewMapTask(final TaskTrackerStatus tts,
- final int clusterSize,
- final int numUniqueHosts,
- final int maxCacheLevel) {
- String taskTrackerName = tts.getTrackerName();
- String taskTrackerHost = tts.getHost();
- if (numMapTasks == 0) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("No maps to schedule for " + profile.getJobID());
- }
- return -1;
- }
- TaskInProgress tip = null;
-
- //
- // Update the last-known clusterSize
- //
- this.clusterSize = clusterSize;
- if (!shouldRunOnTaskTracker(taskTrackerName)) {
- return -1;
- }
- // Check to ensure this TaskTracker has enough resources to
- // run tasks from this job
- long outSize = resourceEstimator.getEstimatedMapOutputSize();
- long availSpace = tts.getResourceStatus().getAvailableSpace();
- if(availSpace < outSize) {
- LOG.warn("No room for map task. Node " + tts.getHost() +
- " has " + availSpace +
- " bytes free; but we expect map to take " + outSize);
- return -1; //see if a different TIP might work better.
- }
-
-
- // For scheduling a map task, we have two caches and a list (optional)
- // I) one for non-running task
- // II) one for running task (this is for handling speculation)
- // III) a list of TIPs that have empty locations (e.g., dummy splits),
- // the list is empty if all TIPs have associated locations
- // First a look up is done on the non-running cache and on a miss, a look
- // up is done on the running cache. The order for lookup within the cache:
- // 1. from local node to root [bottom up]
- // 2. breadth wise for all the parent nodes at max level
- // We fall to linear scan of the list (III above) if we have misses in the
- // above caches
- Node node = jobtracker.getNode(tts.getHost());
-
- //
- // I) Non-running TIP :
- //
- // 1. check from local node to the root [bottom up cache lookup]
- // i.e if the cache is available and the host has been resolved
- // (node!=null)
- if (node != null) {
- Node key = node;
- int level = 0;
- // maxCacheLevel might be greater than this.maxLevel if findNewMapTask is
- // called to schedule any task (node-local, rack-local, off-switch or
- // speculative), or it might be NON_LOCAL_CACHE_LEVEL (i.e. -1) if
- // findNewMapTask is to only schedule off-switch/speculative tasks
- int maxLevelToSchedule = Math.min(maxCacheLevel, maxLevel);
- for (level = 0;level < maxLevelToSchedule; ++level) {
- List <TaskInProgress> cacheForLevel = nonRunningMapCache.get(key);
- if (cacheForLevel != null) {
- tip = findTaskFromList(cacheForLevel, tts,
- numUniqueHosts,level == 0);
- if (tip != null) {
- // Add to running cache
- scheduleMap(tip);
- // remove the cache if its empty
- if (cacheForLevel.size() == 0) {
- nonRunningMapCache.remove(key);
- }
- return tip.getIdWithinJob();
- }
- }
- key = key.getParent();
- }
-
- // Check if we need to only schedule a local task (node-local/rack-local)
- if (level == maxCacheLevel) {
- return -1;
- }
- }
- // 2. Search breadth-wise across parents at max level for non-running
- // TIP if
- // - cache exists and there is a cache miss
- // - node information for the tracker is missing (tracker's topology
- // info not obtained yet)
- // collection of node at max level in the cache structure
- Collection<Node> nodesAtMaxLevel = jobtracker.getNodesAtMaxLevel();
- // get the node parent at max level
- Node nodeParentAtMaxLevel =
- (node == null) ? null : JobTracker.getParentNode(node, maxLevel - 1);
-
- for (Node parent : nodesAtMaxLevel) {
- // skip the parent that has already been scanned
- if (parent == nodeParentAtMaxLevel) {
- continue;
- }
- List<TaskInProgress> cache = nonRunningMapCache.get(parent);
- if (cache != null) {
- tip = findTaskFromList(cache, tts, numUniqueHosts, false);
- if (tip != null) {
- // Add to the running cache
- scheduleMap(tip);
- // remove the cache if empty
- if (cache.size() == 0) {
- nonRunningMapCache.remove(parent);
- }
- LOG.info("Choosing a non-local task " + tip.getTIPId());
- return tip.getIdWithinJob();
- }
- }
- }
- // 3. Search non-local tips for a new task
- tip = findTaskFromList(nonLocalMaps, tts, numUniqueHosts, false);
- if (tip != null) {
- // Add to the running list
- scheduleMap(tip);
- LOG.info("Choosing a non-local task " + tip.getTIPId());
- return tip.getIdWithinJob();
- }
- //
- // II) Running TIP :
- //
-
- if (hasSpeculativeMaps) {
- tip = getSpeculativeMap(taskTrackerName, taskTrackerHost);
- if (tip != null) {
- return tip.getIdWithinJob();
- }
- }
- return -1;
- }
- private synchronized TaskInProgress getSpeculativeMap(String taskTrackerName,
- String taskTrackerHost) {
- //////// Populate allTips with all TaskInProgress
- Set<TaskInProgress> allTips = new HashSet<TaskInProgress>();
-
- // collection of node at max level in the cache structure
- Collection<Node> nodesAtMaxLevel = jobtracker.getNodesAtMaxLevel();
- // Add all tasks from max-level nodes breadth-wise
- for (Node parent : nodesAtMaxLevel) {
- Set<TaskInProgress> cache = runningMapCache.get(parent);
- if (cache != null) {
- allTips.addAll(cache);
- }
- }
- // Add all non-local TIPs
- allTips.addAll(nonLocalRunningMaps);
-
- ///////// Select a TIP to run on
- TaskInProgress tip = findSpeculativeTask(allTips, taskTrackerName,
- taskTrackerHost, TaskType.MAP);
-
- if (tip != null) {
- LOG.info("Choosing map task " + tip.getTIPId() +
- " for speculative execution");
- } else {
- if (LOG.isDebugEnabled()) {
- LOG.debug("No speculative map task found for tracker " + taskTrackerName);
- }
- }
- return tip;
- }
-
- /**
- * Find new reduce task
- * @param tts The task tracker that is asking for a task
- * @param clusterSize The number of task trackers in the cluster
- * @param numUniqueHosts The number of hosts that run task trackers
- * @return the index in tasks of the selected task (or -1 for no task)
- */
- private synchronized int findNewReduceTask(TaskTrackerStatus tts,
- int clusterSize,
- int numUniqueHosts) {
- String taskTrackerName = tts.getTrackerName();
- String taskTrackerHost = tts.getHost();
- if (numReduceTasks == 0) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("No reduces to schedule for " + profile.getJobID());
- }
- return -1;
- }
- TaskInProgress tip = null;
-
- // Update the last-known clusterSize
- this.clusterSize = clusterSize;
- if (!shouldRunOnTaskTracker(taskTrackerName)) {
- return -1;
- }
- long outSize = resourceEstimator.getEstimatedReduceInputSize();
- long availSpace = tts.getResourceStatus().getAvailableSpace();
- if(availSpace < outSize) {
- LOG.warn("No room for reduce task. Node " + taskTrackerName + " has " +
- availSpace +
- " bytes free; but we expect reduce input to take " + outSize);
- return -1; //see if a different TIP might work better.
- }
-
- // 1. check for a never-executed reduce tip
- // reducers don't have a locality cache, so removeFailedTip is passed as false
- tip = findTaskFromList(nonRunningReduces, tts, numUniqueHosts, false);
- if (tip != null) {
- scheduleReduce(tip);
- return tip.getIdWithinJob();
- }
- // 2. check for a reduce tip to be speculated
- if (hasSpeculativeReduces) {
- tip = getSpeculativeReduce(taskTrackerName, taskTrackerHost);
- if (tip != null) {
- return tip.getIdWithinJob();
- }
- }
- return -1;
- }
- private synchronized TaskInProgress getSpeculativeReduce(
- String taskTrackerName, String taskTrackerHost) {
- TaskInProgress tip = findSpeculativeTask(
- runningReduces, taskTrackerName, taskTrackerHost, TaskType.REDUCE);
- if (tip != null) {
- LOG.info("Choosing reduce task " + tip.getTIPId() +
- " for speculative execution");
- } else {
- if (LOG.isDebugEnabled()) {
- LOG.debug("No speculative map task found for tracker "
- + taskTrackerHost);
- }
- }
- return tip;
- }
- /**
- * Check to see if the maximum number of speculative tasks are
- * already being executed currently.
- * @param tasks the set of tasks to test
- * @param type the type of task (MAP/REDUCE) that we are considering
- * @return has the cap been reached?
- */
- private boolean atSpeculativeCap(Collection<TaskInProgress> tasks,
- TaskType type) {
- float numTasks = tasks.size();
- if (numTasks == 0){
- return true; // avoid divide by zero
- }
- int speculativeTaskCount = type == TaskType.MAP ? speculativeMapTasks
- : speculativeReduceTasks;
- // Not at the cap (return false) while the speculative-task count is below
- // max(MIN_SPEC_CAP, MIN_SLOTS_CAP * total-slots); beyond that, the cap is
- // hit once speculative-tasks / running-tasks >= speculativeCap
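- // Illustrative example, assuming the documented defaults
- // (MIN_SPEC_CAP=10, MIN_SLOTS_CAP=0.01, speculativeCap=0.1): on a
- // 2000-slot cluster with 500 candidate tasks, the effective cap is
- // max(10, 0.01 * 2000, 0.1 * 500) = 50 speculative tasks.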
- if (speculativeTaskCount < MIN_SPEC_CAP) {
- return false; // at least one slow tracker's worth of slots(default=10)
- }
- ClusterStatus c = jobtracker.getClusterStatus(false);
- int numSlots = (type == TaskType.MAP ? c.getMaxMapTasks() : c.getMaxReduceTasks());
- if ((float)speculativeTaskCount < numSlots * MIN_SLOTS_CAP) {
- return false;
- }
- boolean atCap = (((float)(speculativeTaskCount)/numTasks) >= speculativeCap);
- if (LOG.isDebugEnabled()) {
- LOG.debug("SpeculativeCap is "+speculativeCap+", specTasks/numTasks is " +
- ((float)(speculativeTaskCount)/numTasks)+
- ", so atSpecCap() is returning "+atCap);
- }
- return atCap;
- }
-
- /**
- * A class for comparing the estimated time to completion of two tasks
- */
- private static class EstimatedTimeLeftComparator
- implements Comparator<TaskInProgress> {
- private long time;
- public EstimatedTimeLeftComparator(long now) {
- this.time = now;
- }
- /**
- * Estimated time to completion is measured as:
- * % of task left to complete (1 - progress) / progress rate of the task.
- *
- * This assumes that tasks are linear in their progress, which is
- * often wrong, especially since progress for reducers is currently
- * calculated by evenly weighting their three stages (shuffle, sort, reduce)
- * which rarely account for 1/3 each. This should be fixed in the future
- * by calculating progressRate more intelligently or splitting these
- * multi-phase tasks into individual tasks.
- *
- * The ordering this comparator defines is: task1 < task2 if task1 is
- * estimated to finish farther in the future => compare(t1,t2) returns -1
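- *
- * Illustrative example (time in ms): a task at progress 0.2 with
- * progressRate 1e-6 has (1 - 0.2) / 1e-6 = 800,000 ms left, while one at
- * progress 0.9 with the same rate has 100,000 ms left; the former
- * compares as "less" and is the better speculation candidate.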
- */
- public int compare(TaskInProgress tip1, TaskInProgress tip2) {
- // We have to use Math.max in the denominator to avoid a divide-by-zero
- // error, because prog and progRate can both be zero (if one is zero,
- // the other will be zero too).
- // We use the inverse of time_remaining = (1 - prog) / progRate,
- // so that (1 - prog) is in the denominator: tasks can have arbitrarily
- // low progRates in practice (e.g. a task that is half done after 1000
- // seconds will have a progRate of 0.0000005), so we would rather apply
- // Math.max to (1 - prog) by putting it in the denominator,
- // which makes tasks with prog=1 look 99.99% done instead of 100%,
- // which is okay.
- double t1 = tip1.getCurrentProgressRate(time) / Math.max(0.0001,
- 1.0 - tip1.getProgress());
- double t2 = tip2.getCurrentProgressRate(time) / Math.max(0.0001,
- 1.0 - tip2.getProgress());
- if (t1 < t2) return -1;
- else if (t2 < t1) return 1;
- else return 0;
- }
- }
-
- /**
- * Compares the average duration of tasks that have finished on this
- * taskTracker to the average over all successful tasks thus far, to see
- * if this tracker is too slow for speculating.
- * slowNodeThreshold determines how many standard deviations above the
- * overall mean a tracker's mean may be before it is declared slow.
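- *
- * Illustrative example: if this tracker's mean task duration is 600s
- * while the overall mean is 400s with a std of 150s, then with
- * slowNodeThreshold = 1.0 the tracker is declared slow (600 - 400 > 150).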
- * @param taskTracker the name of the TaskTracker we are checking
- * @return is this TaskTracker slow
- */
- protected boolean isSlowTracker(String taskTracker) {
- if (trackerMapStats.get(taskTracker) != null &&
- trackerMapStats.get(taskTracker).mean() -
- mapTaskStats.mean() > mapTaskStats.std()*slowNodeThreshold) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Tracker " + taskTracker +
- " declared slow. trackerMapStats.get(taskTracker).mean() :" + trackerMapStats.get(taskTracker).mean() +
- " mapTaskStats :" + mapTaskStats);
- }
- return true;
- }
- if (trackerReduceStats.get(taskTracker) != null &&
- trackerReduceStats.get(taskTracker).mean() -
- reduceTaskStats.mean() > reduceTaskStats.std()*slowNodeThreshold) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Tracker " + taskTracker +
- " declared slow. trackerReduceStats.get(taskTracker).mean() :" + trackerReduceStats.get(taskTracker).mean() +
- " reduceTaskStats :" + reduceTaskStats);
- }
- return true;
- }
- return false;
- }
-
- static class DataStatistics {
- private int count = 0;
- private double sum = 0;
- private double sumSquares = 0;
-
- public DataStatistics() {
- }
-
- public DataStatistics(double initNum) {
- this.count = 1;
- this.sum = initNum;
- this.sumSquares = initNum * initNum;
- }
-
- public void add(double newNum) {
- this.count++;
- this.sum += newNum;
- this.sumSquares += newNum * newNum;
- }
- public void updateStatistics(double old, double update) {
- sub(old);
- add(update);
- }
- private void sub(double oldNum) {
- this.count--;
- this.sum = Math.max(this.sum - oldNum, 0.0d);
- this.sumSquares = Math.max(this.sumSquares - oldNum * oldNum, 0.0d);
- }
-
- public double mean() {
- return sum/count;
- }
-
- public double var() {
- // E(X^2) - E(X)^2
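- // Illustrative example: for samples {1, 2, 3}, sum = 6, sumSquares = 14,
- // mean = 2, so var = 14/3 - 4 = 2/3 (clamped at 0 to absorb rounding).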
- return Math.max((sumSquares/count) - mean() * mean(), 0.0d);
- }
-
- public double std() {
- return Math.sqrt(this.var());
- }
-
- public String toString() {
- return "DataStatistics: count is " + count + ", sum is " + sum +
- ", sumSquares is " + sumSquares + " mean is " + mean() + " std() is " + std();
- }
-
- }
-
- private boolean shouldRunOnTaskTracker(String taskTracker) {
- //
- // Check if too many tasks of this job have failed on this
- // tasktracker prior to assigning it a new one.
- //
- int taskTrackerFailedTasks = getTrackerTaskFailures(taskTracker);
- if ((flakyTaskTrackers < (clusterSize * CLUSTER_BLACKLIST_PERCENT)) &&
- taskTrackerFailedTasks >= maxTaskFailuresPerTracker) {
- if (LOG.isDebugEnabled()) {
- String flakyTracker = convertTrackerNameToHostName(taskTracker);
- LOG.debug("Ignoring the black-listed tasktracker: '" + flakyTracker
- + "' for assigning a new task");
- }
- return false;
- }
- return true;
- }
-
- /**
- * Metering: Occupied Slots * (Finish - Start)
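- *
- * Illustrative example: a task that required 2 slots and ran for
- * 30,000 ms is metered as 60,000 slot-milliseconds.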
- * @param tip {@link TaskInProgress} to be metered which just completed,
- * cannot be <code>null</code>
- * @param status {@link TaskStatus} of the completed task, cannot be
- * <code>null</code>
- */
- private void meterTaskAttempt(TaskInProgress tip, TaskStatus status) {
- JobCounter slotCounter =
- (tip.isMapTask()) ? JobCounter.SLOTS_MILLIS_MAPS :
- JobCounter.SLOTS_MILLIS_REDUCES;
- jobCounters.incrCounter(slotCounter,
- tip.getNumSlotsRequired() *
- (status.getFinishTime() - status.getStartTime()));
- }
-
- /**
- * A taskid assigned to this JobInProgress has reported in successfully.
- */
- public synchronized boolean completedTask(TaskInProgress tip,
- TaskStatus status)
- {
- TaskAttemptID taskid = status.getTaskID();
- final JobTrackerInstrumentation metrics = jobtracker.getInstrumentation();
-
- // Metering
- meterTaskAttempt(tip, status);
-
- // Sanity check: is the TIP already complete?
- // This should not happen, because no two attempts of the same TIP
- // are marked SUCCEEDED at the same time.
- if (tip.isComplete()) {
- // Mark this task as KILLED
- tip.alreadyCompletedTask(taskid);
- // Let the JobTracker cleanup this taskid if the job isn't running
- if (this.status.getRunState() != JobStatus.RUNNING) {
- jobtracker.markCompletedTaskAttempt(status.getTaskTracker(), taskid);
- }
- return false;
- }
- boolean wasSpeculating = tip.isSpeculating(); //store this fact
- LOG.info("Task '" + taskid + "' has completed " + tip.getTIPId() +
- " successfully.");
- // Mark the TIP as complete
- tip.completed(taskid);
- resourceEstimator.updateWithCompletedTask(status, tip);
- // Update jobhistory
- TaskTrackerStatus ttStatus =
- this.jobtracker.getTaskTrackerStatus(status.getTaskTracker());
- String trackerHostname = jobtracker.getNode(ttStatus.getHost()).toString();
- TaskType taskType = getTaskType(tip);
- TaskAttemptStartedEvent tse = new TaskAttemptStartedEvent(
- status.getTaskID(), taskType, status.getStartTime(),
- status.getTaskTracker(), ttStatus.getHttpPort(), -1);
-
- jobHistory.logEvent(tse, status.getTaskID().getJobID());
- TaskAttemptID statusAttemptID = status.getTaskID();
- if (status.getIsMap()){
- MapAttemptFinishedEvent mfe = new MapAttemptFinishedEvent(
- statusAttemptID, taskType, TaskStatus.State.SUCCEEDED.toString(),
- status.getMapFinishTime(),
- status.getFinishTime(), trackerHostname, null,
- status.getStateString(),
- new org.apache.hadoop.mapreduce.Counters(status.getCounters()),
- tip.getSplits(statusAttemptID).burst()
- );
-
- jobHistory.logEvent(mfe, status.getTaskID().getJobID());
-
- }else{
- ReduceAttemptFinishedEvent rfe = new ReduceAttemptFinishedEvent(
- statusAttemptID, taskType, TaskStatus.State.SUCCEEDED.toString(),
- status.getShuffleFinishTime(),
- status.getSortFinishTime(), status.getFinishTime(),
- trackerHostname, null, status.getStateString(),
- new org.apache.hadoop.mapreduce.Counters(status.getCounters()),
- tip.getSplits(statusAttemptID).burst()
- );
-
- jobHistory.logEvent(rfe, status.getTaskID().getJobID());
-
- }
- TaskFinishedEvent tfe = new TaskFinishedEvent(tip.getTIPId(),
- tip.getExecFinishTime(), taskType,
- TaskStatus.State.SUCCEEDED.toString(),
- new org.apache.hadoop.mapreduce.Counters(status.getCounters()));
-
- jobHistory.logEvent(tfe, tip.getJob().getJobID());
-
-
- if (tip.isJobSetupTask()) {
- // setup task has finished. kill the extra setup tip
- killSetupTip(!tip.isMapTask());
- setupComplete();
- } else if (tip.isJobCleanupTask()) {
- // cleanup task has finished. Kill the extra cleanup tip
- if (tip.isMapTask()) {
- // kill the reduce tip
- cleanup[1].kill();
- } else {
- cleanup[0].kill();
- }
- //
- // The Job is done
- // if the job is failed, then mark the job failed.
- if (jobFailed) {
- terminateJob(JobStatus.FAILED);
- }
- // if the job is killed, then mark the job killed.
- else if (jobKilled) {
- terminateJob(JobStatus.KILLED);
- }
- else {
- jobComplete();
- }
- // The job has been killed/failed/successful
- // JobTracker should cleanup this task
- jobtracker.markCompletedTaskAttempt(status.getTaskTracker(), taskid);
- } else if (tip.isMapTask()) {
- runningMapTasks -= 1;
- finishedMapTasks += 1;
- metrics.completeMap(taskid);
- if (!tip.isJobSetupTask() && hasSpeculativeMaps) {
- updateTaskTrackerStats(tip,ttStatus,trackerMapStats,mapTaskStats);
- }
- // remove the completed map from the respective running caches
- retireMap(tip);
- if ((finishedMapTasks + failedMapTIPs) == (numMapTasks)) {
- this.status.setMapProgress(1.0f);
- if (canLaunchJobCleanupTask()) {
- checkCountersLimitsOrFail();
- }
- }
- } else {
- runningReduceTasks -= 1;
- finishedReduceTasks += 1;
- metrics.completeReduce(taskid);
- if (!tip.isJobSetupTask() && hasSpeculativeReduces) {
- updateTaskTrackerStats(tip,ttStatus,trackerReduceStats,reduceTaskStats);
- }
- // remove the completed reduces from the running reducers set
- retireReduce(tip);
- if ((finishedReduceTasks + failedReduceTIPs) == (numReduceTasks)) {
- this.status.setReduceProgress(1.0f);
- if (canLaunchJobCleanupTask()) {
- checkCountersLimitsOrFail();
- }
- }
- }
- decrementSpeculativeCount(wasSpeculating, tip);
- // is job complete?
- if (!jobSetupCleanupNeeded && canLaunchJobCleanupTask()) {
- jobComplete();
- }
- return true;
- }
- /*
- * add up the counters and fail the job if it exceeds the limits.
- * Make sure we do not recalculate the counters after we fail the job.
- * Currently this is taken care of by terminateJob() since it does not
- * calculate the counters.
- */
- private void checkCountersLimitsOrFail() {
- Counters counters = getCounters();
- if (counters.limits().violation() != null) {
- jobtracker.failJob(this);
- }
- }
-
- private void updateTaskTrackerStats(TaskInProgress tip, TaskTrackerStatus ttStatus,
- Map<String,DataStatistics> trackerStats, DataStatistics overallStats) {
- float tipDuration = tip.getExecFinishTime()-tip.getDispatchTime(tip.getSuccessfulTaskid());
- DataStatistics ttStats =
- trackerStats.get(ttStatus.getTrackerName());
- double oldMean = 0.0d;
- //We maintain the mean of TaskTrackers' means. That way, we get a single
- //data-point for every tracker (used in the evaluation in isSlowTracker)
- if (ttStats != null) {
- oldMean = ttStats.mean();
- ttStats.add(tipDuration);
- overallStats.updateStatistics(oldMean, ttStats.mean());
- } else {
- trackerStats.put(ttStatus.getTrackerName(),
- (ttStats = new DataStatistics(tipDuration)));
- overallStats.add(tipDuration);
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("Added mean of " +ttStats.mean() + " to trackerStats of type "+
- (tip.isMapTask() ? "Map" : "Reduce") +
- " on "+ttStatus.getTrackerName()+". DataStatistics is now: " +
- trackerStats.get(ttStatus.getTrackerName()));
- }
- }
-
- public void updateStatistics(double oldProg, double newProg, boolean isMap) {
- if (isMap) {
- runningMapTaskStats.updateStatistics(oldProg, newProg);
- } else {
- runningReduceTaskStats.updateStatistics(oldProg, newProg);
- }
- }
-
- public DataStatistics getRunningTaskStatistics(boolean isMap) {
- if (isMap) {
- return runningMapTaskStats;
- } else {
- return runningReduceTaskStats;
- }
- }
-
- public float getSlowTaskThreshold() {
- return slowTaskThreshold;
- }
- /**
- * Job state change must happen thru this call
- */
- private void changeStateTo(int newState) {
- int oldState = this.status.getRunState();
- if (oldState == newState) {
- return; //old and new states are same
- }
- this.status.setRunState(newState);
-
- //update the metrics
- if (oldState == JobStatus.PREP) {
- this.jobtracker.getInstrumentation().decPrepJob(conf, jobId);
- } else if (oldState == JobStatus.RUNNING) {
- this.jobtracker.getInstrumentation().decRunningJob(conf, jobId);
- }
-
- if (newState == JobStatus.PREP) {
- this.jobtracker.getInstrumentation().addPrepJob(conf, jobId);
- } else if (newState == JobStatus.RUNNING) {
- this.jobtracker.getInstrumentation().addRunningJob(conf, jobId);
- }
-
- }
- /**
- * The job is done since all its component tasks are either
- * successful or have failed.
- */
- private void jobComplete() {
- final JobTrackerInstrumentation metrics = jobtracker.getInstrumentation();
- //
- // All tasks are complete, then the job is done!
- //
- if (this.status.getRunState() == JobStatus.RUNNING ||
- this.status.getRunState() == JobStatus.PREP) {
- changeStateTo(JobStatus.SUCCEEDED);
- this.status.setCleanupProgress(1.0f);
- if (maps.length == 0) {
- this.status.setMapProgress(1.0f);
- }
- if (reduces.length == 0) {
- this.status.setReduceProgress(1.0f);
- }
- this.finishTime = JobTracker.getClock().getTime();
- this.status.setFinishTime(this.finishTime);
- LOG.info("Job " + this.status.getJobID() +
- " has completed successfully.");
-
- // Log the job summary (this should be done prior to logging to
- // job-history to ensure job-counters are in-sync)
- JobSummary.logJobSummary(this, jobtracker.getClusterStatus(false));
- // Log job-history
- JobFinishedEvent jfe =
- new JobFinishedEvent(this.status.getJobID(),
- this.finishTime,
- this.finishedMapTasks,this.finishedReduceTasks, failedMapTasks,
- failedReduceTasks,
- new org.apache.hadoop.mapreduce.Counters(getMapCounters()),
- new org.apache.hadoop.mapreduce.Counters(getReduceCounters()),
- new org.apache.hadoop.mapreduce.Counters(getCounters()));
-
- jobHistory.logEvent(jfe, this.status.getJobID());
- jobHistory.closeWriter(this.status.getJobID());
- // Note that finalize will close the job-history handles, which garbage
- // collection might otherwise try to finalize
- garbageCollect();
-
- metrics.completeJob(this.conf, this.status.getJobID());
- }
- }
-
- private synchronized void terminateJob(int jobTerminationState) {
- if ((status.getRunState() == JobStatus.RUNNING) ||
- (status.getRunState() == JobStatus.PREP)) {
- this.finishTime = JobTracker.getClock().getTime();
- this.status.setMapProgress(1.0f);
- this.status.setReduceProgress(1.0f);
- this.status.setCleanupProgress(1.0f);
- this.status.setFinishTime(this.finishTime);
- if (jobTerminationState == JobStatus.FAILED) {
- changeStateTo(JobStatus.FAILED);
- } else {
- changeStateTo(JobStatus.KILLED);
- }
- // Log the job summary
- JobSummary.logJobSummary(this, jobtracker.getClusterStatus(false));
- JobUnsuccessfulCompletionEvent failedEvent =
- new JobUnsuccessfulCompletionEvent(this.status.getJobID(),
- finishTime,
- this.finishedMapTasks,
- this.finishedReduceTasks,
- JobStatus.getJobRunState(jobTerminationState));
-
- jobHistory.logEvent(failedEvent, this.status.getJobID());
- jobHistory.closeWriter(this.status.getJobID());
- garbageCollect();
- jobtracker.getInstrumentation().terminateJob(
- this.conf, this.status.getJobID());
- if (jobTerminationState == JobStatus.FAILED) {
- jobtracker.getInstrumentation().failedJob(
- this.conf, this.status.getJobID());
- } else {
- jobtracker.getInstrumentation().killedJob(
- this.conf, this.status.getJobID());
- }
- }
- }
- /**
- * Terminate the job and all its component tasks.
- * Calling this will lead to marking the job as failed/killed. A cleanup
- * tip will be launched. If the job has not been inited, it will directly
- * call terminateJob as there is no need to launch a cleanup tip.
- * This method is reentrant.
- * @param jobTerminationState job termination state
- */
- private synchronized void terminate(int jobTerminationState) {
- if(!tasksInited.get()) {
- //init could not be done, we just terminate directly.
- terminateJob(jobTerminationState);
- return;
- }
- if ((status.getRunState() == JobStatus.RUNNING) ||
- (status.getRunState() == JobStatus.PREP)) {
- LOG.info("Killing job '" + this.status.getJobID() + "'");
- if (jobTerminationState == JobStatus.FAILED) {
- if(jobFailed) {//reentrant
- return;
- }
- jobFailed = true;
- } else if (jobTerminationState == JobStatus.KILLED) {
- if(jobKilled) {//reentrant
- return;
- }
- jobKilled = true;
- }
- // clear all unclean tasks
- clearUncleanTasks();
- //
- // kill all TIPs.
- //
- for (int i = 0; i < setup.length; i++) {
- setup[i].kill();
- }
- for (int i = 0; i < maps.length; i++) {
- maps[i].kill();
- }
- for (int i = 0; i < reduces.length; i++) {
- reduces[i].kill();
- }
-
- if (!jobSetupCleanupNeeded) {
- terminateJob(jobTerminationState);
- }
- }
- }
- /**
- * Cancel all reservations since the job is done
- */
- private void cancelReservedSlots() {
- // Make a copy of the set of TaskTrackers to prevent a
- // ConcurrentModificationException ...
- Set<TaskTracker> tm =
- new HashSet<TaskTracker>(trackersReservedForMaps.keySet());
- for (TaskTracker tt : tm) {
- tt.unreserveSlots(TaskType.MAP, this);
- }
- Set<TaskTracker> tr =
- new HashSet<TaskTracker>(trackersReservedForReduces.keySet());
- for (TaskTracker tt : tr) {
- tt.unreserveSlots(TaskType.REDUCE, this);
- }
- }
-
- private void clearUncleanTasks() {
- TaskAttemptID taskid = null;
- TaskInProgress tip = null;
- while (!mapCleanupTasks.isEmpty()) {
- taskid = mapCleanupTasks.remove(0);
- tip = maps[taskid.getTaskID().getId()];
- updateTaskStatus(tip, tip.getTaskStatus(taskid));
- }
- while (!reduceCleanupTasks.isEmpty()) {
- taskid = reduceCleanupTasks.remove(0);
- tip = reduces[taskid.getTaskID().getId()];
- updateTaskStatus(tip, tip.getTaskStatus(taskid));
- }
- }
- /**
- * Kill the job and all its component tasks. This method should be called
- * from the jobtracker and should return fast, as it locks the jobtracker.
- */
- public void kill() {
- boolean killNow = false;
- synchronized(jobInitKillStatus) {
- jobInitKillStatus.killed = true;
- //if not in middle of init, terminate it now
- if(!jobInitKillStatus.initStarted || jobInitKillStatus.initDone) {
- //avoiding nested locking by setting flag
- killNow = true;
- }
- }
- if(killNow) {
- terminate(JobStatus.KILLED);
- }
- }
-
- /**
- * Fails the job and all its component tasks. This should be called only from
- * {@link JobInProgress} or {@link JobTracker}. Look at
- * {@link JobTracker#failJob(JobInProgress)} for more details.
- * Note that the job doesn't expect to be failed before it is inited.
- * Only when init is done (successfully or otherwise) can the job be
- * failed.
- */
- synchronized void fail() {
- terminate(JobStatus.FAILED);
- }
-
- private void decrementSpeculativeCount(boolean wasSpeculating,
- TaskInProgress tip) {
- if (wasSpeculating) {
- if (tip.isMapTask()) {
- speculativeMapTasks--;
- if (LOG.isDebugEnabled()) {
- LOG.debug("Decremented count for " +
- tip.getTIPId()+"/"+tip.getJob().getJobID() +
- ". Current speculativeMap task count: "
- + speculativeMapTasks);
- }
- } else {
- speculativeReduceTasks--;
- if (LOG.isDebugEnabled()) {
- LOG.debug("Decremented count for " +
- tip.getTIPId()+"/"+tip.getJob().getJobID() +
- ". Current speculativeReduce task count: "
- + speculativeReduceTasks);
- }
- }
- }
- }
-
- /**
- * A task assigned to this JobInProgress has reported in as failed.
- * Most of the time, we'll just reschedule execution. However, after
- * many repeated failures we may instead decide to allow the entire
- * job to fail or succeed if the user doesn't care about a few tasks failing.
- *
- * Even if a task has reported as completed in the past, it might later
- * be reported as failed. That's because the TaskTracker that hosts a map
- * task might die before the entire job can complete. If that happens,
- * we need to schedule reexecution so that downstream reduce tasks can
- * obtain the map task's output.
- */
- private void failedTask(TaskInProgress tip, TaskAttemptID taskid,
- TaskStatus status,
- TaskTracker taskTracker, boolean wasRunning,
- boolean wasComplete, boolean wasAttemptRunning) {
- // check if the TIP is already failed
- boolean wasFailed = tip.isFailed();
- boolean wasSpeculating = tip.isSpeculating();
- // Mark the taskid as FAILED or KILLED
- tip.incompleteSubTask(taskid, this.status);
- decrementSpeculativeCount(wasSpeculating, tip);
-
- boolean isRunning = tip.isRunning();
- boolean isComplete = tip.isComplete();
- if(wasAttemptRunning) {
- // We decrement the counters without checking isRunning, because the
- // counters are incremented whenever a new map or reduce task attempt
- // is obtained; we do not really check for the tip being running.
- // Since runningMapTasks is incremented whenever a new attempt is
- // obtained, we decrement it here in the same way.
- if(!tip.isJobCleanupTask() && !tip.isJobSetupTask()) {
- if(tip.isMapTask()) {
- runningMapTasks -= 1;
- } else {
- runningReduceTasks -= 1;
- }
- }
-
- // Metering
- meterTaskAttempt(tip, status);
- }
-
- //update running count on task failure.
- if (wasRunning && !isRunning) {
- if (tip.isJobCleanupTask()) {
- launchedCleanup = false;
- } else if (tip.isJobSetupTask()) {
- launchedSetup = false;
- } else if (tip.isMapTask()) {
- // remove from the running queue and put it in the non-running cache
- // if the tip is not complete i.e if the tip still needs to be run
- if (!isComplete) {
- retireMap(tip);
- failMap(tip);
- }
- } else {
- // remove from the running queue and put in the failed queue if the tip
- // is not complete
- if (!isComplete) {
- retireReduce(tip);
- failReduce(tip);
- }
- }
- }
-
- // The case when the map was complete but the task tracker went down.
- // However, we don't need to do any metering here...
- if (wasComplete && !isComplete) {
- if (tip.isMapTask()) {
- // Put the task back in the cache. This will help locality for cases
- // where we have a different TaskTracker from the same rack/switch
- // asking for a task.
- // We bother about only those TIPs that were successful
- // earlier (wasComplete and !isComplete)
- // (since they might have been removed from the cache of other
- // racks/switches, if the input split blocks were present there too)
- failMap(tip);
- finishedMapTasks -= 1;
- }
- }
-
- // update job history
- // get taskStatus from tip
- TaskStatus taskStatus = tip.getTaskStatus(taskid);
- String taskTrackerName = taskStatus.getTaskTracker();
- String taskTrackerHostName = convertTrackerNameToHostName(taskTrackerName);
- int taskTrackerPort = -1;
- TaskTrackerStatus taskTrackerStatus =
- (taskTracker == null) ? null : taskTracker.getStatus();
- if (taskTrackerStatus != null) {
- taskTrackerPort = taskTrackerStatus.getHttpPort();
- }
- long startTime = taskStatus.getStartTime();
- long finishTime = taskStatus.getFinishTime();
- List<String> taskDiagnosticInfo = tip.getDiagnosticInfo(taskid);
- String diagInfo = taskDiagnosticInfo == null ? "" :
- StringUtils.arrayToString(taskDiagnosticInfo.toArray(new String[0]));
- TaskType taskType = getTaskType(tip);
- TaskAttemptStartedEvent tse = new TaskAttemptStartedEvent(
- taskid, taskType, startTime, taskTrackerName, taskTrackerPort, -1);
-
- jobHistory.logEvent(tse, taskid.getJobID());
- ProgressSplitsBlock splits = tip.getSplits(taskStatus.getTaskID());
-
- TaskAttemptUnsuccessfulCompletionEvent tue =
- new TaskAttemptUnsuccessfulCompletionEvent
- (taskid,
- taskType, taskStatus.getRunState().toString(),
- finishTime,
- taskTrackerHostName, diagInfo,
- splits.burst());
- jobHistory.logEvent(tue, taskid.getJobID());
-
- // Count the failure, so that subsequent task assignment starts with
- // the task after this one and the failed task goes to the end of the list.
- if (!tip.isJobCleanupTask() && !tip.isJobSetupTask()) {
- if (tip.isMapTask()) {
- failedMapTasks++;
- } else {
- failedReduceTasks++;
- }
- }
-
- //
- // Note down that a task has failed on this tasktracker
- //
- if (status.getRunState() == TaskStatus.State.FAILED) {
- addTrackerTaskFailure(taskTrackerName, taskTracker);
- }
-
- //
- // Let the JobTracker know that this task has failed
- //
- jobtracker.markCompletedTaskAttempt(status.getTaskTracker(), taskid);
- //
- // Check if we need to kill the job because of too many failures, or
- // if the job is complete since all component tasks have completed.
- // We do this once per TIP, and only for the attempt that fails the TIP.
- if (!wasFailed && tip.isFailed()) {
- //
- // Allow up to 'mapFailuresPercent' of map tasks to fail or
- // 'reduceFailuresPercent' of reduce tasks to fail.
- //
- boolean killJob = tip.isJobCleanupTask() || tip.isJobSetupTask() ||
- (tip.isMapTask() ?
- ((++failedMapTIPs*100) > (mapFailuresPercent*numMapTasks)) :
- ((++failedReduceTIPs*100) > (reduceFailuresPercent*numReduceTasks)));
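- // Worked example (illustrative, values assumed): with
- // mapFailuresPercent = 5 and numMapTasks = 200, the job tolerates
- // failed map TIPs as long as failedMapTIPs * 100 <= 5 * 200, i.e.
- // up to 10 failed map TIPs; the 11th failure gives
- // 11 * 100 = 1100 > 1000 and sets killJob.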
-
- if (killJob) {
- LOG.info("Aborting job " + profile.getJobID());
- TaskFailedEvent tfe =
- new TaskFailedEvent(tip.getTIPId(), finishTime, taskType, diagInfo,
- TaskStatus.State.FAILED.toString(),
- null);
-
- jobHistory.logEvent(tfe, tip.getJob().getJobID());
-
- if (tip.isJobCleanupTask()) {
- // kill the other tip
- if (tip.isMapTask()) {
- cleanup[1].kill();
- } else {
- cleanup[0].kill();
- }
- terminateJob(JobStatus.FAILED);
- } else {
- if (tip.isJobSetupTask()) {
- // kill the other tip
- killSetupTip(!tip.isMapTask());
- }
- fail();
- }
- }
-
- //
- // Update the counters
- //
- if (!tip.isJobCleanupTask() && !tip.isJobSetupTask()) {
- if (tip.isMapTask()) {
- jobCounters.incrCounter(JobCounter.NUM_FAILED_MAPS, 1);
- } else {
- jobCounters.incrCounter(JobCounter.NUM_FAILED_REDUCES, 1);
- }
- }
- }
- }
- 
- /**
- * Kill the setup TIP of the given type: setup[0] is the map setup
- * task, setup[1] the reduce setup task.
- */
- void killSetupTip(boolean isMap) {
- if (isMap) {
- setup[0].kill();
- } else {
- setup[1].kill();
- }
- }
- 
- boolean isSetupFinished() {
- // If there is no setup to be launched, consider setup finished.
- return (tasksInited.get() && setup.length == 0) ||
- setup[0].isComplete() || setup[0].isFailed() ||
- setup[1].isComplete() || setup[1].isFailed();
- }
- 
- /**
- * Fail a task with a given reason, but without a status object.
- *
- * Assumes {@link JobTracker} is locked on entry.
- *
- * @param tip The task's tip
- * @param taskid The task id
- * @param reason The reason that the task failed
- * @param phase The phase the task was in when it failed
- * @param state The terminal state (FAILED or KILLED) to record
- * @param trackerName The task tracker the task failed on
- */
- public synchronized void failedTask(TaskInProgress tip, TaskAttemptID taskid,
- String reason, TaskStatus.Phase phase, TaskStatus.State state,
- String trackerName) {
- TaskStatus status = TaskStatus.createTaskStatus(tip.isMapTask(),
- taskid,
- 0.0f,
- tip.isMapTask() ?
- numSlotsPerMap :
- numSlotsPerReduce,
- state,
- reason,
- reason,
- trackerName, phase,
- new Counters());
- // update the actual start-time of the attempt
- TaskStatus oldStatus = tip.getTaskStatus(taskid);
- long startTime = oldStatus == null
- ? JobTracker.getClock().getTime()
- : oldStatus.getStartTime();
- status.setStartTime(startTime);
- status.setFinishTime(JobTracker.getClock().getTime());
- boolean wasComplete = tip.isComplete();
- updateTaskStatus(tip, status);
- boolean isComplete = tip.isComplete();
- if (wasComplete && !isComplete) { // mark a successful tip as failed
- TaskType taskType = getTaskType(tip);
- TaskFailedEvent tfe =
- new TaskFailedEvent(tip.getTIPId(), tip.getExecFinishTime(), taskType,
- reason, TaskStatus.State.FAILED.toString(),
- taskid);
-
- jobHistory.logEvent(tfe, tip.getJob().getJobID());
-
- }
- }
-
-
- /**
- * The job is dead. We're now GC'ing it, getting rid of the job
- * from all tables. Be sure to remove all of this job's tasks
- * from the various tables.
- */
- void garbageCollect() {
- synchronized(this) {
- // Cancel task tracker reservation
- cancelReservedSlots();
- // Let the JobTracker know that a job is complete
- jobtracker.getInstrumentation().decWaitingMaps(getJobID(), pendingMaps());
- jobtracker.getInstrumentation().decWaitingReduces(getJobID(), pendingReduces());
- jobtracker.storeCompletedJob(this);
- jobtracker.finalizeJob(this);
- try {
- // Definitely remove the local-disk copy of the job file
- if (localJobFile != null) {
- localFs.delete(localJobFile, true);
- localJobFile = null;
- }
- Path tempDir = jobtracker.getSystemDirectoryForJob(getJobID());
- new CleanupQueue().addToQueue(new PathDeletionContext(
- jobtracker.getFileSystem(), tempDir.toUri().getPath()));
- } catch (IOException e) {
- LOG.warn("Error cleaning up "+profile.getJobID()+": "+e);
- }
- // free up the memory used by the data structures
- this.nonRunningMapCache = null;
- this.runningMapCache = null;
- this.nonRunningReduces = null;
- this.runningReduces = null;
- }
- // Remove the job's delegation tokens
- if(conf.getBoolean(MRJobConfig.JOB_CANCEL_DELEGATION_TOKEN, true)) {
- DelegationTokenRenewal.removeDelegationTokenRenewalForJob(jobId);
- } // else don't remove them; they may still be used by spawned tasks
- }
- 
- /**
- * Return the TaskInProgress that matches the given tipid.
- */
- public synchronized TaskInProgress getTaskInProgress(TaskID tipid) {
- if (tipid.getTaskType() == TaskType.MAP) {
- // cleanup map tip
- if (cleanup.length > 0 && tipid.equals(cleanup[0].getTIPId())) {
- return cleanup[0];
- }
- // setup map tip
- if (setup.length > 0 && tipid.equals(setup[0].getTIPId())) {
- return setup[0];
- }
- for (int i = 0; i < maps.length; i++) {
- if (tipid.equals(maps[i].getTIPId())){
- return maps[i];
- }
- }
- } else {
- // cleanup reduce tip
- if (cleanup.length > 0 && tipid.equals(cleanup[1].getTIPId())) {
- return cleanup[1];
- }
- // setup reduce tip
- if (setup.length > 0 && tipid.equals(setup[1].getTIPId())) {
- return setup[1];
- }
- for (int i = 0; i < reduces.length; i++) {
- if (tipid.equals(reduces[i].getTIPId())){
- return reduces[i];
- }
- }
- }
- return null;
- }
-
- /**
- * Find the details of someplace where a map has finished
- * @param mapId the id of the map
- * @return the task status of the completed task
- */
- public synchronized TaskStatus findFinishedMap(int mapId) {
- TaskInProgress tip = maps[mapId];
- if (tip.isComplete()) {
- TaskStatus[] statuses = tip.getTaskStatuses();
- for(int i=0; i < statuses.length; i++) {
- if (statuses[i].getRunState() == TaskStatus.State.SUCCEEDED) {
- return statuses[i];
- }
- }
- }
- return null;
- }
-
- synchronized int getNumTaskCompletionEvents() {
- return taskCompletionEvents.size();
- }
-
- public synchronized TaskCompletionEvent[] getTaskCompletionEvents(
- int fromEventId, int maxEvents) {
- TaskCompletionEvent[] events = TaskCompletionEvent.EMPTY_ARRAY;
- if (taskCompletionEvents.size() > fromEventId) {
- int actualMax = Math.min(maxEvents,
- (taskCompletionEvents.size() - fromEventId));
- events = taskCompletionEvents.subList(fromEventId, actualMax + fromEventId).toArray(events);
- }
- return events;
- }
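- 
- // Worked example for the windowing above (illustrative, values
- // assumed): with 12 completion events, fromEventId = 10 and
- // maxEvents = 5, actualMax = min(5, 12 - 10) = 2, so the subList
- // covers indices [10, 12) and two events are returned. If
- // fromEventId >= the event count, the empty array is returned.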
-
- synchronized void fetchFailureNotification(TaskInProgress tip,
- TaskAttemptID mapTaskId,
- String mapTrackerName,
- TaskAttemptID reduceTaskId,
- String reduceTrackerName) {
- Integer fetchFailures = mapTaskIdToFetchFailuresMap.get(mapTaskId);
- fetchFailures = (fetchFailures == null) ? 1 : (fetchFailures+1);
- mapTaskIdToFetchFailuresMap.put(mapTaskId, fetchFailures);
- LOG.info("Failed fetch notification #" + fetchFailures + " for map task: "
- + mapTaskId + " running on tracker: " + mapTrackerName
- + " and reduce task: " + reduceTaskId + " running on tracker: "
- + reduceTrackerName);
- float failureRate = (float)fetchFailures / runningReduceTasks;
- // declare the map output faulty if the fetch-failure rate reaches
- // the allowed maximum
- boolean isMapFaulty =
- (failureRate >= MAX_ALLOWED_FETCH_FAILURES_PERCENT);
- if (fetchFailures >= MAX_FETCH_FAILURES_NOTIFICATIONS
- && isMapFaulty) {
- LOG.info("Too many fetch-failures for output of task: " + mapTaskId
- + " ... killing it");
-
- failedTask(tip, mapTaskId, "Too many fetch-failures",
- (tip.isMapTask() ? TaskStatus.Phase.MAP :
- TaskStatus.Phase.REDUCE),
- TaskStatus.State.FAILED, mapTrackerName);
-
- mapTaskIdToFetchFailuresMap.remove(mapTaskId);
- }
- }
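- 
- // A minimal sketch of the faultiness test above, factored out for
- // clarity. This helper is hypothetical (not part of the original
- // class); it assumes the same two thresholds: a minimum number of
- // notifications and a minimum failure rate relative to the number
- // of running reduces. E.g. isMapOutputFaulty(3, 6, 3, 0.5f) is
- // true, since 3 >= 3 and 3/6 = 0.5 >= 0.5.
- static boolean isMapOutputFaulty(int fetchFailures, int runningReduces,
- int minNotifications, float maxFailureRate) {
- float rate = (float) fetchFailures / runningReduces;
- return fetchFailures >= minNotifications && rate >= maxFailureRate;
- }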
-
- /**
- * @return The JobID of this JobInProgress.
- */
- public JobID getJobID() {
- return jobId;
- }
-
- public synchronized Object getSchedulingInfo() {
- return this.schedulingInfo;
- }
-
- public synchronized void setSchedulingInfo(Object schedulingInfo) {
- this.schedulingInfo = schedulingInfo;
- this.status.setSchedulingInfo(schedulingInfo.toString());
- }
-
- /**
- * Keeps track of the kill and initTasks status of this job. initTasks()
- * takes a lock on the JobInProgress object. kill should avoid waiting on
- * the JobInProgress lock since initTasks() may take a while.
- */
- private static class JobInitKillStatus {
- // flag set when kill is called
- boolean killed;
-
- boolean initStarted;
- boolean initDone;
- }
- boolean isComplete() {
- return status.isJobComplete();
- }
-
- /**
- * Get the task type for logging it to {@link JobHistory}.
- */
- private TaskType getTaskType(TaskInProgress tip) {
- if (tip.isJobCleanupTask()) {
- return TaskType.JOB_CLEANUP;
- } else if (tip.isJobSetupTask()) {
- return TaskType.JOB_SETUP;
- } else if (tip.isMapTask()) {
- return TaskType.MAP;
- } else {
- return TaskType.REDUCE;
- }
- }
-
- /**
- * Get the level of locality that a given task would have if launched on
- * a particular TaskTracker. Returns 0 if the task has data on that machine,
- * 1 if it has data on the same rack, etc (depending on number of levels in
- * the network hierarchy).
- */
- int getLocalityLevel(TaskInProgress tip, TaskTrackerStatus tts) {
- Node tracker = jobtracker.getNode(tts.getHost());
- int level = this.maxLevel;
- // find the right level across split locations
- for (String local : maps[tip.getIdWithinJob()].getSplitLocations()) {
- Node datanode = jobtracker.getNode(local);
- int newLevel = this.maxLevel;
- if (tracker != null && datanode != null) {
- newLevel = getMatchingLevelForNodes(tracker, datanode);
- }
- if (newLevel < level) {
- level = newLevel;
- // an optimization
- if (level == 0) {
- break;
- }
- }
- }
- return level;
- }
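- 
- // Worked example (illustrative, values assumed): with a 3-level
- // hierarchy (maxLevel = 3), a tracker on the same host as a split
- // replica yields level 0 (node-local), a host on the same rack
- // yields level 1 (rack-local), and no match at any level leaves
- // level = maxLevel. The loop keeps the minimum across all split
- // locations and stops early once a node-local match is found.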
-
- /**
- * Test method to set the cluster size.
- */
- void setClusterSize(int clusterSize) {
- this.clusterSize = clusterSize;
- }
- 
- static class JobSummary {
- static final Log LOG = LogFactory.getLog(JobSummary.class);
-
- // Escape sequences
- static final char EQUALS = '=';
- static final char[] charsToEscape =
- {StringUtils.COMMA, EQUALS, StringUtils.ESCAPE_CHAR};
- static class SummaryBuilder {
- final StringBuilder buffer = new StringBuilder();
- // A little optimization for a very common case
- SummaryBuilder add(String key, long value) {
- return _add(key, Long.toString(value));
- }
- <T> SummaryBuilder add(String key, T value) {
- return _add(key, StringUtils.escapeString(String.valueOf(value),
- StringUtils.ESCAPE_CHAR, charsToEscape));
- }
- SummaryBuilder add(SummaryBuilder summary) {
- if (buffer.length() > 0) buffer.append(StringUtils.COMMA);
- buffer.append(summary.buffer);
- return this;
- }
- SummaryBuilder _add(String key, String value) {
- if (buffer.length() > 0) buffer.append(StringUtils.COMMA);
- buffer.append(key).append(EQUALS).append(value);
- return this;
- }
- @Override public String toString() {
- return buffer.toString();
- }
- }
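- 
- // Usage sketch (illustrative): keys and values are joined with '='
- // and entries with ','; commas, '=' and the escape char inside
- // values are escaped so the summary line stays parseable. E.g.
- // new SummaryBuilder().add("jobId", "job_1").add("queue", "a,b")
- // builds "jobId=job_1,queue=a\,b".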
- static SummaryBuilder getTaskLaunchTimesSummary(JobInProgress job) {
- SummaryBuilder summary = new SummaryBuilder();
- Map<TaskType, Long> timeMap = job.getFirstTaskLaunchTimes();
- synchronized(timeMap) {
- for (Map.Entry<TaskType, Long> e : timeMap.entrySet()) {
- summary.add("first"+ StringUtils.camelize(e.getKey().name()) +
- "TaskLaunchTime", e.getValue().longValue());
- }
- }
- return summary;
- }
- /**
- * Log a summary of the job's runtime.
- *
- * @param job {@link JobInProgress} whose summary is to be logged, cannot
- * be <code>null</code>.
- * @param cluster {@link ClusterStatus} of the cluster on which the job was
- * run, cannot be <code>null</code>
- */
- public static void logJobSummary(JobInProgress job, ClusterStatus cluster) {
- JobStatus status = job.getStatus();
- JobProfile profile = job.getProfile();
- Counters jobCounters = job.getJobCounters();
- long mapSlotSeconds =
- (jobCounters.getCounter(JobCounter.SLOTS_MILLIS_MAPS) +
- jobCounters.getCounter(JobCounter.FALLOW_SLOTS_MILLIS_MAPS)) / 1000;
- long reduceSlotSeconds =
- (jobCounters.getCounter(JobCounter.SLOTS_MILLIS_REDUCES) +
- jobCounters.getCounter(JobCounter.FALLOW_SLOTS_MILLIS_REDUCES)) / 1000;
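- // Worked example (illustrative, values assumed): a job whose maps
- // used 120,000 ms of occupied map-slot time and 30,000 ms of fallow
- // (reserved but idle) map-slot time logs
- // mapSlotSeconds = (120000 + 30000) / 1000 = 150.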
- SummaryBuilder summary = new SummaryBuilder()
- .add("jobId", job.getJobID())
- .add("submitTime", job.getStartTime())
- .add("launchTime", job.getLaunchTime())
- .add(getTaskLaunchTimesSummary(job))
- .add("finishTime", job.getFinishTime())
- .add("numMaps", job.getTasks(TaskType.MAP).length)
- .add("numSlotsPerMap", job.getNumSlotsPerMap())
- .add("numReduces", job.getTasks(TaskType.REDUCE).length)
- .add("numSlotsPerReduce", job.getNumSlotsPerReduce())
- .add("user", profile.getUser())
- .add("queue", profile.getQueueName())
- .add("status", JobStatus.getJobRunState(status.getRunState()))
- .add("mapSlotSeconds", mapSlotSeconds)
- .add("reduceSlotsSeconds", reduceSlotSeconds)
- .add("clusterMapCapacity", cluster.getMaxMapTasks())
- .add("clusterReduceCapacity", cluster.getMaxReduceTasks());
- LOG.info(summary);
- }
- }
-
- /**
- * Creates the localized copy of the job conf.
- * @param jobConf the job configuration to localize
- * @param id the job id
- */
- void setUpLocalizedJobConf(JobConf jobConf,
- org.apache.hadoop.mapreduce.JobID id) {
- String localJobFilePath = jobtracker.getLocalJobFilePath(id);
- File localJobFile = new File(localJobFilePath);
- FileOutputStream jobOut = null;
- try {
- jobOut = new FileOutputStream(localJobFile);
- jobConf.writeXml(jobOut);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Job conf for " + id + " stored at "
- + localJobFile.getAbsolutePath());
- }
- } catch (IOException ioe) {
- LOG.error("Failed to store job conf on the local filesystem ", ioe);
- } finally {
- if (jobOut != null) {
- try {
- jobOut.close();
- } catch (IOException ie) {
- LOG.info("Failed to close the job configuration file "
- + StringUtils.stringifyException(ie));
- }
- }
- }
- }
- 
- /**
- * Deletes the localized copy of the job conf.
- */
- void cleanupLocalizedJobConf(org.apache.hadoop.mapreduce.JobID id) {
- String localJobFilePath = jobtracker.getLocalJobFilePath(id);
- File f = new File(localJobFilePath);
- LOG.info("Deleting localized job conf at " + f);
- if (!f.delete()) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Failed to delete file " + f);
- }
- }
- }
-
- /**
- * Generate the job token and save it to the job-token file in the
- * job's system directory.
- * @throws IOException
- */
- private void generateAndStoreTokens() throws IOException{
- Path jobDir = jobtracker.getSystemDirectoryForJob(jobId);
- Path keysFile = new Path(jobDir, TokenCache.JOB_TOKEN_HDFS_FILE);
- if (tokenStorage == null) {
- tokenStorage = new Credentials();
- }
-
- // Create the job token and add it to the credentials
- JobTokenIdentifier identifier = new JobTokenIdentifier(new Text(jobId
- .toString()));
- Token<JobTokenIdentifier> token = new Token<JobTokenIdentifier>(identifier,
- jobtracker.getJobTokenSecretManager());
- token.setService(identifier.getJobId());
-
- TokenCache.setJobToken(token, tokenStorage);
-
- // write TokenStorage out
- tokenStorage.writeTokenStorageFile(keysFile, jobtracker.getConf());
- LOG.info("jobToken generated and stored with users keys in "
- + keysFile.toUri().getPath());
- }
- public String getJobSubmitHostAddress() {
- return submitHostAddress;
- }
- public String getJobSubmitHostName() {
- return submitHostName;
- }
- }