|
@@ -37,8 +37,8 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
|
|
import org.apache.commons.logging.Log;
|
|
import org.apache.commons.logging.Log;
|
|
import org.apache.commons.logging.LogFactory;
|
|
import org.apache.commons.logging.LogFactory;
|
|
import org.apache.hadoop.conf.Configuration;
|
|
import org.apache.hadoop.conf.Configuration;
|
|
-import org.apache.hadoop.fs.FileContext;
|
|
|
|
import org.apache.hadoop.fs.FileStatus;
|
|
import org.apache.hadoop.fs.FileStatus;
|
|
|
|
+import org.apache.hadoop.fs.FileSystem;
|
|
import org.apache.hadoop.fs.Path;
|
|
import org.apache.hadoop.fs.Path;
|
|
import org.apache.hadoop.io.DataOutputBuffer;
|
|
import org.apache.hadoop.io.DataOutputBuffer;
|
|
import org.apache.hadoop.mapred.MapReduceChildJVM;
|
|
import org.apache.hadoop.mapred.MapReduceChildJVM;
|
|
@@ -475,7 +475,7 @@ public abstract class TaskAttemptImpl implements
|
|
* Create a {@link LocalResource} record with all the given parameters.
|
|
* Create a {@link LocalResource} record with all the given parameters.
|
|
* TODO: This should pave way for Builder pattern.
|
|
* TODO: This should pave way for Builder pattern.
|
|
*/
|
|
*/
|
|
- private static LocalResource createLocalResource(FileContext fc,
|
|
|
|
|
|
+ private static LocalResource createLocalResource(FileSystem fc,
|
|
RecordFactory recordFactory, Path file, LocalResourceType type,
|
|
RecordFactory recordFactory, Path file, LocalResourceType type,
|
|
LocalResourceVisibility visibility) throws IOException {
|
|
LocalResourceVisibility visibility) throws IOException {
|
|
FileStatus fstat = fc.getFileStatus(file);
|
|
FileStatus fstat = fc.getFileStatus(file);
|
|
@@ -516,13 +516,13 @@ public abstract class TaskAttemptImpl implements
|
|
recordFactory.newRecordInstance(ContainerLaunchContext.class);
|
|
recordFactory.newRecordInstance(ContainerLaunchContext.class);
|
|
|
|
|
|
try {
|
|
try {
|
|
- FileContext remoteFS = FileContext.getFileContext(conf);
|
|
|
|
|
|
+ FileSystem remoteFS = FileSystem.get(conf);
|
|
|
|
|
|
// //////////// Set up JobJar to be localized properly on the remote NM.
|
|
// //////////// Set up JobJar to be localized properly on the remote NM.
|
|
if (conf.get(MRJobConfig.JAR) != null) {
|
|
if (conf.get(MRJobConfig.JAR) != null) {
|
|
- Path remoteJobJar = remoteFS.getDefaultFileSystem().resolvePath(
|
|
|
|
- remoteFS.makeQualified(new Path(remoteTask.getConf().get(
|
|
|
|
- MRJobConfig.JAR))));
|
|
|
|
|
|
+ Path remoteJobJar = (new Path(remoteTask.getConf().get(
|
|
|
|
+ MRJobConfig.JAR))).makeQualified(remoteFS.getUri(),
|
|
|
|
+ remoteFS.getWorkingDirectory());
|
|
container.setLocalResource(
|
|
container.setLocalResource(
|
|
MRConstants.JOB_JAR,
|
|
MRConstants.JOB_JAR,
|
|
createLocalResource(remoteFS, recordFactory, remoteJobJar,
|
|
createLocalResource(remoteFS, recordFactory, remoteJobJar,
|
|
@@ -543,8 +543,8 @@ public abstract class TaskAttemptImpl implements
|
|
.getCurrentUser().getShortUserName());
|
|
.getCurrentUser().getShortUserName());
|
|
Path remoteJobSubmitDir =
|
|
Path remoteJobSubmitDir =
|
|
new Path(path, oldJobId.toString());
|
|
new Path(path, oldJobId.toString());
|
|
- Path remoteJobConfPath = remoteFS.getDefaultFileSystem().resolvePath(
|
|
|
|
- new Path(remoteJobSubmitDir, MRConstants.JOB_CONF_FILE));
|
|
|
|
|
|
+ Path remoteJobConfPath =
|
|
|
|
+ new Path(remoteJobSubmitDir, MRConstants.JOB_CONF_FILE);
|
|
container.setLocalResource(
|
|
container.setLocalResource(
|
|
MRConstants.JOB_CONF_FILE,
|
|
MRConstants.JOB_CONF_FILE,
|
|
createLocalResource(remoteFS, recordFactory, remoteJobConfPath,
|
|
createLocalResource(remoteFS, recordFactory, remoteJobConfPath,
|
|
@@ -628,7 +628,7 @@ public abstract class TaskAttemptImpl implements
|
|
return container;
|
|
return container;
|
|
}
|
|
}
|
|
|
|
|
|
- private void setupDistributedCache(FileContext remoteFS, Configuration conf,
|
|
|
|
|
|
+ private void setupDistributedCache(FileSystem remoteFS, Configuration conf,
|
|
ContainerLaunchContext container) throws IOException {
|
|
ContainerLaunchContext container) throws IOException {
|
|
|
|
|
|
// Cache archives
|
|
// Cache archives
|
|
@@ -652,7 +652,7 @@ public abstract class TaskAttemptImpl implements
|
|
// Use TaskDistributedCacheManager.CacheFiles.makeCacheFiles(URI[],
|
|
// Use TaskDistributedCacheManager.CacheFiles.makeCacheFiles(URI[],
|
|
// long[], boolean[], Path[], FileType)
|
|
// long[], boolean[], Path[], FileType)
|
|
private void parseDistributedCacheArtifacts(
|
|
private void parseDistributedCacheArtifacts(
|
|
- FileContext remoteFS, ContainerLaunchContext container, LocalResourceType type,
|
|
|
|
|
|
+ FileSystem remoteFS, ContainerLaunchContext container, LocalResourceType type,
|
|
URI[] uris, long[] timestamps, long[] sizes, boolean visibilities[],
|
|
URI[] uris, long[] timestamps, long[] sizes, boolean visibilities[],
|
|
Path[] pathsToPutOnClasspath) throws IOException {
|
|
Path[] pathsToPutOnClasspath) throws IOException {
|
|
|
|
|
|
@@ -671,17 +671,14 @@ public abstract class TaskAttemptImpl implements
|
|
Map<String, Path> classPaths = new HashMap<String, Path>();
|
|
Map<String, Path> classPaths = new HashMap<String, Path>();
|
|
if (pathsToPutOnClasspath != null) {
|
|
if (pathsToPutOnClasspath != null) {
|
|
for (Path p : pathsToPutOnClasspath) {
|
|
for (Path p : pathsToPutOnClasspath) {
|
|
- p = p.makeQualified(remoteFS.getDefaultFileSystem()
|
|
|
|
- .getUri(), remoteFS.getWorkingDirectory());
|
|
|
|
|
|
+ p = p.makeQualified(remoteFS.getUri(),remoteFS.getWorkingDirectory());
|
|
classPaths.put(p.toUri().getPath().toString(), p);
|
|
classPaths.put(p.toUri().getPath().toString(), p);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
for (int i = 0; i < uris.length; ++i) {
|
|
for (int i = 0; i < uris.length; ++i) {
|
|
URI u = uris[i];
|
|
URI u = uris[i];
|
|
Path p = new Path(u);
|
|
Path p = new Path(u);
|
|
- p = remoteFS.getDefaultFileSystem().resolvePath(
|
|
|
|
- p.makeQualified(remoteFS.getDefaultFileSystem().getUri(),
|
|
|
|
- remoteFS.getWorkingDirectory()));
|
|
|
|
|
|
+ p = p.makeQualified(remoteFS.getUri(), remoteFS.getWorkingDirectory());
|
|
// Add URI fragment or just the filename
|
|
// Add URI fragment or just the filename
|
|
Path name = new Path((null == u.getFragment())
|
|
Path name = new Path((null == u.getFragment())
|
|
? p.getName()
|
|
? p.getName()
|