Ver código fonte

HDFS-6925. DataNode should attempt to place replicas on transient storage first if lazyPersist flag is received. (Arpit Agarwal)

Conflicts:
	hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-6581.txt
	hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java
	hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
	hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
arp 10 anos atrás
pai
commit
193528c84d
23 arquivos alterados com 231 adições e 72 exclusões
  1. 3 2
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
  2. 3 3
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
  3. 1 1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
  4. 5 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/Replica.java
  5. 5 25
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
  6. 1 2
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaUnderRecovery.java
  7. 1 1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/AvailableSpaceVolumeChoosingPolicy.java
  8. 2 2
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
  9. 4 1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java
  10. 10 3
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/RoundRobinVolumeChoosingPolicy.java
  11. 37 12
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
  12. 60 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsTransientVolumeImpl.java
  13. 9 3
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
  14. 44 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImplAllocator.java
  15. 19 1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
  16. 3 2
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteBlockGetsBlockLengthHint.java
  17. 1 1
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java
  18. 8 2
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
  19. 2 2
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
  20. 5 1
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
  21. 1 1
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java
  22. 1 1
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetTestUtil.java
  23. 6 6
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java

+ 3 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java

@@ -139,7 +139,8 @@ class BlockReceiver implements Closeable {
       final long newGs, final long minBytesRcvd, final long maxBytesRcvd, 
       final String clientname, final DatanodeInfo srcDataNode,
       final DataNode datanode, DataChecksum requestedChecksum,
-      CachingStrategy cachingStrategy) throws IOException {
+      CachingStrategy cachingStrategy,
+      final boolean allowLazyPersist) throws IOException {
     try{
       this.block = block;
       this.in = in;
@@ -180,7 +181,7 @@ class BlockReceiver implements Closeable {
       } else {
         switch (stage) {
         case PIPELINE_SETUP_CREATE:
-          replicaInfo = datanode.data.createRbw(storageType, block);
+          replicaInfo = datanode.data.createRbw(storageType, block, allowLazyPersist);
           datanode.notifyNamenodeReceivingBlock(
               block, replicaInfo.getStorageUuid());
           break;

+ 3 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java

@@ -607,8 +607,8 @@ class DataXceiver extends Receiver implements Runnable {
             peer.getLocalAddressString(),
             stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd,
             clientname, srcDataNode, datanode, requestedChecksum,
-            cachingStrategy);
-        
+            cachingStrategy, allowLazyPersist);
+
         storageUuid = blockReceiver.getStorageUuid();
       } else {
         storageUuid = datanode.data.recoverClose(
@@ -1048,7 +1048,7 @@ class DataXceiver extends Receiver implements Runnable {
           proxyReply, proxySock.getRemoteSocketAddress().toString(),
           proxySock.getLocalSocketAddress().toString(),
           null, 0, 0, 0, "", null, datanode, remoteChecksum,
-          CachingStrategy.newDropBehind());
+          CachingStrategy.newDropBehind(), false);
 
       // receive a block
       blockReceiver.receiveBlock(null, null, replyOut, null, 

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java

@@ -399,7 +399,7 @@ public class DirectoryScanner implements Runnable {
   /**
    * Reconcile differences between disk and in-memory blocks
    */
-  void reconcile() {
+  void reconcile() throws IOException {
     scan();
     for (Entry<String, LinkedList<ScanInfo>> entry : diffs.entrySet()) {
       String bpid = entry.getKey();

+ 5 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/Replica.java

@@ -59,4 +59,9 @@ public interface Replica {
    * Return the storageUuid of the volume that stores this replica.
    */
   public String getStorageUuid();
+
+  /**
+   * Return true if the target volume is backed by RAM.
+   */
+  public boolean isOnTransientStorage();
 }

+ 5 - 25
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java

@@ -61,17 +61,6 @@ abstract public class ReplicaInfo extends Block implements Replica {
   
   private static final Map<String, File> internedBaseDirs = new HashMap<String, File>();
 
-  /**
-   * Constructor for a zero length replica
-   * @param blockId block id
-   * @param genStamp replica generation stamp
-   * @param vol volume where replica is located
-   * @param dir directory path where block and meta files are located
-   */
-  ReplicaInfo(long blockId, long genStamp, FsVolumeSpi vol, File dir) {
-    this( blockId, 0L, genStamp, vol, dir);
-  }
-  
   /**
    * Constructor
    * @param block a block
@@ -296,20 +285,6 @@ abstract public class ReplicaInfo extends Block implements Replica {
     return true;
   }
 
-  /**
-   * Set this replica's generation stamp to be a newer one
-   * @param newGS new generation stamp
-   * @throws IOException is the new generation stamp is not greater than the current one
-   */
-  void setNewerGenerationStamp(long newGS) throws IOException {
-    long curGS = getGenerationStamp();
-    if (newGS <= curGS) {
-      throw new IOException("New generation stamp (" + newGS 
-          + ") must be greater than current one (" + curGS + ")");
-    }
-    setGenerationStamp(newGS);
-  }
-  
   @Override  //Object
   public String toString() {
     return getClass().getSimpleName()
@@ -321,4 +296,9 @@ abstract public class ReplicaInfo extends Block implements Replica {
         + "\n  getVolume()       = " + getVolume()
         + "\n  getBlockFile()    = " + getBlockFile();
   }
+
+  @Override
+  public boolean isOnTransientStorage() {
+    return volume.isTransientStorage();
+  }
 }

+ 1 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaUnderRecovery.java

@@ -37,8 +37,7 @@ public class ReplicaUnderRecovery extends ReplicaInfo {
                            // that the replica will be bumped to after recovery
 
   public ReplicaUnderRecovery(ReplicaInfo replica, long recoveryId) {
-    super(replica.getBlockId(), replica.getNumBytes(), replica.getGenerationStamp(),
-        replica.getVolume(), replica.getDir());
+    super(replica, replica.getVolume(), replica.getDir());
     if ( replica.getState() != ReplicaState.FINALIZED &&
          replica.getState() != ReplicaState.RBW &&
          replica.getState() != ReplicaState.RWR ) {

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/AvailableSpaceVolumeChoosingPolicy.java

@@ -99,7 +99,7 @@ public class AvailableSpaceVolumeChoosingPolicy<V extends FsVolumeSpi>
 
   @Override
   public synchronized V chooseVolume(List<V> volumes,
-      final long replicaSize) throws IOException {
+      long replicaSize) throws IOException {
     if (volumes.size() < 1) {
       throw new DiskOutOfSpaceException("No more available volumes");
     }

+ 2 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java

@@ -122,7 +122,7 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
    * as corrupted.
    */
   public void checkAndUpdate(String bpid, long blockId, File diskFile,
-      File diskMetaFile, FsVolumeSpi vol);
+      File diskMetaFile, FsVolumeSpi vol) throws IOException;
 
   /**
    * @param b - the block
@@ -197,7 +197,7 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
    * @throws IOException if an error occurs
    */
   public ReplicaInPipelineInterface createRbw(StorageType storageType,
-      ExtendedBlock b) throws IOException;
+      ExtendedBlock b, boolean allowLazyPersist) throws IOException;
 
   /**
    * Recovers a RBW replica and returns the meta info of the replica

+ 4 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java

@@ -56,4 +56,7 @@ public interface FsVolumeSpi {
    * Release disk space previously reserved for RBW block.
    */
   public void releaseReservedSpace(long bytesToRelease);
-}
+
+  /** Returns true if the volume is NOT backed by persistent storage. */
+  public boolean isTransientStorage();
+}

+ 10 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/RoundRobinVolumeChoosingPolicy.java

@@ -20,6 +20,9 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset;
 import java.io.IOException;
 import java.util.List;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hdfs.StorageType;
 import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
 
 /**
@@ -27,12 +30,14 @@ import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
  */
 public class RoundRobinVolumeChoosingPolicy<V extends FsVolumeSpi>
     implements VolumeChoosingPolicy<V> {
+  public static final Log LOG = LogFactory.getLog(RoundRobinVolumeChoosingPolicy.class);
 
   private int curVolume = 0;
 
   @Override
-  public synchronized V chooseVolume(final List<V> volumes, final long blockSize
-      ) throws IOException {
+  public synchronized V chooseVolume(final List<V> volumes, long blockSize)
+      throws IOException {
+
     if(volumes.size() < 1) {
       throw new DiskOutOfSpaceException("No more available volumes");
     }
@@ -50,7 +55,9 @@ public class RoundRobinVolumeChoosingPolicy<V extends FsVolumeSpi>
       final V volume = volumes.get(curVolume);
       curVolume = (curVolume + 1) % volumes.size();
       long availableVolumeSize = volume.getAvailable();
-      if (availableVolumeSize > blockSize) { return volume; }
+      if (availableVolumeSize > blockSize) {
+        return volume;
+      }
       
       if (availableVolumeSize > maxAvailable) {
         maxAvailable = availableVolumeSize;

+ 37 - 12
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java

@@ -45,6 +45,9 @@ import javax.management.ObjectName;
 import javax.management.StandardMBean;
 
 import com.google.common.collect.Lists;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.TreeMultimap;
+import org.apache.commons.io.FileUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -278,7 +281,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     // If IOException raises from FsVolumeImpl() or getVolumeMap(), there is
     // nothing needed to be rolled back to make various data structures, e.g.,
     // storageMap and asyncDiskService, consistent.
-    FsVolumeImpl fsVolume = new FsVolumeImpl(
+    FsVolumeImpl fsVolume = FsVolumeImplAllocator.createVolume(
         this, sd.getStorageUuid(), dir, this.conf, storageType);
     ReplicaMap tempVolumeMap = new ReplicaMap(this);
     fsVolume.getVolumeMap(tempVolumeMap);
@@ -550,16 +553,16 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
    * Get File name for a given block.
    */
   private File getBlockFile(ExtendedBlock b) throws IOException {
-    return getBlockFile(b.getBlockPoolId(), b.getLocalBlock());
+    return getBlockFile(b.getBlockPoolId(), b.getBlockId());
   }
   
   /**
    * Get File name for a given block.
    */
-  File getBlockFile(String bpid, Block b) throws IOException {
-    File f = validateBlockFile(bpid, b);
+  File getBlockFile(String bpid, long blockId) throws IOException {
+    File f = validateBlockFile(bpid, blockId);
     if(f == null) {
-      throw new IOException("Block " + b + " is not valid.");
+      throw new IOException("BlockId " + blockId + " is not valid.");
     }
     return f;
   }
@@ -949,8 +952,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
 
   @Override // FsDatasetSpi
   public synchronized ReplicaInPipeline createRbw(StorageType storageType,
-      ExtendedBlock b) throws IOException {
-    ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(), 
+      ExtendedBlock b, boolean allowLazyPersist) throws IOException {
+    ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(),
         b.getBlockId());
     if (replicaInfo != null) {
       throw new ReplicaAlreadyExistsException("Block " + b +
@@ -958,8 +961,25 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       " and thus cannot be created.");
     }
     // create a new block
-    FsVolumeImpl v = volumes.getNextVolume(storageType, b.getNumBytes());
-    // create a rbw file to hold block in the designated volume
+    FsVolumeImpl v;
+    while (true) {
+      try {
+        if (allowLazyPersist) {
+          // First try to place the block on a transient volume.
+          v = volumes.getNextTransientVolume(b.getNumBytes());
+        } else {
+          v = volumes.getNextVolume(storageType, b.getNumBytes());
+        }
+      } catch (DiskOutOfSpaceException de) {
+        if (allowLazyPersist) {
+          allowLazyPersist = false;
+          continue;
+        }
+        throw de;
+      }
+      break;
+    }
+    // create an rbw file to hold block in the designated volume
     File f = v.createRbwFile(b.getBlockPoolId(), b.getLocalBlock());
     ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten(b.getBlockId(), 
         b.getGenerationStamp(), v, f.getParentFile(), b.getNumBytes());
@@ -1321,11 +1341,11 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   /**
    * Find the file corresponding to the block and return it if it exists.
    */
-  File validateBlockFile(String bpid, Block b) {
+  File validateBlockFile(String bpid, long blockId) {
     //Should we check for metadata file too?
     final File f;
     synchronized(this) {
-      f = getFile(bpid, b.getBlockId());
+      f = getFile(bpid, blockId);
     }
     
     if(f != null ) {
@@ -1337,7 +1357,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     }
     
     if (LOG.isDebugEnabled()) {
-      LOG.debug("b=" + b + ", f=" + f);
+      LOG.debug("blockId=" + blockId + ", f=" + f);
     }
     return null;
   }
@@ -1497,6 +1517,11 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
               ": volume was not an instance of FsVolumeImpl.");
           return;
         }
+        if (volume.isTransientStorage()) {
+          LOG.warn("Caching not supported on block with id " + blockId +
+              " since the volume is backed by RAM.");
+          return;
+        }
         success = true;
       } finally {
         if (!success) {

+ 60 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsTransientVolumeImpl.java

@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.StorageType;
+
+/**
+ * Volume for storing replicas in memory. These can be deleted at any time
+ * to make space for new replicas and there is no persistence guarantee.
+ *
+ * The backing store for these replicas is expected to be RAM_DISK.
+ * The backing store may be disk when testing.
+ *
+ * It uses the {@link FsDatasetImpl} object for synchronization.
+ */
+@InterfaceAudience.Private
+@VisibleForTesting
+public class FsTransientVolumeImpl extends FsVolumeImpl {
+
+
+  FsTransientVolumeImpl(FsDatasetImpl dataset, String storageID, File currentDir,
+      Configuration conf, StorageType storageType)
+          throws IOException {
+    super(dataset, storageID, currentDir, conf, storageType);
+  }
+
+  @Override
+  protected ThreadPoolExecutor initializeCacheExecutor(File parent) {
+    // Can't 'cache' replicas already in RAM.
+    return null;
+  }
+
+  @Override
+  public boolean isTransientStorage() {
+    return true;
+  }
+}

+ 9 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java

@@ -77,7 +77,7 @@ public class FsVolumeImpl implements FsVolumeSpi {
    * dfs.datanode.fsdatasetcache.max.threads.per.volume) to limit resource
    * contention.
    */
-  private final ThreadPoolExecutor cacheExecutor;
+  protected ThreadPoolExecutor cacheExecutor;
   
   FsVolumeImpl(FsDatasetImpl dataset, String storageID, File currentDir,
       Configuration conf, StorageType storageType) throws IOException {
@@ -201,6 +201,11 @@ public class FsVolumeImpl implements FsVolumeSpi {
     return currentDir.getParent();
   }
   
+  @Override
+  public boolean isTransientStorage() {
+    return false;
+  }
+
   @Override
   public String getPath(String bpid) throws IOException {
     return getBlockPoolSlice(bpid).getDirectory().getAbsolutePath();
@@ -324,7 +329,9 @@ public class FsVolumeImpl implements FsVolumeSpi {
   }
 
   void shutdown() {
-    cacheExecutor.shutdown();
+    if (cacheExecutor != null) {
+      cacheExecutor.shutdown();
+    }
     Set<Entry<String, BlockPoolSlice>> set = bpSlices.entrySet();
     for (Entry<String, BlockPoolSlice> entry : set) {
       entry.getValue().shutdown();
@@ -417,6 +424,5 @@ public class FsVolumeImpl implements FsVolumeSpi {
   DatanodeStorage toDatanodeStorage() {
     return new DatanodeStorage(storageID, DatanodeStorage.State.NORMAL, storageType);
   }
-
 }
 

+ 44 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImplAllocator.java

@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.StorageType;
+
+/**
+ * Generate volumes based on the storageType.
+ */
+@InterfaceAudience.Private
+class FsVolumeImplAllocator {
+  static FsVolumeImpl createVolume(FsDatasetImpl fsDataset, String storageUuid,
+      File dir, Configuration conf, StorageType storageType)
+          throws IOException {
+    switch(storageType) {
+      case RAM_DISK:
+        return new FsTransientVolumeImpl(
+            fsDataset, storageUuid, dir, conf, storageType);
+      default:
+        return new FsVolumeImpl(
+            fsDataset, storageUuid, dir, conf, storageType);
+    }
+  }
+}

+ 19 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java

@@ -68,7 +68,25 @@ class FsVolumeList {
     }
     return blockChooser.chooseVolume(list, blockSize);
   }
-    
+
+  /**
+   * Get next volume. Synchronized to ensure {@link #curVolume} is updated
+   * by a single thread and next volume is chosen with no concurrent
+   * update to {@link #volumes}.
+   * @param blockSize free space needed on the volume
+   * @return next volume to store the block in.
+   */
+  synchronized FsVolumeImpl getNextTransientVolume(
+      long blockSize) throws IOException {
+    final List<FsVolumeImpl> list = new ArrayList<FsVolumeImpl>(volumes.size());
+    for(FsVolumeImpl v : volumes) {
+      if (v.isTransientStorage()) {
+        list.add(v);
+      }
+    }
+    return blockChooser.chooseVolume(list, blockSize);
+  }
+
   long getDfsUsed() throws IOException {
     long dfsUsed = 0L;
     for (FsVolumeImpl v : volumes) {

+ 3 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteBlockGetsBlockLengthHint.java

@@ -98,9 +98,10 @@ public class TestWriteBlockGetsBlockLengthHint {
      */
     @Override
     public synchronized ReplicaInPipelineInterface createRbw(
-        StorageType storageType, ExtendedBlock b) throws IOException {
+        StorageType storageType, ExtendedBlock b, boolean allowLazyPersist)
+        throws IOException {
       assertThat(b.getLocalBlock().getNumBytes(), is(EXPECTED_BLOCK_LENGTH));
-      return super.createRbw(storageType, b);
+      return super.createRbw(storageType, b, allowLazyPersist);
     }
   }
 }

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java

@@ -364,7 +364,7 @@ public abstract class BlockReportTestBase {
     // Create a bogus new block which will not be present on the namenode.
     ExtendedBlock b = new ExtendedBlock(
         poolId, rand.nextLong(), 1024L, rand.nextLong());
-    dn.getFSDataset().createRbw(StorageType.DEFAULT, b);
+    dn.getFSDataset().createRbw(StorageType.DEFAULT, b, false);
 
     DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
     StorageBlockReport[] reports = getBlockReports(dn, poolId, false, false);

+ 8 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java

@@ -300,6 +300,11 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
     public ChunkChecksum getLastChecksumAndDataLen() {
       return new ChunkChecksum(oStream.getLength(), null);
     }
+
+    @Override
+    public boolean isOnTransientStorage() {
+      return false;
+    }
   }
   
   /**
@@ -747,7 +752,8 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
 
   @Override // FsDatasetSpi
   public synchronized ReplicaInPipelineInterface createRbw(
-      StorageType storageType, ExtendedBlock b) throws IOException {
+      StorageType storageType, ExtendedBlock b,
+      boolean allowLazyPersist) throws IOException {
     return createTemporary(storageType, b);
   }
 
@@ -1083,7 +1089,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
 
   @Override
   public void checkAndUpdate(String bpid, long blockId, File diskFile,
-      File diskMetaFile, FsVolumeSpi vol) {
+      File diskMetaFile, FsVolumeSpi vol) throws IOException {
     throw new UnsupportedOperationException();
   }
 

+ 2 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java

@@ -529,7 +529,7 @@ public class TestBlockRecovery {
     if(LOG.isDebugEnabled()) {
       LOG.debug("Running " + GenericTestUtils.getMethodName());
     }
-    dn.data.createRbw(StorageType.DEFAULT, block);
+    dn.data.createRbw(StorageType.DEFAULT, block, false);
     try {
       dn.syncBlock(rBlock, initBlockRecords(dn));
       fail("Sync should fail");
@@ -553,7 +553,7 @@ public class TestBlockRecovery {
       LOG.debug("Running " + GenericTestUtils.getMethodName());
     }
     ReplicaInPipelineInterface replicaInfo = dn.data.createRbw(
-        StorageType.DEFAULT, block);
+        StorageType.DEFAULT, block, false);
     ReplicaOutputStreams streams = null;
     try {
       streams = replicaInfo.createStreams(true,

+ 5 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java

@@ -215,7 +215,7 @@ public class TestDirectoryScanner {
   }
 
   private void scan(long totalBlocks, int diffsize, long missingMetaFile, long missingBlockFile,
-      long missingMemoryBlocks, long mismatchBlocks) {
+      long missingMemoryBlocks, long mismatchBlocks) throws IOException {
     scanner.reconcile();
     
     assertTrue(scanner.diffs.containsKey(bpid));
@@ -431,6 +431,10 @@ public class TestDirectoryScanner {
 
     @Override
     public void releaseReservedSpace(long bytesToRelease) {
+
+    @Override
+    public boolean isTransientStorage() {
+      return false;
     }
   }
 

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java

@@ -67,7 +67,7 @@ public class TestSimulatedFSDataset {
       // we pass expected len as zero, - fsdataset should use the sizeof actual
       // data written
       ReplicaInPipelineInterface bInfo = fsdataset.createRbw(
-          StorageType.DEFAULT, b);
+          StorageType.DEFAULT, b, false);
       ReplicaOutputStreams out = bInfo.createStreams(true,
           DataChecksum.newDataChecksum(DataChecksum.Type.CRC32, 512));
       try {

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetTestUtil.java

@@ -34,7 +34,7 @@ public class FsDatasetTestUtil {
 
   public static File getBlockFile(FsDatasetSpi<?> fsd, String bpid, Block b
       ) throws IOException {
-    return ((FsDatasetImpl)fsd).getBlockFile(bpid, b);
+    return ((FsDatasetImpl)fsd).getBlockFile(bpid, b.getBlockId());
   }
 
   public static File getMetaFile(FsDatasetSpi<?> fsd, String bpid, Block b)

+ 6 - 6
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java

@@ -358,7 +358,7 @@ public class TestWriteToReplica {
     }
  
     try {
-      dataSet.createRbw(StorageType.DEFAULT, blocks[FINALIZED]);
+      dataSet.createRbw(StorageType.DEFAULT, blocks[FINALIZED], false);
       Assert.fail("Should not have created a replica that's already " +
       		"finalized " + blocks[FINALIZED]);
     } catch (ReplicaAlreadyExistsException e) {
@@ -376,7 +376,7 @@ public class TestWriteToReplica {
     }
 
     try {
-      dataSet.createRbw(StorageType.DEFAULT, blocks[TEMPORARY]);
+      dataSet.createRbw(StorageType.DEFAULT, blocks[TEMPORARY], false);
       Assert.fail("Should not have created a replica that had created as " +
       		"temporary " + blocks[TEMPORARY]);
     } catch (ReplicaAlreadyExistsException e) {
@@ -386,7 +386,7 @@ public class TestWriteToReplica {
         0L, blocks[RBW].getNumBytes());  // expect to be successful
     
     try {
-      dataSet.createRbw(StorageType.DEFAULT, blocks[RBW]);
+      dataSet.createRbw(StorageType.DEFAULT, blocks[RBW], false);
       Assert.fail("Should not have created a replica that had created as RBW " +
           blocks[RBW]);
     } catch (ReplicaAlreadyExistsException e) {
@@ -402,7 +402,7 @@ public class TestWriteToReplica {
     }
 
     try {
-      dataSet.createRbw(StorageType.DEFAULT, blocks[RWR]);
+      dataSet.createRbw(StorageType.DEFAULT, blocks[RWR], false);
       Assert.fail("Should not have created a replica that was waiting to be " +
       		"recovered " + blocks[RWR]);
     } catch (ReplicaAlreadyExistsException e) {
@@ -418,7 +418,7 @@ public class TestWriteToReplica {
     }
 
     try {
-      dataSet.createRbw(StorageType.DEFAULT, blocks[RUR]);
+      dataSet.createRbw(StorageType.DEFAULT, blocks[RUR], false);
       Assert.fail("Should not have created a replica that was under recovery " +
           blocks[RUR]);
     } catch (ReplicaAlreadyExistsException e) {
@@ -435,7 +435,7 @@ public class TestWriteToReplica {
           e.getMessage().contains(ReplicaNotFoundException.NON_EXISTENT_REPLICA));
     }
     
-    dataSet.createRbw(StorageType.DEFAULT, blocks[NON_EXISTENT]);
+    dataSet.createRbw(StorageType.DEFAULT, blocks[NON_EXISTENT], false);
   }
   
   private void testWriteToTemporary(FsDatasetImpl dataSet, ExtendedBlock[] blocks) throws IOException {