|
@@ -18,6 +18,7 @@
|
|
|
package org.apache.hadoop.mapreduce;
|
|
|
|
|
|
import java.io.File;
|
|
|
+import java.io.FileNotFoundException;
|
|
|
import java.io.IOException;
|
|
|
import java.net.InetAddress;
|
|
|
import java.net.URI;
|
|
@@ -262,15 +263,102 @@ class JobSubmitter {
|
|
|
LOG.warn("No job jar file set. User classes may not be found. "+
|
|
|
"See Job or Job#setJar(String).");
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
+ addLog4jToDistributedCache(job, submitJobDir);
|
|
|
+
|
|
|
// set the timestamps of the archives and files
|
|
|
// set the public/private visibility of the archives and files
|
|
|
ClientDistributedCacheManager.determineTimestampsAndCacheVisibilities(conf);
|
|
|
- // get DelegationToken for each cached file
|
|
|
+ // get DelegationToken for cached file
|
|
|
ClientDistributedCacheManager.getDelegationTokens(conf, job
|
|
|
.getCredentials());
|
|
|
}
|
|
|
|
|
|
+ // copy user specified log4j.property file in local
|
|
|
+ // to HDFS with putting on distributed cache and adding its parent directory
|
|
|
+ // to classpath.
|
|
|
+ @SuppressWarnings("deprecation")
|
|
|
+ private void copyLog4jPropertyFile(Job job, Path submitJobDir,
|
|
|
+ short replication) throws IOException {
|
|
|
+ Configuration conf = job.getConfiguration();
|
|
|
+
|
|
|
+ String file = validateFilePath(
|
|
|
+ conf.get(MRJobConfig.MAPREDUCE_JOB_LOG4J_PROPERTIES_FILE), conf);
|
|
|
+ LOG.debug("default FileSystem: " + jtFs.getUri());
|
|
|
+ FsPermission mapredSysPerms =
|
|
|
+ new FsPermission(JobSubmissionFiles.JOB_DIR_PERMISSION);
|
|
|
+ if (!jtFs.exists(submitJobDir)) {
|
|
|
+ throw new IOException("Cannot find job submission directory! "
|
|
|
+ + "It should just be created, so something wrong here.");
|
|
|
+ }
|
|
|
+
|
|
|
+ Path fileDir = JobSubmissionFiles.getJobLog4jFile(submitJobDir);
|
|
|
+
|
|
|
+ // first copy local log4j.properties file to HDFS under submitJobDir
|
|
|
+ if (file != null) {
|
|
|
+ FileSystem.mkdirs(jtFs, fileDir, mapredSysPerms);
|
|
|
+ URI tmpURI = null;
|
|
|
+ try {
|
|
|
+ tmpURI = new URI(file);
|
|
|
+ } catch (URISyntaxException e) {
|
|
|
+ throw new IllegalArgumentException(e);
|
|
|
+ }
|
|
|
+ Path tmp = new Path(tmpURI);
|
|
|
+ Path newPath = copyRemoteFiles(fileDir, tmp, conf, replication);
|
|
|
+ DistributedCache.addFileToClassPath(new Path(newPath.toUri().getPath()), conf);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * takes input as a path string for file and verifies if it exist.
|
|
|
+ * It defaults for file:/// if the files specified do not have a scheme.
|
|
|
+ * it returns the paths uri converted defaulting to file:///.
|
|
|
+ * So an input of /home/user/file1 would return file:///home/user/file1
|
|
|
+ * @param file
|
|
|
+ * @param conf
|
|
|
+ * @return
|
|
|
+ */
|
|
|
+ private String validateFilePath(String file, Configuration conf)
|
|
|
+ throws IOException {
|
|
|
+ if (file == null) {
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ if (file.isEmpty()) {
|
|
|
+ throw new IllegalArgumentException("File name can't be empty string");
|
|
|
+ }
|
|
|
+ String finalPath;
|
|
|
+ URI pathURI;
|
|
|
+ try {
|
|
|
+ pathURI = new URI(file);
|
|
|
+ } catch (URISyntaxException e) {
|
|
|
+ throw new IllegalArgumentException(e);
|
|
|
+ }
|
|
|
+ Path path = new Path(pathURI);
|
|
|
+ FileSystem localFs = FileSystem.getLocal(conf);
|
|
|
+ if (pathURI.getScheme() == null) {
|
|
|
+ //default to the local file system
|
|
|
+ //check if the file exists or not first
|
|
|
+ if (!localFs.exists(path)) {
|
|
|
+ throw new FileNotFoundException("File " + file + " does not exist.");
|
|
|
+ }
|
|
|
+ finalPath = path.makeQualified(localFs.getUri(),
|
|
|
+ localFs.getWorkingDirectory()).toString();
|
|
|
+ }
|
|
|
+ else {
|
|
|
+ // check if the file exists in this file system
|
|
|
+ // we need to recreate this filesystem object to copy
|
|
|
+ // these files to the file system ResourceManager is running
|
|
|
+ // on.
|
|
|
+ FileSystem fs = path.getFileSystem(conf);
|
|
|
+ if (!fs.exists(path)) {
|
|
|
+ throw new FileNotFoundException("File " + file + " does not exist.");
|
|
|
+ }
|
|
|
+ finalPath = path.makeQualified(fs.getUri(),
|
|
|
+ fs.getWorkingDirectory()).toString();
|
|
|
+ }
|
|
|
+ return finalPath;
|
|
|
+ }
|
|
|
+
|
|
|
private URI getPathURI(Path destPath, String fragment)
|
|
|
throws URISyntaxException {
|
|
|
URI pathURI = destPath.toUri();
|
|
@@ -305,7 +393,7 @@ class JobSubmitter {
|
|
|
|
|
|
// Set the working directory
|
|
|
if (job.getWorkingDirectory() == null) {
|
|
|
- job.setWorkingDirectory(jtFs.getWorkingDirectory());
|
|
|
+ job.setWorkingDirectory(jtFs.getWorkingDirectory());
|
|
|
}
|
|
|
|
|
|
}
|
|
@@ -395,6 +483,10 @@ class JobSubmitter {
|
|
|
}
|
|
|
|
|
|
copyAndConfigureFiles(job, submitJobDir);
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
|
|
|
|
|
|
// Create the splits for the job
|
|
@@ -673,4 +765,20 @@ class JobSubmitter {
|
|
|
DistributedCache.addCacheArchive(uri, conf);
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ private void addLog4jToDistributedCache(Job job,
|
|
|
+ Path jobSubmitDir) throws IOException {
|
|
|
+ Configuration conf = job.getConfiguration();
|
|
|
+ String log4jPropertyFile =
|
|
|
+ conf.get(MRJobConfig.MAPREDUCE_JOB_LOG4J_PROPERTIES_FILE, "");
|
|
|
+ if (!log4jPropertyFile.isEmpty()) {
|
|
|
+ short replication = (short)conf.getInt(Job.SUBMIT_REPLICATION, 10);
|
|
|
+ copyLog4jPropertyFile(job, jobSubmitDir, replication);
|
|
|
+
|
|
|
+ // Set the working directory
|
|
|
+ if (job.getWorkingDirectory() == null) {
|
|
|
+ job.setWorkingDirectory(jtFs.getWorkingDirectory());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|