|
@@ -23,15 +23,11 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS;
|
|
|
import java.io.IOException;
|
|
|
import java.io.OutputStream;
|
|
|
import java.net.InetSocketAddress;
|
|
|
-import java.net.URI;
|
|
|
import java.security.PrivilegedExceptionAction;
|
|
|
-import java.util.ArrayList;
|
|
|
-import java.util.List;
|
|
|
import java.util.concurrent.ScheduledExecutorService;
|
|
|
|
|
|
import org.apache.commons.logging.Log;
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
|
-import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.fs.FSError;
|
|
|
import org.apache.hadoop.fs.FileSystem;
|
|
|
import org.apache.hadoop.fs.LocalDirAllocator;
|
|
@@ -43,7 +39,6 @@ import org.apache.hadoop.mapreduce.MRConfig;
|
|
|
import org.apache.hadoop.mapreduce.MRJobConfig;
|
|
|
import org.apache.hadoop.mapreduce.TaskType;
|
|
|
import org.apache.hadoop.mapreduce.counters.Limits;
|
|
|
-import org.apache.hadoop.mapreduce.filecache.DistributedCache;
|
|
|
import org.apache.hadoop.mapreduce.security.TokenCache;
|
|
|
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
|
|
|
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
|
|
@@ -307,7 +302,7 @@ class YarnChild {
|
|
|
task.localizeConfiguration(job);
|
|
|
|
|
|
// Set up the DistributedCache related configs
|
|
|
- setupDistributedCacheConfig(job);
|
|
|
+ MRApps.setupDistributedCacheLocal(job);
|
|
|
|
|
|
// Overwrite the localized task jobconf which is linked to in the current
|
|
|
// work-dir.
|
|
@@ -317,62 +312,6 @@ class YarnChild {
|
|
|
task.setConf(job);
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * Set up the DistributedCache related configs to make
|
|
|
- * {@link DistributedCache#getLocalCacheFiles(Configuration)}
|
|
|
- * and
|
|
|
- * {@link DistributedCache#getLocalCacheArchives(Configuration)}
|
|
|
- * working.
|
|
|
- * @param job
|
|
|
- * @throws IOException
|
|
|
- */
|
|
|
- private static void setupDistributedCacheConfig(final JobConf job)
|
|
|
- throws IOException {
|
|
|
-
|
|
|
- String localWorkDir = System.getenv("PWD");
|
|
|
- // ^ ^ all symlinks are created in the current work-dir
|
|
|
-
|
|
|
- // Update the configuration object with localized archives.
|
|
|
- URI[] cacheArchives = DistributedCache.getCacheArchives(job);
|
|
|
- if (cacheArchives != null) {
|
|
|
- List<String> localArchives = new ArrayList<String>();
|
|
|
- for (int i = 0; i < cacheArchives.length; ++i) {
|
|
|
- URI u = cacheArchives[i];
|
|
|
- Path p = new Path(u);
|
|
|
- Path name =
|
|
|
- new Path((null == u.getFragment()) ? p.getName()
|
|
|
- : u.getFragment());
|
|
|
- String linkName = name.toUri().getPath();
|
|
|
- localArchives.add(new Path(localWorkDir, linkName).toUri().getPath());
|
|
|
- }
|
|
|
- if (!localArchives.isEmpty()) {
|
|
|
- job.set(MRJobConfig.CACHE_LOCALARCHIVES, StringUtils
|
|
|
- .arrayToString(localArchives.toArray(new String[localArchives
|
|
|
- .size()])));
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- // Update the configuration object with localized files.
|
|
|
- URI[] cacheFiles = DistributedCache.getCacheFiles(job);
|
|
|
- if (cacheFiles != null) {
|
|
|
- List<String> localFiles = new ArrayList<String>();
|
|
|
- for (int i = 0; i < cacheFiles.length; ++i) {
|
|
|
- URI u = cacheFiles[i];
|
|
|
- Path p = new Path(u);
|
|
|
- Path name =
|
|
|
- new Path((null == u.getFragment()) ? p.getName()
|
|
|
- : u.getFragment());
|
|
|
- String linkName = name.toUri().getPath();
|
|
|
- localFiles.add(new Path(localWorkDir, linkName).toUri().getPath());
|
|
|
- }
|
|
|
- if (!localFiles.isEmpty()) {
|
|
|
- job.set(MRJobConfig.CACHE_LOCALFILES,
|
|
|
- StringUtils.arrayToString(localFiles
|
|
|
- .toArray(new String[localFiles.size()])));
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
private static final FsPermission urw_gr =
|
|
|
FsPermission.createImmutable((short) 0640);
|
|
|
|