ソースを参照

HADOOP-1032. Permit one to specify jars that will be cached across multiple jobs. Contributed by Gautam Kowshik.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@513924 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting 18 年 前
コミット
0df4e542ac

+ 3 - 0
CHANGES.txt

@@ -182,6 +182,9 @@ Trunk (unreleased changes)
 55. HADOOP-1041.  Optimize mapred counter implementation.  Also group
     counters by their declaring Enum.  (David Bowen via cutting)
 
+56. HADOOP-1032.  Permit one to specify jars that will be cached
+    across multiple jobs.  (Gautam Kowshik via cutting)
+
 
 Release 0.11.2 - 2007-02-16
 

+ 74 - 1
src/java/org/apache/hadoop/filecache/DistributedCache.java

@@ -521,7 +521,80 @@ public class DistributedCache {
     conf.set("mapred.cache.files", files == null ? uri.toString() : files + ","
         + uri.toString());
   }
-  
+
+	/**
+	 * Add an file path to the current set of classpath entries It adds the file
+	 * to cache as well.
+	 * 
+	 * @param file Path of the file to be added
+	 * @param conf Configuration that contains the classpath setting
+	 */
+	public static void addFileToClassPath(Path file, Configuration conf)
+			throws IOException {
+		String classpath = conf.get("mapred.job.classpath.files");
+		conf.set("mapred.job.classpath.files", classpath == null ? file.toString()
+				: classpath + System.getProperty("path.separator") + file.toString());
+		FileSystem fs = FileSystem.get(conf);
+		URI uri = fs.makeQualified(file).toUri();
+
+		addCacheFile(uri, conf);
+	}
+
+	/**
+	 * Get the file entries in classpath as an array of Path
+	 * 
+	 * @param conf Configuration that contains the classpath setting
+	 */
+	public static Path[] getFileClassPaths(Configuration conf) {
+		String classpath = conf.get("mapred.job.classpath.files");
+		if (classpath == null)
+			return null;
+		ArrayList list = Collections.list(new StringTokenizer(classpath, System
+				.getProperty("path.separator")));
+		Path[] paths = new Path[list.size()];
+		for (int i = 0; i < list.size(); i++) {
+			paths[i] = new Path((String) list.get(i));
+		}
+		return paths;
+	}
+
+	/**
+	 * Add an archive path to the current set of classpath entries. It adds the
+	 * archive to cache as well.
+	 * 
+	 * @param archive Path of the archive to be added
+	 * @param conf Configuration that contains the classpath setting
+	 */
+	public static void addArchiveToClassPath(Path archive, Configuration conf)
+			throws IOException {
+		String classpath = conf.get("mapred.job.classpath.archives");
+		conf.set("mapred.job.classpath.archives", classpath == null ? archive
+				.toString() : classpath + System.getProperty("path.separator")
+				+ archive.toString());
+		FileSystem fs = FileSystem.get(conf);
+		URI uri = fs.makeQualified(archive).toUri();
+
+		addCacheArchive(uri, conf);
+	}
+
+	/**
+	 * Get the archive entries in classpath as an array of Path
+	 * 
+	 * @param conf Configuration that contains the classpath setting
+	 */
+	public static Path[] getArchiveClassPaths(Configuration conf) {
+		String classpath = conf.get("mapred.job.classpath.archives");
+		if (classpath == null)
+			return null;
+		ArrayList list = Collections.list(new StringTokenizer(classpath, System
+				.getProperty("path.separator")));
+		Path[] paths = new Path[list.size()];
+		for (int i = 0; i < list.size(); i++) {
+			paths[i] = new Path((String) list.get(i));
+		}
+		return paths;
+	}
+
   /**
    * This method allows you to create symlinks in the current working directory
    * of the task to all the cache files/archives

+ 39 - 0
src/java/org/apache/hadoop/mapred/TaskRunner.java

@@ -171,6 +171,45 @@ abstract class TaskRunner extends Thread {
         classPath.append(jobCacheDir);
        
       }
+
+  		// include the user specified classpath
+  		
+  		//archive paths
+  		Path[] archiveClasspaths = DistributedCache.getArchiveClassPaths(conf);
+  		if (archiveClasspaths != null && archives != null) {
+  			Path[] localArchives = DistributedCache
+  					.getLocalCacheArchives(conf);
+  			if (localArchives != null){
+  				for (int i=0;i<archives.length;i++){
+  					for(int j=0;j<archiveClasspaths.length;j++){
+  						if(archives[i].getPath().equals(
+  								archiveClasspaths[j].toString())){
+  							classPath.append(sep);
+  							classPath.append(localArchives[i]
+  									.toString());
+  						}
+  					}
+  				}
+  			}
+  		}
+  		//file paths
+  		Path[] fileClasspaths = DistributedCache.getFileClassPaths(conf);
+  		if(fileClasspaths!=null && files != null) {
+  			Path[] localFiles = DistributedCache
+  					.getLocalCacheFiles(conf);
+  			if (localFiles != null) {
+  				for (int i = 0; i < files.length; i++) {
+  					for (int j = 0; j < fileClasspaths.length; j++) {
+  						if (files[i].getPath().equals(
+  								fileClasspaths[j].toString())) {
+  							classPath.append(sep);
+  							classPath.append(localFiles[i].toString());
+  						}
+  					}
+  				}
+  			}
+  		}
+
       classPath.append(sep);
       classPath.append(workDir);
       //  Build exec child jmv args.