@@ -22,13 +22,11 @@ import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.URI;
-import java.net.URISyntaxException;
 import java.net.URLEncoder;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
-import java.util.regex.Pattern;
 import java.util.TreeMap;
 import java.util.TreeSet;
 
@@ -41,12 +39,12 @@ import org.apache.commons.cli.Options;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.filecache.DistributedCache;
 import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.mapred.FileInputFormat;
@@ -56,7 +54,6 @@ import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.JobID;
 import org.apache.hadoop.mapred.KeyValueTextInputFormat;
-import org.apache.hadoop.mapred.OutputFormat;
 import org.apache.hadoop.mapred.RunningJob;
 import org.apache.hadoop.mapred.SequenceFileAsTextInputFormat;
 import org.apache.hadoop.mapred.SequenceFileInputFormat;
@@ -65,6 +62,7 @@ import org.apache.hadoop.mapred.TextOutputFormat;
 import org.apache.hadoop.mapred.lib.LazyOutputFormat;
 import org.apache.hadoop.mapred.lib.aggregate.ValueAggregatorCombiner;
 import org.apache.hadoop.mapred.lib.aggregate.ValueAggregatorReducer;
+import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.streaming.io.IdentifierResolver;
 import org.apache.hadoop.streaming.io.InputWriter;
 import org.apache.hadoop.streaming.io.OutputReader;
@@ -297,7 +295,10 @@ public class StreamJob implements Tool {
         try {
           Path path = new Path(file);
           FileSystem localFs = FileSystem.getLocal(config_);
-          String finalPath = path.makeQualified(localFs).toString();
+          Path qualifiedPath = path.makeQualified(
+              localFs.getUri(), localFs.getWorkingDirectory());
+          validate(qualifiedPath);
+          String finalPath = qualifiedPath.toString();
           if(fileList.length() > 0) {
             fileList.append(',');
           }
@@ -313,7 +314,6 @@ public class StreamJob implements Tool {
         tmpFiles = tmpFiles + "," + fileList;
       }
       config_.set("tmpfiles", tmpFiles);
-      validate(packageFiles_);
     }
 
     String fsName = cmdLine.getOptionValue("dfs");
@@ -391,14 +391,13 @@ public class StreamJob implements Tool {
     return OptionBuilder.withDescription(desc).create(name);
   }
 
-  private void validate(final List<String> values)
-      throws IllegalArgumentException {
-    for (String file : values) {
-      File f = new File(file);
-      if (!FileUtil.canRead(f)) {
-        fail("File: " + f.getAbsolutePath()
-            + " does not exist, or is not readable.");
-      }
+  private void validate(final Path path) throws IOException {
+    try {
+      path.getFileSystem(config_).access(path, FsAction.READ);
+    } catch (FileNotFoundException e) {
+      fail("File: " + path + " does not exist.");
+    } catch (AccessControlException e) {
+      fail("File: " + path + " is not readable.");
     }
   }
 
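For reference, below is a minimal standalone sketch of the validation flow this patch introduces: qualify a path against the local filesystem, then probe it with FileSystem#access, which throws FileNotFoundException or AccessControlException on failure rather than returning a boolean the way the removed FileUtil.canRead check did. The class name and main() harness are hypothetical, for illustration only; just the qualify-then-access sequence mirrors the patch.

// Hypothetical harness, not part of the patch: demonstrates the new
// validate() behavior against a path supplied on the command line.
import java.io.FileNotFoundException;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.security.AccessControlException;

public class ValidateSketch {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    FileSystem localFs = FileSystem.getLocal(conf);

    // Same qualification step as the patch: resolve the raw path against
    // the local filesystem's URI and working directory.
    Path qualified = new Path(args[0]).makeQualified(
        localFs.getUri(), localFs.getWorkingDirectory());

    try {
      // access() throws on failure, so "missing" and "unreadable"
      // can be reported as distinct errors.
      qualified.getFileSystem(conf).access(qualified, FsAction.READ);
      System.out.println(qualified + " exists and is readable.");
    } catch (FileNotFoundException e) {
      System.err.println("File: " + qualified + " does not exist.");
    } catch (AccessControlException e) {
      System.err.println("File: " + qualified + " is not readable.");
    }
  }
}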