Better fix for HADOOP-189. Tested in both distributed and standalone mode. Note that WritableFactories is not quite functioning correctly and there are a few hacks around this that should be removed when that's better understood.
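
The recurring change in this commit replaces Class.forName(name) with a lookup through the thread's context classloader. Class.forName resolves against the classloader that loaded the calling class (here, the classloader that started bin/hadoop), so it cannot see classes that RunJar later makes available through its URLClassLoader. A minimal sketch of the difference (the class and jar names are hypothetical):

    import java.io.File;
    import java.net.URL;
    import java.net.URLClassLoader;

    public class ContextLoaderSketch {
      public static void main(String[] args) throws Exception {
        // Hypothetical job jar, added at runtime the way RunJar does below.
        ClassLoader jobLoader = new URLClassLoader(
            new URL[] { new File("myjob.jar").toURL() });
        Thread.currentThread().setContextClassLoader(jobLoader);

        // Class.forName("org.example.MyMapper") would fail here: it resolves
        // against the loader of ContextLoaderSketch, which never saw the jar.

        // This resolves against whatever loader the launcher installed:
        Class c = Thread.currentThread().getContextClassLoader()
            .loadClass("org.example.MyMapper");
      }
    }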

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@399500 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting 19 years ago
parent
commit
e568e49ac7

+ 5 - 0
CHANGES.txt

@@ -167,6 +167,11 @@ Trunk (unreleased)
     'bin/hadoop jar build/hadoop-streaming.jar' to get details.
     (Michel Tourn via cutting)
 
+44. HADOOP-189.  Fix MapReduce in standalone configuration to
+    correctly handle job jar files that contain a lib directory with
+    nested jar files.  (cutting)
+
+
 Release 0.1.1 - 2006-04-08
 
  1. Added CHANGES.txt, logging all significant changes to Hadoop.  (cutting)

+ 1 - 8
bin/hadoop

@@ -135,14 +135,7 @@ elif [ "$COMMAND" = "tasktracker" ] ; then
 elif [ "$COMMAND" = "job" ] ; then
   CLASS=org.apache.hadoop.mapred.JobClient
 elif [ "$COMMAND" = "jar" ] ; then
-  JAR="$1"
-  shift
-  CLASS=`"$0" org.apache.hadoop.util.PrintJarMainClass "$JAR"`
-  if [ $? != 0 ]; then
-    echo "Error: Could not find main class in jar file $JAR"
-    exit 1
-  fi
-  CLASSPATH=${CLASSPATH}:${JAR}
+  CLASS=org.apache.hadoop.util.RunJar
 else
   CLASS=$COMMAND
 fi

+ 1 - 1
src/java/org/apache/hadoop/conf/Configuration.java

@@ -241,7 +241,7 @@ public class Configuration {
     if (valueString == null)
       return defaultValue;
     try {
-      return Class.forName(valueString);
+      return classLoader.loadClass(valueString);
     } catch (ClassNotFoundException e) {
       throw new RuntimeException(e);
     }

+ 3 - 0
src/java/org/apache/hadoop/dfs/DFSClient.java

@@ -49,6 +49,9 @@ class DFSClient implements FSConstants {
     Daemon leaseChecker;
     private Configuration conf;
     
+    // required for unknown reason to make WritableFactories work distributed
+    static { new DFSFileInfo(); }
+
     /**
      * A map from name -> DFSOutputStream of files that are currently being
      * written by this client.
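
The static blocks added here and in DataNode, JobClient, JobTracker, and TaskTracker most plausibly work by forcing class initialization: a Writable that supports WritableFactories typically registers its factory in its own static initializer, so constructing one throwaway instance guarantees the registration has run before RPC needs to deserialize that type. A hedged sketch of the presumed pattern (MyWritable is hypothetical):

    import java.io.*;
    import org.apache.hadoop.io.*;

    public class MyWritable implements Writable {
      static {  // runs once, when the class is first initialized
        WritableFactories.setFactory(MyWritable.class, new WritableFactory() {
          public Writable newInstance() { return new MyWritable(); }
        });
      }
      public void write(DataOutput out) throws IOException {}
      public void readFields(DataInput in) throws IOException {}
    }

Under this reading, new DFSFileInfo() discards the instance; it only triggers DFSFileInfo's static initializer so the factory is registered in the client JVM.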

+ 3 - 0
src/java/org/apache/hadoop/dfs/DataNode.java

@@ -65,6 +65,9 @@ public class DataNode implements FSConstants, Runnable {
     //private static long numGigs = Configuration.get().getLong("dfs.datanode.maxgigs", 100);
     //
 
+    // required for unknown reason to make WritableFactories work distributed
+    static { new BlockCommand(); }
+
     /**
      * Util method to build socket addr from string
      */

+ 6 - 3
src/java/org/apache/hadoop/io/ObjectWritable.java

@@ -88,7 +88,8 @@ public class ObjectWritable implements Writable, Configurable {
       declaredClass = (Class)PRIMITIVE_NAMES.get(className);
       if (declaredClass == null) {
         try {
-          declaredClass = Class.forName(className);
+          declaredClass =
+            Thread.currentThread().getContextClassLoader().loadClass(className);
         } catch (ClassNotFoundException e) {
           throw new RuntimeException(e.toString());
         }
@@ -170,7 +171,8 @@ public class ObjectWritable implements Writable, Configurable {
     Class declaredClass = (Class)PRIMITIVE_NAMES.get(className);
     if (declaredClass == null) {
       try {
-        declaredClass = Class.forName(className);
+        declaredClass =
+          Thread.currentThread().getContextClassLoader().loadClass(className);
       } catch (ClassNotFoundException e) {
         throw new RuntimeException(e.toString());
       }
@@ -215,7 +217,8 @@ public class ObjectWritable implements Writable, Configurable {
     } else {                                      // Writable
       Class instanceClass = null;
       try {
-        instanceClass = Class.forName(UTF8.readString(in));
+        instanceClass = Thread.currentThread().getContextClassLoader()
+          .loadClass(UTF8.readString(in));
       } catch (ClassNotFoundException e) {
         throw new RuntimeException(e.toString());
       }

+ 1 - 1
src/java/org/apache/hadoop/io/WritableName.java

@@ -62,7 +62,7 @@ public class WritableName {
     if (writableClass != null)
       return writableClass;
     try {
-      return Class.forName(name);
+      return Thread.currentThread().getContextClassLoader().loadClass(name);
     } catch (ClassNotFoundException e) {
       throw new IOException(e.toString());
     }

+ 3 - 0
src/java/org/apache/hadoop/mapred/JobClient.java

@@ -38,6 +38,9 @@ public class JobClient implements MRConstants {
 
     static long MAX_JOBPROFILE_AGE = 1000 * 2;
 
+    // required for unknown reason to make WritableFactories work distributed
+    static { new JobStatus(); new JobProfile(); }
+
     /**
      * A NetworkedJob is an implementation of RunningJob.  It holds
      * a JobProfile object to provide some info, and interacts with the

+ 4 - 0
src/java/org/apache/hadoop/mapred/JobTracker.java

@@ -49,6 +49,10 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
       idFormat.setMinimumIntegerDigits(4);
       idFormat.setGroupingUsed(false);
     }
+
+    // required for unknown reason to make WritableFactories work distributed
+    static { new TaskTrackerStatus(); }
+
     private int nextJobId = 1;
 
     public static final Logger LOG = LogFormatter.getLogger("org.apache.hadoop.mapred.JobTracker");

+ 2 - 32
src/java/org/apache/hadoop/mapred/TaskRunner.java

@@ -18,6 +18,7 @@ package org.apache.hadoop.mapred;
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.util.LogFormatter;
 import org.apache.hadoop.fs.*;
+import org.apache.hadoop.util.*;
 
 import java.io.*;
 import java.util.jar.*;
@@ -78,7 +79,7 @@ abstract class TaskRunner extends Thread {
 
       String jar = conf.getJar();
       if (jar != null) {                      // if jar exists, unjar it into workDir
-        unJar(new File(jar), workDir);
+        RunJar.unJar(new File(jar), workDir);
         File[] libs = new File(workDir, "lib").listFiles();
         if (libs != null) {
           for (int i = 0; i < libs.length; i++) {
@@ -222,37 +223,6 @@ abstract class TaskRunner extends Thread {
     return text;
   }
 
-  private void unJar(File jarFile, File toDir) throws IOException {
-    JarFile jar = new JarFile(jarFile);
-    try {
-      Enumeration entries = jar.entries();
-      while (entries.hasMoreElements()) {
-        JarEntry entry = (JarEntry)entries.nextElement();
-        if (!entry.isDirectory()) {
-          InputStream in = jar.getInputStream(entry);
-          try {
-            File file = new File(toDir, entry.getName());
-            file.getParentFile().mkdirs();
-            OutputStream out = new FileOutputStream(file);
-            try {
-              byte[] buffer = new byte[8192];
-              int i;
-              while ((i = in.read(buffer)) != -1) {
-                out.write(buffer, 0, i);
-              }
-            } finally {
-              out.close();
-            }
-          } finally {
-            in.close();
-          }
-        }
-      }
-    } finally {
-      jar.close();
-    }
-  }
-
   /**
    * Run the child process
    */

+ 3 - 0
src/java/org/apache/hadoop/mapred/TaskTracker.java

@@ -38,6 +38,9 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol, MapOutpu
 
     static final int STALE_STATE = 1;
 
+    // required for unknown reason to make WritableFactories work distributed
+    static { new MapTask(); new ReduceTask(); new MapOutputLocation(); }
+
     public static final Logger LOG =
     LogFormatter.getLogger("org.apache.hadoop.mapred.TaskTracker");
 

+ 136 - 0
src/java/org/apache/hadoop/util/RunJar.java

@@ -0,0 +1,136 @@
+/**
+ * Copyright 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.util;
+
+import java.util.jar.*;
+import java.lang.reflect.*;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.io.*;
+import java.util.*;
+
+import org.apache.hadoop.fs.FileUtil;
+
+/** Run a Hadoop job jar. */
+public class RunJar {
+
+  /** Unpack a jar file into a directory. */
+  public static void unJar(File jarFile, File toDir) throws IOException {
+    JarFile jar = new JarFile(jarFile);
+    try {
+      Enumeration entries = jar.entries();
+      while (entries.hasMoreElements()) {
+        JarEntry entry = (JarEntry)entries.nextElement();
+        if (!entry.isDirectory()) {
+          InputStream in = jar.getInputStream(entry);
+          try {
+            File file = new File(toDir, entry.getName());
+            file.getParentFile().mkdirs();
+            OutputStream out = new FileOutputStream(file);
+            try {
+              byte[] buffer = new byte[8192];
+              int i;
+              while ((i = in.read(buffer)) != -1) {
+                out.write(buffer, 0, i);
+              }
+            } finally {
+              out.close();
+            }
+          } finally {
+            in.close();
+          }
+        }
+      }
+    } finally {
+      jar.close();
+    }
+  }
+
+  /** Run a Hadoop job jar.  If the main class is not in the jar's manifest,
+   * then it must be provided on the command line. */
+  public static void main(String[] args) throws Throwable {
+    String usage = "RunJar jarFile [mainClass] args...";
+
+    if (args.length < 1) {
+      System.err.println(usage);
+      System.exit(-1);
+    }
+
+    int firstArg = 0;
+    String fileName = args[firstArg++];
+    File file = new File(fileName);
+    String mainClassName = null;
+
+    JarFile jarFile = new JarFile(fileName);
+    Manifest manifest = jarFile.getManifest();
+    if (manifest != null) {
+      mainClassName = manifest.getMainAttributes().getValue("Main-Class");
+    }
+    jarFile.close();
+
+    if (mainClassName == null) {
+      if (args.length < 2) {
+        System.err.println(usage);
+        System.exit(-1);
+      }
+      mainClassName = args[firstArg++];
+    }
+    mainClassName = mainClassName.replaceAll("/", ".");
+
+    final File workDir = File.createTempFile("hadoop-unjar","");
+    workDir.delete();
+    workDir.mkdirs();
+
+    Runtime.getRuntime().addShutdownHook(new Thread() {
+        public void run() {
+          try {
+            FileUtil.fullyDelete(workDir);
+          } catch (IOException e) {
+          }
+        }
+      });
+
+    unJar(file, workDir);
+    
+    ArrayList classPath = new ArrayList();
+    classPath.add(new File(workDir+"/").toURL());
+    classPath.add(file.toURL());
+    classPath.add(new File(workDir, "classes/").toURL());
+    File[] libs = new File(workDir, "lib").listFiles();
+    if (libs != null) {
+      for (int i = 0; i < libs.length; i++) {
+        classPath.add(libs[i].toURL());
+      }
+    }
+    ClassLoader loader =
+      new URLClassLoader((URL[])classPath.toArray(new URL[0]));
+
+    Thread.currentThread().setContextClassLoader(loader);
+    Class mainClass = loader.loadClass(mainClassName);
+    Method main = mainClass.getMethod("main", new Class[] {
+      Array.newInstance(String.class, 0).getClass()
+    });
+    String[] newArgs = (String[])Arrays.asList(args)
+      .subList(firstArg, args.length).toArray(new String[0]);
+    try {
+      main.invoke(null, new Object[] { newArgs });
+    } catch (InvocationTargetException e) {
+      throw e.getTargetException();
+    }
+  }
+  
+}
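
Invoking the class directly is equivalent to the new bin/hadoop jar path. A hypothetical driver (the jar, class, and arguments are made up):

    public class RunJarExample {
      public static void main(String[] args) throws Throwable {
        // Same as: bin/hadoop jar myjob.jar org.example.MyJob in out
        org.apache.hadoop.util.RunJar.main(
            new String[] { "myjob.jar", "org.example.MyJob", "in", "out" });
      }
    }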