ソースを参照

Merge trunk into HA branch

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-1623@1296025 13f79535-47bb-0310-9956-ffa450edef68
Todd Lipcon 13 年 前
コミット
87d1c67944
33 ファイル変更207 行追加233 行削除
  1. 3 0
      hadoop-common-project/hadoop-common/CHANGES.txt
  2. 18 26
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataOutputStream.java
  3. 0 5
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Syncable.java
  4. 1 1
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SequenceFile.java
  5. 11 0
      hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
  6. 9 0
      hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml
  7. 0 6
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
  8. 0 34
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
  9. 4 3
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java
  10. 2 5
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockVolumeChoosingPolicy.java
  11. 5 2
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java
  12. 5 5
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
  13. 4 4
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
  14. 43 44
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
  15. 8 6
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java
  16. 5 4
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/RoundRobinVolumesPolicy.java
  17. 6 4
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
  18. 2 1
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSRemove.java
  19. 1 27
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUtil.java
  20. 5 2
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCreation.java
  21. 11 0
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java
  22. 4 3
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
  23. 7 7
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java
  24. 5 2
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
  25. 2 13
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailureToleration.java
  26. 6 5
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
  27. 3 0
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestMulitipleNNDataBlockScanner.java
  28. 6 3
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestRoundRobinVolumesPolicy.java
  29. 12 13
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java
  30. 3 1
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencing.java
  31. 6 0
      hadoop-mapreduce-project/CHANGES.txt
  32. 1 1
      hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/terasort/TeraOutputFormat.java
  33. 9 6
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/ipc/ProtoOverHadoopRpcEngine.java

+ 3 - 0
hadoop-common-project/hadoop-common/CHANGES.txt

@@ -4,6 +4,9 @@ Trunk (unreleased changes)
 
   INCOMPATIBLE CHANGES
 
+    HADOOP-8124. Remove the deprecated FSDataOutputStream constructor,
+    FSDataOutputStream.sync() and Syncable.sync().  (szetszwo)
+
   NEW FEATURES
 
   IMPROVEMENTS

+ 18 - 26
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataOutputStream.java

@@ -17,7 +17,11 @@
  */
 package org.apache.hadoop.fs;
 
-import java.io.*;
+import java.io.BufferedOutputStream;
+import java.io.DataOutputStream;
+import java.io.FilterOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
@@ -28,20 +32,19 @@ import org.apache.hadoop.classification.InterfaceStability;
 @InterfaceAudience.Public
 @InterfaceStability.Stable
 public class FSDataOutputStream extends DataOutputStream implements Syncable {
-  private OutputStream wrappedStream;
+  private final OutputStream wrappedStream;
 
   private static class PositionCache extends FilterOutputStream {
-    private FileSystem.Statistics statistics;
-    long position;
+    private final FileSystem.Statistics statistics;
+    private long position;
 
-    public PositionCache(OutputStream out, 
-                         FileSystem.Statistics stats,
-                         long pos) throws IOException {
+    PositionCache(OutputStream out, FileSystem.Statistics stats, long pos) {
       super(out);
       statistics = stats;
       position = pos;
     }
 
+    @Override
     public void write(int b) throws IOException {
       out.write(b);
       position++;
@@ -50,6 +53,7 @@ public class FSDataOutputStream extends DataOutputStream implements Syncable {
       }
     }
     
+    @Override
     public void write(byte b[], int off, int len) throws IOException {
       out.write(b, off, len);
       position += len;                            // update position
@@ -58,27 +62,22 @@ public class FSDataOutputStream extends DataOutputStream implements Syncable {
       }
     }
       
-    public long getPos() throws IOException {
+    long getPos() {
       return position;                            // return cached position
     }
-    
+
+    @Override
     public void close() throws IOException {
       out.close();
     }
   }
 
-  @Deprecated
-  public FSDataOutputStream(OutputStream out) throws IOException {
-    this(out, null);
-  }
-
-  public FSDataOutputStream(OutputStream out, FileSystem.Statistics stats)
-    throws IOException {
+  public FSDataOutputStream(OutputStream out, FileSystem.Statistics stats) {
     this(out, stats, 0);
   }
 
   public FSDataOutputStream(OutputStream out, FileSystem.Statistics stats,
-                            long startPosition) throws IOException {
+                            long startPosition) {
     super(new PositionCache(out, stats, startPosition));
     wrappedStream = out;
   }
@@ -88,13 +87,14 @@ public class FSDataOutputStream extends DataOutputStream implements Syncable {
    *
    * @return the current position in the output stream
    */
-  public long getPos() throws IOException {
+  public long getPos() {
     return ((PositionCache)out).getPos();
   }
 
   /**
    * Close the underlying output stream.
    */
+  @Override
   public void close() throws IOException {
     out.close(); // This invokes PositionCache.close()
   }
@@ -109,14 +109,6 @@ public class FSDataOutputStream extends DataOutputStream implements Syncable {
     return wrappedStream;
   }
 
-  @Override  // Syncable
-  @Deprecated
-  public void sync() throws IOException {
-    if (wrappedStream instanceof Syncable) {
-      ((Syncable)wrappedStream).sync();
-    }
-  }
-  
   @Override  // Syncable
   public void hflush() throws IOException {
     if (wrappedStream instanceof Syncable) {

+ 0 - 5
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Syncable.java

@@ -27,11 +27,6 @@ import org.apache.hadoop.classification.InterfaceStability;
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
 public interface Syncable {
-  /**
-   * @deprecated As of HADOOP 0.21.0, replaced by hflush
-   * @see #hflush()
-   */
-  @Deprecated  public void sync() throws IOException;
   
   /** Flush out the data in client's user buffer. After the return of
    * this call, new readers will see the data.

+ 1 - 1
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SequenceFile.java

@@ -1196,7 +1196,7 @@ public class SequenceFile {
     /** flush all currently written data to the file system */
     public void syncFs() throws IOException {
       if (out != null) {
-        out.sync();                               // flush contents to file system
+        out.hflush();  // flush contents to file system
       }
     }
 

+ 11 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -4,6 +4,8 @@ Trunk (unreleased changes)
 
   INCOMPATIBLE CHANGES
 
+    HDFS-3034. Remove the deprecated DFSOutputStream.sync() method.  (szetszwo)
+
   NEW FEATURES
 
     HDFS-2430. The number of failed or low-resource volumes the NN can tolerate
@@ -68,6 +70,8 @@ Trunk (unreleased changes)
     HDFS-3030. Remove getProtocolVersion and getProtocolSignature from translators.
     (jitendra)
 
+    HDFS-3036. Remove unused method DFSUtil#isDefaultNamenodeAddress. (atm)
+
   OPTIMIZATIONS
 
     HDFS-2477. Optimize computing the diff between a block report and the
@@ -117,6 +121,9 @@ Trunk (unreleased changes)
     HDFS-2908. Add apache license header for StorageReport.java. (Brandon Li
     via jitendra)
 
+    HDFS-3037. TestMulitipleNNDataBlockScanner#testBlockScannerAfterRestart is
+    racy. (atm)
+
 Release 0.23.3 - UNRELEASED 
 
   INCOMPATIBLE CHANGES
@@ -162,6 +169,8 @@ Release 0.23.3 - UNRELEASED
     HDFS-2899. Service protocol changes in DatanodeProtocol to add multiple 
     storages. (suresh)
 
+    HDFS-3021. Use generic type to declare FSDatasetInterface.  (szetszwo)
+
   IMPROVEMENTS
 
     HDFS-2018. Move all journal stream management code into one place.
@@ -252,6 +261,8 @@ Release 0.23.3 - UNRELEASED
 
     HDFS-3020. Fix editlog to automatically sync when buffer is full. (todd)
 
+    HDFS-3038. Add FSEditLog.metrics to findbugs exclude list. (todd via atm)
+
 Release 0.23.2 - UNRELEASED 
 
   INCOMPATIBLE CHANGES

+ 9 - 0
hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml

@@ -247,6 +247,15 @@
        <Method name="save" />
        <Bug pattern="OS_OPEN_STREAM" />
      </Match>
+     <!--
+      the 'metrics' member is sometimes used from synchronized blocks, sometimes not,
+      but it's only reset by test cases, so should be fine
+     -->
+     <Match>
+       <Class name="org.apache.hadoop.hdfs.server.namenode.FSEditLog" />
+       <Field name="metrics" />
+       <Bug pattern="IS2_INCONSISTENT_SYNC" />
+     </Match>
      <!--
       This method isn't performance-critical and is much clearer to write as it's written.
       -->

+ 0 - 6
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java

@@ -1410,12 +1410,6 @@ class DFSOutputStream extends FSOutputSummer implements Syncable {
       }
     }
   }
-
-  @Override
-  @Deprecated
-  public synchronized void sync() throws IOException {
-    hflush();
-  }
   
   /**
    * Flushes out to all replicas of the block. The data is in the buffers

+ 0 - 34
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java

@@ -676,8 +676,6 @@ public class DFSUtil {
    * corresponding to the key with matching address, by doing a reverse 
    * lookup on the list of nameservices until it finds a match.
    * 
-   * If null is returned, client should try {@link #isDefaultNamenodeAddress}
-   * to check pre-Federation, non-HA configurations.
    * Since the process of resolving URIs to Addresses is slightly expensive,
    * this utility method should not be used in performance-critical routines.
    * 
@@ -768,38 +766,6 @@ public class DFSUtil {
     return conf.get(key, defaultVal);
   }
   
-  /**
-   * Given the InetSocketAddress for any configured communication with a 
-   * namenode, this method determines whether it is the configured
-   * communication channel for the "default" namenode.
-   * It does a reverse lookup on the list of default communication parameters
-   * to see if the given address matches any of them.
-   * Since the process of resolving URIs to Addresses is slightly expensive,
-   * this utility method should not be used in performance-critical routines.
-   * 
-   * @param conf - configuration
-   * @param address - InetSocketAddress for configured communication with NN.
-   *     Configured addresses are typically given as URIs, but we may have to
-   *     compare against a URI typed in by a human, or the server name may be
-   *     aliased, so we compare unambiguous InetSocketAddresses instead of just
-   *     comparing URI substrings.
-   * @param keys - list of configured communication parameters that should
-   *     be checked for matches.  For example, to compare against RPC addresses,
-   *     provide the list DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY,
-   *     DFS_NAMENODE_RPC_ADDRESS_KEY
-   * @return - boolean confirmation if matched generic parameter
-   */
-  public static boolean isDefaultNamenodeAddress(Configuration conf,
-      InetSocketAddress address, String... keys) {
-    for (String key : keys) {
-      String candidateAddress = conf.get(key);
-      if (candidateAddress != null
-          && address.equals(NetUtils.createSocketAddr(candidateAddress)))
-        return true;
-    }
-    return false;
-  }
-  
   /**
    * Sets the node specific setting into generic configuration key. Looks up
    * value of "key.nameserviceId.namenodeId" and if found sets that value into 

+ 4 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java

@@ -74,7 +74,7 @@ class BlockPoolSliceScanner {
 
   private long scanPeriod = DEFAULT_SCAN_PERIOD_HOURS * 3600 * 1000;
   private DataNode datanode;
-  private final FSDatasetInterface dataset;
+  private final FSDatasetInterface<? extends FSVolumeInterface> dataset;
   
   // sorted set
   private TreeSet<BlockScanInfo> blockInfoSet;
@@ -133,7 +133,8 @@ class BlockPoolSliceScanner {
     }
   }
   
-  BlockPoolSliceScanner(DataNode datanode, FSDatasetInterface dataset,
+  BlockPoolSliceScanner(DataNode datanode,
+      FSDatasetInterface<? extends FSVolumeInterface> dataset,
       Configuration conf, String bpid) {
     this.datanode = datanode;
     this.dataset = dataset;
@@ -216,7 +217,7 @@ class BlockPoolSliceScanner {
      * otherwise, pick the first directory.
      */
     File dir = null;
-    List<FSVolumeInterface> volumes = dataset.getVolumes();
+    final List<? extends FSVolumeInterface> volumes = dataset.getVolumes();
     for (FSVolumeInterface vol : volumes) {
       File bpDir = vol.getDirectory(blockPoolId);
       if (LogFileHandler.isFilePresent(bpDir, verificationLogFile)) {

+ 2 - 5
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockVolumeChoosingPolicy.java

@@ -21,7 +21,6 @@ import java.io.IOException;
 import java.util.List;
 
 import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.FSVolumeInterface;
 
 /**************************************************
@@ -34,7 +33,7 @@ import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.FSVolumeInterfa
  *
  ***************************************************/
 @InterfaceAudience.Private
-public interface BlockVolumeChoosingPolicy {
+public interface BlockVolumeChoosingPolicy<V extends FSVolumeInterface> {
 
   /**
    * Returns a specific FSVolume after applying a suitable choice algorithm
@@ -48,7 +47,5 @@ public interface BlockVolumeChoosingPolicy {
    * @return the chosen volume to store the block.
    * @throws IOException when disks are unavailable or are full.
    */
-  public FSVolumeInterface chooseVolume(List<FSVolumeInterface> volumes, long blockSize)
-    throws IOException;
-
+  public V chooseVolume(List<V> volumes, long blockSize) throws IOException;
 }

+ 5 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java

@@ -33,6 +33,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.FSVolumeInterface;
 
 /**
  * DataBlockScanner manages block scanning for all the block pools. For each
@@ -44,7 +45,7 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 public class DataBlockScanner implements Runnable {
   public static final Log LOG = LogFactory.getLog(DataBlockScanner.class);
   private final DataNode datanode;
-  private final FSDatasetInterface dataset;
+  private final FSDatasetInterface<? extends FSVolumeInterface> dataset;
   private final Configuration conf;
   
   /**
@@ -55,7 +56,9 @@ public class DataBlockScanner implements Runnable {
     new TreeMap<String, BlockPoolSliceScanner>();
   Thread blockScannerThread = null;
   
-  DataBlockScanner(DataNode datanode, FSDatasetInterface dataset, Configuration conf) {
+  DataBlockScanner(DataNode datanode,
+      FSDatasetInterface<? extends FSVolumeInterface> dataset,
+      Configuration conf) {
     this.datanode = datanode;
     this.dataset = dataset;
     this.conf = conf;

+ 5 - 5
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java

@@ -123,6 +123,7 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.common.JspHelper;
 import org.apache.hadoop.hdfs.server.common.StorageInfo;
 import org.apache.hadoop.hdfs.server.common.Util;
+import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.FSVolumeInterface;
 import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResources;
 import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
 import org.apache.hadoop.hdfs.server.datanode.web.resources.DatanodeWebHdfsMethods;
@@ -139,7 +140,6 @@ import org.apache.hadoop.hdfs.web.resources.Param;
 import org.apache.hadoop.http.HttpServer;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
-import org.apache.hadoop.ipc.ProtocolSignature;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.ipc.Server;
@@ -234,7 +234,7 @@ public class DataNode extends Configured
   
   volatile boolean shouldRun = true;
   private BlockPoolManager blockPoolManager;
-  public volatile FSDatasetInterface data = null;
+  public volatile FSDatasetInterface<? extends FSVolumeInterface> data = null;
   private String clusterId = null;
 
   public final static String EMPTY_DEL_HINT = "";
@@ -812,7 +812,7 @@ public class DataNode extends Configured
    * handshake with the the first namenode is completed.
    */
   private void initStorage(final NamespaceInfo nsInfo) throws IOException {
-    final FSDatasetInterface.Factory factory
+    final FSDatasetInterface.Factory<? extends FSDatasetInterface<?>> factory
         = FSDatasetInterface.Factory.getFactory(conf);
     
     if (!factory.isSimulated()) {
@@ -1694,11 +1694,11 @@ public class DataNode extends Configured
   /**
    * This method is used for testing. 
    * Examples are adding and deleting blocks directly.
-   * The most common usage will be when the data node's storage is similated.
+   * The most common usage will be when the data node's storage is simulated.
    * 
    * @return the fsdataset that stores the blocks
    */
-  public FSDatasetInterface getFSDataset() {
+  FSDatasetInterface<?> getFSDataset() {
     return data;
   }
 

+ 4 - 4
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java

@@ -55,7 +55,7 @@ public class DirectoryScanner implements Runnable {
   private static final Log LOG = LogFactory.getLog(DirectoryScanner.class);
 
   private final DataNode datanode;
-  private final FSDatasetInterface dataset;
+  private final FSDatasetInterface<?> dataset;
   private final ExecutorService reportCompileThreadPool;
   private final ScheduledExecutorService masterThread;
   private final long scanPeriodMsecs;
@@ -219,7 +219,7 @@ public class DirectoryScanner implements Runnable {
     }
   }
 
-  DirectoryScanner(DataNode dn, FSDatasetInterface dataset, Configuration conf) {
+  DirectoryScanner(DataNode dn, FSDatasetInterface<?> dataset, Configuration conf) {
     this.datanode = dn;
     this.dataset = dataset;
     int interval = conf.getInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY,
@@ -411,7 +411,7 @@ public class DirectoryScanner implements Runnable {
   }
 
   /** Is the given volume still valid in the dataset? */
-  private static boolean isValid(final FSDatasetInterface dataset,
+  private static boolean isValid(final FSDatasetInterface<?> dataset,
       final FSVolumeInterface volume) {
     for (FSVolumeInterface vol : dataset.getVolumes()) {
       if (vol == volume) {
@@ -424,7 +424,7 @@ public class DirectoryScanner implements Runnable {
   /** Get lists of blocks on the disk sorted by blockId, per blockpool */
   private Map<String, ScanInfo[]> getDiskReport() {
     // First get list of data directories
-    final List<FSVolumeInterface> volumes = dataset.getVolumes();
+    final List<? extends FSVolumeInterface> volumes = dataset.getVolumes();
     ArrayList<ScanInfoPerBlockPool> dirReports =
       new ArrayList<ScanInfoPerBlockPool>(volumes.size());
     

+ 43 - 44
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java

@@ -57,6 +57,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
 import org.apache.hadoop.hdfs.server.common.GenerationStamp;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
+import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.FSVolumeInterface;
 import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
 import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
@@ -74,13 +75,13 @@ import org.apache.hadoop.util.ReflectionUtils;
  *
  ***************************************************/
 @InterfaceAudience.Private
-class FSDataset implements FSDatasetInterface {
+class FSDataset implements FSDatasetInterface<FSDataset.FSVolume> {
   /**
    * A factory for creating FSDataset objects.
    */
-  static class Factory extends FSDatasetInterface.Factory {
+  static class Factory extends FSDatasetInterface.Factory<FSDataset> {
     @Override
-    public FSDatasetInterface createFSDatasetInterface(DataNode datanode,
+    public FSDataset createFSDatasetInterface(DataNode datanode,
         DataStorage storage, Configuration conf) throws IOException {
       return new FSDataset(datanode, storage, conf);
     }
@@ -786,13 +787,13 @@ class FSDataset implements FSDatasetInterface {
      * Read access to this unmodifiable list is not synchronized.
      * This list is replaced on modification holding "this" lock.
      */
-    private volatile List<FSVolumeInterface> volumes = null;
+    private volatile List<FSVolume> volumes = null;
 
-    BlockVolumeChoosingPolicy blockChooser;
+    BlockVolumeChoosingPolicy<FSVolume> blockChooser;
     int numFailedVolumes;
 
-    FSVolumeSet(List<FSVolumeInterface> volumes, int failedVols,
-        BlockVolumeChoosingPolicy blockChooser) {
+    FSVolumeSet(List<FSVolume> volumes, int failedVols,
+        BlockVolumeChoosingPolicy<FSVolume> blockChooser) {
       this.volumes = Collections.unmodifiableList(volumes);
       this.blockChooser = blockChooser;
       this.numFailedVolumes = failedVols;
@@ -810,29 +811,29 @@ class FSDataset implements FSDatasetInterface {
      * @return next volume to store the block in.
      */
     synchronized FSVolume getNextVolume(long blockSize) throws IOException {
-      return (FSVolume)blockChooser.chooseVolume(volumes, blockSize);
+      return blockChooser.chooseVolume(volumes, blockSize);
     }
       
     private long getDfsUsed() throws IOException {
       long dfsUsed = 0L;
-      for (FSVolumeInterface v : volumes) {
-        dfsUsed += ((FSVolume)v).getDfsUsed();
+      for (FSVolume v : volumes) {
+        dfsUsed += v.getDfsUsed();
       }
       return dfsUsed;
     }
 
     private long getBlockPoolUsed(String bpid) throws IOException {
       long dfsUsed = 0L;
-      for (FSVolumeInterface v : volumes) {
-        dfsUsed += ((FSVolume)v).getBlockPoolUsed(bpid);
+      for (FSVolume v : volumes) {
+        dfsUsed += v.getBlockPoolUsed(bpid);
       }
       return dfsUsed;
     }
 
     private long getCapacity() {
       long capacity = 0L;
-      for (FSVolumeInterface v : volumes) {
-        capacity += ((FSVolume)v).getCapacity();
+      for (FSVolume v : volumes) {
+        capacity += v.getCapacity();
       }
       return capacity;
     }
@@ -845,17 +846,16 @@ class FSDataset implements FSDatasetInterface {
       return remaining;
     }
       
-    private void getVolumeMap(ReplicasMap volumeMap)
-        throws IOException {
-      for (FSVolumeInterface v : volumes) {
-        ((FSVolume)v).getVolumeMap(volumeMap);
+    private void getVolumeMap(ReplicasMap volumeMap) throws IOException {
+      for (FSVolume v : volumes) {
+        v.getVolumeMap(volumeMap);
       }
     }
     
     private void getVolumeMap(String bpid, ReplicasMap volumeMap)
         throws IOException {
-      for (FSVolumeInterface v : volumes) {
-        ((FSVolume)v).getVolumeMap(bpid, volumeMap);
+      for (FSVolume v : volumes) {
+        v.getVolumeMap(bpid, volumeMap);
       }
     }
       
@@ -871,10 +871,10 @@ class FSDataset implements FSDatasetInterface {
       ArrayList<FSVolume> removedVols = null;
       
       // Make a copy of volumes for performing modification 
-      final List<FSVolumeInterface> volumeList = new ArrayList<FSVolumeInterface>(volumes);
+      final List<FSVolume> volumeList = new ArrayList<FSVolume>(volumes);
       
       for (int idx = 0; idx < volumeList.size(); idx++) {
-        FSVolume fsv = (FSVolume)volumeList.get(idx);
+        FSVolume fsv = volumeList.get(idx);
         try {
           fsv.checkDirs();
         } catch (DiskErrorException e) {
@@ -891,8 +891,8 @@ class FSDataset implements FSDatasetInterface {
       
       // Remove null volumes from the volumes array
       if (removedVols != null && removedVols.size() > 0) {
-        List<FSVolumeInterface> newVols = new ArrayList<FSVolumeInterface>();
-        for (FSVolumeInterface vol : volumeList) {
+        final List<FSVolume> newVols = new ArrayList<FSVolume>();
+        for (FSVolume vol : volumeList) {
           if (vol != null) {
             newVols.add(vol);
           }
@@ -914,21 +914,21 @@ class FSDataset implements FSDatasetInterface {
 
     private void addBlockPool(String bpid, Configuration conf)
         throws IOException {
-      for (FSVolumeInterface v : volumes) {
-        ((FSVolume)v).addBlockPool(bpid, conf);
+      for (FSVolume v : volumes) {
+        v.addBlockPool(bpid, conf);
       }
     }
     
     private void removeBlockPool(String bpid) {
-      for (FSVolumeInterface v : volumes) {
-        ((FSVolume)v).shutdownBlockPool(bpid);
+      for (FSVolume v : volumes) {
+        v.shutdownBlockPool(bpid);
       }
     }
 
     private void shutdown() {
-      for (FSVolumeInterface volume : volumes) {
+      for (FSVolume volume : volumes) {
         if(volume != null) {
-          ((FSVolume)volume).shutdown();
+          volume.shutdown();
         }
       }
     }
@@ -991,7 +991,7 @@ class FSDataset implements FSDatasetInterface {
   }
 
   @Override // FSDatasetInterface
-  public List<FSVolumeInterface> getVolumes() {
+  public List<FSVolume> getVolumes() {
     return volumes.volumes;
   }
 
@@ -1099,7 +1099,7 @@ class FSDataset implements FSDatasetInterface {
           + ", volume failures tolerated: " + volFailuresTolerated);
     }
 
-    final List<FSVolumeInterface> volArray = new ArrayList<FSVolumeInterface>(
+    final List<FSVolume> volArray = new ArrayList<FSVolume>(
         storage.getNumStorageDirs());
     for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
       final File dir = storage.getStorageDir(idx).getCurrentDir();
@@ -1108,12 +1108,12 @@ class FSDataset implements FSDatasetInterface {
     }
     volumeMap = new ReplicasMap(this);
 
-    BlockVolumeChoosingPolicy blockChooserImpl =
-      (BlockVolumeChoosingPolicy) ReflectionUtils.newInstance(
-        conf.getClass(DFSConfigKeys.DFS_DATANODE_BLOCKVOLUMECHOICEPOLICY,
+    @SuppressWarnings("unchecked")
+    final BlockVolumeChoosingPolicy<FSVolume> blockChooserImpl =
+        ReflectionUtils.newInstance(conf.getClass(
+            DFSConfigKeys.DFS_DATANODE_BLOCKVOLUMECHOICEPOLICY,
             RoundRobinVolumesPolicy.class,
-            BlockVolumeChoosingPolicy.class),
-        conf);
+            BlockVolumeChoosingPolicy.class), conf);
     volumes = new FSVolumeSet(volArray, volsFailed, blockChooserImpl);
     volumes.getVolumeMap(volumeMap);
 
@@ -2001,7 +2001,7 @@ class FSDataset implements FSDatasetInterface {
     boolean error = false;
     for (int i = 0; i < invalidBlks.length; i++) {
       File f = null;
-      FSVolume v;
+      final FSVolume v;
       synchronized (this) {
         f = getFile(bpid, invalidBlks[i].getBlockId());
         ReplicaInfo dinfo = volumeMap.get(bpid, invalidBlks[i]);
@@ -2553,8 +2553,7 @@ class FSDataset implements FSDatasetInterface {
 
   private Collection<VolumeInfo> getVolumeInfo() {
     Collection<VolumeInfo> info = new ArrayList<VolumeInfo>();
-    for (FSVolumeInterface v : volumes.volumes) {
-      final FSVolume volume = (FSVolume)v;
+    for (FSVolume volume : volumes.volumes) {
       long used = 0;
       long free = 0;
       try {
@@ -2590,8 +2589,8 @@ class FSDataset implements FSDatasetInterface {
   public synchronized void deleteBlockPool(String bpid, boolean force)
       throws IOException {
     if (!force) {
-      for (FSVolumeInterface volume : volumes.volumes) {
-        if (!((FSVolume)volume).isBPDirEmpty(bpid)) {
+      for (FSVolume volume : volumes.volumes) {
+        if (!volume.isBPDirEmpty(bpid)) {
           DataNode.LOG.warn(bpid
               + " has some block files, cannot delete unless forced");
           throw new IOException("Cannot delete block pool, "
@@ -2599,8 +2598,8 @@ class FSDataset implements FSDatasetInterface {
         }
       }
     }
-    for (FSVolumeInterface volume : volumes.volumes) {
-      ((FSVolume)volume).deleteBPDirectories(bpid, force);
+    for (FSVolume volume : volumes.volumes) {
+      volume.deleteBPDirectories(bpid, force);
     }
   }
   

+ 8 - 6
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java

@@ -50,13 +50,15 @@ import org.apache.hadoop.util.DiskChecker.DiskErrorException;
  *
  */
 @InterfaceAudience.Private
-public interface FSDatasetInterface extends FSDatasetMBean {
+public interface FSDatasetInterface<V extends FSDatasetInterface.FSVolumeInterface>
+    extends FSDatasetMBean {
   /**
    * A factory for creating FSDatasetInterface objects.
    */
-  public abstract class Factory {
+  public abstract class Factory<D extends FSDatasetInterface<?>> {
     /** @return the configured factory. */
-    public static Factory getFactory(Configuration conf) {
+    public static Factory<?> getFactory(Configuration conf) {
+      @SuppressWarnings("rawtypes")
       final Class<? extends Factory> clazz = conf.getClass(
           DFSConfigKeys.DFS_DATANODE_FSDATASET_FACTORY_KEY,
           FSDataset.Factory.class,
@@ -65,7 +67,7 @@ public interface FSDatasetInterface extends FSDatasetMBean {
     }
 
     /** Create a FSDatasetInterface object. */
-    public abstract FSDatasetInterface createFSDatasetInterface(
+    public abstract D createFSDatasetInterface(
         DataNode datanode, DataStorage storage, Configuration conf
         ) throws IOException;
 
@@ -94,7 +96,7 @@ public interface FSDatasetInterface extends FSDatasetMBean {
   }
 
   /** @return a list of volumes. */
-  public List<FSVolumeInterface> getVolumes();
+  public List<V> getVolumes();
 
   /** @return a volume information map (name => info). */
   public Map<String, Object> getVolumeInfoMap();
@@ -234,7 +236,7 @@ public interface FSDatasetInterface extends FSDatasetMBean {
         this.checksum = checksum;
       }
       
-      void close() throws IOException {
+      void close() {
         IOUtils.closeStream(dataOut);
         IOUtils.closeStream(checksumOut);
       }

+ 5 - 4
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/RoundRobinVolumesPolicy.java

@@ -23,13 +23,14 @@ import java.util.List;
 import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.FSVolumeInterface;
 import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
 
-public class RoundRobinVolumesPolicy implements BlockVolumeChoosingPolicy {
+public class RoundRobinVolumesPolicy<V extends FSVolumeInterface>
+    implements BlockVolumeChoosingPolicy<V> {
 
   private int curVolume = 0;
 
   @Override
-  public synchronized FSVolumeInterface chooseVolume(
-      List<FSVolumeInterface> volumes, long blockSize) throws IOException {
+  public synchronized V chooseVolume(final List<V> volumes, final long blockSize
+      ) throws IOException {
     if(volumes.size() < 1) {
       throw new DiskOutOfSpaceException("No more available volumes");
     }
@@ -44,7 +45,7 @@ public class RoundRobinVolumesPolicy implements BlockVolumeChoosingPolicy {
     long maxAvailable = 0;
     
     while (true) {
-      FSVolumeInterface volume = volumes.get(curVolume);
+      final V volume = volumes.get(curVolume);
       curVolume = (curVolume + 1) % volumes.size();
       long availableVolumeSize = volume.getAvailable();
       if (availableVolumeSize > blockSize) { return volume; }

+ 6 - 4
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java

@@ -1768,8 +1768,8 @@ public class MiniDFSCluster {
     if (dataNodeIndex < 0 || dataNodeIndex > dataNodes.size()) {
       throw new IndexOutOfBoundsException();
     }
-    return dataNodes.get(dataNodeIndex).datanode.getFSDataset().getBlockReport(
-        bpid);
+    final DataNode dn = dataNodes.get(dataNodeIndex).datanode;
+    return DataNodeTestUtils.getFSDataset(dn).getBlockReport(bpid);
   }
   
   
@@ -1801,7 +1801,8 @@ public class MiniDFSCluster {
     if (dataNodeIndex < 0 || dataNodeIndex > dataNodes.size()) {
       throw new IndexOutOfBoundsException();
     }
-    FSDatasetInterface dataSet = dataNodes.get(dataNodeIndex).datanode.getFSDataset();
+    final DataNode dn = dataNodes.get(dataNodeIndex).datanode;
+    final FSDatasetInterface<?> dataSet = DataNodeTestUtils.getFSDataset(dn);
     if (!(dataSet instanceof SimulatedFSDataset)) {
       throw new IOException("injectBlocks is valid only for SimilatedFSDataset");
     }
@@ -1819,7 +1820,8 @@ public class MiniDFSCluster {
     if (dataNodeIndex < 0 || dataNodeIndex > dataNodes.size()) {
       throw new IndexOutOfBoundsException();
     }
-    FSDatasetInterface dataSet = dataNodes.get(dataNodeIndex).datanode.getFSDataset();
+    final DataNode dn = dataNodes.get(dataNodeIndex).datanode;
+    final FSDatasetInterface<?> dataSet = DataNodeTestUtils.getFSDataset(dn);
     if (!(dataSet instanceof SimulatedFSDataset)) {
       throw new IOException("injectBlocks is valid only for SimilatedFSDataset");
     }

+ 2 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSRemove.java

@@ -25,6 +25,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 
 public class TestDFSRemove extends junit.framework.TestCase {
   final Path dir = new Path("/test/remove/");
@@ -45,7 +46,7 @@ public class TestDFSRemove extends junit.framework.TestCase {
   static long getTotalDfsUsed(MiniDFSCluster cluster) throws IOException {
     long total = 0;
     for(DataNode node : cluster.getDataNodes()) {
-      total += node.getFSDataset().getDfsUsed();
+      total += DataNodeTestUtils.getFSDataset(node).getDfsUsed();
     }
     return total;
   }

+ 1 - 27
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUtil.java

@@ -232,32 +232,6 @@ public class TestDFSUtil {
     assertEquals(expectedNameServiceId, nameserviceId);
   }
 
-  /**
-   * Test for
-   * {@link DFSUtil#isDefaultNamenodeAddress(Configuration, InetSocketAddress, String...)}
-   */
-  @Test
-  public void testSingleNamenode() throws URISyntaxException {
-    HdfsConfiguration conf = new HdfsConfiguration();
-    final String DEFAULT_ADDRESS = "localhost:9000";
-    final String NN2_ADDRESS = "localhost:9001";
-    conf.set(DFS_NAMENODE_RPC_ADDRESS_KEY, DEFAULT_ADDRESS);
-    conf.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, DEFAULT_ADDRESS);
-
-    InetSocketAddress testAddress1 = NetUtils.createSocketAddr(DEFAULT_ADDRESS);
-    boolean isDefault = DFSUtil.isDefaultNamenodeAddress(conf, testAddress1,
-        DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, DFS_NAMENODE_RPC_ADDRESS_KEY);
-    assertTrue(isDefault);
-    InetSocketAddress testAddress2 = NetUtils.createSocketAddr(NN2_ADDRESS);
-    isDefault = DFSUtil.isDefaultNamenodeAddress(conf, testAddress2,
-        DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, DFS_NAMENODE_RPC_ADDRESS_KEY);
-    assertFalse(isDefault);
-    
-    Collection<URI> uris = DFSUtil.getNameServiceUris(conf, DFS_NAMENODE_RPC_ADDRESS_KEY);
-    assertEquals(1, uris.size());
-    assertTrue(uris.contains(new URI("hdfs://" + DEFAULT_ADDRESS)));
-  }
-
   /** Tests to ensure default namenode is used as fallback */
   @Test
   public void testDefaultNamenode() throws IOException {
@@ -554,4 +528,4 @@ public class TestDFSUtil {
     assertTrue(uris.contains(new URI("hdfs://" + NS2_NN_HOST)));
     assertTrue(uris.contains(new URI("hdfs://" + NN_HOST)));
   }
-}
+}

+ 5 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCreation.java

@@ -60,6 +60,7 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
+import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface;
 import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
@@ -210,8 +211,10 @@ public class TestFileCreation extends junit.framework.TestCase {
       // can't check capacities for real storage since the OS file system may be changing under us.
       if (simulatedStorage) {
         DataNode dn = cluster.getDataNodes().get(0);
-        assertEquals(fileSize, dn.getFSDataset().getDfsUsed());
-        assertEquals(SimulatedFSDataset.DEFAULT_CAPACITY-fileSize, dn.getFSDataset().getRemaining());
+        FSDatasetInterface<?> dataset = DataNodeTestUtils.getFSDataset(dn);
+        assertEquals(fileSize, dataset.getDfsUsed());
+        assertEquals(SimulatedFSDataset.DEFAULT_CAPACITY-fileSize,
+            dataset.getRemaining());
       }
     } finally {
       cluster.shutdown();

+ 11 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java

@@ -41,6 +41,17 @@ public class DataNodeTestUtils {
     return dn.getDNRegistrationForBP(bpid);
   }
 
+  /**
+   * This method is used for testing. 
+   * Examples are adding and deleting blocks directly.
+   * The most common usage will be when the data node's storage is simulated.
+   * 
+   * @return the fsdataset that stores the blocks
+   */
+  public static FSDatasetInterface<?> getFSDataset(DataNode dn) {
+    return dn.getFSDataset();
+  }
+
   public static File getFile(DataNode dn, String bpid, long bid) {
     return ((FSDataset)dn.getFSDataset()).getFile(bpid, bid);
   }

+ 4 - 3
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java

@@ -61,10 +61,11 @@ import org.apache.hadoop.util.DiskChecker.DiskErrorException;
  * 
  * Note the synchronization is coarse grained - it is at each method. 
  */
-public class SimulatedFSDataset implements FSDatasetInterface {
-  static class Factory extends FSDatasetInterface.Factory {
+public class SimulatedFSDataset
+    implements FSDatasetInterface<FSDatasetInterface.FSVolumeInterface> {
+  static class Factory extends FSDatasetInterface.Factory<SimulatedFSDataset> {
     @Override
-    public FSDatasetInterface createFSDatasetInterface(DataNode datanode,
+    public SimulatedFSDataset createFSDatasetInterface(DataNode datanode,
         DataStorage storage, Configuration conf) throws IOException {
       return new SimulatedFSDataset(datanode, storage, conf);
     }

+ 7 - 7
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java

@@ -210,13 +210,14 @@ public class TestBlockReport {
       LOG.debug("Number of blocks allocated " + lBlocks.size());
     }
 
+    final DataNode dn0 = cluster.getDataNodes().get(DN_N0);
     for (ExtendedBlock b : blocks2Remove) {
       if(LOG.isDebugEnabled()) {
         LOG.debug("Removing the block " + b.getBlockName());
       }
       for (File f : findAllFiles(dataDir,
         new MyFileFilter(b.getBlockName(), true))) {
-        cluster.getDataNodes().get(DN_N0).getFSDataset().unfinalizeBlock(b);
+        DataNodeTestUtils.getFSDataset(dn0).unfinalizeBlock(b);
         if (!f.delete())
           LOG.warn("Couldn't delete " + b.getBlockName());
       }
@@ -225,9 +226,8 @@ public class TestBlockReport {
     waitTil(DN_RESCAN_EXTRA_WAIT);
 
     // all blocks belong to the same file, hence same BP
-    DataNode dn = cluster.getDataNodes().get(DN_N0);
     String poolId = cluster.getNamesystem().getBlockPoolId();
-    DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
+    DatanodeRegistration dnR = dn0.getDNRegistrationForBP(poolId);
     StorageBlockReport[] report = { new StorageBlockReport(dnR.getStorageID(),
         new BlockListAsLongs(blocks, null).getBlockListAsLongs()) };
     cluster.getNameNodeRpc().blockReport(dnR, poolId, report);
@@ -602,15 +602,15 @@ public class TestBlockReport {
     cluster.waitActive();
     
     // Look about specified DN for the replica of the block from 1st DN
+    final DataNode dn1 = cluster.getDataNodes().get(DN_N1);
+    final FSDataset dataset1 = (FSDataset)DataNodeTestUtils.getFSDataset(dn1);
     String bpid = cluster.getNamesystem().getBlockPoolId();
-    Replica r = ((FSDataset) cluster.getDataNodes().get(DN_N1).getFSDataset()).
-      fetchReplicaInfo(bpid, bl.getBlockId());
+    Replica r = dataset1.fetchReplicaInfo(bpid, bl.getBlockId());
     long start = System.currentTimeMillis();
     int count = 0;
     while (r == null) {
       waitTil(5);
-      r = ((FSDataset) cluster.getDataNodes().get(DN_N1).getFSDataset()).
-        fetchReplicaInfo(bpid, bl.getBlockId());
+      r = dataset1.fetchReplicaInfo(bpid, bl.getBlockId());
       long waiting_period = System.currentTimeMillis() - start;
       if (count++ % 100 == 0)
         if(LOG.isDebugEnabled()) {

+ 5 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java

@@ -145,8 +145,11 @@ public class TestDataNodeVolumeFailure {
     DataNode dn = cluster.getDataNodes().get(1); //corresponds to dir data3
     String bpid = cluster.getNamesystem().getBlockPoolId();
     DatanodeRegistration dnR = dn.getDNRegistrationForBP(bpid);
-    StorageBlockReport[] report = { new StorageBlockReport(dnR.getStorageID(),
-        dn.getFSDataset().getBlockReport(bpid).getBlockListAsLongs()) };
+    final StorageBlockReport[] report = {
+        new StorageBlockReport(dnR.getStorageID(),
+            DataNodeTestUtils.getFSDataset(dn).getBlockReport(bpid
+                ).getBlockListAsLongs())
+    };
     cluster.getNameNodeRpc().blockReport(dnR, bpid, report);
 
     // verify number of blocks and files...

+ 2 - 13
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailureToleration.java

@@ -24,11 +24,7 @@ import static org.junit.Assume.assumeTrue;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.concurrent.TimeoutException;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
@@ -38,7 +34,6 @@ import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
-import org.apache.log4j.Level;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -47,12 +42,6 @@ import org.junit.Test;
  * Test the ability of a DN to tolerate volume failures.
  */
 public class TestDataNodeVolumeFailureToleration {
-
-  private static final Log LOG = LogFactory.getLog(TestDataNodeVolumeFailureToleration.class);
-  {
-    ((Log4JLogger)TestDataNodeVolumeFailureToleration.LOG).getLogger().setLevel(Level.ALL);
-  }
-
   private FileSystem fs;
   private MiniDFSCluster cluster;
   private Configuration conf;
@@ -130,7 +119,7 @@ public class TestDataNodeVolumeFailureToleration {
       assertTrue("The DN should have started up fine.",
           cluster.isDataNodeUp());
       DataNode dn = cluster.getDataNodes().get(0);
-      String si = dn.getFSDataset().getStorageInfo();
+      String si = DataNodeTestUtils.getFSDataset(dn).getStorageInfo();
       assertTrue("The DN should have started with this directory",
           si.contains(dataDir1Actual.getPath()));
       assertFalse("The DN shouldn't have a bad directory.",
@@ -227,7 +216,7 @@ public class TestDataNodeVolumeFailureToleration {
    */
   private void testVolumeConfig(int volumesTolerated, int volumesFailed,
       boolean expectedBPServiceState, boolean manageDfsDirs)
-      throws IOException, InterruptedException, TimeoutException {
+      throws IOException, InterruptedException {
     assumeTrue(!System.getProperty("os.name").startsWith("Windows"));
     final int dnIndex = 0;
     // Fail the current directory since invalid storage directory perms

+ 6 - 5
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java

@@ -38,7 +38,7 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.server.common.GenerationStamp;
-import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.FSVolumeInterface;
+import org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolume;
 
 /**
  * Tests {@link DirectoryScanner} handling of differences
@@ -142,7 +142,7 @@ public class TestDirectoryScanner extends TestCase {
 
   /** Create a block file in a random volume*/
   private long createBlockFile() throws IOException {
-    List<FSVolumeInterface> volumes = fds.getVolumes();
+    List<FSVolume> volumes = fds.getVolumes();
     int index = rand.nextInt(volumes.size() - 1);
     long id = getFreeBlockId();
     File finalizedDir = volumes.get(index).getFinalizedDir(bpid);
@@ -155,7 +155,7 @@ public class TestDirectoryScanner extends TestCase {
 
   /** Create a metafile in a random volume*/
   private long createMetaFile() throws IOException {
-    List<FSVolumeInterface> volumes = fds.getVolumes();
+    List<FSVolume> volumes = fds.getVolumes();
     int index = rand.nextInt(volumes.size() - 1);
     long id = getFreeBlockId();
     File finalizedDir = volumes.get(index).getFinalizedDir(bpid);
@@ -168,7 +168,7 @@ public class TestDirectoryScanner extends TestCase {
 
   /** Create block file and corresponding metafile in a rondom volume */
   private long createBlockMetaFile() throws IOException {
-    List<FSVolumeInterface> volumes = fds.getVolumes();
+    List<FSVolume> volumes = fds.getVolumes();
     int index = rand.nextInt(volumes.size() - 1);
     long id = getFreeBlockId();
     File finalizedDir = volumes.get(index).getFinalizedDir(bpid);
@@ -228,7 +228,8 @@ public class TestDirectoryScanner extends TestCase {
     try {
       cluster.waitActive();
       bpid = cluster.getNamesystem().getBlockPoolId();
-      fds = (FSDataset) cluster.getDataNodes().get(0).getFSDataset();
+      fds = (FSDataset)DataNodeTestUtils.getFSDataset(
+          cluster.getDataNodes().get(0));
       CONF.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY,
                   parallelism);
       DataNode dn = cluster.getDataNodes().get(0);

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestMulitipleNNDataBlockScanner.java

@@ -149,6 +149,9 @@ public class TestMulitipleNNDataBlockScanner {
       cluster.waitActive();
       DataNode dn = cluster.getDataNodes().get(0);
       for (int i = 0; i < 3; i++) {
+        while (!dn.blockScanner.isInitialized(bpids[i])) {
+          Thread.sleep(1000);
+        }
         long blocksScanned = 0;
         while (blocksScanned != 20) {
           if (dn.blockScanner != null) {

+ 6 - 3
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestRoundRobinVolumesPolicy.java

@@ -43,8 +43,10 @@ public class TestRoundRobinVolumesPolicy {
     volumes.add(Mockito.mock(FSVolumeInterface.class));
     Mockito.when(volumes.get(1).getAvailable()).thenReturn(200L);
 
-    RoundRobinVolumesPolicy policy = ReflectionUtils.newInstance(
-        RoundRobinVolumesPolicy.class, null);
+    @SuppressWarnings("unchecked")
+    final RoundRobinVolumesPolicy<FSVolumeInterface> policy = 
+        (RoundRobinVolumesPolicy<FSVolumeInterface>)ReflectionUtils.newInstance(
+            RoundRobinVolumesPolicy.class, null);
     
     // Test two rounds of round-robin choosing
     Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 0));
@@ -79,7 +81,8 @@ public class TestRoundRobinVolumesPolicy {
     volumes.add(Mockito.mock(FSVolumeInterface.class));
     Mockito.when(volumes.get(1).getAvailable()).thenReturn(600L);
 
-    RoundRobinVolumesPolicy policy = new RoundRobinVolumesPolicy();
+    final RoundRobinVolumesPolicy<FSVolumeInterface> policy
+        = new RoundRobinVolumesPolicy<FSVolumeInterface>();
     int blockSize = 700;
     try {
       policy.chooseVolume(volumes, blockSize);

+ 12 - 13
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java

@@ -21,6 +21,7 @@ import java.io.DataInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+
 import junit.framework.TestCase;
 
 import org.apache.hadoop.conf.Configuration;
@@ -28,8 +29,6 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface;
-import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
 import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.BlockWriteStreams;
 import org.apache.hadoop.util.DataChecksum;
 
@@ -56,7 +55,7 @@ public class TestSimulatedFSDataset extends TestCase {
     return blkid*BLOCK_LENGTH_MULTIPLIER;
   }
   
-  int addSomeBlocks(FSDatasetInterface fsdataset, int startingBlockId)
+  int addSomeBlocks(SimulatedFSDataset fsdataset, int startingBlockId)
       throws IOException {
     int bytesAdded = 0;
     for (int i = startingBlockId; i < startingBlockId+NUMBLOCKS; ++i) {
@@ -83,24 +82,24 @@ public class TestSimulatedFSDataset extends TestCase {
     }
     return bytesAdded;  
   }
-  int addSomeBlocks(FSDatasetInterface fsdataset ) throws IOException {
+  int addSomeBlocks(SimulatedFSDataset fsdataset ) throws IOException {
     return addSomeBlocks(fsdataset, 1);
   }
   
   public void testFSDatasetFactory() {
     final Configuration conf = new Configuration();
-    FSDatasetInterface.Factory f = FSDatasetInterface.Factory.getFactory(conf);
+    FSDatasetInterface.Factory<?> f = FSDatasetInterface.Factory.getFactory(conf);
     assertEquals(FSDataset.Factory.class, f.getClass());
     assertFalse(f.isSimulated());
 
     SimulatedFSDataset.setFactory(conf);
-    FSDatasetInterface.Factory s = FSDatasetInterface.Factory.getFactory(conf);
+    FSDatasetInterface.Factory<?> s = FSDatasetInterface.Factory.getFactory(conf);
     assertEquals(SimulatedFSDataset.Factory.class, s.getClass());
     assertTrue(s.isSimulated());
   }
 
   public void testGetMetaData() throws IOException {
-    FSDatasetInterface fsdataset = getSimulatedFSDataset(); 
+    final SimulatedFSDataset fsdataset = getSimulatedFSDataset();
     ExtendedBlock b = new ExtendedBlock(bpid, 1, 5, 0);
     try {
       assertFalse(fsdataset.metaFileExists(b));
@@ -121,7 +120,7 @@ public class TestSimulatedFSDataset extends TestCase {
 
 
   public void testStorageUsage() throws IOException {
-    FSDatasetInterface fsdataset = getSimulatedFSDataset(); 
+    final SimulatedFSDataset fsdataset = getSimulatedFSDataset();
     assertEquals(fsdataset.getDfsUsed(), 0);
     assertEquals(fsdataset.getRemaining(), fsdataset.getCapacity());
     int bytesAdded = addSomeBlocks(fsdataset);
@@ -131,7 +130,7 @@ public class TestSimulatedFSDataset extends TestCase {
 
 
 
-  void checkBlockDataAndSize(FSDatasetInterface fsdataset, ExtendedBlock b,
+  void checkBlockDataAndSize(SimulatedFSDataset fsdataset, ExtendedBlock b,
       long expectedLen) throws IOException { 
     InputStream input = fsdataset.getBlockInputStream(b);
     long lengthRead = 0;
@@ -144,7 +143,7 @@ public class TestSimulatedFSDataset extends TestCase {
   }
   
   public void testWriteRead() throws IOException {
-    FSDatasetInterface fsdataset = getSimulatedFSDataset(); 
+    final SimulatedFSDataset fsdataset = getSimulatedFSDataset();
     addSomeBlocks(fsdataset);
     for (int i=1; i <= NUMBLOCKS; ++i) {
       ExtendedBlock b = new ExtendedBlock(bpid, i, 0, 0);
@@ -244,7 +243,7 @@ public class TestSimulatedFSDataset extends TestCase {
   }
 
   public void checkInvalidBlock(ExtendedBlock b) throws IOException {
-    FSDatasetInterface fsdataset = getSimulatedFSDataset(); 
+    final SimulatedFSDataset fsdataset = getSimulatedFSDataset();
     assertFalse(fsdataset.isValidBlock(b));
     try {
       fsdataset.getLength(b);
@@ -269,7 +268,7 @@ public class TestSimulatedFSDataset extends TestCase {
   }
   
   public void testInValidBlocks() throws IOException {
-    FSDatasetInterface fsdataset = getSimulatedFSDataset(); 
+    final SimulatedFSDataset fsdataset = getSimulatedFSDataset();
     ExtendedBlock b = new ExtendedBlock(bpid, 1, 5, 0);
     checkInvalidBlock(b);
     
@@ -280,7 +279,7 @@ public class TestSimulatedFSDataset extends TestCase {
   }
 
   public void testInvalidate() throws IOException {
-    FSDatasetInterface fsdataset = getSimulatedFSDataset(); 
+    final SimulatedFSDataset fsdataset = getSimulatedFSDataset();
     int bytesAdded = addSomeBlocks(fsdataset);
     Block[] deleteBlocks = new Block[2];
     deleteBlocks[0] = new Block(1, 0, 0);

+ 3 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencing.java

@@ -48,6 +48,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeAdapter;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.namenode.FSInodeInfo;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
@@ -564,7 +565,8 @@ public class TestDNFencing {
       throws IOException {
     int count = 0;
     for (DataNode dn : cluster.getDataNodes()) {
-      if (dn.getFSDataset().getStoredBlock(block.getBlockPoolId(), block.getBlockId()) != null) {
+      if (DataNodeTestUtils.getFSDataset(dn).getStoredBlock(
+          block.getBlockPoolId(), block.getBlockId()) != null) {
         count++;
       }
     }

+ 6 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -44,6 +44,9 @@ Trunk (unreleased changes)
 
     MAPREDUCE-2944. Improve checking of input for JobClient.displayTasks() (XieXianshan via harsh)
 
+    MAPREDUCE-3956. Remove the use of the deprecated Syncable.sync() method from
+    TeraOutputFormat in the terasort example.  (szetszwo)
+
   BUG FIXES
 
     MAPREDUCE-3757. [Rumen] Fixed Rumen Folder to adjust shuffleFinished and
@@ -97,6 +100,9 @@ Release 0.23.3 - UNRELEASED
 
     MAPREDUCE-3909 Javadoc the Service interfaces (stevel)
 
+    MAPREDUCE-3885. Avoid an unnecessary copy for all requests/responses in 
+    MRs ProtoOverHadoopRpcEngine. (Devaraj Das via sseth)
+
   OPTIMIZATIONS
 
   BUG FIXES

+ 1 - 1
hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/terasort/TeraOutputFormat.java

@@ -71,7 +71,7 @@ public class TeraOutputFormat extends FileOutputFormat<Text,Text> {
     
     public void close(TaskAttemptContext context) throws IOException {
       if (finalSync) {
-        out.sync();
+        out.hsync();
       }
       out.close();
     }

+ 9 - 6
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/ipc/ProtoOverHadoopRpcEngine.java

@@ -34,6 +34,8 @@ import javax.net.SocketFactory;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DataOutputOutputStream;
+import org.apache.hadoop.io.ObjectWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.ipc.Client;
 import org.apache.hadoop.ipc.ProtocolMetaInfoPB;
@@ -46,6 +48,7 @@ import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.SecretManager;
 import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.util.ProtoUtil;
 import org.apache.hadoop.yarn.exceptions.impl.pb.YarnRemoteExceptionPBImpl;
 import org.apache.hadoop.yarn.ipc.RpcProtos.ProtoSpecificRpcRequest;
 import org.apache.hadoop.yarn.ipc.RpcProtos.ProtoSpecificRpcResponse;
@@ -213,13 +216,13 @@ public class ProtoOverHadoopRpcEngine implements RpcEngine {
 
     @Override
     public void write(DataOutput out) throws IOException {
-      out.writeInt(message.toByteArray().length);
-      out.write(message.toByteArray());
+      ((Message)message).writeDelimitedTo(
+          DataOutputOutputStream.constructOutputStream(out));
     }
 
     @Override
     public void readFields(DataInput in) throws IOException {
-      int length = in.readInt();
+      int length = ProtoUtil.readRawVarint32(in);
       byte[] bytes = new byte[length];
       in.readFully(bytes);
       message = ProtoSpecificRpcRequest.parseFrom(bytes);
@@ -241,13 +244,13 @@ public class ProtoOverHadoopRpcEngine implements RpcEngine {
 
     @Override
     public void write(DataOutput out) throws IOException {
-      out.writeInt(message.toByteArray().length);
-      out.write(message.toByteArray());
+      ((Message)message).writeDelimitedTo(
+          DataOutputOutputStream.constructOutputStream(out));      
     }
 
     @Override
     public void readFields(DataInput in) throws IOException {
-      int length = in.readInt();
+      int length = ProtoUtil.readRawVarint32(in);
       byte[] bytes = new byte[length];
       in.readFully(bytes);
       message = ProtoSpecificRpcResponse.parseFrom(bytes);