|
@@ -28,11 +28,14 @@ import java.io.InputStream;
|
|
|
import java.io.InputStreamReader;
|
|
|
import java.io.OutputStream;
|
|
|
import java.io.OutputStreamWriter;
|
|
|
+import java.net.InetAddress;
|
|
|
import java.net.InetSocketAddress;
|
|
|
import java.net.SocketTimeoutException;
|
|
|
import java.net.URI;
|
|
|
+import java.net.URISyntaxException;
|
|
|
import java.net.URL;
|
|
|
import java.net.URLConnection;
|
|
|
+import java.net.UnknownHostException;
|
|
|
import java.util.Arrays;
|
|
|
import java.util.Comparator;
|
|
|
import java.util.HashMap;
|
|
@@ -48,6 +51,7 @@ import org.apache.hadoop.conf.Configured;
|
|
|
import org.apache.hadoop.filecache.DistributedCache;
|
|
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
|
|
import org.apache.hadoop.fs.FileSystem;
|
|
|
+import org.apache.hadoop.fs.FileUtil;
|
|
|
import org.apache.hadoop.fs.Path;
|
|
|
import org.apache.hadoop.fs.permission.FsPermission;
|
|
|
import org.apache.hadoop.io.BytesWritable;
|
|
@@ -155,7 +159,7 @@ public class JobClient extends Configured implements MRConstants, Tool {
|
|
|
private static final Log LOG = LogFactory.getLog("org.apache.hadoop.mapred.JobClient");
|
|
|
public static enum TaskStatusFilter { NONE, KILLED, FAILED, SUCCEEDED, ALL }
|
|
|
private TaskStatusFilter taskOutputFilter = TaskStatusFilter.FAILED;
|
|
|
-
|
|
|
+ private static Configuration commandLineConfig;
|
|
|
static long MAX_JOBPROFILE_AGE = 1000 * 2;
|
|
|
|
|
|
/**
|
|
@@ -341,7 +345,24 @@ public class JobClient extends Configured implements MRConstants, Tool {
|
|
|
setConf(conf);
|
|
|
init(conf);
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
+  /**
+   * Records the configuration that carries the parameters passed on the
+   * command line, so it can be read back later through
+   * {@link #getCommandLineConfig()}. Only the reference is stored in the
+   * static field; the object is not copied.
+   * @param conf the configuration object holding the command line parameters.
+   */
+  static void setCommandLineConfig(Configuration conf) {
+    JobClient.commandLineConfig = conf;
+  }
|
|
|
+
|
|
|
+  /**
+   * Return the command line configuration.
+   * @return the configuration stored by
+   *         {@link #setCommandLineConfig(Configuration)}; may be
+   *         <code>null</code>, callers in this class check for that.
+   */
+  public static Configuration getCommandLineConfig() {
+    return JobClient.commandLineConfig;
+  }
|
|
|
+
|
|
|
/**
|
|
|
* Connect to the default {@link JobTracker}.
|
|
|
* @param conf the job configuration.
|
|
@@ -417,50 +438,87 @@ public class JobClient extends Configured implements MRConstants, Tool {
|
|
|
}
|
|
|
return fs;
|
|
|
}
|
|
|
-
|
|
|
- /**
|
|
|
- * Submit a job to the MR system.
|
|
|
- *
|
|
|
- * This returns a handle to the {@link RunningJob} which can be used to track
|
|
|
- * the running-job.
|
|
|
- *
|
|
|
- * @param jobFile the job configuration.
|
|
|
- * @return a handle to the {@link RunningJob} which can be used to track the
|
|
|
- * running-job.
|
|
|
- * @throws FileNotFoundException
|
|
|
- * @throws InvalidJobConfException
|
|
|
- * @throws IOException
|
|
|
+
|
|
|
+ /* see if two file systems are the same or not
|
|
|
+ *
|
|
|
*/
|
|
|
- public RunningJob submitJob(String jobFile) throws FileNotFoundException,
|
|
|
- InvalidJobConfException,
|
|
|
- IOException {
|
|
|
- // Load in the submitted job details
|
|
|
- JobConf job = new JobConf(jobFile);
|
|
|
- return submitJob(job);
|
|
|
+  private boolean compareFs(FileSystem srcFs, FileSystem destFs) {
+    // Two file systems are treated as the same one when their URIs agree on
+    // scheme, (canonical) host and port.
+    URI source = srcFs.getUri();
+    URI target = destFs.getUri();
+
+    // A source without a scheme never matches; otherwise schemes must be equal.
+    String scheme = source.getScheme();
+    if (scheme == null || !scheme.equals(target.getScheme())) {
+      return false;
+    }
+
+    String sourceHost = source.getHost();
+    String targetHost = target.getHost();
+    // Exactly one side naming a host means they cannot be the same.
+    if ((sourceHost == null) != (targetHost == null)) {
+      return false;
+    }
+    if (sourceHost != null) {
+      // Both sides name a host: compare the canonical names so that aliases
+      // of one machine still match. This performs a name lookup.
+      try {
+        String canonicalSource =
+          InetAddress.getByName(sourceHost).getCanonicalHostName();
+        String canonicalTarget =
+          InetAddress.getByName(targetHost).getCanonicalHostName();
+        if (!canonicalSource.equals(canonicalTarget)) {
+          return false;
+        }
+      } catch (UnknownHostException unresolved) {
+        // a host that cannot be resolved is treated as a mismatch
+        return false;
+      }
+    }
+
+    // finally the ports have to agree
+    return source.getPort() == target.getPort();
   }
|
|
|
-
|
|
|
- // job files are world-wide readable and owner writable
|
|
|
- final private static FsPermission JOB_FILE_PERMISSION =
|
|
|
- FsPermission.createImmutable((short) 0644); // rw-r--r--
|
|
|
|
|
|
- // system directories are world-wide readable and owner readable
|
|
|
- final static FsPermission SYSTEM_DIR_PERMISSION =
|
|
|
- FsPermission.createImmutable((short) 0733); // rwx-wx-wx
|
|
|
-
|
|
|
+  /**
+   * Makes sure a file is available on the jobtracker's file system and
+   * returns the path under which it can be found there.
+   *
+   * If the file already lives on the jobtracker's file system (as decided by
+   * {@link #compareFs(FileSystem, FileSystem)}, which compares scheme,
+   * canonical host name and port) nothing is copied and the original path is
+   * returned. Otherwise the file is copied into <code>parentDir</code> under
+   * its own name and the requested replication is applied to the copy.
+   * Files sharing a name may collide in <code>parentDir</code>; the original
+   * author notes that the copy will throw in that case.
+   *
+   * @param jtFs the jobtracker's file system.
+   * @param parentDir directory on <code>jtFs</code> that receives the copy.
+   * @param originalPath the file to make available.
+   * @param job configuration used to resolve the file system of
+   *            <code>originalPath</code> and passed on to the copy.
+   * @param replication replication factor to set on the copied file.
+   * @return <code>originalPath</code> if no copy was needed, else the path of
+   *         the copy.
+   * @throws IOException
+   */
+  private Path copyRemoteFiles(FileSystem jtFs, Path parentDir, Path originalPath,
+      JobConf job, short replication) throws IOException {
+    FileSystem sourceFs = originalPath.getFileSystem(job);
+    if (!compareFs(sourceFs, jtFs)) {
+      // different file system: copy (without deleting the source) and
+      // keep the file's own name inside parentDir
+      Path copied = new Path(parentDir, originalPath.getName());
+      FileUtil.copy(sourceFs, originalPath, jtFs, copied, false, job);
+      jtFs.setReplication(copied, replication);
+      return copied;
+    }
+    // same file system as the jobtracker: use the file where it is
+    return originalPath;
+  }
|
|
|
+
|
|
|
/**
|
|
|
- * Submit a job to the MR system.
|
|
|
- * This returns a handle to the {@link RunningJob} which can be used to track
|
|
|
- * the running-job.
|
|
|
- *
|
|
|
- * @param job the job configuration.
|
|
|
- * @return a handle to the {@link RunningJob} which can be used to track the
|
|
|
- * running-job.
|
|
|
- * @throws FileNotFoundException
|
|
|
- * @throws InvalidJobConfException
|
|
|
+ * configure the jobconf of the user with the command line options of
|
|
|
+ * -libjars, -files, -archives
|
|
|
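+ * @param job the job configuration that receives the cache entries
+ * @param submitJobDir the job's submission directory; the files, archives
+ *        and libjars directories are created under it
+ * @param submitJarFile the path for the job jar inside the submission directory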
|
|
|
* @throws IOException
|
|
|
*/
|
|
|
- public RunningJob submitJob(JobConf job) throws FileNotFoundException,
|
|
|
- InvalidJobConfException, IOException {
|
|
|
+ private void configureCommandLineOptions(JobConf job, Path submitJobDir, Path submitJarFile)
|
|
|
+ throws IOException {
|
|
|
+ // get all the command line arguments into the
|
|
|
+ // jobconf passed in by the user conf
|
|
|
+ Configuration commandConf = JobClient.getCommandLineConfig();
|
|
|
+ String files = null;
|
|
|
+ String libjars = null;
|
|
|
+ String archives = null;
|
|
|
+ if (commandConf != null) {
|
|
|
+ files = commandConf.get("tmpfiles");
|
|
|
+ libjars = commandConf.get("tmpjars");
|
|
|
+ archives = commandConf.get("tmparchives");
|
|
|
+ }
|
|
|
/*
|
|
|
* set this user's id in job configuration, so later job files can be
|
|
|
* accessed using this user's id
|
|
@@ -482,17 +540,66 @@ public class JobClient extends Configured implements MRConstants, Tool {
|
|
|
//
|
|
|
|
|
|
// Create a number of filenames in the JobTracker's fs namespace
|
|
|
- String jobId = jobSubmitClient.getNewJobId();
|
|
|
- Path submitJobDir = new Path(job.getSystemDir(), jobId);
|
|
|
FileSystem fs = getFs();
|
|
|
LOG.debug("default FileSystem: " + fs.getUri());
|
|
|
- fs.delete(submitJobDir, true);
|
|
|
- FileSystem.mkdirs(fs, submitJobDir, new FsPermission(SYSTEM_DIR_PERMISSION));
|
|
|
- Path submitJobFile = new Path(submitJobDir, "job.xml");
|
|
|
- Path submitJarFile = new Path(submitJobDir, "job.jar");
|
|
|
- Path submitSplitFile = new Path(submitJobDir, "job.split");
|
|
|
-
|
|
|
- // set the timestamps of the archives and files
|
|
|
+ fs.delete(submitJobDir, true);
|
|
|
+ submitJobDir = fs.makeQualified(submitJobDir);
|
|
|
+ submitJobDir = new Path(submitJobDir.toUri().getPath());
|
|
|
+ FsPermission mapredSysPerms = new FsPermission(SYSTEM_DIR_PERMISSION);
|
|
|
+ FileSystem.mkdirs(fs, submitJobDir, mapredSysPerms);
|
|
|
+ Path filesDir = new Path(submitJobDir, "files");
|
|
|
+ Path archivesDir = new Path(submitJobDir, "archives");
|
|
|
+ Path libjarsDir = new Path(submitJobDir, "libjars");
|
|
|
+ short replication = (short)job.getInt("mapred.submit.replication", 10);
|
|
|
+ // add all the command line files/ jars and archive
|
|
|
+ // first copy them to jobtrackers filesystem
|
|
|
+
|
|
|
+ if (files != null) {
|
|
|
+ FileSystem.mkdirs(fs, filesDir, mapredSysPerms);
|
|
|
+ String[] fileArr = files.split(",");
|
|
|
+ for (String tmpFile: fileArr) {
|
|
|
+ Path tmp = new Path(tmpFile);
|
|
|
+ Path newPath = copyRemoteFiles(fs,filesDir, tmp, job, replication);
|
|
|
+ try {
|
|
|
+ URI pathURI = new URI(newPath.toUri().toString() + "#" + newPath.getName());
|
|
|
+ DistributedCache.addCacheFile(pathURI, job);
|
|
|
+ } catch(URISyntaxException ue) {
|
|
|
+ //should not throw a uri exception
|
|
|
+ throw new IOException("Failed to create uri for " + tmpFile);
|
|
|
+ }
|
|
|
+ DistributedCache.createSymlink(job);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if (libjars != null) {
|
|
|
+ FileSystem.mkdirs(fs, libjarsDir, mapredSysPerms);
|
|
|
+ String[] libjarsArr = libjars.split(",");
|
|
|
+ for (String tmpjars: libjarsArr) {
|
|
|
+ Path tmp = new Path(tmpjars);
|
|
|
+ Path newPath = copyRemoteFiles(fs, libjarsDir, tmp, job, replication);
|
|
|
+ DistributedCache.addArchiveToClassPath(newPath, job);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ if (archives != null) {
|
|
|
+ FileSystem.mkdirs(fs, archivesDir, mapredSysPerms);
|
|
|
+ String[] archivesArr = archives.split(",");
|
|
|
+ for (String tmpArchives: archivesArr) {
|
|
|
+ Path tmp = new Path(tmpArchives);
|
|
|
+ Path newPath = copyRemoteFiles(fs, archivesDir, tmp, job, replication);
|
|
|
+ try {
|
|
|
+ URI pathURI = new URI(newPath.toUri().toString() + "#" + newPath.getName());
|
|
|
+ DistributedCache.addCacheArchive(pathURI, job);
|
|
|
+ } catch(URISyntaxException ue) {
|
|
|
+ //should not throw a uri exception
|
|
|
+ throw new IOException("Failed to create uri for " + tmpArchives);
|
|
|
+ }
|
|
|
+ DistributedCache.createSymlink(job);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // set the timestamps of the archives and files
|
|
|
URI[] tarchives = DistributedCache.getCacheArchives(job);
|
|
|
if (tarchives != null) {
|
|
|
StringBuffer archiveTimestamps =
|
|
@@ -516,7 +623,6 @@ public class JobClient extends Configured implements MRConstants, Tool {
|
|
|
}
|
|
|
|
|
|
String originalJarPath = job.getJar();
|
|
|
- short replication = (short)job.getInt("mapred.submit.replication", 10);
|
|
|
|
|
|
if (originalJarPath != null) { // copy jar to JobTracker's fs
|
|
|
// use jar name if job is not named.
|
|
@@ -538,6 +644,63 @@ public class JobClient extends Configured implements MRConstants, Tool {
|
|
|
job.setWorkingDirectory(fs.getWorkingDirectory());
|
|
|
}
|
|
|
|
|
|
+ }
|
|
|
+
|
|
|
+  /**
+   * Submit a job to the MR system.
+   *
+   * The job details are loaded from the given job file into a
+   * {@link JobConf}, which is then handed to {@link #submitJob(JobConf)}.
+   * The returned {@link RunningJob} handle can be used to track the
+   * running-job.
+   *
+   * @param jobFile the job configuration.
+   * @return a handle to the {@link RunningJob} which can be used to track the
+   *         running-job.
+   * @throws FileNotFoundException
+   * @throws InvalidJobConfException
+   * @throws IOException
+   */
+  public RunningJob submitJob(String jobFile) throws FileNotFoundException,
+                                                     InvalidJobConfException,
+                                                     IOException {
+    // build the configuration from the job file and submit it
+    return submitJob(new JobConf(jobFile));
+  }
|
|
|
+
|
|
|
+ // job files are world-wide readable and owner writable
|
|
|
+ final private static FsPermission JOB_FILE_PERMISSION =
|
|
|
+ FsPermission.createImmutable((short) 0644); // rw-r--r--
|
|
|
+
|
|
|
+ // system directories: full access for the owner, write and execute only for everyone else
|
|
|
+ final static FsPermission SYSTEM_DIR_PERMISSION =
|
|
|
+ FsPermission.createImmutable((short) 0733); // rwx-wx-wx
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Submit a job to the MR system.
|
|
|
+ * This returns a handle to the {@link RunningJob} which can be used to track
|
|
|
+ * the running-job.
|
|
|
+ *
|
|
|
+ * @param job the job configuration.
|
|
|
+ * @return a handle to the {@link RunningJob} which can be used to track the
|
|
|
+ * running-job.
|
|
|
+ * @throws FileNotFoundException
|
|
|
+ * @throws InvalidJobConfException
|
|
|
+ * @throws IOException
|
|
|
+ */
|
|
|
+ public RunningJob submitJob(JobConf job) throws FileNotFoundException,
|
|
|
+ InvalidJobConfException, IOException {
|
|
|
+ /*
|
|
|
+ * configure the command line options correctly on the submitting dfs
|
|
|
+ */
|
|
|
+
|
|
|
+ String jobId = jobSubmitClient.getNewJobId();
|
|
|
+ Path submitJobDir = new Path(job.getSystemDir(), jobId);
|
|
|
+ Path submitJarFile = new Path(submitJobDir, "job.jar");
|
|
|
+ Path submitSplitFile = new Path(submitJobDir, "job.split");
|
|
|
+ configureCommandLineOptions(job, submitJobDir, submitJarFile);
|
|
|
+ Path submitJobFile = new Path(submitJobDir, "job.xml");
|
|
|
+
|
|
|
+
|
|
|
// Check the input specification
|
|
|
job.getInputFormat().validateInput(job);
|
|
|
|