
HADOOP-4176. Implement getFileChecksum(Path) in HftpFileSystem. (szetszwo)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@696950 13f79535-47bb-0310-9956-ffa450edef68
Tsz-wo Sze, 16 years ago
parent commit e2de2b9540

+ 2 - 0
CHANGES.txt

@@ -178,6 +178,8 @@ Trunk (unreleased changes)
     HADOOP-3930. Add common interfaces for the pluggable schedulers and the
     cli & gui clients. (Sreekanth Ramakrishnan via omalley)
 
+    HADOOP-4176. Implement getFileChecksum(Path) in HftpFileSystem. (szetszwo)
+
   IMPROVEMENTS
 
     HADOOP-4106. libhdfs: add time, permission and user attribute support (part 2).

+ 34 - 0
src/core/org/apache/hadoop/fs/MD5MD5CRC32FileChecksum.java

@@ -23,6 +23,9 @@ import java.io.IOException;
 
 import org.apache.hadoop.io.MD5Hash;
 import org.apache.hadoop.io.WritableUtils;
+import org.xml.sax.Attributes;
+import org.xml.sax.SAXException;
+import org.znerd.xmlenc.XMLOutputter;
 
 /** MD5 of MD5 of CRC32. */
 public class MD5MD5CRC32FileChecksum extends FileChecksum {
@@ -72,6 +75,37 @@ public class MD5MD5CRC32FileChecksum extends FileChecksum {
     md5.write(out);    
   }
 
+  /** Write that object to xml output. */
+  public static void write(XMLOutputter xml, MD5MD5CRC32FileChecksum that
+      ) throws IOException {
+    xml.startTag(MD5MD5CRC32FileChecksum.class.getName());
+    if (that != null) {
+      xml.attribute("bytesPerCRC", "" + that.bytesPerCRC);
+      xml.attribute("crcPerBlock", "" + that.crcPerBlock);
+      xml.attribute("md5", "" + that.md5);
+    }
+    xml.endTag();
+  }
+
+  /** Return the object represented in the attributes. */
+  public static MD5MD5CRC32FileChecksum valueOf(Attributes attrs
+      ) throws SAXException {
+    final String bytesPerCRC = attrs.getValue("bytesPerCRC");
+    final String crcPerBlock = attrs.getValue("crcPerBlock");
+    final String md5 = attrs.getValue("md5");
+    if (bytesPerCRC == null || crcPerBlock == null || md5 == null) {
+      return null;
+    }
+
+    try {
+      return new MD5MD5CRC32FileChecksum(Integer.valueOf(bytesPerCRC),
+          Integer.valueOf(crcPerBlock), new MD5Hash(md5));
+    } catch(Exception e) {
+      throw new SAXException("Invalid attributes: bytesPerCRC=" + bytesPerCRC
+          + ", crcPerBlock=" + crcPerBlock + ", md5=" + md5, e);
+    }
+  }
+
   /** {@inheritDoc} */ 
   public String toString() {
     return getAlgorithmName() + ":" + md5;
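The new helpers serialize a checksum as a single XML element whose tag is the class name and whose attributes mirror the three fields; valueOf reads the same attributes back from a SAX handler. A minimal sketch of the round trip, with the writer and field values assumed for illustration:

    // Sketch only: writer and checksum values are hypothetical.
    XMLOutputter xml = new XMLOutputter(writer, "UTF-8");
    xml.declaration();
    MD5MD5CRC32FileChecksum.write(xml, checksum);
    xml.endDocument();
    // Produces roughly:
    //   <?xml version="1.0" encoding="UTF-8"?>
    //   <org.apache.hadoop.fs.MD5MD5CRC32FileChecksum
    //       bytesPerCRC="512" crcPerBlock="2" md5="..."/>
    // A SAX ContentHandler can then hand the element's Attributes to
    // MD5MD5CRC32FileChecksum.valueOf(attrs) to rebuild an equivalent checksum
    // (null is returned if any of the three attributes is missing).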

+ 19 - 8
src/hdfs/org/apache/hadoop/hdfs/DFSClient.java

@@ -272,8 +272,8 @@ public class DFSClient implements FSConstants, java.io.Closeable {
     return hints;
   }
 
-  private LocatedBlocks callGetBlockLocations(String src, long start, 
-      long length) throws IOException {
+  private static LocatedBlocks callGetBlockLocations(ClientProtocol namenode,
+      String src, long start, long length) throws IOException {
     try {
       return namenode.getBlockLocations(src, start, length);
     } catch(RemoteException re) {
@@ -296,7 +296,7 @@ public class DFSClient implements FSConstants, java.io.Closeable {
    */
   public BlockLocation[] getBlockLocations(String src, long start, 
     long length) throws IOException {
-    LocatedBlocks blocks = callGetBlockLocations(src, start, length);
+    LocatedBlocks blocks = callGetBlockLocations(namenode, src, start, length);
     if (blocks == null) {
       return new BlockLocation[0];
     }
@@ -583,13 +583,24 @@ public class DFSClient implements FSConstants, java.io.Closeable {
    * Get the checksum of a file.
    * @param src The file path
    * @return The checksum 
+   * @see DistributedFileSystem#getFileChecksum(Path)
    */
   MD5MD5CRC32FileChecksum getFileChecksum(String src) throws IOException {
     checkOpen();
-    
+    return getFileChecksum(src, namenode, socketFactory, socketTimeout);    
+  }
+
+  /**
+   * Get the checksum of a file.
+   * @param src The file path
+   * @return The checksum 
+   */
+  public static MD5MD5CRC32FileChecksum getFileChecksum(String src,
+      ClientProtocol namenode, SocketFactory socketFactory, int socketTimeout
+      ) throws IOException {
     //get all block locations
     final List<LocatedBlock> locatedblocks
-        = callGetBlockLocations(src, 0, Long.MAX_VALUE).getLocatedBlocks();
+        = callGetBlockLocations(namenode, src, 0, Long.MAX_VALUE).getLocatedBlocks();
     final DataOutputBuffer md5out = new DataOutputBuffer();
     int bytesPerCRC = 0;
     long crcPerBlock = 0;
@@ -1369,7 +1380,7 @@ public class DFSClient implements FSConstants, java.io.Closeable {
      * Grab the open-file info from namenode
      */
     synchronized void openInfo() throws IOException {
-      LocatedBlocks newInfo = callGetBlockLocations(src, 0, prefetchSize);
+      LocatedBlocks newInfo = callGetBlockLocations(namenode, src, 0, prefetchSize);
       if (newInfo == null) {
         throw new IOException("Cannot open filename " + src);
       }
@@ -1428,7 +1439,7 @@ public class DFSClient implements FSConstants, java.io.Closeable {
         targetBlockIdx = LocatedBlocks.getInsertIndex(targetBlockIdx);
         // fetch more blocks
         LocatedBlocks newBlocks;
-        newBlocks = callGetBlockLocations(src, offset, prefetchSize);
+        newBlocks = callGetBlockLocations(namenode, src, offset, prefetchSize);
         assert (newBlocks != null) : "Could not find target position " + offset;
         locatedBlocks.insertRange(targetBlockIdx, newBlocks.getLocatedBlocks());
       }
@@ -1467,7 +1478,7 @@ public class DFSClient implements FSConstants, java.io.Closeable {
           blk = locatedBlocks.get(blockIdx);
         if (blk == null || curOff < blk.getStartOffset()) {
           LocatedBlocks newBlocks;
-          newBlocks = callGetBlockLocations(src, curOff, remaining);
+          newBlocks = callGetBlockLocations(namenode, src, curOff, remaining);
           locatedBlocks.insertRange(blockIdx, newBlocks.getLocatedBlocks());
           continue;
         }
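Making callGetBlockLocations and getFileChecksum static lets callers that only hold a ClientProtocol proxy, such as the datanode-side servlet added below, compute a checksum without constructing a full DFSClient. A minimal sketch of a direct call, with the configuration and path assumed:

    // Sketch only: conf is a Hadoop Configuration; the path is hypothetical.
    ClientProtocol namenode = DFSClient.createNamenode(conf);
    SocketFactory factory = NetUtils.getSocketFactory(conf, ClientProtocol.class);
    int timeout = conf.getInt("dfs.socket.timeout", FSConstants.READ_TIMEOUT);
    MD5MD5CRC32FileChecksum checksum =
        DFSClient.getFileChecksum("/user/foo/data.txt", namenode, factory, timeout);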

+ 47 - 0
src/hdfs/org/apache/hadoop/hdfs/HftpFileSystem.java

@@ -43,11 +43,13 @@ import org.xml.sax.helpers.XMLReaderFactory;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.fs.FileChecksum;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FSInputStream;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.server.namenode.ListPathsServlet;
@@ -237,6 +239,51 @@ public class HftpFileSystem extends FileSystem {
     return lsparser.getFileStatus(f);
   }
 
+  private class ChecksumParser extends DefaultHandler {
+    private FileChecksum filechecksum;
+
+    /** {@inheritDoc} */
+    public void startElement(String ns, String localname, String qname,
+                Attributes attrs) throws SAXException {
+      if (!MD5MD5CRC32FileChecksum.class.getName().equals(qname)) {
+        if (RemoteException.class.getSimpleName().equals(qname)) {
+          throw new SAXException(RemoteException.valueOf(attrs));
+        }
+        throw new SAXException("Unrecognized entry: " + qname);
+      }
+
+      filechecksum = MD5MD5CRC32FileChecksum.valueOf(attrs);
+    }
+
+    private FileChecksum getFileChecksum(Path f) throws IOException {
+      final HttpURLConnection connection = openConnection(
+          "/fileChecksum" + f, "ugi=" + ugi);
+      try {
+        final XMLReader xr = XMLReaderFactory.createXMLReader();
+        xr.setContentHandler(this);
+
+        connection.setRequestMethod("GET");
+        connection.connect();
+
+        xr.parse(new InputSource(connection.getInputStream()));
+      } catch(SAXException e) {
+        final Exception embedded = e.getException();
+        if (embedded != null && embedded instanceof IOException) {
+          throw (IOException)embedded;
+        }
+        throw new IOException("invalid xml directory content", e);
+      } finally {
+        connection.disconnect();
+      }
+      return filechecksum;
+    }
+  }
+
+  /** {@inheritDoc} */
+  public FileChecksum getFileChecksum(Path f) throws IOException {
+    return new ChecksumParser().getFileChecksum(f);
+  }
+
   @Override
   public Path getWorkingDirectory() {
     return new Path("/").makeQualified(this);
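With getFileChecksum(Path) implemented, HFTP callers get the same checksum API as HDFS callers. A minimal usage sketch, with the URI and path assumed:

    // Sketch only: the hftp URI and file path are hypothetical.
    Configuration conf = new Configuration();
    FileSystem hftp = new Path("hftp://namenode:50070").getFileSystem(conf);
    FileChecksum checksum = hftp.getFileChecksum(new Path("/user/foo/data.txt"));
    System.out.println(checksum);  // algorithm name + ":" + MD5 digest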

+ 3 - 0
src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java

@@ -58,6 +58,7 @@ import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.FileChecksumServlets;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.StreamFile;
 import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
@@ -365,6 +366,8 @@ public class DataNode extends Configured
           sslConf.get("https.keystore.keypassword", ""));
     }
     this.infoServer.addInternalServlet(null, "/streamFile/*", StreamFile.class);
+    this.infoServer.addInternalServlet(null, "/getFileChecksum/*",
+        FileChecksumServlets.GetServlet.class);
     this.infoServer.setAttribute("datanode.blockScanner", blockScanner);
     this.infoServer.addServlet(null, "/blockScannerReport", 
                                DataBlockScanner.Servlet.class);

+ 29 - 0
src/hdfs/org/apache/hadoop/hdfs/server/namenode/DfsServlet.java

@@ -18,16 +18,21 @@
 package org.apache.hadoop.hdfs.server.namenode;
 
 import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
 
 import javax.servlet.ServletContext;
 import javax.servlet.http.HttpServlet;
 import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.security.UnixUserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation;
 
@@ -65,4 +70,28 @@ abstract class DfsServlet extends HttpServlet {
         UnixUserGroupInformation.UGI_PROPERTY_NAME, ugi);
     return DFSClient.createNamenode(nn.getNameNodeAddress(), conf);
   }
+
+  /** Create a URI for redirecting request */
+  protected URI createRedirectUri(String servletpath, UserGroupInformation ugi,
+      DatanodeID host, HttpServletRequest request) throws URISyntaxException {
+    final String hostname = host instanceof DatanodeInfo?
+        ((DatanodeInfo)host).getHostName(): host.getHost();
+    final String scheme = request.getScheme();
+    final int port = "https".equals(scheme)?
+        (Integer)getServletContext().getAttribute("datanode.https.port")
+        : host.getInfoPort();
+    final String filename = request.getPathInfo();
+    return new URI(scheme, null, hostname, port, servletpath,
+        "filename=" + filename + "&ugi=" + ugi, null);
+  }
+
+  /** Get filename from the request */
+  protected String getFilename(HttpServletRequest request,
+      HttpServletResponse response) throws IOException {
+    final String filename = request.getParameter("filename");
+    if (filename == null || filename.length() == 0) {
+      throw new IOException("Invalid filename");
+    }
+    return filename;
+  }
 }
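createRedirectUri combines the request scheme, the chosen datanode's hostname and info port (or the datanode.https.port context attribute for https), the servlet path, and a query string carrying the original path and user; getFilename later recovers the path from the filename parameter. For a /fileChecksum request the redirect target looks roughly like this, with host, port, and user assumed:

    http://datanode-1:50075/getFileChecksum?filename=/user/foo/data.txt&ugi=hadoop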

+ 2 - 0
src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -372,6 +372,8 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean {
     this.infoServer.addInternalServlet("getimage", "/getimage", GetImageServlet.class);
     this.infoServer.addInternalServlet("listPaths", "/listPaths/*", ListPathsServlet.class);
     this.infoServer.addInternalServlet("data", "/data/*", FileDataServlet.class);
+    this.infoServer.addInternalServlet("checksum", "/fileChecksum/*",
+        FileChecksumServlets.RedirectServlet.class);
     this.infoServer.start();
 
     // The web-server port can be ephemeral... ensure we have the correct info

+ 102 - 0
src/hdfs/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java

@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+import javax.net.SocketFactory;
+import javax.servlet.ServletContext;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UnixUserGroupInformation;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.znerd.xmlenc.XMLOutputter;
+
+/** Servlets for file checksum */
+public class FileChecksumServlets {
+  /** Redirect file checksum queries to an appropriate datanode. */
+  public static class RedirectServlet extends DfsServlet {
+    /** For java.io.Serializable */
+    private static final long serialVersionUID = 1L;
+  
+    /** {@inheritDoc} */
+    public void doGet(HttpServletRequest request, HttpServletResponse response
+        ) throws ServletException, IOException {
+      final UserGroupInformation ugi = getUGI(request);
+      final ServletContext context = getServletContext();
+      final NameNode namenode = (NameNode)context.getAttribute("name.node");
+      final DatanodeID datanode = namenode.namesystem.getRandomDatanode();
+      try {
+        final URI uri = createRedirectUri("/getFileChecksum", ugi, datanode, request); 
+        response.sendRedirect(uri.toURL().toString());
+      } catch(URISyntaxException e) {
+        throw new ServletException(e); 
+        //response.getWriter().println(e.toString());
+      } catch (IOException e) {
+        response.sendError(400, e.getMessage());
+      }
+    }
+  }
+  
+  /** Get FileChecksum */
+  public static class GetServlet extends DfsServlet {
+    /** For java.io.Serializable */
+    private static final long serialVersionUID = 1L;
+    
+    /** {@inheritDoc} */
+    public void doGet(HttpServletRequest request, HttpServletResponse response
+        ) throws ServletException, IOException {
+      final UnixUserGroupInformation ugi = getUGI(request);
+      final PrintWriter out = response.getWriter();
+      final String filename = getFilename(request, response);
+      final XMLOutputter xml = new XMLOutputter(out, "UTF-8");
+      xml.declaration();
+
+      final Configuration conf = new Configuration(DataNode.getDataNode().getConf());
+      final int socketTimeout = conf.getInt("dfs.socket.timeout", FSConstants.READ_TIMEOUT);
+      final SocketFactory socketFactory = NetUtils.getSocketFactory(conf, ClientProtocol.class);
+      UnixUserGroupInformation.saveToConf(conf,
+          UnixUserGroupInformation.UGI_PROPERTY_NAME, ugi);
+      final ClientProtocol nnproxy = DFSClient.createNamenode(conf);
+
+      try {
+        final MD5MD5CRC32FileChecksum checksum = DFSClient.getFileChecksum(
+            filename, nnproxy, socketFactory, socketTimeout);
+        MD5MD5CRC32FileChecksum.write(xml, checksum);
+      } catch(IOException ioe) {
+        new RemoteException(ioe.getClass().getName(), ioe.getMessage()
+            ).writeXml(filename, xml);
+      }
+      xml.endDocument();
+    }
+  }
+}
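Putting the servlets together, an HFTP checksum request takes two HTTP hops; hostnames, ports, and user below are assumed for illustration:

    1. GET http://namenode:50070/fileChecksum/user/foo/data.txt?ugi=hadoop
       RedirectServlet on the namenode picks a random datanode and answers with a redirect.
    2. GET http://datanode-1:50075/getFileChecksum?filename=/user/foo/data.txt&ugi=hadoop
       GetServlet runs the static DFSClient.getFileChecksum against the namenode and writes
       the MD5MD5CRC32FileChecksum element; on failure it writes a RemoteException element,
       which ChecksumParser in HftpFileSystem turns back into an IOException on the client.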

+ 25 - 9
src/test/org/apache/hadoop/hdfs/TestDistributedFileSystem.java

@@ -22,12 +22,14 @@ import java.io.IOException;
 import java.net.URI;
 import java.util.Random;
 
+import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileChecksum;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Level;
 
 public class TestDistributedFileSystem extends junit.framework.TestCase {
   private static final Random RAN = new Random();
@@ -117,15 +119,22 @@ public class TestDistributedFileSystem extends junit.framework.TestCase {
   }
   
   public void testFileChecksum() throws IOException {
+    ((Log4JLogger)HftpFileSystem.LOG).getLogger().setLevel(Level.ALL);
+
     final long seed = RAN.nextLong();
     System.out.println("seed=" + seed);
     RAN.setSeed(seed);
 
     final Configuration conf = new Configuration();
+    conf.set("slave.host.name", "localhost");
+
     final MiniDFSCluster cluster = new MiniDFSCluster(conf, 2, true, null);
-    final FileSystem fs = cluster.getFileSystem();
+    final FileSystem hdfs = cluster.getFileSystem();
+    final String hftpuri = "hftp://" + conf.get("dfs.http.address");
+    System.out.println("hftpuri=" + hftpuri);
+    final FileSystem hftp = new Path(hftpuri).getFileSystem(conf);
 
-    final String dir = "/fileChecksum";
+    final String dir = "/filechecksum";
     final int block_size = 1024;
     final int buffer_size = conf.getInt("io.file.buffer.size", 4096);
     conf.setInt("io.bytes.per.checksum", 512);
@@ -140,29 +149,36 @@ public class TestDistributedFileSystem extends junit.framework.TestCase {
       //write data to a file
       final Path foo = new Path(dir, "foo" + n);
       {
-        final FSDataOutputStream out = fs.create(foo, false, buffer_size,
+        final FSDataOutputStream out = hdfs.create(foo, false, buffer_size,
             (short)2, block_size);
         out.write(data);
         out.close();
       }
       
       //compute checksum
-      final FileChecksum foocs = fs.getFileChecksum(foo);
-      System.out.println("foocs=" + foocs);
+      final FileChecksum hdfsfoocs = hdfs.getFileChecksum(foo);
+      System.out.println("hdfsfoocs=" + hdfsfoocs);
       
+      final FileChecksum hftpfoocs = hftp.getFileChecksum(foo);
+      System.out.println("hftpfoocs=" + hftpfoocs);
+
       //write another file
       final Path bar = new Path(dir, "bar" + n);
       {
-        final FSDataOutputStream out = fs.create(bar, false, buffer_size,
+        final FSDataOutputStream out = hdfs.create(bar, false, buffer_size,
             (short)2, block_size);
         out.write(data);
         out.close();
       }
   
       { //verify checksum
-        final FileChecksum barcs = fs.getFileChecksum(bar);
-        assertEquals(foocs.hashCode(), barcs.hashCode());
-        assertEquals(foocs, barcs);
+        final FileChecksum barcs = hdfs.getFileChecksum(bar);
+        final int barhashcode = barcs.hashCode();
+        assertEquals(hdfsfoocs.hashCode(), barhashcode);
+        assertEquals(hdfsfoocs, barcs);
+
+        assertEquals(hftpfoocs.hashCode(), barhashcode);
+        assertEquals(hftpfoocs, barcs);
       }
     }
   }