
HADOOP-862. Add support for the S3 FileSystem to the CopyFiles tool. Contributed by Michael Stack.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@502722 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting, 18 years ago
Commit c5b398342f

3 files changed, 148 insertions(+), 149 deletions(-)
  1. CHANGES.txt (+3, -0)
  2. src/java/org/apache/hadoop/util/CopyFiles.java (+141, -145)
  3. src/test/org/apache/hadoop/fs/TestCopyFiles.java (+4, -4)

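A note on what the change enables, before the per-file diffs: with this patch, distcp accepts s3:// URIs alongside hdfs://, file:// and http:// sources. A minimal sketch of driving the tool programmatically, in the same style as TestCopyFiles below (the namenode host:port and bucket name are placeholders, and the fs.s3.* credential properties the S3 FileSystem needs are assumed to be configured elsewhere):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.util.CopyFiles;

    Configuration conf = new Configuration();
    // Copy a directory tree from HDFS to an S3 bucket; the mapper is chosen from
    // the source URI's scheme, and the destination is resolved via FileSystem.get.
    new CopyFiles().doMain(conf, new String[] {
        "hdfs://namenode:8020/srcdat",   // placeholder namenode host:port
        "s3://example-bucket/destdat"    // placeholder destination bucket
    });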
+ 3 - 0
CHANGES.txt

@@ -137,6 +137,9 @@ Trunk (unreleased changes)
 
 42. HADOOP-969.  Fix a deadlock in JobTracker.  (omalley via cutting)
 
+43. HADOOP-862.  Add support for the S3 FileSystem to the CopyFiles
+    tool.  (Michael Stack via cutting)
+
 
 Release 0.10.1 - 2007-01-10
 

+ 141 - 145
src/java/org/apache/hadoop/util/CopyFiles.java

@@ -20,14 +20,14 @@ package org.apache.hadoop.util;
 
 import java.io.BufferedInputStream;
 import java.io.BufferedReader;
+import java.io.File;
 import java.io.FileReader;
 import java.io.IOException;
-import java.io.File;
 import java.io.InputStreamReader;
 import java.net.HttpURLConnection;
 import java.net.URI;
-import java.net.URL;
 import java.net.URISyntaxException;
+import java.net.URL;
 import java.text.DecimalFormat;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -48,9 +48,8 @@ import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.ClusterStatus;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.SequenceFileInputFormat;
@@ -58,16 +57,16 @@ import org.apache.hadoop.mapred.SequenceFileOutputFormat;
 
 /**
  * A Map-reduce program to recursively copy directories between
- * diffferent file-systems.
+ * different file-systems.
  *
  * @author Milind Bhandarkar
  */
 public class CopyFiles extends ToolBase {
+  private static final String HDFS = "hdfs";
+  private static final String S3 = "s3";
   
   private static final String usage = "distcp "+
-  "[-fs <namenode:port | local> ] [-jt <jobtracker:port | local>] " +
-  "[-conf <config-file.xml>] " + "[-D <property=value>] "+
-  "[-i] <srcurl> | -f <urilist_uri> <desturl>";
+    "[-i] <srcurl> | -f <urilist_uri> <desturl>";
   
   private static final long MIN_BYTES_PER_MAP = 1L << 28;
   private static final int MAX_NUM_MAPS = 10000;
@@ -114,17 +113,6 @@ public class CopyFiles extends ToolBase {
     public abstract void cleanup(Configuration conf, JobConf jobConf, 
         String srcPath, String destPath) throws IOException;
     
-    public static String getFileSysName(URI url) {
-      String fsname = url.getScheme();
-      if ("dfs".equals(fsname)) {
-        String host = url.getHost();
-        int port = url.getPort();
-        return (port==(-1)) ? host : (host+":"+port);
-      } else {
-        return "local";
-      }
-    }
-    
     /**
      * Make a path relative with respect to a root path.
      * absPath is always assumed to descend from root.
@@ -153,14 +141,48 @@ public class CopyFiles extends ToolBase {
       }
       return new Path(sb.toString());
     }
-    
+    /**
+     * Calculate how many maps to run.
+     * Ideal number of maps is one per file (if the map-launching overhead
+     * were 0). It is limited by jobtrackers handling capacity which, lets say,
+     * is MAX_NUM_MAPS. It is also limited by MAX_MAPS_PER_NODE. Also for 
+     * small files it is better to determine number of maps by 
+     * amount of data per map.
+     * @param initialEstimate Initial guess at number of maps (e.g. count of
+     * files).
+     * @param totalBytes Count of total bytes for job (If not known, pass -1).
+     * @param client
+     * @return Count of maps to run.
+     * @throws IOException
+     */
+    public int getMapCount(final int initialEstimate, final long totalBytes,
+        final JobClient client)
+    throws IOException {
+        int numMaps = initialEstimate;
+        if (numMaps > MAX_NUM_MAPS) {
+            numMaps = MAX_NUM_MAPS;
+        }
+        if (totalBytes != -1 &&
+            numMaps > (int)(totalBytes / MIN_BYTES_PER_MAP)) {
+          numMaps = (int) (totalBytes / MIN_BYTES_PER_MAP);
+        }
+        ClusterStatus cluster = client.getClusterStatus();
+        int tmpMaps = cluster.getTaskTrackers() * MAX_MAPS_PER_NODE;
+        if (numMaps > tmpMaps) {
+            numMaps = tmpMaps;
+        }
+        if (numMaps == 0) {
+            numMaps = 1;
+        }
+        return numMaps;
+    } 
   }
   
   /**
    * DFSCopyFilesMapper: The mapper for copying files from the DFS.
    * @author Milind Bhandarkar
    */
-  public static class DFSCopyFilesMapper extends CopyFilesMapper 
+  public static class FSCopyFilesMapper extends CopyFilesMapper 
   implements Mapper 
   {
     private int sizeBuf = 4096;
@@ -232,21 +254,12 @@ public class CopyFiles extends ToolBase {
         boolean ignoreReadFailures) 
     throws IOException
     {
-      URI srcURI = null;
-      URI destURI = null;
-      try {
-        srcURI = new URI(srcPaths[0]);
-        destURI = new URI(destPath);
-      } catch (URISyntaxException ex) {
-        throw new RuntimeException("URL syntax error.", ex);
-      }
-      
-      String srcFileSysName = getFileSysName(srcURI);
-      String destFileSysName = getFileSysName(destURI);
-      jobConf.set("copy.src.fs", srcFileSysName);
-      jobConf.set("copy.dest.fs", destFileSysName);
+      URI srcURI = toURI(srcPaths[0]);
+      URI destURI = toURI(destPath);
       
-      FileSystem srcfs = FileSystem.getNamed(srcFileSysName, conf);
+      FileSystem srcfs = FileSystem.get(srcURI, conf);
+      jobConf.set("copy.src.fs", srcURI.toString());
+      jobConf.set("copy.dest.fs", destURI.toString());
       
       String srcPath = srcURI.getPath();
       if ("".equals(srcPath)) { srcPath = "/"; }
@@ -278,8 +291,7 @@ public class CopyFiles extends ToolBase {
       jobConf.setOutputValueClass(Text.class);
       jobConf.setOutputFormat(SequenceFileOutputFormat.class);
       
-      jobConf.setMapperClass(DFSCopyFilesMapper.class);
-      jobConf.setReducerClass(CopyFilesReducer.class);
+      jobConf.setMapperClass(FSCopyFilesMapper.class);
       
       jobConf.setNumReduceTasks(1);
       jobConf.setBoolean(readFailuresAttribute, ignoreReadFailures);
@@ -324,24 +336,19 @@ public class CopyFiles extends ToolBase {
       // is MAX_NUM_MAPS. It is also limited by MAX_MAPS_PER_NODE. Also for 
       // small files it is better to determine number of maps by amount of 
       // data per map.
-      int nFiles = finalPathList.size();
-      int numMaps = nFiles;
-      if (numMaps > MAX_NUM_MAPS) { numMaps = MAX_NUM_MAPS; }
-      if (numMaps > (int) (totalBytes / MIN_BYTES_PER_MAP)) {
-        numMaps = (int) (totalBytes / MIN_BYTES_PER_MAP);
+      int numMaps = finalPathList.size();
+      if (numMaps > MAX_NUM_MAPS) {
+        numMaps = MAX_NUM_MAPS;
       }
+
       JobClient client = new JobClient(jobConf);
-      ClusterStatus cluster = client.getClusterStatus();
-      int tmpMaps = cluster.getTaskTrackers() * MAX_MAPS_PER_NODE;
-      if (numMaps > tmpMaps) { numMaps = tmpMaps; }
-      if (numMaps == 0) { numMaps = 1; }
-      jobConf.setNumMapTasks(numMaps);
+      jobConf.setNumMapTasks(getMapCount(numMaps, totalBytes, client));
       
       for(int idx=0; idx < numMaps; ++idx) {
         Path file = new Path(inDir, "part"+idx);
         SequenceFile.Writer writer = 
           SequenceFile.createWriter(fileSys,conf,file,Text.class,Text.class);
-        for (int ipath = idx; ipath < nFiles; ipath += numMaps) {
+        for (int ipath = idx; ipath < finalPathList.size(); ipath += numMaps) {
           String path = (String) finalPathList.get(ipath);
           writer.append(new Text(path), new Text(""));
         }
@@ -376,8 +383,10 @@ public class CopyFiles extends ToolBase {
       srcPath = new Path(job.get("copy.src.path", "/"));
       destPath = new Path(job.get("copy.dest.path", "/"));
       try {
-        srcFileSys = FileSystem.getNamed(srcfs, job);
-        destFileSys = FileSystem.getNamed(destfs, job);
+        srcFileSys = FileSystem.get(new URI(srcfs), job);
+        destFileSys = FileSystem.get(new URI(destfs), job);
+      } catch (URISyntaxException e) {
+          throw new RuntimeException("Failed parse of src or dest URI.", e);
       } catch (IOException ex) {
         throw new RuntimeException("Unable to get the named file system.", ex);
       }
@@ -443,14 +452,8 @@ public class CopyFiles extends ToolBase {
     throws IOException
     {
       //Destination
-      URI destURI = null;
-      try {
-        destURI = new URI(destPath);
-      } catch (URISyntaxException ue) {
-        throw new IOException("Illegal destination path!");
-      }
-      String destFileSysName = getFileSysName(destURI);
-      jobConf.set("copy.dest.fs", destFileSysName);
+      URI destURI = toURI(destPath);
+      jobConf.set("copy.dest.fs", destURI.toString());
       destPath = destURI.getPath();
       jobConf.set("copy.dest.path", destPath);
       
@@ -464,22 +467,9 @@ public class CopyFiles extends ToolBase {
       jobConf.setOutputFormat(SequenceFileOutputFormat.class);
       
       jobConf.setMapperClass(HTTPCopyFilesMapper.class);
-      jobConf.setReducerClass(CopyFilesReducer.class);
       
-      // ideal number of maps is one per file (if the map-launching overhead
-      // were 0. It is limited by jobtrackers handling capacity, which lets say
-      // is MAX_NUM_MAPS. It is also limited by MAX_MAPS_PER_NODE. Also for 
-      // small files it is better to determine number of maps by 
-      // amount of data per map.
-      int nFiles = srcPaths.length;
-      int numMaps = nFiles;
-      if (numMaps > MAX_NUM_MAPS) { numMaps = MAX_NUM_MAPS; }
       JobClient client = new JobClient(jobConf);
-      ClusterStatus cluster = client.getClusterStatus();
-      int tmpMaps = cluster.getTaskTrackers() * MAX_MAPS_PER_NODE;
-      if (numMaps > tmpMaps) { numMaps = tmpMaps; }
-      if (numMaps == 0) { numMaps = 1; }
-      jobConf.setNumMapTasks(numMaps);
+      jobConf.setNumMapTasks(getMapCount(srcPaths.length, -1, client));
       
       jobConf.setBoolean(readFailuresAttribute, ignoreReadFailures);
       
@@ -599,13 +589,20 @@ public class CopyFiles extends ToolBase {
    */
   private static class CopyMapperFactory
   {
-    public static CopyFilesMapper getMapper(String protocol)
+    public static CopyFilesMapper getMapper(Configuration conf, String protocol)
+    throws IOException
     {
       CopyFilesMapper mapper = null;
+      if (protocol == null) {
+          // Use 'default' filesystem.
+          protocol = FileSystem.get(conf).getUri().getScheme();
+      }
+      protocol = protocol.toLowerCase();
       
-      if("dfs".equals(protocol) || "file".equals(protocol)) {
-        mapper = new DFSCopyFilesMapper();
-      } else if("http".equals(protocol)) {
+      if(HDFS.equalsIgnoreCase(protocol) || "file".equalsIgnoreCase(protocol) ||
+          S3.equalsIgnoreCase(protocol)) {
+        mapper = new FSCopyFilesMapper();
+      } else if("http".equalsIgnoreCase(protocol)) {
         mapper = new HTTPCopyFilesMapper();
       }
       
@@ -613,31 +610,23 @@ public class CopyFiles extends ToolBase {
     }
   }
   
-  public static class CopyFilesReducer extends MapReduceBase implements Reducer {
-    public void reduce(WritableComparable key,
-        Iterator values,
-        OutputCollector output,
-        Reporter reporter) throws IOException {
-      // nothing
-    }
-  }
-  
   private static String[] fetchSrcURIs(Configuration conf, URI srcListURI) throws IOException
   {
-    ArrayList uris = new ArrayList();
+    ArrayList<String> uris = new ArrayList<String>();
     BufferedReader fis = null;
     
     String srcListURIScheme = srcListURI.getScheme();
     String srcListURIPath = srcListURI.getPath();
     
-    if("file".equals(srcListURIScheme)) {
+    if("file".equalsIgnoreCase(srcListURIScheme)) {
       fis = new BufferedReader(new FileReader(srcListURIPath));
-    } else if("dfs".equals(srcListURIScheme)) {
-      FileSystem fs = FileSystem.getNamed(CopyFilesMapper.getFileSysName(srcListURI), conf);
+    } else if (srcListURIScheme != null &&
+          HDFS.equalsIgnoreCase(srcListURIScheme)) {
+      FileSystem fs = FileSystem.get(srcListURI, conf);
       fis = new BufferedReader(
           new InputStreamReader(new FSDataInputStream(fs, new Path(srcListURIPath), conf))
           );
-    } else if("http".equals(srcListURIScheme)) {
+    } else if("http".equalsIgnoreCase(srcListURIScheme)) {
       //Copy the file 
       URL url = srcListURI.toURL();
       HttpURLConnection connection = (HttpURLConnection)url.openConnection();
@@ -648,54 +637,67 @@ public class CopyFiles extends ToolBase {
           new InputStreamReader(connection.getInputStream())
           );
     } else {
-      throw new IOException("Unsupported source list uri!");
+      throw new IOException("Unsupported source list uri: " + srcListURIScheme);
     }
 
-    String uri = null;
-    while((uri = fis.readLine()) != null) {
-      if(!uri.startsWith("#")) {
-        uris.add(uri);
+    try {
+      String uri = null;
+      while((uri = fis.readLine()) != null) {
+        if(!uri.startsWith("#")) {
+          // Check source is parseable as URI by passing via getPathURI.
+          toURI(uri);
+          uris.add(uri);
+        }
+      }
+    } catch (Exception e) {
+      if (fis != null) {
+        fis.close();
       }
     }
-    fis.close();
 
-    if(!uris.isEmpty()) {
-      return (String[])uris.toArray(new String[0]);
-    }
-    
-    return null;
+    return !uris.isEmpty()? uris.toArray(new String[0]): null;
   }
   
   /**
    * Helper function to parse input file and return source urls for 
    * a given protocol.
-   * @param protocol : The protocol for which to find source urls.
-   * @param inputFilePath : The file containing the urls.
+   * @param protocol The protocol for which to find source urls.
+   * @param inputFilePath The file containing the urls.
    * @return
    */
   private static String[] parseInputFile(String protocol, String[] uris)
   throws IOException
   {
-    ArrayList protocolURIs = new ArrayList();
+    ArrayList<String> protocolURIs = new ArrayList<String>(uris.length);
     
     for(int i=0; i < uris.length; ++i) {
-      if(uris[i].startsWith(protocol)) {
+      // uri must start w/ protocol or if protocol is dfs, allow hdfs as alias.
+      if(uris[i].startsWith(protocol) || 
+          (protocol.equalsIgnoreCase("dfs") && uris[i].startsWith("hdfs"))) {
         protocolURIs.add(uris[i]);
       }
     }
     
-    if(!protocolURIs.isEmpty()) {
-      return (String[])protocolURIs.toArray(new String[0]);
-    }
-    
-    return null;
+    return !protocolURIs.isEmpty()? protocolURIs.toArray(new String[0]): null;
+  }
+  
+  public static URI toURI(final String u) throws IOException {
+      URI result = null;
+      try {
+          result = new URI(u);
+      } catch (URISyntaxException ex) {
+        throw new IOException("Path does not parse as URI: " + u);
+      }
+      return result;
   }
   
   /**
    * Driver to copy srcPath to destPath depending on required protocol.
-   * @param conf : Configuration
-   * @param srcPath : Source path
-   * @param destPath : Destination path
+   * @param conf Configuration
+   * @param srcPath Source path URL
+   * @param destPath Destination path URL
+   * @param srcAsList List of source URLs to copy.
+   * @param ignoreReadFailures True if we are to ignore read failures.
    */
   public static void copy(Configuration conf, String srcPath, String destPath,
       boolean srcAsList, boolean ignoreReadFailures) 
@@ -706,32 +708,18 @@ public class CopyFiles extends ToolBase {
     jobConf.setJobName("distcp");
     
     //Sanity check for srcPath/destPath
-    URI srcURI = null;
-    try {
-        srcURI = new URI(srcPath);
-    } catch (URISyntaxException ex) {
-      throw new IOException("Illegal source path!");
-    }
-    
-    URI destURI = null;
-    try {
-      destURI = new URI(destPath);
-    } catch (URISyntaxException ex) {
-      throw new IOException("Illegal destination path!");
-    }
+    URI srcURI = toURI(srcPath);
+    toURI(destPath);
   
-    //Source paths
-    String[] srcPaths = null;
-    
-    if(srcAsList) {
-      srcPaths = fetchSrcURIs(conf, srcURI);
-    }
-    
     //Create the task-specific mapper 
     CopyFilesMapper mapper = null;
+    String[] srcPaths = null;
     if(srcAsList) {
       //Ugly?!
       
+      //Source paths
+      srcPaths = fetchSrcURIs(conf, srcURI);  
+      
       // Protocol - 'dfs://'
       String[] dfsUrls = parseInputFile("dfs", srcPaths);
       if(dfsUrls != null) {
@@ -752,17 +740,27 @@ public class CopyFiles extends ToolBase {
       String[] httpUrls = parseInputFile("http", srcPaths);
       if(httpUrls != null) {
         srcPaths = httpUrls;
-        mapper = CopyMapperFactory.getMapper("http");
-      } else {
-        //Done
+        mapper = CopyMapperFactory.getMapper(conf, "http");
+      } else {   
+        return;
+      }
+      
+      // Protocol - 's3://'
+      String[] s3Urls = parseInputFile(S3, srcPaths);
+      if(s3Urls != null) {
+        srcPaths = s3Urls;
+        mapper = CopyMapperFactory.getMapper(conf, S3);
+      } else {   
         return;
       }
       
+      // TODO: Add support for URIs w/o scheme (In this case, use the 'default'
+      // filesystem).
+      
     } else {
       //Single source - ugly!
-      String[] tmpSrcPath = {srcPath};
-      srcPaths = tmpSrcPath;
-      mapper = CopyMapperFactory.getMapper(srcURI.getScheme());
+      srcPaths = new String [] {srcPath};
+      mapper = CopyMapperFactory.getMapper(conf, srcURI.getScheme());
     }
     
     //Initialize the mapper
@@ -774,16 +772,15 @@ public class CopyFiles extends ToolBase {
     } finally {
       mapper.cleanup(conf, jobConf, srcPath, destPath);
     }
-    
   }
   
   /**
    * This is the main driver for recursively copying directories
    * across file systems. It takes at least two cmdline parameters. A source
    * URL and a destination URL. It then essentially does an "ls -lR" on the
-   * source URL, and writes the output in aa round-robin manner to all the map
-   * input files. The mapper actually copies the files allotted to it. And
-   * the reduce is empty.
+   * source URL, and writes the output in a round-robin manner to all the map
+   * input files. The mapper actually copies the files allotted to it. The
+   * reduce is empty.
    */
   public int run(String[] args) throws Exception {
     String srcPath = null;
@@ -828,5 +825,4 @@ public class CopyFiles extends ToolBase {
         args);
     System.exit(res);
   }
-  
-}
+}
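For reference, the new getMapCount above centralizes the map-count capping that was previously duplicated in the DFS and HTTP setup paths. A rough standalone sketch of the same capping order, handy for sanity-checking the arithmetic (sketchMapCount is a hypothetical helper; MAX_MAPS_PER_NODE's value does not appear in this diff, so it is taken as a parameter, while 10000 and 1L << 28 are MAX_NUM_MAPS and MIN_BYTES_PER_MAP from the patch):

    // Illustrative sketch only; mirrors the capping order of CopyFiles.getMapCount.
    static int sketchMapCount(int files, long totalBytes, int taskTrackers, int maxMapsPerNode) {
      int numMaps = Math.min(files, 10000);                            // cap at MAX_NUM_MAPS
      if (totalBytes != -1) {                                          // -1 means total size unknown
        numMaps = Math.min(numMaps, (int) (totalBytes / (1L << 28)));  // at least ~256 MB per map
      }
      numMaps = Math.min(numMaps, taskTrackers * maxMapsPerNode);      // cluster capacity cap
      return Math.max(numMaps, 1);                                     // always run at least one map
    }

For example, assuming 50,000 files, 1 TB of data, 20 tasktrackers and (hypothetically) 10 maps per node, the estimate is capped 50,000 -> 10,000 -> 4,096 -> 200 maps.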

+ 4 - 4
src/test/org/apache/hadoop/fs/TestCopyFiles.java

@@ -183,8 +183,8 @@ public class TestCopyFiles extends TestCase {
       namenode = conf.get("fs.default.name", "local");
       if (!"local".equals(namenode)) {
         MyFile[] files = createFiles(namenode, "/srcdat");
-        new CopyFiles().doMain(conf, new String[] {"dfs://"+namenode+"/srcdat",
-        "dfs://"+namenode+"/destdat"});
+        new CopyFiles().doMain(conf, new String[] {"hdfs://"+namenode+"/srcdat",
+        "hdfs://"+namenode+"/destdat"});
         assertTrue("Source and destination directories do not match.",
             checkFiles(namenode, "/destdat", files));
         deldir(namenode, "/destdat");
@@ -206,7 +206,7 @@ public class TestCopyFiles extends TestCase {
       if (!"local".equals(namenode)) {
         MyFile[] files = createFiles("local", TEST_ROOT_DIR+"/srcdat");
         new CopyFiles().doMain(conf, new String[] {"file://"+TEST_ROOT_DIR+"/srcdat",
-        "dfs://"+namenode+"/destdat"});
+        "hdfs://"+namenode+"/destdat"});
         assertTrue("Source and destination directories do not match.",
             checkFiles(namenode, "/destdat", files));
         deldir(namenode, "/destdat");
@@ -227,7 +227,7 @@ public class TestCopyFiles extends TestCase {
       namenode = conf.get("fs.default.name", "local");
       if (!"local".equals(namenode)) {
         MyFile[] files = createFiles(namenode, "/srcdat");
-        new CopyFiles().doMain(conf, new String[] {"dfs://"+namenode+"/srcdat",
+        new CopyFiles().doMain(conf, new String[] {"hdfs://"+namenode+"/srcdat",
         "file://"+TEST_ROOT_DIR+"/destdat"});
         assertTrue("Source and destination directories do not match.",
             checkFiles("local", TEST_ROOT_DIR+"/destdat", files));