|
@@ -17,6 +17,7 @@
|
|
|
*/
|
|
|
package org.apache.hadoop.mapred;
|
|
|
|
|
|
+import org.apache.commons.cli2.validation.InvalidArgumentException;
|
|
|
import org.apache.commons.logging.*;
|
|
|
|
|
|
import org.apache.hadoop.fs.*;
|
|
@@ -224,7 +225,8 @@ public class JobClient extends ToolBase implements MRConstants {
|
|
|
/**
|
|
|
* Submit a job to the MR system
|
|
|
*/
|
|
|
- public RunningJob submitJob(String jobFile) throws IOException {
|
|
|
+ public RunningJob submitJob(String jobFile) throws FileNotFoundException,
|
|
|
+ InvalidJobConfException,IOException {
|
|
|
// Load in the submitted job details
|
|
|
JobConf job = new JobConf(jobFile);
|
|
|
return submitJob(job);
|
|
@@ -234,7 +236,8 @@ public class JobClient extends ToolBase implements MRConstants {
|
|
|
/**
|
|
|
* Submit a job to the MR system
|
|
|
*/
|
|
|
- public RunningJob submitJob(JobConf job) throws IOException {
|
|
|
+ public RunningJob submitJob(JobConf job) throws FileNotFoundException,
|
|
|
+ InvalidJobConfException, IOException {
|
|
|
//
|
|
|
// First figure out what fs the JobTracker is using. Copy the
|
|
|
// job to it, under a temporary name. This allows DFS to work,
|
|
@@ -301,17 +304,33 @@ public class JobClient extends ToolBase implements MRConstants {
|
|
|
|
|
|
FileSystem userFileSys = FileSystem.get(job);
|
|
|
Path[] inputDirs = job.getInputPaths();
|
|
|
+ // input paths should exist.
|
|
|
+
|
|
|
boolean[] validDirs =
|
|
|
job.getInputFormat().areValidInputDirectories(userFileSys, inputDirs);
|
|
|
for(int i=0; i < validDirs.length; ++i) {
|
|
|
if (!validDirs[i]) {
|
|
|
- String msg = "Input directory " + inputDirs[i] +
|
|
|
- " in " + userFileSys.getName() + " is invalid.";
|
|
|
- LOG.error(msg);
|
|
|
- throw new IOException(msg);
|
|
|
+ String msg = null ;
|
|
|
+ if( !userFileSys.exists(inputDirs[i]) ){
|
|
|
+ msg = "Input directory " + inputDirs[i] +
|
|
|
+ " doesn't exist in " + userFileSys.getName();
|
|
|
+ LOG.error(msg);
|
|
|
+ throw new FileNotFoundException(msg);
|
|
|
+ }else if( !userFileSys.isDirectory(inputDirs[i])){
|
|
|
+ msg = "Invalid input path, expecting directory : " + inputDirs[i] ;
|
|
|
+ LOG.error(msg);
|
|
|
+ throw new InvalidFileTypeException(msg);
|
|
|
+ }else{
|
|
|
+ // some other error
|
|
|
+ msg = "Input directory " + inputDirs[i] +
|
|
|
+ " in " + userFileSys.getName() + " is invalid.";
|
|
|
+ LOG.error(msg);
|
|
|
+ throw new IOException(msg);
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+
|
|
|
// Check the output specification
|
|
|
job.getOutputFormat().checkOutputSpecs(fs, job);
|
|
|
|
|
@@ -428,7 +447,6 @@ public class JobClient extends ToolBase implements MRConstants {
|
|
|
|
|
|
|
|
|
public int run(String[] argv) throws Exception {
|
|
|
- // TODO Auto-generated method stub
|
|
|
if (argv.length < 2) {
|
|
|
System.out.println("JobClient -submit <job> | -status <id> | -kill <id> [-jt <jobtracker:port>|<config>]");
|
|
|
System.exit(-1);
|