|
@@ -19,24 +19,32 @@
|
|
|
package org.apache.hadoop.mapreduce.v2.util;
|
|
|
|
|
|
import java.io.IOException;
|
|
|
+import java.net.URI;
|
|
|
import java.util.HashMap;
|
|
|
import java.util.Map;
|
|
|
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
+import org.apache.hadoop.fs.FileSystem;
|
|
|
+import org.apache.hadoop.fs.FilterFileSystem;
|
|
|
+import org.apache.hadoop.fs.Path;
|
|
|
+import org.apache.hadoop.mapred.InvalidJobConfException;
|
|
|
import org.apache.hadoop.mapreduce.Job;
|
|
|
import org.apache.hadoop.mapreduce.JobID;
|
|
|
import org.apache.hadoop.mapreduce.MRJobConfig;
|
|
|
+import org.apache.hadoop.mapreduce.filecache.DistributedCache;
|
|
|
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
|
|
|
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
|
|
|
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
|
|
|
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
|
|
|
import org.apache.hadoop.mapreduce.v2.util.MRApps;
|
|
|
-import org.apache.hadoop.yarn.YarnException;
|
|
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|
|
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
|
|
+import org.apache.hadoop.yarn.api.records.LocalResource;
|
|
|
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
|
|
|
|
|
|
import org.junit.Test;
|
|
|
import static org.junit.Assert.*;
|
|
|
+import static org.mockito.Mockito.*;
|
|
|
|
|
|
public class TestMRApps {
|
|
|
|
|
@@ -168,5 +176,122 @@ public class TestMRApps {
|
|
|
assertNotSame("MAPREDUCE_JOB_USER_CLASSPATH_FIRST false, but taking effect!",
|
|
|
env_str.indexOf("$PWD:job.jar"), 0);
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void testSetupDistributedCacheEmpty() throws IOException {
|
|
|
+ Configuration conf = new Configuration();
|
|
|
+ Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
|
|
|
+ MRApps.setupDistributedCache(conf, localResources);
|
|
|
+ assertTrue("Empty Config did not produce an empty list of resources",
|
|
|
+ localResources.isEmpty());
|
|
|
+ }
|
|
|
+
|
|
|
+ @SuppressWarnings("deprecation")
|
|
|
+ @Test(expected = InvalidJobConfException.class)
|
|
|
+ public void testSetupDistributedCacheConflicts() throws Exception {
|
|
|
+ Configuration conf = new Configuration();
|
|
|
+ conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class);
|
|
|
+
|
|
|
+ URI mockUri = URI.create("mockfs://mock/");
|
|
|
+ FileSystem mockFs = ((FilterFileSystem)FileSystem.get(mockUri, conf))
|
|
|
+ .getRawFileSystem();
|
|
|
+
|
|
|
+ URI archive = new URI("mockfs://mock/tmp/something.zip#something");
|
|
|
+ Path archivePath = new Path(archive);
|
|
|
+ URI file = new URI("mockfs://mock/tmp/something.txt#something");
|
|
|
+ Path filePath = new Path(file);
|
|
|
+
|
|
|
+ when(mockFs.resolvePath(archivePath)).thenReturn(archivePath);
|
|
|
+ when(mockFs.resolvePath(filePath)).thenReturn(filePath);
|
|
|
+
|
|
|
+ DistributedCache.addCacheArchive(archive, conf);
|
|
|
+ conf.set(MRJobConfig.CACHE_ARCHIVES_TIMESTAMPS, "10");
|
|
|
+ conf.set(MRJobConfig.CACHE_ARCHIVES_SIZES, "10");
|
|
|
+ conf.set(MRJobConfig.CACHE_ARCHIVES_VISIBILITIES, "true");
|
|
|
+ DistributedCache.addCacheFile(file, conf);
|
|
|
+ conf.set(MRJobConfig.CACHE_FILE_TIMESTAMPS, "11");
|
|
|
+ conf.set(MRJobConfig.CACHE_FILES_SIZES, "11");
|
|
|
+ conf.set(MRJobConfig.CACHE_FILE_VISIBILITIES, "true");
|
|
|
+ Map<String, LocalResource> localResources =
|
|
|
+ new HashMap<String, LocalResource>();
|
|
|
+ MRApps.setupDistributedCache(conf, localResources);
|
|
|
+ }
|
|
|
+
|
|
|
+ @SuppressWarnings("deprecation")
|
|
|
+ @Test(expected = InvalidJobConfException.class)
|
|
|
+ public void testSetupDistributedCacheConflictsFiles() throws Exception {
|
|
|
+ Configuration conf = new Configuration();
|
|
|
+ conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class);
|
|
|
+
|
|
|
+ URI mockUri = URI.create("mockfs://mock/");
|
|
|
+ FileSystem mockFs = ((FilterFileSystem)FileSystem.get(mockUri, conf))
|
|
|
+ .getRawFileSystem();
|
|
|
+
|
|
|
+ URI file = new URI("mockfs://mock/tmp/something.zip#something");
|
|
|
+ Path filePath = new Path(file);
|
|
|
+ URI file2 = new URI("mockfs://mock/tmp/something.txt#something");
|
|
|
+ Path file2Path = new Path(file);
|
|
|
+
|
|
|
+ when(mockFs.resolvePath(filePath)).thenReturn(filePath);
|
|
|
+ when(mockFs.resolvePath(file2Path)).thenReturn(file2Path);
|
|
|
+
|
|
|
+ DistributedCache.addCacheFile(file, conf);
|
|
|
+ DistributedCache.addCacheFile(file2, conf);
|
|
|
+ conf.set(MRJobConfig.CACHE_FILE_TIMESTAMPS, "10,11");
|
|
|
+ conf.set(MRJobConfig.CACHE_FILES_SIZES, "10,11");
|
|
|
+ conf.set(MRJobConfig.CACHE_FILE_VISIBILITIES, "true,true");
|
|
|
+ Map<String, LocalResource> localResources =
|
|
|
+ new HashMap<String, LocalResource>();
|
|
|
+ MRApps.setupDistributedCache(conf, localResources);
|
|
|
+ }
|
|
|
+
|
|
|
+ @SuppressWarnings("deprecation")
|
|
|
+ @Test
|
|
|
+ public void testSetupDistributedCache() throws Exception {
|
|
|
+ Configuration conf = new Configuration();
|
|
|
+ conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class);
|
|
|
+
|
|
|
+ URI mockUri = URI.create("mockfs://mock/");
|
|
|
+ FileSystem mockFs = ((FilterFileSystem)FileSystem.get(mockUri, conf))
|
|
|
+ .getRawFileSystem();
|
|
|
+
|
|
|
+ URI archive = new URI("mockfs://mock/tmp/something.zip");
|
|
|
+ Path archivePath = new Path(archive);
|
|
|
+ URI file = new URI("mockfs://mock/tmp/something.txt#something");
|
|
|
+ Path filePath = new Path(file);
|
|
|
+
|
|
|
+ when(mockFs.resolvePath(archivePath)).thenReturn(archivePath);
|
|
|
+ when(mockFs.resolvePath(filePath)).thenReturn(filePath);
|
|
|
+
|
|
|
+ DistributedCache.addCacheArchive(archive, conf);
|
|
|
+ conf.set(MRJobConfig.CACHE_ARCHIVES_TIMESTAMPS, "10");
|
|
|
+ conf.set(MRJobConfig.CACHE_ARCHIVES_SIZES, "10");
|
|
|
+ conf.set(MRJobConfig.CACHE_ARCHIVES_VISIBILITIES, "true");
|
|
|
+ DistributedCache.addCacheFile(file, conf);
|
|
|
+ conf.set(MRJobConfig.CACHE_FILE_TIMESTAMPS, "11");
|
|
|
+ conf.set(MRJobConfig.CACHE_FILES_SIZES, "11");
|
|
|
+ conf.set(MRJobConfig.CACHE_FILE_VISIBILITIES, "true");
|
|
|
+ Map<String, LocalResource> localResources =
|
|
|
+ new HashMap<String, LocalResource>();
|
|
|
+ MRApps.setupDistributedCache(conf, localResources);
|
|
|
+ assertEquals(2, localResources.size());
|
|
|
+ LocalResource lr = localResources.get("something.zip");
|
|
|
+ assertNotNull(lr);
|
|
|
+ assertEquals(10l, lr.getSize());
|
|
|
+ assertEquals(10l, lr.getTimestamp());
|
|
|
+ assertEquals(LocalResourceType.ARCHIVE, lr.getType());
|
|
|
+ lr = localResources.get("something");
|
|
|
+ assertNotNull(lr);
|
|
|
+ assertEquals(11l, lr.getSize());
|
|
|
+ assertEquals(11l, lr.getTimestamp());
|
|
|
+ assertEquals(LocalResourceType.FILE, lr.getType());
|
|
|
+ }
|
|
|
+
|
|
|
+ static class MockFileSystem extends FilterFileSystem {
|
|
|
+ MockFileSystem() {
|
|
|
+ super(mock(FileSystem.class));
|
|
|
+ }
|
|
|
+ public void initialize(URI name, Configuration conf) throws IOException {}
|
|
|
+ }
|
|
|
+
|
|
|
}
|