@@ -65,6 +65,12 @@ import org.apache.hadoop.io.serializer.Serializer;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.mapred.Counters.Counter;
 import org.apache.hadoop.mapred.Counters.Group;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobSubmissionFiles;
+import org.apache.hadoop.mapreduce.split.JobSplitWriter;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.UnixUserGroupInformation;
 import org.apache.hadoop.util.ReflectionUtils;
@@ -389,6 +395,7 @@ public class JobClient extends Configured implements MRConstants, Tool {
 
   private JobSubmissionProtocol jobSubmitClient;
   private Path sysDir = null;
+  private Path stagingAreaDir = null;
 
   private FileSystem fs = null;
 
@@ -418,6 +425,7 @@ public class JobClient extends Configured implements MRConstants, Tool {
   public void init(JobConf conf) throws IOException {
     String tracker = conf.get("mapred.job.tracker", "local");
     if ("local".equals(tracker)) {
+      conf.setNumMapTasks(1);
       this.jobSubmitClient = new LocalJobRunner(conf);
     } else {
       this.jobSubmitClient = createRPCProxy(JobTracker.getAddress(conf), conf);
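A side note on the hunk above: whether a job goes through LocalJobRunner or over RPC to a JobTracker is driven entirely by the "mapred.job.tracker" property, with "local" as the default. A minimal sketch of that decision; the property name and default come from the hunk, the class and printout are illustrative:

```java
import org.apache.hadoop.mapred.JobConf;

public class TrackerModeCheck {
  public static void main(String[] args) {
    JobConf conf = new JobConf();
    // "local" (the default) routes submission through LocalJobRunner;
    // anything else is treated as a JobTracker address to reach over RPC.
    String tracker = conf.get("mapred.job.tracker", "local");
    System.out.println("local".equals(tracker)
        ? "submitting via LocalJobRunner"
        : "submitting to JobTracker at " + tracker);
  }
}
```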
@@ -529,11 +537,23 @@ public class JobClient extends Configured implements MRConstants, Tool {
   /**
    * configure the jobconf of the user with the command line options of
    * -libjars, -files, -archives
-   * @param conf
+   * @param job the JobConf
+   * @param jobSubmitDir the directory to which the job's files are copied
    * @throws IOException
    */
-  private void configureCommandLineOptions(JobConf job, Path submitJobDir, Path submitJarFile)
-  throws IOException {
+  private void copyAndConfigureFiles(JobConf job, Path jobSubmitDir)
+  throws IOException {
+    short replication = (short)job.getInt("mapred.submit.replication", 10);
+    copyAndConfigureFiles(job, jobSubmitDir, replication);
+
+    // Set the working directory
+    if (job.getWorkingDirectory() == null) {
+      job.setWorkingDirectory(fs.getWorkingDirectory());
+    }
+  }
+
+  private void copyAndConfigureFiles(JobConf job, Path submitJobDir,
+      short replication) throws IOException {
 
     if (!(job.getBoolean("mapred.used.genericoptionsparser", false))) {
       LOG.warn("Use GenericOptionsParser for parsing the arguments. " +
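The one-argument overload above pins down a detail worth calling out: files staged for submission default to a replication factor of 10 unless "mapred.submit.replication" says otherwise. A hypothetical client-side override, using only the property and default shown in the hunk:

```java
import org.apache.hadoop.mapred.JobConf;

public class SubmitReplicationExample {
  public static JobConf configure() {
    JobConf conf = new JobConf();
    // Spread job.jar, job.xml and the distributed-cache files across more
    // datanodes than the default of 10, e.g. for a very large cluster.
    conf.setInt("mapred.submit.replication", 20);
    return conf;
  }
}
```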
@@ -566,15 +586,18 @@ public class JobClient extends Configured implements MRConstants, Tool {
     // Create a number of filenames in the JobTracker's fs namespace
     FileSystem fs = getFs();
     LOG.debug("default FileSystem: " + fs.getUri());
-    fs.delete(submitJobDir, true);
+    if (fs.exists(submitJobDir)) {
+      throw new IOException("Not submitting job. Job directory " + submitJobDir
+          + " already exists! This is unexpected. Please check what is in"
+          + " that directory.");
+    }
     submitJobDir = fs.makeQualified(submitJobDir);
     submitJobDir = new Path(submitJobDir.toUri().getPath());
-    FsPermission mapredSysPerms = new FsPermission(JOB_DIR_PERMISSION);
+    FsPermission mapredSysPerms = new FsPermission(JobSubmissionFiles.JOB_DIR_PERMISSION);
     FileSystem.mkdirs(fs, submitJobDir, mapredSysPerms);
-    Path filesDir = new Path(submitJobDir, "files");
-    Path archivesDir = new Path(submitJobDir, "archives");
-    Path libjarsDir = new Path(submitJobDir, "libjars");
-    short replication = (short)job.getInt("mapred.submit.replication", 10);
+    Path filesDir = JobSubmissionFiles.getJobDistCacheFiles(submitJobDir);
+    Path archivesDir = JobSubmissionFiles.getJobDistCacheArchives(submitJobDir);
+    Path libjarsDir = JobSubmissionFiles.getJobDistCacheLibjars(submitJobDir);
     // add all the command line files/ jars and archive
     // first copy them to jobtrackers filesystem
 
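The literals "files", "archives" and "libjars" now live behind JobSubmissionFiles accessors. A sketch of the per-job layout this implies, assuming the helpers keep the old names (only the accessors themselves appear in the patch; the staging path and job id below are placeholders):

```java
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.JobSubmissionFiles;

public class StagingLayout {
  public static void main(String[] args) {
    Path submitJobDir = new Path("/tmp/hadoop/staging/job_200901010000_0001");
    System.out.println(JobSubmissionFiles.getJobDistCacheFiles(submitJobDir));    // expected .../files
    System.out.println(JobSubmissionFiles.getJobDistCacheArchives(submitJobDir)); // expected .../archives
    System.out.println(JobSubmissionFiles.getJobDistCacheLibjars(submitJobDir));  // expected .../libjars
    System.out.println(JobSubmissionFiles.getJobJar(submitJobDir));               // expected .../job.jar
    System.out.println(JobSubmissionFiles.getJobConfPath(submitJobDir));          // expected .../job.xml
  }
}
```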
@@ -601,7 +624,8 @@ public class JobClient extends Configured implements MRConstants, Tool {
       for (String tmpjars: libjarsArr) {
         Path tmp = new Path(tmpjars);
         Path newPath = copyRemoteFiles(fs, libjarsDir, tmp, job, replication);
-        DistributedCache.addArchiveToClassPath(newPath, job);
+        DistributedCache.addArchiveToClassPath(
+            new Path(newPath.toUri().getPath()), job);
       }
     }
 
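The only behavioral change in this hunk is that the classpath entry is reduced to the bare path component of the copied jar's URI, dropping the scheme and authority. A small standalone illustration (the URI is invented; the stripping is exactly the `toUri().getPath()` call above):

```java
import org.apache.hadoop.fs.Path;

public class SchemeStripping {
  public static void main(String[] args) {
    Path newPath = new Path("hdfs://namenode:8020/user/alice/staging/libjars/dep.jar");
    // Drop scheme and authority, keeping only the absolute path, before the
    // jar is registered on the classpath via DistributedCache.
    Path classpathEntry = new Path(newPath.toUri().getPath());
    System.out.println(classpathEntry); // /user/alice/staging/libjars/dep.jar
  }
}
```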
@@ -653,10 +677,12 @@ public class JobClient extends Configured implements MRConstants, Tool {
       if ("".equals(job.getJobName())){
         job.setJobName(new Path(originalJarPath).getName());
       }
+      Path submitJarFile = JobSubmissionFiles.getJobJar(submitJobDir);
       job.setJar(submitJarFile.toString());
       fs.copyFromLocalFile(new Path(originalJarPath), submitJarFile);
       fs.setReplication(submitJarFile, replication);
-      fs.setPermission(submitJarFile, new FsPermission(JOB_FILE_PERMISSION));
+      fs.setPermission(submitJarFile,
+          new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION));
     } else {
       LOG.warn("No job jar file set. User classes may not be found. "+
                "See JobConf(Class) or JobConf#setJar(String).");
@@ -667,10 +693,6 @@ public class JobClient extends Configured implements MRConstants, Tool {
     if (ugi.getGroupNames().length > 0) {
       job.set("group.name", ugi.getGroupNames()[0]);
     }
-    if (job.getWorkingDirectory() == null) {
-      job.setWorkingDirectory(fs.getWorkingDirectory());
-    }
-    
   }
 
   private UnixUserGroupInformation getUGI(Configuration job) throws IOException {
@@ -704,15 +726,7 @@ public class JobClient extends Configured implements MRConstants, Tool {
     JobConf job = new JobConf(jobFile);
     return submitJob(job);
   }
-    
-  // job files are world-wide readable and owner writable
-  final private static FsPermission JOB_FILE_PERMISSION = 
-    FsPermission.createImmutable((short) 0644); // rw-r--r--
-
-  // job submission directory is world readable/writable/executable
-  final static FsPermission JOB_DIR_PERMISSION =
-    FsPermission.createImmutable((short) 0777); // rwx-rwx-rwx
-
+    
   /**
    * Submit a job to the MR system.
    * This returns a handle to the {@link RunningJob} which can be used to track
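The 0644/0777 constants deleted here are referenced throughout the rest of the patch as JobSubmissionFiles.JOB_FILE_PERMISSION and JobSubmissionFiles.JOB_DIR_PERMISSION, so they appear to have moved rather than changed. A sketch of the equivalent usage, assuming the relocated constants keep the old values (the new class body is not part of this diff):

```java
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.mapreduce.JobSubmissionFiles;

public class SubmissionPermissions {
  public static void main(String[] args) {
    // Assumed unchanged from the deleted constants: 0644 (rw-r--r--) for job
    // files, 0777 (rwxrwxrwx) for the per-job submission directory.
    FsPermission filePerm = new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION);
    FsPermission dirPerm = new FsPermission(JobSubmissionFiles.JOB_DIR_PERMISSION);
    System.out.println(filePerm + " " + dirPerm);
  }
}
```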
@@ -753,66 +767,100 @@ public class JobClient extends Configured implements MRConstants, Tool {
     /*
      * configure the command line options correctly on the submitting dfs
      */
-    
+    Path jobStagingArea = JobSubmissionFiles.getStagingDir(this, job);
     JobID jobId = jobSubmitClient.getNewJobId();
-    Path submitJobDir = new Path(getSystemDir(), jobId.toString());
-    Path submitJarFile = new Path(submitJobDir, "job.jar");
-    Path submitSplitFile = new Path(submitJobDir, "job.split");
-    configureCommandLineOptions(job, submitJobDir, submitJarFile);
-    Path submitJobFile = new Path(submitJobDir, "job.xml");
-    int reduces = job.getNumReduceTasks();
-    JobContext context = new JobContext(job, jobId);
-    
-    // Check the output specification
-    if (reduces == 0 ? job.getUseNewMapper() : job.getUseNewReducer()) {
-      org.apache.hadoop.mapreduce.OutputFormat<?,?> output =
-        ReflectionUtils.newInstance(context.getOutputFormatClass(), job);
-      output.checkOutputSpecs(context);
-    } else {
-      job.getOutputFormat().checkOutputSpecs(fs, job);
-    }
+    Path submitJobDir = new Path(jobStagingArea, jobId.toString());
+    job.set("mapreduce.job.dir", submitJobDir.toString());
+    JobStatus status = null;
+    try {
+      copyAndConfigureFiles(job, submitJobDir);
+      Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
+      int reduces = job.getNumReduceTasks();
+      JobContext context = new JobContext(job, jobId);
+
+      // Check the output specification
+      if (reduces == 0 ? job.getUseNewMapper() : job.getUseNewReducer()) {
+        org.apache.hadoop.mapreduce.OutputFormat<?,?> output =
+          ReflectionUtils.newInstance(context.getOutputFormatClass(), job);
+        output.checkOutputSpecs(context);
+      } else {
+        job.getOutputFormat().checkOutputSpecs(fs, job);
+      }
 
-    // Create the splits for the job
-    LOG.debug("Creating splits at " + fs.makeQualified(submitSplitFile));
-    int maps;
-    if (job.getUseNewMapper()) {
-      maps = writeNewSplits(context, submitSplitFile);
-    } else {
-      maps = writeOldSplits(job, submitSplitFile);
-    }
-    job.set("mapred.job.split.file", submitSplitFile.toString());
-    job.setNumMapTasks(maps);
-    
-    // Write job file to JobTracker's fs
-    FSDataOutputStream out = 
-      FileSystem.create(fs, submitJobFile,
-          new FsPermission(JOB_FILE_PERMISSION));
+      // Create the splits for the job
+      LOG.debug("Creating splits at " + fs.makeQualified(submitJobDir));
+      int maps = writeSplits(context, submitJobDir);
+      job.setNumMapTasks(maps);
 
-    try {
-      job.writeXml(out);
+      // Write job file to JobTracker's fs
+      FSDataOutputStream out =
+        FileSystem.create(fs, submitJobFile,
+            new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION));
+
+      try {
+        job.writeXml(out);
+      } finally {
+        out.close();
+      }
+
+      //
+      // Now, actually submit the job (using the submit name)
+      //
+      status = jobSubmitClient.submitJob(jobId, submitJobDir.toString());
+      if (status != null) {
+        return new NetworkedJob(status);
+      } else {
+        throw new IOException("Could not launch job");
+      }
     } finally {
-      out.close();
+      if (status == null) {
+        LOG.info("Cleaning up the staging area " + submitJobDir);
+        fs.delete(submitJobDir, true);
+      }
     }
+  }
 
-    //
-    // Now, actually submit the job (using the submit name)
-    //
-    JobStatus status = jobSubmitClient.submitJob(jobId);
-    if (status != null) {
-      return new NetworkedJob(status);
+  @SuppressWarnings("unchecked")
+  private <T extends InputSplit>
+  int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,
+      InterruptedException, ClassNotFoundException {
+    Configuration conf = job.getConfiguration();
+    InputFormat<?, ?> input =
+      ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
+
+    List<InputSplit> splits = input.getSplits(job);
+    T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);
+
+    // sort the splits into order based on size, so that the biggest
+    // go first
+    Arrays.sort(array, new SplitComparator());
+    JobSplitWriter.createSplitFiles(jobSubmitDir, conf, array);
+    return array.length;
+  }
+
+  private int writeSplits(org.apache.hadoop.mapreduce.JobContext job,
+      Path jobSubmitDir) throws IOException,
+      InterruptedException, ClassNotFoundException {
+    JobConf jConf = (JobConf)job.getConfiguration();
+    int maps;
+    if (jConf.getUseNewMapper()) {
+      maps = writeNewSplits(job, jobSubmitDir);
     } else {
-      throw new IOException("Could not launch job");
+      maps = writeOldSplits(jConf, jobSubmitDir);
     }
+    return maps;
   }
-  
-  private int writeOldSplits(JobConf job,
-                             Path submitSplitFile) throws IOException {
-    InputSplit[] splits = 
-      job.getInputFormat().getSplits(job, job.getNumMapTasks());
+
+  // Method to write splits for the old API mapper.
+  private int writeOldSplits(JobConf job, Path jobSubmitDir)
+      throws IOException {
+    org.apache.hadoop.mapred.InputSplit[] splits =
+      job.getInputFormat().getSplits(job, job.getNumMapTasks());
     // sort the splits into order based on size, so that the biggest
     // go first
-    Arrays.sort(splits, new Comparator<InputSplit>() {
-      public int compare(InputSplit a, InputSplit b) {
+    Arrays.sort(splits, new Comparator<org.apache.hadoop.mapred.InputSplit>() {
+      public int compare(org.apache.hadoop.mapred.InputSplit a,
+                         org.apache.hadoop.mapred.InputSplit b) {
         try {
           long left = a.getLength();
           long right = b.getLength();
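Taken together, the hunk above reshapes submitJob around a single per-job staging directory plus a cleanup guarantee: if no JobStatus comes back, the staging directory is deleted. A condensed, hypothetical restatement of that control flow, with Hadoop types stubbed out so the shape stands on its own (not the method itself; only the try/finally pattern mirrors the patch):

```java
import java.io.IOException;

public class StagedSubmit {
  interface Client { String submit(String jobId, String dir) throws IOException; }
  interface Fs { void deleteRecursive(String dir); }

  // Same shape as the new submitJob: stage everything under a per-job dir,
  // submit, and guarantee cleanup when no status comes back.
  static String submit(Client client, Fs fs, String stagingArea, String jobId)
      throws IOException {
    String submitJobDir = stagingArea + "/" + jobId;
    String status = null;
    try {
      // ... stage job.jar, job.xml and the splits under submitJobDir ...
      status = client.submit(jobId, submitJobDir);
      if (status == null) {
        throw new IOException("Could not launch job");
      }
      return status;
    } finally {
      if (status == null) {
        fs.deleteRecursive(submitJobDir); // nothing launched; clean staging
      }
    }
  }
}
```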
@@ -824,37 +872,17 @@ public class JobClient extends Configured implements MRConstants, Tool {
             return -1;
           }
         } catch (IOException ie) {
-          throw new RuntimeException("Problem getting input split size",
-                                     ie);
+          throw new RuntimeException("Problem getting input split size", ie);
         }
       }
     });
-    DataOutputStream out = writeSplitsFileHeader(job, submitSplitFile, splits.length);
-    
-    try {
-      DataOutputBuffer buffer = new DataOutputBuffer();
-      RawSplit rawSplit = new RawSplit();
-      for(InputSplit split: splits) {
-        rawSplit.setClassName(split.getClass().getName());
-        buffer.reset();
-        split.write(buffer);
-        rawSplit.setDataLength(split.getLength());
-        rawSplit.setBytes(buffer.getData(), 0, buffer.getLength());
-        rawSplit.setLocations(split.getLocations());
-        rawSplit.write(out);
-      }
-    } finally {
-      out.close();
-    }
+    JobSplitWriter.createSplitFiles(jobSubmitDir, job, splits);
     return splits.length;
   }
-  
-  private static class NewSplitComparator
-    implements Comparator<org.apache.hadoop.mapreduce.InputSplit>{
-
+
+  private static class SplitComparator implements Comparator<InputSplit> {
     @Override
-    public int compare(org.apache.hadoop.mapreduce.InputSplit o1,
-                       org.apache.hadoop.mapreduce.InputSplit o2) {
+    public int compare(InputSplit o1, InputSplit o2) {
       try {
         long len1 = o1.getLength();
         long len2 = o2.getLength();
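Both split paths sort descending by size before writing, per the "biggest go first" comments above; scheduling the largest splits first helps avoid a long-tail straggler at the end of the map phase. A standalone illustration of the same ordering rule on plain longs (names and data are invented):

```java
import java.util.Arrays;
import java.util.Comparator;

public class BiggestFirst {
  public static void main(String[] args) {
    Long[] splitSizes = { 128L, 512L, 64L, 256L };
    // Same rule as the split comparators above: the larger value sorts first.
    Arrays.sort(splitSizes, new Comparator<Long>() {
      public int compare(Long a, Long b) {
        return b.compareTo(a);
      }
    });
    System.out.println(Arrays.toString(splitSizes)); // [512, 256, 128, 64]
  }
}
```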
@@ -868,54 +896,11 @@ public class JobClient extends Configured implements MRConstants, Tool {
       } catch (IOException ie) {
         throw new RuntimeException("exception in compare", ie);
       } catch (InterruptedException ie) {
-        throw new RuntimeException("exception in compare", ie);
-      }
-    }
-  }
-
-  @SuppressWarnings("unchecked")
-  private <T extends org.apache.hadoop.mapreduce.InputSplit>
-  int writeNewSplits(JobContext job, Path submitSplitFile
-                     ) throws IOException, InterruptedException,
-                              ClassNotFoundException {
-    JobConf conf = job.getJobConf();
-    org.apache.hadoop.mapreduce.InputFormat<?,?> input =
-      ReflectionUtils.newInstance(job.getInputFormatClass(), job.getJobConf());
-
-    List<org.apache.hadoop.mapreduce.InputSplit> splits = input.getSplits(job);
-    T[] array = (T[])
-      splits.toArray(new org.apache.hadoop.mapreduce.InputSplit[splits.size()]);
-
-    // sort the splits into order based on size, so that the biggest
-    // go first
-    Arrays.sort(array, new NewSplitComparator());
-    DataOutputStream out = writeSplitsFileHeader(conf, submitSplitFile,
-                                                 array.length);
-    try {
-      if (array.length != 0) {
-        DataOutputBuffer buffer = new DataOutputBuffer();
-        RawSplit rawSplit = new RawSplit();
-        SerializationFactory factory = new SerializationFactory(conf);
-        Serializer<T> serializer =
-          factory.getSerializer((Class<T>) array[0].getClass());
-        serializer.open(buffer);
-        for(T split: array) {
-          rawSplit.setClassName(split.getClass().getName());
-          buffer.reset();
-          serializer.serialize(split);
-          rawSplit.setDataLength(split.getLength());
-          rawSplit.setBytes(buffer.getData(), 0, buffer.getLength());
-          rawSplit.setLocations(split.getLocations());
-          rawSplit.write(out);
-        }
-        serializer.close();
+        throw new RuntimeException("exception in compare", ie);
       }
-    } finally {
-      out.close();
     }
-    return array.length;
   }
-
+
   /**
    * Checks if the job directory is clean and has all the required components
    * for (re) starting the job
@@ -939,125 +924,6 @@ public class JobClient extends Configured implements MRConstants, Tool {
     }
     return false;
   }
-  
-  static class RawSplit implements Writable {
-    private String splitClass;
-    private BytesWritable bytes = new BytesWritable();
-    private String[] locations;
-    long dataLength;
-
-    public void setBytes(byte[] data, int offset, int length) {
-      bytes.set(data, offset, length);
-    }
-
-    public void setClassName(String className) {
-      splitClass = className;
-    }
-
-    public String getClassName() {
-      return splitClass;
-    }
-
-    public BytesWritable getBytes() {
-      return bytes;
-    }
-
-    public void clearBytes() {
-      bytes = null;
-    }
-
-    public void setLocations(String[] locations) {
-      this.locations = locations;
-    }
-
-    public String[] getLocations() {
-      return locations;
-    }
-
-    public void readFields(DataInput in) throws IOException {
-      splitClass = Text.readString(in);
-      dataLength = in.readLong();
-      bytes.readFields(in);
-      int len = WritableUtils.readVInt(in);
-      locations = new String[len];
-      for(int i=0; i < len; ++i) {
-        locations[i] = Text.readString(in);
-      }
-    }
-
-    public void write(DataOutput out) throws IOException {
-      Text.writeString(out, splitClass);
-      out.writeLong(dataLength);
-      bytes.write(out);
-      WritableUtils.writeVInt(out, locations.length);
-      for(int i = 0; i < locations.length; i++) {
-        Text.writeString(out, locations[i]);
-      }
-    }
-
-    public long getDataLength() {
-      return dataLength;
-    }
-    public void setDataLength(long l) {
-      dataLength = l;
-    }
-
-  }
-
-  private static final int CURRENT_SPLIT_FILE_VERSION = 0;
-  private static final byte[] SPLIT_FILE_HEADER = "SPL".getBytes();
-
-  private DataOutputStream writeSplitsFileHeader(Configuration conf,
-                                                 Path filename,
-                                                 int length
-                                                 ) throws IOException {
-    // write the splits to a file for the job tracker
-    FileSystem fs = filename.getFileSystem(conf);
-    FSDataOutputStream out =
-      FileSystem.create(fs, filename, new FsPermission(JOB_FILE_PERMISSION));
-    out.write(SPLIT_FILE_HEADER);
-    WritableUtils.writeVInt(out, CURRENT_SPLIT_FILE_VERSION);
-    WritableUtils.writeVInt(out, length);
-    return out;
-  }
-
-  /** Create the list of input splits and write them out in a file for
-   *the JobTracker. The format is:
-   * <format version>
-   * <numSplits>
-   * for each split:
-   *    <RawSplit>
-   * @param splits the input splits to write out
-   * @param out the stream to write to
-   */
-  private void writeOldSplitsFile(InputSplit[] splits,
-                                  FSDataOutputStream out) throws IOException {
-  }
-
-  /**
-   * Read a splits file into a list of raw splits
-   * @param in the stream to read from
-   * @return the complete list of splits
-   * @throws IOException
-   */
-  static RawSplit[] readSplitFile(DataInput in) throws IOException {
-    byte[] header = new byte[SPLIT_FILE_HEADER.length];
-    in.readFully(header);
-    if (!Arrays.equals(SPLIT_FILE_HEADER, header)) {
-      throw new IOException("Invalid header on split file");
-    }
-    int vers = WritableUtils.readVInt(in);
-    if (vers != CURRENT_SPLIT_FILE_VERSION) {
-      throw new IOException("Unsupported split version " + vers);
-    }
-    int len = WritableUtils.readVInt(in);
-    RawSplit[] result = new RawSplit[len];
-    for(int i=0; i < len; ++i) {
-      result[i] = new RawSplit();
-      result[i].readFields(in);
-    }
-    return result;
-  }
 
   /**
    * Get an {@link RunningJob} object to track an ongoing job. Returns
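Nothing in the deleted block survives in JobClient: the RawSplit container, the "SPL" header writer and the reader are all gone. Per the earlier hunks, both the old- and new-API paths now hand their splits to JobSplitWriter. A sketch of the replacement path, using the three-argument form the hunks above call; the throws clause here is an assumption based on the callers' declared exceptions:

```java
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.split.JobSplitWriter;

public class SplitWriting {
  // Where JobClient used to write an "SPL" header plus RawSplit records by
  // hand, the whole array is now handed to JobSplitWriter in one call.
  static int write(Path jobSubmitDir, Configuration conf,
      List<InputSplit> splits) throws IOException, InterruptedException {
    InputSplit[] array = splits.toArray(new InputSplit[splits.size()]);
    JobSplitWriter.createSplitFiles(jobSubmitDir, conf, array);
    return array.length;
  }
}
```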
@@ -1205,7 +1071,19 @@ public class JobClient extends Configured implements MRConstants, Tool {
   public ClusterStatus getClusterStatus(boolean detailed) throws IOException {
     return jobSubmitClient.getClusterStatus(detailed);
   }
-    
+
+  /**
+   * Grab the jobtracker's view of the staging directory path where
+   * job-specific files will be placed.
+   *
+   * @return the staging directory where job-specific files are to be placed.
+   */
+  public Path getStagingAreaDir() throws IOException {
+    if (stagingAreaDir == null) {
+      stagingAreaDir = new Path(jobSubmitClient.getStagingAreaDir());
+    }
+    return stagingAreaDir;
+  }
 
   /**
    * Get the jobs that are not completed and not failed.
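The new accessor caches the answer after one round trip to the tracker, so repeated calls are cheap. A hypothetical caller (the configuration and printing are illustrative):

```java
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;

public class StagingDirProbe {
  public static void main(String[] args) throws Exception {
    JobClient client = new JobClient(new JobConf());
    // First call asks the tracker for its staging area; later calls hit the
    // cached Path stored in the new stagingAreaDir field.
    Path staging = client.getStagingAreaDir();
    System.out.println("staging area: " + staging);
  }
}
```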