|
@@ -19,19 +19,13 @@
|
|
|
package org.apache.hadoop.mapreduce.v2.util;
|
|
|
|
|
|
import java.io.IOException;
|
|
|
-import java.net.URI;
|
|
|
import java.util.HashMap;
|
|
|
import java.util.Map;
|
|
|
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
-import org.apache.hadoop.fs.FileSystem;
|
|
|
-import org.apache.hadoop.fs.FilterFileSystem;
|
|
|
-import org.apache.hadoop.fs.Path;
|
|
|
-import org.apache.hadoop.mapred.InvalidJobConfException;
|
|
|
import org.apache.hadoop.mapreduce.Job;
|
|
|
import org.apache.hadoop.mapreduce.JobID;
|
|
|
import org.apache.hadoop.mapreduce.MRJobConfig;
|
|
|
-import org.apache.hadoop.mapreduce.filecache.DistributedCache;
|
|
|
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
|
|
|
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
|
|
|
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
|
|
@@ -39,12 +33,9 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
|
|
|
import org.apache.hadoop.mapreduce.v2.util.MRApps;
|
|
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|
|
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
|
|
-import org.apache.hadoop.yarn.api.records.LocalResource;
|
|
|
-import org.apache.hadoop.yarn.api.records.LocalResourceType;
|
|
|
|
|
|
import org.junit.Test;
|
|
|
import static org.junit.Assert.*;
|
|
|
-import static org.mockito.Mockito.*;
|
|
|
|
|
|
public class TestMRApps {
|
|
|
|
|
@@ -177,122 +168,5 @@ public class TestMRApps {
|
|
|
assertNotSame("MAPREDUCE_JOB_USER_CLASSPATH_FIRST false, but taking effect!",
|
|
|
env_str.indexOf("$PWD:job.jar"), 0);
|
|
|
}
|
|
|
-
|
|
|
- @Test
|
|
|
- public void testSetupDistributedCacheEmpty() throws IOException {
|
|
|
- Configuration conf = new Configuration();
|
|
|
- Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
|
|
|
- MRApps.setupDistributedCache(conf, localResources);
|
|
|
- assertTrue("Empty Config did not produce an empty list of resources",
|
|
|
- localResources.isEmpty());
|
|
|
- }
|
|
|
-
|
|
|
- @SuppressWarnings("deprecation")
|
|
|
- @Test(expected = InvalidJobConfException.class)
|
|
|
- public void testSetupDistributedCacheConflicts() throws Exception {
|
|
|
- Configuration conf = new Configuration();
|
|
|
- conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class);
|
|
|
-
|
|
|
- URI mockUri = URI.create("mockfs://mock/");
|
|
|
- FileSystem mockFs = ((FilterFileSystem)FileSystem.get(mockUri, conf))
|
|
|
- .getRawFileSystem();
|
|
|
-
|
|
|
- URI archive = new URI("mockfs://mock/tmp/something.zip#something");
|
|
|
- Path archivePath = new Path(archive);
|
|
|
- URI file = new URI("mockfs://mock/tmp/something.txt#something");
|
|
|
- Path filePath = new Path(file);
|
|
|
-
|
|
|
- when(mockFs.resolvePath(archivePath)).thenReturn(archivePath);
|
|
|
- when(mockFs.resolvePath(filePath)).thenReturn(filePath);
|
|
|
-
|
|
|
- DistributedCache.addCacheArchive(archive, conf);
|
|
|
- conf.set(MRJobConfig.CACHE_ARCHIVES_TIMESTAMPS, "10");
|
|
|
- conf.set(MRJobConfig.CACHE_ARCHIVES_SIZES, "10");
|
|
|
- conf.set(MRJobConfig.CACHE_ARCHIVES_VISIBILITIES, "true");
|
|
|
- DistributedCache.addCacheFile(file, conf);
|
|
|
- conf.set(MRJobConfig.CACHE_FILE_TIMESTAMPS, "11");
|
|
|
- conf.set(MRJobConfig.CACHE_FILES_SIZES, "11");
|
|
|
- conf.set(MRJobConfig.CACHE_FILE_VISIBILITIES, "true");
|
|
|
- Map<String, LocalResource> localResources =
|
|
|
- new HashMap<String, LocalResource>();
|
|
|
- MRApps.setupDistributedCache(conf, localResources);
|
|
|
- }
|
|
|
-
|
|
|
- @SuppressWarnings("deprecation")
|
|
|
- @Test(expected = InvalidJobConfException.class)
|
|
|
- public void testSetupDistributedCacheConflictsFiles() throws Exception {
|
|
|
- Configuration conf = new Configuration();
|
|
|
- conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class);
|
|
|
-
|
|
|
- URI mockUri = URI.create("mockfs://mock/");
|
|
|
- FileSystem mockFs = ((FilterFileSystem)FileSystem.get(mockUri, conf))
|
|
|
- .getRawFileSystem();
|
|
|
-
|
|
|
- URI file = new URI("mockfs://mock/tmp/something.zip#something");
|
|
|
- Path filePath = new Path(file);
|
|
|
- URI file2 = new URI("mockfs://mock/tmp/something.txt#something");
|
|
|
- Path file2Path = new Path(file);
|
|
|
-
|
|
|
- when(mockFs.resolvePath(filePath)).thenReturn(filePath);
|
|
|
- when(mockFs.resolvePath(file2Path)).thenReturn(file2Path);
|
|
|
-
|
|
|
- DistributedCache.addCacheFile(file, conf);
|
|
|
- DistributedCache.addCacheFile(file2, conf);
|
|
|
- conf.set(MRJobConfig.CACHE_FILE_TIMESTAMPS, "10,11");
|
|
|
- conf.set(MRJobConfig.CACHE_FILES_SIZES, "10,11");
|
|
|
- conf.set(MRJobConfig.CACHE_FILE_VISIBILITIES, "true,true");
|
|
|
- Map<String, LocalResource> localResources =
|
|
|
- new HashMap<String, LocalResource>();
|
|
|
- MRApps.setupDistributedCache(conf, localResources);
|
|
|
- }
|
|
|
-
|
|
|
- @SuppressWarnings("deprecation")
|
|
|
- @Test
|
|
|
- public void testSetupDistributedCache() throws Exception {
|
|
|
- Configuration conf = new Configuration();
|
|
|
- conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class);
|
|
|
-
|
|
|
- URI mockUri = URI.create("mockfs://mock/");
|
|
|
- FileSystem mockFs = ((FilterFileSystem)FileSystem.get(mockUri, conf))
|
|
|
- .getRawFileSystem();
|
|
|
-
|
|
|
- URI archive = new URI("mockfs://mock/tmp/something.zip");
|
|
|
- Path archivePath = new Path(archive);
|
|
|
- URI file = new URI("mockfs://mock/tmp/something.txt#something");
|
|
|
- Path filePath = new Path(file);
|
|
|
-
|
|
|
- when(mockFs.resolvePath(archivePath)).thenReturn(archivePath);
|
|
|
- when(mockFs.resolvePath(filePath)).thenReturn(filePath);
|
|
|
-
|
|
|
- DistributedCache.addCacheArchive(archive, conf);
|
|
|
- conf.set(MRJobConfig.CACHE_ARCHIVES_TIMESTAMPS, "10");
|
|
|
- conf.set(MRJobConfig.CACHE_ARCHIVES_SIZES, "10");
|
|
|
- conf.set(MRJobConfig.CACHE_ARCHIVES_VISIBILITIES, "true");
|
|
|
- DistributedCache.addCacheFile(file, conf);
|
|
|
- conf.set(MRJobConfig.CACHE_FILE_TIMESTAMPS, "11");
|
|
|
- conf.set(MRJobConfig.CACHE_FILES_SIZES, "11");
|
|
|
- conf.set(MRJobConfig.CACHE_FILE_VISIBILITIES, "true");
|
|
|
- Map<String, LocalResource> localResources =
|
|
|
- new HashMap<String, LocalResource>();
|
|
|
- MRApps.setupDistributedCache(conf, localResources);
|
|
|
- assertEquals(2, localResources.size());
|
|
|
- LocalResource lr = localResources.get("something.zip");
|
|
|
- assertNotNull(lr);
|
|
|
- assertEquals(10l, lr.getSize());
|
|
|
- assertEquals(10l, lr.getTimestamp());
|
|
|
- assertEquals(LocalResourceType.ARCHIVE, lr.getType());
|
|
|
- lr = localResources.get("something");
|
|
|
- assertNotNull(lr);
|
|
|
- assertEquals(11l, lr.getSize());
|
|
|
- assertEquals(11l, lr.getTimestamp());
|
|
|
- assertEquals(LocalResourceType.FILE, lr.getType());
|
|
|
- }
|
|
|
-
|
|
|
- static class MockFileSystem extends FilterFileSystem {
|
|
|
- MockFileSystem() {
|
|
|
- super(mock(FileSystem.class));
|
|
|
- }
|
|
|
- public void initialize(URI name, Configuration conf) throws IOException {}
|
|
|
- }
|
|
|
-
|
|
|
+
|
|
|
}
|