|
@@ -26,6 +26,7 @@ import org.apache.commons.logging.LogFactory;
|
|
import org.apache.hadoop.conf.Configuration;
|
|
import org.apache.hadoop.conf.Configuration;
|
|
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
|
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
|
import org.apache.hadoop.fs.FileContext;
|
|
import org.apache.hadoop.fs.FileContext;
|
|
|
|
+import org.apache.hadoop.fs.LocalFileSystem;
|
|
import org.apache.hadoop.fs.Path;
|
|
import org.apache.hadoop.fs.Path;
|
|
import org.apache.hadoop.mapred.LocalContainerLauncher;
|
|
import org.apache.hadoop.mapred.LocalContainerLauncher;
|
|
import org.apache.hadoop.mapred.ShuffleHandler;
|
|
import org.apache.hadoop.mapred.ShuffleHandler;
|
|
@@ -87,6 +88,21 @@ public class MiniMRYarnCluster extends MiniYARNCluster {
|
|
try {
|
|
try {
|
|
Path stagingPath = FileContext.getFileContext(conf).makeQualified(
|
|
Path stagingPath = FileContext.getFileContext(conf).makeQualified(
|
|
new Path(conf.get(MRJobConfig.MR_AM_STAGING_DIR)));
|
|
new Path(conf.get(MRJobConfig.MR_AM_STAGING_DIR)));
|
|
|
|
+ /*
|
|
|
|
+ * Re-configure the staging path on Windows if the file system is localFs.
|
|
|
|
+ * We need to use a absolute path that contains the drive letter. The unit
|
|
|
|
+ * test could run on a different drive than the AM. We can run into the
|
|
|
|
+ * issue that job files are localized to the drive where the test runs on,
|
|
|
|
+ * while the AM starts on a different drive and fails to find the job
|
|
|
|
+ * metafiles. Using absolute path can avoid this ambiguity.
|
|
|
|
+ */
|
|
|
|
+ if (Path.WINDOWS) {
|
|
|
|
+ if (LocalFileSystem.class.isInstance(stagingPath.getFileSystem(conf))) {
|
|
|
|
+ conf.set(MRJobConfig.MR_AM_STAGING_DIR,
|
|
|
|
+ new File(conf.get(MRJobConfig.MR_AM_STAGING_DIR))
|
|
|
|
+ .getAbsolutePath());
|
|
|
|
+ }
|
|
|
|
+ }
|
|
FileContext fc=FileContext.getFileContext(stagingPath.toUri(), conf);
|
|
FileContext fc=FileContext.getFileContext(stagingPath.toUri(), conf);
|
|
if (fc.util().exists(stagingPath)) {
|
|
if (fc.util().exists(stagingPath)) {
|
|
LOG.info(stagingPath + " exists! deleting...");
|
|
LOG.info(stagingPath + " exists! deleting...");
|