
Merging changes from trunk

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hdfs/branches/HDFS-1052@1003106 13f79535-47bb-0310-9956-ffa450edef68
Suresh Srinivas 15 years ago
parent
revision
5fb25d4586

+ 19 - 1
CHANGES.txt

@@ -48,6 +48,8 @@ Trunk (unreleased changes)
 
   IMPROVEMENTS
 
+    HDFS-1304. Add a new unit test for HftpFileSystem.open(..).  (szetszwo)
+
     HDFS-1096. fix for prev. commit. (boryas)
 
     HDFS-1096. allow dfsadmin/mradmin refresh of superuser proxy group
@@ -147,6 +149,11 @@ Trunk (unreleased changes)
     HDFS-1407. Change DataTransferProtocol methods to use Block instead 
     of individual elements of Block. (suresh)
 
+    HDFS-1417. Add @Override to SimulatedFSDataset methods that implement
+    FSDatasetInterface methods. (suresh)
+
+    HDFS-1426. Remove unused method BlockInfo#listCount. (hairong)
+
   OPTIMIZATIONS
 
     HDFS-1140. Speedup INode.getPathComponents. (Dmytro Molkov via shv)
@@ -160,6 +167,8 @@ Trunk (unreleased changes)
     HDFS-1320. Add LOG.isDebugEnabled() guard for each LOG.debug(..).
     (Erik Steffl via szetszwo)
 
+    HDFS-1368. Add a block counter to DatanodeDescriptor. (hairong)
+
   BUG FIXES
 
     HDFS-1039. Adding test for  JspHelper.getUGI(jnp via boryas)
@@ -281,7 +290,14 @@ Trunk (unreleased changes)
     HDFS-1419. Federation: Three test cases need minor modification after 
     the new block id change (Tanping Wang via suresh)
 
-Release 0.21.0 - Unreleased
+    HDFS-96. HDFS supports blocks larger than 2 GB.
+    (Patrick Kling via dhruba)
+
+    HDFS-1364. Makes long running HFTP-based applications do relogins
+    if necessary. (Jitendra Pandey via ddas)
+
+    HDFS-1399.  Distinct minicluster services (e.g. NN and JT) overwrite each
+    other's service policies.  (Aaron T. Myers via tomwhite)
 
   INCOMPATIBLE CHANGES
 
@@ -1210,6 +1226,8 @@ Release 0.21.0 - Unreleased
 
     HDFS-1363. Eliminate second synchronized sections in appendFile(). (shv)
 
+    HDFS-1413. Fix broken links to HDFS Wiki. (shv)
+
 Release 0.20.3 - Unreleased
 
   IMPROVEMENTS

+ 1 - 1
src/docs/src/documentation/content/xdocs/tabs.xml

@@ -31,7 +31,7 @@
   -->
 
   <tab label="Project" href="http://hadoop.apache.org/hdfs/" />
-  <tab label="Wiki" href="http://wiki.apache.org/hadoop/hdfs" />
+  <tab label="Wiki" href="http://wiki.apache.org/hadoop/HDFS" />
   <tab label="HDFS 0.22 Documentation" dir="" />  
   
 </tabs>

+ 1 - 1
src/java/org/apache/hadoop/hdfs/DFSInputStream.java

@@ -524,7 +524,7 @@ public class DFSInputStream extends FSInputStream {
           if (pos > blockEnd) {
             currentNode = blockSeekTo(pos);
           }
-          int realLen = Math.min(len, (int) (blockEnd - pos + 1));
+          int realLen = (int) Math.min((long) len, (blockEnd - pos + 1L));
           int result = readBuffer(buf, off, realLen);
           
           if (result >= 0) {
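Context for the DFSInputStream change: the old expression cast blockEnd - pos + 1 to int before taking the minimum, so with blocks larger than 2 GB (HDFS-96 in CHANGES.txt above) the remaining length can wrap negative. Taking the minimum in long arithmetic and casting only the result, which is already bounded by len, avoids the truncation. A minimal standalone sketch with hypothetical values, not HDFS code:

    // Why the min must be taken in long arithmetic before the narrowing cast.
    public class ReadLenOverflowSketch {
      public static void main(String[] args) {
        long blockEnd = 3L * 1024 * 1024 * 1024 - 1; // last byte offset of a hypothetical 3 GB block
        long pos = 0;                                // current read position
        int len = 4096;                              // caller's buffer length

        // Old form: the long difference is narrowed to int first and wraps negative.
        int broken = Math.min(len, (int) (blockEnd - pos + 1));
        // New form: the min is computed in long; the result is bounded by len, so the cast is safe.
        int fixed = (int) Math.min((long) len, blockEnd - pos + 1L);

        System.out.println("broken = " + broken + ", fixed = " + fixed); // broken is negative, fixed is 4096
      }
    }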

+ 5 - 0
src/java/org/apache/hadoop/hdfs/HftpFileSystem.java

@@ -178,6 +178,10 @@ public class HftpFileSystem extends FileSystem {
           break;
         }
       }
+      
+      //Renew TGT if needed
+      ugi.reloginFromKeytab();
+      
       //since we don't already have a token, go get one over https
       if (delegationToken == null) {
         delegationToken = 
@@ -659,6 +663,7 @@ public class HftpFileSystem extends FileSystem {
       final HftpFileSystem fs = weakFs.get();
       if (fs != null) {
         synchronized (fs) {
+          fs.ugi.reloginFromKeytab();
           fs.ugi.doAs(new PrivilegedExceptionAction<Void>() {
 
             @Override
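Context for the HftpFileSystem change (HDFS-1364): long-running HFTP clients log in from a keytab, and the Kerberos TGT eventually expires, so the delegation-token fetch over HTTPS and the periodic token renewal now call reloginFromKeytab() first; the call returns immediately for non-keytab logins, so it is cheap to invoke before each privileged operation. A minimal sketch of the pattern a long-running caller might follow, assuming a keytab-based login has already been performed (not HDFS code):

    import java.security.PrivilegedExceptionAction;
    import org.apache.hadoop.security.UserGroupInformation;

    public class ReloginSketch {
      public static void main(String[] args) throws Exception {
        final UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
        // Refresh the TGT from the keytab if needed; a no-op when the login is not keytab-based.
        ugi.reloginFromKeytab();
        ugi.doAs(new PrivilegedExceptionAction<Void>() {
          @Override
          public Void run() throws Exception {
            // ... open the HTTPS connection / renew the delegation token here ...
            return null;
          }
        });
      }
    }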

+ 2 - 2
src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java

@@ -300,8 +300,8 @@ class BlockSender implements java.io.Closeable, FSConstants {
                          throws IOException {
     // Sends multiple chunks in one packet with a single write().
 
-    int len = Math.min((int) (endOffset - offset),
-                       bytesPerChecksum*maxChunks);
+    int len = (int) Math.min(endOffset - offset,
+                             (((long) bytesPerChecksum) * ((long) maxChunks)));
     int numChunks = (len + bytesPerChecksum - 1)/bytesPerChecksum;
     int packetLen = len + numChunks*checksumSize + 4;
     boolean lastDataPacket = offset + len == endOffset && len > 0;
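The BlockSender change is the same class of fix: endOffset - offset was narrowed to int before the min, and bytesPerChecksum * maxChunks was a 32-bit product, either of which can wrap for very large blocks. The sketch below (hypothetical values, not HDFS code) shows the second pitfall: an int-by-int product is computed in 32 bits even when the result is assigned to a long, unless one operand is widened first.

    public class IntProductOverflowSketch {
      public static void main(String[] args) {
        int bytesPerChecksum = 1 << 16; // values chosen so the true product needs 33 bits
        int maxChunks = 1 << 16;

        long wrapped = bytesPerChecksum * maxChunks;        // product wraps to 0 before the assignment widens it
        long correct = (long) bytesPerChecksum * maxChunks; // left operand widened first: 4294967296

        System.out.println("wrapped = " + wrapped + ", correct = " + correct);
      }
    }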

+ 8 - 8
src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java

@@ -54,6 +54,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
@@ -112,7 +113,6 @@ import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.AccessControlList;
-import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.util.Daemon;
@@ -457,12 +457,6 @@ public class DataNode extends Configured
     // adjust info port
     this.dnRegistration.setInfoPort(this.infoServer.getPort());
     myMetrics = new DataNodeMetrics(conf, dnRegistration.getName());
-    
-    // set service-level authorization security policy
-    if (conf.getBoolean(
-          ServiceAuthorizationManager.SERVICE_AUTHORIZATION_CONFIG, false)) {
-      ServiceAuthorizationManager.refresh(conf, new HDFSPolicyProvider());
-       }
 
     // BlockTokenSecretManager is created here, but it shouldn't be
     // used until it is initialized in register().
@@ -474,7 +468,13 @@ public class DataNode extends Configured
     ipcServer = RPC.getServer(DataNode.class, this, ipcAddr.getHostName(),
         ipcAddr.getPort(), conf.getInt("dfs.datanode.handler.count", 3), false,
         conf, blockTokenSecretManager);
-    
+
+    // set service-level authorization security policy
+    if (conf.getBoolean(
+        CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, false)) {
+      ipcServer.refreshServiceAcl(conf, new HDFSPolicyProvider());
+    }
+
     dnRegistration.setIpcPort(ipcServer.getListenerAddress().getPort());
 
     LOG.info("dnRegistration = " + dnRegistration);
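Background for moving the service-level authorization setup (here and in NameNode.java below): ServiceAuthorizationManager.refresh(..) maintained a single JVM-wide policy map, so when several services share one JVM, as a NameNode and a JobTracker do in a minicluster, the last service to refresh overwrote the others' policies (HDFS-1399 in CHANGES.txt above). Refreshing the ACLs on each ipc.Server instance after it is constructed keeps the policies per server; the NameNode hunks below apply the same pattern to both its client RPC server and its optional service RPC server.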

+ 0 - 8
src/java/org/apache/hadoop/hdfs/server/namenode/BlockInfo.java

@@ -249,14 +249,6 @@ class BlockInfo extends Block implements LightWeightGSet.LinkedElement {
     return head;
   }
 
-  int listCount(DatanodeDescriptor dn) {
-    int count = 0;
-    for(BlockInfo cur = this; cur != null;
-          cur = cur.getNext(cur.findDatanode(dn)))
-      count++;
-    return count;
-  }
-
   boolean listIsConsistent(DatanodeDescriptor dn) {
     // going forward
     int count = 0;

+ 9 - 2
src/java/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java

@@ -101,6 +101,7 @@ public class DatanodeDescriptor extends DatanodeInfo {
   }
 
   private volatile BlockInfo blockList = null;
+  private int numBlocks = 0;
   // isAlive == heartbeats.contains(this)
   // This is an optimization, because contains takes O(n) time on Arraylist
   protected boolean isAlive = false;
@@ -202,6 +203,7 @@ public class DatanodeDescriptor extends DatanodeInfo {
       return false;
     // add to the head of the data-node list
     blockList = b.listInsert(blockList, this);
+    numBlocks++;
     return true;
   }
   
@@ -211,7 +213,12 @@ public class DatanodeDescriptor extends DatanodeInfo {
    */
   boolean removeBlock(BlockInfo b) {
     blockList = b.listRemove(blockList, this);
-    return b.removeNode(this);
+    if ( b.removeNode(this) ) {
+      numBlocks--;
+      return true;
+    } else {
+      return false;
+    }
   }
 
   /**
@@ -247,7 +254,7 @@ public class DatanodeDescriptor extends DatanodeInfo {
   }
 
   public int numBlocks() {
-    return blockList == null ? 0 : blockList.listCount(this);
+    return numBlocks;
   }
 
   /**
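Note on the block counter (HDFS-1368): numBlocks() previously walked the datanode's block list via BlockInfo.listCount (removed above), an O(blocks-on-the-node) traversal per call; maintaining the counter in addBlock()/removeBlock() turns it into a constant-time read. The new TestDatanodeDescriptor.testBlocksCounter case at the end of this change exercises the bookkeeping, including the paths that must not change the count (re-adding an existing block, removing a block the node does not hold).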

+ 18 - 12
src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java

@@ -33,6 +33,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FsServerDefaults;
 import org.apache.hadoop.fs.Options;
@@ -93,7 +94,6 @@ import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.security.authorize.AuthorizationException;
 import org.apache.hadoop.security.authorize.ProxyUsers;
 import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol;
-import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.ServicePlugin;
@@ -165,8 +165,8 @@ public class NameNode implements NamenodeProtocols, FSConstants {
 
   protected FSNamesystem namesystem; 
   protected NamenodeRole role;
-  /** RPC server. */
-  protected Server server;
+  /** RPC server. Package-protected for use in tests. */
+  Server server;
   /** RPC server for HDFS Services communication.
       BackupNode, Datanodes and all other services
       should be connecting to this server if it is
@@ -347,13 +347,6 @@ public class NameNode implements NamenodeProtocols, FSConstants {
     SecurityUtil.login(conf, DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY,
         DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY, socAddr.getHostName());
     int handlerCount = conf.getInt("dfs.namenode.handler.count", 10);
-    
-    // set service-level authorization security policy
-    if (serviceAuthEnabled = 
-          conf.getBoolean(
-            ServiceAuthorizationManager.SERVICE_AUTHORIZATION_CONFIG, false)) {
-      ServiceAuthorizationManager.refresh(conf, new HDFSPolicyProvider());
-    }
 
     NameNode.initMetrics(conf, this.getRole());
     loadNamesystem(conf);
@@ -373,6 +366,17 @@ public class NameNode implements NamenodeProtocols, FSConstants {
                                 socAddr.getHostName(), socAddr.getPort(),
                                 handlerCount, false, conf, 
                                 namesystem.getDelegationTokenSecretManager());
+
+    // set service-level authorization security policy
+    if (serviceAuthEnabled =
+          conf.getBoolean(
+            CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, false)) {
+      this.server.refreshServiceAcl(conf, new HDFSPolicyProvider());
+      if (this.serviceRpcServer != null) {
+        this.serviceRpcServer.refreshServiceAcl(conf, new HDFSPolicyProvider());
+      }
+    }
+
     // The rpc-server port can be ephemeral... ensure we have the correct info
     this.rpcAddress = this.server.getListenerAddress(); 
     setRpcServerAddress(conf);
@@ -1434,8 +1438,10 @@ public class NameNode implements NamenodeProtocols, FSConstants {
       throw new AuthorizationException("Service Level Authorization not enabled!");
     }
 
-    ServiceAuthorizationManager.refresh(
-        new Configuration(), new HDFSPolicyProvider());
+    this.server.refreshServiceAcl(new Configuration(), new HDFSPolicyProvider());
+    if (this.serviceRpcServer != null) {
+      this.serviceRpcServer.refreshServiceAcl(new Configuration(), new HDFSPolicyProvider());
+    }
   }
 
   @Override

+ 187 - 0
src/test/aop/org/apache/hadoop/hdfs/TestFiHftp.java

@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.File;
+import java.io.IOException;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.FSDataset;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.log4j.Level;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestFiHftp {
+  final Log LOG = FileSystem.LOG;
+  {
+    ((Log4JLogger)LOG).getLogger().setLevel(Level.ALL);
+  }
+
+  static final short DATANODE_NUM = 1;
+  static final Random ran = new Random();
+  static final byte[] buffer = new byte[1 << 16];
+  static final MessageDigest md5;
+  static {
+    try {
+      md5 = MessageDigest.getInstance("MD5");
+    } catch (NoSuchAlgorithmException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private static byte[] createFile(FileSystem fs, Path name, long length, 
+      short replication, long blocksize) throws IOException {
+    final FSDataOutputStream out = fs.create(name, false, 4096,
+        replication, blocksize);
+    try {
+      for(long n = length; n > 0; ) {
+        ran.nextBytes(buffer);
+        final int w = n < buffer.length? (int)n: buffer.length;
+        out.write(buffer, 0, w);
+        md5.update(buffer, 0, w);
+        n -= w;
+      }
+    } finally {
+      IOUtils.closeStream(out);
+    }
+    return md5.digest();
+  }
+
+  @Test
+  public void testHftpOpen() throws IOException {
+    final Configuration conf = new Configuration();
+    MiniDFSCluster cluster = null;
+    try {
+      cluster = new MiniDFSCluster(conf, DATANODE_NUM, true, null);
+      cluster.waitActive();
+
+      //test with a file
+      //which is larger than the servlet response buffer size
+      {
+        final long blocksize = 1L << 20; //  
+        final long filesize = 2*blocksize + 100;
+        runTestHftpOpen(cluster, "/foo", blocksize, filesize);
+      }
+
+      //test with a small file
+      //which is smaller than the servlet response buffer size
+      { 
+        final long blocksize = 1L << 10; //  
+        final long filesize = 2*blocksize + 100;
+        runTestHftpOpen(cluster, "/small", blocksize, filesize);
+      }
+    } finally {
+      if (cluster != null) {cluster.shutdown();}
+    }
+  }
+
+  /**
+   * A test with a 3GB file.
+   * It may take ~6 minutes.
+   */
+  void largeFileTest(final MiniDFSCluster cluster) throws IOException {
+    final long blocksize = 128L << 20;  
+    final long filesize = 3L << 30;
+    runTestHftpOpen(cluster, "/large", blocksize, filesize);
+  }
+
+  /**
+   * @param blocksize
+   * @param filesize must be > block size 
+   */
+  private void runTestHftpOpen(final MiniDFSCluster cluster, final String file,
+      final long blocksize, final long filesize) throws IOException {
+    //create a file
+    final DistributedFileSystem dfs = (DistributedFileSystem)cluster.getFileSystem();
+    final Path filepath = new Path(file);
+    final byte[] filemd5 = createFile(dfs, filepath, filesize, DATANODE_NUM,
+        blocksize);
+    DFSTestUtil.waitReplication(dfs, filepath, DATANODE_NUM);
+
+    //test hftp open and read
+    final HftpFileSystem hftpfs = cluster.getHftpFileSystem();
+    {
+      final FSDataInputStream in = hftpfs.open(filepath);
+      long bytesRead = 0;
+      try {
+        for(int r; (r = in.read(buffer)) != -1; ) {
+          bytesRead += r;
+          md5.update(buffer, 0, r);
+        }
+      } finally {
+        LOG.info("bytesRead=" + bytesRead);
+        in.close();
+      }
+      Assert.assertEquals(filesize, bytesRead);
+      Assert.assertArrayEquals(filemd5, md5.digest());
+    }
+
+    //delete the second block
+    final DFSClient client = dfs.getClient();
+    final LocatedBlocks locatedblocks = client.getNamenode().getBlockLocations(
+        file, 0, filesize);
+    Assert.assertEquals((filesize - 1)/blocksize + 1,
+        locatedblocks.locatedBlockCount());
+    final LocatedBlock lb = locatedblocks.get(1);
+    final Block blk = lb.getBlock();
+    Assert.assertEquals(blocksize, lb.getBlockSize());
+    final DatanodeInfo[] datanodeinfos = lb.getLocations();
+    Assert.assertEquals(DATANODE_NUM, datanodeinfos.length);
+    final DataNode dn = cluster.getDataNode(datanodeinfos[0].getIpcPort());
+    LOG.info("dn=" + dn + ", blk=" + blk + " (length=" + blk.getNumBytes() + ")");
+    final FSDataset data = (FSDataset)dn.getFSDataset();
+    final File blkfile = data.getBlockFile(blk);
+    Assert.assertTrue(blkfile.delete());
+
+    //read again by hftp, should get an exception 
+    LOG.info("hftpfs.getUri() = " + hftpfs.getUri());
+    final ContentSummary cs = hftpfs.getContentSummary(filepath);
+    LOG.info("hftpfs.getContentSummary = " + cs);
+    Assert.assertEquals(filesize, cs.getLength());
+
+    final FSDataInputStream in = hftpfs.open(hftpfs.makeQualified(filepath));
+    long bytesRead = 0;
+    try {
+      for(int r; (r = in.read(buffer)) != -1; ) {
+        bytesRead += r;
+      }
+      Assert.fail();
+    } catch(IOException ioe) {
+      LOG.info("GOOD: get an exception", ioe);
+    } finally {
+      LOG.info("bytesRead=" + bytesRead);
+      in.close();
+    }
+  }
+}

+ 47 - 0
src/test/aop/org/apache/hadoop/hdfs/server/namenode/FileDataServletAspects.aj

@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+
+import javax.servlet.http.HttpServletRequest;
+
+import org.apache.commons.logging.Log;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.security.UserGroupInformation;
+
+public aspect FileDataServletAspects {
+  static final Log LOG = FileDataServlet.LOG;
+
+  pointcut callCreateUri() : call (URI FileDataServlet.createUri(
+      String, HdfsFileStatus, UserGroupInformation, ClientProtocol,
+      HttpServletRequest, String));
+
+  /** Replace host name with "localhost" for unit test environment. */
+  URI around () throws URISyntaxException : callCreateUri() {
+    final URI original = proceed(); 
+    LOG.info("FI: original uri = " + original);
+    final URI replaced = new URI(original.getScheme(), original.getUserInfo(),
+        "localhost", original.getPort(), original.getPath(),
+        original.getQuery(), original.getFragment()) ; 
+    LOG.info("FI: replaced uri = " + replaced);
+    return replaced;
+  }
+}

+ 222 - 0
src/test/hdfs/org/apache/hadoop/hdfs/TestLargeBlock.java

@@ -0,0 +1,222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Random;
+import java.util.Arrays;
+
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.FSDataset;
+import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.log4j.Level;
+
+import org.junit.Test;
+
+/**
+ * This class tests that blocks can be larger than 2GB
+ */
+public class TestLargeBlock extends junit.framework.TestCase {
+  static final String DIR = "/" + TestLargeBlock.class.getSimpleName() + "/";
+
+  {
+    // ((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL);
+    // ((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.ALL);
+    // ((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.ALL);
+    // ((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL);
+  }
+
+
+  static final boolean verifyData = true; // should we verify the data read back from the file? (slow)
+  static final byte[] pattern = { 'D', 'E', 'A', 'D', 'B', 'E', 'E', 'F'};
+  static final boolean simulatedStorage = false;
+
+  // creates a file 
+  static FSDataOutputStream createFile(FileSystem fileSys, Path name, int repl, final long blockSize)
+    throws IOException {
+    FSDataOutputStream stm = fileSys.create(name, true,
+                                            fileSys.getConf().getInt("io.file.buffer.size", 4096),
+                                            (short)repl, blockSize);
+    System.out.println("createFile: Created " + name + " with " + repl + " replica.");
+    return stm;
+  }
+
+
+  /**
+   * Writes pattern to file
+   */
+  static void writeFile(FSDataOutputStream stm, final long fileSize) throws IOException {
+    final int writeSize = pattern.length * 8 * 1024 * 1024; // write in chunks of 64 MB
+    final int writeCount = (int) ((fileSize / ((long) writeSize)) + ((fileSize % ((long) writeSize) == 0L) ? 0L : 1L));
+
+    if (writeSize > Integer.MAX_VALUE) {
+      throw new IOException("A single write is too large " + writeSize);
+    } 
+
+    long bytesToWrite = fileSize;
+    byte[] b = new byte[writeSize];
+
+    // initialize buffer
+    for (int j = 0; j < writeSize; j++) {
+      b[j] = pattern[j % pattern.length];
+    }
+
+    int i = 0;
+
+    while (bytesToWrite > 0) {
+      int thiswrite = (int) Math.min(writeSize, bytesToWrite); // how many bytes we are writing in this iteration
+
+      stm.write(b, 0, thiswrite);
+      // System.out.println("Wrote[" + i + "/" + writeCount + "] " + thiswrite + " bytes.");
+      bytesToWrite -= thiswrite;
+      i++;
+    }
+  }
+
+  /**
+   * Reads from file and makes sure that it matches the pattern
+   */
+  static void checkFullFile(FileSystem fs, Path name, final long fileSize) throws IOException {
+    final int readSize = pattern.length * 16 * 1024 * 1024; // read in chunks of 128 MB
+    final int readCount = (int) ((fileSize / ((long) readSize)) + ((fileSize % ((long) readSize) == 0L) ? 0L : 1L));
+
+    if (readSize > Integer.MAX_VALUE) {
+      throw new IOException("A single read is too large " + readSize);
+    }
+
+    byte[] b = new byte[readSize];
+    long bytesToRead = fileSize;
+
+    byte[] compb = new byte[readSize]; // buffer with correct data for comparison
+
+    if (verifyData) {
+      // initialize compare buffer
+      for (int j = 0; j < readSize; j++) {
+        compb[j] = pattern[j % pattern.length];
+      }
+    }
+
+
+    FSDataInputStream stm = fs.open(name);
+
+    int i = 0;
+
+    while (bytesToRead > 0) {
+      int thisread = (int) Math.min(readSize, bytesToRead); // how many bytes we are reading in this iteration
+
+      stm.readFully(b, 0, thisread); 
+      
+      if (verifyData) {
+        // verify data read
+        
+        if (thisread == readSize) {
+          assertTrue("file corrupted at or after byte " + (fileSize - bytesToRead), Arrays.equals(b, compb));
+        } else {
+          // b was only partially filled by last read
+          for (int k = 0; k < thisread; k++) {
+            assertTrue("file corrupted at or after byte " + (fileSize - bytesToRead), b[k] == compb[k]);
+          }
+        }
+      }
+
+      // System.out.println("Read[" + i + "/" + readCount + "] " + thisread + " bytes.");
+
+      bytesToRead -= thisread;
+      i++;
+    }
+    stm.close();
+  }
+ 
+  /**
+   * Test for block size of 2GB + 512B
+   */
+  @Test
+  public void testLargeBlockSize() throws IOException {
+    final long blockSize = 2L * 1024L * 1024L * 1024L + 512L; // 2GB + 512B
+    runTest(blockSize);
+  }
+  
+  /**
+   * Test that we can write to and read from large blocks
+   */
+  public void runTest(final long blockSize) throws IOException {
+
+    // write a file that is slightly larger than 1 block
+    final long fileSize = blockSize + 1L;
+
+    Configuration conf = new Configuration();
+    if (simulatedStorage) {
+      conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
+    }
+    MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null);
+    FileSystem fs = cluster.getFileSystem();
+    try {
+
+      // create a new file in test data directory
+      Path file1 = new Path(System.getProperty("test.build.data") + "/" + Long.toString(blockSize) + ".dat");
+      FSDataOutputStream stm = createFile(fs, file1, 1, blockSize);
+      System.out.println("File " + file1 + " created with file size " +
+                         fileSize +
+                         " blocksize " + blockSize);
+
+      // verify that file exists in FS namespace
+      assertTrue(file1 + " should be a file", 
+                  fs.getFileStatus(file1).isFile());
+
+      // write to file
+      writeFile(stm, fileSize);
+      System.out.println("File " + file1 + " written to.");
+
+      // close file
+      stm.close();
+      System.out.println("File " + file1 + " closed.");
+
+      // Make sure a client can read it
+      checkFullFile(fs, file1, fileSize);
+
+      // verify that file size has changed
+      long len = fs.getFileStatus(file1).getLen();
+      assertTrue(file1 + " should be of size " +  fileSize +
+                 " but found to be of size " + len, 
+                  len == fileSize);
+
+    } finally {
+      cluster.shutdown();
+    }
+  }
+}

+ 15 - 6
src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java

@@ -66,7 +66,7 @@ public class SimulatedFSDataset  implements FSConstants, FSDatasetInterface, Con
                                     "dfs.datanode.simulateddatastorage";
   public static final String CONFIG_PROPERTY_CAPACITY =
                             "dfs.datanode.simulateddatastorage.capacity";
-  
+//  
   public static final long DEFAULT_CAPACITY = 2L<<40; // 1 terabyte
   public static final byte DEFAULT_DATABYTE = 9; // 1 terabyte
   byte simulatedDataByte = DEFAULT_DATABYTE;
@@ -303,9 +303,6 @@ public class SimulatedFSDataset  implements FSConstants, FSDatasetInterface, Con
     setConf(conf);
   }
   
-  private SimulatedFSDataset() { // real construction when setConf called.. Uggg
-  }
-  
   public Configuration getConf() {
     return conf;
   }
@@ -347,6 +344,7 @@ public class SimulatedFSDataset  implements FSConstants, FSDatasetInterface, Con
     }
   }
 
+  @Override
   public synchronized void finalizeBlock(Block b) throws IOException {
     BInfo binfo = blockMap.get(b);
     if (binfo == null) {
@@ -356,12 +354,14 @@ public class SimulatedFSDataset  implements FSConstants, FSDatasetInterface, Con
 
   }
 
+  @Override
   public synchronized void unfinalizeBlock(Block b) throws IOException {
     if (isBeingWritten(b)) {
       blockMap.remove(b);
     }
   }
 
+  @Override
   public synchronized BlockListAsLongs getBlockReport() {
     Block[] blockTable = new Block[blockMap.size()];
     int count = 0;
@@ -389,6 +389,7 @@ public class SimulatedFSDataset  implements FSConstants, FSDatasetInterface, Con
     return storage.getFree();
   }
 
+  @Override
   public synchronized long getLength(Block b) throws IOException {
     BInfo binfo = blockMap.get(b);
     if (binfo == null) {
@@ -403,7 +404,7 @@ public class SimulatedFSDataset  implements FSConstants, FSDatasetInterface, Con
     return blockMap.get(new Block(blockId));
   }
 
-  /** {@inheritDoc} */
+  @Override
   public Block getStoredBlock(long blkid) throws IOException {
     Block b = new Block(blkid);
     BInfo binfo = blockMap.get(b);
@@ -415,6 +416,7 @@ public class SimulatedFSDataset  implements FSConstants, FSDatasetInterface, Con
     return b;
   }
 
+  @Override
   public synchronized void invalidate(Block[] invalidBlks) throws IOException {
     boolean error = false;
     if (invalidBlks == null) {
@@ -438,6 +440,7 @@ public class SimulatedFSDataset  implements FSConstants, FSDatasetInterface, Con
       }
   }
 
+  @Override
   public synchronized boolean isValidBlock(Block b) {
     // return (blockMap.containsKey(b));
     BInfo binfo = blockMap.get(b);
@@ -545,6 +548,7 @@ public class SimulatedFSDataset  implements FSConstants, FSDatasetInterface, Con
     return binfo;
   }
 
+  @Override
   public synchronized InputStream getBlockInputStream(Block b)
                                             throws IOException {
     BInfo binfo = blockMap.get(b);
@@ -556,6 +560,7 @@ public class SimulatedFSDataset  implements FSConstants, FSDatasetInterface, Con
     return binfo.getIStream();
   }
   
+  @Override
   public synchronized InputStream getBlockInputStream(Block b, long seekOffset)
                               throws IOException {
     InputStream result = getBlockInputStream(b);
@@ -564,6 +569,7 @@ public class SimulatedFSDataset  implements FSConstants, FSDatasetInterface, Con
   }
 
   /** Not supported */
+  @Override
   public BlockInputStreams getTmpInputStreams(Block b, long blkoff, long ckoff
       ) throws IOException {
     throw new IOException("Not supported");
@@ -588,7 +594,8 @@ public class SimulatedFSDataset  implements FSConstants, FSDatasetInterface, Con
     }
     return binfo.getMetaIStream();
   }
-
+ 
+  @Override
   public synchronized long getMetaDataLength(Block b) throws IOException {
     BInfo binfo = blockMap.get(b);
     if (binfo == null) {
@@ -601,6 +608,7 @@ public class SimulatedFSDataset  implements FSConstants, FSDatasetInterface, Con
     return binfo.getMetaIStream().getLength();
   }
   
+  @Override
   public MetaDataInputStream getMetaDataInputStream(Block b)
   throws IOException {
 
@@ -608,6 +616,7 @@ public class SimulatedFSDataset  implements FSConstants, FSDatasetInterface, Con
                                                 getMetaDataLength(b));
   }
 
+  @Override
   public synchronized boolean metaFileExists(Block b) throws IOException {
     if (!isValidBlock(b)) {
           throw new IOException("Block " + b +

+ 10 - 1
src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java

@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.namenode;
 
 import java.io.IOException;
 
+import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 
 /**
@@ -48,4 +49,12 @@ public class NameNodeAdapter {
   public static void refreshBlockCounts(NameNode namenode) {
     namenode.getNamesystem().blockManager.updateState();
   }
-}
+
+  /**
+   * Get the internal RPC server instance.
+   * @return rpc server
+   */
+  public static Server getRpcServer(NameNode namenode) {
+    return namenode.server;
+  }
+}

+ 25 - 0
src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestDatanodeDescriptor.java

@@ -48,4 +48,29 @@ public class TestDatanodeDescriptor extends TestCase {
     bc = dd.getInvalidateBlocks(MAX_LIMIT);
     assertEquals(bc.getBlocks().length, REMAINING_BLOCKS);
   }
+  
+  public void testBlocksCounter() throws Exception {
+    DatanodeDescriptor dd = new DatanodeDescriptor();
+    assertEquals(0, dd.numBlocks());
+    BlockInfo blk = new BlockInfo(new Block(1L), 1);
+    BlockInfo blk1 = new BlockInfo(new Block(2L), 2);
+    // add first block
+    assertTrue(dd.addBlock(blk));
+    assertEquals(1, dd.numBlocks());
+    // remove a non-existent block
+    assertFalse(dd.removeBlock(blk1));
+    assertEquals(1, dd.numBlocks());
+    // add an existent block
+    assertFalse(dd.addBlock(blk));
+    assertEquals(1, dd.numBlocks());
+    // add second block
+    assertTrue(dd.addBlock(blk1));
+    assertEquals(2, dd.numBlocks());
+    // remove first block
+    assertTrue(dd.removeBlock(blk));
+    assertEquals(1, dd.numBlocks());
+    // remove second block
+    assertTrue(dd.removeBlock(blk1));
+    assertEquals(0, dd.numBlocks());    
+  }
 }